Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add test suite for LogReader implementations #43231

Closed
wants to merge 4 commits
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion daemon/logger/adapter_test.go
Expand Up @@ -174,7 +174,7 @@ func TestAdapterReadLogs(t *testing.T) {
t.Fatal("timeout waiting for message channel to close")

}
lw.ProducerGone()
lw.ConsumerGone()

lw = lr.ReadLogs(ReadConfig{Follow: true})
for _, x := range testMsg {
Expand Down
14 changes: 9 additions & 5 deletions daemon/logger/journald/journald.go
Expand Up @@ -8,7 +8,6 @@ package journald // import "github.com/docker/docker/daemon/logger/journald"
import (
"fmt"
"strconv"
"sync"
"unicode"

"github.com/coreos/go-systemd/v22/journal"
Expand All @@ -20,9 +19,9 @@ import (
const name = "journald"

type journald struct {
	// vars holds additional variables and values to send to the journal
	// along with each log message.
	vars map[string]string

	// closed is closed by Close to signal followers that the log
	// producer is gone.
	closed chan struct{}
}

func init() {
Expand Down Expand Up @@ -82,7 +81,7 @@ func New(info logger.Info) (logger.Logger, error) {
for k, v := range extraAttrs {
vars[k] = v
}
return &journald{vars: vars, readers: make(map[*logger.LogWatcher]struct{})}, nil
return &journald{vars: vars, closed: make(chan struct{})}, nil
}

// We don't actually accept any options, but we have to supply a callback for
Expand Down Expand Up @@ -129,3 +128,8 @@ func (s *journald) Log(msg *logger.Message) error {
func (s *journald) Name() string {
return name
}

// Close implements logger.Logger. It closes s.closed to signal any
// follower goroutines that the producer is gone. NOTE(review): a second
// call to Close would panic on the double channel close — presumably the
// daemon closes each logger exactly once; confirm against callers.
func (s *journald) Close() error {
	close(s.closed)
	return nil
}
25 changes: 4 additions & 21 deletions daemon/logger/journald/read.go
Expand Up @@ -116,16 +116,6 @@ import (
"github.com/sirupsen/logrus"
)

func (s *journald) Close() error {
s.mu.Lock()
for r := range s.readers {
r.ProducerGone()
delete(s.readers, r)
}
s.mu.Unlock()
return nil
}

// CErr converts error code returned from a sd_journal_* function
// (which returns -errno) to a string
func CErr(ret C.int) string {
Expand Down Expand Up @@ -233,22 +223,20 @@ drain:
}

func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, cursor *C.char, untilUnixMicro uint64) *C.char {
s.mu.Lock()
s.readers[logWatcher] = struct{}{}
s.mu.Unlock()
defer close(logWatcher.Msg)

waitTimeout := C.uint64_t(250000) // 0.25s

for {
status := C.sd_journal_wait(j, waitTimeout)
if status < 0 {
logWatcher.Err <- errors.New("error waiting for journal: " + CErr(status))
goto cleanup
break
}
select {
case <-logWatcher.WatchConsumerGone():
goto cleanup // won't be able to write anything anymore
case <-logWatcher.WatchProducerGone():
break // won't be able to write anything anymore
case <-s.closed:
// container is gone, drain journal
default:
// container is still alive
Expand All @@ -264,11 +252,6 @@ func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal,
}
}

cleanup:
s.mu.Lock()
delete(s.readers, logWatcher)
s.mu.Unlock()
close(logWatcher.Msg)
return cursor
}

Expand Down
8 changes: 0 additions & 8 deletions daemon/logger/journald/read_unsupported.go

This file was deleted.

23 changes: 7 additions & 16 deletions daemon/logger/jsonfilelog/jsonfilelog.go
Expand Up @@ -23,11 +23,9 @@ const Name = "json-file"

// JSONFileLogger is Logger implementation for default Docker logging.
type JSONFileLogger struct {
mu sync.Mutex
closed bool
writer *loggerutils.LogFile
readers map[*logger.LogWatcher]struct{} // stores the active log followers
tag string // tag values requested by the user to log
mu sync.Mutex
writer *loggerutils.LogFile
tag string // tag values requested by the user to log
}

func init() {
Expand Down Expand Up @@ -116,9 +114,8 @@ func New(info logger.Info) (logger.Logger, error) {
}

return &JSONFileLogger{
writer: writer,
readers: make(map[*logger.LogWatcher]struct{}),
tag: tag,
writer: writer,
tag: tag,
}, nil
}

Expand Down Expand Up @@ -171,14 +168,8 @@ func ValidateLogOpt(cfg map[string]string) error {
// that the logs producer is gone.
func (l *JSONFileLogger) Close() error {
	l.mu.Lock()
	defer l.mu.Unlock()
	// Closing the underlying LogFile is what signals readers that the
	// producer is gone; no per-reader bookkeeping is needed here.
	return l.writer.Close()
}

// Name returns name of this logger.
Expand Down
19 changes: 1 addition & 18 deletions daemon/logger/jsonfilelog/read.go
Expand Up @@ -18,24 +18,7 @@ const maxJSONDecodeRetry = 20000
// ReadLogs implements the logger's LogReader interface for the logs
// created by this driver.
func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
	// Reader lifecycle (watcher creation, Msg channel close) is managed
	// entirely by the LogFile; this driver just delegates.
	return l.writer.ReadLogs(config)
}

func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
Expand Down
23 changes: 20 additions & 3 deletions daemon/logger/jsonfilelog/read_test.go
Expand Up @@ -4,10 +4,12 @@ import (
"bytes"
"encoding/json"
"io"
"path/filepath"
"testing"
"time"

"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/daemon/logger/loggertest"
"gotest.tools/v3/assert"
"gotest.tools/v3/fs"
)
Expand Down Expand Up @@ -53,9 +55,10 @@ func BenchmarkJSONFileLoggerReadLogs(b *testing.B) {
lw := jsonlogger.(*JSONFileLogger).ReadLogs(logger.ReadConfig{Follow: true})
for {
select {
case <-lw.Msg:
case <-lw.WatchProducerGone():
return
case _, ok := <-lw.Msg:
if !ok {
return
}
case err := <-chError:
if err != nil {
b.Fatal(err)
Expand Down Expand Up @@ -134,6 +137,20 @@ func TestUnexpectedEOF(t *testing.T) {
assert.Error(t, err, io.EOF.Error())
}

// TestReadLogs runs the shared LogReader conformance suite against the
// json-file driver, pointing each logger at a per-test temp log file.
func TestReadLogs(t *testing.T) {
	factory := func(t *testing.T, info logger.Info) func(*testing.T) logger.Logger {
		info.LogPath = filepath.Join(t.TempDir(), info.ContainerID+".log")
		return func(t *testing.T) logger.Logger {
			l, err := New(info)
			assert.NilError(t, err)
			return l
		}
	}
	loggertest.Reader{Factory: factory}.Do(t)
}

type readerWithErr struct {
err error
after int
Expand Down
18 changes: 4 additions & 14 deletions daemon/logger/local/local.go
Expand Up @@ -58,9 +58,7 @@ func init() {

type driver struct {
mu sync.Mutex
closed bool
logfile *loggerutils.LogFile
readers map[*logger.LogWatcher]struct{} // stores the active log followers
}

// New creates a new local logger
Expand Down Expand Up @@ -146,7 +144,6 @@ func newDriver(logPath string, cfg *CreateConfig) (logger.Logger, error) {
}
return &driver{
logfile: lf,
readers: make(map[*logger.LogWatcher]struct{}),
}, nil
}

Expand All @@ -156,21 +153,14 @@ func (d *driver) Name() string {

// Log writes msg to the log file, serialized under the driver mutex.
func (d *driver) Log(msg *logger.Message) error {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.logfile.WriteLogEntry(msg)
}

// Close releases the underlying log file; closing it is what notifies
// any followers that the producer is gone.
func (d *driver) Close() error {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.logfile.Close()
}

func messageToProto(msg *logger.Message, proto *logdriver.LogEntry, partial *logdriver.PartialLogEntryMetadata) {
Expand Down
98 changes: 11 additions & 87 deletions daemon/logger/local/local_test.go
Expand Up @@ -2,19 +2,18 @@ package local

import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"testing"
"time"

"github.com/docker/docker/api/types/backend"
"github.com/docker/docker/api/types/plugins/logdriver"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/daemon/logger/loggertest"
protoio "github.com/gogo/protobuf/io"
"gotest.tools/v3/assert"
is "gotest.tools/v3/assert/cmp"
Expand Down Expand Up @@ -80,92 +79,17 @@ func TestWriteLog(t *testing.T) {
}

func TestReadLog(t *testing.T) {
t.Parallel()

dir, err := os.MkdirTemp("", t.Name())
assert.NilError(t, err)
defer os.RemoveAll(dir)

logPath := filepath.Join(dir, "test.log")
l, err := New(logger.Info{LogPath: logPath})
assert.NilError(t, err)
defer l.Close()

m1 := logger.Message{Source: "stdout", Timestamp: time.Now().Add(-1 * 30 * time.Minute), Line: []byte("a message")}
m2 := logger.Message{Source: "stdout", Timestamp: time.Now().Add(-1 * 20 * time.Minute), Line: []byte("another message"), PLogMetaData: &backend.PartialLogMetaData{Ordinal: 1, Last: true}}
longMessage := []byte("a really long message " + strings.Repeat("a", initialBufSize*2))
m3 := logger.Message{Source: "stderr", Timestamp: time.Now().Add(-1 * 10 * time.Minute), Line: longMessage}
m4 := logger.Message{Source: "stderr", Timestamp: time.Now().Add(-1 * 10 * time.Minute), Line: []byte("just one more message")}

// copy the log message because the underlying log writer resets the log message and returns it to a buffer pool
err = l.Log(copyLogMessage(&m1))
assert.NilError(t, err)
err = l.Log(copyLogMessage(&m2))
assert.NilError(t, err)
err = l.Log(copyLogMessage(&m3))
assert.NilError(t, err)
err = l.Log(copyLogMessage(&m4))
assert.NilError(t, err)

lr := l.(logger.LogReader)

testMessage := func(t *testing.T, lw *logger.LogWatcher, m *logger.Message) {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
select {
case <-ctx.Done():
assert.Assert(t, ctx.Err())
case err := <-lw.Err:
assert.NilError(t, err)
case msg, open := <-lw.Msg:
if !open {
select {
case err := <-lw.Err:
assert.NilError(t, err)
default:
assert.Assert(t, m == nil)
return
}
loggertest.Reader{
Factory: func(t *testing.T, info logger.Info) func(*testing.T) logger.Logger {
dir := t.TempDir()
info.LogPath = filepath.Join(dir, info.ContainerID+".log")
return func(t *testing.T) logger.Logger {
l, err := New(info)
assert.NilError(t, err)
return l
}
assert.Assert(t, m != nil)
if m.PLogMetaData == nil {
// a `\n` is appended on read to make this work with the existing API's when the message is not a partial.
// make sure it's the last entry in the line, and then truncate it for the deep equal below.
assert.Check(t, msg.Line[len(msg.Line)-1] == '\n')
msg.Line = msg.Line[:len(msg.Line)-1]
}
assert.Check(t, is.DeepEqual(m, msg), fmt.Sprintf("\n%+v\n%+v", m, msg))
}
}

t.Run("tail exact", func(t *testing.T) {
lw := lr.ReadLogs(logger.ReadConfig{Tail: 4})

testMessage(t, lw, &m1)
testMessage(t, lw, &m2)
testMessage(t, lw, &m3)
testMessage(t, lw, &m4)
testMessage(t, lw, nil) // no more messages
})

t.Run("tail less than available", func(t *testing.T) {
lw := lr.ReadLogs(logger.ReadConfig{Tail: 2})

testMessage(t, lw, &m3)
testMessage(t, lw, &m4)
testMessage(t, lw, nil) // no more messages
})

t.Run("tail more than available", func(t *testing.T) {
lw := lr.ReadLogs(logger.ReadConfig{Tail: 100})

testMessage(t, lw, &m1)
testMessage(t, lw, &m2)
testMessage(t, lw, &m3)
testMessage(t, lw, &m4)
testMessage(t, lw, nil) // no more messages
})
},
}.Do(t)
}

func BenchmarkLogWrite(b *testing.B) {
Expand Down