diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go
index a0e58a5b8716..181ffd74d259 100644
--- a/xds/internal/resolver/xds_resolver_test.go
+++ b/xds/internal/resolver/xds_resolver_test.go
@@ -28,7 +28,9 @@ import (
 	"time"
 
 	xxhash "github.com/cespare/xxhash/v2"
+	"github.com/envoyproxy/go-control-plane/pkg/wellknown"
 	"github.com/google/go-cmp/cmp"
+	"github.com/google/uuid"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/credentials"
@@ -57,7 +59,12 @@ import (
 	"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
 	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
 	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
+	"google.golang.org/protobuf/types/known/wrapperspb"
 
+	v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
+	v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
+	v3routerpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3"
+	v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
 	v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
 
 	_ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // To parse LB config
@@ -478,139 +485,369 @@ func (s) TestXDSResolverCloseClosesXDSClient(t *testing.T) {
 	cancel() // Blocks until the xDS client is closed.
 }
 
-// TestXDSResolverBadServiceUpdate tests the case the xdsClient returns a bad
-// service update.
-func (s) TestXDSResolverBadServiceUpdate(t *testing.T) {
-	xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target})
-	defer xdsR.Close()
-	defer cancel()
+// TestResolverBadServiceUpdate tests the case where a resource returned by the
+// management server is NACKed by the xDS client, which then returns an update
+// containing an error to the resolver. Verifies that the update is propagated
+// to the ClientConn by the resolver. It also tests the cases where the resolver
+// gets a good update subsequently, and another error after the good update.
+func (s) TestResolverBadServiceUpdate(t *testing.T) {
+	mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer mgmtServer.Stop()
+
+	// Create a bootstrap configuration specifying the above management server.
+	nodeID := uuid.New().String()
+	cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{
+		NodeID:    nodeID,
+		ServerURI: mgmtServer.Address,
+		Version:   xdsbootstrap.TransportV3,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer cleanup()
+
+	// Build an xDS resolver that uses the above bootstrap configuration.
+	// Creating the xDS resolver should result in creation of the xDS client.
+	const serviceName = "my-service-client-side-xds"
+	builder := resolver.Get(xdsScheme)
+	if builder == nil {
+		t.Fatalf("resolver.Get(%v) returned nil", xdsScheme)
+	}
+	u, err := url.Parse("xds:///" + serviceName)
+	if err != nil {
+		t.Fatal(err)
+	}
+	tcc := newTestClientConn()
+	r, err := builder.Build(resolver.Target{URL: *u}, tcc, resolver.BuildOptions{})
+	if err != nil {
+		t.Fatalf("builder.Build(%v) returned err: %v", u, err)
+	}
+	defer r.Close()
+
+	// Configure a listener resource that is expected to be NACKed because it
+	// does not contain the `RouteSpecifier` field in the HTTPConnectionManager.
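+	// Note: gRPC requires the HttpConnectionManager of an API listener to
+	// carry its routing configuration, either inline or via RDS. The listener
+	// below leaves the RouteSpecifier field unset, so the xDS client is
+	// expected to NACK it with an error containing "no RouteSpecifier".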
+	hcm := testutils.MarshalAny(&v3httppb.HttpConnectionManager{
+		HttpFilters: []*v3httppb.HttpFilter{e2e.HTTPFilter("router", &v3routerpb.Router{})},
+	})
+	lis := &v3listenerpb.Listener{
+		Name:        serviceName,
+		ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm},
+		FilterChains: []*v3listenerpb.FilterChain{{
+			Name: "filter-chain-name",
+			Filters: []*v3listenerpb.Filter{{
+				Name:       wellknown.HTTPConnectionManager,
+				ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm},
+			}},
+		}},
+	}
+	resources := e2e.UpdateOptions{
+		NodeID:         nodeID,
+		Listeners:      []*v3listenerpb.Listener{lis},
+		SkipValidation: true,
+	}
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
-	waitForWatchListener(ctx, t, xdsC, targetStr)
-	xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil)
-	waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
+	if err := mgmtServer.Update(ctx, resources); err != nil {
+		t.Fatal(err)
+	}
 
-	// Invoke the watchAPI callback with a bad service update and wait for the
-	// ReportError method to be called on the ClientConn.
-	suErr := errors.New("bad serviceupdate")
-	xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{}, suErr)
+	wantErr := "no RouteSpecifier"
+	val, err := tcc.errorCh.Receive(ctx)
+	if err != nil {
+		t.Fatal("Timeout when waiting for error to be propagated to the ClientConn")
+	}
+	gotErr := val.(error)
+	if gotErr == nil || !strings.Contains(gotErr.Error(), wantErr) {
+		t.Fatalf("Received error from resolver %q, want %q", gotErr, wantErr)
+	}
 
-	if gotErrVal, gotErr := tcc.errorCh.Receive(ctx); gotErr != nil || gotErrVal != suErr {
-		t.Fatalf("ClientConn.ReportError() received %v, want %v", gotErrVal, suErr)
+	// Configure good listener and route configuration resources on the
+	// management server.
+	rdsName := "route-" + serviceName
+	cdsName := "cluster-" + serviceName
+	resources = e2e.UpdateOptions{
+		NodeID:         nodeID,
+		Listeners:      []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)},
+		Routes:         []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, serviceName, cdsName)},
+		SkipValidation: true,
+	}
+	if err := mgmtServer.Update(ctx, resources); err != nil {
+		t.Fatal(err)
+	}
+
+	// Expect a good update from the resolver.
+	val, err = tcc.stateCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("Timeout waiting for an update from the resolver: %v", err)
+	}
+	rState := val.(resolver.State)
+	if err := rState.ServiceConfig.Err; err != nil {
+		t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err)
+	}
+
+	// Configure another bad resource on the management server.
+	resources = e2e.UpdateOptions{
+		NodeID:         nodeID,
+		Listeners:      []*v3listenerpb.Listener{lis},
+		SkipValidation: true,
+	}
+	if err := mgmtServer.Update(ctx, resources); err != nil {
+		t.Fatal(err)
+	}
+
+	// Expect an error update from the resolver.
+	val, err = tcc.errorCh.Receive(ctx)
+	if err != nil {
+		t.Fatal("Timeout when waiting for error to be propagated to the ClientConn")
+	}
+	gotErr = val.(error)
+	if gotErr == nil || !strings.Contains(gotErr.Error(), wantErr) {
+		t.Fatalf("Received error from resolver %q, want %q", gotErr, wantErr)
 	}
 }
 
-// TestXDSResolverGoodServiceUpdate tests the happy case where the resolver
-// gets a good service update from the xdsClient.
-func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) {
-	xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target})
-	defer xdsR.Close()
-	defer cancel()
+// TestResolverGoodServiceUpdate tests the case where the resource returned by
+// the management server is ACKed by the xDS client, which then returns a good
+// service update to the resolver. The test verifies that the service config
+// returned by the resolver matches expectations, and that the config selector
+// returned by the resolver picks clusters based on the route configuration
+// received from the management server.
+func (s) TestResolverGoodServiceUpdate(t *testing.T) {
+	mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer mgmtServer.Stop()
 
-	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
-	defer cancel()
-	waitForWatchListener(ctx, t, xdsC, targetStr)
-	xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil)
-	waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
-	defer replaceRandNumGenerator(0)()
+	// Create a bootstrap configuration specifying the above management server.
+	nodeID := uuid.New().String()
+	cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{
+		NodeID:    nodeID,
+		ServerURI: mgmtServer.Address,
+		Version:   xdsbootstrap.TransportV3,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer cleanup()
+
+	// Build an xDS resolver that uses the above bootstrap configuration.
+	// Creating the xDS resolver should result in creation of the xDS client.
+	builder := resolver.Get(xdsScheme)
+	if builder == nil {
+		t.Fatalf("resolver.Get(%v) returned nil", xdsScheme)
+	}
+	const serviceName = "my-service-client-side-xds"
+	ldsName := serviceName
+	rdsName := "route-" + serviceName
+	u, err := url.Parse("xds:///" + serviceName)
+	if err != nil {
+		t.Fatal(err)
+	}
+	tcc := newTestClientConn()
+	r, err := builder.Build(resolver.Target{URL: *u}, tcc, resolver.BuildOptions{})
+	if err != nil {
+		t.Fatalf("builder.Build(%v) returned err: %v", u, err)
+	}
+	defer r.Close()
 
 	for _, tt := range []struct {
-		routes       []*xdsresource.Route
-		wantJSON     string
-		wantClusters map[string]bool
+		routeConfig       *v3routepb.RouteConfiguration
+		wantServiceConfig string
+		wantClusters      map[string]bool
 	}{
 		{
-			routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{"test-cluster-1": {Weight: 1}}}},
-			wantJSON: `{"loadBalancingConfig":[{
-    "xds_cluster_manager_experimental":{
-      "children":{
-        "cluster:test-cluster-1":{
-          "childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}]
-        }
-      }
-    }}]}`,
+			// A route configuration with a single cluster.
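+			// All requests matching the "/" prefix are routed to
+			// test-cluster-1, so the resolver is expected to emit the
+			// cluster_manager service config below with a single cds child,
+			// and a config selector that always picks that cluster.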
+			routeConfig: &v3routepb.RouteConfiguration{
+				Name: rdsName,
+				VirtualHosts: []*v3routepb.VirtualHost{{
+					Domains: []string{ldsName},
+					Routes: []*v3routepb.Route{{
+						Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}},
+						Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{
+							ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{
+								Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
+									{
+										Name:   "test-cluster-1",
+										Weight: &wrapperspb.UInt32Value{Value: 100},
+									},
+								},
+							}},
+						}},
+					}},
+				}},
+			},
+			wantServiceConfig: `
+{
+  "loadBalancingConfig": [{
+    "xds_cluster_manager_experimental": {
+      "children": {
+        "cluster:test-cluster-1": {
+          "childPolicy": [{
+            "cds_experimental": {
+              "cluster": "test-cluster-1"
+            }
+          }]
+        }
+      }
+    }
+  }]
+}`,
 			wantClusters: map[string]bool{"cluster:test-cluster-1": true},
 		},
 		{
-			routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{
-				"cluster_1": {Weight: 75},
-				"cluster_2": {Weight: 25},
-			}}},
-			// This update contains the cluster from the previous update as
-			// well as this update, as the previous config selector still
-			// references the old cluster when the new one is pushed.
-			wantJSON: `{"loadBalancingConfig":[{
-    "xds_cluster_manager_experimental":{
-      "children":{
-        "cluster:test-cluster-1":{
-          "childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}]
-        },
-        "cluster:cluster_1":{
-          "childPolicy":[{"cds_experimental":{"cluster":"cluster_1"}}]
-        },
-        "cluster:cluster_2":{
-          "childPolicy":[{"cds_experimental":{"cluster":"cluster_2"}}]
-        }
-      }
-    }}]}`,
+			// A route configuration with two new clusters.
+			routeConfig: &v3routepb.RouteConfiguration{
+				Name: rdsName,
+				VirtualHosts: []*v3routepb.VirtualHost{{
+					Domains: []string{ldsName},
+					Routes: []*v3routepb.Route{{
+						Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}},
+						Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{
+							ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{
+								Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
+									{
+										Name:   "cluster_1",
+										Weight: &wrapperspb.UInt32Value{Value: 75},
+									},
+									{
+										Name:   "cluster_2",
+										Weight: &wrapperspb.UInt32Value{Value: 25},
+									},
+								},
+							}},
+						}},
+					}},
+				}},
+			},
+			// This update contains the cluster from the previous update as well
+			// as this update, as the previous config selector still references
+			// the old cluster when the new one is pushed.
+			wantServiceConfig: `
+{
+  "loadBalancingConfig": [{
+    "xds_cluster_manager_experimental": {
+      "children": {
+        "cluster:test-cluster-1": {
+          "childPolicy": [{
+            "cds_experimental": {
+              "cluster": "test-cluster-1"
+            }
+          }]
+        },
+        "cluster:cluster_1": {
+          "childPolicy": [{
+            "cds_experimental": {
+              "cluster": "cluster_1"
+            }
+          }]
+        },
+        "cluster:cluster_2": {
+          "childPolicy": [{
+            "cds_experimental": {
+              "cluster": "cluster_2"
+            }
+          }]
+        }
+      }
+    }
+  }]
+}`,
 			wantClusters: map[string]bool{"cluster:cluster_1": true, "cluster:cluster_2": true},
 		},
 		{
-			routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{
-				"cluster_1": {Weight: 75},
-				"cluster_2": {Weight: 25},
-			}}},
+			// A redundant route configuration update.
+			// TODO(easwars): Do we need this, or can we do something else?
+			// The xds client might swallow this redundant update.
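+			// The route configuration below is byte-for-byte identical to the
+			// one in the previous test case.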
+			routeConfig: &v3routepb.RouteConfiguration{
+				Name: rdsName,
+				VirtualHosts: []*v3routepb.VirtualHost{{
+					Domains: []string{ldsName},
+					Routes: []*v3routepb.Route{{
+						Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}},
+						Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{
+							ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{
+								Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
+									{
+										Name:   "cluster_1",
+										Weight: &wrapperspb.UInt32Value{Value: 75},
+									},
+									{
+										Name:   "cluster_2",
+										Weight: &wrapperspb.UInt32Value{Value: 25},
+									},
+								},
+							}},
+						}},
+					}},
+				}},
+			},
 			// With this redundant update, the old config selector has been
 			// stopped, so there are no more references to the first cluster.
 			// Only the second update's clusters should remain.
-			wantJSON: `{"loadBalancingConfig":[{
-    "xds_cluster_manager_experimental":{
-      "children":{
-        "cluster:cluster_1":{
-          "childPolicy":[{"cds_experimental":{"cluster":"cluster_1"}}]
-        },
-        "cluster:cluster_2":{
-          "childPolicy":[{"cds_experimental":{"cluster":"cluster_2"}}]
-        }
-      }
-    }}]}`,
+			wantServiceConfig: `
+{
+  "loadBalancingConfig": [{
+    "xds_cluster_manager_experimental": {
+      "children": {
+        "cluster:cluster_1": {
+          "childPolicy": [{
+            "cds_experimental": {
+              "cluster": "cluster_1"
+            }
+          }]
+        },
+        "cluster:cluster_2": {
+          "childPolicy": [{
+            "cds_experimental": {
+              "cluster": "cluster_2"
+            }
+          }]
+        }
+      }
+    }
+  }]
+}`,
 			wantClusters: map[string]bool{"cluster:cluster_1": true, "cluster:cluster_2": true},
 		},
 	} {
-		// Invoke the watchAPI callback with a good service update and wait for the
-		// UpdateState method to be called on the ClientConn.
-		xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{
-			VirtualHosts: []*xdsresource.VirtualHost{
-				{
-					Domains: []string{targetStr},
-					Routes:  tt.routes,
-				},
-			},
-		}, nil)
+		// Configure the management server with a good listener resource and a
+		// route configuration resource, as specified by the test case.
+		resources := e2e.UpdateOptions{
+			NodeID:         nodeID,
+			Listeners:      []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)},
+			Routes:         []*v3routepb.RouteConfiguration{tt.routeConfig},
+			SkipValidation: true,
+		}
 
 		ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 		defer cancel()
-		gotState, err := tcc.stateCh.Receive(ctx)
+		if err := mgmtServer.Update(ctx, resources); err != nil {
+			t.Fatal(err)
+		}
+
+		// Read the update pushed by the resolver to the ClientConn.
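+		// The update carries the service config generated by the resolver as
+		// well as a config selector (in the state attributes); both are
+		// verified below.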
+		val, err := tcc.stateCh.Receive(ctx)
 		if err != nil {
-			t.Fatalf("Error waiting for UpdateState to be called: %v", err)
+			t.Fatalf("Timeout waiting for an update from the resolver: %v", err)
 		}
-		rState := gotState.(resolver.State)
+		rState := val.(resolver.State)
 		if err := rState.ServiceConfig.Err; err != nil {
-			t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err)
+			t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err)
 		}
-		wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(tt.wantJSON)
+		wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(tt.wantServiceConfig)
 		if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) {
-			t.Errorf("ClientConn.UpdateState received different service config")
+			t.Errorf("Received unexpected service config")
 			t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config))
-			t.Error("want: ", cmp.Diff(nil, wantSCParsed.Config))
+			t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config))
 		}
 
 		cs := iresolver.GetConfigSelector(rState)
 		if cs == nil {
-			t.Error("received nil config selector")
-			continue
+			t.Fatal("Received nil config selector in update from resolver")
 		}
 
 		pickedClusters := make(map[string]bool)
@@ -618,15 +855,15 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) {
 		// with the random number generator stubbed out, we can rely on this
 		// to be 100% reproducible.
 		for i := 0; i < 100; i++ {
-			res, err := cs.SelectConfig(iresolver.RPCInfo{Context: context.Background()})
+			res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
 			if err != nil {
-				t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err)
+				t.Fatalf("cs.SelectConfig(): %v", err)
 			}
 			cluster := clustermanager.GetPickedClusterForTesting(res.Context)
 			pickedClusters[cluster] = true
 			res.OnCommitted()
 		}
-		if !reflect.DeepEqual(pickedClusters, tt.wantClusters) {
+		if !cmp.Equal(pickedClusters, tt.wantClusters) {
 			t.Errorf("Picked clusters: %v; want: %v", pickedClusters, tt.wantClusters)
 		}
 	}