-
Notifications
You must be signed in to change notification settings - Fork 24
/
windowed-counter.go
113 lines (97 loc) · 3.5 KB
/
windowed-counter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package scheduler
import (
"time"
"github.com/jonboulle/clockwork"
)
// tokenCounts holds the tokens accumulated in one slot of a WindowedCounter.
type tokenCounts struct {
	tokens uint64 // sum of the token amounts added while this slot was current
}
// WindowedCounter accumulates token counts over a sliding window made of
// fixed-duration slots. Tokens are added to the slot currently in progress;
// the token rate is derived from the remaining slots only.
//
// The type contains no locking of its own.
// NOTE(review): callers presumably serialize access — confirm.
type WindowedCounter struct {
	// stats
	nextSlotTime  time.Time       // Time when the current slot ends and the window advances
	counters      []tokenCounts   // Window of per-slot counters, one entry per slot
	slotDuration  time.Duration   // Time duration of a single slot
	totalSlots    uint8           // Total slots in the sliding window, including the in-progress slot
	currentSlot   uint8           // Index into counters of the slot currently being updated
	clk           clockwork.Clock // Clock used to read the current time
	bootstrapping bool            // True until the slot index has wrapped around since creation or the last full reset
}
// NewWindowedCounter creates a WindowedCounter whose sliding window spans
// totalSlots completed slots of slotDuration each. One additional slot is
// allocated internally to accumulate the slot currently in progress. The
// counter starts in bootstrapping mode, and its first slot ends slotDuration
// after the time clk reports at construction.
//
// The slot count is stored in a uint8 together with the extra slot, so
// totalSlots is capped at 254; a request for 255 is treated as 254.
// slotDuration must be positive: AddTokens divides by it when advancing the
// window.
func NewWindowedCounter(clk clockwork.Clock, totalSlots uint8, slotDuration time.Duration) *WindowedCounter {
	// Largest requested window that still leaves room for the extra slot in a
	// uint8. Without this clamp, totalSlots == 255 wraps to 0 below, producing
	// an empty counters slice that panics on the first AddTokens call.
	const maxWindowSlots uint8 = 254
	if totalSlots > maxWindowSlots {
		totalSlots = maxWindowSlots
	}

	// One extra slot aggregates the window currently in progress.
	slots := totalSlots + 1

	return &WindowedCounter{
		clk:           clk,
		totalSlots:    slots,
		slotDuration:  slotDuration,
		counters:      make([]tokenCounts, slots),
		currentSlot:   0,
		nextSlotTime:  clk.Now().Add(slotDuration),
		bootstrapping: true,
	}
}
// CalculateTokenRate returns the token rate, in tokens per second, observed
// over the window. The slot currently being filled is excluded, so the rate is
// the sum of all other slots divided by the time those slots span.
func (counter *WindowedCounter) CalculateTokenRate() float64 {
	var completed uint64
	for slot := uint8(0); slot < counter.totalSlots; slot++ {
		if slot == counter.currentSlot {
			// The in-progress slot is only partially filled; leave it out.
			continue
		}
		completed += counter.counters[slot].tokens
	}

	// Time covered by the slots that were summed, in nanoseconds.
	windowNanos := int64(counter.totalSlots-1) * int64(counter.slotDuration)

	// Scale from tokens per nanosecond to tokens per second.
	return float64(completed) * float64(time.Second) / float64(windowNanos)
}
// IsBootstrapping reports whether the counter is in bootstrapping mode. The
// counter starts in this mode and AddTokens re-enters it after a gap long
// enough to invalidate every slot; it is left once the slot index wraps around.
func (counter *WindowedCounter) IsBootstrapping() bool {
	return counter.bootstrapping
}
// AddTokens adds request.Tokens to the slot covering the current time, first
// advancing the window past any slot boundaries crossed since the previous
// call. It returns true only when this call shifted slots and the counter is
// not bootstrapping, i.e. all the slots in the window are valid. It returns
// false when no shift happened, while bootstrapping, or when the whole window
// had to be reset.
func (counter *WindowedCounter) AddTokens(request *Request) bool {
	now := counter.clk.Now()

	// Still inside the current slot: accumulate and report that nothing shifted.
	if !now.After(counter.nextSlotTime) {
		counter.counters[counter.currentSlot].tokens += request.Tokens
		return false
	}

	slotNanos := int64(counter.slotDuration)
	windowSlots := int64(counter.totalSlots)

	// Slots to advance: one for passing nextSlotTime, plus one for every full
	// slotDuration elapsed beyond it.
	overshoot := int64(now.Sub(counter.nextSlotTime))
	elapsed := overshoot/slotNanos + 1

	// Move the boundary to the end of the slot that now becomes current.
	counter.nextSlotTime = counter.nextSlotTime.Add(time.Duration(elapsed * slotNanos))

	if elapsed >= windowSlots {
		// The entire window is stale. Clear every slot and go back to
		// bootstrap mode: traffic might have restarted after an outage, and a
		// token rate calculated from incomplete counts could cause excessive
		// traffic to be dropped.
		for slot := uint8(0); slot < counter.totalSlots; slot++ {
			counter.counters[slot].tokens = 0
		}
		counter.currentSlot = 0
		counter.bootstrapping = true
		counter.counters[counter.currentSlot].tokens += request.Tokens
		return false
	}

	for ; elapsed > 0; elapsed-- {
		counter.currentSlot++
		if counter.currentSlot == counter.totalSlots {
			// Wrapped around to the first slot: the window has filled at
			// least once, so the token rate is valid from now on.
			counter.currentSlot = 0
			counter.bootstrapping = false
		}
		// The slot being entered holds data from a previous pass; reset it.
		counter.counters[counter.currentSlot].tokens = 0
	}

	counter.counters[counter.currentSlot].tokens += request.Tokens

	// Slots shifted; the result is usable only once bootstrapping is over.
	return !counter.bootstrapping
}