Skip to content

Commit

Permalink
Add customized probe num support for circuit breaker (#428)
Browse files Browse the repository at this point in the history
  • Loading branch information
binbin0325 committed Dec 11, 2021
1 parent a409ca5 commit 9eabe9f
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 10 deletions.
46 changes: 39 additions & 7 deletions core/circuitbreaker/circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ type circuitBreakerBase struct {
retryTimeoutMs uint32
// nextRetryTimestampMs is the time circuit breaker could probe
nextRetryTimestampMs uint64
// probeNumber is the number of probe requests that are allowed to pass when the circuit breaker is half open.
probeNumber uint64
// curProbeNumber is the real-time probe number.
curProbeNumber uint64
// state is the state machine of circuit breaker
state *State
}
Expand All @@ -156,6 +160,14 @@ func (b *circuitBreakerBase) updateNextRetryTimestamp() {
atomic.StoreUint64(&b.nextRetryTimestampMs, util.CurrentTimeMillis()+uint64(b.retryTimeoutMs))
}

func (b *circuitBreakerBase) addCurProbeNum() {
atomic.AddUint64(&b.curProbeNumber, 1)
}

func (b *circuitBreakerBase) resetCurProbeNum() {
atomic.StoreUint64(&b.curProbeNumber, 0)
}

// fromClosedToOpen updates circuit breaker state machine from closed to open.
// Return true only if current goroutine successfully accomplished the transformation.
func (b *circuitBreakerBase) fromClosedToOpen(snapshot interface{}) bool {
Expand Down Expand Up @@ -206,6 +218,7 @@ func (b *circuitBreakerBase) fromOpenToHalfOpen(ctx *base.EntryContext) bool {
// Return true only if current goroutine successfully accomplished the transformation.
func (b *circuitBreakerBase) fromHalfOpenToOpen(snapshot interface{}) bool {
if b.state.cas(HalfOpen, Open) {
b.resetCurProbeNum()
b.updateNextRetryTimestamp()
for _, listener := range stateChangeListeners {
listener.OnTransformToOpen(HalfOpen, *b.rule, snapshot)
Expand All @@ -221,6 +234,7 @@ func (b *circuitBreakerBase) fromHalfOpenToOpen(snapshot interface{}) bool {
// Return true only if current goroutine successfully accomplished the transformation.
func (b *circuitBreakerBase) fromHalfOpenToClosed() bool {
if b.state.cas(HalfOpen, Closed) {
b.resetCurProbeNum()
for _, listener := range stateChangeListeners {
listener.OnTransformToClosed(HalfOpen, *b.rule)
}
Expand All @@ -247,6 +261,7 @@ func newSlowRtCircuitBreakerWithStat(r *Rule, stat *slowRequestLeapArray) *slowR
retryTimeoutMs: r.RetryTimeoutMs,
nextRetryTimestampMs: 0,
state: newState(),
probeNumber: r.ProbeNum,
},
stat: stat,
maxAllowedRt: r.MaxAllowedRtMs,
Expand Down Expand Up @@ -282,6 +297,8 @@ func (b *slowRtCircuitBreaker) TryPass(ctx *base.EntryContext) bool {
if b.retryTimeoutArrived() && b.fromOpenToHalfOpen(ctx) {
return true
}
} else if curStatus == HalfOpen && b.probeNumber > 0 {
return true
}
return false
}
Expand Down Expand Up @@ -318,9 +335,12 @@ func (b *slowRtCircuitBreaker) OnRequestComplete(rt uint64, _ error) {
// fail to probe
b.fromHalfOpenToOpen(1.0)
} else {
// succeed to probe
b.fromHalfOpenToClosed()
b.resetMetric()
b.addCurProbeNum()
if b.probeNumber == 0 || atomic.LoadUint64(&b.curProbeNumber) >= b.probeNumber {
// succeed to probe
b.fromHalfOpenToClosed()
b.resetMetric()
}
}
return
}
Expand Down Expand Up @@ -433,6 +453,7 @@ func newErrorRatioCircuitBreakerWithStat(r *Rule, stat *errorCounterLeapArray) *
retryTimeoutMs: r.RetryTimeoutMs,
nextRetryTimestampMs: 0,
state: newState(),
probeNumber: r.ProbeNum,
},
minRequestAmount: r.MinRequestAmount,
errorRatioThreshold: r.Threshold,
Expand Down Expand Up @@ -465,6 +486,8 @@ func (b *errorRatioCircuitBreaker) TryPass(ctx *base.EntryContext) bool {
if b.retryTimeoutArrived() && b.fromOpenToHalfOpen(ctx) {
return true
}
} else if curStatus == HalfOpen && b.probeNumber > 0 {
return true
}
return false
}
Expand Down Expand Up @@ -498,8 +521,11 @@ func (b *errorRatioCircuitBreaker) OnRequestComplete(_ uint64, err error) {
}
if curStatus == HalfOpen {
if err == nil {
b.fromHalfOpenToClosed()
b.resetMetric()
b.addCurProbeNum()
if b.probeNumber == 0 || atomic.LoadUint64(&b.curProbeNumber) >= b.probeNumber {
b.fromHalfOpenToClosed()
b.resetMetric()
}
} else {
b.fromHalfOpenToOpen(1.0)
}
Expand Down Expand Up @@ -612,6 +638,7 @@ func newErrorCountCircuitBreakerWithStat(r *Rule, stat *errorCounterLeapArray) *
retryTimeoutMs: r.RetryTimeoutMs,
nextRetryTimestampMs: 0,
state: newState(),
probeNumber: r.ProbeNum,
},
minRequestAmount: r.MinRequestAmount,
errorCountThreshold: uint64(r.Threshold),
Expand Down Expand Up @@ -644,6 +671,8 @@ func (b *errorCountCircuitBreaker) TryPass(ctx *base.EntryContext) bool {
if b.retryTimeoutArrived() && b.fromOpenToHalfOpen(ctx) {
return true
}
} else if curStatus == HalfOpen && b.probeNumber > 0 {
return true
}
return false
}
Expand Down Expand Up @@ -675,8 +704,11 @@ func (b *errorCountCircuitBreaker) OnRequestComplete(_ uint64, err error) {
}
if curStatus == HalfOpen {
if err == nil {
b.fromHalfOpenToClosed()
b.resetMetric()
b.addCurProbeNum()
if b.probeNumber == 0 || atomic.LoadUint64(&b.curProbeNumber) >= b.probeNumber {
b.fromHalfOpenToClosed()
b.resetMetric()
}
} else {
b.fromHalfOpenToOpen(1)
}
Expand Down
129 changes: 126 additions & 3 deletions core/circuitbreaker/circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,15 @@ type StateChangeListenerMock struct {
}

func (s *StateChangeListenerMock) OnTransformToClosed(prev State, rule Rule) {
_ = s.Called(prev, rule)
logging.Debug("transform to closed", "strategy", rule.Strategy, "prevState", prev.String())
return
}

func (s *StateChangeListenerMock) OnTransformToOpen(prev State, rule Rule, snapshot interface{}) {
_ = s.Called(prev, rule, snapshot)
logging.Debug("transform to open", "strategy", rule.Strategy, "prevState", prev.String(), "snapshot", snapshot)
}

func (s *StateChangeListenerMock) OnTransformToHalfOpen(prev State, rule Rule) {
_ = s.Called(prev, rule)
logging.Debug("transform to Half-Open", "strategy", rule.Strategy, "prevState", prev.String())
}

Expand Down Expand Up @@ -140,6 +137,35 @@ func TestSlowRtCircuitBreaker_TryPass(t *testing.T) {
assert.True(t, pass)
assert.True(t, b.state.get() == HalfOpen)
})

t.Run("TryPass_ProbeNum", func(t *testing.T) {
r := &Rule{
Resource: "abc",
Strategy: SlowRequestRatio,
RetryTimeoutMs: 3000,
MinRequestAmount: 10,
StatIntervalMs: 10000,
MaxAllowedRtMs: 50,
Threshold: 0.5,
ProbeNum: 10,
}
b, err := newSlowRtCircuitBreaker(r)
assert.Nil(t, err)

b.state.set(Open)
ctx := &base.EntryContext{
Resource: base.NewResourceWrapper("abc", base.ResTypeCommon, base.Inbound),
}
e := base.NewSentinelEntry(ctx, base.NewResourceWrapper("abc", base.ResTypeCommon, base.Inbound), nil)
ctx.SetEntry(e)
for i := 0; i < 10; i++ {
pass := b.TryPass(ctx)
assert.True(t, pass)
assert.True(t, b.state.get() == HalfOpen)
b.OnRequestComplete(1, nil)
}
assert.True(t, b.state.get() == Closed)
})
}

func TestSlowRt_OnRequestComplete(t *testing.T) {
Expand Down Expand Up @@ -169,6 +195,20 @@ func TestSlowRt_OnRequestComplete(t *testing.T) {
b.OnRequestComplete(10, nil)
assert.True(t, b.CurrentState() == Closed)
})
t.Run("OnRequestComplete_ProbeNum_Success", func(t *testing.T) {
b.probeNumber = 2
b.state.set(HalfOpen)
b.OnRequestComplete(10, nil)
assert.True(t, b.CurrentState() == HalfOpen)
assert.True(t, b.curProbeNumber == 1)
})
t.Run("OnRequestComplete_ProbeNum_Failed", func(t *testing.T) {
b.probeNumber = 2
b.state.set(HalfOpen)
b.OnRequestComplete(base.NewEmptyEntryContext().Rt(), nil)
assert.True(t, b.CurrentState() == Open)
assert.True(t, b.curProbeNumber == 0)
})
}

func TestSlowRt_ResetBucketTo(t *testing.T) {
Expand Down Expand Up @@ -227,6 +267,33 @@ func TestErrorRatioCircuitBreaker_TryPass(t *testing.T) {
assert.True(t, pass)
assert.True(t, b.state.get() == HalfOpen)
})
t.Run("TryPass_ProbeNum", func(t *testing.T) {
r := &Rule{
Resource: "abc",
Strategy: ErrorRatio,
RetryTimeoutMs: 3000,
MinRequestAmount: 10,
StatIntervalMs: 10000,
Threshold: 0.5,
ProbeNum: 10,
}
b, err := newErrorRatioCircuitBreaker(r)
assert.Nil(t, err)

b.state.set(Open)
ctx := &base.EntryContext{
Resource: base.NewResourceWrapper("abc", base.ResTypeCommon, base.Inbound),
}
e := base.NewSentinelEntry(ctx, base.NewResourceWrapper("abc", base.ResTypeCommon, base.Inbound), nil)
ctx.SetEntry(e)
for i := 0; i < 10; i++ {
pass := b.TryPass(ctx)
assert.True(t, pass)
assert.True(t, b.state.get() == HalfOpen)
b.OnRequestComplete(1, nil)
}
assert.True(t, b.state.get() == Closed)
})
}

