-
Notifications
You must be signed in to change notification settings - Fork 1
/
slow-detector-core.go
130 lines (109 loc) · 3.26 KB
/
slow-detector-core.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
/*
© 2023–present Harald Rudell <harald.rudell@gmail.com> (https://haraldrudell.github.io/haraldrudell/)
ISC License
*/
package parl
import (
"sync/atomic"
"time"
"github.com/haraldrudell/parl/perrors"
"github.com/haraldrudell/parl/ptime"
)
// Defaults applied by NewSlowDetectorCore when the corresponding
// optional threshold argument is not provided.
const (
	// defaultMinReportDuration is the minimum slowness to report: 100 ms
	defaultMinReportDuration time.Duration = 100 * time.Millisecond
	// defaultNonReturnPeriod is the time between non-return reports: 1 minute
	defaultNonReturnPeriod time.Duration = time.Minute
)
type (
	// CbSlowDetector is the signature of the report function provided to
	// NewSlowDetectorCore
	//   - sdi: the invocation being reported
	//   - hasReturned: true when the report is made as the invocation is stopped.
	//     NOTE(review): presumably false for reports about invocations that are
	//     still in progress — confirm against SlowDetectorThread
	//   - duration: the measured time for the invocation
	CbSlowDetector func(sdi *SlowDetectorInvocation, hasReturned bool, duration time.Duration)

	// slowID is the identifier type for slow detectors and their invocations
	slowID uint64
)

// slowIDGenerator is the package-wide source of slowID values
//   - used for both SlowDetectorCore.ID and the sID of each invocation
var slowIDGenerator UniqueIDTypedUint64[slowID]
// SlowDetectorCore measures latency via Start-Stop invocations
// - Thread-Safe and multi-threaded, parallel invocations
// - Separate thread measures time of non-returning, hung invocations
// - created by NewSlowDetectorCore
// - field order is left as is: last is accessed using 64-bit sync/atomic
// operations, so its position in the struct affects its alignment
type SlowDetectorCore struct {
// ID is the identifier obtained from slowIDGenerator on creation
ID slowID
// callback receives reports, verified non-nil by NewSlowDetectorCore
callback CbSlowDetector
// thread is handed every invocation on Start and again on stop
thread *SlowDetectorThread
// max is initialized with the minimum report duration
// - when its Value method returns true for a stopped invocation,
// callback is invoked
max AtomicMax[time.Duration]
// alwaysMax is updated with the duration of every stopped invocation
// - read by Values
alwaysMax AtomicMax[time.Duration]
// last is the duration of the most recently stopped invocation
// - read and written only via atomic.LoadInt64 and atomic.StoreInt64
last time.Duration // atomic
// average receives the duration and stop time of every stopped invocation
// - read by Values
average ptime.Averager[time.Duration]
}
// NewSlowDetectorCore returns a latency detector that reports via callback
//   - callback: receives reports, a nil callback causes panic
//   - slowTyp, goGen: passed on to NewSlowDetectorThread
//   - threshold[0]: optional minimum slowness to report, default 100 ms
//   - threshold[1]: optional time between non-return reports, default 1 minute
func NewSlowDetectorCore(callback CbSlowDetector, slowTyp slowType, goGen GoGen, threshold ...time.Duration) (slowDetector *SlowDetectorCore) {
	if callback == nil {
		panic(perrors.NewPF("callback cannot be nil"))
	}

	// begin with the defaults, then apply whichever optional
	// thresholds the caller provided
	minReport := defaultMinReportDuration
	nonReturnInterval := defaultNonReturnPeriod
	if count := len(threshold); count >= 1 {
		minReport = threshold[0]
		if count >= 2 {
			nonReturnInterval = threshold[1]
		}
	}

	// alwaysMax and last are left at their zero values
	slowDetector = &SlowDetectorCore{
		ID:       slowIDGenerator.ID(),
		callback: callback,
		thread:   NewSlowDetectorThread(slowTyp, nonReturnInterval, goGen),
		max:      *NewAtomicMax(minReport),
		average:  *ptime.NewAverager[time.Duration](),
	}
	return slowDetector
}
// Start begins a new timing cycle and returns the invocation tracking it
//   - invoLabel: label stored in the invocation
//   - value: optional start time, default time.Now()
//   - invocation: holds the start time, the ID of the calling goroutine and
//     a reference to the stop method that ends the cycle
func (sd *SlowDetectorCore) Start(invoLabel string, value ...time.Time) (invocation *SlowDetectorInvocation) {

	// effective start time: the provided value or the current time
	var startedAt time.Time
	if len(value) == 0 {
		startedAt = time.Now()
	} else {
		startedAt = value[0]
	}

	invocation = &SlowDetectorInvocation{
		sID:       slowIDGenerator.ID(),
		invoLabel: invoLabel,
		threadID:  goID(),
		t0:        startedAt,
		stop:      sd.stop,
		sd:        sd,
	}

	// hand the invocation to the detector thread
	//	- NOTE(review): presumably the thread tracks it until stop
	//		and is launched here if not already running — confirm
	//		against SlowDetectorThread.Start
	sd.thread.Start(invocation)

	return invocation
}
// Values returns the current measurements
//   - last: duration of the most recently stopped invocation
//   - average: value from the averager, converted to time.Duration
//   - max, hasValue: the results of alwaysMax.Max, which is updated on
//     every stopped invocation
func (sd *SlowDetectorCore) Values() (
	last, average, max time.Duration,
	hasValue bool,
) {
	// last is written atomically by stop, so it is read atomically here
	lastp := (*int64)(&sd.last)
	last = time.Duration(atomic.LoadInt64(lastp))

	// the second result of Average is not used
	avg, _ := sd.average.Average()
	average = time.Duration(avg)

	max, hasValue = sd.alwaysMax.Max()

	return last, average, max, hasValue
}
// stop ends the timing cycle of sdi and records its duration
//   - reached via the stop field that Start stores in SlowDetectorInvocation
//   - value: optional end time, default time.Now()
//   - callback is invoked with hasReturned true when max.Value returns true
//     for the measured duration
func (sd *SlowDetectorCore) stop(sdi *SlowDetectorInvocation, value ...time.Time) {

	// the detector thread is told first that the invocation ended
	//	- NOTE(review): presumably this removes it from tracking and may
	//		shut the thread down — confirm against SlowDetectorThread.Stop
	sd.thread.Stop(sdi)

	// effective end time: the provided value or the current time
	var endedAt time.Time
	if len(value) == 0 {
		endedAt = time.Now()
	} else {
		endedAt = value[0]
	}
	elapsed := endedAt.Sub(sdi.t0)

	// update last, average and the unconditional maximum
	atomic.StoreInt64((*int64)(&sd.last), int64(elapsed))
	sd.average.Add(elapsed, endedAt)
	sd.alwaysMax.Value(elapsed)

	// report only when the thresholded maximum accepts the duration
	if !sd.max.Value(elapsed) {
		return
	}
	sd.callback(sdi, true, elapsed)
}