Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Change buffer to be managed by another structure
  • Loading branch information
daichirata committed Dec 15, 2017
1 parent 38301f4 commit e0f40b1
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 32 deletions.
48 changes: 48 additions & 0 deletions buffer.go
@@ -0,0 +1,48 @@
package fluent

import (
"sync"
)

type buffer struct {
buf []byte
mu sync.Mutex
Dirty chan struct{}
}

func newBuffer() buffer {
return buffer{
buf: []byte{},
Dirty: make(chan struct{}),
}
}

func (buffer *buffer) Add(raw []byte) {
buffer.mu.Lock()
defer buffer.mu.Unlock()

buffer.buf = append(buffer.buf, raw...)
go func() {
buffer.Dirty <- struct{}{}
}()
}

func (buffer *buffer) Remove() []byte {
buffer.mu.Lock()
defer buffer.mu.Unlock()

if len(buffer.buf) == 0 {
return nil
}

data := buffer.buf
buffer.buf = buffer.buf[:0]
return data
}

func (buffer *buffer) Back(raw []byte) {
buffer.mu.Lock()
defer buffer.mu.Unlock()

buffer.buf = append(buffer.buf, raw...)
}
52 changes: 20 additions & 32 deletions logger.go
Expand Up @@ -34,22 +34,19 @@ func withDefaultConfig(c Config) Config {
}

type Logger struct {
conf Config
conn io.WriteCloser
bmu sync.Mutex
cmu sync.Mutex
buf []byte
wg sync.WaitGroup
done chan struct{}
dirty chan struct{}
conf Config
conn io.WriteCloser
mu sync.Mutex
buf buffer
wg sync.WaitGroup
done chan struct{}
}

func NewLogger(c Config) (*Logger, error) {
logger := &Logger{
conf: withDefaultConfig(c),
buf: []byte{},
done: make(chan struct{}),
dirty: make(chan struct{}),
conf: withDefaultConfig(c),
buf: newBuffer(),
done: make(chan struct{}),
}
if err := logger.connect(); err != nil {
return nil, err
Expand All @@ -74,15 +71,7 @@ func (logger *Logger) PostWithTime(tag string, t time.Time, obj interface{}) err
if err := enc.Encode(record); err != nil {
return err
}
raw := buf.Bytes()

logger.bmu.Lock()
logger.buf = append(logger.buf, raw...)
logger.bmu.Unlock()

go func() {
logger.dirty <- struct{}{}
}()
logger.buf.Add(buf.Bytes())
return nil
}

Expand All @@ -92,8 +81,8 @@ func (logger *Logger) Close() error {
}

func (logger *Logger) connect() error {
logger.cmu.Lock()
defer logger.cmu.Unlock()
logger.mu.Lock()
defer logger.mu.Unlock()

if logger.conn != nil {
return nil
Expand All @@ -116,8 +105,8 @@ func (logger *Logger) connect() error {
}

func (logger *Logger) disconnect() error {
logger.cmu.Lock()
defer logger.cmu.Unlock()
logger.mu.Lock()
defer logger.mu.Unlock()

if logger.conn == nil {
return nil
Expand All @@ -130,26 +119,25 @@ func (logger *Logger) disconnect() error {
const maxWriteAttempts = 3

func (logger *Logger) send() error {
logger.bmu.Lock()
defer logger.bmu.Unlock()

data := logger.buf
data := logger.buf.Remove()
if len(data) == 0 {
return nil
}

var err error
for i := 0; i < maxWriteAttempts; i++ {
err = logger.connect()
if err == nil {
_, err := logger.conn.Write(data)
if err == nil {
logger.buf = logger.buf[:0]
break
}
}
logger.disconnect()
}

if err != nil {
logger.buf.Back(data)
}
return err
}

Expand All @@ -163,7 +151,7 @@ func (logger *Logger) start() {
case <-logger.done:
logger.send()
return
case <-logger.dirty:
case <-logger.buf.Dirty:
case <-ticker.C:
}
logger.send()
Expand Down

0 comments on commit e0f40b1

Please sign in to comment.