func TestErrorRatio_OnRequestComplete(t *testing.T) {
Expand Down Expand Up @@ -254,6 +321,20 @@ func TestErrorRatio_OnRequestComplete(t *testing.T) {
b.OnRequestComplete(0, errors.New("errorRatio"))
assert.True(t, b.CurrentState() == Open)
})
t.Run("OnRequestComplete_ProbeNum_Success", func(t *testing.T) {
b.probeNumber = 2
b.state.set(HalfOpen)
b.OnRequestComplete(base.NewEmptyEntryContext().Rt(), nil)
assert.True(t, b.CurrentState() == HalfOpen)
assert.True(t, b.curProbeNumber == 1)
})
t.Run("OnRequestComplete_ProbeNum_Failed", func(t *testing.T) {
b.probeNumber = 2
b.state.set(HalfOpen)
b.OnRequestComplete(0, errors.New("errorRatio"))
assert.True(t, b.CurrentState() == Open)
assert.True(t, b.curProbeNumber == 0)
})
}

func TestErrorRatio_ResetBucketTo(t *testing.T) {
Expand Down Expand Up @@ -312,6 +393,34 @@ func TestErrorCountCircuitBreaker_TryPass(t *testing.T) {
assert.True(t, pass)
assert.True(t, b.state.get() == HalfOpen)
})

