Skip to content

Commit

Permalink
beef up doc comments for cancel timings; reconcile the logic in refer…
Browse files Browse the repository at this point in the history
…enceclient and grpcclient with the pseudo-code in docs
  • Loading branch information
jhump committed May 23, 2024
1 parent bd780b4 commit 36c5085
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 54 deletions.
4 changes: 4 additions & 0 deletions docs/testing_clients.md
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,10 @@ for each request message {
extract the payload field from the response and
record it in array of payload values
if we should cancel after N response messages and this is the Nth {
cancel the RPC (but do not return)
}
}
if we should cancel before close send {
Expand Down
30 changes: 14 additions & 16 deletions internal/app/grpcclient/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,16 @@ func (i *invoker) serverStream(
ctx, cancel := context.WithCancel(ctx)
defer cancel()

if timing.AfterCloseSendMs >= 0 {
time.AfterFunc(time.Duration(timing.AfterCloseSendMs)*time.Millisecond, cancel)
}
stream, err := i.client.ServerStream(ctx, req)
if err != nil {
return nil, err
}

if timing.AfterCloseSendMs >= 0 {
time.Sleep(time.Duration(timing.AfterCloseSendMs) * time.Millisecond)
cancel()
}

// Read headers from the stream
hdr, err := stream.Header()
if err != nil {
Expand All @@ -145,11 +148,6 @@ func (i *invoker) serverStream(
}
}()

// If the cancel timing specifies after 0 responses, then cancel before
// receiving anything
if timing.AfterNumResponses == 0 {
cancel()
}
totalRcvd := 0
for {
msg, err := stream.Recv()
Expand Down Expand Up @@ -287,12 +285,8 @@ func (i *invoker) bidiStream(
break
}
if fullDuplex {
if totalRcvd == timing.AfterNumResponses {
cancel()
}
// If this is a full duplex stream, receive a response for each request
msg, err := stream.Recv()
totalRcvd++
if err != nil {
if !errors.Is(err, io.EOF) {
// If an error was returned that is not an EOF, convert it
Expand All @@ -306,6 +300,10 @@ func (i *invoker) bidiStream(
}
// If the call was successful, get the returned payloads
result.Payloads = append(result.Payloads, msg.Payload)
totalRcvd++
if totalRcvd == timing.AfterNumResponses {
cancel()
}
}
}

Expand Down Expand Up @@ -347,11 +345,7 @@ func (i *invoker) bidiStream(

// Receive any remaining responses
for {
if totalRcvd == timing.AfterNumResponses {
cancel()
}
msg, err := stream.Recv()
totalRcvd++
if err != nil {
if !errors.Is(err, io.EOF) {
// If an error was returned that is not an EOF, convert it
Expand All @@ -363,6 +357,10 @@ func (i *invoker) bidiStream(
}
// If the call was successful, save the payloads
result.Payloads = append(result.Payloads, msg.Payload)
totalRcvd++
if totalRcvd == timing.AfterNumResponses {
cancel()
}
}

if protoErr != nil {
Expand Down
34 changes: 14 additions & 20 deletions internal/app/referenceclient/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,6 @@ func (i *invoker) serverStream(

ctx = i.withWireCapture(ctx)

if timing.AfterCloseSendMs >= 0 {
time.AfterFunc(time.Duration(timing.AfterCloseSendMs)*time.Millisecond, cancel)
}
stream, err := i.client.ServerStream(ctx, request)
if err != nil {
// If an error was returned, first convert it to a Connect error
Expand Down Expand Up @@ -266,15 +263,15 @@ func (i *invoker) serverStream(
}
}()

if timing.AfterCloseSendMs >= 0 {
time.Sleep(time.Duration(timing.AfterCloseSendMs) * time.Millisecond)
cancel()
}

if ssr.ResponseDefinition != nil {
result.Payloads = make([]*conformancev1.ConformancePayload, 0, len(ssr.ResponseDefinition.ResponseData))
}

// If the cancel timing specifies after 0 responses, then cancel before
// receiving anything
if timing.AfterNumResponses == 0 {
cancel()
}
totalRcvd := 0
for stream.Receive() {
totalRcvd++
Expand Down Expand Up @@ -338,10 +335,7 @@ func (i *invoker) clientStream(
if timing.BeforeCloseSend != nil {
cancel()
} else if timing.AfterCloseSendMs >= 0 {
go func() {
time.Sleep(time.Duration(timing.AfterCloseSendMs) * time.Millisecond)
cancel()
}()
time.AfterFunc(time.Duration(timing.AfterCloseSendMs)*time.Millisecond, cancel)
}
resp, err := stream.CloseAndReceive()
if err != nil {
Expand Down Expand Up @@ -440,12 +434,8 @@ func (i *invoker) bidiStream(
break
}
if fullDuplex {
if totalRcvd == timing.AfterNumResponses {
cancel()
}
// If this is a full duplex stream, receive a response for each request
msg, err := stream.Receive()
totalRcvd++
if err != nil {
if !errors.Is(err, io.EOF) {
// If an error was returned that is not an EOF, convert it
Expand All @@ -459,6 +449,10 @@ func (i *invoker) bidiStream(
}
// If the call was successful, get the returned payloads
result.Payloads = append(result.Payloads, msg.Payload)
totalRcvd++
if totalRcvd == timing.AfterNumResponses {
cancel()
}
}
}

Expand All @@ -484,11 +478,7 @@ func (i *invoker) bidiStream(

// Receive any remaining responses
for {
if totalRcvd == timing.AfterNumResponses {
cancel()
}
msg, err := stream.Receive()
totalRcvd++
if err != nil {
if !errors.Is(err, io.EOF) {
// If an error was returned that is not an EOF, convert it
Expand All @@ -500,6 +490,10 @@ func (i *invoker) bidiStream(
}
// If the call was successful, save the payloads
result.Payloads = append(result.Payloads, msg.Payload)
totalRcvd++
if totalRcvd == timing.AfterNumResponses {
cancel()
}
}

if protoErr != nil {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 27 additions & 9 deletions proto/connectrpc/conformance/v1/client_compat.proto
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ message ClientCompatRequest {
// If specified, service must also be specified.
// If not specified, the test runner will auto-populate this field based on the stream_type.
optional string method = 12;
// The stream type of `method` (i.e. Unary, Client-Streaming, Server-Streaming, Full Duplex Bidi, or Half Duplex Bidi).
// The stream type of `method` (i.e. unary, client stream, server stream, full-duplex bidi
// stream, or half-duplex bidi stream).
// When writing test cases, this is a required field.
StreamType stream_type = 13;
// If protocol indicates Connect and stream type indicates
Expand All @@ -83,16 +84,16 @@ message ClientCompatRequest {
// The actual request messages that will sent to the server.
// The type URL for all entries should be equal to the request type of the
// method.
// There must be exactly one for unary and server-stream methods but
// can be zero or more for client- and bidi-stream methods.
// For client- and bidi-stream methods, all entries will have the
// There must be exactly one for unary and server stream methods but
// can be zero or more for client and bidi stream methods.
// For client and bidi stream methods, all entries will have the
// same type URL.
repeated google.protobuf.Any request_messages = 16;
// The timeout, in milliseconds, for the request. This is equivalent to a
// deadline for the request. If unset, there will be no timeout.
optional uint32 timeout_ms = 17;
// Wait this many milliseconds before sending a request message.
// For client- or bidi-streaming requests, this delay should be
// For client or bidi stream methods, this delay should be
// applied before each request sent.
uint32 request_delay_ms = 18;
// If present, the client should cancel the RPC instead of
Expand All @@ -107,15 +108,32 @@ message ClientCompatRequest {
oneof cancel_timing {
// When present, the client should cancel *instead of*
// closing the send side of the stream, after all requests
// have been sent. This applies only to client and bidi
// stream RPCs.
// have been sent.
//
// This applies only to client and bidi stream RPCs.
google.protobuf.Empty before_close_send = 1;
// When present, the client should delay for this many
// milliseconds after closing the send side of the stream
// and then cancel.
//
// This applies to all types of RPCs.
//
// For unary and server stream RPCs, where the API usually
// does not allow explicitly closing the send side, the
// cancellation should be done immediately after invoking

This comment has been minimized.

Copy link
@smaye81

smaye81 May 23, 2024

Member

❤️

// the RPC (which should implicitly send the one-and-only
// request and then close the send-side).
//
// For APIs where unary RPCs block until the response
// is received, there is no point after the request is
// sent but before a response is received to cancel. So
// the client must arrange for the RPC to be canceled
// asynchronously before invoking the blocking unary call.
uint32 after_close_send_ms = 2;
// When present, the client should cancel right after
// reading this number of response messages from the stream.
// When present, this will be greater than zero.
//
// This applies only to server and bidi stream RPCs.
uint32 after_num_responses = 3;
}
Expand Down Expand Up @@ -159,8 +177,8 @@ message ClientResponseResult {
repeated Header response_headers = 1;
// Servers should echo back payloads that they received as part of the request.
// This field should contain all the payloads the server echoed back. Note that
// There will be zero-to-one for unary and client-stream methods and
// zero-to-many for server- and bidi-stream methods.
// There will be zero-to-one for unary and client stream methods and
// zero-to-many for server and bidi stream methods.
repeated ConformancePayload payloads = 2;
// The error received from the actual RPC invocation. Note this is not representative
// of a runtime error and should always be the proto equivalent of a Connect
Expand Down

0 comments on commit 36c5085

Please sign in to comment.