Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
arjan-bal committed Sep 6, 2024
1 parent 2b2c2a3 commit e0290ad
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 0 deletions.
5 changes: 5 additions & 0 deletions balancer/pickfirstleaf/pickfirstleaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ type pickfirstBalancer struct {
firstPass bool
}

// ResolverError is called by the ClientConn when the name resolver producers an
// an error. If the resolver returns an error before sending the first update,
// it is handled by the gracefulswitch balancer (which is always the top-level
// LB policy on any channel), so we don't need to handle that here.
func (b *pickfirstBalancer) ResolverError(err error) {
doneCh := make(chan error, 1)
b.serializer.ScheduleOr(func(_ context.Context) {
Expand All @@ -159,6 +163,7 @@ func (b *pickfirstBalancer) ResolverError(err error) {
}, func() {
close(doneCh)
})
<-doneCh
}

// Only executed in the context of a serializer callback.
Expand Down
54 changes: 54 additions & 0 deletions balancer/pickfirstleaf/test/pickfirstleaf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,60 @@ func (s) TestPickFirstLeaf_EmptyAddressList(t *testing.T) {
}
}

// TestPickFirstLeaf_InitialResolverError verifies the behaviour when a resolver
// returns an error before any valid configuration.
func (s) TestPickFirstLeaf_InitialResolverError(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
r := manual.NewBuilderWithScheme("whatever")
balChan := make(chan *stateStoringBalancer, 1)
balancer.Register(&stateStoringBalancerBuilder{balancer: balChan})

dopts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithResolvers(r),
grpc.WithDefaultServiceConfig(stateStoringServiceConfig),
}
cc, err := grpc.NewClient(r.Scheme()+":///test.server", dopts...)
if err != nil {
t.Fatalf("grpc.NewClient() failed: %v", err)
}

ccClosed := false
defer func() {
if !ccClosed {
cc.Close()
}
}()

stateSubscriber := &ccStateSubscriber{}
internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, stateSubscriber)
// At this point, the resolver has not returned any addresses to the channel.
// This RPC must block until the context expires.
sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCancel()
client := testgrpc.NewTestServiceClient(cc)
if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() = %s, want %s", status.Code(err), codes.DeadlineExceeded)
}
r.CC.ReportError(fmt.Errorf("test error"))
testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)

// Close the clientconn to flush the connectivity state manager.
cc.Close()
ccClosed = true

/// The clientconn should never transition to CONNECTING.
wantTransitions := []connectivity.State{
connectivity.TransientFailure,
connectivity.Shutdown,
}

if diff := cmp.Diff(wantTransitions, stateSubscriber.transitions); diff != "" {
t.Errorf("ClientConn states mismatch (-want +got):\n%s", diff)
}
}

// stateStoringBalancer stores the state of the subconns being created.
type stateStoringBalancer struct {
balancer.Balancer
Expand Down

0 comments on commit e0290ad

Please sign in to comment.