Skip to content

Commit

Permalink
#55 add max_lines_in_buffer option for input config
Browse files Browse the repository at this point in the history
  • Loading branch information
fstab committed May 1, 2019
1 parent 879e3bc commit 6f7d651
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 50 deletions.
1 change: 1 addition & 0 deletions config/v2/configV2.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type InputConfig struct {
Readall bool `yaml:",omitempty"`
PollIntervalSeconds string `yaml:"poll_interval_seconds,omitempty"` // TODO: Use time.Duration directly
PollInterval time.Duration `yaml:"-"` // parsed version of PollIntervalSeconds
MaxLinesInBuffer int `yaml:"max_lines_in_buffer,omitempty"`
}

type GrokConfig struct {
Expand Down
92 changes: 56 additions & 36 deletions exporter/bufferLoadMetric.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ type bufferLoadMetric struct {
mutex *sync.Cond
tick *time.Ticker
log logrus.FieldLogger
lineLimitSet bool
}

func NewBufferLoadMetric(log logrus.FieldLogger) *bufferLoadMetric {
func NewBufferLoadMetric(log logrus.FieldLogger, lineLimitSet bool) *bufferLoadMetric {
m := &bufferLoadMetric{
mutex: sync.NewCond(&sync.Mutex{}),
log: log,
mutex: sync.NewCond(&sync.Mutex{}),
log: log,
lineLimitSet: lineLimitSet,
}
return m
}
Expand All @@ -56,31 +58,33 @@ func (m *bufferLoadMetric) start(ticker *time.Ticker, tickProcessed chan struct{
go func() {
var ticksSinceLastLog = 0
for range m.tick.C {
m.mutex.L.Lock()

ticksSinceLastLog++
if ticksSinceLastLog >= 4 { // every minute
if m.min60s > 1000 {
m.log.Warnf("Log lines are written faster than grok_exporter processes them. In the last minute there were constantly more than %d log lines in the buffer waiting to be processed. Check the built-in grok_exporter_lines_processing_time_microseconds_total metric to learn which metric takes most of the processing time.", m.min60s)
func() {
m.mutex.L.Lock()
defer m.mutex.L.Unlock()

ticksSinceLastLog++
if ticksSinceLastLog >= 4 { // every minute
if m.min60s > 1000 && !m.lineLimitSet {
m.log.Warnf("Log lines are written faster than grok_exporter processes them. In the last minute there were constantly more than %d log lines in the buffer waiting to be processed. Check the built-in grok_exporter_lines_processing_time_microseconds_total metric to learn which metric takes most of the processing time.", m.min60s)
}
ticksSinceLastLog = 0
}
ticksSinceLastLog = 0
}

m.bufferLoad.With(minLabel).Set(float64(m.min60s))
m.bufferLoad.With(minLabel).Set(float64(m.min60s))

m.min60s = m.min45s
m.min45s = m.min30s
m.min30s = m.min15s
m.min15s = m.cur
m.min60s = m.min45s
m.min45s = m.min30s
m.min30s = m.min15s
m.min15s = m.cur

m.bufferLoad.With(maxLabel).Set(float64(m.max60s))
m.bufferLoad.With(maxLabel).Set(float64(m.max60s))

m.max60s = m.max45s
m.max45s = m.max30s
m.max30s = m.max15s
m.max15s = m.cur
m.max60s = m.max45s
m.max45s = m.max30s
m.max30s = m.max15s
m.max15s = m.cur

m.mutex.L.Unlock()
}()

if tickProcessed != nil {
tickProcessed <- struct{}{}
Expand All @@ -96,25 +100,19 @@ func (m *bufferLoadMetric) Stop() {

// Inc records that a log line was put into the buffer: it increments the
// current load and raises the rolling-window maxima via updateMax.
// The diff artifact duplicating the old inline max-update lines and the
// old explicit Unlock is removed; the committed version delegates to
// updateMax and releases the lock with defer.
func (m *bufferLoadMetric) Inc() {
	m.mutex.L.Lock()
	defer m.mutex.L.Unlock()
	m.cur++
	m.updateMax()
}

// Dec records that a log line was taken out of the buffer: it decrements
// the current load under the lock and lowers the rolling-window minima
// via updateMin.
func (m *bufferLoadMetric) Dec() {
	m.mutex.L.Lock()
	defer m.mutex.L.Unlock()
	m.cur--
	m.updateMin()
}

func (m *bufferLoadMetric) updateMin() {
if m.min15s > m.cur {
m.min15s = m.cur
}
Expand All @@ -127,5 +125,27 @@ func (m *bufferLoadMetric) Dec() {
if m.min60s > m.cur {
m.min60s = m.cur
}
m.mutex.L.Unlock()
}

// updateMax raises each rolling-window maximum (15s, 30s, 45s, 60s) to the
// current buffer load if the current load exceeds it.
// Caller must hold m.mutex.L.
func (m *bufferLoadMetric) updateMax() {
	for _, window := range []*int64{&m.max15s, &m.max30s, &m.max45s, &m.max60s} {
		if *window < m.cur {
			*window = m.cur
		}
	}
}

// Set overwrites the current buffer load with value and folds the new
// value into both the rolling-window minima and maxima. It is used when
// the buffer is cleared after reaching max_lines_in_buffer (value 0).
func (m *bufferLoadMetric) Set(value int64) {
	m.mutex.L.Lock()
	defer m.mutex.L.Unlock()
	m.cur = value
	m.updateMin()
	m.updateMax()
}
2 changes: 1 addition & 1 deletion exporter/bufferLoadMetric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

func TestBufferLoadMetric(t *testing.T) {
m := NewBufferLoadMetric(logrus.New())
m := NewBufferLoadMetric(logrus.New(), false)
c := make(chan time.Time)
tick := &time.Ticker{
C: c,
Expand Down
3 changes: 2 additions & 1 deletion grok_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,5 +291,6 @@ func startTailer(cfg *v2.Config) (fswatcher.FileTailer, error) {
default:
return nil, fmt.Errorf("Config error: Input type '%v' unknown.", cfg.Input.Type)
}
return tailer.BufferedTailerWithMetrics(tail, exporter.NewBufferLoadMetric(logger)), nil
bufferLoadMetric := exporter.NewBufferLoadMetric(logger, cfg.Input.MaxLinesInBuffer > 0)
return tailer.BufferedTailerWithMetrics(tail, bufferLoadMetric, logger, cfg.Input.MaxLinesInBuffer), nil
}
24 changes: 16 additions & 8 deletions tailer/bufferedTailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package tailer

import (
"github.com/fstab/grok_exporter/tailer/fswatcher"
"github.com/sirupsen/logrus"
)

// implements fswatcher.FileTailer
Expand All @@ -37,7 +38,7 @@ func (b *bufferedTailer) Close() {
}

// BufferedTailer wraps orig in a buffered tailer with a no-op buffer-load
// metric, a fresh logger, and no line limit (0 disables dropping).
// The diff artifact showing both the old and new return statements is
// resolved to the committed call with the extra logger and limit arguments.
func BufferedTailer(orig fswatcher.FileTailer) fswatcher.FileTailer {
	return BufferedTailerWithMetrics(orig, &noopMetric{}, logrus.New(), 0)
}

// Wrapper around a tailer that consumes the lines channel quickly.
Expand Down Expand Up @@ -99,7 +100,7 @@ func BufferedTailer(orig fswatcher.FileTailer) fswatcher.FileTailer {
//
// To minimize the risk, use the buffered tailer to make sure file system events are handled
// as quickly as possible without waiting for the grok patterns to be processed.
func BufferedTailerWithMetrics(orig fswatcher.FileTailer, bufferLoadMetric BufferLoadMetric) fswatcher.FileTailer {
func BufferedTailerWithMetrics(orig fswatcher.FileTailer, bufferLoadMetric BufferLoadMetric, log logrus.FieldLogger, maxLinesInBuffer int) fswatcher.FileTailer {
buffer := NewLineBuffer()
out := make(chan *fswatcher.Line)

Expand All @@ -109,6 +110,11 @@ func BufferedTailerWithMetrics(orig fswatcher.FileTailer, bufferLoadMetric Buffe
for {
line, ok := <-orig.Lines()
if ok {
if maxLinesInBuffer > 0 && buffer.Len() > maxLinesInBuffer-1 {
log.Warnf("Line buffer reached limit of %v lines. Dropping lines in buffer.", maxLinesInBuffer)
buffer.Clear()
bufferLoadMetric.Set(0)
}
buffer.Push(line)
bufferLoadMetric.Inc()
} else {
Expand Down Expand Up @@ -140,14 +146,16 @@ func BufferedTailerWithMetrics(orig fswatcher.FileTailer, bufferLoadMetric Buffe

// BufferLoadMetric tracks the number of log lines currently waiting in the
// line buffer between the file tailer and grok processing. The duplicate
// Inc/Dec lines left over from the diff view are removed.
type BufferLoadMetric interface {
	Start()
	Inc()            // put a log line into the buffer
	Dec()            // take a log line from the buffer
	Set(value int64) // set the current number of lines in the buffer
	Stop()
}

// noopMetric is a BufferLoadMetric implementation that discards all
// updates; used when buffer-load metrics are not wanted. The diff view's
// duplicated old four-method set is collapsed into the committed
// five-method set including Set.
type noopMetric struct{}

func (m *noopMetric) Start()          {}
func (m *noopMetric) Inc()            {}
func (m *noopMetric) Dec()            {}
func (m *noopMetric) Set(value int64) {}
func (m *noopMetric) Stop()           {}
11 changes: 9 additions & 2 deletions tailer/bufferedTailer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package tailer
import (
"fmt"
"github.com/fstab/grok_exporter/tailer/fswatcher"
"github.com/sirupsen/logrus"
"math/rand"
"sync"
"testing"
Expand All @@ -25,6 +26,8 @@ import (

const nTestLines = 10000

var log = logrus.New()

type sourceTailer struct {
lines chan *fswatcher.Line
}
Expand All @@ -45,7 +48,7 @@ func (tail *sourceTailer) Close() {
func TestLineBufferSequential_withMetrics(t *testing.T) {
src := &sourceTailer{lines: make(chan *fswatcher.Line)}
metric := &peakLoadMetric{}
buffered := BufferedTailerWithMetrics(src, metric)
buffered := BufferedTailerWithMetrics(src, metric, log, 0)
for i := 1; i <= nTestLines; i++ {
src.lines <- &fswatcher.Line{Line: fmt.Sprintf("This is line number %v.", i)}
}
Expand Down Expand Up @@ -79,7 +82,7 @@ func TestLineBufferSequential_withMetrics(t *testing.T) {
func TestLineBufferParallel_withMetrics(t *testing.T) {
src := &sourceTailer{lines: make(chan *fswatcher.Line)}
metric := &peakLoadMetric{}
buffered := BufferedTailerWithMetrics(src, metric)
buffered := BufferedTailerWithMetrics(src, metric, log, 0)
var wg sync.WaitGroup
go func() {
start := time.Now()
Expand Down Expand Up @@ -150,6 +153,10 @@ func (m *peakLoadMetric) Dec() {
m.currentLoad--
}

// Set records value as the current buffer load.
// NOTE(review): unlike Inc (not fully visible here), Set does not appear
// to update any recorded peak — confirm that is intended for the tests.
func (m *peakLoadMetric) Set(value int64) {
	m.currentLoad = value
}

// Stop flags that Stop was called, so tests can assert the buffered
// tailer shut the metric down.
func (m *peakLoadMetric) Stop() {
	m.stopCalled = true
}
4 changes: 2 additions & 2 deletions tailer/lineBuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"container/list"
"github.com/fstab/grok_exporter/tailer/fswatcher"
"io"
"log"
logFatal "log"
"sync"
)

Expand Down Expand Up @@ -56,7 +56,7 @@ func (b *lineBufferImpl) BlockingPop() *fswatcher.Line {
return line
default:
// this cannot happen
log.Fatal("unexpected type in tailer b.buffer")
logFatal.Fatal("unexpected type in tailer b.buffer")
}
}
}
Expand Down

0 comments on commit 6f7d651

Please sign in to comment.