forked from pinpoint-apm/pinpoint-go-agent
-
Notifications
You must be signed in to change notification settings - Fork 0
/
sampler.go
114 lines (97 loc) · 2.16 KB
/
sampler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package pinpoint
import (
"golang.org/x/time/rate"
"sync/atomic"
"time"
)
// sampler is the minimal sampling decision source: each call to isSampled
// reports whether that call is selected for sampling.
type sampler interface {
isSampled() bool
}
// rateSampler is a counting sampler: out of every samplingRate calls to
// isSampled, exactly one is selected.
type rateSampler struct {
	// samplingRate is N in "select one call out of every N". A value of 0
	// means no call is ever selected.
	samplingRate uint64
	// counter is the number of isSampled calls counted so far. It is only
	// updated through sync/atomic.
	counter uint64
}

// newRateSampler returns a rateSampler that selects every r-th call to
// isSampled, starting with the r-th call. With r == 0 the returned sampler
// never selects anything.
func newRateSampler(r uint64) *rateSampler {
	return &rateSampler{samplingRate: r}
}

// isSampled counts this call and reports whether the running count is a
// multiple of samplingRate. The counter is advanced with an atomic add, so
// concurrent callers each observe a distinct count.
func (s *rateSampler) isSampled() bool {
	// A zero rate would make the modulo below panic with an integer divide
	// by zero; treat it as "sampling disabled" instead.
	if s.samplingRate == 0 {
		return false
	}
	n := atomic.AddUint64(&s.counter, 1)
	return n%s.samplingRate == 0
}
// traceSampler makes sampling decisions through two separate entry points:
// isNewSampled and isContinueSampled. Each reports whether the trace in
// question is sampled.
// NOTE(review): "new" presumably refers to a trace that starts in this
// process and "continue" to one propagated from an upstream caller — confirm
// against the call sites.
type traceSampler interface {
isNewSampled() bool
isContinueSampled() bool
}
// basicTraceSampler is a traceSampler that defers every decision to a single
// underlying sampler.
type basicTraceSampler struct {
	// baseSampler makes the actual sampling decision.
	baseSampler sampler
}

// newBasicTraceSampler wraps base in a basicTraceSampler. base is stored as
// given.
func newBasicTraceSampler(base sampler) *basicTraceSampler {
	ts := new(basicTraceSampler)
	ts.baseSampler = base
	return ts
}
// isNewSampled asks the base sampler for a decision, records the outcome via
// incrSampleNew (sampled) or incrUnsampleNew (not sampled), and returns it.
func (s *basicTraceSampler) isNewSampled() bool {
	if !s.baseSampler.isSampled() {
		incrUnsampleNew()
		return false
	}
	incrSampleNew()
	return true
}
// isContinueSampled asks the base sampler for a decision, records the outcome
// via incrSampleCont (sampled) or incrUnsampleCont (not sampled), and returns
// it.
func (s *basicTraceSampler) isContinueSampled() bool {
	if !s.baseSampler.isSampled() {
		incrUnsampleCont()
		return false
	}
	incrSampleCont()
	return true
}
// throughputLimitTraceSampler is a traceSampler that places a rate limiter
// behind a base sampler. It keeps one limiter for isNewSampled and another
// for isContinueSampled.
type throughputLimitTraceSampler struct {
	// baseSampler makes the first-stage sampling decision.
	baseSampler sampler
	// newSamplelimiter is the limiter consulted by isNewSampled.
	newSamplelimiter *rate.Limiter
	// continueSamplelimiter is the limiter consulted by isContinueSampled.
	continueSamplelimiter *rate.Limiter
}

// newThroughputLimitTraceSampler builds a throughputLimitTraceSampler around
// base. newTps and continueTps are converted to per-second limits with per,
// and each limiter is created with a burst size of 1.
func newThroughputLimitTraceSampler(base sampler, newTps int, continueTps int) *throughputLimitTraceSampler {
	const burst = 1

	newLimiter := rate.NewLimiter(per(newTps, time.Second), burst)
	continueLimiter := rate.NewLimiter(per(continueTps, time.Second), burst)

	ts := new(throughputLimitTraceSampler)
	ts.baseSampler = base
	ts.newSamplelimiter = newLimiter
	ts.continueSamplelimiter = continueLimiter
	return ts
}
// per converts "throughput events per duration d" into a rate.Limit by
// spacing events d/throughput apart.
//
// A non-positive throughput yields rate.Inf, i.e. no limit. Negative values
// already produced rate.Inf through rate.Every (which returns Inf for a
// non-positive interval); zero is handled explicitly because the integer
// division below would otherwise panic with a divide by zero.
//
// The interval is computed with integer Duration division, so a throughput
// larger than d in nanoseconds truncates to a zero interval and also results
// in rate.Inf.
func per(throughput int, d time.Duration) rate.Limit {
	if throughput <= 0 {
		return rate.Inf
	}
	return rate.Every(d / time.Duration(throughput))
}
// isNewSampled makes a two-stage decision. The base sampler is consulted
// first; if it declines, incrUnsampleNew is called and the result is false
// without touching the limiter. Otherwise newSamplelimiter.Allow decides:
// a refusal calls incrSkipNew and returns false, an acceptance calls
// incrSampleNew and returns true.
func (s *throughputLimitTraceSampler) isNewSampled() bool {
	if !s.baseSampler.isSampled() {
		incrUnsampleNew()
		return false
	}
	if !s.newSamplelimiter.Allow() {
		incrSkipNew()
		return false
	}
	incrSampleNew()
	return true
}
// isContinueSampled makes a two-stage decision. The base sampler is consulted
// first; if it declines, incrUnsampleCont is called and the result is false
// without touching the limiter. Otherwise continueSamplelimiter.Allow
// decides: a refusal calls incrSkipCont and returns false, an acceptance
// calls incrSampleCont and returns true.
func (s *throughputLimitTraceSampler) isContinueSampled() bool {
	if !s.baseSampler.isSampled() {
		incrUnsampleCont()
		return false
	}
	if !s.continueSamplelimiter.Allow() {
		incrSkipCont()
		return false
	}
	incrSampleCont()
	return true
}