From f06c489f389f6d934420692861b84fc18816f2cc Mon Sep 17 00:00:00 2001 From: George Barnett Date: Tue, 5 Dec 2023 16:40:58 +0000 Subject: [PATCH] Simplify client and server init Motivation: When configuring a client and server must users will specify only the transport and services (for a server) and potentially interceptors too. This should be the "easy" path. Moreover these are all reference types so it makes sense to separate them from "raw" configuration. Creating a client also diverged somewhat from the server. Having both follow a similar pattern simplifies things for users. Modifications: - Move interceptors from the client config to the init. - Modify the server to follow a similar pattern to the server. Results: Server and client are configured similarly. --- Sources/GRPCCore/ClientError.swift | 6 - Sources/GRPCCore/GRPCClient.swift | 101 ++--- Sources/GRPCCore/GRPCServer.swift | 344 +++++++----------- .../GRPCCore/Transport/ServerTransport.swift | 2 +- Tests/GRPCCoreTests/GRPCClientTests.swift | 46 +-- Tests/GRPCCoreTests/GRPCServerTests.swift | 76 +--- 6 files changed, 186 insertions(+), 389 deletions(-) diff --git a/Sources/GRPCCore/ClientError.swift b/Sources/GRPCCore/ClientError.swift index 77ad48719..ddf4937dc 100644 --- a/Sources/GRPCCore/ClientError.swift +++ b/Sources/GRPCCore/ClientError.swift @@ -109,7 +109,6 @@ extension ClientError { public struct Code: Hashable, Sendable { private enum Value { case clientIsAlreadyRunning - case clientIsNotRunning case clientIsStopped case transportError } @@ -124,11 +123,6 @@ extension ClientError { Self(.clientIsAlreadyRunning) } - /// An attempt to start an RPC was made but the client is not running. - public static var clientIsNotRunning: Self { - Self(.clientIsNotRunning) - } - /// At attempt to start the client was made but it has already stopped. public static var clientIsStopped: Self { Self(.clientIsStopped) diff --git a/Sources/GRPCCore/GRPCClient.swift b/Sources/GRPCCore/GRPCClient.swift index 848e01297..22f054f59 100644 --- a/Sources/GRPCCore/GRPCClient.swift +++ b/Sources/GRPCCore/GRPCClient.swift @@ -40,9 +40,6 @@ import Atomics /// // Create a configuration object for the client. /// var configuration = GRPCClient.Configuration() /// -/// // Create and add an interceptor. -/// configuration.interceptors.add(StatsRecordingClientInterceptor()) -/// /// // Override the timeout for the 'Get' method on the 'echo.Echo' service. This configuration /// // takes precedence over any set by the transport. /// let echoGet = MethodDescriptor(service: "echo.Echo", method: "Get") @@ -56,10 +53,15 @@ import Atomics /// let defaultMethodConfiguration = MethodConfiguration(executionPolicy: nil, timeout: seconds(10)) /// configuration.method.defaults.setDefaultConfiguration(defaultMethodConfiguration) /// -/// // Finally create a transport and instantiate the client. +/// // Finally create a transport and instantiate the client, adding an interceptor. /// let inProcessServerTransport = InProcessServerTransport() /// let inProcessClientTransport = InProcessClientTransport(serverTransport: inProcessServerTransport) -/// let client = GRPCClient(transport: inProcessClientTransport, configuration: configuration) +/// +/// let client = GRPCClient( +/// transport: inProcessClientTransport, +/// interceptors: [StatsRecordingClientInterceptor()], +/// configuration: configuration +/// ) /// ``` /// /// ## Starting and stopping the client @@ -101,6 +103,14 @@ public struct GRPCClient: Sendable { /// The transport which provides a bidirectional communication channel with the server. private let transport: any ClientTransport + /// A collection of interceptors providing cross-cutting functionality to each accepted RPC. + /// + /// The order in which interceptors are added reflects the order in which they are called. The + /// first interceptor added will be the first interceptor to intercept each request. The last + /// interceptor added will be the final interceptor to intercept each request before calling + /// the appropriate handler. + private let interceptors: [any ClientInterceptor] + /// The configuration used by the client. public let configuration: Configuration @@ -121,13 +131,23 @@ public struct GRPCClient: Sendable { case stopped } - /// Creates a new client with the given transport and configuration. + /// Creates a new client with the given transport, interceptors and configuration. /// /// - Parameters: /// - transport: The transport used to establish a communication channel with a server. + /// - interceptors: A collection of interceptors providing cross-cutting functionality to each + /// accepted RPC. The order in which interceptors are added reflects the order in which they + /// are called. The first interceptor added will be the first interceptor to intercept each + /// request. The last interceptor added will be the final interceptor to intercept each + /// request before calling the appropriate handler. /// - configuration: Configuration for the client. - public init(transport: some ClientTransport, configuration: Configuration = Configuration()) { + public init( + transport: some ClientTransport, + interceptors: [any ClientInterceptor] = [], + configuration: Configuration = Configuration() + ) { self.transport = transport + self.interceptors = interceptors self.configuration = configuration self.state = ManagedAtomic(.notStarted) } @@ -250,7 +270,7 @@ public struct GRPCClient: Sendable { deserializer: some MessageDeserializer, handler: @Sendable @escaping (ClientResponse.Single) async throws -> ReturnValue ) async throws -> ReturnValue { - try await bidirectionalStreaming( + try await self.bidirectionalStreaming( request: ClientRequest.Stream(single: request), descriptor: descriptor, serializer: serializer, @@ -278,7 +298,7 @@ public struct GRPCClient: Sendable { deserializer: some MessageDeserializer, handler: @Sendable @escaping (ClientResponse.Single) async throws -> ReturnValue ) async throws -> ReturnValue { - try await bidirectionalStreaming( + try await self.bidirectionalStreaming( request: request, descriptor: descriptor, serializer: serializer, @@ -306,7 +326,7 @@ public struct GRPCClient: Sendable { deserializer: some MessageDeserializer, handler: @Sendable @escaping (ClientResponse.Stream) async throws -> ReturnValue ) async throws -> ReturnValue { - try await bidirectionalStreaming( + try await self.bidirectionalStreaming( request: ClientRequest.Stream(single: request), descriptor: descriptor, serializer: serializer, @@ -336,13 +356,10 @@ public struct GRPCClient: Sendable { handler: @Sendable @escaping (ClientResponse.Stream) async throws -> ReturnValue ) async throws -> ReturnValue { switch self.state.load(ordering: .sequentiallyConsistent) { - case .running: + case .notStarted, .running: + // Allow .notStarted as making a request can race with 'run()'. Transports should tolerate + // queuing the request if not yet started. () - case .notStarted: - throw ClientError( - code: .clientIsNotRunning, - message: "Client must be running to make an RPC: call run() first." - ) case .stopping, .stopped: throw ClientError( code: .clientIsStopped, @@ -357,7 +374,7 @@ public struct GRPCClient: Sendable { serializer: serializer, deserializer: deserializer, transport: self.transport, - interceptors: self.configuration.interceptors.values, + interceptors: self.interceptors, handler: handler ) } @@ -383,14 +400,6 @@ public struct GRPCClient: Sendable { @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) extension GRPCClient { public struct Configuration: Sendable { - /// A collection of interceptors providing cross-cutting functionality to each accepted RPC. - /// - /// The order in which interceptors are added reflects the order in which they are called. The - /// first interceptor added will be the first interceptor to intercept each request. The last - /// interceptor added will be the final interceptor to intercept each request before calling - /// the appropriate handler. - public var interceptors: Interceptors - /// Configuration for how methods are executed. /// /// Method configuration determines how each RPC is executed by the client. Some services and @@ -401,7 +410,6 @@ extension GRPCClient { /// Creates a new default configuration. public init() { - self.interceptors = Interceptors() self.method = Method() } } @@ -409,40 +417,6 @@ extension GRPCClient { @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) extension GRPCClient.Configuration { - /// A collection of ``ClientInterceptor`` implementations which are applied to all accepted - /// RPCs. - /// - /// RPCs are intercepted in the order that interceptors are added. That is, a request sent from the client to - /// the server will first be intercepted by the first added interceptor followed by the second, and so on. - /// For responses from the server, they'll be applied in the opposite order. - public struct Interceptors: Sendable { - private(set) var values: [any ClientInterceptor] = [] - - /// Add an interceptor to the client. - /// - /// The order in which interceptors are added reflects the order in which they are called. The - /// first interceptor added will be the first interceptor to intercept each request. The last - /// interceptor added will be the final interceptor to intercept each request before calling - /// the appropriate handler. - /// - /// - Parameter interceptor: The interceptor to add. - public mutating func add(_ interceptor: some ClientInterceptor) { - self.values.append(interceptor) - } - - /// Adds a sequence of interceptor to the client. - /// - /// The order in which interceptors are added reflects the order in which they are called. The - /// first interceptor added will be the first interceptor to intercept each request. The last - /// interceptor added will be the final interceptor to intercept each request before calling - /// the appropriate handler. - /// - /// - Parameter interceptors: The interceptors to add. - public mutating func add(contentsOf interceptors: some Sequence) { - self.values.append(contentsOf: interceptors) - } - } - /// Configuration for how methods should be executed. /// /// In most cases the client should defer to the configuration provided by the transport as this @@ -464,10 +438,3 @@ extension GRPCClient.Configuration { } } } - -@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -extension GRPCClient.Configuration.Interceptors: CustomStringConvertible { - public var description: String { - return String(describing: self.values.map { String(describing: type(of: $0)) }) - } -} diff --git a/Sources/GRPCCore/GRPCServer.swift b/Sources/GRPCCore/GRPCServer.swift index 626816f9b..7f953fb6e 100644 --- a/Sources/GRPCCore/GRPCServer.swift +++ b/Sources/GRPCCore/GRPCServer.swift @@ -34,18 +34,22 @@ import Atomics /// The following example demonstrates how to create and configure a server. /// /// ```swift -/// let server = GRPCServer() -/// -/// // Create and add an in-process transport. +/// // Create and an in-process transport. /// let inProcessTransport = InProcessServerTransport() -/// server.transports.add(inProcessTransport) /// -/// // Create and register the 'Greeter' and 'Echo' services. -/// server.services.register(GreeterService()) -/// server.services.register(EchoService()) +/// // Create the 'Greeter' and 'Echo' services. +/// let greeter = GreeterService() +/// let echo = EchoService() +/// +/// // Create an interceptor. +/// let statsRecorder = StatsRecordingServerInterceptors() /// -/// // Create and add some interceptors. -/// server.interceptors.add(StatsRecordingServerInterceptors()) +/// // Finally create the server. +/// let server = GRPCServer( +/// transports: [inProcessTransport], +/// services: [greeter, echo], +/// interceptors: [statsRecorder] +/// ) /// ``` /// /// ## Starting and stopping the server @@ -66,29 +70,15 @@ import Atomics /// that need their lifecycles managed you should consider using [Swift Service /// Lifecycle](https://github.com/swift-server/swift-service-lifecycle). @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -public final class GRPCServer: Sendable { +public struct GRPCServer: Sendable { typealias Stream = RPCStream /// A collection of ``ServerTransport`` implementations that the server uses to listen /// for new requests. - public var transports: Transports { - get { - self.storage.withLockedValue { $0.transports } - } - set { - self.storage.withLockedValue { $0.transports = newValue } - } - } + private let transports: [any ServerTransport] /// The services registered which the server is serving. - public var services: Services { - get { - self.storage.withLockedValue { $0.services } - } - set { - self.storage.withLockedValue { $0.services = newValue } - } - } + private let router: RPCRouter /// A collection of ``ServerInterceptor`` implementations which are applied to all accepted /// RPCs. @@ -96,34 +86,12 @@ public final class GRPCServer: Sendable { /// RPCs are intercepted in the order that interceptors are added. That is, a request received /// from the client will first be intercepted by the first added interceptor followed by the /// second, and so on. - public var interceptors: Interceptors { - get { - self.storage.withLockedValue { $0.interceptors } - } - set { - self.storage.withLockedValue { $0.interceptors = newValue } - } - } - - /// Underlying storage for the server. - private struct Storage { - var transports: Transports - var services: Services - var interceptors: Interceptors - var state: State - - init() { - self.transports = Transports() - self.services = Services() - self.interceptors = Interceptors() - self.state = .notStarted - } - } - - private let storage: LockedValueBox + private let interceptors: [any ServerInterceptor] /// The state of the server. - private enum State { + private let state: ManagedAtomic + + private enum State: UInt8, AtomicValue { /// The server hasn't been started yet. Can transition to `starting` or `stopped`. case notStarted /// The server is starting but isn't accepting requests yet. Can transition to `running` @@ -141,11 +109,46 @@ public final class GRPCServer: Sendable { /// Creates a new server with no resources. /// - /// You can add resources to the server via ``transports-swift.property``, - /// ``services-swift.property``, and ``interceptors-swift.property`` and start the server by - /// calling ``run()``. Any changes to resources after ``run()`` has been called will be ignored. - public init() { - self.storage = LockedValueBox(Storage()) + /// - Parameters: + /// - transports: The transports the server should listen on. + /// - services: Services offered by the server. + /// - interceptors: A collection of interceptors providing cross-cutting functionality to each + /// accepted RPC. The order in which interceptors are added reflects the order in which they + /// are called. The first interceptor added will be the first interceptor to intercept each + /// request. The last interceptor added will be the final interceptor to intercept each + /// request before calling the appropriate handler. + public init( + transports: [any ServerTransport], + services: [any RegistrableRPCService], + interceptors: [any ServerInterceptor] = [] + ) { + var router = RPCRouter() + for service in services { + service.registerMethods(with: &router) + } + + self.init(transports: transports, router: router, interceptors: interceptors) + } + + /// Creates a new server with no resources. + /// + /// - Parameters: + /// - transports: The transports the server should listen on. + /// - router: A ``RPCRouter`` used by the server to route accepted streams to method handlers. + /// - interceptors: A collection of interceptors providing cross-cutting functionality to each + /// accepted RPC. The order in which interceptors are added reflects the order in which they + /// are called. The first interceptor added will be the first interceptor to intercept each + /// request. The last interceptor added will be the final interceptor to intercept each + /// request before calling the appropriate handler. + public init( + transports: [any ServerTransport], + router: RPCRouter, + interceptors: [any ServerInterceptor] = [] + ) { + self.state = ManagedAtomic(.notStarted) + self.transports = transports + self.router = router + self.interceptors = interceptors } /// Starts the server and runs until all registered transports have closed. @@ -159,20 +162,19 @@ public final class GRPCServer: Sendable { /// /// To stop the server more abruptly you can cancel the task that this function is running in. /// - /// You must register all resources you wish to use with the server before calling this function - /// as changes made after calling ``run()`` won't be reflected. - /// /// - Note: You can only call this function once, repeated calls will result in a /// ``ServerError`` being thrown. - /// - Important: You must register at least one transport by calling - /// ``Transports-swift.struct/add(_:)`` before calling this method. public func run() async throws { - let (transports, router, interceptors) = try self.storage.withLockedValue { storage in - switch storage.state { + let (wasNotStarted, actualState) = self.state.compareExchange( + expected: .notStarted, + desired: .starting, + ordering: .sequentiallyConsistent + ) + + guard wasNotStarted else { + switch actualState { case .notStarted: - storage.state = .starting - return (storage.transports, storage.services.router, storage.interceptors) - + fatalError() case .starting, .running: throw ServerError( code: .serverIsAlreadyRunning, @@ -189,31 +191,31 @@ public final class GRPCServer: Sendable { // When we exit this function we must have stopped. defer { - self.storage.withLockedValue { $0.state = .stopped } + self.state.store(.stopped, ordering: .sequentiallyConsistent) } - if transports.values.isEmpty { + if self.transports.isEmpty { throw ServerError( code: .noTransportsConfigured, message: """ Can't start server, no transports are configured. You must add at least one transport \ - to the server using 'transports.add(_:)' before calling 'run()'. + to the server before calling 'run()'. """ ) } var listeners: [RPCAsyncSequence] = [] - listeners.reserveCapacity(transports.values.count) + listeners.reserveCapacity(self.transports.count) - for transport in transports.values { + for transport in self.transports { do { let listener = try await transport.listen() listeners.append(listener) } catch let cause { // Failed to start, so start stopping. - self.storage.withLockedValue { $0.state = .stopping } + self.state.store(.stopping, ordering: .sequentiallyConsistent) // Some listeners may have started and have streams which need closing. - await Self.rejectRequests(listeners, transports: transports) + await self.rejectRequests(listeners) throw ServerError( code: .failedToStartTransport, @@ -227,35 +229,24 @@ public final class GRPCServer: Sendable { } // May have been told to stop listening while starting the transports. - let isStopping = self.storage.withLockedValue { storage in - switch storage.state { - case .notStarted, .running, .stopped: - fatalError("Invalid state") - - case .starting: - storage.state = .running - return false - - case .stopping: - return true - } - } + let (wasStarting, _) = self.state.compareExchange( + expected: .starting, + desired: .running, + ordering: .sequentiallyConsistent + ) // If the server is stopping then notify the transport and then consume them: there may be // streams opened at a lower level (e.g. HTTP/2) which are already open and need to be consumed. - if isStopping { - await Self.rejectRequests(listeners, transports: transports) + if wasStarting { + await self.handleRequests(listeners) } else { - await Self.handleRequests(listeners, router: router, interceptors: interceptors) + await self.rejectRequests(listeners) } } - private static func rejectRequests( - _ listeners: [RPCAsyncSequence], - transports: Transports - ) async { + private func rejectRequests(_ listeners: [RPCAsyncSequence]) async { // Tell the active listeners to stop listening. - for transport in transports.values.prefix(listeners.count) { + for transport in self.transports.prefix(listeners.count) { transport.stopListening() } @@ -282,33 +273,21 @@ public final class GRPCServer: Sendable { } } - private static func handleRequests( - _ listeners: [RPCAsyncSequence], - router: RPCRouter, - interceptors: Interceptors - ) async { + private func handleRequests(_ listeners: [RPCAsyncSequence]) async { #if swift(>=5.9) if #available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) { - await Self.handleRequestsInDiscardingTaskGroup( - listeners, - router: router, - interceptors: interceptors - ) + await self.handleRequestsInDiscardingTaskGroup(listeners) } else { - await Self.handleRequestsInTaskGroup(listeners, router: router, interceptors: interceptors) + await self.handleRequestsInTaskGroup(listeners) } #else - await Self.handleRequestsInTaskGroup(listeners, router: router, interceptors: interceptors) + await self.handleRequestsInTaskGroup(listeners) #endif } #if swift(>=5.9) @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) - private static func handleRequestsInDiscardingTaskGroup( - _ listeners: [RPCAsyncSequence], - router: RPCRouter, - interceptors: Interceptors - ) async { + private func handleRequestsInDiscardingTaskGroup(_ listeners: [RPCAsyncSequence]) async { await withDiscardingTaskGroup { group in for listener in listeners { group.addTask { @@ -316,7 +295,7 @@ public final class GRPCServer: Sendable { do { for try await stream in listener { subGroup.addTask { - await router.handle(stream: stream, interceptors: interceptors.values) + await self.router.handle(stream: stream, interceptors: self.interceptors) } } } catch { @@ -330,11 +309,7 @@ public final class GRPCServer: Sendable { } #endif - private static func handleRequestsInTaskGroup( - _ listeners: [RPCAsyncSequence], - router: RPCRouter, - interceptors: Interceptors - ) async { + private func handleRequestsInTaskGroup(_ listeners: [RPCAsyncSequence]) async { // If the discarding task group isn't available then fall back to using a regular task group // with a limit on subtasks. Most servers will use an HTTP/2 based transport, most // implementations limit connections to 100 concurrent streams. A limit of 4096 gives the server @@ -355,7 +330,7 @@ public final class GRPCServer: Sendable { } subGroup.addTask { - await router.handle(stream: stream, interceptors: interceptors.values) + await self.router.handle(stream: stream, interceptors: self.interceptors) } } } catch { @@ -375,105 +350,50 @@ public final class GRPCServer: Sendable { /// /// Calling this on a server which is already stopping or has stopped has no effect. public func stopListening() { - let transports = self.storage.withLockedValue { storage in - let transports: Transports? - - switch storage.state { - case .notStarted: - storage.state = .stopped - transports = nil - case .starting: - storage.state = .stopping - transports = nil - case .running: - storage.state = .stopping - transports = storage.transports - case .stopping: - transports = nil - case .stopped: - transports = nil - } - - return transports - } - - if let transports = transports?.values { - for transport in transports { + let (wasRunning, actual) = self.state.compareExchange( + expected: .running, + desired: .stopping, + ordering: .sequentiallyConsistent + ) + + if wasRunning { + for transport in self.transports { transport.stopListening() } - } - } -} - -@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -extension GRPCServer { - /// The transports which provide a bidirectional communication channel with clients. - /// - /// You can add a new transport by calling ``add(_:)``. - public struct Transports: Sendable { - private(set) var values: [any (ServerTransport & Sendable)] = [] - - /// Add a transport to the server. - /// - /// - Parameter transport: The transport to add. - public mutating func add(_ transport: some (ServerTransport & Sendable)) { - self.values.append(transport) - } - } + } else { + switch actual { + case .notStarted: + let (exchanged, _) = self.state.compareExchange( + expected: .notStarted, + desired: .stopped, + ordering: .sequentiallyConsistent + ) - /// The services registered with this server. - /// - /// You can register services by calling ``register(_:)`` or by manually adding handlers for - /// methods to the ``router``. - public struct Services: Sendable { - /// The router storing handlers for known methods. - public var router = RPCRouter() - - /// Registers service methods with the ``router``. - /// - /// - Parameter service: The service to register with the ``router``. - public mutating func register(_ service: some RegistrableRPCService) { - service.registerMethods(with: &self.router) - } - } + // Lost a race with 'run()', try again. + if !exchanged { + self.stopListening() + } - /// A collection of interceptors providing cross-cutting functionality to each accepted RPC. - public struct Interceptors: Sendable { - private(set) var values: [any ServerInterceptor] = [] - - /// Add an interceptor to the server. - /// - /// The order in which interceptors are added reflects the order in which they are called. The - /// first interceptor added will be the first interceptor to intercept each request. The last - /// interceptor added will be the final interceptor to intercept each request before calling - /// the appropriate handler. - /// - /// - Parameter interceptor: The interceptor to add. - public mutating func add(_ interceptor: some ServerInterceptor) { - self.values.append(interceptor) - } - } -} + case .starting: + let (exchanged, _) = self.state.compareExchange( + expected: .starting, + desired: .stopping, + ordering: .sequentiallyConsistent + ) -@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -extension GRPCServer.Transports: CustomStringConvertible { - public var description: String { - return String(describing: self.values) - } -} + // Lost a race with 'run()', try again. + if !exchanged { + self.stopListening() + } -@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -extension GRPCServer.Services: CustomStringConvertible { - public var description: String { - // List the fully qualified all methods ordered by service and then method - let rpcs = self.router.methods.map { $0.fullyQualifiedMethod }.sorted() - return String(describing: rpcs) - } -} + case .running: + // Unreachable, this branch only happens when the initial exchange didn't take place. + fatalError() -@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -extension GRPCServer.Interceptors: CustomStringConvertible { - public var description: String { - return String(describing: self.values.map { String(describing: type(of: $0)) }) + case .stopping, .stopped: + // Already stopping/stopped, ignore. + () + } + } } } diff --git a/Sources/GRPCCore/Transport/ServerTransport.swift b/Sources/GRPCCore/Transport/ServerTransport.swift index 3c3dbc45c..05ca4e9ef 100644 --- a/Sources/GRPCCore/Transport/ServerTransport.swift +++ b/Sources/GRPCCore/Transport/ServerTransport.swift @@ -15,7 +15,7 @@ */ @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) -public protocol ServerTransport { +public protocol ServerTransport: Sendable { typealias Inbound = RPCAsyncSequence typealias Outbound = RPCWriter.Closable diff --git a/Tests/GRPCCoreTests/GRPCClientTests.swift b/Tests/GRPCCoreTests/GRPCClientTests.swift index d36f78dee..bf8d5c050 100644 --- a/Tests/GRPCCoreTests/GRPCClientTests.swift +++ b/Tests/GRPCCoreTests/GRPCClientTests.swift @@ -36,16 +36,8 @@ final class GRPCClientTests: XCTestCase { _ body: (GRPCClient, GRPCServer) async throws -> Void ) async throws { let inProcess = self.makeInProcessPair() - var configuration = GRPCClient.Configuration() - configuration.interceptors.add(contentsOf: interceptors) - let client = GRPCClient(transport: inProcess.client, configuration: configuration) - - let server = GRPCServer() - server.transports.add(inProcess.server) - - for service in services { - server.services.register(service) - } + let client = GRPCClient(transport: inProcess.client, interceptors: interceptors) + let server = GRPCServer(transports: [inProcess.server], services: services) try await withThrowingTaskGroup(of: Void.self) { group in group.addTask { @@ -338,9 +330,7 @@ final class GRPCClientTests: XCTestCase { try await withThrowingTaskGroup(of: Void.self) { group in group.addTask { - let server = GRPCServer() - server.services.register(BinaryEcho()) - server.transports.add(inProcess.server) + let server = GRPCServer(transports: [inProcess.server], services: [BinaryEcho()]) try await server.run() } @@ -416,34 +406,4 @@ final class GRPCClientTests: XCTestCase { task.cancel() } - - func testRunClientNotRunning() async throws { - let (clientTransport, _) = self.makeInProcessPair() - let client = GRPCClient(transport: clientTransport) - - // Client is not running, should throw an error. - await XCTAssertThrowsErrorAsync(ofType: ClientError.self) { - try await client.unary( - request: .init(message: [3, 1, 4, 1, 5]), - descriptor: BinaryEcho.Methods.collect, - serializer: IdentitySerializer(), - deserializer: IdentityDeserializer() - ) { response in - let message = try response.message - XCTAssertEqual(message, [3, 1, 4, 1, 5]) - } - } errorHandler: { error in - XCTAssertEqual(error.code, .clientIsNotRunning) - } - } - - func testInterceptorsDescription() async throws { - var config = GRPCClient.Configuration() - config.interceptors.add(.rejectAll(with: .init(code: .aborted, message: ""))) - config.interceptors.add(.requestCounter(.init(0))) - - let description = String(describing: config.interceptors) - let expected = #"["RejectAllClientInterceptor", "RequestCountingClientInterceptor"]"# - XCTAssertEqual(description, expected) - } } diff --git a/Tests/GRPCCoreTests/GRPCServerTests.swift b/Tests/GRPCCoreTests/GRPCServerTests.swift index e694e80cd..cc3169d50 100644 --- a/Tests/GRPCCoreTests/GRPCServerTests.swift +++ b/Tests/GRPCCoreTests/GRPCServerTests.swift @@ -33,16 +33,11 @@ final class GRPCServerTests: XCTestCase { _ body: (InProcessClientTransport, GRPCServer) async throws -> Void ) async throws { let inProcess = self.makeInProcessPair() - let server = GRPCServer() - server.transports.add(inProcess.server) - - for service in services { - server.services.register(service) - } - - for interceptor in interceptors { - server.interceptors.add(interceptor) - } + let server = GRPCServer( + transports: [inProcess.server], + services: services, + interceptors: interceptors + ) try await withThrowingTaskGroup(of: Void.self) { group in group.addTask { @@ -305,9 +300,7 @@ final class GRPCServerTests: XCTestCase { func testCancelRunningServer() async throws { let inProcess = self.makeInProcessPair() let task = Task { - let server = GRPCServer() - server.services.register(BinaryEcho()) - server.transports.add(inProcess.server) + let server = GRPCServer(transports: [inProcess.server], services: [BinaryEcho()]) try await server.run() } @@ -326,7 +319,7 @@ final class GRPCServerTests: XCTestCase { } func testTestRunServerWithNoTransport() async throws { - let server = GRPCServer() + let server = GRPCServer(transports: [], services: []) await XCTAssertThrowsErrorAsync(ofType: ServerError.self) { try await server.run() } errorHandler: { error in @@ -335,8 +328,7 @@ final class GRPCServerTests: XCTestCase { } func testTestRunStoppedServer() async throws { - let server = GRPCServer() - server.transports.add(InProcessServerTransport()) + let server = GRPCServer(transports: [InProcessServerTransport()], services: []) // Run the server. let task = Task { try await server.run() } task.cancel() @@ -351,8 +343,7 @@ final class GRPCServerTests: XCTestCase { } func testRunServerWhenTransportThrows() async throws { - let server = GRPCServer() - server.transports.add(ThrowOnRunServerTransport()) + let server = GRPCServer(transports: [ThrowOnRunServerTransport()], services: []) await XCTAssertThrowsErrorAsync(ofType: ServerError.self) { try await server.run() } errorHandler: { error in @@ -361,15 +352,17 @@ final class GRPCServerTests: XCTestCase { } func testRunServerDrainsRunningTransportsWhenOneFailsToStart() async throws { - let server = GRPCServer() - // Register the in process transport first and allow it to come up. let inProcess = self.makeInProcessPair() - server.transports.add(inProcess.server) - // Register a transport waits for a signal before throwing. let signal = AsyncStream.makeStream(of: Void.self) - server.transports.add(ThrowOnSignalServerTransport(signal: signal.stream)) + let server = GRPCServer( + transports: [ + inProcess.server, + ThrowOnSignalServerTransport(signal: signal.stream), + ], + services: [] + ) // Connect the in process client and start an RPC. When the stream is opened signal the // other transport to throw. This stream should be failed by the server. @@ -402,43 +395,6 @@ final class GRPCServerTests: XCTestCase { } } - func testInterceptorsDescription() async throws { - let server = GRPCServer() - server.interceptors.add(.rejectAll(with: .init(code: .aborted, message: ""))) - server.interceptors.add(.requestCounter(.init(0))) - let description = String(describing: server.interceptors) - let expected = #"["RejectAllServerInterceptor", "RequestCountingServerInterceptor"]"# - XCTAssertEqual(description, expected) - } - - func testServicesDescription() async throws { - let server = GRPCServer() - let methods: [(String, String)] = [ - ("helloworld.Greeter", "SayHello"), - ("echo.Echo", "Foo"), - ("echo.Echo", "Bar"), - ("echo.Echo", "Baz"), - ] - - for (service, method) in methods { - let descriptor = MethodDescriptor(service: service, method: method) - server.services.router.registerHandler( - forMethod: descriptor, - deserializer: IdentityDeserializer(), - serializer: IdentitySerializer() - ) { _ in - fatalError("Unreachable") - } - } - - let description = String(describing: server.services) - let expected = """ - ["echo.Echo/Bar", "echo.Echo/Baz", "echo.Echo/Foo", "helloworld.Greeter/SayHello"] - """ - - XCTAssertEqual(description, expected) - } - private func doEchoGet(using transport: some ClientTransport) async throws { try await transport.withStream(descriptor: BinaryEcho.Methods.get) { stream in try await stream.outbound.write(.metadata([:]))