-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
client: send RST_STREAM on client-side errors to prevent server from blocking #1823
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code looks good! Just a few comments on the test case, and a minor one in the code.
test/end2end_test.go
Outdated
func testClientResourceExhaustedCancelFullDuplex(t *testing.T, e env) { | ||
te := newTest(t, e) | ||
recvErr := make(chan error, 1) | ||
timeoutErr := errors.New("1s timeout reached") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is only referenced once -- just inline it.
test/end2end_test.go
Outdated
if err != nil { | ||
t.Fatalf("expecting <nil> error, got %v", err) | ||
} | ||
// create a payload that's larger than the default flow control window. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can actually send a smaller payload in a loop and get the same effect. That way no matter how big the flow control window is, you're guaranteed to get stuck eventually
test/end2end_test.go
Outdated
if _, err := stream.Recv(); status.Code(err) != codes.ResourceExhausted { | ||
t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) | ||
} | ||
select { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You don't need a select/case here, just err = <-recvErr
test/end2end_test.go
Outdated
ts := &funcServer{fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { | ||
_, err := stream.Recv() | ||
if err != nil { | ||
t.Fatalf("expecting <nil> error, got %v", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
t.Fatalf
doesn't actually terminate the test if it's done in a separate goroutine.
If you return the error instead, your test will fail anyway because of the other checks. Just make sure you do a "defer close(recvErr)" at the top of this function.
test/end2end_test.go
Outdated
}() | ||
select { | ||
case err = <-ce: | ||
case <-time.After(time.Second): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
5s or 10s would be better here to help avoid the possibility of a flake caused by a hiccup in travis. It should only ever be reached in an error case, so we can be pretty generous.
transport/transport.go
Outdated
mu sync.RWMutex // guard the following | ||
headerOk bool // becomes true from the first header is about to send | ||
state streamState | ||
rstReceived bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit - locate this with the other rst fields (which are also guarded by the mutex(!))
Also, please add a comment.
test/end2end_test.go
Outdated
_, err := stream.Recv() | ||
if err != nil { | ||
t.Fatalf("expecting <nil> error, got %v", err) | ||
return err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A little nitty, but to be safe, this and the return
below should return a specific new status error that is not ResourceExhausted
.
fix #1809
Previously, we only send reset stream if the rpc error is originated on client side at transport layer, which leave the grpc layer case unhandled.