Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ extension ClientRPCExecutor.HedgingExecutor {
case .response(let response):
switch response.accepted {
case .success:
self.transport.retryThrottle.recordSuccess()
self.transport.retryThrottle?.recordSuccess()

if state.withLockedValue({ $0.receivedUsableResponse() }) {
try? await picker.continuation.write(attempt)
Expand All @@ -376,11 +376,11 @@ extension ClientRPCExecutor.HedgingExecutor {

if self.policy.nonFatalStatusCodes.contains(Status.Code(error.code)) {
// The response failed and the status code is non-fatal, we can make another attempt.
self.transport.retryThrottle.recordFailure()
self.transport.retryThrottle?.recordFailure()
return .unusableResponse(response, error.metadata.retryPushback)
} else {
// A fatal error code counts as a success to the throttle.
self.transport.retryThrottle.recordSuccess()
self.transport.retryThrottle?.recordSuccess()

if state.withLockedValue({ $0.receivedUsableResponse() }) {
try! await picker.continuation.write(attempt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ extension ClientRPCExecutor.RetryExecutor {
case .success:
// Request was accepted. This counts as success to the throttle and there's no need
// to retry.
self.transport.retryThrottle.recordSuccess()
self.transport.retryThrottle?.recordSuccess()
retryDelayOverride = nil
shouldRetry = false

Expand All @@ -170,7 +170,7 @@ extension ClientRPCExecutor.RetryExecutor {

if isRetryableStatusCode {
// Counted as failure for throttling.
let throttled = self.transport.retryThrottle.recordFailure()
let throttled = self.transport.retryThrottle?.recordFailure() ?? false

// Status code can be retried, Did the server send pushback?
switch error.metadata.retryPushback {
Expand All @@ -190,7 +190,7 @@ extension ClientRPCExecutor.RetryExecutor {
}
} else {
// Not-retryable; this is considered a success.
self.transport.retryThrottle.recordSuccess()
self.transport.retryThrottle?.recordSuccess()
shouldRetry = false
retryDelayOverride = nil
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/GRPCCore/Transport/ClientTransport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public protocol ClientTransport: Sendable {
/// Client transports don't need to implement the throttle or interact with it beyond its
/// creation. gRPC will record the results of requests to determine whether retries can be
/// performed.
var retryThrottle: RetryThrottle { get }
var retryThrottle: RetryThrottle? { get }

/// Establish and maintain a connection to the remote destination.
///
Expand Down
13 changes: 10 additions & 3 deletions Sources/GRPCInProcessTransport/InProcessClientTransport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,23 @@ public struct InProcessClientTransport: ClientTransport {
public typealias Inbound = RPCAsyncSequence<RPCResponsePart>
public typealias Outbound = RPCWriter<RPCRequestPart>.Closable

public let retryThrottle: RetryThrottle
public let retryThrottle: RetryThrottle?

private let methodConfiguration: MethodConfigurations
private let state: _LockedValueBox<State>

/// Creates a new in-process client transport.
///
/// - Parameters:
/// - server: The in-process server transport to connect to.
/// - methodConfiguration: Method specific configuration.
/// - retryThrottle: A throttle to apply to RPCs which are hedged or retried.
public init(
server: InProcessServerTransport,
methodConfiguration: MethodConfigurations = MethodConfigurations()
methodConfiguration: MethodConfigurations = MethodConfigurations(),
retryThrottle: RetryThrottle? = nil
) {
self.retryThrottle = RetryThrottle(maximumTokens: 10, tokenRatio: 0.1)
self.retryThrottle = retryThrottle
self.methodConfiguration = methodConfiguration
self.state = _LockedValueBox(.unconnected(.init(serverTransport: server)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public struct InProcessServerTransport: ServerTransport, Sendable {
/// to this transport using the ``acceptStream(_:)`` method.
///
/// - Returns: An ``RPCAsyncSequence`` of all published ``RPCStream``s.
public func listen() -> RPCAsyncSequence<RPCStream<Inbound, Outbound>> {
public func listen() async throws -> RPCAsyncSequence<RPCStream<Inbound, Outbound>> {
RPCAsyncSequence(wrapping: self.newStreams)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ struct AnyClientTransport: ClientTransport, Sendable {
typealias Inbound = RPCAsyncSequence<RPCResponsePart>
typealias Outbound = RPCWriter<RPCRequestPart>.Closable

private let _retryThrottle: @Sendable () -> RetryThrottle
private let _retryThrottle: @Sendable () -> RetryThrottle?
private let _withStream:
@Sendable (
_ method: MethodDescriptor,
Expand Down Expand Up @@ -52,7 +52,7 @@ struct AnyClientTransport: ClientTransport, Sendable {
}
}

var retryThrottle: RetryThrottle {
var retryThrottle: RetryThrottle? {
self._retryThrottle()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ struct StreamCountingClientTransport: ClientTransport, Sendable {
self.transport = AnyClientTransport(wrapping: transport)
}

var retryThrottle: RetryThrottle {
var retryThrottle: RetryThrottle? {
self.transport.retryThrottle
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ struct ThrowOnStreamCreationTransport: ClientTransport {
self.code = code
}

let retryThrottle = RetryThrottle(maximumTokens: 10, tokenRatio: 0.1)
let retryThrottle: RetryThrottle? = RetryThrottle(maximumTokens: 10, tokenRatio: 0.1)

func connect(lazily: Bool) async throws {
// no-op
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ final class InProcessClientTransportTests: XCTestCase {
}

group.addTask {
for try await stream in server.listen() {
for try await stream in try await server.listen() {
let receivedMessages = try await stream.inbound.reduce(into: []) { $0.append($1) }
try await stream.outbound.write(RPCResponsePart.message([42]))
stream.outbound.finish()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ final class InProcessServerTransportTests: XCTestCase {
)
)

let streamSequence = transport.listen()
let streamSequence = try await transport.listen()
var streamSequenceInterator = streamSequence.makeAsyncIterator()

try transport.acceptStream(stream)
Expand Down Expand Up @@ -66,7 +66,7 @@ final class InProcessServerTransportTests: XCTestCase {
)
)

let streamSequence = transport.listen()
let streamSequence = try await transport.listen()
var streamSequenceInterator = streamSequence.makeAsyncIterator()

try transport.acceptStream(firstStream)
Expand Down