Skip to content

Commit

Permalink
pickfirst: receive state updates via callback instead of UpdateSubCon…
Browse files Browse the repository at this point in the history
…nState (#6495)
  • Loading branch information
dfawley committed Aug 4, 2023
1 parent 7aceafc commit d06ab0d
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 24 deletions.
27 changes: 16 additions & 11 deletions clientconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1143,18 +1143,10 @@ func testDefaultServiceConfigWhenResolverReturnInvalidServiceConfig(t *testing.T
}

type stateRecordingBalancer struct {
notifier chan<- connectivity.State
balancer.Balancer
}

func (b *stateRecordingBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
b.notifier <- s.ConnectivityState
b.Balancer.UpdateSubConnState(sc, s)
}

func (b *stateRecordingBalancer) ResetNotifier(r chan<- connectivity.State) {
b.notifier = r
}
func (b *stateRecordingBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {}

func (b *stateRecordingBalancer) Close() {
b.Balancer.Close()
Expand All @@ -1179,8 +1171,7 @@ func (b *stateRecordingBalancerBuilder) Build(cc balancer.ClientConn, opts balan
b.notifier = stateNotifications
b.mu.Unlock()
return &stateRecordingBalancer{
notifier: stateNotifications,
Balancer: balancer.Get("pick_first").Build(cc, opts),
Balancer: balancer.Get("pick_first").Build(&stateRecordingCCWrapper{cc, stateNotifications}, opts),
}
}

Expand All @@ -1192,6 +1183,20 @@ func (b *stateRecordingBalancerBuilder) nextStateNotifier() <-chan connectivity.
return ret
}

type stateRecordingCCWrapper struct {
balancer.ClientConn
notifier chan<- connectivity.State
}

func (ccw *stateRecordingCCWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
oldListener := opts.StateListener
opts.StateListener = func(s balancer.SubConnState) {
ccw.notifier <- s.ConnectivityState
oldListener(s)
}
return ccw.ClientConn.NewSubConn(addrs, opts)
}

// Keep reading until something causes the connection to die (EOF, server
// closed, etc). Useful as a tool for mindlessly keeping the connection
// healthy, since the client will error if things like client prefaces are not
Expand Down
13 changes: 12 additions & 1 deletion pickfirst.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,12 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
return nil
}

subConn, err := b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{})
var subConn balancer.SubConn
subConn, err := b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{
StateListener: func(state balancer.SubConnState) {
b.updateSubConnState(subConn, state)
},
})
if err != nil {
if b.logger.V(2) {
b.logger.Infof("Failed to create new SubConn: %v", err)
Expand All @@ -168,7 +173,13 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
return nil
}

// UpdateSubConnState is unused as a StateListener is always registered when
// creating SubConns.
func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", subConn, state)
}

func (b *pickfirstBalancer) updateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
if b.logger.V(2) {
b.logger.Infof("Received SubConn state update: %p, %+v", subConn, state)
}
Expand Down
27 changes: 15 additions & 12 deletions test/clientconn_state_transition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,19 +444,9 @@ func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) {
}

type stateRecordingBalancer struct {
notifier chan<- connectivity.State
balancer.Balancer
}

func (b *stateRecordingBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
b.notifier <- s.ConnectivityState
b.Balancer.UpdateSubConnState(sc, s)
}

func (b *stateRecordingBalancer) ResetNotifier(r chan<- connectivity.State) {
b.notifier = r
}

func (b *stateRecordingBalancer) Close() {
b.Balancer.Close()
}
Expand All @@ -480,8 +470,7 @@ func (b *stateRecordingBalancerBuilder) Build(cc balancer.ClientConn, opts balan
b.notifier = stateNotifications
b.mu.Unlock()
return &stateRecordingBalancer{
notifier: stateNotifications,
Balancer: balancer.Get("pick_first").Build(cc, opts),
Balancer: balancer.Get("pick_first").Build(&stateRecordingCCWrapper{cc, stateNotifications}, opts),
}
}

Expand All @@ -493,6 +482,20 @@ func (b *stateRecordingBalancerBuilder) nextStateNotifier() <-chan connectivity.
return ret
}

type stateRecordingCCWrapper struct {
balancer.ClientConn
notifier chan<- connectivity.State
}

func (ccw *stateRecordingCCWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
oldListener := opts.StateListener
opts.StateListener = func(s balancer.SubConnState) {
ccw.notifier <- s.ConnectivityState
oldListener(s)
}
return ccw.ClientConn.NewSubConn(addrs, opts)
}

// Keep reading until something causes the connection to die (EOF, server
// closed, etc). Useful as a tool for mindlessly keeping the connection
// healthy, since the client will error if things like client prefaces are not
Expand Down

0 comments on commit d06ab0d

Please sign in to comment.