-
Notifications
You must be signed in to change notification settings - Fork 119
/
sampling_rule.go
217 lines (167 loc) · 5.63 KB
/
sampling_rule.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
// Copyright 2017-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with the License. A copy of the License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
package sampling
import (
"sync"
"github.com/aws/aws-xray-sdk-go/internal/logger"
"github.com/aws/aws-xray-sdk-go/pattern"
"github.com/aws/aws-xray-sdk-go/utils"
xraySvc "github.com/aws/aws-sdk-go/service/xray"
)
// Properties is the base set of properties that define a sampling rule.
// The string fields are used as case-insensitive wildcard patterns when a
// rule is matched against a request (see the AppliesTo methods).
type Properties struct {
	// ServiceName is the pattern matched against the request's service name.
	ServiceName string `json:"service_name"`
	// Host is the pattern matched against the request's host.
	Host string `json:"host"`
	// HTTPMethod is the pattern matched against the request's HTTP method.
	HTTPMethod string `json:"http_method"`
	// URLPath is the pattern matched against the request's URL path.
	URLPath string `json:"url_path"`
	// FixedTarget presumably sizes the rule's reservoir (requests sampled
	// before Rate is consulted); its use is not visible in this file, so
	// confirm against the reservoir code.
	FixedTarget int64 `json:"fixed_target"`
	// Rate is the threshold used for bernoulli sampling: a request is sampled
	// when the value drawn from the rule's random source is less than Rate.
	Rate float64 `json:"rate"`
}
// AppliesTo reports whether the rule's Host, URLPath and HTTPMethod patterns
// match the given host, path and method. An empty argument is treated as
// "not specified" and matches unconditionally; non-empty arguments are
// compared with a case-insensitive wildcard match.
// Assumes lock is already held, if required.
func (p *Properties) AppliesTo(host, path, method string) bool {
	// Reject on the first non-empty argument that fails its pattern.
	if host != "" && !pattern.WildcardMatchCaseInsensitive(p.Host, host) {
		return false
	}
	if path != "" && !pattern.WildcardMatchCaseInsensitive(p.URLPath, path) {
		return false
	}
	return method == "" || pattern.WildcardMatchCaseInsensitive(p.HTTPMethod, method)
}
// AppliesTo reports whether the centralized rule matches the given sampling
// request. Each request field (Host, URL, Method, ServiceName, ServiceType)
// is compared against the corresponding rule pattern with a case-insensitive
// wildcard match; a request field that is empty is skipped and never causes
// a mismatch.
// Assumes lock is already held, if required.
func (r *CentralizedRule) AppliesTo(request *Request) bool {
	// Cases are evaluated top to bottom; the first populated field that
	// fails its pattern rejects the request.
	switch {
	case request.Host != "" && !pattern.WildcardMatchCaseInsensitive(r.Host, request.Host):
		return false
	case request.URL != "" && !pattern.WildcardMatchCaseInsensitive(r.URLPath, request.URL):
		return false
	case request.Method != "" && !pattern.WildcardMatchCaseInsensitive(r.HTTPMethod, request.Method):
		return false
	case request.ServiceName != "" && !pattern.WildcardMatchCaseInsensitive(r.ServiceName, request.ServiceName):
		return false
	case request.ServiceType != "" && !pattern.WildcardMatchCaseInsensitive(r.serviceType, request.ServiceType):
		return false
	}
	return true
}
// CentralizedRule represents a centralized sampling rule. It combines the
// common matching Properties with a centralized reservoir and the per-rule
// statistics counters that snapshot reports and resets.
type CentralizedRule struct {
	// Centralized reservoir for keeping track of reservoir usage
	reservoir *CentralizedReservoir
	// Rule name identifying this rule
	ruleName string
	// Priority of matching against rule
	priority int64
	// Number of requests matched against this rule
	requests int64
	// Number of requests sampled using this rule
	sampled int64
	// Number of requests borrowed
	borrows int64
	// Timestamp for last match against this rule
	usedAt int64
	// Common sampling rule properties
	*Properties
	// ServiceType for the sampling rule
	serviceType string
	// ResourceARN for the sampling rule
	resourceARN string
	// Attributes for the sampling rule
	attributes map[string]*string
	// Provides system time
	clock utils.Clock
	// Provides random numbers
	rand utils.Rand
	// mu is held by Sample, stale and snapshot while they read or update
	// the statistics counters and consult the reservoir.
	mu sync.RWMutex
}
// stale reports whether the rule's quota is due for a refresh at time now
// (Unix seconds, as compared against the reservoir's refreshedAt+interval).
// A rule that has matched no requests since its counters were last reset is
// never considered stale. Takes the read lock for the duration of the check.
func (r *CentralizedRule) stale(now int64) bool {
	r.mu.RLock()
	defer r.mu.RUnlock()

	// Nothing to report for an unused rule, so no refresh is needed.
	if r.requests == 0 {
		return false
	}

	refreshDue := r.reservoir.refreshedAt + r.reservoir.interval
	return now >= refreshDue
}
// Sample makes a sampling decision for a request matched by this rule and
// returns it as a Decision whose Rule field points at the rule name.
//
// The request counter is always incremented. The decision is then made as
// follows, while holding the rule's lock:
//   - quota expired and a borrow is granted: sampled, counted as a borrow;
//   - quota expired and no borrow available: bernoulli sampling at Rate;
//   - quota valid and the reservoir grants a slot: sampled;
//   - quota valid but exhausted: bernoulli sampling at Rate.
func (r *CentralizedRule) Sample() *Decision {
	now := r.clock.Now().Unix()
	decision := &Decision{Rule: &r.ruleName}

	r.mu.Lock()
	defer r.mu.Unlock()

	r.requests++

	switch {
	case !r.reservoir.expired(now):
		// Quota is still valid: take from the reservoir, if possible.
		if r.reservoir.Take(now) {
			r.sampled++
			decision.Sample = true
			return decision
		}
		logger.Debugf(
			"Sampling target has been exhausted for rule %s. Using fixed rate.",
			r.ruleName,
		)
	case r.reservoir.borrow(now):
		// Quota has expired but the reservoir lets us borrow a request.
		logger.Debugf(
			"Sampling target has expired for rule %s. Borrowing a request.",
			r.ruleName,
		)
		r.borrows++
		decision.Sample = true
		return decision
	default:
		logger.Debugf(
			"Sampling target has expired for rule %s. Using fixed rate.",
			r.ruleName,
		)
	}

	// Neither the reservoir nor a borrow applied: fall back to the fixed rate.
	decision.Sample = r.bernoulliSample()
	return decision
}
// bernoulliSample makes a fixed-rate sampling decision: it draws a value
// from the rule's random source and samples the request when that value is
// strictly less than Rate. A positive decision also increments the sampled
// counter. It does no locking itself; Sample calls it with the lock held.
func (r *CentralizedRule) bernoulliSample() bool {
	hit := r.rand.Float64() < r.Rate
	if hit {
		r.sampled++
	}
	return hit
}
// snapshot captures the rule's statistics counters (requests, sampled,
// borrows) into a new xraySvc.SamplingStatisticsDocument and resets the
// counters to zero. The document is stamped with the rule name and the
// current time from the rule's clock.
func (r *CentralizedRule) snapshot() *xraySvc.SamplingStatisticsDocument {
	// The document holds pointers to its counters, so copy the live values
	// into locals rather than exposing the rule's own mutable fields.
	var requestCount, sampledCount, borrowCount int64

	// Swap each counter out and zero it under the lock so concurrent
	// Sample calls are counted in exactly one snapshot.
	r.mu.Lock()
	ruleName := &r.ruleName
	requestCount, r.requests = r.requests, 0
	sampledCount, r.sampled = r.sampled, 0
	borrowCount, r.borrows = r.borrows, 0
	r.mu.Unlock()

	timestamp := r.clock.Now()

	return &xraySvc.SamplingStatisticsDocument{
		RequestCount: &requestCount,
		SampledCount: &sampledCount,
		BorrowCount:  &borrowCount,
		RuleName:     ruleName,
		Timestamp:    &timestamp,
	}
}
// Rule is a local sampling rule: the common matching Properties combined
// with a local reservoir and a random source used by Sample.
type Rule struct {
	// Local reservoir consulted first when making a sampling decision
	reservoir *Reservoir
	// Provides random numbers
	rand utils.Rand
	// Common sampling rule properties
	*Properties
	// mu is held by Sample while it consults the reservoir and random source.
	mu sync.RWMutex
}
// Sample makes a sampling decision for a request matched by this local rule.
// The request is sampled if the reservoir grants a slot; otherwise it is
// sampled when a value drawn from the rule's random source is strictly less
// than Rate. The returned Decision only has its Sample field set.
func (r *Rule) Sample() *Decision {
	r.mu.Lock()
	// Deferred so the lock is released even if the reservoir or the random
	// source panics; a plain Unlock would leave the rule locked forever.
	defer r.mu.Unlock()

	if r.reservoir.Take() {
		return &Decision{Sample: true}
	}
	return &Decision{Sample: r.rand.Float64() < r.Rate}
}