Skip to content

Commit

Permalink
Add pace metering package
Browse files Browse the repository at this point in the history
  • Loading branch information
Max Kupriianov committed Jun 2, 2022
1 parent a538c43 commit ce1a630
Show file tree
Hide file tree
Showing 6 changed files with 445 additions and 0 deletions.
51 changes: 51 additions & 0 deletions pkg/pace/default_reporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package pace

import (
"log"
"strconv"
"time"
)

// DefaultReporter reports using log.Printf and stops reporting when flow of events is stoped.
func DefaultReporter() ReporterFunc {
var previous float64
var stalled time.Time
return func(label string, timeframe time.Duration, value float64) {
switch {
case value == 0 && previous == 0:
return // don't report anything
case value == 0 && previous != 0:
dur := timeframe
if !stalled.IsZero() {
dur = time.Since(stalled)
n := dur / timeframe
if dur-n*timeframe < 10*time.Millisecond {
dur = n * timeframe
}
} else {
stalled = time.Now().Add(-dur)
}
log.Printf("%s: stalled for %v", label, dur)
return
default:
previous = value
stalled = time.Time{}
}
floatFmt := func(f float64) string {
return strconv.FormatFloat(f, 'f', 3, 64)
}
switch timeframe {
case time.Second:
log.Printf("%s: %s/s in %v", label, floatFmt(value), timeframe)
case time.Minute:
log.Printf("%s: %s/m in %v", label, floatFmt(value), timeframe)
case time.Hour:
log.Printf("%s: %s/h in %v", label, floatFmt(value), timeframe)
case 24 * time.Hour:
log.Printf("%s: %s/day in %v", label, floatFmt(value), timeframe)
default:
log.Printf("%s %s in %v (pace: %s/s)", floatFmt(value), label,
timeframe, floatFmt(value/(float64(timeframe)/float64(time.Second))))
}
}
}
141 changes: 141 additions & 0 deletions pkg/pace/pace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Package pace provides a threadsafe counter for measuring ticks in the specified timeframe.
package pace

import (
"sync"
"time"
)

// Pace is a an interface to register ticks, force reporting and pause/resume the meter.
type Pace interface {
// Step increments the counter of pace.
Step(f float64)
// StepN increments the counter of pace, using integer N.
StepN(n int)
// Pause stops reporting until resumed, all steps continue to be counted.
Pause()
// Resume resumes the reporting, starting a report with info since the last tick.
// Specify a new interval or 0 if you don't want to override it.
Resume(interval time.Duration)
// Report manually triggers a report with time frame less than the defined interval.
// Specify a custom reporter function just for this one report.
Report(reporter ReporterFunc)
}

// ReporterFunc defines a function used to report current pace.
type ReporterFunc func(label string, timeframe time.Duration, value float64)

type paceImpl struct {
mux *sync.RWMutex

value float64
label string
paused bool
interval time.Duration
lastTick time.Time
repFn ReporterFunc
t *time.Timer
}

func (p *paceImpl) Step(f float64) {
p.mux.Lock()
p.value += f
p.mux.Unlock()
}

func (p *paceImpl) StepN(n int) {
p.mux.Lock()
p.value += float64(n)
p.mux.Unlock()
}

func (p *paceImpl) Pause() {
p.t.Stop()

p.mux.Lock()
defer p.mux.Unlock()
p.report(nil)

p.paused = true
p.value = 0
p.lastTick = time.Now()
}

func (p *paceImpl) Resume(interval time.Duration) {
p.mux.Lock()
defer p.mux.Unlock()
p.report(nil)

p.paused = false
p.value = 0
p.lastTick = time.Now()
if interval > 0 {
// override the interval if provided
p.interval = interval
}
p.t.Reset(p.interval)
}

func (p *paceImpl) Report(reporter ReporterFunc) {
p.t.Stop()
p.mux.Lock()
defer p.mux.Unlock()
p.report(reporter)

p.value = 0
p.lastTick = time.Now()
if !p.paused {
p.t.Reset(p.interval)
}
}

func (p *paceImpl) report(reporter ReporterFunc) {
if reporter == nil {
reporter = p.repFn
}
timeframe := time.Since(p.lastTick)
if abs(timeframe-p.interval) < 10*time.Millisecond {
timeframe = p.interval
}
label := p.label
value := p.value
reporter(label, timeframe, value)
}

// New creates a new pace meter with provided label and reporting function.
// All ticks (or steps) are aggregated in timeframes specified using interval.
func New(label string, interval time.Duration, repFn ReporterFunc) Pace {
if repFn == nil {
repFn = DefaultReporter()
}
p := &paceImpl{
mux: new(sync.RWMutex),

label: label,
interval: interval,
repFn: repFn,
lastTick: time.Now(),
t: time.NewTimer(interval),
}
go func() {
for range p.t.C {
func() {
p.mux.Lock()
defer p.mux.Unlock()
p.report(nil)

p.value = 0
p.lastTick = time.Now()
p.t.Reset(interval)
}()
}
}()
return p
}

