-
Notifications
You must be signed in to change notification settings - Fork 1
/
period-waiter.go
84 lines (72 loc) · 2.09 KB
/
period-waiter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/*
© 2022–present Harald Rudell <harald.rudell@gmail.com> (https://haraldrudell.github.io/haraldrudell/)
ISC License
*/
package parl
import (
"math"
"sync"
"sync/atomic"
)
// PeriodWaiter holds back threads invoking [PeriodWaiter.Wait] during a
// hold period: the time from a [PeriodWaiter.HoldWaiters] invocation until
// the next [PeriodWaiter.ReleaseWaiters] invocation. Thread-safe
//   - outside of a hold period, Wait returns immediately
//   - the zero value is in the released state, same as the value returned
//     by [NewPeriodWaiter]
//   - all fields are accessed atomically, so a PeriodWaiter should be
//     shared by pointer and not copied after first use
type PeriodWaiter struct {
	// count is the number of threads currently blocked in Wait
	//   - accessed only via the 64-bit functions of sync/atomic
	//   - must remain the first field: on 32-bit platforms (386, ARM,
	//     32-bit MIPS) only the first word of an allocated struct is
	//     guaranteed to be 64-bit aligned, and a misaligned 64-bit atomic
	//     operation panics. Placed after the pointer-sized wg field, count
	//     would be at offset 4 on those platforms
	count uint64
	// wg holds Wait invokers
	//   - nil: no hold period is in effect
	//   - non-nil: a hold period is in effect, Wait invokers block on
	//     this wait group until it is released
	//   - wg is atomically updated
	wg atomic.Pointer[sync.WaitGroup]
}
// NewPeriodWaiter returns a PeriodWaiter that can periodically hold
// threads invoking its Wait method. Thread-safe
//   - the returned value is in the released state: Wait does not block
//     until HoldWaiters is invoked
func NewPeriodWaiter() (periodWaiter *PeriodWaiter) {
	periodWaiter = new(PeriodWaiter)
	return periodWaiter
}
// HoldWaiters begins a hold period: threads subsequently invoking
// [PeriodWaiter.Wait] block until ReleaseWaiters is invoked. Thread-safe
//   - if a hold period is already in effect, HoldWaiters is a noop
func (p *PeriodWaiter) HoldWaiters() {
	if current := p.wg.Load(); current != nil {
		return // hold period already in effect: noop
	}

	// a wait group with counter 1 blocks its waiters until Done
	gate := &sync.WaitGroup{}
	gate.Add(1)

	// install the gate only if p.wg is still nil
	//   - if another thread installed its gate first, this gate is discarded
	p.wg.CompareAndSwap(nil, gate)
}
// ReleaseWaiters ends the current hold period: threads blocked in
// [PeriodWaiter.Wait] are released and new Wait invokers proceed
// without blocking. Thread-safe
//   - if no hold period is in effect, ReleaseWaiters is a noop
func (p *PeriodWaiter) ReleaseWaiters() {
	// held is the wait group of the current hold period, nil if none
	held := p.wg.Load()

	// only the thread that wins the swap from held to nil invokes Done,
	// so each wait group is released exactly once
	if held != nil && p.wg.CompareAndSwap(held, nil) {
		held.Done()
	}
}
// Count returns the number of threads currently blocked in Wait
//   - threads for which Wait returned immediately are not counted
//   - the value is an atomic snapshot
func (p *PeriodWaiter) Count() (waitingThreads int) {
	var blocked = atomic.LoadUint64(&p.count)
	waitingThreads = int(blocked)
	return waitingThreads
}
// IsHold returns true if a hold period is in effect, ie. an invocation
// of Wait at this time would block
//   - the value is an atomic snapshot and may be outdated on return
func (p *PeriodWaiter) IsHold() (isHold bool) {
	isHold = p.wg.Load() != nil
	return
}
// Wait blocks the calling thread while a hold period is in effect, ie.
// HoldWaiters was invoked with no ReleaseWaiters invocation following it.
// Thread-safe
//   - if no hold period is in effect, Wait returns immediately
//   - while blocked, the thread is included in the value returned by Count
//   - if a new hold period has begun by the time the thread is released,
//     the thread waits for that hold period, too
func (p *PeriodWaiter) Wait() {
	// gate is the wait group of the current hold period, nil if none
	gate := p.wg.Load()
	if gate == nil {
		return // no hold period in effect return
	}

	// register as waiting thread for the duration of the wait
	//   - adding math.MaxUint64 is unsigned decrement by one
	atomic.AddUint64(&p.count, 1)
	defer atomic.AddUint64(&p.count, math.MaxUint64)

	// wait on each successive gate until no hold period is in effect
	for gate != nil {
		gate.Wait()
		gate = p.wg.Load()
	}
}