diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index 6131c220bbc..d67cb58a478 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -567,14 +567,6 @@ func (s) TestResolverCloseClosesXDSClient(t *testing.T) { } } -// TestXDSResolverCloseClosesXDSClient tests that the XDS resolver's Close -// method closes the XDS client. -func (s) TestXDSResolverCloseClosesXDSClient(t *testing.T) { - xdsR, _, _, cancel := testSetup(t, setupOpts{target: target}) - xdsR.Close() - cancel() // Blocks until the xDS client is closed. -} - // TestResolverBadServiceUpdate tests the case where a resource returned by the // management server is NACKed by the xDS client, which then returns an update // containing an error to the resolver. Verifies that the update is propagated @@ -1168,110 +1160,139 @@ func (s) TestResolverRemovedWithRPCs(t *testing.T) { } } -// TestXDSResolverRemovedResource tests for proper behavior after a resource is -// removed. -func (s) TestXDSResolverRemovedResource(t *testing.T) { - xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target}) - defer cancel() - defer xdsR.Close() +// TestResolverRemovedResource tests the case where resources returned by the +// management server are removed. The test verifies that the resolver pushes the +// expected config selector and service config in this case. +func (s) TestResolverRemovedResource(t *testing.T) { + mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{}) + if err != nil { + t.Fatal(err) + } + defer mgmtServer.Stop() + + // Create a bootstrap configuration specifying the above management server. + nodeID := uuid.New().String() + cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{ + NodeID: nodeID, + ServerURI: mgmtServer.Address, + Version: xdsbootstrap.TransportV3, + }) + if err != nil { + t.Fatal(err) + } + defer cleanup() + + const serviceName = "my-service-client-side-xds" + tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}) + defer rClose() + // Configure the management server with a good listener and route + // configuration resource. + ldsName := serviceName + rdsName := "route-" + serviceName + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, ldsName, "test-cluster-1")}, + SkipValidation: true, + } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - waitForWatchListener(ctx, t, xdsC, targetStr) - xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil) - waitForWatchRouteConfig(ctx, t, xdsC, routeStr) - - // Invoke the watchAPI callback with a good service update and wait for the - // UpdateState method to be called on the ClientConn. - xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{targetStr}, - Routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{"test-cluster-1": {Weight: 1}}}}, - }, - }, - }, nil) - wantJSON := `{"loadBalancingConfig":[{ - "xds_cluster_manager_experimental":{ - "children":{ - "cluster:test-cluster-1":{ - "childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}] - } - } - }}]}` - wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(wantJSON) + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } - gotState, err := tcc.stateCh.Receive(ctx) + // Read the update pushed by the resolver to the ClientConn. + val, err := tcc.stateCh.Receive(ctx) if err != nil { - t.Fatalf("Error waiting for UpdateState to be called: %v", err) + t.Fatalf("Timeout waiting for an update from the resolver: %v", err) } - rState := gotState.(resolver.State) + rState := val.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { - t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) + t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) } + wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(` +{ + "loadBalancingConfig": [ + { + "xds_cluster_manager_experimental": { + "children": { + "cluster:test-cluster-1": { + "childPolicy": [ + { + "cds_experimental": { + "cluster": "test-cluster-1" + } + } + ] + } + } + } + } + ] +}`) if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { - t.Errorf("ClientConn.UpdateState received different service config") - t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) - t.Error("want: ", cmp.Diff(nil, wantSCParsed.Config)) + t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) } // "Make an RPC" by invoking the config selector. cs := iresolver.GetConfigSelector(rState) if cs == nil { - t.Fatalf("received nil config selector") + t.Fatal("Received nil config selector in update from resolver") } - res, err := cs.SelectConfig(iresolver.RPCInfo{Context: context.Background()}) + res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"}) if err != nil { - t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err) + t.Fatalf("cs.SelectConfig(): %v", err) } // "Finish the RPC"; this could cause a panic if the resolver doesn't // handle it correctly. res.OnCommitted() - // Delete the resource. The channel should receive a service config with the - // original cluster but with an erroring config selector. - suErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource removed error") - xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{}, suErr) + // Delete the resources on the management server, resulting in a + // resource-not-found error from the xDS client. + if err := mgmtServer.Update(ctx, e2e.UpdateOptions{NodeID: nodeID}); err != nil { + t.Fatal(err) + } - if gotState, err = tcc.stateCh.Receive(ctx); err != nil { - t.Fatalf("Error waiting for UpdateState to be called: %v", err) + // The channel should receive the existing service config with the original + // cluster but with an erroring config selector. + val, err = tcc.stateCh.Receive(ctx) + if err != nil { + t.Fatalf("Timeout waiting for an update from the resolver: %v", err) } - rState = gotState.(resolver.State) + rState = val.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { - t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) + t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) } if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { - t.Errorf("ClientConn.UpdateState received different service config") - t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) - t.Error("want: ", cmp.Diff(nil, wantSCParsed.Config)) + t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) } // "Make another RPC" by invoking the config selector. cs = iresolver.GetConfigSelector(rState) if cs == nil { - t.Fatalf("received nil config selector") + t.Fatal("Received nil config selector in update from resolver") } - res, err = cs.SelectConfig(iresolver.RPCInfo{Context: context.Background()}) + res, err = cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"}) if err == nil || status.Code(err) != codes.Unavailable { - t.Fatalf("Expected UNAVAILABLE error from cs.SelectConfig(_); got %v, %v", res, err) + t.Fatalf("cs.SelectConfig() got %v, %v, expected UNAVAILABLE error", res, err) } // In the meantime, an empty ServiceConfig update should have been sent. - if gotState, err = tcc.stateCh.Receive(ctx); err != nil { - t.Fatalf("Error waiting for UpdateState to be called: %v", err) + val, err = tcc.stateCh.Receive(ctx) + if err != nil { + t.Fatalf("Timeout waiting for an update from the resolver: %v", err) } - rState = gotState.(resolver.State) + rState = val.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { - t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) + t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) } wantSCParsed = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)("{}") if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { - t.Errorf("ClientConn.UpdateState received different service config") - t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) - t.Error("want: ", cmp.Diff(nil, wantSCParsed.Config)) + t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) } }