From 5f282cb8973044c8e3034c985c98c3d2efe03d55 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Mon, 27 Jul 2020 13:55:02 -0700 Subject: [PATCH] xdsrouting: remove env variable for routing (#3754) --- xds/internal/client/bootstrap/bootstrap.go | 4 + xds/internal/client/client_watchers_rds.go | 4 - .../client/client_watchers_rds_test.go | 10 +- .../client/client_watchers_service.go | 12 +- .../client/client_watchers_service_test.go | 42 +++---- xds/internal/client/envconfig.go | 33 ----- xds/internal/client/v2client_rds.go | 48 +------- xds/internal/client/v2client_rds_test.go | 80 ++++-------- xds/internal/resolver/serviceconfig.go | 17 --- xds/internal/resolver/serviceconfig_test.go | 115 ++++++------------ xds/internal/resolver/xds_resolver_test.go | 13 +- 11 files changed, 103 insertions(+), 275 deletions(-) delete mode 100644 xds/internal/client/envconfig.go diff --git a/xds/internal/client/bootstrap/bootstrap.go b/xds/internal/client/bootstrap/bootstrap.go index 1e2e05e8f9b..b2805bf7372 100644 --- a/xds/internal/client/bootstrap/bootstrap.go +++ b/xds/internal/client/bootstrap/bootstrap.go @@ -195,6 +195,10 @@ func NewConfig() (*Config, error) { // 2. Environment variable "GRPC_XDS_EXPERIMENTAL_V3_SUPPORT" is set to // true. // The default value of the enum type "version.TransportAPI" is v2. + // + // TODO: there are multiple env variables, GRPC_XDS_BOOTSTRAP and + // GRPC_XDS_EXPERIMENTAL_V3_SUPPORT. Move all env variables into a separate + // package. if v3Env := os.Getenv(v3SupportEnv); v3Env == "true" { if serverSupportsV3 { config.TransportAPI = version.TransportV3 diff --git a/xds/internal/client/client_watchers_rds.go b/xds/internal/client/client_watchers_rds.go index 91de78601b4..cc1b18c2d91 100644 --- a/xds/internal/client/client_watchers_rds.go +++ b/xds/internal/client/client_watchers_rds.go @@ -49,10 +49,6 @@ type Route struct { } type rdsUpdate struct { - // weightedCluster is only set when routing is disabled (env variable - // GRPC_XDS_EXPERIMENTAL_ROUTING is not true). - weightedCluster map[string]uint32 - routes []*Route } type rdsCallbackFunc func(rdsUpdate, error) diff --git a/xds/internal/client/client_watchers_rds_test.go b/xds/internal/client/client_watchers_rds_test.go index 06ed7a377e2..16e042ecd96 100644 --- a/xds/internal/client/client_watchers_rds_test.go +++ b/xds/internal/client/client_watchers_rds_test.go @@ -54,7 +54,7 @@ func (s) TestRDSWatch(t *testing.T) { t.Fatalf("want new watch to start, got error %v", err) } - wantUpdate := rdsUpdate{weightedCluster: map[string]uint32{testCDSName: 1}} + wantUpdate := rdsUpdate{routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}} v2Client.r.newRDSUpdate(map[string]rdsUpdate{ testRDSName: wantUpdate, }) @@ -113,7 +113,7 @@ func (s) TestRDSTwoWatchSameResourceName(t *testing.T) { } } - wantUpdate := rdsUpdate{weightedCluster: map[string]uint32{testCDSName: 1}} + wantUpdate := rdsUpdate{routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}} v2Client.r.newRDSUpdate(map[string]rdsUpdate{ testRDSName: wantUpdate, }) @@ -179,8 +179,8 @@ func (s) TestRDSThreeWatchDifferentResourceName(t *testing.T) { t.Fatalf("want new watch to start, got error %v", err) } - wantUpdate1 := rdsUpdate{weightedCluster: map[string]uint32{testCDSName + "1": 1}} - wantUpdate2 := rdsUpdate{weightedCluster: map[string]uint32{testCDSName + "2": 1}} + wantUpdate1 := rdsUpdate{routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "1": 1}}}} + wantUpdate2 := rdsUpdate{routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "2": 1}}}} v2Client.r.newRDSUpdate(map[string]rdsUpdate{ testRDSName + "1": wantUpdate1, testRDSName + "2": wantUpdate2, @@ -219,7 +219,7 @@ func (s) TestRDSWatchAfterCache(t *testing.T) { t.Fatalf("want new watch to start, got error %v", err) } - wantUpdate := rdsUpdate{weightedCluster: map[string]uint32{testCDSName: 1}} + wantUpdate := rdsUpdate{routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}} v2Client.r.newRDSUpdate(map[string]rdsUpdate{ testRDSName: wantUpdate, }) diff --git a/xds/internal/client/client_watchers_service.go b/xds/internal/client/client_watchers_service.go index 1cf4c0f988c..cc96622d71d 100644 --- a/xds/internal/client/client_watchers_service.go +++ b/xds/internal/client/client_watchers_service.go @@ -25,14 +25,7 @@ import ( // ServiceUpdate contains update about the service. type ServiceUpdate struct { - // WeightedCluster is a map from cluster names (CDS resource to watch) to - // their weights. - // - // This field is only set when routing is disabled (env variable - // GRPC_XDS_EXPERIMENTAL_ROUTING is not true). - WeightedCluster map[string]uint32 - - // Routes + // Routes contain matchers+actions to route RPCs. Routes []*Route } @@ -126,8 +119,7 @@ func (w *serviceUpdateWatcher) handleRDSResp(update rdsUpdate, err error) { return } w.serviceCb(ServiceUpdate{ - WeightedCluster: update.weightedCluster, - Routes: update.routes, + Routes: update.routes, }, nil) } diff --git a/xds/internal/client/client_watchers_service_test.go b/xds/internal/client/client_watchers_service_test.go index 4535285bd6b..4b72f63e8ee 100644 --- a/xds/internal/client/client_watchers_service_test.go +++ b/xds/internal/client/client_watchers_service_test.go @@ -58,7 +58,7 @@ func (s) TestServiceWatch(t *testing.T) { serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err}) }) - wantUpdate := ServiceUpdate{WeightedCluster: map[string]uint32{testCDSName: 1}} + wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}} if _, err := v2Client.addWatches[ldsURL].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) @@ -70,7 +70,7 @@ func (s) TestServiceWatch(t *testing.T) { t.Fatalf("want new watch to start, got error %v", err) } v2Client.r.newRDSUpdate(map[string]rdsUpdate{ - testRDSName: {weightedCluster: map[string]uint32{testCDSName: 1}}, + testRDSName: {routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}, }) if u, err := serviceUpdateCh.Receive(); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate, nil}, serviceCmpOpts...) { @@ -116,7 +116,7 @@ func (s) TestServiceWatchLDSUpdate(t *testing.T) { serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err}) }) - wantUpdate := ServiceUpdate{WeightedCluster: map[string]uint32{testCDSName: 1}} + wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}} if _, err := v2Client.addWatches[ldsURL].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) @@ -128,7 +128,7 @@ func (s) TestServiceWatchLDSUpdate(t *testing.T) { t.Fatalf("want new watch to start, got error %v", err) } v2Client.r.newRDSUpdate(map[string]rdsUpdate{ - testRDSName: {weightedCluster: map[string]uint32{testCDSName: 1}}, + testRDSName: {routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}, }) if u, err := serviceUpdateCh.Receive(); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate, nil}, serviceCmpOpts...) { @@ -145,17 +145,17 @@ func (s) TestServiceWatchLDSUpdate(t *testing.T) { // Another update for the old name. v2Client.r.newRDSUpdate(map[string]rdsUpdate{ - testRDSName: {weightedCluster: map[string]uint32{testCDSName: 1}}, + testRDSName: {routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}, }) if u, err := serviceUpdateCh.Receive(); err != testutils.ErrRecvTimeout { t.Errorf("unexpected serviceUpdate: %v, %v, want channel recv timeout", u, err) } - wantUpdate2 := ServiceUpdate{WeightedCluster: map[string]uint32{testCDSName + "2": 1}} + wantUpdate2 := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "2": 1}}}} // RDS update for the new name. v2Client.r.newRDSUpdate(map[string]rdsUpdate{ - testRDSName + "2": {weightedCluster: map[string]uint32{testCDSName + "2": 1}}, + testRDSName + "2": {routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "2": 1}}}}, }) if u, err := serviceUpdateCh.Receive(); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate2, nil}, serviceCmpOpts...) { @@ -183,7 +183,7 @@ func (s) TestServiceWatchSecond(t *testing.T) { serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err}) }) - wantUpdate := ServiceUpdate{WeightedCluster: map[string]uint32{testCDSName: 1}} + wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}} if _, err := v2Client.addWatches[ldsURL].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) @@ -195,7 +195,7 @@ func (s) TestServiceWatchSecond(t *testing.T) { t.Fatalf("want new watch to start, got error %v", err) } v2Client.r.newRDSUpdate(map[string]rdsUpdate{ - testRDSName: {weightedCluster: map[string]uint32{testCDSName: 1}}, + testRDSName: {routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}, }) if u, err := serviceUpdateCh.Receive(); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate, nil}, serviceCmpOpts...) { @@ -226,7 +226,7 @@ func (s) TestServiceWatchSecond(t *testing.T) { testLDSName: {routeName: testRDSName}, }) v2Client.r.newRDSUpdate(map[string]rdsUpdate{ - testRDSName: {weightedCluster: map[string]uint32{testCDSName: 1}}, + testRDSName: {routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}, }) if u, err := serviceUpdateCh.Receive(); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate, nil}, serviceCmpOpts...) { @@ -263,8 +263,8 @@ func (s) TestServiceWatchWithNoResponseFromServer(t *testing.T) { callbackCh := testutils.NewChannel() cancelWatch := xdsClient.WatchService(goodLDSTarget1, func(su ServiceUpdate, err error) { - if su.WeightedCluster != nil { - callbackCh.Send(fmt.Errorf("got WeightedCluster: %+v, want nil", su.WeightedCluster)) + if su.Routes != nil { + callbackCh.Send(fmt.Errorf("got WeightedCluster: %+v, want nil", su.Routes)) return } if err == nil { @@ -307,8 +307,8 @@ func (s) TestServiceWatchEmptyRDS(t *testing.T) { callbackCh := testutils.NewChannel() cancelWatch := xdsClient.WatchService(goodLDSTarget1, func(su ServiceUpdate, err error) { - if su.WeightedCluster != nil { - callbackCh.Send(fmt.Errorf("got WeightedCluster: %+v, want nil", su.WeightedCluster)) + if su.Routes != nil { + callbackCh.Send(fmt.Errorf("got WeightedCluster: %+v, want nil", su.Routes)) return } if err == nil { @@ -394,7 +394,7 @@ func (s) TestServiceNotCancelRDSOnSameLDSUpdate(t *testing.T) { serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err}) }) - wantUpdate := ServiceUpdate{WeightedCluster: map[string]uint32{testCDSName: 1}} + wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}} if _, err := v2Client.addWatches[ldsURL].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) @@ -406,7 +406,7 @@ func (s) TestServiceNotCancelRDSOnSameLDSUpdate(t *testing.T) { t.Fatalf("want new watch to start, got error %v", err) } v2Client.r.newRDSUpdate(map[string]rdsUpdate{ - testRDSName: {weightedCluster: map[string]uint32{testCDSName: 1}}, + testRDSName: {routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}, }) if u, err := serviceUpdateCh.Receive(); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate, nil}, serviceCmpOpts...) { @@ -445,7 +445,7 @@ func (s) TestServiceResourceRemoved(t *testing.T) { serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err}) }) - wantUpdate := ServiceUpdate{WeightedCluster: map[string]uint32{testCDSName: 1}} + wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}} if _, err := v2Client.addWatches[ldsURL].Receive(); err != nil { t.Fatalf("want new watch to start, got error %v", err) @@ -457,7 +457,7 @@ func (s) TestServiceResourceRemoved(t *testing.T) { t.Fatalf("want new watch to start, got error %v", err) } v2Client.r.newRDSUpdate(map[string]rdsUpdate{ - testRDSName: {weightedCluster: map[string]uint32{testCDSName: 1}}, + testRDSName: {routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}, }) if u, err := serviceUpdateCh.Receive(); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate, nil}, serviceCmpOpts...) { @@ -477,7 +477,7 @@ func (s) TestServiceResourceRemoved(t *testing.T) { // Send RDS update for the removed LDS resource, expect no updates to // callback, because RDS should be canceled. v2Client.r.newRDSUpdate(map[string]rdsUpdate{ - testRDSName: {weightedCluster: map[string]uint32{testCDSName + "new": 1}}, + testRDSName: {routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "new": 1}}}}, }) if u, err := serviceUpdateCh.Receive(); err != testutils.ErrRecvTimeout { t.Errorf("unexpected serviceUpdate: %v, want receiving from channel timeout", u) @@ -497,9 +497,9 @@ func (s) TestServiceResourceRemoved(t *testing.T) { } v2Client.r.newRDSUpdate(map[string]rdsUpdate{ - testRDSName: {weightedCluster: map[string]uint32{testCDSName + "new2": 1}}, + testRDSName: {routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "new2": 1}}}}, }) - if u, err := serviceUpdateCh.Receive(); err != nil || !cmp.Equal(u, serviceUpdateErr{ServiceUpdate{WeightedCluster: map[string]uint32{testCDSName + "new2": 1}}, nil}, serviceCmpOpts...) { + if u, err := serviceUpdateCh.Receive(); err != nil || !cmp.Equal(u, serviceUpdateErr{ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "new2": 1}}}}, nil}, serviceCmpOpts...) { t.Errorf("unexpected serviceUpdate: %v, error receiving from channel: %v", u, err) } } diff --git a/xds/internal/client/envconfig.go b/xds/internal/client/envconfig.go deleted file mode 100644 index 40f448e6371..00000000000 --- a/xds/internal/client/envconfig.go +++ /dev/null @@ -1,33 +0,0 @@ -/* - * - * Copyright 2020 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package client - -import ( - "os" - "strings" -) - -// TODO: there are multiple env variables, GRPC_XDS_BOOTSTRAP and -// GRPC_XDS_EXPERIMENTAL_V3_SUPPORT, and this. Move all env variables into a -// separate package. -const routingEnabledConfigStr = "GRPC_XDS_EXPERIMENTAL_ROUTING" - -// routing is enabled only if env variable is set to true. The default is false. -// We may flip the default later. -var routingEnabled = strings.EqualFold(os.Getenv(routingEnabledConfigStr), "true") diff --git a/xds/internal/client/v2client_rds.go b/xds/internal/client/v2client_rds.go index 1fc9ac9752b..cc71f6538f6 100644 --- a/xds/internal/client/v2client_rds.go +++ b/xds/internal/client/v2client_rds.go @@ -97,51 +97,11 @@ func generateRDSUpdateFromRouteConfiguration(rc *xdspb.RouteConfiguration, host return rdsUpdate{}, fmt.Errorf("matched virtual host has no routes") } - // Keep the old code path for routing disabled. - if routingEnabled { - routes, err := routesProtoToSlice(vh.Routes, logger) - if err != nil { - return rdsUpdate{}, fmt.Errorf("received route is invalid: %v", err) - } - return rdsUpdate{routes: routes}, nil - } - - dr := vh.Routes[len(vh.Routes)-1] - match := dr.GetMatch() - if match == nil { - return rdsUpdate{}, fmt.Errorf("matched virtual host's default route doesn't have a match") - } - if prefix := match.GetPrefix(); prefix != "" && prefix != "/" { - // The matched virtual host is invalid. Match is not "" or "/". - return rdsUpdate{}, fmt.Errorf("matched virtual host's default route is %v, want Prefix empty string or /", match) - } - if caseSensitive := match.GetCaseSensitive(); caseSensitive != nil && !caseSensitive.Value { - // The case sensitive is set to false. Not set or set to true are both - // valid. - return rdsUpdate{}, fmt.Errorf("matched virtual host's default route set case-sensitive to false") - } - routeAction := dr.GetRoute() - if routeAction == nil { - return rdsUpdate{}, fmt.Errorf("matched route is nil") + routes, err := routesProtoToSlice(vh.Routes, logger) + if err != nil { + return rdsUpdate{}, fmt.Errorf("received route is invalid: %v", err) } - - if wc := routeAction.GetWeightedClusters(); wc != nil { - m, err := weightedClustersProtoToMap(wc) - if err != nil { - return rdsUpdate{}, fmt.Errorf("matched weighted cluster is invalid: %v", err) - } - return rdsUpdate{weightedCluster: m}, nil - } - - // When there's just one cluster, we set weightedCluster to map with one - // entry. This mean we will build a weighted_target balancer even if there's - // just one cluster. - // - // Otherwise, we will need to switch the top policy between weighted_target - // and CDS. In case when the action changes between one cluster and multiple - // clusters, changing top level policy means recreating TCP connection every - // time. - return rdsUpdate{weightedCluster: map[string]uint32{routeAction.GetCluster(): 1}}, nil + return rdsUpdate{routes: routes}, nil } func routesProtoToSlice(routes []*routepb.Route, logger *grpclog.PrefixLogger) ([]*Route, error) { diff --git a/xds/internal/client/v2client_rds_test.go b/xds/internal/client/v2client_rds_test.go index e3ec4e01fd8..3e5ef96fd6b 100644 --- a/xds/internal/client/v2client_rds_test.go +++ b/xds/internal/client/v2client_rds_test.go @@ -158,7 +158,7 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) { { name: "good-route-config-with-empty-string-route", rc: goodRouteConfig1, - wantUpdate: rdsUpdate{weightedCluster: map[string]uint32{goodClusterName1: 1}}, + wantUpdate: rdsUpdate{routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{goodClusterName1: 1}}}}, }, { // default route's match is not empty string, but "/". @@ -173,7 +173,7 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) { Route: &routepb.RouteAction{ ClusterSpecifier: &routepb.RouteAction_Cluster{Cluster: goodClusterName1}, }}}}}}}, - wantUpdate: rdsUpdate{weightedCluster: map[string]uint32{goodClusterName1: 1}}, + wantUpdate: rdsUpdate{routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{goodClusterName1: 1}}}}, }, { @@ -217,7 +217,7 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) { }, TotalWeight: &wrapperspb.UInt32Value{Value: 10}, }}}}}}}}}, - wantUpdate: rdsUpdate{weightedCluster: map[string]uint32{"a": 2, "b": 3, "c": 5}}, + wantUpdate: rdsUpdate{routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{"a": 2, "b": 3, "c": 5}}}}, }, } @@ -243,59 +243,10 @@ func doLDS(t *testing.T, v2c *v2Client, fakeServer *fakeserver.Server) { } } -// TestRDSHandleResponseWithRoutingEnabled starts a fake xDS server, makes a -// ClientConn to it, and creates a v2Client using it. Then, it registers an LDS -// and RDS watcher and tests different RDS responses. -// -// Routing is protected by an env variable. This test sets it to true, so the -// new fields will be parsed. -func (s) TestRDSHandleResponseWithRoutingEnabled(t *testing.T) { - routingEnabled = true - defer func() { - routingEnabled = false - }() - tests := []struct { - name string - rdsResponse *xdspb.DiscoveryResponse - wantErr bool - wantUpdate *rdsUpdate - wantUpdateErr bool - }{ - // Response contains one good interesting RouteConfiguration. - { - name: "one-good-route-config", - rdsResponse: goodRDSResponse1, - wantErr: false, - wantUpdate: &rdsUpdate{ - // Instead of just weighted targets when routing is disabled, - // this result contains a route with perfix "", and action as - // weighted targets. - routes: []*Route{{ - Prefix: newStringP(""), - Action: map[string]uint32{goodClusterName1: 1}, - }}, - }, - wantUpdateErr: false, - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - testWatchHandle(t, &watchHandleTestcase{ - typeURL: rdsURL, - resourceName: goodRouteName1, - responseToHandle: test.rdsResponse, - wantHandleErr: test.wantErr, - wantUpdate: test.wantUpdate, - wantUpdateErr: test.wantUpdateErr, - }) - }) - } -} - -// TestRDSHandleResponseWithRoutingDisabled starts a fake xDS server, makes a -// ClientConn to it, and creates a v2Client using it. Then, it registers an LDS -// and RDS watcher and tests different RDS responses. -func (s) TestRDSHandleResponseWithRoutingDisabled(t *testing.T) { +// TestRDSHandleResponseWithRouting starts a fake xDS server, makes a ClientConn +// to it, and creates a v2Client using it. Then, it registers an LDS and RDS +// watcher and tests different RDS responses. +func (s) TestRDSHandleResponseWithRouting(t *testing.T) { tests := []struct { name string rdsResponse *xdspb.DiscoveryResponse @@ -342,7 +293,22 @@ func (s) TestRDSHandleResponseWithRoutingDisabled(t *testing.T) { name: "one-good-route-config", rdsResponse: goodRDSResponse1, wantErr: false, - wantUpdate: &rdsUpdate{weightedCluster: map[string]uint32{goodClusterName1: 1}}, + wantUpdate: &rdsUpdate{routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{goodClusterName1: 1}}}}, + wantUpdateErr: false, + }, + { + name: "one-good-route-config with routes", + rdsResponse: goodRDSResponse1, + wantErr: false, + wantUpdate: &rdsUpdate{ + // Instead of just weighted targets when routing is disabled, + // this result contains a route with perfix "", and action as + // weighted targets. + routes: []*Route{{ + Prefix: newStringP(""), + Action: map[string]uint32{goodClusterName1: 1}, + }}, + }, wantUpdateErr: false, }, } diff --git a/xds/internal/resolver/serviceconfig.go b/xds/internal/resolver/serviceconfig.go index 84c5753adf6..805d8d41104 100644 --- a/xds/internal/resolver/serviceconfig.go +++ b/xds/internal/resolver/serviceconfig.go @@ -137,23 +137,6 @@ func weightedClusterToBalancerConfig(wc map[string]uint32) balancerConfig { return bc } -func weightedClusterToJSON(wc map[string]uint32) (string, error) { - sc := serviceConfig{ - LoadBalancingConfig: weightedClusterToBalancerConfig(wc), - } - bs, err := json.Marshal(sc) - if err != nil { - return "", fmt.Errorf("failed to marshal json: %v", err) - } - return string(bs), nil -} - func (r *xdsResolver) serviceUpdateToJSON(su xdsclient.ServiceUpdate) (string, error) { - // If WeightedClusters is set, routing is disabled (by env variable). Use - // weighted target only. - if su.WeightedCluster != nil { - return weightedClusterToJSON(su.WeightedCluster) - } - return r.routesToJSON(su.Routes) } diff --git a/xds/internal/resolver/serviceconfig_test.go b/xds/internal/resolver/serviceconfig_test.go index 4e149893ee7..ce4a7e8fab9 100644 --- a/xds/internal/resolver/serviceconfig_test.go +++ b/xds/internal/resolver/serviceconfig_test.go @@ -32,31 +32,44 @@ import ( ) const ( - testCluster1 = "test-cluster-1" - testClusterOnlyJSON = `{"loadBalancingConfig":[{ - "weighted_target_experimental": { - "targets": { "test-cluster-1" : { "weight":1, "childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}] } } - } - }]}` + testCluster1 = "test-cluster-1" + testOneClusterOnlyJSON = `{"loadBalancingConfig":[{ + "xds_routing_experimental":{ + "action":{ + "test-cluster-1_0":{ + "childPolicy":[{ + "weighted_target_experimental":{ + "targets":{ + "test-cluster-1":{ + "weight":1, + "childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}] + } + }}}] + } + }, + "route":[{"prefix":"","action":"test-cluster-1_0"}] + }}]}` testWeightedCDSJSON = `{"loadBalancingConfig":[{ - "weighted_target_experimental": { - "targets": { - "cluster_1" : { - "weight":75, - "childPolicy":[{"cds_experimental":{"cluster":"cluster_1"}}] - }, - "cluster_2" : { - "weight":25, - "childPolicy":[{"cds_experimental":{"cluster":"cluster_2"}}] - } - } - } - }]}` - testWeightedCDSNoChildJSON = `{"loadBalancingConfig":[{ - "weighted_target_experimental": { - "targets": {} - } - }]}` + "xds_routing_experimental":{ + "action":{ + "cluster_1_cluster_2_1":{ + "childPolicy":[{ + "weighted_target_experimental":{ + "targets":{ + "cluster_1":{ + "weight":75, + "childPolicy":[{"cds_experimental":{"cluster":"cluster_1"}}] + }, + "cluster_2":{ + "weight":25, + "childPolicy":[{"cds_experimental":{"cluster":"cluster_2"}}] + } + }}}] + } + }, + "route":[{"prefix":"","action":"cluster_1_cluster_2_1"}] + }}]}` + testRoutingJSON = `{"loadBalancingConfig":[{ "xds_routing_experimental": { "action":{ @@ -178,51 +191,6 @@ const ( ` ) -func TestWeightedClusterToJSON(t *testing.T) { - tests := []struct { - name string - wc map[string]uint32 - wantJSON string // wantJSON is not to be compared verbatim. - }{ - { - name: "one cluster only", - wc: map[string]uint32{testCluster1: 1}, - wantJSON: testClusterOnlyJSON, - }, - { - name: "empty weighted clusters", - wc: nil, - wantJSON: testWeightedCDSNoChildJSON, - }, - { - name: "weighted clusters", - wc: map[string]uint32{ - "cluster_1": 75, - "cluster_2": 25, - }, - wantJSON: testWeightedCDSJSON, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - gotJSON, err := weightedClusterToJSON(tt.wc) - if err != nil { - t.Errorf("serviceUpdateToJSON returned error: %v", err) - return - } - - gotParsed := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(gotJSON) - wantParsed := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(tt.wantJSON) - - if !internal.EqualServiceConfigForTesting(gotParsed.Config, wantParsed.Config) { - t.Errorf("serviceUpdateToJSON() = %v, want %v", gotJSON, tt.wantJSON) - t.Error("gotParsed: ", cmp.Diff(nil, gotParsed)) - t.Error("wantParsed: ", cmp.Diff(nil, wantParsed)) - } - }) - } -} - func TestRoutesToJSON(t *testing.T) { tests := []struct { name string @@ -348,15 +316,6 @@ func TestServiceUpdateToJSON(t *testing.T) { wantJSON string wantErr bool }{ - { - name: "weighted clusters", - su: client.ServiceUpdate{WeightedCluster: map[string]uint32{ - "cluster_1": 75, - "cluster_2": 25, - }}, - wantJSON: testWeightedCDSJSON, - wantErr: false, - }, { name: "routing", su: client.ServiceUpdate{ diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index d84ec44eb8c..5c3b0fce84e 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -272,7 +272,7 @@ func TestXDSResolverWatchCallbackAfterClose(t *testing.T) { // Call the watchAPI callback after closing the resolver, and make sure no // update is triggerred on the ClientConn. xdsR.Close() - xdsC.InvokeWatchServiceCallback(xdsclient.ServiceUpdate{WeightedCluster: map[string]uint32{cluster: 1}}, nil) + xdsC.InvokeWatchServiceCallback(xdsclient.ServiceUpdate{Routes: []*client.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}}, nil) if gotVal, gotErr := tcc.stateCh.Receive(); gotErr != testutils.ErrRecvTimeout { t.Fatalf("ClientConn.UpdateState called after xdsResolver is closed: %v", gotVal) } @@ -316,20 +316,21 @@ func TestXDSResolverGoodServiceUpdate(t *testing.T) { }() waitForWatchService(t, xdsC, targetStr) + defer replaceRandNumGenerator(0)() for _, tt := range []struct { su client.ServiceUpdate wantJSON string }{ { - su: client.ServiceUpdate{WeightedCluster: map[string]uint32{testCluster1: 1}}, - wantJSON: testClusterOnlyJSON, + su: client.ServiceUpdate{Routes: []*client.Route{{Prefix: newStringP(""), Action: map[string]uint32{testCluster1: 1}}}}, + wantJSON: testOneClusterOnlyJSON, }, { - su: client.ServiceUpdate{WeightedCluster: map[string]uint32{ + su: client.ServiceUpdate{Routes: []*client.Route{{Prefix: newStringP(""), Action: map[string]uint32{ "cluster_1": 75, "cluster_2": 25, - }}, + }}}}, wantJSON: testWeightedCDSJSON, }, } { @@ -382,7 +383,7 @@ func TestXDSResolverGoodUpdateAfterError(t *testing.T) { // Invoke the watchAPI callback with a good service update and wait for the // UpdateState method to be called on the ClientConn. - xdsC.InvokeWatchServiceCallback(xdsclient.ServiceUpdate{WeightedCluster: map[string]uint32{cluster: 1}}, nil) + xdsC.InvokeWatchServiceCallback(xdsclient.ServiceUpdate{Routes: []*client.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}}, nil) gotState, err := tcc.stateCh.Receive() if err != nil { t.Fatalf("ClientConn.UpdateState returned error: %v", err)