diff --git a/daemon/logger/journald/journald.go b/daemon/logger/journald/journald.go index 113ed585c9..c569002f7b 100644 --- a/daemon/logger/journald/journald.go +++ b/daemon/logger/journald/journald.go @@ -21,7 +21,6 @@ type journald struct { mu sync.Mutex vars map[string]string // additional variables and values to send to the journal along with the log message readers map[*logger.LogWatcher]struct{} - closed bool } func init() { diff --git a/daemon/logger/journald/read.go b/daemon/logger/journald/read.go index 8aa01c460d..3dfa08b0a4 100644 --- a/daemon/logger/journald/read.go +++ b/daemon/logger/journald/read.go @@ -101,55 +101,10 @@ package journald // import "github.com/docker/docker/daemon/logger/journald" // } // return rc; //} -//static int wait_for_data_cancelable(sd_journal *j, int pipefd) -//{ -// struct pollfd fds[2]; -// uint64_t when = 0; -// int timeout, jevents, i; -// struct timespec ts; -// uint64_t now; -// -// memset(&fds, 0, sizeof(fds)); -// fds[0].fd = pipefd; -// fds[0].events = POLLHUP; -// fds[1].fd = sd_journal_get_fd(j); -// if (fds[1].fd < 0) { -// return fds[1].fd; -// } -// -// do { -// jevents = sd_journal_get_events(j); -// if (jevents < 0) { -// return jevents; -// } -// fds[1].events = jevents; -// sd_journal_get_timeout(j, &when); -// if (when == -1) { -// timeout = -1; -// } else { -// clock_gettime(CLOCK_MONOTONIC, &ts); -// now = (uint64_t) ts.tv_sec * 1000000 + ts.tv_nsec / 1000; -// timeout = when > now ? (int) ((when - now + 999) / 1000) : 0; -// } -// i = poll(fds, 2, timeout); -// if ((i == -1) && (errno != EINTR)) { -// /* An unexpected error. */ -// return (errno != 0) ? -errno : -EINTR; -// } -// if (fds[0].revents & POLLHUP) { -// /* The close notification pipe was closed. */ -// return 0; -// } -// if (sd_journal_process(j) == SD_JOURNAL_APPEND) { -// /* Data, which we might care about, was appended. */ -// return 1; -// } -// } while ((fds[0].revents & POLLHUP) == 0); -// return 0; -//} import "C" import ( + "errors" "fmt" "strings" "time" @@ -158,27 +113,33 @@ import ( "github.com/coreos/go-systemd/journal" "github.com/docker/docker/api/types/backend" "github.com/docker/docker/daemon/logger" - "github.com/sirupsen/logrus" ) func (s *journald) Close() error { s.mu.Lock() - s.closed = true for r := range s.readers { r.ProducerGone() delete(s.readers, r) - } s.mu.Unlock() return nil } -func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, oldCursor *C.char, untilUnixMicro uint64) (*C.char, bool) { - var msg, data, cursor *C.char - var length C.size_t - var stamp C.uint64_t - var priority, partial C.int - var done bool +// convert error code returned from a sd_journal_* function +// (which returns -errno) to a string +func CErr(ret C.int) string { + return C.GoString(C.strerror(C.int(-ret))) +} + +func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, oldCursor *C.char, untilUnixMicro uint64) (*C.char, bool, int) { + var ( + msg, data, cursor *C.char + length C.size_t + stamp C.uint64_t + priority, partial C.int + done bool + shown int + ) // Walk the journal from here forward until we run out of new entries // or we reach the until value (if provided). @@ -230,6 +191,7 @@ drain: kv := strings.SplitN(C.GoStringN(data, C.int(length)), "=", 2) attrs = append(attrs, backend.LogAttr{Key: kv[0], Value: kv[1]}) } + // Send the log message, unless the consumer is gone select { case <-logWatcher.WatchConsumerGone(): @@ -241,6 +203,7 @@ drain: Timestamp: timestamp.In(time.UTC), Attrs: attrs, }: + shown++ } } // If we're at the end of the journal, we're done (for now). @@ -255,73 +218,57 @@ drain: // ensure that we won't be freeing an address that's invalid cursor = nil } - return cursor, done + return cursor, done, shown } -func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, pfd [2]C.int, cursor *C.char, untilUnixMicro uint64) *C.char { +func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, cursor *C.char, untilUnixMicro uint64) *C.char { s.mu.Lock() s.readers[logWatcher] = struct{}{} - if s.closed { - // the journald Logger is closed, presumably because the container has been - // reset. So we shouldn't follow, because we'll never be woken up. But we - // should make one more drainJournal call to be sure we've got all the logs. - // Close pfd[1] so that one drainJournal happens, then cleanup, then return. - C.close(pfd[1]) - } s.mu.Unlock() - newCursor := make(chan *C.char) + waitTimeout := C.uint64_t(250000) // 0.25s - go func() { - for { - // Keep copying journal data out until we're notified to stop - // or we hit an error. - status := C.wait_for_data_cancelable(j, pfd[0]) - if status < 0 { - cerrstr := C.strerror(C.int(-status)) - errstr := C.GoString(cerrstr) - fmtstr := "error %q while attempting to follow journal for container %q" - logrus.Errorf(fmtstr, errstr, s.vars["CONTAINER_ID_FULL"]) - break - } - - var done bool - cursor, done = s.drainJournal(logWatcher, j, cursor, untilUnixMicro) - - if status != 1 || done { - // We were notified to stop - break + for { + status := C.sd_journal_wait(j, waitTimeout) + if status < 0 { + logWatcher.Err <- errors.New("error waiting for journal: " + CErr(status)) + goto cleanup + } + select { + case <-logWatcher.WatchConsumerGone(): + goto cleanup // won't be able to write anything anymore + case <-logWatcher.WatchProducerGone(): + // container is gone, drain journal + default: + // container is still alive + if status == C.SD_JOURNAL_NOP { + // no new data -- keep waiting + continue } } - - // Clean up. - C.close(pfd[0]) - s.mu.Lock() - delete(s.readers, logWatcher) - s.mu.Unlock() - close(logWatcher.Msg) - newCursor <- cursor - }() - - // Wait until we're told to stop. - select { - case cursor = <-newCursor: - case <-logWatcher.WatchConsumerGone(): - // Notify the other goroutine that its work is done. - C.close(pfd[1]) - cursor = <-newCursor + newCursor, done, recv := s.drainJournal(logWatcher, j, cursor, untilUnixMicro) + cursor = newCursor + if done || (status == C.SD_JOURNAL_NOP && recv == 0) { + break + } } +cleanup: + s.mu.Lock() + delete(s.readers, logWatcher) + s.mu.Unlock() + close(logWatcher.Msg) return cursor } func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) { - var j *C.sd_journal - var cmatch, cursor *C.char - var stamp C.uint64_t - var sinceUnixMicro uint64 - var untilUnixMicro uint64 - var pipes [2]C.int + var ( + j *C.sd_journal + cmatch, cursor *C.char + stamp C.uint64_t + sinceUnixMicro uint64 + untilUnixMicro uint64 + ) // Get a handle to the journal. rc := C.sd_journal_open(&j, C.int(0)) @@ -409,19 +356,12 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon return } } - cursor, _ = s.drainJournal(logWatcher, j, nil, untilUnixMicro) + cursor, _, _ = s.drainJournal(logWatcher, j, nil, untilUnixMicro) if config.Follow { - // Create a pipe that we can poll at the same time as - // the journald descriptor. - ret := C.pipe(&pipes[0]) - if ret < 0 { - logWatcher.Err <- fmt.Errorf("error creating journald notification pipe") - } else { - cursor = s.followJournal(logWatcher, j, pipes, cursor, untilUnixMicro) - // Let followJournal handle freeing the journal context - // object and closing the channel. - following = true - } + cursor = s.followJournal(logWatcher, j, cursor, untilUnixMicro) + // Let followJournal handle freeing the journal context + // object and closing the channel. + following = true } C.free(unsafe.Pointer(cursor))