Skip to content

Commit

Permalink
Remove mutex in async appender.
Browse files Browse the repository at this point in the history
Async appender now requires Start() be called before appending log.
  • Loading branch information
nqv committed Sep 25, 2016
1 parent 85baf80 commit 186d558
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 88 deletions.
86 changes: 39 additions & 47 deletions async/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,89 +11,77 @@ import (
)

// Appender sends the logging event to all appenders asynchronously.
// It implements gol.Appender.
type Appender struct {
// DrainTimeout is maximum duration before timing out flush a channel.
DrainTimeout time.Duration
// drainTimeout is maximum duration before timing out flush a channel.
drainTimeout time.Duration

wg sync.WaitGroup
appenders []gol.Appender
chans []chan *gol.LoggingEvent

mu sync.Mutex
// started in an indicator for this appender state.
started bool
// finish is used in Start and Stop
finish chan struct{}

forceStopped bool
}

var _ (gol.Appender) = (*Appender)(nil)

// NewAppender allocates and returns a new Appender.
func NewAppender(bufferSize int, appenders ...gol.Appender) *Appender {
a := &Appender{
appenders: appenders,
chans: make([]chan *gol.LoggingEvent, len(appenders)),
// Start() must be called before writing data.
func NewAppender(appenders ...gol.Appender) *Appender {
return NewAppenderWithBufSize(10, appenders...)
}

DrainTimeout: 10 * time.Second,
// NewAppenderWithBufSize returns an asynchronous appender with a buffer size
// of bufSize for each appender channel.
func NewAppenderWithBufSize(bufSize int, appenders ...gol.Appender) *Appender {
a := &Appender{
appenders: appenders,
drainTimeout: 10 * time.Second,
}
a.chans = make([]chan *gol.LoggingEvent, len(appenders))
for i := range appenders {
a.chans[i] = make(chan *gol.LoggingEvent, bufferSize)
a.chans[i] = make(chan *gol.LoggingEvent, bufSize)
}
return a
}

// Append sends the event to all appenders.
func (a *Appender) Append(e *gol.LoggingEvent) {
// Need to use mutex here to make sure messages are in correct order
// for all channels.
a.mu.Lock()
defer a.mu.Unlock()
if a.finish == nil {
if a.forceStopped {
// Skip the event if appender is stopped.
return
}
a.start()
if !a.started {
// Skip the event if appender is stopped.
return
}
for _, c := range a.chans {
// FIXME: This is still blocking if a channel buffer is full.
c <- e
}
}

// Start does not do anything at the moment.
func (a *Appender) Start() error {
a.mu.Lock()
defer a.mu.Unlock()

a.forceStopped = false
if a.finish != nil {
return nil // started
// Start starts go routines for each writer.
func (a *Appender) Start() {
if a.started {
return
}
a.start()
return nil
}

func (a *Appender) start() {
a.finish = make(chan struct{})
a.wg.Add(len(a.chans))
for i, c := range a.chans {
go a.receive(c, a.appenders[i])
}
a.started = true
}

// Stop makes sure all appenders finish. It must be called when exiting.
func (a *Appender) Stop() error {
a.mu.Lock()
defer a.mu.Unlock()

a.forceStopped = true
if a.finish == nil {
return nil // not started
// Stop stops and waits until all go routines exited.
// Once Stop is called, this appender can not be started again.
func (a *Appender) Stop() {
if !a.started {
return
}
a.started = false
close(a.finish)
a.wg.Wait()
a.finish = nil
return nil
// all channels can be closed now, but then users can not start this
// appender again.
}

func (a *Appender) receive(c chan *gol.LoggingEvent, appender gol.Appender) {
Expand All @@ -110,14 +98,18 @@ func (a *Appender) receive(c chan *gol.LoggingEvent, appender gol.Appender) {
}
}

// flush sends all pending data in the channel to writer or timeout after
// maximum of durationTimeout and the appender timeout.
func (a *Appender) flush(c chan *gol.LoggingEvent, appender gol.Appender) {
timeout := time.After(a.DrainTimeout)
timeout := time.After(a.drainTimeout)
for {
select {
case <-timeout:
// Timeout channel has higher priority.
return
case e := <-c:
appender.Append(e)
// Continue reading from the appender.
default:
// Channel is empty.
return
Expand Down
89 changes: 48 additions & 41 deletions async/async_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/goburrow/gol"
)

var _ (gol.Appender) = (*Appender)(nil)

// channelWriter is used for testing async appender
type channelWriter chan string

Expand All @@ -17,14 +19,11 @@ func (c channelWriter) Write(b []byte) (int, error) {
return len(b), nil
}

func TestAppender(t *testing.T) {
func TestAppenderAsync(t *testing.T) {
c := make(chan string)

appender := NewAppender(1, gol.NewAppender(channelWriter(c)))
err := appender.Start()
if err != nil {
t.Fatal(err)
}
appender := NewAppender(gol.NewAppender(channelWriter(c)))
appender.Start()
defer appender.Stop()
event := &gol.LoggingEvent{
FormattedMessage: "run",
Expand All @@ -38,47 +37,61 @@ func TestAppender(t *testing.T) {
if !strings.Contains(msg, "async: run") {
t.Fatalf("unexpected message: %#v", msg)
}
case <-time.After(1 * time.Second):
t.Fatal("not received after 1 second")
}
}

// slowWriter is a io.Writer which appends data with delay.
type slowWriter struct {
d time.Duration
s []string
}

func (s *slowWriter) Write(b []byte) (int, error) {
time.Sleep(s.d)
s.s = append(s.s, string(b))
return len(b), nil
}

func TestAppenderStop(t *testing.T) {
buffers := [...]bytes.Buffer{
bytes.Buffer{},
bytes.Buffer{},
bytes.Buffer{},
writers := [...]*slowWriter{
&slowWriter{20 * time.Millisecond, nil},
&slowWriter{100 * time.Millisecond, nil},
&slowWriter{50 * time.Millisecond, nil},
}

appender := NewAppender(1,
gol.NewAppender(&buffers[0]),
gol.NewAppender(&buffers[1]),
gol.NewAppender(&buffers[2]),
size := 5
appender := NewAppenderWithBufSize(size,
gol.NewAppender(writers[0]),
gol.NewAppender(writers[1]),
gol.NewAppender(writers[2]),
)
err := appender.Start()
if err != nil {
t.Fatal(err)
}
appender.Start()
event := &gol.LoggingEvent{
FormattedMessage: "run",
Name: "async",
Level: gol.Info,
Time: time.Now(),
}
appender.Append(event)
err = appender.Stop()
if err != nil {
t.Fatal(err)
for i := 0; i < size; i++ {
appender.Append(event)
}
for i := range buffers {
if !strings.Contains(buffers[i].String(), "async: run") {
t.Fatalf("unexpected message: %#v", buffers[i].String())
appender.Stop()
for _, w := range writers {
if size != len(w.s) {
t.Fatalf("unexpected message count: %#v", len(w.s))
}
if !strings.Contains(w.s[0], "async: run") {
t.Fatalf("unexpected message: %#v", w.s[0])
}
}
}

func TestAppenderLifeCycle(t *testing.T) {
var buf bytes.Buffer
size := 5
appender := NewAppender(size, gol.NewAppender(&buf))
appender := NewAppenderWithBufSize(size, gol.NewAppender(&buf))

event := &gol.LoggingEvent{
FormattedMessage: "run",
Expand All @@ -87,26 +100,20 @@ func TestAppenderLifeCycle(t *testing.T) {
Time: time.Now(),
}

appender.Start()
for i := 0; i < size; i++ {
appender.Append(event)
}
err := appender.Stop()
if err != nil {
t.Fatal(err)
}
err = appender.Start()
if err != nil {
t.Fatal(err)
}
err = appender.Start()
if err != nil {
t.Fatal(err)
appender.Stop()
if strings.Count(buf.String(), "async: run") != size {
t.Fatalf("unexpected message: %v", buf.String())
}
err = appender.Stop()
if err != nil {
t.Fatal(err)
appender.Start()
for i := 0; i < size; i++ {
appender.Append(event)
}
if strings.Count(buf.String(), "async: run") != size {
appender.Stop()
if strings.Count(buf.String(), "async: run") != 2*size {
t.Fatalf("unexpected message: %v", buf.String())
}
}

0 comments on commit 186d558

Please sign in to comment.