/
sampling.go
113 lines (100 loc) · 3 KB
/
sampling.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package stages
import (
"math"
"math/rand"
"time"
"github.com/go-kit/log"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/uber/jaeger-client-go/utils"
)
// ErrSamplingStageInvalidRate is the format string of the error returned when
// the configured sampling rate is rejected by validation. It takes the
// offending rate as its single %f argument.
const ErrSamplingStageInvalidRate = "sampling stage failed to parse rate,Sampling Rate must be between 0.0 and 1.0, received %f"
// maxRandomNumber is 0x7fffffffffffffff, i.e. a uint64 with every bit set
// except the top one. It is both the mask applied to generated random IDs and
// the scale that a sampling rate is multiplied by to obtain the sampling
// boundary.
const maxRandomNumber uint64 = 1<<63 - 1
// defaultSamplingpReason is the drop reason applied to a SamplingConfig whose
// DropReason is unset or empty. It is a variable rather than a constant
// because validation stores its address in the config.
var defaultSamplingpReason = "sampling_stage"
// SamplingConfig contains the configuration for a samplingStage.
type SamplingConfig struct {
	// DropReason is the label value used when incrementing the drop counter
	// for an entry that was not sampled. When nil or empty, validation
	// replaces it with the default reason "sampling_stage".
	DropReason *string `mapstructure:"drop_counter_reason"`
	// SamplingRate is the fraction of entries to let through. Validation
	// requires it to be between 0.0 and 1.0 inclusive.
	SamplingRate float64 `mapstructure:"rate"`
}
// validateSamplingConfig validates the SamplingConfig for the sampling stage
// and fills in defaults.
//
// If cfg.DropReason is nil or points to an empty string it is replaced with a
// pointer to the default drop reason. An error built from
// ErrSamplingStageInvalidRate is returned when cfg.SamplingRate is NaN or lies
// outside the inclusive range [0.0, 1.0]; otherwise nil is returned.
func validateSamplingConfig(cfg *SamplingConfig) error {
	if cfg.DropReason == nil || *cfg.DropReason == "" {
		cfg.DropReason = &defaultSamplingpReason
	}

	rate := cfg.SamplingRate
	// NaN compares false against every bound, so a plain range check would
	// let it through and the rate would later be converted to an integer
	// sampling boundary with an unspecified result. Reject it explicitly.
	if math.IsNaN(rate) || rate < 0.0 || rate > 1.0 {
		return errors.Errorf(ErrSamplingStageInvalidRate, rate)
	}
	return nil
}
// newSamplingStage builds a sampling Stage from an untyped stage config.
//
// The config is weakly decoded into a SamplingConfig and validated; any
// decode or validation error is returned as-is. The sampling rate is then
// turned into an integer boundary on the scale of maxRandomNumber, and a
// random source seeded from the current time is attached to the stage.
//
// The sampling approach is taken from the jaeger project:
// github.com/uber/jaeger-client-go@v2.30.0+incompatible/tracer.go:126
func newSamplingStage(logger log.Logger, config interface{}, registerer prometheus.Registerer) (Stage, error) {
	var cfg SamplingConfig
	if err := mapstructure.WeakDecode(config, &cfg); err != nil {
		return nil, err
	}
	if err := validateSamplingConfig(&cfg); err != nil {
		return nil, err
	}

	// Clamp the rate to [0, 1] before scaling it to the random-number range.
	rate := math.Min(cfg.SamplingRate, 1.0)
	rate = math.Max(0.0, rate)
	boundary := uint64(float64(maxRandomNumber) * rate)

	// Derive the seed for the stage's own source from a time-seeded generator.
	seeder := utils.NewRand(time.Now().UnixNano())
	src := rand.NewSource(seeder.Int63())

	stage := &samplingStage{
		logger:           log.With(logger, "component", "stage", "type", "sampling"),
		cfg:              &cfg,
		dropCount:        getDropCountMetric(registerer),
		samplingBoundary: boundary,
		source:           src,
	}
	return stage, nil
}
// samplingStage is a pipeline stage that forwards a random subset of the
// entries it receives and counts the ones it drops.
type samplingStage struct {
	// logger is the stage logger, tagged with component and type key/values.
	logger log.Logger
	// cfg is the decoded and validated stage configuration.
	cfg *SamplingConfig
	// dropCount is incremented, labelled with the configured drop reason,
	// for every entry that is not sampled.
	dropCount *prometheus.CounterVec
	// samplingBoundary is the threshold a masked random ID must not exceed
	// for an entry to be sampled.
	samplingBoundary uint64
	// source supplies the random numbers used for the sampling decision.
	source rand.Source
}
// Run starts a goroutine that reads entries from in and forwards the sampled
// ones to the returned channel. Entries that are not sampled are discarded and
// counted in the drop counter under the configured drop reason. The returned
// channel is closed once in has been closed and drained.
func (m *samplingStage) Run(in chan Entry) chan Entry {
	out := make(chan Entry)
	go func() {
		defer close(out)
		for entry := range in {
			if !m.isSampled() {
				m.dropCount.WithLabelValues(*m.cfg.DropReason).Inc()
				continue
			}
			out <- entry
		}
	}()
	return out
}
// isSampled draws a fresh random ID, masks it with maxRandomNumber and reports
// whether the result is at or below the stage's sampling boundary.
//
// Adapted from the jaeger project:
// github.com/uber/jaeger-client-go@v2.30.0+incompatible/sampler.go:144
// func (s *ProbabilisticSampler) IsSampled(id TraceID, operation string) (bool, []Tag)
func (m *samplingStage) isSampled() bool {
	id := m.randomID() & maxRandomNumber
	return id <= m.samplingBoundary
}
// randomID returns a non-zero random number, drawing again for as long as the
// generator yields zero.
func (m *samplingStage) randomID() uint64 {
	for {
		if id := m.randomNumber(); id != 0 {
			return id
		}
	}
}
// randomNumber returns the next Int63 value from the stage's random source,
// converted to uint64.
func (m *samplingStage) randomNumber() uint64 {
	next := m.source.Int63()
	return uint64(next)
}
// Name implements Stage. It returns StageTypeSampling, the identifier of the
// sampling stage type.
func (m *samplingStage) Name() string {
	return StageTypeSampling
}