Skip to content

Commit

Permalink
xds/federation: support federation in LRS (#5128)
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl authored Jan 26, 2022
1 parent 61a6a06 commit 0a68f8a
Show file tree
Hide file tree
Showing 23 changed files with 359 additions and 226 deletions.
20 changes: 14 additions & 6 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,12 +318,20 @@ func (b *cdsBalancer) handleWatchUpdate(update clusterHandlerUpdate) {
EDSServiceName: cu.EDSServiceName,
MaxConcurrentRequests: cu.MaxRequests,
}
if cu.EnableLRS {
// An empty string here indicates that the cluster_resolver balancer should use the
// same xDS server for load reporting as it does for EDS
// requests/responses.
dms[i].LoadReportingServerName = new(string)

if cu.LRSServerConfig == xdsresource.ClusterLRSServerSelf {
bootstrapConfig := b.xdsClient.BootstrapConfig()
parsedName := xdsresource.ParseName(cu.ClusterName)
if parsedName.Scheme == xdsresource.FederationScheme {
// Is a federation resource name, find the corresponding
// authority server config.
if cfg, ok := bootstrapConfig.Authorities[parsedName.Authority]; ok {
dms[i].LoadReportingServer = cfg.XDSServer
}
} else {
// Not a federation resource name, use the default
// authority.
dms[i].LoadReportingServer = bootstrapConfig.XDSServer
}
}
case xdsresource.ClusterTypeLogicalDNS:
dms[i] = clusterresolver.DiscoveryMechanism{
Expand Down
14 changes: 11 additions & 3 deletions xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"google.golang.org/grpc/xds/internal/balancer/ringhash"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

Expand All @@ -48,6 +49,11 @@ const (
defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
)

var defaultTestAuthorityServerConfig = &bootstrap.ServerConfig{
ServerURI: "self_server",
CredsType: "self_creds",
}

type s struct {
grpctest.Tester
}
Expand Down Expand Up @@ -209,8 +215,7 @@ func edsCCS(service string, countMax *uint32, enableLRS bool, xdslbpolicy *inter
MaxConcurrentRequests: countMax,
}
if enableLRS {
discoveryMechanism.LoadReportingServerName = new(string)

discoveryMechanism.LoadReportingServer = defaultTestAuthorityServerConfig
}
lbCfg := &clusterresolver.LBConfig{
DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{discoveryMechanism},
Expand Down Expand Up @@ -354,6 +359,9 @@ func (s) TestUpdateClientConnStateWithSameState(t *testing.T) {
// to the edsBalancer.
func (s) TestHandleClusterUpdate(t *testing.T) {
xdsC, cdsB, edsB, _, cancel := setupWithWatch(t)
xdsC.SetBootstrapConfig(&bootstrap.Config{
XDSServer: defaultTestAuthorityServerConfig,
})
defer func() {
cancel()
cdsB.Close()
Expand All @@ -367,7 +375,7 @@ func (s) TestHandleClusterUpdate(t *testing.T) {
}{
{
name: "happy-case-with-lrs",
cdsUpdate: xdsresource.ClusterUpdate{ClusterName: serviceName, EnableLRS: true},
cdsUpdate: xdsresource.ClusterUpdate{ClusterName: serviceName, LRSServerConfig: xdsresource.ClusterLRSServerSelf},
wantCCS: edsCCS(serviceName, nil, true, nil),
},
{
Expand Down
77 changes: 42 additions & 35 deletions xds/internal/balancer/clusterimpl/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,26 @@ import (
xdsinternal "google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/load"
)

const (
defaultTestTimeout = 1 * time.Second
defaultShortTestTimeout = 100 * time.Microsecond

testClusterName = "test-cluster"
testServiceName = "test-eds-service"
testLRSServerName = "test-lrs-name"
testClusterName = "test-cluster"
testServiceName = "test-eds-service"
)

var (
testBackendAddrs = []resolver.Address{
{Addr: "1.1.1.1:1"},
}
testLRSServerConfig = &bootstrap.ServerConfig{
ServerURI: "trafficdirector.googleapis.com:443",
CredsType: "google_default",
}

cmpOpts = cmp.Options{
cmpopts.EquateEmpty(),
Expand Down Expand Up @@ -103,9 +107,9 @@ func (s) TestDropByCategory(t *testing.T) {
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServerName: newString(testLRSServerName),
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServer: testLRSServerConfig,
DropCategories: []DropConfig{{
Category: dropReason,
RequestsPerMillion: million * dropNumerator / dropDenominator,
Expand All @@ -125,8 +129,8 @@ func (s) TestDropByCategory(t *testing.T) {
if err != nil {
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
}
if got.Server != testLRSServerName {
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerName)
if got.Server != testLRSServerConfig {
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerConfig)
}

sc1 := <-cc.NewSubConnCh
Expand Down Expand Up @@ -191,9 +195,9 @@ func (s) TestDropByCategory(t *testing.T) {
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServerName: newString(testLRSServerName),
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServer: testLRSServerConfig,
DropCategories: []DropConfig{{
Category: dropReason2,
RequestsPerMillion: million * dropNumerator2 / dropDenominator2,
Expand Down Expand Up @@ -257,10 +261,10 @@ func (s) TestDropCircuitBreaking(t *testing.T) {
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServerName: newString(testLRSServerName),
MaxConcurrentRequests: &maxRequest,
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServer: testLRSServerConfig,
MaxConcurrentRequests: &maxRequest,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
Expand All @@ -276,8 +280,8 @@ func (s) TestDropCircuitBreaking(t *testing.T) {
if err != nil {
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
}
if got.Server != testLRSServerName {
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerName)
if got.Server != testLRSServerConfig {
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerConfig)
}

sc1 := <-cc.NewSubConnCh
Expand Down Expand Up @@ -605,9 +609,9 @@ func (s) TestLoadReporting(t *testing.T) {
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServerName: newString(testLRSServerName),
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServer: testLRSServerConfig,
// Locality: testLocality,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
Expand All @@ -624,8 +628,8 @@ func (s) TestLoadReporting(t *testing.T) {
if err != nil {
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
}
if got.Server != testLRSServerName {
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerName)
if got.Server != testLRSServerConfig {
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerConfig)
}

sc1 := <-cc.NewSubConnCh
Expand Down Expand Up @@ -720,9 +724,9 @@ func (s) TestUpdateLRSServer(t *testing.T) {
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServerName: newString(""),
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServer: testLRSServerConfig,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
Expand All @@ -738,17 +742,21 @@ func (s) TestUpdateLRSServer(t *testing.T) {
if err != nil {
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
}
if got.Server != "" {
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, "")
if got.Server != testLRSServerConfig {
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerConfig)
}

testLRSServerConfig2 := &bootstrap.ServerConfig{
ServerURI: "trafficdirector-another.googleapis.com:443",
CredsType: "google_default",
}
// Update LRS server to a different name.
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServerName: newString(testLRSServerName),
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServer: testLRSServerConfig2,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
Expand All @@ -763,17 +771,16 @@ func (s) TestUpdateLRSServer(t *testing.T) {
if err2 != nil {
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err2)
}
if got2.Server != testLRSServerName {
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got2.Server, testLRSServerName)
if got2.Server != testLRSServerConfig2 {
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got2.Server, testLRSServerConfig2)
}

// Update LRS server to nil, to disable LRS.
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServerName: nil,
Cluster: testClusterName,
EDSServiceName: testServiceName,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
Expand Down
19 changes: 10 additions & 9 deletions xds/internal/balancer/clusterimpl/clusterimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
xdsinternal "google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/loadstore"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/load"
)

Expand Down Expand Up @@ -104,7 +105,7 @@ type clusterImplBalancer struct {
childLB balancer.Balancer
cancelLoadReport func()
edsServiceName string
lrsServerName *string
lrsServer *bootstrap.ServerConfig
loadWrapper *loadstore.Wrapper

clusterNameMu sync.Mutex
Expand Down Expand Up @@ -171,22 +172,22 @@ func (b *clusterImplBalancer) updateLoadStore(newConfig *LBConfig) error {
)

// Check if it's necessary to restart load report.
if b.lrsServerName == nil {
if newConfig.LoadReportingServerName != nil {
if b.lrsServer == nil {
if newConfig.LoadReportingServer != nil {
// Old is nil, new is not nil, start new LRS.
b.lrsServerName = newConfig.LoadReportingServerName
b.lrsServer = newConfig.LoadReportingServer
startNewLoadReport = true
}
// Old is nil, new is nil, do nothing.
} else if newConfig.LoadReportingServerName == nil {
} else if newConfig.LoadReportingServer == nil {
// Old is not nil, new is nil, stop old, don't start new.
b.lrsServerName = newConfig.LoadReportingServerName
b.lrsServer = newConfig.LoadReportingServer
stopOldLoadReport = true
} else {
// Old is not nil, new is not nil, compare string values, if
// different, stop old and start new.
if *b.lrsServerName != *newConfig.LoadReportingServerName {
b.lrsServerName = newConfig.LoadReportingServerName
if *b.lrsServer != *newConfig.LoadReportingServer {
b.lrsServer = newConfig.LoadReportingServer
stopOldLoadReport = true
startNewLoadReport = true
}
Expand All @@ -206,7 +207,7 @@ func (b *clusterImplBalancer) updateLoadStore(newConfig *LBConfig) error {
if startNewLoadReport {
var loadStore *load.Store
if b.xdsClient != nil {
loadStore, b.cancelLoadReport = b.xdsClient.ReportLoad(*b.lrsServerName)
loadStore, b.cancelLoadReport = b.xdsClient.ReportLoad(b.lrsServer)
}
b.loadWrapper.UpdateLoadStore(loadStore)
}
Expand Down
15 changes: 9 additions & 6 deletions xds/internal/balancer/clusterimpl/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
)

// DropConfig contains the category, and drop ratio.
Expand All @@ -35,12 +36,14 @@ type DropConfig struct {
type LBConfig struct {
serviceconfig.LoadBalancingConfig `json:"-"`

Cluster string `json:"cluster,omitempty"`
EDSServiceName string `json:"edsServiceName,omitempty"`
LoadReportingServerName *string `json:"lrsLoadReportingServerName,omitempty"`
MaxConcurrentRequests *uint32 `json:"maxConcurrentRequests,omitempty"`
DropCategories []DropConfig `json:"dropCategories,omitempty"`
ChildPolicy *internalserviceconfig.BalancerConfig `json:"childPolicy,omitempty"`
Cluster string `json:"cluster,omitempty"`
EDSServiceName string `json:"edsServiceName,omitempty"`
// LoadReportingServer is the LRS server to send load reports to. If not
// present, load reporting will be disabled.
LoadReportingServer *bootstrap.ServerConfig `json:"lrsLoadReportingServer,omitempty"`
MaxConcurrentRequests *uint32 `json:"maxConcurrentRequests,omitempty"`
DropCategories []DropConfig `json:"dropCategories,omitempty"`
ChildPolicy *internalserviceconfig.BalancerConfig `json:"childPolicy,omitempty"`
}

func parseConfig(c json.RawMessage) (*LBConfig, error) {
Expand Down
21 changes: 11 additions & 10 deletions xds/internal/balancer/clusterimpl/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,22 @@ import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/balancer"
_ "google.golang.org/grpc/balancer/roundrobin"
_ "google.golang.org/grpc/balancer/weightedtarget"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
)

const (
testJSONConfig = `{
"cluster": "test_cluster",
"edsServiceName": "test-eds",
"lrsLoadReportingServerName": "lrs_server",
"lrsLoadReportingServer": {
"server_uri": "trafficdirector.googleapis.com:443",
"channel_creds": [ { "type": "google_default" } ]
},
"maxConcurrentRequests": 123,
"dropCategories": [
{
Expand Down Expand Up @@ -106,10 +111,10 @@ func TestParseConfig(t *testing.T) {
name: "OK",
js: testJSONConfig,
want: &LBConfig{
Cluster: "test_cluster",
EDSServiceName: "test-eds",
LoadReportingServerName: newString("lrs_server"),
MaxConcurrentRequests: newUint32(123),
Cluster: "test_cluster",
EDSServiceName: "test-eds",
LoadReportingServer: testLRSServerConfig,
MaxConcurrentRequests: newUint32(123),
DropCategories: []DropConfig{
{Category: "drop-1", RequestsPerMillion: 314},
{Category: "drop-2", RequestsPerMillion: 159},
Expand All @@ -128,17 +133,13 @@ func TestParseConfig(t *testing.T) {
if (err != nil) != tt.wantErr {
t.Fatalf("parseConfig() error = %v, wantErr %v", err, tt.wantErr)
}
if !cmp.Equal(got, tt.want) {
if !cmp.Equal(got, tt.want, cmpopts.IgnoreFields(bootstrap.ServerConfig{}, "Creds")) {
t.Errorf("parseConfig() got unexpected result, diff: %v", cmp.Diff(got, tt.want))
}
})
}
}

func newString(s string) *string {
return &s
}

func newUint32(i uint32) *uint32 {
return &i
}
Loading

0 comments on commit 0a68f8a

Please sign in to comment.