Skip to content

Commit

Permalink
Introducing a circuit breaker for error control
Browse files Browse the repository at this point in the history
  • Loading branch information
daichirata committed Dec 15, 2017
1 parent 3112bd1 commit cd0e0b7
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 32 deletions.
10 changes: 8 additions & 2 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions glide.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package: github.com/daichirata/fluent-logger-go
import:
- package: gopkg.in/vmihailenco/msgpack.v2
- package: github.com/rubyist/circuitbreaker
testImport:
- package: github.com/stretchr/testify
subpackages:
Expand Down
76 changes: 46 additions & 30 deletions logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,21 @@ import (
"strings"
"sync"
"time"

"github.com/rubyist/circuitbreaker"
)

var (
defaultAddress = "127.0.0.1:24224"
defaultFlushInterval = 5 * time.Second
defaultAddress = "127.0.0.1:24224"
defaultFlushInterval = 5 * time.Second
defaultFailureThreshold = int64(1)
)

type Config struct {
Address string
ConnectionTimeout time.Duration
FlushInterval time.Duration
FailureThreshold int64
}

func withDefaultConfig(c Config) Config {
Expand All @@ -26,23 +30,29 @@ func withDefaultConfig(c Config) Config {
if c.FlushInterval == 0 {
c.FlushInterval = defaultFlushInterval
}
if c.FailureThreshold == 0 {
c.FailureThreshold = defaultFailureThreshold
}
return c
}

type Logger struct {
conf Config
conn io.WriteCloser
mu sync.Mutex
buf buffer
wg sync.WaitGroup
done chan struct{}
conf Config
conn io.WriteCloser
buf buffer
breaker *circuit.Breaker
mu sync.Mutex
wg sync.WaitGroup
done chan struct{}
}

func NewLogger(c Config) (*Logger, error) {
conf := withDefaultConfig(c)
logger := &Logger{
conf: withDefaultConfig(c),
buf: newBuffer(),
done: make(chan struct{}),
conf: conf,
buf: newBuffer(),
breaker: circuit.NewConsecutiveBreaker(conf.FailureThreshold),
done: make(chan struct{}),
}
if err := logger.connect(); err != nil {
return nil, err
Expand Down Expand Up @@ -70,6 +80,10 @@ func (logger *Logger) PostWithTime(tag string, t time.Time, obj interface{}) err
return nil
}

func (logger *Logger) Subscribe() <-chan circuit.BreakerEvent {
return logger.breaker.Subscribe()
}

func (logger *Logger) Close() error {
logger.stop()
return logger.disconnect()
Expand Down Expand Up @@ -111,38 +125,40 @@ func (logger *Logger) disconnect() error {
return err
}

const maxWriteAttempts = 2

func (logger *Logger) write(messages []*Message) error {
if len(messages) == 0 {
return nil
}
var data []byte
for _, m := range messages {
data = append(data, m.buf.Bytes()...)
}
var err error
for i := 0; i < maxWriteAttempts; i++ {
err = logger.connect()
func (logger *Logger) write(data []byte) error {
err := logger.connect()
if err == nil {
_, err = logger.conn.Write(data)
if err == nil {
_, err := logger.conn.Write(data)
if err == nil {
break
}
return nil
}
logger.disconnect()
}
logger.disconnect()
return err
}

func (logger *Logger) writeWithBreaker(data []byte) error {
return logger.breaker.Call(func() error {
return logger.write(data)
}, 0)
}

func (logger *Logger) send() error {
messages := logger.buf.Remove()
if len(messages) == 0 {
return nil
}

err := logger.write(messages)
var data []byte
for _, m := range messages {
data = append(data, m.buf.Bytes()...)
}
err := logger.write(data)
if err != nil {
logger.buf.Back(messages)
return err
}

for _, m := range messages {
putMessage(m)
}
Expand Down

0 comments on commit cd0e0b7

Please sign in to comment.