From e0f40b163ac68657d734463a30a601190d64a498 Mon Sep 17 00:00:00 2001 From: Daichi HIRATA Date: Fri, 15 Dec 2017 14:15:43 +0900 Subject: [PATCH] Change buffer to be managed by another structure --- buffer.go | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ logger.go | 52 ++++++++++++++++++++-------------------------------- 2 files changed, 68 insertions(+), 32 deletions(-) create mode 100644 buffer.go diff --git a/buffer.go b/buffer.go new file mode 100644 index 0000000..392ba79 --- /dev/null +++ b/buffer.go @@ -0,0 +1,48 @@ +package fluent + +import ( + "sync" +) + +type buffer struct { + buf []byte + mu sync.Mutex + Dirty chan struct{} +} + +func newBuffer() buffer { + return buffer{ + buf: []byte{}, + Dirty: make(chan struct{}), + } +} + +func (buffer *buffer) Add(raw []byte) { + buffer.mu.Lock() + defer buffer.mu.Unlock() + + buffer.buf = append(buffer.buf, raw...) + go func() { + buffer.Dirty <- struct{}{} + }() +} + +func (buffer *buffer) Remove() []byte { + buffer.mu.Lock() + defer buffer.mu.Unlock() + + if len(buffer.buf) == 0 { + return nil + } + + data := buffer.buf + buffer.buf = buffer.buf[:0] + return data +} + +func (buffer *buffer) Back(raw []byte) { + buffer.mu.Lock() + defer buffer.mu.Unlock() + + buffer.buf = append(buffer.buf, raw...) +} diff --git a/logger.go b/logger.go index 06f69f3..08c4d91 100644 --- a/logger.go +++ b/logger.go @@ -34,22 +34,19 @@ func withDefaultConfig(c Config) Config { } type Logger struct { - conf Config - conn io.WriteCloser - bmu sync.Mutex - cmu sync.Mutex - buf []byte - wg sync.WaitGroup - done chan struct{} - dirty chan struct{} + conf Config + conn io.WriteCloser + mu sync.Mutex + buf buffer + wg sync.WaitGroup + done chan struct{} } func NewLogger(c Config) (*Logger, error) { logger := &Logger{ - conf: withDefaultConfig(c), - buf: []byte{}, - done: make(chan struct{}), - dirty: make(chan struct{}), + conf: withDefaultConfig(c), + buf: newBuffer(), + done: make(chan struct{}), } if err := logger.connect(); err != nil { return nil, err @@ -74,15 +71,7 @@ func (logger *Logger) PostWithTime(tag string, t time.Time, obj interface{}) err if err := enc.Encode(record); err != nil { return err } - raw := buf.Bytes() - - logger.bmu.Lock() - logger.buf = append(logger.buf, raw...) - logger.bmu.Unlock() - - go func() { - logger.dirty <- struct{}{} - }() + logger.buf.Add(buf.Bytes()) return nil } @@ -92,8 +81,8 @@ func (logger *Logger) Close() error { } func (logger *Logger) connect() error { - logger.cmu.Lock() - defer logger.cmu.Unlock() + logger.mu.Lock() + defer logger.mu.Unlock() if logger.conn != nil { return nil @@ -116,8 +105,8 @@ func (logger *Logger) connect() error { } func (logger *Logger) disconnect() error { - logger.cmu.Lock() - defer logger.cmu.Unlock() + logger.mu.Lock() + defer logger.mu.Unlock() if logger.conn == nil { return nil @@ -130,26 +119,25 @@ func (logger *Logger) disconnect() error { const maxWriteAttempts = 3 func (logger *Logger) send() error { - logger.bmu.Lock() - defer logger.bmu.Unlock() - - data := logger.buf + data := logger.buf.Remove() if len(data) == 0 { return nil } + var err error for i := 0; i < maxWriteAttempts; i++ { err = logger.connect() if err == nil { _, err := logger.conn.Write(data) if err == nil { - logger.buf = logger.buf[:0] break } } logger.disconnect() } - + if err != nil { + logger.buf.Back(data) + } return err } @@ -163,7 +151,7 @@ func (logger *Logger) start() { case <-logger.done: logger.send() return - case <-logger.dirty: + case <-logger.buf.Dirty: case <-ticker.C: } logger.send()