From 10b6a3668cf52c28e6bd734ecc6010e743a10fd5 Mon Sep 17 00:00:00 2001 From: Stefana Dranca Date: Wed, 13 Mar 2024 17:27:05 +0000 Subject: [PATCH 1/9] QPS Benchmark Service implementation Motivation: The 2 workers used in QPS testing are a Benchmark service client and server respectivelly, so we need to implement the Benchmark Service. Modifications: Implemented the 'BenchmarkService' struct that defines the service protocol methods, based on their documentation. Result: We will be able to proceed with the Wrorker Service implementation. --- .../performance-worker/BenchmarkService.swift | 135 ++++++++++++++++++ 1 file changed, 135 insertions(+) create mode 100644 Sources/performance-worker/BenchmarkService.swift diff --git a/Sources/performance-worker/BenchmarkService.swift b/Sources/performance-worker/BenchmarkService.swift new file mode 100644 index 000000000..540c619b0 --- /dev/null +++ b/Sources/performance-worker/BenchmarkService.swift @@ -0,0 +1,135 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import GRPCCore + +import struct Foundation.Data + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol { + /// One request followed by one response. + /// The server returns the client payload as-is. + func unaryCall( + request: GRPCCore.ServerRequest.Single + ) async throws + -> GRPCCore.ServerResponse.Single + { + + // Throw an error if the status is not `ok`. Otherwise, an `ok` status is automatically sent + // if the request is successful. + if request.message.responseStatus.isInitialized { + try self.echoStatus(responseStatus: request.message.responseStatus) + } + + return ServerResponse.Single( + message: Grpc_Testing_BenchmarkService.Method.UnaryCall.Output.with { + $0.payload = request.message.payload + } + ) + } + + /// Repeated sequence of one request followed by one response. + /// The server returns the client payload as-is on each response + func streamingCall( + request: GRPCCore.ServerRequest.Stream + ) async throws + -> GRPCCore.ServerResponse.Stream + { + return ServerResponse.Stream { writer in + for try await message in request.messages { + if message.responseStatus.isInitialized { + try self.echoStatus(responseStatus: message.responseStatus) + } + try await writer.write( + Grpc_Testing_BenchmarkService.Method.StreamingCall.Output.with { + $0.payload = message.payload + } + ) + } + return [:] + } + } + + /// Single-sided unbounded streaming from client to server + /// The server returns the client payload as-is once the client does WritesDone + func streamingFromClient( + request: ServerRequest.Stream + ) async throws + -> ServerResponse.Single + { + throw RPCError(code: .unimplemented, message: "The RPC is not implemented.") + } + + /// Single-sided unbounded streaming from server to client + /// The server repeatedly returns the client payload as-is + func streamingFromServer( + request: ServerRequest.Single + ) async throws + -> ServerResponse.Stream + { + return ServerResponse.Stream { writer in + if request.message.responseStatus.isInitialized { + try self.echoStatus(responseStatus: request.message.responseStatus) + } + + while true { + try await writer.write( + Grpc_Testing_BenchmarkService.Method.StreamingCall.Output.with { + $0.payload = request.message.payload + } + ) + try await Task.sleep(nanoseconds: 10) + } + } + } + + /// Two-sided unbounded streaming between server to client. + /// Both sides send the content of their own choice to the other. + func streamingBothWays( + request: GRPCCore.ServerRequest.Stream< + Grpc_Testing_BenchmarkService.Method.StreamingBothWays.Input + > + ) async throws + -> GRPCCore.ServerResponse.Stream + { + return ServerResponse.Stream { writer in + while true { + try await writer.write( + Grpc_Testing_BenchmarkService.Method.StreamingCall.Output.with { + $0.payload = Grpc_Testing_Payload.with { + $0.body = Data(count: Int.random(in: 1 ..< 10)) + } + } + ) + try await Task.sleep(nanoseconds: 10) + } + } + } +} + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +extension BenchmarkService { + private func echoStatus(responseStatus: Grpc_Testing_EchoStatus) throws { + guard let code = Status.Code(rawValue: Int(responseStatus.code)) + else { + throw RPCError(code: .invalidArgument, message: "The response status code is invalid.") + } + let status = Status(code: code, message: responseStatus.message) + if let error = RPCError(status: status) { + throw error + } + } +} From 56edde9d6576c006cfbc0efc31ef077756a6b23e Mon Sep 17 00:00:00 2001 From: Stefana Dranca Date: Thu, 14 Mar 2024 14:39:50 +0000 Subject: [PATCH 2/9] implemented feedback --- .../performance-worker/BenchmarkService.swift | 57 +++++++++++++------ 1 file changed, 39 insertions(+), 18 deletions(-) diff --git a/Sources/performance-worker/BenchmarkService.swift b/Sources/performance-worker/BenchmarkService.swift index 540c619b0..97ebc60fb 100644 --- a/Sources/performance-worker/BenchmarkService.swift +++ b/Sources/performance-worker/BenchmarkService.swift @@ -14,12 +14,16 @@ * limitations under the License. */ +import Atomics import GRPCCore import struct Foundation.Data @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol { + /// Used to check if + var working = ManagedAtomic(true) + /// One request followed by one response. /// The server returns the client payload as-is. func unaryCall( @@ -27,16 +31,17 @@ struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol { ) async throws -> GRPCCore.ServerResponse.Single { - // Throw an error if the status is not `ok`. Otherwise, an `ok` status is automatically sent // if the request is successful. if request.message.responseStatus.isInitialized { - try self.echoStatus(responseStatus: request.message.responseStatus) + try self.checkOkStatus(request.message.responseStatus) } return ServerResponse.Single( message: Grpc_Testing_BenchmarkService.Method.UnaryCall.Output.with { - $0.payload = request.message.payload + $0.payload = Grpc_Testing_Payload.with { + $0.body = Data(count: Int(request.message.responseSize)) + } } ) } @@ -51,11 +56,13 @@ struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol { return ServerResponse.Stream { writer in for try await message in request.messages { if message.responseStatus.isInitialized { - try self.echoStatus(responseStatus: message.responseStatus) + try self.checkOkStatus(message.responseStatus) } try await writer.write( Grpc_Testing_BenchmarkService.Method.StreamingCall.Output.with { - $0.payload = message.payload + $0.payload = Grpc_Testing_Payload.with { + $0.body = Data(count: Int(message.responseSize)) + } } ) } @@ -70,7 +77,21 @@ struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol { ) async throws -> ServerResponse.Single { - throw RPCError(code: .unimplemented, message: "The RPC is not implemented.") + var responseSize = 0 + for try await message in request.messages { + if message.responseStatus.isInitialized { + try self.checkOkStatus(message.responseStatus) + } + responseSize = Int(message.responseSize) + } + + return ServerResponse.Single( + message: Grpc_Testing_BenchmarkService.Method.StreamingFromClient.Output.with { + $0.payload = Grpc_Testing_Payload.with { + $0.body = Data(count: responseSize) + } + } + ) } /// Single-sided unbounded streaming from server to client @@ -82,17 +103,19 @@ struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol { { return ServerResponse.Stream { writer in if request.message.responseStatus.isInitialized { - try self.echoStatus(responseStatus: request.message.responseStatus) + try self.checkOkStatus(request.message.responseStatus) } - while true { + while working.load(ordering: .acquiring) { try await writer.write( Grpc_Testing_BenchmarkService.Method.StreamingCall.Output.with { - $0.payload = request.message.payload + $0.payload = Grpc_Testing_Payload.with { + $0.body = Data(count: Int(request.message.responseSize)) + } } ) - try await Task.sleep(nanoseconds: 10) } + return [:] } } @@ -106,7 +129,7 @@ struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol { -> GRPCCore.ServerResponse.Stream { return ServerResponse.Stream { writer in - while true { + while working.load(ordering: .acquiring) { try await writer.write( Grpc_Testing_BenchmarkService.Method.StreamingCall.Output.with { $0.payload = Grpc_Testing_Payload.with { @@ -114,22 +137,20 @@ struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol { } } ) - try await Task.sleep(nanoseconds: 10) } + return [:] } } } @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) extension BenchmarkService { - private func echoStatus(responseStatus: Grpc_Testing_EchoStatus) throws { - guard let code = Status.Code(rawValue: Int(responseStatus.code)) - else { + private func checkOkStatus(_ responseStatus: Grpc_Testing_EchoStatus) throws { + guard let code = Status.Code(rawValue: Int(responseStatus.code)) else { throw RPCError(code: .invalidArgument, message: "The response status code is invalid.") } - let status = Status(code: code, message: responseStatus.message) - if let error = RPCError(status: status) { - throw error + if let code = RPCError.Code(code) { + throw RPCError(code: code, message: responseStatus.message) } } } From 97defe4f41f5ce455ba7088efe2d248b084241bf Mon Sep 17 00:00:00 2001 From: Stefana Dranca Date: Fri, 15 Mar 2024 11:00:36 +0000 Subject: [PATCH 3/9] fixed streamingbothways --- Sources/performance-worker/BenchmarkService.swift | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/Sources/performance-worker/BenchmarkService.swift b/Sources/performance-worker/BenchmarkService.swift index 97ebc60fb..ea32ebc49 100644 --- a/Sources/performance-worker/BenchmarkService.swift +++ b/Sources/performance-worker/BenchmarkService.swift @@ -128,12 +128,17 @@ struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol { ) async throws -> GRPCCore.ServerResponse.Stream { + for try await message in request.messages { + if message.responseStatus.isInitialized { + try self.checkOkStatus(message.responseStatus) + } + } return ServerResponse.Stream { writer in while working.load(ordering: .acquiring) { try await writer.write( Grpc_Testing_BenchmarkService.Method.StreamingCall.Output.with { $0.payload = Grpc_Testing_Payload.with { - $0.body = Data(count: Int.random(in: 1 ..< 10)) + $0.body = Data(count: 100) } } ) From 2a9fd9f41bc7020660a1c8c792943ceb67957fa0 Mon Sep 17 00:00:00 2001 From: Stefana Dranca Date: Fri, 15 Mar 2024 11:07:11 +0000 Subject: [PATCH 4/9] fixed documentation --- Sources/performance-worker/BenchmarkService.swift | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/Sources/performance-worker/BenchmarkService.swift b/Sources/performance-worker/BenchmarkService.swift index ea32ebc49..a1694a9f1 100644 --- a/Sources/performance-worker/BenchmarkService.swift +++ b/Sources/performance-worker/BenchmarkService.swift @@ -25,7 +25,7 @@ struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol { var working = ManagedAtomic(true) /// One request followed by one response. - /// The server returns the client payload as-is. + /// The server returns a client payload with the size requested by the client. func unaryCall( request: GRPCCore.ServerRequest.Single ) async throws @@ -47,7 +47,7 @@ struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol { } /// Repeated sequence of one request followed by one response. - /// The server returns the client payload as-is on each response + /// The server returns a payload with the size requested by the client for each received message. func streamingCall( request: GRPCCore.ServerRequest.Stream ) async throws @@ -70,8 +70,8 @@ struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol { } } - /// Single-sided unbounded streaming from client to server - /// The server returns the client payload as-is once the client does WritesDone + /// Single-sided unbounded streaming from client to server. + /// The server returns a payload with the size requested by the client once the client does WritesDone. func streamingFromClient( request: ServerRequest.Stream ) async throws @@ -94,8 +94,8 @@ struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol { ) } - /// Single-sided unbounded streaming from server to client - /// The server repeatedly returns the client payload as-is + /// Single-sided unbounded streaming from server to client. + /// The server repeatedly returns a payload with the size requested by the client. func streamingFromServer( request: ServerRequest.Single ) async throws From 9e3748d7f8a670edd42fd352611bcfd1805cb60e Mon Sep 17 00:00:00 2001 From: Stefana Dranca Date: Fri, 15 Mar 2024 14:32:04 +0000 Subject: [PATCH 5/9] implemented feedback --- .../performance-worker/BenchmarkService.swift | 40 +++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/Sources/performance-worker/BenchmarkService.swift b/Sources/performance-worker/BenchmarkService.swift index a1694a9f1..f6b773b55 100644 --- a/Sources/performance-worker/BenchmarkService.swift +++ b/Sources/performance-worker/BenchmarkService.swift @@ -21,8 +21,8 @@ import struct Foundation.Data @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol { - /// Used to check if - var working = ManagedAtomic(true) + /// Used to check if the server can be streaming responses. + private let working = ManagedAtomic(true) /// One request followed by one response. /// The server returns a client payload with the size requested by the client. @@ -101,19 +101,17 @@ struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol { ) async throws -> ServerResponse.Stream { - return ServerResponse.Stream { writer in - if request.message.responseStatus.isInitialized { - try self.checkOkStatus(request.message.responseStatus) + if request.message.responseStatus.isInitialized { + try self.checkOkStatus(request.message.responseStatus) + } + let response = Grpc_Testing_BenchmarkService.Method.StreamingCall.Output.with { + $0.payload = Grpc_Testing_Payload.with { + $0.body = Data(count: Int(request.message.responseSize)) } - + } + return ServerResponse.Stream { writer in while working.load(ordering: .acquiring) { - try await writer.write( - Grpc_Testing_BenchmarkService.Method.StreamingCall.Output.with { - $0.payload = Grpc_Testing_Payload.with { - $0.body = Data(count: Int(request.message.responseSize)) - } - } - ) + try await writer.write(response) } return [:] } @@ -133,15 +131,17 @@ struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol { try self.checkOkStatus(message.responseStatus) } } + + // Always use the same canned response for bidirectional streaming. + // This is allowed by the spec. + let response = Grpc_Testing_BenchmarkService.Method.StreamingCall.Output.with { + $0.payload = Grpc_Testing_Payload.with { + $0.body = Data(count: 100) + } + } return ServerResponse.Stream { writer in while working.load(ordering: .acquiring) { - try await writer.write( - Grpc_Testing_BenchmarkService.Method.StreamingCall.Output.with { - $0.payload = Grpc_Testing_Payload.with { - $0.body = Data(count: 100) - } - } - ) + try await writer.write(response) } return [:] } From 67b2c35da7a2361fb4cad3b4f7d9c614e9903e79 Mon Sep 17 00:00:00 2001 From: Stefana Dranca Date: Fri, 15 Mar 2024 15:45:12 +0000 Subject: [PATCH 6/9] changed comment --- Sources/performance-worker/BenchmarkService.swift | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/Sources/performance-worker/BenchmarkService.swift b/Sources/performance-worker/BenchmarkService.swift index f6b773b55..c490aaa2d 100644 --- a/Sources/performance-worker/BenchmarkService.swift +++ b/Sources/performance-worker/BenchmarkService.swift @@ -132,8 +132,9 @@ struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol { } } - // Always use the same canned response for bidirectional streaming. - // This is allowed by the spec. + // The 100 size is used by the other implementations as well. + // We are using the same canned response size for all responses + // as it is allowed by the spec. let response = Grpc_Testing_BenchmarkService.Method.StreamingCall.Output.with { $0.payload = Grpc_Testing_Payload.with { $0.body = Data(count: 100) From c45645d535d6fa7a04217d484b677c2cef2623f9 Mon Sep 17 00:00:00 2001 From: Stefana Dranca Date: Fri, 15 Mar 2024 18:51:29 +0000 Subject: [PATCH 7/9] added taskgroup implementation to streamingBothWays() --- .../performance-worker/BenchmarkService.swift | 45 ++++++++++++++----- 1 file changed, 34 insertions(+), 11 deletions(-) diff --git a/Sources/performance-worker/BenchmarkService.swift b/Sources/performance-worker/BenchmarkService.swift index c490aaa2d..672caf086 100644 --- a/Sources/performance-worker/BenchmarkService.swift +++ b/Sources/performance-worker/BenchmarkService.swift @@ -124,14 +124,8 @@ struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol { Grpc_Testing_BenchmarkService.Method.StreamingBothWays.Input > ) async throws - -> GRPCCore.ServerResponse.Stream + -> ServerResponse.Stream { - for try await message in request.messages { - if message.responseStatus.isInitialized { - try self.checkOkStatus(message.responseStatus) - } - } - // The 100 size is used by the other implementations as well. // We are using the same canned response size for all responses // as it is allowed by the spec. @@ -140,12 +134,41 @@ struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol { $0.body = Data(count: 100) } } - return ServerResponse.Stream { writer in - while working.load(ordering: .acquiring) { - try await writer.write(response) + + var serverResponse = ServerResponse.Stream< + Grpc_Testing_BenchmarkService.Method.StreamingBothWays.Output + >(accepted: .failure(RPCError(code: .internalError, message: "There was a server issue."))) + + try await withThrowingTaskGroup( + of: ServerResponse.Stream?.self + ) { group in + group.addTask { + for try await message in request.messages { + if message.responseStatus.isInitialized { + try self.checkOkStatus(message.responseStatus) + } + } + _ = self.working.exchange(false, ordering: .acquiring) + return nil } - return [:] + group.addTask { + return ServerResponse.Stream { writer in + while working.load(ordering: .acquiring) { + try await writer.write(response) + } + return [:] + } + } + for try await result in group { + guard let result = result else { + continue + } + serverResponse = result + group.cancelAll() + } + group.cancelAll() } + return serverResponse } } From ebcc7f2b1e0f0f31ca68df4ac693a1c13f17bee8 Mon Sep 17 00:00:00 2001 From: Stefana Dranca Date: Mon, 18 Mar 2024 11:03:43 +0000 Subject: [PATCH 8/9] fixed task grpup --- .../performance-worker/BenchmarkService.swift | 41 ++++++++----------- 1 file changed, 16 insertions(+), 25 deletions(-) diff --git a/Sources/performance-worker/BenchmarkService.swift b/Sources/performance-worker/BenchmarkService.swift index 672caf086..75d7f9972 100644 --- a/Sources/performance-worker/BenchmarkService.swift +++ b/Sources/performance-worker/BenchmarkService.swift @@ -135,40 +135,31 @@ struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol { } } - var serverResponse = ServerResponse.Stream< - Grpc_Testing_BenchmarkService.Method.StreamingBothWays.Output - >(accepted: .failure(RPCError(code: .internalError, message: "There was a server issue."))) + // Marks if the inbound streaming is ongoing or finished. + let inboundStreaming = ManagedAtomic(true) - try await withThrowingTaskGroup( - of: ServerResponse.Stream?.self - ) { group in - group.addTask { - for try await message in request.messages { - if message.responseStatus.isInitialized { - try self.checkOkStatus(message.responseStatus) + return ServerResponse.Stream { writer in + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + for try await message in request.messages { + if message.responseStatus.isInitialized { + try self.checkOkStatus(message.responseStatus) + } } + _ = inboundStreaming.exchange(false, ordering: .acquiring) } - _ = self.working.exchange(false, ordering: .acquiring) - return nil - } - group.addTask { - return ServerResponse.Stream { writer in - while working.load(ordering: .acquiring) { + group.addTask { + while inboundStreaming.load(ordering: .acquiring) + && self.working.load(ordering: .acquiring) + { try await writer.write(response) } - return [:] - } - } - for try await result in group { - guard let result = result else { - continue } - serverResponse = result + try await group.next() group.cancelAll() + return [:] } - group.cancelAll() } - return serverResponse } } From 7f71ef32b39e1eb1a4836658fecd732953311078 Mon Sep 17 00:00:00 2001 From: Stefana Dranca Date: Mon, 18 Mar 2024 14:33:56 +0000 Subject: [PATCH 9/9] changed exchange to store --- Sources/performance-worker/BenchmarkService.swift | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Sources/performance-worker/BenchmarkService.swift b/Sources/performance-worker/BenchmarkService.swift index 75d7f9972..67d1de679 100644 --- a/Sources/performance-worker/BenchmarkService.swift +++ b/Sources/performance-worker/BenchmarkService.swift @@ -110,7 +110,7 @@ struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol { } } return ServerResponse.Stream { writer in - while working.load(ordering: .acquiring) { + while working.load(ordering: .relaxed) { try await writer.write(response) } return [:] @@ -146,10 +146,10 @@ struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol { try self.checkOkStatus(message.responseStatus) } } - _ = inboundStreaming.exchange(false, ordering: .acquiring) + inboundStreaming.store(false, ordering: .relaxed) } group.addTask { - while inboundStreaming.load(ordering: .acquiring) + while inboundStreaming.load(ordering: .relaxed) && self.working.load(ordering: .acquiring) { try await writer.write(response)