From 9eabe9f06b2203771cacb249c6ee48f36ff4f61f Mon Sep 17 00:00:00 2001 From: "binbin.zhang" Date: Sat, 11 Dec 2021 15:43:43 +0800 Subject: [PATCH] Add customized probe num support for circuit breaker (#428) --- core/circuitbreaker/circuit_breaker.go | 46 +++++-- core/circuitbreaker/circuit_breaker_test.go | 129 +++++++++++++++++++- core/circuitbreaker/rule.go | 5 + 3 files changed, 170 insertions(+), 10 deletions(-) diff --git a/core/circuitbreaker/circuit_breaker.go b/core/circuitbreaker/circuit_breaker.go index b7d2396ab..171482a61 100644 --- a/core/circuitbreaker/circuit_breaker.go +++ b/core/circuitbreaker/circuit_breaker.go @@ -136,6 +136,10 @@ type circuitBreakerBase struct { retryTimeoutMs uint32 // nextRetryTimestampMs is the time circuit breaker could probe nextRetryTimestampMs uint64 + // probeNumber is the number of probe requests that are allowed to pass when the circuit breaker is half open. + probeNumber uint64 + // curProbeNumber is the real-time probe number. + curProbeNumber uint64 // state is the state machine of circuit breaker state *State } @@ -156,6 +160,14 @@ func (b *circuitBreakerBase) updateNextRetryTimestamp() { atomic.StoreUint64(&b.nextRetryTimestampMs, util.CurrentTimeMillis()+uint64(b.retryTimeoutMs)) } +func (b *circuitBreakerBase) addCurProbeNum() { + atomic.AddUint64(&b.curProbeNumber, 1) +} + +func (b *circuitBreakerBase) resetCurProbeNum() { + atomic.StoreUint64(&b.curProbeNumber, 0) +} + // fromClosedToOpen updates circuit breaker state machine from closed to open. // Return true only if current goroutine successfully accomplished the transformation. func (b *circuitBreakerBase) fromClosedToOpen(snapshot interface{}) bool { @@ -206,6 +218,7 @@ func (b *circuitBreakerBase) fromOpenToHalfOpen(ctx *base.EntryContext) bool { // Return true only if current goroutine successfully accomplished the transformation. func (b *circuitBreakerBase) fromHalfOpenToOpen(snapshot interface{}) bool { if b.state.cas(HalfOpen, Open) { + b.resetCurProbeNum() b.updateNextRetryTimestamp() for _, listener := range stateChangeListeners { listener.OnTransformToOpen(HalfOpen, *b.rule, snapshot) @@ -221,6 +234,7 @@ func (b *circuitBreakerBase) fromHalfOpenToOpen(snapshot interface{}) bool { // Return true only if current goroutine successfully accomplished the transformation. func (b *circuitBreakerBase) fromHalfOpenToClosed() bool { if b.state.cas(HalfOpen, Closed) { + b.resetCurProbeNum() for _, listener := range stateChangeListeners { listener.OnTransformToClosed(HalfOpen, *b.rule) } @@ -247,6 +261,7 @@ func newSlowRtCircuitBreakerWithStat(r *Rule, stat *slowRequestLeapArray) *slowR retryTimeoutMs: r.RetryTimeoutMs, nextRetryTimestampMs: 0, state: newState(), + probeNumber: r.ProbeNum, }, stat: stat, maxAllowedRt: r.MaxAllowedRtMs, @@ -282,6 +297,8 @@ func (b *slowRtCircuitBreaker) TryPass(ctx *base.EntryContext) bool { if b.retryTimeoutArrived() && b.fromOpenToHalfOpen(ctx) { return true } + } else if curStatus == HalfOpen && b.probeNumber > 0 { + return true } return false } @@ -318,9 +335,12 @@ func (b *slowRtCircuitBreaker) OnRequestComplete(rt uint64, _ error) { // fail to probe b.fromHalfOpenToOpen(1.0) } else { - // succeed to probe - b.fromHalfOpenToClosed() - b.resetMetric() + b.addCurProbeNum() + if b.probeNumber == 0 || atomic.LoadUint64(&b.curProbeNumber) >= b.probeNumber { + // succeed to probe + b.fromHalfOpenToClosed() + b.resetMetric() + } } return } @@ -433,6 +453,7 @@ func newErrorRatioCircuitBreakerWithStat(r *Rule, stat *errorCounterLeapArray) * retryTimeoutMs: r.RetryTimeoutMs, nextRetryTimestampMs: 0, state: newState(), + probeNumber: r.ProbeNum, }, minRequestAmount: r.MinRequestAmount, errorRatioThreshold: r.Threshold, @@ -465,6 +486,8 @@ func (b *errorRatioCircuitBreaker) TryPass(ctx *base.EntryContext) bool { if b.retryTimeoutArrived() && b.fromOpenToHalfOpen(ctx) { return true } + } else if curStatus == HalfOpen && b.probeNumber > 0 { + return true } return false } @@ -498,8 +521,11 @@ func (b *errorRatioCircuitBreaker) OnRequestComplete(_ uint64, err error) { } if curStatus == HalfOpen { if err == nil { - b.fromHalfOpenToClosed() - b.resetMetric() + b.addCurProbeNum() + if b.probeNumber == 0 || atomic.LoadUint64(&b.curProbeNumber) >= b.probeNumber { + b.fromHalfOpenToClosed() + b.resetMetric() + } } else { b.fromHalfOpenToOpen(1.0) } @@ -612,6 +638,7 @@ func newErrorCountCircuitBreakerWithStat(r *Rule, stat *errorCounterLeapArray) * retryTimeoutMs: r.RetryTimeoutMs, nextRetryTimestampMs: 0, state: newState(), + probeNumber: r.ProbeNum, }, minRequestAmount: r.MinRequestAmount, errorCountThreshold: uint64(r.Threshold), @@ -644,6 +671,8 @@ func (b *errorCountCircuitBreaker) TryPass(ctx *base.EntryContext) bool { if b.retryTimeoutArrived() && b.fromOpenToHalfOpen(ctx) { return true } + } else if curStatus == HalfOpen && b.probeNumber > 0 { + return true } return false } @@ -675,8 +704,11 @@ func (b *errorCountCircuitBreaker) OnRequestComplete(_ uint64, err error) { } if curStatus == HalfOpen { if err == nil { - b.fromHalfOpenToClosed() - b.resetMetric() + b.addCurProbeNum() + if b.probeNumber == 0 || atomic.LoadUint64(&b.curProbeNumber) >= b.probeNumber { + b.fromHalfOpenToClosed() + b.resetMetric() + } } else { b.fromHalfOpenToOpen(1) } diff --git a/core/circuitbreaker/circuit_breaker_test.go b/core/circuitbreaker/circuit_breaker_test.go index cacd7e033..6f845058a 100644 --- a/core/circuitbreaker/circuit_breaker_test.go +++ b/core/circuitbreaker/circuit_breaker_test.go @@ -61,18 +61,15 @@ type StateChangeListenerMock struct { } func (s *StateChangeListenerMock) OnTransformToClosed(prev State, rule Rule) { - _ = s.Called(prev, rule) logging.Debug("transform to closed", "strategy", rule.Strategy, "prevState", prev.String()) return } func (s *StateChangeListenerMock) OnTransformToOpen(prev State, rule Rule, snapshot interface{}) { - _ = s.Called(prev, rule, snapshot) logging.Debug("transform to open", "strategy", rule.Strategy, "prevState", prev.String(), "snapshot", snapshot) } func (s *StateChangeListenerMock) OnTransformToHalfOpen(prev State, rule Rule) { - _ = s.Called(prev, rule) logging.Debug("transform to Half-Open", "strategy", rule.Strategy, "prevState", prev.String()) } @@ -140,6 +137,35 @@ func TestSlowRtCircuitBreaker_TryPass(t *testing.T) { assert.True(t, pass) assert.True(t, b.state.get() == HalfOpen) }) + + t.Run("TryPass_ProbeNum", func(t *testing.T) { + r := &Rule{ + Resource: "abc", + Strategy: SlowRequestRatio, + RetryTimeoutMs: 3000, + MinRequestAmount: 10, + StatIntervalMs: 10000, + MaxAllowedRtMs: 50, + Threshold: 0.5, + ProbeNum: 10, + } + b, err := newSlowRtCircuitBreaker(r) + assert.Nil(t, err) + + b.state.set(Open) + ctx := &base.EntryContext{ + Resource: base.NewResourceWrapper("abc", base.ResTypeCommon, base.Inbound), + } + e := base.NewSentinelEntry(ctx, base.NewResourceWrapper("abc", base.ResTypeCommon, base.Inbound), nil) + ctx.SetEntry(e) + for i := 0; i < 10; i++ { + pass := b.TryPass(ctx) + assert.True(t, pass) + assert.True(t, b.state.get() == HalfOpen) + b.OnRequestComplete(1, nil) + } + assert.True(t, b.state.get() == Closed) + }) } func TestSlowRt_OnRequestComplete(t *testing.T) { @@ -169,6 +195,20 @@ func TestSlowRt_OnRequestComplete(t *testing.T) { b.OnRequestComplete(10, nil) assert.True(t, b.CurrentState() == Closed) }) + t.Run("OnRequestComplete_ProbeNum_Success", func(t *testing.T) { + b.probeNumber = 2 + b.state.set(HalfOpen) + b.OnRequestComplete(10, nil) + assert.True(t, b.CurrentState() == HalfOpen) + assert.True(t, b.curProbeNumber == 1) + }) + t.Run("OnRequestComplete_ProbeNum_Failed", func(t *testing.T) { + b.probeNumber = 2 + b.state.set(HalfOpen) + b.OnRequestComplete(base.NewEmptyEntryContext().Rt(), nil) + assert.True(t, b.CurrentState() == Open) + assert.True(t, b.curProbeNumber == 0) + }) } func TestSlowRt_ResetBucketTo(t *testing.T) { @@ -227,6 +267,33 @@ func TestErrorRatioCircuitBreaker_TryPass(t *testing.T) { assert.True(t, pass) assert.True(t, b.state.get() == HalfOpen) }) + t.Run("TryPass_ProbeNum", func(t *testing.T) { + r := &Rule{ + Resource: "abc", + Strategy: ErrorRatio, + RetryTimeoutMs: 3000, + MinRequestAmount: 10, + StatIntervalMs: 10000, + Threshold: 0.5, + ProbeNum: 10, + } + b, err := newErrorRatioCircuitBreaker(r) + assert.Nil(t, err) + + b.state.set(Open) + ctx := &base.EntryContext{ + Resource: base.NewResourceWrapper("abc", base.ResTypeCommon, base.Inbound), + } + e := base.NewSentinelEntry(ctx, base.NewResourceWrapper("abc", base.ResTypeCommon, base.Inbound), nil) + ctx.SetEntry(e) + for i := 0; i < 10; i++ { + pass := b.TryPass(ctx) + assert.True(t, pass) + assert.True(t, b.state.get() == HalfOpen) + b.OnRequestComplete(1, nil) + } + assert.True(t, b.state.get() == Closed) + }) } func TestErrorRatio_OnRequestComplete(t *testing.T) { @@ -254,6 +321,20 @@ func TestErrorRatio_OnRequestComplete(t *testing.T) { b.OnRequestComplete(0, errors.New("errorRatio")) assert.True(t, b.CurrentState() == Open) }) + t.Run("OnRequestComplete_ProbeNum_Success", func(t *testing.T) { + b.probeNumber = 2 + b.state.set(HalfOpen) + b.OnRequestComplete(base.NewEmptyEntryContext().Rt(), nil) + assert.True(t, b.CurrentState() == HalfOpen) + assert.True(t, b.curProbeNumber == 1) + }) + t.Run("OnRequestComplete_ProbeNum_Failed", func(t *testing.T) { + b.probeNumber = 2 + b.state.set(HalfOpen) + b.OnRequestComplete(0, errors.New("errorRatio")) + assert.True(t, b.CurrentState() == Open) + assert.True(t, b.curProbeNumber == 0) + }) } func TestErrorRatio_ResetBucketTo(t *testing.T) { @@ -312,6 +393,34 @@ func TestErrorCountCircuitBreaker_TryPass(t *testing.T) { assert.True(t, pass) assert.True(t, b.state.get() == HalfOpen) }) + + t.Run("TryPass_ProbeNum", func(t *testing.T) { + r := &Rule{ + Resource: "abc", + Strategy: ErrorCount, + RetryTimeoutMs: 3000, + MinRequestAmount: 10, + StatIntervalMs: 10000, + Threshold: 1.0, + ProbeNum: 10, + } + b, err := newErrorCountCircuitBreaker(r) + assert.Nil(t, err) + + b.state.set(Open) + ctx := &base.EntryContext{ + Resource: base.NewResourceWrapper("abc", base.ResTypeCommon, base.Inbound), + } + e := base.NewSentinelEntry(ctx, base.NewResourceWrapper("abc", base.ResTypeCommon, base.Inbound), nil) + ctx.SetEntry(e) + for i := 0; i < 10; i++ { + pass := b.TryPass(ctx) + assert.True(t, pass) + assert.True(t, b.state.get() == HalfOpen) + b.OnRequestComplete(1, nil) + } + assert.True(t, b.state.get() == Closed) + }) } func TestErrorCount_OnRequestComplete(t *testing.T) { @@ -339,6 +448,20 @@ func TestErrorCount_OnRequestComplete(t *testing.T) { b.OnRequestComplete(0, errors.New("errorCount")) assert.True(t, b.CurrentState() == Open) }) + t.Run("OnRequestComplete_ProbeNum_Success", func(t *testing.T) { + b.probeNumber = 2 + b.state.set(HalfOpen) + b.OnRequestComplete(base.NewEmptyEntryContext().Rt(), nil) + assert.True(t, b.CurrentState() == HalfOpen) + assert.True(t, b.curProbeNumber == 1) + }) + t.Run("OnRequestComplete_ProbeNum_Failed", func(t *testing.T) { + b.probeNumber = 2 + b.state.set(HalfOpen) + b.OnRequestComplete(0, errors.New("errorCount")) + assert.True(t, b.CurrentState() == Open) + assert.True(t, b.curProbeNumber == 0) + }) } func TestFromClosedToOpen(t *testing.T) { diff --git a/core/circuitbreaker/rule.go b/core/circuitbreaker/rule.go index 4103011e0..0886604bc 100644 --- a/core/circuitbreaker/rule.go +++ b/core/circuitbreaker/rule.go @@ -78,6 +78,11 @@ type Rule struct { // for ErrorRatio, it represents the max error request ratio // for ErrorCount, it represents the max error request count Threshold float64 `json:"threshold"` + //ProbeNum is number of probes required when the circuit breaker is half-open. + //when the probe num are set and circuit breaker in the half-open state. + //if err occurs during the probe, the circuit breaker is opened immediately. + //otherwise,the circuit breaker is closed only after the number of probes is reached + ProbeNum uint64 `json:"probeNum"` } func (r *Rule) String() string {