/
tick.go
127 lines (110 loc) · 3.37 KB
/
tick.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package tick
import (
"math"
"sync"
"time"
)
var WindowCap = 10000
var CleanPct = 0.1
type window struct {
tickCount uint64
}
// MovingAverage collates ticks and determines the moving
// average of the collated ticks. It collects ticks within
// fixed time window, then uses the count of ticks in the
// windows to calculate the moving average.
type MovingAverage struct {
sync.Mutex
currentWindow *window
currentWindowEndTime time.Time
concludeWindows []*window
windowDur time.Duration
}
// NewMovingAverage creates an instance of MovingAverage.
// windowDur is the lifespan of a window
func NewMovingAverage(windowDur time.Duration) *MovingAverage {
return &MovingAverage{
windowDur: windowDur,
}
}
// Tick increments the tick count of the current window
// and creates a new window if the current window has expired
func (m *MovingAverage) Tick() {
m.Lock()
defer m.Unlock()
now := time.Now()
// Create a new window if no current window is active
if m.currentWindow == nil {
m.currentWindow = new(window)
m.currentWindowEndTime = time.Now().Add(m.windowDur)
}
// When the current window expires, add it to the
// list of concluded windows and create a new window
if m.currentWindowEndTime.Before(now) {
m.concludeWindows = append(m.concludeWindows, m.currentWindow)
m.currentWindow = new(window)
m.currentWindowEndTime = time.Now().Add(m.windowDur)
}
// Increment window's tick count
m.currentWindow.tickCount++
m.clean()
}
// clean removes older windows from the slice of
// concluded windows. When the number of concluded
// windows equal the cap, 10% of the older windows
// are dropped.
// Note: Not thread-safe. Must be called with lock acquired
func (m *MovingAverage) clean() {
if curLen := len(m.concludeWindows); curLen == WindowCap {
var tenPct = math.Round(CleanPct * float64(WindowCap))
m.concludeWindows = m.concludeWindows[int(tenPct):]
}
}
// Averages calculates the dur moving averages.
// dur must be divisible by the initial window duration
// without remainder. E.g if initial window duration is
// 10 seconds, then 10, 20, 30, 40 are valid values
func (m *MovingAverage) Averages(dur time.Duration) []float64 {
m.Lock()
defer m.Unlock()
if math.Mod(dur.Seconds(), m.windowDur.Seconds()) != 0 {
panic("dur must be divisible by initial window duration without remainder")
}
slideCount := dur.Seconds() / m.windowDur.Seconds()
nConcludedWin := len(m.concludeWindows)
averages := []float64{}
for i, w := range m.concludeWindows {
curSlideWindows := []*window{w}
slideEnd := i + int(slideCount) - 1
next := i + 1
for next < nConcludedWin {
curSlideWindows = append(curSlideWindows, m.concludeWindows[next])
if next == slideEnd {
break
}
next++
}
slideSum := uint64(0)
for _, _w := range curSlideWindows {
slideSum += _w.tickCount
}
averages = append(averages, float64(slideSum)/float64(len(curSlideWindows)))
}
return averages
}
// Average calculates the average of all derived
// moving averages of dur. dur must be divisible by the
// initial window duration without remainder. E.g if
// initial window duration is 10 seconds, then 10, 20,
// 30, 40 are valid values
func (m *MovingAverage) Average(dur time.Duration) float64 {
averages := m.Averages(dur)
if len(averages) == 0 {
return 0
}
sum := 0.0
for _, avg := range averages {
sum += avg
}
return sum / float64(len(averages))
}