-
Notifications
You must be signed in to change notification settings - Fork 1
/
win-or-waiter-core.go
187 lines (158 loc) · 5.94 KB
/
win-or-waiter-core.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
/*
© 2022–present Harald Rudell <harald.rudell@gmail.com> (https://haraldrudell.github.io/haraldrudell/)
ISC License
*/
package parl
import (
"context"
"sync"
"time"
"github.com/haraldrudell/parl/perrors"
"github.com/haraldrudell/parl/ptime"
"github.com/haraldrudell/parl/sets"
)
// strategies for a WinOrWaiterCore, provided to NewWinOrWaiterCore.
// Values start at 1, so the zero value is not one of the declared strategies
const (
// WinOrWaiterAnyValue causes a thread to accept any calculated value
WinOrWaiterAnyValue WinOrWaiterStrategy = iota + 1
// WinOrWaiterMustBeLater forces a calculation after the last arriving thread.
// WinOrWaiter calculations are serialized, ie. a new calculation does not start prior to
// the conclusion of the previous calculation
WinOrWaiterMustBeLater
)
// WinOrWaiterCore picks a winner thread to carry out some task used by many threads.
// - threads arriving to an idle WinOrWaiterCore are winners that complete the task
// - after a winning thread completes the task, it invokes the winnerFunc value
// returned to it by WinOrWait
// - threads arriving while a calculation is in progress are held waiting until
// that winnerFunc is invoked
// - the task is completed on demand, but only by the first thread requesting it
// - instances are obtained from NewWinOrWaiterCore: checkWinOrWaiter panics
// on a value whose cond.L is nil
type WinOrWaiterCore struct {
// cond holds waiting threads in WinOrWait.
// - cond.L is locked by WinOrWait, Invalidate and winnerFunc
// - Invalidate and winnerFunc broadcast to wake waiting threads
cond sync.Cond
// calculation strategy for this WinOrWaiter
// - WinOrWaiterAnyValue WinOrWaiterMustBeLater
strategy WinOrWaiterStrategy
// context used for cancellation, may be nil
// - nil when NewWinOrWaiterCore was provided no context
ctx context.Context
// dataVersion indicates the available data version with atomic access
// - data version is the time scanning of the current valid version of data began
// - zero value if no data version has been completed
// - set to the value of calculationStart by winnerFunc on successful completion
// - reset to zero by Invalidate
dataVersion ptime.EpochValue
// winnerPicker picks winner thread using atomic access
// - winner is the thread that on Set gets wasNotSet true
// - true while a winner calculates next data value
// - cleared by winnerFunc after it has broadcast to waiting threads
winnerPicker AtomicBool
// calculationStart is the time of starting the last initiated calculation with atomic access
// - set by the winner thread in WinOrWait
calculationStart ptime.EpochValue
}
// NewWinOrWaiterCore returns a semaphore used for completing an on-demand task by
// the first thread requesting it, with that result shared by subsequent threads held
// waiting for the result.
//   - strategy: WinOrWaiterAnyValue or WinOrWaiterMustBeLater
//   - ctx: optional, only the first provided value is used.
//     Allows for cancelation of the WinOrWaiterCore
//   - panics if strategy is not valid according to strategy.IsValid
func NewWinOrWaiterCore(strategy WinOrWaiterStrategy, ctx ...context.Context) (winOrWaiter *WinOrWaiterCore) {

	// reject unknown strategies up front
	if !strategy.IsValid() {
		panic(perrors.ErrorfPF("Bad WInOrWaiter strategy: %s", strategy))
	}

	// cancelContext remains nil when the caller provided no context
	var cancelContext context.Context
	if len(ctx) > 0 {
		cancelContext = ctx[0]
	}

	winOrWaiter = &WinOrWaiterCore{
		// the condition gets its own mutex, created in place
		cond:     sync.Cond{L: &sync.Mutex{}},
		strategy: strategy,
		ctx:      cancelContext,
	}
	return winOrWaiter
}
// WinOrWait picks a winner thread to carry out some task used by many threads.
//   - a thread arriving while no calculation is in progress becomes the winner:
//     it receives a non-nil winnerFunc, carries out the task and then
//     invokes winnerFunc
//   - threads arriving while a calculation is in progress are held waiting until
//     woken by the winner invoking winnerFunc or by Invalidate
//   - winnerFunc nil: an acceptable data version became available or the context
//     is canceled: the thread should not carry out the task
//   - winnerFunc errp: nil or pointing to a nil error marks the calculation
//     as successful, making its data version available to waiting threads
//   - WinOrWaiterAnyValue: a waiting thread accepts any new data version
//   - WinOrWaiterMustBeLater: a waiting thread only accepts a data version whose
//     calculation started at or after the thread's arrival
//   - context cancelation is checked on arrival and each time the thread is woken,
//     a cancelation does not by itself wake a waiting thread
func (ww *WinOrWaiterCore) WinOrWait() (winnerFunc func(errp *error)) {
	checkWinOrWaiter(ww)
	ww.cond.L.Lock()
	defer ww.cond.L.Unlock()

	// the time this thread arrived
	var arrivalTime = time.Now()
	// the data version available when this thread arrived
	var lastSeenDataVersion = ww.dataVersion.Get().Time()

	// wait for a data update
	for {

		// check context
		if ww.IsCancel() {
			return nil // context canceled return
		}

		// the data version available right now
		dataVersionNow := ww.dataVersion.Get().Time()

		// if we have data and it has changed since arrival or last wake-up…
		if !dataVersionNow.IsZero() && !lastSeenDataVersion.Equal(dataVersionNow) {
			switch ww.strategy {
			case WinOrWaiterAnyValue:
				return nil // any new valid value accepted return
			case WinOrWaiterMustBeLater:
				// the data version is the start time of its calculation:
				// accept it only if that calculation did not start
				// before this thread arrived.
				// A version from an earlier calculation falls through so
				// that a new calculation is awaited or carried out
				if !dataVersionNow.Before(arrivalTime) {
					return nil // calculation started at or after arrival return
				}
			}
		}
		lastSeenDataVersion = dataVersionNow // absorb any changes

		// ensure data processing is in progress
		if isWinner := ww.winnerPicker.Set(); isWinner {
			// this thread is a winner: note when its calculation starts
			ww.calculationStart.SetTime()
			return ww.winnerFunc // this thread is a winner: do task return
		}

		// another thread is calculating: wait for a broadcast
		// from winnerFunc or Invalidate
		ww.cond.Wait()
	}
}
// Invalidate discards any completed calculation by resetting
// the data version to zero and waking all waiting threads.
//   - a calculation already in progress may still be accepted once it completes
func (ww *WinOrWaiterCore) Invalidate() {
	checkWinOrWaiter(ww)

	// reset the data version before acquiring the lock:
	// for performance, important to do outside of lock
	ww.dataVersion.Set(0)

	// wake all waiting threads while holding the lock
	var lock = ww.cond.L
	lock.Lock()
	defer lock.Unlock()

	ww.cond.Broadcast()
}
// IsCancel returns true if a context was provided to NewWinOrWaiterCore
// and that context reports a non-nil error.
//   - false when no context was provided
func (ww *WinOrWaiterCore) IsCancel() (isCancel bool) {
	checkWinOrWaiter(ww)

	// without a context there is nothing that can cancel
	if ww.ctx == nil {
		return false
	}

	return ww.ctx.Err() != nil
}
// winnerFunc is the function returned to a winner thread by WinOrWait,
// invoked by that thread once its calculation has ended.
//   - errp: nil or pointing to a nil error means the calculation succeeded,
//     and the data version is updated to the start time of the calculation
//   - on failure the data version is left unchanged
//   - all waiting threads are woken, then the next winner is allowed to be picked
func (ww *WinOrWaiterCore) winnerFunc(errp *error) {

	// publish the new data version only for a successful calculation.
	// For performance, important to do outside of lock.
	// Once dataVersion is updated, waiting threads will begin to return
	var isFailure = errp != nil && *errp != nil
	if !isFailure {
		ww.dataVersion.Set(ww.calculationStart.Get())
	}

	var lock = ww.cond.L
	lock.Lock()
	defer lock.Unlock()

	// wake every waiting thread
	ww.cond.Broadcast()

	// make it possible for a next winner to be picked
	ww.winnerPicker.Clear()
}
// checkWinOrWaiter panics if ww cannot be used:
//   - ww is nil
//   - ww has no locker in its condition, ie. it was not
//     obtained from NewWinOrWaiterCore
func checkWinOrWaiter(ww *WinOrWaiterCore) {
	switch {
	case ww == nil:
		panic(perrors.NewPF("use of nil WinOrWaiterCore"))
	case ww.cond.L == nil:
		panic(perrors.NewPF("use of uninitialized WinOrWaiterCore"))
	}
}
// WinOrWaiterStrategy selects which data versions a thread in WinOrWait accepts:
// WinOrWaiterAnyValue or WinOrWaiterMustBeLater
type WinOrWaiterStrategy uint8
// String returns the text winOrWaiterSet provides for ws
func (ws WinOrWaiterStrategy) String() (s string) {
	s = winOrWaiterSet.StringT(ws)
	return s
}
// IsValid returns whether winOrWaiterSet considers ws a valid strategy
func (ws WinOrWaiterStrategy) IsValid() (isValid bool) {
	isValid = winOrWaiterSet.IsValid(ws)
	return isValid
}
// winOrWaiterSet lists the declared WinOrWaiterStrategy values with their names.
// It backs the String and IsValid methods of WinOrWaiterStrategy
var winOrWaiterSet = sets.NewSet(sets.NewElements[WinOrWaiterStrategy](
[]sets.SetElement[WinOrWaiterStrategy]{
{ValueV: WinOrWaiterAnyValue, Name: "anyValue"},
{ValueV: WinOrWaiterMustBeLater, Name: "mustBeLater"},
}))