Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

balancer/base: keep address attributes for pickers #4253

Merged
merged 3 commits into from Mar 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 19 additions & 10 deletions balancer/base/balancer.go
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"fmt"

"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
Expand All @@ -41,7 +42,7 @@ func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions)
cc: cc,
pickerBuilder: bb.pickerBuilder,

subConns: make(map[resolver.Address]balancer.SubConn),
subConns: make(map[resolver.Address]subConnInfo),
scStates: make(map[balancer.SubConn]connectivity.State),
csEvltr: &balancer.ConnectivityStateEvaluator{},
config: bb.config,
Expand All @@ -57,14 +58,19 @@ func (bb *baseBuilder) Name() string {
return bb.name
}

type subConnInfo struct {
subConn balancer.SubConn
attrs *attributes.Attributes
}

type baseBalancer struct {
cc balancer.ClientConn
pickerBuilder PickerBuilder

csEvltr *balancer.ConnectivityStateEvaluator
state connectivity.State

subConns map[resolver.Address]balancer.SubConn // `attributes` is stripped from the keys of this map (the addresses)
subConns map[resolver.Address]subConnInfo // `attributes` is stripped from the keys of this map (the addresses)
scStates map[balancer.SubConn]connectivity.State
picker balancer.Picker
config Config
Expand Down Expand Up @@ -114,7 +120,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
aNoAttrs := a
aNoAttrs.Attributes = nil
addrsSet[aNoAttrs] = struct{}{}
if sc, ok := b.subConns[aNoAttrs]; !ok {
if scInfo, ok := b.subConns[aNoAttrs]; !ok {
// a is a new address (not existing in b.subConns).
//
// When creating SubConn, the original address with attributes is
Expand All @@ -125,7 +131,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
continue
}
b.subConns[aNoAttrs] = sc
b.subConns[aNoAttrs] = subConnInfo{subConn: sc, attrs: a.Attributes}
b.scStates[sc] = connectivity.Idle
sc.Connect()
} else {
Expand All @@ -135,13 +141,15 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
// The SubConn does a reflect.DeepEqual of the new and old
// addresses. So this is a noop if the current address is the same
// as the old one (including attributes).
b.cc.UpdateAddresses(sc, []resolver.Address{a})
scInfo.attrs = a.Attributes
b.subConns[aNoAttrs] = scInfo
b.cc.UpdateAddresses(scInfo.subConn, []resolver.Address{a})
}
}
for a, sc := range b.subConns {
for a, scInfo := range b.subConns {
// a was removed by resolver.
if _, ok := addrsSet[a]; !ok {
b.cc.RemoveSubConn(sc)
b.cc.RemoveSubConn(scInfo.subConn)
delete(b.subConns, a)
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
// The entry will be deleted in UpdateSubConnState.
Expand Down Expand Up @@ -184,9 +192,10 @@ func (b *baseBalancer) regeneratePicker() {
readySCs := make(map[balancer.SubConn]SubConnInfo)

// Filter out all ready SCs from full subConn map.
for addr, sc := range b.subConns {
if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
readySCs[sc] = SubConnInfo{Address: addr}
for addr, scInfo := range b.subConns {
if st, ok := b.scStates[scInfo.subConn]; ok && st == connectivity.Ready {
addr.Attributes = scInfo.attrs
readySCs[scInfo.subConn] = SubConnInfo{Address: addr}
}
}
b.picker = b.pickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs})
Expand Down
54 changes: 53 additions & 1 deletion balancer/base/balancer_test.go
Expand Up @@ -23,6 +23,7 @@ import (

"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
)

Expand All @@ -35,12 +36,24 @@ func (c *testClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewS
return c.newSubConn(addrs, opts)
}

func (c *testClientConn) UpdateState(balancer.State) {}

type testSubConn struct{}

func (sc *testSubConn) UpdateAddresses(addresses []resolver.Address) {}

func (sc *testSubConn) Connect() {}

// testPickBuilder creates balancer.Picker for test.
type testPickBuilder struct {
validate func(info PickerBuildInfo)
}

func (p *testPickBuilder) Build(info PickerBuildInfo) balancer.Picker {
p.validate(info)
return nil
}

func TestBaseBalancerStripAttributes(t *testing.T) {
b := (&baseBuilder{}).Build(&testClientConn{
newSubConn: func(addrs []resolver.Address, _ balancer.NewSubConnOptions) (balancer.SubConn, error) {
Expand All @@ -64,7 +77,46 @@ func TestBaseBalancerStripAttributes(t *testing.T) {

for addr := range b.subConns {
if addr.Attributes != nil {
t.Errorf("in b.subConns, got address %+v with nil attributes, want not nil", addr)
t.Errorf("in b.subConns, got address %+v with not nil attributes, want nil", addr)
}
}
}

func TestBaseBalancerReserveAttributes(t *testing.T) {
var v = func(info PickerBuildInfo) {
for _, sc := range info.ReadySCs {
if sc.Address.Addr == "1.1.1.1" {
if sc.Address.Attributes == nil {
t.Errorf("in picker.validate, got address %+v with nil attributes, want not nil", sc.Address)
}
foo, ok := sc.Address.Attributes.Value("foo").(string)
if !ok || foo != "2233niang" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤦‍♂️

t.Errorf("in picker.validate, got address[1.1.1.1] with invalid attributes value %v, want 2233niang", sc.Address.Attributes.Value("foo"))
}
} else if sc.Address.Addr == "2.2.2.2" {
if sc.Address.Attributes != nil {
t.Error("in b.subConns, got address[2.2.2.2] with not nil attributes, want nil")
}
}
}
}
pickBuilder := &testPickBuilder{validate: v}
b := (&baseBuilder{pickerBuilder: pickBuilder}).Build(&testClientConn{
newSubConn: func(addrs []resolver.Address, _ balancer.NewSubConnOptions) (balancer.SubConn, error) {
return &testSubConn{}, nil
},
}, balancer.BuildOptions{}).(*baseBalancer)

b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{
Addresses: []resolver.Address{
{Addr: "1.1.1.1", Attributes: attributes.New("foo", "2233niang")},
{Addr: "2.2.2.2", Attributes: nil},
},
},
})

for sc := range b.scStates {
b.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready, ConnectionError: nil})
}
}