BenchmarkClient RPCs implementation #1861
Conversation
Motivation: When the WorkerService holds and monitors BenchmarkClients, they need to make requests to the server as configured in the config input the WorkerService receives from the Test Driver.

Modifications:
- implemented a helper function that computes the latency and extracts the error code for an RPC
- implemented the body of the makeRPC function that makes one of the 5 possible RPCs

Result: The BenchmarkClient implementation for performance testing will be completed.
```swift
    guard errorCode != RPCError.Code.unknown else {
      throw RPCError(code: .unknown, message: "The RPC type is UNRECOGNIZED.")
    }
```
Rather than checking each time we do an RPC, can we move this logic up to where we first get the RPC type? We can create our own internal enum containing all but the unrecognised case and use that after validating that we weren't sent UNRECOGNIZED.
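For illustration, a minimal sketch of what that internal enum and its one-time validation could look like (the type and case names here are assumptions, not code from this PR):

```swift
// Sketch only: an internal enum with no UNRECOGNIZED case. It is built once
// from the proto enum, so the per-RPC code never has to re-validate it.
enum RPCType {
  case unary
  case streaming
  case streamingFromClient
  case streamingFromServer
  case streamingBothWays

  init(_ rpcType: Grpc_Testing_RpcType) throws {
    switch rpcType {
    case .unary: self = .unary
    case .streaming: self = .streaming
    case .streamingFromClient: self = .streamingFromClient
    case .streamingFromServer: self = .streamingFromServer
    case .streamingBothWays: self = .streamingBothWays
    case .UNRECOGNIZED:
      throw RPCError(code: .unknown, message: "The RPC type is UNRECOGNIZED.")
    }
  }
}
```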
```swift
    _ body: (Grpc_Testing_SimpleRequest) async throws -> Result<Contents, RPCError>
  ) async throws -> (latency: Double, errorCode: RPCError.Code?) {
    let request = Grpc_Testing_SimpleRequest.with {
      $0.responseSize = 10
```
Why do we always set this to 10?
No particular reason; I was thinking we could create it a single time and set a random size. Is there a specific size I should set?
Hmm, does the client config not tell us what size to use?
You're right, I forgot about the SimpleProtoParams: they specify the request and response sizes.
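For example, a sketch of sizing the message from those params (the field names follow the standard gRPC benchmark protos and are assumptions here):

```swift
import Foundation

// Sketch only: take the payload and response sizes from SimpleProtoParams in
// the client config instead of hard-coding responseSize = 10.
let message = Grpc_Testing_SimpleRequest.with {
  $0.responseSize = protoParams.respSize
  $0.payload = Grpc_Testing_Payload.with {
    $0.body = Data(count: Int(protoParams.reqSize))
  }
}
```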
```swift
    }
  }

  private func computeTimeAndErrorCode<Contents>(
```
It's a bit strange to link the timing with the capturing of error codes. Aim for simple: have a function do one thing; time the execution of the body and return the result:
```swift
func timeIt<R>(_ body: () async throws -> R) async rethrows -> (R, seconds: Double) {
  // ...
}
```

This means at the call site you can return through the error code if applicable:
```swift
let (errorCode, seconds) = await timeIt {
  do {
    try await client.whatever()
    return nil
  } catch let error as RPCError {
    return error.code
  } catch {
    return .unknown
  }
}
```
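For reference, one way such a helper could be implemented (a sketch using the standard library's ContinuousClock, not code from this PR):

```swift
// Sketch only: run the body, measure the elapsed time, and return both.
func timeIt<R>(_ body: () async throws -> R) async rethrows -> (R, seconds: Double) {
  let clock = ContinuousClock()
  let start = clock.now
  let result = try await body()
  let elapsed = start.duration(to: clock.now)
  // Duration exposes (seconds, attoseconds) components; fold them into a Double.
  let seconds = Double(elapsed.components.seconds)
    + Double(elapsed.components.attoseconds) / 1e18
  return (result, seconds)
}
```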
```swift
      id += 1
      ids.continuation.yield(id)
    }
    return response.accepted
```
No need to return accepted here: we'll throw any RPCError from response.message, so if you get to the return here you know there's been no error. Effectively, if we don't throw, the error code should be nil.
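In other words, the call can look roughly like this (a sketch; the call shape is assumed from the quoted diff):

```swift
// Sketch only: forcing response.message with `try` surfaces any RPCError, so
// reaching the end of the closure already means the RPC succeeded.
try await benchmarkClient.streamingFromClient(request: streamingRequest) { response in
  _ = try response.message
}
```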
```swift
    ) { response in
      return response.accepted
    }
```
We need to consume the messages on the response: if we don't consume them we'll exert backpressure and the server will stop writing messages.
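Something along these lines, for example (a sketch; the exact streaming-response API is assumed):

```swift
// Sketch only: iterate the whole response stream so the client keeps reading
// and the server is never stalled by backpressure.
try await benchmarkClient.streamingFromServer(
  request: ClientRequest.Single(message: message)
) { response in
  for try await _ in response.messages {
    // Discard each message; we only need to keep consuming the stream.
  }
}
```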
```swift
    let responseStatus = try await benchmarkClient.streamingBothWays(request: streamingRequest)
    { response in
      return response.accepted
```
Same note here re: consuming the messages
glbrntt left a comment
A few nits but looks good otherwise!
```swift
    self.rpcStats = NIOLockedValueBox(RPCStats(latencyHistogram: histogram))
  }

  enum RpcType: Int {
```
nit: RPCType, also no need for it to be raw representable as an Int here
```swift
    switch rpcType {
    case .unary, .streaming, .streamingFromClient, .streamingFromServer, .streamingBothWays:
      self.rpcType = RpcType(rawValue: rpcType.rawValue)!
    case .UNRECOGNIZED:
      throw RPCError(code: .unknown, message: "The RPC type is UNRECOGNIZED.")
    }
```
Can we do this before creating the client?
```diff
- let (latency, errorCode) = self.makeRPC(client: benchmarkClient, rpcType: self.rpcType)
+ let (latency, errorCode) = try await self.makeRPC(
+   benchmarkClient: benchmarkClient,
+   rpcType: self.rpcType
```
We don't need to pass this in because it's stored on self
```swift
    benchmarkClient: Grpc_Testing_BenchmarkServiceClient,
    rpcType: RpcType
  ) async throws -> (latency: Double, errorCode: RPCError.Code?) {
    let request = Grpc_Testing_SimpleRequest.with {
```
Can we call this message? I think of the "request" as the overall input to the RPC, i.e. it includes the metadata as well. A "message" or "messages" are a part of the request.
```swift
    try await benchmarkClient.unaryCall(request: ClientRequest.Single(message: request)) {
      response in
      _ = try response.message
```
Suggested change:

```diff
- try await benchmarkClient.unaryCall(request: ClientRequest.Single(message: request)) {
-   response in
-   _ = try response.message
+ try await benchmarkClient.unaryCall(
+   request: ClientRequest.Single(message: request)
+ ) { response in
+   _ = try response.message
```
glbrntt left a comment
A few things got missed from last time but basically looks good!
```swift
    self.rpcStats = NIOLockedValueBox(RPCStats(latencyHistogram: histogram))
  }

  enum RpcType {
```
nit: should be RPCType
```diff
  protoParams: Grpc_Testing_SimpleProtoParams,
  histogramParams: Grpc_Testing_HistogramParams?
- ) {
+ ) throws {
```
This isn't throwing
```swift
    ) {
      response in
```
Suggested change:

```diff
- ) {
-   response in
+ ) { response in
```
```swift
      }
    }

    _ = try await benchmarkClient.streamingFromClient(
```
This one shouldn't be necessary as the body of the closure doesn't return anything
Suggested change:

```diff
- _ = try await benchmarkClient.streamingFromClient(
+ try await benchmarkClient.streamingFromClient(
```