Skip to content

Commit

Permalink
Merge pull request #556 from rgooch/master
Browse files Browse the repository at this point in the history
Add lib/bufwriter package and use it to fix idle flushing corner case in lib/logbuf package.
  • Loading branch information
rgooch committed Feb 4, 2019
2 parents 4c2e524 + 8ee3f45 commit 098c7c2
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 11 deletions.
30 changes: 30 additions & 0 deletions lib/bufwriter/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Package bufwriter implements a simplified buffered writer, similar to the
// bufio package in the Go standard library, but adds automatic flushing.
package bufwriter

import (
"io"
"sync"
"time"
)

type flushingWriter interface {
Flush() error
io.Writer
}

type Writer struct {
flushDelay time.Duration
flushingWriter flushingWriter
mutex sync.Mutex // Protect everything below.
err error
flushPending bool
}

func NewWriter(writer io.Writer, flushDelay time.Duration) *Writer {
return newWriter(writer, flushDelay)
}

func (b *Writer) Flush() error { return b.flush() }

func (b *Writer) Write(p []byte) (int, error) { return b.write(p) }
53 changes: 53 additions & 0 deletions lib/bufwriter/impl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package bufwriter

import (
"bufio"
"io"
"time"
)

func newWriter(writer io.Writer, flushDelay time.Duration) *Writer {
b := &Writer{flushDelay: flushDelay}
if bufWriter, ok := writer.(flushingWriter); ok {
b.flushingWriter = bufWriter
} else {
b.flushingWriter = bufio.NewWriter(writer)
}
return b
}

func (b *Writer) delayedFlush() {
time.Sleep(b.flushDelay)
b.flush()
}

func (b *Writer) flush() error {
b.mutex.Lock()
defer b.mutex.Unlock()
b.flushPending = false
if b.err != nil {
return b.err
}
b.err = b.flushingWriter.Flush()
return b.err
}

func (b *Writer) lockAndScheduleFlush() {
b.mutex.Lock()
if b.flushPending {
return
}
b.flushPending = true
go b.delayedFlush()
}

func (b *Writer) write(p []byte) (int, error) {
b.lockAndScheduleFlush()
defer b.mutex.Unlock()
if b.err != nil {
return 0, b.err
}
nWritten, err := b.flushingWriter.Write(p)
b.err = err
return nWritten, err
}
4 changes: 2 additions & 2 deletions lib/logbuf/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
package logbuf

import (
"bufio"
"container/ring"
"flag"
"io"
Expand All @@ -18,6 +17,7 @@ import (
"sync"
"time"

"github.com/Symantec/Dominator/lib/bufwriter"
"github.com/Symantec/Dominator/lib/flagutil"
)

Expand All @@ -38,7 +38,7 @@ type LogBuffer struct {
rwMutex sync.RWMutex
buffer *ring.Ring // Always points to next insert position.
file *os.File
writer *bufio.Writer
writer *bufwriter.Writer
fileSize flagutil.Size
usage flagutil.Size
writeNotifier chan<- struct{}
Expand Down
16 changes: 7 additions & 9 deletions lib/logbuf/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"syscall"
"time"

"github.com/Symantec/Dominator/lib/bufwriter"
"github.com/Symantec/Dominator/lib/flagutil"
"github.com/Symantec/Dominator/lib/format"
)
Expand Down Expand Up @@ -61,7 +62,7 @@ func (lb *LogBuffer) setupFileLogging() error {
}
writeNotifier := make(chan struct{}, 1)
lb.writeNotifier = writeNotifier
go lb.flushWhenIdle(writeNotifier)
go lb.idleMarker(writeNotifier)
return nil
}

Expand Down Expand Up @@ -196,7 +197,7 @@ func (lb *LogBuffer) openNewFile() error {
syscall.Dup2(int(file.Fd()), int(os.Stderr.Fd()))
}
lb.file = file
lb.writer = bufio.NewWriter(file)
lb.writer = bufwriter.NewWriter(file, time.Second)
symlink := path.Join(lb.options.Directory, "latest")
tmpSymlink := symlink + "~"
os.Remove(tmpSymlink)
Expand Down Expand Up @@ -275,23 +276,20 @@ func (lb *LogBuffer) enforceQuota() error {
return nil
}

func (lb *LogBuffer) flushWhenIdle(writeNotifier <-chan struct{}) {
flushTimer := time.NewTimer(time.Second)
func (lb *LogBuffer) idleMarker(writeNotifier <-chan struct{}) {
idleMarkDuration := lb.options.IdleMarkTimeout
if idleMarkDuration < 1 {
idleMarkDuration = time.Hour * 24 * 365 * 280 // Far in the future.
for {
<-writeNotifier
}
}
idleMarkTimer := time.NewTimer(idleMarkDuration)
for {
select {
case <-writeNotifier:
flushTimer.Reset(time.Second)
idleMarkTimer.Reset(idleMarkDuration)
case <-flushTimer.C:
lb.flush()
case <-idleMarkTimer.C:
lb.writeMark()
flushTimer.Reset(time.Second)
idleMarkTimer.Reset(idleMarkDuration)
}
}
Expand Down

0 comments on commit 098c7c2

Please sign in to comment.