diff --git a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift index 63f46db74..aba9d9423 100644 --- a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift +++ b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift @@ -359,7 +359,7 @@ extension ClientRPCExecutor.HedgingExecutor { case .response(let response): switch response.accepted { case .success: - self.transport.retryThrottle.recordSuccess() + self.transport.retryThrottle?.recordSuccess() if state.withLockedValue({ $0.receivedUsableResponse() }) { try? await picker.continuation.write(attempt) @@ -376,11 +376,11 @@ extension ClientRPCExecutor.HedgingExecutor { if self.policy.nonFatalStatusCodes.contains(Status.Code(error.code)) { // The response failed and the status code is non-fatal, we can make another attempt. - self.transport.retryThrottle.recordFailure() + self.transport.retryThrottle?.recordFailure() return .unusableResponse(response, error.metadata.retryPushback) } else { // A fatal error code counts as a success to the throttle. - self.transport.retryThrottle.recordSuccess() + self.transport.retryThrottle?.recordSuccess() if state.withLockedValue({ $0.receivedUsableResponse() }) { try! await picker.continuation.write(attempt) diff --git a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift index 619352723..1f9fd549a 100644 --- a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift +++ b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift @@ -152,7 +152,7 @@ extension ClientRPCExecutor.RetryExecutor { case .success: // Request was accepted. This counts as success to the throttle and there's no need // to retry. - self.transport.retryThrottle.recordSuccess() + self.transport.retryThrottle?.recordSuccess() retryDelayOverride = nil shouldRetry = false @@ -170,7 +170,7 @@ extension ClientRPCExecutor.RetryExecutor { if isRetryableStatusCode { // Counted as failure for throttling. - let throttled = self.transport.retryThrottle.recordFailure() + let throttled = self.transport.retryThrottle?.recordFailure() ?? false // Status code can be retried, Did the server send pushback? switch error.metadata.retryPushback { @@ -190,7 +190,7 @@ extension ClientRPCExecutor.RetryExecutor { } } else { // Not-retryable; this is considered a success. - self.transport.retryThrottle.recordSuccess() + self.transport.retryThrottle?.recordSuccess() shouldRetry = false retryDelayOverride = nil } diff --git a/Sources/GRPCCore/Transport/ClientTransport.swift b/Sources/GRPCCore/Transport/ClientTransport.swift index 913712ea7..989e31801 100644 --- a/Sources/GRPCCore/Transport/ClientTransport.swift +++ b/Sources/GRPCCore/Transport/ClientTransport.swift @@ -24,7 +24,7 @@ public protocol ClientTransport: Sendable { /// Client transports don't need to implement the throttle or interact with it beyond its /// creation. gRPC will record the results of requests to determine whether retries can be /// performed. - var retryThrottle: RetryThrottle { get } + var retryThrottle: RetryThrottle? { get } /// Establish and maintain a connection to the remote destination. /// diff --git a/Sources/GRPCInProcessTransport/InProcessClientTransport.swift b/Sources/GRPCInProcessTransport/InProcessClientTransport.swift index b42792146..9a9b5ec95 100644 --- a/Sources/GRPCInProcessTransport/InProcessClientTransport.swift +++ b/Sources/GRPCInProcessTransport/InProcessClientTransport.swift @@ -96,16 +96,23 @@ public struct InProcessClientTransport: ClientTransport { public typealias Inbound = RPCAsyncSequence public typealias Outbound = RPCWriter.Closable - public let retryThrottle: RetryThrottle + public let retryThrottle: RetryThrottle? private let methodConfiguration: MethodConfigurations private let state: _LockedValueBox + /// Creates a new in-process client transport. + /// + /// - Parameters: + /// - server: The in-process server transport to connect to. + /// - methodConfiguration: Method specific configuration. + /// - retryThrottle: A throttle to apply to RPCs which are hedged or retried. public init( server: InProcessServerTransport, - methodConfiguration: MethodConfigurations = MethodConfigurations() + methodConfiguration: MethodConfigurations = MethodConfigurations(), + retryThrottle: RetryThrottle? = nil ) { - self.retryThrottle = RetryThrottle(maximumTokens: 10, tokenRatio: 0.1) + self.retryThrottle = retryThrottle self.methodConfiguration = methodConfiguration self.state = _LockedValueBox(.unconnected(.init(serverTransport: server))) } diff --git a/Sources/GRPCInProcessTransport/InProcessServerTransport.swift b/Sources/GRPCInProcessTransport/InProcessServerTransport.swift index 95181b00c..845d38c19 100644 --- a/Sources/GRPCInProcessTransport/InProcessServerTransport.swift +++ b/Sources/GRPCInProcessTransport/InProcessServerTransport.swift @@ -59,7 +59,7 @@ public struct InProcessServerTransport: ServerTransport, Sendable { /// to this transport using the ``acceptStream(_:)`` method. /// /// - Returns: An ``RPCAsyncSequence`` of all published ``RPCStream``s. - public func listen() -> RPCAsyncSequence> { + public func listen() async throws -> RPCAsyncSequence> { RPCAsyncSequence(wrapping: self.newStreams) } diff --git a/Tests/GRPCCoreTests/Test Utilities/Transport/AnyTransport.swift b/Tests/GRPCCoreTests/Test Utilities/Transport/AnyTransport.swift index 5fb153bee..e211d214c 100644 --- a/Tests/GRPCCoreTests/Test Utilities/Transport/AnyTransport.swift +++ b/Tests/GRPCCoreTests/Test Utilities/Transport/AnyTransport.swift @@ -20,7 +20,7 @@ struct AnyClientTransport: ClientTransport, Sendable { typealias Inbound = RPCAsyncSequence typealias Outbound = RPCWriter.Closable - private let _retryThrottle: @Sendable () -> RetryThrottle + private let _retryThrottle: @Sendable () -> RetryThrottle? private let _withStream: @Sendable ( _ method: MethodDescriptor, @@ -52,7 +52,7 @@ struct AnyClientTransport: ClientTransport, Sendable { } } - var retryThrottle: RetryThrottle { + var retryThrottle: RetryThrottle? { self._retryThrottle() } diff --git a/Tests/GRPCCoreTests/Test Utilities/Transport/StreamCountingTransport.swift b/Tests/GRPCCoreTests/Test Utilities/Transport/StreamCountingTransport.swift index 0cdb2d1fd..9c6ebb276 100644 --- a/Tests/GRPCCoreTests/Test Utilities/Transport/StreamCountingTransport.swift +++ b/Tests/GRPCCoreTests/Test Utilities/Transport/StreamCountingTransport.swift @@ -39,7 +39,7 @@ struct StreamCountingClientTransport: ClientTransport, Sendable { self.transport = AnyClientTransport(wrapping: transport) } - var retryThrottle: RetryThrottle { + var retryThrottle: RetryThrottle? { self.transport.retryThrottle } diff --git a/Tests/GRPCCoreTests/Test Utilities/Transport/ThrowingTransport.swift b/Tests/GRPCCoreTests/Test Utilities/Transport/ThrowingTransport.swift index def987fac..39cdbaceb 100644 --- a/Tests/GRPCCoreTests/Test Utilities/Transport/ThrowingTransport.swift +++ b/Tests/GRPCCoreTests/Test Utilities/Transport/ThrowingTransport.swift @@ -26,7 +26,7 @@ struct ThrowOnStreamCreationTransport: ClientTransport { self.code = code } - let retryThrottle = RetryThrottle(maximumTokens: 10, tokenRatio: 0.1) + let retryThrottle: RetryThrottle? = RetryThrottle(maximumTokens: 10, tokenRatio: 0.1) func connect(lazily: Bool) async throws { // no-op diff --git a/Tests/GRPCInProcessTransportTests/InProcessClientTransportTests.swift b/Tests/GRPCInProcessTransportTests/InProcessClientTransportTests.swift index e751e364a..7aa64d05c 100644 --- a/Tests/GRPCInProcessTransportTests/InProcessClientTransportTests.swift +++ b/Tests/GRPCInProcessTransportTests/InProcessClientTransportTests.swift @@ -162,7 +162,7 @@ final class InProcessClientTransportTests: XCTestCase { } group.addTask { - for try await stream in server.listen() { + for try await stream in try await server.listen() { let receivedMessages = try await stream.inbound.reduce(into: []) { $0.append($1) } try await stream.outbound.write(RPCResponsePart.message([42])) stream.outbound.finish() diff --git a/Tests/GRPCInProcessTransportTests/InProcessServerTransportTests.swift b/Tests/GRPCInProcessTransportTests/InProcessServerTransportTests.swift index febdeb977..3e57d6ac4 100644 --- a/Tests/GRPCInProcessTransportTests/InProcessServerTransportTests.swift +++ b/Tests/GRPCInProcessTransportTests/InProcessServerTransportTests.swift @@ -37,7 +37,7 @@ final class InProcessServerTransportTests: XCTestCase { ) ) - let streamSequence = transport.listen() + let streamSequence = try await transport.listen() var streamSequenceInterator = streamSequence.makeAsyncIterator() try transport.acceptStream(stream) @@ -66,7 +66,7 @@ final class InProcessServerTransportTests: XCTestCase { ) ) - let streamSequence = transport.listen() + let streamSequence = try await transport.listen() var streamSequenceInterator = streamSequence.makeAsyncIterator() try transport.acceptStream(firstStream)