From ca8575dd6ad2b25b19859fa2bcc318e0ca0c02f2 Mon Sep 17 00:00:00 2001 From: Zac Bergquist Date: Tue, 21 Jun 2022 09:19:38 -0600 Subject: [PATCH] Workaround for GRPC unexpected EOF A number of our GRPC tests have been failing with an unexpected EOF error. It appears to be related to grpc/grpc-go#5358. Apply the suggested window size workaround in tests that have been known to fail with this error. --- api/client/client_test.go | 18 +++++++++++++++--- lib/utils/grpc_test.go | 13 +++++++++---- 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/api/client/client_test.go b/api/client/client_test.go index 32e30969d4771..5f2095d3c47e5 100644 --- a/api/client/client_test.go +++ b/api/client/client_test.go @@ -74,6 +74,7 @@ func WithConfig(cfg Config) ConfigOpt { } func (m *mockServer) NewClient(ctx context.Context, opts ...ConfigOpt) (*Client, error) { + const windowSize = 65536 cfg := Config{ Addrs: []string{m.addr}, Credentials: []Credentials{ @@ -81,6 +82,11 @@ func (m *mockServer) NewClient(ctx context.Context, opts ...ConfigOpt) (*Client, }, DialOpts: []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), // TODO(Joerger) remove insecure dial option + + // Set a large window size to disable dynamic window estimation + // See: https://github.com/grpc/grpc-go/issues/5358 + grpc.WithInitialWindowSize(windowSize), + grpc.WithInitialConnWindowSize(windowSize), }, } @@ -103,9 +109,8 @@ func startMockServerWithListener(t *testing.T, l net.Listener) *mockServer { srv := newMockServer(l.Addr().String()) t.Cleanup(srv.grpc.Stop) - go func() { - require.NoError(t, srv.grpc.Serve(l)) - }() + go srv.grpc.Serve(l) + return srv } @@ -116,6 +121,7 @@ func (m *mockServer) Ping(ctx context.Context, req *proto.PingRequest) (*proto.P func (m *mockServer) ListResources(ctx context.Context, req *proto.ListResourcesRequest) (*proto.ListResourcesResponse, error) { resources, err := testResources(req.ResourceType, req.Namespace) if err != nil { + println("!!!!! 118") return nil, trail.ToGRPC(err) } @@ -143,6 +149,7 @@ func (m *mockServer) ListResources(ctx context.Context, req *proto.ListResources case types.KindDatabaseServer: database, ok := resource.(*types.DatabaseServerV3) if !ok { + println("!!!!! 146") return nil, trace.Errorf("database server has invalid type %T", resource) } @@ -150,6 +157,7 @@ func (m *mockServer) ListResources(ctx context.Context, req *proto.ListResources case types.KindAppServer: app, ok := resource.(*types.AppServerV3) if !ok { + println("!!!!! 154") return nil, trace.Errorf("application server has invalid type %T", resource) } @@ -157,6 +165,7 @@ func (m *mockServer) ListResources(ctx context.Context, req *proto.ListResources case types.KindNode: srv, ok := resource.(*types.ServerV2) if !ok { + println("!!!!! 162") return nil, trace.Errorf("node has invalid type %T", resource) } @@ -164,6 +173,7 @@ func (m *mockServer) ListResources(ctx context.Context, req *proto.ListResources case types.KindKubeService: srv, ok := resource.(*types.ServerV2) if !ok { + println("!!!!! 170") return nil, trace.Errorf("kubernetes service has invalid type %T", resource) } @@ -171,6 +181,7 @@ func (m *mockServer) ListResources(ctx context.Context, req *proto.ListResources case types.KindWindowsDesktop: desktop, ok := resource.(*types.WindowsDesktopV3) if !ok { + println("!!!!! 178") return nil, trace.Errorf("windows desktop has invalid type %T", resource) } @@ -516,6 +527,7 @@ func TestListResources(t *testing.T) { // Create client clt, err := srv.NewClient(ctx) require.NoError(t, err) + t.Cleanup(func() { clt.Close() }) for name, test := range testCases { t.Run(name, func(t *testing.T) { diff --git a/lib/utils/grpc_test.go b/lib/utils/grpc_test.go index b4c958215d2d1..bf3501846c3b3 100644 --- a/lib/utils/grpc_test.go +++ b/lib/utils/grpc_test.go @@ -103,11 +103,17 @@ func TestGRPCErrorWrapping(t *testing.T) { }() defer server.Stop() + const windowSize = 65536 conn, err := grpc.Dial( listener.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithChainUnaryInterceptor(GRPCClientUnaryErrorInterceptor), grpc.WithChainStreamInterceptor(GRPCClientStreamErrorInterceptor), + + // Set a large window size to disable dynamic window estimation + // See: https://github.com/grpc/grpc-go/issues/5358 + grpc.WithInitialWindowSize(windowSize), + grpc.WithInitialConnWindowSize(windowSize), ) require.NoError(t, err) defer conn.Close() @@ -125,10 +131,9 @@ func TestGRPCErrorWrapping(t *testing.T) { sendErr := stream.Send(&pb.EchoRequest{Message: "Hi!"}) - // io.EOF means the server closed the stream, which can - // happen depending in timing. In either case, it is - // still safe to recv from the stream and check for - // the already exists error. + // io.EOF means the server closed the stream. + // It is still safe to recv from the stream and check for + // the already exists error though. if sendErr != nil && !errors.Is(sendErr, io.EOF) { require.FailNowf(t, "unexpected error", "%v", sendErr) }