diff --git a/Sources/performance-worker/BenchmarkService.swift b/Sources/performance-worker/BenchmarkService.swift new file mode 100644 index 000000000..67d1de679 --- /dev/null +++ b/Sources/performance-worker/BenchmarkService.swift @@ -0,0 +1,176 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Atomics +import GRPCCore + +import struct Foundation.Data + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol { + /// Used to check if the server can be streaming responses. + private let working = ManagedAtomic(true) + + /// One request followed by one response. + /// The server returns a client payload with the size requested by the client. + func unaryCall( + request: GRPCCore.ServerRequest.Single + ) async throws + -> GRPCCore.ServerResponse.Single + { + // Throw an error if the status is not `ok`. Otherwise, an `ok` status is automatically sent + // if the request is successful. + if request.message.responseStatus.isInitialized { + try self.checkOkStatus(request.message.responseStatus) + } + + return ServerResponse.Single( + message: Grpc_Testing_BenchmarkService.Method.UnaryCall.Output.with { + $0.payload = Grpc_Testing_Payload.with { + $0.body = Data(count: Int(request.message.responseSize)) + } + } + ) + } + + /// Repeated sequence of one request followed by one response. + /// The server returns a payload with the size requested by the client for each received message. + func streamingCall( + request: GRPCCore.ServerRequest.Stream + ) async throws + -> GRPCCore.ServerResponse.Stream + { + return ServerResponse.Stream { writer in + for try await message in request.messages { + if message.responseStatus.isInitialized { + try self.checkOkStatus(message.responseStatus) + } + try await writer.write( + Grpc_Testing_BenchmarkService.Method.StreamingCall.Output.with { + $0.payload = Grpc_Testing_Payload.with { + $0.body = Data(count: Int(message.responseSize)) + } + } + ) + } + return [:] + } + } + + /// Single-sided unbounded streaming from client to server. + /// The server returns a payload with the size requested by the client once the client does WritesDone. + func streamingFromClient( + request: ServerRequest.Stream + ) async throws + -> ServerResponse.Single + { + var responseSize = 0 + for try await message in request.messages { + if message.responseStatus.isInitialized { + try self.checkOkStatus(message.responseStatus) + } + responseSize = Int(message.responseSize) + } + + return ServerResponse.Single( + message: Grpc_Testing_BenchmarkService.Method.StreamingFromClient.Output.with { + $0.payload = Grpc_Testing_Payload.with { + $0.body = Data(count: responseSize) + } + } + ) + } + + /// Single-sided unbounded streaming from server to client. + /// The server repeatedly returns a payload with the size requested by the client. + func streamingFromServer( + request: ServerRequest.Single + ) async throws + -> ServerResponse.Stream + { + if request.message.responseStatus.isInitialized { + try self.checkOkStatus(request.message.responseStatus) + } + let response = Grpc_Testing_BenchmarkService.Method.StreamingCall.Output.with { + $0.payload = Grpc_Testing_Payload.with { + $0.body = Data(count: Int(request.message.responseSize)) + } + } + return ServerResponse.Stream { writer in + while working.load(ordering: .relaxed) { + try await writer.write(response) + } + return [:] + } + } + + /// Two-sided unbounded streaming between server to client. + /// Both sides send the content of their own choice to the other. + func streamingBothWays( + request: GRPCCore.ServerRequest.Stream< + Grpc_Testing_BenchmarkService.Method.StreamingBothWays.Input + > + ) async throws + -> ServerResponse.Stream + { + // The 100 size is used by the other implementations as well. + // We are using the same canned response size for all responses + // as it is allowed by the spec. + let response = Grpc_Testing_BenchmarkService.Method.StreamingCall.Output.with { + $0.payload = Grpc_Testing_Payload.with { + $0.body = Data(count: 100) + } + } + + // Marks if the inbound streaming is ongoing or finished. + let inboundStreaming = ManagedAtomic(true) + + return ServerResponse.Stream { writer in + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + for try await message in request.messages { + if message.responseStatus.isInitialized { + try self.checkOkStatus(message.responseStatus) + } + } + inboundStreaming.store(false, ordering: .relaxed) + } + group.addTask { + while inboundStreaming.load(ordering: .relaxed) + && self.working.load(ordering: .acquiring) + { + try await writer.write(response) + } + } + try await group.next() + group.cancelAll() + return [:] + } + } + } +} + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +extension BenchmarkService { + private func checkOkStatus(_ responseStatus: Grpc_Testing_EchoStatus) throws { + guard let code = Status.Code(rawValue: Int(responseStatus.code)) else { + throw RPCError(code: .invalidArgument, message: "The response status code is invalid.") + } + if let code = RPCError.Code(code) { + throw RPCError(code: code, message: responseStatus.message) + } + } +}