Skip to content

Commit

Permalink
xdsrouting: remove env variable for routing (#3754)
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Jul 28, 2020
1 parent cee815d commit 5f282cb
Show file tree
Hide file tree
Showing 11 changed files with 103 additions and 275 deletions.
4 changes: 4 additions & 0 deletions xds/internal/client/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@ func NewConfig() (*Config, error) {
// 2. Environment variable "GRPC_XDS_EXPERIMENTAL_V3_SUPPORT" is set to
// true.
// The default value of the enum type "version.TransportAPI" is v2.
//
// TODO: there are multiple env variables, GRPC_XDS_BOOTSTRAP and
// GRPC_XDS_EXPERIMENTAL_V3_SUPPORT. Move all env variables into a separate
// package.
if v3Env := os.Getenv(v3SupportEnv); v3Env == "true" {
if serverSupportsV3 {
config.TransportAPI = version.TransportV3
Expand Down
4 changes: 0 additions & 4 deletions xds/internal/client/client_watchers_rds.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,6 @@ type Route struct {
}

type rdsUpdate struct {
// weightedCluster is only set when routing is disabled (env variable
// GRPC_XDS_EXPERIMENTAL_ROUTING is not true).
weightedCluster map[string]uint32

routes []*Route
}
type rdsCallbackFunc func(rdsUpdate, error)
Expand Down
10 changes: 5 additions & 5 deletions xds/internal/client/client_watchers_rds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (s) TestRDSWatch(t *testing.T) {
t.Fatalf("want new watch to start, got error %v", err)
}

wantUpdate := rdsUpdate{weightedCluster: map[string]uint32{testCDSName: 1}}
wantUpdate := rdsUpdate{routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
v2Client.r.newRDSUpdate(map[string]rdsUpdate{
testRDSName: wantUpdate,
})
Expand Down Expand Up @@ -113,7 +113,7 @@ func (s) TestRDSTwoWatchSameResourceName(t *testing.T) {
}
}

wantUpdate := rdsUpdate{weightedCluster: map[string]uint32{testCDSName: 1}}
wantUpdate := rdsUpdate{routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
v2Client.r.newRDSUpdate(map[string]rdsUpdate{
testRDSName: wantUpdate,
})
Expand Down Expand Up @@ -179,8 +179,8 @@ func (s) TestRDSThreeWatchDifferentResourceName(t *testing.T) {
t.Fatalf("want new watch to start, got error %v", err)
}

wantUpdate1 := rdsUpdate{weightedCluster: map[string]uint32{testCDSName + "1": 1}}
wantUpdate2 := rdsUpdate{weightedCluster: map[string]uint32{testCDSName + "2": 1}}
wantUpdate1 := rdsUpdate{routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "1": 1}}}}
wantUpdate2 := rdsUpdate{routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "2": 1}}}}
v2Client.r.newRDSUpdate(map[string]rdsUpdate{
testRDSName + "1": wantUpdate1,
testRDSName + "2": wantUpdate2,
Expand Down Expand Up @@ -219,7 +219,7 @@ func (s) TestRDSWatchAfterCache(t *testing.T) {
t.Fatalf("want new watch to start, got error %v", err)
}

wantUpdate := rdsUpdate{weightedCluster: map[string]uint32{testCDSName: 1}}
wantUpdate := rdsUpdate{routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
v2Client.r.newRDSUpdate(map[string]rdsUpdate{
testRDSName: wantUpdate,
})
Expand Down
12 changes: 2 additions & 10 deletions xds/internal/client/client_watchers_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,7 @@ import (

// ServiceUpdate contains update about the service.
type ServiceUpdate struct {
// WeightedCluster is a map from cluster names (CDS resource to watch) to
// their weights.
//
// This field is only set when routing is disabled (env variable
// GRPC_XDS_EXPERIMENTAL_ROUTING is not true).
WeightedCluster map[string]uint32

// Routes
// Routes contain matchers+actions to route RPCs.
Routes []*Route
}

Expand Down Expand Up @@ -126,8 +119,7 @@ func (w *serviceUpdateWatcher) handleRDSResp(update rdsUpdate, err error) {
return
}
w.serviceCb(ServiceUpdate{
WeightedCluster: update.weightedCluster,
Routes: update.routes,
Routes: update.routes,
}, nil)
}

Expand Down
42 changes: 21 additions & 21 deletions xds/internal/client/client_watchers_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (s) TestServiceWatch(t *testing.T) {
serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
})

wantUpdate := ServiceUpdate{WeightedCluster: map[string]uint32{testCDSName: 1}}
wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}

if _, err := v2Client.addWatches[ldsURL].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
Expand All @@ -70,7 +70,7 @@ func (s) TestServiceWatch(t *testing.T) {
t.Fatalf("want new watch to start, got error %v", err)
}
v2Client.r.newRDSUpdate(map[string]rdsUpdate{
testRDSName: {weightedCluster: map[string]uint32{testCDSName: 1}},
testRDSName: {routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
})

if u, err := serviceUpdateCh.Receive(); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate, nil}, serviceCmpOpts...) {
Expand Down Expand Up @@ -116,7 +116,7 @@ func (s) TestServiceWatchLDSUpdate(t *testing.T) {
serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
})

