Skip to content

Commit

Permalink
balancergroup: Propagate balancer.BuildOptions to child policies (#4184)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed Feb 8, 2021
1 parent b753f49 commit 9280052
Show file tree
Hide file tree
Showing 12 changed files with 175 additions and 85 deletions.
11 changes: 9 additions & 2 deletions xds/internal/balancer/balancergroup/balancergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type subBalancerWrapper struct {
// The static part of sub-balancer. Keeps balancerBuilders and addresses.
// To be used when restarting sub-balancer.
builder balancer.Builder
// Options to be passed to sub-balancer at the time of creation.
buildOpts balancer.BuildOptions
// ccState is a cache of the addresses/balancer config, so when the balancer
// is restarted after close, it will get the previous update. It's a pointer
// and is set to nil at init, so when the balancer is built for the first
Expand Down Expand Up @@ -94,7 +96,7 @@ func (sbc *subBalancerWrapper) updateBalancerStateWithCachedPicker() {
}

func (sbc *subBalancerWrapper) startBalancer() {
b := sbc.builder.Build(sbc, balancer.BuildOptions{})
b := sbc.builder.Build(sbc, sbc.buildOpts)
sbc.group.logger.Infof("Created child policy %p of type %v", b, sbc.builder.Name())
sbc.balancer = b
if sbc.ccState != nil {
Expand Down Expand Up @@ -179,6 +181,7 @@ func (sbc *subBalancerWrapper) stopBalancer() {
// balancer group.
type BalancerGroup struct {
cc balancer.ClientConn
buildOpts balancer.BuildOptions
logger *grpclog.PrefixLogger
loadStore load.PerClusterReporter

Expand Down Expand Up @@ -235,9 +238,12 @@ var DefaultSubBalancerCloseTimeout = 15 * time.Minute

// New creates a new BalancerGroup. Note that the BalancerGroup
// needs to be started to work.
func New(cc balancer.ClientConn, stateAggregator BalancerStateAggregator, loadStore load.PerClusterReporter, logger *grpclog.PrefixLogger) *BalancerGroup {
//
// TODO(easwars): Pass an options struct instead of N args.
func New(cc balancer.ClientConn, bOpts balancer.BuildOptions, stateAggregator BalancerStateAggregator, loadStore load.PerClusterReporter, logger *grpclog.PrefixLogger) *BalancerGroup {
return &BalancerGroup{
cc: cc,
buildOpts: bOpts,
logger: logger,
loadStore: loadStore,

Expand Down Expand Up @@ -305,6 +311,7 @@ func (bg *BalancerGroup) Add(id string, builder balancer.Builder) {
id: id,
group: bg,
builder: builder,
buildOpts: bg.buildOpts,
}
if bg.outgoingStarted {
// Only start the balancer if bg is started. Otherwise, we only keep the
Expand Down
58 changes: 52 additions & 6 deletions xds/internal/balancer/balancergroup/balancergroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal/balancer/weightedtarget/weightedaggregator"
"google.golang.org/grpc/xds/internal/client/load"
Expand Down Expand Up @@ -74,7 +76,7 @@ func newTestBalancerGroup(t *testing.T, loadStore load.PerClusterReporter) (*tes
cc := testutils.NewTestClientConn(t)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
bg := New(cc, gator, loadStore, nil)
bg := New(cc, balancer.BuildOptions{}, gator, loadStore, nil)
bg.Start()
return cc, gator, bg
}
Expand Down Expand Up @@ -501,7 +503,7 @@ func (s) TestBalancerGroup_start_close(t *testing.T) {
cc := testutils.NewTestClientConn(t)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
bg := New(cc, gator, nil, nil)
bg := New(cc, balancer.BuildOptions{}, gator, nil, nil)

// Add two balancers to group and send two resolved addresses to both
// balancers.
Expand Down Expand Up @@ -590,16 +592,20 @@ func (s) TestBalancerGroup_start_close(t *testing.T) {
// whenever it gets an address update. It's expected that start() doesn't block
// because of deadlock.
func (s) TestBalancerGroup_start_close_deadlock(t *testing.T) {
const balancerName = "stub-TestBalancerGroup_start_close_deadlock"
stub.Register(balancerName, stub.BalancerFuncs{})
builder := balancer.Get(balancerName)

cc := testutils.NewTestClientConn(t)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
bg := New(cc, gator, nil, nil)
bg := New(cc, balancer.BuildOptions{}, gator, nil, nil)

gator.Add(testBalancerIDs[0], 2)
bg.Add(testBalancerIDs[0], &testutils.TestConstBalancerBuilder{})
bg.Add(testBalancerIDs[0], builder)
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
gator.Add(testBalancerIDs[1], 1)
bg.Add(testBalancerIDs[1], &testutils.TestConstBalancerBuilder{})
bg.Add(testBalancerIDs[1], builder)
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})

bg.Start()
Expand Down Expand Up @@ -695,7 +701,7 @@ func initBalancerGroupForCachingTest(t *testing.T) (*weightedaggregator.Aggregat
cc := testutils.NewTestClientConn(t)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
bg := New(cc, gator, nil, nil)
bg := New(cc, balancer.BuildOptions{}, gator, nil, nil)

// Add two balancers to group and send two resolved addresses to both
// balancers.
Expand Down Expand Up @@ -931,3 +937,43 @@ func (s) TestBalancerGroup_locality_caching_readd_with_different_builder(t *test
t.Fatalf("want %v, got %v", want, err)
}
}

// TestBalancerGroupBuildOptions verifies that the balancer.BuildOptions passed
// to the balancergroup at creation time is passed to child policies.
func (s) TestBalancerGroupBuildOptions(t *testing.T) {
const (
balancerName = "stubBalancer-TestBalancerGroupBuildOptions"
parent = int64(1234)
userAgent = "ua"
defaultTestTimeout = 1 * time.Second
)

// Setup the stub balancer such that we can read the build options passed to
// it in the UpdateClientConnState method.
bOpts := balancer.BuildOptions{
DialCreds: insecure.NewCredentials(),
ChannelzParentID: parent,
CustomUserAgent: userAgent,
}
stub.Register(balancerName, stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error {
if !cmp.Equal(bd.BuildOptions, bOpts) {
return fmt.Errorf("buildOptions in child balancer: %v, want %v", bd, bOpts)
}
return nil
},
})
cc := testutils.NewTestClientConn(t)
bg := New(cc, bOpts, nil, nil, nil)
bg.Start()

// Add the stub balancer build above as a child policy.
balancerBuilder := balancer.Get(balancerName)
bg.Add(testBalancerIDs[0], balancerBuilder)

// Send an empty clientConn state change. This should trigger the
// verification of the buildOptions being passed to the child policy.
if err := bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{}); err != nil {
t.Fatal(err)
}
}
4 changes: 2 additions & 2 deletions xds/internal/balancer/clustermanager/clustermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ func init() {

type builder struct{}

func (builder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
func (builder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
b := &bal{}
b.logger = prefixLogger(b)
b.stateAggregator = newBalancerStateAggregator(cc, b.logger)
b.stateAggregator.start()
b.bg = balancergroup.New(cc, b.stateAggregator, nil, b.logger)
b.bg = balancergroup.New(cc, opts, b.stateAggregator, nil, b.logger)
b.bg.Start()
b.logger.Infof("Created")
return b
Expand Down
55 changes: 55 additions & 0 deletions xds/internal/balancer/clustermanager/clustermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ import (
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/hierarchy"
itestutils "google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal/balancer/balancergroup"
Expand Down Expand Up @@ -510,3 +513,55 @@ func TestRoutingConfigUpdateDeleteAll(t *testing.T) {
testPick(t, p3, tt.pickInfo, tt.wantSC, tt.wantErr)
}
}

func TestClusterManagerForwardsBalancerBuildOptions(t *testing.T) {
const (
balancerName = "stubBalancer-TestClusterManagerForwardsBalancerBuildOptions"
parent = int64(1234)
userAgent = "ua"
defaultTestTimeout = 1 * time.Second
)

// Setup the stub balancer such that we can read the build options passed to
// it in the UpdateClientConnState method.
ccsCh := itestutils.NewChannel()
bOpts := balancer.BuildOptions{
DialCreds: insecure.NewCredentials(),
ChannelzParentID: parent,
CustomUserAgent: userAgent,
}
stub.Register(balancerName, stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error {
if !cmp.Equal(bd.BuildOptions, bOpts) {
err := fmt.Errorf("buildOptions in child balancer: %v, want %v", bd, bOpts)
ccsCh.Send(err)
return err
}
ccsCh.Send(nil)
return nil
},
})

cc := testutils.NewTestClientConn(t)
rtb := rtBuilder.Build(cc, bOpts)

configJSON1 := fmt.Sprintf(`{
"children": {
"cds:cluster_1":{ "childPolicy": [{"%s":""}] }
}
}`, balancerName)
config1, err := rtParser.ParseConfig([]byte(configJSON1))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}

if err := rtb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: config1}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if v, err := ccsCh.Receive(ctx); err != nil {
err2 := v.(error)
t.Fatal(err2)
}
}
8 changes: 4 additions & 4 deletions xds/internal/balancer/edsbalancer/eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ type xdsClientInterface interface {
}

var (
newEDSBalancer = func(cc balancer.ClientConn, enqueueState func(priorityType, balancer.State), lw load.PerClusterReporter, logger *grpclog.PrefixLogger) edsBalancerImplInterface {
return newEDSBalancerImpl(cc, enqueueState, lw, logger)
newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions, enqueueState func(priorityType, balancer.State), lw load.PerClusterReporter, logger *grpclog.PrefixLogger) edsBalancerImplInterface {
return newEDSBalancerImpl(cc, opts, enqueueState, lw, logger)
}
newXDSClient = func() (xdsClientInterface, error) { return xdsclient.New() }
)
Expand All @@ -61,7 +61,7 @@ func init() {
type edsBalancerBuilder struct{}

// Build helps implement the balancer.Builder interface.
func (b *edsBalancerBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
func (b *edsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
x := &edsBalancer{
cc: cc,
closed: grpcsync.NewEvent(),
Expand All @@ -80,7 +80,7 @@ func (b *edsBalancerBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptio
}

x.xdsClient = client
x.edsImpl = newEDSBalancer(x.cc, x.enqueueChildBalancerState, x.lsw, x.logger)
x.edsImpl = newEDSBalancer(x.cc, opts, x.enqueueChildBalancerState, x.lsw, x.logger)
x.logger.Infof("Created")
go x.run()
return x
Expand Down
7 changes: 5 additions & 2 deletions xds/internal/balancer/edsbalancer/eds_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/google/go-cmp/cmp"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/balancer/roundrobin"
Expand Down Expand Up @@ -65,6 +66,7 @@ type balancerGroupWithConfig struct {
// policy is used to manage endpoints in each locality.
type edsBalancerImpl struct {
cc balancer.ClientConn
buildOpts balancer.BuildOptions
logger *grpclog.PrefixLogger
loadReporter load.PerClusterReporter

Expand Down Expand Up @@ -102,9 +104,10 @@ type edsBalancerImpl struct {
}

// newEDSBalancerImpl create a new edsBalancerImpl.
func newEDSBalancerImpl(cc balancer.ClientConn, enqueueState func(priorityType, balancer.State), lr load.PerClusterReporter, logger *grpclog.PrefixLogger) *edsBalancerImpl {
func newEDSBalancerImpl(cc balancer.ClientConn, bOpts balancer.BuildOptions, enqueueState func(priorityType, balancer.State), lr load.PerClusterReporter, logger *grpclog.PrefixLogger) *edsBalancerImpl {
edsImpl := &edsBalancerImpl{
cc: cc,
buildOpts: bOpts,
logger: logger,
subBalancerBuilder: balancer.Get(roundrobin.Name),
loadReporter: lr,
Expand Down Expand Up @@ -248,7 +251,7 @@ func (edsImpl *edsBalancerImpl) handleEDSResponse(edsResp xdsclient.EndpointsUpd
ccPriorityWrapper := edsImpl.ccWrapperWithPriority(priority)
stateAggregator := weightedaggregator.New(ccPriorityWrapper, edsImpl.logger, newRandomWRR)
bgwc = &balancerGroupWithConfig{
bg: balancergroup.New(ccPriorityWrapper, stateAggregator, edsImpl.loadReporter, edsImpl.logger),
bg: balancergroup.New(ccPriorityWrapper, edsImpl.buildOpts, stateAggregator, edsImpl.loadReporter, edsImpl.logger),
stateAggregator: stateAggregator,
configs: make(map[internal.LocalityID]*localityConfig),
}
Expand Down
20 changes: 10 additions & 10 deletions xds/internal/balancer/edsbalancer/eds_impl_priority_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
// Init 0 and 1; 0 is up, use 0; add 2, use 0; remove 2, use 0.
func (s) TestEDSPriority_HighPriorityReady(t *testing.T) {
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState

// Two localities, with priorities [0, 1], each with one backend.
Expand Down Expand Up @@ -101,7 +101,7 @@ func (s) TestEDSPriority_HighPriorityReady(t *testing.T) {
// down, use 2; remove 2, use 1.
func (s) TestEDSPriority_SwitchPriority(t *testing.T) {
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState

// Two localities, with priorities [0, 1], each with one backend.
Expand Down Expand Up @@ -208,7 +208,7 @@ func (s) TestEDSPriority_SwitchPriority(t *testing.T) {
// Init 0 and 1; 0 and 1 both down; add 2, use 2.
func (s) TestEDSPriority_HigherDownWhileAddingLower(t *testing.T) {
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState

// Two localities, with different priorities, each with one backend.
Expand Down Expand Up @@ -271,7 +271,7 @@ func (s) TestEDSPriority_HigherDownWhileAddingLower(t *testing.T) {
// Init 0,1,2; 0 and 1 down, use 2; 0 up, close 1 and 2.
func (s) TestEDSPriority_HigherReadyCloseAllLower(t *testing.T) {
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState

// Two localities, with priorities [0,1,2], each with one backend.
Expand Down Expand Up @@ -353,7 +353,7 @@ func (s) TestEDSPriority_InitTimeout(t *testing.T) {
}()()

cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState

// Two localities, with different priorities, each with one backend.
Expand Down Expand Up @@ -403,7 +403,7 @@ func (s) TestEDSPriority_InitTimeout(t *testing.T) {
// - add localities to existing p0 and p1
func (s) TestEDSPriority_MultipleLocalities(t *testing.T) {
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState

// Two localities, with different priorities, each with one backend.
Expand Down Expand Up @@ -514,7 +514,7 @@ func (s) TestEDSPriority_RemovesAllLocalities(t *testing.T) {
}()()

cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState

// Two localities, with different priorities, each with one backend.
Expand Down Expand Up @@ -698,7 +698,7 @@ func (s) TestPriorityTypeEqual(t *testing.T) {
// will be used.
func (s) TestEDSPriority_HighPriorityNoEndpoints(t *testing.T) {
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState

// Two localities, with priorities [0, 1], each with one backend.
Expand Down Expand Up @@ -757,7 +757,7 @@ func (s) TestEDSPriority_HighPriorityNoEndpoints(t *testing.T) {
// priority will be used.
func (s) TestEDSPriority_HighPriorityAllUnhealthy(t *testing.T) {
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState

// Two localities, with priorities [0, 1], each with one backend.
Expand Down Expand Up @@ -823,7 +823,7 @@ func (s) TestEDSPriority_FirstPriorityUnavailable(t *testing.T) {
defaultPriorityInitTimeout = testPriorityInitTimeout

cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState

// One localities, with priorities [0], each with one backend.
Expand Down
Loading

0 comments on commit 9280052

Please sign in to comment.