diff --git a/xds/internal/balancer/ringhash/picker.go b/xds/internal/balancer/ringhash/picker.go index dcea6d46e517..ec3b5605690d 100644 --- a/xds/internal/balancer/ringhash/picker.go +++ b/xds/internal/balancer/ringhash/picker.go @@ -143,6 +143,8 @@ func (p *picker) handleTransientFailure(e *ringEntry) (balancer.PickResult, erro return balancer.PickResult{}, fmt.Errorf("no connection is Ready") } +// nextSkippingDuplicates finds the next entry in the ring, with a different +// subconn from the given entry. func nextSkippingDuplicates(ring *ring, entry *ringEntry) *ringEntry { for next := ring.next(entry); next != entry; next = ring.next(next) { if next.sc != entry.sc { @@ -152,3 +154,28 @@ func nextSkippingDuplicates(ring *ring, entry *ringEntry) *ringEntry { // There's no qualifying next entry. return nil } + +// nextSkippingDuplicatesSubConn finds the next subconn in the ring, that's +// different from the given subconn. +func nextSkippingDuplicatesSubConn(ring *ring, sc *subConn) *subConn { + var entry *ringEntry + for _, it := range ring.items { + if it.sc == sc { + entry = it + break + } + } + if entry == nil { + // If the given subconn is not in the ring (e.g. it was deleted), return + // the first one. + if len(ring.items) > 0 { + return ring.items[0].sc + } + return nil + } + ee := nextSkippingDuplicates(ring, entry) + if ee == nil { + return nil + } + return ee.sc +} diff --git a/xds/internal/balancer/ringhash/ringhash.go b/xds/internal/balancer/ringhash/ringhash.go index f8a47f165bdf..4e9c1772b166 100644 --- a/xds/internal/balancer/ringhash/ringhash.go +++ b/xds/internal/balancer/ringhash/ringhash.go @@ -98,6 +98,10 @@ type subConn struct { // When connectivity state is updated to Idle for this SubConn, if // connectQueued is true, Connect() will be called on the SubConn. connectQueued bool + // attemptingToConnect indicates if this subconn is attempting to connect. + // It's set when queueConnect is called. It's unset when the state is + // changed to Ready/Shutdown, or Idle (and if connectQueued is false). + attemptingToConnect bool } // setState updates the state of this SubConn. @@ -113,6 +117,8 @@ func (sc *subConn) setState(s connectivity.State) { if sc.connectQueued { sc.connectQueued = false sc.sc.Connect() + } else { + sc.attemptingToConnect = false } case connectivity.Connecting: // Clear connectQueued if the SubConn isn't failing. This state @@ -122,11 +128,14 @@ func (sc *subConn) setState(s connectivity.State) { // Clear connectQueued if the SubConn isn't failing. This state // transition is unlikely to happen, but handle this just in case. sc.connectQueued = false + sc.attemptingToConnect = false // Set to a non-failing state. sc.failing = false case connectivity.TransientFailure: // Set to a failing state. sc.failing = true + case connectivity.Shutdown: + sc.attemptingToConnect = false } sc.state = s } @@ -149,6 +158,7 @@ func (sc *subConn) effectiveState() connectivity.State { func (sc *subConn) queueConnect() { sc.mu.Lock() defer sc.mu.Unlock() + sc.attemptingToConnect = true if sc.state == connectivity.Idle { sc.sc.Connect() return @@ -158,6 +168,12 @@ func (sc *subConn) queueConnect() { sc.connectQueued = true } +func (sc *subConn) isAttemptingToConnect() bool { + sc.mu.Lock() + defer sc.mu.Unlock() + return sc.attemptingToConnect +} + type ringhashBalancer struct { cc balancer.ClientConn logger *grpclog.PrefixLogger @@ -268,7 +284,8 @@ func (b *ringhashBalancer) UpdateClientConnState(s balancer.ClientConnState) err var err error b.ring, err = newRing(b.subConns, b.config.MinRingSize, b.config.MaxRingSize) if err != nil { - panic(err) + b.ResolverError(fmt.Errorf("ringhash failed to make a new ring: %v", err)) + return balancer.ErrBadResolverState } b.regeneratePicker() b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker}) @@ -334,12 +351,6 @@ func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balance switch s { case connectivity.Idle: - // When the overall state is TransientFailure, this will never get picks - // if there's a lower priority. Need to keep the SubConns connecting so - // there's a chance it will recover. - if b.state == connectivity.TransientFailure { - scs.queueConnect() - } // No need to send an update. No queued RPC can be unblocked. If the // overall state changed because of this, sendUpdate is already true. case connectivity.Connecting: @@ -364,6 +375,35 @@ func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balance if sendUpdate { b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker}) } + + switch b.state { + case connectivity.Connecting, connectivity.TransientFailure: + // When overall state is TransientFailure, we need to make sure at least + // one SubConn is attempting to connect, otherwise this balancer may + // never get picks if the parent is priority. + // + // Because we report Connecting as the overall state when only one + // SubConn is in TransientFailure, we do the same check for Connecting + // here. + // + // Note that this check also covers deleting SubConns due to address + // change. E.g. if the SubConn attempting to connect is deleted, and the + // overall state is TF. Since there must be at least one SubConn + // attempting to connect, we need to trigger one. But since the deleted + // SubConn will eventually send a shutdown update, this code will run + // and trigger the next SubConn to connect. + for _, sc := range b.subConns { + if sc.isAttemptingToConnect() { + return + } + } + // Trigger a SubConn (this updated SubConn's next SubConn in the ring) + // to connect if nobody is attempting to connect. + sc := nextSkippingDuplicatesSubConn(b.ring, scs) + if sc != nil { + sc.queueConnect() + } + } } // mergeErrors builds an error from the last connection error and the last @@ -395,6 +435,7 @@ func (b *ringhashBalancer) Close() {} // // It's not thread safe. type connectivityStateEvaluator struct { + sum uint64 nums [5]uint64 } @@ -404,6 +445,7 @@ type connectivityStateEvaluator struct { // - If there is at least one subchannel in READY state, report READY. // - If there are 2 or more subchannels in TRANSIENT_FAILURE state, report TRANSIENT_FAILURE. // - If there is at least one subchannel in CONNECTING state, report CONNECTING. +// - If there is one subchannel in TRANSIENT_FAILURE and there is more than one subchannel, report state CONNECTING. // - If there is at least one subchannel in Idle state, report Idle. // - Otherwise, report TRANSIENT_FAILURE. // @@ -417,6 +459,14 @@ func (cse *connectivityStateEvaluator) recordTransition(oldState, newState conne updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new. cse.nums[state] += updateVal } + if oldState == connectivity.Shutdown { + // There's technically no transition from Shutdown. But we record a + // Shutdown->Idle transition when a new SubConn is created. + cse.sum++ + } + if newState == connectivity.Shutdown { + cse.sum-- + } if cse.nums[connectivity.Ready] > 0 { return connectivity.Ready @@ -427,6 +477,9 @@ func (cse *connectivityStateEvaluator) recordTransition(oldState, newState conne if cse.nums[connectivity.Connecting] > 0 { return connectivity.Connecting } + if cse.nums[connectivity.TransientFailure] > 0 && cse.sum > 1 { + return connectivity.Connecting + } if cse.nums[connectivity.Idle] > 0 { return connectivity.Idle } diff --git a/xds/internal/balancer/ringhash/ringhash_test.go b/xds/internal/balancer/ringhash/ringhash_test.go index 015424cdafed..22586c60b154 100644 --- a/xds/internal/balancer/ringhash/ringhash_test.go +++ b/xds/internal/balancer/ringhash/ringhash_test.go @@ -365,8 +365,8 @@ func TestAddrWeightChange(t *testing.T) { } // TestSubConnToConnectWhenOverallTransientFailure covers the situation when the -// overall state is TransientFailure, the SubConns turning Idle will be -// triggered to Connect(). But not when the overall state is not +// overall state is TransientFailure, the SubConns turning Idle will trigger the +// next SubConn in the ring to Connect(). But not when the overall state is not // TransientFailure. func TestSubConnToConnectWhenOverallTransientFailure(t *testing.T) { wantAddrs := []resolver.Address{ @@ -377,30 +377,56 @@ func TestSubConnToConnectWhenOverallTransientFailure(t *testing.T) { _, b, p0 := setupTest(t, wantAddrs) ring0 := p0.(*picker).ring - // Turn all SubConns to TransientFailure. - for _, it := range ring0.items { - b.UpdateSubConnState(it.sc.sc, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) - } - - // The next one turning Idle should Connect(). + // Turn the first subconn to transient failure. sc0 := ring0.items[0].sc.sc + b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Idle}) + + // It will trigger the second subconn to connect (because overall state is + // Connect (when one subconn is TF)). + sc1 := ring0.items[1].sc.sc + select { + case <-sc1.(*testutils.TestSubConn).ConnectCh: + case <-time.After(defaultTestShortTimeout): + t.Fatalf("timeout waiting for Connect() from SubConn %v", sc1) + } + + // Turn the second subconn to TF. This will set the overall state to TF. + b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) + b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Idle}) + + // It will trigger the third subconn to connect. + sc2 := ring0.items[2].sc.sc + select { + case <-sc2.(*testutils.TestSubConn).ConnectCh: + case <-time.After(defaultTestShortTimeout): + t.Fatalf("timeout waiting for Connect() from SubConn %v", sc2) + } + + // Turn the third subconn to TF. This will set the overall state to TF. + b.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) + b.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Idle}) + + // It will trigger the first subconn to connect. select { case <-sc0.(*testutils.TestSubConn).ConnectCh: - case <-time.After(defaultTestTimeout): - t.Errorf("timeout waiting for Connect() from SubConn %v", sc0) + case <-time.After(defaultTestShortTimeout): + t.Fatalf("timeout waiting for Connect() from SubConn %v", sc0) } - // If this SubConn is ready. Other SubConns turning Idle will not Connect(). - b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + // Turn the third subconn to TF again. + b.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) + b.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Idle}) - // The third SubConn in the ring should connect. - sc1 := ring0.items[1].sc.sc - b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Idle}) + // This will not trigger any new Connect() on the SubConns, because sc0 is + // still attempting to connect, and we only need one SubConn to connect. select { + case <-sc0.(*testutils.TestSubConn).ConnectCh: + t.Fatalf("unexpected Connect() from SubConn %v", sc0) case <-sc1.(*testutils.TestSubConn).ConnectCh: - t.Errorf("unexpected Connect() from SubConn %v", sc1) + t.Fatalf("unexpected Connect() from SubConn %v", sc1) + case <-sc2.(*testutils.TestSubConn).ConnectCh: + t.Fatalf("unexpected Connect() from SubConn %v", sc2) case <-time.After(defaultTestShortTimeout): } }