Skip to content

Commit

Permalink
balancer: change roundrobin to accept empty address list (#3491)
Browse files Browse the repository at this point in the history
Roundrobin will remove all SubConns. The ClientConn will set SubConn state change to shutdown, and the overall state will turn transient failure.
  • Loading branch information
menghanl committed Apr 3, 2020
1 parent fe1d8e7 commit aedb136
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 26 deletions.
12 changes: 8 additions & 4 deletions balancer/base/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,6 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
if grpclog.V(2) {
grpclog.Infoln("base.baseBalancer: got new ClientConn state: ", s)
}
if len(s.ResolverState.Addresses) == 0 {
b.ResolverError(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState
}
// Successful resolution; clear resolver error and ensure we return nil.
b.resolverErr = nil
// addrsSet is the set converted from addrs, it's used for quick lookup of an address.
Expand Down Expand Up @@ -144,6 +140,14 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
// The entry will be deleted in HandleSubConnStateChange.
}
}
// If resolver state contains no addresses, return an error so ClientConn
// will trigger re-resolve. Also records this as an resolver error, so when
// the overall state turns transient failure, the error message will have
// the zero address information.
if len(s.ResolverState.Addresses) == 0 {
b.ResolverError(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState
}
return nil
}

Expand Down
25 changes: 7 additions & 18 deletions balancer/roundrobin/roundrobin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,31 +216,20 @@ func (s) TestAddressesRemoved(t *testing.T) {
}

r.UpdateState(resolver.State{Addresses: []resolver.Address{}})
// Removing addresses results in an error reported to the clientconn, but
// the existing connections remain. RPCs should still succeed.
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
}

