/
nb-chan-send.go
158 lines (136 loc) · 3.88 KB
/
nb-chan-send.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
/*
© 2023–present Harald Rudell <harald.rudell@gmail.com> (https://haraldrudell.github.io/haraldrudell/)
ISC License
*/
package parl
import "math"
// Send queues one value for delivery on the channel
//   - non-blocking, thread-safe, panic-free and error-free
//   - once Close or CloseNow has been invoked, value is silently discarded
//   - the value is either handed directly to a thread or
//     appended to [NBChan.inputQueue]
func (n *NBChan[T]) Send(value T) {

	// lock-free fast path: closed channel means noop
	if n.isCloseInvoked.IsInvoked() {
		return
	}

	// register the invocation, then enter the inputLock critical section
	//   - postSend releases inputLock and unregisters the invocation
	n.preSend()
	n.inputLock.Lock()
	defer n.postSend()

	switch {
	case n.isCloseInvoked.IsInvoked():
		// Close was invoked while this Send was on its way to the lock:
		// discard the value
		return
	case n.tcAlertOrLaunchThreadWithValue(value):
		// the value was provided to a thread
		//   - the call also ensures a thread is running if configured
		//   - and updates threadProgressRequired
		return
	}

	// no thread took the value: store it in the input queue
	//   - a missing queue is allocated by newQueue,
	//     which selects the actual capacity
	var queue = n.inputQueue
	if queue == nil {
		queue = n.newQueue(1)
	}
	queue = append(queue, value)
	n.inputQueue = queue
	n.inputCapacity.Store(uint64(cap(queue)))
	n.tcAddProgress(1)
}
// SendMany queues a slice of values for delivery on the channel
//   - non-blocking, thread-safe, panic-free and error-free
//   - if values is nil or of length 0, SendMany is a noop
//   - once Close or CloseNow has been invoked, values are silently discarded
//   - the first value may be handed directly to a thread, any remaining
//     values are appended to [NBChan.inputQueue]
func (n *NBChan[T]) SendMany(values []T) {

	// pending is the number of values that remain to be stored
	var pending = len(values)

	// lock-free fast path: closed channel or nothing to send means noop
	if n.isCloseInvoked.IsInvoked() || pending == 0 {
		return
	}

	// register the invocation, then enter the inputLock critical section
	//   - postSend releases inputLock and unregisters the invocation
	n.preSend()
	n.inputLock.Lock()
	defer n.postSend()

	// Close may have been invoked while waiting for the lock
	if n.isCloseInvoked.IsInvoked() {
		return
	}

	// offer the first value to a thread
	if n.tcAlertOrLaunchThreadWithValue(values[0]) {
		pending--
		if pending == 0 {
			return // the only value was handed to a thread: complete
		}
		values = values[1:]
	}

	// store what remains in the input queue, allocating it if missing
	var queue = n.inputQueue
	if queue == nil {
		queue = n.newQueue(pending)
	}
	queue = append(queue, values...)
	n.inputQueue = queue
	n.inputCapacity.Store(uint64(cap(queue)))
	n.tcAddProgress(pending)
}
// preSend registers a Send or SendMany invocation before inputLock is acquired
//   - the number of ongoing invocations is counted in [NBChan.sends]
//   - the invocation that takes the count to 1 invokes HoldWaiters on
//     [NBChan.sendsWait], which prevents a thread from exiting
//     during Send SendMany invocations
func (n *NBChan[T]) preSend() {
	if n.sends.Add(1) != 1 {
		return // other invocations are already registered
	}
	n.sendsWait.HoldWaiters()
}
// postSend is deferred by [NBChan.Send] and [NBChan.SendMany]
//   - releases inputLock
//   - updates the data-available state
//   - unregisters the invocation from [NBChan.sends], releasing
//     [NBChan.sendsWait] waiters when the count reaches zero
//   - ensures progress: alerts thread if no pending Get and values are present
func (n *NBChan[T]) postSend() {
	n.inputLock.Unlock()

	// update dataWaitCh
	n.updateDataAvailable()

	// decrement sends
	//   - adding math.MaxUint64 to an unsigned 64-bit counter wraps around,
	//     which is the same as subtracting 1
	if n.sends.Add(math.MaxUint64) == 0 {
		n.sendsWait.ReleaseWaiters()
	}

	// ensure progress
	for {
		if isZeroObserved, isGets := n.tcIsDeferredSend(); !isZeroObserved || isGets {
			// progress not required or
			// deferred by Get invocations
			return
		} else if !n.tcAwaitProgress() {
			// progress was secured
			return
		} else if n.sends.Load() > 0 {
			// subsequent Send SendMany exist
			//   - after their sends decrement, those will arrive at ensure progress
			//   - the counter to examine is sends: pending Get invocations
			//     were already considered via isGets above
			return
		}
	}
}
// ensureInput makes sure [NBChan.inputQueue] is allocated,
// used for [NBChan.SetAllocationSize]
//   - size: the element count passed on to newQueue
//   - an already existing input queue is left untouched
//   - queue is never assigned: the returned slice is always nil
//   - acquires inputLock for the duration of the call
func (n *NBChan[T]) ensureInput(size int) (queue []T) {
	n.inputLock.Lock()
	defer n.inputLock.Unlock()

	if n.inputQueue == nil {
		n.inputQueue = n.newQueue(size)
	}

	return
}
// ensureOutput makes sure [NBChan.outputQueue] is allocated
//   - size: the element count passed on to newQueue
//   - an already existing output queue is left untouched
//   - queue is never assigned: the returned slice is always nil
//   - acquires outputLock for the duration of the call
func (n *NBChan[T]) ensureOutput(size int) (queue []T) {
	n.outputLock.Lock()
	defer n.outputLock.Unlock()

	if n.outputQueue == nil {
		n.outputQueue = n.newQueue(size)
	}

	return
}
// newQueue returns a freshly allocated, empty queue slice
//   - count: the number of elements the caller is about to store
//   - capacity is at least count elements
//   - with a positive [NBChan.allocationSize]: capacity is the larger of
//     allocationSize and count
//   - otherwise: capacity is defaultNBChanSize, or twice count if
//     count exceeds that default
//   - the returned slice has length zero
func (n *NBChan[T]) newQueue(count int) (queue []T) {

	// determine capacity
	var capacity = int(n.allocationSize.Load())
	switch {
	case capacity <= 0:
		// no allocation size configured: fall back to the default
		capacity = defaultNBChanSize
		if count > capacity {
			capacity = count * 2
		}
	case count > capacity:
		// configured size is too small for this request
		capacity = count
	}

	// allocate at full length, then slice down to zero length
	queue = make([]T, capacity)[:0]

	return
}