diff --git a/Sources/GRPCCore/Call/Client/ClientRPCExecutionConfiguration.swift b/Sources/GRPCCore/Call/Client/ClientRPCExecutionConfiguration.swift deleted file mode 100644 index b771e0b76..000000000 --- a/Sources/GRPCCore/Call/Client/ClientRPCExecutionConfiguration.swift +++ /dev/null @@ -1,290 +0,0 @@ -/* - * Copyright 2023, gRPC Authors All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/// Configuration values for executing an RPC. -@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -public struct ClientRPCExecutionConfiguration: Hashable, Sendable { - /// The default timeout for the RPC. - /// - /// If no reply is received in the specified amount of time the request is aborted - /// with an ``RPCError`` with code ``RPCError/Code/deadlineExceeded``. - /// - /// The actual deadline used will be the minimum of the value specified here - /// and the value set by the application by the client API. If either one isn't set - /// then the other value is used. If neither is set then the request has no deadline. - /// - /// The timeout applies to the overall execution of an RPC. If, for example, a retry - /// policy is set then the timeout begins when the first attempt is started and _isn't_ reset - /// when subsequent attempts start. - public var timeout: Duration? - - /// The policy determining how many times, and when, the RPC is executed. - /// - /// There are two policy types: - /// 1. Retry - /// 2. Hedging - /// - /// The retry policy allows an RPC to be retried a limited number of times if the RPC - /// fails with one of the configured set of status codes. RPCs are only retried if they - /// fail immediately, that is, the first response part received from the server is a - /// status code. - /// - /// The hedging policy allows an RPC to be executed multiple times concurrently. Typically - /// each execution will be staggered by some delay. The first successful response will be - /// reported to the client. Hedging is only suitable for idempotent RPCs. - public var executionPolicy: ExecutionPolicy? - - /// Create an execution configuration. - /// - /// - Parameters: - /// - executionPolicy: The execution policy to use for the RPC. - /// - timeout: The default timeout for the RPC. - public init( - executionPolicy: ExecutionPolicy?, - timeout: Duration? - ) { - self.executionPolicy = executionPolicy - self.timeout = timeout - } - - /// Create an execution configuration with a retry policy. - /// - /// - Parameters: - /// - retryPolicy: The policy for retrying the RPC. - /// - timeout: The default timeout for the RPC. - public init( - retryPolicy: RetryPolicy, - timeout: Duration? = nil - ) { - self.executionPolicy = .retry(retryPolicy) - self.timeout = timeout - } - - /// Create an execution configuration with a hedging policy. - /// - /// - Parameters: - /// - hedgingPolicy: The policy for hedging the RPC. - /// - timeout: The default timeout for the RPC. - public init( - hedgingPolicy: HedgingPolicy, - timeout: Duration? = nil - ) { - self.executionPolicy = .hedge(hedgingPolicy) - self.timeout = timeout - } -} - -@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -extension ClientRPCExecutionConfiguration { - /// The execution policy for an RPC. - public enum ExecutionPolicy: Hashable, Sendable { - /// Policy for retrying an RPC. - /// - /// See ``RetryPolicy`` for more details. - case retry(RetryPolicy) - - /// Policy for hedging an RPC. - /// - /// See ``HedgingPolicy`` for more details. - case hedge(HedgingPolicy) - } -} - -/// Policy for retrying an RPC. -/// -/// gRPC retries RPCs when the first response from the server is a status code which matches -/// one of the configured retryable status codes. If the server begins processing the RPC and -/// first responds with metadata and later responds with a retryable status code then the RPC -/// won't be retried. -/// -/// Execution attempts are limited by ``maximumAttempts`` which includes the original attempt. The -/// maximum number of attempts is limited to five. -/// -/// Subsequent attempts are executed after some delay. The first _retry_, or second attempt, will -/// be started after a randomly chosen delay between zero and ``initialBackoff``. More generally, -/// the nth retry will happen after a randomly chosen delay between zero -/// and `min(initialBackoff * backoffMultiplier^(n-1), maximumBackoff)`. -/// -/// For more information see [gRFC A6 Client -/// Retries](https://github.com/grpc/proposal/blob/master/A6-client-retries.md). -@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -public struct RetryPolicy: Hashable, Sendable { - /// The maximum number of RPC attempts, including the original attempt. - /// - /// Must be greater than one, values greater than five are treated as five. - public var maximumAttempts: Int { - didSet { self.maximumAttempts = validateMaxAttempts(self.maximumAttempts) } - } - - /// The initial backoff duration. - /// - /// The initial retry will occur after a random amount of time up to this value. - /// - /// - Precondition: Must be greater than zero. - public var initialBackoff: Duration { - willSet { Self.validateInitialBackoff(newValue) } - } - - /// The maximum amount of time to backoff for. - /// - /// - Precondition: Must be greater than zero. - public var maximumBackoff: Duration { - willSet { Self.validateMaxBackoff(newValue) } - } - - /// The multiplier to apply to backoff. - /// - /// - Precondition: Must be greater than zero. - public var backoffMultiplier: Double { - willSet { Self.validateBackoffMultiplier(newValue) } - } - - /// The set of status codes which may be retried. - /// - /// - Precondition: Must not be empty. - public var retryableStatusCodes: Set { - willSet { Self.validateRetryableStatusCodes(newValue) } - } - - /// Create a new retry policy. - /// - /// - Parameters: - /// - maximumAttempts: The maximum number of attempts allowed for the RPC. - /// - initialBackoff: The initial backoff period for the first retry attempt. Must be - /// greater than zero. - /// - maximumBackoff: The maximum period of time to wait between attempts. Must be greater than - /// zero. - /// - backoffMultiplier: The exponential backoff multiplier. Must be greater than zero. - /// - retryableStatusCodes: The set of status codes which may be retried. Must not be empty. - /// - Precondition: `maximumAttempts`, `initialBackoff`, `maximumBackoff` and `backoffMultiplier` - /// must be greater than zero. - /// - Precondition: `retryableStatusCodes` must not be empty. - public init( - maximumAttempts: Int, - initialBackoff: Duration, - maximumBackoff: Duration, - backoffMultiplier: Double, - retryableStatusCodes: Set - ) { - self.maximumAttempts = validateMaxAttempts(maximumAttempts) - - Self.validateInitialBackoff(initialBackoff) - self.initialBackoff = initialBackoff - - Self.validateMaxBackoff(maximumBackoff) - self.maximumBackoff = maximumBackoff - - Self.validateBackoffMultiplier(backoffMultiplier) - self.backoffMultiplier = backoffMultiplier - - Self.validateRetryableStatusCodes(retryableStatusCodes) - self.retryableStatusCodes = retryableStatusCodes - } - - private static func validateInitialBackoff(_ value: Duration) { - precondition(value.isGreaterThanZero, "initialBackoff must be greater than zero") - } - - private static func validateMaxBackoff(_ value: Duration) { - precondition(value.isGreaterThanZero, "maximumBackoff must be greater than zero") - } - - private static func validateBackoffMultiplier(_ value: Double) { - precondition(value > 0, "backoffMultiplier must be greater than zero") - } - - private static func validateRetryableStatusCodes(_ value: Set) { - precondition(!value.isEmpty, "retryableStatusCodes mustn't be empty") - } -} - -/// Policy for hedging an RPC. -/// -/// Hedged RPCs may execute more than once on a server so only idempotent methods should -/// be hedged. -/// -/// gRPC executes the RPC at most ``maximumAttempts`` times, staggering each attempt -/// by ``hedgingDelay``. -/// -/// For more information see [gRFC A6 Client -/// Retries](https://github.com/grpc/proposal/blob/master/A6-client-retries.md). -@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -public struct HedgingPolicy: Hashable, Sendable { - /// The maximum number of RPC attempts, including the original attempt. - /// - /// Values greater than five are treated as five. - /// - /// - Precondition: Must be greater than one. - public var maximumAttempts: Int { - didSet { self.maximumAttempts = validateMaxAttempts(self.maximumAttempts) } - } - - /// The first RPC will be sent immediately, but each subsequent RPC will be sent at intervals - /// of `hedgingDelay`. Set this to zero to immediately send all RPCs. - public var hedgingDelay: Duration { - willSet { Self.validateHedgingDelay(newValue) } - } - - /// The set of status codes which indicate other hedged RPCs may still succeed. - /// - /// If a non-fatal status code is returned by the server, hedged RPCs will continue. - /// Otherwise, outstanding requests will be cancelled and the error returned to the - /// application layer. - public var nonFatalStatusCodes: Set - - /// Create a new hedging policy. - /// - /// - Parameters: - /// - maximumAttempts: The maximum number of attempts allowed for the RPC. - /// - hedgingDelay: The delay between each hedged RPC. - /// - nonFatalStatusCodes: The set of status codes which indicated other hedged RPCs may still - /// succeed. - /// - Precondition: `maximumAttempts` must be greater than zero. - public init( - maximumAttempts: Int, - hedgingDelay: Duration, - nonFatalStatusCodes: Set - ) { - self.maximumAttempts = validateMaxAttempts(maximumAttempts) - - Self.validateHedgingDelay(hedgingDelay) - self.hedgingDelay = hedgingDelay - self.nonFatalStatusCodes = nonFatalStatusCodes - } - - private static func validateHedgingDelay(_ value: Duration) { - precondition( - value.isGreaterThanOrEqualToZero, - "hedgingDelay must be greater than or equal to zero" - ) - } -} - -private func validateMaxAttempts(_ value: Int) -> Int { - precondition(value > 0, "maximumAttempts must be greater than zero") - return min(value, 5) -} - -@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -extension Duration { - fileprivate var isGreaterThanZero: Bool { - self.components.seconds > 0 || self.components.attoseconds > 0 - } - - fileprivate var isGreaterThanOrEqualToZero: Bool { - self.components.seconds >= 0 || self.components.attoseconds >= 0 - } -} diff --git a/Sources/GRPCCore/Call/Client/ClientRPCExecutionConfigurationCollection.swift b/Sources/GRPCCore/Call/Client/ClientRPCExecutionConfigurationCollection.swift deleted file mode 100644 index 9207fd757..000000000 --- a/Sources/GRPCCore/Call/Client/ClientRPCExecutionConfigurationCollection.swift +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright 2023, gRPC Authors All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) - -/// A collection of ``ClientRPCExecutionConfiguration``s, mapped to specific methods or services. -/// -/// When creating a new instance, you must provide a default configuration to be used when getting -/// a configuration for a method that has not been given a specific override. -/// Use ``setDefaultConfiguration(_:forService:)`` to set a specific override for a whole -/// service. -/// -/// Use the subscript to get and set configurations for methods. -public struct ClientRPCExecutionConfigurationCollection: Sendable, Hashable { - private var elements: [MethodDescriptor: ClientRPCExecutionConfiguration] - private let defaultConfiguration: ClientRPCExecutionConfiguration - - public init( - defaultConfiguration: ClientRPCExecutionConfiguration = ClientRPCExecutionConfiguration( - executionPolicy: nil, - timeout: nil - ) - ) { - self.elements = [:] - self.defaultConfiguration = defaultConfiguration - } - - public subscript(_ descriptor: MethodDescriptor) -> ClientRPCExecutionConfiguration { - get { - if let methodLevelOverride = self.elements[descriptor] { - return methodLevelOverride - } - var serviceLevelDescriptor = descriptor - serviceLevelDescriptor.method = "" - return self.elements[serviceLevelDescriptor, default: self.defaultConfiguration] - } - - set { - precondition( - !descriptor.service.isEmpty, - "Method descriptor's service cannot be empty." - ) - - self.elements[descriptor] = newValue - } - } - - /// Set a default configuration for a service. - /// - /// If getting a configuration for a method that's part of a service, and the method itself doesn't have an - /// override, then this configuration will be used instead of the default configuration passed when creating - /// this instance of ``ClientRPCExecutionConfigurationCollection``. - /// - /// - Parameters: - /// - configuration: The default configuration for the service. - /// - service: The name of the service for which this override applies. - public mutating func setDefaultConfiguration( - _ configuration: ClientRPCExecutionConfiguration, - forService service: String - ) { - self[MethodDescriptor(service: service, method: "")] = configuration - } -} diff --git a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift index 63f46db74..42002a263 100644 --- a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift +++ b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift @@ -30,7 +30,7 @@ extension ClientRPCExecutor { @usableFromInline let transport: Transport @usableFromInline - let policy: HedgingPolicy + let policy: GRPCClient.MethodConfiguration.HedgingPolicy @usableFromInline let timeout: Duration? @usableFromInline @@ -45,7 +45,7 @@ extension ClientRPCExecutor { @inlinable init( transport: Transport, - policy: HedgingPolicy, + policy: GRPCClient.MethodConfiguration.HedgingPolicy, timeout: Duration?, interceptors: [any ClientInterceptor], serializer: Serializer, @@ -420,7 +420,7 @@ extension ClientRPCExecutor.HedgingExecutor { private(set) var hasUsableResponse: Bool @inlinable - init(policy: HedgingPolicy) { + init(policy: GRPCClient.MethodConfiguration.HedgingPolicy) { self._maximumAttempts = policy.maximumAttempts self.attempt = 1 self.hasUsableResponse = false diff --git a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift index 619352723..f433021f9 100644 --- a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift +++ b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift @@ -30,7 +30,7 @@ extension ClientRPCExecutor { @usableFromInline let transport: Transport @usableFromInline - let policy: RetryPolicy + let policy: GRPCClient.MethodConfiguration.RetryPolicy @usableFromInline let timeout: Duration? @usableFromInline @@ -45,7 +45,7 @@ extension ClientRPCExecutor { @inlinable init( transport: Transport, - policy: RetryPolicy, + policy: GRPCClient.MethodConfiguration.RetryPolicy, timeout: Duration?, interceptors: [any ClientInterceptor], serializer: Serializer, diff --git a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor.swift b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor.swift index 03e12c87a..213a8bc79 100644 --- a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor.swift +++ b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor.swift @@ -35,7 +35,7 @@ enum ClientRPCExecutor { static func execute( request: ClientRequest.Stream, method: MethodDescriptor, - configuration: ClientRPCExecutionConfiguration, + configuration: GRPCClient.MethodConfiguration, serializer: some MessageSerializer, deserializer: some MessageDeserializer, transport: some ClientTransport, diff --git a/Sources/GRPCCore/Call/Client/Internal/RetryDelaySequence.swift b/Sources/GRPCCore/Call/Client/Internal/RetryDelaySequence.swift index b07691e4e..57592bc13 100644 --- a/Sources/GRPCCore/Call/Client/Internal/RetryDelaySequence.swift +++ b/Sources/GRPCCore/Call/Client/Internal/RetryDelaySequence.swift @@ -26,10 +26,10 @@ struct RetryDelaySequence: Sequence { typealias Element = Duration @usableFromInline - let policy: RetryPolicy + let policy: GRPCClient.MethodConfiguration.RetryPolicy @inlinable - init(policy: RetryPolicy) { + init(policy: GRPCClient.MethodConfiguration.RetryPolicy) { self.policy = policy } @@ -41,12 +41,12 @@ struct RetryDelaySequence: Sequence { @usableFromInline struct Iterator: IteratorProtocol { @usableFromInline - let policy: RetryPolicy + let policy: GRPCClient.MethodConfiguration.RetryPolicy @usableFromInline private(set) var n = 1 @inlinable - init(policy: RetryPolicy) { + init(policy: GRPCClient.MethodConfiguration.RetryPolicy) { self.policy = policy } diff --git a/Sources/GRPCCore/ClientError.swift b/Sources/GRPCCore/ClientError.swift new file mode 100644 index 000000000..77ad48719 --- /dev/null +++ b/Sources/GRPCCore/ClientError.swift @@ -0,0 +1,148 @@ +/* + * Copyright 2023, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/// A runtime error thrown by the client. +/// +/// In contrast to ``RPCError``, the ``ClientError`` represents errors which happen at a scope +/// wider than an individual RPC. For example, attempting to start a client which is already +/// stopped would result in a ``ClientError``. +public struct ClientError: Error, Hashable, @unchecked Sendable { + private var storage: Storage + + // Ensures the underlying storage is unique. + private mutating func ensureUniqueStorage() { + if !isKnownUniquelyReferenced(&self.storage) { + self.storage = self.storage.copy() + } + } + + /// The code indicating the domain of the error. + public var code: Code { + get { self.storage.code } + set { + self.ensureUniqueStorage() + self.storage.code = newValue + } + } + + /// A message providing more details about the error which may include details specific to this + /// instance of the error. + public var message: String { + get { self.storage.message } + set { + self.ensureUniqueStorage() + self.storage.message = newValue + } + } + + /// The original error which led to this error being thrown. + public var cause: Error? { + get { self.storage.cause } + set { + self.ensureUniqueStorage() + self.storage.cause = newValue + } + } + + /// Creates a new error. + /// + /// - Parameters: + /// - code: The error code. + /// - message: A description of the error. + /// - cause: The original error which led to this error being thrown. + public init(code: Code, message: String, cause: Error? = nil) { + self.storage = Storage(code: code, message: message, cause: cause) + } +} + +extension ClientError: CustomStringConvertible { + public var description: String { + if let cause = self.cause { + return "\(self.code): \"\(self.message)\" (cause: \"\(cause)\")" + } else { + return "\(self.code): \"\(self.message)\"" + } + } +} + +extension ClientError { + private final class Storage: Hashable { + var code: Code + var message: String + var cause: Error? + + init(code: Code, message: String, cause: Error?) { + self.code = code + self.message = message + self.cause = cause + } + + func copy() -> Storage { + return Storage(code: self.code, message: self.message, cause: self.cause) + } + + func hash(into hasher: inout Hasher) { + hasher.combine(self.code) + hasher.combine(self.message) + } + + static func == (lhs: Storage, rhs: Storage) -> Bool { + return lhs.code == rhs.code && lhs.message == rhs.message + } + } +} + +extension ClientError { + public struct Code: Hashable, Sendable { + private enum Value { + case clientIsAlreadyRunning + case clientIsNotRunning + case clientIsStopped + case transportError + } + + private var value: Value + private init(_ value: Value) { + self.value = value + } + + /// At attempt to start the client was made but it is already running. + public static var clientIsAlreadyRunning: Self { + Self(.clientIsAlreadyRunning) + } + + /// An attempt to start an RPC was made but the client is not running. + public static var clientIsNotRunning: Self { + Self(.clientIsNotRunning) + } + + /// At attempt to start the client was made but it has already stopped. + public static var clientIsStopped: Self { + Self(.clientIsStopped) + } + + /// The transport threw an error whilst connected. + public static var transportError: Self { + Self(.transportError) + } + } +} + +extension ClientError.Code: CustomStringConvertible { + public var description: String { + String(describing: self.value) + } +} diff --git a/Sources/GRPCCore/GRPCClient.swift b/Sources/GRPCCore/GRPCClient.swift new file mode 100644 index 000000000..515e29978 --- /dev/null +++ b/Sources/GRPCCore/GRPCClient.swift @@ -0,0 +1,804 @@ +/* + * Copyright 2023, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/// A gRPC client. +/// +/// A ``GRPCClient`` communicates to a server via a given ``ClientTransport``. +/// You can start RPCs to the server by calling the corresponding method: +/// - ``unary(request:descriptor:serializer:deserializer:handler:)`` +/// - ``clientStreaming(request:descriptor:serializer:deserializer:handler:)`` +/// - ``serverStreaming(request:descriptor:serializer:deserializer:handler:)`` +/// - ``bidirectionalStreaming(request:descriptor:serializer:deserializer:handler:)`` +/// +/// You can set ``MethodConfiguration``s on this client to override whatever configurations have been +/// set on the given transport. +/// You can also use ``ClientInterceptor``s to implement cross-cutting logic which apply to all +/// RPCs. Example uses of interceptors include authentication and logging. +/// +/// ## Creating and configuring a client +/// +/// The following example demonstrates how to create and configure a server. +/// +/// ```swift +/// // Create and add an in-process transport. +/// let inProcessServerTransport = InProcessServerTransport() +/// let inProcessClientTransport = InProcessClientTransport(serverTransport: inProcessServerTransport) +/// let client = GRPCClient(transport: inProcessClientTransport) +/// +/// // Create and add some interceptors. +/// client.interceptors.add(StatsRecordingServerInterceptors()) +/// +/// // Create and add some method configurations. +/// let defaultConfiguration = MethodConfiguration( +/// executionPolicy: ..., +/// timeout: ... +/// ) +/// var registry = MethodConfigurationRegistry() +/// registry.setOverallDefaultConfiguration(defaultConfiguration) +/// +/// // Set as configuration overrides to make the client-side configuration +/// // take precedence over the transport-defined configurations. +/// client.methodConfigurationOverrides = registry +/// +/// // Or set as configuration defaults to make the transport-defined configurations +/// // take precendece over these. +/// client.methodConfigurationDefaults = registry +/// ``` +/// +/// ## Starting and stopping the client +/// +/// Once you have configured the client, call ``run()`` to start it. Calling ``run()`` connects to the given +/// transport. +/// +/// ```swift +/// // Start running the client. +/// // Since it's a long-running task, we do it in a task group so it runs in +/// // the background. +/// try await withThrowingTaskGroup(of: Void.self) { group in +/// group.addTask { +/// try await client.run() +/// } +/// +/// try await client.unary( +/// request: .init(message: "Hello!"), +/// descriptor: MethodDescriptor(service: "service", method: "method"), +/// serializer: ..., +/// deserializer: ... +/// ) { response in +/// // Do something with the response +/// } +/// } +/// ``` +/// +/// The ``run()`` method won't return until the client has finished handling all requests. You can +/// signal to the client that it should stop creating new request streams by calling ``close()``. +/// This gives the client enough time to drain any requests already in flight. To stop the client more abruptly +/// you can cancel the task running your client. If your application requires additional resources +/// that need their lifecycles managed you should consider using [Swift Service +/// Lifecycle](https://github.com/swift-server/swift-service-lifecycle). +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +public final class GRPCClient: Sendable { + /// A collection of ``ClientInterceptor`` implementations which are applied to all accepted + /// RPCs. + /// + /// RPCs are intercepted in the order that interceptors are added. That is, a request sent from the client to + /// the server will first be intercepted by the first added interceptor followed by the second, and so on. + /// For responses from the server, they'll be applied in the opposite order. + public var interceptors: Interceptors { + get { + self.storage.withLockedValue { $0.interceptors } + } + set { + self.storage.withLockedValue { storage in + if case .notStarted = storage.state { + storage.interceptors = newValue + } + } + } + } + + /// A ``MethodConfigurationRegistry`` containing ``MethodConfiguration``s for calls + /// made from this ``Client``. + /// + /// - Note: These configurations will override those configurations set in the ``ClientTransport``. + public var methodConfigurationOverrides: MethodConfigurationRegistry { + get { + self.storage.withLockedValue { $0.methodConfigurationOverrides } + } + set { + self.storage.withLockedValue { storage in + if case .notStarted = storage.state { + storage.methodConfigurationOverrides = newValue + } + } + } + } + + /// A ``MethodConfigurationRegistry`` containing ``MethodConfiguration``s for calls + /// made from this ``Client``. + /// + /// - Note: These configurations will be used if there were no client overrides set in ``methodConfigurationOverrides`` + /// and no configuration set in the transport. + public var methodConfigurationDefaults: MethodConfigurationRegistry { + get { + self.storage.withLockedValue { $0.methodConfigurationDefaults } + } + set { + self.storage.withLockedValue { storage in + if case .notStarted = storage.state { + storage.methodConfigurationDefaults = newValue + } + } + } + } + + /// The state of the client. + private enum State { + /// The client hasn't been started yet. Can transition to `running` or `stopped`. + case notStarted + /// The client is running and can send RPCs. Can transition to `stopping`. + case running + /// The client is stopping and no new RPCs will be sent. Existing RPCs may run to + /// completion. May transition to `stopped`. + case stopping + /// The client has stopped, no RPCs are in flight and no more will be accepted. This state + /// is terminal. + case stopped + } + + /// Underlying storage for the client. + private struct Storage { + var state: State + + /// Client interceptors. + var interceptors: Interceptors + + /// A ``MethodConfigurationRegistry`` containing configuration overrides. + /// These will take precedence over the transport-defined configuration. + var methodConfigurationOverrides: MethodConfigurationRegistry + + /// A ``MethodConfigurationRegistry`` containing configuration defaults. + /// These will be used if the transport didn't define any configuration. + var methodConfigurationDefaults: MethodConfigurationRegistry + + init() { + self.interceptors = Interceptors() + self.methodConfigurationOverrides = MethodConfigurationRegistry() + self.methodConfigurationDefaults = MethodConfigurationRegistry() + self.state = .notStarted + } + } + + private let storage: LockedValueBox + + /// The transport which provides a bidirectional communication channel with the server. + private let transport: any ClientTransport + + /// Creates a new client with no resources (i.e., with no ``Interceptors-swift.struct`` and no + /// ``MethodDescriptor``s). + /// + /// You can add resources to the client via ``interceptors-swift.property``, + /// ``methodConfigurationOverrides-swift.property``, and ``methodConfigurationDefaults-swift.property``, + /// and start the client by calling ``run()``. + /// + /// - Note: Any changes to resources after ``run()`` has been called will be ignored. + /// + /// - Parameter transport: The ``ClientTransport`` to be used for this ``GRPCClient``. + public init(transport: any ClientTransport) { + self.storage = LockedValueBox(Storage()) + self.transport = transport + } + + /// Start the client. + /// + /// This is a long-running task that will return once ``close()`` has been called and all in-flight RPCs + /// finished executing. + /// + /// If you need to immediately stop all work, cancel the task executing this method. + public func run() async throws { + try self.storage.withLockedValue { storage in + switch storage.state { + case .notStarted: + storage.state = .running + case .running: + throw ClientError( + code: .clientIsAlreadyRunning, + message: "The client is already running and can only be started once." + ) + case .stopping, .stopped: + throw ClientError( + code: .clientIsStopped, + message: "The client has stopped and can only be started once." + ) + } + } + + // When we exit this function we must have stopped. + defer { + self.storage.withLockedValue { $0.state = .stopped } + } + + do { + try await self.transport.connect(lazily: false) + } catch { + throw ClientError( + code: .transportError, + message: "The transport threw an error while connected.", + cause: error + ) + } + } + + /// Close the client. + /// + /// The transport will be closed: this means that it will be given enough time to wait for in-flight RPCs to + /// finish executing, but no new RPCs will be accepted. + /// You can cancel the task executing ``run()`` if you want to immediately stop all work. + public func close() { + self.storage.withLockedValue { storage in + switch storage.state { + case .notStarted: + storage.state = .stopped + case .running: + storage.state = .stopping + case .stopping, .stopped: + () + } + } + + self.transport.close() + } + + /// Executes a unary RPC. + /// + /// - Parameters: + /// - request: The unary request. + /// - descriptor: The method descriptor for which to execute this request. + /// - serializer: A request serializer. + /// - deserializer: A response deserializer. + /// - handler: A unary response handler. + /// + /// - Returns: The return value from the `handler`. + public func unary( + request: ClientRequest.Single, + descriptor: MethodDescriptor, + serializer: some MessageSerializer, + deserializer: some MessageDeserializer, + handler: @Sendable @escaping (ClientResponse.Single) async throws -> ReturnValue + ) async throws -> ReturnValue { + try await bidirectionalStreaming( + request: ClientRequest.Stream(single: request), + descriptor: descriptor, + serializer: serializer, + deserializer: deserializer + ) { stream in + let singleResponse = await ClientResponse.Single(stream: stream) + return try await handler(singleResponse) + } + } + + /// Start a client-streaming RPC. + /// + /// - Parameters: + /// - request: The request stream. + /// - descriptor: The method descriptor for which to execute this request. + /// - serializer: A request serializer. + /// - deserializer: A response deserializer. + /// - handler: A unary response handler. + /// + /// - Returns: The return value from the `handler`. + public func clientStreaming( + request: ClientRequest.Stream, + descriptor: MethodDescriptor, + serializer: some MessageSerializer, + deserializer: some MessageDeserializer, + handler: @Sendable @escaping (ClientResponse.Single) async throws -> ReturnValue + ) async throws -> ReturnValue { + try await bidirectionalStreaming( + request: request, + descriptor: descriptor, + serializer: serializer, + deserializer: deserializer + ) { stream in + let singleResponse = await ClientResponse.Single(stream: stream) + return try await handler(singleResponse) + } + } + + /// Start a server-streaming RPC. + /// + /// - Parameters: + /// - request: The unary request. + /// - descriptor: The method descriptor for which to execute this request. + /// - serializer: A request serializer. + /// - deserializer: A response deserializer. + /// - handler: A response stream handler. + /// + /// - Returns: The return value from the `handler`. + public func serverStreaming( + request: ClientRequest.Single, + descriptor: MethodDescriptor, + serializer: some MessageSerializer, + deserializer: some MessageDeserializer, + handler: @Sendable @escaping (ClientResponse.Stream) async throws -> ReturnValue + ) async throws -> ReturnValue { + try await bidirectionalStreaming( + request: ClientRequest.Stream(single: request), + descriptor: descriptor, + serializer: serializer, + deserializer: deserializer, + handler: handler + ) + } + + /// Start a bidirectional streaming RPC. + /// + /// - Note: ``run()`` must have been called and still executing, and ``close()`` mustn't + /// have been called. + /// + /// - Parameters: + /// - request: The streaming request. + /// - descriptor: The method descriptor for which to execute this request. + /// - serializer: A request serializer. + /// - deserializer: A response deserializer. + /// - handler: A response stream handler. + /// + /// - Returns: The return value from the `handler`. + public func bidirectionalStreaming( + request: ClientRequest.Stream, + descriptor: MethodDescriptor, + serializer: some MessageSerializer, + deserializer: some MessageDeserializer, + handler: @Sendable @escaping (ClientResponse.Stream) async throws -> ReturnValue + ) async throws -> ReturnValue { + let (configurationOverrides, configurationDefaults, interceptors) = try self.storage + .withLockedValue { storage in + switch storage.state { + case .running: + return ( + storage.methodConfigurationOverrides, + storage.methodConfigurationDefaults, + storage.interceptors.values + ) + case .notStarted: + throw ClientError( + code: .clientIsNotRunning, + message: "Client must be running to make an RPC: call run() first." + ) + case .stopping, .stopped: + throw ClientError( + code: .clientIsStopped, + message: "Client has been stopped. Can't make any more RPCs." + ) + } + } + + let applicableConfiguration = self.resolveMethodConfiguration( + descriptor: descriptor, + configurationOverrides: configurationOverrides, + configurationDefaults: configurationDefaults + ) + + return try await ClientRPCExecutor.execute( + request: request, + method: descriptor, + configuration: applicableConfiguration, + serializer: serializer, + deserializer: deserializer, + transport: transport, + interceptors: interceptors, + handler: handler + ) + } + + private func resolveMethodConfiguration( + descriptor: MethodDescriptor, + configurationOverrides: MethodConfigurationRegistry, + configurationDefaults: MethodConfigurationRegistry + ) -> MethodConfiguration { + if let clientOverride = configurationOverrides[descriptor] { + return clientOverride + } + + if let transportConfiguration = self.transport.executionConfiguration(forMethod: descriptor) { + return transportConfiguration + } + + // If there is no configuration override for this method descriptor in this + // client, nor in the transport, then get the default from the client. + // If no default has been specified, then fall back to an empty confiuration. + return configurationDefaults[descriptor] + ?? MethodConfiguration( + executionPolicy: nil, + timeout: nil + ) + } +} + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +extension GRPCClient { + /// A collection of interceptors providing cross-cutting functionality to each accepted RPC. + public struct Interceptors: Sendable { + private(set) var values: [any ClientInterceptor] = [] + + /// Add an interceptor to the client. + /// + /// The order in which interceptors are added reflects the order in which they are called. The + /// first interceptor added will be the first interceptor to intercept each request. The last + /// interceptor added will be the final interceptor to intercept each request before calling + /// the appropriate handler. + /// + /// - Parameter interceptor: The interceptor to add. + public mutating func add(_ interceptor: some ClientInterceptor) { + self.values.append(interceptor) + } + } +} + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +extension GRPCClient.Interceptors: CustomStringConvertible { + public var description: String { + return String(describing: self.values.map { String(describing: type(of: $0)) }) + } +} + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +extension GRPCClient { + /// The execution policy for an RPC. + public enum ExecutionPolicy: Hashable, Sendable { + /// Policy for retrying an RPC. + /// + /// See ``RetryPolicy`` for more details. + case retry(MethodConfiguration.RetryPolicy) + + /// Policy for hedging an RPC. + /// + /// See ``HedgingPolicy`` for more details. + case hedge(MethodConfiguration.HedgingPolicy) + } + + /// Configuration values for executing an RPC. + public struct MethodConfiguration: Hashable, Sendable { + /// The default timeout for the RPC. + /// + /// If no reply is received in the specified amount of time the request is aborted + /// with an ``RPCError`` with code ``RPCError/Code/deadlineExceeded``. + /// + /// The actual deadline used will be the minimum of the value specified here + /// and the value set by the application by the client API. If either one isn't set + /// then the other value is used. If neither is set then the request has no deadline. + /// + /// The timeout applies to the overall execution of an RPC. If, for example, a retry + /// policy is set then the timeout begins when the first attempt is started and _isn't_ reset + /// when subsequent attempts start. + public var timeout: Duration? + + /// The policy determining how many times, and when, the RPC is executed. + /// + /// There are two policy types: + /// 1. Retry + /// 2. Hedging + /// + /// The retry policy allows an RPC to be retried a limited number of times if the RPC + /// fails with one of the configured set of status codes. RPCs are only retried if they + /// fail immediately, that is, the first response part received from the server is a + /// status code. + /// + /// The hedging policy allows an RPC to be executed multiple times concurrently. Typically + /// each execution will be staggered by some delay. The first successful response will be + /// reported to the client. Hedging is only suitable for idempotent RPCs. + public var executionPolicy: ExecutionPolicy? + + /// Create an execution configuration. + /// + /// - Parameters: + /// - executionPolicy: The execution policy to use for the RPC. + /// - timeout: The default timeout for the RPC. + public init( + executionPolicy: ExecutionPolicy?, + timeout: Duration? + ) { + self.executionPolicy = executionPolicy + self.timeout = timeout + } + + /// Create an execution configuration with a retry policy. + /// + /// - Parameters: + /// - retryPolicy: The policy for retrying the RPC. + /// - timeout: The default timeout for the RPC. + public init( + retryPolicy: RetryPolicy, + timeout: Duration? = nil + ) { + self.executionPolicy = .retry(retryPolicy) + self.timeout = timeout + } + + /// Create an execution configuration with a hedging policy. + /// + /// - Parameters: + /// - hedgingPolicy: The policy for hedging the RPC. + /// - timeout: The default timeout for the RPC. + public init( + hedgingPolicy: HedgingPolicy, + timeout: Duration? = nil + ) { + self.executionPolicy = .hedge(hedgingPolicy) + self.timeout = timeout + } + } +} + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +extension GRPCClient.MethodConfiguration { + /// Policy for retrying an RPC. + /// + /// gRPC retries RPCs when the first response from the server is a status code which matches + /// one of the configured retryable status codes. If the server begins processing the RPC and + /// first responds with metadata and later responds with a retryable status code then the RPC + /// won't be retried. + /// + /// Execution attempts are limited by ``maximumAttempts`` which includes the original attempt. The + /// maximum number of attempts is limited to five. + /// + /// Subsequent attempts are executed after some delay. The first _retry_, or second attempt, will + /// be started after a randomly chosen delay between zero and ``initialBackoff``. More generally, + /// the nth retry will happen after a randomly chosen delay between zero + /// and `min(initialBackoff * backoffMultiplier^(n-1), maximumBackoff)`. + /// + /// For more information see [gRFC A6 Client + /// Retries](https://github.com/grpc/proposal/blob/master/A6-client-retries.md). + public struct RetryPolicy: Hashable, Sendable { + /// The maximum number of RPC attempts, including the original attempt. + /// + /// Must be greater than one, values greater than five are treated as five. + public var maximumAttempts: Int { + didSet { self.maximumAttempts = validateMaxAttempts(self.maximumAttempts) } + } + + /// The initial backoff duration. + /// + /// The initial retry will occur after a random amount of time up to this value. + /// + /// - Precondition: Must be greater than zero. + public var initialBackoff: Duration { + willSet { Self.validateInitialBackoff(newValue) } + } + + /// The maximum amount of time to backoff for. + /// + /// - Precondition: Must be greater than zero. + public var maximumBackoff: Duration { + willSet { Self.validateMaxBackoff(newValue) } + } + + /// The multiplier to apply to backoff. + /// + /// - Precondition: Must be greater than zero. + public var backoffMultiplier: Double { + willSet { Self.validateBackoffMultiplier(newValue) } + } + + /// The set of status codes which may be retried. + /// + /// - Precondition: Must not be empty. + public var retryableStatusCodes: Set { + willSet { Self.validateRetryableStatusCodes(newValue) } + } + + /// Create a new retry policy. + /// + /// - Parameters: + /// - maximumAttempts: The maximum number of attempts allowed for the RPC. + /// - initialBackoff: The initial backoff period for the first retry attempt. Must be + /// greater than zero. + /// - maximumBackoff: The maximum period of time to wait between attempts. Must be greater than + /// zero. + /// - backoffMultiplier: The exponential backoff multiplier. Must be greater than zero. + /// - retryableStatusCodes: The set of status codes which may be retried. Must not be empty. + /// - Precondition: `maximumAttempts`, `initialBackoff`, `maximumBackoff` and `backoffMultiplier` + /// must be greater than zero. + /// - Precondition: `retryableStatusCodes` must not be empty. + public init( + maximumAttempts: Int, + initialBackoff: Duration, + maximumBackoff: Duration, + backoffMultiplier: Double, + retryableStatusCodes: Set + ) { + self.maximumAttempts = validateMaxAttempts(maximumAttempts) + + Self.validateInitialBackoff(initialBackoff) + self.initialBackoff = initialBackoff + + Self.validateMaxBackoff(maximumBackoff) + self.maximumBackoff = maximumBackoff + + Self.validateBackoffMultiplier(backoffMultiplier) + self.backoffMultiplier = backoffMultiplier + + Self.validateRetryableStatusCodes(retryableStatusCodes) + self.retryableStatusCodes = retryableStatusCodes + } + + private static func validateInitialBackoff(_ value: Duration) { + precondition(value.isGreaterThanZero, "initialBackoff must be greater than zero") + } + + private static func validateMaxBackoff(_ value: Duration) { + precondition(value.isGreaterThanZero, "maximumBackoff must be greater than zero") + } + + private static func validateBackoffMultiplier(_ value: Double) { + precondition(value > 0, "backoffMultiplier must be greater than zero") + } + + private static func validateRetryableStatusCodes(_ value: Set) { + precondition(!value.isEmpty, "retryableStatusCodes mustn't be empty") + } + } + + /// Policy for hedging an RPC. + /// + /// Hedged RPCs may execute more than once on a server so only idempotent methods should + /// be hedged. + /// + /// gRPC executes the RPC at most ``maximumAttempts`` times, staggering each attempt + /// by ``hedgingDelay``. + /// + /// For more information see [gRFC A6 Client + /// Retries](https://github.com/grpc/proposal/blob/master/A6-client-retries.md). + public struct HedgingPolicy: Hashable, Sendable { + /// The maximum number of RPC attempts, including the original attempt. + /// + /// Values greater than five are treated as five. + /// + /// - Precondition: Must be greater than one. + public var maximumAttempts: Int { + didSet { self.maximumAttempts = validateMaxAttempts(self.maximumAttempts) } + } + + /// The first RPC will be sent immediately, but each subsequent RPC will be sent at intervals + /// of `hedgingDelay`. Set this to zero to immediately send all RPCs. + public var hedgingDelay: Duration { + willSet { Self.validateHedgingDelay(newValue) } + } + + /// The set of status codes which indicate other hedged RPCs may still succeed. + /// + /// If a non-fatal status code is returned by the server, hedged RPCs will continue. + /// Otherwise, outstanding requests will be cancelled and the error returned to the + /// application layer. + public var nonFatalStatusCodes: Set + + /// Create a new hedging policy. + /// + /// - Parameters: + /// - maximumAttempts: The maximum number of attempts allowed for the RPC. + /// - hedgingDelay: The delay between each hedged RPC. + /// - nonFatalStatusCodes: The set of status codes which indicated other hedged RPCs may still + /// succeed. + /// - Precondition: `maximumAttempts` must be greater than zero. + public init( + maximumAttempts: Int, + hedgingDelay: Duration, + nonFatalStatusCodes: Set + ) { + self.maximumAttempts = validateMaxAttempts(maximumAttempts) + + Self.validateHedgingDelay(hedgingDelay) + self.hedgingDelay = hedgingDelay + self.nonFatalStatusCodes = nonFatalStatusCodes + } + + private static func validateHedgingDelay(_ value: Duration) { + precondition( + value.isGreaterThanOrEqualToZero, + "hedgingDelay must be greater than or equal to zero" + ) + } + } + + fileprivate static func validateMaxAttempts(_ value: Int) -> Int { + precondition(value > 0, "maximumAttempts must be greater than zero") + return min(value, 5) + } +} + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +extension GRPCClient { + /// A collection of ``ClientRPCExecutionConfiguration``s, mapped to specific methods or services. + /// + /// When creating a new instance, no overrides and no default will be set for using when getting + /// a configuration for a method that has not been given a specific override. + /// Use ``setDefaultConfiguration(_:forService:)`` to set a specific override for a whole + /// service, or set a default configuration for all methods by calling ``setOverallDefaultConfiguration(_:)``. + /// + /// Use the subscript to get and set configurations for specific methods. + public struct MethodConfigurationRegistry: Sendable, Hashable { + private var elements: [MethodDescriptor: MethodConfiguration] + private var defaultConfiguration: MethodConfiguration? + + /// Create a new ``MethodConfigurationRegistry`` with no overrides and no default configuration. + public init() { + self.elements = [:] + } + + /// Get or set the corresponding ``MethodConfiguration`` for the given ``MethodDescriptor``. + /// + /// If no configuration has been set for the given descriptor, the value returned will be the default + /// passed in ``init(defaultConfiguration:)`` + /// + /// - Parameters: + /// - descriptor: The ``MethodDescriptor`` for which to get or set a ``MethodConfiguration``. + public subscript(_ descriptor: MethodDescriptor) -> MethodConfiguration? + { + get { + if let methodLevelOverride = self.elements[descriptor] { + return methodLevelOverride + } + var serviceLevelDescriptor = descriptor + serviceLevelDescriptor.method = "" + + return self.elements[serviceLevelDescriptor] ?? self.defaultConfiguration + } + + set { + precondition( + !descriptor.service.isEmpty, + "Method descriptor's service cannot be empty." + ) + + self.elements[descriptor] = newValue + } + } + + /// Set a default configuration for all methods that have no overrides. + /// + /// - Parameter configuration: The default configuration. + public mutating func setOverallDefaultConfiguration(_ configuration: MethodConfiguration?) { + self.defaultConfiguration = configuration + } + + /// Set a default configuration for a service. + /// + /// If getting a configuration for a method that's part of a service, and the method itself doesn't have an + /// override, then this configuration will be used instead of the default configuration passed when creating + /// this instance of ``ClientRPCExecutionConfigurationCollection``. + /// + /// - Parameters: + /// - configuration: The default configuration for the service. + /// - service: The name of the service for which this override applies. + public mutating func setDefaultConfiguration( + _ configuration: MethodConfiguration, + forService service: String + ) { + self[MethodDescriptor(service: service, method: "")] = configuration + } + } +} + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +extension Duration { + fileprivate var isGreaterThanZero: Bool { + self.components.seconds > 0 || self.components.attoseconds > 0 + } + + fileprivate var isGreaterThanOrEqualToZero: Bool { + self.components.seconds >= 0 || self.components.attoseconds >= 0 + } +} diff --git a/Sources/GRPCCore/GRPCServer.swift b/Sources/GRPCCore/GRPCServer.swift index 07c92715e..1388f0d28 100644 --- a/Sources/GRPCCore/GRPCServer.swift +++ b/Sources/GRPCCore/GRPCServer.swift @@ -23,7 +23,7 @@ import Atomics /// streams to a service to handle the RPC or rejects them with an appropriate error if no service /// can handle the RPC. /// -/// A ``Server`` may listen with multiple transports (for example, HTTP/2 and in-process) and route +/// A ``GRPCServer`` may listen with multiple transports (for example, HTTP/2 and in-process) and route /// requests from each transport to the same service instance. You can also use "interceptors", /// to implement cross-cutting logic which apply to all accepted RPCs. Example uses of interceptors /// include request filtering, authentication, and logging. Once requests have been intercepted @@ -76,7 +76,11 @@ public final class GRPCServer: Sendable { self.storage.withLockedValue { $0.transports } } set { - self.storage.withLockedValue { $0.transports = newValue } + self.storage.withLockedValue { storage in + if case .notStarted = storage.state { + storage.transports = newValue + } + } } } @@ -86,7 +90,11 @@ public final class GRPCServer: Sendable { self.storage.withLockedValue { $0.services } } set { - self.storage.withLockedValue { $0.services = newValue } + self.storage.withLockedValue { storage in + if case .notStarted = storage.state { + storage.services = newValue + } + } } } @@ -101,7 +109,11 @@ public final class GRPCServer: Sendable { self.storage.withLockedValue { $0.interceptors } } set { - self.storage.withLockedValue { $0.interceptors = newValue } + self.storage.withLockedValue { storage in + if case .notStarted = storage.state { + storage.interceptors = newValue + } + } } } diff --git a/Sources/GRPCCore/Transport/ClientTransport.swift b/Sources/GRPCCore/Transport/ClientTransport.swift index 6e7d20bcb..1233183b0 100644 --- a/Sources/GRPCCore/Transport/ClientTransport.swift +++ b/Sources/GRPCCore/Transport/ClientTransport.swift @@ -16,8 +16,8 @@ @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) public protocol ClientTransport: Sendable { - associatedtype Inbound: (AsyncSequence & Sendable) where Inbound.Element == RPCResponsePart - associatedtype Outbound: ClosableRPCWriterProtocol + typealias Inbound = RPCAsyncSequence + typealias Outbound = RPCWriter.Closable /// Returns a throttle which gRPC uses to determine whether retries can be executed. /// @@ -77,5 +77,5 @@ public protocol ClientTransport: Sendable { /// - Returns: Execution configuration for the method, if it exists. func executionConfiguration( forMethod descriptor: MethodDescriptor - ) -> ClientRPCExecutionConfiguration? + ) -> GRPCClient.MethodConfiguration? } diff --git a/Sources/GRPCCore/Transport/InProcessClientTransport.swift b/Sources/GRPCCore/Transport/InProcessClientTransport.swift index 47b1bbf73..635a49895 100644 --- a/Sources/GRPCCore/Transport/InProcessClientTransport.swift +++ b/Sources/GRPCCore/Transport/InProcessClientTransport.swift @@ -96,12 +96,12 @@ public struct InProcessClientTransport: ClientTransport { public let retryThrottle: RetryThrottle - private let executionConfigurations: ClientRPCExecutionConfigurationCollection + private let executionConfigurations: GRPCClient.MethodConfigurationRegistry private let state: LockedValueBox public init( server: InProcessServerTransport, - executionConfigurations: ClientRPCExecutionConfigurationCollection + executionConfigurations: GRPCClient.MethodConfigurationRegistry ) { self.retryThrottle = RetryThrottle(maximumTokens: 10, tokenRatio: 0.1) self.executionConfigurations = executionConfigurations @@ -329,7 +329,7 @@ public struct InProcessClientTransport: ClientTransport { /// - Returns: Execution configuration for the method, if it exists. public func executionConfiguration( forMethod descriptor: MethodDescriptor - ) -> ClientRPCExecutionConfiguration? { + ) -> GRPCClient.MethodConfiguration? { self.executionConfigurations[descriptor] } } diff --git a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness+Transport.swift b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness+Transport.swift index 42529470e..abaa8e6ef 100644 --- a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness+Transport.swift +++ b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness+Transport.swift @@ -24,15 +24,7 @@ extension InProcessServerTransport { ) -> InProcessClientTransport { return InProcessClientTransport( server: self, - executionConfigurations: .init( - defaultConfiguration: .init( - hedgingPolicy: .init( - maximumAttempts: 2, - hedgingDelay: .milliseconds(100), - nonFatalStatusCodes: [] - ) - ) - ) + executionConfigurations: .init() ) } } diff --git a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness.swift b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness.swift index 3b546d1e0..04a05a598 100644 --- a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness.swift +++ b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness.swift @@ -67,7 +67,7 @@ struct ClientRPCExecutorTestHarness { func unary( request: ClientRequest.Single<[UInt8]>, - configuration: ClientRPCExecutionConfiguration? = nil, + configuration: GRPCClient.MethodConfiguration? = nil, handler: @escaping @Sendable (ClientResponse.Single<[UInt8]>) async throws -> Void ) async throws { try await self.bidirectional( @@ -80,7 +80,7 @@ struct ClientRPCExecutorTestHarness { func clientStreaming( request: ClientRequest.Stream<[UInt8]>, - configuration: ClientRPCExecutionConfiguration? = nil, + configuration: GRPCClient.MethodConfiguration? = nil, handler: @escaping @Sendable (ClientResponse.Single<[UInt8]>) async throws -> Void ) async throws { try await self.bidirectional( @@ -93,7 +93,7 @@ struct ClientRPCExecutorTestHarness { func serverStreaming( request: ClientRequest.Single<[UInt8]>, - configuration: ClientRPCExecutionConfiguration? = nil, + configuration: GRPCClient.MethodConfiguration? = nil, handler: @escaping @Sendable (ClientResponse.Stream<[UInt8]>) async throws -> Void ) async throws { try await self.bidirectional( @@ -106,7 +106,7 @@ struct ClientRPCExecutorTestHarness { func bidirectional( request: ClientRequest.Stream<[UInt8]>, - configuration: ClientRPCExecutionConfiguration? = nil, + configuration: GRPCClient.MethodConfiguration? = nil, handler: @escaping @Sendable (ClientResponse.Stream<[UInt8]>) async throws -> Void ) async throws { try await self.execute( @@ -122,7 +122,7 @@ struct ClientRPCExecutorTestHarness { request: ClientRequest.Stream, serializer: some MessageSerializer, deserializer: some MessageDeserializer, - configuration: ClientRPCExecutionConfiguration?, + configuration: GRPCClient.MethodConfiguration?, handler: @escaping @Sendable (ClientResponse.Stream) async throws -> Void ) async throws { try await withThrowingTaskGroup(of: Void.self) { group in @@ -141,11 +141,11 @@ struct ClientRPCExecutorTestHarness { try await self.clientTransport.connect(lazily: false) } - let executionConfiguration: ClientRPCExecutionConfiguration + let executionConfiguration: GRPCClient.MethodConfiguration if let configuration = configuration { executionConfiguration = configuration } else { - executionConfiguration = ClientRPCExecutionConfiguration(executionPolicy: nil, timeout: nil) + executionConfiguration = GRPCClient.MethodConfiguration(executionPolicy: nil, timeout: nil) } // Execute the request. diff --git a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTests+Hedging.swift b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTests+Hedging.swift index ab6a6fca5..bc994dcc3 100644 --- a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTests+Hedging.swift +++ b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTests+Hedging.swift @@ -187,7 +187,7 @@ extension ClientRPCExecutorTests { } @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -extension ClientRPCExecutionConfiguration { +extension GRPCClient.MethodConfiguration { fileprivate static func hedge( maximumAttempts: Int = 5, delay: Duration = .milliseconds(25), @@ -200,6 +200,6 @@ extension ClientRPCExecutionConfiguration { nonFatalStatusCodes: nonFatalCodes ) - return ClientRPCExecutionConfiguration(hedgingPolicy: policy, timeout: timeout) + return GRPCClient.MethodConfiguration(hedgingPolicy: policy, timeout: timeout) } } diff --git a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTests+Retries.swift b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTests+Retries.swift index a313148ea..839abd11a 100644 --- a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTests+Retries.swift +++ b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTests+Retries.swift @@ -219,7 +219,7 @@ extension ClientRPCExecutorTests { } ) - let retryPolicy = RetryPolicy( + let retryPolicy = GRPCClient.MethodConfiguration.RetryPolicy( maximumAttempts: 5, initialBackoff: .seconds(60), maximumBackoff: .seconds(50), @@ -256,7 +256,7 @@ extension ClientRPCExecutorTests { ) ) - let retryPolicy = RetryPolicy( + let retryPolicy = GRPCClient.MethodConfiguration.RetryPolicy( maximumAttempts: 5, initialBackoff: .seconds(60), maximumBackoff: .seconds(50), @@ -286,7 +286,7 @@ extension ClientRPCExecutorTests { } @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -extension ClientRPCExecutionConfiguration { +extension GRPCClient.MethodConfiguration { fileprivate static func retry( maximumAttempts: Int = 5, codes: Set, @@ -300,6 +300,6 @@ extension ClientRPCExecutionConfiguration { retryableStatusCodes: codes ) - return ClientRPCExecutionConfiguration(retryPolicy: policy, timeout: timeout) + return GRPCClient.MethodConfiguration(retryPolicy: policy, timeout: timeout) } } diff --git a/Tests/GRPCCoreTests/Call/Client/ClientRPCExecutionConfigurationCollectionTests.swift b/Tests/GRPCCoreTests/Call/Client/MethodConfigurationRegistryTestsTests.swift similarity index 62% rename from Tests/GRPCCoreTests/Call/Client/ClientRPCExecutionConfigurationCollectionTests.swift rename to Tests/GRPCCoreTests/Call/Client/MethodConfigurationRegistryTestsTests.swift index 38f964652..6103d396c 100644 --- a/Tests/GRPCCoreTests/Call/Client/ClientRPCExecutionConfigurationCollectionTests.swift +++ b/Tests/GRPCCoreTests/Call/Client/MethodConfigurationRegistryTestsTests.swift @@ -17,50 +17,48 @@ import GRPCCore import XCTest @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -final class ClientRPCExecutionConfigurationCollectionTests: XCTestCase { - func testGetConfigurationForKnownMethod() { - let policy = HedgingPolicy( +final class MethodConfigurationRegistryTests: XCTestCase { + func testGetConfigurationForKnownMethod() async throws { + let policy = GRPCClient.MethodConfiguration.HedgingPolicy( maximumAttempts: 10, hedgingDelay: .seconds(1), nonFatalStatusCodes: [] ) - let defaultConfiguration = ClientRPCExecutionConfiguration(hedgingPolicy: policy) - var configurations = ClientRPCExecutionConfigurationCollection( - defaultConfiguration: defaultConfiguration - ) + let defaultConfiguration = GRPCClient.MethodConfiguration(hedgingPolicy: policy) + var configurations = GRPCClient.MethodConfigurationRegistry() + configurations.setOverallDefaultConfiguration(defaultConfiguration) let descriptor = MethodDescriptor(service: "test", method: "first") - let retryPolicy = RetryPolicy( + let retryPolicy = GRPCClient.MethodConfiguration.RetryPolicy( maximumAttempts: 10, initialBackoff: .seconds(1), maximumBackoff: .seconds(1), backoffMultiplier: 1.0, retryableStatusCodes: [.unavailable] ) - let overrideConfiguration = ClientRPCExecutionConfiguration(retryPolicy: retryPolicy) + let overrideConfiguration = GRPCClient.MethodConfiguration(retryPolicy: retryPolicy) configurations[descriptor] = overrideConfiguration XCTAssertEqual(configurations[descriptor], overrideConfiguration) } func testGetConfigurationForUnknownMethodButServiceOverride() { - let policy = HedgingPolicy( + let policy = GRPCClient.MethodConfiguration.HedgingPolicy( maximumAttempts: 10, hedgingDelay: .seconds(1), nonFatalStatusCodes: [] ) - let defaultConfiguration = ClientRPCExecutionConfiguration(hedgingPolicy: policy) - var configurations = ClientRPCExecutionConfigurationCollection( - defaultConfiguration: defaultConfiguration - ) + let defaultConfiguration = GRPCClient.MethodConfiguration(hedgingPolicy: policy) + var configurations = GRPCClient.MethodConfigurationRegistry() + configurations.setOverallDefaultConfiguration(defaultConfiguration) let firstDescriptor = MethodDescriptor(service: "test", method: "") - let retryPolicy = RetryPolicy( + let retryPolicy = GRPCClient.MethodConfiguration.RetryPolicy( maximumAttempts: 10, initialBackoff: .seconds(1), maximumBackoff: .seconds(1), backoffMultiplier: 1.0, retryableStatusCodes: [.unavailable] ) - let overrideConfiguration = ClientRPCExecutionConfiguration(retryPolicy: retryPolicy) + let overrideConfiguration = GRPCClient.MethodConfiguration(retryPolicy: retryPolicy) configurations[firstDescriptor] = overrideConfiguration let secondDescriptor = MethodDescriptor(service: "test", method: "second") @@ -68,24 +66,23 @@ final class ClientRPCExecutionConfigurationCollectionTests: XCTestCase { } func testGetConfigurationForUnknownMethodDefaultValue() { - let policy = HedgingPolicy( + let policy = GRPCClient.MethodConfiguration.HedgingPolicy( maximumAttempts: 10, hedgingDelay: .seconds(1), nonFatalStatusCodes: [] ) - let defaultConfiguration = ClientRPCExecutionConfiguration(hedgingPolicy: policy) - var configurations = ClientRPCExecutionConfigurationCollection( - defaultConfiguration: defaultConfiguration - ) + let defaultConfiguration = GRPCClient.MethodConfiguration(hedgingPolicy: policy) + var configurations = GRPCClient.MethodConfigurationRegistry() + configurations.setOverallDefaultConfiguration(defaultConfiguration) let firstDescriptor = MethodDescriptor(service: "test1", method: "first") - let retryPolicy = RetryPolicy( + let retryPolicy = GRPCClient.MethodConfiguration.RetryPolicy( maximumAttempts: 10, initialBackoff: .seconds(1), maximumBackoff: .seconds(1), backoffMultiplier: 1.0, retryableStatusCodes: [.unavailable] ) - let overrideConfiguration = ClientRPCExecutionConfiguration(retryPolicy: retryPolicy) + let overrideConfiguration = GRPCClient.MethodConfiguration(retryPolicy: retryPolicy) configurations[firstDescriptor] = overrideConfiguration let secondDescriptor = MethodDescriptor(service: "test2", method: "second") diff --git a/Tests/GRPCCoreTests/Call/Client/ClientRPCExecutionConfigurationTests.swift b/Tests/GRPCCoreTests/Call/Client/MethodConfigurationTests.swift similarity index 89% rename from Tests/GRPCCoreTests/Call/Client/ClientRPCExecutionConfigurationTests.swift rename to Tests/GRPCCoreTests/Call/Client/MethodConfigurationTests.swift index 768b808b2..aaa5c2a76 100644 --- a/Tests/GRPCCoreTests/Call/Client/ClientRPCExecutionConfigurationTests.swift +++ b/Tests/GRPCCoreTests/Call/Client/MethodConfigurationTests.swift @@ -17,9 +17,9 @@ import GRPCCore import XCTest @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -final class ClientRPCExecutionConfigurationTests: XCTestCase { +final class MethodConfigurationTests: XCTestCase { func testRetryPolicyClampsMaxAttempts() { - var policy = RetryPolicy( + var policy = GRPCClient.MethodConfiguration.RetryPolicy( maximumAttempts: 10, initialBackoff: .seconds(1), maximumBackoff: .seconds(1), @@ -35,7 +35,7 @@ final class ClientRPCExecutionConfigurationTests: XCTestCase { } func testHedgingPolicyClampsMaxAttempts() { - var policy = HedgingPolicy( + var policy = GRPCClient.MethodConfiguration.HedgingPolicy( maximumAttempts: 10, hedgingDelay: .seconds(1), nonFatalStatusCodes: [] diff --git a/Tests/GRPCCoreTests/Call/Client/RetryDelaySequenceTests.swift b/Tests/GRPCCoreTests/Call/Client/RetryDelaySequenceTests.swift index 09bc182cd..6b261340f 100644 --- a/Tests/GRPCCoreTests/Call/Client/RetryDelaySequenceTests.swift +++ b/Tests/GRPCCoreTests/Call/Client/RetryDelaySequenceTests.swift @@ -20,7 +20,7 @@ import XCTest @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) final class RetryDelaySequenceTests: XCTestCase { func testSequence() { - let policy = RetryPolicy( + let policy = GRPCClient.MethodConfiguration.RetryPolicy( maximumAttempts: 1, // ignored here initialBackoff: .seconds(1), maximumBackoff: .seconds(8), @@ -40,7 +40,7 @@ final class RetryDelaySequenceTests: XCTestCase { } func testSequenceSupportsMultipleIteration() { - let policy = RetryPolicy( + let policy = GRPCClient.MethodConfiguration.RetryPolicy( maximumAttempts: 1, // ignored here initialBackoff: .seconds(1), maximumBackoff: .seconds(8), diff --git a/Tests/GRPCCoreTests/GRPCClientTests.swift b/Tests/GRPCCoreTests/GRPCClientTests.swift new file mode 100644 index 000000000..1488638e2 --- /dev/null +++ b/Tests/GRPCCoreTests/GRPCClientTests.swift @@ -0,0 +1,449 @@ +/* + * Copyright 2023, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Atomics +import GRPCCore +import XCTest + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +final class GRPCClientTests: XCTestCase { + func makeInProcessPair() -> (client: InProcessClientTransport, server: InProcessServerTransport) { + let server = InProcessServerTransport() + let client = InProcessClientTransport( + server: server, + executionConfigurations: GRPCClient.MethodConfigurationRegistry() + ) + + return (client, server) + } + + func withInProcessConnectedClient( + services: [any RegistrableRPCService], + interceptors: [any ClientInterceptor] = [], + _ body: (GRPCClient, GRPCServer) async throws -> Void + ) async throws { + let inProcess = self.makeInProcessPair() + let client = GRPCClient(transport: inProcess.client) + let server = GRPCServer() + server.transports.add(inProcess.server) + + for service in services { + server.services.register(service) + } + + for interceptor in interceptors { + client.interceptors.add(interceptor) + } + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await server.run() + } + + group.addTask { + try await client.run() + } + + // Make sure both server and client are running + try await Task.sleep(for: .milliseconds(100)) + try await body(client, server) + client.close() + server.stopListening() + } + } + + struct IdentitySerializer: MessageSerializer { + typealias Message = [UInt8] + + func serialize(_ message: [UInt8]) throws -> [UInt8] { + return message + } + } + + struct IdentityDeserializer: MessageDeserializer { + typealias Message = [UInt8] + + func deserialize(_ serializedMessageBytes: [UInt8]) throws -> [UInt8] { + return serializedMessageBytes + } + } + + func testUnary() async throws { + try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in + try await client.unary( + request: .init(message: [3, 1, 4, 1, 5]), + descriptor: BinaryEcho.Methods.collect, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + let message = try response.message + XCTAssertEqual(message, [3, 1, 4, 1, 5]) + } + } + } + + func testClientStreaming() async throws { + try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in + try await client.clientStreaming( + request: .init(producer: { writer in + for byte in [3, 1, 4, 1, 5] as [UInt8] { + try await writer.write([byte]) + } + }), + descriptor: BinaryEcho.Methods.collect, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + let message = try response.message + XCTAssertEqual(message, [3, 1, 4, 1, 5]) + } + } + } + + func testServerStreaming() async throws { + try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in + try await client.serverStreaming( + request: .init(message: [3, 1, 4, 1, 5]), + descriptor: BinaryEcho.Methods.expand, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + var responseParts = response.messages.makeAsyncIterator() + for byte in [3, 1, 4, 1, 5] as [UInt8] { + let message = try await responseParts.next() + XCTAssertEqual(message, [byte]) + } + } + } + } + + func testBidirectionalStreaming() async throws { + try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in + try await client.bidirectionalStreaming( + request: .init(producer: { writer in + for byte in [3, 1, 4, 1, 5] as [UInt8] { + try await writer.write([byte]) + } + }), + descriptor: BinaryEcho.Methods.update, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + var responseParts = response.messages.makeAsyncIterator() + for byte in [3, 1, 4, 1, 5] as [UInt8] { + let message = try await responseParts.next() + XCTAssertEqual(message, [byte]) + } + } + } + } + + func testUnimplementedMethod_Unary() async throws { + try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in + try await client.unary( + request: .init(message: [3, 1, 4, 1, 5]), + descriptor: MethodDescriptor(service: "not", method: "implemented"), + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + XCTAssertThrowsRPCError(try response.accepted.get()) { error in + XCTAssertEqual(error.code, .unimplemented) + } + } + } + } + + func testUnimplementedMethod_ClientStreaming() async throws { + try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in + try await client.clientStreaming( + request: .init(producer: { writer in + for byte in [3, 1, 4, 1, 5] as [UInt8] { + try await writer.write([byte]) + } + }), + descriptor: MethodDescriptor(service: "not", method: "implemented"), + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + XCTAssertThrowsRPCError(try response.accepted.get()) { error in + XCTAssertEqual(error.code, .unimplemented) + } + } + } + } + + func testUnimplementedMethod_ServerStreaming() async throws { + try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in + try await client.serverStreaming( + request: .init(message: [3, 1, 4, 1, 5]), + descriptor: MethodDescriptor(service: "not", method: "implemented"), + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + XCTAssertThrowsRPCError(try response.accepted.get()) { error in + XCTAssertEqual(error.code, .unimplemented) + } + } + } + } + + func testUnimplementedMethod_BidirectionalStreaming() async throws { + try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in + try await client.bidirectionalStreaming( + request: .init(producer: { writer in + for byte in [3, 1, 4, 1, 5] as [UInt8] { + try await writer.write([byte]) + } + }), + descriptor: MethodDescriptor(service: "not", method: "implemented"), + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + XCTAssertThrowsRPCError(try response.accepted.get()) { error in + XCTAssertEqual(error.code, .unimplemented) + } + } + } + } + + func testMultipleConcurrentRequests() async throws { + try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in + await withThrowingTaskGroup(of: Void.self) { group in + for i in UInt8.min ..< UInt8.max { + group.addTask { + try await client.unary( + request: .init(message: [i]), + descriptor: BinaryEcho.Methods.collect, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + let message = try response.message + XCTAssertEqual(message, [i]) + } + } + } + } + } + } + + func testInterceptorsAreAppliedInOrder() async throws { + let counter1 = ManagedAtomic(0) + let counter2 = ManagedAtomic(0) + + try await self.withInProcessConnectedClient( + services: [BinaryEcho()], + interceptors: [ + .requestCounter(counter1), + .rejectAll(with: RPCError(code: .unavailable, message: "")), + .requestCounter(counter2), + ] + ) { client, _ in + try await client.unary( + request: .init(message: [3, 1, 4, 1, 5]), + descriptor: BinaryEcho.Methods.collect, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + XCTAssertRejected(response) { error in + XCTAssertEqual(error.code, .unavailable) + } + } + } + + XCTAssertEqual(counter1.load(ordering: .sequentiallyConsistent), 1) + XCTAssertEqual(counter2.load(ordering: .sequentiallyConsistent), 0) + } + + func testNoNewRPCsAfterClientClose() async throws { + try await withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in + // Run an RPC so we know the client is running properly. + try await client.unary( + request: .init(message: [3, 1, 4, 1, 5]), + descriptor: BinaryEcho.Methods.collect, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + let message = try response.message + XCTAssertEqual(message, [3, 1, 4, 1, 5]) + } + + // New RPCs should fail immediately after this. + client.close() + + // RPC should fail now. + await XCTAssertThrowsErrorAsync(ofType: ClientError.self) { + try await client.unary( + request: .init(message: [3, 1, 4, 1, 5]), + descriptor: BinaryEcho.Methods.collect, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { _ in } + } errorHandler: { error in + XCTAssertEqual(error.code, .clientIsStopped) + } + } + } + + func testInFlightRPCsCanContinueAfterClientIsClosed() async throws { + try await withInProcessConnectedClient(services: [BinaryEcho()]) { client, server in + try await client.clientStreaming( + request: .init(producer: { writer in + + // Close the client once this RCP has been started. + client.close() + + // Attempts to start a new RPC should fail. + await XCTAssertThrowsErrorAsync(ofType: ClientError.self) { + try await client.unary( + request: .init(message: [3, 1, 4, 1, 5]), + descriptor: BinaryEcho.Methods.collect, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { _ in } + } errorHandler: { error in + XCTAssertEqual(error.code, .clientIsStopped) + } + + // Now write to the already opened stream to confirm that opened streams + // can successfully run to completion. + for byte in [3, 1, 4, 1, 5] as [UInt8] { + try await writer.write([byte]) + } + }), + descriptor: BinaryEcho.Methods.collect, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + let message = try response.message + XCTAssertEqual(message, [3, 1, 4, 1, 5]) + } + } + } + + func testCancelRunningClient() async throws { + let inProcess = self.makeInProcessPair() + let client = GRPCClient(transport: inProcess.client) + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + let server = GRPCServer() + server.services.register(BinaryEcho()) + server.transports.add(inProcess.server) + try await server.run() + } + + group.addTask { + try await client.run() + } + + // Wait for client and server to be running. + try await Task.sleep(for: .milliseconds(10)) + + let task = Task { + try await client.clientStreaming( + request: .init(producer: { writer in + try await Task.sleep(for: .seconds(5)) + }), + descriptor: BinaryEcho.Methods.collect, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + XCTAssertRejected(response) { error in + XCTAssertEqual(error.code, .unknown) + } + } + } + + // Check requests are getting through. + try await client.unary( + request: .init(message: [3, 1, 4, 1, 5]), + descriptor: BinaryEcho.Methods.collect, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + let message = try response.message + XCTAssertEqual(message, [3, 1, 4, 1, 5]) + } + + task.cancel() + try await task.value + group.cancelAll() + } + } + + func testRunStoppedClient() async throws { + let (clientTransport, _) = self.makeInProcessPair() + let client = GRPCClient(transport: clientTransport) + // Run the client. + let task = Task { try await client.run() } + task.cancel() + try await task.value + + // Client is stopped, should throw an error. + await XCTAssertThrowsErrorAsync(ofType: ClientError.self) { + try await client.run() + } errorHandler: { error in + XCTAssertEqual(error.code, .clientIsStopped) + } + } + + func testRunAlreadyRunningClient() async throws { + let (clientTransport, _) = self.makeInProcessPair() + let client = GRPCClient(transport: clientTransport) + // Run the client. + let task = Task { try await client.run() } + // Make sure the client is run for the first time here. + try await Task.sleep(for: .milliseconds(10)) + + // Client is already running, should throw an error. + await XCTAssertThrowsErrorAsync(ofType: ClientError.self) { + try await client.run() + } errorHandler: { error in + XCTAssertEqual(error.code, .clientIsAlreadyRunning) + } + + task.cancel() + } + + func testRunClientNotRunning() async throws { + let (clientTransport, _) = self.makeInProcessPair() + let client = GRPCClient(transport: clientTransport) + + // Client is not running, should throw an error. + await XCTAssertThrowsErrorAsync(ofType: ClientError.self) { + try await client.unary( + request: .init(message: [3, 1, 4, 1, 5]), + descriptor: BinaryEcho.Methods.collect, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + let message = try response.message + XCTAssertEqual(message, [3, 1, 4, 1, 5]) + } + } errorHandler: { error in + XCTAssertEqual(error.code, .clientIsNotRunning) + } + } + + func testInterceptorsDescription() async throws { + let (clientTransport, _) = self.makeInProcessPair() + let client = GRPCClient(transport: clientTransport) + client.interceptors.add(.rejectAll(with: .init(code: .aborted, message: ""))) + client.interceptors.add(.requestCounter(.init(0))) + let description = String(describing: client.interceptors) + let expected = #"["RejectAllClientInterceptor", "RequestCountingClientInterceptor"]"# + XCTAssertEqual(description, expected) + } +} diff --git a/Tests/GRPCCoreTests/GRPCServerTests.swift b/Tests/GRPCCoreTests/GRPCServerTests.swift index e99a46914..f64b5fb2c 100644 --- a/Tests/GRPCCoreTests/GRPCServerTests.swift +++ b/Tests/GRPCCoreTests/GRPCServerTests.swift @@ -23,7 +23,7 @@ final class GRPCServerTests: XCTestCase { let server = InProcessServerTransport() let client = InProcessClientTransport( server: server, - executionConfigurations: ClientRPCExecutionConfigurationCollection() + executionConfigurations: GRPCClient.MethodConfigurationRegistry() ) return (client, server) @@ -59,7 +59,6 @@ final class GRPCServerTests: XCTestCase { inProcess.client.close() server.stopListening() } - } func testServerHandlesUnary() async throws { diff --git a/Tests/GRPCCoreTests/Test Utilities/Call/Client/ClientInterceptors.swift b/Tests/GRPCCoreTests/Test Utilities/Call/Client/ClientInterceptors.swift new file mode 100644 index 000000000..e89681d58 --- /dev/null +++ b/Tests/GRPCCoreTests/Test Utilities/Call/Client/ClientInterceptors.swift @@ -0,0 +1,84 @@ +/* + * Copyright 2023, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Atomics +import GRPCCore + +extension ClientInterceptor where Self == RejectAllClientInterceptor { + static func rejectAll(with error: RPCError) -> Self { + return RejectAllClientInterceptor(error: error, throw: false) + } + + static func throwError(_ error: RPCError) -> Self { + return RejectAllClientInterceptor(error: error, throw: true) + } + +} + +extension ClientInterceptor where Self == RequestCountingClientInterceptor { + static func requestCounter(_ counter: ManagedAtomic) -> Self { + return RequestCountingClientInterceptor(counter: counter) + } +} + +/// Rejects all RPCs with the provided error. +struct RejectAllClientInterceptor: ClientInterceptor { + /// The error to reject all RPCs with. + let error: RPCError + /// Whether the error should be thrown. If `false` then the request is rejected with the error + /// instead. + let `throw`: Bool + + init(error: RPCError, throw: Bool = false) { + self.error = error + self.`throw` = `throw` + } + + func intercept( + request: ClientRequest.Stream, + context: ClientInterceptorContext, + next: @Sendable ( + ClientRequest.Stream, + ClientInterceptorContext + ) async throws -> ClientResponse.Stream + ) async throws -> ClientResponse.Stream { + if self.throw { + throw self.error + } else { + return ClientResponse.Stream(error: self.error) + } + } +} + +struct RequestCountingClientInterceptor: ClientInterceptor { + /// The number of requests made. + let counter: ManagedAtomic + + init(counter: ManagedAtomic) { + self.counter = counter + } + + func intercept( + request: ClientRequest.Stream, + context: ClientInterceptorContext, + next: @Sendable ( + ClientRequest.Stream, + ClientInterceptorContext + ) async throws -> ClientResponse.Stream + ) async throws -> ClientResponse.Stream { + self.counter.wrappingIncrement(ordering: .sequentiallyConsistent) + return try await next(request, context) + } +} diff --git a/Tests/GRPCCoreTests/Test Utilities/Call/Server/ServerInterceptors.swift b/Tests/GRPCCoreTests/Test Utilities/Call/Server/ServerInterceptors.swift index 266648768..02d4061a6 100644 --- a/Tests/GRPCCoreTests/Test Utilities/Call/Server/ServerInterceptors.swift +++ b/Tests/GRPCCoreTests/Test Utilities/Call/Server/ServerInterceptors.swift @@ -24,7 +24,6 @@ extension ServerInterceptor where Self == RejectAllServerInterceptor { static func throwError(_ error: RPCError) -> Self { return RejectAllServerInterceptor(error: error, throw: true) } - } extension ServerInterceptor where Self == RequestCountingServerInterceptor { @@ -63,7 +62,7 @@ struct RejectAllServerInterceptor: ServerInterceptor { } struct RequestCountingServerInterceptor: ServerInterceptor { - /// The error to reject all RPCs with. + /// The number of requests made. let counter: ManagedAtomic init(counter: ManagedAtomic) { diff --git a/Tests/GRPCCoreTests/Test Utilities/Transport/AnyTransport.swift b/Tests/GRPCCoreTests/Test Utilities/Transport/AnyTransport.swift index fa64d76fd..95bcb6778 100644 --- a/Tests/GRPCCoreTests/Test Utilities/Transport/AnyTransport.swift +++ b/Tests/GRPCCoreTests/Test Utilities/Transport/AnyTransport.swift @@ -28,7 +28,7 @@ struct AnyClientTransport: ClientTransport, Sendable { ) async throws -> Any private let _connect: @Sendable (Bool) async throws -> Void private let _close: @Sendable () -> Void - private let _configuration: @Sendable (MethodDescriptor) -> ClientRPCExecutionConfiguration? + private let _configuration: @Sendable (MethodDescriptor) -> GRPCClient.MethodConfiguration? init(wrapping transport: Transport) where Transport.Inbound == Inbound, Transport.Outbound == Outbound { @@ -74,7 +74,7 @@ struct AnyClientTransport: ClientTransport, Sendable { func executionConfiguration( forMethod descriptor: MethodDescriptor - ) -> ClientRPCExecutionConfiguration? { + ) -> GRPCClient.MethodConfiguration? { self._configuration(descriptor) } } diff --git a/Tests/GRPCCoreTests/Test Utilities/Transport/StreamCountingTransport.swift b/Tests/GRPCCoreTests/Test Utilities/Transport/StreamCountingTransport.swift index 11c294e03..8058567d4 100644 --- a/Tests/GRPCCoreTests/Test Utilities/Transport/StreamCountingTransport.swift +++ b/Tests/GRPCCoreTests/Test Utilities/Transport/StreamCountingTransport.swift @@ -68,7 +68,7 @@ struct StreamCountingClientTransport: ClientTransport, Sendable { func executionConfiguration( forMethod descriptor: MethodDescriptor - ) -> ClientRPCExecutionConfiguration? { + ) -> GRPCClient.MethodConfiguration? { self.transport.executionConfiguration(forMethod: descriptor) } } diff --git a/Tests/GRPCCoreTests/Test Utilities/Transport/ThrowingTransport.swift b/Tests/GRPCCoreTests/Test Utilities/Transport/ThrowingTransport.swift index 0dd1cee9b..f428ec06a 100644 --- a/Tests/GRPCCoreTests/Test Utilities/Transport/ThrowingTransport.swift +++ b/Tests/GRPCCoreTests/Test Utilities/Transport/ThrowingTransport.swift @@ -38,7 +38,7 @@ struct ThrowOnStreamCreationTransport: ClientTransport { func executionConfiguration( forMethod descriptor: MethodDescriptor - ) -> ClientRPCExecutionConfiguration? { + ) -> GRPCClient.MethodConfiguration? { return nil } diff --git a/Tests/GRPCCoreTests/Test Utilities/XCTest+Utilities.swift b/Tests/GRPCCoreTests/Test Utilities/XCTest+Utilities.swift index 7bc88edef..71ca6dd7a 100644 --- a/Tests/GRPCCoreTests/Test Utilities/XCTest+Utilities.swift +++ b/Tests/GRPCCoreTests/Test Utilities/XCTest+Utilities.swift @@ -104,6 +104,18 @@ func XCTAssertRejected( } } +func XCTAssertRejected( + _ response: ClientResponse.Single, + errorHandler: (RPCError) -> Void +) { + switch response.accepted { + case .success: + XCTFail("Expected RPC to be rejected") + case .failure(let error): + errorHandler(error) + } +} + func XCTAssertMetadata( _ part: RPCResponsePart?, metadataHandler: (Metadata) -> Void = { _ in } @@ -116,6 +128,18 @@ func XCTAssertMetadata( } } +func XCTAssertMetadata( + _ part: RPCRequestPart?, + metadataHandler: (Metadata) async throws -> Void = { _ in } +) async throws { + switch part { + case .some(.metadata(let metadata)): + try await metadataHandler(metadata) + default: + XCTFail("Expected '.metadata' but found '\(String(describing: part))'") + } +} + func XCTAssertMessage( _ part: RPCResponsePart?, messageHandler: ([UInt8]) -> Void = { _ in } @@ -124,7 +148,19 @@ func XCTAssertMessage( case .some(.message(let message)): messageHandler(message) default: - XCTFail("Expected '.metadata' but found '\(String(describing: part))'") + XCTFail("Expected '.message' but found '\(String(describing: part))'") + } +} + +func XCTAssertMessage( + _ part: RPCRequestPart?, + messageHandler: ([UInt8]) async throws -> Void = { _ in } +) async throws { + switch part { + case .some(.message(let message)): + try await messageHandler(message) + default: + XCTFail("Expected '.message' but found '\(String(describing: part))'") } } diff --git a/Tests/GRPCCoreTests/Transport/InProcessClientTransportTests.swift b/Tests/GRPCCoreTests/Transport/InProcessClientTransportTests.swift index c7bbdef80..ff4f2dce4 100644 --- a/Tests/GRPCCoreTests/Transport/InProcessClientTransportTests.swift +++ b/Tests/GRPCCoreTests/Transport/InProcessClientTransportTests.swift @@ -182,29 +182,28 @@ final class InProcessClientTransportTests: XCTestCase { } func testExecutionConfiguration() { - let policy = HedgingPolicy( + let policy = GRPCClient.MethodConfiguration.HedgingPolicy( maximumAttempts: 10, hedgingDelay: .seconds(1), nonFatalStatusCodes: [] ) - let defaultConfiguration = ClientRPCExecutionConfiguration(hedgingPolicy: policy) - var configurations = ClientRPCExecutionConfigurationCollection( - defaultConfiguration: defaultConfiguration - ) + let defaultConfiguration = GRPCClient.MethodConfiguration(hedgingPolicy: policy) + var configurations = GRPCClient.MethodConfigurationRegistry() + configurations.setOverallDefaultConfiguration(defaultConfiguration) var client = InProcessClientTransport(server: .init(), executionConfigurations: configurations) let firstDescriptor = MethodDescriptor(service: "test", method: "first") XCTAssertEqual(client.executionConfiguration(forMethod: firstDescriptor), defaultConfiguration) - let retryPolicy = RetryPolicy( + let retryPolicy = GRPCClient.MethodConfiguration.RetryPolicy( maximumAttempts: 10, initialBackoff: .seconds(1), maximumBackoff: .seconds(1), backoffMultiplier: 1.0, retryableStatusCodes: [.unavailable] ) - let overrideConfiguration = ClientRPCExecutionConfiguration(retryPolicy: retryPolicy) + let overrideConfiguration = GRPCClient.MethodConfiguration(retryPolicy: retryPolicy) configurations[firstDescriptor] = overrideConfiguration client = InProcessClientTransport(server: .init(), executionConfigurations: configurations) let secondDescriptor = MethodDescriptor(service: "test", method: "second") @@ -243,10 +242,10 @@ final class InProcessClientTransportTests: XCTestCase { } func makeClient( - configuration: ClientRPCExecutionConfiguration? = nil, + configuration: GRPCClient.MethodConfiguration? = nil, server: InProcessServerTransport = InProcessServerTransport() ) -> InProcessClientTransport { - let defaultPolicy = RetryPolicy( + let defaultPolicy = GRPCClient.MethodConfiguration.RetryPolicy( maximumAttempts: 10, initialBackoff: .seconds(1), maximumBackoff: .seconds(1), @@ -254,11 +253,13 @@ final class InProcessClientTransportTests: XCTestCase { retryableStatusCodes: [.unavailable] ) + var methodConfiguration = GRPCClient.MethodConfigurationRegistry() + methodConfiguration.setOverallDefaultConfiguration( + configuration ?? .init(retryPolicy: defaultPolicy) + ) return InProcessClientTransport( server: server, - executionConfigurations: .init( - defaultConfiguration: configuration ?? .init(retryPolicy: defaultPolicy) - ) + executionConfigurations: methodConfiguration ) } }