From cdab8ae5c4544528261b36476c2d7c154c84f813 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Tue, 21 Mar 2023 15:37:39 -0700 Subject: [PATCH] clusterresolver: push empty config to child policy upon removal of cluster resource (#6125) --- .../clusterresolver/clusterresolver_test.go | 188 -------- .../clusterresolver/e2e_test/balancer_test.go | 450 ++++++++++++++++++ .../clusterresolver/e2e_test/eds_impl_test.go | 3 +- .../clusterresolver/resource_resolver.go | 17 +- .../clusterresolver/resource_resolver_dns.go | 6 + .../clusterresolver/resource_resolver_eds.go | 25 +- xds/internal/xdsclient/authority.go | 1 + xds/internal/xdsclient/client.go | 5 + xds/internal/xdsclient/clientimpl_watchers.go | 16 - 9 files changed, 501 insertions(+), 210 deletions(-) create mode 100644 xds/internal/balancer/clusterresolver/e2e_test/balancer_test.go diff --git a/xds/internal/balancer/clusterresolver/clusterresolver_test.go b/xds/internal/balancer/clusterresolver/clusterresolver_test.go index 940de0aacbe..f327c8cf5fc 100644 --- a/xds/internal/balancer/clusterresolver/clusterresolver_test.go +++ b/xds/internal/balancer/clusterresolver/clusterresolver_test.go @@ -147,22 +147,6 @@ func (f *fakeChildBalancer) waitForClientConnStateChangeVerifyBalancerConfig(ctx return nil } -func (f *fakeChildBalancer) waitForClientConnStateChange(ctx context.Context) error { - _, err := f.clientConnState.Receive(ctx) - if err != nil { - return err - } - return nil -} - -func (f *fakeChildBalancer) waitForResolverError(ctx context.Context) error { - _, err := f.resolverError.Receive(ctx) - if err != nil { - return err - } - return nil -} - func (f *fakeChildBalancer) waitForSubConnStateChange(ctx context.Context, wantState *scStateChange) error { val, err := f.subConnState.Receive(ctx) if err != nil { @@ -258,178 +242,6 @@ func (s) TestSubConnStateChange(t *testing.T) { } } -// TestErrorFromXDSClientUpdate verifies that an error from xdsClient update is -// handled correctly. -// -// If it's resource-not-found, watch will NOT be canceled, the EDS impl will -// receive an empty EDS update, and new RPCs will fail. -// -// If it's connection error, nothing will happen. This will need to change to -// handle fallback. -func (s) TestErrorFromXDSClientUpdate(t *testing.T) { - edsLBCh := testutils.NewChannel() - xdsC, cleanup := setup(edsLBCh) - defer cleanup() - - builder := balancer.Get(Name) - edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{}) - if edsB == nil { - t.Fatalf("builder.Build(%s) failed and returned nil", Name) - } - defer edsB.Close() - - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - if err := edsB.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), - BalancerConfig: newLBConfigWithOneEDS(testEDSService), - }); err != nil { - t.Fatal(err) - } - if _, err := xdsC.WaitForWatchEDS(ctx); err != nil { - t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err) - } - xdsC.InvokeWatchEDSCallback("", xdsresource.EndpointsUpdate{}, nil) - edsLB, err := waitForNewChildLB(ctx, edsLBCh) - if err != nil { - t.Fatal(err) - } - if err := edsLB.waitForClientConnStateChange(ctx); err != nil { - t.Fatalf("EDS impl got unexpected update: %v", err) - } - - connectionErr := xdsresource.NewErrorf(xdsresource.ErrorTypeConnection, "connection error") - xdsC.InvokeWatchEDSCallback("", xdsresource.EndpointsUpdate{}, connectionErr) - - sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer sCancel() - if _, err := xdsC.WaitForCancelEDSWatch(sCtx); err != context.DeadlineExceeded { - t.Fatal("watch was canceled, want not canceled (timeout error)") - } - - sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer sCancel() - if err := edsLB.waitForClientConnStateChange(sCtx); err != context.DeadlineExceeded { - t.Fatal(err) - } - if err := edsLB.waitForResolverError(ctx); err != nil { - t.Fatalf("want resolver error, got %v", err) - } - - resourceErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "clusterResolverBalancer resource not found error") - xdsC.InvokeWatchEDSCallback("", xdsresource.EndpointsUpdate{}, resourceErr) - // Even if error is resource not found, watch shouldn't be canceled, because - // this is an EDS resource removed (and xds client actually never sends this - // error, but we still handles it). - sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer sCancel() - if _, err := xdsC.WaitForCancelEDSWatch(sCtx); err != context.DeadlineExceeded { - t.Fatal("watch was canceled, want not canceled (timeout error)") - } - if err := edsLB.waitForClientConnStateChange(sCtx); err != context.DeadlineExceeded { - t.Fatal(err) - } - if err := edsLB.waitForResolverError(ctx); err != nil { - t.Fatalf("want resolver error, got %v", err) - } - - // An update with the same service name should not trigger a new watch. - if err := edsB.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), - BalancerConfig: newLBConfigWithOneEDS(testEDSService), - }); err != nil { - t.Fatal(err) - } - sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer sCancel() - if _, err := xdsC.WaitForWatchEDS(sCtx); err != context.DeadlineExceeded { - t.Fatal("got unexpected new EDS watch") - } -} - -// TestErrorFromResolver verifies that resolver errors are handled correctly. -// -// If it's resource-not-found, watch will be canceled, the EDS impl will receive -// an empty EDS update, and new RPCs will fail. -// -// If it's connection error, nothing will happen. This will need to change to -// handle fallback. -func (s) TestErrorFromResolver(t *testing.T) { - edsLBCh := testutils.NewChannel() - xdsC, cleanup := setup(edsLBCh) - defer cleanup() - - builder := balancer.Get(Name) - edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{}) - if edsB == nil { - t.Fatalf("builder.Build(%s) failed and returned nil", Name) - } - defer edsB.Close() - - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - if err := edsB.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), - BalancerConfig: newLBConfigWithOneEDS(testEDSService), - }); err != nil { - t.Fatal(err) - } - - if _, err := xdsC.WaitForWatchEDS(ctx); err != nil { - t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err) - } - xdsC.InvokeWatchEDSCallback("", xdsresource.EndpointsUpdate{}, nil) - edsLB, err := waitForNewChildLB(ctx, edsLBCh) - if err != nil { - t.Fatal(err) - } - if err := edsLB.waitForClientConnStateChange(ctx); err != nil { - t.Fatalf("EDS impl got unexpected update: %v", err) - } - - connectionErr := xdsresource.NewErrorf(xdsresource.ErrorTypeConnection, "connection error") - edsB.ResolverError(connectionErr) - - sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer sCancel() - if _, err := xdsC.WaitForCancelEDSWatch(sCtx); err != context.DeadlineExceeded { - t.Fatal("watch was canceled, want not canceled (timeout error)") - } - - sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer sCancel() - if err := edsLB.waitForClientConnStateChange(sCtx); err != context.DeadlineExceeded { - t.Fatal("eds impl got EDS resp, want timeout error") - } - if err := edsLB.waitForResolverError(ctx); err != nil { - t.Fatalf("want resolver error, got %v", err) - } - - resourceErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "clusterResolverBalancer resource not found error") - edsB.ResolverError(resourceErr) - if _, err := xdsC.WaitForCancelEDSWatch(ctx); err != nil { - t.Fatalf("want watch to be canceled, waitForCancel failed: %v", err) - } - if err := edsLB.waitForClientConnStateChange(sCtx); err != context.DeadlineExceeded { - t.Fatal(err) - } - if err := edsLB.waitForResolverError(ctx); err != nil { - t.Fatalf("want resolver error, got %v", err) - } - - // An update with the same service name should trigger a new watch, because - // the previous watch was canceled. - if err := edsB.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), - BalancerConfig: newLBConfigWithOneEDS(testEDSService), - }); err != nil { - t.Fatal(err) - } - if _, err := xdsC.WaitForWatchEDS(ctx); err != nil { - t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err) - } -} - // Given a list of resource names, verifies that EDS requests for the same are // sent by the EDS balancer, through the fake xDS client. func verifyExpectedRequests(ctx context.Context, fc *fakeclient.Client, resourceNames ...string) error { diff --git a/xds/internal/balancer/clusterresolver/e2e_test/balancer_test.go b/xds/internal/balancer/clusterresolver/e2e_test/balancer_test.go new file mode 100644 index 00000000000..1513940c940 --- /dev/null +++ b/xds/internal/balancer/clusterresolver/e2e_test/balancer_test.go @@ -0,0 +1,450 @@ +/* + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package e2e_test + +import ( + "context" + "fmt" + "net" + "strconv" + "strings" + "testing" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/stubserver" + "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/resolver/manual" + "google.golang.org/grpc/serviceconfig" + "google.golang.org/grpc/status" + "google.golang.org/grpc/xds/internal/xdsclient" + "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" + + v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + testgrpc "google.golang.org/grpc/test/grpc_testing" + testpb "google.golang.org/grpc/test/grpc_testing" + + _ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // Register the "cds_experimental" LB policy. +) + +// TestErrorFromParentLB_ConnectionError tests the case where the parent of the +// clusterresolver LB policy sends its a connection error. The parent policy, +// CDS LB policy, sends a connection error when the ADS stream to the management +// server breaks. The test verifies that there is no perceivable effect because +// of this connection error, and that RPCs continue to work (because the LB +// policies are expected to use previously received xDS resources). +func (s) TestErrorFromParentLB_ConnectionError(t *testing.T) { + // Create a listener to be used by the management server. The test will + // close this listener to simulate ADS stream breakage. + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatalf("testutils.LocalTCPListener() failed: %v", err) + } + + // Start an xDS management server with the above restartable listener, and + // push a channel when the stream is closed. + streamClosedCh := make(chan struct{}, 1) + managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{ + Listener: lis, + OnStreamClosed: func(int64) { + select { + case streamClosedCh <- struct{}{}: + default: + } + }, + }) + defer cleanup() + + // Start a test backend and extract its host and port. + backend := &stubserver.StubServer{ + EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, + } + backend.StartServer() + defer backend.Stop() + _, p, err := net.SplitHostPort(backend.Address) + if err != nil { + t.Fatalf("Failed to split test backend address %q: %v", backend.Address, err) + } + port, err := strconv.ParseUint(p, 10, 32) + if err != nil { + t.Fatalf("Failed to parse test backend port %q: %v", backend.Address, err) + } + + // Configure cluster and endpoints resources in the management server. + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, edsServiceName, e2e.SecurityLevelNone)}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{uint32(port)})}, + SkipValidation: true, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create an xDS xdsClient for use by the cluster_resolver LB policy. + xdsClient, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer close() + + // Create a manual resolver and push a service config specifying the use of + // the cds LB policy as the top-level LB policy, and a corresponding config + // with a single cluster. + r := manual.NewBuilderWithScheme("whatever") + jsonSC := fmt.Sprintf(`{ + "loadBalancingConfig":[{ + "cds_experimental":{ + "cluster": "%s" + } + }] + }`, clusterName) + scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC) + r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient)) + + // Create a ClientConn and make a successful RPC. + cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) + if err != nil { + t.Fatalf("failed to dial local test server: %v", err) + } + defer cc.Close() + + client := testgrpc.NewTestServiceClient(cc) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Fatalf("rpc EmptyCall() failed: %v", err) + } + + // Close the listener and ensure that the ADS stream breaks. + lis.Close() + select { + case <-ctx.Done(): + t.Fatal("Timeout when waiting for ADS stream to close") + default: + } + + // Ensure that RPCs continue to succeed for the next one second. + for end := time.Now().Add(time.Second); time.Now().Before(end); <-time.After(defaultTestShortTimeout) { + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Fatalf("rpc EmptyCall() failed: %v", err) + } + } +} + +// TestErrorFromParentLB_ResourceNotFound tests the case where the parent of the +// clusterresolver LB policy sends it a resource-not-found error. The parent +// policy, CDS LB policy, sends a resource-not-found error when the cluster +// resource associated with these LB policies is removed by the management +// server. The test verifies that the associated EDS is canceled and RPCs fail. +// It also ensures that when the Cluster resource is added back, the EDS +// resource is re-requested and RPCs being to succeed. +func (s) TestErrorFromParentLB_ResourceNotFound(t *testing.T) { + // Start an xDS management server that uses a couple of channels to + // notify the test about the following events: + // - an EDS requested with the expected resource name is requested + // - EDS resource is unrequested, i.e, an EDS request with no resource name + // is received, which indicates that we are not longer interested in that + // resource. + edsResourceRequestedCh := make(chan struct{}, 1) + edsResourceCanceledCh := make(chan struct{}, 1) + managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{ + OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { + if req.GetTypeUrl() == version.V3EndpointsURL { + switch len(req.GetResourceNames()) { + case 0: + select { + case edsResourceCanceledCh <- struct{}{}: + default: + } + case 1: + if req.GetResourceNames()[0] == edsServiceName { + select { + case edsResourceRequestedCh <- struct{}{}: + default: + } + } + default: + t.Errorf("Unexpected number of resources, %d, in an EDS request", len(req.GetResourceNames())) + } + } + return nil + }, + }) + defer cleanup() + + // Start a test backend and extract its host and port. + backend := &stubserver.StubServer{ + EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, + } + backend.StartServer() + defer backend.Stop() + _, p, err := net.SplitHostPort(backend.Address) + if err != nil { + t.Fatalf("Failed to split test backend address %q: %v", backend.Address, err) + } + port, err := strconv.ParseUint(p, 10, 32) + if err != nil { + t.Fatalf("Failed to parse test backend port %q: %v", backend.Address, err) + } + + // Configure cluster and endpoints resources in the management server. + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, edsServiceName, e2e.SecurityLevelNone)}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{uint32(port)})}, + SkipValidation: true, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create an xDS xdsClient for use by the cluster_resolver LB policy. + xdsClient, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer close() + + // Create a manual resolver and push a service config specifying the use of + // the cds LB policy as the top-level LB policy, and a corresponding config + // with a single cluster. + r := manual.NewBuilderWithScheme("whatever") + jsonSC := fmt.Sprintf(`{ + "loadBalancingConfig":[{ + "cds_experimental":{ + "cluster": "%s" + } + }] + }`, clusterName) + scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC) + r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient)) + + // Create a ClientConn that kick starts the xDS workflow. + cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) + if err != nil { + t.Fatalf("failed to dial local test server: %v", err) + } + defer cc.Close() + + // Wait for the EDS resource to be requested. + select { + case <-ctx.Done(): + t.Fatal("Timeout when waiting for EDS resource to be requested") + case <-edsResourceRequestedCh: + } + + // Ensure that a successful RPC can be made. + client := testgrpc.NewTestServiceClient(cc) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Fatalf("rpc EmptyCall() failed: %v", err) + } + + // Delete the cluster resource from the mangement server. + resources.Clusters = nil + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Wait for the EDS resource to be not requested anymore. + select { + case <-ctx.Done(): + t.Fatal("Timeout when waiting for EDS resource to not requested") + case <-edsResourceCanceledCh: + } + + // Ensure that RPCs start to fail with expected error. + for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) { + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + _, err := client.EmptyCall(sCtx, &testpb.Empty{}) + if status.Code(err) == codes.Unavailable && strings.Contains(err.Error(), "all priorities are removed") { + break + } + if err != nil { + t.Logf("EmptyCall RPC failed: %v", err) + } + } + if ctx.Err() != nil { + t.Fatalf("RPCs did not fail after removal of Cluster resource") + } + + // Ensure that the ClientConn is in TransientFailure. + if state := cc.GetState(); state != connectivity.TransientFailure { + t.Fatalf("Unexpected connectivity state for ClientConn, got: %s, want %s", state, connectivity.TransientFailure) + } + + // Configure cluster and endpoints resources in the management server. + resources = e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, edsServiceName, e2e.SecurityLevelNone)}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{uint32(port)})}, + SkipValidation: true, + } + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Wait for the EDS resource to be requested again. + select { + case <-ctx.Done(): + t.Fatal("Timeout when waiting for EDS resource to be requested") + case <-edsResourceRequestedCh: + } + + // Ensure that a successful RPC can be made. + for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) { + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); err != nil { + t.Logf("EmptyCall RPC failed: %v", err) + continue + } + break + } + if ctx.Err() != nil { + t.Fatalf("RPCs did not fail after removal of Cluster resource") + } +} + +// TestEDSResourceRemoved tests the case where the EDS resource requested by the +// clusterresolver LB policy is removed from the management server. The test +// verifies that the EDS watch is not canceled and that RPCs continue to succeed +// with the previously received configuration. +func (s) TestEDSResourceRemoved(t *testing.T) { + // Start an xDS management server that uses a couple of channels to + // notify the test about the following events: + // - an EDS requested with the expected resource name is requested + // - EDS resource is unrequested, i.e, an EDS request with no resource name + // is received, which indicates that we are not longer interested in that + // resource. + edsResourceRequestedCh := make(chan struct{}, 1) + edsResourceCanceledCh := make(chan struct{}, 1) + managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{ + OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { + if req.GetTypeUrl() == version.V3EndpointsURL { + switch len(req.GetResourceNames()) { + case 0: + select { + case edsResourceCanceledCh <- struct{}{}: + default: + } + case 1: + if req.GetResourceNames()[0] == edsServiceName { + select { + case edsResourceRequestedCh <- struct{}{}: + default: + } + } + default: + t.Errorf("Unexpected number of resources, %d, in an EDS request", len(req.GetResourceNames())) + } + } + return nil + }, + }) + defer cleanup() + + // Start a test backend and extract its host and port. + backend := &stubserver.StubServer{ + EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, + } + backend.StartServer() + defer backend.Stop() + _, p, err := net.SplitHostPort(backend.Address) + if err != nil { + t.Fatalf("Failed to split test backend address %q: %v", backend.Address, err) + } + port, err := strconv.ParseUint(p, 10, 32) + if err != nil { + t.Fatalf("Failed to parse test backend port %q: %v", backend.Address, err) + } + + // Configure cluster and endpoints resources in the management server. + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, edsServiceName, e2e.SecurityLevelNone)}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{uint32(port)})}, + SkipValidation: true, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create an xDS xdsClient for use by the cluster_resolver LB policy. + xdsClient, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer close() + + // Create a manual resolver and push a service config specifying the use of + // the cds LB policy as the top-level LB policy, and a corresponding config + // with a single cluster. + r := manual.NewBuilderWithScheme("whatever") + jsonSC := fmt.Sprintf(`{ + "loadBalancingConfig":[{ + "cds_experimental":{ + "cluster": "%s" + } + }] + }`, clusterName) + scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC) + r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient)) + + // Create a ClientConn and make a successful RPC. + cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) + if err != nil { + t.Fatalf("failed to dial local test server: %v", err) + } + defer cc.Close() + + client := testgrpc.NewTestServiceClient(cc) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Fatalf("rpc EmptyCall() failed: %v", err) + } + + // Delete the endpoints resource from the mangement server. + resources.Endpoints = nil + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Ensure that RPCs continue to succeed for the next one second, and that the EDS watch is not canceled. + for end := time.Now().Add(time.Second); time.Now().Before(end); <-time.After(defaultTestShortTimeout) { + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Fatalf("rpc EmptyCall() failed: %v", err) + } + select { + case <-edsResourceCanceledCh: + t.Fatal("EDS watch canceled when not expected to be canceled") + default: + } + } +} diff --git a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go index 83289ce40df..ce497c1706d 100644 --- a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go +++ b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go @@ -58,7 +58,8 @@ const ( localityName2 = "my-locality-2" localityName3 = "my-locality-3" - defaultTestTimeout = 5 * time.Second + defaultTestTimeout = 5 * time.Second + defaultTestShortTimeout = 10 * time.Millisecond ) type s struct { diff --git a/xds/internal/balancer/clusterresolver/resource_resolver.go b/xds/internal/balancer/clusterresolver/resource_resolver.go index e47aaf1ceba..580734a0215 100644 --- a/xds/internal/balancer/clusterresolver/resource_resolver.go +++ b/xds/internal/balancer/clusterresolver/resource_resolver.go @@ -57,7 +57,8 @@ type endpointsResolver interface { // resolverNow triggers re-resolution of the resource. resolveNow() - // stop stops resolution of the resource. + // stop stops resolution of the resource. Implementations must not invoke + // any methods on the topLevelResolver interface once `stop()` returns. stop() } @@ -224,6 +225,20 @@ func (rr *resourceResolver) stop() { for _, r := range cm { r.r.stop() } + + // stop() is called when the LB policy is closed or when the underlying + // cluster resource is removed by the management server. In the latter case, + // an empty config update needs to be pushed to the child policy to ensure + // that a picker that fails RPCs is sent up to the channel. + // + // Resource resolver implementations are expected to not send any updates + // after they are stopped. Therefore, we don't have to worry about another + // write to this channel happening at the same time as this one. + select { + case <-rr.updateChannel: + default: + } + rr.updateChannel <- &resourceUpdate{} } // generateLocked collects updates from all resolvers. It pushes the combined diff --git a/xds/internal/balancer/clusterresolver/resource_resolver_dns.go b/xds/internal/balancer/clusterresolver/resource_resolver_dns.go index 4ce70e0fe6b..06af9cc6df3 100644 --- a/xds/internal/balancer/clusterresolver/resource_resolver_dns.go +++ b/xds/internal/balancer/clusterresolver/resource_resolver_dns.go @@ -100,6 +100,12 @@ func (dr *dnsDiscoveryMechanism) resolveNow() { } } +// The definition of stop() mentions that implementations must not invoke any +// methods on the topLevelResolver once the call to `stop()` returns. The +// underlying dns resolver does not send any updates to the resolver.ClientConn +// interface passed to it (implemented by dnsDiscoveryMechanism in this case) +// after its `Close()` returns. Therefore, we can guarantee that no methods of +// the topLevelResolver are invoked after we return from this method. func (dr *dnsDiscoveryMechanism) stop() { if dr.dnsR != nil { dr.dnsR.Close() diff --git a/xds/internal/balancer/clusterresolver/resource_resolver_eds.go b/xds/internal/balancer/clusterresolver/resource_resolver_eds.go index 62d932b85d5..2517cf49159 100644 --- a/xds/internal/balancer/clusterresolver/resource_resolver_eds.go +++ b/xds/internal/balancer/clusterresolver/resource_resolver_eds.go @@ -21,6 +21,7 @@ package clusterresolver import ( "sync" + "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" ) @@ -29,8 +30,9 @@ type edsResourceWatcher interface { } type edsDiscoveryMechanism struct { - cancel func() + cancelWatch func() topLevelResolver topLevelResolver + stopped *grpcsync.Event mu sync.Mutex update xdsresource.EndpointsUpdate @@ -50,11 +52,23 @@ func (er *edsDiscoveryMechanism) lastUpdate() (interface{}, bool) { func (er *edsDiscoveryMechanism) resolveNow() { } +// The definition of stop() mentions that implementations must not invoke any +// methods on the topLevelResolver once the call to `stop()` returns. func (er *edsDiscoveryMechanism) stop() { - er.cancel() + // Canceling a watch with the xDS client can race with an xDS response + // received around the same time, and can result in the watch callback being + // invoked after the watch is canceled. Callers need to handle this race, + // and we fire the stopped event here to ensure that a watch callback + // invocation around the same time becomes a no-op. + er.stopped.Fire() + er.cancelWatch() } func (er *edsDiscoveryMechanism) handleEndpointsUpdate(update xdsresource.EndpointsUpdate, err error) { + if er.stopped.HasFired() { + return + } + if err != nil { er.topLevelResolver.onError(err) return @@ -71,7 +85,10 @@ func (er *edsDiscoveryMechanism) handleEndpointsUpdate(update xdsresource.Endpoi // newEDSResolver returns an implementation of the endpointsResolver interface // that uses EDS to resolve the given name to endpoints. func newEDSResolver(nameToWatch string, watcher edsResourceWatcher, topLevelResolver topLevelResolver) *edsDiscoveryMechanism { - ret := &edsDiscoveryMechanism{topLevelResolver: topLevelResolver} - ret.cancel = watcher.WatchEndpoints(nameToWatch, ret.handleEndpointsUpdate) + ret := &edsDiscoveryMechanism{ + topLevelResolver: topLevelResolver, + stopped: grpcsync.NewEvent(), + } + ret.cancelWatch = watcher.WatchEndpoints(nameToWatch, ret.handleEndpointsUpdate) return ret } diff --git a/xds/internal/xdsclient/authority.go b/xds/internal/xdsclient/authority.go index 1ea1d532e3d..0a03e43a314 100644 --- a/xds/internal/xdsclient/authority.go +++ b/xds/internal/xdsclient/authority.go @@ -483,6 +483,7 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w // There are no more watchers for this resource, delete the state // associated with it, and instruct the transport to send a request // which does not include this resource name. + a.logger.Debugf("Removing last watch for type %q, resource name %q", rType.TypeEnum(), resourceName) delete(resources, resourceName) a.sendDiscoveryRequestLocked(rType, resources) } diff --git a/xds/internal/xdsclient/client.go b/xds/internal/xdsclient/client.go index 6e380b27d54..cc39fb2e4d1 100644 --- a/xds/internal/xdsclient/client.go +++ b/xds/internal/xdsclient/client.go @@ -45,6 +45,11 @@ type XDSClient interface { // instead use a resource-type-specific wrapper API provided by the relevant // resource type implementation. // + // + // During a race (e.g. an xDS response is received while the user is calling + // cancel()), there's a small window where the callback can be called after + // the watcher is canceled. Callers need to handle this case. + // // TODO: Once this generic client API is fully implemented and integrated, // delete the resource type specific watch APIs on this interface. WatchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) (cancel func()) diff --git a/xds/internal/xdsclient/clientimpl_watchers.go b/xds/internal/xdsclient/clientimpl_watchers.go index 77c4a614a22..3a2505db43e 100644 --- a/xds/internal/xdsclient/clientimpl_watchers.go +++ b/xds/internal/xdsclient/clientimpl_watchers.go @@ -48,10 +48,6 @@ func (l *listenerWatcher) OnResourceDoesNotExist() { // WatchListener uses LDS to discover information about the Listener resource // identified by resourceName. -// -// Note that during race (e.g. an xDS response is received while the user is -// calling cancel()), there's a small window where the callback can be called -// after the watcher is canceled. The caller needs to handle this case. func (c *clientImpl) WatchListener(resourceName string, cb func(xdsresource.ListenerUpdate, error)) (cancel func()) { watcher := &listenerWatcher{resourceName: resourceName, cb: cb} return xdsresource.WatchListener(c, resourceName, watcher) @@ -80,10 +76,6 @@ func (r *routeConfigWatcher) OnResourceDoesNotExist() { // WatchRouteConfig uses RDS to discover information about the // RouteConfiguration resource identified by resourceName. -// -// Note that during race (e.g. an xDS response is received while the user is -// calling cancel()), there's a small window where the callback can be called -// after the watcher is canceled. The caller needs to handle this case. func (c *clientImpl) WatchRouteConfig(resourceName string, cb func(xdsresource.RouteConfigUpdate, error)) (cancel func()) { watcher := &routeConfigWatcher{resourceName: resourceName, cb: cb} return xdsresource.WatchRouteConfig(c, resourceName, watcher) @@ -115,10 +107,6 @@ func (c *clusterWatcher) OnResourceDoesNotExist() { // // WatchCluster can be called multiple times, with same or different // clusterNames. Each call will start an independent watcher for the resource. -// -// Note that during race (e.g. an xDS response is received while the user is -// calling cancel()), there's a small window where the callback can be called -// after the watcher is canceled. The caller needs to handle this case. func (c *clientImpl) WatchCluster(resourceName string, cb func(xdsresource.ClusterUpdate, error)) (cancel func()) { watcher := &clusterWatcher{resourceName: resourceName, cb: cb} return xdsresource.WatchCluster(c, resourceName, watcher) @@ -150,10 +138,6 @@ func (c *endpointsWatcher) OnResourceDoesNotExist() { // // WatchEndpoints can be called multiple times, with same or different // clusterNames. Each call will start an independent watcher for the resource. -// -// Note that during race (e.g. an xDS response is received while the user is -// calling cancel()), there's a small window where the callback can be called -// after the watcher is canceled. The caller needs to handle this case. func (c *clientImpl) WatchEndpoints(resourceName string, cb func(xdsresource.EndpointsUpdate, error)) (cancel func()) { watcher := &endpointsWatcher{resourceName: resourceName, cb: cb} return xdsresource.WatchEndpoints(c, resourceName, watcher)