Skip to content

Commit

Permalink
journald/read: simplify/fix followJournal()
Browse files Browse the repository at this point in the history
TL;DR: simplify the code, fix --follow hanging indefinitely

Do the following to simplify the followJournal() code:

1. Use Go-native select instead of C-native polling.

2. Use Watch{Producer,Consumer}Gone(), eliminating the need
to have journald.closed variable, and an extra goroutine.

3. Use sd_journal_wait(). In the words of its own man page:

> A synchronous alternative for using sd_journal_get_fd(),
> sd_journal_get_events(), sd_journal_get_timeout() and
> sd_journal_process() is sd_journal_wait().

Unfortunately, the logic is still not as simple as it
could be; the reason being, once the container has exited,
journald might still be writing some logs from its internal
buffers onto journal file(s), and there is no way to
figure out whether it's done so we are guaranteed to
read all of it back. This bug can be reproduced with
something like

> $ ID=$(docker run -d busybox seq 1 150000); docker logs --follow $ID
> ...
> 128123
> $

(The last expected output line should be `150000`).

To avoid exiting from followJournal() early, add the
following logic: once the container is gone, keep trying
to drain the journal until there's no new data for at
least `waitTimeout` time period.

Should fix docker/for-linux#575

(cherry picked from commit f091feb)

Signed-off-by: Kir Kolyshkin <kolyshkin@gmail.com>
Signed-off-by: Robert Günzler <robertg@balena.io>
  • Loading branch information
kolyshkin authored and robertgzr committed Aug 22, 2019
1 parent 0bf996d commit 92a4571
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 120 deletions.
1 change: 0 additions & 1 deletion daemon/logger/journald/journald.go
Expand Up @@ -21,7 +21,6 @@ type journald struct {
mu sync.Mutex
vars map[string]string // additional variables and values to send to the journal along with the log message
readers map[*logger.LogWatcher]struct{}
closed bool
}

func init() {
Expand Down
178 changes: 59 additions & 119 deletions daemon/logger/journald/read.go
Expand Up @@ -101,55 +101,10 @@ package journald // import "github.com/docker/docker/daemon/logger/journald"
// }
// return rc;
//}
//static int wait_for_data_cancelable(sd_journal *j, int pipefd)
//{
// struct pollfd fds[2];
// uint64_t when = 0;
// int timeout, jevents, i;
// struct timespec ts;
// uint64_t now;
//
// memset(&fds, 0, sizeof(fds));
// fds[0].fd = pipefd;
// fds[0].events = POLLHUP;
// fds[1].fd = sd_journal_get_fd(j);
// if (fds[1].fd < 0) {
// return fds[1].fd;
// }
//
// do {
// jevents = sd_journal_get_events(j);
// if (jevents < 0) {
// return jevents;
// }
// fds[1].events = jevents;
// sd_journal_get_timeout(j, &when);
// if (when == -1) {
// timeout = -1;
// } else {
// clock_gettime(CLOCK_MONOTONIC, &ts);
// now = (uint64_t) ts.tv_sec * 1000000 + ts.tv_nsec / 1000;
// timeout = when > now ? (int) ((when - now + 999) / 1000) : 0;
// }
// i = poll(fds, 2, timeout);
// if ((i == -1) && (errno != EINTR)) {
// /* An unexpected error. */
// return (errno != 0) ? -errno : -EINTR;
// }
// if (fds[0].revents & POLLHUP) {
// /* The close notification pipe was closed. */
// return 0;
// }
// if (sd_journal_process(j) == SD_JOURNAL_APPEND) {
// /* Data, which we might care about, was appended. */
// return 1;
// }
// } while ((fds[0].revents & POLLHUP) == 0);
// return 0;
//}
import "C"

import (
"errors"
"fmt"
"strings"
"time"
Expand All @@ -158,27 +113,33 @@ import (
"github.com/coreos/go-systemd/journal"
"github.com/docker/docker/api/types/backend"
"github.com/docker/docker/daemon/logger"
"github.com/sirupsen/logrus"
)

func (s *journald) Close() error {
s.mu.Lock()
s.closed = true
for r := range s.readers {
r.ProducerGone()
delete(s.readers, r)

}
s.mu.Unlock()
return nil
}

func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, oldCursor *C.char, untilUnixMicro uint64) (*C.char, bool) {
var msg, data, cursor *C.char
var length C.size_t
var stamp C.uint64_t
var priority, partial C.int
var done bool
// convert error code returned from a sd_journal_* function
// (which returns -errno) to a string
func CErr(ret C.int) string {
return C.GoString(C.strerror(C.int(-ret)))
}

func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, oldCursor *C.char, untilUnixMicro uint64) (*C.char, bool, int) {
var (
msg, data, cursor *C.char
length C.size_t
stamp C.uint64_t
priority, partial C.int
done bool
shown int
)

// Walk the journal from here forward until we run out of new entries
// or we reach the until value (if provided).
Expand Down Expand Up @@ -230,6 +191,7 @@ drain:
kv := strings.SplitN(C.GoStringN(data, C.int(length)), "=", 2)
attrs = append(attrs, backend.LogAttr{Key: kv[0], Value: kv[1]})
}

// Send the log message, unless the consumer is gone
select {
case <-logWatcher.WatchConsumerGone():
Expand All @@ -241,6 +203,7 @@ drain:
Timestamp: timestamp.In(time.UTC),
Attrs: attrs,
}:
shown++
}
}
// If we're at the end of the journal, we're done (for now).
Expand All @@ -255,73 +218,57 @@ drain:
// ensure that we won't be freeing an address that's invalid
cursor = nil
}
return cursor, done
return cursor, done, shown
}

