Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions Sources/GRPCCore/ClientError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ extension ClientError {
public struct Code: Hashable, Sendable {
private enum Value {
case clientIsAlreadyRunning
case clientIsNotRunning
case clientIsStopped
case transportError
}
Expand All @@ -124,11 +123,6 @@ extension ClientError {
Self(.clientIsAlreadyRunning)
}

/// An attempt to start an RPC was made but the client is not running.
public static var clientIsNotRunning: Self {
Self(.clientIsNotRunning)
}

/// At attempt to start the client was made but it has already stopped.
public static var clientIsStopped: Self {
Self(.clientIsStopped)
Expand Down
101 changes: 34 additions & 67 deletions Sources/GRPCCore/GRPCClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ import Atomics
/// // Create a configuration object for the client.
/// var configuration = GRPCClient.Configuration()
///
/// // Create and add an interceptor.
/// configuration.interceptors.add(StatsRecordingClientInterceptor())
///
/// // Override the timeout for the 'Get' method on the 'echo.Echo' service. This configuration
/// // takes precedence over any set by the transport.
/// let echoGet = MethodDescriptor(service: "echo.Echo", method: "Get")
Expand All @@ -56,10 +53,15 @@ import Atomics
/// let defaultMethodConfiguration = MethodConfiguration(executionPolicy: nil, timeout: seconds(10))
/// configuration.method.defaults.setDefaultConfiguration(defaultMethodConfiguration)
///
/// // Finally create a transport and instantiate the client.
/// // Finally create a transport and instantiate the client, adding an interceptor.
/// let inProcessServerTransport = InProcessServerTransport()
/// let inProcessClientTransport = InProcessClientTransport(serverTransport: inProcessServerTransport)
/// let client = GRPCClient(transport: inProcessClientTransport, configuration: configuration)
///
/// let client = GRPCClient(
/// transport: inProcessClientTransport,
/// interceptors: [StatsRecordingClientInterceptor()],
/// configuration: configuration
/// )
/// ```
///
/// ## Starting and stopping the client
Expand Down Expand Up @@ -101,6 +103,14 @@ public struct GRPCClient: Sendable {
/// The transport which provides a bidirectional communication channel with the server.
private let transport: any ClientTransport

/// A collection of interceptors providing cross-cutting functionality to each accepted RPC.
///
/// The order in which interceptors are added reflects the order in which they are called. The
/// first interceptor added will be the first interceptor to intercept each request. The last
/// interceptor added will be the final interceptor to intercept each request before calling
/// the appropriate handler.
private let interceptors: [any ClientInterceptor]

/// The configuration used by the client.
public let configuration: Configuration

Expand All @@ -121,13 +131,23 @@ public struct GRPCClient: Sendable {
case stopped
}

/// Creates a new client with the given transport and configuration.
/// Creates a new client with the given transport, interceptors and configuration.
///
/// - Parameters:
/// - transport: The transport used to establish a communication channel with a server.
/// - interceptors: A collection of interceptors providing cross-cutting functionality to each
/// accepted RPC. The order in which interceptors are added reflects the order in which they
/// are called. The first interceptor added will be the first interceptor to intercept each
/// request. The last interceptor added will be the final interceptor to intercept each
/// request before calling the appropriate handler.
/// - configuration: Configuration for the client.
public init(transport: some ClientTransport, configuration: Configuration = Configuration()) {
public init(
transport: some ClientTransport,
interceptors: [any ClientInterceptor] = [],
configuration: Configuration = Configuration()
) {
self.transport = transport
self.interceptors = interceptors
self.configuration = configuration
self.state = ManagedAtomic(.notStarted)
}
Expand Down Expand Up @@ -250,7 +270,7 @@ public struct GRPCClient: Sendable {
deserializer: some MessageDeserializer<Response>,
handler: @Sendable @escaping (ClientResponse.Single<Response>) async throws -> ReturnValue
) async throws -> ReturnValue {
try await bidirectionalStreaming(
try await self.bidirectionalStreaming(
request: ClientRequest.Stream(single: request),
descriptor: descriptor,
serializer: serializer,
Expand Down Expand Up @@ -278,7 +298,7 @@ public struct GRPCClient: Sendable {
deserializer: some MessageDeserializer<Response>,
handler: @Sendable @escaping (ClientResponse.Single<Response>) async throws -> ReturnValue
) async throws -> ReturnValue {
try await bidirectionalStreaming(
try await self.bidirectionalStreaming(
request: request,
descriptor: descriptor,
serializer: serializer,
Expand Down Expand Up @@ -306,7 +326,7 @@ public struct GRPCClient: Sendable {
deserializer: some MessageDeserializer<Response>,
handler: @Sendable @escaping (ClientResponse.Stream<Response>) async throws -> ReturnValue
) async throws -> ReturnValue {
try await bidirectionalStreaming(
try await self.bidirectionalStreaming(
request: ClientRequest.Stream(single: request),
descriptor: descriptor,
serializer: serializer,
Expand Down Expand Up @@ -336,13 +356,10 @@ public struct GRPCClient: Sendable {
handler: @Sendable @escaping (ClientResponse.Stream<Response>) async throws -> ReturnValue
) async throws -> ReturnValue {
switch self.state.load(ordering: .sequentiallyConsistent) {
case .running:
case .notStarted, .running:
// Allow .notStarted as making a request can race with 'run()'. Transports should tolerate
// queuing the request if not yet started.
()
case .notStarted:
throw ClientError(
code: .clientIsNotRunning,
message: "Client must be running to make an RPC: call run() first."
)
case .stopping, .stopped:
throw ClientError(
code: .clientIsStopped,
Expand All @@ -357,7 +374,7 @@ public struct GRPCClient: Sendable {
serializer: serializer,
deserializer: deserializer,
transport: self.transport,
interceptors: self.configuration.interceptors.values,
interceptors: self.interceptors,
handler: handler
)
}
Expand All @@ -383,14 +400,6 @@ public struct GRPCClient: Sendable {
@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
extension GRPCClient {
public struct Configuration: Sendable {
/// A collection of interceptors providing cross-cutting functionality to each accepted RPC.
///
/// The order in which interceptors are added reflects the order in which they are called. The
/// first interceptor added will be the first interceptor to intercept each request. The last
/// interceptor added will be the final interceptor to intercept each request before calling
/// the appropriate handler.
public var interceptors: Interceptors

/// Configuration for how methods are executed.
///
/// Method configuration determines how each RPC is executed by the client. Some services and
Expand All @@ -401,48 +410,13 @@ extension GRPCClient {

/// Creates a new default configuration.
public init() {
self.interceptors = Interceptors()
self.method = Method()
}
}
}

@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
extension GRPCClient.Configuration {
/// A collection of ``ClientInterceptor`` implementations which are applied to all accepted
/// RPCs.
///
/// RPCs are intercepted in the order that interceptors are added. That is, a request sent from the client to
/// the server will first be intercepted by the first added interceptor followed by the second, and so on.
/// For responses from the server, they'll be applied in the opposite order.
public struct Interceptors: Sendable {
private(set) var values: [any ClientInterceptor] = []

/// Add an interceptor to the client.
///
/// The order in which interceptors are added reflects the order in which they are called. The
/// first interceptor added will be the first interceptor to intercept each request. The last
/// interceptor added will be the final interceptor to intercept each request before calling
/// the appropriate handler.
///
/// - Parameter interceptor: The interceptor to add.
public mutating func add(_ interceptor: some ClientInterceptor) {
self.values.append(interceptor)
}

/// Adds a sequence of interceptor to the client.
///
/// The order in which interceptors are added reflects the order in which they are called. The
/// first interceptor added will be the first interceptor to intercept each request. The last
/// interceptor added will be the final interceptor to intercept each request before calling
/// the appropriate handler.
///
/// - Parameter interceptors: The interceptors to add.
public mutating func add(contentsOf interceptors: some Sequence<ClientInterceptor>) {
self.values.append(contentsOf: interceptors)
}
}

/// Configuration for how methods should be executed.
///
/// In most cases the client should defer to the configuration provided by the transport as this
Expand All @@ -464,10 +438,3 @@ extension GRPCClient.Configuration {
}
}
}

@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
extension GRPCClient.Configuration.Interceptors: CustomStringConvertible {
public var description: String {
return String(describing: self.values.map { String(describing: type(of: $0)) })
}
}
Loading