-
Notifications
You must be signed in to change notification settings - Fork 1
/
nb-chan-thread-control.go
398 lines (347 loc) · 13.1 KB
/
nb-chan-thread-control.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
/*
© 2023–present Harald Rudell <harald.rudell@gmail.com> (https://haraldrudell.github.io/haraldrudell/)
ISC License
*/
package parl
const (
// tcStartThread
NoValue = false
// tcStartThread
HasValue = true
)
// tcCreateWinner seeks permission to create sendThread
// - isWinner true: go n.sendThread should be invoked by this invocation
// - isWinner false: a thread is running, being started or it is Close/CloseNow
func (n *NBChan[T]) tcCreateWinner() (isWinner bool) {
n.tcThreadLock.Lock() // atomizes Close/CloseNow tcRunningThread
defer n.tcThreadLock.Unlock()
// no thread creation after CloseNow
if n.isCloseNow.IsInvoked() {
return
// no thread creation after Close if channel object empty
} else if n.isCloseInvoked.IsInvoked() && n.unsentCount.Load() == 0 {
return
}
// note that thread did at one time launch
// - before tcRunningThread is set to true
n.tcDidLaunchThread.CompareAndSwap(false, true)
// check for thread already running or this invocation should not launch it
if isWinner = n.tcRunningThread.CompareAndSwap(false, true); !isWinner {
return // thread was already running return
}
// arm thread exit awaitable
n.tcThreadExitAwaitable.Open()
return
}
// tcAlertOrLaunchThreadWithValue ensures thread progress from a Send endMany during unsent count zero
// - didProvideValue true: value was provided to thread via launch or alert and
// unsent count was incremented
// - if on-demand or always thread and a unsent count zero, thread progress must be guaranteed
// - thread progress is deferred if Get is in progress
// - Invoked by [NBChan.Send] and [NBChan.SendMany]
// - on invocation, value is not part of unsentCount
func (n *NBChan[T]) tcAlertOrLaunchThreadWithValue(value T) (didProvideValue bool) {
// if unsent count is zero, a thread may have to be alerted or launched
// - no thread may have been launched
// - on-demand thread may have exited on unsent count zero
// - always thread may await an alert
// - no action is necessary if:
// - — threading is not used
// - — unsent count is not zero
// - — Gets are in progress for which a thread would be a detour
// - NBChan may be configured for no-thread on-demand-thread or always-on thread
// - no thread may be running
// - this is the only Send SendMany invocation
// - Get invocations may be ongoing
// if NBChan is configured for no thread, value cannot be provided
if n.isNoThread.Load() {
return // no-thread configuration
}
// if unsent count is not zero, no change is required to threading
// - only Send SendMany which use inputLock can increase this value
if n.unsentCount.Load() > 0 {
return // channel is not empty return
}
// - unsent count is zero
// - configuration is on-demand-thread or always-thread
// - there may be no thread running
// - progress must now be guaranteed by:
// - — launching the send-thread
// - — alerting an always-thread
// - — for on-demand thread in unknown state,
// observing it in a state once data has been added to inputBuffer.
// It may otherwise exit
// if Get is in progress, thread should launch later
if _, isGets := n.tcIsDeferredSend(); isGets {
return // deferred: threadProgressRequired true
}
// starting the thread, that is most important
// - a launched thread will go to send value
if _, didProvideValue = n.tcCreateProgress(); didProvideValue {
n.unsentCount.Add(1)
go n.sendThread(value, HasValue)
return // thread was started with value
}
// try alerting a waiting always-on thread without waiting
// - if successful, thread will go to send value
if didProvideValue = n.tcAlertProgress(&value); didProvideValue {
return // always-on thread alerted with value
}
// there is a thread running without holding an item
// - on-demand or always
// - may be executing towards: exit getsWait sendsWait alert
// - Close or CloseNow may have been invoked
// - at end of Send SendMany Get invocations, a static thread-state must be awaited
// so that progress is guaranteed
// - issue is that:
// - — an on-demand thread may exit while items are present
// - — an always thread may enter alert state where it must be alerted
// - flag it to be dealt with once all Send SendMany Get completes
return
}
// tcCreateProgress seeks progress by creating the send-thread
// - honorProgressRaised true: isProgress is only true if tcProgressRaised remains false
// - isProgress true: this operation is thread progress, isCreateThread is also true
// - isCreateThread true: caller should invoke go n.sendThread
func (n *NBChan[T]) tcCreateProgress(honorProgressRaised ...bool) (isProgress, isCreateThread bool) {
n.tcProgressLock.Lock()
defer n.tcProgressLock.Unlock()
if isCreateThread = n.tcCreateWinner(); !isCreateThread {
return // no progress no thread to be created
} else if len(honorProgressRaised) > 0 && honorProgressRaised[0] && n.tcDoProgressRaised() {
return // progressRaised true, so this is not progress
}
isProgress = true
return
}
// HonorProgressRaised ignore any progress made if tcProgressRaised is true
const HonorProgressRaised = true
// record progress regardless of tcProgressRaised
const IgnoreProgressRaised = false
// tcAlertProgress attempts progress via alert ignoring tcProgressRaised
// - valuep: if present and non-nil provided in alert
// - isProgress true: value was provided, operation was thread progress
func (n *NBChan[T]) tcAlertProgress(valuep ...*T) (isProgress bool) {
isProgress, _ = n.tcAlertProgress2(IgnoreProgressRaised, valuep...)
return
}
// tcAlertProgress2 attempts progress via alert optionally honoring tcProgressRaised
// - honorProgressRaised true: isProgress is only true if tcProgressRaised remains false
// - valuep: optional value provided with alert
// - isProgress operation counts as progress
// - didAlert an alert was successfully sent, if a value was present is was provided
// - sendThread upon receiving the value, increases unsent count
func (n *NBChan[T]) tcAlertProgress2(honorProgressRaised bool, valuep ...*T) (isProgress, didAlert bool) {
n.tcProgressLock.Lock()
defer n.tcProgressLock.Unlock()
// try alerting the thread
var valuep0 *T
if len(valuep) > 0 {
valuep0 = valuep[0]
}
if didAlert = //
n.tcAlertActive.Load() && // only when send-thread awaits alert
n.tcAlertThread(valuep0); //
!didAlert {
return // no successful alert
} else if isProgress = !honorProgressRaised || !n.tcDoProgressRaised(); !isProgress {
return // was progress raised true
}
n.tcProgressRequired.Store(false)
return
}
func (n *NBChan[T]) tcAddProgress(count int) {
n.tcDoProgressRaised(true)
n.tcProgressLock.Lock()
defer n.tcProgressLock.Unlock()
if n.unsentCount.Add(uint64(count)) == uint64(count) && !n.isNoThread.Load() {
n.tcProgressRequired.Store(true)
}
}
// tcAwaitProgress awaits a static state from thread then ensures progress
// - threadProgressRequired true: progress is currently not guaranteed
// - progress is:
// - — creating sendThread
// - — alerting sendThread
// - — a non-exit thread-state observed
// - if tcProgressRequired was true on invocation and a zero-count event occured during wait,
// threadProgressRequired is true
func (n *NBChan[T]) tcAwaitProgress() (threadProgressRequired bool) {
n.tcAwaitProgressLock.Lock() // ensures critical section
defer n.tcAwaitProgressLock.Unlock()
// check if progress action remains required
if threadProgressRequired = n.tcProgressRequired.Load(); !threadProgressRequired {
return
}
// reset progress raised, ie.
// - thread observing unsent count zero
// - Send SendMany increasing unsent count from zero
n.tcDoProgressRaised(false)
// await static thread status
// - cannot hold tcProgressLock lock
var threadState NBChanTState
select {
// thread exit
case <-n.tcThreadExitAwaitable.Ch():
if n.isOnDemandThread.Load() {
var isProgress, isCreateThread = n.tcCreateProgress(HonorProgressRaised)
if isCreateThread {
// start thread without value
var value T
go n.sendThread(value, NoValue)
}
threadProgressRequired = !isProgress
}
return
case threadState = <-n.stateCh():
}
// received thread status
// alert
if threadState == NBChanAlert {
if isProgress, _ := n.tcAlertProgress2(HonorProgressRaised); isProgress {
threadProgressRequired = false
}
return
}
// all other states are good states
// - if a zero unsent period occurred in the meantime,
// - tcProgressRaised is true and
// - the operation was not progress
threadProgressRequired = n.tcDoProgressRaised()
return
}
// tcDoProgressRaised updates and or returns tcProgressRaised
// - wasRaised is the tcProgressRaised at time of invocation
// - isRaised missing: read operation, otherwise tcProgressRaised is set to isRaised
// - tcProgressRaised is set upon:
// on Send SendMany adding from unsent count zero
// - — the send-thread taking action on unsent count zero or
// - — Send SendMany adding items from unsent count zero
// - it indicates that sendThread progress may fail if not ensured
// - tcProgressRaised is used when awaiting thread-state since tcProgressLock
// cannot be held
// - if a zero condition occured during await of thread-state, the wait operation must be retried
func (n *NBChan[T]) tcDoProgressRaised(isRaised ...bool) (wasRaised bool) {
if len(isRaised) == 0 {
wasRaised = n.tcProgressRaised.Load()
return
}
wasRaised = n.tcProgressRaised.Swap(isRaised[0])
return
}
// tcAlertThread alerts any waiting always-thread
// - invoked from Send/SendMany
// - value has not been added to unsentCount yet
// - increments unsentCount if value is non-nil and was provided to thread
func (n *NBChan[T]) tcAlertThread(valuep ...*T) (didAlert bool) {
// atomizes always-alert with detecting no Get in progress
n.collectorLock.Lock()
defer n.collectorLock.Unlock()
// filter send request
if n.gets.Load() > 0 || // not when no Get in progress
!n.tcAlertActive.Load() { // only when send-thread awaits alert
return // no alert now
}
// prepare two-chan send second channel
var alertChan2 = n.alertChan2.Get(1)
if len(alertChan2) > 0 {
<-alertChan2
}
n.alertChan2Active.Store(&alertChan2)
// verify that two-chan send still available
if !n.tcAlertActive.CompareAndSwap(true, false) {
return
}
// send value to always-thread
// - always thread increments unsentCount on reception
var valuep0 *T
if len(valuep) > 0 {
valuep0 = valuep[0]
}
select {
case n.alertChan.Get() <- valuep0:
didAlert = true
case <-alertChan2:
}
return
}
// tcIsDeferredSend checks for a progress requirement
// - invoked by the last ending Get invocation
func (n *NBChan[T]) tcIsDeferredSend() (isProgressRequired, isGets bool) {
n.tcGetProgressLock.Lock()
defer n.tcGetProgressLock.Unlock()
isProgressRequired = n.tcProgressRequired.Load()
isGets = n.gets.Load() > 0
return
}
// tcCollectThreadValue receives any value in sendThread channel send
// - invoked by [NBChan.Get] while holding output lock
// - must await any thread value to ensure values provided in order
// - thread receives value from:
// - — Send SendMany that launches thread, but only when sent count 0
// - — always: thread alert
// - —on-demand: GetNextValue
func (n *NBChan[T]) tcCollectThreadValue() (value T, hasValue bool) {
// if thread is not running, it does not hold data
if !n.tcRunningThread.Load() {
return // thread not running
}
// await static thread state
// - state must be awaited since the thread may be in progress
// with a value towards NBChanSendBlock
// - if NBChanSendBlock, threads hold a value
// - in all other static thread states, thread holds no value
// - because this thread holds outputLock,
// on-demand thread cannot collect additional values
// - collectorLock ensures that no always-thread alerts are carried out
// while Get is in progress
select {
// thread exited
case <-n.tcThreadExitAwaitable.Ch():
return // thread exited return
case chanState := <-n.stateCh():
// if it is not send value block, ignore
// - NBChanSendBlock is the only wait where thread has value
if chanState != NBChanSendBlock {
return // thread is not held in send value
}
}
// thread holds value in state NBChanSendBlock
// because this thread holds outputLock,
// only one thread at a time may arrive here
// - competing with consumers and closeNow for the value
// ensure two-chan receive operation is available
if !n.tcSendBlock.Load() {
return // the value went to another thread
}
// prepare two-chan receive second channel
var collectChan = n.collectChan.Get(1)
if len(collectChan) > 0 {
<-collectChan
}
n.collectChanActive.Store(&collectChan)
// seek permission for two-chan receive
if !n.tcSendBlock.CompareAndSwap(true, false) {
return // the value went to another thread
}
// two-chan fetch of value
select {
case value, hasValue = <-n.closableChan.Ch():
case <-collectChan:
}
return
}
// returns a channel producing values if thread is holding:
// - NBChanSendBlock NBChanAlert NBChanGets NBChanSends
func (n *NBChan[T]) stateCh() (ch chan NBChanTState) {
if chp := n.tcState.Load(); chp != nil {
ch = *chp
return
}
ch = make(chan NBChanTState)
if n.tcState.CompareAndSwap(nil, &ch) {
return
}
ch = *n.tcState.Load()
return
}