Skip to content

Latest commit

 

History

History
806 lines (671 loc) · 37.9 KB

0406-async-stream-backpressure.md

File metadata and controls

806 lines (671 loc) · 37.9 KB

Backpressure support for AsyncStream

Introduction

SE-0314 introduced new Async[Throwing]Stream types which act as root asynchronous sequences. These two types allow bridging from synchronous callbacks such as delegates to an asynchronous sequence. This proposal adds a new way of constructing asynchronous streams with the goal to bridge backpressured systems into an asynchronous sequence. Furthermore, this proposal aims to clarify the cancellation behaviour both when the consuming task is cancelled and when the production side indicates termination.

Motivation

After using the AsyncSequence protocol and the Async[Throwing]Stream types extensively over the past years, we learned that there are a few important behavioral details that any AsyncSequence implementation needs to support. These behaviors are:

  1. Backpressure
  2. Multi/single consumer support
  3. Downstream consumer termination
  4. Upstream producer termination

In general, AsyncSequence implementations can be divided into two kinds: Root asynchronous sequences that are the source of values such as Async[Throwing]Stream and transformational asynchronous sequences such as AsyncMapSequence. Most transformational asynchronous sequences implicitly fulfill the above behaviors since they forward any demand to a base asynchronous sequence that should implement the behaviors. On the other hand, root asynchronous sequences need to make sure that all of the above behaviors are correctly implemented. Let's look at the current behavior of Async[Throwing]Stream to see if and how it achieves these behaviors.

Backpressure

Root asynchronous sequences need to relay the backpressure to the producing system. Async[Throwing]Stream aims to support backpressure by providing a configurable buffer and returning Async[Throwing]Stream.Continuation.YieldResult which contains the current buffer depth from the yield() method. However, only providing the current buffer depth on yield() is not enough to bridge a backpressured system into an asynchronous sequence since this can only be used as a "stop" signal but we are missing a signal to indicate resuming the production. The only viable backpressure strategy that can be implemented with the current API is a timed backoff where we stop producing for some period of time and then speculatively produce again. This is a very inefficient pattern that produces high latencies and inefficient use of resources.

Multi/single consumer support

The AsyncSequence protocol itself makes no assumptions about whether the implementation supports multiple consumers or not. This allows the creation of unicast and multicast asynchronous sequences. The difference between a unicast and multicast asynchronous sequence is if they allow multiple iterators to be created. AsyncStream does support the creation of multiple iterators and it does handle multiple consumers correctly. On the other hand, AsyncThrowingStream also supports multiple iterators but does fatalError when more than one iterator has to suspend. The original proposal states:

As with any sequence, iterating over an AsyncStream multiple times, or creating multiple iterators and iterating over them separately, may produce an unexpected series of values.

While that statement leaves room for any behavior we learned that a clear distinction of behavior for root asynchronous sequences is beneficial; especially, when it comes to how transformation algorithms are applied on top.

Downstream consumer termination

Downstream consumer termination allows the producer to notify the consumer that no more values are going to be produced. Async[Throwing]Stream does support this by calling the finish() or finish(throwing:) methods of the Async[Throwing]Stream.Continuation. However, Async[Throwing]Stream does not handle the case that the Continuation may be deinited before one of the finish methods is called. This currently leads to async streams that never terminate. The behavior could be changed but it could result in semantically breaking code.

Upstream producer termination

Upstream producer termination is the inverse of downstream consumer termination where the producer is notified once the consumption has terminated. Currently, Async[Throwing]Stream does expose the onTermination property on the Continuation. The onTermination closure is invoked once the consumer has terminated. The consumer can terminate in four separate cases:

  1. The asynchronous sequence was deinited and no iterator was created
  2. The iterator was deinited and the asynchronous sequence is unicast
  3. The consuming task is canceled
  4. The asynchronous sequence returned nil or threw

Async[Throwing]Stream currently invokes onTermination in all cases; however, since Async[Throwing]Stream supports multiple consumers (as discussed in the Multi/single consumer support section), a single consumer task being canceled leads to the termination of all consumers. This is not expected from multicast asynchronous sequences in general.

Proposed solution

