Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try to avoid getting stuck when draining the journal #36254

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
177 changes: 150 additions & 27 deletions daemon/logger/journald/read.go
Expand Up @@ -140,7 +140,9 @@ package journald
// /* The close notification pipe was closed. */
// return 0;
// }
// if (sd_journal_process(j) == SD_JOURNAL_APPEND) {
// switch (sd_journal_process(j)) {
// case SD_JOURNAL_APPEND:
// case SD_JOURNAL_INVALIDATE:
// /* Data, which we might care about, was appended. */
// return 1;
// }
Expand Down Expand Up @@ -178,28 +180,102 @@ func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *C.sd_journal,
var priority, partial C.int
var done bool

// Walk the journal from here forward until we run out of new entries
// or we reach the until value (if provided).
drain:
// Give the journal handle an opportunity to close any open descriptors
// for files that have been removed.
C.sd_journal_process(j)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Err check?


// Seek to the location of the last entry that we sent.
if oldCursor != nil {
// We know which entry was read last, so try to go to that
// location.
rc := C.sd_journal_seek_cursor(j, oldCursor)
if rc != 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Would be nice to stick to one style throughout this code. Either rc < 0 (preferable) or rc != 0.

return oldCursor, false
}
// Go forward to the first unsent message.
rc = C.sd_journal_next(j)
if rc < 0 {
return oldCursor, false
}
// We want to avoid sending a given entry twice (or more), so
// attempt to advance to the first unread entry in the journal
// so long as "this" one matches the last entry that we read.
for C.sd_journal_test_cursor(j, oldCursor) > 0 {
if C.sd_journal_next(j) <= 0 {
return oldCursor, false
}
}
}

