From def3983128b7d214075c543bc7c712263a97add0 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 19 Jan 2023 09:59:30 -0800 Subject: [PATCH 1/4] xds/resolver: cleanup tests to use real xDS client 5/n --- xds/internal/resolver/xds_resolver_test.go | 651 +++++++++++++-------- 1 file changed, 396 insertions(+), 255 deletions(-) diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index e35b4b6ecec6..a7701d45942e 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -28,6 +28,7 @@ import ( "time" xxhash "github.com/cespare/xxhash/v2" + "github.com/envoyproxy/go-control-plane/pkg/wellknown" "github.com/google/go-cmp/cmp" "github.com/google/uuid" "google.golang.org/grpc" @@ -58,10 +59,13 @@ import ( "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" + "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/wrapperspb" + v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" _ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // To parse LB config @@ -1106,54 +1110,137 @@ func (s) TestResolverWRR(t *testing.T) { } } -func (s) TestXDSResolverMaxStreamDuration(t *testing.T) { - xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target}) - defer xdsR.Close() - defer cancel() +// TestResolverMaxStreamDuration tests the case where the resolver receives max +// stream duration as part of the listener and route configuration resources. +// The test verifies that the RPC timeout returned by the config selector +// matches the value specified in the config. +func (s) TestResolverMaxStreamDuration(t *testing.T) { + mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{}) + if err != nil { + t.Fatal(err) + } + defer mgmtServer.Stop() - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - waitForWatchListener(ctx, t, xdsC, targetStr) - xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, MaxStreamDuration: time.Second, HTTPFilters: routerFilterList}, nil) - waitForWatchRouteConfig(ctx, t, xdsC, routeStr) + // Create a bootstrap configuration specifying the above management server. + nodeID := uuid.New().String() + cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{ + NodeID: nodeID, + ServerURI: mgmtServer.Address, + Version: xdsbootstrap.TransportV3, + }) + if err != nil { + t.Fatal(err) + } + defer cleanup() - defer func(oldNewWRR func() wrr.WRR) { newWRR = oldNewWRR }(newWRR) - newWRR = testutils.NewTestWRR + const serviceName = "my-service-client-side-xds" + tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}) + defer rClose() - // Invoke the watchAPI callback with a good service update and wait for the - // UpdateState method to be called on the ClientConn. - xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{targetStr}, - Routes: []*xdsresource.Route{{ - Prefix: newStringP("/foo"), - WeightedClusters: map[string]xdsresource.WeightedCluster{"A": {Weight: 1}}, - MaxStreamDuration: newDurationP(5 * time.Second), - }, { - Prefix: newStringP("/bar"), - WeightedClusters: map[string]xdsresource.WeightedCluster{"B": {Weight: 1}}, - MaxStreamDuration: newDurationP(0), - }, { - Prefix: newStringP(""), - WeightedClusters: map[string]xdsresource.WeightedCluster{"C": {Weight: 1}}, - }}, + // Configure the management server with a listener resource that specifies a + // max stream duration as part of its HTTP connection manager. Also + // configure a route configuration resource, which has multiple routes with + // different values of max stream duration. + ldsName := serviceName + rdsName := "route-" + serviceName + hcm := testutils.MarshalAny(&v3httppb.HttpConnectionManager{ + RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{Rds: &v3httppb.Rds{ + ConfigSource: &v3corepb.ConfigSource{ + ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}}, }, + RouteConfigName: rdsName, + }}, + HttpFilters: []*v3httppb.HttpFilter{e2e.RouterHTTPFilter}, + CommonHttpProtocolOptions: &v3corepb.HttpProtocolOptions{ + MaxStreamDuration: durationpb.New(1 * time.Second), }, - }, nil) + }) + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{{ + Name: ldsName, + ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm}, + FilterChains: []*v3listenerpb.FilterChain{{ + Name: "filter-chain-name", + Filters: []*v3listenerpb.Filter{{ + Name: wellknown.HTTPConnectionManager, + ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm}, + }}, + }}, + }}, + Routes: []*v3routepb.RouteConfiguration{{ + Name: rdsName, + VirtualHosts: []*v3routepb.VirtualHost{{ + Domains: []string{ldsName}, + Routes: []*v3routepb.Route{ + { + Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/foo"}}, + Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{ + Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ + { + Name: "A", + Weight: &wrapperspb.UInt32Value{Value: 100}, + }, + }}, + }, + MaxStreamDuration: &v3routepb.RouteAction_MaxStreamDuration{ + MaxStreamDuration: durationpb.New(5 * time.Second), + }, + }}, + }, + { + Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/bar"}}, + Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{ + Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ + { + Name: "B", + Weight: &wrapperspb.UInt32Value{Value: 100}, + }, + }}, + }, + MaxStreamDuration: &v3routepb.RouteAction_MaxStreamDuration{ + MaxStreamDuration: durationpb.New(0 * time.Second), + }, + }}, + }, + { + Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}}, + Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{ + Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ + { + Name: "C", + Weight: &wrapperspb.UInt32Value{Value: 100}, + }, + }}, + }, + }}, + }, + }, + }}, + }}, + SkipValidation: true, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + // Read the update pushed by the resolver to the ClientConn. gotState, err := tcc.stateCh.Receive(ctx) if err != nil { - t.Fatalf("Error waiting for UpdateState to be called: %v", err) + t.Fatalf("Timeout waiting for an update from the resolver: %v", err) } rState := gotState.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { - t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) + t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) } - cs := iresolver.GetConfigSelector(rState) if cs == nil { - t.Fatal("received nil config selector") + t.Fatal("Received nil config selector in update from resolver") } testCases := []struct { @@ -1178,295 +1265,349 @@ func (s) TestXDSResolverMaxStreamDuration(t *testing.T) { t.Run(tc.name, func(t *testing.T) { req := iresolver.RPCInfo{ Method: tc.method, - Context: context.Background(), + Context: ctx, } res, err := cs.SelectConfig(req) if err != nil { - t.Errorf("Unexpected error from cs.SelectConfig(%v): %v", req, err) + t.Errorf("cs.SelectConfig(%v): %v", req, err) return } res.OnCommitted() got := res.MethodConfig.Timeout - if !reflect.DeepEqual(got, tc.want) { + if !cmp.Equal(got, tc.want) { t.Errorf("For method %q: res.MethodConfig.Timeout = %v; want %v", tc.method, got, tc.want) } }) } } -// TestXDSResolverDelayedOnCommitted tests that clusters remain in service +// TestResolverDelayedOnCommitted tests that clusters remain in service // config if RPCs are in flight. -func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) { - xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target}) - defer xdsR.Close() - defer cancel() +func (s) TestResolverDelayedOnCommitted(t *testing.T) { + mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{}) + if err != nil { + t.Fatal(err) + } + defer mgmtServer.Stop() + // Create a bootstrap configuration specifying the above management server. + nodeID := uuid.New().String() + cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{ + NodeID: nodeID, + ServerURI: mgmtServer.Address, + Version: xdsbootstrap.TransportV3, + }) + if err != nil { + t.Fatal(err) + } + defer cleanup() + + const serviceName = "my-service-client-side-xds" + tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}) + defer rClose() + + // Configure the management server with a good listener and route + // configuration resource. + ldsName := serviceName + rdsName := "route-" + serviceName + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)}, + Routes: []*v3routepb.RouteConfiguration{{ + Name: rdsName, + VirtualHosts: []*v3routepb.VirtualHost{{ + Domains: []string{ldsName}, + Routes: []*v3routepb.Route{{ + Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}}, + Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{ + Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ + { + Name: "OLD-CLUSTER", + Weight: &wrapperspb.UInt32Value{Value: 100}, + }, + }, + }}, + }}, + }}, + }}, + }}, + SkipValidation: true, + } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - waitForWatchListener(ctx, t, xdsC, targetStr) - xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil) - waitForWatchRouteConfig(ctx, t, xdsC, routeStr) - - // Invoke the watchAPI callback with a good service update and wait for the - // UpdateState method to be called on the ClientConn. - xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{targetStr}, - Routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{"test-cluster-1": {Weight: 1}}}}, - }, - }, - }, nil) + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } - gotState, err := tcc.stateCh.Receive(ctx) + // Read the update pushed by the resolver to the ClientConn. + val, err := tcc.stateCh.Receive(ctx) if err != nil { - t.Fatalf("Error waiting for UpdateState to be called: %v", err) + t.Fatalf("Timeout waiting for an update from the resolver: %v", err) } - rState := gotState.(resolver.State) + rState := val.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { - t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) + t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) } - - wantJSON := `{"loadBalancingConfig":[{ - "xds_cluster_manager_experimental":{ - "children":{ - "cluster:test-cluster-1":{ - "childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}] - } - } - }}]}` - wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(wantJSON) + wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(` +{ + "loadBalancingConfig": [ + { + "xds_cluster_manager_experimental": { + "children": { + "cluster:OLD-CLUSTER": { + "childPolicy": [ + { + "cds_experimental": { + "cluster": "OLD-CLUSTER" + } + } + ] + } + } + } + } + ] +}`) if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { - t.Errorf("ClientConn.UpdateState received different service config") + t.Errorf("Received unexpected service config") t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config)) } + // Make an RPC, but do not commit it yet. cs := iresolver.GetConfigSelector(rState) if cs == nil { - t.Fatal("received nil config selector") + t.Fatal("Received nil config selector in update from resolver") } - - res, err := cs.SelectConfig(iresolver.RPCInfo{Context: context.Background()}) + resOld, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"}) if err != nil { - t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err) + t.Fatalf("cs.SelectConfig(): %v", err) } - cluster := clustermanager.GetPickedClusterForTesting(res.Context) - if cluster != "cluster:test-cluster-1" { - t.Fatalf("") + if cluster := clustermanager.GetPickedClusterForTesting(resOld.Context); cluster != "cluster:OLD-CLUSTER" { + t.Fatalf("Picked cluster is %q, want %q", cluster, "cluster:OLD-CLUSTER") } - // delay res.OnCommitted() - - // Perform TWO updates to ensure the old config selector does not hold a - // reference to test-cluster-1. - xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{targetStr}, - Routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{"NEW": {Weight: 1}}}}, - }, - }, - }, nil) - tcc.stateCh.Receive(ctx) // Ignore the first update. + // delay resOld.OnCommitted() - xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{targetStr}, - Routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{"NEW": {Weight: 1}}}}, - }, - }, - }, nil) + // Update the route configuration resource on the management server to + // return a new cluster. + resources = e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)}, + Routes: []*v3routepb.RouteConfiguration{{ + Name: rdsName, + VirtualHosts: []*v3routepb.VirtualHost{{ + Domains: []string{ldsName}, + Routes: []*v3routepb.Route{{ + Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}}, + Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{ + Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ + { + Name: "NEW-CLUSTER", + Weight: &wrapperspb.UInt32Value{Value: 100}, + }, + }, + }}, + }}, + }}, + }}, + }}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } - gotState, err = tcc.stateCh.Receive(ctx) + // Read the update pushed by the resolver to the ClientConn and ensure the + // old cluster is present in the service config. Also ensure that the newly + // returned config selector does not hold a reference to the old cluster. + val, err = tcc.stateCh.Receive(ctx) if err != nil { - t.Fatalf("Error waiting for UpdateState to be called: %v", err) + t.Fatalf("Timeout waiting for an update from the resolver: %v", err) } - rState = gotState.(resolver.State) + rState = val.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { - t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) + t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) } - wantJSON2 := `{"loadBalancingConfig":[{ - "xds_cluster_manager_experimental":{ - "children":{ - "cluster:test-cluster-1":{ - "childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}] - }, - "cluster:NEW":{ - "childPolicy":[{"cds_experimental":{"cluster":"NEW"}}] - } - } - }}]}` - wantSCParsed2 := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(wantJSON2) - if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed2.Config) { - t.Errorf("ClientConn.UpdateState received different service config") + wantSCParsed = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(` +{ + "loadBalancingConfig": [ + { + "xds_cluster_manager_experimental": { + "children": { + "cluster:OLD-CLUSTER": { + "childPolicy": [ + { + "cds_experimental": { + "cluster": "OLD-CLUSTER" + } + } + ] + }, + "cluster:NEW-CLUSTER": { + "childPolicy": [ + { + "cds_experimental": { + "cluster": "NEW-CLUSTER" + } + } + ] + } + } + } + } + ] +}`) + if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { + t.Errorf("Received unexpected service config") t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) - t.Fatal("want: ", cmp.Diff(nil, wantSCParsed2.Config)) + t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config)) } - // Invoke OnCommitted; should lead to a service config update that deletes - // test-cluster-1. - res.OnCommitted() + cs = iresolver.GetConfigSelector(rState) + if cs == nil { + t.Fatal("Received nil config selector in update from resolver") + } + resNew, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"}) + if err != nil { + t.Fatalf("cs.SelectConfig(): %v", err) + } + if cluster := clustermanager.GetPickedClusterForTesting(resNew.Context); cluster != "cluster:NEW-CLUSTER" { + t.Fatalf("Picked cluster is %q, want %q", cluster, "cluster:NEW-CLUSTER") + } - xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{targetStr}, - Routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{"NEW": {Weight: 1}}}}, - }, - }, - }, nil) - gotState, err = tcc.stateCh.Receive(ctx) + // Invoke OnCommitted on the old RPC; should lead to a service config update + // that deletes the old cluster. + resOld.OnCommitted() + + val, err = tcc.stateCh.Receive(ctx) if err != nil { - t.Fatalf("Error waiting for UpdateState to be called: %v", err) + t.Fatalf("Timeout waiting for an update from the resolver: %v", err) } - rState = gotState.(resolver.State) + rState = val.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { - t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) + t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) } - wantJSON3 := `{"loadBalancingConfig":[{ - "xds_cluster_manager_experimental":{ - "children":{ - "cluster:NEW":{ - "childPolicy":[{"cds_experimental":{"cluster":"NEW"}}] - } - } - }}]}` - wantSCParsed3 := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(wantJSON3) - if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed3.Config) { - t.Errorf("ClientConn.UpdateState received different service config") + wantSCParsed = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(` +{ + "loadBalancingConfig": [ + { + "xds_cluster_manager_experimental": { + "children": { + "cluster:NEW-CLUSTER": { + "childPolicy": [ + { + "cds_experimental": { + "cluster": "NEW-CLUSTER" + } + } + ] + } + } + } + } + ] +}`) + if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { + t.Errorf("Received unexpected service config") t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) - t.Fatal("want: ", cmp.Diff(nil, wantSCParsed3.Config)) + t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config)) } } -// TestXDSResolverUpdates tests the cases where the resolver gets a good update -// after an error, and an error after the good update. -func (s) TestXDSResolverGoodUpdateAfterError(t *testing.T) { - xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target}) - defer xdsR.Close() - defer cancel() - - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - waitForWatchListener(ctx, t, xdsC, targetStr) - xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil) - waitForWatchRouteConfig(ctx, t, xdsC, routeStr) - - // Invoke the watchAPI callback with a bad service update and wait for the - // ReportError method to be called on the ClientConn. - suErr := errors.New("bad serviceupdate") - xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{}, suErr) - - if gotErrVal, gotErr := tcc.errorCh.Receive(ctx); gotErr != nil || gotErrVal != suErr { - t.Fatalf("ClientConn.ReportError() received %v, want %v", gotErrVal, suErr) - } - - // Invoke the watchAPI callback with a good service update and wait for the - // UpdateState method to be called on the ClientConn. - xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{targetStr}, - Routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{cluster: {Weight: 1}}}}, - }, - }, - }, nil) - gotState, err := tcc.stateCh.Receive(ctx) +// TestResolverMultipleLDSUpdates tests the case where two LDS updates with the +// same RDS name to watch are received without an RDS in between. Those LDS +// updates shouldn't trigger service config update. +func (s) TestResolverMultipleLDSUpdates(t *testing.T) { + mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{}) if err != nil { - t.Fatalf("Error waiting for UpdateState to be called: %v", err) - } - rState := gotState.(resolver.State) - if err := rState.ServiceConfig.Err; err != nil { - t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) + t.Fatal(err) } + defer mgmtServer.Stop() - // Invoke the watchAPI callback with a bad service update and wait for the - // ReportError method to be called on the ClientConn. - suErr2 := errors.New("bad serviceupdate 2") - xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{}, suErr2) - if gotErrVal, gotErr := tcc.errorCh.Receive(ctx); gotErr != nil || gotErrVal != suErr2 { - t.Fatalf("ClientConn.ReportError() received %v, want %v", gotErrVal, suErr2) + // Create a bootstrap configuration specifying the above management server. + nodeID := uuid.New().String() + cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{ + NodeID: nodeID, + ServerURI: mgmtServer.Address, + Version: xdsbootstrap.TransportV3, + }) + if err != nil { + t.Fatal(err) } -} + defer cleanup() -// TestXDSResolverResourceNotFoundError tests the cases where the resolver gets -// a ResourceNotFoundError. It should generate a service config picking -// weighted_target, but no child balancers. -func (s) TestXDSResolverResourceNotFoundError(t *testing.T) { - xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target}) - defer xdsR.Close() - defer cancel() + // Build an xDS resolver that uses the above bootstrap configuration + // Creating the xDS resolver should result in creation of the xDS client. + const serviceName = "my-service-client-side-xds" + tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}) + defer rClose() + // Configure the management server with a listener resource, but not route + // configuration resource. + ldsName := serviceName + rdsName := "route-" + serviceName + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)}, + SkipValidation: true, + } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - waitForWatchListener(ctx, t, xdsC, targetStr) - xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil) - waitForWatchRouteConfig(ctx, t, xdsC, routeStr) - - // Invoke the watchAPI callback with a bad service update and wait for the - // ReportError method to be called on the ClientConn. - suErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource removed error") - xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{}, suErr) + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + // Ensure there is no update from the resolver. sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) defer sCancel() - if gotErrVal, gotErr := tcc.errorCh.Receive(sCtx); gotErr != context.DeadlineExceeded { - t.Fatalf("ClientConn.ReportError() received %v, %v, want channel recv timeout", gotErrVal, gotErr) + gotState, err := tcc.stateCh.Receive(sCtx) + if err == nil { + t.Fatalf("Received update fro resolver %v when none expected", gotState) } - ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - gotState, err := tcc.stateCh.Receive(ctx) - if err != nil { - t.Fatalf("Error waiting for UpdateState to be called: %v", err) - } - rState := gotState.(resolver.State) - wantParsedConfig := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)("{}") - if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantParsedConfig.Config) { - t.Error("ClientConn.UpdateState got wrong service config") - t.Errorf("gotParsed: %s", cmp.Diff(nil, rState.ServiceConfig.Config)) - t.Errorf("wantParsed: %s", cmp.Diff(nil, wantParsedConfig.Config)) - } - if err := rState.ServiceConfig.Err; err != nil { - t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) + // Configure the management server with a listener resource that points to + // the same route configuration resource but has different values for some + // other fields. There is still no route configuration resource on the + // management server. + hcm := testutils.MarshalAny(&v3httppb.HttpConnectionManager{ + RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{Rds: &v3httppb.Rds{ + ConfigSource: &v3corepb.ConfigSource{ + ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}}, + }, + RouteConfigName: rdsName, + }}, + HttpFilters: []*v3httppb.HttpFilter{e2e.RouterHTTPFilter}, + CommonHttpProtocolOptions: &v3corepb.HttpProtocolOptions{ + MaxStreamDuration: durationpb.New(1 * time.Second), + }, + }) + resources = e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{{ + Name: ldsName, + ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm}, + FilterChains: []*v3listenerpb.FilterChain{{ + Name: "filter-chain-name", + Filters: []*v3listenerpb.Filter{{ + Name: wellknown.HTTPConnectionManager, + ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm}, + }}, + }}, + }}, + SkipValidation: true, } -} - -// TestXDSResolverMultipleLDSUpdates tests the case where two LDS updates with -// the same RDS name to watch are received without an RDS in between. Those LDS -// updates shouldn't trigger service config update. -// -// This test case also makes sure the resolver doesn't panic. -func (s) TestXDSResolverMultipleLDSUpdates(t *testing.T) { - xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target}) - defer xdsR.Close() - defer cancel() - - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - waitForWatchListener(ctx, t, xdsC, targetStr) - xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil) - waitForWatchRouteConfig(ctx, t, xdsC, routeStr) - defer replaceRandNumGenerator(0)() - - // Send a new LDS update, with the same fields. - xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil) - ctx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer cancel() - // Should NOT trigger a state update. - gotState, err := tcc.stateCh.Receive(ctx) - if err == nil { - t.Fatalf("ClientConn.UpdateState received %v, want timeout error", gotState) + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) } - // Send a new LDS update, with the same RDS name, but different fields. - xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, MaxStreamDuration: time.Second, HTTPFilters: routerFilterList}, nil) - ctx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer cancel() - gotState, err = tcc.stateCh.Receive(ctx) + // Ensure that there is no update from the resolver. + sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + gotState, err = tcc.stateCh.Receive(sCtx) if err == nil { - t.Fatalf("ClientConn.UpdateState received %v, want timeout error", gotState) + t.Fatalf("Received update fro resolver %v when none expected", gotState) } } From 6df6a551de5d18ed55973db1c4fd52d146d51068 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Mon, 23 Jan 2023 16:56:04 -0800 Subject: [PATCH 2/4] review comment #1 --- xds/internal/resolver/xds_resolver_test.go | 67 ++++++++++------------ 1 file changed, 29 insertions(+), 38 deletions(-) diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index a7701d45942e..ac754783d94d 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -837,9 +837,7 @@ func (s) TestResolverRemovedWithRPCs(t *testing.T) { ] }`) if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { - t.Errorf("Received unexpected service config") - t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) - t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config)) + t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) } cs := iresolver.GetConfigSelector(rState) @@ -870,9 +868,7 @@ func (s) TestResolverRemovedWithRPCs(t *testing.T) { t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) } if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { - t.Errorf("Received unexpected service config") - t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) - t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config)) + t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) } cs = iresolver.GetConfigSelector(rState) if cs == nil { @@ -899,9 +895,7 @@ func (s) TestResolverRemovedWithRPCs(t *testing.T) { } wantSCParsed = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(`{}`) if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { - t.Errorf("Received unexpected service config") - t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) - t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config)) + t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) } } @@ -1323,7 +1317,7 @@ func (s) TestResolverDelayedOnCommitted(t *testing.T) { ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{ Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ { - Name: "OLD-CLUSTER", + Name: "old-cluster", Weight: &wrapperspb.UInt32Value{Value: 100}, }, }, @@ -1355,11 +1349,11 @@ func (s) TestResolverDelayedOnCommitted(t *testing.T) { { "xds_cluster_manager_experimental": { "children": { - "cluster:OLD-CLUSTER": { + "cluster:old-cluster": { "childPolicy": [ { "cds_experimental": { - "cluster": "OLD-CLUSTER" + "cluster": "old-cluster" } } ] @@ -1370,9 +1364,7 @@ func (s) TestResolverDelayedOnCommitted(t *testing.T) { ] }`) if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { - t.Errorf("Received unexpected service config") - t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) - t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config)) + t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) } // Make an RPC, but do not commit it yet. @@ -1384,10 +1376,12 @@ func (s) TestResolverDelayedOnCommitted(t *testing.T) { if err != nil { t.Fatalf("cs.SelectConfig(): %v", err) } - if cluster := clustermanager.GetPickedClusterForTesting(resOld.Context); cluster != "cluster:OLD-CLUSTER" { - t.Fatalf("Picked cluster is %q, want %q", cluster, "cluster:OLD-CLUSTER") + if cluster := clustermanager.GetPickedClusterForTesting(resOld.Context); cluster != "cluster:old-cluster" { + t.Fatalf("Picked cluster is %q, want %q", cluster, "cluster:old-cluster") } - // delay resOld.OnCommitted() + + // Delay resOld.OnCommitted(). As long as there are pending RPCs to removed + // clusters, they still appear in the service config. // Update the route configuration resource on the management server to // return a new cluster. @@ -1404,7 +1398,7 @@ func (s) TestResolverDelayedOnCommitted(t *testing.T) { ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{ Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ { - Name: "NEW-CLUSTER", + Name: "new-cluster", Weight: &wrapperspb.UInt32Value{Value: 100}, }, }, @@ -1436,20 +1430,20 @@ func (s) TestResolverDelayedOnCommitted(t *testing.T) { { "xds_cluster_manager_experimental": { "children": { - "cluster:OLD-CLUSTER": { + "cluster:old-cluster": { "childPolicy": [ { "cds_experimental": { - "cluster": "OLD-CLUSTER" + "cluster": "old-cluster" } } ] }, - "cluster:NEW-CLUSTER": { + "cluster:new-cluster": { "childPolicy": [ { "cds_experimental": { - "cluster": "NEW-CLUSTER" + "cluster": "new-cluster" } } ] @@ -1460,9 +1454,7 @@ func (s) TestResolverDelayedOnCommitted(t *testing.T) { ] }`) if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { - t.Errorf("Received unexpected service config") - t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) - t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config)) + t.Fatalf("Got service config:\n%s\nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) } cs = iresolver.GetConfigSelector(rState) @@ -1473,12 +1465,13 @@ func (s) TestResolverDelayedOnCommitted(t *testing.T) { if err != nil { t.Fatalf("cs.SelectConfig(): %v", err) } - if cluster := clustermanager.GetPickedClusterForTesting(resNew.Context); cluster != "cluster:NEW-CLUSTER" { - t.Fatalf("Picked cluster is %q, want %q", cluster, "cluster:NEW-CLUSTER") + if cluster := clustermanager.GetPickedClusterForTesting(resNew.Context); cluster != "cluster:new-cluster" { + t.Fatalf("Picked cluster is %q, want %q", cluster, "cluster:new-cluster") } // Invoke OnCommitted on the old RPC; should lead to a service config update - // that deletes the old cluster. + // that deletes the old cluster, as the old cluster no longer has any + // pending RPCs. resOld.OnCommitted() val, err = tcc.stateCh.Receive(ctx) @@ -1495,11 +1488,11 @@ func (s) TestResolverDelayedOnCommitted(t *testing.T) { { "xds_cluster_manager_experimental": { "children": { - "cluster:NEW-CLUSTER": { + "cluster:new-cluster": { "childPolicy": [ { "cds_experimental": { - "cluster": "NEW-CLUSTER" + "cluster": "new-cluster" } } ] @@ -1510,15 +1503,13 @@ func (s) TestResolverDelayedOnCommitted(t *testing.T) { ] }`) if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { - t.Errorf("Received unexpected service config") - t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) - t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config)) + t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) } } // TestResolverMultipleLDSUpdates tests the case where two LDS updates with the // same RDS name to watch are received without an RDS in between. Those LDS -// updates shouldn't trigger service config update. +// updates shouldn't trigger a service config update. func (s) TestResolverMultipleLDSUpdates(t *testing.T) { mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{}) if err != nil { @@ -1544,7 +1535,7 @@ func (s) TestResolverMultipleLDSUpdates(t *testing.T) { tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}) defer rClose() - // Configure the management server with a listener resource, but not route + // Configure the management server with a listener resource, but no route // configuration resource. ldsName := serviceName rdsName := "route-" + serviceName @@ -1564,7 +1555,7 @@ func (s) TestResolverMultipleLDSUpdates(t *testing.T) { defer sCancel() gotState, err := tcc.stateCh.Receive(sCtx) if err == nil { - t.Fatalf("Received update fro resolver %v when none expected", gotState) + t.Fatalf("Received update from resolver %v when none expected", gotState) } // Configure the management server with a listener resource that points to @@ -1607,7 +1598,7 @@ func (s) TestResolverMultipleLDSUpdates(t *testing.T) { defer sCancel() gotState, err = tcc.stateCh.Receive(sCtx) if err == nil { - t.Fatalf("Received update fro resolver %v when none expected", gotState) + t.Fatalf("Received update from resolver %v when none expected", gotState) } } From fe18efeef913087c4f3f87c578e15e71c36f46c5 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Tue, 24 Jan 2023 13:53:13 -0800 Subject: [PATCH 3/4] update e2e.DefaultRouteConfig() where possible --- internal/testutils/xds/e2e/clientresources.go | 9 ++- xds/internal/resolver/xds_resolver_test.go | 72 +++---------------- .../e2e_test/federation_watchers_test.go | 2 +- .../xdsclient/e2e_test/misc_watchers_test.go | 4 +- .../xdsclient/e2e_test/rds_watchers_test.go | 20 +++--- 5 files changed, 30 insertions(+), 77 deletions(-) diff --git a/internal/testutils/xds/e2e/clientresources.go b/internal/testutils/xds/e2e/clientresources.go index 2dacebb14653..9d9012e23838 100644 --- a/internal/testutils/xds/e2e/clientresources.go +++ b/internal/testutils/xds/e2e/clientresources.go @@ -296,7 +296,14 @@ func DefaultRouteConfig(routeName, ldsTarget, clusterName string) *v3routepb.Rou Routes: []*v3routepb.Route{{ Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}}, Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{ - ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: clusterName}, + ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{ + Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ + { + Name: clusterName, + Weight: &wrapperspb.UInt32Value{Value: 100}, + }, + }, + }}, }}, }}, }}, diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index ac754783d94d..943abebf2d79 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -778,27 +778,9 @@ func (s) TestResolverRemovedWithRPCs(t *testing.T) { // Configure the management server with a good listener and route // configuration resource. resources := e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)}, - Routes: []*v3routepb.RouteConfiguration{{ - Name: rdsName, - VirtualHosts: []*v3routepb.VirtualHost{{ - Domains: []string{ldsName}, - Routes: []*v3routepb.Route{{ - Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}}, - Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{ - ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{ - Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ - { - Name: "test-cluster-1", - Weight: &wrapperspb.UInt32Value{Value: 100}, - }, - }, - }}, - }}, - }}, - }}, - }}, + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, ldsName, "test-cluster-1")}, SkipValidation: true, } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -1305,27 +1287,9 @@ func (s) TestResolverDelayedOnCommitted(t *testing.T) { ldsName := serviceName rdsName := "route-" + serviceName resources := e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)}, - Routes: []*v3routepb.RouteConfiguration{{ - Name: rdsName, - VirtualHosts: []*v3routepb.VirtualHost{{ - Domains: []string{ldsName}, - Routes: []*v3routepb.Route{{ - Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}}, - Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{ - ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{ - Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ - { - Name: "old-cluster", - Weight: &wrapperspb.UInt32Value{Value: 100}, - }, - }, - }}, - }}, - }}, - }}, - }}, + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, ldsName, "old-cluster")}, SkipValidation: true, } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -1386,27 +1350,9 @@ func (s) TestResolverDelayedOnCommitted(t *testing.T) { // Update the route configuration resource on the management server to // return a new cluster. resources = e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)}, - Routes: []*v3routepb.RouteConfiguration{{ - Name: rdsName, - VirtualHosts: []*v3routepb.VirtualHost{{ - Domains: []string{ldsName}, - Routes: []*v3routepb.Route{{ - Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}}, - Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{ - ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{ - Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ - { - Name: "new-cluster", - Weight: &wrapperspb.UInt32Value{Value: 100}, - }, - }, - }}, - }}, - }}, - }}, - }}, + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, ldsName, "new-cluster")}, SkipValidation: true, } if err := mgmtServer.Update(ctx, resources); err != nil { diff --git a/xds/internal/xdsclient/e2e_test/federation_watchers_test.go b/xds/internal/xdsclient/e2e_test/federation_watchers_test.go index 437edebe1392..e2e6d5dabc31 100644 --- a/xds/internal/xdsclient/e2e_test/federation_watchers_test.go +++ b/xds/internal/xdsclient/e2e_test/federation_watchers_test.go @@ -187,7 +187,7 @@ func (s) TestFederation_RouteConfigResourceContextParamOrder(t *testing.T) { { Prefix: newStringP("/"), ActionType: xdsresource.RouteActionRoute, - WeightedClusters: map[string]xdsresource.WeightedCluster{"cluster-resource": {Weight: 1}}, + WeightedClusters: map[string]xdsresource.WeightedCluster{"cluster-resource": {Weight: 100}}, }, }, }, diff --git a/xds/internal/xdsclient/e2e_test/misc_watchers_test.go b/xds/internal/xdsclient/e2e_test/misc_watchers_test.go index e761e70a6246..fd08e33bbaab 100644 --- a/xds/internal/xdsclient/e2e_test/misc_watchers_test.go +++ b/xds/internal/xdsclient/e2e_test/misc_watchers_test.go @@ -96,7 +96,7 @@ func (s) TestWatchCallAnotherWatch(t *testing.T) { { Prefix: newStringP("/"), ActionType: xdsresource.RouteActionRoute, - WeightedClusters: map[string]xdsresource.WeightedCluster{cdsName: {Weight: 1}}, + WeightedClusters: map[string]xdsresource.WeightedCluster{cdsName: {Weight: 100}}, }, }, }, @@ -112,7 +112,7 @@ func (s) TestWatchCallAnotherWatch(t *testing.T) { { Prefix: newStringP("/"), ActionType: xdsresource.RouteActionRoute, - WeightedClusters: map[string]xdsresource.WeightedCluster{cdsName: {Weight: 1}}, + WeightedClusters: map[string]xdsresource.WeightedCluster{cdsName: {Weight: 100}}, }, }, }, diff --git a/xds/internal/xdsclient/e2e_test/rds_watchers_test.go b/xds/internal/xdsclient/e2e_test/rds_watchers_test.go index 47fcb2fe2e39..79d2bd7edbe6 100644 --- a/xds/internal/xdsclient/e2e_test/rds_watchers_test.go +++ b/xds/internal/xdsclient/e2e_test/rds_watchers_test.go @@ -142,7 +142,7 @@ func (s) TestRDSWatch(t *testing.T) { { Prefix: newStringP("/"), ActionType: xdsresource.RouteActionRoute, - WeightedClusters: map[string]xdsresource.WeightedCluster{cdsName: {Weight: 1}}, + WeightedClusters: map[string]xdsresource.WeightedCluster{cdsName: {Weight: 100}}, }, }, }, @@ -165,7 +165,7 @@ func (s) TestRDSWatch(t *testing.T) { { Prefix: newStringP("/"), ActionType: xdsresource.RouteActionRoute, - WeightedClusters: map[string]xdsresource.WeightedCluster{cdsNameNewStyle: {Weight: 1}}, + WeightedClusters: map[string]xdsresource.WeightedCluster{cdsNameNewStyle: {Weight: 100}}, }, }, }, @@ -281,7 +281,7 @@ func (s) TestRDSWatch_TwoWatchesForSameResourceName(t *testing.T) { { Prefix: newStringP("/"), ActionType: xdsresource.RouteActionRoute, - WeightedClusters: map[string]xdsresource.WeightedCluster{cdsName: {Weight: 1}}, + WeightedClusters: map[string]xdsresource.WeightedCluster{cdsName: {Weight: 100}}, }, }, }, @@ -297,7 +297,7 @@ func (s) TestRDSWatch_TwoWatchesForSameResourceName(t *testing.T) { { Prefix: newStringP("/"), ActionType: xdsresource.RouteActionRoute, - WeightedClusters: map[string]xdsresource.WeightedCluster{"new-cds-resource": {Weight: 1}}, + WeightedClusters: map[string]xdsresource.WeightedCluster{"new-cds-resource": {Weight: 100}}, }, }, }, @@ -319,7 +319,7 @@ func (s) TestRDSWatch_TwoWatchesForSameResourceName(t *testing.T) { { Prefix: newStringP("/"), ActionType: xdsresource.RouteActionRoute, - WeightedClusters: map[string]xdsresource.WeightedCluster{cdsNameNewStyle: {Weight: 1}}, + WeightedClusters: map[string]xdsresource.WeightedCluster{cdsNameNewStyle: {Weight: 100}}, }, }, }, @@ -335,7 +335,7 @@ func (s) TestRDSWatch_TwoWatchesForSameResourceName(t *testing.T) { { Prefix: newStringP("/"), ActionType: xdsresource.RouteActionRoute, - WeightedClusters: map[string]xdsresource.WeightedCluster{"new-cds-resource": {Weight: 1}}, + WeightedClusters: map[string]xdsresource.WeightedCluster{"new-cds-resource": {Weight: 100}}, }, }, }, @@ -492,7 +492,7 @@ func (s) TestRDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) { { Prefix: newStringP("/"), ActionType: xdsresource.RouteActionRoute, - WeightedClusters: map[string]xdsresource.WeightedCluster{cdsName: {Weight: 1}}, + WeightedClusters: map[string]xdsresource.WeightedCluster{cdsName: {Weight: 100}}, }, }, }, @@ -577,7 +577,7 @@ func (s) TestRDSWatch_ResourceCaching(t *testing.T) { { Prefix: newStringP("/"), ActionType: xdsresource.RouteActionRoute, - WeightedClusters: map[string]xdsresource.WeightedCluster{cdsName: {Weight: 1}}, + WeightedClusters: map[string]xdsresource.WeightedCluster{cdsName: {Weight: 100}}, }, }, }, @@ -713,7 +713,7 @@ func (s) TestRDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) { { Prefix: newStringP("/"), ActionType: xdsresource.RouteActionRoute, - WeightedClusters: map[string]xdsresource.WeightedCluster{cdsName: {Weight: 1}}, + WeightedClusters: map[string]xdsresource.WeightedCluster{cdsName: {Weight: 100}}, }, }, }, @@ -850,7 +850,7 @@ func (s) TestRDSWatch_PartialValid(t *testing.T) { { Prefix: newStringP("/"), ActionType: xdsresource.RouteActionRoute, - WeightedClusters: map[string]xdsresource.WeightedCluster{cdsName: {Weight: 1}}, + WeightedClusters: map[string]xdsresource.WeightedCluster{cdsName: {Weight: 100}}, }, }, }, From c3f385d62bd21ca4184891244b84e89c4c1dcd33 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Tue, 24 Jan 2023 14:50:22 -0800 Subject: [PATCH 4/4] clarify comment --- xds/internal/resolver/xds_resolver_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index 943abebf2d79..e62ac945635f 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -1089,7 +1089,9 @@ func (s) TestResolverWRR(t *testing.T) { // TestResolverMaxStreamDuration tests the case where the resolver receives max // stream duration as part of the listener and route configuration resources. // The test verifies that the RPC timeout returned by the config selector -// matches the value specified in the config. +// matches expectations. A non-nil max stream duration (this includes an +// explicit zero value) in a matching route overrides the value specified in the +// listener resource. func (s) TestResolverMaxStreamDuration(t *testing.T) { mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{}) if err != nil {