From d06ab0d4b9706329e7609938a5a7453ee3bdf77b Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Fri, 4 Aug 2023 08:14:18 -0700 Subject: [PATCH] pickfirst: receive state updates via callback instead of UpdateSubConnState (#6495) --- clientconn_test.go | 27 ++++++++++++++---------- pickfirst.go | 13 +++++++++++- test/clientconn_state_transition_test.go | 27 +++++++++++++----------- 3 files changed, 43 insertions(+), 24 deletions(-) diff --git a/clientconn_test.go b/clientconn_test.go index 281c9618606f..c742e144a94c 100644 --- a/clientconn_test.go +++ b/clientconn_test.go @@ -1143,18 +1143,10 @@ func testDefaultServiceConfigWhenResolverReturnInvalidServiceConfig(t *testing.T } type stateRecordingBalancer struct { - notifier chan<- connectivity.State balancer.Balancer } -func (b *stateRecordingBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) { - b.notifier <- s.ConnectivityState - b.Balancer.UpdateSubConnState(sc, s) -} - -func (b *stateRecordingBalancer) ResetNotifier(r chan<- connectivity.State) { - b.notifier = r -} +func (b *stateRecordingBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {} func (b *stateRecordingBalancer) Close() { b.Balancer.Close() @@ -1179,8 +1171,7 @@ func (b *stateRecordingBalancerBuilder) Build(cc balancer.ClientConn, opts balan b.notifier = stateNotifications b.mu.Unlock() return &stateRecordingBalancer{ - notifier: stateNotifications, - Balancer: balancer.Get("pick_first").Build(cc, opts), + Balancer: balancer.Get("pick_first").Build(&stateRecordingCCWrapper{cc, stateNotifications}, opts), } } @@ -1192,6 +1183,20 @@ func (b *stateRecordingBalancerBuilder) nextStateNotifier() <-chan connectivity. return ret } +type stateRecordingCCWrapper struct { + balancer.ClientConn + notifier chan<- connectivity.State +} + +func (ccw *stateRecordingCCWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { + oldListener := opts.StateListener + opts.StateListener = func(s balancer.SubConnState) { + ccw.notifier <- s.ConnectivityState + oldListener(s) + } + return ccw.ClientConn.NewSubConn(addrs, opts) +} + // Keep reading until something causes the connection to die (EOF, server // closed, etc). Useful as a tool for mindlessly keeping the connection // healthy, since the client will error if things like client prefaces are not diff --git a/pickfirst.go b/pickfirst.go index d41949f1c8a5..00dd7633ebf1 100644 --- a/pickfirst.go +++ b/pickfirst.go @@ -146,7 +146,12 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState return nil } - subConn, err := b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{}) + var subConn balancer.SubConn + subConn, err := b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{ + StateListener: func(state balancer.SubConnState) { + b.updateSubConnState(subConn, state) + }, + }) if err != nil { if b.logger.V(2) { b.logger.Infof("Failed to create new SubConn: %v", err) @@ -168,7 +173,13 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState return nil } +// UpdateSubConnState is unused as a StateListener is always registered when +// creating SubConns. func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) { + b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", subConn, state) +} + +func (b *pickfirstBalancer) updateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) { if b.logger.V(2) { b.logger.Infof("Received SubConn state update: %p, %+v", subConn, state) } diff --git a/test/clientconn_state_transition_test.go b/test/clientconn_state_transition_test.go index a14ff4588a0f..545e01246ab6 100644 --- a/test/clientconn_state_transition_test.go +++ b/test/clientconn_state_transition_test.go @@ -444,19 +444,9 @@ func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) { } type stateRecordingBalancer struct { - notifier chan<- connectivity.State balancer.Balancer } -func (b *stateRecordingBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) { - b.notifier <- s.ConnectivityState - b.Balancer.UpdateSubConnState(sc, s) -} - -func (b *stateRecordingBalancer) ResetNotifier(r chan<- connectivity.State) { - b.notifier = r -} - func (b *stateRecordingBalancer) Close() { b.Balancer.Close() } @@ -480,8 +470,7 @@ func (b *stateRecordingBalancerBuilder) Build(cc balancer.ClientConn, opts balan b.notifier = stateNotifications b.mu.Unlock() return &stateRecordingBalancer{ - notifier: stateNotifications, - Balancer: balancer.Get("pick_first").Build(cc, opts), + Balancer: balancer.Get("pick_first").Build(&stateRecordingCCWrapper{cc, stateNotifications}, opts), } } @@ -493,6 +482,20 @@ func (b *stateRecordingBalancerBuilder) nextStateNotifier() <-chan connectivity. return ret } +type stateRecordingCCWrapper struct { + balancer.ClientConn + notifier chan<- connectivity.State +} + +func (ccw *stateRecordingCCWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { + oldListener := opts.StateListener + opts.StateListener = func(s balancer.SubConnState) { + ccw.notifier <- s.ConnectivityState + oldListener(s) + } + return ccw.ClientConn.NewSubConn(addrs, opts) +} + // Keep reading until something causes the connection to die (EOF, server // closed, etc). Useful as a tool for mindlessly keeping the connection // healthy, since the client will error if things like client prefaces are not