The above motivation lays out the expected behaviors from a root asynchronous sequence and compares them to the behaviors of Async[Throwing]Stream. These are the behaviors where Async[Throwing]Stream diverges from the expectations.

  • Backpressure: Doesn't expose a "resumption" signal to the producer
  • Multi/single consumer:
    • Divergent implementation between throwing and non-throwing variant
    • Supports multiple consumers even though proposal positions it as a unicast asynchronous sequence
  • Consumer termination: Doesn't handle the Continuation being deinited
  • Producer termination: Happens on first consumer termination

This section proposes new APIs for Async[Throwing]Stream that implement all of the above-mentioned behaviors.

Creating an AsyncStream with backpressure support

You can create an Async[Throwing]Stream instance using the new makeStream(of: backpressureStrategy:) method. This method returns you the stream and the source. The source can be used to write new values to the asynchronous stream. The new API specifically provides a multi-producer/single-consumer pattern.

let (stream, source) = AsyncStream.makeStream(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)

The new proposed APIs offer three different ways to bridge a backpressured system. The foundation is the multi-step synchronous interface. Below is an example of how it can be used:

do {
    let writeResult = try source.write(contentsOf: sequence)
    
    switch writeResult {
    case .produceMore:
       // Trigger more production
    
    case .enqueueCallback(let callbackToken):
        source.enqueueCallback(token: callbackToken, onProduceMore: { result in
            switch result {
            case .success:
                // Trigger more production
            case .failure(let error):
                // Terminate the underlying producer
            }
        })
    }
} catch {
    // `write(contentsOf:)` throws if the asynchronous stream already terminated
}

The above API offers the most control and highest performance when bridging a synchronous producer to an asynchronous sequence. First, you have to write values using the write(contentsOf:) which returns a WriteResult. The result either indicates that more values should be produced or that a callback should be enqueued by calling the enqueueCallback(callbackToken: onProduceMore:) method. This callback is invoked once the backpressure strategy decided that more values should be produced. This API aims to offer the most flexibility with the greatest performance. The callback only has to be allocated in the case where the producer needs to be suspended.

Additionally, the above API is the building block for some higher-level and easier-to-use APIs to write values to the asynchronous stream. Below is an example of the two higher-level APIs.

// Writing new values and providing a callback when to produce more
try source.write(contentsOf: sequence, onProduceMore: { result in
    switch result {
    case .success:
        // Trigger more production
    case .failure(let error):
        // Terminate the underlying producer
    }
})

// This method suspends until more values should be produced
try await source.write(contentsOf: sequence)

With the above APIs, we should be able to effectively bridge any system into an asynchronous stream regardless if the system is callback-based, blocking or asynchronous.

Downstream consumer termination

When reading the next two examples around termination behaviour keep in mind that the newly proposed APIs are providing a strict unicast asynchronous sequence.

Calling finish() terminates the downstream consumer. Below is an example of this:

// Termination through calling finish
let (stream, source) = AsyncStream.makeStream(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)

_ = try await source.write(1)
source.finish()

for await element in stream {
    print(element)
}
print("Finished")

// Prints
// 1
// Finished

The other way to terminate the consumer is by deiniting the source. This has the same effect as calling finish() and makes sure that no consumer is stuck indefinitely.

// Termination through deiniting the source
let (stream, _) = AsyncStream.makeStream(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)

for await element in stream {
    print(element)
}
print("Finished")

// Prints
// Finished

Trying to write more elements after the source has been finish will result in an error thrown from the write methods.

Upstream producer termination

The producer will get notified about termination through the onTerminate callback. Termination of the producer happens in the following scenarios:

// Termination through task cancellation
let (stream, source) = AsyncStream.makeStream(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)

let task = Task {
    for await element in stream {

    }
}
task.cancel()
// Termination through deiniting the sequence
let (_, source) = AsyncStream.makeStream(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)
// Termination through deiniting the iterator
let (stream, source) = AsyncStream.makeStream(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)
_ = stream.makeAsyncIterator()
// Termination through calling finish
let (stream, source) = AsyncStream.makeStream(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)

_ = try source.write(1)
source.finish()

for await element in stream {}

// onTerminate will be called after all elements have been consumed

Similar to the downstream consumer termination, trying to write more elements after the producer has been terminated will result in an error thrown from the write methods.

Detailed design

All new APIs on AsyncStream and AsyncThrowingStream are as follows:

