Skip to content

Commit 07bff44

Browse files
committed
PR changes
1 parent 46443c2 commit 07bff44

File tree

4 files changed

+62
-108
lines changed

4 files changed

+62
-108
lines changed

Sources/GRPCCore/ClientError.swift

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ extension ClientError {
111111
case clientIsAlreadyRunning
112112
case clientIsNotRunning
113113
case clientIsStopped
114+
case transportError
114115
}
115116

116117
private var value: Value
@@ -132,6 +133,11 @@ extension ClientError {
132133
public static var clientIsStopped: Self {
133134
Self(.clientIsStopped)
134135
}
136+
137+
/// The transport threw an error whilst connected.
138+
public static var transportError: Self {
139+
Self(.transportError)
140+
}
135141
}
136142
}
137143

Sources/GRPCCore/GRPCClient.swift

Lines changed: 41 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@
3434
///
3535
/// ```swift
3636
/// // Create and add an in-process transport.
37-
/// let inProcessTransport = InProcessClientTransport()
38-
/// let client = GRPCClient(transport: inProcessTransport)
37+
/// let inProcessServerTransport = InProcessServerTransport()
38+
/// let inProcessClientTransport = InProcessClientTransport(serverTransport: inProcessServerTransport)
39+
/// let client = GRPCClient(transport: inProcessClientTransport)
3940
///
4041
/// // Create and add some interceptors.
4142
/// client.interceptors.add(StatsRecordingServerInterceptors())
@@ -56,7 +57,22 @@
5657
///
5758
/// ```swift
5859
/// // Start running the client.
59-
/// try await client.run()
60+
/// // Since it's a long-running task, we do it in a task group so it runs in
61+
/// // the background.
62+
/// try await withThrowingTaskGroup(of: Void.self) { group in
63+
/// group.addTask {
64+
/// try await client.run()
65+
/// }
66+
///
67+
/// try await client.unary(
68+
/// request: .init(message: "Hello!"),
69+
/// descriptor: MethodDescriptor(service: "service", method: "method"),
70+
/// serializer: ...,
71+
/// deserializer: ...
72+
/// ) { response in
73+
/// // Do something with the response
74+
/// }
75+
/// }
6076
/// ```
6177
///
6278
/// The ``run()`` method won't return until the client has finished handling all requests. You can
@@ -133,17 +149,18 @@ public final class GRPCClient: Sendable {
133149
private let storage: LockedValueBox<Storage>
134150

135151
/// The transport which provides a bidirectional communication channel with the server.
136-
private let transport: ClientTransport
152+
private let transport: any ClientTransport
137153

138-
/// Creates a new client with no resources.
154+
/// Creates a new client with no resources (i.e., with no ``Interceptors-swift.struct`` and no
155+
/// ``MethodDescriptor``s).
139156
///
140157
/// You can add resources to the client via ``interceptors-swift.property`` and
141158
/// ``methodConfigurationOverrides-swift.property``, and start the client by calling ``run()``.
142159
///
143160
/// - Note: Any changes to resources after ``run()`` has been called will be ignored.
144161
///
145162
/// - Parameter transport: The ``ClientTransport`` to be used for this ``GRPCClient``.
146-
public init(transport: ClientTransport) {
163+
public init(transport: any ClientTransport) {
147164
self.storage = LockedValueBox(Storage())
148165
self.transport = transport
149166
}
@@ -177,11 +194,14 @@ public final class GRPCClient: Sendable {
177194
self.storage.withLockedValue { $0.state = .stopped }
178195
}
179196

180-
try await withThrowingTaskGroup(of: Void.self) { group in
181-
group.addTask {
182-
try await self.transport.connect(lazily: false)
183-
}
184-
try await group.next()
197+
do {
198+
try await self.transport.connect(lazily: false)
199+
} catch {
200+
throw ClientError(
201+
code: .transportError,
202+
message: "The transport threw an error while connected.",
203+
cause: error
204+
)
185205
}
186206
}
187207

@@ -205,7 +225,7 @@ public final class GRPCClient: Sendable {
205225
self.transport.close()
206226
}
207227

208-
/// Start a unary RPC.
228+
/// Executes a unary RPC.
209229
///
210230
/// - Parameters:
211231
/// - request: The unary request.
@@ -222,41 +242,14 @@ public final class GRPCClient: Sendable {
222242
deserializer: some MessageDeserializer<Response>,
223243
handler: @Sendable @escaping (ClientResponse.Single<Response>) async throws -> ReturnValue
224244
) async throws -> ReturnValue {
225-
let (configurationOverrides, interceptors) = try self.storage.withLockedValue { storage in
226-
switch storage.state {
227-
case .running:
228-
return (storage.methodConfigurationOverrides, storage.interceptors.values)
229-
case .notStarted:
230-
throw ClientError(
231-
code: .clientIsNotRunning,
232-
message: "Client must be running to make an RPC: call run() first."
233-
)
234-
case .stopping, .stopped:
235-
throw ClientError(
236-
code: .clientIsStopped,
237-
message: "Client has been stopped. Can't make any more RPCs."
238-
)
239-
}
240-
}
241-
242-
let applicableConfiguration = self.resolveMethodConfiguration(
243-
descriptor: descriptor,
244-
clientConfigurations: configurationOverrides
245-
)
246-
247-
return try await ClientRPCExecutor.execute(
245+
try await bidirectionalStreaming(
248246
request: ClientRequest.Stream(single: request),
249-
method: descriptor,
250-
configuration: applicableConfiguration,
247+
descriptor: descriptor,
251248
serializer: serializer,
252-
deserializer: deserializer,
253-
transport: self.transport,
254-
interceptors: interceptors,
255-
handler: { stream in
249+
deserializer: deserializer) { stream in
256250
let singleResponse = await ClientResponse.Single(stream: stream)
257251
return try await handler(singleResponse)
258252
}
259-
)
260253
}
261254

262255
/// Start a client-streaming RPC.
@@ -276,41 +269,14 @@ public final class GRPCClient: Sendable {
276269
deserializer: some MessageDeserializer<Response>,
277270
handler: @Sendable @escaping (ClientResponse.Single<Response>) async throws -> ReturnValue
278271
) async throws -> ReturnValue {
279-
let (configurationOverrides, interceptors) = try self.storage.withLockedValue { storage in
280-
switch storage.state {
281-
case .running:
282-
return (storage.methodConfigurationOverrides, storage.interceptors.values)
283-
case .notStarted:
284-
throw ClientError(
285-
code: .clientIsNotRunning,
286-
message: "Client must be running to make an RPC: call run() first."
287-
)
288-
case .stopping, .stopped:
289-
throw ClientError(
290-
code: .clientIsStopped,
291-
message: "Client has been stopped. Can't make any more RPCs."
292-
)
293-
}
294-
}
295-
296-
let applicableConfiguration = self.resolveMethodConfiguration(
297-
descriptor: descriptor,
298-
clientConfigurations: configurationOverrides
299-
)
300-
301-
return try await ClientRPCExecutor.execute(
272+
try await bidirectionalStreaming(
302273
request: request,
303-
method: descriptor,
304-
configuration: applicableConfiguration,
274+
descriptor: descriptor,
305275
serializer: serializer,
306-
deserializer: deserializer,
307-
transport: transport,
308-
interceptors: interceptors,
309-
handler: { stream in
276+
deserializer: deserializer) { stream in
310277
let singleResponse = await ClientResponse.Single(stream: stream)
311278
return try await handler(singleResponse)
312279
}
313-
)
314280
}
315281

316282
/// Start a server-streaming RPC.
@@ -330,36 +296,11 @@ public final class GRPCClient: Sendable {
330296
deserializer: some MessageDeserializer<Response>,
331297
handler: @Sendable @escaping (ClientResponse.Stream<Response>) async throws -> ReturnValue
332298
) async throws -> ReturnValue {
333-
let (configurationOverrides, interceptors) = try self.storage.withLockedValue { storage in
334-
switch storage.state {
335-
case .running:
336-
return (storage.methodConfigurationOverrides, storage.interceptors.values)
337-
case .notStarted:
338-
throw ClientError(
339-
code: .clientIsNotRunning,
340-
message: "Client must be running to make an RPC: call run() first."
341-
)
342-
case .stopping, .stopped:
343-
throw ClientError(
344-
code: .clientIsStopped,
345-
message: "Client has been stopped. Can't make any more RPCs."
346-
)
347-
}
348-
}
349-
350-
let applicableConfiguration = self.resolveMethodConfiguration(
351-
descriptor: descriptor,
352-
clientConfigurations: configurationOverrides
353-
)
354-
355-
return try await ClientRPCExecutor.execute(
299+
try await bidirectionalStreaming(
356300
request: ClientRequest.Stream(single: request),
357-
method: descriptor,
358-
configuration: applicableConfiguration,
301+
descriptor: descriptor,
359302
serializer: serializer,
360303
deserializer: deserializer,
361-
transport: transport,
362-
interceptors: interceptors,
363304
handler: handler
364305
)
365306
}
@@ -442,7 +383,7 @@ extension GRPCClient {
442383
public struct Interceptors: Sendable {
443384
private(set) var values: [any ClientInterceptor] = []
444385

445-
/// Add an interceptor to the server.
386+
/// Add an interceptor to the client.
446387
///
447388
/// The order in which interceptors are added reflects the order in which they are called. The
448389
/// first interceptor added will be the first interceptor to intercept each request. The last

Sources/GRPCCore/GRPCServer.swift

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,11 @@ public final class GRPCServer: Sendable {
7676
self.storage.withLockedValue { $0.transports }
7777
}
7878
set {
79-
self.storage.withLockedValue { $0.transports = newValue }
79+
self.storage.withLockedValue { storage in
80+
if case .notStarted = storage.state {
81+
storage.transports = newValue
82+
}
83+
}
8084
}
8185
}
8286

@@ -86,7 +90,11 @@ public final class GRPCServer: Sendable {
8690
self.storage.withLockedValue { $0.services }
8791
}
8892
set {
89-
self.storage.withLockedValue { $0.services = newValue }
93+
self.storage.withLockedValue { storage in
94+
if case .notStarted = storage.state {
95+
storage.services = newValue
96+
}
97+
}
9098
}
9199
}
92100

@@ -101,7 +109,11 @@ public final class GRPCServer: Sendable {
101109
self.storage.withLockedValue { $0.interceptors }
102110
}
103111
set {
104-
self.storage.withLockedValue { $0.interceptors = newValue }
112+
self.storage.withLockedValue { storage in
113+
if case .notStarted = storage.state {
114+
storage.interceptors = newValue
115+
}
116+
}
105117
}
106118
}
107119

Tests/GRPCCoreTests/Call/Client/MethodConfigurationRegistryTestsTests.swift

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,6 @@ import XCTest
1919
@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
2020
final class MethodConfigurationRegistryTests: XCTestCase {
2121
func testGetConfigurationForKnownMethod() async throws {
22-
let first = ContinuousClock.now
23-
let second = first.advanced(by: .seconds(1))
24-
let result = second.duration(to: first)
25-
print(result.components)
26-
2722
let policy = GRPCClient.MethodConfiguration.HedgingPolicy(
2823
maximumAttempts: 10,
2924
hedgingDelay: .seconds(1),

0 commit comments

Comments
 (0)