Skip to content

Commit

Permalink
Tighten up when response can combine headers and trailers; add check …
Browse files Browse the repository at this point in the history
…for unsent requests (#805)
  • Loading branch information
jhump committed Feb 23, 2024
1 parent ff841fa commit ebf0680
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 58 deletions.
27 changes: 13 additions & 14 deletions internal/app/connectconformance/results.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,26 +170,25 @@ func (r *testResults) assert(
errs = append(errs, checkError(expected.Error, actual.Error)...)
errs = append(errs, checkPayloads(expected.Payloads, actual.Payloads)...)

// TODO - This check is for trailers-only and should really only apply to gRPC and gRPC-Web protocols.
// Previously, it checked for error != nil, which is compliant with gRPC. But gRPC-Web does trailers-only
// responses with no errors also.
if len(expected.Payloads) == 0 {
// When there are no messages in the body, the server may send a
// trailers-only response. In that case, it is acceptable for the expected
// headers and trailers to be merged into one set, and it is acceptable for the
// client to interpret them as either headers or trailers.
if len(expected.Payloads) == 0 &&
expected.Error != nil &&
(definition.Request.StreamType == conformancev1.StreamType_STREAM_TYPE_UNARY ||
definition.Request.StreamType == conformancev1.StreamType_STREAM_TYPE_CLIENT_STREAM) {
// For unary and client-stream operations, a server API may not provide a way to
// set headers and trailers separately on error but instead a way to return an
// error with embedded metadata. In that case, we may not be able to distinguish
// headers from trailers in the response -- they get unified into a single bag of
// "error metadata". The conformance client should record those as trailers when
// sending back a ClientResponseResult message.

// So first we see if normal attribute succeeds
metadataErrs := checkHeaders("response headers", expected.ResponseHeaders, actual.ResponseHeaders)
metadataErrs = append(metadataErrs, checkHeaders("response trailers", expected.ResponseTrailers, actual.ResponseTrailers)...)
if len(metadataErrs) > 0 {
// That did not work. So we test to see if client attributed them all as headers
// or all as trailers.
// That did not work. So we test to see if client attributed them all as trailers.
merged := mergeHeaders(expected.ResponseHeaders, expected.ResponseTrailers)
allHeadersErrs := checkHeaders("response metadata", merged, actual.ResponseHeaders)
allTrailersErrs := checkHeaders("response metadata", merged, actual.ResponseTrailers)
if len(allHeadersErrs) != 0 && len(allTrailersErrs) != 0 {
// These checks failed also. So the received headers/trailers are incorrect.
if allTrailersErrs := checkHeaders("response metadata", merged, actual.ResponseTrailers); len(allTrailersErrs) != 0 {
// That check failed also. So the received headers/trailers are incorrect.
// Report the original errors computed above.
errs = append(errs, metadataErrs...)
}
Expand Down
38 changes: 13 additions & 25 deletions internal/app/connectconformance/results_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,20 @@ func TestResults_Assert(t *testing.T) {
{Data: []byte{0, 1, 2, 3, 4}},
},
}
testCase1 := &conformancev1.TestCase{ExpectedResponse: payload1}
testCase1 := &conformancev1.TestCase{
Request: &conformancev1.ClientCompatRequest{TestName: "abc1"},
ExpectedResponse: payload1,
}
payload2 := &conformancev1.ClientResponseResult{
Error: &conformancev1.Error{
Code: conformancev1.Code_CODE_ABORTED,
Message: proto.String("oops"),
},
}
testCase2 := &conformancev1.TestCase{ExpectedResponse: payload2}
testCase2 := &conformancev1.TestCase{
Request: &conformancev1.ClientCompatRequest{TestName: "abc2"},
ExpectedResponse: payload2,
}
results.assert("foo/bar/1", testCase1, payload2)
results.assert("foo/bar/2", testCase2, payload1)
results.assert("foo/bar/3", testCase1, payload1)
Expand Down Expand Up @@ -288,28 +294,7 @@ func TestResults_Assert_ReportsAllErrors(t *testing.T) {
},
},
{
name: "response meta misattributed allowed for trailers-only response (all in headers)",
expected: `{
"error": {"code": 5},
"response_headers": [
{"name": "abc", "value": ["xyz", "123"]},
{"name": "xyz", "value": ["value1"]}
],
"response_trailers": [
{"name": "Case-Does-Not-Matter-For-Name", "value": ["value2"]}
]
}`,
actual: `{
"error": {"code": 5},
"response_headers": [
{"name": "abc", "value": ["xyz", "123"]},
{"name": "xyz", "value": ["value1"]},
{"name": "Case-Does-Not-Matter-For-Name", "value": ["value2"]}
]
}`,
},
{
name: "response meta misattributed allowed for trailers-only response (all in trailers)",
name: "response meta all in trailers allowed for error with trailers-only response",
expected: `{
"error": {"code": 5},
"response_headers": [
Expand Down Expand Up @@ -694,7 +679,10 @@ func TestResults_Assert_ReportsAllErrors(t *testing.T) {
t.Parallel()
results := newResults(&testTrie{}, &testTrie{}, nil)

expected := &conformancev1.TestCase{ExpectedResponse: &conformancev1.ClientResponseResult{}}
expected := &conformancev1.TestCase{
Request: &conformancev1.ClientCompatRequest{StreamType: conformancev1.StreamType_STREAM_TYPE_UNARY},
ExpectedResponse: &conformancev1.ClientResponseResult{},
}
err := protojson.Unmarshal(([]byte)(testCase.expected), expected.ExpectedResponse)
require.NoError(t, err)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ testCases:
- request:
testName: client-stream/first-request-exceeds-server-limit
streamType: STREAM_TYPE_CLIENT_STREAM
requestDelayMs: 50 # give server enough time to reject message and client to notice
requestMessages:
- "@type": type.googleapis.com/connectrpc.conformance.v1.ClientStreamRequest
responseDefinition:
Expand All @@ -56,9 +57,11 @@ testCases:
expectedResponse:
error:
code: CODE_RESOURCE_EXHAUSTED
numUnsentRequests: 2
- request:
testName: client-stream/subsequent-request-exceeds-server-limit
streamType: STREAM_TYPE_CLIENT_STREAM
requestDelayMs: 50
requestMessages:
- "@type": type.googleapis.com/connectrpc.conformance.v1.ClientStreamRequest
responseDefinition:
Expand All @@ -72,6 +75,7 @@ testCases:
expectedResponse:
error:
code: CODE_RESOURCE_EXHAUSTED
numUnsentRequests: 1
# Server Stream Tests ---------------------------------------------------------
- request:
testName: server-stream/request-equal-to-server-limit
Expand Down Expand Up @@ -99,6 +103,7 @@ testCases:
expectedResponse:
error:
code: CODE_RESOURCE_EXHAUSTED
numUnsentRequests: 1
# Bidi Stream Tests -----------------------------------------------------------
- request:
testName: bidi-stream/half-duplex/all-requests-equal-to-server-limit
Expand All @@ -117,6 +122,7 @@ testCases:
- request:
testName: bidi-stream/half-duplex/first-request-exceeds-server-limit
streamType: STREAM_TYPE_HALF_DUPLEX_BIDI_STREAM
requestDelayMs: 50
requestMessages:
- "@type": type.googleapis.com/connectrpc.conformance.v1.BidiStreamRequest
responseDefinition:
Expand All @@ -134,9 +140,11 @@ testCases:
expectedResponse:
error:
code: CODE_RESOURCE_EXHAUSTED
numUnsentRequests: 2
- request:
testName: bidi-stream/half-duplex/subsequent-request-exceeds-server-limit
streamType: STREAM_TYPE_HALF_DUPLEX_BIDI_STREAM
requestDelayMs: 50
requestMessages:
- "@type": type.googleapis.com/connectrpc.conformance.v1.BidiStreamRequest
responseDefinition:
Expand All @@ -154,6 +162,7 @@ testCases:
expectedResponse:
error:
code: CODE_RESOURCE_EXHAUSTED
numUnsentRequests: 1
- request:
testName: bidi-stream/full-duplex/all-requests-equal-to-server-limit
streamType: STREAM_TYPE_FULL_DUPLEX_BIDI_STREAM
Expand Down Expand Up @@ -181,6 +190,7 @@ testCases:
fullDuplex: true
- "@type": type.googleapis.com/connectrpc.conformance.v1.BidiStreamRequest
requestData: "dGVzdCByZXNwb25zZQ=="
requestDelayMs: 50
expandRequests:
- sizeRelativeToLimit: 10
- sizeRelativeToLimit: 0
Expand All @@ -189,11 +199,13 @@ testCases:
expectedResponse:
error:
code: CODE_RESOURCE_EXHAUSTED
numUnsentRequests: 2
# TODO - Need a way to populate the expected response payload because the test
# library padded it with size and we don't know what it looks like here.
# - request:
# testName: bidi-stream/full-duplex/subsequent-request-exceeds-server-limit
# streamType: STREAM_TYPE_FULL_DUPLEX_BIDI_STREAM
# requestDelayMs: 50
# requestMessages:
# - "@type": type.googleapis.com/connectrpc.conformance.v1.BidiStreamRequest
# responseDefinition:
Expand Down Expand Up @@ -222,3 +234,4 @@ testCases:
# requestData: "dGVzdCByZXNwb25zZQ=="
# error:
# code: CODE_RESOURCE_EXHAUSTED
# numUnsentRequests: 1
33 changes: 20 additions & 13 deletions internal/app/referenceclient/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,10 @@ func (i *invoker) unary(

if err != nil {
// If an error was returned, first convert it to a Connect error
// so that we can get the headers from the Meta property. Then,
// so that we can get the trailers from the Meta property. Then,
// convert _that_ to a proto Error so we can set it in the response.
connectErr := internal.ConvertErrorToConnectError(err)
headers = internal.ConvertToProtoHeader(connectErr.Meta())
trailers = internal.ConvertToProtoHeader(connectErr.Meta())
protoErr = internal.ConvertConnectToProtoError(connectErr)
} else {
// If the call was successful, get the headers and trailers
Expand Down Expand Up @@ -318,11 +318,11 @@ func (i *invoker) clientStream(

ctx = i.withWireCapture(ctx)
stream := i.client.ClientStream(ctx)

var numUnsent int
// Add the specified request headers to the request
internal.AddHeaders(req.RequestHeaders, stream.RequestHeader())

for _, msg := range req.RequestMessages {
for i, msg := range req.RequestMessages {
csr := &conformancev1.ClientStreamRequest{}
if err := msg.UnmarshalTo(csr); err != nil {
return nil, err
Expand All @@ -332,6 +332,7 @@ func (i *invoker) clientStream(
time.Sleep(time.Duration(req.RequestDelayMs) * time.Millisecond)

if err := stream.Send(csr); err != nil && errors.Is(err, io.EOF) {
numUnsent = len(req.RequestMessages) - i
break
}
}
Expand All @@ -357,10 +358,10 @@ func (i *invoker) clientStream(
resp, err := stream.CloseAndReceive()
if err != nil {
// If an error was returned, first convert it to a Connect error
// so that we can get the headers from the Meta property. Then,
// so that we can get the trailers from the Meta property. Then,
// convert _that_ to a proto Error so we can set it in the response.
connectErr := internal.ConvertErrorToConnectError(err)
headers = internal.ConvertToProtoHeader(connectErr.Meta())
trailers = internal.ConvertToProtoHeader(connectErr.Meta())
protoErr = internal.ConvertConnectToProtoError(connectErr)
} else {
// If the call was successful, get the returned payloads
Expand All @@ -373,12 +374,13 @@ func (i *invoker) clientStream(
statusCode, feedback := i.examineWireDetails(ctx)

return &conformancev1.ClientResponseResult{
ResponseHeaders: headers,
ResponseTrailers: trailers,
Payloads: payloads,
Error: protoErr,
HttpStatusCode: statusCode,
Feedback: feedback,
ResponseHeaders: headers,
ResponseTrailers: trailers,
Payloads: payloads,
NumUnsentRequests: int32(numUnsent),
Error: protoErr,
HttpStatusCode: statusCode,
Feedback: feedback,
}, nil
}

Expand Down Expand Up @@ -424,13 +426,17 @@ func (i *invoker) bidiStream(

var protoErr *conformancev1.Error
totalRcvd := 0
for _, msg := range req.RequestMessages {
for i, msg := range req.RequestMessages {
bsr := &conformancev1.BidiStreamRequest{}
if err := msg.UnmarshalTo(bsr); err != nil {
// Return the error and nil result because this is an
// unmarshalling error unrelated to the RPC
return nil, err
}

// Sleep for any specified delay
time.Sleep(time.Duration(req.RequestDelayMs) * time.Millisecond)

if err := stream.Send(bsr); err != nil && errors.Is(err, io.EOF) {
// Call receive to get the error and convert it to a proto error
if _, recvErr := stream.Receive(); recvErr != nil {
Expand All @@ -442,6 +448,7 @@ func (i *invoker) bidiStream(
protoErr = internal.ConvertErrorToProtoError(err)
}
// Break the send loop
result.NumUnsentRequests = int32(len(req.RequestMessages) - i)
break
}
if fullDuplex {
Expand Down
15 changes: 9 additions & 6 deletions internal/app/referenceserver/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,12 @@ func (s *conformanceServer) ServerStream(
if responseDefinition != nil { //nolint:nestif
internal.AddHeaders(responseDefinition.ResponseHeaders, stream.ResponseHeader())
internal.AddHeaders(responseDefinition.ResponseTrailers, stream.ResponseTrailer())
// Immediately send the headers/trailers on the stream so that they can be read by the client
if err := stream.Send(nil); err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("error sending on stream: %w", err))

if len(responseDefinition.ResponseData) > 0 {
// Immediately send the headers/trailers on the stream so that they can be read by the client
if err := stream.Send(nil); err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("error sending on stream: %w", err))
}
}

// Calculate the response delay if specified
Expand Down Expand Up @@ -278,7 +281,7 @@ func (s *conformanceServer) BidiStream(
internal.AddHeaders(responseDefinition.ResponseHeaders, stream.ResponseHeader())
internal.AddHeaders(responseDefinition.ResponseTrailers, stream.ResponseTrailer())

if fullDuplex {
if fullDuplex && len(responseDefinition.ResponseData) > 0 {
// Immediately send the headers on the stream so that they can be read by the client.
// We can only do this for full-duplex. For half-duplex operation, we must let client
// complete its upload before trying to send anything.
Expand All @@ -294,7 +297,7 @@ func (s *conformanceServer) BidiStream(

// If fullDuplex, then send one of the desired responses each time we get a message on the stream
if fullDuplex {
if responseDefinition == nil || respNum >= len(responseDefinition.ResponseData) {
if respNum >= len(responseDefinition.GetResponseData()) {
// If there are no responses to send, then break the receive loop
// and throw the error specified
break
Expand Down Expand Up @@ -331,7 +334,7 @@ func (s *conformanceServer) BidiStream(
}
}

if !fullDuplex {
if !fullDuplex && len(responseDefinition.GetResponseData()) > 0 {
// Now that upload is complete, we can immediately send headers for half-duplex calls.
if err := stream.Send(nil); err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("error sending on stream: %w", err))
Expand Down

0 comments on commit ebf0680

Please sign in to comment.