From c138bfe42b424ed5ce4e0385efb574627034a1be Mon Sep 17 00:00:00 2001 From: Nalin Dahyabhai Date: Wed, 7 Feb 2018 10:33:32 -0500 Subject: [PATCH] Try to avoid getting stuck when draining the journal When draining the journal, if the output message channel is full, stop reading. This keeps us from blocking indefinitely when a client hangs up on us and the channel is full. Doing this requires that we make multiple calls to drainJournal(), even in non-follow mode. In non-follow mode, keep trying to read until we run out of entries or hit the "until" cutoff, which if not specified, we set to "now". If we don't do that, and we start reading but can't keep up with the journal, we'll never catch up even after we get to journal messages which were logged long after we started looking for the then-present logs. The journal API opens the journal files when the journal handle is opened by sd_journal_open(), and uses an inotify descriptor to notice when files have been added or removed, but it doesn't set up that inotify descriptor until the first time that sd_journal_get_fd() is called (either by a client, or as part of sd_journal_wait()). We hadn't been doing that until our initial read-through of entries was done, meaning that we've been missing file deletion events that occurred during our first pass at reading the journal. Make that window shorter. Periodically call sd_journal_process() when reading the journal, to let it skip over and close handles to journal files which have been deleted since we started reading the journal, for cases where our keeping them open contributes to a shortage of disk space. Treat SD_JOURNAL_INVALIDATE like SD_JOURNAL_APPEND, since sd_journal_process() prefers to return INVALIDATE over APPEND when both are applicable. Clean up a deferred function call in the journal reading logic. Signed-off-by: Nalin Dahyabhai --- daemon/logger/journald/read.go | 177 ++++++++++++++++++++++++++++----- 1 file changed, 150 insertions(+), 27 deletions(-) diff --git a/daemon/logger/journald/read.go b/daemon/logger/journald/read.go index 6aff21f441f5d..86f7066105068 100644 --- a/daemon/logger/journald/read.go +++ b/daemon/logger/journald/read.go @@ -140,7 +140,9 @@ package journald // /* The close notification pipe was closed. */ // return 0; // } -// if (sd_journal_process(j) == SD_JOURNAL_APPEND) { +// switch (sd_journal_process(j)) { +// case SD_JOURNAL_APPEND: +// case SD_JOURNAL_INVALIDATE: // /* Data, which we might care about, was appended. */ // return 1; // } @@ -178,28 +180,102 @@ func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, var priority, partial C.int var done bool - // Walk the journal from here forward until we run out of new entries - // or we reach the until value (if provided). -drain: + // Give the journal handle an opportunity to close any open descriptors + // for files that have been removed. + C.sd_journal_process(j) + + // Seek to the location of the last entry that we sent. + if oldCursor != nil { + // We know which entry was read last, so try to go to that + // location. + rc := C.sd_journal_seek_cursor(j, oldCursor) + if rc != 0 { + return oldCursor, false + } + // Go forward to the first unsent message. + rc = C.sd_journal_next(j) + if rc < 0 { + return oldCursor, false + } + // We want to avoid sending a given entry twice (or more), so + // attempt to advance to the first unread entry in the journal + // so long as "this" one matches the last entry that we read. + for C.sd_journal_test_cursor(j, oldCursor) > 0 { + if C.sd_journal_next(j) <= 0 { + return oldCursor, false + } + } + } + + // Walk the journal from here forward until we run out of new entries. + sent := uint64(0) for { - // Try not to send a given entry twice. - if oldCursor != nil { - for C.sd_journal_test_cursor(j, oldCursor) > 0 { - if C.sd_journal_next(j) <= 0 { - break drain + // If we're not keeping up with journald writing to the journal, some of the + // files between where we are and "now" may have been deleted since we started + // walking the set of entries. If that's happened, the inotify descriptor in + // the journal handle will have pending deletion events after we've been reading + // for a while. Letting the journal library process them will close any that + // are already deleted, so that we'll skip over them and allow space that would + // have been reclaimed by deleting these files to actually be reclaimed. + if sent > 0 && sent%1024 == 0 { + if status := C.sd_journal_process(j); status < 0 { + cerrstr := C.strerror(C.int(-status)) + errstr := C.GoString(cerrstr) + fmtstr := "error %q while attempting to process journal events for container %q" + logrus.Errorf(fmtstr, errstr, s.vars["CONTAINER_ID_FULL"]) + // Attempt to rewind the last-read cursor to the + // entry that we last sent. + if status = C.sd_journal_previous(j); status < 0 { + cerrstr := C.strerror(C.int(-status)) + errstr := C.GoString(cerrstr) + fmtstr := "error %q while attempting to rewind journal by 1 for container %q" + logrus.Errorf(fmtstr, errstr, s.vars["CONTAINER_ID_FULL"]) } + break + } + } + // If the output channel is full, stop here, so that we don't block indefinitely + // waiting until we can output another message, which won't ever happen if the + // client has already disconnected. + if len(logWatcher.Msg) >= cap(logWatcher.Msg) { + // Attempt to rewind the last-read cursor to the entry + // that we last sent. + if status := C.sd_journal_previous(j); status < 0 { + cerrstr := C.strerror(C.int(-status)) + errstr := C.GoString(cerrstr) + fmtstr := "error %q while attempting to rewind journal by 1 for container %q" + logrus.Errorf(fmtstr, errstr, s.vars["CONTAINER_ID_FULL"]) } + break } - // Read and send the logged message, if there is one to read. + // Read and send the current message, if there is one to read. i := C.get_message(j, &msg, &length, &partial) if i != -C.ENOENT && i != -C.EADDRNOTAVAIL { // Read the entry's timestamp. if C.sd_journal_get_realtime_usec(j, &stamp) != 0 { + // Attempt to rewind the last-read + // cursor to the entry that we last + // sent. + if status := C.sd_journal_previous(j); status < 0 { + cerrstr := C.strerror(C.int(-status)) + errstr := C.GoString(cerrstr) + fmtstr := "error %q while attempting to rewind journal by 1 for container %q" + logrus.Errorf(fmtstr, errstr, s.vars["CONTAINER_ID_FULL"]) + } break } // Break if the timestamp exceeds any provided until flag. if untilUnixMicro != 0 && untilUnixMicro < uint64(stamp) { done = true + // Attempt to rewind the last-read + // cursor to the entry that we last + // sent. + if status := C.sd_journal_previous(j); status < 0 { + cerrstr := C.strerror(C.int(-status)) + errstr := C.GoString(cerrstr) + fmtstr := "error %q while attempting to rewind journal by 1 for container %q" + logrus.Errorf(fmtstr, errstr, s.vars["CONTAINER_ID_FULL"]) + } break } @@ -236,14 +312,20 @@ drain: Attrs: attrs, } } - // If we're at the end of the journal, we're done (for now). + // If we've hit the end of the journal, we're done (for now). + sent++ if C.sd_journal_next(j) <= 0 { break } } + // If we didn't send any entries, just return the same cursor value. + if oldCursor != nil && sent == 0 { + return oldCursor, done + } // free(NULL) is safe C.free(unsafe.Pointer(oldCursor)) + // Take note of which entry we most recently sent. if C.sd_journal_get_cursor(j, &cursor) != 0 { // ensure that we won't be freeing an address that's invalid cursor = nil @@ -323,17 +405,30 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon close(logWatcher.Msg) return } + // The journal library uses an inotify descriptor to notice when + // journal files are removed, but it isn't allocated until our first + // call to sd_journal_get_fd(), which means that it will not notice the + // removal of any files that happens after we open the journal and + // before the first time we try to read that descriptor. Do it now, + // even though we don't need its value just yet, to try to make that + // window smaller. + rc = C.sd_journal_get_fd(j) + if rc < 0 { + logWatcher.Err <- fmt.Errorf("error opening journal inotify descriptor") + close(logWatcher.Msg) + return + } // If we end up following the log, we can set the journal context // pointer and the channel pointer to nil so that we won't close them // here, potentially while the goroutine that uses them is still // running. Otherwise, close them when we return from this function. following := false - defer func(pfollowing *bool) { - if !*pfollowing { + defer func() { + if !following { close(logWatcher.Msg) } C.sd_journal_close(j) - }(&following) + }() // Remove limits on the size of data items that we'll retrieve. rc = C.sd_journal_set_data_threshold(j, C.size_t(0)) if rc != 0 { @@ -357,6 +452,15 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon if !config.Until.IsZero() { nano := config.Until.UnixNano() untilUnixMicro = uint64(nano / 1000) + } else if !config.Follow { + // Get the current time, so that we know when to stop in non-follow mode. + var ts C.struct_timespec + if C.clock_gettime(C.CLOCK_REALTIME, &ts) == 0 { + nano := uint64(ts.tv_sec)*1000000000 + uint64(ts.tv_nsec) + untilUnixMicro = nano / 1000 + } else { + untilUnixMicro = 0xffffffffffffffff + } } if config.Tail > 0 { lines := config.Tail @@ -412,22 +516,41 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon } cursor, _ = s.drainJournal(logWatcher, j, nil, untilUnixMicro) if config.Follow { - // Allocate a descriptor for following the journal, if we'll - // need one. Do it here so that we can report if it fails. - if fd := C.sd_journal_get_fd(j); fd < C.int(0) { - logWatcher.Err <- fmt.Errorf("error opening journald follow descriptor: %q", C.GoString(C.strerror(-fd))) + // Create a pipe that we can poll at the same time as + // the journald descriptor. + if C.pipe(&pipes[0]) == C.int(-1) { + logWatcher.Err <- fmt.Errorf("error opening journald close notification pipe") } else { - // Create a pipe that we can poll at the same time as - // the journald descriptor. - if C.pipe(&pipes[0]) == C.int(-1) { - logWatcher.Err <- fmt.Errorf("error opening journald close notification pipe") - } else { - cursor = s.followJournal(logWatcher, j, pipes, cursor, untilUnixMicro) - // Let followJournal handle freeing the journal context - // object and closing the channel. - following = true + cursor = s.followJournal(logWatcher, j, pipes, cursor, untilUnixMicro) + // Let followJournal handle freeing the journal context + // object and closing the channel. + following = true + } + } else { + // In case we stopped reading because the output channel was + // temporarily full, keep going until we find that we can't + // wind forward to the next entry, which we take as an + // indication that there are no more entries for us to read, or + // until we cross the "until" threshold, or until we get told + // to close the reader, whichever comes first. + duration := 10 * time.Millisecond + timer := time.NewTimer(duration) + drainCatchup: + for C.sd_journal_next(j) > 0 { + var done bool + timer.Stop() + cursor, done = s.drainJournal(logWatcher, j, cursor, untilUnixMicro) + if done { + break + } + timer.Reset(duration) + select { + case <-logWatcher.WatchClose(): + break drainCatchup + case <-timer.C: } } + timer.Stop() } C.free(unsafe.Pointer(cursor))