// Walk the journal from here forward until we run out of new entries.
sent := uint64(0)
for {
// Try not to send a given entry twice.
if oldCursor != nil {
for C.sd_journal_test_cursor(j, oldCursor) > 0 {
if C.sd_journal_next(j) <= 0 {
break drain
// If we're not keeping up with journald writing to the journal, some of the
// files between where we are and "now" may have been deleted since we started
// walking the set of entries. If that's happened, the inotify descriptor in
// the journal handle will have pending deletion events after we've been reading
// for a while. Letting the journal library process them will close any that
// are already deleted, so that we'll skip over them and allow space that would
// have been reclaimed by deleting these files to actually be reclaimed.
if sent > 0 && sent%1024 == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Since this is garbage collection logic, it'll be easier to read if we can extract this out into a function.

if status := C.sd_journal_process(j); status < 0 {
cerrstr := C.strerror(C.int(-status))
errstr := C.GoString(cerrstr)
fmtstr := "error %q while attempting to process journal events for container %q"
logrus.Errorf(fmtstr, errstr, s.vars["CONTAINER_ID_FULL"])
// Attempt to rewind the last-read cursor to the
// entry that we last sent.
if status = C.sd_journal_previous(j); status < 0 {
cerrstr := C.strerror(C.int(-status))
errstr := C.GoString(cerrstr)
fmtstr := "error %q while attempting to rewind journal by 1 for container %q"
logrus.Errorf(fmtstr, errstr, s.vars["CONTAINER_ID_FULL"])
}
break
}
}
// If the output channel is full, stop here, so that we don't block indefinitely
// waiting until we can output another message, which won't ever happen if the
// client has already disconnected.
if len(logWatcher.Msg) >= cap(logWatcher.Msg) {
// Attempt to rewind the last-read cursor to the entry
// that we last sent.
if status := C.sd_journal_previous(j); status < 0 {
cerrstr := C.strerror(C.int(-status))
errstr := C.GoString(cerrstr)
fmtstr := "error %q while attempting to rewind journal by 1 for container %q"
logrus.Errorf(fmtstr, errstr, s.vars["CONTAINER_ID_FULL"])
}
break
}
// Read and send the logged message, if there is one to read.
// Read and send the current message, if there is one to read.
i := C.get_message(j, &msg, &length, &partial)
if i != -C.ENOENT && i != -C.EADDRNOTAVAIL {
// Read the entry's timestamp.
if C.sd_journal_get_realtime_usec(j, &stamp) != 0 {
// Attempt to rewind the last-read
// cursor to the entry that we last
// sent.
if status := C.sd_journal_previous(j); status < 0 {
cerrstr := C.strerror(C.int(-status))
errstr := C.GoString(cerrstr)
fmtstr := "error %q while attempting to rewind journal by 1 for container %q"
logrus.Errorf(fmtstr, errstr, s.vars["CONTAINER_ID_FULL"])
}
break
}
// Break if the timestamp exceeds any provided until flag.
if untilUnixMicro != 0 && untilUnixMicro < uint64(stamp) {
done = true
// Attempt to rewind the last-read
// cursor to the entry that we last
// sent.
if status := C.sd_journal_previous(j); status < 0 {
cerrstr := C.strerror(C.int(-status))
errstr := C.GoString(cerrstr)
fmtstr := "error %q while attempting to rewind journal by 1 for container %q"
logrus.Errorf(fmtstr, errstr, s.vars["CONTAINER_ID_FULL"])
}
break
}

Expand Down Expand Up @@ -236,14 +312,20 @@ drain:
Attrs: attrs,
}
}
// If we're at the end of the journal, we're done (for now).
// If we've hit the end of the journal, we're done (for now).
sent++
if C.sd_journal_next(j) <= 0 {
break
}
}

// If we didn't send any entries, just return the same cursor value.
if oldCursor != nil && sent == 0 {
return oldCursor, done
}
// free(NULL) is safe
C.free(unsafe.Pointer(oldCursor))
// Take note of which entry we most recently sent.
if C.sd_journal_get_cursor(j, &cursor) != 0 {
// ensure that we won't be freeing an address that's invalid
cursor = nil
Expand Down Expand Up @@ -323,17 +405,30 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon
close(logWatcher.Msg)
return
}
// The journal library uses an inotify descriptor to notice when
// journal files are removed, but it isn't allocated until our first
// call to sd_journal_get_fd(), which means that it will not notice the
// removal of any files that happens after we open the journal and
// before the first time we try to read that descriptor. Do it now,
// even though we don't need its value just yet, to try to make that
// window smaller.
rc = C.sd_journal_get_fd(j)
if rc < 0 {
logWatcher.Err <- fmt.Errorf("error opening journal inotify descriptor")
close(logWatcher.Msg)
return
}
// If we end up following the log, we can set the journal context
// pointer and the channel pointer to nil so that we won't close them
// here, potentially while the goroutine that uses them is still
// running. Otherwise, close them when we return from this function.
following := false
defer func(pfollowing *bool) {
if !*pfollowing {
defer func() {
if !following {
close(logWatcher.Msg)
}
C.sd_journal_close(j)
}(&following)
}()
// Remove limits on the size of data items that we'll retrieve.
rc = C.sd_journal_set_data_threshold(j, C.size_t(0))
if rc != 0 {
Expand All @@ -357,6 +452,15 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon
if !config.Until.IsZero() {
nano := config.Until.UnixNano()
untilUnixMicro = uint64(nano / 1000)
} else if !config.Follow {
// Get the current time, so that we know when to stop in non-follow mode.
var ts C.struct_timespec
if C.clock_gettime(C.CLOCK_REALTIME, &ts) == 0 {
nano := uint64(ts.tv_sec)*1000000000 + uint64(ts.tv_nsec)
untilUnixMicro = nano / 1000
} else {
untilUnixMicro = 0xffffffffffffffff
}
}
if config.Tail > 0 {
lines := config.Tail
Expand Down Expand Up @@ -412,22 +516,41 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon
}
cursor, _ = s.drainJournal(logWatcher, j, nil, untilUnixMicro)
if config.Follow {
// Allocate a descriptor for following the journal, if we'll
// need one. Do it here so that we can report if it fails.
if fd := C.sd_journal_get_fd(j); fd < C.int(0) {
logWatcher.Err <- fmt.Errorf("error opening journald follow descriptor: %q", C.GoString(C.strerror(-fd)))
// Create a pipe that we can poll at the same time as
// the journald descriptor.
if C.pipe(&pipes[0]) == C.int(-1) {
logWatcher.Err <- fmt.Errorf("error opening journald close notification pipe")
} else {
// Create a pipe that we can poll at the same time as
// the journald descriptor.
if C.pipe(&pipes[0]) == C.int(-1) {
logWatcher.Err <- fmt.Errorf("error opening journald close notification pipe")
} else {
cursor = s.followJournal(logWatcher, j, pipes, cursor, untilUnixMicro)
// Let followJournal handle freeing the journal context
// object and closing the channel.
following = true
cursor = s.followJournal(logWatcher, j, pipes, cursor, untilUnixMicro)
// Let followJournal handle freeing the journal context
// object and closing the channel.
following = true
}
} else {
// In case we stopped reading because the output channel was
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my clarification: Did you mean the "output buffer"?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we be checking the done bool returned by drainJournal in L517 to determine whether we are done, rather than ignoring it?

// temporarily full, keep going until we find that we can't
// wind forward to the next entry, which we take as an
// indication that there are no more entries for us to read, or
// until we cross the "until" threshold, or until we get told
// to close the reader, whichever comes first.
duration := 10 * time.Millisecond
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this an arbitrary frequency to drain? If this is purely to prevent the client from indefinite blocking scenarios, then I'm thinking that we can increase the duration.

timer := time.NewTimer(duration)
drainCatchup:
for C.sd_journal_next(j) > 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't there be different handling of EOF (return value 0) and error cases (negative values) here?

var done bool
timer.Stop()
cursor, done = s.drainJournal(logWatcher, j, cursor, untilUnixMicro)
if done {
break
}
timer.Reset(duration)
select {
case <-logWatcher.WatchClose():
break drainCatchup
case <-timer.C:
}
}
timer.Stop()
}

C.free(unsafe.Pointer(cursor))
Expand Down