Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a broadcast async sequence #1684

Merged
merged 6 commits into from
Oct 26, 2023
Merged

Conversation

glbrntt
Copy link
Collaborator

@glbrntt glbrntt commented Oct 24, 2023

Motivation:

To support retries and hedging we need a way to buffer elements over time that can support multiple consumers concurrently and allows for consumers to start consuming after some elements have been produced.

An AsyncSequence fits this quite naturally but we don't yet have a general purpose implementat that fits this requirement. This change adds BroadcastAsyncSequence which isn't a general purpose async sequence but instead is tailored to the needs of grpc for hedging and retries. This means it supports a low number of concurrent iterators and maintains a limited size internal buffer and drops the slowest consumers when the buffer becomes full.

Modifications:

  • Add a BroadcastAsyncSequence and tests
  • Made a bunch of things inlinable/usableFromInline which necessitated a switch from @_spi(Testing) to @testable imports.
  • Rename the 'Stream' directory to 'Streaming'

Result:

  • BroadcastAsyncSequence can be used to implement retries and hedging.

Motivation:

To support retries and hedging we need a way to buffer elements over
time that can support multiple consumers concurrently and allows for
consumers to start consuming after some elements have been produced.

An `AsyncSequence` fits this quite naturally but we don't yet have a
general purpose implementat that fits this requirement. This change
adds `BroadcastAsyncSequence` which isn't a general purpose async
sequence but instead is tailored to the needs of grpc for hedging and
retries. This means it supports a low number of concurrent iterators and
maintains a limited size internal buffer and drops the slowest consumers
when the buffer becomes full.

Modifications:

- Add a `BroadcastAsyncSequence` and tests
- Made a bunch of things inlinable/usableFromInline which necessitated a
  switch from `@_spi(Testing)` to `@testable` imports.
- Rename the 'Stream' directory to 'Streaming'

Result:

- `BroadcastAsyncSequence` can be used to implement retries and hedging.
@glbrntt glbrntt added the semver-noop No SemVer version change required label Oct 24, 2023
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension BroadcastAsyncSequence {
@usableFromInline
struct Continuation: Sendable {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we rename this to Source please. We have been leaning towards Source lately since Continuation is very overloaded in Swift

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will update the naming.

init(_storage: _BroadcastSequenceStorage<Element>) {
self._storage = _storage
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we want a deinit here as well so we can differentiate when the producer and when the consumer go away?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comes back to being not general purpose: users never touch this and we control it carefully and will always finish it in the right place which lets us avoid allocating here.

/// one subscriber at a time, for hedging there may be at most five subscribers at a time.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@usableFromInline
struct BroadcastAsyncSequence<Element: Sendable>: Sendable, AsyncSequence {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall I am a bit surprised that this is a root asynchronous sequence and not a transformational one. In my mind broadcast should always be transformational async sequence that you can apply to something like an AsyncStream or AsyncBufferredStream is there any particular reason why this is a root asynchronous sequence?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It better fits the use case here. A general purpose one probably would be transformational rather than a root. Mostly I want to avoid double buffering, but also since requests have a provider rather than an async sequence we can avoid a bunch of allocations by just creating the one async sequence.

@glbrntt glbrntt enabled auto-merge (squash) October 26, 2023 12:48
@glbrntt glbrntt merged commit e595df4 into grpc:main Oct 26, 2023
14 checks passed
@glbrntt glbrntt added the v2 A change for v2 label Nov 1, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
semver-noop No SemVer version change required v2 A change for v2
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants