From ce1a630d9698c8a684736d22f773de46fd91939d Mon Sep 17 00:00:00 2001 From: Max Kupriianov Date: Thu, 2 Jun 2022 14:13:14 +0200 Subject: [PATCH] Add pace metering package --- pkg/pace/default_reporter.go | 51 +++++++++++++ pkg/pace/pace.go | 141 +++++++++++++++++++++++++++++++++++ pkg/pace/pace_atomic.go | 123 ++++++++++++++++++++++++++++++ pkg/pace/pace_bench_test.go | 32 ++++++++ pkg/pace/pace_test.go | 46 ++++++++++++ pkg/pace/zap_reporter.go | 52 +++++++++++++ 6 files changed, 445 insertions(+) create mode 100644 pkg/pace/default_reporter.go create mode 100644 pkg/pace/pace.go create mode 100644 pkg/pace/pace_atomic.go create mode 100644 pkg/pace/pace_bench_test.go create mode 100644 pkg/pace/pace_test.go create mode 100644 pkg/pace/zap_reporter.go diff --git a/pkg/pace/default_reporter.go b/pkg/pace/default_reporter.go new file mode 100644 index 0000000..4302d86 --- /dev/null +++ b/pkg/pace/default_reporter.go @@ -0,0 +1,51 @@ +package pace + +import ( + "log" + "strconv" + "time" +) + +// DefaultReporter reports using log.Printf and stops reporting when flow of events is stoped. +func DefaultReporter() ReporterFunc { + var previous float64 + var stalled time.Time + return func(label string, timeframe time.Duration, value float64) { + switch { + case value == 0 && previous == 0: + return // don't report anything + case value == 0 && previous != 0: + dur := timeframe + if !stalled.IsZero() { + dur = time.Since(stalled) + n := dur / timeframe + if dur-n*timeframe < 10*time.Millisecond { + dur = n * timeframe + } + } else { + stalled = time.Now().Add(-dur) + } + log.Printf("%s: stalled for %v", label, dur) + return + default: + previous = value + stalled = time.Time{} + } + floatFmt := func(f float64) string { + return strconv.FormatFloat(f, 'f', 3, 64) + } + switch timeframe { + case time.Second: + log.Printf("%s: %s/s in %v", label, floatFmt(value), timeframe) + case time.Minute: + log.Printf("%s: %s/m in %v", label, floatFmt(value), timeframe) + case time.Hour: + log.Printf("%s: %s/h in %v", label, floatFmt(value), timeframe) + case 24 * time.Hour: + log.Printf("%s: %s/day in %v", label, floatFmt(value), timeframe) + default: + log.Printf("%s %s in %v (pace: %s/s)", floatFmt(value), label, + timeframe, floatFmt(value/(float64(timeframe)/float64(time.Second)))) + } + } +} diff --git a/pkg/pace/pace.go b/pkg/pace/pace.go new file mode 100644 index 0000000..426716c --- /dev/null +++ b/pkg/pace/pace.go @@ -0,0 +1,141 @@ +// Package pace provides a threadsafe counter for measuring ticks in the specified timeframe. +package pace + +import ( + "sync" + "time" +) + +// Pace is a an interface to register ticks, force reporting and pause/resume the meter. +type Pace interface { + // Step increments the counter of pace. + Step(f float64) + // StepN increments the counter of pace, using integer N. + StepN(n int) + // Pause stops reporting until resumed, all steps continue to be counted. + Pause() + // Resume resumes the reporting, starting a report with info since the last tick. + // Specify a new interval or 0 if you don't want to override it. + Resume(interval time.Duration) + // Report manually triggers a report with time frame less than the defined interval. + // Specify a custom reporter function just for this one report. + Report(reporter ReporterFunc) +} + +// ReporterFunc defines a function used to report current pace. +type ReporterFunc func(label string, timeframe time.Duration, value float64) + +type paceImpl struct { + mux *sync.RWMutex + + value float64 + label string + paused bool + interval time.Duration + lastTick time.Time + repFn ReporterFunc + t *time.Timer +} + +func (p *paceImpl) Step(f float64) { + p.mux.Lock() + p.value += f + p.mux.Unlock() +} + +func (p *paceImpl) StepN(n int) { + p.mux.Lock() + p.value += float64(n) + p.mux.Unlock() +} + +func (p *paceImpl) Pause() { + p.t.Stop() + + p.mux.Lock() + defer p.mux.Unlock() + p.report(nil) + + p.paused = true + p.value = 0 + p.lastTick = time.Now() +} + +func (p *paceImpl) Resume(interval time.Duration) { + p.mux.Lock() + defer p.mux.Unlock() + p.report(nil) + + p.paused = false + p.value = 0 + p.lastTick = time.Now() + if interval > 0 { + // override the interval if provided + p.interval = interval + } + p.t.Reset(p.interval) +} + +func (p *paceImpl) Report(reporter ReporterFunc) { + p.t.Stop() + p.mux.Lock() + defer p.mux.Unlock() + p.report(reporter) + + p.value = 0 + p.lastTick = time.Now() + if !p.paused { + p.t.Reset(p.interval) + } +} + +func (p *paceImpl) report(reporter ReporterFunc) { + if reporter == nil { + reporter = p.repFn + } + timeframe := time.Since(p.lastTick) + if abs(timeframe-p.interval) < 10*time.Millisecond { + timeframe = p.interval + } + label := p.label + value := p.value + reporter(label, timeframe, value) +} + +// New creates a new pace meter with provided label and reporting function. +// All ticks (or steps) are aggregated in timeframes specified using interval. +func New(label string, interval time.Duration, repFn ReporterFunc) Pace { + if repFn == nil { + repFn = DefaultReporter() + } + p := &paceImpl{ + mux: new(sync.RWMutex), + + label: label, + interval: interval, + repFn: repFn, + lastTick: time.Now(), + t: time.NewTimer(interval), + } + go func() { + for range p.t.C { + func() { + p.mux.Lock() + defer p.mux.Unlock() + p.report(nil) + + p.value = 0 + p.lastTick = time.Now() + p.t.Reset(interval) + }() + } + }() + return p +} + +func abs(v time.Duration) time.Duration { + if v < 0 { + return -v + } + return v +} diff --git a/pkg/pace/pace_atomic.go b/pkg/pace/pace_atomic.go new file mode 100644 index 0000000..e7c06e9 --- /dev/null +++ b/pkg/pace/pace_atomic.go @@ -0,0 +1,123 @@ +package pace + +import ( + "sync" + "sync/atomic" + "time" +) + +type paceAtomic struct { + mux *sync.RWMutex + + value int64 + label string + paused bool + interval time.Duration + lastTick time.Time + repFn ReporterFunc + t *time.Timer +} + +func (p *paceAtomic) Step(f float64) { + panic("Step is not implemented in atomic, use StepN") +} + +func (p *paceAtomic) StepN(n int) { + atomic.AddInt64(&p.value, int64(n)) +} + +func (p *paceAtomic) resetValue() { + atomic.StoreInt64(&p.value, 0) +} + +func (p *paceAtomic) Pause() { + p.t.Stop() + + p.mux.Lock() + defer p.mux.Unlock() + p.report(nil) + + p.paused = true + p.resetValue() + p.lastTick = time.Now() +} + +func (p *paceAtomic) Resume(interval time.Duration) { + p.mux.Lock() + defer p.mux.Unlock() + p.report(nil) + + p.paused = false + p.resetValue() + p.lastTick = time.Now() + if interval > 0 { + // override the interval if provided + p.interval = interval + } + + p.t.Reset(p.interval) +} + +func (p *paceAtomic) Report(reporter ReporterFunc) { + p.t.Stop() + p.mux.Lock() + defer p.mux.Unlock() + p.report(reporter) + + p.resetValue() + p.lastTick = time.Now() + if !p.paused { + p.t.Reset(p.interval) + } +} + +func (p *paceAtomic) report(reporter ReporterFunc) { + if reporter == nil { + reporter = p.repFn + } + + timeframe := time.Since(p.lastTick) + if abs(timeframe-p.interval) < 10*time.Millisecond { + timeframe = p.interval + } + + reporter(p.label, timeframe, float64(atomic.LoadInt64(&p.value))) +} + +// New creates a new pace meter (atomic impl) with provided label and reporting function. +// All ticks (or steps) are aggregated in timeframes specified using interval. +// +// paceAtomic is an implementation that uses atomic primitives +// to manage the counter. Offers 3x performance improvement (6ns vs 18ns) per step, +// at expense of not supporting float steps. +func NewAtomic(label string, interval time.Duration, repFn ReporterFunc) Pace { + if repFn == nil { + repFn = DefaultReporter() + } + + p := &paceAtomic{ + mux: new(sync.RWMutex), + + label: label, + interval: interval, + repFn: repFn, + lastTick: time.Now(), + t: time.NewTimer(interval), + } + + go func() { + for range p.t.C { + func() { + p.mux.Lock() + defer p.mux.Unlock() + p.report(nil) + + p.resetValue() + p.lastTick = time.Now() + p.t.Reset(interval) + }() + } + }() + + return p +} diff --git a/pkg/pace/pace_bench_test.go b/pkg/pace/pace_bench_test.go new file mode 100644 index 0000000..454aa43 --- /dev/null +++ b/pkg/pace/pace_bench_test.go @@ -0,0 +1,32 @@ +package pace + +import ( + "testing" + "time" +) + +func BenchmarkStepN(b *testing.B) { + pace := New("steps", time.Minute, DefaultReporter()) + b.ResetTimer() + + for i := 0; i < b.N; i++ { + for j := 0; j < 1000; j++ { + pace.StepN(1) + } + } + + pace.Pause() +} + +func BenchmarkAtomicStepN(b *testing.B) { + pace := NewAtomic("steps", time.Minute, DefaultReporter()) + b.ResetTimer() + + for i := 0; i < b.N; i++ { + for j := 0; j < 1000; j++ { + pace.StepN(1) + } + } + + pace.Pause() +} diff --git a/pkg/pace/pace_test.go b/pkg/pace/pace_test.go new file mode 100644 index 0000000..29cd733 --- /dev/null +++ b/pkg/pace/pace_test.go @@ -0,0 +1,46 @@ +package pace + +import ( + "log" + "sync" + "testing" + "time" +) + +const timeframe = time.Second + +func TestSimple(t *testing.T) { + items := make(chan struct{}, 100) + wg := new(sync.WaitGroup) + + p := New("items", timeframe, nil) + go func() { + for range items { + wg.Done() + p.Step(1) + } + }() + + push := func(interval, duration time.Duration) { + tick := time.NewTicker(interval) + start := time.Now() + for range tick.C { + wg.Add(1) + items <- struct{}{} + if time.Since(start) > duration { + break + } + } + } + push(1*time.Millisecond, 3*time.Second) + push(10*time.Millisecond, 3*time.Second) + push(100*time.Millisecond, 3*time.Second) + push(500*time.Millisecond, 3*time.Second) + + wg.Wait() + time.Sleep(3 * time.Second) + push(10*time.Millisecond, 3*time.Second) + time.Sleep(3 * time.Second) + p.Report(nil) + log.Println("done") +} diff --git a/pkg/pace/zap_reporter.go b/pkg/pace/zap_reporter.go new file mode 100644 index 0000000..0016345 --- /dev/null +++ b/pkg/pace/zap_reporter.go @@ -0,0 +1,52 @@ +package pace + +import ( + "strconv" + "time" + + "go.uber.org/zap" +) + +// ZapReporter reports using the provided zap logger and stops reporting when flow of events is stoped. +func ZapReporter(log *zap.Logger) ReporterFunc { + var previous float64 + var stalled time.Time + return func(label string, timeframe time.Duration, value float64) { + switch { + case value == 0 && previous == 0: + return // don't report anything + case value == 0 && previous != 0: + dur := timeframe + if !stalled.IsZero() { + dur = time.Since(stalled) + n := dur / timeframe + if dur-n*timeframe < 10*time.Millisecond { + dur = n * timeframe + } + } else { + stalled = time.Now().Add(-dur) + } + log.Sugar().Infof("%s: stalled for %v", label, dur) + return + default: + previous = value + stalled = time.Time{} + } + floatFmt := func(f float64) string { + return strconv.FormatFloat(f, 'f', 3, 64) + } + switch timeframe { + case time.Second: + log.Sugar().Infof("%s: %s/s in %v", label, floatFmt(value), timeframe) + case time.Minute: + log.Sugar().Infof("%s: %s/m in %v", label, floatFmt(value), timeframe) + case time.Hour: + log.Sugar().Infof("%s: %s/h in %v", label, floatFmt(value), timeframe) + case 24 * time.Hour: + log.Sugar().Infof("%s: %s/day in %v", label, floatFmt(value), timeframe) + default: + log.Sugar().Infof("%s %s in %v (pace: %s/s)", floatFmt(value), label, + timeframe, floatFmt(value/(float64(timeframe)/float64(time.Second)))) + } + } +}