This repository has been archived by the owner on Aug 30, 2019. It is now read-only.
/
sampler.go
120 lines (101 loc) · 3.66 KB
/
sampler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package main
import (
"fmt"
"reflect"
"sync/atomic"
"time"
log "github.com/cihub/seelog"
"github.com/DataDog/datadog-trace-agent/internal/agent"
"github.com/DataDog/datadog-trace-agent/internal/config"
"github.com/DataDog/datadog-trace-agent/internal/info"
"github.com/DataDog/datadog-trace-agent/internal/sampler"
"github.com/DataDog/datadog-trace-agent/internal/watchdog"
)
// Sampler chooses which spans to write to the API. It wraps a sampler.Engine,
// which makes the actual sampling decisions, and keeps counters used to
// report throughput statistics.
type Sampler struct {
	// For stats.
	//
	// NOTE: both counters are accessed through sync/atomic. Keep them at the
	// start of the struct: sync/atomic requires 64-bit alignment for 64-bit
	// operations on 32-bit platforms, which is only guaranteed for the first
	// word of an allocated struct.

	// keptTraceCount counts the traces the engine reported as sampled since
	// the counters were last reset by logStats.
	keptTraceCount uint64
	// totalTraceCount counts every trace passed to Add since the counters
	// were last reset by logStats.
	totalTraceCount uint64
	// lastFlush is the tick time at which logStats last reset the counters.
	lastFlush time.Time

	// actual implementation of the sampling logic
	engine sampler.Engine
}
// NewScoreSampler builds a Sampler backed by a score engine configured from
// conf.ExtraSampleRate and conf.MaxTPS. The returned sampler is idle until
// Run is called.
func NewScoreSampler(conf *config.AgentConfig) *Sampler {
	engine := sampler.NewScoreEngine(conf.ExtraSampleRate, conf.MaxTPS)
	return &Sampler{engine: engine}
}
// NewErrorsSampler builds a Sampler meant for traces containing errors, so
// that they are handled by their own engine instead of sharing the global max
// TPS with other traces. It takes the same conf.ExtraSampleRate and
// conf.MaxTPS settings as the score sampler, but is backed by an errors
// engine, and its statistics are published under a separate name.
func NewErrorsSampler(conf *config.AgentConfig) *Sampler {
	engine := sampler.NewErrorsEngine(conf.ExtraSampleRate, conf.MaxTPS)
	return &Sampler{engine: engine}
}
// NewPrioritySampler builds a distributed (priority) Sampler. The engine is
// configured from conf.ExtraSampleRate and conf.MaxTPS and is handed a
// pointer to dynConf.RateByService. The returned sampler is idle until Run is
// called.
func NewPrioritySampler(conf *config.AgentConfig, dynConf *sampler.DynamicConfig) *Sampler {
	engine := sampler.NewPriorityEngine(conf.ExtraSampleRate, conf.MaxTPS, &dynConf.RateByService)
	return &Sampler{engine: engine}
}
// Run launches the sampler's background work and returns immediately: one
// goroutine runs the sampling engine and a second one periodically reports
// statistics. Each goroutine defers watchdog.LogOnPanic.
func (s *Sampler) Run() {
	// spawn runs task in a new goroutine guarded by the watchdog.
	spawn := func(task func()) {
		go func() {
			defer watchdog.LogOnPanic()
			task()
		}()
	}

	spawn(func() { s.engine.Run() })
	spawn(s.logStats)
}
// Add submits a processed trace to the sampling engine. It reports whether
// the trace was sampled (i.e. should be kept) along with the rate returned by
// the engine. Every call increments the total trace counter; sampled traces
// also increment the kept trace counter.
func (s *Sampler) Add(t agent.ProcessedTrace) (sampled bool, rate float64) {
	atomic.AddUint64(&s.totalTraceCount, 1)

	if sampled, rate = s.engine.Sample(t.Trace, t.Root, t.Env); !sampled {
		return false, rate
	}

	atomic.AddUint64(&s.keptTraceCount, 1)
	return true, rate
}
// Stop stops the sampler by stopping its underlying engine.
//
// NOTE(review): only the engine is stopped here. The logStats loop started by
// Run has no exit condition in this file, so its goroutine keeps running
// after Stop — confirm whether that is intended.
func (s *Sampler) Stop() {
	s.engine.Stop()
}
// logStats periodically reports sampling statistics and updates the exposed
// info.
//
// Every 10 seconds it atomically reads and resets the kept/total trace
// counters, converts them into per-second rates over the time elapsed since
// the previous flush, logs them at debug level together with the engine
// state, and publishes them through the info updater matching the engine
// type.
//
// The loop has no exit condition, so this method never returns; Run starts it
// in its own goroutine.
func (s *Sampler) logStats() {
	// The constructors leave lastFlush unset. Without a starting point the
	// first interval would be measured from the zero time.Time, producing an
	// enormous duration and first-report TPS values that are effectively zero.
	if s.lastFlush.IsZero() {
		s.lastFlush = time.Now()
	}

	// Within this file the engine is only assigned by the constructors, so
	// its type name is computed once instead of on every tick.
	engineType := fmt.Sprint(reflect.TypeOf(s.engine))

	for now := range time.Tick(10 * time.Second) {
		// Swap-to-zero so each report only covers the traces seen since the
		// previous tick.
		keptTraceCount := atomic.SwapUint64(&s.keptTraceCount, 0)
		totalTraceCount := atomic.SwapUint64(&s.totalTraceCount, 0)

		duration := now.Sub(s.lastFlush)
		s.lastFlush = now

		// TODO: do we still want that? figure out how it conflicts with what the `state` exposes / what is public metrics.
		var stats info.SamplerStats
		// Guard against a zero or negative interval to avoid dividing by it;
		// the rates are then left at their zero value.
		if seconds := duration.Seconds(); seconds > 0 {
			stats.KeptTPS = float64(keptTraceCount) / seconds
			stats.TotalTPS = float64(totalTraceCount) / seconds
		}

		log.Debugf("%s: flushed %d sampled traces out of %d", engineType, keptTraceCount, totalTraceCount)

		switch state := s.engine.GetState().(type) {
		case sampler.InternalState:
			log.Debugf("%s: inTPS: %f, outTPS: %f, maxTPS: %f, offset: %f, slope: %f, cardinality: %d",
				engineType, state.InTPS, state.OutTPS, state.MaxTPS, state.Offset, state.Slope, state.Cardinality)

			// publish through expvar
			// TODO: avoid type switch, prefer engine method
			switch s.engine.GetType() {
			case sampler.NormalScoreEngineType:
				info.UpdateSamplerInfo(info.SamplerInfo{Stats: stats, State: state})
			case sampler.ErrorsScoreEngineType:
				info.UpdateErrorsSamplerInfo(info.SamplerInfo{Stats: stats, State: state})
			case sampler.PriorityEngineType:
				info.UpdatePrioritySamplerInfo(info.SamplerInfo{Stats: stats, State: state})
			}
		default:
			log.Debugf("unhandled sampler engine, can't log state")
		}
	}
}