/
adaptive.go
132 lines (117 loc) · 4.79 KB
/
adaptive.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package adaptive provides functionality for adaptive client-side throttling.
package adaptive
import (
"sync"
"time"
"google.golang.org/grpc/internal/grpcrand"
)
// Indirection points for the clock and the random source, kept as variables
// so that unit tests can substitute their own implementations.
var (
	// timeNowFunc returns the current time; it is time.Now unless overridden.
	timeNowFunc = time.Now
	// randFunc returns a random float64; it is grpcrand.Float64 unless
	// overridden.
	randFunc = grpcrand.Float64
)
// Default knob values that New passes to newWithArgs.
const (
	// defaultDuration is the amount of recent history taken into account.
	defaultDuration = 30 * time.Second
	// defaultBins is the number of bins used to bucket historical data.
	defaultBins = 100
	// defaultRatioForAccepts is the factor by which accepts are multiplied in
	// the throttling probability.
	defaultRatioForAccepts = 2.0
	// defaultRequestsPadding is the padding added to the request count in the
	// denominator of the throttling probability.
	defaultRequestsPadding = 8.0
)
// Throttler implements a client-side throttling recommendation system. Its
// methods serialize access to the recorded history with a mutex and are safe
// for concurrent use by multiple goroutines.
//
// The throttler has the following knobs, all of which currently use default
// values. Support for configuring them will be added if the need arises.
//   - Duration: amount of recent history that is taken into account when
//     making client-side throttling decisions. A default of 30 seconds is
//     used.
//   - Bins: number of bins used for bucketing historical data. A default of
//     100 is used.
//   - RatioForAccepts: ratio by which accepts are multiplied, typically a
//     value slightly larger than 1.0. It makes the throttler behave as if the
//     backend had accepted more requests than it actually has, so that the
//     client errs on the side of sending the backend more requests than it is
//     expected to accept, which speeds up the propagation of state. A default
//     of 2.0 is used.
//   - RequestsPadding: decreases the (client-side) throttling probability in
//     the low QPS regime (to speed up propagation of state), and safeguards
//     against hitting a client-side throttling probability of 100%. The weight
//     of this value decreases as the number of requests in recent history
//     grows. A default of 8 is used.
//
// The adaptive throttler estimates, from recent history, the probability that
// a request will be throttled. Backend responses (both throttled and accepted)
// are registered via the RegisterBackendResponse method, and the
// ShouldThrottle method then recommends client-side throttling with
// probability:
//
//	(requests - RatioForAccepts * accepts) / (requests + RequestsPadding)
type Throttler struct {
	// ratioForAccepts is the multiplier applied to the accept count in the
	// throttling probability.
	ratioForAccepts float64
	// requestsPadding is added to the request count in the denominator of the
	// throttling probability.
	requestsPadding float64

	// mu is held while accepts and throttles are read or updated.
	mu sync.Mutex
	// accepts and throttles hold the number of accepts and throttles in the
	// lookback period.
	accepts   *lookback
	throttles *lookback
}
// New returns an adaptive throttler configured with the package defaults for
// duration, bins, ratio for accepts and requests padding.
func New() *Throttler {
	return newWithArgs(
		defaultDuration,
		defaultBins,
		defaultRatioForAccepts,
		defaultRequestsPadding,
	)
}
// newWithArgs builds an adaptive throttler from explicit knob values instead
// of the defaults. Apart from New, it is meant to be called only from
// unittests.
//
// The accept and throttle histories are each created by their own call to
// newLookback with the same bins and duration.
func newWithArgs(duration time.Duration, bins int64, ratioForAccepts, requestsPadding float64) *Throttler {
	t := new(Throttler)
	t.ratioForAccepts = ratioForAccepts
	t.requestsPadding = requestsPadding
	t.accepts = newLookback(bins, duration)
	t.throttles = newLookback(bins, duration)
	return t
}
// ShouldThrottle gives a probabilistic answer to whether the server would
// throttle the next request. Call it for every request before letting the
// request reach the network; when it returns true, abort the request
// immediately, as if the server itself had throttled it.
//
// A request that is throttled here is also recorded as a throttle in the
// history, so it feeds into subsequent decisions.
func (t *Throttler) ShouldThrottle() bool {
	// Sample the random threshold and the clock before taking the lock.
	threshold := randFunc()
	ts := timeNowFunc()

	t.mu.Lock()
	defer t.mu.Unlock()

	accepted := float64(t.accepts.sum(ts))
	throttled := float64(t.throttles.sum(ts))
	total := accepted + throttled

	// (requests - RatioForAccepts * accepts) / (requests + RequestsPadding)
	probability := (total - t.ratioForAccepts*accepted) / (total + t.requestsPadding)
	if probability <= threshold {
		return false
	}

	t.throttles.add(ts, 1)
	return true
}
// RegisterBackendResponse registers a response received from the backend for a
// request allowed by ShouldThrottle. This should be called for every response
// received from the backend (i.e., once for each request for which
// ShouldThrottle returned false).
//
// If throttled is true the response is counted as a throttle, otherwise it is
// counted as an accept.
func (t *Throttler) RegisterBackendResponse(throttled bool) {
	// Read the clock before taking the lock, as ShouldThrottle does.
	now := timeNowFunc()

	t.mu.Lock()
	// Release via defer so the mutex cannot stay held if recording panics.
	defer t.mu.Unlock()

	if throttled {
		t.throttles.add(now, 1)
		return
	}
	t.accepts.add(now, 1)
}