diff --git a/Sources/GRPCCore/Call/Client/ClientRPCExecutionConfigurationCollection.swift b/Sources/GRPCCore/Call/Client/ClientRPCExecutionConfigurationCollection.swift deleted file mode 100644 index 9207fd757..000000000 --- a/Sources/GRPCCore/Call/Client/ClientRPCExecutionConfigurationCollection.swift +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright 2023, gRPC Authors All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) - -/// A collection of ``ClientRPCExecutionConfiguration``s, mapped to specific methods or services. -/// -/// When creating a new instance, you must provide a default configuration to be used when getting -/// a configuration for a method that has not been given a specific override. -/// Use ``setDefaultConfiguration(_:forService:)`` to set a specific override for a whole -/// service. -/// -/// Use the subscript to get and set configurations for methods. -public struct ClientRPCExecutionConfigurationCollection: Sendable, Hashable { - private var elements: [MethodDescriptor: ClientRPCExecutionConfiguration] - private let defaultConfiguration: ClientRPCExecutionConfiguration - - public init( - defaultConfiguration: ClientRPCExecutionConfiguration = ClientRPCExecutionConfiguration( - executionPolicy: nil, - timeout: nil - ) - ) { - self.elements = [:] - self.defaultConfiguration = defaultConfiguration - } - - public subscript(_ descriptor: MethodDescriptor) -> ClientRPCExecutionConfiguration { - get { - if let methodLevelOverride = self.elements[descriptor] { - return methodLevelOverride - } - var serviceLevelDescriptor = descriptor - serviceLevelDescriptor.method = "" - return self.elements[serviceLevelDescriptor, default: self.defaultConfiguration] - } - - set { - precondition( - !descriptor.service.isEmpty, - "Method descriptor's service cannot be empty." - ) - - self.elements[descriptor] = newValue - } - } - - /// Set a default configuration for a service. - /// - /// If getting a configuration for a method that's part of a service, and the method itself doesn't have an - /// override, then this configuration will be used instead of the default configuration passed when creating - /// this instance of ``ClientRPCExecutionConfigurationCollection``. - /// - /// - Parameters: - /// - configuration: The default configuration for the service. - /// - service: The name of the service for which this override applies. - public mutating func setDefaultConfiguration( - _ configuration: ClientRPCExecutionConfiguration, - forService service: String - ) { - self[MethodDescriptor(service: service, method: "")] = configuration - } -} diff --git a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor.swift b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor.swift index 03e12c87a..d91ec3d46 100644 --- a/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor.swift +++ b/Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor.swift @@ -35,7 +35,7 @@ enum ClientRPCExecutor { static func execute( request: ClientRequest.Stream, method: MethodDescriptor, - configuration: ClientRPCExecutionConfiguration, + configuration: MethodConfiguration, serializer: some MessageSerializer, deserializer: some MessageDeserializer, transport: some ClientTransport, diff --git a/Sources/GRPCCore/Call/Server/RPCRouter.swift b/Sources/GRPCCore/Call/Server/RPCRouter.swift index 4bfe57c3c..f4a041c41 100644 --- a/Sources/GRPCCore/Call/Server/RPCRouter.swift +++ b/Sources/GRPCCore/Call/Server/RPCRouter.swift @@ -24,7 +24,7 @@ /// given method by calling ``removeHandler(forMethod:)``. /// /// In most cases you won't need to interact with the router directly. Instead you should register -/// your services with ``Server/Services-swift.struct/register(_:)`` which will in turn register +/// your services with ``GRPCServer/Services-swift.struct/register(_:)`` which will in turn register /// each method with the router. /// /// You may wish to not serve all methods from your service in which case you can either: diff --git a/Sources/GRPCCore/ClientError.swift b/Sources/GRPCCore/ClientError.swift new file mode 100644 index 000000000..77ad48719 --- /dev/null +++ b/Sources/GRPCCore/ClientError.swift @@ -0,0 +1,148 @@ +/* + * Copyright 2023, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/// A runtime error thrown by the client. +/// +/// In contrast to ``RPCError``, the ``ClientError`` represents errors which happen at a scope +/// wider than an individual RPC. For example, attempting to start a client which is already +/// stopped would result in a ``ClientError``. +public struct ClientError: Error, Hashable, @unchecked Sendable { + private var storage: Storage + + // Ensures the underlying storage is unique. + private mutating func ensureUniqueStorage() { + if !isKnownUniquelyReferenced(&self.storage) { + self.storage = self.storage.copy() + } + } + + /// The code indicating the domain of the error. + public var code: Code { + get { self.storage.code } + set { + self.ensureUniqueStorage() + self.storage.code = newValue + } + } + + /// A message providing more details about the error which may include details specific to this + /// instance of the error. + public var message: String { + get { self.storage.message } + set { + self.ensureUniqueStorage() + self.storage.message = newValue + } + } + + /// The original error which led to this error being thrown. + public var cause: Error? { + get { self.storage.cause } + set { + self.ensureUniqueStorage() + self.storage.cause = newValue + } + } + + /// Creates a new error. + /// + /// - Parameters: + /// - code: The error code. + /// - message: A description of the error. + /// - cause: The original error which led to this error being thrown. + public init(code: Code, message: String, cause: Error? = nil) { + self.storage = Storage(code: code, message: message, cause: cause) + } +} + +extension ClientError: CustomStringConvertible { + public var description: String { + if let cause = self.cause { + return "\(self.code): \"\(self.message)\" (cause: \"\(cause)\")" + } else { + return "\(self.code): \"\(self.message)\"" + } + } +} + +extension ClientError { + private final class Storage: Hashable { + var code: Code + var message: String + var cause: Error? + + init(code: Code, message: String, cause: Error?) { + self.code = code + self.message = message + self.cause = cause + } + + func copy() -> Storage { + return Storage(code: self.code, message: self.message, cause: self.cause) + } + + func hash(into hasher: inout Hasher) { + hasher.combine(self.code) + hasher.combine(self.message) + } + + static func == (lhs: Storage, rhs: Storage) -> Bool { + return lhs.code == rhs.code && lhs.message == rhs.message + } + } +} + +extension ClientError { + public struct Code: Hashable, Sendable { + private enum Value { + case clientIsAlreadyRunning + case clientIsNotRunning + case clientIsStopped + case transportError + } + + private var value: Value + private init(_ value: Value) { + self.value = value + } + + /// At attempt to start the client was made but it is already running. + public static var clientIsAlreadyRunning: Self { + Self(.clientIsAlreadyRunning) + } + + /// An attempt to start an RPC was made but the client is not running. + public static var clientIsNotRunning: Self { + Self(.clientIsNotRunning) + } + + /// At attempt to start the client was made but it has already stopped. + public static var clientIsStopped: Self { + Self(.clientIsStopped) + } + + /// The transport threw an error whilst connected. + public static var transportError: Self { + Self(.transportError) + } + } +} + +extension ClientError.Code: CustomStringConvertible { + public var description: String { + String(describing: self.value) + } +} diff --git a/Sources/GRPCCore/GRPCClient.swift b/Sources/GRPCCore/GRPCClient.swift new file mode 100644 index 000000000..848e01297 --- /dev/null +++ b/Sources/GRPCCore/GRPCClient.swift @@ -0,0 +1,473 @@ +/* + * Copyright 2023, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Atomics + +/// A gRPC client. +/// +/// A ``GRPCClient`` communicates to a server via a ``ClientTransport``. +/// +/// You can start RPCs to the server by calling the corresponding method: +/// - ``unary(request:descriptor:serializer:deserializer:handler:)`` +/// - ``clientStreaming(request:descriptor:serializer:deserializer:handler:)`` +/// - ``serverStreaming(request:descriptor:serializer:deserializer:handler:)`` +/// - ``bidirectionalStreaming(request:descriptor:serializer:deserializer:handler:)`` +/// +/// However, in most cases you should prefer wrapping the ``GRPCClient`` with a generated stub. +/// +/// You can set ``MethodConfiguration``s on this client to override whatever configurations have been +/// set on the given transport. You can also use ``ClientInterceptor``s to implement cross-cutting +/// logic which apply to all RPCs. Example uses of interceptors include authentication and logging. +/// +/// ## Creating and configuring a client +/// +/// The following example demonstrates how to create and configure a client. +/// +/// ```swift +/// // Create a configuration object for the client. +/// var configuration = GRPCClient.Configuration() +/// +/// // Create and add an interceptor. +/// configuration.interceptors.add(StatsRecordingClientInterceptor()) +/// +/// // Override the timeout for the 'Get' method on the 'echo.Echo' service. This configuration +/// // takes precedence over any set by the transport. +/// let echoGet = MethodDescriptor(service: "echo.Echo", method: "Get") +/// configuration.method.overrides[echoGet] = MethodConfiguration( +/// executionPolicy: nil, +/// timeout: .seconds(5) +/// ) +/// +/// // Configure a fallback timeout for all RPCs if no configuration is provided in the overrides +/// // or by the transport. +/// let defaultMethodConfiguration = MethodConfiguration(executionPolicy: nil, timeout: seconds(10)) +/// configuration.method.defaults.setDefaultConfiguration(defaultMethodConfiguration) +/// +/// // Finally create a transport and instantiate the client. +/// let inProcessServerTransport = InProcessServerTransport() +/// let inProcessClientTransport = InProcessClientTransport(serverTransport: inProcessServerTransport) +/// let client = GRPCClient(transport: inProcessClientTransport, configuration: configuration) +/// ``` +/// +/// ## Starting and stopping the client +/// +/// Once you have configured the client, call ``run()`` to start it. Calling ``run()`` instructs the +/// transport to start connecting to the server. +/// +/// ```swift +/// // Start running the client. 'run()' must be running while RPCs are execute so it's executed in +/// // a task group. +/// try await withThrowingTaskGroup(of: Void.self) { group in +/// group.addTask { +/// try await client.run() +/// } +/// +/// // Execute a request against the "echo.Echo" service. +/// try await client.unary( +/// request: ClientRequest.Single<[UInt8]>(message: [72, 101, 108, 108, 111, 33]), +/// descriptor: MethodDescriptor(service: "echo.Echo", method: "Get"), +/// serializer: IdentitySerializer(), +/// deserializer: IdentityDeserializer(), +/// ) { response in +/// print(response.message) +/// } +/// +/// // The RPC has completed, close the client. +/// client.close() +/// } +/// ``` +/// +/// The ``run()`` method won't return until the client has finished handling all requests. You can +/// signal to the client that it should stop creating new request streams by calling ``close()``. +/// This gives the client enough time to drain any requests already in flight. To stop the client +/// more abruptly you can cancel the task running your client. If your application requires +/// additional resources that need their lifecycles managed you should consider using [Swift Service +/// Lifecycle](https://github.com/swift-server/swift-service-lifecycle). +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +public struct GRPCClient: Sendable { + /// The transport which provides a bidirectional communication channel with the server. + private let transport: any ClientTransport + + /// The configuration used by the client. + public let configuration: Configuration + + /// The current state of the client. + private let state: ManagedAtomic + + /// The state of the client. + private enum State: UInt8, AtomicValue { + /// The client hasn't been started yet. Can transition to `running` or `stopped`. + case notStarted + /// The client is running and can send RPCs. Can transition to `stopping`. + case running + /// The client is stopping and no new RPCs will be sent. Existing RPCs may run to + /// completion. May transition to `stopped`. + case stopping + /// The client has stopped, no RPCs are in flight and no more will be accepted. This state + /// is terminal. + case stopped + } + + /// Creates a new client with the given transport and configuration. + /// + /// - Parameters: + /// - transport: The transport used to establish a communication channel with a server. + /// - configuration: Configuration for the client. + public init(transport: some ClientTransport, configuration: Configuration = Configuration()) { + self.transport = transport + self.configuration = configuration + self.state = ManagedAtomic(.notStarted) + } + + /// Start the client. + /// + /// This returns once ``close()`` has been called and all in-flight RPCs have finished executing. + /// If you need to abruptly stop all work you should cancel the task executing this method. + /// + /// The client, and by extension this function, can only be run once. If the client is already + /// running or has already been closed then a ``ClientError`` is thrown. + public func run() async throws { + let (wasNotStarted, original) = self.state.compareExchange( + expected: .notStarted, + desired: .running, + ordering: .sequentiallyConsistent + ) + + guard wasNotStarted else { + switch original { + case .notStarted: + // The value wasn't exchanged so the original value can't be 'notStarted'. + fatalError() + case .running: + throw ClientError( + code: .clientIsAlreadyRunning, + message: "The client is already running and can only be started once." + ) + case .stopping, .stopped: + throw ClientError( + code: .clientIsStopped, + message: "The client has stopped and can only be started once." + ) + } + } + + // When we exit this function we must have stopped. + defer { + self.state.store(.stopped, ordering: .sequentiallyConsistent) + } + + do { + try await self.transport.connect(lazily: false) + } catch { + throw ClientError( + code: .transportError, + message: "The transport threw an error while connected.", + cause: error + ) + } + } + + /// Close the client. + /// + /// The transport will be closed: this means that it will be given enough time to wait for + /// in-flight RPCs to finish executing, but no new RPCs will be accepted. You can cancel the task + /// executing ``run()`` if you want to abruptly stop in-flight RPCs. + public func close() { + while true { + let (wasRunning, actualState) = self.state.compareExchange( + expected: .running, + desired: .stopping, + ordering: .sequentiallyConsistent + ) + + // Transition from running to stopping: close the transport. + if wasRunning { + self.transport.close() + return + } + + // The expected state wasn't 'running'. There are two options: + // 1. The client isn't running yet. + // 2. The client is already stopping or stopped. + switch actualState { + case .notStarted: + // Not started: try going straight to stopped. + let (wasNotStarted, _) = self.state.compareExchange( + expected: .notStarted, + desired: .stopped, + ordering: .sequentiallyConsistent + ) + + // If the exchange happened then just return: the client wasn't started so there's no + // transport to start. + // + // If the exchange didn't happen then continue looping: the client must've been started by + // another thread. + if wasNotStarted { + return + } else { + continue + } + + case .running: + // Unreachable: the value was exchanged and this was the expected value. + fatalError() + + case .stopping, .stopped: + // No exchange happened but the client is already stopping. + return + } + } + } + + /// Executes a unary RPC. + /// + /// - Parameters: + /// - request: The unary request. + /// - descriptor: The method descriptor for which to execute this request. + /// - serializer: A request serializer. + /// - deserializer: A response deserializer. + /// - handler: A unary response handler. + /// + /// - Returns: The return value from the `handler`. + public func unary( + request: ClientRequest.Single, + descriptor: MethodDescriptor, + serializer: some MessageSerializer, + deserializer: some MessageDeserializer, + handler: @Sendable @escaping (ClientResponse.Single) async throws -> ReturnValue + ) async throws -> ReturnValue { + try await bidirectionalStreaming( + request: ClientRequest.Stream(single: request), + descriptor: descriptor, + serializer: serializer, + deserializer: deserializer + ) { stream in + let singleResponse = await ClientResponse.Single(stream: stream) + return try await handler(singleResponse) + } + } + + /// Start a client-streaming RPC. + /// + /// - Parameters: + /// - request: The request stream. + /// - descriptor: The method descriptor for which to execute this request. + /// - serializer: A request serializer. + /// - deserializer: A response deserializer. + /// - handler: A unary response handler. + /// + /// - Returns: The return value from the `handler`. + public func clientStreaming( + request: ClientRequest.Stream, + descriptor: MethodDescriptor, + serializer: some MessageSerializer, + deserializer: some MessageDeserializer, + handler: @Sendable @escaping (ClientResponse.Single) async throws -> ReturnValue + ) async throws -> ReturnValue { + try await bidirectionalStreaming( + request: request, + descriptor: descriptor, + serializer: serializer, + deserializer: deserializer + ) { stream in + let singleResponse = await ClientResponse.Single(stream: stream) + return try await handler(singleResponse) + } + } + + /// Start a server-streaming RPC. + /// + /// - Parameters: + /// - request: The unary request. + /// - descriptor: The method descriptor for which to execute this request. + /// - serializer: A request serializer. + /// - deserializer: A response deserializer. + /// - handler: A response stream handler. + /// + /// - Returns: The return value from the `handler`. + public func serverStreaming( + request: ClientRequest.Single, + descriptor: MethodDescriptor, + serializer: some MessageSerializer, + deserializer: some MessageDeserializer, + handler: @Sendable @escaping (ClientResponse.Stream) async throws -> ReturnValue + ) async throws -> ReturnValue { + try await bidirectionalStreaming( + request: ClientRequest.Stream(single: request), + descriptor: descriptor, + serializer: serializer, + deserializer: deserializer, + handler: handler + ) + } + + /// Start a bidirectional streaming RPC. + /// + /// - Note: ``run()`` must have been called and still executing, and ``close()`` mustn't + /// have been called. + /// + /// - Parameters: + /// - request: The streaming request. + /// - descriptor: The method descriptor for which to execute this request. + /// - serializer: A request serializer. + /// - deserializer: A response deserializer. + /// - handler: A response stream handler. + /// + /// - Returns: The return value from the `handler`. + public func bidirectionalStreaming( + request: ClientRequest.Stream, + descriptor: MethodDescriptor, + serializer: some MessageSerializer, + deserializer: some MessageDeserializer, + handler: @Sendable @escaping (ClientResponse.Stream) async throws -> ReturnValue + ) async throws -> ReturnValue { + switch self.state.load(ordering: .sequentiallyConsistent) { + case .running: + () + case .notStarted: + throw ClientError( + code: .clientIsNotRunning, + message: "Client must be running to make an RPC: call run() first." + ) + case .stopping, .stopped: + throw ClientError( + code: .clientIsStopped, + message: "Client has been stopped. Can't make any more RPCs." + ) + } + + return try await ClientRPCExecutor.execute( + request: request, + method: descriptor, + configuration: self.resolveMethodConfiguration(for: descriptor), + serializer: serializer, + deserializer: deserializer, + transport: self.transport, + interceptors: self.configuration.interceptors.values, + handler: handler + ) + } + + private func resolveMethodConfiguration(for descriptor: MethodDescriptor) -> MethodConfiguration { + if let configuration = self.configuration.method.overrides[descriptor] { + return configuration + } + + if let configuration = self.transport.executionConfiguration(forMethod: descriptor) { + return configuration + } + + if let configuration = self.configuration.method.defaults[descriptor] { + return configuration + } + + // No configuration found, return the "vanilla" configuration. + return MethodConfiguration(executionPolicy: nil, timeout: nil) + } +} + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +extension GRPCClient { + public struct Configuration: Sendable { + /// A collection of interceptors providing cross-cutting functionality to each accepted RPC. + /// + /// The order in which interceptors are added reflects the order in which they are called. The + /// first interceptor added will be the first interceptor to intercept each request. The last + /// interceptor added will be the final interceptor to intercept each request before calling + /// the appropriate handler. + public var interceptors: Interceptors + + /// Configuration for how methods are executed. + /// + /// Method configuration determines how each RPC is executed by the client. Some services and + /// transports provide this information to the client when the server name is resolved. However, + /// you override this configuration and set default values should no override be set and the + /// transport doesn't provide a value. + public var method: Method + + /// Creates a new default configuration. + public init() { + self.interceptors = Interceptors() + self.method = Method() + } + } +} + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +extension GRPCClient.Configuration { + /// A collection of ``ClientInterceptor`` implementations which are applied to all accepted + /// RPCs. + /// + /// RPCs are intercepted in the order that interceptors are added. That is, a request sent from the client to + /// the server will first be intercepted by the first added interceptor followed by the second, and so on. + /// For responses from the server, they'll be applied in the opposite order. + public struct Interceptors: Sendable { + private(set) var values: [any ClientInterceptor] = [] + + /// Add an interceptor to the client. + /// + /// The order in which interceptors are added reflects the order in which they are called. The + /// first interceptor added will be the first interceptor to intercept each request. The last + /// interceptor added will be the final interceptor to intercept each request before calling + /// the appropriate handler. + /// + /// - Parameter interceptor: The interceptor to add. + public mutating func add(_ interceptor: some ClientInterceptor) { + self.values.append(interceptor) + } + + /// Adds a sequence of interceptor to the client. + /// + /// The order in which interceptors are added reflects the order in which they are called. The + /// first interceptor added will be the first interceptor to intercept each request. The last + /// interceptor added will be the final interceptor to intercept each request before calling + /// the appropriate handler. + /// + /// - Parameter interceptors: The interceptors to add. + public mutating func add(contentsOf interceptors: some Sequence) { + self.values.append(contentsOf: interceptors) + } + } + + /// Configuration for how methods should be executed. + /// + /// In most cases the client should defer to the configuration provided by the transport as this + /// can be provided to the transport as part of name resolution when establishing a connection. + /// + /// The client first checks ``overrides`` for a configuration, followed by the transport, followed + /// by ``defaults``. + public struct Method: Sendable, Hashable { + /// Configuration to use in precedence to that provided by the transport. + public var overrides: MethodConfigurations + + /// Configuration to use only if there are no overrides and the transport doesn't specify + /// any configuration. + public var defaults: MethodConfigurations + + public init() { + self.overrides = MethodConfigurations() + self.defaults = MethodConfigurations() + } + } +} + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +extension GRPCClient.Configuration.Interceptors: CustomStringConvertible { + public var description: String { + return String(describing: self.values.map { String(describing: type(of: $0)) }) + } +} diff --git a/Sources/GRPCCore/GRPCServer.swift b/Sources/GRPCCore/GRPCServer.swift index 07c92715e..626816f9b 100644 --- a/Sources/GRPCCore/GRPCServer.swift +++ b/Sources/GRPCCore/GRPCServer.swift @@ -23,7 +23,7 @@ import Atomics /// streams to a service to handle the RPC or rejects them with an appropriate error if no service /// can handle the RPC. /// -/// A ``Server`` may listen with multiple transports (for example, HTTP/2 and in-process) and route +/// A ``GRPCServer`` may listen with multiple transports (for example, HTTP/2 and in-process) and route /// requests from each transport to the same service instance. You can also use "interceptors", /// to implement cross-cutting logic which apply to all accepted RPCs. Example uses of interceptors /// include request filtering, authentication, and logging. Once requests have been intercepted diff --git a/Sources/GRPCCore/Call/Client/ClientRPCExecutionConfiguration.swift b/Sources/GRPCCore/MethodConfiguration.swift similarity index 98% rename from Sources/GRPCCore/Call/Client/ClientRPCExecutionConfiguration.swift rename to Sources/GRPCCore/MethodConfiguration.swift index b771e0b76..14d2f717f 100644 --- a/Sources/GRPCCore/Call/Client/ClientRPCExecutionConfiguration.swift +++ b/Sources/GRPCCore/MethodConfiguration.swift @@ -16,7 +16,7 @@ /// Configuration values for executing an RPC. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -public struct ClientRPCExecutionConfiguration: Hashable, Sendable { +public struct MethodConfiguration: Hashable, Sendable { /// The default timeout for the RPC. /// /// If no reply is received in the specified amount of time the request is aborted @@ -88,7 +88,7 @@ public struct ClientRPCExecutionConfiguration: Hashable, Sendable { } @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -extension ClientRPCExecutionConfiguration { +extension MethodConfiguration { /// The execution policy for an RPC. public enum ExecutionPolicy: Hashable, Sendable { /// Policy for retrying an RPC. diff --git a/Sources/GRPCCore/MethodConfigurations.swift b/Sources/GRPCCore/MethodConfigurations.swift new file mode 100644 index 000000000..c7e741222 --- /dev/null +++ b/Sources/GRPCCore/MethodConfigurations.swift @@ -0,0 +1,92 @@ +/* + * Copyright 2023, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/// A collection of ``MethodConfiguration``s, mapped to specific methods or services. +/// +/// When creating a new instance, no overrides and no default will be set for using when getting +/// a configuration for a method that has not been given a specific override. +/// Use ``setDefaultConfiguration(_:forService:)`` to set a specific override for a whole +/// service, or set a default configuration for all methods by calling ``setDefaultConfiguration(_:)``. +/// +/// Use the subscript to get and set configurations for specific methods. +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +public struct MethodConfigurations: Sendable, Hashable { + private var elements: [MethodDescriptor: MethodConfiguration] + + /// Create a new ``MethodConfigurations`` with no overrides and no default configuration. + public init() { + self.elements = [:] + } + + /// Get or set the corresponding ``MethodConfiguration`` for the given ``MethodDescriptor``. + /// + /// Configuration is hierarchical and can be set per-method, per-service + /// (``setDefaultConfiguration(_:forService:)``) and globally (``setDefaultConfiguration(_:)``). + /// This subscript sets the per-method configuration but retrieves a configuration respecting + /// the hierarchy. If no per-method configuration is present, the per-service configuration is + /// checked and returned if present. If the per-service configuration isn't present then the + /// global configuration is returned, if present. + /// + /// - Parameters: + /// - descriptor: The ``MethodDescriptor`` for which to get or set a ``MethodConfiguration``. + public subscript(_ descriptor: MethodDescriptor) -> MethodConfiguration? { + get { + if let configuration = self.elements[descriptor] { + return configuration + } + + // Check if the config is set at the service level by clearing the method. + var descriptor = descriptor + descriptor.method = "" + + if let configuration = self.elements[descriptor] { + return configuration + } + + // Check if the config is set at the global level by clearing the service and method. + descriptor.service = "" + return self.elements[descriptor] + } + + set { + self.elements[descriptor] = newValue + } + } + + /// Set a default configuration for all methods that have no overrides. + /// + /// - Parameter configuration: The default configuration. + public mutating func setDefaultConfiguration(_ configuration: MethodConfiguration?) { + let descriptor = MethodDescriptor(service: "", method: "") + self.elements[descriptor] = configuration + } + + /// Set a default configuration for a service. + /// + /// If getting a configuration for a method that's part of a service, and the method itself doesn't have an + /// override, then this configuration will be used instead of the default configuration passed when creating + /// this instance of ``MethodConfigurations``. + /// + /// - Parameters: + /// - configuration: The default configuration for the service. + /// - service: The name of the service for which this override applies. + public mutating func setDefaultConfiguration( + _ configuration: MethodConfiguration?, + forService service: String + ) { + self.elements[MethodDescriptor(service: service, method: "")] = configuration + } +} diff --git a/Sources/GRPCCore/Transport/ClientTransport.swift b/Sources/GRPCCore/Transport/ClientTransport.swift index 6e7d20bcb..913712ea7 100644 --- a/Sources/GRPCCore/Transport/ClientTransport.swift +++ b/Sources/GRPCCore/Transport/ClientTransport.swift @@ -16,8 +16,8 @@ @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) public protocol ClientTransport: Sendable { - associatedtype Inbound: (AsyncSequence & Sendable) where Inbound.Element == RPCResponsePart - associatedtype Outbound: ClosableRPCWriterProtocol + typealias Inbound = RPCAsyncSequence + typealias Outbound = RPCWriter.Closable /// Returns a throttle which gRPC uses to determine whether retries can be executed. /// @@ -77,5 +77,5 @@ public protocol ClientTransport: Sendable { /// - Returns: Execution configuration for the method, if it exists. func executionConfiguration( forMethod descriptor: MethodDescriptor - ) -> ClientRPCExecutionConfiguration? + ) -> MethodConfiguration? } diff --git a/Sources/GRPCCore/Transport/InProcessClientTransport.swift b/Sources/GRPCCore/Transport/InProcessClientTransport.swift index 47b1bbf73..d3cf123c5 100644 --- a/Sources/GRPCCore/Transport/InProcessClientTransport.swift +++ b/Sources/GRPCCore/Transport/InProcessClientTransport.swift @@ -96,12 +96,12 @@ public struct InProcessClientTransport: ClientTransport { public let retryThrottle: RetryThrottle - private let executionConfigurations: ClientRPCExecutionConfigurationCollection + private let executionConfigurations: MethodConfigurations private let state: LockedValueBox public init( server: InProcessServerTransport, - executionConfigurations: ClientRPCExecutionConfigurationCollection + executionConfigurations: MethodConfigurations ) { self.retryThrottle = RetryThrottle(maximumTokens: 10, tokenRatio: 0.1) self.executionConfigurations = executionConfigurations @@ -329,7 +329,7 @@ public struct InProcessClientTransport: ClientTransport { /// - Returns: Execution configuration for the method, if it exists. public func executionConfiguration( forMethod descriptor: MethodDescriptor - ) -> ClientRPCExecutionConfiguration? { + ) -> MethodConfiguration? { self.executionConfigurations[descriptor] } } diff --git a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness+Transport.swift b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness+Transport.swift index 42529470e..abaa8e6ef 100644 --- a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness+Transport.swift +++ b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness+Transport.swift @@ -24,15 +24,7 @@ extension InProcessServerTransport { ) -> InProcessClientTransport { return InProcessClientTransport( server: self, - executionConfigurations: .init( - defaultConfiguration: .init( - hedgingPolicy: .init( - maximumAttempts: 2, - hedgingDelay: .milliseconds(100), - nonFatalStatusCodes: [] - ) - ) - ) + executionConfigurations: .init() ) } } diff --git a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness.swift b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness.swift index 3b546d1e0..3d22f454b 100644 --- a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness.swift +++ b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness.swift @@ -67,7 +67,7 @@ struct ClientRPCExecutorTestHarness { func unary( request: ClientRequest.Single<[UInt8]>, - configuration: ClientRPCExecutionConfiguration? = nil, + configuration: MethodConfiguration? = nil, handler: @escaping @Sendable (ClientResponse.Single<[UInt8]>) async throws -> Void ) async throws { try await self.bidirectional( @@ -80,7 +80,7 @@ struct ClientRPCExecutorTestHarness { func clientStreaming( request: ClientRequest.Stream<[UInt8]>, - configuration: ClientRPCExecutionConfiguration? = nil, + configuration: MethodConfiguration? = nil, handler: @escaping @Sendable (ClientResponse.Single<[UInt8]>) async throws -> Void ) async throws { try await self.bidirectional( @@ -93,7 +93,7 @@ struct ClientRPCExecutorTestHarness { func serverStreaming( request: ClientRequest.Single<[UInt8]>, - configuration: ClientRPCExecutionConfiguration? = nil, + configuration: MethodConfiguration? = nil, handler: @escaping @Sendable (ClientResponse.Stream<[UInt8]>) async throws -> Void ) async throws { try await self.bidirectional( @@ -106,7 +106,7 @@ struct ClientRPCExecutorTestHarness { func bidirectional( request: ClientRequest.Stream<[UInt8]>, - configuration: ClientRPCExecutionConfiguration? = nil, + configuration: MethodConfiguration? = nil, handler: @escaping @Sendable (ClientResponse.Stream<[UInt8]>) async throws -> Void ) async throws { try await self.execute( @@ -122,7 +122,7 @@ struct ClientRPCExecutorTestHarness { request: ClientRequest.Stream, serializer: some MessageSerializer, deserializer: some MessageDeserializer, - configuration: ClientRPCExecutionConfiguration?, + configuration: MethodConfiguration?, handler: @escaping @Sendable (ClientResponse.Stream) async throws -> Void ) async throws { try await withThrowingTaskGroup(of: Void.self) { group in @@ -141,11 +141,14 @@ struct ClientRPCExecutorTestHarness { try await self.clientTransport.connect(lazily: false) } - let executionConfiguration: ClientRPCExecutionConfiguration + let executionConfiguration: MethodConfiguration if let configuration = configuration { executionConfiguration = configuration } else { - executionConfiguration = ClientRPCExecutionConfiguration(executionPolicy: nil, timeout: nil) + executionConfiguration = MethodConfiguration( + executionPolicy: nil, + timeout: nil + ) } // Execute the request. diff --git a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTests+Hedging.swift b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTests+Hedging.swift index ab6a6fca5..79663bfee 100644 --- a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTests+Hedging.swift +++ b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTests+Hedging.swift @@ -187,7 +187,7 @@ extension ClientRPCExecutorTests { } @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -extension ClientRPCExecutionConfiguration { +extension MethodConfiguration { fileprivate static func hedge( maximumAttempts: Int = 5, delay: Duration = .milliseconds(25), @@ -200,6 +200,6 @@ extension ClientRPCExecutionConfiguration { nonFatalStatusCodes: nonFatalCodes ) - return ClientRPCExecutionConfiguration(hedgingPolicy: policy, timeout: timeout) + return Self(hedgingPolicy: policy, timeout: timeout) } } diff --git a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTests+Retries.swift b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTests+Retries.swift index a313148ea..335044e26 100644 --- a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTests+Retries.swift +++ b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTests+Retries.swift @@ -286,7 +286,7 @@ extension ClientRPCExecutorTests { } @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -extension ClientRPCExecutionConfiguration { +extension MethodConfiguration { fileprivate static func retry( maximumAttempts: Int = 5, codes: Set, @@ -300,6 +300,6 @@ extension ClientRPCExecutionConfiguration { retryableStatusCodes: codes ) - return ClientRPCExecutionConfiguration(retryPolicy: policy, timeout: timeout) + return Self(retryPolicy: policy, timeout: timeout) } } diff --git a/Tests/GRPCCoreTests/GRPCClientTests.swift b/Tests/GRPCCoreTests/GRPCClientTests.swift new file mode 100644 index 000000000..204c6579e --- /dev/null +++ b/Tests/GRPCCoreTests/GRPCClientTests.swift @@ -0,0 +1,448 @@ +/* + * Copyright 2023, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Atomics +import GRPCCore +import XCTest + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +final class GRPCClientTests: XCTestCase { + func makeInProcessPair() -> (client: InProcessClientTransport, server: InProcessServerTransport) { + let server = InProcessServerTransport() + let client = InProcessClientTransport( + server: server, + executionConfigurations: MethodConfigurations() + ) + + return (client, server) + } + + func withInProcessConnectedClient( + services: [any RegistrableRPCService], + interceptors: [any ClientInterceptor] = [], + _ body: (GRPCClient, GRPCServer) async throws -> Void + ) async throws { + let inProcess = self.makeInProcessPair() + var configuration = GRPCClient.Configuration() + configuration.interceptors.add(contentsOf: interceptors) + let client = GRPCClient(transport: inProcess.client, configuration: configuration) + + let server = GRPCServer() + server.transports.add(inProcess.server) + + for service in services { + server.services.register(service) + } + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await server.run() + } + + group.addTask { + try await client.run() + } + + // Make sure both server and client are running + try await Task.sleep(for: .milliseconds(100)) + try await body(client, server) + client.close() + server.stopListening() + } + } + + struct IdentitySerializer: MessageSerializer { + typealias Message = [UInt8] + + func serialize(_ message: [UInt8]) throws -> [UInt8] { + return message + } + } + + struct IdentityDeserializer: MessageDeserializer { + typealias Message = [UInt8] + + func deserialize(_ serializedMessageBytes: [UInt8]) throws -> [UInt8] { + return serializedMessageBytes + } + } + + func testUnary() async throws { + try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in + try await client.unary( + request: .init(message: [3, 1, 4, 1, 5]), + descriptor: BinaryEcho.Methods.collect, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + let message = try response.message + XCTAssertEqual(message, [3, 1, 4, 1, 5]) + } + } + } + + func testClientStreaming() async throws { + try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in + try await client.clientStreaming( + request: .init(producer: { writer in + for byte in [3, 1, 4, 1, 5] as [UInt8] { + try await writer.write([byte]) + } + }), + descriptor: BinaryEcho.Methods.collect, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + let message = try response.message + XCTAssertEqual(message, [3, 1, 4, 1, 5]) + } + } + } + + func testServerStreaming() async throws { + try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in + try await client.serverStreaming( + request: .init(message: [3, 1, 4, 1, 5]), + descriptor: BinaryEcho.Methods.expand, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + var responseParts = response.messages.makeAsyncIterator() + for byte in [3, 1, 4, 1, 5] as [UInt8] { + let message = try await responseParts.next() + XCTAssertEqual(message, [byte]) + } + } + } + } + + func testBidirectionalStreaming() async throws { + try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in + try await client.bidirectionalStreaming( + request: .init(producer: { writer in + for byte in [3, 1, 4, 1, 5] as [UInt8] { + try await writer.write([byte]) + } + }), + descriptor: BinaryEcho.Methods.update, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + var responseParts = response.messages.makeAsyncIterator() + for byte in [3, 1, 4, 1, 5] as [UInt8] { + let message = try await responseParts.next() + XCTAssertEqual(message, [byte]) + } + } + } + } + + func testUnimplementedMethod_Unary() async throws { + try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in + try await client.unary( + request: .init(message: [3, 1, 4, 1, 5]), + descriptor: MethodDescriptor(service: "not", method: "implemented"), + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + XCTAssertThrowsRPCError(try response.accepted.get()) { error in + XCTAssertEqual(error.code, .unimplemented) + } + } + } + } + + func testUnimplementedMethod_ClientStreaming() async throws { + try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in + try await client.clientStreaming( + request: .init(producer: { writer in + for byte in [3, 1, 4, 1, 5] as [UInt8] { + try await writer.write([byte]) + } + }), + descriptor: MethodDescriptor(service: "not", method: "implemented"), + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + XCTAssertThrowsRPCError(try response.accepted.get()) { error in + XCTAssertEqual(error.code, .unimplemented) + } + } + } + } + + func testUnimplementedMethod_ServerStreaming() async throws { + try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in + try await client.serverStreaming( + request: .init(message: [3, 1, 4, 1, 5]), + descriptor: MethodDescriptor(service: "not", method: "implemented"), + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + XCTAssertThrowsRPCError(try response.accepted.get()) { error in + XCTAssertEqual(error.code, .unimplemented) + } + } + } + } + + func testUnimplementedMethod_BidirectionalStreaming() async throws { + try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in + try await client.bidirectionalStreaming( + request: .init(producer: { writer in + for byte in [3, 1, 4, 1, 5] as [UInt8] { + try await writer.write([byte]) + } + }), + descriptor: MethodDescriptor(service: "not", method: "implemented"), + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + XCTAssertThrowsRPCError(try response.accepted.get()) { error in + XCTAssertEqual(error.code, .unimplemented) + } + } + } + } + + func testMultipleConcurrentRequests() async throws { + try await self.withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in + await withThrowingTaskGroup(of: Void.self) { group in + for i in UInt8.min ..< UInt8.max { + group.addTask { + try await client.unary( + request: .init(message: [i]), + descriptor: BinaryEcho.Methods.collect, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + let message = try response.message + XCTAssertEqual(message, [i]) + } + } + } + } + } + } + + func testInterceptorsAreAppliedInOrder() async throws { + let counter1 = ManagedAtomic(0) + let counter2 = ManagedAtomic(0) + + try await self.withInProcessConnectedClient( + services: [BinaryEcho()], + interceptors: [ + .requestCounter(counter1), + .rejectAll(with: RPCError(code: .unavailable, message: "")), + .requestCounter(counter2), + ] + ) { client, _ in + try await client.unary( + request: .init(message: [3, 1, 4, 1, 5]), + descriptor: BinaryEcho.Methods.collect, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + XCTAssertRejected(response) { error in + XCTAssertEqual(error.code, .unavailable) + } + } + } + + XCTAssertEqual(counter1.load(ordering: .sequentiallyConsistent), 1) + XCTAssertEqual(counter2.load(ordering: .sequentiallyConsistent), 0) + } + + func testNoNewRPCsAfterClientClose() async throws { + try await withInProcessConnectedClient(services: [BinaryEcho()]) { client, _ in + // Run an RPC so we know the client is running properly. + try await client.unary( + request: .init(message: [3, 1, 4, 1, 5]), + descriptor: BinaryEcho.Methods.collect, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + let message = try response.message + XCTAssertEqual(message, [3, 1, 4, 1, 5]) + } + + // New RPCs should fail immediately after this. + client.close() + + // RPC should fail now. + await XCTAssertThrowsErrorAsync(ofType: ClientError.self) { + try await client.unary( + request: .init(message: [3, 1, 4, 1, 5]), + descriptor: BinaryEcho.Methods.collect, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { _ in } + } errorHandler: { error in + XCTAssertEqual(error.code, .clientIsStopped) + } + } + } + + func testInFlightRPCsCanContinueAfterClientIsClosed() async throws { + try await withInProcessConnectedClient(services: [BinaryEcho()]) { client, server in + try await client.clientStreaming( + request: .init(producer: { writer in + + // Close the client once this RCP has been started. + client.close() + + // Attempts to start a new RPC should fail. + await XCTAssertThrowsErrorAsync(ofType: ClientError.self) { + try await client.unary( + request: .init(message: [3, 1, 4, 1, 5]), + descriptor: BinaryEcho.Methods.collect, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { _ in } + } errorHandler: { error in + XCTAssertEqual(error.code, .clientIsStopped) + } + + // Now write to the already opened stream to confirm that opened streams + // can successfully run to completion. + for byte in [3, 1, 4, 1, 5] as [UInt8] { + try await writer.write([byte]) + } + }), + descriptor: BinaryEcho.Methods.collect, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + let message = try response.message + XCTAssertEqual(message, [3, 1, 4, 1, 5]) + } + } + } + + func testCancelRunningClient() async throws { + let inProcess = self.makeInProcessPair() + let client = GRPCClient(transport: inProcess.client) + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + let server = GRPCServer() + server.services.register(BinaryEcho()) + server.transports.add(inProcess.server) + try await server.run() + } + + group.addTask { + try await client.run() + } + + // Wait for client and server to be running. + try await Task.sleep(for: .milliseconds(10)) + + let task = Task { + try await client.clientStreaming( + request: .init(producer: { writer in + try await Task.sleep(for: .seconds(5)) + }), + descriptor: BinaryEcho.Methods.collect, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + XCTAssertRejected(response) { error in + XCTAssertEqual(error.code, .unknown) + } + } + } + + // Check requests are getting through. + try await client.unary( + request: .init(message: [3, 1, 4, 1, 5]), + descriptor: BinaryEcho.Methods.collect, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + let message = try response.message + XCTAssertEqual(message, [3, 1, 4, 1, 5]) + } + + task.cancel() + try await task.value + group.cancelAll() + } + } + + func testRunStoppedClient() async throws { + let (clientTransport, _) = self.makeInProcessPair() + let client = GRPCClient(transport: clientTransport) + // Run the client. + let task = Task { try await client.run() } + task.cancel() + try await task.value + + // Client is stopped, should throw an error. + await XCTAssertThrowsErrorAsync(ofType: ClientError.self) { + try await client.run() + } errorHandler: { error in + XCTAssertEqual(error.code, .clientIsStopped) + } + } + + func testRunAlreadyRunningClient() async throws { + let (clientTransport, _) = self.makeInProcessPair() + let client = GRPCClient(transport: clientTransport) + // Run the client. + let task = Task { try await client.run() } + // Make sure the client is run for the first time here. + try await Task.sleep(for: .milliseconds(10)) + + // Client is already running, should throw an error. + await XCTAssertThrowsErrorAsync(ofType: ClientError.self) { + try await client.run() + } errorHandler: { error in + XCTAssertEqual(error.code, .clientIsAlreadyRunning) + } + + task.cancel() + } + + func testRunClientNotRunning() async throws { + let (clientTransport, _) = self.makeInProcessPair() + let client = GRPCClient(transport: clientTransport) + + // Client is not running, should throw an error. + await XCTAssertThrowsErrorAsync(ofType: ClientError.self) { + try await client.unary( + request: .init(message: [3, 1, 4, 1, 5]), + descriptor: BinaryEcho.Methods.collect, + serializer: IdentitySerializer(), + deserializer: IdentityDeserializer() + ) { response in + let message = try response.message + XCTAssertEqual(message, [3, 1, 4, 1, 5]) + } + } errorHandler: { error in + XCTAssertEqual(error.code, .clientIsNotRunning) + } + } + + func testInterceptorsDescription() async throws { + var config = GRPCClient.Configuration() + config.interceptors.add(.rejectAll(with: .init(code: .aborted, message: ""))) + config.interceptors.add(.requestCounter(.init(0))) + + let description = String(describing: config.interceptors) + let expected = #"["RejectAllClientInterceptor", "RequestCountingClientInterceptor"]"# + XCTAssertEqual(description, expected) + } +} diff --git a/Tests/GRPCCoreTests/GRPCServerTests.swift b/Tests/GRPCCoreTests/GRPCServerTests.swift index e99a46914..3bc4a6a8c 100644 --- a/Tests/GRPCCoreTests/GRPCServerTests.swift +++ b/Tests/GRPCCoreTests/GRPCServerTests.swift @@ -23,7 +23,7 @@ final class GRPCServerTests: XCTestCase { let server = InProcessServerTransport() let client = InProcessClientTransport( server: server, - executionConfigurations: ClientRPCExecutionConfigurationCollection() + executionConfigurations: MethodConfigurations() ) return (client, server) @@ -59,7 +59,6 @@ final class GRPCServerTests: XCTestCase { inProcess.client.close() server.stopListening() } - } func testServerHandlesUnary() async throws { diff --git a/Tests/GRPCCoreTests/Call/Client/ClientRPCExecutionConfigurationTests.swift b/Tests/GRPCCoreTests/MethodConfigurationTests.swift similarity index 96% rename from Tests/GRPCCoreTests/Call/Client/ClientRPCExecutionConfigurationTests.swift rename to Tests/GRPCCoreTests/MethodConfigurationTests.swift index 768b808b2..8104b9fd6 100644 --- a/Tests/GRPCCoreTests/Call/Client/ClientRPCExecutionConfigurationTests.swift +++ b/Tests/GRPCCoreTests/MethodConfigurationTests.swift @@ -17,7 +17,7 @@ import GRPCCore import XCTest @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -final class ClientRPCExecutionConfigurationTests: XCTestCase { +final class MethodConfigurationTests: XCTestCase { func testRetryPolicyClampsMaxAttempts() { var policy = RetryPolicy( maximumAttempts: 10, diff --git a/Tests/GRPCCoreTests/Call/Client/ClientRPCExecutionConfigurationCollectionTests.swift b/Tests/GRPCCoreTests/MethodConfigurationsTests.swift similarity index 72% rename from Tests/GRPCCoreTests/Call/Client/ClientRPCExecutionConfigurationCollectionTests.swift rename to Tests/GRPCCoreTests/MethodConfigurationsTests.swift index 38f964652..0d8e88829 100644 --- a/Tests/GRPCCoreTests/Call/Client/ClientRPCExecutionConfigurationCollectionTests.swift +++ b/Tests/GRPCCoreTests/MethodConfigurationsTests.swift @@ -17,17 +17,16 @@ import GRPCCore import XCTest @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -final class ClientRPCExecutionConfigurationCollectionTests: XCTestCase { - func testGetConfigurationForKnownMethod() { +final class MethodConfigurationsTests: XCTestCase { + func testGetConfigurationForKnownMethod() async throws { let policy = HedgingPolicy( maximumAttempts: 10, hedgingDelay: .seconds(1), nonFatalStatusCodes: [] ) - let defaultConfiguration = ClientRPCExecutionConfiguration(hedgingPolicy: policy) - var configurations = ClientRPCExecutionConfigurationCollection( - defaultConfiguration: defaultConfiguration - ) + let defaultConfiguration = MethodConfiguration(hedgingPolicy: policy) + var configurations = MethodConfigurations() + configurations.setDefaultConfiguration(defaultConfiguration) let descriptor = MethodDescriptor(service: "test", method: "first") let retryPolicy = RetryPolicy( maximumAttempts: 10, @@ -36,7 +35,7 @@ final class ClientRPCExecutionConfigurationCollectionTests: XCTestCase { backoffMultiplier: 1.0, retryableStatusCodes: [.unavailable] ) - let overrideConfiguration = ClientRPCExecutionConfiguration(retryPolicy: retryPolicy) + let overrideConfiguration = MethodConfiguration(retryPolicy: retryPolicy) configurations[descriptor] = overrideConfiguration XCTAssertEqual(configurations[descriptor], overrideConfiguration) @@ -48,10 +47,9 @@ final class ClientRPCExecutionConfigurationCollectionTests: XCTestCase { hedgingDelay: .seconds(1), nonFatalStatusCodes: [] ) - let defaultConfiguration = ClientRPCExecutionConfiguration(hedgingPolicy: policy) - var configurations = ClientRPCExecutionConfigurationCollection( - defaultConfiguration: defaultConfiguration - ) + let defaultConfiguration = MethodConfiguration(hedgingPolicy: policy) + var configurations = MethodConfigurations() + configurations.setDefaultConfiguration(defaultConfiguration) let firstDescriptor = MethodDescriptor(service: "test", method: "") let retryPolicy = RetryPolicy( maximumAttempts: 10, @@ -60,7 +58,7 @@ final class ClientRPCExecutionConfigurationCollectionTests: XCTestCase { backoffMultiplier: 1.0, retryableStatusCodes: [.unavailable] ) - let overrideConfiguration = ClientRPCExecutionConfiguration(retryPolicy: retryPolicy) + let overrideConfiguration = MethodConfiguration(retryPolicy: retryPolicy) configurations[firstDescriptor] = overrideConfiguration let secondDescriptor = MethodDescriptor(service: "test", method: "second") @@ -73,10 +71,9 @@ final class ClientRPCExecutionConfigurationCollectionTests: XCTestCase { hedgingDelay: .seconds(1), nonFatalStatusCodes: [] ) - let defaultConfiguration = ClientRPCExecutionConfiguration(hedgingPolicy: policy) - var configurations = ClientRPCExecutionConfigurationCollection( - defaultConfiguration: defaultConfiguration - ) + let defaultConfiguration = MethodConfiguration(hedgingPolicy: policy) + var configurations = MethodConfigurations() + configurations.setDefaultConfiguration(defaultConfiguration) let firstDescriptor = MethodDescriptor(service: "test1", method: "first") let retryPolicy = RetryPolicy( maximumAttempts: 10, @@ -85,7 +82,7 @@ final class ClientRPCExecutionConfigurationCollectionTests: XCTestCase { backoffMultiplier: 1.0, retryableStatusCodes: [.unavailable] ) - let overrideConfiguration = ClientRPCExecutionConfiguration(retryPolicy: retryPolicy) + let overrideConfiguration = MethodConfiguration(retryPolicy: retryPolicy) configurations[firstDescriptor] = overrideConfiguration let secondDescriptor = MethodDescriptor(service: "test2", method: "second") diff --git a/Tests/GRPCCoreTests/Test Utilities/Call/Client/ClientInterceptors.swift b/Tests/GRPCCoreTests/Test Utilities/Call/Client/ClientInterceptors.swift new file mode 100644 index 000000000..e89681d58 --- /dev/null +++ b/Tests/GRPCCoreTests/Test Utilities/Call/Client/ClientInterceptors.swift @@ -0,0 +1,84 @@ +/* + * Copyright 2023, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Atomics +import GRPCCore + +extension ClientInterceptor where Self == RejectAllClientInterceptor { + static func rejectAll(with error: RPCError) -> Self { + return RejectAllClientInterceptor(error: error, throw: false) + } + + static func throwError(_ error: RPCError) -> Self { + return RejectAllClientInterceptor(error: error, throw: true) + } + +} + +extension ClientInterceptor where Self == RequestCountingClientInterceptor { + static func requestCounter(_ counter: ManagedAtomic) -> Self { + return RequestCountingClientInterceptor(counter: counter) + } +} + +/// Rejects all RPCs with the provided error. +struct RejectAllClientInterceptor: ClientInterceptor { + /// The error to reject all RPCs with. + let error: RPCError + /// Whether the error should be thrown. If `false` then the request is rejected with the error + /// instead. + let `throw`: Bool + + init(error: RPCError, throw: Bool = false) { + self.error = error + self.`throw` = `throw` + } + + func intercept( + request: ClientRequest.Stream, + context: ClientInterceptorContext, + next: @Sendable ( + ClientRequest.Stream, + ClientInterceptorContext + ) async throws -> ClientResponse.Stream + ) async throws -> ClientResponse.Stream { + if self.throw { + throw self.error + } else { + return ClientResponse.Stream(error: self.error) + } + } +} + +struct RequestCountingClientInterceptor: ClientInterceptor { + /// The number of requests made. + let counter: ManagedAtomic + + init(counter: ManagedAtomic) { + self.counter = counter + } + + func intercept( + request: ClientRequest.Stream, + context: ClientInterceptorContext, + next: @Sendable ( + ClientRequest.Stream, + ClientInterceptorContext + ) async throws -> ClientResponse.Stream + ) async throws -> ClientResponse.Stream { + self.counter.wrappingIncrement(ordering: .sequentiallyConsistent) + return try await next(request, context) + } +} diff --git a/Tests/GRPCCoreTests/Test Utilities/Call/Server/ServerInterceptors.swift b/Tests/GRPCCoreTests/Test Utilities/Call/Server/ServerInterceptors.swift index 266648768..02d4061a6 100644 --- a/Tests/GRPCCoreTests/Test Utilities/Call/Server/ServerInterceptors.swift +++ b/Tests/GRPCCoreTests/Test Utilities/Call/Server/ServerInterceptors.swift @@ -24,7 +24,6 @@ extension ServerInterceptor where Self == RejectAllServerInterceptor { static func throwError(_ error: RPCError) -> Self { return RejectAllServerInterceptor(error: error, throw: true) } - } extension ServerInterceptor where Self == RequestCountingServerInterceptor { @@ -63,7 +62,7 @@ struct RejectAllServerInterceptor: ServerInterceptor { } struct RequestCountingServerInterceptor: ServerInterceptor { - /// The error to reject all RPCs with. + /// The number of requests made. let counter: ManagedAtomic init(counter: ManagedAtomic) { diff --git a/Tests/GRPCCoreTests/Test Utilities/Transport/AnyTransport.swift b/Tests/GRPCCoreTests/Test Utilities/Transport/AnyTransport.swift index fa64d76fd..5fb153bee 100644 --- a/Tests/GRPCCoreTests/Test Utilities/Transport/AnyTransport.swift +++ b/Tests/GRPCCoreTests/Test Utilities/Transport/AnyTransport.swift @@ -28,7 +28,7 @@ struct AnyClientTransport: ClientTransport, Sendable { ) async throws -> Any private let _connect: @Sendable (Bool) async throws -> Void private let _close: @Sendable () -> Void - private let _configuration: @Sendable (MethodDescriptor) -> ClientRPCExecutionConfiguration? + private let _configuration: @Sendable (MethodDescriptor) -> MethodConfiguration? init(wrapping transport: Transport) where Transport.Inbound == Inbound, Transport.Outbound == Outbound { @@ -74,7 +74,7 @@ struct AnyClientTransport: ClientTransport, Sendable { func executionConfiguration( forMethod descriptor: MethodDescriptor - ) -> ClientRPCExecutionConfiguration? { + ) -> MethodConfiguration? { self._configuration(descriptor) } } diff --git a/Tests/GRPCCoreTests/Test Utilities/Transport/StreamCountingTransport.swift b/Tests/GRPCCoreTests/Test Utilities/Transport/StreamCountingTransport.swift index 11c294e03..0cdb2d1fd 100644 --- a/Tests/GRPCCoreTests/Test Utilities/Transport/StreamCountingTransport.swift +++ b/Tests/GRPCCoreTests/Test Utilities/Transport/StreamCountingTransport.swift @@ -68,7 +68,7 @@ struct StreamCountingClientTransport: ClientTransport, Sendable { func executionConfiguration( forMethod descriptor: MethodDescriptor - ) -> ClientRPCExecutionConfiguration? { + ) -> MethodConfiguration? { self.transport.executionConfiguration(forMethod: descriptor) } } diff --git a/Tests/GRPCCoreTests/Test Utilities/Transport/ThrowingTransport.swift b/Tests/GRPCCoreTests/Test Utilities/Transport/ThrowingTransport.swift index 0dd1cee9b..def987fac 100644 --- a/Tests/GRPCCoreTests/Test Utilities/Transport/ThrowingTransport.swift +++ b/Tests/GRPCCoreTests/Test Utilities/Transport/ThrowingTransport.swift @@ -38,7 +38,7 @@ struct ThrowOnStreamCreationTransport: ClientTransport { func executionConfiguration( forMethod descriptor: MethodDescriptor - ) -> ClientRPCExecutionConfiguration? { + ) -> MethodConfiguration? { return nil } diff --git a/Tests/GRPCCoreTests/Test Utilities/XCTest+Utilities.swift b/Tests/GRPCCoreTests/Test Utilities/XCTest+Utilities.swift index 7bc88edef..71ca6dd7a 100644 --- a/Tests/GRPCCoreTests/Test Utilities/XCTest+Utilities.swift +++ b/Tests/GRPCCoreTests/Test Utilities/XCTest+Utilities.swift @@ -104,6 +104,18 @@ func XCTAssertRejected( } } +func XCTAssertRejected( + _ response: ClientResponse.Single, + errorHandler: (RPCError) -> Void +) { + switch response.accepted { + case .success: + XCTFail("Expected RPC to be rejected") + case .failure(let error): + errorHandler(error) + } +} + func XCTAssertMetadata( _ part: RPCResponsePart?, metadataHandler: (Metadata) -> Void = { _ in } @@ -116,6 +128,18 @@ func XCTAssertMetadata( } } +func XCTAssertMetadata( + _ part: RPCRequestPart?, + metadataHandler: (Metadata) async throws -> Void = { _ in } +) async throws { + switch part { + case .some(.metadata(let metadata)): + try await metadataHandler(metadata) + default: + XCTFail("Expected '.metadata' but found '\(String(describing: part))'") + } +} + func XCTAssertMessage( _ part: RPCResponsePart?, messageHandler: ([UInt8]) -> Void = { _ in } @@ -124,7 +148,19 @@ func XCTAssertMessage( case .some(.message(let message)): messageHandler(message) default: - XCTFail("Expected '.metadata' but found '\(String(describing: part))'") + XCTFail("Expected '.message' but found '\(String(describing: part))'") + } +} + +func XCTAssertMessage( + _ part: RPCRequestPart?, + messageHandler: ([UInt8]) async throws -> Void = { _ in } +) async throws { + switch part { + case .some(.message(let message)): + try await messageHandler(message) + default: + XCTFail("Expected '.message' but found '\(String(describing: part))'") } } diff --git a/Tests/GRPCCoreTests/Transport/InProcessClientTransportTests.swift b/Tests/GRPCCoreTests/Transport/InProcessClientTransportTests.swift index c7bbdef80..4ff0557de 100644 --- a/Tests/GRPCCoreTests/Transport/InProcessClientTransportTests.swift +++ b/Tests/GRPCCoreTests/Transport/InProcessClientTransportTests.swift @@ -187,10 +187,9 @@ final class InProcessClientTransportTests: XCTestCase { hedgingDelay: .seconds(1), nonFatalStatusCodes: [] ) - let defaultConfiguration = ClientRPCExecutionConfiguration(hedgingPolicy: policy) - var configurations = ClientRPCExecutionConfigurationCollection( - defaultConfiguration: defaultConfiguration - ) + let defaultConfiguration = MethodConfiguration(hedgingPolicy: policy) + var configurations = MethodConfigurations() + configurations.setDefaultConfiguration(defaultConfiguration) var client = InProcessClientTransport(server: .init(), executionConfigurations: configurations) @@ -204,7 +203,7 @@ final class InProcessClientTransportTests: XCTestCase { backoffMultiplier: 1.0, retryableStatusCodes: [.unavailable] ) - let overrideConfiguration = ClientRPCExecutionConfiguration(retryPolicy: retryPolicy) + let overrideConfiguration = MethodConfiguration(retryPolicy: retryPolicy) configurations[firstDescriptor] = overrideConfiguration client = InProcessClientTransport(server: .init(), executionConfigurations: configurations) let secondDescriptor = MethodDescriptor(service: "test", method: "second") @@ -243,7 +242,7 @@ final class InProcessClientTransportTests: XCTestCase { } func makeClient( - configuration: ClientRPCExecutionConfiguration? = nil, + configuration: MethodConfiguration? = nil, server: InProcessServerTransport = InProcessServerTransport() ) -> InProcessClientTransport { let defaultPolicy = RetryPolicy( @@ -254,11 +253,13 @@ final class InProcessClientTransportTests: XCTestCase { retryableStatusCodes: [.unavailable] ) + var methodConfiguration = MethodConfigurations() + methodConfiguration.setDefaultConfiguration( + configuration ?? .init(retryPolicy: defaultPolicy) + ) return InProcessClientTransport( server: server, - executionConfigurations: .init( - defaultConfiguration: configuration ?? .init(retryPolicy: defaultPolicy) - ) + executionConfigurations: methodConfiguration ) } }