Skip to content

Commit

Permalink
Workaround for GRPC unexpected EOF
Browse files Browse the repository at this point in the history
A number of our GRPC tests have been failing with an unexpected EOF
error. It appears to be related to grpc/grpc-go#5358.

Apply the suggested window size workaround in tests that have been known
to fail with this error.
  • Loading branch information
zmb3 committed Jun 21, 2022
1 parent d3db006 commit ca8575d
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 7 deletions.
18 changes: 15 additions & 3 deletions api/client/client_test.go
Expand Up @@ -74,13 +74,19 @@ func WithConfig(cfg Config) ConfigOpt {
}

func (m *mockServer) NewClient(ctx context.Context, opts ...ConfigOpt) (*Client, error) {
const windowSize = 65536
cfg := Config{
Addrs: []string{m.addr},
Credentials: []Credentials{
&mockInsecureTLSCredentials{}, // TODO(Joerger) replace insecure credentials
},
DialOpts: []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()), // TODO(Joerger) remove insecure dial option

// Set a large window size to disable dynamic window estimation
// See: https://github.com/grpc/grpc-go/issues/5358
grpc.WithInitialWindowSize(windowSize),
grpc.WithInitialConnWindowSize(windowSize),
},
}

Expand All @@ -103,9 +109,8 @@ func startMockServerWithListener(t *testing.T, l net.Listener) *mockServer {
srv := newMockServer(l.Addr().String())
t.Cleanup(srv.grpc.Stop)

go func() {
require.NoError(t, srv.grpc.Serve(l))
}()
go srv.grpc.Serve(l)

return srv
}

Expand All @@ -116,6 +121,7 @@ func (m *mockServer) Ping(ctx context.Context, req *proto.PingRequest) (*proto.P
func (m *mockServer) ListResources(ctx context.Context, req *proto.ListResourcesRequest) (*proto.ListResourcesResponse, error) {
resources, err := testResources(req.ResourceType, req.Namespace)
if err != nil {
println("!!!!! 118")
return nil, trail.ToGRPC(err)
}

Expand Down Expand Up @@ -143,34 +149,39 @@ func (m *mockServer) ListResources(ctx context.Context, req *proto.ListResources
case types.KindDatabaseServer:
database, ok := resource.(*types.DatabaseServerV3)
if !ok {
println("!!!!! 146")
return nil, trace.Errorf("database server has invalid type %T", resource)
}

protoResource = &proto.PaginatedResource{Resource: &proto.PaginatedResource_DatabaseServer{DatabaseServer: database}}
case types.KindAppServer:
app, ok := resource.(*types.AppServerV3)
if !ok {
println("!!!!! 154")
return nil, trace.Errorf("application server has invalid type %T", resource)
}

protoResource = &proto.PaginatedResource{Resource: &proto.PaginatedResource_AppServer{AppServer: app}}
case types.KindNode:
srv, ok := resource.(*types.ServerV2)
if !ok {
println("!!!!! 162")
return nil, trace.Errorf("node has invalid type %T", resource)
}

protoResource = &proto.PaginatedResource{Resource: &proto.PaginatedResource_Node{Node: srv}}
case types.KindKubeService:
srv, ok := resource.(*types.ServerV2)
if !ok {
println("!!!!! 170")
return nil, trace.Errorf("kubernetes service has invalid type %T", resource)
}

protoResource = &proto.PaginatedResource{Resource: &proto.PaginatedResource_KubeService{KubeService: srv}}
case types.KindWindowsDesktop:
desktop, ok := resource.(*types.WindowsDesktopV3)
if !ok {
println("!!!!! 178")
return nil, trace.Errorf("windows desktop has invalid type %T", resource)
}

Expand Down Expand Up @@ -516,6 +527,7 @@ func TestListResources(t *testing.T) {
// Create client
clt, err := srv.NewClient(ctx)
require.NoError(t, err)
t.Cleanup(func() { clt.Close() })

for name, test := range testCases {
t.Run(name, func(t *testing.T) {
Expand Down
13 changes: 9 additions & 4 deletions lib/utils/grpc_test.go
Expand Up @@ -103,11 +103,17 @@ func TestGRPCErrorWrapping(t *testing.T) {
}()
defer server.Stop()

const windowSize = 65536
conn, err := grpc.Dial(
listener.Addr().String(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainUnaryInterceptor(GRPCClientUnaryErrorInterceptor),
grpc.WithChainStreamInterceptor(GRPCClientStreamErrorInterceptor),

// Set a large window size to disable dynamic window estimation
// See: https://github.com/grpc/grpc-go/issues/5358
grpc.WithInitialWindowSize(windowSize),
grpc.WithInitialConnWindowSize(windowSize),
)
require.NoError(t, err)
defer conn.Close()
Expand All @@ -125,10 +131,9 @@ func TestGRPCErrorWrapping(t *testing.T) {

sendErr := stream.Send(&pb.EchoRequest{Message: "Hi!"})

// io.EOF means the server closed the stream, which can
// happen depending in timing. In either case, it is
// still safe to recv from the stream and check for
// the already exists error.
// io.EOF means the server closed the stream.
// It is still safe to recv from the stream and check for
// the already exists error though.
if sendErr != nil && !errors.Is(sendErr, io.EOF) {
require.FailNowf(t, "unexpected error", "%v", sendErr)
}
Expand Down

0 comments on commit ca8575d

Please sign in to comment.