From 5702fc829038bfc04506d4627c6bb179b03bc280 Mon Sep 17 00:00:00 2001 From: Stefana Dranca Date: Wed, 17 Apr 2024 14:26:51 +0100 Subject: [PATCH 1/5] BenchmarkClient RPC implementation Motivation: When the WorkerService holds and monitors BenchmarkClients, they need to make the requests to the server, as configured in the config input the WorkerService receives from the Test Driver. Modifications: - implemented a helper function that computes the latency and extracts the error code for a RPC - implemented the body of the makrRPC function that makes one of the 5 possible RPCs Result: The BenchmarkClient implementation for performance testing will be completed. --- .../performance-worker/BenchmarkClient.swift | 130 ++++++++++++++++-- .../performance-worker/WorkerService.swift | 1 + 2 files changed, 123 insertions(+), 8 deletions(-) diff --git a/Sources/performance-worker/BenchmarkClient.swift b/Sources/performance-worker/BenchmarkClient.swift index 63d7506b6..bbc77e05f 100644 --- a/Sources/performance-worker/BenchmarkClient.swift +++ b/Sources/performance-worker/BenchmarkClient.swift @@ -23,17 +23,20 @@ struct BenchmarkClient { private var client: GRPCClient private var rpcNumber: Int32 private var rpcType: Grpc_Testing_RpcType + private var messagesPerStream: Int32 private let rpcStats: NIOLockedValueBox init( client: GRPCClient, rpcNumber: Int32, rpcType: Grpc_Testing_RpcType, + messagesPerStream: Int32, histogramParams: Grpc_Testing_HistogramParams? ) { self.client = client self.rpcNumber = rpcNumber self.rpcType = rpcType + self.messagesPerStream = messagesPerStream let histogram: RPCStats.LatencyHistogram if let histogramParams = histogramParams { @@ -64,7 +67,13 @@ struct BenchmarkClient { try await withThrowingTaskGroup(of: Void.self) { rpcsGroup in for _ in 0 ..< self.rpcNumber { rpcsGroup.addTask { - let (latency, errorCode) = self.makeRPC(client: benchmarkClient, rpcType: self.rpcType) + let (latency, errorCode) = try await self.makeRPC( + benchmarkClient: benchmarkClient, + rpcType: self.rpcType + ) + guard errorCode != RPCError.Code.unknown else { + throw RPCError(code: .unknown, message: "The RPC type is UNRECOGNIZED.") + } self.rpcStats.withLockedValue { $0.latencyHistogram.record(latency) if let errorCode = errorCode { @@ -80,18 +89,123 @@ struct BenchmarkClient { } } + private func computeTimeAndErrorCode( + _ body: (Grpc_Testing_SimpleRequest) async throws -> Result + ) async throws -> (latency: Double, errorCode: RPCError.Code?) { + let request = Grpc_Testing_SimpleRequest.with { + $0.responseSize = 10 + } + let startTime = DispatchTime.now().uptimeNanoseconds + let result = try await body(request) + let endTime = DispatchTime.now().uptimeNanoseconds + + var errorCode: RPCError.Code? + switch result { + case .success: + errorCode = nil + case let .failure(error): + errorCode = error.code + } + return ( + latency: Double(endTime - startTime), errorCode: errorCode + ) + } + // The result is the number of nanoseconds for processing the RPC. private func makeRPC( - client: Grpc_Testing_BenchmarkServiceClient, + benchmarkClient: Grpc_Testing_BenchmarkServiceClient, rpcType: Grpc_Testing_RpcType - ) -> (latency: Double, errorCode: RPCError.Code?) { + ) async throws -> (latency: Double, errorCode: RPCError.Code?) { switch rpcType { - case .unary, .streaming, .streamingFromClient, .streamingFromServer, .streamingBothWays, - .UNRECOGNIZED: - let startTime = DispatchTime.now().uptimeNanoseconds - let endTime = DispatchTime.now().uptimeNanoseconds + case .unary: + return try await self.computeTimeAndErrorCode { request in + let responseStatus = try await benchmarkClient.unaryCall( + request: ClientRequest.Single(message: request) + ) { + response in + return response.accepted + } + + return responseStatus + } + + // Repeated sequence of one request followed by one response. + // It is a ping-pong of messages between the client and the server. + case .streaming: + return try await self.computeTimeAndErrorCode { request in + let ids = AsyncStream.makeStream(of: Int.self) + let streamingRequest = ClientRequest.Stream { writer in + for try await id in ids.stream { + if id <= self.messagesPerStream { + try await writer.write(request) + } else { + return + } + } + } + + ids.continuation.yield(1) + + let responseStatus = try await benchmarkClient.streamingCall(request: streamingRequest) { + response in + var id = 1 + for try await _ in response.messages { + id += 1 + ids.continuation.yield(id) + } + return response.accepted + } + + return responseStatus + } + + case .streamingFromClient: + return try await self.computeTimeAndErrorCode { request in + let streamingRequest = ClientRequest.Stream { writer in + for _ in 1 ... self.messagesPerStream { + try await writer.write(request) + } + } + + let responseStatus = try await benchmarkClient.streamingFromClient( + request: streamingRequest + ) { response in + return response.accepted + } + + return responseStatus + } + + case .streamingFromServer: + return try await self.computeTimeAndErrorCode { request in + let responseStatus = try await benchmarkClient.streamingFromServer( + request: ClientRequest.Single(message: request) + ) { response in + return response.accepted + } + + return responseStatus + } + + case .streamingBothWays: + return try await self.computeTimeAndErrorCode { request in + let streamingRequest = ClientRequest.Stream { writer in + for _ in 1 ... self.messagesPerStream { + try await writer.write(request) + } + } + + let responseStatus = try await benchmarkClient.streamingBothWays(request: streamingRequest) + { response in + return response.accepted + } + + return responseStatus + } + + case .UNRECOGNIZED: return ( - latency: Double(endTime - startTime), errorCode: RPCError.Code(.unimplemented) + latency: -1, errorCode: RPCError.Code(.unknown) ) } } diff --git a/Sources/performance-worker/WorkerService.swift b/Sources/performance-worker/WorkerService.swift index e9e61d14f..3d2ba01ac 100644 --- a/Sources/performance-worker/WorkerService.swift +++ b/Sources/performance-worker/WorkerService.swift @@ -333,6 +333,7 @@ extension WorkerService { client: grpcClient, rpcNumber: config.outstandingRpcsPerChannel, rpcType: config.rpcType, + messagesPerStream: config.messagesPerStream, histogramParams: config.histogramParams ) ) From acb9af196e42e1587b70f00fad5039bde3f11c9f Mon Sep 17 00:00:00 2001 From: Stefana Dranca Date: Wed, 17 Apr 2024 14:39:05 +0100 Subject: [PATCH 2/5] added accessible makeStream function for earlier versions of swift --- .../Internal/AsyncStream+MakeStream.swift | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 Sources/performance-worker/Internal/AsyncStream+MakeStream.swift diff --git a/Sources/performance-worker/Internal/AsyncStream+MakeStream.swift b/Sources/performance-worker/Internal/AsyncStream+MakeStream.swift new file mode 100644 index 000000000..5f1b75dd6 --- /dev/null +++ b/Sources/performance-worker/Internal/AsyncStream+MakeStream.swift @@ -0,0 +1,32 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#if swift(<5.9) +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) +extension AsyncStream { + @inlinable + static func makeStream( + of elementType: Element.Type = Element.self, + bufferingPolicy limit: AsyncStream.Continuation.BufferingPolicy = .unbounded + ) -> (stream: AsyncStream, continuation: AsyncStream.Continuation) { + var continuation: AsyncStream.Continuation! + let stream = AsyncStream(Element.self, bufferingPolicy: limit) { + continuation = $0 + } + return (stream, continuation) + } +} +#endif From 0027362201126ce65afa255018b083f3bdba812a Mon Sep 17 00:00:00 2001 From: Stefana Dranca Date: Thu, 18 Apr 2024 14:53:23 +0100 Subject: [PATCH 3/5] implemented timeIt function --- .../performance-worker/BenchmarkClient.swift | 198 ++++++++++-------- .../performance-worker/WorkerService.swift | 3 +- 2 files changed, 115 insertions(+), 86 deletions(-) diff --git a/Sources/performance-worker/BenchmarkClient.swift b/Sources/performance-worker/BenchmarkClient.swift index bbc77e05f..43c04c0a2 100644 --- a/Sources/performance-worker/BenchmarkClient.swift +++ b/Sources/performance-worker/BenchmarkClient.swift @@ -22,8 +22,9 @@ import NIOConcurrencyHelpers struct BenchmarkClient { private var client: GRPCClient private var rpcNumber: Int32 - private var rpcType: Grpc_Testing_RpcType + private var rpcType: RpcType private var messagesPerStream: Int32 + private var protoParams: Grpc_Testing_SimpleProtoParams private let rpcStats: NIOLockedValueBox init( @@ -31,12 +32,20 @@ struct BenchmarkClient { rpcNumber: Int32, rpcType: Grpc_Testing_RpcType, messagesPerStream: Int32, + protoParams: Grpc_Testing_SimpleProtoParams, histogramParams: Grpc_Testing_HistogramParams? - ) { + ) throws { self.client = client self.rpcNumber = rpcNumber - self.rpcType = rpcType self.messagesPerStream = messagesPerStream + self.protoParams = protoParams + + switch rpcType { + case .unary, .streaming, .streamingFromClient, .streamingFromServer, .streamingBothWays: + self.rpcType = RpcType(rawValue: rpcType.rawValue)! + case .UNRECOGNIZED: + throw RPCError(code: .unknown, message: "The RPC type is UNRECOGNIZED.") + } let histogram: RPCStats.LatencyHistogram if let histogramParams = histogramParams { @@ -51,6 +60,14 @@ struct BenchmarkClient { self.rpcStats = NIOLockedValueBox(RPCStats(latencyHistogram: histogram)) } + enum RpcType: Int { + case unary + case streaming + case streamingFromClient + case streamingFromServer + case streamingBothWays + } + internal var currentStats: RPCStats { return self.rpcStats.withLockedValue { stats in return stats @@ -71,9 +88,6 @@ struct BenchmarkClient { benchmarkClient: benchmarkClient, rpcType: self.rpcType ) - guard errorCode != RPCError.Code.unknown else { - throw RPCError(code: .unknown, message: "The RPC type is UNRECOGNIZED.") - } self.rpcStats.withLockedValue { $0.latencyHistogram.record(latency) if let errorCode = errorCode { @@ -89,124 +103,138 @@ struct BenchmarkClient { } } - private func computeTimeAndErrorCode( - _ body: (Grpc_Testing_SimpleRequest) async throws -> Result - ) async throws -> (latency: Double, errorCode: RPCError.Code?) { - let request = Grpc_Testing_SimpleRequest.with { - $0.responseSize = 10 - } + private func timeIt( + _ body: () async throws -> R + ) async rethrows -> (R, nanoseconds: Double) { let startTime = DispatchTime.now().uptimeNanoseconds - let result = try await body(request) + let result = try await body() let endTime = DispatchTime.now().uptimeNanoseconds - - var errorCode: RPCError.Code? - switch result { - case .success: - errorCode = nil - case let .failure(error): - errorCode = error.code - } - return ( - latency: Double(endTime - startTime), errorCode: errorCode - ) + return (result, nanoseconds: Double(endTime - startTime)) } // The result is the number of nanoseconds for processing the RPC. private func makeRPC( benchmarkClient: Grpc_Testing_BenchmarkServiceClient, - rpcType: Grpc_Testing_RpcType + rpcType: RpcType ) async throws -> (latency: Double, errorCode: RPCError.Code?) { + let request = Grpc_Testing_SimpleRequest.with { + $0.responseSize = self.protoParams.respSize + $0.payload = Grpc_Testing_Payload.with { + $0.body = Data(count: Int(self.protoParams.reqSize)) + } + } + switch rpcType { case .unary: - return try await self.computeTimeAndErrorCode { request in - let responseStatus = try await benchmarkClient.unaryCall( - request: ClientRequest.Single(message: request) - ) { - response in - return response.accepted + let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt { + do { + try await benchmarkClient.unaryCall(request: ClientRequest.Single(message: request)) { + response in + _ = try response.message + } + return nil + } catch let error as RPCError { + return error.code + } catch { + return .unknown } - - return responseStatus } + return (latency: nanoseconds, errorCode) // Repeated sequence of one request followed by one response. // It is a ping-pong of messages between the client and the server. case .streaming: - return try await self.computeTimeAndErrorCode { request in - let ids = AsyncStream.makeStream(of: Int.self) - let streamingRequest = ClientRequest.Stream { writer in - for try await id in ids.stream { - if id <= self.messagesPerStream { - try await writer.write(request) - } else { - return + let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt { + do { + let ids = AsyncStream.makeStream(of: Int.self) + let streamingRequest = ClientRequest.Stream { writer in + for try await id in ids.stream { + if id <= self.messagesPerStream { + try await writer.write(request) + } else { + return + } } } - } - ids.continuation.yield(1) + ids.continuation.yield(1) - let responseStatus = try await benchmarkClient.streamingCall(request: streamingRequest) { - response in - var id = 1 - for try await _ in response.messages { - id += 1 - ids.continuation.yield(id) + try await benchmarkClient.streamingCall(request: streamingRequest) { response in + var id = 1 + for try await _ in response.messages { + id += 1 + ids.continuation.yield(id) + } } - return response.accepted + return nil + } catch let error as RPCError { + return error.code + } catch { + return .unknown } - - return responseStatus } + return (latency: nanoseconds, errorCode) case .streamingFromClient: - return try await self.computeTimeAndErrorCode { request in - let streamingRequest = ClientRequest.Stream { writer in - for _ in 1 ... self.messagesPerStream { - try await writer.write(request) + let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt { + do { + let streamingRequest = ClientRequest.Stream { writer in + for _ in 1 ... self.messagesPerStream { + try await writer.write(request) + } } - } - let responseStatus = try await benchmarkClient.streamingFromClient( - request: streamingRequest - ) { response in - return response.accepted + _ = try await benchmarkClient.streamingFromClient( + request: streamingRequest + ) { response in + _ = try response.message + } + return nil + } catch let error as RPCError { + return error.code + } catch { + return .unknown } - - return responseStatus } + return (latency: nanoseconds, errorCode) case .streamingFromServer: - return try await self.computeTimeAndErrorCode { request in - let responseStatus = try await benchmarkClient.streamingFromServer( - request: ClientRequest.Single(message: request) - ) { response in - return response.accepted + let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt { + do { + try await benchmarkClient.streamingFromServer( + request: ClientRequest.Single(message: request) + ) { response in + for try await _ in response.messages {} + } + return nil + } catch let error as RPCError { + return error.code + } catch { + return .unknown } - - return responseStatus } + return (latency: nanoseconds, errorCode) case .streamingBothWays: - return try await self.computeTimeAndErrorCode { request in - let streamingRequest = ClientRequest.Stream { writer in - for _ in 1 ... self.messagesPerStream { - try await writer.write(request) + let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt { + do { + let streamingRequest = ClientRequest.Stream { writer in + for _ in 1 ... self.messagesPerStream { + try await writer.write(request) + } } - } - let responseStatus = try await benchmarkClient.streamingBothWays(request: streamingRequest) - { response in - return response.accepted + try await benchmarkClient.streamingBothWays(request: streamingRequest) { response in + for try await _ in response.messages {} + } + return nil + } catch let error as RPCError { + return error.code + } catch { + return .unknown } - - return responseStatus } - - case .UNRECOGNIZED: - return ( - latency: -1, errorCode: RPCError.Code(.unknown) - ) + return (latency: nanoseconds, errorCode) } } diff --git a/Sources/performance-worker/WorkerService.swift b/Sources/performance-worker/WorkerService.swift index 3d2ba01ac..5a8d8a427 100644 --- a/Sources/performance-worker/WorkerService.swift +++ b/Sources/performance-worker/WorkerService.swift @@ -328,12 +328,13 @@ extension WorkerService { var clients = [BenchmarkClient]() for _ in 0 ..< config.clientChannels { let grpcClient = self.makeGRPCClient() - clients.append( + try clients.append( BenchmarkClient( client: grpcClient, rpcNumber: config.outstandingRpcsPerChannel, rpcType: config.rpcType, messagesPerStream: config.messagesPerStream, + protoParams: config.payloadConfig.simpleParams, histogramParams: config.histogramParams ) ) From d6d8fd1c9bc73bec2af5061212e75309eb896fc8 Mon Sep 17 00:00:00 2001 From: Stefana Dranca Date: Thu, 18 Apr 2024 16:39:35 +0100 Subject: [PATCH 4/5] fixed rpctype switching --- .../performance-worker/BenchmarkClient.swift | 34 ++++++++----------- .../performance-worker/WorkerService.swift | 18 +++++++++- 2 files changed, 31 insertions(+), 21 deletions(-) diff --git a/Sources/performance-worker/BenchmarkClient.swift b/Sources/performance-worker/BenchmarkClient.swift index 43c04c0a2..7bce34d82 100644 --- a/Sources/performance-worker/BenchmarkClient.swift +++ b/Sources/performance-worker/BenchmarkClient.swift @@ -30,7 +30,7 @@ struct BenchmarkClient { init( client: GRPCClient, rpcNumber: Int32, - rpcType: Grpc_Testing_RpcType, + rpcType: RpcType, messagesPerStream: Int32, protoParams: Grpc_Testing_SimpleProtoParams, histogramParams: Grpc_Testing_HistogramParams? @@ -39,13 +39,7 @@ struct BenchmarkClient { self.rpcNumber = rpcNumber self.messagesPerStream = messagesPerStream self.protoParams = protoParams - - switch rpcType { - case .unary, .streaming, .streamingFromClient, .streamingFromServer, .streamingBothWays: - self.rpcType = RpcType(rawValue: rpcType.rawValue)! - case .UNRECOGNIZED: - throw RPCError(code: .unknown, message: "The RPC type is UNRECOGNIZED.") - } + self.rpcType = rpcType let histogram: RPCStats.LatencyHistogram if let histogramParams = histogramParams { @@ -60,7 +54,7 @@ struct BenchmarkClient { self.rpcStats = NIOLockedValueBox(RPCStats(latencyHistogram: histogram)) } - enum RpcType: Int { + enum RpcType { case unary case streaming case streamingFromClient @@ -85,8 +79,7 @@ struct BenchmarkClient { for _ in 0 ..< self.rpcNumber { rpcsGroup.addTask { let (latency, errorCode) = try await self.makeRPC( - benchmarkClient: benchmarkClient, - rpcType: self.rpcType + benchmarkClient: benchmarkClient ) self.rpcStats.withLockedValue { $0.latencyHistogram.record(latency) @@ -114,21 +107,22 @@ struct BenchmarkClient { // The result is the number of nanoseconds for processing the RPC. private func makeRPC( - benchmarkClient: Grpc_Testing_BenchmarkServiceClient, - rpcType: RpcType + benchmarkClient: Grpc_Testing_BenchmarkServiceClient ) async throws -> (latency: Double, errorCode: RPCError.Code?) { - let request = Grpc_Testing_SimpleRequest.with { + let message = Grpc_Testing_SimpleRequest.with { $0.responseSize = self.protoParams.respSize $0.payload = Grpc_Testing_Payload.with { $0.body = Data(count: Int(self.protoParams.reqSize)) } } - switch rpcType { + switch self.rpcType { case .unary: let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt { do { - try await benchmarkClient.unaryCall(request: ClientRequest.Single(message: request)) { + try await benchmarkClient.unaryCall( + request: ClientRequest.Single(message: message) + ) { response in _ = try response.message } @@ -150,7 +144,7 @@ struct BenchmarkClient { let streamingRequest = ClientRequest.Stream { writer in for try await id in ids.stream { if id <= self.messagesPerStream { - try await writer.write(request) + try await writer.write(message) } else { return } @@ -180,7 +174,7 @@ struct BenchmarkClient { do { let streamingRequest = ClientRequest.Stream { writer in for _ in 1 ... self.messagesPerStream { - try await writer.write(request) + try await writer.write(message) } } @@ -202,7 +196,7 @@ struct BenchmarkClient { let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt { do { try await benchmarkClient.streamingFromServer( - request: ClientRequest.Single(message: request) + request: ClientRequest.Single(message: message) ) { response in for try await _ in response.messages {} } @@ -220,7 +214,7 @@ struct BenchmarkClient { do { let streamingRequest = ClientRequest.Stream { writer in for _ in 1 ... self.messagesPerStream { - try await writer.write(request) + try await writer.write(message) } } diff --git a/Sources/performance-worker/WorkerService.swift b/Sources/performance-worker/WorkerService.swift index 5a8d8a427..18fb70cc2 100644 --- a/Sources/performance-worker/WorkerService.swift +++ b/Sources/performance-worker/WorkerService.swift @@ -325,6 +325,22 @@ extension WorkerService { } private func setupClients(_ config: Grpc_Testing_ClientConfig) async throws -> [BenchmarkClient] { + let rpcType: BenchmarkClient.RpcType + switch config.rpcType { + case .unary: + rpcType = .unary + case .streaming: + rpcType = .streaming + case .streamingFromClient: + rpcType = .streamingFromClient + case .streamingFromServer: + rpcType = .streamingFromServer + case .streamingBothWays: + rpcType = .streamingBothWays + case .UNRECOGNIZED: + throw RPCError(code: .unknown, message: "The RPC type is UNRECOGNIZED.") + } + var clients = [BenchmarkClient]() for _ in 0 ..< config.clientChannels { let grpcClient = self.makeGRPCClient() @@ -332,7 +348,7 @@ extension WorkerService { BenchmarkClient( client: grpcClient, rpcNumber: config.outstandingRpcsPerChannel, - rpcType: config.rpcType, + rpcType: rpcType, messagesPerStream: config.messagesPerStream, protoParams: config.payloadConfig.simpleParams, histogramParams: config.histogramParams From b7adcbd1491ba375871195587faea742870607e3 Mon Sep 17 00:00:00 2001 From: Stefana Dranca Date: Fri, 19 Apr 2024 09:28:32 +0100 Subject: [PATCH 5/5] fixed minor problems --- Sources/performance-worker/BenchmarkClient.swift | 13 ++++++------- Sources/performance-worker/WorkerService.swift | 4 ++-- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/Sources/performance-worker/BenchmarkClient.swift b/Sources/performance-worker/BenchmarkClient.swift index 7bce34d82..7945003b9 100644 --- a/Sources/performance-worker/BenchmarkClient.swift +++ b/Sources/performance-worker/BenchmarkClient.swift @@ -22,7 +22,7 @@ import NIOConcurrencyHelpers struct BenchmarkClient { private var client: GRPCClient private var rpcNumber: Int32 - private var rpcType: RpcType + private var rpcType: RPCType private var messagesPerStream: Int32 private var protoParams: Grpc_Testing_SimpleProtoParams private let rpcStats: NIOLockedValueBox @@ -30,11 +30,11 @@ struct BenchmarkClient { init( client: GRPCClient, rpcNumber: Int32, - rpcType: RpcType, + rpcType: RPCType, messagesPerStream: Int32, protoParams: Grpc_Testing_SimpleProtoParams, histogramParams: Grpc_Testing_HistogramParams? - ) throws { + ) { self.client = client self.rpcNumber = rpcNumber self.messagesPerStream = messagesPerStream @@ -54,7 +54,7 @@ struct BenchmarkClient { self.rpcStats = NIOLockedValueBox(RPCStats(latencyHistogram: histogram)) } - enum RpcType { + enum RPCType { case unary case streaming case streamingFromClient @@ -122,8 +122,7 @@ struct BenchmarkClient { do { try await benchmarkClient.unaryCall( request: ClientRequest.Single(message: message) - ) { - response in + ) { response in _ = try response.message } return nil @@ -178,7 +177,7 @@ struct BenchmarkClient { } } - _ = try await benchmarkClient.streamingFromClient( + try await benchmarkClient.streamingFromClient( request: streamingRequest ) { response in _ = try response.message diff --git a/Sources/performance-worker/WorkerService.swift b/Sources/performance-worker/WorkerService.swift index 18fb70cc2..260cde4a1 100644 --- a/Sources/performance-worker/WorkerService.swift +++ b/Sources/performance-worker/WorkerService.swift @@ -325,7 +325,7 @@ extension WorkerService { } private func setupClients(_ config: Grpc_Testing_ClientConfig) async throws -> [BenchmarkClient] { - let rpcType: BenchmarkClient.RpcType + let rpcType: BenchmarkClient.RPCType switch config.rpcType { case .unary: rpcType = .unary @@ -344,7 +344,7 @@ extension WorkerService { var clients = [BenchmarkClient]() for _ in 0 ..< config.clientChannels { let grpcClient = self.makeGRPCClient() - try clients.append( + clients.append( BenchmarkClient( client: grpcClient, rpcNumber: config.outstandingRpcsPerChannel,