From 5296c64273e9f5e5d9048dab076a4742a50a8057 Mon Sep 17 00:00:00 2001 From: Drew Date: Mon, 7 Mar 2022 08:55:56 -0500 Subject: [PATCH] Add Stateful methods for circuitbreaker (#5650) * Add Stateful methods for circuitbreaker * api docs * fix api docs Co-authored-by: Gregorius Soedharmo Co-authored-by: Aaron Stannard --- .../CoreAPISpec.ApproveCore.approved.txt | 2 + src/core/Akka/Pattern/CircuitBreaker.cs | 14 +++- src/core/Akka/Pattern/CircuitBreakerState.cs | 52 +++++++++++--- src/core/Akka/Util/Internal/AtomicState.cs | 68 +++++++++++++++++++ 4 files changed, 126 insertions(+), 10 deletions(-) diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt index c4798d4ee33..08805ed01dd 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt @@ -4045,7 +4045,9 @@ namespace Akka.Pattern public Akka.Pattern.CircuitBreaker OnOpen(System.Action callback) { } public void Succeed() { } public System.Threading.Tasks.Task WithCircuitBreaker(System.Func> body) { } + public System.Threading.Tasks.Task WithCircuitBreaker(TState state, System.Func> body) { } public System.Threading.Tasks.Task WithCircuitBreaker(System.Func body) { } + public System.Threading.Tasks.Task WithCircuitBreaker(TState state, System.Func body) { } public Akka.Pattern.CircuitBreaker WithExponentialBackoff(System.TimeSpan maxResetTimeout) { } public Akka.Pattern.CircuitBreaker WithRandomFactor(double randomFactor) { } public void WithSyncCircuitBreaker(System.Action body) { } diff --git a/src/core/Akka/Pattern/CircuitBreaker.cs b/src/core/Akka/Pattern/CircuitBreaker.cs index 5c3fc735f42..45d738ecb82 100644 --- a/src/core/Akka/Pattern/CircuitBreaker.cs +++ b/src/core/Akka/Pattern/CircuitBreaker.cs @@ -232,6 +232,12 @@ public Task WithCircuitBreaker(Func> body) return CurrentState.Invoke(body); } + public Task WithCircuitBreaker(TState state, + Func> body) + { + return CurrentState.InvokeState(state, body); + } + /// /// Wraps invocation of asynchronous calls that need to be protected /// @@ -241,6 +247,10 @@ public Task WithCircuitBreaker(Func body) { return CurrentState.Invoke(body); } + public Task WithCircuitBreaker(TState state, Func body) + { + return CurrentState.InvokeState(state, body); + } /// /// The failure will be recorded farther down. @@ -248,7 +258,7 @@ public Task WithCircuitBreaker(Func body) /// TBD public void WithSyncCircuitBreaker(Action body) { - var cbTask = WithCircuitBreaker(() => Task.Factory.StartNew(body)); + var cbTask = WithCircuitBreaker(body,(b) => Task.Factory.StartNew(b)); if (!cbTask.Wait(CallTimeout)) { //throw new TimeoutException( string.Format( "Execution did not complete within the time allotted {0} ms", CallTimeout.TotalMilliseconds ) ); @@ -275,7 +285,7 @@ public void WithSyncCircuitBreaker(Action body) /// or default() public T WithSyncCircuitBreaker(Func body) { - var cbTask = WithCircuitBreaker(() => Task.Factory.StartNew(body)); + var cbTask = WithCircuitBreaker(body,(b) => Task.Factory.StartNew(b)); return cbTask.Wait(CallTimeout) ? cbTask.Result : default(T); } diff --git a/src/core/Akka/Pattern/CircuitBreakerState.cs b/src/core/Akka/Pattern/CircuitBreakerState.cs index f225867e13f..c335ec9ac9e 100644 --- a/src/core/Akka/Pattern/CircuitBreakerState.cs +++ b/src/core/Akka/Pattern/CircuitBreakerState.cs @@ -51,6 +51,17 @@ private TimeSpan RemainingDuration() public override Task Invoke(Func> body) => Task.FromException(new OpenCircuitException(_breaker.LastCaughtException, RemainingDuration())); + public override Task + InvokeState(TState state, Func body) => + Task.FromException( + new OpenCircuitException(_breaker.LastCaughtException, + RemainingDuration())); + + public override Task InvokeState(TState state, + Func> body) => Task.FromException( + new OpenCircuitException(_breaker.LastCaughtException, + RemainingDuration())); + /// /// Fail-fast on any invocation /// @@ -121,6 +132,14 @@ public HalfOpen(CircuitBreaker breaker) _lock = new AtomicBoolean(); } + private void CheckState() + { + if (!_lock.CompareAndSet(true, false)) + { + throw new OpenCircuitException("Circuit breaker is half open, only one call is allowed; this call is failing fast.", _breaker.LastCaughtException, TimeSpan.Zero); + } + } + /// /// Allows a single call through, during which all other callers fail-fast. If the call fails, the breaker reopens. /// If the call succeeds, the breaker closes. @@ -130,12 +149,15 @@ public HalfOpen(CircuitBreaker breaker) /// containing result of protected call public override async Task Invoke(Func> body) { - if (!_lock.CompareAndSet(true, false)) - { - throw new OpenCircuitException("Circuit breaker is half open, only one call is allowed; this call is failing fast.", _breaker.LastCaughtException, TimeSpan.Zero); - } + CheckState(); return await CallThrough(body); } + + public override async Task InvokeState(TState state, Func> body) + { + CheckState(); + return await CallThrough(state,body); + } /// /// Allows a single call through, during which all other callers fail-fast. If the call fails, the breaker reopens. @@ -145,13 +167,17 @@ public override async Task Invoke(Func> body) /// containing result of protected call public override async Task Invoke(Func body) { - if (!_lock.CompareAndSet(true, false)) - { - throw new OpenCircuitException("Circuit breaker is half open, only one call is allowed; this call is failing fast.", _breaker.LastCaughtException, TimeSpan.Zero); - } + CheckState(); await CallThrough(body); } + public override async Task InvokeState(TState state, + Func body) + { + CheckState(); + await CallThrough(state,body); + } + /// /// Reopen breaker on failed call. /// @@ -216,6 +242,11 @@ public override Task Invoke(Func> body) return CallThrough(body); } + public override Task InvokeState(TState state, Func> body) + { + return CallThrough(state, body); + } + /// /// Implementation of invoke, which simply attempts the call /// @@ -226,6 +257,11 @@ public override Task Invoke(Func body) return CallThrough(body); } + public override Task InvokeState(TState state, Func body) + { + return CallThrough(state, body); + } + /// /// On failed call, the failure count is incremented. The count is checked against the configured maxFailures, and /// the breaker is tripped if we have reached maxFailures. diff --git a/src/core/Akka/Util/Internal/AtomicState.cs b/src/core/Akka/Util/Internal/AtomicState.cs index db50b9d2d16..81878a91ec2 100644 --- a/src/core/Akka/Util/Internal/AtomicState.cs +++ b/src/core/Akka/Util/Internal/AtomicState.cs @@ -113,6 +113,38 @@ public async Task CallThrough(Func> task) } return result; } + + public async Task CallThrough(TState state, Func> task) + { + var deadline = DateTime.UtcNow.Add(_callTimeout); + ExceptionDispatchInfo capturedException = null; + T result = default(T); + try + { + result = await task(state).ConfigureAwait(false); + } + catch (Exception ex) + { + capturedException = ExceptionDispatchInfo.Capture(ex); + } + + // Need to make sure that timeouts are reported as timeouts + if (capturedException != null) + { + CallFails(capturedException.SourceException); + capturedException.Throw(); + } + else if (DateTime.UtcNow.CompareTo(deadline) >= 0) + { + CallFails(new TimeoutException( + $"Execution did not complete within the time allotted {_callTimeout.TotalMilliseconds} ms")); + } + else + { + CallSucceeds(); + } + return result; + } /// /// Shared implementation of call across all states. Thrown exception or execution of the call beyond the allowed @@ -154,8 +186,37 @@ public async Task CallThrough(Func task) { CallSucceeds(); } + } + + public async Task CallThrough(TState state, Func task) + { + var deadline = DateTime.UtcNow.Add(_callTimeout); + ExceptionDispatchInfo capturedException = null; + try + { + await task(state).ConfigureAwait(false); + } + catch (Exception ex) + { + capturedException = ExceptionDispatchInfo.Capture(ex); + } + // Need to make sure that timeouts are reported as timeouts + if (capturedException != null) + { + CallFails(capturedException?.SourceException); + capturedException.Throw(); + } + else if (DateTime.UtcNow.CompareTo(deadline) >= 0) + { + CallFails(new TimeoutException( + $"Execution did not complete within the time allotted {_callTimeout.TotalMilliseconds} ms")); + } + else + { + CallSucceeds(); + } } /// @@ -166,6 +227,9 @@ public async Task CallThrough(Func task) /// containing result of protected call public abstract Task Invoke(Func> body); + public abstract Task InvokeState(TState state, + Func> body); + /// /// Abstract entry point for all states /// @@ -173,6 +237,10 @@ public async Task CallThrough(Func task) /// containing result of protected call public abstract Task Invoke(Func body); + public abstract Task InvokeState(TState state, + Func body); + + /// /// Invoked when call fails ///