Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 44 additions & 44 deletions Evolution/0016-mutli-producer-single-consumer-channel.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
introduced new `Async[Throwing]Stream` types which act as root asynchronous
sequences. These two types allow bridging from synchronous callbacks such as
delegates to an asynchronous sequence. This proposal adds a new root primitive
with the goal to model asynchronous multi-producer-single-consumer systems.
with the goal of modeling asynchronous multi-producer-single-consumer systems.

## Motivation

Expand All @@ -38,31 +38,31 @@ The below sections are providing a detailed explanation of each of those.
### Backpressure

In general, backpressure is the mechanism that prevents a fast producer from
overwhelming a slow consumer. It helps stability of the overall system by
overwhelming a slow consumer. It helps the stability of the overall system by
regulating the flow of data between different components. Additionally, it
allows to put an upper bound on resource consumption of a system. In reality,
allows us to put an upper bound on the resource consumption of a system. In reality,
backpressure is used in almost all networked applications.

In Swift, asynchronous sequence also have the concept of internal backpressure.
This modeled by the pull-based implementation where a consumer has to call
In Swift, asynchronous sequences also have the concept of internal backpressure.
This is modeled by the pull-based implementation where a consumer has to call
`next` on the `AsyncIterator`. In this model, there is no way for a consumer to
overwhelm a producer since the producer controls the rate of pulling elements.

However, the internal backpressure of an asynchronous isn't the only
However, the internal backpressure of an asynchronous sequence isn't the only
backpressure in play. There is also the source backpressure that is producing
the actual elements. For a backpressured system it is important that every
the actual elements. For a backpressured system, it is important that every
component of such a system is aware of the backpressure of its consumer and its
producer.

Let's take a quick look how our current root asynchronous sequences are handling
Let's take a quick look at how our current root asynchronous sequences are handling
this.

`Async[Throwing]Stream` aims to support backpressure by providing a configurable
buffer and returning `Async[Throwing]Stream.Continuation.YieldResult` which
contains the current buffer depth from the `yield()` method. However, only
providing the current buffer depth on `yield()` is not enough to bridge a
backpressured system into an asynchronous sequence since this can only be used
as a "stop" signal but we are missing a signal to indicate resuming the
as a "stop" signal, but we are missing a signal to indicate resuming the
production. The only viable backpressure strategy that can be implemented with
the current API is a timed backoff where we stop producing for some period of
time and then speculatively produce again. This is a very inefficient pattern
Expand Down Expand Up @@ -90,8 +90,8 @@ when more than one iterator has to suspend. The original proposal states:
creating multiple iterators and iterating over them separately, may produce an
unexpected series of values.

While that statement leaves room for any behavior we learned that a clear distinction
of behavior for root asynchronous sequences is beneficial; especially, when it comes to
While that statement leaves room for any behavior, we learned that a clear distinction
of behavior for root asynchronous sequences is beneficial; especially when it comes to
how transformation algorithms are applied on top.

### Downstream consumer termination
Expand All @@ -106,16 +106,16 @@ terminate.

### Upstream producer termination

Upstream producer termination is the inverse of downstream consumer termination
Upstream producer termination is the inverse of downstream consumer termination,
where the producer is notified once the consumption has terminated. Currently,
`Async[Throwing]Stream` does expose the `onTermination` property on the
`Continuation`. The `onTermination` closure is invoked once the consumer has
terminated. The consumer can terminate in four separate cases:

1. The asynchronous sequence was `deinit`ed and no iterator was created
2. The iterator was `deinit`ed and the asynchronous sequence is unicast
3. The consuming task is canceled
4. The asynchronous sequence returned `nil` or threw
1. The asynchronous sequence was `deinit`ed and no iterator was created.
2. The iterator was `deinit`ed and the asynchronous sequence is unicast.
3. The consuming task is canceled.
4. The asynchronous sequence returned `nil` or threw.