/// Error that is thrown from the various `write` methods of the
/// ``AsyncStream.Source`` and ``AsyncThrowingStream.Source``.
/// 
/// This error is thrown when the asynchronous stream is already finished when
/// trying to write new elements.
public struct AsyncStreamAlreadyFinishedError: Error {}

extension AsyncStream {
    /// A mechanism to interface between producer code and an asynchronous stream.
    ///
    /// Use this source to provide elements to the stream by calling one of the `write` methods, then terminate the stream normally
    /// by calling the `finish()` method.
    public struct Source: Sendable {
        /// A strategy that handles the backpressure of the asynchronous stream.
        public struct BackpressureStrategy: Sendable {
            /// When the high watermark is reached producers will be suspended. All producers will be resumed again once
            /// the low watermark is reached.
            public static func watermark(low: Int, high: Int) -> BackpressureStrategy {}
        }

        /// A type that indicates the result of writing elements to the source.
        @frozen
        public enum WriteResult: Sendable {
            /// A token that is returned when the asynchronous stream's backpressure strategy indicated that production should
            /// be suspended. Use this token to enqueue a callback by  calling the ``enqueueCallback(_:)`` method.
            public struct CallbackToken: Sendable {}

            /// Indicates that more elements should be produced and written to the source.
            case produceMore

            /// Indicates that a callback should be enqueued.
            ///
            /// The associated token should be passed to the ``enqueueCallback(_:)`` method.
            case enqueueCallback(CallbackToken)
        }

        /// A callback to invoke when the stream finished.
        ///
        /// The stream finishes and calls this closure in the following cases:
        /// - No iterator was created and the sequence was deinited
        /// - An iterator was created and deinited
        /// - After ``finish(throwing:)`` was called and all elements have been consumed
        /// - The consuming task got cancelled
        public var onTermination: (@Sendable () -> Void)?

        /// Writes new elements to the asynchronous stream.
        ///
        /// If there is a task consuming the stream and awaiting the next element then the task will get resumed with the
        /// first element of the provided sequence. If the asynchronous stream already terminated then this method will throw an error
        /// indicating the failure.
        ///
        /// - Parameter sequence: The elements to write to the asynchronous stream.
        /// - Returns: The result that indicates if more elements should be produced at this time.
        public func write<S>(contentsOf sequence: S) throws -> WriteResult where Element == S.Element, S: Sequence {}

        /// Write the element to the asynchronous stream.
        ///
        /// If there is a task consuming the stream and awaiting the next element then the task will get resumed with the
        /// provided element. If the asynchronous stream already terminated then this method will throw an error
        /// indicating the failure.
        ///
        /// - Parameter element: The element to write to the asynchronous stream.
        /// - Returns: The result that indicates if more elements should be produced at this time.
        public func write(_ element: Element) throws -> WriteResult {}

        /// Enqueues a callback that will be invoked once more elements should be produced.
        ///
        /// Call this method after ``write(contentsOf:)`` or ``write(_:)`` returned ``WriteResult/enqueueCallback(_:)``.
        ///
        /// - Important: Enqueueing the same token multiple times is not allowed.
        ///
        /// - Parameters:
        ///   - token: The callback token.
        ///   - onProduceMore: The callback which gets invoked once more elements should be produced.
        public func enqueueCallback(token: WriteResult.CallbackToken, onProduceMore: @escaping @Sendable (Result<Void, Error>) -> Void) {}

        /// Cancel an enqueued callback.
        ///
        /// Call this method to cancel a callback enqueued by the ``enqueueCallback(callbackToken:onProduceMore:)`` method.
        ///
        /// - Note: This method supports being called before ``enqueueCallback(callbackToken:onProduceMore:)`` is called and
        /// will mark the passed `token` as cancelled.
        ///
        /// - Parameter token: The callback token.
        public func cancelCallback(token: WriteResult.CallbackToken) {}

