diff --git a/pkg/pace/pace.go b/pkg/pace/pace.go new file mode 100644 index 0000000..5f56f82 --- /dev/null +++ b/pkg/pace/pace.go @@ -0,0 +1,139 @@ +// Package pace provides a threadsafe counter for measuring ticks in the specified timeframe. +package pace + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/pkg/errors" + "go.uber.org/zap" +) + +// Pace is a an interface to register ticks, force reporting and pause/resume the meter. +type Pace interface { + // Step increments the counter of pace. + Step(n int) + // Stop shutdowns reporting, emits a final report for the time passed since previous report. + Stop() +} + +// ReporterFunc defines a function used to report current pace. +type ReporterFunc func(label string, timeframe time.Duration, value int) + +type paceImpl struct { + lastTickMux *sync.RWMutex + lastTick time.Time + + label string + interval time.Duration + value int64 + + reportFn ReporterFunc + defaultLogger *zap.Logger + cancelFn context.CancelFunc + timer *time.Timer +} + +func (p *paceImpl) Step(n int) { + atomic.AddInt64(&p.value, int64(n)) +} + +func (p *paceImpl) resetValue() { + atomic.StoreInt64(&p.value, 0) +} + +func (p *paceImpl) Stop() { + p.cancelFn() +} + +func (p *paceImpl) report() { + timeframe := time.Since(p.lastTick) + if abs(timeframe-p.interval) < 10*time.Millisecond { + timeframe = p.interval + } + + p.reportFn(p.label, timeframe, int(atomic.LoadInt64(&p.value))) +} + +// New creates a new pace meter with provided label and optional reporting function. +// All ticks (or steps) are aggregated in timeframes specified using interval. +// If the reporting function was not provided, ZapReporter will be used as default. +func New(ctx context.Context, label string, interval time.Duration, reportFn ...ReporterFunc) Pace { + p := &paceImpl{ + lastTickMux: new(sync.RWMutex), + lastTick: time.Now(), + + label: label, + interval: interval, + + timer: time.NewTimer(interval), + } + + logger, _ := zap.NewProduction() + p.defaultLogger = logger.With(zap.String("label", p.label)) + + if len(reportFn) > 0 { + p.reportFn = reportFn[0] + } else { + p.reportFn = ZapReporter(p.defaultLogger) + } + + paceCtx, cancelFn := context.WithCancel(ctx) + p.cancelFn = cancelFn + + go p.reportingLoop(paceCtx) + + return p +} + +func (p *paceImpl) reportingLoop(ctx context.Context) { + defer func() { + if v := recover(); v != nil { + if err, ok := v.(error); ok { + p.defaultLogger.With(zap.Error(err)).Warn("pace reportingLoop panicked") + return + } + + p.defaultLogger.With(zap.Error( + errors.Errorf("error: %v", v), + )).Warn("pace reportingLoop panicked") + } + }() + + for { + select { + case <-ctx.Done(): + p.timer.Stop() + + func() { + p.lastTickMux.RLock() + defer p.lastTickMux.RUnlock() + p.report() + }() + + // exits the loop + return + case <-p.timer.C: + func() { + p.lastTickMux.Lock() + defer p.lastTickMux.Unlock() + p.report() + + p.resetValue() + p.lastTick = time.Now() + p.timer.Reset(p.interval) + }() + } + } + +} + +func abs(v time.Duration) time.Duration { + if v < 0 { + return -v + } + + return v +} diff --git a/pkg/pace/pace_test.go b/pkg/pace/pace_test.go new file mode 100644 index 0000000..56aecba --- /dev/null +++ b/pkg/pace/pace_test.go @@ -0,0 +1,50 @@ +package pace + +import ( + "context" + "testing" + "time" +) + +func TestSimple(t *testing.T) { + var ticks int + var stopped bool + + testReportingFn := func(_ string, _ time.Duration, value int) { + ticks += value + + if stopped { + t.Fatalf("not expected to fire after stop") + t.FailNow() + } + } + p := New(context.Background(), "items", 100*time.Millisecond, testReportingFn) + + go func() { + for i := 0; i < 10; i++ { + p.Step(1) + } + }() + + time.Sleep(50 * time.Millisecond) + + if ticks != 0 { + t.Fatalf("expeted ticks: 0, got: %d", ticks) + t.FailNow() + } + + time.Sleep(60 * time.Millisecond) + + if ticks != 10 { + t.Fatalf("expeted ticks: 10, got: %d", ticks) + t.FailNow() + } + + p.Stop() + + time.Sleep(100 * time.Millisecond) + + stopped = true + + time.Sleep(200 * time.Millisecond) +} diff --git a/pkg/pace/zap_reporter.go b/pkg/pace/zap_reporter.go new file mode 100644 index 0000000..d9fd80d --- /dev/null +++ b/pkg/pace/zap_reporter.go @@ -0,0 +1,59 @@ +package pace + +import ( + "strconv" + "time" + + "go.uber.org/zap" +) + +// ZapReporter reports using the provided zap logger and stops reporting when flow of events is stoped. +func ZapReporter(log *zap.Logger) ReporterFunc { + var previous int + var stalled time.Time + + return func(label string, timeframe time.Duration, value int) { + switch { + case value == 0 && previous == 0: + return // don't report anything + case value == 0 && previous != 0: + dur := timeframe + if !stalled.IsZero() { + dur = time.Since(stalled) + n := dur / timeframe + if dur-n*timeframe < 10*time.Millisecond { + dur = n * timeframe + } + } else { + stalled = time.Now().Add(-dur) + } + log.Sugar().Infof("%s: stalled for %v", label, dur) + return + default: + previous = value + stalled = time.Time{} + } + + floatFmt := func(f float64) string { + return strconv.FormatFloat(f, 'f', 3, 64) + } + + intFmt := func(n int) string { + return strconv.FormatInt(int64(n), 10) + } + + switch timeframe { + case time.Second: + log.Sugar().Infof("%s: %s/s in %v", label, intFmt(value), timeframe) + case time.Minute: + log.Sugar().Infof("%s: %s/m in %v", label, intFmt(value), timeframe) + case time.Hour: + log.Sugar().Infof("%s: %s/h in %v", label, intFmt(value), timeframe) + case 24 * time.Hour: + log.Sugar().Infof("%s: %s/day in %v", label, intFmt(value), timeframe) + default: + log.Sugar().Infof("%s %s in %v (pace: %s/s)", intFmt(value), label, + timeframe, floatFmt(float64(value)/(float64(timeframe)/float64(time.Second)))) + } + } +}