// Stop the server to bring the channel state into transient failure.
test.cleanup()
// Wait for not-ready.
for src := cc.GetState(); src == connectivity.Ready; src = cc.GetState() {
if !cc.WaitForStateChange(ctx, src) {
t.Fatalf("timed out waiting for state change. got %v; want !%v", src, connectivity.Ready)
ctx2, cancel2 := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel2()
// Wait for state to change to transient failure.
for src := cc.GetState(); src != connectivity.TransientFailure; src = cc.GetState() {
if !cc.WaitForStateChange(ctx2, src) {
t.Fatalf("timed out waiting for state change. got %v; want %v", src, connectivity.TransientFailure)
}
}
// Report an empty server list again; because the state is not ready, the
// empty address list error should surface to the user.
r.UpdateState(resolver.State{Addresses: []resolver.Address{}})

const msgWant = "produced zero addresses"
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || !strings.Contains(status.Convert(err).Message(), msgWant) {
if _, err := testc.EmptyCall(ctx2, &testpb.Empty{}); err == nil || !strings.Contains(status.Convert(err).Message(), msgWant) {
t.Fatalf("EmptyCall() = _, %v, want _, Contains(Message(), %q)", err, msgWant)
}

}

func (s) TestCloseWithPendingRPC(t *testing.T) {
Expand Down
125 changes: 121 additions & 4 deletions xds/internal/balancer/edsbalancer/eds_impl_priority_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"
"time"

corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
Expand Down Expand Up @@ -56,7 +57,6 @@ func (s) TestEDSPriority_HighPriorityReady(t *testing.T) {
p1 := <-cc.newPickerCh
want := []balancer.SubConn{sc1}
if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil {
// t.Fatalf("want %v, got %v", want, err)
t.Fatalf("want %v, got %v", want, err)
}

Expand Down Expand Up @@ -123,7 +123,6 @@ func (s) TestEDSPriority_SwitchPriority(t *testing.T) {
p0 := <-cc.newPickerCh
want := []balancer.SubConn{sc0}
if err := isRoundRobin(want, subConnFromPicker(p0)); err != nil {
// t.Fatalf("want %v, got %v", want, err)
t.Fatalf("want %v, got %v", want, err)
}

Expand Down Expand Up @@ -426,7 +425,6 @@ func (s) TestEDSPriority_MultipleLocalities(t *testing.T) {
p0 := <-cc.newPickerCh
want := []balancer.SubConn{sc0}
if err := isRoundRobin(want, subConnFromPicker(p0)); err != nil {
// t.Fatalf("want %v, got %v", want, err)
t.Fatalf("want %v, got %v", want, err)
}

Expand All @@ -445,7 +443,6 @@ func (s) TestEDSPriority_MultipleLocalities(t *testing.T) {
p1 := <-cc.newPickerCh
want = []balancer.SubConn{sc1}
if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil {
// t.Fatalf("want %v, got %v", want, err)
t.Fatalf("want %v, got %v", want, err)
}

Expand Down Expand Up @@ -657,3 +654,123 @@ func (s) TestPriorityType(t *testing.T) {
t.Errorf("want p1 to be equal to priority with value 1, got p1==1: %v", got)
}
}

// Test the case where the high priority contains no backends. The low priority
// will be used.
func (s) TestEDSPriority_HighPriorityNoEndpoints(t *testing.T) {
cc := newTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState

// Two localities, with priorities [0, 1], each with one backend.
clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))

addrs1 := <-cc.newSubConnAddrsCh
if got, want := addrs1[0].Addr, testEndpointAddrs[0]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc1 := <-cc.newSubConnCh

// p0 is ready.
edsb.HandleSubConnStateChange(sc1, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc1, connectivity.Ready)

// Test roundrobin with only p0 subconns.
p1 := <-cc.newPickerCh
want := []balancer.SubConn{sc1}
if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}

// Remove addresses from priority 0, should use p1.
clab2 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.AddLocality(testSubZones[0], 1, 0, nil, nil)
clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab2.Build()))

// p0 will remove the subconn, and ClientConn will send a sc update to
// shutdown.
scToRemove := <-cc.removeSubConnCh
edsb.HandleSubConnStateChange(scToRemove, connectivity.Shutdown)

addrs2 := <-cc.newSubConnAddrsCh
if got, want := addrs2[0].Addr, testEndpointAddrs[1]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc2 := <-cc.newSubConnCh

// p1 is ready.
edsb.HandleSubConnStateChange(sc2, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc2, connectivity.Ready)

// Test roundrobin with only p1 subconns.
p2 := <-cc.newPickerCh
want = []balancer.SubConn{sc2}
if err := isRoundRobin(want, subConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}

// Test the case where the high priority contains no healthy backends. The low
// priority will be used.
func (s) TestEDSPriority_HighPriorityAllUnhealthy(t *testing.T) {
cc := newTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState

// Two localities, with priorities [0, 1], each with one backend.
clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))

addrs1 := <-cc.newSubConnAddrsCh
if got, want := addrs1[0].Addr, testEndpointAddrs[0]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc1 := <-cc.newSubConnCh

// p0 is ready.
edsb.HandleSubConnStateChange(sc1, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc1, connectivity.Ready)

// Test roundrobin with only p0 subconns.
p1 := <-cc.newPickerCh
want := []balancer.SubConn{sc1}
if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}

// Set priority 0 endpoints to all unhealthy, should use p1.
clab2 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], &xdsclient.AddLocalityOptions{
Health: []corepb.HealthStatus{corepb.HealthStatus_UNHEALTHY},
})
clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab2.Build()))

// p0 will remove the subconn, and ClientConn will send a sc update to
// transient failure.
scToRemove := <-cc.removeSubConnCh
edsb.HandleSubConnStateChange(scToRemove, connectivity.Shutdown)

addrs2 := <-cc.newSubConnAddrsCh
if got, want := addrs2[0].Addr, testEndpointAddrs[1]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc2 := <-cc.newSubConnCh

// p1 is ready.
edsb.HandleSubConnStateChange(sc2, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc2, connectivity.Ready)

// Test roundrobin with only p1 subconns.
p2 := <-cc.newPickerCh
want = []balancer.SubConn{sc2}
if err := isRoundRobin(want, subConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}

0 comments on commit aedb136

Please sign in to comment.