This repository has been archived by the owner on Aug 30, 2019. It is now read-only.
/
coresampler.go
163 lines (137 loc) · 4.7 KB
/
coresampler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package sampler
import (
"math"
"time"
"github.com/DataDog/datadog-trace-agent/internal/pb"
"github.com/DataDog/datadog-trace-agent/internal/watchdog"
)
// Decay settings handed to the default memory backend.
// Sampler parameters not (yet?) configurable.
const (
	// defaultDecayPeriod is the decay period given to the memory backend.
	defaultDecayPeriod time.Duration = 5 * time.Second

	// defaultDecayFactor is the decay factor given to the memory backend (9/8).
	// With this factor, any past trace counts for less than 50% after
	// 6*decayPeriod and still for more than 1% after 39*decayPeriod.
	// We can keep it hardcoded, but having `decayPeriod` configurable should be enough?
	defaultDecayFactor float64 = 1.125
)

// Settings of the scoring feedback loop and of the signature scoring function.
const (
	// adjustPeriod is the interval between two scoring adjustments.
	adjustPeriod time.Duration = 10 * time.Second

	// initialSignatureScoreOffset is the signature score offset a new Sampler starts with.
	initialSignatureScoreOffset float64 = 1
	// minSignatureScoreOffset is presumably the lower bound for the signature score offset.
	// NOTE(review): not referenced in this part of the file; confirm against its users.
	minSignatureScoreOffset float64 = 0.01
	// defaultSignatureScoreSlope is the signature score slope a new Sampler starts with.
	defaultSignatureScoreSlope float64 = 3
)
// EngineType identifies the kind of a sampler engine.
type EngineType int