func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, pfd [2]C.int, cursor *C.char, untilUnixMicro uint64) *C.char {
func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, cursor *C.char, untilUnixMicro uint64) *C.char {
s.mu.Lock()
s.readers[logWatcher] = struct{}{}
if s.closed {
// the journald Logger is closed, presumably because the container has been
// reset. So we shouldn't follow, because we'll never be woken up. But we
// should make one more drainJournal call to be sure we've got all the logs.
// Close pfd[1] so that one drainJournal happens, then cleanup, then return.
C.close(pfd[1])
}
s.mu.Unlock()

newCursor := make(chan *C.char)
waitTimeout := C.uint64_t(250000) // 0.25s

go func() {
for {
// Keep copying journal data out until we're notified to stop
// or we hit an error.
status := C.wait_for_data_cancelable(j, pfd[0])
if status < 0 {
cerrstr := C.strerror(C.int(-status))
errstr := C.GoString(cerrstr)
fmtstr := "error %q while attempting to follow journal for container %q"
logrus.Errorf(fmtstr, errstr, s.vars["CONTAINER_ID_FULL"])
break
}

var done bool
cursor, done = s.drainJournal(logWatcher, j, cursor, untilUnixMicro)

if status != 1 || done {
// We were notified to stop
break
for {
status := C.sd_journal_wait(j, waitTimeout)
if status < 0 {
logWatcher.Err <- errors.New("error waiting for journal: " + CErr(status))
goto cleanup
}
select {
case <-logWatcher.WatchConsumerGone():
goto cleanup // won't be able to write anything anymore
case <-logWatcher.WatchProducerGone():
// container is gone, drain journal
default:
// container is still alive
if status == C.SD_JOURNAL_NOP {
// no new data -- keep waiting
continue
}
}

// Clean up.
C.close(pfd[0])
s.mu.Lock()
delete(s.readers, logWatcher)
s.mu.Unlock()
close(logWatcher.Msg)
newCursor <- cursor
}()

// Wait until we're told to stop.
select {
case cursor = <-newCursor:
case <-logWatcher.WatchConsumerGone():
// Notify the other goroutine that its work is done.
C.close(pfd[1])
cursor = <-newCursor
newCursor, done, recv := s.drainJournal(logWatcher, j, cursor, untilUnixMicro)
cursor = newCursor
if done || (status == C.SD_JOURNAL_NOP && recv == 0) {
break
}
}

cleanup:
s.mu.Lock()
delete(s.readers, logWatcher)
s.mu.Unlock()
close(logWatcher.Msg)
return cursor
}

func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
var j *C.sd_journal
var cmatch, cursor *C.char
var stamp C.uint64_t
var sinceUnixMicro uint64
var untilUnixMicro uint64
var pipes [2]C.int
var (
j *C.sd_journal
cmatch, cursor *C.char
stamp C.uint64_t
sinceUnixMicro uint64
untilUnixMicro uint64
)

// Get a handle to the journal.
rc := C.sd_journal_open(&j, C.int(0))
Expand Down Expand Up @@ -409,19 +356,12 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon
return
}
}
cursor, _ = s.drainJournal(logWatcher, j, nil, untilUnixMicro)
cursor, _, _ = s.drainJournal(logWatcher, j, nil, untilUnixMicro)
if config.Follow {
// Create a pipe that we can poll at the same time as
// the journald descriptor.
ret := C.pipe(&pipes[0])
if ret < 0 {
logWatcher.Err <- fmt.Errorf("error creating journald notification pipe")
} else {
cursor = s.followJournal(logWatcher, j, pipes, cursor, untilUnixMicro)
// Let followJournal handle freeing the journal context
// object and closing the channel.
following = true
}
cursor = s.followJournal(logWatcher, j, cursor, untilUnixMicro)
// Let followJournal handle freeing the journal context
// object and closing the channel.
following = true
}

C.free(unsafe.Pointer(cursor))
Expand Down

0 comments on commit 92a4571

Please sign in to comment.