From ebfe3be62a82434bc83fd7b36410141a603a96be Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Mon, 12 Jul 2021 16:42:02 -0700 Subject: [PATCH] cluster_resolver: implement resource resolver to resolve EDS and DNS (#4531) --- resolver/manual/manual.go | 20 +- .../clusterresolver/balancerconfig/type.go | 42 + .../clusterresolver/clusterresolver_test.go | 20 +- .../balancer/clusterresolver/eds_impl_test.go | 40 +- .../balancer/clusterresolver/priority_test.go | 44 +- .../clusterresolver/resource_resolver.go | 248 +++++ .../clusterresolver/resource_resolver_dns.go | 114 +++ .../clusterresolver/resource_resolver_test.go | 873 ++++++++++++++++++ xds/internal/testutils/fakeclient/client.go | 34 +- 9 files changed, 1371 insertions(+), 64 deletions(-) create mode 100644 xds/internal/balancer/clusterresolver/resource_resolver.go create mode 100644 xds/internal/balancer/clusterresolver/resource_resolver_dns.go create mode 100644 xds/internal/balancer/clusterresolver/resource_resolver_test.go diff --git a/resolver/manual/manual.go b/resolver/manual/manual.go index 3679d702ab9..f6e7b5ae358 100644 --- a/resolver/manual/manual.go +++ b/resolver/manual/manual.go @@ -27,7 +27,9 @@ import ( // NewBuilderWithScheme creates a new test resolver builder with the given scheme. func NewBuilderWithScheme(scheme string) *Resolver { return &Resolver{ + BuildCallback: func(resolver.Target, resolver.ClientConn, resolver.BuildOptions) {}, ResolveNowCallback: func(resolver.ResolveNowOptions) {}, + CloseCallback: func() {}, scheme: scheme, } } @@ -35,11 +37,17 @@ func NewBuilderWithScheme(scheme string) *Resolver { // Resolver is also a resolver builder. // It's build() function always returns itself. type Resolver struct { + // BuildCallback is called when the Build method is called. Must not be + // nil. Must not be changed after the resolver may be built. + BuildCallback func(resolver.Target, resolver.ClientConn, resolver.BuildOptions) // ResolveNowCallback is called when the ResolveNow method is called on the // resolver. Must not be nil. Must not be changed after the resolver may // be built. ResolveNowCallback func(resolver.ResolveNowOptions) - scheme string + // CloseCallback is called when the Close method is called. Must not be + // nil. Must not be changed after the resolver may be built. + CloseCallback func() + scheme string // Fields actually belong to the resolver. CC resolver.ClientConn @@ -54,6 +62,7 @@ func (r *Resolver) InitialState(s resolver.State) { // Build returns itself for Resolver, because it's both a builder and a resolver. func (r *Resolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { + r.BuildCallback(target, cc, opts) r.CC = cc if r.bootstrapState != nil { r.UpdateState(*r.bootstrapState) @@ -72,9 +81,16 @@ func (r *Resolver) ResolveNow(o resolver.ResolveNowOptions) { } // Close is a noop for Resolver. -func (*Resolver) Close() {} +func (r *Resolver) Close() { + r.CloseCallback() +} // UpdateState calls CC.UpdateState. func (r *Resolver) UpdateState(s resolver.State) { r.CC.UpdateState(s) } + +// ReportError calls CC.ReportError. 
+func (r *Resolver) ReportError(err error) { + r.CC.ReportError(err) +} diff --git a/xds/internal/balancer/clusterresolver/balancerconfig/type.go b/xds/internal/balancer/clusterresolver/balancerconfig/type.go index 2f9ba68fe59..3e47b8234e3 100644 --- a/xds/internal/balancer/clusterresolver/balancerconfig/type.go +++ b/xds/internal/balancer/clusterresolver/balancerconfig/type.go @@ -95,4 +95,46 @@ type DiscoveryMechanism struct { // This is used for EDS watch if set. If unset, Cluster is used for EDS // watch. EDSServiceName string `json:"edsServiceName,omitempty"` + // DNSHostname is the DNS name to resolve in "host:port" form. For type + // LOGICAL_DNS only. + DNSHostname string `json:"dnsHostname,omitempty"` +} + +// Equal returns whether the DiscoveryMechanism is the same with the parameter. +func (dm DiscoveryMechanism) Equal(b DiscoveryMechanism) bool { + switch { + case dm.Cluster != b.Cluster: + return false + case !equalStringP(dm.LoadReportingServerName, b.LoadReportingServerName): + return false + case !equalUint32P(dm.MaxConcurrentRequests, b.MaxConcurrentRequests): + return false + case dm.Type != b.Type: + return false + case dm.EDSServiceName != b.EDSServiceName: + return false + case dm.DNSHostname != b.DNSHostname: + return false + } + return true +} + +func equalStringP(a, b *string) bool { + if a == nil && b == nil { + return true + } + if a == nil || b == nil { + return false + } + return *a == *b +} + +func equalUint32P(a, b *uint32) bool { + if a == nil && b == nil { + return true + } + if a == nil || b == nil { + return false + } + return *a == *b } diff --git a/xds/internal/balancer/clusterresolver/clusterresolver_test.go b/xds/internal/balancer/clusterresolver/clusterresolver_test.go index ea8ac4e419f..8f3644d08be 100644 --- a/xds/internal/balancer/clusterresolver/clusterresolver_test.go +++ b/xds/internal/balancer/clusterresolver/clusterresolver_test.go @@ -231,7 +231,7 @@ func (s) TestSubConnStateChange(t *testing.T) { if _, err := xdsC.WaitForWatchEDS(ctx); err != nil { t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err) } - xdsC.InvokeWatchEDSCallback(defaultEndpointsUpdate, nil) + xdsC.InvokeWatchEDSCallback("", defaultEndpointsUpdate, nil) edsLB, err := waitForNewChildLB(ctx, edsLBCh) if err != nil { t.Fatal(err) @@ -276,7 +276,7 @@ func (s) TestErrorFromXDSClientUpdate(t *testing.T) { if _, err := xdsC.WaitForWatchEDS(ctx); err != nil { t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err) } - xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil) + xdsC.InvokeWatchEDSCallback("", xdsclient.EndpointsUpdate{}, nil) edsLB, err := waitForNewChildLB(ctx, edsLBCh) if err != nil { t.Fatal(err) @@ -286,11 +286,11 @@ func (s) TestErrorFromXDSClientUpdate(t *testing.T) { } connectionErr := xdsclient.NewErrorf(xdsclient.ErrorTypeConnection, "connection error") - xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, connectionErr) + xdsC.InvokeWatchEDSCallback("", xdsclient.EndpointsUpdate{}, connectionErr) sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) defer sCancel() - if err := xdsC.WaitForCancelEDSWatch(sCtx); err != context.DeadlineExceeded { + if _, err := xdsC.WaitForCancelEDSWatch(sCtx); err != context.DeadlineExceeded { t.Fatal("watch was canceled, want not canceled (timeout error)") } @@ -304,13 +304,13 @@ func (s) TestErrorFromXDSClientUpdate(t *testing.T) { } resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "clusterResolverBalancer resource not found error") - 
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, resourceErr) + xdsC.InvokeWatchEDSCallback("", xdsclient.EndpointsUpdate{}, resourceErr) // Even if error is resource not found, watch shouldn't be canceled, because // this is an EDS resource removed (and xds client actually never sends this // error, but we still handles it). sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) defer sCancel() - if err := xdsC.WaitForCancelEDSWatch(sCtx); err != context.DeadlineExceeded { + if _, err := xdsC.WaitForCancelEDSWatch(sCtx); err != context.DeadlineExceeded { t.Fatal("watch was canceled, want not canceled (timeout error)") } if err := edsLB.waitForClientConnStateChange(sCtx); err != context.DeadlineExceeded { @@ -365,7 +365,7 @@ func (s) TestErrorFromResolver(t *testing.T) { if _, err := xdsC.WaitForWatchEDS(ctx); err != nil { t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err) } - xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil) + xdsC.InvokeWatchEDSCallback("", xdsclient.EndpointsUpdate{}, nil) edsLB, err := waitForNewChildLB(ctx, edsLBCh) if err != nil { t.Fatal(err) @@ -379,7 +379,7 @@ func (s) TestErrorFromResolver(t *testing.T) { sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) defer sCancel() - if err := xdsC.WaitForCancelEDSWatch(sCtx); err != context.DeadlineExceeded { + if _, err := xdsC.WaitForCancelEDSWatch(sCtx); err != context.DeadlineExceeded { t.Fatal("watch was canceled, want not canceled (timeout error)") } @@ -394,7 +394,7 @@ func (s) TestErrorFromResolver(t *testing.T) { resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "clusterResolverBalancer resource not found error") edsB.ResolverError(resourceErr) - if err := xdsC.WaitForCancelEDSWatch(ctx); err != nil { + if _, err := xdsC.WaitForCancelEDSWatch(ctx); err != nil { t.Fatalf("want watch to be canceled, waitForCancel failed: %v", err) } if err := edsLB.waitForClientConnStateChange(sCtx); err != context.DeadlineExceeded { @@ -423,7 +423,7 @@ func verifyExpectedRequests(ctx context.Context, fc *fakeclient.Client, resource for _, name := range resourceNames { if name == "" { // ResourceName empty string indicates a cancel. - if err := fc.WaitForCancelEDSWatch(ctx); err != nil { + if _, err := fc.WaitForCancelEDSWatch(ctx); err != nil { return fmt.Errorf("timed out when expecting resource %q", name) } continue diff --git a/xds/internal/balancer/clusterresolver/eds_impl_test.go b/xds/internal/balancer/clusterresolver/eds_impl_test.go index f0eae0afc3d..9a41fa9e2b3 100644 --- a/xds/internal/balancer/clusterresolver/eds_impl_test.go +++ b/xds/internal/balancer/clusterresolver/eds_impl_test.go @@ -103,7 +103,7 @@ func (s) TestEDS_OneLocality(t *testing.T) { // One locality with one backend. clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil) sc1 := <-cc.NewSubConnCh edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) @@ -117,7 +117,7 @@ func (s) TestEDS_OneLocality(t *testing.T) { // The same locality, add one more backend. 
clab2 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:2], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab2.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab2.Build()), nil) sc2 := <-cc.NewSubConnCh edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) @@ -131,7 +131,7 @@ func (s) TestEDS_OneLocality(t *testing.T) { // The same locality, delete first backend. clab3 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab3.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[1:2], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab3.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab3.Build()), nil) scToRemove := <-cc.RemoveSubConnCh if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { @@ -147,7 +147,7 @@ func (s) TestEDS_OneLocality(t *testing.T) { // The same locality, replace backend. clab4 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab4.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab4.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab4.Build()), nil) sc3 := <-cc.NewSubConnCh edsb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) @@ -166,7 +166,7 @@ func (s) TestEDS_OneLocality(t *testing.T) { // The same locality, different drop rate, dropping 50%. clab5 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], map[string]uint32{"test-drop": 50}) clab5.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab5.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab5.Build()), nil) // Picks with drops. if err := testPickerFromCh(cc.NewPickerCh, func(p balancer.Picker) error { @@ -188,7 +188,7 @@ func (s) TestEDS_OneLocality(t *testing.T) { // The same locality, remove drops. clab6 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab6.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab6.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab6.Build()), nil) // Pick without drops. if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc3}); err != nil { @@ -209,7 +209,7 @@ func (s) TestEDS_TwoLocalities(t *testing.T) { // Two localities, each with one backend. clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil) sc1 := <-cc.NewSubConnCh edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) @@ -218,7 +218,7 @@ func (s) TestEDS_TwoLocalities(t *testing.T) { // locality. Otherwise the test is flaky because of a map is used in EDS to // keep localities. 
clab1.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil) sc2 := <-cc.NewSubConnCh edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) @@ -233,7 +233,7 @@ func (s) TestEDS_TwoLocalities(t *testing.T) { clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) clab2.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil) clab2.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:3], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab2.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab2.Build()), nil) sc3 := <-cc.NewSubConnCh edsb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) @@ -248,7 +248,7 @@ func (s) TestEDS_TwoLocalities(t *testing.T) { clab3 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab3.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil) clab3.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:3], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab3.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab3.Build()), nil) scToRemove := <-cc.RemoveSubConnCh if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { @@ -265,7 +265,7 @@ func (s) TestEDS_TwoLocalities(t *testing.T) { clab4 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab4.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil) clab4.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:4], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab4.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab4.Build()), nil) sc4 := <-cc.NewSubConnCh edsb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) @@ -284,7 +284,7 @@ func (s) TestEDS_TwoLocalities(t *testing.T) { clab5 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab5.AddLocality(testSubZones[1], 2, 0, testEndpointAddrs[1:2], nil) clab5.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:4], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab5.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab5.Build()), nil) // Test pick with two subconns different locality weight. // @@ -299,7 +299,7 @@ func (s) TestEDS_TwoLocalities(t *testing.T) { clab6 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab6.AddLocality(testSubZones[1], 0, 0, testEndpointAddrs[1:2], nil) clab6.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:4], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab6.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab6.Build()), nil) // Changing weight of locality[1] to 0 caused it to be removed. It's subconn // should also be removed. 
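The mechanical change running through these test hunks is that the fake xDS client's InvokeWatchEDSCallback now takes the resource name as its first argument (the fakeclient/client.go change itself appears at the end of this patch). A minimal before/after sketch of one call site; the meaning of the empty name is an assumption inferred from how the single-watch tests use it:

    // Before this patch: the fake client tracked a single EDS watch, so the
    // callback was invoked without naming a resource.
    xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil)

    // After this patch: the first argument names the watched EDS resource to
    // invoke. The single-watch tests in this file pass "", assumed here to
    // target the only registered watch; resource_resolver_test.go passes an
    // explicit name when several EDS watches are active at once.
    xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)
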
@@ -349,7 +349,7 @@ func (s) TestEDS_EndpointsHealth(t *testing.T) { corepb.HealthStatus_DEGRADED, }, }) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil) var ( readySCs []balancer.SubConn @@ -406,7 +406,7 @@ func (s) TestEDS_EmptyUpdate(t *testing.T) { defer func() { balancergroup.DefaultSubBalancerCloseTimeout = oldCacheTimeout }() // The first update is an empty update. - xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil) + xdsC.InvokeWatchEDSCallback("", xdsclient.EndpointsUpdate{}, nil) // Pick should fail with transient failure, and all priority removed error. if err := testErrPickerFromCh(cc.NewPickerCh, priority.ErrAllPrioritiesRemoved); err != nil { t.Fatal(err) @@ -415,7 +415,7 @@ func (s) TestEDS_EmptyUpdate(t *testing.T) { // One locality with one backend. clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil) sc1 := <-cc.NewSubConnCh edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) @@ -426,7 +426,7 @@ func (s) TestEDS_EmptyUpdate(t *testing.T) { t.Fatal(err) } - xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil) + xdsC.InvokeWatchEDSCallback("", xdsclient.EndpointsUpdate{}, nil) // Pick should fail with transient failure, and all priority removed error. if err := testErrPickerFromCh(cc.NewPickerCh, priority.ErrAllPrioritiesRemoved); err != nil { t.Fatal(err) @@ -442,7 +442,7 @@ func (s) TestEDS_EmptyUpdate(t *testing.T) { edsb.UpdateSubConnState(scToRemove, balancer.SubConnState{ConnectivityState: connectivity.Shutdown}) // Handle another update with priorities and localities. - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil) sc2 := <-cc.NewSubConnCh edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) @@ -484,7 +484,7 @@ func (s) TestEDS_UpdateSubBalancerName(t *testing.T) { clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) clab1.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil) for i := 0; i < 2; i++ { sc := <-cc.NewSubConnCh @@ -579,7 +579,7 @@ func (s) TestEDS_CircuitBreaking(t *testing.T) { // One locality with one backend. 
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil) sc1 := <-cc.NewSubConnCh edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) diff --git a/xds/internal/balancer/clusterresolver/priority_test.go b/xds/internal/balancer/clusterresolver/priority_test.go index 711c3eeed0d..a4c6d5b1c65 100644 --- a/xds/internal/balancer/clusterresolver/priority_test.go +++ b/xds/internal/balancer/clusterresolver/priority_test.go @@ -44,7 +44,7 @@ func (s) TestEDSPriority_HighPriorityReady(t *testing.T) { clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil) addrs1 := <-cc.NewSubConnAddrsCh if got, want := addrs1[0].Addr, testEndpointAddrs[0]; got != want { @@ -66,7 +66,7 @@ func (s) TestEDSPriority_HighPriorityReady(t *testing.T) { clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) clab2.AddLocality(testSubZones[2], 1, 2, testEndpointAddrs[2:3], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab2.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab2.Build()), nil) select { case <-cc.NewPickerCh: @@ -82,7 +82,7 @@ func (s) TestEDSPriority_HighPriorityReady(t *testing.T) { clab3 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab3.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) clab3.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab3.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab3.Build()), nil) select { case <-cc.NewPickerCh: @@ -107,7 +107,7 @@ func (s) TestEDSPriority_SwitchPriority(t *testing.T) { clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil) addrs0 := <-cc.NewSubConnAddrsCh if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want { @@ -144,7 +144,7 @@ func (s) TestEDSPriority_SwitchPriority(t *testing.T) { clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) clab2.AddLocality(testSubZones[2], 1, 2, testEndpointAddrs[2:3], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab2.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab2.Build()), nil) select { case <-cc.NewPickerCh: @@ -175,7 +175,7 @@ func (s) TestEDSPriority_SwitchPriority(t *testing.T) { clab3 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab3.AddLocality(testSubZones[0], 1, 
0, testEndpointAddrs[:1], nil) clab3.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab3.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab3.Build()), nil) // p2 SubConns are removed. scToRemove := <-cc.RemoveSubConnCh @@ -200,7 +200,7 @@ func (s) TestEDSPriority_HigherDownWhileAddingLower(t *testing.T) { clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil) addrs0 := <-cc.NewSubConnAddrsCh if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) @@ -227,7 +227,7 @@ func (s) TestEDSPriority_HigherDownWhileAddingLower(t *testing.T) { clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) clab2.AddLocality(testSubZones[2], 1, 2, testEndpointAddrs[2:3], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab2.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab2.Build()), nil) addrs2 := <-cc.NewSubConnAddrsCh if got, want := addrs2[0].Addr, testEndpointAddrs[2]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) @@ -254,7 +254,7 @@ func (s) TestEDSPriority_HigherReadyCloseAllLower(t *testing.T) { clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) clab1.AddLocality(testSubZones[2], 1, 2, testEndpointAddrs[2:3], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil) addrs0 := <-cc.NewSubConnAddrsCh if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) @@ -344,7 +344,7 @@ func (s) TestEDSPriority_InitTimeout(t *testing.T) { clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil) addrs0 := <-cc.NewSubConnAddrsCh if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) @@ -387,7 +387,7 @@ func (s) TestEDSPriority_MultipleLocalities(t *testing.T) { clab0 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab0.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) clab0.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab0.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab0.Build()), nil) addrs0 := <-cc.NewSubConnAddrsCh if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) @@ -436,7 +436,7 @@ func (s) TestEDSPriority_MultipleLocalities(t *testing.T) { clab1.AddLocality(testSubZones[1], 1, 1, 
testEndpointAddrs[1:2], nil) clab1.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:3], nil) clab1.AddLocality(testSubZones[3], 1, 1, testEndpointAddrs[3:4], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil) addrs2 := <-cc.NewSubConnAddrsCh if got, want := addrs2[0].Addr, testEndpointAddrs[2]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) @@ -484,7 +484,7 @@ func (s) TestEDSPriority_RemovesAllLocalities(t *testing.T) { clab0 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab0.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) clab0.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab0.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab0.Build()), nil) addrs0 := <-cc.NewSubConnAddrsCh if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) @@ -500,7 +500,7 @@ func (s) TestEDSPriority_RemovesAllLocalities(t *testing.T) { // Remove all priorities. clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil) // p0 subconn should be removed. scToRemove := <-cc.RemoveSubConnCh <-cc.RemoveSubConnCh // Drain the duplicate subconn removed. @@ -519,7 +519,7 @@ func (s) TestEDSPriority_RemovesAllLocalities(t *testing.T) { clab2 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil) clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[3:4], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab2.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab2.Build()), nil) addrs01 := <-cc.NewSubConnAddrsCh if got, want := addrs01[0].Addr, testEndpointAddrs[2]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) @@ -547,7 +547,7 @@ func (s) TestEDSPriority_RemovesAllLocalities(t *testing.T) { // Remove p1 from EDS, to fallback to p0. clab3 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab3.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab3.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab3.Build()), nil) // p1 subconn should be removed. 
scToRemove1 := <-cc.RemoveSubConnCh @@ -591,7 +591,7 @@ func (s) TestEDSPriority_HighPriorityNoEndpoints(t *testing.T) { clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil) addrs1 := <-cc.NewSubConnAddrsCh if got, want := addrs1[0].Addr, testEndpointAddrs[0]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) @@ -611,7 +611,7 @@ func (s) TestEDSPriority_HighPriorityNoEndpoints(t *testing.T) { clab2 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab2.AddLocality(testSubZones[0], 1, 0, nil, nil) clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab2.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab2.Build()), nil) // p0 will remove the subconn, and ClientConn will send a sc update to // shutdown. scToRemove := <-cc.RemoveSubConnCh @@ -642,7 +642,7 @@ func (s) TestEDSPriority_HighPriorityAllUnhealthy(t *testing.T) { clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil) addrs1 := <-cc.NewSubConnAddrsCh if got, want := addrs1[0].Addr, testEndpointAddrs[0]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) @@ -664,7 +664,7 @@ func (s) TestEDSPriority_HighPriorityAllUnhealthy(t *testing.T) { Health: []corepb.HealthStatus{corepb.HealthStatus_UNHEALTHY}, }) clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab2.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab2.Build()), nil) // p0 will remove the subconn, and ClientConn will send a sc update to // transient failure. scToRemove := <-cc.RemoveSubConnCh @@ -702,10 +702,10 @@ func (s) TestEDSPriority_FirstPriorityRemoved(t *testing.T) { // One localities, with priorities [0], each with one backend. clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil) // Remove the only localities. clab2 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) - xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab2.Build()), nil) + xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab2.Build()), nil) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := cc.WaitForErrPicker(ctx); err != nil { diff --git a/xds/internal/balancer/clusterresolver/resource_resolver.go b/xds/internal/balancer/clusterresolver/resource_resolver.go new file mode 100644 index 00000000000..29aed0e72f4 --- /dev/null +++ b/xds/internal/balancer/clusterresolver/resource_resolver.go @@ -0,0 +1,248 @@ +/* + * + * Copyright 2021 gRPC authors. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package clusterresolver + +import ( + "sync" + + "google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig" + "google.golang.org/grpc/xds/internal/xdsclient" +) + +// resourceUpdate is a combined update from all the resources, in the order of +// priority. For example, it can be {EDS, EDS, DNS}. +type resourceUpdate struct { + p []balancerconfig.PriorityConfig + err error +} + +type discoveryMechanism interface { + lastUpdate() (interface{}, bool) + resolveNow() + stop() +} + +// discoveryMechanismKey is {type+resource_name}, it's used as the map key, so +// that the same resource resolver can be reused (e.g. when there are two +// mechanisms, both for the same EDS resource, but has different circuit +// breaking config. +type discoveryMechanismKey struct { + typ balancerconfig.DiscoveryMechanismType + name string +} + +// resolverMechanismTuple is needed to keep the resolver and the discovery +// mechanism together, because resolvers can be shared. And we need the +// mechanism for fields like circuit breaking, LRS etc when generating the +// balancer config. +type resolverMechanismTuple struct { + dm balancerconfig.DiscoveryMechanism + dmKey discoveryMechanismKey + r discoveryMechanism +} + +type resourceResolver struct { + parent *clusterResolverBalancer + updateChannel chan *resourceUpdate + + // mu protects the slice and map, and content of the resolvers in the slice. + mu sync.Mutex + mechanisms []balancerconfig.DiscoveryMechanism + children []resolverMechanismTuple + childrenMap map[discoveryMechanismKey]discoveryMechanism +} + +func newResourceResolver(parent *clusterResolverBalancer) *resourceResolver { + return &resourceResolver{ + parent: parent, + updateChannel: make(chan *resourceUpdate, 1), + childrenMap: make(map[discoveryMechanismKey]discoveryMechanism), + } +} + +func equalDiscoveryMechanisms(a, b []balancerconfig.DiscoveryMechanism) bool { + if len(a) != len(b) { + return false + } + for i, aa := range a { + bb := b[i] + if !aa.Equal(bb) { + return false + } + } + return true +} + +func (rr *resourceResolver) updateMechanisms(mechanisms []balancerconfig.DiscoveryMechanism) { + rr.mu.Lock() + defer rr.mu.Unlock() + if equalDiscoveryMechanisms(rr.mechanisms, mechanisms) { + return + } + rr.mechanisms = mechanisms + rr.children = make([]resolverMechanismTuple, len(mechanisms)) + newDMs := make(map[discoveryMechanismKey]bool) + + // Start one watch for each new discover mechanism {type+resource_name}. + for i, dm := range mechanisms { + switch dm.Type { + case balancerconfig.DiscoveryMechanismTypeEDS: + // If EDSServiceName is not set, use the cluster name as EDS service + // name to watch. 
+ nameToWatch := dm.EDSServiceName + if nameToWatch == "" { + nameToWatch = dm.Cluster + } + dmKey := discoveryMechanismKey{typ: dm.Type, name: nameToWatch} + newDMs[dmKey] = true + + r := rr.childrenMap[dmKey] + if r == nil { + r = newEDSResolver(nameToWatch, rr.parent.xdsClient, rr) + rr.childrenMap[dmKey] = r + } + rr.children[i] = resolverMechanismTuple{dm: dm, dmKey: dmKey, r: r} + case balancerconfig.DiscoveryMechanismTypeLogicalDNS: + // Name to resolve in DNS is the hostname, not the ClientConn + // target. + dmKey := discoveryMechanismKey{typ: dm.Type, name: dm.DNSHostname} + newDMs[dmKey] = true + + r := rr.childrenMap[dmKey] + if r == nil { + r = newDNSResolver(dm.DNSHostname, rr) + rr.childrenMap[dmKey] = r + } + rr.children[i] = resolverMechanismTuple{dm: dm, dmKey: dmKey, r: r} + } + } + // Stop the resources that were removed. + for dm, r := range rr.childrenMap { + if !newDMs[dm] { + delete(rr.childrenMap, dm) + r.stop() + } + } + // Regenerate even if there's no change in discovery mechanism, in case + // priority order changed. + rr.generate() +} + +// resolveNow is typically called to trigger re-resolve of DNS. The EDS +// resolveNow() is a noop. +func (rr *resourceResolver) resolveNow() { + rr.mu.Lock() + defer rr.mu.Unlock() + for _, r := range rr.childrenMap { + r.resolveNow() + } +} + +func (rr *resourceResolver) stop() { + rr.mu.Lock() + defer rr.mu.Unlock() + for dm, r := range rr.childrenMap { + delete(rr.childrenMap, dm) + r.stop() + } + rr.mechanisms = nil + rr.children = nil +} + +// generate collects all the updates from all the resolvers, and push the +// combined result into the update channel. It only pushes the update when all +// the child resolvers have received at least one update, otherwise it will +// wait. +// +// caller must hold rr.mu. +func (rr *resourceResolver) generate() { + var ret []balancerconfig.PriorityConfig + for _, rDM := range rr.children { + r, ok := rr.childrenMap[rDM.dmKey] + if !ok { + rr.parent.logger.Infof("resolver for %+v not found, should never happen", rDM.dmKey) + continue + } + + u, ok := r.lastUpdate() + if !ok { + // Don't send updates to parent until all resolvers have update to + // send. + return + } + switch uu := u.(type) { + case xdsclient.EndpointsUpdate: + ret = append(ret, balancerconfig.PriorityConfig{Mechanism: rDM.dm, EDSResp: uu}) + case []string: + ret = append(ret, balancerconfig.PriorityConfig{Mechanism: rDM.dm, Addresses: uu}) + } + } + select { + case <-rr.updateChannel: + default: + } + rr.updateChannel <- &resourceUpdate{p: ret} +} + +type edsDiscoveryMechanism struct { + cancel func() + + update xdsclient.EndpointsUpdate + updateReceived bool +} + +func (er *edsDiscoveryMechanism) lastUpdate() (interface{}, bool) { + if !er.updateReceived { + return nil, false + } + return er.update, true +} + +func (er *edsDiscoveryMechanism) resolveNow() { +} + +func (er *edsDiscoveryMechanism) stop() { + er.cancel() +} + +// newEDSResolver starts the EDS watch on the given xds client. 
+func newEDSResolver(nameToWatch string, xdsc xdsclient.XDSClient, topLevelResolver *resourceResolver) *edsDiscoveryMechanism { + ret := &edsDiscoveryMechanism{} + topLevelResolver.parent.logger.Infof("EDS watch started on %v", nameToWatch) + cancel := xdsc.WatchEndpoints(nameToWatch, func(update xdsclient.EndpointsUpdate, err error) { + topLevelResolver.mu.Lock() + defer topLevelResolver.mu.Unlock() + if err != nil { + select { + case <-topLevelResolver.updateChannel: + default: + } + topLevelResolver.updateChannel <- &resourceUpdate{err: err} + return + } + ret.update = update + ret.updateReceived = true + topLevelResolver.generate() + }) + ret.cancel = func() { + topLevelResolver.parent.logger.Infof("EDS watch canceled on %v", nameToWatch) + cancel() + } + return ret +} diff --git a/xds/internal/balancer/clusterresolver/resource_resolver_dns.go b/xds/internal/balancer/clusterresolver/resource_resolver_dns.go new file mode 100644 index 00000000000..7a639f51a5d --- /dev/null +++ b/xds/internal/balancer/clusterresolver/resource_resolver_dns.go @@ -0,0 +1,114 @@ +/* + * + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package clusterresolver + +import ( + "fmt" + + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/serviceconfig" +) + +var ( + newDNS = func(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { + // The dns resolver is registered by the grpc package. So, this call to + // resolver.Get() is never expected to return nil. + return resolver.Get("dns").Build(target, cc, opts) + } +) + +// dnsDiscoveryMechanism watches updates for the given DNS hostname. +// +// It implements resolver.ClientConn interface to work with the DNS resolver. +type dnsDiscoveryMechanism struct { + target string + topLevelResolver *resourceResolver + r resolver.Resolver + + addrs []string + updateReceived bool +} + +func newDNSResolver(target string, topLevelResolver *resourceResolver) *dnsDiscoveryMechanism { + ret := &dnsDiscoveryMechanism{ + target: target, + topLevelResolver: topLevelResolver, + } + r, err := newDNS(resolver.Target{Scheme: "dns", Endpoint: target}, ret, resolver.BuildOptions{}) + if err != nil { + select { + case <-topLevelResolver.updateChannel: + default: + } + topLevelResolver.updateChannel <- &resourceUpdate{err: err} + } + ret.r = r + return ret +} + +func (dr *dnsDiscoveryMechanism) lastUpdate() (interface{}, bool) { + if !dr.updateReceived { + return nil, false + } + return dr.addrs, true +} + +func (dr *dnsDiscoveryMechanism) resolveNow() { + dr.r.ResolveNow(resolver.ResolveNowOptions{}) +} + +func (dr *dnsDiscoveryMechanism) stop() { + dr.r.Close() +} + +// dnsDiscoveryMechanism needs to implement resolver.ClientConn interface to receive +// updates from the real DNS resolver. 
+ +func (dr *dnsDiscoveryMechanism) UpdateState(state resolver.State) error { + dr.topLevelResolver.mu.Lock() + defer dr.topLevelResolver.mu.Unlock() + addrs := make([]string, len(state.Addresses)) + for i, a := range state.Addresses { + addrs[i] = a.Addr + } + dr.addrs = addrs + dr.updateReceived = true + dr.topLevelResolver.generate() + return nil +} + +func (dr *dnsDiscoveryMechanism) ReportError(err error) { + select { + case <-dr.topLevelResolver.updateChannel: + default: + } + dr.topLevelResolver.updateChannel <- &resourceUpdate{err: err} +} + +func (dr *dnsDiscoveryMechanism) NewAddress(addresses []resolver.Address) { + dr.UpdateState(resolver.State{Addresses: addresses}) +} + +func (dr *dnsDiscoveryMechanism) NewServiceConfig(string) { + // This method is deprecated, and service config isn't supported. +} + +func (dr *dnsDiscoveryMechanism) ParseServiceConfig(string) *serviceconfig.ParseResult { + return &serviceconfig.ParseResult{Err: fmt.Errorf("service config not supported")} +} diff --git a/xds/internal/balancer/clusterresolver/resource_resolver_test.go b/xds/internal/balancer/clusterresolver/resource_resolver_test.go new file mode 100644 index 00000000000..9a943815509 --- /dev/null +++ b/xds/internal/balancer/clusterresolver/resource_resolver_test.go @@ -0,0 +1,873 @@ +// +build go1.12 + +/* + * + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package clusterresolver + +import ( + "context" + "fmt" + "testing" + + "github.com/google/go-cmp/cmp" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/resolver/manual" + "google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig" + "google.golang.org/grpc/xds/internal/testutils" + "google.golang.org/grpc/xds/internal/testutils/fakeclient" + xdsclient "google.golang.org/grpc/xds/internal/xdsclient" +) + +const ( + testDNSTarget = "dns.com" +) + +var ( + testEDSUpdates []xdsclient.EndpointsUpdate +) + +func init() { + clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) + testEDSUpdates = append(testEDSUpdates, parseEDSRespProtoForTesting(clab1.Build())) + clab2 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab2.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil) + testEDSUpdates = append(testEDSUpdates, parseEDSRespProtoForTesting(clab2.Build())) +} + +// Test the simple case with one EDS resource to watch. 
+func (s) TestResourceResolverOneEDSResource(t *testing.T) { + for _, test := range []struct { + name string + clusterName, edsName string + wantName string + edsUpdate xdsclient.EndpointsUpdate + want []balancerconfig.PriorityConfig + }{ + {name: "watch EDS", + clusterName: testClusterName, + edsName: testServiceName, + wantName: testServiceName, + edsUpdate: testEDSUpdates[0], + want: []balancerconfig.PriorityConfig{{ + Mechanism: balancerconfig.DiscoveryMechanism{ + Type: balancerconfig.DiscoveryMechanismTypeEDS, + Cluster: testClusterName, + EDSServiceName: testServiceName, + }, + EDSResp: testEDSUpdates[0], + }}, + }, + { + name: "watch EDS no EDS name", // Will watch for cluster name. + clusterName: testClusterName, + wantName: testClusterName, + edsUpdate: testEDSUpdates[1], + want: []balancerconfig.PriorityConfig{{ + Mechanism: balancerconfig.DiscoveryMechanism{ + Type: balancerconfig.DiscoveryMechanismTypeEDS, + Cluster: testClusterName, + }, + EDSResp: testEDSUpdates[1], + }}, + }, + } { + t.Run(test.name, func(t *testing.T) { + fakeClient := fakeclient.NewClient() + rr := newResourceResolver(&clusterResolverBalancer{xdsClient: fakeClient}) + rr.updateMechanisms([]balancerconfig.DiscoveryMechanism{{ + Type: balancerconfig.DiscoveryMechanismTypeEDS, + Cluster: test.clusterName, + EDSServiceName: test.edsName, + }}) + ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer ctxCancel() + gotEDSName, err := fakeClient.WaitForWatchEDS(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + if gotEDSName != test.wantName { + t.Fatalf("xdsClient.WatchEDS called for cluster: %v, want: %v", gotEDSName, test.wantName) + } + + // Invoke callback, should get an update. + fakeClient.InvokeWatchEDSCallback("", test.edsUpdate, nil) + select { + case u := <-rr.updateChannel: + if diff := cmp.Diff(u.p, test.want); diff != "" { + t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) + } + case <-ctx.Done(): + t.Fatal("Timed out waiting for update from update channel.") + } + // Close the resource resolver. Should stop EDS watch. + rr.stop() + edsNameCanceled, err := fakeClient.WaitForCancelEDSWatch(ctx) + if err != nil { + t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) + } + if edsNameCanceled != test.wantName { + t.Fatalf("xdsClient.CancelEDS called for %v, want: %v", edsNameCanceled, testServiceName) + } + }) + } +} + +func setupDNS() (chan resolver.Target, chan struct{}, chan resolver.ResolveNowOptions, *manual.Resolver, func()) { + dnsTargetCh := make(chan resolver.Target, 1) + dnsCloseCh := make(chan struct{}, 1) + resolveNowCh := make(chan resolver.ResolveNowOptions, 1) + + mr := manual.NewBuilderWithScheme("dns") + mr.BuildCallback = func(target resolver.Target, _ resolver.ClientConn, _ resolver.BuildOptions) { dnsTargetCh <- target } + mr.CloseCallback = func() { dnsCloseCh <- struct{}{} } + mr.ResolveNowCallback = func(opts resolver.ResolveNowOptions) { resolveNowCh <- opts } + oldNewDNS := newDNS + newDNS = func(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { + return mr.Build(target, cc, opts) + } + return dnsTargetCh, dnsCloseCh, resolveNowCh, mr, func() { newDNS = oldNewDNS } +} + +// Test the simple case of one DNS resolver. 
+func (s) TestResourceResolverOneDNSResource(t *testing.T) { + for _, test := range []struct { + name string + target string + wantTarget resolver.Target + addrs []resolver.Address + want []balancerconfig.PriorityConfig + }{ + { + name: "watch DNS", + target: testDNSTarget, + wantTarget: resolver.Target{Scheme: "dns", Endpoint: testDNSTarget}, + addrs: []resolver.Address{{Addr: "1.1.1.1"}, {Addr: "2.2.2.2"}}, + want: []balancerconfig.PriorityConfig{{ + Mechanism: balancerconfig.DiscoveryMechanism{ + Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS, + DNSHostname: testDNSTarget, + }, + Addresses: []string{"1.1.1.1", "2.2.2.2"}, + }}, + }, + } { + t.Run(test.name, func(t *testing.T) { + dnsTargetCh, dnsCloseCh, _, dnsR, cleanup := setupDNS() + defer cleanup() + fakeClient := fakeclient.NewClient() + rr := newResourceResolver(&clusterResolverBalancer{xdsClient: fakeClient}) + rr.updateMechanisms([]balancerconfig.DiscoveryMechanism{{ + Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS, + DNSHostname: test.target, + }}) + ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer ctxCancel() + select { + case target := <-dnsTargetCh: + if diff := cmp.Diff(target, test.wantTarget); diff != "" { + t.Fatalf("got unexpected DNS target to watch, diff (-got, +want): %v", diff) + } + case <-ctx.Done(): + t.Fatal("Timed out waiting for building DNS resolver") + } + + // Invoke callback, should get an update. + dnsR.UpdateState(resolver.State{Addresses: test.addrs}) + select { + case u := <-rr.updateChannel: + if diff := cmp.Diff(u.p, test.want); diff != "" { + t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) + } + case <-ctx.Done(): + t.Fatal("Timed out waiting for update from update channel.") + } + // Close the resource resolver. Should close the underlying resolver. + rr.stop() + select { + case <-dnsCloseCh: + case <-ctx.Done(): + t.Fatal("Timed out waiting for closing DNS resolver") + } + }) + } +} + +// Test that changing EDS name would cause a cancel and a new watch. +// +// Also, changes that don't actually change EDS names (e.g. changing cluster +// name but not service name, or change circuit breaking count) doesn't do +// anything. +// +// - update DiscoveryMechanism +// - same EDS name to watch, but different MaxCurrentCount: no new watch +// - different cluster name, but same EDS name: no new watch +func (s) TestResourceResolverChangeEDSName(t *testing.T) { + fakeClient := fakeclient.NewClient() + rr := newResourceResolver(&clusterResolverBalancer{xdsClient: fakeClient}) + rr.updateMechanisms([]balancerconfig.DiscoveryMechanism{{ + Type: balancerconfig.DiscoveryMechanismTypeEDS, + Cluster: testClusterName, + EDSServiceName: testServiceName, + }}) + ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer ctxCancel() + gotEDSName1, err := fakeClient.WaitForWatchEDS(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + if gotEDSName1 != testServiceName { + t.Fatalf("xdsClient.WatchEDS called for cluster: %v, want: %v", gotEDSName1, testServiceName) + } + + // Invoke callback, should get an update. 
+ fakeClient.InvokeWatchEDSCallback(gotEDSName1, testEDSUpdates[0], nil) + select { + case u := <-rr.updateChannel: + if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{{ + Mechanism: balancerconfig.DiscoveryMechanism{ + Type: balancerconfig.DiscoveryMechanismTypeEDS, + Cluster: testClusterName, + EDSServiceName: testServiceName, + }, + EDSResp: testEDSUpdates[0], + }}); diff != "" { + t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) + } + case <-ctx.Done(): + t.Fatal("Timed out waiting for update from update channel.") + } + + // Change name to watch. + rr.updateMechanisms([]balancerconfig.DiscoveryMechanism{{ + Type: balancerconfig.DiscoveryMechanismTypeEDS, + Cluster: testClusterName, + }}) + edsNameCanceled1, err := fakeClient.WaitForCancelEDSWatch(ctx) + if err != nil { + t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) + } + if edsNameCanceled1 != gotEDSName1 { + t.Fatalf("xdsClient.CancelEDS called for %v, want: %v", edsNameCanceled1, testServiceName) + } + gotEDSName2, err := fakeClient.WaitForWatchEDS(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + if gotEDSName2 != testClusterName { + t.Fatalf("xdsClient.WatchEDS called for cluster: %v, want: %v", gotEDSName2, testClusterName) + } + // Shouldn't get any update, because the new resource hasn't received any + // update. + shortCtx, shortCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer shortCancel() + select { + case u := <-rr.updateChannel: + t.Fatalf("get unexpected update %+v", u) + case <-shortCtx.Done(): + } + + // Invoke callback, should get an update. + fakeClient.InvokeWatchEDSCallback(gotEDSName2, testEDSUpdates[1], nil) + select { + case u := <-rr.updateChannel: + if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{{ + Mechanism: balancerconfig.DiscoveryMechanism{ + Type: balancerconfig.DiscoveryMechanismTypeEDS, + Cluster: testClusterName, + }, + EDSResp: testEDSUpdates[1], + }}); diff != "" { + t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) + } + case <-ctx.Done(): + t.Fatal("Timed out waiting for update from update channel.") + } + + // Change circuit breaking count, should get an update with new circuit + // breaking count, but shouldn't trigger new watch. + rr.updateMechanisms([]balancerconfig.DiscoveryMechanism{{ + Type: balancerconfig.DiscoveryMechanismTypeEDS, + Cluster: testClusterName, + MaxConcurrentRequests: newUint32(123), + }}) + shortCtx, shortCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer shortCancel() + if n, err := fakeClient.WaitForWatchEDS(shortCtx); err == nil { + t.Fatalf("unexpected watch started for EDS: %v", n) + } + select { + case u := <-rr.updateChannel: + if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{{ + Mechanism: balancerconfig.DiscoveryMechanism{ + Type: balancerconfig.DiscoveryMechanismTypeEDS, + Cluster: testClusterName, + MaxConcurrentRequests: newUint32(123), + }, + EDSResp: testEDSUpdates[1], + }}); diff != "" { + t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) + } + case <-ctx.Done(): + t.Fatal("Timed out waiting for update from update channel.") + } + + // Close the resource resolver. Should stop EDS watch. 
+ rr.stop() + edsNameCanceled, err := fakeClient.WaitForCancelEDSWatch(ctx) + if err != nil { + t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) + } + if edsNameCanceled != gotEDSName2 { + t.Fatalf("xdsClient.CancelEDS called for %v, want: %v", edsNameCanceled, gotEDSName2) + } +} + +// Test the case that same resources with the same priority should not add new +// EDS watch, and also should not trigger an update. +func (s) TestResourceResolverNoChangeNoUpdate(t *testing.T) { + fakeClient := fakeclient.NewClient() + rr := newResourceResolver(&clusterResolverBalancer{xdsClient: fakeClient}) + rr.updateMechanisms([]balancerconfig.DiscoveryMechanism{ + { + Type: balancerconfig.DiscoveryMechanismTypeEDS, + Cluster: testClusterNames[0], + }, + { + Type: balancerconfig.DiscoveryMechanismTypeEDS, + Cluster: testClusterNames[1], + MaxConcurrentRequests: newUint32(100), + }, + }) + ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer ctxCancel() + gotEDSName1, err := fakeClient.WaitForWatchEDS(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + if gotEDSName1 != testClusterNames[0] { + t.Fatalf("xdsClient.WatchEDS called for cluster: %v, want: %v", gotEDSName1, testClusterNames[0]) + } + gotEDSName2, err := fakeClient.WaitForWatchEDS(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + if gotEDSName2 != testClusterNames[1] { + t.Fatalf("xdsClient.WatchEDS called for cluster: %v, want: %v", gotEDSName2, testClusterNames[1]) + } + + // Invoke callback, should get an update. + fakeClient.InvokeWatchEDSCallback(gotEDSName1, testEDSUpdates[0], nil) + // Shouldn't send update, because only one resource received an update. + shortCtx, shortCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer shortCancel() + select { + case u := <-rr.updateChannel: + t.Fatalf("get unexpected update %+v", u) + case <-shortCtx.Done(): + } + fakeClient.InvokeWatchEDSCallback(gotEDSName2, testEDSUpdates[1], nil) + select { + case u := <-rr.updateChannel: + if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{ + { + Mechanism: balancerconfig.DiscoveryMechanism{ + Type: balancerconfig.DiscoveryMechanismTypeEDS, + Cluster: testClusterNames[0], + }, + EDSResp: testEDSUpdates[0], + }, + { + Mechanism: balancerconfig.DiscoveryMechanism{ + Type: balancerconfig.DiscoveryMechanismTypeEDS, + Cluster: testClusterNames[1], + MaxConcurrentRequests: newUint32(100), + }, + EDSResp: testEDSUpdates[1], + }, + }); diff != "" { + t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) + } + case <-ctx.Done(): + t.Fatal("Timed out waiting for update from update channel.") + } + + // Send the same resources with the same priorities, shouldn't any change. 
+ rr.updateMechanisms([]balancerconfig.DiscoveryMechanism{ + { + Type: balancerconfig.DiscoveryMechanismTypeEDS, + Cluster: testClusterNames[0], + }, + { + Type: balancerconfig.DiscoveryMechanismTypeEDS, + Cluster: testClusterNames[1], + MaxConcurrentRequests: newUint32(100), + }, + }) + shortCtx, shortCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer shortCancel() + if n, err := fakeClient.WaitForWatchEDS(shortCtx); err == nil { + t.Fatalf("unexpected watch started for EDS: %v", n) + } + shortCtx, shortCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer shortCancel() + select { + case u := <-rr.updateChannel: + t.Fatalf("unexpected update: %+v", u) + case <-shortCtx.Done(): + } + + // Close the resource resolver. Should stop EDS watch. + rr.stop() + edsNameCanceled1, err := fakeClient.WaitForCancelEDSWatch(ctx) + if err != nil { + t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) + } + if edsNameCanceled1 != gotEDSName1 && edsNameCanceled1 != gotEDSName2 { + t.Fatalf("xdsClient.CancelEDS called for %v, want: %v or %v", edsNameCanceled1, gotEDSName1, gotEDSName2) + } + edsNameCanceled2, err := fakeClient.WaitForCancelEDSWatch(ctx) + if err != nil { + t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) + } + if edsNameCanceled2 != gotEDSName2 && edsNameCanceled2 != gotEDSName1 { + t.Fatalf("xdsClient.CancelEDS called for %v, want: %v or %v", edsNameCanceled2, gotEDSName1, gotEDSName2) + } +} + +// Test the case that same resources are watched, but with different priority. +// Should not add new EDS watch, but should trigger an update with the new +// priorities. +func (s) TestResourceResolverChangePriority(t *testing.T) { + fakeClient := fakeclient.NewClient() + rr := newResourceResolver(&clusterResolverBalancer{xdsClient: fakeClient}) + rr.updateMechanisms([]balancerconfig.DiscoveryMechanism{ + { + Type: balancerconfig.DiscoveryMechanismTypeEDS, + Cluster: testClusterNames[0], + }, + { + Type: balancerconfig.DiscoveryMechanismTypeEDS, + Cluster: testClusterNames[1], + }, + }) + ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer ctxCancel() + gotEDSName1, err := fakeClient.WaitForWatchEDS(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + if gotEDSName1 != testClusterNames[0] { + t.Fatalf("xdsClient.WatchEDS called for cluster: %v, want: %v", gotEDSName1, testClusterNames[0]) + } + gotEDSName2, err := fakeClient.WaitForWatchEDS(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + if gotEDSName2 != testClusterNames[1] { + t.Fatalf("xdsClient.WatchEDS called for cluster: %v, want: %v", gotEDSName2, testClusterNames[1]) + } + + // Invoke callback, should get an update. + fakeClient.InvokeWatchEDSCallback(gotEDSName1, testEDSUpdates[0], nil) + // Shouldn't send update, because only one resource received an update. 
+ shortCtx, shortCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer shortCancel() + select { + case u := <-rr.updateChannel: + t.Fatalf("get unexpected update %+v", u) + case <-shortCtx.Done(): + } + fakeClient.InvokeWatchEDSCallback(gotEDSName2, testEDSUpdates[1], nil) + select { + case u := <-rr.updateChannel: + if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{ + { + Mechanism: balancerconfig.DiscoveryMechanism{ + Type: balancerconfig.DiscoveryMechanismTypeEDS, + Cluster: testClusterNames[0], + }, + EDSResp: testEDSUpdates[0], + }, + { + Mechanism: balancerconfig.DiscoveryMechanism{ + Type: balancerconfig.DiscoveryMechanismTypeEDS, + Cluster: testClusterNames[1], + }, + EDSResp: testEDSUpdates[1], + }, + }); diff != "" { + t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) + } + case <-ctx.Done(): + t.Fatal("Timed out waiting for update from update channel.") + } + + // Send the same resources with different priorities, shouldn't trigger + // watch, but should trigger an update with the new priorities. + rr.updateMechanisms([]balancerconfig.DiscoveryMechanism{ + { + Type: balancerconfig.DiscoveryMechanismTypeEDS, + Cluster: testClusterNames[1], + }, + { + Type: balancerconfig.DiscoveryMechanismTypeEDS, + Cluster: testClusterNames[0], + }, + }) + shortCtx, shortCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer shortCancel() + if n, err := fakeClient.WaitForWatchEDS(shortCtx); err == nil { + t.Fatalf("unexpected watch started for EDS: %v", n) + } + select { + case u := <-rr.updateChannel: + if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{ + { + Mechanism: balancerconfig.DiscoveryMechanism{ + Type: balancerconfig.DiscoveryMechanismTypeEDS, + Cluster: testClusterNames[1], + }, + EDSResp: testEDSUpdates[1], + }, + { + Mechanism: balancerconfig.DiscoveryMechanism{ + Type: balancerconfig.DiscoveryMechanismTypeEDS, + Cluster: testClusterNames[0], + }, + EDSResp: testEDSUpdates[0], + }, + }); diff != "" { + t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) + } + case <-ctx.Done(): + t.Fatal("Timed out waiting for update from update channel.") + } + + // Close the resource resolver. Should stop EDS watch. + rr.stop() + edsNameCanceled1, err := fakeClient.WaitForCancelEDSWatch(ctx) + if err != nil { + t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) + } + if edsNameCanceled1 != gotEDSName1 && edsNameCanceled1 != gotEDSName2 { + t.Fatalf("xdsClient.CancelEDS called for %v, want: %v or %v", edsNameCanceled1, gotEDSName1, gotEDSName2) + } + edsNameCanceled2, err := fakeClient.WaitForCancelEDSWatch(ctx) + if err != nil { + t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) + } + if edsNameCanceled2 != gotEDSName2 && edsNameCanceled2 != gotEDSName1 { + t.Fatalf("xdsClient.CancelEDS called for %v, want: %v or %v", edsNameCanceled2, gotEDSName1, gotEDSName2) + } +} + +// Test the case that covers resource for both EDS and DNS. 
+func (s) TestResourceResolverEDSAndDNS(t *testing.T) { + dnsTargetCh, dnsCloseCh, _, dnsR, cleanup := setupDNS() + defer cleanup() + fakeClient := fakeclient.NewClient() + rr := newResourceResolver(&clusterResolverBalancer{xdsClient: fakeClient}) + rr.updateMechanisms([]balancerconfig.DiscoveryMechanism{ + { + Type: balancerconfig.DiscoveryMechanismTypeEDS, + Cluster: testClusterName, + }, + { + Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS, + DNSHostname: testDNSTarget, + }, + }) + ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer ctxCancel() + gotEDSName1, err := fakeClient.WaitForWatchEDS(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + if gotEDSName1 != testClusterName { + t.Fatalf("xdsClient.WatchEDS called for cluster: %v, want: %v", gotEDSName1, testClusterName) + } + select { + case target := <-dnsTargetCh: + if diff := cmp.Diff(target, resolver.Target{Scheme: "dns", Endpoint: testDNSTarget}); diff != "" { + t.Fatalf("got unexpected DNS target to watch, diff (-got, +want): %v", diff) + } + case <-ctx.Done(): + t.Fatal("Timed out waiting for building DNS resolver") + } + + fakeClient.InvokeWatchEDSCallback(gotEDSName1, testEDSUpdates[0], nil) + // Shouldn't send update, because only one resource received an update. + shortCtx, shortCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer shortCancel() + select { + case u := <-rr.updateChannel: + t.Fatalf("get unexpected update %+v", u) + case <-shortCtx.Done(): + } + // Invoke DNS, should get an update. + dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "1.1.1.1"}, {Addr: "2.2.2.2"}}}) + select { + case u := <-rr.updateChannel: + if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{ + { + Mechanism: balancerconfig.DiscoveryMechanism{ + Type: balancerconfig.DiscoveryMechanismTypeEDS, + Cluster: testClusterName, + }, + EDSResp: testEDSUpdates[0], + }, + { + Mechanism: balancerconfig.DiscoveryMechanism{ + Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS, + DNSHostname: testDNSTarget, + }, + Addresses: []string{"1.1.1.1", "2.2.2.2"}, + }, + }); diff != "" { + t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) + } + case <-ctx.Done(): + t.Fatal("Timed out waiting for update from update channel.") + } + + // Close the resource resolver. Should stop EDS watch. + rr.stop() + edsNameCanceled1, err := fakeClient.WaitForCancelEDSWatch(ctx) + if err != nil { + t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) + } + if edsNameCanceled1 != gotEDSName1 { + t.Fatalf("xdsClient.CancelEDS called for %v, want: %v", edsNameCanceled1, gotEDSName1) + } + select { + case <-dnsCloseCh: + case <-ctx.Done(): + t.Fatal("Timed out waiting for closing DNS resolver") + } +} + +// Test the case that covers resource changing between EDS and DNS. 
+func (s) TestResourceResolverChangeFromEDSToDNS(t *testing.T) { + dnsTargetCh, dnsCloseCh, _, dnsR, cleanup := setupDNS() + defer cleanup() + fakeClient := fakeclient.NewClient() + rr := newResourceResolver(&clusterResolverBalancer{xdsClient: fakeClient}) + rr.updateMechanisms([]balancerconfig.DiscoveryMechanism{{ + Type: balancerconfig.DiscoveryMechanismTypeEDS, + Cluster: testClusterName, + }}) + ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer ctxCancel() + gotEDSName1, err := fakeClient.WaitForWatchEDS(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + if gotEDSName1 != testClusterName { + t.Fatalf("xdsClient.WatchEDS called for cluster: %v, want: %v", gotEDSName1, testClusterName) + } + + // Invoke callback, should get an update. + fakeClient.InvokeWatchEDSCallback(gotEDSName1, testEDSUpdates[0], nil) + select { + case u := <-rr.updateChannel: + if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{{ + Mechanism: balancerconfig.DiscoveryMechanism{ + Type: balancerconfig.DiscoveryMechanismTypeEDS, + Cluster: testClusterName, + }, + EDSResp: testEDSUpdates[0], + }}); diff != "" { + t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) + } + case <-ctx.Done(): + t.Fatal("Timed out waiting for update from update channel.") + } + + // Update to watch DNS instead. Should cancel EDS, and start DNS. + rr.updateMechanisms([]balancerconfig.DiscoveryMechanism{{ + Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS, + DNSHostname: testDNSTarget, + }}) + select { + case target := <-dnsTargetCh: + if diff := cmp.Diff(target, resolver.Target{Scheme: "dns", Endpoint: testDNSTarget}); diff != "" { + t.Fatalf("got unexpected DNS target to watch, diff (-got, +want): %v", diff) + } + case <-ctx.Done(): + t.Fatal("Timed out waiting for building DNS resolver") + } + edsNameCanceled1, err := fakeClient.WaitForCancelEDSWatch(ctx) + if err != nil { + t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) + } + if edsNameCanceled1 != gotEDSName1 { + t.Fatalf("xdsClient.CancelEDS called for %v, want: %v", edsNameCanceled1, gotEDSName1) + } + + dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "1.1.1.1"}, {Addr: "2.2.2.2"}}}) + select { + case u := <-rr.updateChannel: + if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{{ + Mechanism: balancerconfig.DiscoveryMechanism{ + Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS, + DNSHostname: testDNSTarget, + }, + Addresses: []string{"1.1.1.1", "2.2.2.2"}, + }}); diff != "" { + t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) + } + case <-ctx.Done(): + t.Fatal("Timed out waiting for update from update channel.") + } + + // Close the resource resolver. Should stop DNS. + rr.stop() + select { + case <-dnsCloseCh: + case <-ctx.Done(): + t.Fatal("Timed out waiting for closing DNS resolver") + } +} + +// Test the case that covers errors for both EDS and DNS. 
+func (s) TestResourceResolverError(t *testing.T) { + dnsTargetCh, dnsCloseCh, _, dnsR, cleanup := setupDNS() + defer cleanup() + fakeClient := fakeclient.NewClient() + rr := newResourceResolver(&clusterResolverBalancer{xdsClient: fakeClient}) + rr.updateMechanisms([]balancerconfig.DiscoveryMechanism{ + { + Type: balancerconfig.DiscoveryMechanismTypeEDS, + Cluster: testClusterName, + }, + { + Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS, + DNSHostname: testDNSTarget, + }, + }) + ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer ctxCancel() + gotEDSName1, err := fakeClient.WaitForWatchEDS(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + if gotEDSName1 != testClusterName { + t.Fatalf("xdsClient.WatchEDS called for cluster: %v, want: %v", gotEDSName1, testClusterName) + } + select { + case target := <-dnsTargetCh: + if diff := cmp.Diff(target, resolver.Target{Scheme: "dns", Endpoint: testDNSTarget}); diff != "" { + t.Fatalf("got unexpected DNS target to watch, diff (-got, +want): %v", diff) + } + case <-ctx.Done(): + t.Fatal("Timed out waiting for building DNS resolver") + } + + // Invoke callback with an error, should get an update. + edsErr := fmt.Errorf("EDS error") + fakeClient.InvokeWatchEDSCallback(gotEDSName1, xdsclient.EndpointsUpdate{}, edsErr) + select { + case u := <-rr.updateChannel: + if u.err != edsErr { + t.Fatalf("got unexpected error from update, want %v, got %v", edsErr, u.err) + } + case <-ctx.Done(): + t.Fatal("Timed out waiting for update from update channel.") + } + + // Invoke DNS with an error, should get an update. + dnsErr := fmt.Errorf("DNS error") + dnsR.ReportError(dnsErr) + select { + case u := <-rr.updateChannel: + if u.err != dnsErr { + t.Fatalf("got unexpected error from update, want %v, got %v", dnsErr, u.err) + } + case <-ctx.Done(): + t.Fatal("Timed out waiting for update from update channel.") + } + + // Close the resource resolver. Should stop EDS watch. + rr.stop() + edsNameCanceled1, err := fakeClient.WaitForCancelEDSWatch(ctx) + if err != nil { + t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) + } + if edsNameCanceled1 != gotEDSName1 { + t.Fatalf("xdsClient.CancelEDS called for %v, want: %v", edsNameCanceled1, gotEDSName1) + } + select { + case <-dnsCloseCh: + case <-ctx.Done(): + t.Fatal("Timed out waiting for closing DNS resolver") + } +} + +// Test re-resolve of the DNS resolver. +func (s) TestResourceResolverDNSResolveNow(t *testing.T) { + dnsTargetCh, dnsCloseCh, resolveNowCh, dnsR, cleanup := setupDNS() + defer cleanup() + fakeClient := fakeclient.NewClient() + rr := newResourceResolver(&clusterResolverBalancer{xdsClient: fakeClient}) + rr.updateMechanisms([]balancerconfig.DiscoveryMechanism{{ + Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS, + DNSHostname: testDNSTarget, + }}) + ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer ctxCancel() + select { + case target := <-dnsTargetCh: + if diff := cmp.Diff(target, resolver.Target{Scheme: "dns", Endpoint: testDNSTarget}); diff != "" { + t.Fatalf("got unexpected DNS target to watch, diff (-got, +want): %v", diff) + } + case <-ctx.Done(): + t.Fatal("Timed out waiting for building DNS resolver") + } + + // Invoke callback, should get an update. 
+ dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "1.1.1.1"}, {Addr: "2.2.2.2"}}}) + select { + case u := <-rr.updateChannel: + if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{{ + Mechanism: balancerconfig.DiscoveryMechanism{ + Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS, + DNSHostname: testDNSTarget, + }, + Addresses: []string{"1.1.1.1", "2.2.2.2"}, + }}); diff != "" { + t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) + } + case <-ctx.Done(): + t.Fatal("Timed out waiting for update from update channel.") + } + rr.resolveNow() + select { + case <-resolveNowCh: + case <-ctx.Done(): + t.Fatal("Timed out waiting for re-resolve") + } + // Close the resource resolver. Should close the underlying resolver. + rr.stop() + select { + case <-dnsCloseCh: + case <-ctx.Done(): + t.Fatal("Timed out waiting for closing DNS resolver") + } +} diff --git a/xds/internal/testutils/fakeclient/client.go b/xds/internal/testutils/fakeclient/client.go index f3cfb9401e3..25545408036 100644 --- a/xds/internal/testutils/fakeclient/client.go +++ b/xds/internal/testutils/fakeclient/client.go @@ -54,7 +54,7 @@ type Client struct { ldsCb func(xdsclient.ListenerUpdate, error) rdsCb func(xdsclient.RouteConfigUpdate, error) cdsCbs map[string]func(xdsclient.ClusterUpdate, error) - edsCb func(xdsclient.EndpointsUpdate, error) + edsCbs map[string]func(xdsclient.EndpointsUpdate, error) Closed *grpcsync.Event // fired when Close is called. } @@ -180,10 +180,10 @@ func (xdsC *Client) WaitForCancelClusterWatch(ctx context.Context) (string, erro // WatchEndpoints registers an EDS watch for provided clusterName. func (xdsC *Client) WatchEndpoints(clusterName string, callback func(xdsclient.EndpointsUpdate, error)) (cancel func()) { - xdsC.edsCb = callback + xdsC.edsCbs[clusterName] = callback xdsC.edsWatchCh.Send(clusterName) return func() { - xdsC.edsCancelCh.Send(nil) + xdsC.edsCancelCh.Send(clusterName) } } @@ -201,15 +201,28 @@ func (xdsC *Client) WaitForWatchEDS(ctx context.Context) (string, error) { // // Not thread safe with WatchEndpoints. Only call this after // WaitForWatchEDS. -func (xdsC *Client) InvokeWatchEDSCallback(update xdsclient.EndpointsUpdate, err error) { - xdsC.edsCb(update, err) +func (xdsC *Client) InvokeWatchEDSCallback(name string, update xdsclient.EndpointsUpdate, err error) { + if len(xdsC.edsCbs) != 1 { + // This may panic if name isn't found. But it's fine for tests. + xdsC.edsCbs[name](update, err) + return + } + // Keeps functionality with previous usage of this, if single callback call + // that callback. + for n := range xdsC.edsCbs { + name = n + } + xdsC.edsCbs[name](update, err) } // WaitForCancelEDSWatch waits for a EDS watch to be cancelled and returns // context.DeadlineExceeded otherwise. -func (xdsC *Client) WaitForCancelEDSWatch(ctx context.Context) error { - _, err := xdsC.edsCancelCh.Receive(ctx) - return err +func (xdsC *Client) WaitForCancelEDSWatch(ctx context.Context) (string, error) { + edsNameReceived, err := xdsC.edsCancelCh.Receive(ctx) + if err != nil { + return "", err + } + return edsNameReceived.(string), err } // ReportLoadArgs wraps the arguments passed to ReportLoad. 
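Editor's note (illustration, not part of the patch): with the changes above, the fake client's EDS surface is keyed by resource name, which is what the new resource-resolver tests rely on. A minimal sketch of how a test is expected to drive this API follows; the cluster name "cluster-a", the empty EndpointsUpdate payload, and the enclosing *testing.T are assumptions made for the example.

// Fragment of a hypothetical test body using the name-keyed EDS API of the
// fake client.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

xdsC := fakeclient.NewClient()
cancelWatch := xdsC.WatchEndpoints("cluster-a", func(xdsclient.EndpointsUpdate, error) {})

// The registered watch is reported together with its resource name.
name, err := xdsC.WaitForWatchEDS(ctx)
if err != nil {
	t.Fatalf("no EDS watch registered: %v", err)
}

// Updates are routed to the callback registered under that name (or to the
// only callback, if exactly one watch exists).
xdsC.InvokeWatchEDSCallback(name, xdsclient.EndpointsUpdate{}, nil)

// Cancelling the watch now reports which resource's watch was stopped.
cancelWatch()
if canceled, err := xdsC.WaitForCancelEDSWatch(ctx); err != nil || canceled != name {
	t.Fatalf("unexpected EDS watch cancellation: %q, %v", canceled, err)
}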
@@ -282,15 +295,16 @@ func NewClientWithName(name string) *Client { ldsWatchCh: testutils.NewChannel(), rdsWatchCh: testutils.NewChannel(), cdsWatchCh: testutils.NewChannelWithSize(10), - edsWatchCh: testutils.NewChannel(), + edsWatchCh: testutils.NewChannelWithSize(10), ldsCancelCh: testutils.NewChannel(), rdsCancelCh: testutils.NewChannel(), cdsCancelCh: testutils.NewChannelWithSize(10), - edsCancelCh: testutils.NewChannel(), + edsCancelCh: testutils.NewChannelWithSize(10), loadReportCh: testutils.NewChannel(), lrsCancelCh: testutils.NewChannel(), loadStore: load.NewStore(), cdsCbs: make(map[string]func(xdsclient.ClusterUpdate, error)), + edsCbs: make(map[string]func(xdsclient.EndpointsUpdate, error)), Closed: grpcsync.NewEvent(), } }
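Editor's note (illustration, not part of the patch): taken together, the tests above pin down the contract that the new resource resolver offers to the cluster_resolver balancer: updateMechanisms installs the ordered list of discovery mechanisms (starting EDS watches and DNS resolvers as needed), an update is pushed on updateChannel only after every mechanism has reported either an EDS response or DNS addresses, resolveNow triggers DNS re-resolution, and stop cancels EDS watches and closes DNS resolvers. Below is a rough consumer-side sketch, based only on the usage visible in these tests and not on the actual balancer code; the cluster name, DNS hostname, xdsC client, and stopCh shutdown channel are invented for the example.

// Fragment assumed to run inside the balancer's run goroutine.
rr := newResourceResolver(&clusterResolverBalancer{xdsClient: xdsC})
rr.updateMechanisms([]balancerconfig.DiscoveryMechanism{
	{Type: balancerconfig.DiscoveryMechanismTypeEDS, Cluster: "cluster-a"},
	{Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS, DNSHostname: "dns.example.com:443"},
})
for {
	select {
	case u := <-rr.updateChannel:
		if u.err != nil {
			// Error reported by an EDS watch or by the DNS resolver.
			continue
		}
		// u.p holds one PriorityConfig per mechanism, in the order the
		// mechanisms were passed to updateMechanisms, carrying either an
		// EDS response or resolved DNS addresses.
		_ = u.p
	case <-stopCh:
		// On shutdown, stop cancels the EDS watches and closes the DNS
		// resolvers started by updateMechanisms.
		rr.stop()
		return
	}
}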