Skip to content

Commit

Permalink
Merge pull request #1784 from justxuewei/fix/adasvc/hill-climbing
Browse files Browse the repository at this point in the history
fix(adasvc): fix issue where limitation not updates
  • Loading branch information
justxuewei committed Mar 30, 2022
2 parents 97f45f4 + febe6fb commit c12c1d6
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 66 deletions.
1 change: 1 addition & 0 deletions cluster/loadbalance/p2c/loadbalance.go
Expand Up @@ -98,6 +98,7 @@ func (l *p2cLoadBalance) Select(invokers []protocol.Invoker, invocation protocol
return nil
}

// TODO(justxuewei): It should have a strategy to drop some metrics after a period of time.
remainingJIface, err := m.GetMethodMetrics(invokers[j].GetURL(), methodName, metrics.HillClimbing)
if err != nil {
if errors.Is(err, metrics.ErrMetricsNotFound) {
Expand Down
158 changes: 92 additions & 66 deletions filter/adaptivesvc/limiter/hill_climbing.go
Expand Up @@ -45,8 +45,9 @@ const (
// Tunables of the HillClimbing limiter.
//
// NOTE(review): this span comes from a rendered commit diff, so radicalPeriod
// and stablePeriod appear twice: the uint64 forms look like the earlier
// revision and the time.Duration forms like the newer one (the newer code
// stores them through atomic.NewDuration / minDuration). Only one pair can
// exist in the real file.
var (
// initialLimitation is the limitation a freshly created HillClimbing starts with.
initialLimitation uint64 = 50
// maxLimitation is presumably the upper bound of the limitation; its use is
// not visible in this excerpt — confirm against adjustLimitation.
maxLimitation uint64 = 500
radicalPeriod uint64 = 1000
stablePeriod uint64 = 32000

// radicalPeriod is the update interval a new limiter starts with. While the
// interval still equals radicalPeriod, the "Plus" variants of the
// extend/shrink options are chosen.
radicalPeriod = 1000 * time.Millisecond
// stablePeriod caps the update interval: the interval is doubled after a
// round that did not improve the best TPS, but never beyond stablePeriod.
stablePeriod = 32000 * time.Millisecond
)

// HillClimbing is a limiter using HillClimbing algorithm
Expand All @@ -59,18 +60,18 @@ type HillClimbing struct {

mutex *sync.Mutex
// nextUpdateTime = lastUpdatedTime + updateInterval
updateInterval *atomic.Uint64
updateInterval *atomic.Duration
lastUpdatedTime *atomic.Time

// indicators of the current round
successCounter *atomic.Uint64
// metrics of the current round
transactionNum *atomic.Uint64
rttAvg *atomic.Float64

// indicators of history
bestConcurrency *atomic.Uint64
// best metrics in the history
bestMaxCapacity *atomic.Float64
bestRTTAvg *atomic.Float64
bestLimitation *atomic.Uint64
bestSuccessRate *atomic.Uint64
bestTPS *atomic.Uint64
}

func NewHillClimbing() Limiter {
Expand All @@ -80,14 +81,14 @@ func NewHillClimbing() Limiter {
inflight: new(atomic.Uint64),
limitation: atomic.NewUint64(initialLimitation),
mutex: new(sync.Mutex),
updateInterval: atomic.NewUint64(radicalPeriod),
updateInterval: atomic.NewDuration(radicalPeriod),
lastUpdatedTime: atomic.NewTime(time.Now()),
successCounter: new(atomic.Uint64),
transactionNum: new(atomic.Uint64),
rttAvg: new(atomic.Float64),
bestConcurrency: new(atomic.Uint64),
bestRTTAvg: new(atomic.Float64),
bestMaxCapacity: new(atomic.Float64),
bestRTTAvg: atomic.NewFloat64(math.MaxFloat64),
bestLimitation: new(atomic.Uint64),
bestSuccessRate: new(atomic.Uint64),
bestTPS: new(atomic.Uint64),
}

return l
Expand Down Expand Up @@ -139,7 +140,7 @@ func (u *HillClimbingUpdater) DoUpdate() error {
}()
VerboseDebugf("[HillClimbingUpdater] A request finished, the limiter will be updated, seq: %d.", u.seq)

rtt := uint64(time.Now().Sub(u.startTime))
rtt := uint64(time.Now().Sub(u.startTime).Milliseconds())
inflight := u.limiter.Inflight()

option, err := u.getOption(rtt, inflight)
Expand All @@ -162,45 +163,56 @@ func (u *HillClimbingUpdater) getOption(rtt, _ uint64) (HillClimbingOption, erro
lastUpdatedTime := u.limiter.lastUpdatedTime.Load()
updateInterval := u.limiter.updateInterval.Load()
rttAvg := u.limiter.rttAvg.Load()
successCounter := u.limiter.successCounter.Load()
transactionNum := u.limiter.transactionNum.Load()
limitation := u.limiter.limitation.Load()

if now.Sub(lastUpdatedTime) > time.Duration(updateInterval) ||
rttAvg == 0 {
// Current req is at the next round or no rttAvg.
// the current option is expired
if now.Before(lastUpdatedTime) {
return option, nil
}

if now.Sub(lastUpdatedTime) > updateInterval || rttAvg == 0 {
// the current req is on the next round or no rttAvg.

// FIXME(justxuewei): If all requests in one round
// not receive responses, rttAvg will be 0, and
// concurrency will be 0 as well, the actual
// concurrency, however, is not 0.
concurrency := float64(successCounter) * rttAvg / float64(updateInterval)
// FIXME(justxuewei): If none of the requests in one round receives a response, rttAvg will be 0, and maxCapacity
// will be 0 as well; the actual maxCapacity, however, is not 0.
maxCapacity := float64(transactionNum) * float64(updateInterval.Milliseconds()) / rttAvg
VerboseDebugf("[HillClimbingUpdater] maxCapacity: %f, transactionNum: %d, rttAvg: %f, bestRTTAvg: %f, "+
"updateInterval: %d",
maxCapacity, transactionNum, rttAvg, u.limiter.bestRTTAvg.Load(), updateInterval.Milliseconds())

// Consider extending limitation if concurrent is
// about to reach the limitation.
if uint64(concurrency*1.5) > limitation {
// Consider extending the limitation if the concurrency is about to reach it.
if u.limiter.bestRTTAvg.Load() == math.MaxFloat64 || uint64(maxCapacity*1.5) > limitation {
if updateInterval == radicalPeriod {
option = HillClimbingOptionExtendPlus
} else {
option = HillClimbingOptionExtend
}
}

successRate := uint64(1000.0 * float64(successCounter) / float64(updateInterval))
tps := uint64(1000.0 * float64(transactionNum) / float64(updateInterval.Milliseconds()))
VerboseDebugf("[HillClimbingUpdater] The TPS is %d, transactionNum: %d, updateInterval: %d.",
tps, transactionNum, updateInterval)

if successRate > u.limiter.bestSuccessRate.Load() {
// successRate is the best in the history, update
// all best-indicators.
u.limiter.bestSuccessRate.Store(successRate)
if tps > u.limiter.bestTPS.Load() {
VerboseDebugf("[HillClimbingUpdater] The best TPS is updated from %d to %d.",
u.limiter.bestTPS.Load(), tps)
// tps is the best in the history, update
// all best metrics.
u.limiter.bestTPS.Store(tps)
u.limiter.bestRTTAvg.Store(rttAvg)
u.limiter.bestConcurrency.Store(uint64(concurrency))
u.limiter.bestMaxCapacity.Store(maxCapacity)
u.limiter.bestLimitation.Store(u.limiter.limitation.Load())
VerboseDebugf("[HillClimbingUpdater] Best-indicators are up-to-date, "+
"seq: %d, bestSuccessRate: %d, bestRTTAvg: %.4f, bestConcurrency: %d,"+
" bestLimitation: %d.", u.seq, u.limiter.bestSuccessRate.Load(),
u.limiter.bestRTTAvg.Load(), u.limiter.bestConcurrency.Load(),
VerboseDebugf("[HillClimbingUpdater] Best-metrics are up-to-date, "+
"seq: %d, bestTPS: %d, bestRTTAvg: %.4f, bestMaxCapacity: %d,"+
" bestLimitation: %d.", u.seq, u.limiter.bestTPS.Load(),
u.limiter.bestRTTAvg.Load(), u.limiter.bestMaxCapacity.Load(),
u.limiter.bestLimitation.Load())
} else {
if u.shouldShrink(successCounter, uint64(concurrency), successRate, rttAvg) {
VerboseDebugf("[HillClimbingUpdater] The best TPS is not updated, best TPS is %d, "+
"the current TPS is %d",
u.limiter.bestTPS.Load(), tps)
if u.shouldShrink(transactionNum, maxCapacity, tps, rttAvg) {
if updateInterval == radicalPeriod {
option = HillClimbingOptionShrinkPlus
} else {
Expand All @@ -209,84 +221,98 @@ func (u *HillClimbingUpdater) getOption(rtt, _ uint64) (HillClimbingOption, erro
// shrinking limitation means the process of adjusting
// limitation goes to stable, so extends the update
// interval to avoid adjusting frequently.
u.limiter.updateInterval.Store(minUint64(updateInterval*2, stablePeriod))
u.limiter.updateInterval.Store(minDuration(updateInterval*2, stablePeriod))
}
}

// reset indicators for the new round
u.limiter.successCounter.Store(0)
// reset metrics for the new round
u.limiter.transactionNum.Store(0)
u.limiter.rttAvg.Store(float64(rtt))
u.limiter.lastUpdatedTime.Store(time.Now())
VerboseDebugf("[HillClimbingUpdater] A new round is applied, all indicators are reset.")
VerboseDebugf("[HillClimbingUpdater] A new round is applied, all metrics are reset.")
} else {
// still in the current round
// still on the current round

u.limiter.successCounter.Add(1)
u.limiter.transactionNum.Add(1)
// ra = (ra * c + r) / (c + 1), where ra denotes rttAvg,
// c denotes successCounter, r denotes rtt.
u.limiter.rttAvg.Store((rttAvg*float64(successCounter) + float64(rtt)) / float64(successCounter+1))
// c denotes transactionNum, r denotes rtt.
u.limiter.rttAvg.Store((rttAvg*float64(transactionNum) + float64(rtt)) / float64(transactionNum+1))
option = HillClimbingOptionDoNothing
}

return option, nil
}

// shouldShrink reports whether the limitation should be shrunk after a round
// whose TPS did not beat the best TPS seen so far. It compares the round's
// maxCapacity and rttAvg with the best values recorded on the limiter
// (bestMaxCapacity, bestRTTAvg) and returns true only when both the capacity
// drop and the RTT increase exceed their thresholds.
//
// NOTE(review): this span comes from a rendered commit diff, so lines of two
// revisions are interleaved (note the two signatures below). The
// counter/concurrency/successRate lines look like the earlier revision and the
// transactionNum/maxCapacity/tps lines like the newer one.
func (u *HillClimbingUpdater) shouldShrink(counter, concurrency, successRate uint64, rttAvg float64) bool {
bestSuccessRate := u.limiter.bestSuccessRate.Load()
func (u *HillClimbingUpdater) shouldShrink(transactionNum uint64, maxCapacity float64, tps uint64, rttAvg float64) bool {
//bestTPS := u.limiter.bestTPS.Load()
bestMaxCapacity := u.limiter.bestMaxCapacity.Load()
bestRTTAvg := u.limiter.bestRTTAvg.Load()

diff := bestSuccessRate - successRate
diffPct := uint64(100.0 * float64(successRate) / float64(bestSuccessRate))
// diff is how far this round's maxCapacity fell below the best one; diffPct
// is the same drop as a percentage of bestMaxCapacity.
// NOTE(review): diff is negative when maxCapacity exceeds bestMaxCapacity, and
// the float64 -> uint64 conversion of a negative (or NaN/Inf, if
// bestMaxCapacity is 0) value is implementation-dependent in Go — confirm the
// intended behavior.
diff := bestMaxCapacity - maxCapacity
diffPct := uint64(100.0 * diff / bestMaxCapacity)

VerboseDebugf("[HillClimbingUpdater] shouldShrink maxCapacity diff: %f, diffPct: %d.", diff, diffPct)

// A drop of at most 300 in absolute terms and at most 10 percent is tolerated.
if diff <= 300 && diffPct <= 10 {
// diff is acceptable, shouldn't shrink
return false
}

if concurrency > bestSuccessRate || rttAvg > bestRTTAvg {
// The unacceptable diff is due to too large
// concurrency or rttAvg.
concDiff := concurrency - bestSuccessRate
concDiffPct := uint64(100.0 * float64(concurrency) / float64(bestSuccessRate))
rttAvgDiff := rttAvg - bestRTTAvg
rttAvgPctDiff := uint64(100.0 * rttAvg / bestRTTAvg)
if diff > 0 || rttAvg > bestRTTAvg {
// The unacceptable diff is due to too large maxCapacity or rttAvg.
// NOTE(review): rttAvg - bestRTTAvg can be negative when this branch is
// entered only because diff > 0; the uint64 conversion is then
// implementation-dependent — confirm.
// NOTE(review): rttAvgDiffPct is rttAvg as a percentage of bestRTTAvg (100
// means "equal"), not the percentage difference — confirm that the
// thresholds below are meant for this scale.
rttAvgDiff := uint64(rttAvg - bestRTTAvg)
rttAvgDiffPct := uint64(100.0 * rttAvg / bestRTTAvg)

// TODO(justxuewei): Hard-coding here is not proper, but
// it should be refactored after testing.
// TODO(justxuewei): Hard-coding here is not proper, but it should be refactored after testing.
var (
rttAvgDiffThreshold uint64
rttAvgPctDiffThreshold uint64
rttAvgDiffPctThreshold uint64
)
// Thresholds are picked by the magnitude of bestRTTAvg: the larger the best
// average RTT, the smaller the percentage threshold.
if bestRTTAvg < 5 {
rttAvgDiffThreshold = 3
rttAvgPctDiffThreshold = 80
rttAvgDiffPctThreshold = 80
} else if bestRTTAvg < 10 {
rttAvgDiffThreshold = 2
rttAvgPctDiffThreshold = 30
rttAvgDiffPctThreshold = 30
} else if bestRTTAvg < 50 {
rttAvgDiffThreshold = 5
rttAvgPctDiffThreshold = 20
rttAvgDiffPctThreshold = 20
} else if bestRTTAvg < 100 {
rttAvgDiffThreshold = 10
rttAvgPctDiffThreshold = 10
rttAvgDiffPctThreshold = 10
} else {
rttAvgDiffThreshold = 20
rttAvgPctDiffThreshold = 5
rttAvgDiffPctThreshold = 5
}

return (concDiffPct > 10 && concDiff > 5) && (uint64(rttAvgDiff) > rttAvgDiffThreshold || rttAvgPctDiff >= rttAvgPctDiffThreshold)
// NOTE(review): bestRTTAvg is a float64 but is formatted with %d, and
// rttAvgDiffPctThreshold is passed for both threshold placeholders, so
// rttAvgDiffThreshold is never logged. The first of the two should most
// likely be rttAvgDiffThreshold, and the bestRTTAvg verb %f.
VerboseDebugf("[HillClimbingUpdater] shouldShrink bestRTTAvg: %d, rttAvgDiff: %d, rttAvgDiffPct: %d, "+
"rttAvgDiffThreshold: %d, rttAvgDiffPctThreshold: %d.", bestRTTAvg, rttAvgDiff, rttAvgDiffPct,
rttAvgDiffPctThreshold, rttAvgDiffPctThreshold)

// Shrink only when the capacity drop is significant AND the RTT got worse
// by more than either the absolute or the percentage threshold.
return (diffPct > 10 && diff > 5) &&
(rttAvgDiff > rttAvgDiffThreshold || rttAvgDiffPct >= rttAvgDiffPctThreshold)
}

return false
}

func (u *HillClimbingUpdater) adjustLimitation(option HillClimbingOption) error {
if option == HillClimbingOptionDoNothing {
VerboseDebugf("[HillClimbingUpdater] The option is do nothing, the limitation will not be updated.")
return nil
}

limitation := float64(u.limiter.limitation.Load())
oldLimitation := limitation
bestLimitation := float64(u.limiter.bestLimitation.Load())
updateInterval := u.limiter.updateInterval.Load()
alpha := 1.5 * math.Log(limitation)
beta := 0.8 * math.Log(limitation)
logUpdateInterval := math.Log2(float64(u.limiter.updateInterval.Load()) / 1000.0)
logUpdateInterval := math.Max(1.0, math.Log2(float64(updateInterval.Milliseconds())/1000.0))

VerboseDebugf("[HillClimbingUpdater] Before calculating new limitation, option: %d, limitation: %f, "+
"bestLimitation: %f, alpha: %f, beta: %f, logUpdateInterval: %f, updateInterval: %d", option, limitation,
bestLimitation, alpha, beta, logUpdateInterval, updateInterval.Milliseconds())

switch option {
case HillClimbingOptionExtendPlus:
Expand Down
5 changes: 5 additions & 0 deletions filter/adaptivesvc/limiter/limiter.go
Expand Up @@ -34,9 +34,14 @@ const (
// Limiter decides whether an incoming request may proceed and exposes its
// current counters.
type Limiter interface {
// Inflight returns a uint64 counter; presumably the number of requests that
// were granted and have not finished yet — confirm against the implementations.
Inflight() uint64
// Remaining returns a uint64 counter; presumably how many more requests can
// be granted before the limitation is reached — confirm against the
// implementations.
Remaining() uint64
// Acquire inspects the current status of the system:
// - if the limitation is reached, the request is rejected immediately;
// - otherwise the request is granted and an Updater (defined below) is returned.
Acquire() (Updater, error)
}

// Updater is handed out by Limiter.Acquire for a granted request and is used to
// report that the request has completed.
type Updater interface {
// DoUpdate is called once an invocation is finished. It tells the Updater that
// the invocation is done so that it can update the Remaining and Inflight
// values of the Limiter.
DoUpdate() error
}
11 changes: 11 additions & 0 deletions filter/adaptivesvc/limiter/utils.go
Expand Up @@ -17,6 +17,10 @@

package limiter

import (
"time"
)

import (
"dubbo.apache.org/dubbo-go/v3/common/logger"
)
Expand All @@ -34,3 +38,10 @@ func minUint64(lhs, rhs uint64) uint64 {
}
return rhs
}

// minDuration returns the smaller of the two durations; when they are equal,
// that common value is returned.
func minDuration(lhs, rhs time.Duration) time.Duration {
	smaller := rhs
	if lhs < rhs {
		smaller = lhs
	}
	return smaller
}

0 comments on commit c12c1d6

Please sign in to comment.