t.Run("TryPass_ProbeNum", func(t *testing.T) {
r := &Rule{
Resource: "abc",
Strategy: ErrorCount,
RetryTimeoutMs: 3000,
MinRequestAmount: 10,
StatIntervalMs: 10000,
Threshold: 1.0,
ProbeNum: 10,
}
b, err := newErrorCountCircuitBreaker(r)
assert.Nil(t, err)

b.state.set(Open)
ctx := &base.EntryContext{
Resource: base.NewResourceWrapper("abc", base.ResTypeCommon, base.Inbound),
}
e := base.NewSentinelEntry(ctx, base.NewResourceWrapper("abc", base.ResTypeCommon, base.Inbound), nil)
ctx.SetEntry(e)
for i := 0; i < 10; i++ {
pass := b.TryPass(ctx)
assert.True(t, pass)
assert.True(t, b.state.get() == HalfOpen)
b.OnRequestComplete(1, nil)
}
assert.True(t, b.state.get() == Closed)
})
}

func TestErrorCount_OnRequestComplete(t *testing.T) {
Expand Down Expand Up @@ -339,6 +448,20 @@ func TestErrorCount_OnRequestComplete(t *testing.T) {
b.OnRequestComplete(0, errors.New("errorCount"))
assert.True(t, b.CurrentState() == Open)
})
t.Run("OnRequestComplete_ProbeNum_Success", func(t *testing.T) {
b.probeNumber = 2
b.state.set(HalfOpen)
b.OnRequestComplete(base.NewEmptyEntryContext().Rt(), nil)
assert.True(t, b.CurrentState() == HalfOpen)
assert.True(t, b.curProbeNumber == 1)
})
t.Run("OnRequestComplete_ProbeNum_Failed", func(t *testing.T) {
b.probeNumber = 2
b.state.set(HalfOpen)
b.OnRequestComplete(0, errors.New("errorCount"))
assert.True(t, b.CurrentState() == Open)
assert.True(t, b.curProbeNumber == 0)
})
}

func TestFromClosedToOpen(t *testing.T) {
Expand Down
5 changes: 5 additions & 0 deletions core/circuitbreaker/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ type Rule struct {
// for ErrorRatio, it represents the max error request ratio
// for ErrorCount, it represents the max error request count
Threshold float64 `json:"threshold"`
//ProbeNum is number of probes required when the circuit breaker is half-open.
//when the probe num are set and circuit breaker in the half-open state.
//if err occurs during the probe, the circuit breaker is opened immediately.
//otherwise,the circuit breaker is closed only after the number of probes is reached
ProbeNum uint64 `json:"probeNum"`
}

func (r *Rule) String() string {
Expand Down

0 comments on commit 9eabe9f

Please sign in to comment.