From a3f4b66f07bcfd8d7fe224e3830c0d90b728fd88 Mon Sep 17 00:00:00 2001 From: Alessio Nossa Date: Tue, 9 Dec 2025 19:07:17 +0100 Subject: [PATCH 1/5] Introduce test_share_with_no_buffering --- Tests/AsyncAlgorithmsTests/TestShare.swift | 36 ++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/Tests/AsyncAlgorithmsTests/TestShare.swift b/Tests/AsyncAlgorithmsTests/TestShare.swift index ac817536..14aa273c 100644 --- a/Tests/AsyncAlgorithmsTests/TestShare.swift +++ b/Tests/AsyncAlgorithmsTests/TestShare.swift @@ -129,6 +129,42 @@ final class TestShare: XCTestCase { XCTAssertEqual(results1.withLock { $0 }.sorted(), [1, 2, 3, 4, 5]) XCTAssertEqual(results2.withLock { $0 }.sorted(), [1, 2, 3, 4, 5]) } + + func test_share_with_no_buffering() async { + let shared = [1, 2, 3, 4, 5].async.share(bufferingPolicy: .bounded(0)) + + let results1 = Mutex([Int]()) + let results2 = Mutex([Int]()) + + let consumer1 = Task { + // Consumer 1 reads first element + for await value in shared { + results1.withLock { $0.append(value) } + // Delay to allow consumer 2 to get ahead + try? await Task.sleep(for: .milliseconds(10)) + } + } + + let consumer2 = Task { + // Consumer 2 reads all elements quickly + for await value in shared { + results2.withLock { $0.append(value) } + } + } + + await consumer1.value + await consumer2.value + + // Both consumers should receive all elements + XCTAssertEqual(results1.withLock { $0 }.sorted(), [1, 2, 3, 4, 5]) + XCTAssertEqual(results2.withLock { $0 }.sorted(), [1, 2, 3, 4, 5]) + } + + func test_share_with_no_buffering_multiple() async { + for _ in 0..<10 { + await test_share_with_no_buffering() + } + } func test_share_with_unbounded_buffering() async { let source = [1, 2, 3, 4, 5] From a524bda0e23bd9b60e367e24ce76f9162d95e690 Mon Sep 17 00:00:00 2001 From: Alessio Nossa Date: Tue, 9 Dec 2025 19:34:38 +0100 Subject: [PATCH 2/5] Update test_share_with_no_buffering implementation --- Tests/AsyncAlgorithmsTests/TestShare.swift | 27 ++++++++++++++++------ 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/Tests/AsyncAlgorithmsTests/TestShare.swift b/Tests/AsyncAlgorithmsTests/TestShare.swift index 14aa273c..d4c21456 100644 --- a/Tests/AsyncAlgorithmsTests/TestShare.swift +++ b/Tests/AsyncAlgorithmsTests/TestShare.swift @@ -132,26 +132,39 @@ final class TestShare: XCTestCase { func test_share_with_no_buffering() async { let shared = [1, 2, 3, 4, 5].async.share(bufferingPolicy: .bounded(0)) + + let expectation1 = XCTestExpectation(description: "Consumer 1 finished looping") + let expectation2 = XCTestExpectation(description: "Consumer 2 finished looping") let results1 = Mutex([Int]()) let results2 = Mutex([Int]()) + let gate1 = Gate() + let gate2 = Gate() let consumer1 = Task { - // Consumer 1 reads first element - for await value in shared { + var iterator = shared.makeAsyncIterator() + gate2.open() + await gate1.enter() + while let value = await iterator.next(isolation: nil) { results1.withLock { $0.append(value) } - // Delay to allow consumer 2 to get ahead - try? await Task.sleep(for: .milliseconds(10)) + // Add some delay to consumer 1 + try? await Task.sleep(for: .milliseconds(1)) } + expectation1.fulfill() } let consumer2 = Task { - // Consumer 2 reads all elements quickly - for await value in shared { + var iterator = shared.makeAsyncIterator() + gate1.open() + await gate2.enter() + while let value = await iterator.next(isolation: nil) { results2.withLock { $0.append(value) } } + expectation2.fulfill() } - + + await fulfillment(of: [expectation1, expectation2], timeout: 5) + await consumer1.value await consumer2.value From 3c9e1ab0ffee3dfc9680c5120ba3035a439939ca Mon Sep 17 00:00:00 2001 From: Alessio Nossa Date: Wed, 10 Dec 2025 18:21:53 +0100 Subject: [PATCH 3/5] Check limit bigger than 0 in AsyncShareSequence iterate --- Sources/AsyncAlgorithms/AsyncShareSequence.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/AsyncAlgorithms/AsyncShareSequence.swift b/Sources/AsyncAlgorithms/AsyncShareSequence.swift index ebe3492d..7468a083 100644 --- a/Sources/AsyncAlgorithms/AsyncShareSequence.swift +++ b/Sources/AsyncAlgorithms/AsyncShareSequence.swift @@ -426,7 +426,7 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab if let limit { let cancelled = await withUnsafeContinuation { (continuation: UnsafeContinuation) in let (resume, cancelled) = state.withLock { state -> (UnsafeContinuation?, Bool) in - guard state.buffer.count >= limit else { + guard limit > 0, state.buffer.count >= limit else { assert(state.limit == nil) guard case .cancelled = state.iteratingTask else { return (continuation, false) From 00a3aeea096a85bc7a4c04acf8f540e4081a4ada Mon Sep 17 00:00:00 2001 From: Alessio Nossa Date: Wed, 10 Dec 2025 18:41:51 +0100 Subject: [PATCH 4/5] Revert "Check limit bigger than 0 in AsyncShareSequence iterate" This reverts commit 3c9e1ab0ffee3dfc9680c5120ba3035a439939ca. --- Sources/AsyncAlgorithms/AsyncShareSequence.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/AsyncAlgorithms/AsyncShareSequence.swift b/Sources/AsyncAlgorithms/AsyncShareSequence.swift index 7468a083..ebe3492d 100644 --- a/Sources/AsyncAlgorithms/AsyncShareSequence.swift +++ b/Sources/AsyncAlgorithms/AsyncShareSequence.swift @@ -426,7 +426,7 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab if let limit { let cancelled = await withUnsafeContinuation { (continuation: UnsafeContinuation) in let (resume, cancelled) = state.withLock { state -> (UnsafeContinuation?, Bool) in - guard limit > 0, state.buffer.count >= limit else { + guard state.buffer.count >= limit else { assert(state.limit == nil) guard case .cancelled = state.iteratingTask else { return (continuation, false) From f1ade417a55bc2afae2bb83fa5024d6a5b52365f Mon Sep 17 00:00:00 2001 From: Alessio Nossa Date: Wed, 10 Dec 2025 18:39:00 +0100 Subject: [PATCH 5/5] Check limit bigger than 0 in iterate in AsyncShareSequence --- Sources/AsyncAlgorithms/AsyncShareSequence.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/AsyncAlgorithms/AsyncShareSequence.swift b/Sources/AsyncAlgorithms/AsyncShareSequence.swift index ebe3492d..253edf9e 100644 --- a/Sources/AsyncAlgorithms/AsyncShareSequence.swift +++ b/Sources/AsyncAlgorithms/AsyncShareSequence.swift @@ -423,7 +423,7 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab } func iterate() async -> Bool { - if let limit { + if let limit, limit > 0 { let cancelled = await withUnsafeContinuation { (continuation: UnsafeContinuation) in let (resume, cancelled) = state.withLock { state -> (UnsafeContinuation?, Bool) in guard state.buffer.count >= limit else {