        /// Write new elements to the asynchronous stream and provide a callback which will be invoked once more elements should be produced.
        ///
        /// If there is a task consuming the stream and awaiting the next element then the task will get resumed with the
        /// first element of the provided sequence. If the asynchronous stream already terminated then `onProduceMore` will be invoked with
        /// a `Result.failure`.
        ///
        /// - Parameters:
        ///   - sequence: The elements to write to the asynchronous stream.
        ///   - onProduceMore: The callback which gets invoked once more elements should be produced. This callback might be
        ///   invoked during the call to ``write(contentsOf:onProduceMore:)``.
        public func write<S>(contentsOf sequence: S, onProduceMore: @escaping @Sendable (Result<Void, Error>) -> Void) where Element == S.Element, S: Sequence {}

        /// Writes the element to the asynchronous stream.
        ///
        /// If there is a task consuming the stream and awaiting the next element then the task will get resumed with the
        /// provided element. If the asynchronous stream already terminated then `onProduceMore` will be invoked with
        /// a `Result.failure`.
        ///
        /// - Parameters:
        ///   - sequence: The element to write to the asynchronous stream.
        ///   - onProduceMore: The callback which gets invoked once more elements should be produced. This callback might be
        ///   invoked during the call to ``write(_:onProduceMore:)``.
        public func write(_ element: Element, onProduceMore: @escaping @Sendable (Result<Void, Error>) -> Void) {}

        /// Write new elements to the asynchronous stream.
        ///
        /// If there is a task consuming the stream and awaiting the next element then the task will get resumed with the
        /// first element of the provided sequence. If the asynchronous stream already terminated then this method will throw an error
        /// indicating the failure.
        ///
        /// This method returns once more elements should be produced.
        ///
        /// - Parameters:
        ///   - sequence: The elements to write to the asynchronous stream.
        public func write<S>(contentsOf sequence: S) async throws where Element == S.Element, S: Sequence {}

        /// Write new element to the asynchronous stream.
        ///
        /// If there is a task consuming the stream and awaiting the next element then the task will get resumed with the
        /// provided element. If the asynchronous stream already terminated then this method will throw an error
        /// indicating the failure.
        ///
        /// This method returns once more elements should be produced.
        ///
        /// - Parameters:
        ///   - sequence: The element to write to the asynchronous stream.
        public func write(_ element: Element) async throws {}

        /// Write the elements of the asynchronous sequence to the asynchronous stream.
        ///
        /// This method returns once the provided asynchronous sequence or the asynchronous stream finished.
        ///
        /// - Important: This method does not finish the source if consuming the upstream sequence terminated.
        ///
        /// - Parameters:
        ///   - sequence: The elements to write to the asynchronous stream.
        public func write<S>(contentsOf sequence: S) async throws where Element == S.Element, S: AsyncSequence {}

        /// Indicates that the production terminated.
        ///
        /// After all buffered elements are consumed the next iteration point will return `nil`.
        ///
        /// Calling this function more than once has no effect. After calling finish, the stream enters a terminal state and doesn't accept
        /// new elements.
        public func finish() {}
    }

    /// Initializes a new ``AsyncStream`` and an ``AsyncStream/Source``.
    ///
    /// - Parameters:
    ///   - elementType: The element type of the stream.
    ///   - backpressureStrategy: The backpressure strategy that the stream should use.
    /// - Returns: A tuple containing the stream and its source. The source should be passed to the
    ///   producer while the stream should be passed to the consumer.
    public static func makeStream(
        of elementType: Element.Type = Element.self,
        backpressureStrategy: Source.BackpressureStrategy
    ) -> (`Self`, Source) {}
}

extension AsyncThrowingStream {
    /// A mechanism to interface between producer code and an asynchronous stream.
    ///
    /// Use this source to provide elements to the stream by calling one of the `write` methods, then terminate the stream normally
    /// by calling the `finish()` method. You can also use the source's `finish(throwing:)` method to terminate the stream by
    /// throwing an error.
    public struct Source: Sendable {
        /// A strategy that handles the backpressure of the asynchronous stream.
        public struct BackpressureStrategy: Sendable {
            /// When the high watermark is reached, producers will be suspended. All producers will be resumed again once
            /// the low watermark is reached.
            public static func watermark(low: Int, high: Int) -> BackpressureStrategy {}
        }

        /// A type that indicates the result of writing elements to the source.
        @frozen
        public enum WriteResult: Sendable {
            /// A token that is returned when the asynchronous stream's backpressure strategy indicated that production should
            /// be suspended. Use this token to enqueue a callback by  calling the ``enqueueCallback(_:)`` method.
            public struct CallbackToken: Sendable {}