func abs(v time.Duration) time.Duration {
if v < 0 {
return -v
}
return v
}
123 changes: 123 additions & 0 deletions pkg/pace/pace_atomic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package pace

import (
"sync"
"sync/atomic"
"time"
)

type paceAtomic struct {
mux *sync.RWMutex

value int64
label string
paused bool
interval time.Duration
lastTick time.Time
repFn ReporterFunc
t *time.Timer
}

func (p *paceAtomic) Step(f float64) {
panic("Step is not implemented in atomic, use StepN")
}

func (p *paceAtomic) StepN(n int) {
atomic.AddInt64(&p.value, int64(n))
}

func (p *paceAtomic) resetValue() {
atomic.StoreInt64(&p.value, 0)
}

func (p *paceAtomic) Pause() {
p.t.Stop()

p.mux.Lock()
defer p.mux.Unlock()
p.report(nil)

p.paused = true
p.resetValue()
p.lastTick = time.Now()
}

func (p *paceAtomic) Resume(interval time.Duration) {
p.mux.Lock()
defer p.mux.Unlock()
p.report(nil)

p.paused = false
p.resetValue()
p.lastTick = time.Now()
if interval > 0 {
// override the interval if provided
p.interval = interval
}

p.t.Reset(p.interval)
}

func (p *paceAtomic) Report(reporter ReporterFunc) {
p.t.Stop()
p.mux.Lock()
defer p.mux.Unlock()
p.report(reporter)

p.resetValue()
p.lastTick = time.Now()
if !p.paused {
p.t.Reset(p.interval)
}
}

func (p *paceAtomic) report(reporter ReporterFunc) {
if reporter == nil {
reporter = p.repFn
}

timeframe := time.Since(p.lastTick)
if abs(timeframe-p.interval) < 10*time.Millisecond {
timeframe = p.interval
}

reporter(p.label, timeframe, float64(atomic.LoadInt64(&p.value)))
}

// New creates a new pace meter (atomic impl) with provided label and reporting function.
// All ticks (or steps) are aggregated in timeframes specified using interval.
//
// paceAtomic is an implementation that uses atomic primitives
// to manage the counter. Offers 3x performance improvement (6ns vs 18ns) per step,
// at expense of not supporting float steps.
func NewAtomic(label string, interval time.Duration, repFn ReporterFunc) Pace {
if repFn == nil {
repFn = DefaultReporter()
}

p := &paceAtomic{
mux: new(sync.RWMutex),

label: label,
interval: interval,
repFn: repFn,
lastTick: time.Now(),
t: time.NewTimer(interval),
}

go func() {
for range p.t.C {
func() {
p.mux.Lock()
defer p.mux.Unlock()
p.report(nil)

p.resetValue()
p.lastTick = time.Now()
p.t.Reset(interval)
}()
}
}()

return p
}
32 changes: 32 additions & 0 deletions pkg/pace/pace_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package pace

import (
"testing"
"time"
)

func BenchmarkStepN(b *testing.B) {
pace := New("steps", time.Minute, DefaultReporter())
b.ResetTimer()

for i := 0; i < b.N; i++ {
for j := 0; j < 1000; j++ {
pace.StepN(1)
}
}

pace.Pause()
}

func BenchmarkAtomicStepN(b *testing.B) {
pace := NewAtomic("steps", time.Minute, DefaultReporter())
b.ResetTimer()

for i := 0; i < b.N; i++ {
for j := 0; j < 1000; j++ {
pace.StepN(1)
}
}

pace.Pause()
}
46 changes: 46 additions & 0 deletions pkg/pace/pace_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package pace

import (
"log"
"sync"
"testing"
"time"
)

const timeframe = time.Second

func TestSimple(t *testing.T) {
items := make(chan struct{}, 100)
wg := new(sync.WaitGroup)

p := New("items", timeframe, nil)
go func() {
for range items {
wg.Done()
p.Step(1)
}
}()

push := func(interval, duration time.Duration) {
tick := time.NewTicker(interval)
start := time.Now()
for range tick.C {
wg.Add(1)
items <- struct{}{}
if time.Since(start) > duration {
break
}
}
}
push(1*time.Millisecond, 3*time.Second)
push(10*time.Millisecond, 3*time.Second)
push(100*time.Millisecond, 3*time.Second)
push(500*time.Millisecond, 3*time.Second)

wg.Wait()
time.Sleep(3 * time.Second)
push(10*time.Millisecond, 3*time.Second)
time.Sleep(3 * time.Second)
p.Report(nil)
log.Println("done")
}
Loading

0 comments on commit ce1a630

Please sign in to comment.