wantUpdate := ServiceUpdate{WeightedCluster: map[string]uint32{testCDSName: 1}}
wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}

if _, err := v2Client.addWatches[ldsURL].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
Expand All @@ -128,7 +128,7 @@ func (s) TestServiceWatchLDSUpdate(t *testing.T) {
t.Fatalf("want new watch to start, got error %v", err)
}
v2Client.r.newRDSUpdate(map[string]rdsUpdate{
testRDSName: {weightedCluster: map[string]uint32{testCDSName: 1}},
testRDSName: {routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
})

if u, err := serviceUpdateCh.Receive(); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate, nil}, serviceCmpOpts...) {
Expand All @@ -145,17 +145,17 @@ func (s) TestServiceWatchLDSUpdate(t *testing.T) {

// Another update for the old name.
v2Client.r.newRDSUpdate(map[string]rdsUpdate{
testRDSName: {weightedCluster: map[string]uint32{testCDSName: 1}},
testRDSName: {routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
})

if u, err := serviceUpdateCh.Receive(); err != testutils.ErrRecvTimeout {
t.Errorf("unexpected serviceUpdate: %v, %v, want channel recv timeout", u, err)
}

wantUpdate2 := ServiceUpdate{WeightedCluster: map[string]uint32{testCDSName + "2": 1}}
wantUpdate2 := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "2": 1}}}}
// RDS update for the new name.
v2Client.r.newRDSUpdate(map[string]rdsUpdate{
testRDSName + "2": {weightedCluster: map[string]uint32{testCDSName + "2": 1}},
testRDSName + "2": {routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "2": 1}}}},
})

if u, err := serviceUpdateCh.Receive(); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate2, nil}, serviceCmpOpts...) {
Expand Down Expand Up @@ -183,7 +183,7 @@ func (s) TestServiceWatchSecond(t *testing.T) {
serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
})

wantUpdate := ServiceUpdate{WeightedCluster: map[string]uint32{testCDSName: 1}}
wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}

if _, err := v2Client.addWatches[ldsURL].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
Expand All @@ -195,7 +195,7 @@ func (s) TestServiceWatchSecond(t *testing.T) {
t.Fatalf("want new watch to start, got error %v", err)
}
v2Client.r.newRDSUpdate(map[string]rdsUpdate{
testRDSName: {weightedCluster: map[string]uint32{testCDSName: 1}},
testRDSName: {routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
})

if u, err := serviceUpdateCh.Receive(); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate, nil}, serviceCmpOpts...) {
Expand Down Expand Up @@ -226,7 +226,7 @@ func (s) TestServiceWatchSecond(t *testing.T) {
testLDSName: {routeName: testRDSName},
})
v2Client.r.newRDSUpdate(map[string]rdsUpdate{
testRDSName: {weightedCluster: map[string]uint32{testCDSName: 1}},
testRDSName: {routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
})

if u, err := serviceUpdateCh.Receive(); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate, nil}, serviceCmpOpts...) {
Expand Down Expand Up @@ -263,8 +263,8 @@ func (s) TestServiceWatchWithNoResponseFromServer(t *testing.T) {

callbackCh := testutils.NewChannel()
cancelWatch := xdsClient.WatchService(goodLDSTarget1, func(su ServiceUpdate, err error) {
if su.WeightedCluster != nil {
callbackCh.Send(fmt.Errorf("got WeightedCluster: %+v, want nil", su.WeightedCluster))
if su.Routes != nil {
callbackCh.Send(fmt.Errorf("got WeightedCluster: %+v, want nil", su.Routes))
return
}
if err == nil {
Expand Down Expand Up @@ -307,8 +307,8 @@ func (s) TestServiceWatchEmptyRDS(t *testing.T) {

callbackCh := testutils.NewChannel()
cancelWatch := xdsClient.WatchService(goodLDSTarget1, func(su ServiceUpdate, err error) {
if su.WeightedCluster != nil {
callbackCh.Send(fmt.Errorf("got WeightedCluster: %+v, want nil", su.WeightedCluster))
if su.Routes != nil {
callbackCh.Send(fmt.Errorf("got WeightedCluster: %+v, want nil", su.Routes))
return
}
if err == nil {
Expand Down Expand Up @@ -394,7 +394,7 @@ func (s) TestServiceNotCancelRDSOnSameLDSUpdate(t *testing.T) {
serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
})

wantUpdate := ServiceUpdate{WeightedCluster: map[string]uint32{testCDSName: 1}}
wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}

if _, err := v2Client.addWatches[ldsURL].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
Expand All @@ -406,7 +406,7 @@ func (s) TestServiceNotCancelRDSOnSameLDSUpdate(t *testing.T) {
t.Fatalf("want new watch to start, got error %v", err)
}
v2Client.r.newRDSUpdate(map[string]rdsUpdate{
testRDSName: {weightedCluster: map[string]uint32{testCDSName: 1}},
testRDSName: {routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
})

if u, err := serviceUpdateCh.Receive(); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate, nil}, serviceCmpOpts...) {
Expand Down Expand Up @@ -445,7 +445,7 @@ func (s) TestServiceResourceRemoved(t *testing.T) {
serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
})

wantUpdate := ServiceUpdate{WeightedCluster: map[string]uint32{testCDSName: 1}}
wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}

if _, err := v2Client.addWatches[ldsURL].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
Expand All @@ -457,7 +457,7 @@ func (s) TestServiceResourceRemoved(t *testing.T) {
t.Fatalf("want new watch to start, got error %v", err)
}
v2Client.r.newRDSUpdate(map[string]rdsUpdate{
testRDSName: {weightedCluster: map[string]uint32{testCDSName: 1}},
testRDSName: {routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
})

if u, err := serviceUpdateCh.Receive(); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate, nil}, serviceCmpOpts...) {
Expand All @@ -477,7 +477,7 @@ func (s) TestServiceResourceRemoved(t *testing.T) {
// Send RDS update for the removed LDS resource, expect no updates to
// callback, because RDS should be canceled.
v2Client.r.newRDSUpdate(map[string]rdsUpdate{
testRDSName: {weightedCluster: map[string]uint32{testCDSName + "new": 1}},
testRDSName: {routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "new": 1}}}},
})
if u, err := serviceUpdateCh.Receive(); err != testutils.ErrRecvTimeout {
t.Errorf("unexpected serviceUpdate: %v, want receiving from channel timeout", u)
Expand All @@ -497,9 +497,9 @@ func (s) TestServiceResourceRemoved(t *testing.T) {
}

v2Client.r.newRDSUpdate(map[string]rdsUpdate{
testRDSName: {weightedCluster: map[string]uint32{testCDSName + "new2": 1}},
testRDSName: {routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "new2": 1}}}},
})
if u, err := serviceUpdateCh.Receive(); err != nil || !cmp.Equal(u, serviceUpdateErr{ServiceUpdate{WeightedCluster: map[string]uint32{testCDSName + "new2": 1}}, nil}, serviceCmpOpts...) {
if u, err := serviceUpdateCh.Receive(); err != nil || !cmp.Equal(u, serviceUpdateErr{ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "new2": 1}}}}, nil}, serviceCmpOpts...) {
t.Errorf("unexpected serviceUpdate: %v, error receiving from channel: %v", u, err)
}
}
33 changes: 0 additions & 33 deletions xds/internal/client/envconfig.go

