This repository has been archived by the owner on Aug 30, 2019. It is now read-only.
/
sampler_max_eps.go
149 lines (123 loc) · 4.1 KB
/
sampler_max_eps.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package event
import (
"time"
log "github.com/cihub/seelog"
"github.com/DataDog/datadog-trace-agent/internal/metrics"
"github.com/DataDog/datadog-trace-agent/internal/pb"
"github.com/DataDog/datadog-trace-agent/internal/sampler"
)
// maxEPSReportFrequency is the interval at which a started maxEPSSampler
// publishes its state (see maxEPSSampler.report).
const maxEPSReportFrequency = time.Second * 10
// maxEPSSampler (Max Events Per Second Sampler) is an event sampler that samples provided events so as to try to
// ensure no more than maxEPS events are sampled per second.
//
// Note that events associated with traces with UserPriorityKeep are always sampled and don't influence underlying
// rate counters so as not to skew stats.
// NOTE(review): nothing in this type inspects sampling priority; presumably callers bypass Sample for such
// traces — confirm at the call site.
type maxEPSSampler struct {
	// maxEPS is the target upper bound on sampled events per second.
	maxEPS float64
	// rateCounter tracks the rate of events passed to Sample.
	rateCounter rateCounter

	// reportFrequency is the reporting interval associated with this sampler. It may be zero;
	// Start decides the effective interval.
	reportFrequency time.Duration
	// reportDone is used in both directions: Stop sends on it to ask the reporting goroutine
	// to exit, and that goroutine closes it once it has exited.
	reportDone chan bool
}
// newMaxEPSSampler creates a new instance of a maxEPSSampler with the provided maximum amount of events per second.
//
// The sampler's rate counter and periodic reporting are launched by Start and torn down by Stop.
func newMaxEPSSampler(maxEPS float64) *maxEPSSampler {
	// reportDone is unbuffered so that Stop's send is a direct hand-off to the
	// reporting goroutine started in Start.
	return &maxEPSSampler{
		maxEPS:      maxEPS,
		rateCounter: newSamplerBackendRateCounter(),
		reportDone:  make(chan bool),
	}
}
// Start starts the underlying rate counter and a background goroutine that
// periodically calls report.
//
// The reporting interval is s.reportFrequency when it is positive, and
// maxEPSReportFrequency otherwise. Stop must be called exactly once to end the
// background goroutine: a second Stop would send on the channel this goroutine
// closes, which panics.
func (s *maxEPSSampler) Start() {
	s.rateCounter.Start()

	interval := s.reportFrequency
	if interval <= 0 {
		// time.NewTicker panics on non-positive durations, and samplers built
		// without an explicit frequency leave the field at zero.
		interval = maxEPSReportFrequency
	}

	go func() {
		ticker := time.NewTicker(interval)
		// Deferred calls run LIFO: the ticker is stopped first, then reportDone
		// is closed, which unblocks the acknowledging receive in Stop.
		defer close(s.reportDone)
		defer ticker.Stop()
		for {
			select {
			case <-s.reportDone:
				return
			case <-ticker.C:
				s.report()
			}
		}
	}()
}
// Stop shuts the sampler down: it asks the reporting goroutine launched by Start
// to exit, waits until that goroutine acknowledges by closing reportDone, and
// finally stops the underlying rate counter.
//
// With the unbuffered reportDone created by newMaxEPSSampler, calling Stop
// without a prior Start blocks forever. Calling Stop twice panics, because the
// second send targets the already-closed channel.
func (s *maxEPSSampler) Stop() {
	done := s.reportDone
	done <- true // request: the reporting loop receives this and returns
	<-done       // acknowledgement: succeeds once the loop's deferred close has run
	s.rateCounter.Stop()
}
// Sample counts the provided event and determines whether or not it should be
// sampled, in order to try to ensure no more than maxEPS events are sampled
// every second.
//
// It returns whether the event was sampled and the rate that was applied. The
// rate is 1 while the observed rate does not exceed maxEPS, and
// maxEPS/currentEPS otherwise (see getSampleRate). The keep/drop decision itself
// is delegated to sampler.SampleByRate using the event's TraceID.
func (s *maxEPSSampler) Sample(event *pb.Span) (sampled bool, rate float64) {
	// Count first so the current event contributes to the rate used for it.
	s.rateCounter.Count()
	rate = s.getSampleRate()
	return sampler.SampleByRate(event.TraceID, rate), rate
}
// getSampleRate reports the sample rate this sampler would apply right now. It
// is maxEPS/currentEPS when the rate counter's current rate exceeds maxEPS, and
// 1 in every other case.
func (s *maxEPSSampler) getSampleRate() float64 {
	if eps := s.rateCounter.GetRate(); eps > s.maxEPS {
		return s.maxEPS / eps
	}
	return 1.0
}
// report publishes the sampler's state as gauges. It emits the configured
// maximum (max_rate), the counter's current rate (current_rate), the applied
// sample rate (sample_rate), and a 0/1 flag (reached_max). The flag is 1
// whenever the sample rate is below 1; in that case a warning is also logged,
// because events are being dropped.
func (s *maxEPSSampler) report() {
	limit := s.maxEPS
	metrics.Gauge("datadog.trace_agent.events.max_eps.max_rate", limit, nil, 1)

	observed := s.rateCounter.GetRate()
	metrics.Gauge("datadog.trace_agent.events.max_eps.current_rate", observed, nil, 1)

	// getSampleRate reads the rate counter again, so its result may be based on
	// a different reading than `observed`.
	applied := s.getSampleRate()
	metrics.Gauge("datadog.trace_agent.events.max_eps.sample_rate", applied, nil, 1)

	var reached float64
	if applied < 1 {
		reached = 1
		log.Warnf("Max events per second reached (current=%.2f/s, max=%.2f/s). "+
			"Some events are now being dropped (sample rate=%.2f). Consider adjusting event sampling rates.",
			observed, limit, applied)
	}
	metrics.Gauge("datadog.trace_agent.events.max_eps.reached_max", reached, nil, 1)
}
// rateCounter keeps track of an event rate. Implementations are started and
// stopped explicitly around their use.
type rateCounter interface {
	// Start begins whatever background work the counter needs.
	Start()
	// Stop ends the work begun by Start.
	Stop()
	// Count records a single event.
	Count()
	// GetRate returns the counter's current estimate of the event rate.
	GetRate() float64
}
// samplerBackendRateCounter is a rateCounter backed by a sampler.Backend: every
// rateCounter method delegates to the wrapped backend.
type samplerBackendRateCounter struct {
	// backend performs the actual counting and rate estimation.
	backend sampler.Backend
}
// newSamplerBackendRateCounter creates a new samplerBackendRateCounter based on exponential decay counters,
// using an in-memory sampler backend with fixed parameters.
func newSamplerBackendRateCounter() *samplerBackendRateCounter {
	// TODO: Allow these to be configurable or study better defaults based on intended target
	// NOTE(review): the names below assume NewMemoryBackend takes (decay period, decay factor);
	// confirm against the sampler package.
	const (
		decayPeriod = 1 * time.Second
		decayFactor = 1.125
	)
	counter := &samplerBackendRateCounter{}
	counter.backend = sampler.NewMemoryBackend(decayPeriod, decayFactor)
	return counter
}
// Start starts the decaying of the backend rate counter.
//
// The backend's Run method is launched on its own goroutine and is not waited
// on here. NOTE(review): presumably Run loops until backend.Stop is called;
// confirm in the sampler package.
func (sb *samplerBackendRateCounter) Start() {
	go sb.backend.Run()
}
// Stop stops the decaying of the backend rate counter by calling the backend's
// Stop method.
func (sb *samplerBackendRateCounter) Stop() {
	sb.backend.Stop()
}
// Count adds an event to the rate computation by recording one sample on the
// backend.
func (sb *samplerBackendRateCounter) Count() {
	sb.backend.CountSample()
}
// GetRate gets the current event rate, as reported by the backend's
// GetUpperSampledScore.
// NOTE(review): the "upper" score is presumably a conservative (upper-bound)
// estimate of the decayed rate; confirm in the sampler package.
func (sb *samplerBackendRateCounter) GetRate() float64 {
	return sb.backend.GetUpperSampledScore()
}