From e0290ad59a21987051b34819e0a862a0847f4400 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Fri, 6 Sep 2024 16:08:58 +0530 Subject: [PATCH] Address review comments --- balancer/pickfirstleaf/pickfirstleaf.go | 5 ++ .../pickfirstleaf/test/pickfirstleaf_test.go | 54 +++++++++++++++++++ 2 files changed, 59 insertions(+) diff --git a/balancer/pickfirstleaf/pickfirstleaf.go b/balancer/pickfirstleaf/pickfirstleaf.go index bbfdc5a436cf..6773bec06e1a 100644 --- a/balancer/pickfirstleaf/pickfirstleaf.go +++ b/balancer/pickfirstleaf/pickfirstleaf.go @@ -151,6 +151,10 @@ type pickfirstBalancer struct { firstPass bool } +// ResolverError is called by the ClientConn when the name resolver producers an +// an error. If the resolver returns an error before sending the first update, +// it is handled by the gracefulswitch balancer (which is always the top-level +// LB policy on any channel), so we don't need to handle that here. func (b *pickfirstBalancer) ResolverError(err error) { doneCh := make(chan error, 1) b.serializer.ScheduleOr(func(_ context.Context) { @@ -159,6 +163,7 @@ func (b *pickfirstBalancer) ResolverError(err error) { }, func() { close(doneCh) }) + <-doneCh } // Only executed in the context of a serializer callback. diff --git a/balancer/pickfirstleaf/test/pickfirstleaf_test.go b/balancer/pickfirstleaf/test/pickfirstleaf_test.go index 2c09360c36ec..b2b7cc289922 100644 --- a/balancer/pickfirstleaf/test/pickfirstleaf_test.go +++ b/balancer/pickfirstleaf/test/pickfirstleaf_test.go @@ -840,6 +840,60 @@ func (s) TestPickFirstLeaf_EmptyAddressList(t *testing.T) { } } +// TestPickFirstLeaf_InitialResolverError verifies the behaviour when a resolver +// returns an error before any valid configuration. +func (s) TestPickFirstLeaf_InitialResolverError(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + r := manual.NewBuilderWithScheme("whatever") + balChan := make(chan *stateStoringBalancer, 1) + balancer.Register(&stateStoringBalancerBuilder{balancer: balChan}) + + dopts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithResolvers(r), + grpc.WithDefaultServiceConfig(stateStoringServiceConfig), + } + cc, err := grpc.NewClient(r.Scheme()+":///test.server", dopts...) + if err != nil { + t.Fatalf("grpc.NewClient() failed: %v", err) + } + + ccClosed := false + defer func() { + if !ccClosed { + cc.Close() + } + }() + + stateSubscriber := &ccStateSubscriber{} + internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, stateSubscriber) + // At this point, the resolver has not returned any addresses to the channel. + // This RPC must block until the context expires. + sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer sCancel() + client := testgrpc.NewTestServiceClient(cc) + if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { + t.Fatalf("EmptyCall() = %s, want %s", status.Code(err), codes.DeadlineExceeded) + } + r.CC.ReportError(fmt.Errorf("test error")) + testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure) + + // Close the clientconn to flush the connectivity state manager. + cc.Close() + ccClosed = true + + /// The clientconn should never transition to CONNECTING. + wantTransitions := []connectivity.State{ + connectivity.TransientFailure, + connectivity.Shutdown, + } + + if diff := cmp.Diff(wantTransitions, stateSubscriber.transitions); diff != "" { + t.Errorf("ClientConn states mismatch (-want +got):\n%s", diff) + } +} + // stateStoringBalancer stores the state of the subconns being created. type stateStoringBalancer struct { balancer.Balancer