This file was deleted.

48 changes: 4 additions & 44 deletions xds/internal/client/v2client_rds.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,51 +97,11 @@ func generateRDSUpdateFromRouteConfiguration(rc *xdspb.RouteConfiguration, host
return rdsUpdate{}, fmt.Errorf("matched virtual host has no routes")
}

// Keep the old code path for routing disabled.
if routingEnabled {
routes, err := routesProtoToSlice(vh.Routes, logger)
if err != nil {
return rdsUpdate{}, fmt.Errorf("received route is invalid: %v", err)
}
return rdsUpdate{routes: routes}, nil
}

dr := vh.Routes[len(vh.Routes)-1]
match := dr.GetMatch()
if match == nil {
return rdsUpdate{}, fmt.Errorf("matched virtual host's default route doesn't have a match")
}
if prefix := match.GetPrefix(); prefix != "" && prefix != "/" {
// The matched virtual host is invalid. Match is not "" or "/".
return rdsUpdate{}, fmt.Errorf("matched virtual host's default route is %v, want Prefix empty string or /", match)
}
if caseSensitive := match.GetCaseSensitive(); caseSensitive != nil && !caseSensitive.Value {
// The case sensitive is set to false. Not set or set to true are both
// valid.
return rdsUpdate{}, fmt.Errorf("matched virtual host's default route set case-sensitive to false")
}
routeAction := dr.GetRoute()
if routeAction == nil {
return rdsUpdate{}, fmt.Errorf("matched route is nil")
routes, err := routesProtoToSlice(vh.Routes, logger)
if err != nil {
return rdsUpdate{}, fmt.Errorf("received route is invalid: %v", err)
}

if wc := routeAction.GetWeightedClusters(); wc != nil {
m, err := weightedClustersProtoToMap(wc)
if err != nil {
return rdsUpdate{}, fmt.Errorf("matched weighted cluster is invalid: %v", err)
}
return rdsUpdate{weightedCluster: m}, nil
}

// When there's just one cluster, we set weightedCluster to map with one
// entry. This mean we will build a weighted_target balancer even if there's
// just one cluster.
//
// Otherwise, we will need to switch the top policy between weighted_target
// and CDS. In case when the action changes between one cluster and multiple
// clusters, changing top level policy means recreating TCP connection every
// time.
return rdsUpdate{weightedCluster: map[string]uint32{routeAction.GetCluster(): 1}}, nil
return rdsUpdate{routes: routes}, nil
}

func routesProtoToSlice(routes []*routepb.Route, logger *grpclog.PrefixLogger) ([]*Route, error) {
Expand Down
Loading

0 comments on commit 5f282cb

Please sign in to comment.