From c457a11add38d6ab8fecd348f072d72c170ea282 Mon Sep 17 00:00:00 2001 From: Kir Kolyshkin Date: Mon, 11 Mar 2019 17:20:56 -0700 Subject: [PATCH] journald/read: simplify/fix followJournal() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit TL;DR: simplify the code, fix --follow hanging indefinitely Do the following to simplify the followJournal() code: 1. Use Go-native select instead of C-native polling. 2. Use Watch{Producer,Consumer}Gone(), eliminating the need for the journald.closed variable and an extra goroutine. 3. Use sd_journal_wait(). In the words of its own man page: > A synchronous alternative for using sd_journal_get_fd(), > sd_journal_get_events(), sd_journal_get_timeout() and > sd_journal_process() is sd_journal_wait(). Unfortunately, the logic is still not as simple as it could be; the reason being that, once the container has exited, journald might still be writing some logs from its internal buffers onto journal file(s), and there is no way to figure out whether it is done, so we cannot be sure that we have read all of it back. This bug can be reproduced with something like: > $ ID=$(docker run -d busybox seq 1 150000); docker logs --follow $ID > ... > 128123 > $ (The last expected output line should be `150000`). To avoid exiting from followJournal() early, add the following logic: once the container is gone, keep trying to drain the journal until there's no new data for at least the `waitTimeout` time period.
Should fix https://github.com/docker/for-linux/issues/575 Signed-off-by: Kir Kolyshkin (cherry picked from commit f091febc942859ffbc881f3a3aa327366603ae65) Signed-off-by: Robert Günzler --- daemon/logger/journald/journald.go | 1 - daemon/logger/journald/read.go | 178 ++++++++++------------------- 2 files changed, 59 insertions(+), 120 deletions(-) diff --git a/daemon/logger/journald/journald.go b/daemon/logger/journald/journald.go index 113ed585c9..c569002f7b 100644 --- a/daemon/logger/journald/journald.go +++ b/daemon/logger/journald/journald.go @@ -21,7 +21,6 @@ type journald struct { mu sync.Mutex vars map[string]string // additional variables and values to send to the journal along with the log message readers map[*logger.LogWatcher]struct{} - closed bool } func init() { diff --git a/daemon/logger/journald/read.go b/daemon/logger/journald/read.go index 8aa01c460d..3dfa08b0a4 100644 --- a/daemon/logger/journald/read.go +++ b/daemon/logger/journald/read.go @@ -101,55 +101,10 @@ package journald // import "github.com/docker/docker/daemon/logger/journald" // } // return rc; //} -//static int wait_for_data_cancelable(sd_journal *j, int pipefd) -//{ -// struct pollfd fds[2]; -// uint64_t when = 0; -// int timeout, jevents, i; -// struct timespec ts; -// uint64_t now; -// -// memset(&fds, 0, sizeof(fds)); -// fds[0].fd = pipefd; -// fds[0].events = POLLHUP; -// fds[1].fd = sd_journal_get_fd(j); -// if (fds[1].fd < 0) { -// return fds[1].fd; -// } -// -// do { -// jevents = sd_journal_get_events(j); -// if (jevents < 0) { -// return jevents; -// } -// fds[1].events = jevents; -// sd_journal_get_timeout(j, &when); -// if (when == -1) { -// timeout = -1; -// } else { -// clock_gettime(CLOCK_MONOTONIC, &ts); -// now = (uint64_t) ts.tv_sec * 1000000 + ts.tv_nsec / 1000; -// timeout = when > now ? (int) ((when - now + 999) / 1000) : 0; -// } -// i = poll(fds, 2, timeout); -// if ((i == -1) && (errno != EINTR)) { -// /* An unexpected error. */ -// return (errno != 0) ? 
-errno : -EINTR; -// } -// if (fds[0].revents & POLLHUP) { -// /* The close notification pipe was closed. */ -// return 0; -// } -// if (sd_journal_process(j) == SD_JOURNAL_APPEND) { -// /* Data, which we might care about, was appended. */ -// return 1; -// } -// } while ((fds[0].revents & POLLHUP) == 0); -// return 0; -//} import "C" import ( + "errors" "fmt" "strings" "time" @@ -158,27 +113,33 @@ import ( "github.com/coreos/go-systemd/journal" "github.com/docker/docker/api/types/backend" "github.com/docker/docker/daemon/logger" - "github.com/sirupsen/logrus" ) func (s *journald) Close() error { s.mu.Lock() - s.closed = true for r := range s.readers { r.ProducerGone() delete(s.readers, r) - } s.mu.Unlock() return nil } -func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, oldCursor *C.char, untilUnixMicro uint64) (*C.char, bool) { - var msg, data, cursor *C.char - var length C.size_t - var stamp C.uint64_t - var priority, partial C.int - var done bool +// convert error code returned from a sd_journal_* function +// (which returns -errno) to a string +func CErr(ret C.int) string { + return C.GoString(C.strerror(C.int(-ret))) +} + +func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, oldCursor *C.char, untilUnixMicro uint64) (*C.char, bool, int) { + var ( + msg, data, cursor *C.char + length C.size_t + stamp C.uint64_t + priority, partial C.int + done bool + shown int + ) // Walk the journal from here forward until we run out of new entries // or we reach the until value (if provided). @@ -230,6 +191,7 @@ drain: kv := strings.SplitN(C.GoStringN(data, C.int(length)), "=", 2) attrs = append(attrs, backend.LogAttr{Key: kv[0], Value: kv[1]}) } + // Send the log message, unless the consumer is gone select { case <-logWatcher.WatchConsumerGone(): @@ -241,6 +203,7 @@ drain: Timestamp: timestamp.In(time.UTC), Attrs: attrs, }: + shown++ } } // If we're at the end of the journal, we're done (for now). 
@@ -255,73 +218,57 @@ drain: // ensure that we won't be freeing an address that's invalid cursor = nil } - return cursor, done + return cursor, done, shown } -func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, pfd [2]C.int, cursor *C.char, untilUnixMicro uint64) *C.char { +func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, cursor *C.char, untilUnixMicro uint64) *C.char { s.mu.Lock() s.readers[logWatcher] = struct{}{} - if s.closed { - // the journald Logger is closed, presumably because the container has been - // reset. So we shouldn't follow, because we'll never be woken up. But we - // should make one more drainJournal call to be sure we've got all the logs. - // Close pfd[1] so that one drainJournal happens, then cleanup, then return. - C.close(pfd[1]) - } s.mu.Unlock() - newCursor := make(chan *C.char) + waitTimeout := C.uint64_t(250000) // 0.25s - go func() { - for { - // Keep copying journal data out until we're notified to stop - // or we hit an error. - status := C.wait_for_data_cancelable(j, pfd[0]) - if status < 0 { - cerrstr := C.strerror(C.int(-status)) - errstr := C.GoString(cerrstr) - fmtstr := "error %q while attempting to follow journal for container %q" - logrus.Errorf(fmtstr, errstr, s.vars["CONTAINER_ID_FULL"]) - break - } - - var done bool - cursor, done = s.drainJournal(logWatcher, j, cursor, untilUnixMicro) - - if status != 1 || done { - // We were notified to stop - break + for { + status := C.sd_journal_wait(j, waitTimeout) + if status < 0 { + logWatcher.Err <- errors.New("error waiting for journal: " + CErr(status)) + goto cleanup + } + select { + case <-logWatcher.WatchConsumerGone(): + goto cleanup // won't be able to write anything anymore + case <-logWatcher.WatchProducerGone(): + // container is gone, drain journal + default: + // container is still alive + if status == C.SD_JOURNAL_NOP { + // no new data -- keep waiting + continue } } - - // Clean up. 
- C.close(pfd[0]) - s.mu.Lock() - delete(s.readers, logWatcher) - s.mu.Unlock() - close(logWatcher.Msg) - newCursor <- cursor - }() - - // Wait until we're told to stop. - select { - case cursor = <-newCursor: - case <-logWatcher.WatchConsumerGone(): - // Notify the other goroutine that its work is done. - C.close(pfd[1]) - cursor = <-newCursor + newCursor, done, recv := s.drainJournal(logWatcher, j, cursor, untilUnixMicro) + cursor = newCursor + if done || (status == C.SD_JOURNAL_NOP && recv == 0) { + break + } } +cleanup: + s.mu.Lock() + delete(s.readers, logWatcher) + s.mu.Unlock() + close(logWatcher.Msg) return cursor } func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) { - var j *C.sd_journal - var cmatch, cursor *C.char - var stamp C.uint64_t - var sinceUnixMicro uint64 - var untilUnixMicro uint64 - var pipes [2]C.int + var ( + j *C.sd_journal + cmatch, cursor *C.char + stamp C.uint64_t + sinceUnixMicro uint64 + untilUnixMicro uint64 + ) // Get a handle to the journal. rc := C.sd_journal_open(&j, C.int(0)) @@ -409,19 +356,12 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon return } } - cursor, _ = s.drainJournal(logWatcher, j, nil, untilUnixMicro) + cursor, _, _ = s.drainJournal(logWatcher, j, nil, untilUnixMicro) if config.Follow { - // Create a pipe that we can poll at the same time as - // the journald descriptor. - ret := C.pipe(&pipes[0]) - if ret < 0 { - logWatcher.Err <- fmt.Errorf("error creating journald notification pipe") - } else { - cursor = s.followJournal(logWatcher, j, pipes, cursor, untilUnixMicro) - // Let followJournal handle freeing the journal context - // object and closing the channel. - following = true - } + cursor = s.followJournal(logWatcher, j, cursor, untilUnixMicro) + // Let followJournal handle freeing the journal context + // object and closing the channel. + following = true } C.free(unsafe.Pointer(cursor))