Skip to content

Conversation

@stefanadranca
Copy link
Collaborator

Motivation:

The 2 workers used in QPS testing are a Benchmark service client and server respectivelly, so we need to implement the Benchmark Service.

Modifications:

Implemented the 'BenchmarkService' struct that defines the service protocol methods, based on their documentation.

Result:

We will be able to proceed with the Wrorker Service implementation.

Motivation:

The 2 workers used in QPS testing are a Benchmark service client and server respectivelly,
so we need to implement the Benchmark Service.

Modifications:

Implemented the 'BenchmarkService' struct that defines the service protocol methods, based on their documentation.

Result:

We will be able to proceed with the Wrorker Service implementation.
Comment on lines 29 to 31
{

// Throw an error if the status is not `ok`. Otherwise, an `ok` status is automatically sent
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
{
// Throw an error if the status is not `ok`. Otherwise, an `ok` status is automatically sent
{
// Throw an error if the status is not `ok`. Otherwise, an `ok` status is automatically sent


@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
extension BenchmarkService {
private func echoStatus(responseStatus: Grpc_Testing_EchoStatus) throws {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The naming/usage is not all that clear because it's try echoStatus(...) which makes it seem like it throws if it can't echo the status.

One analogue I can think of is throwing a cancellation error if a task is cancelled and that's spelled try Task.checkCancellation().

Drawing on that I think something like try checkOkStatus(_ responseStatus: Grpc_Testing_EchoStatus) (note the dropped label as 'status' is included in the name) has clearer semantics: it returns if the status is okay, otherwise it throws.

) async throws
-> ServerResponse.Single<Grpc_Testing_BenchmarkService.Method.StreamingFromClient.Output>
{
throw RPCError(code: .unimplemented, message: "The RPC is not implemented.")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we implement this one?

$0.payload = request.message.payload
}
)
try await Task.sleep(nanoseconds: 10)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't need to sleep here

try self.echoStatus(responseStatus: request.message.responseStatus)
}

while true {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to stop eventually, while !Task.isCancelled is probably a better condition to use here.

-> GRPCCore.ServerResponse.Stream<Grpc_Testing_BenchmarkService.Method.StreamingBothWays.Output>
{
return ServerResponse.Stream { writer in
while true {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, we need to stop eventually

}
}
)
try await Task.sleep(nanoseconds: 10)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to sleep here

Comment on lines 38 to 40
message: Grpc_Testing_BenchmarkService.Method.UnaryCall.Output.with {
$0.payload = request.message.payload
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't quite right: the request tells us how big the response payload should be (via responeSize). Same goes for all the other RPCs.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this also true for streamingBothWays()? The documentation for it says "Both sides send the content of their own choice to the other."

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I believe so, I think the docs are outdated because the scenario descriptions which drive the tests expect different request/response sizes.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't making this change mean that streamingCall() and streamingBothWays() become the same function?

Comment on lines 126 to 127
guard let code = Status.Code(rawValue: Int(responseStatus.code))
else {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
guard let code = Status.Code(rawValue: Int(responseStatus.code))
else {
guard let code = Status.Code(rawValue: Int(responseStatus.code)) else {

Comment on lines 130 to 133
let status = Status(code: code, message: responseStatus.message)
if let error = RPCError(status: status) {
throw error
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we always create the Status but don't always use it (in fact we don't need to use it at all):

if let code = RPCError.Code(code) {
  throw RPCError(code: code, message: "...")
}

@stefanadranca stefanadranca added the version/v2 Relates to v2 label Mar 15, 2024
@stefanadranca stefanadranca requested a review from glbrntt March 15, 2024 11:10

@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol {
/// Used to check if
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The suspense is killing me, what is it used to check?

@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
struct BenchmarkService: Grpc_Testing_BenchmarkService.ServiceProtocol {
/// Used to check if
var working = ManagedAtomic<Bool>(true)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be private and a let

try await writer.write(
Grpc_Testing_BenchmarkService.Method.StreamingCall.Output.with {
$0.payload = Grpc_Testing_Payload.with {
$0.body = Data(count: Int(request.message.responseSize))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will create the message each time which is potentially expensive. Can we create it once before the loop?

try await writer.write(
Grpc_Testing_BenchmarkService.Method.StreamingCall.Output.with {
$0.payload = Grpc_Testing_Payload.with {
$0.body = Data(count: 100)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment about why we used 100? There's a very low probability we'll remember why otherwise

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, was there a specific reason we used exactly 100? (We are using a single value so we can create the message once - as you pointed before I will create it before the loop, but was there a reason for this size specifically?)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so because the java implementation does this? my understanding of the comment is that they are using the same size for all responses because the spec allows this, but not that the spec says it should be 100. Am I missing something?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're not missing anything. We're doing it because Java (and other impls) are using 100, i.e. there is precedent elsewhere and we haven't just picked 100 out of thin air (someone else did, we're just following).

@stefanadranca stefanadranca requested a review from glbrntt March 15, 2024 14:33
Comment on lines 129 to 147
for try await message in request.messages {
if message.responseStatus.isInitialized {
try self.checkOkStatus(message.responseStatus)
}
}

// Always use the same canned response for bidirectional streaming.
// This is allowed by the spec.
let response = Grpc_Testing_BenchmarkService.Method.StreamingCall.Output.with {
$0.payload = Grpc_Testing_Payload.with {
$0.body = Data(count: 100)
}
}
return ServerResponse.Stream { writer in
while working.load(ordering: .acquiring) {
try await writer.write(response)
}
return [:]
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to read and write at the same time which means we need to do the work in a task group within the body of the response stream. We can have one task for reading and one for writing. You can use an atomic bool as the stopping condition for writing (we should stop writing when all requests have been consumed).

@stefanadranca stefanadranca requested a review from glbrntt March 15, 2024 18:52
try self.checkOkStatus(message.responseStatus)
}
}
_ = self.working.exchange(false, ordering: .acquiring)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is quite right: self.working will be toggled by the quit method on the worker service, it's an external trigger to shutdown. Looking at it in a different way: I don't think it makes sense that the end of the request stream for this method would stop the response stream from streaming-from-server.

Instead I think we want a separate atomic to signal between the request stream and response stream.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you! It makes sense to use a different atomic for this. One question though: I should still check when writing the responses if 'working' is set to 'true', besides checking the new atomic, right?

}
group.cancelAll()
}
return serverResponse
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit inverted. At the moment the behaviour will be:

  • RPC starts
  • Task group is created
    • One task runs to consume the request messages
    • The other task returns the response stream immediately
  • We then consume the response stream, the first result we'll get will be the server response because it's returned immediately so we cancel the task group (and stop consuming requests)
  • We then wait for all child tasks in the task group to finish (this should be fast because one has already finished and the other has been cancelled)
  • Then we return the server response so that gRPC can run it.

Instead we should be consuming the request stream within a task group inside the response stream. The rough shape is something like this:

return ServerResponse.Stream { writer in 
  try await withThrowingTaskGroup(of: Void.self) { group in
    group.addTask { 
      // consume requests 
    } 
    group.addTask { 
      // write responses 
    }
  }
  return [:]
}

@stefanadranca stefanadranca requested a review from glbrntt March 18, 2024 11:10
Copy link
Collaborator

@glbrntt glbrntt left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one nit but looks good otherwise

try self.checkOkStatus(message.responseStatus)
}
}
_ = inboundStreaming.exchange(false, ordering: .acquiring)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to exchange, store is fine. We can also use relaxed ordering here (and while loading).

@stefanadranca stefanadranca requested a review from glbrntt March 18, 2024 14:39
@glbrntt glbrntt enabled auto-merge (squash) March 18, 2024 14:43
@glbrntt glbrntt merged commit 62653ba into grpc:main Mar 18, 2024
@gjcairo gjcairo added semver/none No version bump required. and removed semver/none No version bump required. labels Apr 2, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

version/v2 Relates to v2

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants