[17.06] Fix log readers can block writes indefinitely #98

Merged · 2 commits · merged Jul 12, 2017
Changes from 1 commit
daemon/logger/jsonfilelog/jsonfilelog.go:

@@ -7,6 +7,7 @@ import (
 	"bytes"
 	"encoding/json"
 	"fmt"
+	"io"
 	"strconv"
 	"sync"
 
@@ -15,19 +16,21 @@ import (
 	"github.com/docker/docker/daemon/logger/loggerutils"
 	"github.com/docker/docker/pkg/jsonlog"
 	"github.com/docker/go-units"
+	"github.com/pkg/errors"
 )
 
 // Name is the name of the file that the jsonlogger logs to.
 const Name = "json-file"
 
 // JSONFileLogger is Logger implementation for default Docker logging.
 type JSONFileLogger struct {
-	buf     *bytes.Buffer
+	extra []byte // json-encoded extra attributes
+
+	mu      sync.RWMutex
+	buf     *bytes.Buffer // avoids allocating a new buffer on each call to `Log()`
+	closed  bool
 	writer  *loggerutils.RotateFileWriter
-	mu      sync.Mutex
 	readers map[*logger.LogWatcher]struct{} // stores the active log followers
-	extra   []byte // json-encoded extra attributes
-	closed  bool
 }
 
 func init() {
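The struct change above is the heart of the fix: the plain sync.Mutex becomes a sync.RWMutex, so any number of readers can hold the lock at once while writers still get exclusive access. A minimal, self-contained sketch of that locking pattern (the fileLogger type here is hypothetical, not the PR's code):

```go
package main

import (
	"fmt"
	"sync"
)

// fileLogger is a hypothetical stand-in for JSONFileLogger: writers
// take the exclusive lock only for the short serialize-and-write
// window, while any number of readers can hold RLock concurrently.
type fileLogger struct {
	mu  sync.RWMutex
	buf []byte
}

func (l *fileLogger) log(line string) {
	l.mu.Lock() // exclusive: mutates shared state
	l.buf = append(l.buf, line...)
	l.mu.Unlock()
}

func (l *fileLogger) size() int {
	l.mu.RLock() // shared: does not block other readers
	defer l.mu.RUnlock()
	return len(l.buf)
}

func main() {
	l := &fileLogger{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			l.log(fmt.Sprintf("line %d\n", n))
		}(i)
	}
	wg.Wait()
	fmt.Println("bytes logged:", l.size())
}
```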
@@ -90,33 +93,45 @@ func New(info logger.Info) (logger.Logger, error) {
 
 // Log converts logger.Message to jsonlog.JSONLog and serializes it to file.
 func (l *JSONFileLogger) Log(msg *logger.Message) error {
+	l.mu.Lock()
+	err := writeMessageBuf(l.writer, msg, l.extra, l.buf)
+	l.buf.Reset()
+	l.mu.Unlock()
+	return err
+}
+
+func writeMessageBuf(w io.Writer, m *logger.Message, extra json.RawMessage, buf *bytes.Buffer) error {
+	if err := marshalMessage(m, extra, buf); err != nil {
+		logger.PutMessage(m)
+		return err
+	}
+	logger.PutMessage(m)
+	if _, err := w.Write(buf.Bytes()); err != nil {
+		return errors.Wrap(err, "error writing log entry")
+	}
+	return nil
+}
+
+func marshalMessage(msg *logger.Message, extra json.RawMessage, buf *bytes.Buffer) error {
 	timestamp, err := jsonlog.FastTimeMarshalJSON(msg.Timestamp)
 	if err != nil {
 		return err
 	}
-	l.mu.Lock()
-	logline := msg.Line
+	logLine := msg.Line
 	if !msg.Partial {
-		logline = append(msg.Line, '\n')
+		logLine = append(msg.Line, '\n')
 	}
 	err = (&jsonlog.JSONLogs{
-		Log:      logline,
+		Log:      logLine,
 		Stream:   msg.Source,
 		Created:  timestamp,
-		RawAttrs: l.extra,
-	}).MarshalJSONBuf(l.buf)
-	logger.PutMessage(msg)
+		RawAttrs: extra,
+	}).MarshalJSONBuf(buf)
 	if err != nil {
-		l.mu.Unlock()
-		return err
+		return errors.Wrap(err, "error writing log message to buffer")
 	}
-
-	l.buf.WriteByte('\n')
-	_, err = l.writer.Write(l.buf.Bytes())
-	l.buf.Reset()
-	l.mu.Unlock()
-
-	return err
+	err = buf.WriteByte('\n')
+	return errors.Wrap(err, "error finalizing log buffer")
 }
 
 // ValidateLogOpt looks for json specific log options max-file & max-size.
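After this refactor, Log() holds the mutex only around the shared buffer and writer; serialization moves into helpers that operate purely on their arguments. A minimal sketch of the same marshal-then-write split, using a hypothetical message type in place of logger.Message:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"os"
)

type message struct {
	line   string
	source string
}

// marshalMessage serializes into a caller-owned buffer; it stands in
// for the PR's marshalMessage/jsonlog.JSONLogs.MarshalJSONBuf pair.
func marshalMessage(m *message, buf *bytes.Buffer) error {
	if _, err := fmt.Fprintf(buf, `{"log":%q,"stream":%q}`, m.line, m.source); err != nil {
		return err
	}
	return buf.WriteByte('\n')
}

// writeMessageBuf flushes the serialized bytes to the writer,
// mirroring the split the diff introduces.
func writeMessageBuf(w io.Writer, m *message, buf *bytes.Buffer) error {
	if err := marshalMessage(m, buf); err != nil {
		return err
	}
	_, err := w.Write(buf.Bytes())
	return err
}

func main() {
	buf := &bytes.Buffer{} // reused across calls, like JSONFileLogger.buf
	for _, m := range []message{{"hello", "stdout"}, {"oops", "stderr"}} {
		if err := writeMessageBuf(os.Stdout, &m, buf); err != nil {
			fmt.Fprintln(os.Stderr, err)
			return
		}
		buf.Reset()
	}
}
```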
daemon/logger/jsonfilelog/read.go:

@@ -3,7 +3,6 @@ package jsonfilelog
 import (
 	"bytes"
 	"encoding/json"
-	"errors"
 	"fmt"
 	"io"
 	"os"
@@ -18,6 +17,7 @@ import (
 	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/pkg/jsonlog"
 	"github.com/docker/docker/pkg/tailfile"
+	"github.com/pkg/errors"
 )
 
 const maxJSONDecodeRetry = 20000
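The import change swaps the stdlib errors package for github.com/pkg/errors, whose Wrap annotates an error with context while preserving the original cause. A small sketch of that behavior (the openLatest helper is hypothetical):

```go
package main

import (
	"fmt"
	"os"

	"github.com/pkg/errors"
)

func openLatest(path string) error {
	f, err := os.Open(path)
	if err != nil {
		return errors.Wrap(err, "error opening latest log file")
	}
	f.Close()
	return nil
}

func main() {
	err := openLatest("/no/such/log")
	fmt.Println(err)
	// error opening latest log file: open /no/such/log: no such file or directory

	// the original error is preserved, so callers can still inspect it:
	fmt.Println(os.IsNotExist(errors.Cause(err))) // true
}
```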
@@ -48,34 +48,46 @@ func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
 func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
 	defer close(logWatcher.Msg)
 
-	// lock so the read stream doesn't get corrupted due to rotations or other log data written while we read
+	// lock so the read stream doesn't get corrupted due to rotations or other log data written while we open these files
 	// This will block writes!!!
-	l.mu.Lock()
+	l.mu.RLock()
 
 	// TODO it would be nice to move a lot of this reader implementation to the rotate logger object
 	pth := l.writer.LogPath()
 	var files []io.ReadSeeker
 	for i := l.writer.MaxFiles(); i > 1; i-- {
 		f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1))
 		if err != nil {
 			if !os.IsNotExist(err) {
 				logWatcher.Err <- err
-				break
+				l.mu.RUnlock()
+				return
 			}
 			continue
 		}
 		defer f.Close()
 
 		files = append(files, f)
 	}
 
 	latestFile, err := os.Open(pth)
 	if err != nil {
-		logWatcher.Err <- err
-		l.mu.Unlock()
+		logWatcher.Err <- errors.Wrap(err, "error opening latest log file")
+		l.mu.RUnlock()
 		return
 	}
 	defer latestFile.Close()
 
+	latestChunk, err := newSectionReader(latestFile)
+
+	// Now we have the reader sectioned, all fd's opened, we can unlock.
+	// New writes/rotates will not affect seeking through these files
+	l.mu.RUnlock()
+
+	if err != nil {
+		logWatcher.Err <- err
+		return
+	}
+
 	if config.Tail != 0 {
 		tailer := ioutils.MultiReadSeeker(append(files, latestFile)...)
 
@cpuguy83 (Contributor) commented on Jul 6, 2017:

    Need to replace latestFile here with latestChunk

@andrewhsu (Author, Contributor) commented on Jul 12, 2017:

    done with an additional commit

 		tailFile(tailer, logWatcher, config.Tail, config.Since)
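This hunk is the core of the fix: the file descriptors are opened and the live file's size is pinned while holding only the read lock, which is then released before any tailing happens. The io.SectionReader snapshot is what makes that safe, as the standard-library-only sketch below illustrates (it is not the PR's code):

```go
package main

import (
	"fmt"
	"io"
	"io/ioutil"
	"os"
)

// demonstrates the newSectionReader idea: fix the file's size once,
// then read through an io.SectionReader bounded to [0, size). Reads
// go through ReadAt, so they neither move the file's seek offset nor
// see data appended after the snapshot was taken.
func main() {
	f, err := ioutil.TempFile("", "log")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())
	defer f.Close()

	f.WriteString("line 1\nline 2\n")

	size, err := f.Seek(0, io.SeekEnd) // current end of file
	if err != nil {
		panic(err)
	}
	snapshot := io.NewSectionReader(f, 0, size)

	// a writer appends while we still hold the snapshot
	f.WriteString("line 3, written after the snapshot\n")

	data, err := ioutil.ReadAll(snapshot)
	if err != nil {
		panic(err)
	}
	fmt.Print(string(data)) // prints only line 1 and line 2
}
```

Because the section reader uses ReadAt, it also leaves latestFile's offset where newSectionReader parked it, at the end of the file, which is what the follow path relies on.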
@@ -89,19 +101,14 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
 	}
 
 	if !config.Follow || l.closed {
-		l.mu.Unlock()
 		return
 	}
 
 	if config.Tail >= 0 {
 		latestFile.Seek(0, os.SEEK_END)
 	}
 
 	notifyRotate := l.writer.NotifyRotate()
 	defer l.writer.NotifyRotateEvict(notifyRotate)
 
+	l.mu.Lock()
 	l.readers[logWatcher] = struct{}{}
-
 	l.mu.Unlock()
 
 	followLogs(latestFile, logWatcher, notifyRotate, config.Since)
@@ -111,6 +118,16 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
 	l.mu.Unlock()
 }
 
+func newSectionReader(f *os.File) (*io.SectionReader, error) {
+	// seek to the end to get the size
+	// we'll leave this at the end of the file since section reader does not advance the reader
+	size, err := f.Seek(0, os.SEEK_END)
+	if err != nil {
+		return nil, errors.Wrap(err, "error getting current file size")
+	}
+	return io.NewSectionReader(f, 0, size), nil
+}
+
 func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) {
 	var rdr io.Reader
 	rdr = f