            /// Indicates that more elements should be produced and written to the source.
            case produceMore

            /// Indicates that a callback should be enqueued.
            ///
            /// The associated token should be passed to the ``enqueueCallback(_:)`` method.
            case enqueueCallback(CallbackToken)
        }

        /// A callback to invoke when the stream finished.
        ///
        /// The stream finishes and calls this closure in the following cases:
        /// - No iterator was created and the sequence was deinited
        /// - An iterator was created and deinited
        /// - After ``finish(throwing:)`` was called and all elements have been consumed
        /// - The consuming task got cancelled
        public var onTermination: (@Sendable () -> Void)? {}

        /// Writes new elements to the asynchronous stream.
        ///
        /// If there is a task consuming the stream and awaiting the next element then the task will get resumed with the
        /// first element of the provided sequence. If the asynchronous stream already terminated then this method will throw an error
        /// indicating the failure.
        ///
        /// - Parameter sequence: The elements to write to the asynchronous stream.
        /// - Returns: The result that indicates if more elements should be produced at this time.
        public func write<S>(contentsOf sequence: S) throws -> WriteResult where Element == S.Element, S: Sequence {}

        /// Write the element to the asynchronous stream.
        ///
        /// If there is a task consuming the stream and awaiting the next element then the task will get resumed with the
        /// provided element. If the asynchronous stream already terminated then this method will throw an error
        /// indicating the failure.
        ///
        /// - Parameter element: The element to write to the asynchronous stream.
        /// - Returns: The result that indicates if more elements should be produced at this time.
        public func write(_ element: Element) throws -> WriteResult {}

        /// Enqueues a callback that will be invoked once more elements should be produced.
        ///
        /// Call this method after ``write(contentsOf:)`` or ``write(_:)`` returned ``WriteResult/enqueueCallback(_:)``.
        ///
        /// - Important: Enqueueing the same token multiple times is not allowed.
        ///
        /// - Parameters:
        ///   - token: The callback token.
        ///   - onProduceMore: The callback which gets invoked once more elements should be produced.
        public func enqueueCallback(token: WriteResult.CallbackToken, onProduceMore: @escaping @Sendable (Result<Void, Error>) -> Void) {}

        /// Cancel an enqueued callback.
        ///
        /// Call this method to cancel a callback enqueued by the ``enqueueCallback(callbackToken:onProduceMore:)`` method.
        ///
        /// - Note: This method supports being called before ``enqueueCallback(callbackToken:onProduceMore:)`` is called and
        /// will mark the passed `token` as cancelled.
        ///
        /// - Parameter token: The callback token.
        public func cancelCallback(token: WriteResult.CallbackToken) {}

        /// Write new elements to the asynchronous stream and provide a callback which will be invoked once more elements should be produced.
        ///
        /// If there is a task consuming the stream and awaiting the next element then the task will get resumed with the
        /// first element of the provided sequence. If the asynchronous stream already terminated then `onProduceMore` will be invoked with
        /// a `Result.failure`.
        ///
        /// - Parameters:
        ///   - sequence: The elements to write to the asynchronous stream.
        ///   - onProduceMore: The callback which gets invoked once more elements should be produced. This callback might be
        ///   invoked during the call to ``write(contentsOf:onProduceMore:)``.
        public func write<S>(contentsOf sequence: S, onProduceMore: @escaping @Sendable (Result<Void, Error>) -> Void) where Element == S.Element, S: Sequence {}

        /// Writes the element to the asynchronous stream.
        ///
        /// If there is a task consuming the stream and awaiting the next element then the task will get resumed with the
        /// provided element. If the asynchronous stream already terminated then `onProduceMore` will be invoked with
        /// a `Result.failure`.
        ///
        /// - Parameters:
        ///   - sequence: The element to write to the asynchronous stream.
        ///   - onProduceMore: The callback which gets invoked once more elements should be produced. This callback might be
        ///   invoked during the call to ``write(_:onProduceMore:)``.
        public func write(_ element: Element, onProduceMore: @escaping @Sendable (Result<Void, Error>) -> Void) {}

