Skip to content

Commit

Permalink
#55 prepare max number of lines in lineBuffer
Browse files Browse the repository at this point in the history
  • Loading branch information
fstab committed Apr 28, 2019
1 parent d2a94db commit 879e3bc
Show file tree
Hide file tree
Showing 6 changed files with 254 additions and 61 deletions.
42 changes: 10 additions & 32 deletions tailer/bufferedTailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,16 @@
package tailer

import (
"container/list"
"github.com/fstab/grok_exporter/tailer/fswatcher"
"log"
"sync"
)

// implements fswatcher.FileTailer
type bufferedTailer struct {
out chan fswatcher.Line
out chan *fswatcher.Line
orig fswatcher.FileTailer
}

func (b *bufferedTailer) Lines() chan fswatcher.Line {
func (b *bufferedTailer) Lines() chan *fswatcher.Line {
return b.out
}

Expand Down Expand Up @@ -103,26 +100,19 @@ func BufferedTailer(orig fswatcher.FileTailer) fswatcher.FileTailer {
// To minimize the risk, use the buffered tailer to make sure file system events are handled
// as quickly as possible without waiting for the grok patterns to be processed.
func BufferedTailerWithMetrics(orig fswatcher.FileTailer, bufferLoadMetric BufferLoadMetric) fswatcher.FileTailer {
buffer := list.New()
bufferSync := sync.NewCond(&sync.Mutex{}) // coordinate producer and consumer
out := make(chan fswatcher.Line)
buffer := NewLineBuffer()
out := make(chan *fswatcher.Line)

// producer
go func() {
bufferLoadMetric.Start()
for {
line, ok := <-orig.Lines()
if ok {
bufferSync.L.Lock()
buffer.PushBack(line)
bufferSync.Signal()
bufferSync.L.Unlock()
buffer.Push(line)
bufferLoadMetric.Inc()
} else {
bufferSync.L.Lock()
buffer = nil // make the consumer quit
bufferSync.Signal()
bufferSync.L.Unlock()
buffer.Close()
bufferLoadMetric.Stop()
return
}
Expand All @@ -132,26 +122,14 @@ func BufferedTailerWithMetrics(orig fswatcher.FileTailer, bufferLoadMetric Buffe
// consumer
go func() {
for {
bufferSync.L.Lock()
for buffer != nil && buffer.Len() == 0 {
bufferSync.Wait()
}
if buffer == nil {
bufferSync.L.Unlock()
line := buffer.BlockingPop()
if line == nil {
// buffer closed
close(out)
return
}
first := buffer.Front()
buffer.Remove(first)
bufferSync.L.Unlock()
bufferLoadMetric.Dec()
switch line := first.Value.(type) {
case fswatcher.Line:
out <- line
default:
// this cannot happen
log.Fatal("unexpected type in tailer buffer")
}
out <- line
}
}()
return &bufferedTailer{
Expand Down
40 changes: 20 additions & 20 deletions tailer/bufferedTailer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ import (
"time"
)

const nTestLines = 10000

type sourceTailer struct {
lines chan fswatcher.Line
lines chan *fswatcher.Line
}

func (tail *sourceTailer) Lines() chan fswatcher.Line {
func (tail *sourceTailer) Lines() chan *fswatcher.Line {
return tail.lines
}

Expand All @@ -39,22 +41,20 @@ func (tail *sourceTailer) Close() {
close(tail.lines)
}

// First produce 10,000 lines, then consume 10,000 lines.
func TestLineBufferSequential(t *testing.T) {
src := &sourceTailer{lines: make(chan fswatcher.Line)}
// TODO: As we separated lineBuffer and the metrics, this test is now partially copy-and-paste from lineBuffer_test
func TestLineBufferSequential_withMetrics(t *testing.T) {
src := &sourceTailer{lines: make(chan *fswatcher.Line)}
metric := &peakLoadMetric{}
buffered := BufferedTailerWithMetrics(src, metric)
for i := 1; i <= 10000; i++ {
src.lines <- fswatcher.Line{Line: fmt.Sprintf("This is line number %v.", i)}
for i := 1; i <= nTestLines; i++ {
src.lines <- &fswatcher.Line{Line: fmt.Sprintf("This is line number %v.", i)}
}
for i := 1; i <= 10000; i++ {
for i := 1; i <= nTestLines; i++ {
line := <-buffered.Lines()
if line.Line != fmt.Sprintf("This is line number %v.", i) {
t.Errorf("Expected 'This is line number %v', but got '%v'.", i, line)
}
}
// wait until peak load is observed (buffered tailer observes the max of each 1 Sec interval)
time.Sleep(1100 * time.Millisecond)
buffered.Close()
_, stillOpen := <-buffered.Lines()
if stillOpen {
Expand All @@ -70,21 +70,21 @@ func TestLineBufferSequential(t *testing.T) {
if !metric.stopCalled {
t.Error("metric.Stop() not called.")
}
// The peak load should be 9999 or 9998, depending on how quick
// The peak load should be 1 or two less than nTestLines, depending on how quick
// the consumer loop started reading
fmt.Printf("peak load: %v\n", metric.peakLoad)
fmt.Printf("peak load (should be 1 or 2 less than %v): %v\n", nTestLines, metric.peakLoad)
}

// Produce and consume in parallel.
func TestLineBufferParallel(t *testing.T) {
src := &sourceTailer{lines: make(chan fswatcher.Line)}
// TODO: As we separated lineBuffer and the metrics, this test is now partially copy-and-paste from lineBuffer_test
func TestLineBufferParallel_withMetrics(t *testing.T) {
src := &sourceTailer{lines: make(chan *fswatcher.Line)}
metric := &peakLoadMetric{}
buffered := BufferedTailerWithMetrics(src, metric)
var wg sync.WaitGroup
go func() {
start := time.Now()
for i := 1; i <= 10000; i++ {
src.lines <- fswatcher.Line{Line: fmt.Sprintf("This is line number %v.", i)}
for i := 1; i <= nTestLines; i++ {
src.lines <- &fswatcher.Line{Line: fmt.Sprintf("This is line number %v.", i)}
if rand.Int()%64 == 0 { // Sleep from time to time
time.Sleep(10 * time.Millisecond)
}
Expand All @@ -94,7 +94,7 @@ func TestLineBufferParallel(t *testing.T) {
}()
go func() {
start := time.Now()
for i := 1; i <= 10000; i++ {
for i := 1; i <= nTestLines; i++ {
line := <-buffered.Lines()
if line.Line != fmt.Sprintf("This is line number %v.", i) {
t.Errorf("Expected 'This is line number %v', but got '%v'.", i, line)
Expand Down Expand Up @@ -125,8 +125,8 @@ func TestLineBufferParallel(t *testing.T) {
if !metric.stopCalled {
t.Error("metric.Unregister() not called.")
}
// Should be much less than 10000, because consumer and producer work in parallel.
fmt.Printf("peak load: %v\n", metric.peakLoad)
// Should be much less than nTestLines, because consumer and producer work in parallel.
fmt.Printf("peak load (should be an order of magnitude less than %v): %v\n", nTestLines, metric.peakLoad)
}

type peakLoadMetric struct {
Expand Down
10 changes: 5 additions & 5 deletions tailer/fswatcher/fswatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

type FileTailer interface {
Lines() chan Line
Lines() chan *Line
Errors() chan Error
Close()
}
Expand All @@ -51,7 +51,7 @@ type fileTailer struct {
watchedDirs []*Dir
watchedFiles map[string]*fileWithReader // path -> fileWithReader
osSpecific fswatcher
lines chan Line
lines chan *Line
errors chan Error
done chan struct{}
}
Expand All @@ -78,7 +78,7 @@ type fileMeta interface {
Name() string
}

func (t *fileTailer) Lines() chan Line {
func (t *fileTailer) Lines() chan *Line {
return t.lines
}

Expand Down Expand Up @@ -116,7 +116,7 @@ func runFileTailer(initFunc func() (fswatcher, Error), globs []glob.Glob, readal
t = &fileTailer{
globs: globs,
watchedFiles: make(map[string]*fileWithReader),
lines: make(chan Line),
lines: make(chan *Line),
errors: make(chan Error),
done: make(chan struct{}),
}
Expand Down Expand Up @@ -337,7 +337,7 @@ func (t *fileTailer) readNewLines(file *fileWithReader, log logrus.FieldLogger)
select {
case <-t.done:
return nil
case t.lines <- Line{Line: line, File: file.file.Name()}:
case t.lines <- &Line{Line: line, File: file.file.Name()}:
}
}
}
Expand Down
86 changes: 86 additions & 0 deletions tailer/lineBuffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package tailer

import (
"container/list"
"github.com/fstab/grok_exporter/tailer/fswatcher"
"io"
"log"
"sync"
)

// lineBuffer is a thread safe queue for *fswatcher.Line.
type lineBuffer interface {
Push(line *fswatcher.Line)
BlockingPop() *fswatcher.Line // can be interrupted by calling Close()
Len() int
io.Closer // will interrupt BlockingPop()
Clear()
}

func NewLineBuffer() lineBuffer {
return &lineBufferImpl{
buffer: list.New(),
lock: sync.NewCond(&sync.Mutex{}),
closed: false,
}
}

type lineBufferImpl struct {
buffer *list.List
lock *sync.Cond
closed bool
}

func (b *lineBufferImpl) Push(line *fswatcher.Line) {
b.lock.L.Lock()
defer b.lock.L.Unlock()
if !b.closed {
b.buffer.PushBack(line)
b.lock.Signal()
}
}

// Interrupted by Close(), returns nil when Close() is called.
func (b *lineBufferImpl) BlockingPop() *fswatcher.Line {
b.lock.L.Lock()
defer b.lock.L.Unlock()
if !b.closed {
for b.buffer.Len() == 0 && !b.closed {
b.lock.Wait()
}
if !b.closed {
first := b.buffer.Front()
b.buffer.Remove(first)
switch line := first.Value.(type) {
case *fswatcher.Line:
return line
default:
// this cannot happen
log.Fatal("unexpected type in tailer b.buffer")
}
}
}
return nil
}

func (b *lineBufferImpl) Close() error {
b.lock.L.Lock()
defer b.lock.L.Unlock()
if !b.closed {
b.closed = true
b.lock.Signal()
}
return nil
}

func (b *lineBufferImpl) Len() int {
b.lock.L.Lock()
defer b.lock.L.Unlock()
return b.buffer.Len()
}

func (b *lineBufferImpl) Clear() {
b.lock.L.Lock()
defer b.lock.L.Unlock()
b.buffer = list.New()
}

0 comments on commit 879e3bc

Please sign in to comment.