// Known sampler engine kinds, numbered from zero in declaration order.
const (
	// NormalScoreEngineType designates the ScoreEngine sampling non-error traces.
	NormalScoreEngineType = EngineType(iota)
	// ErrorsScoreEngineType designates the ScoreEngine sampling error traces.
	ErrorsScoreEngineType
	// PriorityEngineType designates the priority sampler engine.
	PriorityEngineType
)
// Engine is a common basic interface for sampler engines. It groups the
// lifecycle calls (Run, Stop), the sampling decision (Sample) and two
// introspection calls (GetState, GetType).
type Engine interface {
// Run runs the sampler.
// NOTE(review): whether Run blocks is up to each implementation; confirm per engine.
Run()
// Stop stops the sampler.
Stop()
// Sample takes a trace, its root span and an env string (presumably the
// environment name; confirm against callers) and reports whether the trace
// is sampled together with the sampling rate.
Sample(trace pb.Trace, root *pb.Span, env string) (sampled bool, samplingRate float64)
// GetState returns information about the sampler; the concrete type behind
// the empty interface is chosen by the implementation.
GetState() interface{}
// GetType returns the EngineType identifying the kind of sampler.
GetType() EngineType
}
// Sampler is the main component of the sampling logic. It combines a
// Backend holding the sampler state with the rates and scoring coefficients
// used to compute sample rates.
type Sampler struct {
// Storage of the state of the sampler
Backend Backend
// Extra sampling rate to combine to the existing sampling.
// GetSampleRate multiplies the signature sample rate by this value.
// NOTE(review): plain float64 written by UpdateExtraRate without visible synchronization.
extraRate float64
// Maximum limit to the total number of traces per second to sample.
// GetMaxTPSSampleRate only applies the limit when this value is > 0.
// NOTE(review): plain float64 written by UpdateMaxTPS without visible synchronization.
maxTPS float64
// Sample any signature with a score lower than signatureScoreOffset
// It is basically the number of similar traces per second after which we start sampling
signatureScoreOffset *atomicFloat64
// Logarithm slope for the scoring function
signatureScoreSlope *atomicFloat64
// signatureScoreFactor = math.Pow(signatureScoreSlope, math.Log10(signatureScoreOffset))
// It is recomputed by SetSignatureCoefficients whenever offset or slope change.
signatureScoreFactor *atomicFloat64
// exit is closed by Stop to make RunAdjustScoring return.
exit chan struct{}
}
// newSampler builds a ready-to-run Sampler backed by a memory backend that
// uses the default decay period and factor. extraRate and maxTPS are stored
// as given; the scoring coefficients start from initialSignatureScoreOffset
// and defaultSignatureScoreSlope.
func newSampler(extraRate float64, maxTPS float64) *Sampler {
	sampler := new(Sampler)

	sampler.Backend = NewMemoryBackend(defaultDecayPeriod, defaultDecayFactor)
	sampler.extraRate = extraRate
	sampler.maxTPS = maxTPS

	// The atomic holders start at zero; their real values are stored just
	// below by SetSignatureCoefficients.
	sampler.signatureScoreOffset = newFloat64(0)
	sampler.signatureScoreSlope = newFloat64(0)
	sampler.signatureScoreFactor = newFloat64(0)
	sampler.exit = make(chan struct{})

	sampler.SetSignatureCoefficients(initialSignatureScoreOffset, defaultSignatureScoreSlope)

	return sampler
}
// SetSignatureCoefficients stores a new offset and slope for the signature
// scoring and refreshes the derived factor, slope^log10(offset).
// The three values are stored one after the other, not as a single atomic unit.
func (s *Sampler) SetSignatureCoefficients(offset float64, slope float64) {
	factor := math.Pow(slope, math.Log10(offset))

	s.signatureScoreOffset.Store(offset)
	s.signatureScoreSlope.Store(slope)
	s.signatureScoreFactor.Store(factor)
}
// UpdateExtraRate replaces the extra sample rate that GetSampleRate
// multiplies into every signature sample rate.
// NOTE(review): this is a plain field write; confirm callers do not race with readers.
func (s *Sampler) UpdateExtraRate(extraRate float64) {
	s.extraRate = extraRate
}
// UpdateMaxTPS replaces the maximum traces-per-second limit consulted by
// GetMaxTPSSampleRate. A value that is not > 0 disables the limit there.
// NOTE(review): this is a plain field write; confirm callers do not race with readers.
func (s *Sampler) UpdateMaxTPS(maxTPS float64) {
	s.maxTPS = maxTPS
}
// Run starts the backend in its own goroutine, then blocks in the scoring
// adjustment loop until the sampler is stopped.
func (s *Sampler) Run() {
	runBackend := func() {
		// Log a panic coming out of the backend goroutine.
		defer watchdog.LogOnPanic()
		s.Backend.Run()
	}
	go runBackend()

	s.RunAdjustScoring()
}
// Stop shuts the sampler down: the backend is stopped first, then the exit
// channel is closed so that RunAdjustScoring (and therefore Run) returns.
// Stop must be called at most once, since closing an already-closed channel panics.
func (s *Sampler) Stop() {
	s.Backend.Stop()
	close(s.exit)
}
// RunAdjustScoring is the sampler feedback loop: every adjustPeriod it calls
// AdjustScoring to update the scoring coefficients. It blocks until the exit
// channel is closed and releases its ticker on return.
func (s *Sampler) RunAdjustScoring() {
	ticker := time.NewTicker(adjustPeriod)
	defer ticker.Stop()

	for {
		select {
		case <-s.exit:
			return
		case <-ticker.C:
			s.AdjustScoring()
		}
	}
}
// GetSampleRate returns the sample rate to apply to a trace: the sample rate
// of its signature multiplied by the sampler's extra rate.
// The trace and root arguments are not used by this implementation.
func (s *Sampler) GetSampleRate(trace pb.Trace, root *pb.Span, signature Signature) float64 {
	return s.GetSignatureSampleRate(signature) * s.extraRate
}
// GetMaxTPSSampleRate returns the additional sample rate needed to
// statistically respect maxTPS. It returns 1 when no limit is set
// (maxTPS is not > 0) or when the backend's upper sampled score does not
// exceed the limit; otherwise it returns maxTPS divided by that score.
func (s *Sampler) GetMaxTPSSampleRate() float64 {
	if s.maxTPS > 0 {
		// The backend is only queried when a limit is actually configured.
		if currentTPS := s.Backend.GetUpperSampledScore(); currentTPS > s.maxTPS {
			return s.maxTPS / currentTPS
		}
	}
	return 1
}
// CombineRates merges the rates of two samplers whose decisions are
// independent, where a trace is kept when at least one of them keeps it:
// P(sampled) = rate1 + rate2 - rate1*rate2.
// The result is 1 as soon as either rate is greater than or equal to 1.
func CombineRates(rate1 float64, rate2 float64) float64 {
	switch {
	case rate1 >= 1, rate2 >= 1:
		return 1
	default:
		return rate1 + rate2 - rate1*rate2
	}
}