        /// Write new elements to the asynchronous stream.
        ///
        /// If there is a task consuming the stream and awaiting the next element then the task will get resumed with the
        /// first element of the provided sequence. If the asynchronous stream already terminated then this method will throw an error
        /// indicating the failure.
        ///
        /// This method returns once more elements should be produced.
        ///
        /// - Parameters:
        ///   - sequence: The elements to write to the asynchronous stream.
        public func write<S>(contentsOf sequence: S) async throws where Element == S.Element, S: Sequence {}

        /// Write new element to the asynchronous stream.
        ///
        /// If there is a task consuming the stream and awaiting the next element then the task will get resumed with the
        /// provided element. If the asynchronous stream already terminated then this method will throw an error
        /// indicating the failure.
        ///
        /// This method returns once more elements should be produced.
        ///
        /// - Parameters:
        ///   - sequence: The element to write to the asynchronous stream.
        public func write(_ element: Element) async throws {}

        /// Write the elements of the asynchronous sequence to the asynchronous stream.
        ///
        /// This method returns once the provided asynchronous sequence or the  the asynchronous stream finished.
        ///
        /// - Important: This method does not finish the source if consuming the upstream sequence terminated.
        ///
        /// - Parameters:
        ///   - sequence: The elements to write to the asynchronous stream.
        public func write<S>(contentsOf sequence: S) async throws where Element == S.Element, S: AsyncSequence {}

        /// Indicates that the production terminated.
        ///
        /// After all buffered elements are consumed the next iteration point will return `nil` or throw an error.
        ///
        /// Calling this function more than once has no effect. After calling finish, the stream enters a terminal state and doesn't accept
        /// new elements.
        ///
        /// - Parameters:
        ///   - error: The error to throw, or `nil`, to finish normally.
        public func finish(throwing error: Failure?) {}
    }

    /// Initializes a new ``AsyncThrowingStream`` and an ``AsyncThrowingStream/Source``.
    ///
    /// - Parameters:
    ///   - elementType: The element type of the stream.
    ///   - failureType: The failure type of the stream.
    ///   - backpressureStrategy: The backpressure strategy that the stream should use.
    /// - Returns: A tuple containing the stream and its source. The source should be passed to the
    ///   producer while the stream should be passed to the consumer.
    public static func makeStream(
        of elementType: Element.Type = Element.self,
        throwing failureType: Failure.Type = Failure.self,
        backpressureStrategy: Source.BackpressureStrategy
    ) -> (`Self`, Source) where Failure == Error {}
}

Comparison to other root asynchronous sequences

swift-async-algorithm: AsyncChannel

The AsyncChannel is a multi-consumer/multi-producer root asynchronous sequence which can be used to communicate between two tasks. It only offers asynchronous production APIs and has no internal buffer. This means that any producer will be suspended until its value has been consumed. AsyncChannel can handle multiple consumers and resumes them in FIFO order.

### swift-nio: NIOAsyncSequenceProducer

The NIO team have created their own root asynchronous sequence with the goal to provide a high performance sequence that can be used to bridge a NIO Channel inbound stream into Concurrency. The NIOAsyncSequenceProducer is a highly generic and fully inlinable type and quite unwieldy to use. This proposal is heavily inspired by the learnings from this type but tries to create a more flexible and easier to use API that fits into the standard library.

Source compatibility

This change is additive and does not affect source compatibility.

ABI compatibility

This change is additive and does not affect ABI compatibility. All new methods are non-inlineable leaving us flexiblity to change the implementation in the future.

Future directions

Adaptive backpressure strategy

The high/low watermark strategy is common in networking code; however, there are other strategies such as an adaptive strategy that we could offer in the future. An adaptive strategy regulates the backpressure based on the rate of consumption and production. With the proposed new APIs we can easily add further strategies.

Element size dependent strategy

When the stream's element is a collection type then the proposed high/low watermark backpressure strategy might lead to unexpected results since each element can vary in actual memory size. In the future, we could provide a new backpressure strategy that supports inspecting the size of the collection.

Deprecate Async[Throwing]Stream.Continuation

In the future, we could deprecate the current continuation based APIs since the new proposed APIs are also capable of bridging non-backpressured producers by just discarding the WriteResult. The only use-case that the new APIs do not cover is the anycast behaviour of the current AsyncStream where one can create multiple iterators to the stream as long as no two iterators are consuming the stream at the same time. This can be solved via additional algorithms such as broadcast in the swift-async-algorithms package.

