xds: move balancer group to a separate package (#3493)
This PR moves balancer group to a separate package and exports the type with some methods. Balancer group will be used by the weighted_target policy.
menghanl committed Apr 6, 2020
1 parent c5faf56 commit 98e4c7a
Showing 13 changed files with 1,078 additions and 952 deletions.
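For context, below is a minimal, hypothetical sketch of how a parent policy such as weighted_target could drive the API exported by this change (New, Start, Add, HandleResolvedAddrs, HandleSubConnStateChange, Close). The import paths, the parentBalancer wrapper, and the choice of the round_robin builder are assumptions for illustration only, not part of the commit.

// Hypothetical usage sketch; only the balancergroup API shown here comes from
// this commit. Import paths and the parentBalancer wrapper are assumptions.
package weightedtargetsketch

import (
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/roundrobin"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/resolver"
	"google.golang.org/grpc/xds/internal"
	"google.golang.org/grpc/xds/internal/balancer/balancergroup"
)

type parentBalancer struct {
	bg *balancergroup.BalancerGroup
}

func newParentBalancer(cc balancer.ClientConn) *parentBalancer {
	// loadStore and logger are assumed to be optional (nil) in this sketch.
	bg := balancergroup.New(cc, nil, nil)
	// Start builds the sub-balancers and begins forwarding updates.
	bg.Start()
	return &parentBalancer{bg: bg}
}

// addTarget registers one weighted child under the given locality id and
// seeds it with addresses. weight must be non-zero.
func (b *parentBalancer) addTarget(id internal.Locality, weight uint32, addrs []resolver.Address) {
	b.bg.Add(id, weight, balancer.Get(roundrobin.Name))
	b.bg.HandleResolvedAddrs(id, addrs)
}

// HandleSubConnStateChange forwards SubConn state from gRPC to the group,
// which routes it to the sub-balancer that owns the SubConn.
func (b *parentBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
	b.bg.HandleSubConnStateChange(sc, s)
}

// Close stops the sub-balancers and removes their SubConns; per the docs in
// this commit, the group can be restarted later with Start().
func (b *parentBalancer) Close() {
	b.bg.Close()
}

Note that BalancerGroup itself does not implement balancer.Balancer; a wrapper like the one sketched above is expected to be the actual balancer.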
@@ -14,7 +14,9 @@
* limitations under the License.
*/

package edsbalancer
// Package balancergroup implements a utility struct to bind multiple balancers
// into one balancer.
package balancergroup

import (
"fmt"
@@ -52,7 +54,7 @@ type subBalancerWithConfig struct {
// Some are forwarded to the balancer group with the sub-balancer ID.
balancer.ClientConn
id internal.Locality
group *balancerGroup
group *BalancerGroup

mu sync.Mutex
state balancer.State
@@ -154,7 +156,7 @@ func (s *pickerState) String() string {
return fmt.Sprintf("weight:%v,picker:%p,state:%v", s.weight, s.picker, s.state)
}

// balancerGroup takes a list of balancers, and make then into one balancer.
// BalancerGroup takes a list of balancers, and makes them into one balancer.
//
// Note that this struct doesn't implement balancer.Balancer, because it's not
// intended to be used directly as a balancer. It's expected to be used as a
@@ -178,7 +180,7 @@ func (s *pickerState) String() string {
// balancer group is closed, the sub-balancers are also closed. And it's
// guaranteed that no updates will be sent to parent ClientConn from a closed
// balancer group.
type balancerGroup struct {
type BalancerGroup struct {
cc balancer.ClientConn
logger *grpclog.PrefixLogger
loadStore lrs.Store
@@ -226,26 +228,34 @@ type balancerGroup struct {
idToPickerState map[internal.Locality]*pickerState
}

// defaultSubBalancerCloseTimeout is defined as a variable instead of const for
// DefaultSubBalancerCloseTimeout is defined as a variable instead of const for
// testing.
//
// TODO: make it a parameter for newBalancerGroup().
var defaultSubBalancerCloseTimeout = 15 * time.Minute
// TODO: make it a parameter for New().
var DefaultSubBalancerCloseTimeout = 15 * time.Minute

func newBalancerGroup(cc balancer.ClientConn, loadStore lrs.Store, logger *grpclog.PrefixLogger) *balancerGroup {
return &balancerGroup{
// New creates a new BalancerGroup. Note that the BalancerGroup
// needs to be started to work.
func New(cc balancer.ClientConn, loadStore lrs.Store, logger *grpclog.PrefixLogger) *BalancerGroup {
return &BalancerGroup{
cc: cc,
logger: logger,
loadStore: loadStore,

idToBalancerConfig: make(map[internal.Locality]*subBalancerWithConfig),
balancerCache: cache.NewTimeoutCache(defaultSubBalancerCloseTimeout),
balancerCache: cache.NewTimeoutCache(DefaultSubBalancerCloseTimeout),
scToSubBalancer: make(map[balancer.SubConn]*subBalancerWithConfig),
idToPickerState: make(map[internal.Locality]*pickerState),
}
}

func (bg *balancerGroup) start() {
// Start starts the balancer group, including building all the sub-balancers,
// and sends the existing addresses to them.
//
// A BalancerGroup can be closed and started later. When a BalancerGroup is
// closed, it can still receive address updates, which will be applied when
// restarted.
func (bg *BalancerGroup) Start() {
bg.incomingMu.Lock()
bg.incomingStarted = true
bg.incomingMu.Unlock()
@@ -263,12 +273,12 @@ func (bg *balancerGroup) start() {
bg.outgoingMu.Unlock()
}

// add adds a balancer built by builder to the group, with given id and weight.
// Add adds a balancer built by builder to the group, with given id and weight.
//
// weight should never be zero.
func (bg *balancerGroup) add(id internal.Locality, weight uint32, builder balancer.Builder) {
func (bg *BalancerGroup) Add(id internal.Locality, weight uint32, builder balancer.Builder) {
if weight == 0 {
bg.logger.Errorf("balancerGroup.add called with weight 0, locality: %v. Locality is not added to balancer group", id)
bg.logger.Errorf("BalancerGroup.add called with weight 0, locality: %v. Locality is not added to balancer group", id)
return
}

@@ -329,15 +339,15 @@ func (bg *balancerGroup) add(id internal.Locality, weight uint32, builder balanc
bg.outgoingMu.Unlock()
}

// remove removes the balancer with id from the group.
// Remove removes the balancer with id from the group.
//
// But doesn't close the balancer. The balancer is kept in a cache, and will be
// closed after timeout. Cleanup work (closing sub-balancer and removing
// subconns) will be done after timeout.
//
// It also removes the picker generated from this balancer from the picker
// group. It always results in a picker update.
func (bg *balancerGroup) remove(id internal.Locality) {
func (bg *BalancerGroup) Remove(id internal.Locality) {
bg.outgoingMu.Lock()
if sbToRemove, ok := bg.idToBalancerConfig[id]; ok {
if bg.outgoingStarted {
@@ -374,7 +384,7 @@ func (bg *balancerGroup) remove(id internal.Locality) {

// bg.remove(id) doesn't do cleanup for the sub-balancer. This function does
// cleanup after the timeout.
func (bg *balancerGroup) cleanupSubConns(config *subBalancerWithConfig) {
func (bg *BalancerGroup) cleanupSubConns(config *subBalancerWithConfig) {
bg.incomingMu.Lock()
// Remove SubConns. This is only done after the balancer is
// actually closed.
@@ -393,16 +403,16 @@ func (bg *balancerGroup) cleanupSubConns(config *subBalancerWithConfig) {
bg.incomingMu.Unlock()
}

// changeWeight changes the weight of the balancer.
// ChangeWeight changes the weight of the balancer.
//
// newWeight should never be zero.
//
// NOTE: It always results in a picker update now. This probably isn't
// necessary. But it seems better to do the update because it's a change in the
// picker (which is balancer's snapshot).
func (bg *balancerGroup) changeWeight(id internal.Locality, newWeight uint32) {
func (bg *BalancerGroup) ChangeWeight(id internal.Locality, newWeight uint32) {
if newWeight == 0 {
bg.logger.Errorf("balancerGroup.changeWeight called with newWeight 0. Weight is not changed")
bg.logger.Errorf("BalancerGroup.changeWeight called with newWeight 0. Weight is not changed")
return
}
bg.incomingMu.Lock()
@@ -425,8 +435,9 @@ func (bg *balancerGroup) changeWeight(id internal.Locality, newWeight uint32) {

// Following are actions from the parent grpc.ClientConn, forwarded to sub-balancers.

// SubConn state change: find the corresponding balancer and then forward.
func (bg *balancerGroup) handleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
// HandleSubConnStateChange handles the state for the subconn. It finds the
// corresponding balancer and forwards the update.
func (bg *BalancerGroup) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
bg.incomingMu.Lock()
config, ok := bg.scToSubBalancer[sc]
if !ok {
@@ -444,8 +455,12 @@ func (bg *balancerGroup) handleSubConnStateChange(sc balancer.SubConn, state con
bg.outgoingMu.Unlock()
}

// Address change: forward to balancer.
func (bg *balancerGroup) handleResolvedAddrs(id internal.Locality, addrs []resolver.Address) {
// HandleResolvedAddrs handles addresses from resolver. It finds the balancer
// and forwards the update.
//
// TODO: change this to UpdateClientConnState to handle addresses and balancer
// config.
func (bg *BalancerGroup) HandleResolvedAddrs(id internal.Locality, addrs []resolver.Address) {
bg.outgoingMu.Lock()
if config, ok := bg.idToBalancerConfig[id]; ok {
config.updateAddrs(addrs)
@@ -467,7 +482,7 @@ func (bg *balancerGroup) handleResolvedAddrs(id internal.Locality, addrs []resol
// from map. Delete sc from the map only when state changes to Shutdown. Since
// it's just forwarding the action, there's no need for a removeSubConn()
// wrapper function.
func (bg *balancerGroup) newSubConn(config *subBalancerWithConfig, addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
func (bg *BalancerGroup) newSubConn(config *subBalancerWithConfig, addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
// NOTE: if balancer with id was already removed, this should also return
// error. But since we call balancer.stopBalancer when removing the balancer, this
// shouldn't happen.
@@ -488,7 +503,7 @@ func (bg *balancerGroup) newSubConn(config *subBalancerWithConfig, addrs []resol

// updateBalancerState: create an aggregated picker and an aggregated
// connectivity state, then forward to ClientConn.
func (bg *balancerGroup) updateBalancerState(id internal.Locality, state balancer.State) {
func (bg *BalancerGroup) updateBalancerState(id internal.Locality, state balancer.State) {
bg.logger.Infof("Balancer state update from locality %v, new state: %+v", id, state)

bg.incomingMu.Lock()
@@ -508,7 +523,9 @@ func (bg *balancerGroup) updateBalancerState(id internal.Locality, state balance
}
}

func (bg *balancerGroup) close() {
// Close closes the balancer. It stops sub-balancers, and removes the subconns.
// The BalancerGroup can be restarted later.
func (bg *BalancerGroup) Close() {
bg.incomingMu.Lock()
if bg.incomingStarted {
bg.incomingStarted = false
@@ -568,8 +585,9 @@ func buildPickerAndState(m map[internal.Locality]*pickerState) balancer.State {
return balancer.State{ConnectivityState: aggregatedState, Picker: newPickerGroup(readyPickerWithWeights)}
}

// RandomWRR constructor, to be modified in tests.
var newRandomWRR = wrr.NewRandom
// NewRandomWRR is the WRR constructor used to pick sub-pickers from
// sub-balancers. It's to be modified in tests.
var NewRandomWRR = wrr.NewRandom

type pickerGroup struct {
length int
@@ -584,7 +602,7 @@ type pickerGroup struct {
// TODO: (bg) confirm this is the expected behavior: non-ready balancers should
// be ignored when picking. Only ready balancers are picked.
func newPickerGroup(readyPickerWithWeights []pickerState) *pickerGroup {
w := newRandomWRR()
w := NewRandomWRR()
for _, ps := range readyPickerWithWeights {
w.Add(ps.picker, int64(ps.weight))
}
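The exported test hooks NewRandomWRR and DefaultSubBalancerCloseTimeout let tests make picking deterministic and shorten the sub-balancer cache timeout. A minimal, hypothetical test sketch is below, assuming the new package path and a test-local fakeWRR type; none of it is part of this commit.

// Hypothetical test sketch; only the two exported hooks come from this commit.
package balancergroup_test

import (
	"testing"
	"time"

	"google.golang.org/grpc/internal/wrr"
	"google.golang.org/grpc/xds/internal/balancer/balancergroup"
)

// fakeWRR hands back items in insertion order and ignores weights, making
// picker rotation deterministic in tests.
type fakeWRR struct {
	items []interface{}
	next  int
}

func (f *fakeWRR) Add(item interface{}, weight int64) { f.items = append(f.items, item) }

func (f *fakeWRR) Next() interface{} {
	if len(f.items) == 0 {
		return nil
	}
	it := f.items[f.next%len(f.items)]
	f.next++
	return it
}

func TestDeterministicPicking(t *testing.T) {
	// Swap both hooks before building the group; restore them afterwards.
	oldWRR := balancergroup.NewRandomWRR
	oldTimeout := balancergroup.DefaultSubBalancerCloseTimeout
	balancergroup.NewRandomWRR = func() wrr.WRR { return &fakeWRR{} }
	balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond
	defer func() {
		balancergroup.NewRandomWRR = oldWRR
		balancergroup.DefaultSubBalancerCloseTimeout = oldTimeout
	}()
	// ... construct a BalancerGroup with balancergroup.New and exercise it ...
}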
