-
Notifications
You must be signed in to change notification settings - Fork 4.5k
grpc: Fix cardinality violations in non-client streaming RPCs. #8385
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## master #8385 +/- ##
==========================================
- Coverage 82.44% 82.34% -0.10%
==========================================
Files 413 413
Lines 40424 40532 +108
==========================================
+ Hits 33328 33377 +49
- Misses 5742 5787 +45
- Partials 1354 1368 +14
🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to implement all of this in server.go
to avoid adding state to the serverStream
?
As discussed this with @arjan-bal offline, Cardinality violations can only be detected when messages are being read from the stream. This reading process occurs specifically within the server.RecvMsg() function. Since RecvMsg() is invoked from the user-implemented handler, it's not possible to detect cardinality violations during the initial stream setup phase. |
stream.go
Outdated
@@ -1820,7 +1820,7 @@ func (ss *serverStream) RecvMsg(m any) (err error) { | |||
} else if err != nil { | |||
return err | |||
} | |||
return status.Error(codes.Internal, "cardinality violation: expected <EOF> for non client-streaming RPCs, but received another message") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Non-client-streaming was right before, wasn't it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, didn't realise while copying it.
Made the changes.
desc := &grpc.StreamDesc{ | ||
StreamName: "StreamingOutputCall", | ||
ServerStreams: true, | ||
ClientStreams: false, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct. This exact test, pretty much, but set this field to true
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM except the few tiny things. Thanks!
test/end2end_test.go
Outdated
// Second call to SendMsg should fail with Internal error and result in closing | ||
// the connection with a RST_STREAM. | ||
func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { | ||
// To ensure server.recvMsg() is successfully completed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, but can you explain more like how I did, to say why the synchronization is needed? Because there would be a race between client cancellation and the server reading the first request message.
test/end2end_test.go
Outdated
|
||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) | ||
defer cancel() | ||
cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(insecure.NewCredentials())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you switch all these tests to use local credentials instead of insecure?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
test/end2end_test.go
Outdated
|
||
// Tests the behavior of client for server-side streaming RPC when client sends zero request messages. | ||
func (s) TestServerStreaming_ClientSendsZeroRequests(t *testing.T) { | ||
t.Skip() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add a string here and include the issue number (#7286)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use the t.Skip
string instead of adding a comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
test/end2end_test.go
Outdated
// Second call to SendMsg should fail with Internal error and result in closing | ||
// the connection with a RST_STREAM. | ||
func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { | ||
// To ensure server.recvMsg() is successfully completed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please leave the function-level comment to just a description of the test, and put the explanation of the various steps in intra-function comments.
test/end2end_test.go
Outdated
|
||
// Tests the behavior of client for server-side streaming RPC when client sends zero request messages. | ||
func (s) TestServerStreaming_ClientSendsZeroRequests(t *testing.T) { | ||
t.Skip() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use the t.Skip
string instead of adding a comment.
test/end2end_test.go
Outdated
// and validating expected error codes. | ||
// Tests the behavior for server-side streaming when client calls SendMsg twice. | ||
// Second call to SendMsg should fail with Internal error and result in closing | ||
// the connection with a RST_STREAM. | ||
func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { | ||
// To ensure server.recvMsg() is successfully completed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What I was hoping for was more like this:
// To ensure server.recvMsg() is successfully completed. | |
// To ensure server.recvMsg() is successfully completed. Otherwise, if the client application | |
// attempts to send a second request message, that will trigger a RST_STREAM from the | |
// client due to the application violating the RPC's protocol. The RST_STREAM will prevent | |
// the method handler from being called. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
Partially addresses: #7286
In non-client streaming RPCs, the client's SendMsg() method is designed to automatically close the send operation after its initial call. If someone attempts to call Client.SendMsg() twice for non-client streaming RPCs, if will return with error
Internal desc = SendMsg called after CloseSend
.To mirror this behavior, the server-side logic has been updated so that calling RecvMsg() more than once for non-client streaming RPCs will now similarly return an
Internal
error.RELEASE NOTES: