-
Notifications
You must be signed in to change notification settings - Fork 1
/
nb-rare-chan.go
230 lines (202 loc) · 6.96 KB
/
nb-rare-chan.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
/*
© 2022–present Harald Rudell <harald.rudell@gmail.com> (https://haraldrudell.github.io/haraldrudell/)
ISC License
*/
package parl
import (
"sync"
"sync/atomic"
"github.com/haraldrudell/parl/perrors"
)
// NBRareChan is a simplified [NBChan] using on-demand thread
// - NBRareChan is a channel with unbound buffer
// like an unbuffered channel reading from a thread-safe slice
// - [NBRareChan.Send] provides value send that is non-blocking, thread-safe, panic, dead-lock and error-free
// - [NBRareChan.Ch] provides real-time value stream or
// - [NBRareChan.Close] provides value collection
// - [NBRareChan.StopSend] blocks further Send allowing for graceful shutdown
// - [NBRareChan.IsClose] returns whether the underlying channel is closed
// - [NBRareChan.PanicCh] provides real-time notice of thread panic, should not happen
// - NBRareChan is initialization-free and thread-safe with
// thread-safe panic-free idempotent observable deferrable Close
// - used as an error sink, NBRareChan[error] prevents error propagation from affecting the thread
// - ignoring thread panics and Close errp is reasonably safe simplification
// - intended for infrequent use such as an error sink
// - benefits over plain channel:
// - — [NBRareChan.Send] is non-blocking-send panic-free non-dead-locking
// - — initialization-free
// - — unbound buffer
// - — any thread can close the channel as opposed to only the sending thread
// - — thread-synchronizing unbuffered channel-send mechanic as opposed to a buffered channel
// - — graceful shutdown like a buffered channel
// - — [NBRareChan.Close] is any-thread data-race-free thread-safe panic-free idempotent observable deferrable
// - drawbacks compared to [NBChan]:
// - — on-demand thread may lead to high cpu if used frequently like every second.
// NBChan offers no-thread or always-thread operation
// - — buffering a large number of items leads to a temporary memory leak in queue
// - — there is no contention-separation between Send and reading Ch
// - — no multiple-item operations like SendMany or Get
// - — less observable and configurable
type NBRareChan[T any] struct {
// underlying channel
closableChan ClosableChan[T]
// makes all created threads awaitable
threadWait sync.WaitGroup
// queueLock ensures thread-safety of queue
// - also ensures sequenced access to isThread isStopSend
queueLock sync.Mutex
// accessed behind queueLock
didSend bool
// accessed behind queueLock
queue []T
// accessed behind queueLock
threadReadingValues CyclicAwaitable
// accessed behind queueLock
isStopSend atomic.Bool
// returned by StopSend await empty channel
isEmpty Awaitable
// sendThread panics, should be none
errs atomic.Pointer[error]
// returned by PanicCh await thread panic
isPanic Awaitable
// ensures close executed once
closeOnce OnceCh
}
// Ch obtains the receive-only channel
// - values can be retrieved using this channel or [NBChan.Get]
// - not available for [NBChanNone] NBChan
func (n *NBRareChan[T]) Ch() (ch <-chan T) { return n.closableChan.Ch() }
// Send sends a single value on the channel
// - non-blocking, thread-safe, panic-free and error-free
// - if Close or CloseNow was invoked, items are discarded
func (n *NBRareChan[T]) Send(value T) {
n.queueLock.Lock()
defer n.queueLock.Unlock()
// ignore values after Close or StopSend
if n.closableChan.IsClosed() || n.isStopSend.Load() {
return
}
// possibly create thread with value
var createThread bool
if createThread = !n.didSend; createThread {
n.didSend = true
} else {
createThread = n.threadReadingValues.IsClosed()
}
if createThread {
n.threadReadingValues.Open()
n.threadWait.Add(1)
go n.sendThread(value)
return
}
// append value to buffer
n.queue = append(n.queue, value)
}
// StopSend ignores further Send allowing for the channel to be drained
// - emptyAwaitable triggers once the channel is empty
func (n *NBRareChan[T]) StopSend() (emptyAwaitable AwaitableCh) {
n.queueLock.Lock()
defer n.queueLock.Unlock()
n.isStopSend.CompareAndSwap(false, true)
emptyAwaitable = n.isEmpty.Ch()
if len(n.queue) == 0 && n.threadReadingValues.IsClosed() {
n.isEmpty.Close()
}
return
}
// PanicCh is real-time awaitable for panic in sendThread
// - this should not happen
func (n *NBRareChan[T]) PanicCh() (emptyAwaitable AwaitableCh) { return n.isPanic.Ch() }
// IsClose returns true if underlying channel is closed
func (n *NBRareChan[T]) IsClose() (isClose bool) { return n.closableChan.IsClosed() }
// Close immediately closes the channel returning any contained values
// - values: possible values that were in channel, may be nil
// - errp: receives any panics from thread. Should be none. may be nil
// - upon return, resources are released and further Send ineffective
func (n *NBRareChan[T]) Close(values *[]T, errp *error) {
// ensure once execution
if isWinner, done := n.closeOnce.IsWinner(); !isWinner {
return // loser thread has already awaited done
} else {
defer done.Done()
}
// collect queue and stop further Send
var queue = n.close()
// collect possible value from thread and shut it down
if n.didSend {
select {
case value := <-n.closableChan.Ch():
queue = append([]T{value}, queue...)
case <-n.threadReadingValues.Ch():
n.isEmpty.Close()
}
// wait for all created threads to exit
n.threadWait.Wait()
if errp != nil {
if ep := n.errs.Load(); ep != nil {
*errp = perrors.AppendError(*errp, *ep)
}
}
}
// close underlying channel
n.closableChan.Close()
// return values
if values != nil && len(queue) > 0 {
*values = queue
}
}
// collect queue and stop further Send
func (n *NBRareChan[T]) close() (values []T) {
n.queueLock.Lock()
defer n.queueLock.Unlock()
if values = n.queue; values != nil {
n.queue = nil
}
n.isStopSend.Store(true)
return
}
// sendThread carries out send operations on the channel
func (n *NBRareChan[T]) sendThread(value T) {
defer n.threadWait.Done()
defer Recover(func() DA { return A() }, nil, n.sendThreadPanic)
var ch = n.closableChan.Ch()
for {
ch <- value
var hasValue bool
if value, hasValue = n.sendThreadNextValue(); !hasValue {
return
}
}
}
// sendThreadNextValue obtains the next valkue to send for thread if any
func (n *NBRareChan[T]) sendThreadNextValue() (value T, hasValue bool) {
n.queueLock.Lock()
defer n.queueLock.Unlock()
if hasValue = len(n.queue) > 0; hasValue {
value = n.queue[0]
n.queue = n.queue[1:]
return
}
n.threadReadingValues.Close()
if n.isStopSend.Load() {
n.isEmpty.Close()
}
return
}
// sendThreadPanic aggregates thread panics
func (n *NBRareChan[T]) sendThreadPanic(err error) {
for {
var errp0 = n.errs.Load()
if errp0 == nil && n.errs.CompareAndSwap(nil, &err) {
break // wrote new error
}
var err2 = perrors.AppendError(*errp0, err)
if n.errs.CompareAndSwap(errp0, &err2) {
break // appended error
}
}
n.isPanic.Close()
n.queueLock.Lock()
defer n.queueLock.Unlock()
n.threadReadingValues.Close()
}