-
Notifications
You must be signed in to change notification settings - Fork 1
/
slow-detector-thread.go
162 lines (130 loc) · 3.75 KB
/
slow-detector-thread.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
/*
© 2023–present Harald Rudell <harald.rudell@gmail.com> (https://haraldrudell.github.io/haraldrudell/)
ISC License
*/
package parl
import (
"sync"
"sync/atomic"
"time"
"github.com/haraldrudell/parl/parli"
"github.com/haraldrudell/parl/perrors"
"github.com/haraldrudell/parl/pmaps"
"github.com/haraldrudell/parl/sets"
)
const (
SlowDefault slowType = iota
SlowOwnThread
SlowShutdownThread
slowScanPeriod = time.Second
)
// shared SlowDetectorThread for SlowDefault threads
var slowDetectorThread SlowDetectorThread
type SlowDetectorThread struct {
slowTyp slowType
nonReturnPeriod time.Duration
slowMap pmaps.RWMap[slowID, *SlowDetectorInvocation]
hasThread atomic.Bool
slowLock sync.Mutex
goGen GoGen
cancelGo func()
}
func NewSlowDetectorThread(slowTyp slowType, nonReturnPeriod time.Duration, goGen GoGen) (sdt *SlowDetectorThread) {
if goGen == nil {
panic(perrors.NewPF("goGen cannot be nil"))
}
// dedicated thread case
if slowTyp != SlowDefault {
return &SlowDetectorThread{
slowTyp: slowTyp,
nonReturnPeriod: nonReturnPeriod,
slowMap: *pmaps.NewRWMap2[slowID, *SlowDetectorInvocation](),
goGen: goGen,
}
}
sdt = &slowDetectorThread
sdt.slowLock.Lock()
defer sdt.slowLock.Unlock()
if sdt.goGen != nil {
return // slowDetectorThread already initialized return
}
// slowDetectorThread initialization
sdt.slowTyp = slowTyp
sdt.nonReturnPeriod = nonReturnPeriod
sdt.slowMap = *pmaps.NewRWMap2[slowID, *SlowDetectorInvocation]()
sdt.goGen = goGen
return
}
func (sdt *SlowDetectorThread) Start(sdi *SlowDetectorInvocation) {
// store in map
sdt.slowMap.Put(sdi.sID, sdi)
if !sdt.hasThread.CompareAndSwap(false, true) {
return // thread already running return
}
// launch thread
subGo := sdt.goGen.SubGo()
g0 := subGo.Go()
go sdt.thread(g0)
if sdt.slowTyp != SlowShutdownThread {
return // thread is not to be shutdown return
}
// save cancel method
sdt.slowLock.Lock()
defer sdt.slowLock.Unlock()
sdt.cancelGo = subGo.Cancel
}
func (sdt *SlowDetectorThread) Stop(sdi *SlowDetectorInvocation) {
// remove from map
sdt.slowMap.Delete(sdi.sID, parli.MapDeleteWithZeroValue)
if sdt.slowMap.Length() > 0 || sdt.slowTyp != SlowShutdownThread {
return // not to be shutdown or not to be shutdown now return
}
sdt.cancelGo()
}
func (sdt *SlowDetectorThread) thread(g0 Go) {
var err error
defer g0.Register("SlowDetectorThread" + goID().String()).Done(&err)
defer PanicToErr(&err)
ticker := time.NewTicker(slowScanPeriod)
defer ticker.Stop()
var C <-chan time.Time = ticker.C
var done <-chan struct{} = g0.Context().Done()
var t time.Time
for {
select {
case <-done:
return // context cancelled return
case t = <-C:
}
// check all invocations for non-return
for _, sdi := range sdt.slowMap.List() {
// duration is how long the invocation has been in progress
duration := t.Sub(sdi.t0)
if duration < 0 {
// if t coming from the ticker was delayed,
// then t may be a time in the past,
// so early that sdi.t0 is after t
continue // ignore negative durations
}
sd := sdi.sd
sd.alwaysMax.Value(duration)
if sd.max.Value(duration) {
// it is a new max, check whether nonReturnPeriod has elapsed
if tLast := sdi.Time(time.Time{}); tLast.IsZero() || t.Sub(tLast) >= sdt.nonReturnPeriod {
// store new nonReturnPeriod start
sdi.Time(t)
sd.callback(sdi, false, duration)
}
}
}
}
}
type slowType uint8
func (st slowType) String() (s string) {
return slowTypeSet.StringT(st)
}
var slowTypeSet = sets.NewSet[slowType]([]sets.SetElement[slowType]{
{ValueV: SlowDefault, Name: "sharedThread"},
{ValueV: SlowOwnThread, Name: "ownThread"},
{ValueV: SlowShutdownThread, Name: "shutdownThread"},
})