diff --git a/Sources/performance-worker/BenchmarkClient.swift b/Sources/performance-worker/BenchmarkClient.swift index 63d7506b6..7945003b9 100644 --- a/Sources/performance-worker/BenchmarkClient.swift +++ b/Sources/performance-worker/BenchmarkClient.swift @@ -22,17 +22,23 @@ import NIOConcurrencyHelpers struct BenchmarkClient { private var client: GRPCClient private var rpcNumber: Int32 - private var rpcType: Grpc_Testing_RpcType + private var rpcType: RPCType + private var messagesPerStream: Int32 + private var protoParams: Grpc_Testing_SimpleProtoParams private let rpcStats: NIOLockedValueBox init( client: GRPCClient, rpcNumber: Int32, - rpcType: Grpc_Testing_RpcType, + rpcType: RPCType, + messagesPerStream: Int32, + protoParams: Grpc_Testing_SimpleProtoParams, histogramParams: Grpc_Testing_HistogramParams? ) { self.client = client self.rpcNumber = rpcNumber + self.messagesPerStream = messagesPerStream + self.protoParams = protoParams self.rpcType = rpcType let histogram: RPCStats.LatencyHistogram @@ -48,6 +54,14 @@ struct BenchmarkClient { self.rpcStats = NIOLockedValueBox(RPCStats(latencyHistogram: histogram)) } + enum RPCType { + case unary + case streaming + case streamingFromClient + case streamingFromServer + case streamingBothWays + } + internal var currentStats: RPCStats { return self.rpcStats.withLockedValue { stats in return stats @@ -64,7 +78,9 @@ struct BenchmarkClient { try await withThrowingTaskGroup(of: Void.self) { rpcsGroup in for _ in 0 ..< self.rpcNumber { rpcsGroup.addTask { - let (latency, errorCode) = self.makeRPC(client: benchmarkClient, rpcType: self.rpcType) + let (latency, errorCode) = try await self.makeRPC( + benchmarkClient: benchmarkClient + ) self.rpcStats.withLockedValue { $0.latencyHistogram.record(latency) if let errorCode = errorCode { @@ -80,19 +96,138 @@ struct BenchmarkClient { } } + private func timeIt( + _ body: () async throws -> R + ) async rethrows -> (R, nanoseconds: Double) { + let startTime = DispatchTime.now().uptimeNanoseconds + let result = try await body() + let endTime = DispatchTime.now().uptimeNanoseconds + return (result, nanoseconds: Double(endTime - startTime)) + } + // The result is the number of nanoseconds for processing the RPC. private func makeRPC( - client: Grpc_Testing_BenchmarkServiceClient, - rpcType: Grpc_Testing_RpcType - ) -> (latency: Double, errorCode: RPCError.Code?) { - switch rpcType { - case .unary, .streaming, .streamingFromClient, .streamingFromServer, .streamingBothWays, - .UNRECOGNIZED: - let startTime = DispatchTime.now().uptimeNanoseconds - let endTime = DispatchTime.now().uptimeNanoseconds - return ( - latency: Double(endTime - startTime), errorCode: RPCError.Code(.unimplemented) - ) + benchmarkClient: Grpc_Testing_BenchmarkServiceClient + ) async throws -> (latency: Double, errorCode: RPCError.Code?) { + let message = Grpc_Testing_SimpleRequest.with { + $0.responseSize = self.protoParams.respSize + $0.payload = Grpc_Testing_Payload.with { + $0.body = Data(count: Int(self.protoParams.reqSize)) + } + } + + switch self.rpcType { + case .unary: + let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt { + do { + try await benchmarkClient.unaryCall( + request: ClientRequest.Single(message: message) + ) { response in + _ = try response.message + } + return nil + } catch let error as RPCError { + return error.code + } catch { + return .unknown + } + } + return (latency: nanoseconds, errorCode) + + // Repeated sequence of one request followed by one response. + // It is a ping-pong of messages between the client and the server. + case .streaming: + let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt { + do { + let ids = AsyncStream.makeStream(of: Int.self) + let streamingRequest = ClientRequest.Stream { writer in + for try await id in ids.stream { + if id <= self.messagesPerStream { + try await writer.write(message) + } else { + return + } + } + } + + ids.continuation.yield(1) + + try await benchmarkClient.streamingCall(request: streamingRequest) { response in + var id = 1 + for try await _ in response.messages { + id += 1 + ids.continuation.yield(id) + } + } + return nil + } catch let error as RPCError { + return error.code + } catch { + return .unknown + } + } + return (latency: nanoseconds, errorCode) + + case .streamingFromClient: + let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt { + do { + let streamingRequest = ClientRequest.Stream { writer in + for _ in 1 ... self.messagesPerStream { + try await writer.write(message) + } + } + + try await benchmarkClient.streamingFromClient( + request: streamingRequest + ) { response in + _ = try response.message + } + return nil + } catch let error as RPCError { + return error.code + } catch { + return .unknown + } + } + return (latency: nanoseconds, errorCode) + + case .streamingFromServer: + let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt { + do { + try await benchmarkClient.streamingFromServer( + request: ClientRequest.Single(message: message) + ) { response in + for try await _ in response.messages {} + } + return nil + } catch let error as RPCError { + return error.code + } catch { + return .unknown + } + } + return (latency: nanoseconds, errorCode) + + case .streamingBothWays: + let (errorCode, nanoseconds): (RPCError.Code?, Double) = await self.timeIt { + do { + let streamingRequest = ClientRequest.Stream { writer in + for _ in 1 ... self.messagesPerStream { + try await writer.write(message) + } + } + + try await benchmarkClient.streamingBothWays(request: streamingRequest) { response in + for try await _ in response.messages {} + } + return nil + } catch let error as RPCError { + return error.code + } catch { + return .unknown + } + } + return (latency: nanoseconds, errorCode) } } diff --git a/Sources/performance-worker/Internal/AsyncStream+MakeStream.swift b/Sources/performance-worker/Internal/AsyncStream+MakeStream.swift new file mode 100644 index 000000000..5f1b75dd6 --- /dev/null +++ b/Sources/performance-worker/Internal/AsyncStream+MakeStream.swift @@ -0,0 +1,32 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#if swift(<5.9) +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) +extension AsyncStream { + @inlinable + static func makeStream( + of elementType: Element.Type = Element.self, + bufferingPolicy limit: AsyncStream.Continuation.BufferingPolicy = .unbounded + ) -> (stream: AsyncStream, continuation: AsyncStream.Continuation) { + var continuation: AsyncStream.Continuation! + let stream = AsyncStream(Element.self, bufferingPolicy: limit) { + continuation = $0 + } + return (stream, continuation) + } +} +#endif diff --git a/Sources/performance-worker/WorkerService.swift b/Sources/performance-worker/WorkerService.swift index e9e61d14f..260cde4a1 100644 --- a/Sources/performance-worker/WorkerService.swift +++ b/Sources/performance-worker/WorkerService.swift @@ -325,6 +325,22 @@ extension WorkerService { } private func setupClients(_ config: Grpc_Testing_ClientConfig) async throws -> [BenchmarkClient] { + let rpcType: BenchmarkClient.RPCType + switch config.rpcType { + case .unary: + rpcType = .unary + case .streaming: + rpcType = .streaming + case .streamingFromClient: + rpcType = .streamingFromClient + case .streamingFromServer: + rpcType = .streamingFromServer + case .streamingBothWays: + rpcType = .streamingBothWays + case .UNRECOGNIZED: + throw RPCError(code: .unknown, message: "The RPC type is UNRECOGNIZED.") + } + var clients = [BenchmarkClient]() for _ in 0 ..< config.clientChannels { let grpcClient = self.makeGRPCClient() @@ -332,7 +348,9 @@ extension WorkerService { BenchmarkClient( client: grpcClient, rpcNumber: config.outstandingRpcsPerChannel, - rpcType: config.rpcType, + rpcType: rpcType, + messagesPerStream: config.messagesPerStream, + protoParams: config.payloadConfig.simpleParams, histogramParams: config.histogramParams ) )