-
Notifications
You must be signed in to change notification settings - Fork 188
Description
Using .share(bufferingPolicy: .bounded(0)) causes the shared sequence (introduced with #357) to often deadlock. The same code works correctly with .bounded(1) or .unbounded. This suggests that .bounded(0) is incorrectly implemented.
I some to test different aspects of the share() algorithm, after reading SAA-0015 and Guides - Share. My goal was to reproduce a similar behavior to Combine's PassthroughSubject.
Minimal reproduction
import AsyncAlgorithms
import Foundation
actor Flag {
var started = false
func mark() { started = true }
}
func runOnce(_ iteration: Int) async {
let (stream, continuation) = AsyncStream.makeStream(
of: Int.self,
bufferingPolicy: .bufferingNewest(0)
)
// Only .bounded(0) causes the hang.
let shared = stream.share(bufferingPolicy: .bounded(0))
let flag = Flag()
let consumerTask = Task {
await flag.mark()
var values: [Int] = []
for await value in shared {
values.append(value)
try? await Task.sleep(for: .milliseconds(5))
}
print("Iteration \(iteration) received:", values)
}
// Producer
Task {
while await !flag.started {
try? await Task.sleep(for: .milliseconds(5))
}
continuation.yield(1)
try? await Task.sleep(for: .milliseconds(20))
continuation.yield(2)
continuation.finish()
}
_ = await consumerTask.result
}
for i in 1...10 {
print("Running iteration \(i)...")
await runOnce(i)
}Running iteration 1...
Iteration 1 received: [1, 2]
Running iteration 2...
Observed behavior
Running iteration 1...
Iteration 1 received: [1, 2]
Running iteration 2...
// hangs here forever
Switching .bounded(0) to .bounded(1) makes all iterations complete normally - but it would also send values to the consumer that are enqueued by the producer before the consumer is created.
Expected behavior
.bounded(0)should behave predictably and should not deadlock.
Likely root cause
Inside AsyncShareSequence<Base>.Iteration.iterate():
guard state.buffer.count >= limit else ...With .bounded(0), limit == 0, so at startup:
0 >= 0 // always trueThis forces iterate() into the branch that installs a continuation into state.limit and suspends the producer—even when the buffer is empty.
If a consumer calls next() before iterate() installs that continuation, emit(nil, limit: 0) sees:
state.limit == nil- so it cannot resume the producer
Then when iterate() later suspends on a newly created limit continuation, nothing will ever resume it, causing a permanent deadlock.
The implementation appears to assume limit >= 1 and does not define correct semantics for limit == 0.