From b286c3c94b2c2c0e14ad4a085eb24df9942d6022 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 16 Mar 2023 12:08:32 -0700 Subject: [PATCH] clusterresolver: push empty config to child policy upon removal of cluster resource --- .../cdsbalancer/tests/balancer_test.go | 161 ++++++++++++++++++ .../clusterresolver/resource_resolver.go | 6 + xds/internal/xdsclient/authority.go | 1 + 3 files changed, 168 insertions(+) create mode 100644 xds/internal/balancer/cdsbalancer/tests/balancer_test.go diff --git a/xds/internal/balancer/cdsbalancer/tests/balancer_test.go b/xds/internal/balancer/cdsbalancer/tests/balancer_test.go new file mode 100644 index 000000000000..0b9b191ffda9 --- /dev/null +++ b/xds/internal/balancer/cdsbalancer/tests/balancer_test.go @@ -0,0 +1,161 @@ +/* + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package tests + +import ( + "context" + "fmt" + "net" + "strconv" + "strings" + "testing" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/grpctest" + "google.golang.org/grpc/internal/stubserver" + "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/resolver/manual" + "google.golang.org/grpc/serviceconfig" + "google.golang.org/grpc/status" + "google.golang.org/grpc/xds/internal/xdsclient" + + v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + testgrpc "google.golang.org/grpc/test/grpc_testing" + testpb "google.golang.org/grpc/test/grpc_testing" + + _ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // Register the "cds_experimental" LB policy. +) + +const ( + clusterName = "cluster-my-service-client-side-xds" + edsServiceName = "endpoints-my-service-client-side-xds" + defaultTestTimeout = 5 * time.Second + defaultTestShortTimeout = 10 * time.Millisecond +) + +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +// TestClusterResourceRemoved tests the case where the cds LB policy is +// configured as the top-level LB policy. The test verifies that removal of the +// associated cluster resource results in RPCs failing with Unavailable and the +// channel moving to TransientFailure. +func (s) TestClusterResourceRemoved(t *testing.T) { + // Start an xDS management server. + managementServer, nodeID, bootstrapContents, _, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{}) + defer cleanup1() + + // Start a test backend and extract its host and port. + backend := &stubserver.StubServer{ + EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, + } + backend.StartServer() + defer backend.Stop() + _, p, err := net.SplitHostPort(backend.Address) + if err != nil { + t.Fatalf("Failed to split test backend address %q: %v", backend.Address, err) + } + port, err := strconv.ParseUint(p, 10, 32) + if err != nil { + t.Fatalf("Failed to parse test backend port %q: %v", backend.Address, err) + } + + // Configure cluster and endpoints resources in the management server. + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, edsServiceName, e2e.SecurityLevelNone)}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{uint32(port)})}, + SkipValidation: true, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create an xDS xdsClient for use by the cluster_resolver LB policy. + xdsClient, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer close() + + // Create a manual resolver and push a service config specifying the use of + // the cds LB policy as the top-level LB policy, and a corresponding config + // with a single cluster. + r := manual.NewBuilderWithScheme("whatever") + jsonSC := fmt.Sprintf(`{ + "loadBalancingConfig":[{ + "cds_experimental":{ + "cluster": "%s" + } + }] + }`, clusterName) + scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC) + r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient)) + + // Create a ClientConn and make a successful RPC. + cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) + if err != nil { + t.Fatalf("failed to dial local test server: %v", err) + } + defer cc.Close() + + client := testgrpc.NewTestServiceClient(cc) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Fatalf("rpc EmptyCall() failed: %v", err) + } + + // Delete the cluster resource from the mangement server. + resources.Clusters = nil + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Ensure that RPCs start to fail with expected error. + for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) { + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + _, err := client.EmptyCall(sCtx, &testpb.Empty{}) + if status.Code(err) == codes.Unavailable && strings.Contains(err.Error(), "all priorities are removed") { + break + } + if err != nil { + t.Logf("EmptyCall RPC failed: %v", err) + } + } + if ctx.Err() != nil { + t.Fatalf("RPCs did not fail after removal of Cluster resource") + } + + // Ensure that the ClientConn is in TransientFailure. + if state := cc.GetState(); state != connectivity.TransientFailure { + t.Fatalf("Unexpected connectivity state for ClientConn, got: %s, want %s", state, connectivity.TransientFailure) + } +} diff --git a/xds/internal/balancer/clusterresolver/resource_resolver.go b/xds/internal/balancer/clusterresolver/resource_resolver.go index e47aaf1ceba1..730c060b4e86 100644 --- a/xds/internal/balancer/clusterresolver/resource_resolver.go +++ b/xds/internal/balancer/clusterresolver/resource_resolver.go @@ -224,6 +224,12 @@ func (rr *resourceResolver) stop() { for _, r := range cm { r.r.stop() } + + // stop() is called when the LB policy is closed or when the underlying + // cluster resource is removed by the management server. In the latter case, + // an empty config update needs to be pushed to the child policy to ensure + // that a picker that fails RPCs is sent up to the channel. + rr.updateChannel <- &resourceUpdate{} } // generateLocked collects updates from all resolvers. It pushes the combined diff --git a/xds/internal/xdsclient/authority.go b/xds/internal/xdsclient/authority.go index 1ea1d532e3d8..0a03e43a3148 100644 --- a/xds/internal/xdsclient/authority.go +++ b/xds/internal/xdsclient/authority.go @@ -483,6 +483,7 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w // There are no more watchers for this resource, delete the state // associated with it, and instruct the transport to send a request // which does not include this resource name. + a.logger.Debugf("Removing last watch for type %q, resource name %q", rType.TypeEnum(), resourceName) delete(resources, resourceName) a.sendDiscoveryRequestLocked(rType, resources) }