To give developers more time to adopt the new APIs the deprecation of the current APIs should be deferred to a future version. Especially since those new APIs are not backdeployed like the current Concurrency runtime.

Introduce a Writer and an AsyncWriter protocol

The newly introduced Source type offers a bunch of different write methods. We have seen similar types used in other places such as file abstraction or networking APIs. We could introduce a new Writer and AsyncWriter protocol in the future to enable writing generic algorithms on top of writers. The Source type could then conform to these new protocols.

Alternatives considered

Providing an Async[Throwing]Stream.Continuation.onConsume

We could add a new closure property to the Async[Throwing]Stream.Continuation which is invoked once an element has been consumed to implement a backpressure strategy; however, this requires the usage of a synchronization mechanism since the consumption and production often happen on separate threads. The added complexity and performance impact led to avoiding this approach.

Provide a getter for the current buffer depth

We could provide a getter for the current buffer depth on the Async[Throwing]Stream.Continuation. This could be used to query the buffer depth at an arbitrary time; however, it wouldn't allow us to implement backpressure strategies such as high/low watermarks without continuously asking what the buffer depth is. That would result in a very inefficient implementation.

Extending Async[Throwing]Stream.Continuation

Extending the current APIs to support all expected behaviors is problematic since it would change the semantics and might lead to currently working code misbehaving. Furthermore, extending the current APIs to support backpressure turns out to be problematic without compromising performance or usability.

Introducing a new type

We could introduce a new type such as AsyncBackpressured[Throwing]Stream; however, one of the original intentions of Async[Throwing]Stream was to be able to bridge backpressured systems. Furthermore, Async[Throwing]Stream is the best name. Therefore, this proposal decided to provide new interfaces to Async[Throwing]Stream.

Stick with the current Continuation and yield naming

The proposal decided against sticking to the current names since the existing names caused confusion to them being used in multiple places. Continuation was both used by the AsyncStream but also by Swift Concurrency via CheckedContinuation and UnsafeContinuation. Similarly, yield was used by both AsyncStream.Continuation.yield(), Task.yield() and the yield keyword. Having different names for these different concepts makes it easier to explain their usage. The currently proposed write names were chosen to align with the future direction of adding an AsyncWriter protocol. Source is a common name in flow based systems such as Akka. Other names that were considered:

  • enqueue
  • send

Provide the onTermination callback to the factory method

During development of the new APIs, I first tried to provide the onTermination callback in the makeStream method. However, that showed significant usability problems in scenarios where one wants to store the source in a type and reference self in the onTermination closure at the same time; hence, I kept the current pattern of setting the onTermination closure on the source.

Provide a onConsumerCancellation callback

During the pitch phase, it was raised that we should provide a onConsumerCancellation callback which gets invoked once the asynchronous stream notices that the consuming task got cancelled. This callback could be used to customize how cancellation is handled by the stream e.g. one could imagine writing a few more elements to the stream before finishing it. Right now the stream immediately returns nil or throws a CancellationError when it notices cancellation. This proposal decided to not provide this customization because it opens up the possibility that asynchronous streams are not terminating when implemented incorrectly. Additionally, asynchronous sequences are not the only place where task cancellation leads to an immediate error being thrown i.e. Task.sleep() does the same. Hence, the value of the asynchronous not terminating immediately brings little value when the next call in the iterating task might throw. However, the implementation is flexible enough to add this in the future and we can just default it to the current behaviour.

Create a custom type for the Result of the onProduceMore callback

The onProduceMore callback takes a Result<Void, Error> which is used to indicate if the producer should produce more or if the asynchronous stream finished. We could introduce a new type for this but the proposal decided against it since it effectively is a result type.

Use an initializer instead of factory methods

Instead of providing a makeStream factory method we could use an initializer approach that takes a closure which gets the Source passed into. A similar API has been offered with the Continuation based approach and SE-0388 introduced new factory methods to solve some of the usability ergonomics with the initializer based APIs.

## Acknowledgements

  • Johannes Weiss - For making me aware how important this problem is and providing great ideas on how to shape the API.
  • Philippe Hausler - For helping me designing the APIs and continuously providing feedback
  • George Barnett - For providing extensive code reviews and testing the implementation.