Skip to content

Commit

Permalink
review comments 3
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed May 4, 2023
1 parent 2ca3db9 commit 0aa8699
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 112 deletions.
26 changes: 12 additions & 14 deletions balancer/weightedroundrobin/balancer.go
Expand Up @@ -131,7 +131,6 @@ func (b *wrrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
return balancer.ErrBadResolverState
}

// Regenerate & send picker.
b.regeneratePicker()

return nil
Expand All @@ -156,21 +155,25 @@ func (b *wrrBalancer) updateAddresses(addrs []resolver.Address) {
// addr is a new address (not existing in b.subConns).
sc, err := b.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
if err != nil {
b.logger.Warningf("wrr: failed to create new SubConn: %v", err)
b.logger.Warningf("wrr: failed to create new SubConn for address %v: %v", addr, err)
continue
}
wsc = &weightedSubConn{
SubConn: sc,
logger: b.logger,
connectivityState: connectivity.Idle,
// Initially, we set load reports to off, because they are not
// running upon initial weightedSubConn creation.
cfg: &lbConfig{EnableOOBLoadReport: false},
}
b.subConns.Set(addr, wsc)
b.scMap[sc] = wsc
b.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Idle)
sc.Connect()
}
// Update config for existing weightedSubConn or send update for first
// time to new one. Ensures an OOB listener is running if needed.
// time to new one. Ensures an OOB listener is running if needed
// (and stops the existing one if applicable).
wsc.updateConfig(b.cfg)
}

Expand Down Expand Up @@ -384,8 +387,8 @@ func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {

// weightedSubConn is the wrapper of a subconn that holds the subconn and its
// weight (and other parameters relevant to computing the effective weight).
// It also tracks connectivity state, listens for metrics updates by
// implementing the orca.OOBListener interface and manages that listener.
// When needed, it also tracks connectivity state, listens for metrics updates
// by implementing the orca.OOBListener interface and manages that listener.
type weightedSubConn struct {
balancer.SubConn
logger *grpclog.PrefixLogger
Expand Down Expand Up @@ -436,11 +439,6 @@ func (w *weightedSubConn) updateConfig(cfg *lbConfig) {
w.mu.Lock()
defer w.mu.Unlock()
oldCfg := w.cfg
if oldCfg == nil {
// By default we set load reports to off, because they are not running
// upon initial weightedSubConn creation.
oldCfg = &lbConfig{EnableOOBLoadReport: false}
}
w.cfg = cfg
newPeriod := cfg.OOBReportingPeriod
if cfg.EnableOOBLoadReport == oldCfg.EnableOOBLoadReport &&
Expand All @@ -460,7 +458,7 @@ func (w *weightedSubConn) updateConfig(cfg *lbConfig) {
return
}
if w.logger.V(2) {
w.logger.Infof("Registering listener for %v with interval %v", w.SubConn, newPeriod)
w.logger.Infof("Registering ORCA listener for %v with interval %v", w.SubConn, newPeriod)
}
opts := orca.OOBListenerOptions{ReportInterval: newPeriod}
w.stopORCAListener = orca.RegisterOOBListener(w.SubConn, w, opts)
Expand All @@ -476,7 +474,7 @@ func (w *weightedSubConn) updateConnectivityState(cs connectivity.State) connect
w.SubConn.Connect()
case connectivity.Ready:
// If we transition back to READY state, reset nonEmptySince so that we
// apply the backout period after we start receiving load data. Note
// apply the blackout period after we start receiving load data. Note
// that we cannot guarantee that we will never receive lingering
// callbacks for backend metric reports from the previous connection
// after the new connection has been established, but they should be
Expand Down Expand Up @@ -506,8 +504,8 @@ func (w *weightedSubConn) updateConnectivityState(cs connectivity.State) connect

// weight returns the current effective weight of the subconn, taking into
// account the parameters. Returns 0 for blacked out or expired data, which
// will cause the backend weight to be treated as the mean of the other
// backends.
// will cause the backend weight to be treated as the mean of the weights of
// the other backends.
func (w *weightedSubConn) weight(now time.Time, weightExpirationPeriod, blackoutPeriod time.Duration) float64 {
w.mu.Lock()
defer w.mu.Unlock()
Expand Down

0 comments on commit 0aa8699

Please sign in to comment.