Skip to content

Commit 20bd1e7

Browse files
authored
grpc: revert #8278: Fix cardinality violations in non-server streaming RPCs (#8404)
This reverts commit a64d933.
1 parent bdbe6a2 commit 20bd1e7

File tree

2 files changed

+6
-116
lines changed

2 files changed

+6
-116
lines changed

stream.go

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1138,10 +1138,6 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
11381138
if statusErr := a.transportStream.Status().Err(); statusErr != nil {
11391139
return statusErr
11401140
}
1141-
// Received no msg and status OK for non-server streaming rpcs.
1142-
if !cs.desc.ServerStreams {
1143-
return status.Error(codes.Internal, "cardinality violation: received no response message from non-streaming RPC")
1144-
}
11451141
return io.EOF // indicates successful end of stream.
11461142
}
11471143

@@ -1175,7 +1171,7 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
11751171
} else if err != nil {
11761172
return toRPCErr(err)
11771173
}
1178-
return status.Error(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
1174+
return status.Errorf(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
11791175
}
11801176

11811177
func (a *csAttempt) finish(err error) {
@@ -1482,10 +1478,6 @@ func (as *addrConnStream) RecvMsg(m any) (err error) {
14821478
if statusErr := as.transportStream.Status().Err(); statusErr != nil {
14831479
return statusErr
14841480
}
1485-
// Received no msg and status OK for non-server streaming rpcs.
1486-
if !as.desc.ServerStreams {
1487-
return status.Error(codes.Internal, "cardinality violation: received no response message from non-streaming RPC")
1488-
}
14891481
return io.EOF // indicates successful end of stream.
14901482
}
14911483
return toRPCErr(err)
@@ -1503,7 +1495,7 @@ func (as *addrConnStream) RecvMsg(m any) (err error) {
15031495
} else if err != nil {
15041496
return toRPCErr(err)
15051497
}
1506-
return status.Error(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
1498+
return status.Errorf(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
15071499
}
15081500

15091501
func (as *addrConnStream) finish(err error) {

test/end2end_test.go

Lines changed: 4 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -3589,6 +3589,9 @@ func testClientStreamingError(t *testing.T, e env) {
35893589
// Tests that a client receives a cardinality violation error for client-streaming
35903590
// RPCs if the server doesn't send a message before returning status OK.
35913591
func (s) TestClientStreamingCardinalityViolation_ServerHandlerMissingSendAndClose(t *testing.T) {
3592+
// TODO : https://github.com/grpc/grpc-go/issues/8119 - remove `t.Skip()`
3593+
// after this is fixed.
3594+
t.Skip()
35923595
ss := &stubserver.StubServer{
35933596
StreamingInputCallF: func(_ testgrpc.TestService_StreamingInputCallServer) error {
35943597
// Returning status OK without sending a response message.This is a
@@ -3737,113 +3740,8 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) {
37373740
}
37383741
}
37393742

3740-
// Tests that a client receives a cardinality violation error for unary
3741-
// RPCs if the server doesn't send a message before returning status OK.
3742-
func (s) TestUnaryRPC_ServerSendsOnlyTrailersWithOK(t *testing.T) {
3743-
lis, err := testutils.LocalTCPListener()
3744-
if err != nil {
3745-
t.Fatal(err)
3746-
}
3747-
defer lis.Close()
3748-
3749-
ss := grpc.UnknownServiceHandler(func(any, grpc.ServerStream) error {
3750-
return nil
3751-
})
3752-
3753-
s := grpc.NewServer(ss)
3754-
go s.Serve(lis)
3755-
defer s.Stop()
3756-
3757-
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
3758-
defer cancel()
3759-
cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
3760-
if err != nil {
3761-
t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", lis.Addr(), err)
3762-
}
3763-
defer cc.Close()
3764-
3765-
client := testgrpc.NewTestServiceClient(cc)
3766-
if _, err = client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Internal {
3767-
t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal)
3768-
}
3769-
}
3770-
3771-
// Tests that client will receive cardinality violations when calling
3772-
// RecvMsg() multiple times for non-streaming response streams.
3773-
func (s) TestUnaryRPC_ClientCallRecvMsgTwice(t *testing.T) {
3774-
e := tcpTLSEnv
3775-
te := newTest(t, e)
3776-
defer te.tearDown()
3777-
3778-
te.startServer(&testServer{security: e.security})
3779-
3780-
cc := te.clientConn()
3781-
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
3782-
defer cancel()
3783-
3784-
desc := &grpc.StreamDesc{
3785-
StreamName: "UnaryCall",
3786-
ServerStreams: false,
3787-
ClientStreams: false,
3788-
}
3789-
stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/UnaryCall")
3790-
if err != nil {
3791-
t.Fatalf("cc.NewStream() failed unexpectedly: %v", err)
3792-
}
3793-
3794-
if err := stream.SendMsg(&testpb.SimpleRequest{}); err != nil {
3795-
t.Fatalf("stream.SendMsg(_) = %v, want <nil>", err)
3796-
}
3797-
3798-
resp := &testpb.SimpleResponse{}
3799-
if err := stream.RecvMsg(resp); err != nil {
3800-
t.Fatalf("stream.RecvMsg() = %v , want <nil>", err)
3801-
}
3802-
3803-
if err = stream.RecvMsg(resp); status.Code(err) != codes.Internal {
3804-
t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal)
3805-
}
3806-
}
3807-
3808-
// Tests that client will receive cardinality violations when calling
3809-
// RecvMsg() multiple times for non-streaming response streams.
3810-
func (s) TestClientStreaming_ClientCallRecvMsgTwice(t *testing.T) {
3811-
ss := stubserver.StubServer{
3812-
StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error {
3813-
if err := stream.SendAndClose(&testpb.StreamingInputCallResponse{}); err != nil {
3814-
t.Errorf("stream.SendAndClose(_) = %v, want <nil>", err)
3815-
}
3816-
return nil
3817-
},
3818-
}
3819-
if err := ss.Start(nil); err != nil {
3820-
t.Fatal("Error starting server:", err)
3821-
}
3822-
defer ss.Stop()
3823-
3824-
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
3825-
defer cancel()
3826-
stream, err := ss.Client.StreamingInputCall(ctx)
3827-
if err != nil {
3828-
t.Fatalf(".StreamingInputCall(_) = _, %v, want <nil>", err)
3829-
}
3830-
if err := stream.Send(&testpb.StreamingInputCallRequest{}); err != nil {
3831-
t.Fatalf("stream.Send(_) = %v, want <nil>", err)
3832-
}
3833-
if err := stream.CloseSend(); err != nil {
3834-
t.Fatalf("stream.CloseSend() = %v, want <nil>", err)
3835-
}
3836-
resp := new(testpb.StreamingInputCallResponse)
3837-
if err := stream.RecvMsg(resp); err != nil {
3838-
t.Fatalf("stream.RecvMsg() = %v , want <nil>", err)
3839-
}
3840-
if err = stream.RecvMsg(resp); status.Code(err) != codes.Internal {
3841-
t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal)
3842-
}
3843-
}
3844-
38453743
// Tests that a client receives a cardinality violation error for client-streaming
3846-
// RPCs if the server call SendMsg() multiple times.
3744+
// RPCs if the server call SendMsg multiple times.
38473745
func (s) TestClientStreaming_ServerHandlerSendMsgAfterSendMsg(t *testing.T) {
38483746
ss := stubserver.StubServer{
38493747
StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error {

0 commit comments

Comments
 (0)