`Async[Throwing]Stream` currently invokes `onTermination` in all cases; however,
since `Async[Throwing]Stream` supports multiple consumers (as discussed in the
Expand All @@ -130,17 +130,17 @@ system and compares them to the behaviors of `Async[Throwing]Stream` and
`Async[Throwing]Channel`.

This section proposes a new type called `MultiProducerSingleConsumerAsyncChannel`
that implement all of the above-mentioned behaviors. Importantly, this proposed
that implements all of the above-mentioned behaviors. Importantly, this proposed
solution is taking advantage of `~Copyable` types to model the
multi-producer-single-consumer behavior. While the current `AsyncSequence`
protocols are not supporting `~Copyable` types we provide a way to convert the
protocols are not supporting `~Copyable` types, we provide a way to convert the
proposed channel to an asynchronous sequence. This leaves us room to support any
potential future asynchronous streaming protocol that supports `~Copyable`.

### Creating a MultiProducerSingleConsumerAsyncChannel

You can create an `MultiProducerSingleConsumerAsyncChannel` instance using the
`makeChannel(of: backpressureStrategy:)` method. This method returns you the
`makeChannel(of:backpressureStrategy:)` method. This method returns you the
channel and the source. The source can be used to send new values to the
asynchronous channel. The new API specifically provides a
multi-producer/single-consumer pattern.
Expand All @@ -157,9 +157,9 @@ let source = consume channelAndSource.source
```

The new proposed APIs offer two different backpressure strategies:
- Watermark: Using a low and high watermark
- Watermark: Using a low and high watermark.
- Unbounded: Unbounded buffering of the channel. **Only** use this if the
production is limited through some other mean.
production is limited through some other means.

The source is used to send values to the channel. It provides different APIs for
synchronous and asynchronous producers. All of the APIs are relaying the
Expand Down Expand Up @@ -197,7 +197,7 @@ to send values using the `send(contentsOf:)` which returns a `SendResult`. The
result either indicates that more values should be produced or that a callback
should be enqueued by calling the `enqueueCallback(onProduceMore:)` method.
This callback is invoked once the backpressure strategy
decided that more values should be produced. This API aims to offer the most
decides that more values should be produced. This API aims to offer the most
flexibility with the greatest performance. The callback only has to be allocated
in the case where the producer needs to pause production.

Expand All @@ -221,14 +221,14 @@ try await source.send(contentsOf: sequence)
```

With the above APIs, we should be able to effectively bridge any system into a
`MultiProducerSingleConsumerAsyncChannel` regardless if the system is callback-based,
`MultiProducerSingleConsumerAsyncChannel` regardless of whether the system is callback-based,
blocking, or asynchronous.

### Multi producer

To support multiple producers the source offers a `copy` method to produce a new
source. The source is returned `sending` so it is in a disconnected isolation
region than the original source allowing to pass it into a different isolation
To support multiple producers, the source offers a `copy` method to produce a new
source. The source is returned `sending`, so it is in a disconnected isolation
region from the original source, allowing it to be passed into a different isolation
region to concurrently produce elements.

```swift
Expand All @@ -254,8 +254,8 @@ print(await channel.next()) // Prints either 1 or 2 depending on which child tas

### Downstream consumer termination

> When reading the next two examples around termination behaviour keep in mind
that the newly proposed APIs are providing a strict a single consumer channel.
> When reading the next two examples of termination behavior, keep in mind
that the newly proposed APIs are providing a strict single consumer channel.

Calling `finish()` terminates the downstream consumer. Below is an example of
this:
Expand Down Expand Up @@ -296,7 +296,7 @@ print(try await channel.next()) // Throws SomeError
```

The other way to terminate the consumer is by deiniting the source. This has the
same effect as calling `finish()`. Since the source is a `~Copyable` type this
same effect as calling `finish()`. Since the source is a `~Copyable` type, this
will happen automatically when the source is last used or explicitly consumed.

```swift
Expand Down Expand Up @@ -753,12 +753,12 @@ can handle multiple consumers and resumes them in FIFO order.

### swift-nio: NIOAsyncSequenceProducer

The NIO team have created their own root asynchronous sequence with the goal to
provide a high performance sequence that can be used to bridge a NIO `Channel`
The NIO team has created their own root asynchronous sequence with the goal to
provide a high-performance sequence that can be used to bridge a NIO `Channel`
inbound stream into Concurrency. The `NIOAsyncSequenceProducer` is a highly
generic and fully inlinable type and quite unwiedly to use. This proposal is
generic and fully inlinable type and quite unwieldy to use. This proposal is
heavily inspired by the learnings from this type but tries to create a more
flexible and easier to use API that fits into the standard library.
flexible and easier-to-use API that fits into the standard library.

## Future directions

Expand All @@ -767,15 +767,15 @@ flexible and easier to use API that fits into the standard library.
The high/low watermark strategy is common in networking code; however, there are
other strategies such as an adaptive strategy that we could offer in the future.
An adaptive strategy regulates the backpressure based on the rate of
consumption and production. With the proposed new APIs we can easily add further
consumption and production. With the proposed new APIs, we can easily add further
strategies.

### Support `~Copyable` elements

In the future, we can extend the channel to support `~Copyable` elements. We
only need an underlying buffer primitive that can hold `~Copyable` types and the
only need an underlying buffer primitive that can hold `~Copyable` types, and the
continuations need to support `~Copyable` elements as well. By making the
channel not directly conform to `AsyncSequence` we can support this down the
channel not directly conform to `AsyncSequence`, we can support this down the
road.

## Alternatives considered
Expand All @@ -793,28 +793,28 @@ the current pattern of setting the `onTermination` closure on the source.
During the pitch phase, it was raised that we should provide a
`onConsumerCancellation` callback which gets invoked once the asynchronous
channel notices that the consuming task got cancelled. This callback could be
used to customize how cancellation is handled by the channel e.g. one could
imagine writing a few more elements to the channel before finishing it. Right now
used to customize how cancellation is handled by the channel, e.g. one could
imagine writing a few more elements to the channel before finishing it. Right now,
the channel immediately returns `nil` or throws a `CancellationError` when it
notices cancellation. This proposal decided to not provide this customization
because it opens up the possiblity that asynchronous channels are not terminating
notices cancellation. This proposal decided not to provide this customization
because it opens up the possibility that asynchronous channels are not terminating
when implemented incorrectly. Additionally, asynchronous sequences are not the
only place where task cancellation leads to an immediate error being thrown i.e.
`Task.sleep()` does the same. Hence, the value of the asynchronous not
terminating immediately brings little value when the next call in the iterating
task might throw. However, the implementation is flexible enough to add this in
the future and we can just default it to the current behaviour.
the future, and we can just default it to the current behaviour.

### Create a custom type for the `Result` of the `onProduceMore` callback

The `onProducerMore` callback takes a `Result<Void, Error>` which is used to
indicate if the producer should produce more or if the asynchronous channel
finished. We could introduce a new type for this but the proposal decided
finished. We could introduce a new type for this, but the proposal decided
against it since it effectively is a result type.

### Use an initializer instead of factory methods

Instead of providing a `makeChannel` factory method we could use an initializer
Instead of providing a `makeChannel` factory method, we could use an initializer
approach that takes a closure which gets the `Source` passed into. A similar API
has been offered with the `Continuation` based approach and
[SE-0388](https://github.com/apple/swift-evolution/blob/main/proposals/0388-async-stream-factory.md)
Expand All @@ -823,7 +823,7 @@ the initializer based APIs.

### Provide the type on older compilers

To achieve maximum performance the implementation is using `~Copyable` extensively.
To achieve maximum performance, the implementation is using `~Copyable` extensively.
On Swift versions before 6.1, there is a https://github.com/swiftlang/swift/issues/78048 when using; hence, this type
is only usable with Swift 6.1 and later compilers.

Expand Down