diff --git a/balancer/base/balancer.go b/balancer/base/balancer.go index b0fa5192161..a7f1eeec8e6 100644 --- a/balancer/base/balancer.go +++ b/balancer/base/balancer.go @@ -105,7 +105,12 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error { addrsSet.Set(a, nil) if _, ok := b.subConns.Get(a); !ok { // a is a new address (not existing in b.subConns). - sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck}) + var sc balancer.SubConn + opts := balancer.NewSubConnOptions{ + HealthCheckEnabled: b.config.HealthCheck, + StateListener: func(scs balancer.SubConnState) { b.updateSubConnState(sc, scs) }, + } + sc, err := b.cc.NewSubConn([]resolver.Address{a}, opts) if err != nil { logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err) continue @@ -124,7 +129,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error { sc.Shutdown() b.subConns.Delete(a) // Keep the state of this sc in b.scStates until sc's state becomes Shutdown. - // The entry will be deleted in UpdateSubConnState. + // The entry will be deleted in updateSubConnState. } } // If resolver state contains no addresses, return an error so ClientConn @@ -177,7 +182,12 @@ func (b *baseBalancer) regeneratePicker() { b.picker = b.pickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs}) } +// UpdateSubConnState is a nop because a StateListener is always set in NewSubConn. func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { + logger.Errorf("base.baseBalancer: UpdateSubConnState(%v, %+v) called unexpectedly", sc, state) +} + +func (b *baseBalancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { s := state.ConnectivityState if logger.V(2) { logger.Infof("base.baseBalancer: handle SubConn state change: %p, %v", sc, s) diff --git a/balancer/base/balancer_test.go b/balancer/base/balancer_test.go index 7bf4d92f8f0..8a97b4220a5 100644 --- a/balancer/base/balancer_test.go +++ b/balancer/base/balancer_test.go @@ -19,7 +19,9 @@ package base import ( + "context" "testing" + "time" "google.golang.org/grpc/attributes" "google.golang.org/grpc/balancer" @@ -38,7 +40,9 @@ func (c *testClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewS func (c *testClientConn) UpdateState(balancer.State) {} -type testSubConn struct{} +type testSubConn struct { + updateState func(balancer.SubConnState) +} func (sc *testSubConn) UpdateAddresses(addresses []resolver.Address) {} @@ -61,7 +65,11 @@ func (p *testPickBuilder) Build(info PickerBuildInfo) balancer.Picker { } func TestBaseBalancerReserveAttributes(t *testing.T) { - var v = func(info PickerBuildInfo) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + validated := make(chan struct{}, 1) + v := func(info PickerBuildInfo) { + defer func() { validated <- struct{}{} }() for _, sc := range info.ReadySCs { if sc.Address.Addr == "1.1.1.1" { if sc.Address.Attributes == nil { @@ -80,8 +88,8 @@ func TestBaseBalancerReserveAttributes(t *testing.T) { } pickBuilder := &testPickBuilder{validate: v} b := (&baseBuilder{pickerBuilder: pickBuilder}).Build(&testClientConn{ - newSubConn: func(addrs []resolver.Address, _ balancer.NewSubConnOptions) (balancer.SubConn, error) { - return &testSubConn{}, nil + newSubConn: func(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { + return &testSubConn{updateState: opts.StateListener}, nil }, }, balancer.BuildOptions{}).(*baseBalancer) @@ -93,8 +101,18 @@ func TestBaseBalancerReserveAttributes(t *testing.T) { }, }, }) + select { + case <-validated: + case <-ctx.Done(): + t.Fatalf("timed out waiting for UpdateClientConnState to call picker.Build") + } for sc := range b.scStates { - b.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready, ConnectionError: nil}) + sc.(*testSubConn).updateState(balancer.SubConnState{ConnectivityState: connectivity.Ready, ConnectionError: nil}) + select { + case <-validated: + case <-ctx.Done(): + t.Fatalf("timed out waiting for UpdateClientConnState to call picker.Build") + } } }