forked from vitessio/vitess
-
Notifications
You must be signed in to change notification settings - Fork 0
/
thread_throttler.go
122 lines (108 loc) · 4.61 KB
/
thread_throttler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
// Copyright 2016, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package throttler
import (
"time"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/sync2"
)
// threadThrottler implements the core logic which decides if a Throttle() call
// should be throttled (and for how long) or not.
// It does so by splitting the time into 1 second intervals. For example,
// the current rate is based on the number of requests received within the
// current second.
type threadThrottler struct {
threadID int
// maxRate is the maximum allowed rate.
// If it gets updated, we won't consider it until the next second starts
// because our throttler operates at a 1s granularity and doesn't support
// changes within the same second.
// For example in case of a steep rate increase at the end of the second,
// it would let too many requests through for the remainder of the second:
// old rate = 10 (1 request every 100 ms)
// new rate = 1000 (1 request every 1 ms)
// milliseconds left in the second = 100 ms (i.e. 900 ms elapsed)
// => old rate would allow 1 more request
// => new rate would allow 991 more requests
maxRate sync2.AtomicInt64
// Fields below are unguarded because they must not be modified concurrently.
// initialized is false if we must update the fields below first.
initialized bool
currentSecond time.Time
// maxRateSecond is the allowed max rate since the second started.
// It won't change until currentSecond changes.
maxRateSecond int64
// currentRate is the number of allowed requests since currentSecond started.
currentRate int64
// nextRequestInterval is the time when the next request will be allowed.
// Tracking this ensures there won't be more than one request per interval.
nextRequestInterval time.Time
}
func newThreadThrottler(threadID int) *threadThrottler {
return &threadThrottler{threadID: threadID}
}
func (t *threadThrottler) throttle(now time.Time) time.Duration {
// Initialize or advance the current second interval when necessary.
nowSecond := now.Truncate(time.Second)
if !t.initialized {
t.resetSecond(nowSecond)
t.initialized = true
}
if !t.currentSecond.Equal(nowSecond) {
t.resetSecond(nowSecond)
}
maxRate := t.maxRateSecond
if maxRate == ZeroRateNoProgess {
// Throughput is effectively paused. Do not let anything through until
// the max rate changes.
return t.currentSecond.Add(1 * time.Second).Sub(now)
}
// Check if we have already received too many requests within this second.
if t.currentRate >= maxRate {
return t.currentSecond.Add(1 * time.Second).Sub(now)
}
// Next request isn't expected earlier than nextRequestInterval.
// With this check we ensure there's one request per request interval at most.
if now.Before(t.nextRequestInterval) {
return t.nextRequestInterval.Sub(now)
}
// Check if we have to pace the user.
// NOTE: Pacing won't work if maxRate > 1e9 (since 1e9ns = 1s) and therefore
// the returned backoff will always be zero.
// Minimum time between two requests.
requestIntervalNs := (1 * time.Second).Nanoseconds() / maxRate
// End of the previous request is the earliest allowed time of this request.
earliestArrivalOffsetNs := t.currentRate * requestIntervalNs
earliestArrival := t.currentSecond.Add(time.Duration(earliestArrivalOffsetNs) * time.Nanosecond)
// TODO(mberlin): Most likely we overshoot here since we don't take into
// account our and the user's processing time. Due to too long backoffs, they
// might not be able to fully use their capacity/maximum rate.
backoff := earliestArrival.Sub(now)
if backoff > 0 {
return backoff
}
// Calculate the earlist time the next request can pass.
requestInterval := time.Duration(requestIntervalNs) * time.Nanosecond
currentRequestInterval := now.Truncate(requestInterval)
t.nextRequestInterval = currentRequestInterval.Add(requestInterval)
// QPS rates >= 10k are prone to skipping their next request interval.
// We have to be more relaxed in this case.
if requestInterval <= 100*time.Microsecond {
t.nextRequestInterval = t.nextRequestInterval.Add(-requestInterval)
}
t.currentRate++
return NotThrottled
}
func (t *threadThrottler) resetSecond(nowSecond time.Time) {
if nowSecond.Before(t.currentSecond) {
log.Warningf("Time did not increase monotonously. Make sure your system operates properly. time observed before: %v now: %v", t.currentSecond, nowSecond)
}
t.currentSecond = nowSecond
t.maxRateSecond = t.maxRate.Get()
t.currentRate = 0
t.nextRequestInterval = nowSecond
}
func (t *threadThrottler) setMaxRate(rate int64) {
t.maxRate.Set(rate)
}