Skip to content

Commit

Permalink
Add circuit breaker exponential backoff support
Browse files Browse the repository at this point in the history
  • Loading branch information
ismaelhamed committed Mar 22, 2020
1 parent b62842e commit 7ba587d
Show file tree
Hide file tree
Showing 7 changed files with 230 additions and 78 deletions.
30 changes: 14 additions & 16 deletions docs/articles/utilities/circuit-breaker.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,23 @@ The Akka.NET library provides an implementation of a circuit breaker called `Akk
## What do they do?

* During normal operation, a circuit breaker is in the `Closed` state:
* Exceptions or calls exceeding the configured `СallTimeout` increment a
failure counter
* Successes reset the failure count to zero
* When the failure counter reaches a `MaxFailures` count, the breaker is
tripped into `Open` state
* Exceptions or calls exceeding the configured `СallTimeout` increment a failure counter
* Successes reset the failure count to zero
* When the failure counter reaches a `MaxFailures` count, the breaker is tripped into `Open` state

* While in `Open` state:
* All calls fail-fast with a `OpenCircuitException`
* After the configured `ResetTimeout`, the circuit breaker enters a
`Half-Open` state
* All calls fail-fast with a `OpenCircuitException`
* After the configured `ResetTimeout`, the circuit breaker enters a `Half-Open` state

* In `Half-Open` state:
* The first call attempted is allowed through without failing fast
* All other calls fail-fast with an exception just as in `Open` state
* If the first call succeeds, the breaker is reset back to `Closed` state
* If the first call fails, the breaker is tripped again into the `Open` state
for another full `ResetTimeout`
* The first call attempted is allowed through without failing fast
* All other calls fail-fast with an exception just as in `Open` state
* If the first call succeeds, the breaker is reset back to `Closed` state and the `ResetTimeout` is reset
* If the first call fails, the breaker is tripped again into the `Open` state (as for exponential backoff circuit breaker, the `ResetTimeout` is multiplied by the exponential backoff factor)

* State transition listeners:
* Callbacks can be provided for every state entry via `OnOpen`, `OnClose`,
and `OnHalfOpen`
* These are executed in the `ExecutionContext` provided.
* Callbacks can be provided for every state entry via `OnOpen`, `OnClose`, and `OnHalfOpen`
* These are executed in the `ExecutionContext` provided.

![Circuit breaker states](/images/circuit-breaker-states.png)

Expand Down
14 changes: 11 additions & 3 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3810,22 +3810,27 @@ namespace Akka.Pattern
}
public class CircuitBreaker
{
public CircuitBreaker(int maxFailures, System.TimeSpan callTimeout, System.TimeSpan resetTimeout) { }
public CircuitBreaker(Akka.Actor.IScheduler scheduler, int maxFailures, System.TimeSpan callTimeout, System.TimeSpan resetTimeout) { }
public CircuitBreaker(Akka.Actor.IScheduler scheduler, int maxFailures, System.TimeSpan callTimeout, System.TimeSpan resetTimeout, System.TimeSpan maxResetTimeout, double exponentialBackoffFactor) { }
public System.TimeSpan CallTimeout { get; }
public long CurrentFailureCount { get; }
public double ExponentialBackoffFactor { get; }
public bool IsClosed { get; }
public bool IsHalfOpen { get; }
public bool IsOpen { get; }
public int MaxFailures { get; }
public System.TimeSpan MaxResetTimeout { get; }
public System.TimeSpan ResetTimeout { get; }
public static Akka.Pattern.CircuitBreaker Create(int maxFailures, System.TimeSpan callTimeout, System.TimeSpan resetTimeout) { }
public Akka.Actor.IScheduler Scheduler { get; }
public static Akka.Pattern.CircuitBreaker Create(Akka.Actor.IScheduler scheduler, int maxFailures, System.TimeSpan callTimeout, System.TimeSpan resetTimeout) { }
public void Fail() { }
public Akka.Pattern.CircuitBreaker OnClose(System.Action callback) { }
public Akka.Pattern.CircuitBreaker OnHalfOpen(System.Action callback) { }
public Akka.Pattern.CircuitBreaker OnOpen(System.Action callback) { }
public void Succeed() { }
public System.Threading.Tasks.Task<T> WithCircuitBreaker<T>(System.Func<System.Threading.Tasks.Task<T>> body) { }
public System.Threading.Tasks.Task WithCircuitBreaker(System.Func<System.Threading.Tasks.Task> body) { }
public Akka.Pattern.CircuitBreaker WithExponentialBackoff(System.TimeSpan maxResetTimeout) { }
public void WithSyncCircuitBreaker(System.Action body) { }
public T WithSyncCircuitBreaker<T>(System.Func<T> body) { }
}
Expand All @@ -3841,9 +3846,12 @@ namespace Akka.Pattern
}
public class OpenCircuitException : Akka.Actor.AkkaException
{
public OpenCircuitException() { }
public OpenCircuitException(System.TimeSpan remainingDuration) { }
public OpenCircuitException(string message) { }
public OpenCircuitException(string message, System.TimeSpan remainingDuration) { }
public OpenCircuitException(string message, System.Exception cause) { }
public OpenCircuitException(string message, System.Exception cause, System.TimeSpan remainingDuration) { }
public System.TimeSpan RemainingDuration { get; }
}
}
namespace Akka.Routing
Expand Down
17 changes: 9 additions & 8 deletions src/core/Akka.Docs.Tests/Utilities/CircuitBreakerDocSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ public class DangerousActor : ReceiveActor
public DangerousActor()
{
var breaker = new CircuitBreaker(
maxFailures: 5,
callTimeout: TimeSpan.FromSeconds(10),
resetTimeout: TimeSpan.FromMinutes(1))
.OnOpen(NotifyMeOnOpen);
Context.System.Scheduler,
maxFailures: 5,
callTimeout: TimeSpan.FromSeconds(10),
resetTimeout: TimeSpan.FromMinutes(1)).OnOpen(NotifyMeOnOpen);
}

private void NotifyMeOnOpen()
Expand All @@ -42,10 +42,10 @@ public class DangerousActorCallProtection : ReceiveActor
public DangerousActorCallProtection()
{
var breaker = new CircuitBreaker(
maxFailures: 5,
callTimeout: TimeSpan.FromSeconds(10),
resetTimeout: TimeSpan.FromMinutes(1))
.OnOpen(NotifyMeOnOpen);
Context.System.Scheduler,
maxFailures: 5,
callTimeout: TimeSpan.FromSeconds(10),
resetTimeout: TimeSpan.FromMinutes(1)).OnOpen(NotifyMeOnOpen);

var dangerousCall = "This really isn't that dangerous of a call after all";

Expand Down Expand Up @@ -77,6 +77,7 @@ public TellPatternActor(IActorRef recipient )
{
_recipient = recipient;
_breaker = new CircuitBreaker(
Context.System.Scheduler,
maxFailures: 5,
callTimeout: TimeSpan.FromSeconds(10),
resetTimeout: TimeSpan.FromMinutes(1)).OnOpen(NotifyMeOnOpen);
Expand Down
63 changes: 51 additions & 12 deletions src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//-----------------------------------------------------------------------

using System;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Runtime.Serialization;
Expand Down Expand Up @@ -262,7 +263,6 @@ public void Should_Pass_Call_And_Transition_To_Open_On_Exception( )
{
var breaker = ShortResetTimeoutCb( );


Assert.True( InterceptExceptionType<TestException>( ( ) => breaker.Instance.WithCircuitBreaker( () => Task.Factory.StartNew( ThrowException ) ).Wait( ) ) );
Assert.True( CheckLatch( breaker.HalfOpenLatch ) );

Expand Down Expand Up @@ -303,6 +303,29 @@ public void Should_Transition_To_Half_Open_When_Reset_Timeout( )
Assert.True( InterceptExceptionType<TestException>( ( ) => breaker.Instance.WithCircuitBreaker( () => Task.Factory.StartNew( ThrowException ) ).Wait( ) ) );
Assert.True( CheckLatch( breaker.HalfOpenLatch ) );
}

[Fact(DisplayName = "An asynchronous circuit breaker that is open should increase the reset timeout after it transits to open again")]
public void Should_Reset_Timeout_After_It_Transits_To_Open_Again()
{
var breaker = NonOneFactorCb();
Assert.True(InterceptExceptionType<TestException>(() => breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)).Wait()));
Assert.True(CheckLatch(breaker.OpenLatch));

var e1 = InterceptException<OpenCircuitException>(() => breaker.Instance.WithSyncCircuitBreaker(SayTest));
var shortRemainingDuration = e1.RemainingDuration;

Thread.Sleep(1000);
Assert.True(CheckLatch(breaker.HalfOpenLatch));

// transit to open again
Assert.True(InterceptExceptionType<TestException>(() => breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)).Wait()));
Assert.True(CheckLatch(breaker.OpenLatch));

var e2 = InterceptException<OpenCircuitException>(() => breaker.Instance.WithSyncCircuitBreaker(SayTest));
var longRemainingDuration = e2.RemainingDuration;

Assert.True(shortRemainingDuration < longRemainingDuration);
}
}

public class CircuitBreakerSpecBase : AkkaSpec
Expand All @@ -320,14 +343,25 @@ public Task Delay( TimeSpan toDelay, CancellationToken? token )
return token.HasValue ? Task.Delay( toDelay, token.Value ) : Task.Delay( toDelay );
}

public void ThrowException( )
{
throw new TestException( "Test Exception" );
}
[DebuggerStepThrough]
public void ThrowException() => throw new TestException("Test Exception");

public string SayTest() => "Test";

public string SayTest( )
protected T InterceptException<T>(Action actionThatThrows) where T : Exception
{
return "Test";
return Assert.Throws<T>(() =>
{
try
{
actionThatThrows();
}
catch (AggregateException ex)
{
foreach (var e in ex.Flatten().InnerExceptions.Where(e => e is T).Select(e => e))
throw e;
}
});
}

[SuppressMessage( "Microsoft.Design", "CA1004:GenericMethodsShouldProvideTypeParameter" )]
Expand Down Expand Up @@ -366,27 +400,32 @@ public string SayTest( )

public TestBreaker ShortCallTimeoutCb( )
{
return new TestBreaker( new CircuitBreaker( 1, TimeSpan.FromMilliseconds( 50 ), TimeSpan.FromMilliseconds( 500 ) ) );
return new TestBreaker( new CircuitBreaker(Sys.Scheduler, 1, TimeSpan.FromMilliseconds( 50 ), TimeSpan.FromMilliseconds( 500 ) ) );
}

public TestBreaker ShortResetTimeoutCb( )
{
return new TestBreaker( new CircuitBreaker( 1, TimeSpan.FromMilliseconds( 1000 ), TimeSpan.FromMilliseconds( 50 ) ) );
return new TestBreaker( new CircuitBreaker(Sys.Scheduler, 1, TimeSpan.FromMilliseconds( 1000 ), TimeSpan.FromMilliseconds( 50 ) ) );
}

public TestBreaker LongCallTimeoutCb( )
{
return new TestBreaker( new CircuitBreaker( 1, TimeSpan.FromMilliseconds( 5000 ), TimeSpan.FromMilliseconds( 500 ) ) );
return new TestBreaker( new CircuitBreaker(Sys.Scheduler, 1, TimeSpan.FromMilliseconds( 5000 ), TimeSpan.FromMilliseconds( 500 ) ) );
}

public TestBreaker LongResetTimeoutCb( )
{
return new TestBreaker( new CircuitBreaker( 1, TimeSpan.FromMilliseconds( 100 ), TimeSpan.FromMilliseconds( 5000 ) ) );
return new TestBreaker( new CircuitBreaker(Sys.Scheduler, 1, TimeSpan.FromMilliseconds( 100 ), TimeSpan.FromMilliseconds( 5000 ) ) );
}

public TestBreaker MultiFailureCb( )
{
return new TestBreaker( new CircuitBreaker( 5, TimeSpan.FromMilliseconds( 200 ), TimeSpan.FromMilliseconds( 500 ) ) );
return new TestBreaker( new CircuitBreaker(Sys.Scheduler, 5, TimeSpan.FromMilliseconds( 200 ), TimeSpan.FromMilliseconds( 500 ) ) );
}

public TestBreaker NonOneFactorCb()
{
return new TestBreaker(new CircuitBreaker(Sys.Scheduler, 1, TimeSpan.FromMilliseconds(2000), TimeSpan.FromMilliseconds(1000), TimeSpan.FromDays(1), 5));
}
}

Expand Down

0 comments on commit 7ba587d

Please sign in to comment.