-
Notifications
You must be signed in to change notification settings - Fork 1
/
cyclic-waiter.go
120 lines (98 loc) · 3.08 KB
/
cyclic-waiter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
/*
© 2022–present Harald Rudell <harald.rudell@gmail.com> (https://haraldrudell.github.io/haraldrudell/)
ISC License
*/
package parl
import (
"context"
"sync"
"github.com/haraldrudell/parl/perrors"
)
// CyclicWait allows any number of threads to wait for a next occurrence.
//   - a parent context may be passed in that on cancel triggers the wait
//     and prevents further cycles
//   - a channel can be obtained that sends one item on the next trig
//     but never closes
//   - a channel can be obtained that closes on next trig
//   - next trig can be awaited
//   - a did-occurer object can be obtained that returns true once the cycle
//     trigs.
//   - a context can be obtained that cancels on the next trig
//   - the cycles can be permanently canceled or trigged and rearmed
type CyclicWait struct {
	// parentContext is the context provided to NewCyclicWait.
	// CancelAndRearm checks it before rearming and uses it
	// to create the waiter of each new cycle.
	parentContext context.Context
	// isCancel is set by Cancel and read by IsCancel and CancelAndRearm.
	isCancel AtomicBool
	// lock is read-held by methods that read ow and
	// write-held by Cancel and CancelAndRearm.
	lock sync.RWMutex
	// ow is the waiter for the current cycle.
	// CancelAndRearm replaces it when it rearms.
	ow OnceWaiter
}
// NewCyclicWait returns a CyclicWait whose first cycle is armed.
//   - ctx is the parent context: it is stored in the returned object and
//     used to create the waiter of every cycle
//   - ctx cannot be nil: a nil ctx causes panic
func NewCyclicWait(ctx context.Context) (onceReceiver *CyclicWait) {
	if ctx == nil {
		panic(perrors.NewPF("ctx cannot be nil"))
	}

	onceReceiver = &CyclicWait{parentContext: ctx}
	// arm the first cycle
	onceReceiver.ow = *NewOnceWaiter(ctx)
	return
}
// Ch returns the Ch channel of the current cycle's waiter.
//   - the channel emits one item on the next trig and then nothing more
//   - the channel never closes
func (cw *CyclicWait) Ch() (ch <-chan struct{}) {
	// the read lock keeps CancelAndRearm from replacing the waiter meanwhile
	cw.lock.RLock()
	defer cw.lock.RUnlock()

	ch = cw.ow.Ch()
	return
}
// Done returns the Done channel of the current cycle's waiter.
//   - the channel closes on the next trig or parent context cancel
//   - similar to the Done method of a context
func (cw *CyclicWait) Done() (done <-chan struct{}) {
	// the read lock keeps CancelAndRearm from replacing the waiter meanwhile
	cw.lock.RLock()
	defer cw.lock.RUnlock()

	done = cw.ow.Done()
	return
}
// Wait blocks the calling thread until the next trig or parent context cancel.
//   - the channel is obtained via Done, so the lock is not held while blocking
func (cw *CyclicWait) Wait() {
	<-cw.Done()
}
// DidOccurer returns an object with a DidOccur method returning
// true after this cycle has trigged.
//   - the returned object is bound to the cycle that is current at the
//     time of the DidOccurer invocation: a subsequent CancelAndRearm
//     does not redirect it to the new cycle
func (cw *CyclicWait) DidOccurer() (didOccurer *OnceWaiterRO) {
	cw.lock.RLock()
	defer cw.lock.RUnlock()

	// CancelAndRearm overwrites the cw.ow field in place. Handing out &cw.ow
	// would therefore make the returned object observe the next cycle's
	// waiter, and read the field outside of cw.lock while it is written.
	// A snapshot taken under the read lock pins the object to this cycle.
	// NOTE(review): assumes a value copy of OnceWaiter shares trig state
	// with the original — confirm against the OnceWaiter declaration.
	currentCycle := cw.ow
	return NewOnceWaiterRO(&currentCycle)
}
// Context returns the context provided by the current cycle's waiter.
//   - the context cancels on the next trig
func (cw *CyclicWait) Context() (ctx context.Context) {
	// the read lock keeps CancelAndRearm from replacing the waiter meanwhile
	cw.lock.RLock()
	defer cw.lock.RUnlock()

	ctx = cw.ow.Context()
	return
}
// Cancel trigs the current cycle and permanently cancels the object.
//   - the cancel flag is set so that a subsequent CancelAndRearm
//     does not rearm
func (cw *CyclicWait) Cancel() {
	cw.lock.Lock()
	defer cw.lock.Unlock()

	// note the permanent cancel first, then trig the current cycle
	cw.isCancel.Set()
	cw.ow.Cancel()
}
// IsCancel returns whether Cancel has been invoked.
//   - CancelAndRearm does not set the flag, so IsCancel
//     returns false during CancelAndRearm cycles
func (cw *CyclicWait) IsCancel() (isCancel bool) {
	isCancel = cw.isCancel.IsTrue()
	return
}
// CancelAndRearm trigs the current cycle and then rearms the object
// for a new cycle.
//   - wasRearmed is true when a new cycle was armed
//   - wasRearmed is false when the parent context has been canceled or
//     Cancel has been invoked: the object is then not rearmed
func (cw *CyclicWait) CancelAndRearm() (wasRearmed bool) {
	cw.lock.Lock()
	defer cw.lock.Unlock()

	// trig this cycle
	cw.ow.Cancel()

	// no rearm: parent context has been canceled or Cancel was invoked
	if cw.parentContext.Err() != nil || cw.isCancel.IsTrue() {
		return false
	}

	// rearm: replace the trigged waiter with a fresh one for the next cycle
	cw.ow = *NewOnceWaiter(cw.parentContext)

	// report the rearm: the named result was previously never assigned,
	// so the method returned false even when a new cycle had been armed
	return true
}