Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AsyncStream multi-consumptions does not propagate .finished event to all consumers #66541

Open
inamiy opened this issue Jun 11, 2023 · 2 comments
Labels
bug A deviation from expected or documented behavior. Also: expected but undesirable behavior. triage needed This issue needs more specific labels

Comments

@inamiy
Copy link
Contributor

inamiy commented Jun 11, 2023

Description

As of main branch's 4d1d8a9, AsyncStream's underlying AsyncStream._Storage.State.continuations (array of continuations) management is slightly different from AsyncThrowingStream._Storage.State.continuations (single optional continuation) which doesn't have fatalError("attempt to await next() on more than one task") thus causing the following multi-consumption issue which .finished event does not propagate correctly to all consumers:

@MainActor
func test() async throws {
    let stream = AsyncStream<Int> { cont in
        let task = Task {
            try await Task.sleep(nanoseconds: 1_000_000)
            cont.yield(1)
            cont.finish()
        }
        cont.onTermination = { termination in
            task.cancel()
        }
    }

    var iter = stream.makeAsyncIterator()

    for i in 1 ... 3 {
        Task {
            let value = await iter.next()
            print("===> await \(i) = \(value as Any)")
        }
    }

    try await Task.sleep(nanoseconds: 1_000_000_000)
}

Example console log:

===> await 1 = Optional(1)
===> await 3 = nil

/* NOTE: result varies for each run, but correct 3 awaits will rarerly be called */

The reason behind this is AsyncStream._Storage.finish() is not sending .finished events to all managed continuations but only first one.

However, due to the current nature of unshared AsyncStream by default, it might be simpler to just add fatalError("attempt to await next() on more than one task") for AsyncStream as well.
(Related: apple/swift-async-algorithms#110)

Expected behavior

Either one would be the expected behavior:

  1. Sending .finished events to all managed continuations in AsyncStream._Storage.finish()
  2. Add fatalError("attempt to await next() on more than one task") for AsyncStream to explicitly tell multi-consumption is disallowed.
@inamiy inamiy added bug A deviation from expected or documented behavior. Also: expected but undesirable behavior. triage needed This issue needs more specific labels labels Jun 11, 2023
@FranzBusch
Copy link
Contributor

I covered this in my recent pitch for a new interface for AysncStream: https://forums.swift.org/t/pitch-new-apis-for-async-throwing-stream-with-backpressure-support/65449.

I do think that a follow up pitch the clarifies the semantics around AsyncStream makes sense; however, we have to be very careful here since changing semantics might break user code

@inamiy
Copy link
Contributor Author

inamiy commented Jun 18, 2023

Thanks @FranzBusch for comment.
Your pitch looks great overall, so hope to see it coming in future Swift version!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug A deviation from expected or documented behavior. Also: expected but undesirable behavior. triage needed This issue needs more specific labels
Projects
None yet
Development

No branches or pull requests

2 participants