/
nb-chan-get.go
198 lines (172 loc) · 5.4 KB
/
nb-chan-get.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
/*
© 2023–present Harald Rudell <harald.rudell@gmail.com> (https://haraldrudell.github.io/haraldrudell/)
ISC License
*/
package parl
import "math"
// Get returns a slice of elementCount or default or zero for all available items held by the channel.
// - if channel is empty, 0 items are returned
// - Get is non-blocking
// - n > 0: max this many items
// - n == 0 (or <0): all items
// - Get is panic-free non-blocking error-free thread-safe
func (n *NBChan[T]) Get(elementCount ...int) (allItems []T) {
// empty NBChan: noop return
if n.unsentCount.Load() == 0 {
return // no items available return: nil slice
}
// no Get after CloseNow
if n.isCloseNow.IsInvoked() {
return
}
// notify of pending Get
n.preGet()
// arguments
// soughtItemCount: 0 for isAllItems, >0 for that many items
var soughtItemCount int
if len(elementCount) > 0 {
if soughtItemCount = elementCount[0]; soughtItemCount < 0 {
soughtItemCount = 0
}
}
// Get request seeks all available items
var isAllItems = soughtItemCount == 0
if isAllItems {
if n := n.unsentCount.Load(); n > 0 {
allItems = make([]T, 0, n) // approximate size
}
}
n.outputLock.Lock()
defer n.postGet()
// get possible item from send thread
// - thread decrements unsent count
if item, itemValid := n.tcCollectThreadValue(); itemValid {
allItems = append(allItems, item)
if !isAllItems {
if soughtItemCount--; soughtItemCount == 0 {
return // fetch complete return
}
}
}
// fetch from n.outputQueue
// - updates unsent count
allItems = n.fetchFromOutput(&soughtItemCount, isAllItems, allItems)
if !isAllItems && soughtItemCount == 0 {
return // fetch complete return
}
// fetch from m.inputQueue
if n.swapQueues() {
allItems = n.fetchFromOutput(&soughtItemCount, isAllItems, allItems)
}
return
}
// preGet registers a pending Get invocation prior to outputLock
// - increases gets and may hold getsWait
// - block concurrent always-alert
func (n *NBChan[T]) preGet() {
if n.gets.Add(1) == 1 {
n.getsWait.HoldWaiters()
if !n.isThreadAlways.Load() {
return
}
// await any Send SendMany always-alert operation has ended
// and will not be started again before all Get have exited
n.collectorLock.Lock()
defer n.collectorLock.Unlock()
}
}
// postGet is the deferred ending function for [NBChan.Get]
// - release outputLock
// - update dataWaitCh
// - decrease number of Get invocations
// - if more Get invocations are pending, do nothing
// - otherwise, release getsWait
// - check for deferred progress, if so ensure thread progress
func (n *NBChan[T]) postGet() {
n.outputLock.Unlock()
// update dataAvailable
var unsentCount = n.unsentCount.Load()
n.setDataAvailable(unsentCount > 0)
// check for last Get
if n.gets.Add(math.MaxUint64) > 0 {
return // more Get pending
}
n.getsWait.ReleaseWaiters()
// last ending Get handles progress
// - Send and SendMany was invoked finding unsent count zero
// - this is endangers thread progress because:
// - — an on-demand thread may exit
// - — an always-thread may enter alert wait
// - sends may still be in progress
// - sends will not take action while Get active.
// This is after the final Get ended
// - it is on-demand or always thread
// - a progress guaranteeing event must be observed
for {
if isZeroObserved, isGets := n.tcIsDeferredSend(); !isZeroObserved || isGets {
// progress not required or
// additional Get invocations exist
return
} else if !n.tcAwaitProgress() {
// progress was secured
return
}
}
}
// swapQueues swaps n.inputQueue and n.outputQueue0
// - hasData true means data is available
// - hasData false means inputQueue was empty and a swap did not take place
// - n.outputQueue must be empty
// - invoked while holding [NBChan.outputLock]
// - [NBChan.inputLock] cannot be held
func (n *NBChan[T]) swapQueues() (hasData bool) {
n.inputLock.Lock()
defer n.inputLock.Unlock()
if hasData = len(n.inputQueue) > 0; !hasData {
return // no data in input queue return
}
// swap the queues
n.outputQueue = n.inputQueue
n.outputCapacity.Store(uint64(cap(n.outputQueue)))
n.inputQueue = n.outputQueue0
n.inputCapacity.Store(uint64(cap(n.inputQueue)))
n.outputQueue0 = n.outputQueue[:0]
return
}
// fetchFromOutput gets items from [NBChan.outputQueue]
// - [NBChan.outputLock] must be held
// - decrements unsent count
func (n *NBChan[T]) fetchFromOutput(soughtItemCount *int, isAllItems bool, allItems0 []T) (allItems []T) {
allItems = allItems0
// empty queue case: no items
var itemGetCount = len(n.outputQueue)
if itemGetCount == 0 {
return // no available items return
}
var zeroValue T
var soughtIC = *soughtItemCount
// entire queue case: itemCount items
if isAllItems || itemGetCount <= soughtIC {
allItems = append(allItems, n.outputQueue...)
for i := 0; i < itemGetCount; i++ {
n.outputQueue[i] = zeroValue
}
n.outputQueue = n.outputQueue[:0]
n.unsentCount.Add(uint64(-itemGetCount))
if !isAllItems {
*soughtItemCount -= itemGetCount
}
return // all queue items return: done
}
// first part of queue: *soughtItemCount items
allItems = append(allItems, n.outputQueue[:soughtIC]...)
copy(n.outputQueue, n.outputQueue[soughtIC:])
var endIndex = itemGetCount - soughtIC
for i := endIndex; i < itemGetCount; i++ {
n.outputQueue[i] = zeroValue
}
n.outputQueue = n.outputQueue[:endIndex]
n.unsentCount.Add(uint64(-soughtIC))
*soughtItemCount = 0
return
}