forked from openshift/origin
-
Notifications
You must be signed in to change notification settings - Fork 1
/
limiter.go
144 lines (115 loc) · 5.42 KB
/
limiter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package limiter
import (
"sync"
"time"
"github.com/golang/glog"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
)
// HandlerFunc defines function signature for a CoalescingSerializingRateLimiter.
type HandlerFunc func() error
// CoalescingSerializingRateLimiter guarantees that calls will not happen to the given function
// more frequently than the given interval, and it guarantees that only one call will happen at a time.
// The calls are not queued, i.e. if you make 5 calls to RegisterChange(), it does not guarantee that the
// handler will be invoked 5 times, it merely guarantees it will be invoked once, and no more often than
// the rate.
// The calls to the handler will happen in the background and are expected to do their own locking if needed.
type CoalescingSerializingRateLimiter struct {
// handlerFunc is the function to rate limit and seriaize calls to.
handlerFunc HandlerFunc
// callInterval is the minimum time between the starts of handler calls.
callInterval time.Duration
// lastStart is the time the last run of the handler started.
lastStart time.Time
// changeReqTime is nil if no change has been registered since the last handler run completed, otherwise it is the
// time last change was registered.
changeReqTime *time.Time
// handlerRunning indicates whether the Handler is actively running.
handlerRunning bool
// lock protects the CoalescingSerializingRateLimiter structure from multiple threads manipulating it at once.
lock sync.Mutex
// callbackTimer is the timer we use to make callbacks to re-run the function to decide if we need to do work.
callbackTimer *time.Timer
}
func NewCoalescingSerializingRateLimiter(interval time.Duration, handlerFunc HandlerFunc) *CoalescingSerializingRateLimiter {
limiter := &CoalescingSerializingRateLimiter{
handlerFunc: handlerFunc,
callInterval: interval,
lastStart: time.Time{},
changeReqTime: nil,
handlerRunning: false,
}
return limiter
}
// RegisterChange() indicates that the rate limited function should be called. It may not immediately run it, but it will cause it to run within
// the ReloadInterval. It will always immediately return, the function will be run in the background. Not every call to RegisterChange() will
// result in the function getting called. If it is called repeatedly while it is still within the ReloadInterval since the last run, it will
// only run once when the time allows it.
func (csrl *CoalescingSerializingRateLimiter) RegisterChange() {
glog.V(8).Infof("RegisterChange called")
csrl.changeWorker(true)
}
func (csrl *CoalescingSerializingRateLimiter) changeWorker(userChanged bool) {
csrl.lock.Lock()
defer csrl.lock.Unlock()
glog.V(8).Infof("changeWorker called")
if userChanged && csrl.changeReqTime == nil {
// They just registered a change manually (and we aren't in the middle of a change)
now := time.Now()
csrl.changeReqTime = &now
}
if csrl.handlerRunning {
// We don't need to do anything else... there's a run in progress, and when it is done it will re-call this function at which point the work will then happen
glog.V(8).Infof("The handler was already running (%v) started at %s, returning from the worker", csrl.handlerRunning, csrl.lastStart.String())
return
}
if csrl.changeReqTime == nil {
// There's no work queued so we have nothing to do. We should only get here when
// the function is re-called after a reload
glog.V(8).Infof("No invoke requested time, so there's no queued work. Nothing to do.")
return
}
// There is no handler running, let's see if we should run yet, or schedule a callback
now := time.Now()
sinceLastRun := now.Sub(csrl.lastStart)
untilNextCallback := csrl.callInterval - sinceLastRun
glog.V(8).Infof("Checking reload; now: %v, lastStart: %v, sinceLast %v, limit %v, remaining %v", now, csrl.lastStart, sinceLastRun, csrl.callInterval, untilNextCallback)
if untilNextCallback > 0 {
// We want to reload... but can't yet because some window is not satisfied
if csrl.callbackTimer == nil {
csrl.callbackTimer = time.AfterFunc(untilNextCallback, func() { csrl.changeWorker(false) })
} else {
// While we are resetting the timer, it should have fired and be stopped.
// The first time the worker is called it will know the precise duration
// until when a run would be valid and has scheduled a timer for that point
csrl.callbackTimer.Reset(untilNextCallback)
}
glog.V(8).Infof("Can't invoke the handler yet, need to delay %s, callback scheduled", untilNextCallback.String())
return
}
// Otherwise we can reload immediately... let's do it!
glog.V(8).Infof("Calling the handler function (for invoke time %v)", csrl.changeReqTime)
csrl.handlerRunning = true
csrl.changeReqTime = nil
csrl.lastStart = now
// Go run the handler so we don't block the caller
go csrl.runHandler()
return
}
func (csrl *CoalescingSerializingRateLimiter) runHandler() {
// Call the handler, but do it in its own function so we can cleanup in case the handler panics
runHandler := func() error {
defer func() {
csrl.lock.Lock()
csrl.handlerRunning = false
csrl.lock.Unlock()
}()
return csrl.handlerFunc()
}
if err := runHandler(); err != nil {
utilruntime.HandleError(err)
}
// Re-call the commit in case there is work waiting that came in while we were working
// we want to call the top level commit in case the state has not changed
glog.V(8).Infof("Re-Calling the worker after a reload in case work came in")
csrl.changeWorker(false)
}