/
nb-chan-thread.go
334 lines (290 loc) · 9.83 KB
/
nb-chan-thread.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
/*
© 2023–present Harald Rudell <harald.rudell@gmail.com> (https://haraldrudell.github.io/haraldrudell/)
ISC License
*/
package parl
import (
"math"
"github.com/haraldrudell/parl/pruntime"
)
// sendThread feeds values to the send channel
// - may be on-demand or always-on thread
// - verbose='NBChan.*sendThread'
func (n *NBChan[T]) sendThread(value T, hasValue bool) {
var zeroValue T
// signal thread exit to CloseNow if it is waiting
defer n.tcThreadExitAwaitable.Close()
// execute possible deferred close from Close invocation
defer n.sendThreadDeferredClose()
defer Recover(func() DA { return A() }, nil, n.sendThreadOnError)
// send value loop
for {
if hasValue {
// send the value: blocks here
// - until consumer receive, Get, CloseNow or panic
// - decrements unsent count
n.sendThreadBlockingSend(value)
hasValue = false
value = zeroValue
}
// obtain next value loop
for {
// check for CloseNow prior to next value
if n.sendThreadIsCloseNow() {
return // close now exit: immediate discard and exit
}
// if no data, decide on action
if n.unsentCount.Load() == 0 {
n.sendThreadZero()
// always-thread not in deferred close: wait for alert
if n.isThreadAlways.Load() && !n.isCloseInvoked.IsInvoked() {
// blocks here
if value, hasValue = n.sendThreadWaitForAlert(); hasValue {
break // send the value received by alert
}
continue // re-check closeNow and unsent count for next action
// on-demand thread or always in deferred close:
// exit on no data and no pending sends
} else if n.sendThreadExitCheck() {
// on-demand thread or always-on after Close exits here
return // no data, no pending sends: exit thread
}
} // obtain next value loop
// there is more data to send
if hasValue {
break // send the always-on thread value from alert
}
// is on-demand thread and data is available
// wait for [NBChan.gets] to be or reach 0
// - Get invocations get items before sendThread
if ch := n.getsWait.Ch(); ch != nil {
for {
select {
case <-ch: // Get ceased
case n.stateCh() <- NBChanGets: // respond is in Gets wait
continue
}
break
}
}
// try to get value from any queue
if value, hasValue = n.sendThreadGetNextValue(); hasValue {
break // send the value fetched from queues
}
// unsent count has reached zero or Get is in progress
// - wait for any sends to conclude that may provide additional items
if ch := n.sendsWait.Ch(); ch != nil {
for {
select {
case <-ch:
case n.stateCh() <- NBChanSends:
continue
}
break
}
}
}
// a value was obtained
// only send if closeNow has not been invoked
if !n.sendThreadNewSendCheck() {
return // CloseNow: item is discarded, thread exits
}
}
}
// sendThreadDeferredClose may close the underlying channel
// - is how sendThread executes deferred close
// - closes if Close was invoked while thread running and not CloseNow
// - invoked by sendThread on exit
// - updates dataWaitCh
func (n *NBChan[T]) sendThreadDeferredClose() {
// is it deferred close?
if !n.isCloseInvoked.IsInvoked() || // no: Close has not been invoked
n.isCloseNow.IsInvoked() { // CloseNow overrides deferred close
n.updateDataAvailable()
return // no deferred close pending return: noop
}
// for on-demand thread, ensure out of data
if !n.isThreadAlways.Load() {
if n.unsentCount.Load() > 0 {
return
}
// tcThread
}
// execute deferred close
// - error is stored in error container. isClosed is active
n.executeChClose()
// close data waiter
n.setDataAvailableAfterClose()
}
// sendThreadZero notifies background that thread
// took action on unsent count zero
func (n *NBChan[T]) sendThreadZero() {
n.tcDoProgressRaised(true)
n.tcProgressLock.Lock()
defer n.tcProgressLock.Unlock()
if n.unsentCount.Load() == 0 {
n.tcProgressRequired.Store(true)
}
}
// sendThreadOnError submits thread panic
// - ignores send on closed channel after closenow
func (n *NBChan[T]) sendThreadOnError(err error) {
if pruntime.IsSendOnClosedChannel(err) && n.isCloseNow.IsInvoked() {
return // ignore if the channel was or became closed
}
n.AddError(err)
}
// sendThreadBlockingSend sends blocking on consumer-receive channel
// - decrements unsent count
// - blocks until:
// - — consumer read the value
// - — Get empties the channel using collectSendThreadValue
// - — CloseNow discards the value using discardSendThreadValue
// - invoked by sendThread holding inputLock
func (n *NBChan[T]) sendThreadBlockingSend(value T) {
defer n.updateDataAvailable()
// count the item just sent — even if panic
defer n.unsentCount.Add(math.MaxUint64)
// clear two-chan receive second channel
n.collectChanActive.Store(nil)
// receive value with default has proven to result in default. Therefore:
// - two-chan receive is used by tcCollectThreadValue to prevent deadlock and aba
// - send-thread provides an atomic true and a nil atomic channel value
// upon commencing send operation
// - the atomic true allows other threads to write the atomic channel value and
// reset the atomic true to false
// - a winner thread observing the atomic true value stores a 1-size empty channel,
// and proceeds if it is able to set the atomic true value to false
// - at end of send operation, send-thread attempts to change the atomic value from true to false
// - if the atomic value was true, no two-chan receive is in progress
// - otherwise, send-thread sends on the atomic channel
// - thereby, send-thread will not enter dead-lock and avoids aba-issue
defer n.sendThreadBlockingSendEnd()
// tcSendBlock makes collectChanActive available to tcCollectThreadValue threads
n.tcSendBlock.Store(true)
for {
select {
// send value to consumer or Get
// - may block or panic
case n.closableChan.Ch() <- value:
return
case n.stateCh() <- NBChanSendBlock:
}
}
}
// sendThreadBlockingSendEnd completes any two-chan receive operation
func (n *NBChan[T]) sendThreadBlockingSendEnd() {
// check if two-chan send was initiated
if n.tcSendBlock.CompareAndSwap(true, false) {
return // no value collect
}
// send to ensure tcCollectThreadValue is not blocked
if cp := n.collectChanActive.Load(); cp != nil {
*cp <- struct{}{}
}
}
// sendThreadGetNextValue gets the next value for thread
// - invoked by [NBChan.sendThread]
// - fails if pending Get or unsentCount ends up zero
func (n *NBChan[T]) sendThreadGetNextValue() (value T, hasValue bool) {
if n.gets.Load() > 0 || n.unsentCount.Load() == 0 {
return // send thread suspended by Get return: hasValue: false
}
// if a thread holding outputLock awaited thread state,
// acquiring outputLock here could cause dead-lock
// - only Get invocations do this
// - therefore, ensure outputLock is not acquired while Get
// in progress
n.collectorLock.Lock()
defer n.collectorLock.Unlock()
if n.gets.Load() > 0 {
return // cancel: Get in progress
}
n.outputLock.Lock()
defer n.outputLock.Unlock()
if hasValue = len(n.outputQueue) > 0 || n.swapQueues(); !hasValue {
return // no value available return: hasValue false
}
value = n.outputQueue[0]
n.outputQueue = n.outputQueue[1:]
return // have item return: value: valid, hasValue: true
}
// sendThreadWaitForAlert allows an always-on thread to await alert
// - the alert is a two-chan send that may provide a value
// - always threads do not exit, instead at end of data
// they wait for background events:
// - not if didClose
// - not if data available
// - an alert that may provide a data item
func (n *NBChan[T]) sendThreadWaitForAlert() (value T, hasValue bool) {
// reset atomic channel
n.alertChan2Active.Store(nil)
// n.threadChWinner true exposes channels to clients
n.tcAlertActive.Store(true)
// sending on threadCh2 ensures no client is hanging
defer n.sendThreadAlertEnd()
// blocks here
// - n.threadCh must be unbuffered for effect to be immediate
// - n.threadCh2 is present to prevent client from hanging in threadCh send
for {
select {
// wait for alert
case valuep := <-n.alertChan.Get():
if hasValue = valuep != nil; hasValue {
value = *valuep
n.unsentCount.Add(1)
}
return
// broadcast Alert wait
case n.stateCh() <- NBChanAlert:
}
}
}
func (n *NBChan[T]) sendThreadAlertEnd() {
// see if two-chan send operation in progress
if n.tcAlertActive.CompareAndSwap(true, false) {
return // no
} else if cp := n.alertChan2Active.Load(); cp != nil {
*cp <- struct{}{}
}
}
// sendThreadExitCheck stops thread if inside threadLock, unsentCount is 0
// - doStop true: channel has been read to end and no Send SendMany active
// - invoked when thread has detected Close invocation and is in deferred close
func (n *NBChan[T]) sendThreadExitCheck() (doStop bool) {
n.tcThreadLock.Lock()
defer n.tcThreadLock.Unlock()
if doStop = //
n.unsentCount.Load() == 0 && // only stop if out of data
n.sends.Load() == 0; // while no sends in progress
!doStop {
return
}
n.tcRunningThread.Store(false)
return
}
// sendThreadNewSendCheck ensures a new channel send does not start after
// CloseNow
// - on closeNow, value is discarded
// - re-arms closesOnThreadSend
func (n *NBChan[T]) sendThreadNewSendCheck() (doSend bool) {
if doSend = !n.isCloseNow.IsInvoked(); !doSend {
n.unsentCount.Add(math.MaxUint64) // drop the value
return // CloseNow inoked return: doSend: false
}
return // doSend: true
}
// sendThreadIsCloseNow checks for CloseNow invocation
// - isExit true: CloseNow was invoked
func (n *NBChan[T]) sendThreadIsCloseNow() (isExit bool) {
if !n.isCloseNow.IsInvoked() {
return // no CloseNow invocation return
}
n.tcThreadLock.Lock()
defer n.tcThreadLock.Unlock()
if isExit = n.isCloseNow.IsInvoked(); !isExit {
return
}
n.tcRunningThread.Store(false)
return // close now exit
}