Skip to content

Commit

Permalink
Add a broadcast async sequence (#1684)
Browse files Browse the repository at this point in the history
Motivation:

To support retries and hedging we need a way to buffer elements over
time that can support multiple consumers concurrently and allows for
consumers to start consuming after some elements have been produced.

An `AsyncSequence` fits this quite naturally but we don't yet have a
general purpose implementat that fits this requirement. This change
adds `BroadcastAsyncSequence` which isn't a general purpose async
sequence but instead is tailored to the needs of grpc for hedging and
retries. This means it supports a low number of concurrent iterators and
maintains a limited size internal buffer and drops the slowest consumers
when the buffer becomes full.

Modifications:

- Add a `BroadcastAsyncSequence` and tests
- Made a bunch of things inlinable/usableFromInline which necessitated a
  switch from `@_spi(Testing)` to `@testable` imports.
- Rename the 'Stream' directory to 'Streaming'

Result:

- `BroadcastAsyncSequence` can be used to implement retries and hedging.
  • Loading branch information
glbrntt authored Oct 26, 2023
1 parent 6ccafcc commit e595df4
Show file tree
Hide file tree
Showing 11 changed files with 2,034 additions and 14 deletions.
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ extension Target {
static let grpcCore: Target = .target(
name: "GRPCCore",
dependencies: [
.dequeModule
.dequeModule,
],
path: "Sources/GRPCCore"
)
Expand Down
3 changes: 1 addition & 2 deletions Sources/GRPCCore/Call/Client/ClientResponse.swift
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,7 @@ extension ClientResponse.Stream {

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension ClientResponse.Single {
@_spi(Testing)
public init(stream response: ClientResponse.Stream<Message>) async {
init(stream response: ClientResponse.Stream<Message>) async {
switch response.accepted {
case .success(let contents):
do {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,46 @@
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension RPCAsyncSequence {
/// Returns an ``RPCAsyncSequence`` containing just the given element.
@_spi(Testing)
public static func one(_ element: Element) -> Self {
@inlinable
static func one(_ element: Element) -> Self {
return Self(wrapping: AsyncSequenceOfOne<Element, Never>(result: .success(element)))
}

/// Returns an ``RPCAsyncSequence`` throwing the given error.
@_spi(Testing)
public static func throwing<E: Error>(_ error: E) -> Self {
@inlinable
static func throwing<E: Error>(_ error: E) -> Self {
return Self(wrapping: AsyncSequenceOfOne<Element, E>(result: .failure(error)))
}
}

/// An `AsyncSequence` of a single value.
@usableFromInline
@available(macOS 10.15, iOS 13.0, tvOS 13, watchOS 6, *)
private struct AsyncSequenceOfOne<Element: Sendable, Failure: Error>: AsyncSequence {
private let result: Result<Element, Failure>
struct AsyncSequenceOfOne<Element: Sendable, Failure: Error>: AsyncSequence {
@usableFromInline
let result: Result<Element, Failure>

@inlinable
init(result: Result<Element, Failure>) {
self.result = result
}

@inlinable
func makeAsyncIterator() -> AsyncIterator {
AsyncIterator(result: self.result)
}

@usableFromInline
struct AsyncIterator: AsyncIteratorProtocol {
private var result: Result<Element, Failure>?
@usableFromInline
private(set) var result: Result<Element, Failure>?

fileprivate init(result: Result<Element, Failure>) {
@inlinable
init(result: Result<Element, Failure>) {
self.result = result
}

@inlinable
mutating func next() async throws -> Element? {
guard let result = self.result else { return nil }

Expand Down
Loading

0 comments on commit e595df4

Please sign in to comment.