From 75a06e347bfa3b4bbc36962601ec128821cfa7a1 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Tue, 28 Nov 2023 16:42:46 +0000 Subject: [PATCH 1/5] Add Client object --- .../ClientRPCExecutionConfiguration.swift | 290 ------ ...tRPCExecutionConfigurationCollection.swift | 76 -- .../ClientRPCExecutor+HedgingExecutor.swift | 6 +- .../ClientRPCExecutor+RetryExecutor.swift | 4 +- .../Client/Internal/ClientRPCExecutor.swift | 2 +- .../Client/Internal/RetryDelaySequence.swift | 8 +- Sources/GRPCCore/Client.swift | 824 ++++++++++++++++++ Sources/GRPCCore/ClientError.swift | 142 +++ .../GRPCCore/Transport/ClientTransport.swift | 6 +- .../Transport/InProcessClientTransport.swift | 6 +- ...xecutionConfigurationCollectionTests.swift | 37 +- ...ClientRPCExecutionConfigurationTests.swift | 4 +- .../ClientRPCExecutorTestHarness.swift | 14 +- .../ClientRPCExecutorTests+Hedging.swift | 4 +- .../ClientRPCExecutorTests+Retries.swift | 8 +- .../Call/Client/RetryDelaySequenceTests.swift | 4 +- Tests/GRPCCoreTests/GRPCServerTests.swift | 2 +- .../Transport/AnyTransport.swift | 4 +- .../Transport/StreamCountingTransport.swift | 2 +- .../Transport/ThrowingTransport.swift | 2 +- .../InProcessClientTransportTests.swift | 14 +- 21 files changed, 1032 insertions(+), 427 deletions(-) delete mode 100644 Sources/GRPCCore/Call/Client/ClientRPCExecutionConfiguration.swift delete mode 100644 Sources/GRPCCore/Call/Client/ClientRPCExecutionConfigurationCollection.swift create mode 100644 Sources/GRPCCore/Client.swift create mode 100644 Sources/GRPCCore/ClientError.swift diff --git a/Sources/GRPCCore/Call/Client/ClientRPCExecutionConfiguration.swift b/Sources/GRPCCore/Call/Client/ClientRPCExecutionConfiguration.swift deleted file mode 100644 index b771e0b76..000000000 --- a/Sources/GRPCCore/Call/Client/ClientRPCExecutionConfiguration.swift +++ /dev/null @@ -1,290 +0,0 @@ -/* - * Copyright 2023, gRPC Authors All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/// Configuration values for executing an RPC. -@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -public struct ClientRPCExecutionConfiguration: Hashable, Sendable { - /// The default timeout for the RPC. - /// - /// If no reply is received in the specified amount of time the request is aborted - /// with an ``RPCError`` with code ``RPCError/Code/deadlineExceeded``. - /// - /// The actual deadline used will be the minimum of the value specified here - /// and the value set by the application by the client API. If either one isn't set - /// then the other value is used. If neither is set then the request has no deadline. - /// - /// The timeout applies to the overall execution of an RPC. If, for example, a retry - /// policy is set then the timeout begins when the first attempt is started and _isn't_ reset - /// when subsequent attempts start. - public var timeout: Duration? - - /// The policy determining how many times, and when, the RPC is executed. - /// - /// There are two policy types: - /// 1. Retry - /// 2. Hedging - /// - /// The retry policy allows an RPC to be retried a limited number of times if the RPC - /// fails with one of the configured set of status codes. RPCs are only retried if they - /// fail immediately, that is, the first response part received from the server is a - /// status code. - /// - /// The hedging policy allows an RPC to be executed multiple times concurrently. Typically - /// each execution will be staggered by some delay. The first successful response will be - /// reported to the client. Hedging is only suitable for idempotent RPCs. - public var executionPolicy: ExecutionPolicy? - - /// Create an execution configuration. - /// - /// - Parameters: - /// - executionPolicy: The execution policy to use for the RPC. - /// - timeout: The default timeout for the RPC. - public init( - executionPolicy: ExecutionPolicy?, - timeout: Duration? - ) { - self.executionPolicy = executionPolicy - self.timeout = timeout - } - - /// Create an execution configuration with a retry policy. - /// - /// - Parameters: - /// - retryPolicy: The policy for retrying the RPC. - /// - timeout: The default timeout for the RPC. - public init( - retryPolicy: RetryPolicy, - timeout: Duration? = nil - ) { - self.executionPolicy = .retry(retryPolicy) - self.timeout = timeout - } - - /// Create an execution configuration with a hedging policy. - /// - /// - Parameters: - /// - hedgingPolicy: The policy for hedging the RPC. - /// - timeout: The default timeout for the RPC. - public init( - hedgingPolicy: HedgingPolicy, - timeout: Duration? = nil - ) { - self.executionPolicy = .hedge(hedgingPolicy) - self.timeout = timeout - } -} - -@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -extension ClientRPCExecutionConfiguration { - /// The execution policy for an RPC. - public enum ExecutionPolicy: Hashable, Sendable { - /// Policy for retrying an RPC. - /// - /// See ``RetryPolicy`` for more details. - case retry(RetryPolicy) - - /// Policy for hedging an RPC. - /// - /// See ``HedgingPolicy`` for more details. - case hedge(HedgingPolicy) - } -} - -/// Policy for retrying an RPC. -/// -/// gRPC retries RPCs when the first response from the server is a status code which matches -/// one of the configured retryable status codes. If the server begins processing the RPC and -/// first responds with metadata and later responds with a retryable status code then the RPC -/// won't be retried. -/// -/// Execution attempts are limited by ``maximumAttempts`` which includes the original attempt. The -/// maximum number of attempts is limited to five. -/// -/// Subsequent attempts are executed after some delay. The first _retry_, or second attempt, will -/// be started after a randomly chosen delay between zero and ``initialBackoff``. More generally, -/// the nth retry will happen after a randomly chosen delay between zero -/// and `min(initialBackoff * backoffMultiplier^(n-1), maximumBackoff)`. -/// -/// For more information see [gRFC A6 Client -/// Retries](https://github.com/grpc/proposal/blob/master/A6-client-retries.md). -@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -public struct RetryPolicy: Hashable, Sendable { - /// The maximum number of RPC attempts, including the original attempt. - /// - /// Must be greater than one, values greater than five are treated as five. - public var maximumAttempts: Int { - didSet { self.maximumAttempts = validateMaxAttempts(self.maximumAttempts) } - } - - /// The initial backoff duration. - /// - /// The initial retry will occur after a random amount of time up to this value. - /// - /// - Precondition: Must be greater than zero. - public var initialBackoff: Duration { - willSet { Self.validateInitialBackoff(newValue) } - } - - /// The maximum amount of time to backoff for. - /// - /// - Precondition: Must be greater than zero. - public var maximumBackoff: Duration { - willSet { Self.validateMaxBackoff(newValue) } - } - - /// The multiplier to apply to backoff. - /// - /// - Precondition: Must be greater than zero. - public var backoffMultiplier: Double { - willSet { Self.validateBackoffMultiplier(newValue) } - } - - /// The set of status codes which may be retried. - /// - /// - Precondition: Must not be empty. - public var retryableStatusCodes: Set { - willSet { Self.validateRetryableStatusCodes(newValue) } - } - - /// Create a new retry policy. - /// - /// - Parameters: - /// - maximumAttempts: The maximum number of attempts allowed for the RPC. - /// - initialBackoff: The initial backoff period for the first retry attempt. Must be - /// greater than zero. - /// - maximumBackoff: The maximum period of time to wait between attempts. Must be greater than - /// zero. - /// - backoffMultiplier: The exponential backoff multiplier. Must be greater than zero. - /// - retryableStatusCodes: The set of status codes which may be retried. Must not be empty. - /// - Precondition: `maximumAttempts`, `initialBackoff`, `maximumBackoff` and `backoffMultiplier` - /// must be greater than zero. - /// - Precondition: `retryableStatusCodes` must not be empty. - public init( - maximumAttempts: Int, - initialBackoff: Duration, - maximumBackoff: Duration, - backoffMultiplier: Double, - retryableStatusCodes: Set - ) { - self.maximumAttempts = validateMaxAttempts(maximumAttempts) - - Self.validateInitialBackoff(initialBackoff) - self.initialBackoff = initialBackoff - - Self.validateMaxBackoff(maximumBackoff) - self.maximumBackoff = maximumBackoff - - Self.validateBackoffMultiplier(backoffMultiplier) - self.backoffMultiplier = backoffMultiplier - - Self.validateRetryableStatusCodes(retryableStatusCodes) - self.retryableStatusCodes = retryableStatusCodes - } - - private static func validateInitialBackoff(_ value: Duration) { - precondition(value.isGreaterThanZero, "initialBackoff must be greater than zero") - } - - private static func validateMaxBackoff(_ value: Duration) { - precondition(value.isGreaterThanZero, "maximumBackoff must be greater than zero") - } - - private static func validateBackoffMultiplier(_ value: Double) { - precondition(value > 0, "backoffMultiplier must be greater than zero") - } - - private static func validateRetryableStatusCodes(_ value: Set) { - precondition(!value.isEmpty, "retryableStatusCodes mustn't be empty") - } -} - -/// Policy for hedging an RPC. -/// -/// Hedged RPCs may execute more than once on a server so only idempotent methods should -/// be hedged. -/// -/// gRPC executes the RPC at most ``maximumAttempts`` times, staggering each attempt -/// by ``hedgingDelay``. -/// -/// For more information see [gRFC A6 Client -/// Retries](https://github.com/grpc/proposal/blob/master/A6-client-retries.md). -@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -public struct HedgingPolicy: Hashable, Sendable { - /// The maximum number of RPC attempts, including the original attempt. - /// - /// Values greater than five are treated as five. - /// - /// - Precondition: Must be greater than one. - public var maximumAttempts: Int { - didSet { self.maximumAttempts = validateMaxAttempts(self.maximumAttempts) } - } - - /// The first RPC will be sent immediately, but each subsequent RPC will be sent at intervals - /// of `hedgingDelay`. Set this to zero to immediately send all RPCs. - public var hedgingDelay: Duration { - willSet { Self.validateHedgingDelay(newValue) } - } - - /// The set of status codes which indicate other hedged RPCs may still succeed. - /// - /// If a non-fatal status code is returned by the server, hedged RPCs will continue. - /// Otherwise, outstanding requests will be cancelled and the error returned to the - /// application layer. - public var nonFatalStatusCodes: Set - - /// Create a new hedging policy. - /// - /// - Parameters: - /// - maximumAttempts: The maximum number of attempts allowed for the RPC. - /// - hedgingDelay: The delay between each hedged RPC. - /// - nonFatalStatusCodes: The set of status codes which indicated other hedged RPCs may still - /// succeed. - /// - Precondition: `maximumAttempts` must be greater than zero. - public init( - maximumAttempts: Int, - hedgingDelay: Duration, - nonFatalStatusCodes: Set - ) { - self.maximumAttempts = validateMaxAttempts(maximumAttempts) - - Self.validateHedgingDelay(hedgingDelay) - self.hedgingDelay = hedgingDelay - self.nonFatalStatusCodes = nonFatalStatusCodes - } - - private static func validateHedgingDelay(_ value: Duration) { - precondition( - value.isGreaterThanOrEqualToZero, - "hedgingDelay must be greater than or equal to zero" - ) - } -} - -private func validateMaxAttempts(_ value: Int) -> Int { - precondition(value > 0, "maximumAttempts must be greater than zero") - return min(value, 5) -} - -@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -extension Duration { - fileprivate var isGreaterThanZero: Bool { - self.components.seconds > 0 || self.components.attoseconds > 0 - } - - fileprivate var isGreaterThanOrEqualToZero: Bool { - self.components.seconds >= 0 || self.components.attoseconds >= 0 - } -} diff --git a/Sources/GRPCCore/Call/Client/ClientRPCExecutionConfigurationCollection.swift b/Sources/GRPCCore/Call/Client/ClientRPCExecutionConfigurationCollection.swift deleted file mode 100644 index 9207fd757..000000000 --- a/Sources/GRPCCore/Call/Client/ClientRPCExecutionConfigurationCollection.swift +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright 2023, gRPC Authors All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) - -/// A collection of ``ClientRPCExecutionConfiguration``s, mapped to specific methods or services. -/// -/// When creating a new instance, you must provide a default configuration to be used when getting -/// a configuration for a method that has not been given a specific override. -/// Use ``setDefaultConfiguration(_:forService:)`` to set a specific override for a whole -/// service. -/// -/// Use the subscript to get and set configurations for methods. -public struct ClientRPCExecutionConfigurationCollection: Sendable, Hashable { - private var elements: [MethodDescriptor: ClientRPCExecutionConfiguration] - private let defaultConfiguration: ClientRPCExecutionConfiguration - - public init( - defaultConfiguration: ClientRPCExecutionConfiguration = ClientRPCExecutionConfiguration( - executionPolicy: nil, - timeout: nil - ) - ) { - self.elements = [:] - self.defaultConfiguration = defaultConfiguration - } - - public subscript(_ descriptor: MethodDescriptor) -> ClientRPCExecutionConfiguration { - get { - if let methodLevelOverride = self.elements[descriptor] { - return methodLevelOverride - } - var serviceLevelDescriptor = descriptor - serviceLevelDescriptor.method = "" - return self.elements[serviceLevelDescriptor, default: self.defaultConfiguration] - } - - set { - precondition( - !descriptor.service.isEmpty, - "Method descriptor's service cannot be empty." - ) - - self.elements[descriptor] = newValue - } - } - - /// Set a default configuration for a service. - /// - /// If getting a configuration for a method that's part of a service, and the method itself doesn't have an - /// override, then this configuration will be used instead of the default configuration passed when creating - /// this instance of ``ClientRPCExecutionConfigurationCollection``. - /// - /// - Parameters: - /// - configuration: The default configuration for the service. - /// - service: The name of the service for which this override applies. - public mutating func setDefaultConfiguration( - _ configuration: ClientRPCExecutionConfiguration, - forService service: String - ) { - self[MethodDescriptor(service: service, method: "")] = configuration - } -} diff --git a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift index 63f46db74..11a665b5f 100644 --- a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift +++ b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift @@ -30,7 +30,7 @@ extension ClientRPCExecutor { @usableFromInline let transport: Transport @usableFromInline - let policy: HedgingPolicy + let policy: Client.MethodConfiguration.HedgingPolicy @usableFromInline let timeout: Duration? @usableFromInline @@ -45,7 +45,7 @@ extension ClientRPCExecutor { @inlinable init( transport: Transport, - policy: HedgingPolicy, + policy: Client.MethodConfiguration.HedgingPolicy, timeout: Duration?, interceptors: [any ClientInterceptor], serializer: Serializer, @@ -420,7 +420,7 @@ extension ClientRPCExecutor.HedgingExecutor { private(set) var hasUsableResponse: Bool @inlinable - init(policy: HedgingPolicy) { + init(policy: Client.MethodConfiguration.HedgingPolicy) { self._maximumAttempts = policy.maximumAttempts self.attempt = 1 self.hasUsableResponse = false diff --git a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift index 619352723..7720916c5 100644 --- a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift +++ b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift @@ -30,7 +30,7 @@ extension ClientRPCExecutor { @usableFromInline let transport: Transport @usableFromInline - let policy: RetryPolicy + let policy: Client.MethodConfiguration.RetryPolicy @usableFromInline let timeout: Duration? @usableFromInline @@ -45,7 +45,7 @@ extension ClientRPCExecutor { @inlinable init( transport: Transport, - policy: RetryPolicy, + policy: Client.MethodConfiguration.RetryPolicy, timeout: Duration?, interceptors: [any ClientInterceptor], serializer: Serializer, diff --git a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor.swift b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor.swift index 03e12c87a..a9452eb5e 100644 --- a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor.swift +++ b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor.swift @@ -35,7 +35,7 @@ enum ClientRPCExecutor { static func execute( request: ClientRequest.Stream, method: MethodDescriptor, - configuration: ClientRPCExecutionConfiguration, + configuration: Client.MethodConfiguration, serializer: some MessageSerializer, deserializer: some MessageDeserializer, transport: some ClientTransport, diff --git a/Sources/GRPCCore/Call/Client/Internal/RetryDelaySequence.swift b/Sources/GRPCCore/Call/Client/Internal/RetryDelaySequence.swift index b07691e4e..6c61f40c4 100644 --- a/Sources/GRPCCore/Call/Client/Internal/RetryDelaySequence.swift +++ b/Sources/GRPCCore/Call/Client/Internal/RetryDelaySequence.swift @@ -26,10 +26,10 @@ struct RetryDelaySequence: Sequence { typealias Element = Duration @usableFromInline - let policy: RetryPolicy + let policy: Client.MethodConfiguration.RetryPolicy @inlinable - init(policy: RetryPolicy) { + init(policy: Client.MethodConfiguration.RetryPolicy) { self.policy = policy } @@ -41,12 +41,12 @@ struct RetryDelaySequence: Sequence { @usableFromInline struct Iterator: IteratorProtocol { @usableFromInline - let policy: RetryPolicy + let policy: Client.MethodConfiguration.RetryPolicy @usableFromInline private(set) var n = 1 @inlinable - init(policy: RetryPolicy) { + init(policy: Client.MethodConfiguration.RetryPolicy) { self.policy = policy } diff --git a/Sources/GRPCCore/Client.swift b/Sources/GRPCCore/Client.swift new file mode 100644 index 000000000..ffc6e85e3 --- /dev/null +++ b/Sources/GRPCCore/Client.swift @@ -0,0 +1,824 @@ +/* + * Copyright 2023, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/// A gRPC client. +/// +/// A ``Client`` communicates to a server via a given ``ClientTransport``. +/// You can start RPCs to the server by calling the corresponding method: +/// - ``unary(request:descriptor:serializer:deserializer:handler:)`` +/// - ``clientStreaming(request:descriptor:serializer:deserializer:handler:)`` +/// - ``serverStreaming(request:descriptor:serializer:deserializer:handler:)`` +/// - ``bidirectionalStreaming(request:descriptor:serializer:deserializer:handler:)`` +/// +/// You can set ``MethodConfiguration``s on this client to override whatever configurations have been +/// set on the given transport. +/// You can also use ``ClientInterceptor``s to implement cross-cutting logic which apply to all +/// RPCs. Example uses of interceptors include authentication and logging. +/// +/// ## Creating and configuring a client +/// +/// The following example demonstrates how to create and configure a server. +/// +/// ```swift +/// // Create and add an in-process transport. +/// let inProcessTransport = InProcessClientTransport() +/// let client = Client(transport: inProcessTransport) +/// +/// // Create and add some interceptors. +/// client.interceptors.add(StatsRecordingServerInterceptors()) +/// +/// // Create and add some method configurations. +/// let defaultConfiguration = MethodConfiguration( +/// executionPolicy: ..., +/// timeout: ... +/// ) +/// let registry = MethodConfigurationRegistry(defaultConfiguration: defaultConfiguration) +/// client.methodConfigurationOverrides = registry +/// ``` +/// +/// ## Starting and stopping the client +/// +/// Once you have configured the client, call ``run()`` to start it. Calling ``run()`` connects to the given +/// transport. +/// +/// ```swift +/// // Start running the client. +/// try await client.run() +/// ``` +/// +/// The ``run()`` method won't return until the client has finished handling all requests. You can +/// signal to the client that it should stop creating new request streams by calling ``close()``. +/// This gives the client enough time to drain any requests already in flight. To stop the client more abruptly +/// you can cancel the task running your client. If your application requires additional resources +/// that need their lifecycles managed you should consider using [Swift Service +/// Lifecycle](https://github.com/swift-server/swift-service-lifecycle). +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +public final class Client: Sendable { + /// A collection of ``ClientInterceptor`` implementations which are applied to all accepted + /// RPCs. + /// + /// RPCs are intercepted in the order that interceptors are added. That is, a request sent from the client to + /// the server will first be intercepted by the first added interceptor followed by the second, and so on. + /// For responses from the server, they'll be applied in the opposite order. + public var interceptors: Interceptors { + get { + self.storage.withLockedValue { $0.interceptors } + } + set { + self.storage.withLockedValue { storage in + if case .notStarted = storage.state { + storage.interceptors = newValue + } + } + } + } + + /// A ``MethodConfigurationRegistry`` containing ``MethodConfiguration``s for calls + /// made from this ``Client``. + /// + /// - Note: These configurations will override those configurations set in the ``ClientTransport``. + public var methodConfigurationOverrides: MethodConfigurationRegistry { + get { + self.storage.withLockedValue { $0.methodConfigurationOverrides } + } + set { + self.storage.withLockedValue { storage in + if case .notStarted = storage.state { + storage.methodConfigurationOverrides = newValue + } + } + } + } + + /// The state of the client. + private enum State { + /// The client hasn't been started yet. Can transition to `running` or `stopped`. + case notStarted + /// The client is running and can send RPCs. Can transition to `stopping`. + case running + /// The client is stopping and no new RPCs will be sent. Existing RPCs may run to + /// completion. May transition to `stopped`. + case stopping + /// The client has stopped, no RPCs are in flight and no more will be accepted. This state + /// is terminal. + case stopped + } + + /// Underlying storage for the client. + private struct Storage { + var interceptors: Interceptors + var methodConfigurationOverrides: MethodConfigurationRegistry + var state: State + + init() { + self.interceptors = Interceptors() + self.methodConfigurationOverrides = MethodConfigurationRegistry() + self.state = .notStarted + } + } + + private let storage: LockedValueBox + + /// The transport which provides a bidirectional communication channel with the server. + private let transport: ClientTransport + + /// Creates a new client with no resources. + /// + /// You can add resources to the client via ``interceptors-swift.property`` and + /// ``methodConfigurationOverrides-swift.property``, and start the client by calling ``run()``. + /// + /// - Note: Any changes to resources after ``run()`` has been called will be ignored. + /// + /// - Parameter transport: The ``ClientTransport`` to be used for this ``Client``. + public init(transport: ClientTransport) { + self.storage = LockedValueBox(Storage()) + self.transport = transport + } + + /// Start the client. + /// + /// This is a long-running task that will return once ``close()`` has been called and all in-flight RPCs + /// finished executing. + /// + /// If you need to immediately stop all work, cancel the task executing this method. + public func run() async throws { + let interceptors = try self.storage.withLockedValue { storage in + switch storage.state { + case .notStarted: + storage.state = .running + return storage.interceptors + case .running: + throw ClientError( + code: .clientIsAlreadyRunning, + message: "The client is already running and can only be started once." + ) + case .stopping, .stopped: + throw ClientError( + code: .clientIsStopped, + message: "The client has stopped and can only be started once." + ) + } + } + + // When we exit this function we must have stopped. + defer { + self.storage.withLockedValue { $0.state = .stopped } + } + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await self.transport.connect(lazily: false) + } + try await group.next() + } + } + + /// Close the client. + /// + /// The transport will be closed: this means that it will be given enough time to wait for in-flight RPCs to + /// finish executing, but no new RPCs will be accepted. + /// You can cancel the task executing ``run()`` if you want to immediately stop all work. + public func close() { + self.storage.withLockedValue { storage in + switch storage.state { + case .notStarted: + storage.state = .stopped + case .running: + storage.state = .stopping + case .stopping, .stopped: + () + } + } + + self.transport.close() + } + + /// Start a unary RPC. + /// + /// - Parameters: + /// - request: The unary request. + /// - descriptor: The method descriptor for which to execute this request. + /// - serializer: A request serializer. + /// - deserializer: A response deserializer. + /// - handler: A unary response handler. + /// + /// - Returns: The return value from the `handler`. + public func unary( + request: ClientRequest.Single, + descriptor: MethodDescriptor, + serializer: some MessageSerializer, + deserializer: some MessageDeserializer, + handler: @Sendable @escaping (ClientResponse.Single) async throws -> ReturnValue + ) async throws -> ReturnValue { + let (configurationOverrides, interceptors) = try self.storage.withLockedValue { storage in + switch storage.state { + case .running: + return (storage.methodConfigurationOverrides, storage.interceptors.values) + case .notStarted: + throw ClientError( + code: .clientIsNotRunning, + message: "Client must be running to make an RPC: call run() first." + ) + case .stopping, .stopped: + throw ClientError( + code: .clientIsStopped, + message: "Client has been stopped. Can't make any more RPCs." + ) + } + } + + let applicableConfiguration = self.resolveMethodConfiguration( + descriptor: descriptor, + clientConfigurations: configurationOverrides + ) + + return try await ClientRPCExecutor.execute( + request: ClientRequest.Stream(single: request), + method: descriptor, + configuration: applicableConfiguration, + serializer: serializer, + deserializer: deserializer, + transport: self.transport, + interceptors: interceptors, + handler: { stream in + let singleResponse = await ClientResponse.Single(stream: stream) + return try await handler(singleResponse) + } + ) + } + + /// Start a client-streaming RPC. + /// + /// - Parameters: + /// - request: The request stream. + /// - descriptor: The method descriptor for which to execute this request. + /// - serializer: A request serializer. + /// - deserializer: A response deserializer. + /// - handler: A unary response handler. + /// + /// - Returns: The return value from the `handler`. + public func clientStreaming( + request: ClientRequest.Stream, + descriptor: MethodDescriptor, + serializer: some MessageSerializer, + deserializer: some MessageDeserializer, + handler: @Sendable @escaping (ClientResponse.Single) async throws -> ReturnValue + ) async throws -> ReturnValue { + let (configurationOverrides, interceptors) = try self.storage.withLockedValue { storage in + switch storage.state { + case .running: + return (storage.methodConfigurationOverrides, storage.interceptors.values) + case .notStarted: + throw ClientError( + code: .clientIsNotRunning, + message: "Client must be running to make an RPC: call run() first." + ) + case .stopping, .stopped: + throw ClientError( + code: .clientIsStopped, + message: "Client has been stopped. Can't make any more RPCs." + ) + } + } + + let applicableConfiguration = self.resolveMethodConfiguration( + descriptor: descriptor, + clientConfigurations: configurationOverrides + ) + + return try await ClientRPCExecutor.execute( + request: request, + method: descriptor, + configuration: applicableConfiguration, + serializer: serializer, + deserializer: deserializer, + transport: transport, + interceptors: interceptors, + handler: { stream in + let singleResponse = await ClientResponse.Single(stream: stream) + return try await handler(singleResponse) + } + ) + } + + /// Start a server-streaming RPC. + /// + /// - Parameters: + /// - request: The unary request. + /// - descriptor: The method descriptor for which to execute this request. + /// - serializer: A request serializer. + /// - deserializer: A response deserializer. + /// - handler: A response stream handler. + /// + /// - Returns: The return value from the `handler`. + public func serverStreaming( + request: ClientRequest.Single, + descriptor: MethodDescriptor, + serializer: some MessageSerializer, + deserializer: some MessageDeserializer, + handler: @Sendable @escaping (ClientResponse.Stream) async throws -> ReturnValue + ) async throws -> ReturnValue { + let (configurationOverrides, interceptors) = try self.storage.withLockedValue { storage in + switch storage.state { + case .running: + return (storage.methodConfigurationOverrides, storage.interceptors.values) + case .notStarted: + throw ClientError( + code: .clientIsNotRunning, + message: "Client must be running to make an RPC: call run() first." + ) + case .stopping, .stopped: + throw ClientError( + code: .clientIsStopped, + message: "Client has been stopped. Can't make any more RPCs." + ) + } + } + + let applicableConfiguration = self.resolveMethodConfiguration( + descriptor: descriptor, + clientConfigurations: configurationOverrides + ) + + return try await ClientRPCExecutor.execute( + request: ClientRequest.Stream(single: request), + method: descriptor, + configuration: applicableConfiguration, + serializer: serializer, + deserializer: deserializer, + transport: transport, + interceptors: interceptors, + handler: handler + ) + } + + /// Start a bidirectional streaming RPC. + /// + /// - Note: ``run()`` must have been called and still executing, and ``close()`` mustn't + /// have been called. + /// + /// - Parameters: + /// - request: The streaming request. + /// - descriptor: The method descriptor for which to execute this request. + /// - serializer: A request serializer. + /// - deserializer: A response deserializer. + /// - handler: A response stream handler. + /// + /// - Returns: The return value from the `handler`. + public func bidirectionalStreaming( + request: ClientRequest.Stream, + descriptor: MethodDescriptor, + serializer: some MessageSerializer, + deserializer: some MessageDeserializer, + handler: @Sendable @escaping (ClientResponse.Stream) async throws -> ReturnValue + ) async throws -> ReturnValue { + let (configurationOverrides, interceptors) = try self.storage.withLockedValue { storage in + switch storage.state { + case .running: + return (storage.methodConfigurationOverrides, storage.interceptors.values) + case .notStarted: + throw ClientError( + code: .clientIsNotRunning, + message: "Client must be running to make an RPC: call run() first." + ) + case .stopping, .stopped: + throw ClientError( + code: .clientIsStopped, + message: "Client has been stopped. Can't make any more RPCs." + ) + } + } + + let applicableConfiguration = self.resolveMethodConfiguration( + descriptor: descriptor, + clientConfigurations: configurationOverrides + ) + + return try await ClientRPCExecutor.execute( + request: request, + method: descriptor, + configuration: applicableConfiguration, + serializer: serializer, + deserializer: deserializer, + transport: transport, + interceptors: interceptors, + handler: handler + ) + } + + private func resolveMethodConfiguration( + descriptor: MethodDescriptor, + clientConfigurations configurationOverrides: MethodConfigurationRegistry + ) -> MethodConfiguration { + if let clientOverride = configurationOverrides[descriptor, useDefault: false] { + return clientOverride + } + + if let transportConfiguration = self.transport.executionConfiguration(forMethod: descriptor) { + return transportConfiguration + } + + return configurationOverrides[descriptor] + } +} + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +extension Client { + /// A collection of interceptors providing cross-cutting functionality to each accepted RPC. + public struct Interceptors: Sendable { + private(set) var values: [any ClientInterceptor] = [] + + /// Add an interceptor to the server. + /// + /// The order in which interceptors are added reflects the order in which they are called. The + /// first interceptor added will be the first interceptor to intercept each request. The last + /// interceptor added will be the final interceptor to intercept each request before calling + /// the appropriate handler. + /// + /// - Parameter interceptor: The interceptor to add. + public mutating func add(_ interceptor: some ClientInterceptor) { + self.values.append(interceptor) + } + } +} + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +extension Client { + /// The execution policy for an RPC. + public enum ExecutionPolicy: Hashable, Sendable { + /// Policy for retrying an RPC. + /// + /// See ``RetryPolicy`` for more details. + case retry(MethodConfiguration.RetryPolicy) + + /// Policy for hedging an RPC. + /// + /// See ``HedgingPolicy`` for more details. + case hedge(MethodConfiguration.HedgingPolicy) + } + + /// Configuration values for executing an RPC. + public struct MethodConfiguration: Hashable, Sendable { + /// The default timeout for the RPC. + /// + /// If no reply is received in the specified amount of time the request is aborted + /// with an ``RPCError`` with code ``RPCError/Code/deadlineExceeded``. + /// + /// The actual deadline used will be the minimum of the value specified here + /// and the value set by the application by the client API. If either one isn't set + /// then the other value is used. If neither is set then the request has no deadline. + /// + /// The timeout applies to the overall execution of an RPC. If, for example, a retry + /// policy is set then the timeout begins when the first attempt is started and _isn't_ reset + /// when subsequent attempts start. + public var timeout: Duration? + + /// The policy determining how many times, and when, the RPC is executed. + /// + /// There are two policy types: + /// 1. Retry + /// 2. Hedging + /// + /// The retry policy allows an RPC to be retried a limited number of times if the RPC + /// fails with one of the configured set of status codes. RPCs are only retried if they + /// fail immediately, that is, the first response part received from the server is a + /// status code. + /// + /// The hedging policy allows an RPC to be executed multiple times concurrently. Typically + /// each execution will be staggered by some delay. The first successful response will be + /// reported to the client. Hedging is only suitable for idempotent RPCs. + public var executionPolicy: ExecutionPolicy? + + /// Create an execution configuration. + /// + /// - Parameters: + /// - executionPolicy: The execution policy to use for the RPC. + /// - timeout: The default timeout for the RPC. + public init( + executionPolicy: ExecutionPolicy?, + timeout: Duration? + ) { + self.executionPolicy = executionPolicy + self.timeout = timeout + } + + /// Create an execution configuration with a retry policy. + /// + /// - Parameters: + /// - retryPolicy: The policy for retrying the RPC. + /// - timeout: The default timeout for the RPC. + public init( + retryPolicy: RetryPolicy, + timeout: Duration? = nil + ) { + self.executionPolicy = .retry(retryPolicy) + self.timeout = timeout + } + + /// Create an execution configuration with a hedging policy. + /// + /// - Parameters: + /// - hedgingPolicy: The policy for hedging the RPC. + /// - timeout: The default timeout for the RPC. + public init( + hedgingPolicy: HedgingPolicy, + timeout: Duration? = nil + ) { + self.executionPolicy = .hedge(hedgingPolicy) + self.timeout = timeout + } + } +} + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +extension Client.MethodConfiguration { + /// Policy for retrying an RPC. + /// + /// gRPC retries RPCs when the first response from the server is a status code which matches + /// one of the configured retryable status codes. If the server begins processing the RPC and + /// first responds with metadata and later responds with a retryable status code then the RPC + /// won't be retried. + /// + /// Execution attempts are limited by ``maximumAttempts`` which includes the original attempt. The + /// maximum number of attempts is limited to five. + /// + /// Subsequent attempts are executed after some delay. The first _retry_, or second attempt, will + /// be started after a randomly chosen delay between zero and ``initialBackoff``. More generally, + /// the nth retry will happen after a randomly chosen delay between zero + /// and `min(initialBackoff * backoffMultiplier^(n-1), maximumBackoff)`. + /// + /// For more information see [gRFC A6 Client + /// Retries](https://github.com/grpc/proposal/blob/master/A6-client-retries.md). + public struct RetryPolicy: Hashable, Sendable { + /// The maximum number of RPC attempts, including the original attempt. + /// + /// Must be greater than one, values greater than five are treated as five. + public var maximumAttempts: Int { + didSet { self.maximumAttempts = validateMaxAttempts(self.maximumAttempts) } + } + + /// The initial backoff duration. + /// + /// The initial retry will occur after a random amount of time up to this value. + /// + /// - Precondition: Must be greater than zero. + public var initialBackoff: Duration { + willSet { Self.validateInitialBackoff(newValue) } + } + + /// The maximum amount of time to backoff for. + /// + /// - Precondition: Must be greater than zero. + public var maximumBackoff: Duration { + willSet { Self.validateMaxBackoff(newValue) } + } + + /// The multiplier to apply to backoff. + /// + /// - Precondition: Must be greater than zero. + public var backoffMultiplier: Double { + willSet { Self.validateBackoffMultiplier(newValue) } + } + + /// The set of status codes which may be retried. + /// + /// - Precondition: Must not be empty. + public var retryableStatusCodes: Set { + willSet { Self.validateRetryableStatusCodes(newValue) } + } + + /// Create a new retry policy. + /// + /// - Parameters: + /// - maximumAttempts: The maximum number of attempts allowed for the RPC. + /// - initialBackoff: The initial backoff period for the first retry attempt. Must be + /// greater than zero. + /// - maximumBackoff: The maximum period of time to wait between attempts. Must be greater than + /// zero. + /// - backoffMultiplier: The exponential backoff multiplier. Must be greater than zero. + /// - retryableStatusCodes: The set of status codes which may be retried. Must not be empty. + /// - Precondition: `maximumAttempts`, `initialBackoff`, `maximumBackoff` and `backoffMultiplier` + /// must be greater than zero. + /// - Precondition: `retryableStatusCodes` must not be empty. + public init( + maximumAttempts: Int, + initialBackoff: Duration, + maximumBackoff: Duration, + backoffMultiplier: Double, + retryableStatusCodes: Set + ) { + self.maximumAttempts = validateMaxAttempts(maximumAttempts) + + Self.validateInitialBackoff(initialBackoff) + self.initialBackoff = initialBackoff + + Self.validateMaxBackoff(maximumBackoff) + self.maximumBackoff = maximumBackoff + + Self.validateBackoffMultiplier(backoffMultiplier) + self.backoffMultiplier = backoffMultiplier + + Self.validateRetryableStatusCodes(retryableStatusCodes) + self.retryableStatusCodes = retryableStatusCodes + } + + private static func validateInitialBackoff(_ value: Duration) { + precondition(value.isGreaterThanZero, "initialBackoff must be greater than zero") + } + + private static func validateMaxBackoff(_ value: Duration) { + precondition(value.isGreaterThanZero, "maximumBackoff must be greater than zero") + } + + private static func validateBackoffMultiplier(_ value: Double) { + precondition(value > 0, "backoffMultiplier must be greater than zero") + } + + private static func validateRetryableStatusCodes(_ value: Set) { + precondition(!value.isEmpty, "retryableStatusCodes mustn't be empty") + } + } + + /// Policy for hedging an RPC. + /// + /// Hedged RPCs may execute more than once on a server so only idempotent methods should + /// be hedged. + /// + /// gRPC executes the RPC at most ``maximumAttempts`` times, staggering each attempt + /// by ``hedgingDelay``. + /// + /// For more information see [gRFC A6 Client + /// Retries](https://github.com/grpc/proposal/blob/master/A6-client-retries.md). + public struct HedgingPolicy: Hashable, Sendable { + /// The maximum number of RPC attempts, including the original attempt. + /// + /// Values greater than five are treated as five. + /// + /// - Precondition: Must be greater than one. + public var maximumAttempts: Int { + didSet { self.maximumAttempts = validateMaxAttempts(self.maximumAttempts) } + } + + /// The first RPC will be sent immediately, but each subsequent RPC will be sent at intervals + /// of `hedgingDelay`. Set this to zero to immediately send all RPCs. + public var hedgingDelay: Duration { + willSet { Self.validateHedgingDelay(newValue) } + } + + /// The set of status codes which indicate other hedged RPCs may still succeed. + /// + /// If a non-fatal status code is returned by the server, hedged RPCs will continue. + /// Otherwise, outstanding requests will be cancelled and the error returned to the + /// application layer. + public var nonFatalStatusCodes: Set + + /// Create a new hedging policy. + /// + /// - Parameters: + /// - maximumAttempts: The maximum number of attempts allowed for the RPC. + /// - hedgingDelay: The delay between each hedged RPC. + /// - nonFatalStatusCodes: The set of status codes which indicated other hedged RPCs may still + /// succeed. + /// - Precondition: `maximumAttempts` must be greater than zero. + public init( + maximumAttempts: Int, + hedgingDelay: Duration, + nonFatalStatusCodes: Set + ) { + self.maximumAttempts = validateMaxAttempts(maximumAttempts) + + Self.validateHedgingDelay(hedgingDelay) + self.hedgingDelay = hedgingDelay + self.nonFatalStatusCodes = nonFatalStatusCodes + } + + private static func validateHedgingDelay(_ value: Duration) { + precondition( + value.isGreaterThanOrEqualToZero, + "hedgingDelay must be greater than or equal to zero" + ) + } + } + + fileprivate static func validateMaxAttempts(_ value: Int) -> Int { + precondition(value > 0, "maximumAttempts must be greater than zero") + return min(value, 5) + } +} + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +extension Client { + /// A collection of ``ClientRPCExecutionConfiguration``s, mapped to specific methods or services. + /// + /// When creating a new instance, you must provide a default configuration to be used when getting + /// a configuration for a method that has not been given a specific override. + /// Use ``setDefaultConfiguration(_:forService:)`` to set a specific override for a whole + /// service. + /// + /// Use the subscript to get and set configurations for methods. + public struct MethodConfigurationRegistry: Sendable, Hashable { + private var elements: [MethodDescriptor: MethodConfiguration] + private let defaultConfiguration: MethodConfiguration + + public init( + defaultConfiguration: MethodConfiguration = MethodConfiguration( + executionPolicy: nil, + timeout: nil + ) + ) { + self.elements = [:] + self.defaultConfiguration = defaultConfiguration + } + + /// Get the corresponding ``MethodConfiguration`` for the given ``MethodDescriptor``. + /// + /// If `useDefault` is true, then fall back to the default configuration given in ``init(defaultConfiguration:)`` + /// if there is no set configuration for the descriptor. Otherwise, return `nil`. + /// + /// - Parameters: + /// - descriptor: The ``MethodDescriptor`` for which to get a ``MethodConfiguration``. + /// - useDefault: Whether the default value should be returned if no configuration was specified + /// for the given descriptor. + public subscript(_ descriptor: MethodDescriptor, useDefault useDefault: Bool) + -> MethodConfiguration? + { + get { + if let methodLevelOverride = self.elements[descriptor] { + return methodLevelOverride + } + var serviceLevelDescriptor = descriptor + serviceLevelDescriptor.method = "" + + if useDefault { + return self.elements[serviceLevelDescriptor, default: self.defaultConfiguration] + } else { + return self.elements[serviceLevelDescriptor] + } + } + } + + /// Get or set the corresponding ``MethodConfiguration`` for the given ``MethodDescriptor``. + /// + /// If no configuration has been set for the given descriptor, the value returned will be the default + /// passed in ``init(defaultConfiguration:)`` + /// + /// - Parameters: + /// - descriptor: The ``MethodDescriptor`` for which to get or set a ``MethodConfiguration``. + public subscript(_ descriptor: MethodDescriptor) -> MethodConfiguration { + get { + // This force unwrap is safe, because we'll always have a default value + // present, and we'll always use it if `useDefault` is true. + self[descriptor, useDefault: true]! + } + + set { + precondition( + !descriptor.service.isEmpty, + "Method descriptor's service cannot be empty." + ) + + self.elements[descriptor] = newValue + } + } + + /// Set a default configuration for a service. + /// + /// If getting a configuration for a method that's part of a service, and the method itself doesn't have an + /// override, then this configuration will be used instead of the default configuration passed when creating + /// this instance of ``ClientRPCExecutionConfigurationCollection``. + /// + /// - Parameters: + /// - configuration: The default configuration for the service. + /// - service: The name of the service for which this override applies. + public mutating func setDefaultConfiguration( + _ configuration: MethodConfiguration, + forService service: String + ) { + self[MethodDescriptor(service: service, method: "")] = configuration + } + } +} + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +extension Duration { + fileprivate var isGreaterThanZero: Bool { + self.components.seconds > 0 || self.components.attoseconds > 0 + } + + fileprivate var isGreaterThanOrEqualToZero: Bool { + self.components.seconds >= 0 || self.components.attoseconds >= 0 + } +} diff --git a/Sources/GRPCCore/ClientError.swift b/Sources/GRPCCore/ClientError.swift new file mode 100644 index 000000000..3b86235cb --- /dev/null +++ b/Sources/GRPCCore/ClientError.swift @@ -0,0 +1,142 @@ +/* + * Copyright 2023, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/// A runtime error thrown by the client. +/// +/// In contrast to ``RPCError``, the ``ClientError`` represents errors which happen at a scope +/// wider than an individual RPC. For example, attempting to start a client which is already +/// stopped would result in a ``ClientError``. +public struct ClientError: Error, Hashable, @unchecked Sendable { + private var storage: Storage + + // Ensures the underlying storage is unique. + private mutating func ensureUniqueStorage() { + if !isKnownUniquelyReferenced(&self.storage) { + self.storage = self.storage.copy() + } + } + + /// The code indicating the domain of the error. + public var code: Code { + get { self.storage.code } + set { + self.ensureUniqueStorage() + self.storage.code = newValue + } + } + + /// A message providing more details about the error which may include details specific to this + /// instance of the error. + public var message: String { + get { self.storage.message } + set { + self.ensureUniqueStorage() + self.storage.message = newValue + } + } + + /// The original error which led to this error being thrown. + public var cause: Error? { + get { self.storage.cause } + set { + self.ensureUniqueStorage() + self.storage.cause = newValue + } + } + + /// Creates a new error. + /// + /// - Parameters: + /// - code: The error code. + /// - message: A description of the error. + /// - cause: The original error which led to this error being thrown. + public init(code: Code, message: String, cause: Error? = nil) { + self.storage = Storage(code: code, message: message, cause: cause) + } +} + +extension ClientError: CustomStringConvertible { + public var description: String { + if let cause = self.cause { + return "\(self.code): \"\(self.message)\" (cause: \"\(cause)\")" + } else { + return "\(self.code): \"\(self.message)\"" + } + } +} + +extension ClientError { + private final class Storage: Hashable { + var code: Code + var message: String + var cause: Error? + + init(code: Code, message: String, cause: Error?) { + self.code = code + self.message = message + self.cause = cause + } + + func copy() -> Storage { + return Storage(code: self.code, message: self.message, cause: self.cause) + } + + func hash(into hasher: inout Hasher) { + hasher.combine(self.code) + hasher.combine(self.message) + } + + static func == (lhs: Storage, rhs: Storage) -> Bool { + return lhs.code == rhs.code && lhs.message == rhs.message + } + } +} + +extension ClientError { + public struct Code: Hashable, Sendable { + private enum Value { + case clientIsAlreadyRunning + case clientIsNotRunning + case clientIsStopped + } + + private var value: Value + private init(_ value: Value) { + self.value = value + } + + /// At attempt to start the client was made but it is already running. + public static var clientIsAlreadyRunning: Self { + Self(.clientIsAlreadyRunning) + } + + /// An attempt to start an RPC was made but the client is not running. + public static var clientIsNotRunning: Self { + Self(.clientIsNotRunning) + } + + /// At attempt to start the client was made but it has already stopped. + public static var clientIsStopped: Self { + Self(.clientIsStopped) + } + } +} + +extension ClientError.Code: CustomStringConvertible { + public var description: String { + String(describing: self.value) + } +} diff --git a/Sources/GRPCCore/Transport/ClientTransport.swift b/Sources/GRPCCore/Transport/ClientTransport.swift index 6e7d20bcb..be83c6452 100644 --- a/Sources/GRPCCore/Transport/ClientTransport.swift +++ b/Sources/GRPCCore/Transport/ClientTransport.swift @@ -16,8 +16,8 @@ @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) public protocol ClientTransport: Sendable { - associatedtype Inbound: (AsyncSequence & Sendable) where Inbound.Element == RPCResponsePart - associatedtype Outbound: ClosableRPCWriterProtocol + typealias Inbound = RPCAsyncSequence + typealias Outbound = RPCWriter.Closable /// Returns a throttle which gRPC uses to determine whether retries can be executed. /// @@ -77,5 +77,5 @@ public protocol ClientTransport: Sendable { /// - Returns: Execution configuration for the method, if it exists. func executionConfiguration( forMethod descriptor: MethodDescriptor - ) -> ClientRPCExecutionConfiguration? + ) -> Client.MethodConfiguration? } diff --git a/Sources/GRPCCore/Transport/InProcessClientTransport.swift b/Sources/GRPCCore/Transport/InProcessClientTransport.swift index 47b1bbf73..aa902e338 100644 --- a/Sources/GRPCCore/Transport/InProcessClientTransport.swift +++ b/Sources/GRPCCore/Transport/InProcessClientTransport.swift @@ -96,12 +96,12 @@ public struct InProcessClientTransport: ClientTransport { public let retryThrottle: RetryThrottle - private let executionConfigurations: ClientRPCExecutionConfigurationCollection + private let executionConfigurations: Client.MethodConfigurationRegistry private let state: LockedValueBox public init( server: InProcessServerTransport, - executionConfigurations: ClientRPCExecutionConfigurationCollection + executionConfigurations: Client.MethodConfigurationRegistry ) { self.retryThrottle = RetryThrottle(maximumTokens: 10, tokenRatio: 0.1) self.executionConfigurations = executionConfigurations @@ -329,7 +329,7 @@ public struct InProcessClientTransport: ClientTransport { /// - Returns: Execution configuration for the method, if it exists. public func executionConfiguration( forMethod descriptor: MethodDescriptor - ) -> ClientRPCExecutionConfiguration? { + ) -> Client.MethodConfiguration? { self.executionConfigurations[descriptor] } } diff --git a/Tests/GRPCCoreTests/Call/Client/ClientRPCExecutionConfigurationCollectionTests.swift b/Tests/GRPCCoreTests/Call/Client/ClientRPCExecutionConfigurationCollectionTests.swift index 38f964652..e65cb4dd6 100644 --- a/Tests/GRPCCoreTests/Call/Client/ClientRPCExecutionConfigurationCollectionTests.swift +++ b/Tests/GRPCCoreTests/Call/Client/ClientRPCExecutionConfigurationCollectionTests.swift @@ -18,49 +18,54 @@ import XCTest @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) final class ClientRPCExecutionConfigurationCollectionTests: XCTestCase { - func testGetConfigurationForKnownMethod() { - let policy = HedgingPolicy( + func testGetConfigurationForKnownMethod() async throws { + let first = ContinuousClock.now + let second = first.advanced(by: .seconds(1)) + let result = second.duration(to: first) + print(result.components) + + let policy = Client.MethodConfiguration.HedgingPolicy( maximumAttempts: 10, hedgingDelay: .seconds(1), nonFatalStatusCodes: [] ) - let defaultConfiguration = ClientRPCExecutionConfiguration(hedgingPolicy: policy) - var configurations = ClientRPCExecutionConfigurationCollection( + let defaultConfiguration = Client.MethodConfiguration(hedgingPolicy: policy) + var configurations = Client.MethodConfigurationRegistry( defaultConfiguration: defaultConfiguration ) let descriptor = MethodDescriptor(service: "test", method: "first") - let retryPolicy = RetryPolicy( + let retryPolicy = Client.MethodConfiguration.RetryPolicy( maximumAttempts: 10, initialBackoff: .seconds(1), maximumBackoff: .seconds(1), backoffMultiplier: 1.0, retryableStatusCodes: [.unavailable] ) - let overrideConfiguration = ClientRPCExecutionConfiguration(retryPolicy: retryPolicy) + let overrideConfiguration = Client.MethodConfiguration(retryPolicy: retryPolicy) configurations[descriptor] = overrideConfiguration XCTAssertEqual(configurations[descriptor], overrideConfiguration) } func testGetConfigurationForUnknownMethodButServiceOverride() { - let policy = HedgingPolicy( + let policy = Client.MethodConfiguration.HedgingPolicy( maximumAttempts: 10, hedgingDelay: .seconds(1), nonFatalStatusCodes: [] ) - let defaultConfiguration = ClientRPCExecutionConfiguration(hedgingPolicy: policy) - var configurations = ClientRPCExecutionConfigurationCollection( + let defaultConfiguration = Client.MethodConfiguration(hedgingPolicy: policy) + var configurations = Client.MethodConfigurationRegistry( defaultConfiguration: defaultConfiguration ) let firstDescriptor = MethodDescriptor(service: "test", method: "") - let retryPolicy = RetryPolicy( + let retryPolicy = Client.MethodConfiguration.RetryPolicy( maximumAttempts: 10, initialBackoff: .seconds(1), maximumBackoff: .seconds(1), backoffMultiplier: 1.0, retryableStatusCodes: [.unavailable] ) - let overrideConfiguration = ClientRPCExecutionConfiguration(retryPolicy: retryPolicy) + let overrideConfiguration = Client.MethodConfiguration(retryPolicy: retryPolicy) configurations[firstDescriptor] = overrideConfiguration let secondDescriptor = MethodDescriptor(service: "test", method: "second") @@ -68,24 +73,24 @@ final class ClientRPCExecutionConfigurationCollectionTests: XCTestCase { } func testGetConfigurationForUnknownMethodDefaultValue() { - let policy = HedgingPolicy( + let policy = Client.MethodConfiguration.HedgingPolicy( maximumAttempts: 10, hedgingDelay: .seconds(1), nonFatalStatusCodes: [] ) - let defaultConfiguration = ClientRPCExecutionConfiguration(hedgingPolicy: policy) - var configurations = ClientRPCExecutionConfigurationCollection( + let defaultConfiguration = Client.MethodConfiguration(hedgingPolicy: policy) + var configurations = Client.MethodConfigurationRegistry( defaultConfiguration: defaultConfiguration ) let firstDescriptor = MethodDescriptor(service: "test1", method: "first") - let retryPolicy = RetryPolicy( + let retryPolicy = Client.MethodConfiguration.RetryPolicy( maximumAttempts: 10, initialBackoff: .seconds(1), maximumBackoff: .seconds(1), backoffMultiplier: 1.0, retryableStatusCodes: [.unavailable] ) - let overrideConfiguration = ClientRPCExecutionConfiguration(retryPolicy: retryPolicy) + let overrideConfiguration = Client.MethodConfiguration(retryPolicy: retryPolicy) configurations[firstDescriptor] = overrideConfiguration let secondDescriptor = MethodDescriptor(service: "test2", method: "second") diff --git a/Tests/GRPCCoreTests/Call/Client/ClientRPCExecutionConfigurationTests.swift b/Tests/GRPCCoreTests/Call/Client/ClientRPCExecutionConfigurationTests.swift index 768b808b2..8c25d543d 100644 --- a/Tests/GRPCCoreTests/Call/Client/ClientRPCExecutionConfigurationTests.swift +++ b/Tests/GRPCCoreTests/Call/Client/ClientRPCExecutionConfigurationTests.swift @@ -19,7 +19,7 @@ import XCTest @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) final class ClientRPCExecutionConfigurationTests: XCTestCase { func testRetryPolicyClampsMaxAttempts() { - var policy = RetryPolicy( + var policy = Client.MethodConfiguration.RetryPolicy( maximumAttempts: 10, initialBackoff: .seconds(1), maximumBackoff: .seconds(1), @@ -35,7 +35,7 @@ final class ClientRPCExecutionConfigurationTests: XCTestCase { } func testHedgingPolicyClampsMaxAttempts() { - var policy = HedgingPolicy( + var policy = Client.MethodConfiguration.HedgingPolicy( maximumAttempts: 10, hedgingDelay: .seconds(1), nonFatalStatusCodes: [] diff --git a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness.swift b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness.swift index 3b546d1e0..9e3daf098 100644 --- a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness.swift +++ b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness.swift @@ -67,7 +67,7 @@ struct ClientRPCExecutorTestHarness { func unary( request: ClientRequest.Single<[UInt8]>, - configuration: ClientRPCExecutionConfiguration? = nil, + configuration: Client.MethodConfiguration? = nil, handler: @escaping @Sendable (ClientResponse.Single<[UInt8]>) async throws -> Void ) async throws { try await self.bidirectional( @@ -80,7 +80,7 @@ struct ClientRPCExecutorTestHarness { func clientStreaming( request: ClientRequest.Stream<[UInt8]>, - configuration: ClientRPCExecutionConfiguration? = nil, + configuration: Client.MethodConfiguration? = nil, handler: @escaping @Sendable (ClientResponse.Single<[UInt8]>) async throws -> Void ) async throws { try await self.bidirectional( @@ -93,7 +93,7 @@ struct ClientRPCExecutorTestHarness { func serverStreaming( request: ClientRequest.Single<[UInt8]>, - configuration: ClientRPCExecutionConfiguration? = nil, + configuration: Client.MethodConfiguration? = nil, handler: @escaping @Sendable (ClientResponse.Stream<[UInt8]>) async throws -> Void ) async throws { try await self.bidirectional( @@ -106,7 +106,7 @@ struct ClientRPCExecutorTestHarness { func bidirectional( request: ClientRequest.Stream<[UInt8]>, - configuration: ClientRPCExecutionConfiguration? = nil, + configuration: Client.MethodConfiguration? = nil, handler: @escaping @Sendable (ClientResponse.Stream<[UInt8]>) async throws -> Void ) async throws { try await self.execute( @@ -122,7 +122,7 @@ struct ClientRPCExecutorTestHarness { request: ClientRequest.Stream, serializer: some MessageSerializer, deserializer: some MessageDeserializer, - configuration: ClientRPCExecutionConfiguration?, + configuration: Client.MethodConfiguration?, handler: @escaping @Sendable (ClientResponse.Stream) async throws -> Void ) async throws { try await withThrowingTaskGroup(of: Void.self) { group in @@ -141,11 +141,11 @@ struct ClientRPCExecutorTestHarness { try await self.clientTransport.connect(lazily: false) } - let executionConfiguration: ClientRPCExecutionConfiguration + let executionConfiguration: Client.MethodConfiguration if let configuration = configuration { executionConfiguration = configuration } else { - executionConfiguration = ClientRPCExecutionConfiguration(executionPolicy: nil, timeout: nil) + executionConfiguration = Client.MethodConfiguration(executionPolicy: nil, timeout: nil) } // Execute the request. diff --git a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTests+Hedging.swift b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTests+Hedging.swift index ab6a6fca5..9ec43192a 100644 --- a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTests+Hedging.swift +++ b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTests+Hedging.swift @@ -187,7 +187,7 @@ extension ClientRPCExecutorTests { } @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -extension ClientRPCExecutionConfiguration { +extension Client.MethodConfiguration { fileprivate static func hedge( maximumAttempts: Int = 5, delay: Duration = .milliseconds(25), @@ -200,6 +200,6 @@ extension ClientRPCExecutionConfiguration { nonFatalStatusCodes: nonFatalCodes ) - return ClientRPCExecutionConfiguration(hedgingPolicy: policy, timeout: timeout) + return Client.MethodConfiguration(hedgingPolicy: policy, timeout: timeout) } } diff --git a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTests+Retries.swift b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTests+Retries.swift index a313148ea..a24f16567 100644 --- a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTests+Retries.swift +++ b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTests+Retries.swift @@ -219,7 +219,7 @@ extension ClientRPCExecutorTests { } ) - let retryPolicy = RetryPolicy( + let retryPolicy = Client.MethodConfiguration.RetryPolicy( maximumAttempts: 5, initialBackoff: .seconds(60), maximumBackoff: .seconds(50), @@ -256,7 +256,7 @@ extension ClientRPCExecutorTests { ) ) - let retryPolicy = RetryPolicy( + let retryPolicy = Client.MethodConfiguration.RetryPolicy( maximumAttempts: 5, initialBackoff: .seconds(60), maximumBackoff: .seconds(50), @@ -286,7 +286,7 @@ extension ClientRPCExecutorTests { } @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -extension ClientRPCExecutionConfiguration { +extension Client.MethodConfiguration { fileprivate static func retry( maximumAttempts: Int = 5, codes: Set, @@ -300,6 +300,6 @@ extension ClientRPCExecutionConfiguration { retryableStatusCodes: codes ) - return ClientRPCExecutionConfiguration(retryPolicy: policy, timeout: timeout) + return Client.MethodConfiguration(retryPolicy: policy, timeout: timeout) } } diff --git a/Tests/GRPCCoreTests/Call/Client/RetryDelaySequenceTests.swift b/Tests/GRPCCoreTests/Call/Client/RetryDelaySequenceTests.swift index 09bc182cd..d144c9227 100644 --- a/Tests/GRPCCoreTests/Call/Client/RetryDelaySequenceTests.swift +++ b/Tests/GRPCCoreTests/Call/Client/RetryDelaySequenceTests.swift @@ -20,7 +20,7 @@ import XCTest @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) final class RetryDelaySequenceTests: XCTestCase { func testSequence() { - let policy = RetryPolicy( + let policy = Client.MethodConfiguration.RetryPolicy( maximumAttempts: 1, // ignored here initialBackoff: .seconds(1), maximumBackoff: .seconds(8), @@ -40,7 +40,7 @@ final class RetryDelaySequenceTests: XCTestCase { } func testSequenceSupportsMultipleIteration() { - let policy = RetryPolicy( + let policy = Client.MethodConfiguration.RetryPolicy( maximumAttempts: 1, // ignored here initialBackoff: .seconds(1), maximumBackoff: .seconds(8), diff --git a/Tests/GRPCCoreTests/GRPCServerTests.swift b/Tests/GRPCCoreTests/GRPCServerTests.swift index e99a46914..69eb684c8 100644 --- a/Tests/GRPCCoreTests/GRPCServerTests.swift +++ b/Tests/GRPCCoreTests/GRPCServerTests.swift @@ -23,7 +23,7 @@ final class GRPCServerTests: XCTestCase { let server = InProcessServerTransport() let client = InProcessClientTransport( server: server, - executionConfigurations: ClientRPCExecutionConfigurationCollection() + executionConfigurations: Client.MethodConfigurationRegistry() ) return (client, server) diff --git a/Tests/GRPCCoreTests/Test Utilities/Transport/AnyTransport.swift b/Tests/GRPCCoreTests/Test Utilities/Transport/AnyTransport.swift index fa64d76fd..f1c9a5617 100644 --- a/Tests/GRPCCoreTests/Test Utilities/Transport/AnyTransport.swift +++ b/Tests/GRPCCoreTests/Test Utilities/Transport/AnyTransport.swift @@ -28,7 +28,7 @@ struct AnyClientTransport: ClientTransport, Sendable { ) async throws -> Any private let _connect: @Sendable (Bool) async throws -> Void private let _close: @Sendable () -> Void - private let _configuration: @Sendable (MethodDescriptor) -> ClientRPCExecutionConfiguration? + private let _configuration: @Sendable (MethodDescriptor) -> Client.MethodConfiguration? init(wrapping transport: Transport) where Transport.Inbound == Inbound, Transport.Outbound == Outbound { @@ -74,7 +74,7 @@ struct AnyClientTransport: ClientTransport, Sendable { func executionConfiguration( forMethod descriptor: MethodDescriptor - ) -> ClientRPCExecutionConfiguration? { + ) -> Client.MethodConfiguration? { self._configuration(descriptor) } } diff --git a/Tests/GRPCCoreTests/Test Utilities/Transport/StreamCountingTransport.swift b/Tests/GRPCCoreTests/Test Utilities/Transport/StreamCountingTransport.swift index 11c294e03..b6f12a255 100644 --- a/Tests/GRPCCoreTests/Test Utilities/Transport/StreamCountingTransport.swift +++ b/Tests/GRPCCoreTests/Test Utilities/Transport/StreamCountingTransport.swift @@ -68,7 +68,7 @@ struct StreamCountingClientTransport: ClientTransport, Sendable { func executionConfiguration( forMethod descriptor: MethodDescriptor - ) -> ClientRPCExecutionConfiguration? { + ) -> Client.MethodConfiguration? { self.transport.executionConfiguration(forMethod: descriptor) } } diff --git a/Tests/GRPCCoreTests/Test Utilities/Transport/ThrowingTransport.swift b/Tests/GRPCCoreTests/Test Utilities/Transport/ThrowingTransport.swift index 0dd1cee9b..d3e724099 100644 --- a/Tests/GRPCCoreTests/Test Utilities/Transport/ThrowingTransport.swift +++ b/Tests/GRPCCoreTests/Test Utilities/Transport/ThrowingTransport.swift @@ -38,7 +38,7 @@ struct ThrowOnStreamCreationTransport: ClientTransport { func executionConfiguration( forMethod descriptor: MethodDescriptor - ) -> ClientRPCExecutionConfiguration? { + ) -> Client.MethodConfiguration? { return nil } diff --git a/Tests/GRPCCoreTests/Transport/InProcessClientTransportTests.swift b/Tests/GRPCCoreTests/Transport/InProcessClientTransportTests.swift index c7bbdef80..c5fc7013d 100644 --- a/Tests/GRPCCoreTests/Transport/InProcessClientTransportTests.swift +++ b/Tests/GRPCCoreTests/Transport/InProcessClientTransportTests.swift @@ -182,13 +182,13 @@ final class InProcessClientTransportTests: XCTestCase { } func testExecutionConfiguration() { - let policy = HedgingPolicy( + let policy = Client.MethodConfiguration.HedgingPolicy( maximumAttempts: 10, hedgingDelay: .seconds(1), nonFatalStatusCodes: [] ) - let defaultConfiguration = ClientRPCExecutionConfiguration(hedgingPolicy: policy) - var configurations = ClientRPCExecutionConfigurationCollection( + let defaultConfiguration = Client.MethodConfiguration(hedgingPolicy: policy) + var configurations = Client.MethodConfigurationRegistry( defaultConfiguration: defaultConfiguration ) @@ -197,14 +197,14 @@ final class InProcessClientTransportTests: XCTestCase { let firstDescriptor = MethodDescriptor(service: "test", method: "first") XCTAssertEqual(client.executionConfiguration(forMethod: firstDescriptor), defaultConfiguration) - let retryPolicy = RetryPolicy( + let retryPolicy = Client.MethodConfiguration.RetryPolicy( maximumAttempts: 10, initialBackoff: .seconds(1), maximumBackoff: .seconds(1), backoffMultiplier: 1.0, retryableStatusCodes: [.unavailable] ) - let overrideConfiguration = ClientRPCExecutionConfiguration(retryPolicy: retryPolicy) + let overrideConfiguration = Client.MethodConfiguration(retryPolicy: retryPolicy) configurations[firstDescriptor] = overrideConfiguration client = InProcessClientTransport(server: .init(), executionConfigurations: configurations) let secondDescriptor = MethodDescriptor(service: "test", method: "second") @@ -243,10 +243,10 @@ final class InProcessClientTransportTests: XCTestCase { } func makeClient( - configuration: ClientRPCExecutionConfiguration? = nil, + configuration: Client.MethodConfiguration? = nil, server: InProcessServerTransport = InProcessServerTransport() ) -> InProcessClientTransport { - let defaultPolicy = RetryPolicy( + let defaultPolicy = Client.MethodConfiguration.RetryPolicy( maximumAttempts: 10, initialBackoff: .seconds(1), maximumBackoff: .seconds(1), From 7d63e045bda39fe4526fa43088ca443e6771a141 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Tue, 28 Nov 2023 16:51:21 +0000 Subject: [PATCH 2/5] Rename to GRPCClient --- .../ClientRPCExecutor+HedgingExecutor.swift | 6 ++-- .../ClientRPCExecutor+RetryExecutor.swift | 4 +-- .../Client/Internal/ClientRPCExecutor.swift | 2 +- .../Client/Internal/RetryDelaySequence.swift | 8 ++--- .../{Client.swift => GRPCClient.swift} | 19 ++++++------ Sources/GRPCCore/GRPCServer.swift | 2 +- .../GRPCCore/Transport/ClientTransport.swift | 2 +- .../Transport/InProcessClientTransport.swift | 6 ++-- ...xecutionConfigurationCollectionTests.swift | 30 +++++++++---------- ...ClientRPCExecutionConfigurationTests.swift | 4 +-- .../ClientRPCExecutorTestHarness.swift | 14 ++++----- .../ClientRPCExecutorTests+Hedging.swift | 4 +-- .../ClientRPCExecutorTests+Retries.swift | 8 ++--- .../Call/Client/RetryDelaySequenceTests.swift | 4 +-- Tests/GRPCCoreTests/GRPCServerTests.swift | 2 +- .../Transport/AnyTransport.swift | 4 +-- .../Transport/StreamCountingTransport.swift | 2 +- .../Transport/ThrowingTransport.swift | 2 +- .../InProcessClientTransportTests.swift | 14 ++++----- 19 files changed, 68 insertions(+), 69 deletions(-) rename Sources/GRPCCore/{Client.swift => GRPCClient.swift} (98%) diff --git a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift index 11a665b5f..42002a263 100644 --- a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift +++ b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift @@ -30,7 +30,7 @@ extension ClientRPCExecutor { @usableFromInline let transport: Transport @usableFromInline - let policy: Client.MethodConfiguration.HedgingPolicy + let policy: GRPCClient.MethodConfiguration.HedgingPolicy @usableFromInline let timeout: Duration? @usableFromInline @@ -45,7 +45,7 @@ extension ClientRPCExecutor { @inlinable init( transport: Transport, - policy: Client.MethodConfiguration.HedgingPolicy, + policy: GRPCClient.MethodConfiguration.HedgingPolicy, timeout: Duration?, interceptors: [any ClientInterceptor], serializer: Serializer, @@ -420,7 +420,7 @@ extension ClientRPCExecutor.HedgingExecutor { private(set) var hasUsableResponse: Bool @inlinable - init(policy: Client.MethodConfiguration.HedgingPolicy) { + init(policy: GRPCClient.MethodConfiguration.HedgingPolicy) { self._maximumAttempts = policy.maximumAttempts self.attempt = 1 self.hasUsableResponse = false diff --git a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift index 7720916c5..f433021f9 100644 --- a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift +++ b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift @@ -30,7 +30,7 @@ extension ClientRPCExecutor { @usableFromInline let transport: Transport @usableFromInline - let policy: Client.MethodConfiguration.RetryPolicy + let policy: GRPCClient.MethodConfiguration.RetryPolicy @usableFromInline let timeout: Duration? @usableFromInline @@ -45,7 +45,7 @@ extension ClientRPCExecutor { @inlinable init( transport: Transport, - policy: Client.MethodConfiguration.RetryPolicy, + policy: GRPCClient.MethodConfiguration.RetryPolicy, timeout: Duration?, interceptors: [any ClientInterceptor], serializer: Serializer, diff --git a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor.swift b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor.swift index a9452eb5e..213a8bc79 100644 --- a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor.swift +++ b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor.swift @@ -35,7 +35,7 @@ enum ClientRPCExecutor { static func execute( request: ClientRequest.Stream, method: MethodDescriptor, - configuration: Client.MethodConfiguration, + configuration: GRPCClient.MethodConfiguration, serializer: some MessageSerializer, deserializer: some MessageDeserializer, transport: some ClientTransport, diff --git a/Sources/GRPCCore/Call/Client/Internal/RetryDelaySequence.swift b/Sources/GRPCCore/Call/Client/Internal/RetryDelaySequence.swift index 6c61f40c4..57592bc13 100644 --- a/Sources/GRPCCore/Call/Client/Internal/RetryDelaySequence.swift +++ b/Sources/GRPCCore/Call/Client/Internal/RetryDelaySequence.swift @@ -26,10 +26,10 @@ struct RetryDelaySequence: Sequence { typealias Element = Duration @usableFromInline - let policy: Client.MethodConfiguration.RetryPolicy + let policy: GRPCClient.MethodConfiguration.RetryPolicy @inlinable - init(policy: Client.MethodConfiguration.RetryPolicy) { + init(policy: GRPCClient.MethodConfiguration.RetryPolicy) { self.policy = policy } @@ -41,12 +41,12 @@ struct RetryDelaySequence: Sequence { @usableFromInline struct Iterator: IteratorProtocol { @usableFromInline - let policy: Client.MethodConfiguration.RetryPolicy + let policy: GRPCClient.MethodConfiguration.RetryPolicy @usableFromInline private(set) var n = 1 @inlinable - init(policy: Client.MethodConfiguration.RetryPolicy) { + init(policy: GRPCClient.MethodConfiguration.RetryPolicy) { self.policy = policy } diff --git a/Sources/GRPCCore/Client.swift b/Sources/GRPCCore/GRPCClient.swift similarity index 98% rename from Sources/GRPCCore/Client.swift rename to Sources/GRPCCore/GRPCClient.swift index ffc6e85e3..1e72ba9e4 100644 --- a/Sources/GRPCCore/Client.swift +++ b/Sources/GRPCCore/GRPCClient.swift @@ -16,7 +16,7 @@ /// A gRPC client. /// -/// A ``Client`` communicates to a server via a given ``ClientTransport``. +/// A ``GRPCClient`` communicates to a server via a given ``ClientTransport``. /// You can start RPCs to the server by calling the corresponding method: /// - ``unary(request:descriptor:serializer:deserializer:handler:)`` /// - ``clientStreaming(request:descriptor:serializer:deserializer:handler:)`` @@ -35,7 +35,7 @@ /// ```swift /// // Create and add an in-process transport. /// let inProcessTransport = InProcessClientTransport() -/// let client = Client(transport: inProcessTransport) +/// let client = GRPCClient(transport: inProcessTransport) /// /// // Create and add some interceptors. /// client.interceptors.add(StatsRecordingServerInterceptors()) @@ -66,7 +66,7 @@ /// that need their lifecycles managed you should consider using [Swift Service /// Lifecycle](https://github.com/swift-server/swift-service-lifecycle). @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -public final class Client: Sendable { +public final class GRPCClient: Sendable { /// A collection of ``ClientInterceptor`` implementations which are applied to all accepted /// RPCs. /// @@ -142,7 +142,7 @@ public final class Client: Sendable { /// /// - Note: Any changes to resources after ``run()`` has been called will be ignored. /// - /// - Parameter transport: The ``ClientTransport`` to be used for this ``Client``. + /// - Parameter transport: The ``ClientTransport`` to be used for this ``GRPCClient``. public init(transport: ClientTransport) { self.storage = LockedValueBox(Storage()) self.transport = transport @@ -155,11 +155,10 @@ public final class Client: Sendable { /// /// If you need to immediately stop all work, cancel the task executing this method. public func run() async throws { - let interceptors = try self.storage.withLockedValue { storage in + try self.storage.withLockedValue { storage in switch storage.state { case .notStarted: storage.state = .running - return storage.interceptors case .running: throw ClientError( code: .clientIsAlreadyRunning, @@ -436,7 +435,7 @@ public final class Client: Sendable { } @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -extension Client { +extension GRPCClient { /// A collection of interceptors providing cross-cutting functionality to each accepted RPC. public struct Interceptors: Sendable { private(set) var values: [any ClientInterceptor] = [] @@ -456,7 +455,7 @@ extension Client { } @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -extension Client { +extension GRPCClient { /// The execution policy for an RPC. public enum ExecutionPolicy: Hashable, Sendable { /// Policy for retrying an RPC. @@ -544,7 +543,7 @@ extension Client { } @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -extension Client.MethodConfiguration { +extension GRPCClient.MethodConfiguration { /// Policy for retrying an RPC. /// /// gRPC retries RPCs when the first response from the server is a status code which matches @@ -720,7 +719,7 @@ extension Client.MethodConfiguration { } @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -extension Client { +extension GRPCClient { /// A collection of ``ClientRPCExecutionConfiguration``s, mapped to specific methods or services. /// /// When creating a new instance, you must provide a default configuration to be used when getting diff --git a/Sources/GRPCCore/GRPCServer.swift b/Sources/GRPCCore/GRPCServer.swift index 07c92715e..626816f9b 100644 --- a/Sources/GRPCCore/GRPCServer.swift +++ b/Sources/GRPCCore/GRPCServer.swift @@ -23,7 +23,7 @@ import Atomics /// streams to a service to handle the RPC or rejects them with an appropriate error if no service /// can handle the RPC. /// -/// A ``Server`` may listen with multiple transports (for example, HTTP/2 and in-process) and route +/// A ``GRPCServer`` may listen with multiple transports (for example, HTTP/2 and in-process) and route /// requests from each transport to the same service instance. You can also use "interceptors", /// to implement cross-cutting logic which apply to all accepted RPCs. Example uses of interceptors /// include request filtering, authentication, and logging. Once requests have been intercepted diff --git a/Sources/GRPCCore/Transport/ClientTransport.swift b/Sources/GRPCCore/Transport/ClientTransport.swift index be83c6452..1233183b0 100644 --- a/Sources/GRPCCore/Transport/ClientTransport.swift +++ b/Sources/GRPCCore/Transport/ClientTransport.swift @@ -77,5 +77,5 @@ public protocol ClientTransport: Sendable { /// - Returns: Execution configuration for the method, if it exists. func executionConfiguration( forMethod descriptor: MethodDescriptor - ) -> Client.MethodConfiguration? + ) -> GRPCClient.MethodConfiguration? } diff --git a/Sources/GRPCCore/Transport/InProcessClientTransport.swift b/Sources/GRPCCore/Transport/InProcessClientTransport.swift index aa902e338..635a49895 100644 --- a/Sources/GRPCCore/Transport/InProcessClientTransport.swift +++ b/Sources/GRPCCore/Transport/InProcessClientTransport.swift @@ -96,12 +96,12 @@ public struct InProcessClientTransport: ClientTransport { public let retryThrottle: RetryThrottle - private let executionConfigurations: Client.MethodConfigurationRegistry + private let executionConfigurations: GRPCClient.MethodConfigurationRegistry private let state: LockedValueBox public init( server: InProcessServerTransport, - executionConfigurations: Client.MethodConfigurationRegistry + executionConfigurations: GRPCClient.MethodConfigurationRegistry ) { self.retryThrottle = RetryThrottle(maximumTokens: 10, tokenRatio: 0.1) self.executionConfigurations = executionConfigurations @@ -329,7 +329,7 @@ public struct InProcessClientTransport: ClientTransport { /// - Returns: Execution configuration for the method, if it exists. public func executionConfiguration( forMethod descriptor: MethodDescriptor - ) -> Client.MethodConfiguration? { + ) -> GRPCClient.MethodConfiguration? { self.executionConfigurations[descriptor] } } diff --git a/Tests/GRPCCoreTests/Call/Client/ClientRPCExecutionConfigurationCollectionTests.swift b/Tests/GRPCCoreTests/Call/Client/ClientRPCExecutionConfigurationCollectionTests.swift index e65cb4dd6..996aa638e 100644 --- a/Tests/GRPCCoreTests/Call/Client/ClientRPCExecutionConfigurationCollectionTests.swift +++ b/Tests/GRPCCoreTests/Call/Client/ClientRPCExecutionConfigurationCollectionTests.swift @@ -24,48 +24,48 @@ final class ClientRPCExecutionConfigurationCollectionTests: XCTestCase { let result = second.duration(to: first) print(result.components) - let policy = Client.MethodConfiguration.HedgingPolicy( + let policy = GRPCClient.MethodConfiguration.HedgingPolicy( maximumAttempts: 10, hedgingDelay: .seconds(1), nonFatalStatusCodes: [] ) - let defaultConfiguration = Client.MethodConfiguration(hedgingPolicy: policy) - var configurations = Client.MethodConfigurationRegistry( + let defaultConfiguration = GRPCClient.MethodConfiguration(hedgingPolicy: policy) + var configurations = GRPCClient.MethodConfigurationRegistry( defaultConfiguration: defaultConfiguration ) let descriptor = MethodDescriptor(service: "test", method: "first") - let retryPolicy = Client.MethodConfiguration.RetryPolicy( + let retryPolicy = GRPCClient.MethodConfiguration.RetryPolicy( maximumAttempts: 10, initialBackoff: .seconds(1), maximumBackoff: .seconds(1), backoffMultiplier: 1.0, retryableStatusCodes: [.unavailable] ) - let overrideConfiguration = Client.MethodConfiguration(retryPolicy: retryPolicy) + let overrideConfiguration = GRPCClient.MethodConfiguration(retryPolicy: retryPolicy) configurations[descriptor] = overrideConfiguration XCTAssertEqual(configurations[descriptor], overrideConfiguration) } func testGetConfigurationForUnknownMethodButServiceOverride() { - let policy = Client.MethodConfiguration.HedgingPolicy( + let policy = GRPCClient.MethodConfiguration.HedgingPolicy( maximumAttempts: 10, hedgingDelay: .seconds(1), nonFatalStatusCodes: [] ) - let defaultConfiguration = Client.MethodConfiguration(hedgingPolicy: policy) - var configurations = Client.MethodConfigurationRegistry( + let defaultConfiguration = GRPCClient.MethodConfiguration(hedgingPolicy: policy) + var configurations = GRPCClient.MethodConfigurationRegistry( defaultConfiguration: defaultConfiguration ) let firstDescriptor = MethodDescriptor(service: "test", method: "") - let retryPolicy = Client.MethodConfiguration.RetryPolicy( + let retryPolicy = GRPCClient.MethodConfiguration.RetryPolicy( maximumAttempts: 10, initialBackoff: .seconds(1), maximumBackoff: .seconds(1), backoffMultiplier: 1.0, retryableStatusCodes: [.unavailable] ) - let overrideConfiguration = Client.MethodConfiguration(retryPolicy: retryPolicy) + let overrideConfiguration = GRPCClient.MethodConfiguration(retryPolicy: retryPolicy) configurations[firstDescriptor] = overrideConfiguration let secondDescriptor = MethodDescriptor(service: "test", method: "second") @@ -73,24 +73,24 @@ final class ClientRPCExecutionConfigurationCollectionTests: XCTestCase { } func testGetConfigurationForUnknownMethodDefaultValue() { - let policy = Client.MethodConfiguration.HedgingPolicy( + let policy = GRPCClient.MethodConfiguration.HedgingPolicy( maximumAttempts: 10, hedgingDelay: .seconds(1), nonFatalStatusCodes: [] ) - let defaultConfiguration = Client.MethodConfiguration(hedgingPolicy: policy) - var configurations = Client.MethodConfigurationRegistry( + let defaultConfiguration = GRPCClient.MethodConfiguration(hedgingPolicy: policy) + var configurations = GRPCClient.MethodConfigurationRegistry( defaultConfiguration: defaultConfiguration ) let firstDescriptor = MethodDescriptor(service: "test1", method: "first") - let retryPolicy = Client.MethodConfiguration.RetryPolicy( + let retryPolicy = GRPCClient.MethodConfiguration.RetryPolicy( maximumAttempts: 10, initialBackoff: .seconds(1), maximumBackoff: .seconds(1), backoffMultiplier: 1.0, retryableStatusCodes: [.unavailable] ) - let overrideConfiguration = Client.MethodConfiguration(retryPolicy: retryPolicy) + let overrideConfiguration = GRPCClient.MethodConfiguration(retryPolicy: retryPolicy) configurations[firstDescriptor] = overrideConfiguration let secondDescriptor = MethodDescriptor(service: "test2", method: "second") diff --git a/Tests/GRPCCoreTests/Call/Client/ClientRPCExecutionConfigurationTests.swift b/Tests/GRPCCoreTests/Call/Client/ClientRPCExecutionConfigurationTests.swift index 8c25d543d..c635b2f05 100644 --- a/Tests/GRPCCoreTests/Call/Client/ClientRPCExecutionConfigurationTests.swift +++ b/Tests/GRPCCoreTests/Call/Client/ClientRPCExecutionConfigurationTests.swift @@ -19,7 +19,7 @@ import XCTest @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) final class ClientRPCExecutionConfigurationTests: XCTestCase { func testRetryPolicyClampsMaxAttempts() { - var policy = Client.MethodConfiguration.RetryPolicy( + var policy = GRPCClient.MethodConfiguration.RetryPolicy( maximumAttempts: 10, initialBackoff: .seconds(1), maximumBackoff: .seconds(1), @@ -35,7 +35,7 @@ final class ClientRPCExecutionConfigurationTests: XCTestCase { } func testHedgingPolicyClampsMaxAttempts() { - var policy = Client.MethodConfiguration.HedgingPolicy( + var policy = GRPCClient.MethodConfiguration.HedgingPolicy( maximumAttempts: 10, hedgingDelay: .seconds(1), nonFatalStatusCodes: [] diff --git a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness.swift b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness.swift index 9e3daf098..04a05a598 100644 --- a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness.swift +++ b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness.swift @@ -67,7 +67,7 @@ struct ClientRPCExecutorTestHarness { func unary( request: ClientRequest.Single<[UInt8]>, - configuration: Client.MethodConfiguration? = nil, + configuration: GRPCClient.MethodConfiguration? = nil, handler: @escaping @Sendable (ClientResponse.Single<[UInt8]>) async throws -> Void ) async throws { try await self.bidirectional( @@ -80,7 +80,7 @@ struct ClientRPCExecutorTestHarness { func clientStreaming( request: ClientRequest.Stream<[UInt8]>, - configuration: Client.MethodConfiguration? = nil, + configuration: GRPCClient.MethodConfiguration? = nil, handler: @escaping @Sendable (ClientResponse.Single<[UInt8]>) async throws -> Void ) async throws { try await self.bidirectional( @@ -93,7 +93,7 @@ struct ClientRPCExecutorTestHarness { func serverStreaming( request: ClientRequest.Single<[UInt8]>, - configuration: Client.MethodConfiguration? = nil, + configuration: GRPCClient.MethodConfiguration? = nil, handler: @escaping @Sendable (ClientResponse.Stream<[UInt8]>) async throws -> Void ) async throws { try await self.bidirectional( @@ -106,7 +106,7 @@ struct ClientRPCExecutorTestHarness { func bidirectional( request: ClientRequest.Stream<[UInt8]>, - configuration: Client.MethodConfiguration? = nil, + configuration: GRPCClient.MethodConfiguration? = nil, handler: @escaping @Sendable (ClientResponse.Stream<[UInt8]>) async throws -> Void ) async throws { try await self.execute( @@ -122,7 +122,7 @@ struct ClientRPCExecutorTestHarness { request: ClientRequest.Stream, serializer: some MessageSerializer, deserializer: some MessageDeserializer, - configuration: Client.MethodConfiguration?, + configuration: GRPCClient.MethodConfiguration?, handler: @escaping @Sendable (ClientResponse.Stream) async throws -> Void ) async throws { try await withThrowingTaskGroup(of: Void.self) { group in @@ -141,11 +141,11 @@ struct ClientRPCExecutorTestHarness { try await self.clientTransport.connect(lazily: false) } - let executionConfiguration: Client.MethodConfiguration + let executionConfiguration: GRPCClient.MethodConfiguration if let configuration = configuration { executionConfiguration = configuration } else { - executionConfiguration = Client.MethodConfiguration(executionPolicy: nil, timeout: nil) + executionConfiguration = GRPCClient.MethodConfiguration(executionPolicy: nil, timeout: nil) } // Execute the request. diff --git a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTests+Hedging.swift b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTests+Hedging.swift index 9ec43192a..bc994dcc3 100644 --- a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTests+Hedging.swift +++ b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTests+Hedging.swift @@ -187,7 +187,7 @@ extension ClientRPCExecutorTests { } @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -extension Client.MethodConfiguration { +extension GRPCClient.MethodConfiguration { fileprivate static func hedge( maximumAttempts: Int = 5, delay: Duration = .milliseconds(25), @@ -200,6 +200,6 @@ extension Client.MethodConfiguration { nonFatalStatusCodes: nonFatalCodes ) - return Client.MethodConfiguration(hedgingPolicy: policy, timeout: timeout) + return GRPCClient.MethodConfiguration(hedgingPolicy: policy, timeout: timeout) } } diff --git a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTests+Retries.swift b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTests+Retries.swift index a24f16567..839abd11a 100644 --- a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTests+Retries.swift +++ b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTests+Retries.swift @@ -219,7 +219,7 @@ extension ClientRPCExecutorTests { } ) - let retryPolicy = Client.MethodConfiguration.RetryPolicy( + let retryPolicy = GRPCClient.MethodConfiguration.RetryPolicy( maximumAttempts: 5, initialBackoff: .seconds(60), maximumBackoff: .seconds(50), @@ -256,7 +256,7 @@ extension ClientRPCExecutorTests { ) ) - let retryPolicy = Client.MethodConfiguration.RetryPolicy( + let retryPolicy = GRPCClient.MethodConfiguration.RetryPolicy( maximumAttempts: 5, initialBackoff: .seconds(60), maximumBackoff: .seconds(50), @@ -286,7 +286,7 @@ extension ClientRPCExecutorTests { } @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -extension Client.MethodConfiguration { +extension GRPCClient.MethodConfiguration { fileprivate static func retry( maximumAttempts: Int = 5, codes: Set, @@ -300,6 +300,6 @@ extension Client.MethodConfiguration { retryableStatusCodes: codes ) - return Client.MethodConfiguration(retryPolicy: policy, timeout: timeout) + return GRPCClient.MethodConfiguration(retryPolicy: policy, timeout: timeout) } } diff --git a/Tests/GRPCCoreTests/Call/Client/RetryDelaySequenceTests.swift b/Tests/GRPCCoreTests/Call/Client/RetryDelaySequenceTests.swift index d144c9227..6b261340f 100644 --- a/Tests/GRPCCoreTests/Call/Client/RetryDelaySequenceTests.swift +++ b/Tests/GRPCCoreTests/Call/Client/RetryDelaySequenceTests.swift @@ -20,7 +20,7 @@ import XCTest @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) final class RetryDelaySequenceTests: XCTestCase { func testSequence() { - let policy = Client.MethodConfiguration.RetryPolicy( + let policy = GRPCClient.MethodConfiguration.RetryPolicy( maximumAttempts: 1, // ignored here initialBackoff: .seconds(1), maximumBackoff: .seconds(8), @@ -40,7 +40,7 @@ final class RetryDelaySequenceTests: XCTestCase { } func testSequenceSupportsMultipleIteration() { - let policy = Client.MethodConfiguration.RetryPolicy( + let policy = GRPCClient.MethodConfiguration.RetryPolicy( maximumAttempts: 1, // ignored here initialBackoff: .seconds(1), maximumBackoff: .seconds(8), diff --git a/Tests/GRPCCoreTests/GRPCServerTests.swift b/Tests/GRPCCoreTests/GRPCServerTests.swift index 69eb684c8..b75f22d7c 100644 --- a/Tests/GRPCCoreTests/GRPCServerTests.swift +++ b/Tests/GRPCCoreTests/GRPCServerTests.swift @@ -23,7 +23,7 @@ final class GRPCServerTests: XCTestCase { let server = InProcessServerTransport() let client = InProcessClientTransport( server: server, - executionConfigurations: Client.MethodConfigurationRegistry() + executionConfigurations: GRPCClient.MethodConfigurationRegistry() ) return (client, server) diff --git a/Tests/GRPCCoreTests/Test Utilities/Transport/AnyTransport.swift b/Tests/GRPCCoreTests/Test Utilities/Transport/AnyTransport.swift index f1c9a5617..95bcb6778 100644 --- a/Tests/GRPCCoreTests/Test Utilities/Transport/AnyTransport.swift +++ b/Tests/GRPCCoreTests/Test Utilities/Transport/AnyTransport.swift @@ -28,7 +28,7 @@ struct AnyClientTransport: ClientTransport, Sendable { ) async throws -> Any private let _connect: @Sendable (Bool) async throws -> Void private let _close: @Sendable () -> Void - private let _configuration: @Sendable (MethodDescriptor) -> Client.MethodConfiguration? + private let _configuration: @Sendable (MethodDescriptor) -> GRPCClient.MethodConfiguration? init(wrapping transport: Transport) where Transport.Inbound == Inbound, Transport.Outbound == Outbound { @@ -74,7 +74,7 @@ struct AnyClientTransport: ClientTransport, Sendable { func executionConfiguration( forMethod descriptor: MethodDescriptor - ) -> Client.MethodConfiguration? { + ) -> GRPCClient.MethodConfiguration? { self._configuration(descriptor) } } diff --git a/Tests/GRPCCoreTests/Test Utilities/Transport/StreamCountingTransport.swift b/Tests/GRPCCoreTests/Test Utilities/Transport/StreamCountingTransport.swift index b6f12a255..8058567d4 100644 --- a/Tests/GRPCCoreTests/Test Utilities/Transport/StreamCountingTransport.swift +++ b/Tests/GRPCCoreTests/Test Utilities/Transport/StreamCountingTransport.swift @@ -68,7 +68,7 @@ struct StreamCountingClientTransport: ClientTransport, Sendable { func executionConfiguration( forMethod descriptor: MethodDescriptor - ) -> Client.MethodConfiguration? { + ) -> GRPCClient.MethodConfiguration? { self.transport.executionConfiguration(forMethod: descriptor) } } diff --git a/Tests/GRPCCoreTests/Test Utilities/Transport/ThrowingTransport.swift b/Tests/GRPCCoreTests/Test Utilities/Transport/ThrowingTransport.swift index d3e724099..f428ec06a 100644 --- a/Tests/GRPCCoreTests/Test Utilities/Transport/ThrowingTransport.swift +++ b/Tests/GRPCCoreTests/Test Utilities/Transport/ThrowingTransport.swift @@ -38,7 +38,7 @@ struct ThrowOnStreamCreationTransport: ClientTransport { func executionConfiguration( forMethod descriptor: MethodDescriptor - ) -> Client.MethodConfiguration? { + ) -> GRPCClient.MethodConfiguration? { return nil } diff --git a/Tests/GRPCCoreTests/Transport/InProcessClientTransportTests.swift b/Tests/GRPCCoreTests/Transport/InProcessClientTransportTests.swift index c5fc7013d..830f59973 100644 --- a/Tests/GRPCCoreTests/Transport/InProcessClientTransportTests.swift +++ b/Tests/GRPCCoreTests/Transport/InProcessClientTransportTests.swift @@ -182,13 +182,13 @@ final class InProcessClientTransportTests: XCTestCase { } func testExecutionConfiguration() { - let policy = Client.MethodConfiguration.HedgingPolicy( + let policy = GRPCClient.MethodConfiguration.HedgingPolicy( maximumAttempts: 10, hedgingDelay: .seconds(1), nonFatalStatusCodes: [] ) - let defaultConfiguration = Client.MethodConfiguration(hedgingPolicy: policy) - var configurations = Client.MethodConfigurationRegistry( + let defaultConfiguration = GRPCClient.MethodConfiguration(hedgingPolicy: policy) + var configurations = GRPCClient.MethodConfigurationRegistry( defaultConfiguration: defaultConfiguration ) @@ -197,14 +197,14 @@ final class InProcessClientTransportTests: XCTestCase { let firstDescriptor = MethodDescriptor(service: "test", method: "first") XCTAssertEqual(client.executionConfiguration(forMethod: firstDescriptor), defaultConfiguration) - let retryPolicy = Client.MethodConfiguration.RetryPolicy( + let retryPolicy = GRPCClient.MethodConfiguration.RetryPolicy( maximumAttempts: 10, initialBackoff: .seconds(1), maximumBackoff: .seconds(1), backoffMultiplier: 1.0, retryableStatusCodes: [.unavailable] ) - let overrideConfiguration = Client.MethodConfiguration(retryPolicy: retryPolicy) + let overrideConfiguration = GRPCClient.MethodConfiguration(retryPolicy: retryPolicy) configurations[firstDescriptor] = overrideConfiguration client = InProcessClientTransport(server: .init(), executionConfigurations: configurations) let secondDescriptor = MethodDescriptor(service: "test", method: "second") @@ -243,10 +243,10 @@ final class InProcessClientTransportTests: XCTestCase { } func makeClient( - configuration: Client.MethodConfiguration? = nil, + configuration: GRPCClient.MethodConfiguration? = nil, server: InProcessServerTransport = InProcessServerTransport() ) -> InProcessClientTransport { - let defaultPolicy = Client.MethodConfiguration.RetryPolicy( + let defaultPolicy = GRPCClient.MethodConfiguration.RetryPolicy( maximumAttempts: 10, initialBackoff: .seconds(1), maximumBackoff: .seconds(1), From b64dfcddb9ddeb61bc224c6f01b45879c127d66d Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 29 Nov 2023 15:02:25 +0000 Subject: [PATCH 3/5] Add tests --- Sources/GRPCCore/GRPCClient.swift | 9 + ...thodConfigurationRegistryTestsTests.swift} | 2 +- ...s.swift => MethodConfigurationTests.swift} | 2 +- Tests/GRPCCoreTests/GRPCClientTests.swift | 449 ++++++++++++++++++ Tests/GRPCCoreTests/GRPCServerTests.swift | 1 - .../Call/Client/ClientInterceptors.swift | 84 ++++ .../Call/Server/ServerInterceptors.swift | 3 +- .../Test Utilities/XCTest+Utilities.swift | 38 +- 8 files changed, 582 insertions(+), 6 deletions(-) rename Tests/GRPCCoreTests/Call/Client/{ClientRPCExecutionConfigurationCollectionTests.swift => MethodConfigurationRegistryTestsTests.swift} (98%) rename Tests/GRPCCoreTests/Call/Client/{ClientRPCExecutionConfigurationTests.swift => MethodConfigurationTests.swift} (96%) create mode 100644 Tests/GRPCCoreTests/GRPCClientTests.swift create mode 100644 Tests/GRPCCoreTests/Test Utilities/Call/Client/ClientInterceptors.swift diff --git a/Sources/GRPCCore/GRPCClient.swift b/Sources/GRPCCore/GRPCClient.swift index 1e72ba9e4..8ddb9cd08 100644 --- a/Sources/GRPCCore/GRPCClient.swift +++ b/Sources/GRPCCore/GRPCClient.swift @@ -430,6 +430,8 @@ public final class GRPCClient: Sendable { return transportConfiguration } + // If there is no configuration override for this method descriptor in this + // client, nor in the transport, then get the default from the client. return configurationOverrides[descriptor] } } @@ -454,6 +456,13 @@ extension GRPCClient { } } +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +extension GRPCClient.Interceptors: CustomStringConvertible { + public var description: String { + return String(describing: self.values.map { String(describing: type(of: $0)) }) + } +} + @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) extension GRPCClient { /// The execution policy for an RPC. diff --git a/Tests/GRPCCoreTests/Call/Client/ClientRPCExecutionConfigurationCollectionTests.swift b/Tests/GRPCCoreTests/Call/Client/MethodConfigurationRegistryTestsTests.swift similarity index 98% rename from Tests/GRPCCoreTests/Call/Client/ClientRPCExecutionConfigurationCollectionTests.swift rename to Tests/GRPCCoreTests/Call/Client/MethodConfigurationRegistryTestsTests.swift index 996aa638e..21b20915d 100644 --- a/Tests/GRPCCoreTests/Call/Client/ClientRPCExecutionConfigurationCollectionTests.swift +++ b/Tests/GRPCCoreTests/Call/Client/MethodConfigurationRegistryTestsTests.swift @@ -17,7 +17,7 @@ import GRPCCore import XCTest @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -final class ClientRPCExecutionConfigurationCollectionTests: XCTestCase { +final class MethodConfigurationRegistryTests: XCTestCase { func testGetConfigurationForKnownMethod() async throws { let first = ContinuousClock.now let second = first.advanced(by: .seconds(1)) diff --git a/Tests/GRPCCoreTests/Call/Client/ClientRPCExecutionConfigurationTests.swift b/Tests/GRPCCoreTests/Call/Client/MethodConfigurationTests.swift similarity index 96% rename from Tests/GRPCCoreTests/Call/Client/ClientRPCExecutionConfigurationTests.swift rename to Tests/GRPCCoreTests/Call/Client/MethodConfigurationTests.swift index c635b2f05..aaa5c2a76 100644 --- a/Tests/GRPCCoreTests/Call/Client/ClientRPCExecutionConfigurationTests.swift +++ b/Tests/GRPCCoreTests/Call/Client/MethodConfigurationTests.swift @@ -17,7 +17,7 @@ import GRPCCore import XCTest @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -final class ClientRPCExecutionConfigurationTests: XCTestCase { +final class MethodConfigurationTests: XCTestCase { func testRetryPolicyClampsMaxAttempts() { var policy = GRPCClient.MethodConfiguration.RetryPolicy( maximumAttempts: 10, diff --git a/Tests/GRPCCoreTests/GRPCClientTests.swift b/Tests/GRPCCoreTests/GRPCClientTests.swift new file mode 100644 index 000000000..1488638e2 --- /dev/null +++ b/Tests/GRPCCoreTests/GRPCClientTests.swift @@ -0,0 +1,449 @@ +/* + * Copyright 2023, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Atomics +import GRPCCore +import XCTest + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +final class GRPCClientTests: XCTestCase { + func makeInProcessPair() -> (client: InProcessClientTransport, server: InProcessServerTransport) { + let server = InProcessServerTransport() + let client = InProcessClientTransport( + server: server, + executionConfigurations: GRPCClient.MethodConfigurationRegistry() + ) + + return (client, server) + } + + func withInProcessConnectedClient( + services: [any RegistrableRPCService], + interceptors: [any ClientInterceptor] = [], + _ body: (GRPCClient, GRPCServer) async throws -> Void + ) async throws { + let inProcess = self.makeInProcessPair() + let client = GRPCClient(transport: inProcess.client) + let server = GRPCServer() + server.transports.add(inProcess.server) + + for service in services { + server.services.register(service) + } + + for interceptor in interceptors { + client.interceptors.add(interceptor) + } + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await server.run() + } + + group.addTask { + try await client.run() + } + + // Make sure both server and client are running + try await Task.sleep(for: .milliseconds(100)) + try await body(client, server) + client.close() + server.stopListening() + } + } + + struct IdentitySerializer: MessageSerializer { + typealias Message = [UInt8] + + func serialize(_ message: [UInt8]) throws -> [UInt8] { + return message + } + } + + struct IdentityDeserializer: MessageDeserializer { + typealias Message = [UInt8] + + func deserialize(_ serializedMessageBytes: [UInt8]) throws -> [UInt8] { + return serializedMessageBytes + } + } + + func testUnary() async throws { + try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in + try await client.unary( + request: .init(message: [3, 1, 4, 1, 5]), + descriptor: BinaryEcho.Methods.collect, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + let message = try response.message + XCTAssertEqual(message, [3, 1, 4, 1, 5]) + } + } + } + + func testClientStreaming() async throws { + try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in + try await client.clientStreaming( + request: .init(producer: { writer in + for byte in [3, 1, 4, 1, 5] as [UInt8] { + try await writer.write([byte]) + } + }), + descriptor: BinaryEcho.Methods.collect, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + let message = try response.message + XCTAssertEqual(message, [3, 1, 4, 1, 5]) + } + } + } + + func testServerStreaming() async throws { + try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in + try await client.serverStreaming( + request: .init(message: [3, 1, 4, 1, 5]), + descriptor: BinaryEcho.Methods.expand, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + var responseParts = response.messages.makeAsyncIterator() + for byte in [3, 1, 4, 1, 5] as [UInt8] { + let message = try await responseParts.next() + XCTAssertEqual(message, [byte]) + } + } + } + } + + func testBidirectionalStreaming() async throws { + try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in + try await client.bidirectionalStreaming( + request: .init(producer: { writer in + for byte in [3, 1, 4, 1, 5] as [UInt8] { + try await writer.write([byte]) + } + }), + descriptor: BinaryEcho.Methods.update, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + var responseParts = response.messages.makeAsyncIterator() + for byte in [3, 1, 4, 1, 5] as [UInt8] { + let message = try await responseParts.next() + XCTAssertEqual(message, [byte]) + } + } + } + } + + func testUnimplementedMethod_Unary() async throws { + try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in + try await client.unary( + request: .init(message: [3, 1, 4, 1, 5]), + descriptor: MethodDescriptor(service: "not", method: "implemented"), + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + XCTAssertThrowsRPCError(try response.accepted.get()) { error in + XCTAssertEqual(error.code, .unimplemented) + } + } + } + } + + func testUnimplementedMethod_ClientStreaming() async throws { + try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in + try await client.clientStreaming( + request: .init(producer: { writer in + for byte in [3, 1, 4, 1, 5] as [UInt8] { + try await writer.write([byte]) + } + }), + descriptor: MethodDescriptor(service: "not", method: "implemented"), + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + XCTAssertThrowsRPCError(try response.accepted.get()) { error in + XCTAssertEqual(error.code, .unimplemented) + } + } + } + } + + func testUnimplementedMethod_ServerStreaming() async throws { + try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in + try await client.serverStreaming( + request: .init(message: [3, 1, 4, 1, 5]), + descriptor: MethodDescriptor(service: "not", method: "implemented"), + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + XCTAssertThrowsRPCError(try response.accepted.get()) { error in + XCTAssertEqual(error.code, .unimplemented) + } + } + } + } + + func testUnimplementedMethod_BidirectionalStreaming() async throws { + try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in + try await client.bidirectionalStreaming( + request: .init(producer: { writer in + for byte in [3, 1, 4, 1, 5] as [UInt8] { + try await writer.write([byte]) + } + }), + descriptor: MethodDescriptor(service: "not", method: "implemented"), + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + XCTAssertThrowsRPCError(try response.accepted.get()) { error in + XCTAssertEqual(error.code, .unimplemented) + } + } + } + } + + func testMultipleConcurrentRequests() async throws { + try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in + await withThrowingTaskGroup(of: Void.self) { group in + for i in UInt8.min ..< UInt8.max { + group.addTask { + try await client.unary( + request: .init(message: [i]), + descriptor: BinaryEcho.Methods.collect, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + let message = try response.message + XCTAssertEqual(message, [i]) + } + } + } + } + } + } + + func testInterceptorsAreAppliedInOrder() async throws { + let counter1 = ManagedAtomic(0) + let counter2 = ManagedAtomic(0) + + try await self.withInProcessConnectedClient( + services: [BinaryEcho()], + interceptors: [ + .requestCounter(counter1), + .rejectAll(with: RPCError(code: .unavailable, message: "")), + .requestCounter(counter2), + ] + ) { client, _ in + try await client.unary( + request: .init(message: [3, 1, 4, 1, 5]), + descriptor: BinaryEcho.Methods.collect, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + XCTAssertRejected(response) { error in + XCTAssertEqual(error.code, .unavailable) + } + } + } + + XCTAssertEqual(counter1.load(ordering: .sequentiallyConsistent), 1) + XCTAssertEqual(counter2.load(ordering: .sequentiallyConsistent), 0) + } + + func testNoNewRPCsAfterClientClose() async throws { + try await withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in + // Run an RPC so we know the client is running properly. + try await client.unary( + request: .init(message: [3, 1, 4, 1, 5]), + descriptor: BinaryEcho.Methods.collect, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + let message = try response.message + XCTAssertEqual(message, [3, 1, 4, 1, 5]) + } + + // New RPCs should fail immediately after this. + client.close() + + // RPC should fail now. + await XCTAssertThrowsErrorAsync(ofType: ClientError.self) { + try await client.unary( + request: .init(message: [3, 1, 4, 1, 5]), + descriptor: BinaryEcho.Methods.collect, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { _ in } + } errorHandler: { error in + XCTAssertEqual(error.code, .clientIsStopped) + } + } + } + + func testInFlightRPCsCanContinueAfterClientIsClosed() async throws { + try await withInProcessConnectedClient(services: [BinaryEcho()]) { client, server in + try await client.clientStreaming( + request: .init(producer: { writer in + + // Close the client once this RCP has been started. + client.close() + + // Attempts to start a new RPC should fail. + await XCTAssertThrowsErrorAsync(ofType: ClientError.self) { + try await client.unary( + request: .init(message: [3, 1, 4, 1, 5]), + descriptor: BinaryEcho.Methods.collect, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { _ in } + } errorHandler: { error in + XCTAssertEqual(error.code, .clientIsStopped) + } + + // Now write to the already opened stream to confirm that opened streams + // can successfully run to completion. + for byte in [3, 1, 4, 1, 5] as [UInt8] { + try await writer.write([byte]) + } + }), + descriptor: BinaryEcho.Methods.collect, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + let message = try response.message + XCTAssertEqual(message, [3, 1, 4, 1, 5]) + } + } + } + + func testCancelRunningClient() async throws { + let inProcess = self.makeInProcessPair() + let client = GRPCClient(transport: inProcess.client) + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + let server = GRPCServer() + server.services.register(BinaryEcho()) + server.transports.add(inProcess.server) + try await server.run() + } + + group.addTask { + try await client.run() + } + + // Wait for client and server to be running. + try await Task.sleep(for: .milliseconds(10)) + + let task = Task { + try await client.clientStreaming( + request: .init(producer: { writer in + try await Task.sleep(for: .seconds(5)) + }), + descriptor: BinaryEcho.Methods.collect, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + XCTAssertRejected(response) { error in + XCTAssertEqual(error.code, .unknown) + } + } + } + + // Check requests are getting through. + try await client.unary( + request: .init(message: [3, 1, 4, 1, 5]), + descriptor: BinaryEcho.Methods.collect, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + let message = try response.message + XCTAssertEqual(message, [3, 1, 4, 1, 5]) + } + + task.cancel() + try await task.value + group.cancelAll() + } + } + + func testRunStoppedClient() async throws { + let (clientTransport, _) = self.makeInProcessPair() + let client = GRPCClient(transport: clientTransport) + // Run the client. + let task = Task { try await client.run() } + task.cancel() + try await task.value + + // Client is stopped, should throw an error. + await XCTAssertThrowsErrorAsync(ofType: ClientError.self) { + try await client.run() + } errorHandler: { error in + XCTAssertEqual(error.code, .clientIsStopped) + } + } + + func testRunAlreadyRunningClient() async throws { + let (clientTransport, _) = self.makeInProcessPair() + let client = GRPCClient(transport: clientTransport) + // Run the client. + let task = Task { try await client.run() } + // Make sure the client is run for the first time here. + try await Task.sleep(for: .milliseconds(10)) + + // Client is already running, should throw an error. + await XCTAssertThrowsErrorAsync(ofType: ClientError.self) { + try await client.run() + } errorHandler: { error in + XCTAssertEqual(error.code, .clientIsAlreadyRunning) + } + + task.cancel() + } + + func testRunClientNotRunning() async throws { + let (clientTransport, _) = self.makeInProcessPair() + let client = GRPCClient(transport: clientTransport) + + // Client is not running, should throw an error. + await XCTAssertThrowsErrorAsync(ofType: ClientError.self) { + try await client.unary( + request: .init(message: [3, 1, 4, 1, 5]), + descriptor: BinaryEcho.Methods.collect, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + let message = try response.message + XCTAssertEqual(message, [3, 1, 4, 1, 5]) + } + } errorHandler: { error in + XCTAssertEqual(error.code, .clientIsNotRunning) + } + } + + func testInterceptorsDescription() async throws { + let (clientTransport, _) = self.makeInProcessPair() + let client = GRPCClient(transport: clientTransport) + client.interceptors.add(.rejectAll(with: .init(code: .aborted, message: ""))) + client.interceptors.add(.requestCounter(.init(0))) + let description = String(describing: client.interceptors) + let expected = #"["RejectAllClientInterceptor", "RequestCountingClientInterceptor"]"# + XCTAssertEqual(description, expected) + } +} diff --git a/Tests/GRPCCoreTests/GRPCServerTests.swift b/Tests/GRPCCoreTests/GRPCServerTests.swift index b75f22d7c..f64b5fb2c 100644 --- a/Tests/GRPCCoreTests/GRPCServerTests.swift +++ b/Tests/GRPCCoreTests/GRPCServerTests.swift @@ -59,7 +59,6 @@ final class GRPCServerTests: XCTestCase { inProcess.client.close() server.stopListening() } - } func testServerHandlesUnary() async throws { diff --git a/Tests/GRPCCoreTests/Test Utilities/Call/Client/ClientInterceptors.swift b/Tests/GRPCCoreTests/Test Utilities/Call/Client/ClientInterceptors.swift new file mode 100644 index 000000000..e89681d58 --- /dev/null +++ b/Tests/GRPCCoreTests/Test Utilities/Call/Client/ClientInterceptors.swift @@ -0,0 +1,84 @@ +/* + * Copyright 2023, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Atomics +import GRPCCore + +extension ClientInterceptor where Self == RejectAllClientInterceptor { + static func rejectAll(with error: RPCError) -> Self { + return RejectAllClientInterceptor(error: error, throw: false) + } + + static func throwError(_ error: RPCError) -> Self { + return RejectAllClientInterceptor(error: error, throw: true) + } + +} + +extension ClientInterceptor where Self == RequestCountingClientInterceptor { + static func requestCounter(_ counter: ManagedAtomic) -> Self { + return RequestCountingClientInterceptor(counter: counter) + } +} + +/// Rejects all RPCs with the provided error. +struct RejectAllClientInterceptor: ClientInterceptor { + /// The error to reject all RPCs with. + let error: RPCError + /// Whether the error should be thrown. If `false` then the request is rejected with the error + /// instead. + let `throw`: Bool + + init(error: RPCError, throw: Bool = false) { + self.error = error + self.`throw` = `throw` + } + + func intercept( + request: ClientRequest.Stream, + context: ClientInterceptorContext, + next: @Sendable ( + ClientRequest.Stream, + ClientInterceptorContext + ) async throws -> ClientResponse.Stream + ) async throws -> ClientResponse.Stream { + if self.throw { + throw self.error + } else { + return ClientResponse.Stream(error: self.error) + } + } +} + +struct RequestCountingClientInterceptor: ClientInterceptor { + /// The number of requests made. + let counter: ManagedAtomic + + init(counter: ManagedAtomic) { + self.counter = counter + } + + func intercept( + request: ClientRequest.Stream, + context: ClientInterceptorContext, + next: @Sendable ( + ClientRequest.Stream, + ClientInterceptorContext + ) async throws -> ClientResponse.Stream + ) async throws -> ClientResponse.Stream { + self.counter.wrappingIncrement(ordering: .sequentiallyConsistent) + return try await next(request, context) + } +} diff --git a/Tests/GRPCCoreTests/Test Utilities/Call/Server/ServerInterceptors.swift b/Tests/GRPCCoreTests/Test Utilities/Call/Server/ServerInterceptors.swift index 266648768..02d4061a6 100644 --- a/Tests/GRPCCoreTests/Test Utilities/Call/Server/ServerInterceptors.swift +++ b/Tests/GRPCCoreTests/Test Utilities/Call/Server/ServerInterceptors.swift @@ -24,7 +24,6 @@ extension ServerInterceptor where Self == RejectAllServerInterceptor { static func throwError(_ error: RPCError) -> Self { return RejectAllServerInterceptor(error: error, throw: true) } - } extension ServerInterceptor where Self == RequestCountingServerInterceptor { @@ -63,7 +62,7 @@ struct RejectAllServerInterceptor: ServerInterceptor { } struct RequestCountingServerInterceptor: ServerInterceptor { - /// The error to reject all RPCs with. + /// The number of requests made. let counter: ManagedAtomic init(counter: ManagedAtomic) { diff --git a/Tests/GRPCCoreTests/Test Utilities/XCTest+Utilities.swift b/Tests/GRPCCoreTests/Test Utilities/XCTest+Utilities.swift index 7bc88edef..71ca6dd7a 100644 --- a/Tests/GRPCCoreTests/Test Utilities/XCTest+Utilities.swift +++ b/Tests/GRPCCoreTests/Test Utilities/XCTest+Utilities.swift @@ -104,6 +104,18 @@ func XCTAssertRejected( } } +func XCTAssertRejected( + _ response: ClientResponse.Single, + errorHandler: (RPCError) -> Void +) { + switch response.accepted { + case .success: + XCTFail("Expected RPC to be rejected") + case .failure(let error): + errorHandler(error) + } +} + func XCTAssertMetadata( _ part: RPCResponsePart?, metadataHandler: (Metadata) -> Void = { _ in } @@ -116,6 +128,18 @@ func XCTAssertMetadata( } } +func XCTAssertMetadata( + _ part: RPCRequestPart?, + metadataHandler: (Metadata) async throws -> Void = { _ in } +) async throws { + switch part { + case .some(.metadata(let metadata)): + try await metadataHandler(metadata) + default: + XCTFail("Expected '.metadata' but found '\(String(describing: part))'") + } +} + func XCTAssertMessage( _ part: RPCResponsePart?, messageHandler: ([UInt8]) -> Void = { _ in } @@ -124,7 +148,19 @@ func XCTAssertMessage( case .some(.message(let message)): messageHandler(message) default: - XCTFail("Expected '.metadata' but found '\(String(describing: part))'") + XCTFail("Expected '.message' but found '\(String(describing: part))'") + } +} + +func XCTAssertMessage( + _ part: RPCRequestPart?, + messageHandler: ([UInt8]) async throws -> Void = { _ in } +) async throws { + switch part { + case .some(.message(let message)): + try await messageHandler(message) + default: + XCTFail("Expected '.message' but found '\(String(describing: part))'") } } From f568af333de0b8276aafa7cc4d43e902b99364ae Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 29 Nov 2023 16:02:17 +0000 Subject: [PATCH 4/5] PR changes --- Sources/GRPCCore/ClientError.swift | 6 + Sources/GRPCCore/GRPCClient.swift | 141 +++++------------- Sources/GRPCCore/GRPCServer.swift | 18 ++- ...ethodConfigurationRegistryTestsTests.swift | 5 - 4 files changed, 62 insertions(+), 108 deletions(-) diff --git a/Sources/GRPCCore/ClientError.swift b/Sources/GRPCCore/ClientError.swift index 3b86235cb..77ad48719 100644 --- a/Sources/GRPCCore/ClientError.swift +++ b/Sources/GRPCCore/ClientError.swift @@ -111,6 +111,7 @@ extension ClientError { case clientIsAlreadyRunning case clientIsNotRunning case clientIsStopped + case transportError } private var value: Value @@ -132,6 +133,11 @@ extension ClientError { public static var clientIsStopped: Self { Self(.clientIsStopped) } + + /// The transport threw an error whilst connected. + public static var transportError: Self { + Self(.transportError) + } } } diff --git a/Sources/GRPCCore/GRPCClient.swift b/Sources/GRPCCore/GRPCClient.swift index 8ddb9cd08..dc41384fd 100644 --- a/Sources/GRPCCore/GRPCClient.swift +++ b/Sources/GRPCCore/GRPCClient.swift @@ -34,8 +34,9 @@ /// /// ```swift /// // Create and add an in-process transport. -/// let inProcessTransport = InProcessClientTransport() -/// let client = GRPCClient(transport: inProcessTransport) +/// let inProcessServerTransport = InProcessServerTransport() +/// let inProcessClientTransport = InProcessClientTransport(serverTransport: inProcessServerTransport) +/// let client = GRPCClient(transport: inProcessClientTransport) /// /// // Create and add some interceptors. /// client.interceptors.add(StatsRecordingServerInterceptors()) @@ -56,7 +57,22 @@ /// /// ```swift /// // Start running the client. -/// try await client.run() +/// // Since it's a long-running task, we do it in a task group so it runs in +/// // the background. +/// try await withThrowingTaskGroup(of: Void.self) { group in +/// group.addTask { +/// try await client.run() +/// } +/// +/// try await client.unary( +/// request: .init(message: "Hello!"), +/// descriptor: MethodDescriptor(service: "service", method: "method"), +/// serializer: ..., +/// deserializer: ... +/// ) { response in +/// // Do something with the response +/// } +/// } /// ``` /// /// The ``run()`` method won't return until the client has finished handling all requests. You can @@ -133,9 +149,10 @@ public final class GRPCClient: Sendable { private let storage: LockedValueBox /// The transport which provides a bidirectional communication channel with the server. - private let transport: ClientTransport + private let transport: any ClientTransport - /// Creates a new client with no resources. + /// Creates a new client with no resources (i.e., with no ``Interceptors-swift.struct`` and no + /// ``MethodDescriptor``s). /// /// You can add resources to the client via ``interceptors-swift.property`` and /// ``methodConfigurationOverrides-swift.property``, and start the client by calling ``run()``. @@ -143,7 +160,7 @@ public final class GRPCClient: Sendable { /// - Note: Any changes to resources after ``run()`` has been called will be ignored. /// /// - Parameter transport: The ``ClientTransport`` to be used for this ``GRPCClient``. - public init(transport: ClientTransport) { + public init(transport: any ClientTransport) { self.storage = LockedValueBox(Storage()) self.transport = transport } @@ -177,11 +194,14 @@ public final class GRPCClient: Sendable { self.storage.withLockedValue { $0.state = .stopped } } - try await withThrowingTaskGroup(of: Void.self) { group in - group.addTask { - try await self.transport.connect(lazily: false) - } - try await group.next() + do { + try await self.transport.connect(lazily: false) + } catch { + throw ClientError( + code: .transportError, + message: "The transport threw an error while connected.", + cause: error + ) } } @@ -205,7 +225,7 @@ public final class GRPCClient: Sendable { self.transport.close() } - /// Start a unary RPC. + /// Executes a unary RPC. /// /// - Parameters: /// - request: The unary request. @@ -222,41 +242,14 @@ public final class GRPCClient: Sendable { deserializer: some MessageDeserializer, handler: @Sendable @escaping (ClientResponse.Single) async throws -> ReturnValue ) async throws -> ReturnValue { - let (configurationOverrides, interceptors) = try self.storage.withLockedValue { storage in - switch storage.state { - case .running: - return (storage.methodConfigurationOverrides, storage.interceptors.values) - case .notStarted: - throw ClientError( - code: .clientIsNotRunning, - message: "Client must be running to make an RPC: call run() first." - ) - case .stopping, .stopped: - throw ClientError( - code: .clientIsStopped, - message: "Client has been stopped. Can't make any more RPCs." - ) - } - } - - let applicableConfiguration = self.resolveMethodConfiguration( - descriptor: descriptor, - clientConfigurations: configurationOverrides - ) - - return try await ClientRPCExecutor.execute( + try await bidirectionalStreaming( request: ClientRequest.Stream(single: request), - method: descriptor, - configuration: applicableConfiguration, + descriptor: descriptor, serializer: serializer, - deserializer: deserializer, - transport: self.transport, - interceptors: interceptors, - handler: { stream in + deserializer: deserializer) { stream in let singleResponse = await ClientResponse.Single(stream: stream) return try await handler(singleResponse) } - ) } /// Start a client-streaming RPC. @@ -276,41 +269,14 @@ public final class GRPCClient: Sendable { deserializer: some MessageDeserializer, handler: @Sendable @escaping (ClientResponse.Single) async throws -> ReturnValue ) async throws -> ReturnValue { - let (configurationOverrides, interceptors) = try self.storage.withLockedValue { storage in - switch storage.state { - case .running: - return (storage.methodConfigurationOverrides, storage.interceptors.values) - case .notStarted: - throw ClientError( - code: .clientIsNotRunning, - message: "Client must be running to make an RPC: call run() first." - ) - case .stopping, .stopped: - throw ClientError( - code: .clientIsStopped, - message: "Client has been stopped. Can't make any more RPCs." - ) - } - } - - let applicableConfiguration = self.resolveMethodConfiguration( - descriptor: descriptor, - clientConfigurations: configurationOverrides - ) - - return try await ClientRPCExecutor.execute( + try await bidirectionalStreaming( request: request, - method: descriptor, - configuration: applicableConfiguration, + descriptor: descriptor, serializer: serializer, - deserializer: deserializer, - transport: transport, - interceptors: interceptors, - handler: { stream in + deserializer: deserializer) { stream in let singleResponse = await ClientResponse.Single(stream: stream) return try await handler(singleResponse) } - ) } /// Start a server-streaming RPC. @@ -330,36 +296,11 @@ public final class GRPCClient: Sendable { deserializer: some MessageDeserializer, handler: @Sendable @escaping (ClientResponse.Stream) async throws -> ReturnValue ) async throws -> ReturnValue { - let (configurationOverrides, interceptors) = try self.storage.withLockedValue { storage in - switch storage.state { - case .running: - return (storage.methodConfigurationOverrides, storage.interceptors.values) - case .notStarted: - throw ClientError( - code: .clientIsNotRunning, - message: "Client must be running to make an RPC: call run() first." - ) - case .stopping, .stopped: - throw ClientError( - code: .clientIsStopped, - message: "Client has been stopped. Can't make any more RPCs." - ) - } - } - - let applicableConfiguration = self.resolveMethodConfiguration( - descriptor: descriptor, - clientConfigurations: configurationOverrides - ) - - return try await ClientRPCExecutor.execute( + try await bidirectionalStreaming( request: ClientRequest.Stream(single: request), - method: descriptor, - configuration: applicableConfiguration, + descriptor: descriptor, serializer: serializer, deserializer: deserializer, - transport: transport, - interceptors: interceptors, handler: handler ) } @@ -442,7 +383,7 @@ extension GRPCClient { public struct Interceptors: Sendable { private(set) var values: [any ClientInterceptor] = [] - /// Add an interceptor to the server. + /// Add an interceptor to the client. /// /// The order in which interceptors are added reflects the order in which they are called. The /// first interceptor added will be the first interceptor to intercept each request. The last diff --git a/Sources/GRPCCore/GRPCServer.swift b/Sources/GRPCCore/GRPCServer.swift index 626816f9b..1388f0d28 100644 --- a/Sources/GRPCCore/GRPCServer.swift +++ b/Sources/GRPCCore/GRPCServer.swift @@ -76,7 +76,11 @@ public final class GRPCServer: Sendable { self.storage.withLockedValue { $0.transports } } set { - self.storage.withLockedValue { $0.transports = newValue } + self.storage.withLockedValue { storage in + if case .notStarted = storage.state { + storage.transports = newValue + } + } } } @@ -86,7 +90,11 @@ public final class GRPCServer: Sendable { self.storage.withLockedValue { $0.services } } set { - self.storage.withLockedValue { $0.services = newValue } + self.storage.withLockedValue { storage in + if case .notStarted = storage.state { + storage.services = newValue + } + } } } @@ -101,7 +109,11 @@ public final class GRPCServer: Sendable { self.storage.withLockedValue { $0.interceptors } } set { - self.storage.withLockedValue { $0.interceptors = newValue } + self.storage.withLockedValue { storage in + if case .notStarted = storage.state { + storage.interceptors = newValue + } + } } } diff --git a/Tests/GRPCCoreTests/Call/Client/MethodConfigurationRegistryTestsTests.swift b/Tests/GRPCCoreTests/Call/Client/MethodConfigurationRegistryTestsTests.swift index 21b20915d..e1ffb7431 100644 --- a/Tests/GRPCCoreTests/Call/Client/MethodConfigurationRegistryTestsTests.swift +++ b/Tests/GRPCCoreTests/Call/Client/MethodConfigurationRegistryTestsTests.swift @@ -19,11 +19,6 @@ import XCTest @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) final class MethodConfigurationRegistryTests: XCTestCase { func testGetConfigurationForKnownMethod() async throws { - let first = ContinuousClock.now - let second = first.advanced(by: .seconds(1)) - let result = second.duration(to: first) - print(result.components) - let policy = GRPCClient.MethodConfiguration.HedgingPolicy( maximumAttempts: 10, hedgingDelay: .seconds(1), From 33f3c702994c1a14aa88286f4c2d426ea5a27c35 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 29 Nov 2023 16:45:46 +0000 Subject: [PATCH 5/5] Change method config to have defaults and overrides --- Sources/GRPCCore/GRPCClient.swift | 171 +++++++++++------- ...ientRPCExecutorTestHarness+Transport.swift | 10 +- ...ethodConfigurationRegistryTestsTests.swift | 15 +- .../InProcessClientTransportTests.swift | 13 +- 4 files changed, 115 insertions(+), 94 deletions(-) diff --git a/Sources/GRPCCore/GRPCClient.swift b/Sources/GRPCCore/GRPCClient.swift index dc41384fd..515e29978 100644 --- a/Sources/GRPCCore/GRPCClient.swift +++ b/Sources/GRPCCore/GRPCClient.swift @@ -46,8 +46,16 @@ /// executionPolicy: ..., /// timeout: ... /// ) -/// let registry = MethodConfigurationRegistry(defaultConfiguration: defaultConfiguration) +/// var registry = MethodConfigurationRegistry() +/// registry.setOverallDefaultConfiguration(defaultConfiguration) +/// +/// // Set as configuration overrides to make the client-side configuration +/// // take precedence over the transport-defined configurations. /// client.methodConfigurationOverrides = registry +/// +/// // Or set as configuration defaults to make the transport-defined configurations +/// // take precendece over these. +/// client.methodConfigurationDefaults = registry /// ``` /// /// ## Starting and stopping the client @@ -119,6 +127,24 @@ public final class GRPCClient: Sendable { } } + /// A ``MethodConfigurationRegistry`` containing ``MethodConfiguration``s for calls + /// made from this ``Client``. + /// + /// - Note: These configurations will be used if there were no client overrides set in ``methodConfigurationOverrides`` + /// and no configuration set in the transport. + public var methodConfigurationDefaults: MethodConfigurationRegistry { + get { + self.storage.withLockedValue { $0.methodConfigurationDefaults } + } + set { + self.storage.withLockedValue { storage in + if case .notStarted = storage.state { + storage.methodConfigurationDefaults = newValue + } + } + } + } + /// The state of the client. private enum State { /// The client hasn't been started yet. Can transition to `running` or `stopped`. @@ -135,13 +161,23 @@ public final class GRPCClient: Sendable { /// Underlying storage for the client. private struct Storage { + var state: State + + /// Client interceptors. var interceptors: Interceptors + + /// A ``MethodConfigurationRegistry`` containing configuration overrides. + /// These will take precedence over the transport-defined configuration. var methodConfigurationOverrides: MethodConfigurationRegistry - var state: State + + /// A ``MethodConfigurationRegistry`` containing configuration defaults. + /// These will be used if the transport didn't define any configuration. + var methodConfigurationDefaults: MethodConfigurationRegistry init() { self.interceptors = Interceptors() self.methodConfigurationOverrides = MethodConfigurationRegistry() + self.methodConfigurationDefaults = MethodConfigurationRegistry() self.state = .notStarted } } @@ -154,8 +190,9 @@ public final class GRPCClient: Sendable { /// Creates a new client with no resources (i.e., with no ``Interceptors-swift.struct`` and no /// ``MethodDescriptor``s). /// - /// You can add resources to the client via ``interceptors-swift.property`` and - /// ``methodConfigurationOverrides-swift.property``, and start the client by calling ``run()``. + /// You can add resources to the client via ``interceptors-swift.property``, + /// ``methodConfigurationOverrides-swift.property``, and ``methodConfigurationDefaults-swift.property``, + /// and start the client by calling ``run()``. /// /// - Note: Any changes to resources after ``run()`` has been called will be ignored. /// @@ -246,10 +283,11 @@ public final class GRPCClient: Sendable { request: ClientRequest.Stream(single: request), descriptor: descriptor, serializer: serializer, - deserializer: deserializer) { stream in - let singleResponse = await ClientResponse.Single(stream: stream) - return try await handler(singleResponse) - } + deserializer: deserializer + ) { stream in + let singleResponse = await ClientResponse.Single(stream: stream) + return try await handler(singleResponse) + } } /// Start a client-streaming RPC. @@ -273,10 +311,11 @@ public final class GRPCClient: Sendable { request: request, descriptor: descriptor, serializer: serializer, - deserializer: deserializer) { stream in - let singleResponse = await ClientResponse.Single(stream: stream) - return try await handler(singleResponse) - } + deserializer: deserializer + ) { stream in + let singleResponse = await ClientResponse.Single(stream: stream) + return try await handler(singleResponse) + } } /// Start a server-streaming RPC. @@ -325,26 +364,32 @@ public final class GRPCClient: Sendable { deserializer: some MessageDeserializer, handler: @Sendable @escaping (ClientResponse.Stream) async throws -> ReturnValue ) async throws -> ReturnValue { - let (configurationOverrides, interceptors) = try self.storage.withLockedValue { storage in - switch storage.state { - case .running: - return (storage.methodConfigurationOverrides, storage.interceptors.values) - case .notStarted: - throw ClientError( - code: .clientIsNotRunning, - message: "Client must be running to make an RPC: call run() first." - ) - case .stopping, .stopped: - throw ClientError( - code: .clientIsStopped, - message: "Client has been stopped. Can't make any more RPCs." - ) + let (configurationOverrides, configurationDefaults, interceptors) = try self.storage + .withLockedValue { storage in + switch storage.state { + case .running: + return ( + storage.methodConfigurationOverrides, + storage.methodConfigurationDefaults, + storage.interceptors.values + ) + case .notStarted: + throw ClientError( + code: .clientIsNotRunning, + message: "Client must be running to make an RPC: call run() first." + ) + case .stopping, .stopped: + throw ClientError( + code: .clientIsStopped, + message: "Client has been stopped. Can't make any more RPCs." + ) + } } - } let applicableConfiguration = self.resolveMethodConfiguration( descriptor: descriptor, - clientConfigurations: configurationOverrides + configurationOverrides: configurationOverrides, + configurationDefaults: configurationDefaults ) return try await ClientRPCExecutor.execute( @@ -361,9 +406,10 @@ public final class GRPCClient: Sendable { private func resolveMethodConfiguration( descriptor: MethodDescriptor, - clientConfigurations configurationOverrides: MethodConfigurationRegistry + configurationOverrides: MethodConfigurationRegistry, + configurationDefaults: MethodConfigurationRegistry ) -> MethodConfiguration { - if let clientOverride = configurationOverrides[descriptor, useDefault: false] { + if let clientOverride = configurationOverrides[descriptor] { return clientOverride } @@ -373,7 +419,12 @@ public final class GRPCClient: Sendable { // If there is no configuration override for this method descriptor in this // client, nor in the transport, then get the default from the client. - return configurationOverrides[descriptor] + // If no default has been specified, then fall back to an empty confiuration. + return configurationDefaults[descriptor] + ?? MethodConfiguration( + executionPolicy: nil, + timeout: nil + ) } } @@ -672,37 +723,29 @@ extension GRPCClient.MethodConfiguration { extension GRPCClient { /// A collection of ``ClientRPCExecutionConfiguration``s, mapped to specific methods or services. /// - /// When creating a new instance, you must provide a default configuration to be used when getting + /// When creating a new instance, no overrides and no default will be set for using when getting /// a configuration for a method that has not been given a specific override. /// Use ``setDefaultConfiguration(_:forService:)`` to set a specific override for a whole - /// service. + /// service, or set a default configuration for all methods by calling ``setOverallDefaultConfiguration(_:)``. /// - /// Use the subscript to get and set configurations for methods. + /// Use the subscript to get and set configurations for specific methods. public struct MethodConfigurationRegistry: Sendable, Hashable { private var elements: [MethodDescriptor: MethodConfiguration] - private let defaultConfiguration: MethodConfiguration + private var defaultConfiguration: MethodConfiguration? - public init( - defaultConfiguration: MethodConfiguration = MethodConfiguration( - executionPolicy: nil, - timeout: nil - ) - ) { + /// Create a new ``MethodConfigurationRegistry`` with no overrides and no default configuration. + public init() { self.elements = [:] - self.defaultConfiguration = defaultConfiguration } - /// Get the corresponding ``MethodConfiguration`` for the given ``MethodDescriptor``. + /// Get or set the corresponding ``MethodConfiguration`` for the given ``MethodDescriptor``. /// - /// If `useDefault` is true, then fall back to the default configuration given in ``init(defaultConfiguration:)`` - /// if there is no set configuration for the descriptor. Otherwise, return `nil`. + /// If no configuration has been set for the given descriptor, the value returned will be the default + /// passed in ``init(defaultConfiguration:)`` /// /// - Parameters: - /// - descriptor: The ``MethodDescriptor`` for which to get a ``MethodConfiguration``. - /// - useDefault: Whether the default value should be returned if no configuration was specified - /// for the given descriptor. - public subscript(_ descriptor: MethodDescriptor, useDefault useDefault: Bool) - -> MethodConfiguration? + /// - descriptor: The ``MethodDescriptor`` for which to get or set a ``MethodConfiguration``. + public subscript(_ descriptor: MethodDescriptor) -> MethodConfiguration? { get { if let methodLevelOverride = self.elements[descriptor] { @@ -711,26 +754,7 @@ extension GRPCClient { var serviceLevelDescriptor = descriptor serviceLevelDescriptor.method = "" - if useDefault { - return self.elements[serviceLevelDescriptor, default: self.defaultConfiguration] - } else { - return self.elements[serviceLevelDescriptor] - } - } - } - - /// Get or set the corresponding ``MethodConfiguration`` for the given ``MethodDescriptor``. - /// - /// If no configuration has been set for the given descriptor, the value returned will be the default - /// passed in ``init(defaultConfiguration:)`` - /// - /// - Parameters: - /// - descriptor: The ``MethodDescriptor`` for which to get or set a ``MethodConfiguration``. - public subscript(_ descriptor: MethodDescriptor) -> MethodConfiguration { - get { - // This force unwrap is safe, because we'll always have a default value - // present, and we'll always use it if `useDefault` is true. - self[descriptor, useDefault: true]! + return self.elements[serviceLevelDescriptor] ?? self.defaultConfiguration } set { @@ -743,6 +767,13 @@ extension GRPCClient { } } + /// Set a default configuration for all methods that have no overrides. + /// + /// - Parameter configuration: The default configuration. + public mutating func setOverallDefaultConfiguration(_ configuration: MethodConfiguration?) { + self.defaultConfiguration = configuration + } + /// Set a default configuration for a service. /// /// If getting a configuration for a method that's part of a service, and the method itself doesn't have an diff --git a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness+Transport.swift b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness+Transport.swift index 42529470e..abaa8e6ef 100644 --- a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness+Transport.swift +++ b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness+Transport.swift @@ -24,15 +24,7 @@ extension InProcessServerTransport { ) -> InProcessClientTransport { return InProcessClientTransport( server: self, - executionConfigurations: .init( - defaultConfiguration: .init( - hedgingPolicy: .init( - maximumAttempts: 2, - hedgingDelay: .milliseconds(100), - nonFatalStatusCodes: [] - ) - ) - ) + executionConfigurations: .init() ) } } diff --git a/Tests/GRPCCoreTests/Call/Client/MethodConfigurationRegistryTestsTests.swift b/Tests/GRPCCoreTests/Call/Client/MethodConfigurationRegistryTestsTests.swift index e1ffb7431..6103d396c 100644 --- a/Tests/GRPCCoreTests/Call/Client/MethodConfigurationRegistryTestsTests.swift +++ b/Tests/GRPCCoreTests/Call/Client/MethodConfigurationRegistryTestsTests.swift @@ -25,9 +25,8 @@ final class MethodConfigurationRegistryTests: XCTestCase { nonFatalStatusCodes: [] ) let defaultConfiguration = GRPCClient.MethodConfiguration(hedgingPolicy: policy) - var configurations = GRPCClient.MethodConfigurationRegistry( - defaultConfiguration: defaultConfiguration - ) + var configurations = GRPCClient.MethodConfigurationRegistry() + configurations.setOverallDefaultConfiguration(defaultConfiguration) let descriptor = MethodDescriptor(service: "test", method: "first") let retryPolicy = GRPCClient.MethodConfiguration.RetryPolicy( maximumAttempts: 10, @@ -49,9 +48,8 @@ final class MethodConfigurationRegistryTests: XCTestCase { nonFatalStatusCodes: [] ) let defaultConfiguration = GRPCClient.MethodConfiguration(hedgingPolicy: policy) - var configurations = GRPCClient.MethodConfigurationRegistry( - defaultConfiguration: defaultConfiguration - ) + var configurations = GRPCClient.MethodConfigurationRegistry() + configurations.setOverallDefaultConfiguration(defaultConfiguration) let firstDescriptor = MethodDescriptor(service: "test", method: "") let retryPolicy = GRPCClient.MethodConfiguration.RetryPolicy( maximumAttempts: 10, @@ -74,9 +72,8 @@ final class MethodConfigurationRegistryTests: XCTestCase { nonFatalStatusCodes: [] ) let defaultConfiguration = GRPCClient.MethodConfiguration(hedgingPolicy: policy) - var configurations = GRPCClient.MethodConfigurationRegistry( - defaultConfiguration: defaultConfiguration - ) + var configurations = GRPCClient.MethodConfigurationRegistry() + configurations.setOverallDefaultConfiguration(defaultConfiguration) let firstDescriptor = MethodDescriptor(service: "test1", method: "first") let retryPolicy = GRPCClient.MethodConfiguration.RetryPolicy( maximumAttempts: 10, diff --git a/Tests/GRPCCoreTests/Transport/InProcessClientTransportTests.swift b/Tests/GRPCCoreTests/Transport/InProcessClientTransportTests.swift index 830f59973..ff4f2dce4 100644 --- a/Tests/GRPCCoreTests/Transport/InProcessClientTransportTests.swift +++ b/Tests/GRPCCoreTests/Transport/InProcessClientTransportTests.swift @@ -188,9 +188,8 @@ final class InProcessClientTransportTests: XCTestCase { nonFatalStatusCodes: [] ) let defaultConfiguration = GRPCClient.MethodConfiguration(hedgingPolicy: policy) - var configurations = GRPCClient.MethodConfigurationRegistry( - defaultConfiguration: defaultConfiguration - ) + var configurations = GRPCClient.MethodConfigurationRegistry() + configurations.setOverallDefaultConfiguration(defaultConfiguration) var client = InProcessClientTransport(server: .init(), executionConfigurations: configurations) @@ -254,11 +253,13 @@ final class InProcessClientTransportTests: XCTestCase { retryableStatusCodes: [.unavailable] ) + var methodConfiguration = GRPCClient.MethodConfigurationRegistry() + methodConfiguration.setOverallDefaultConfiguration( + configuration ?? .init(retryPolicy: defaultPolicy) + ) return InProcessClientTransport( server: server, - executionConfigurations: .init( - defaultConfiguration: configuration ?? .init(retryPolicy: defaultPolicy) - ) + executionConfigurations: methodConfiguration ) } }