Skip to content

Commit

Permalink
Enhance OpenCircuitException error reporting (#4534)
Browse files Browse the repository at this point in the history
* Enhance OpenCircuitException to include what caused the circuit breaker to open

* Add unit test

* Update API approver list

* Move LastCaughtException from `IAtomicState` to the abstract `AtomicState` class

* Move LastCaughtException to CircuitBreaker. Make sure CircuitBreaker wwith null cause stays stored as null.

* Revert `CircuitBreaker.Fail()` function signature for binary compatibility.

* Update API approver list

Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
Arkatufus and Aaronontheweb committed Aug 7, 2020
1 parent 1bdd3f9 commit 9dfdfa0
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 30 deletions.
6 changes: 6 additions & 0 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3891,6 +3891,7 @@ namespace Akka.Pattern
public bool IsClosed { get; }
public bool IsHalfOpen { get; }
public bool IsOpen { get; }
public System.Exception LastCaughtException { get; }
public int MaxFailures { get; }
public System.TimeSpan ResetTimeout { get; }
public static Akka.Pattern.CircuitBreaker Create(int maxFailures, System.TimeSpan callTimeout, System.TimeSpan resetTimeout) { }
Expand All @@ -3917,9 +3918,14 @@ namespace Akka.Pattern
public class OpenCircuitException : Akka.Actor.AkkaException
{
public OpenCircuitException() { }
public OpenCircuitException(System.Exception cause) { }
public OpenCircuitException(string message) { }
public OpenCircuitException(string message, System.Exception cause) { }
}
public class UserCalledFailException : Akka.Actor.AkkaException
{
public UserCalledFailException() { }
}
}
namespace Akka.Routing
{
Expand Down
88 changes: 78 additions & 10 deletions src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,17 @@ public void Must_increment_failure_count_on_fail_method()
Assert.True(breaker.Instance.CurrentFailureCount == 1);
}

[Fact(DisplayName = "A synchronous circuit breaker that is closed must reset failure count after success method")]
[Fact(DisplayName = "A synchronous circuit breaker that is closed must reset failure count and clears cached last exception after success method")]
public void Must_reset_failure_count_after_success_method()
{
var breaker = MultiFailureCb();
Assert.True(breaker.Instance.CurrentFailureCount == 0);
Assert.True(InterceptExceptionType<TestException>(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)));
Assert.True(breaker.Instance.CurrentFailureCount == 1);
Assert.True(breaker.Instance.LastCaughtException is TestException);
breaker.Instance.Succeed();
Assert.True(breaker.Instance.CurrentFailureCount == 0);
Assert.True(breaker.Instance.LastCaughtException is null);
}
}

Expand All @@ -101,6 +103,45 @@ public void Should_Pass_Call_And_Transition_To_Close_On_Success( )
Assert.Equal( SayTest( ), result );
}

[Fact(DisplayName = "An asynchronous circuit breaker that is half open should pass only one call until it closes")]
public async Task Should_Pass_Only_One_Call_And_Transition_To_Close_On_Success()
{
var breaker = ShortResetTimeoutCb();
InterceptExceptionType<TestException>(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException));
Assert.True(CheckLatch(breaker.HalfOpenLatch));

var task1 = breaker.Instance.WithCircuitBreaker(() => DelayedSayTest(TimeSpan.FromSeconds(0.1)));
var task2 = breaker.Instance.WithCircuitBreaker(() => DelayedSayTest(TimeSpan.FromSeconds(0.1)));
var combined = Task.WhenAny(task1, task2).Unwrap();

// One of the 2 tasks will throw, because the circuit breaker is half open
Exception caughtException = null;
try
{
await combined;
}
catch (Exception e)
{
caughtException = e;
}
Assert.True(caughtException is OpenCircuitException);
Assert.StartsWith("Circuit breaker is half open", caughtException.Message);

// Wait until one of task completes
await Task.Delay(TimeSpan.FromSeconds(0.25));
Assert.True(CheckLatch(breaker.ClosedLatch));

// We don't know which one of the task got faulted
string result = null;
if (task1.IsCompleted && !task1.IsFaulted)
result = task1.Result;
else if (task2.IsCompleted && !task2.IsFaulted)
result = task2.Result;

Assert.Equal(SayTest(), result);
}


[Fact(DisplayName = "A synchronous circuit breaker that is half open should pass call and transition to open on exception")]
public void Should_Pass_Call_And_Transition_To_Open_On_Exception( )
{
Expand Down Expand Up @@ -239,6 +280,7 @@ public void Should_Increment_Failure_Count_When_Call_Times_Out( )

Assert.True( CheckLatch( breaker.OpenLatch ) );
Assert.Equal( 1, breaker.Instance.CurrentFailureCount );
Assert.True(breaker.Instance.LastCaughtException is TimeoutException);
}
}

Expand Down Expand Up @@ -320,6 +362,12 @@ public Task Delay( TimeSpan toDelay, CancellationToken? token )
return token.HasValue ? Task.Delay( toDelay, token.Value ) : Task.Delay( toDelay );
}

public async Task<string> DelayedSayTest(TimeSpan delay)
{
await Task.Delay(delay);
return "Test";
}

public void ThrowException( )
{
throw new TestException( "Test Exception" );
Expand All @@ -340,26 +388,46 @@ public string SayTest( )
}
catch ( Exception ex )
{
var aggregate = ex as AggregateException;
if ( aggregate != null )
if (ex is AggregateException aggregate)
{

// ReSharper disable once UnusedVariable
foreach ( var temp in aggregate.InnerExceptions.Select( innerException => innerException as T ).Where( temp => temp == null ) )
foreach (var temp in aggregate
.InnerExceptions
.Where(t => !(t is T)))
{
throw;
}
}
else
} else if (!(ex is T))
{
var temp = ex as T;
throw;
}
}
return true;
}

if ( temp == null )
public async Task<bool> InterceptExceptionTypeAsync<T>(Task action) where T : Exception
{
try
{
await action;
return false;
}
catch (Exception ex)
{
if (ex is AggregateException aggregate)
{
// ReSharper disable once UnusedVariable
foreach (var temp in aggregate
.InnerExceptions
.Where(t => !(t is T)))
{
throw;
}
}

else if (!(ex is T))
{
throw;
}
}
return true;
}
Expand Down
9 changes: 8 additions & 1 deletion src/core/Akka/Pattern/CircuitBreaker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ public long CurrentFailureCount
get { return Closed.Current; }
}

public Exception LastCaughtException { get; private set; }


/// <summary>
/// Wraps invocation of asynchronous calls that need to be protected
/// </summary>
Expand Down Expand Up @@ -197,12 +200,16 @@ public T WithSyncCircuitBreaker<T>(Func<T> body)
/// </summary>
public void Succeed() => _currentState.CallSucceeds();

internal void OnSuccess() => LastCaughtException = null;

/// <summary>
/// Mark a failed call through CircuitBreaker. Sometimes the callee of CircuitBreaker sends back a message to the
/// caller Actor. In such a case, it is convenient to mark a failed call instead of using Future
/// via <see cref="WithCircuitBreaker"/>
/// </summary>
public void Fail() => _currentState.CallFails();
public void Fail() => _currentState.CallFails(new UserCalledFailException());

internal void OnFail(Exception cause) => LastCaughtException = cause;

/// <summary>
/// Return true if the internal state is Closed. WARNING: It is a "power API" call which you should use with care.
Expand Down
29 changes: 20 additions & 9 deletions src/core/Akka/Pattern/CircuitBreakerState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//-----------------------------------------------------------------------

using System;
using System.Diagnostics;
using System.Globalization;
using System.Threading.Tasks;
using Akka.Util;
Expand Down Expand Up @@ -37,9 +38,9 @@ public Open(CircuitBreaker breaker)
/// <param name="body">N/A</param>
/// <exception cref="OpenCircuitException">This exception is thrown automatically since the circuit is open.</exception>
/// <returns>N/A</returns>
public override async Task<T> Invoke<T>(Func<Task<T>> body)
public override Task<T> Invoke<T>(Func<Task<T>> body)
{
throw new OpenCircuitException();
throw new OpenCircuitException(_breaker.LastCaughtException);
}

/// <summary>
Expand All @@ -48,23 +49,29 @@ public override async Task<T> Invoke<T>(Func<Task<T>> body)
/// <param name="body">N/A</param>
/// <exception cref="OpenCircuitException">This exception is thrown automatically since the circuit is open.</exception>
/// <returns>N/A</returns>
public override async Task Invoke(Func<Task> body)
public override Task Invoke(Func<Task> body)
{
throw new OpenCircuitException();
throw new OpenCircuitException(_breaker.LastCaughtException);
}

/// <summary>
/// No-op for open, calls are never executed so cannot succeed or fail
/// </summary>
protected internal override void CallFails()
protected internal override void CallFails(Exception cause)
{
// This is a no-op, but CallFails() can be called from CircuitBreaker
// (The function summary is a lie)
Debug.WriteLine($"Ignoring calls to [CallFails()] because {nameof(CircuitBreaker)} is in open state. Exception cause was: {cause}");
}

/// <summary>
/// No-op for open, calls are never executed so cannot succeed or fail
/// </summary>
protected internal override void CallSucceeds()
{
// This is a no-op, but CallSucceeds() can be called from CircuitBreaker
// (The function summary is a lie)
Debug.WriteLine($"Ignoring calls to [CallSucceeds()] because {nameof(CircuitBreaker)} is in open state.");
}

/// <summary>
Expand Down Expand Up @@ -113,7 +120,7 @@ public override async Task<T> Invoke<T>(Func<Task<T>> body)
{
if (!_lock.CompareAndSet(true, false))
{
throw new OpenCircuitException();
throw new OpenCircuitException("Circuit breaker is half open, only one call is allowed; this call is failing fast.", _breaker.LastCaughtException);
}
return await CallThrough(body);
}
Expand All @@ -129,16 +136,17 @@ public override async Task Invoke(Func<Task> body)
{
if (!_lock.CompareAndSet(true, false))
{
throw new OpenCircuitException();
throw new OpenCircuitException("Circuit breaker is half open, only one call is allowed; this call is failing fast.", _breaker.LastCaughtException);
}
await CallThrough(body);
}

/// <summary>
/// Reopen breaker on failed call.
/// </summary>
protected internal override void CallFails()
protected internal override void CallFails(Exception cause)
{
_breaker.OnFail(cause);
_breaker.TripBreaker(this);
}

Expand All @@ -147,6 +155,7 @@ protected internal override void CallFails()
/// </summary>
protected internal override void CallSucceeds()
{
_breaker.OnSuccess();
_breaker.ResetBreaker();
}

Expand Down Expand Up @@ -210,8 +219,9 @@ public override Task Invoke(Func<Task> body)
/// On failed call, the failure count is incremented. The count is checked against the configured maxFailures, and
/// the breaker is tripped if we have reached maxFailures.
/// </summary>
protected internal override void CallFails()
protected internal override void CallFails(Exception cause)
{
_breaker.OnFail(cause);
if (IncrementAndGet() == _breaker.MaxFailures)
{
_breaker.TripBreaker(this);
Expand All @@ -223,6 +233,7 @@ protected internal override void CallFails()
/// </summary>
protected internal override void CallSucceeds()
{
_breaker.OnSuccess();
Reset();
}

Expand Down
5 changes: 5 additions & 0 deletions src/core/Akka/Pattern/OpenCircuitException.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ public class OpenCircuitException : AkkaException
/// </summary>
public OpenCircuitException() : base("Circuit Breaker is open; calls are failing fast") { }

public OpenCircuitException(Exception cause)
: base("Circuit Breaker is open; calls are failing fast", cause)
{
}

/// <summary>
/// Initializes a new instance of the <see cref="OpenCircuitException"/> class.
/// </summary>
Expand Down
13 changes: 13 additions & 0 deletions src/core/Akka/Pattern/UserCalledFailException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Text;
using Akka.Actor;

namespace Akka.Pattern
{
public class UserCalledFailException : AkkaException
{
public UserCalledFailException() : base($"User code caused [{nameof(CircuitBreaker)}] to fail because it calls the [{nameof(CircuitBreaker.Fail)}()] method.")
{ }
}
}
29 changes: 19 additions & 10 deletions src/core/Akka/Util/Internal/AtomicState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,16 @@ public async Task<T> CallThrough<T>(Func<Task<T>> task)
capturedException = ExceptionDispatchInfo.Capture(ex);
}

bool throwException = capturedException != null;
if (throwException || DateTime.UtcNow.CompareTo(deadline) >= 0)
// Need to make sure that timeouts are reported as timeouts
if (capturedException != null)
{
CallFails();
if (throwException)
capturedException.Throw();
CallFails(capturedException.SourceException);
capturedException.Throw();
}
else if (DateTime.UtcNow.CompareTo(deadline) >= 0)
{
CallFails(new TimeoutException(
$"Execution did not complete within the time allotted {_callTimeout.TotalMilliseconds} ms"));
}
else
{
Expand Down Expand Up @@ -135,11 +139,16 @@ public async Task CallThrough(Func<Task> task)
capturedException = ExceptionDispatchInfo.Capture(ex);
}

bool throwException = capturedException != null;
if (throwException || DateTime.UtcNow.CompareTo(deadline) >= 0)
// Need to make sure that timeouts are reported as timeouts
if (capturedException != null)
{
CallFails(capturedException?.SourceException);
capturedException.Throw();
}
else if (DateTime.UtcNow.CompareTo(deadline) >= 0)
{
CallFails();
if (throwException) capturedException.Throw();
CallFails(new TimeoutException(
$"Execution did not complete within the time allotted {_callTimeout.TotalMilliseconds} ms"));
}
else
{
Expand Down Expand Up @@ -167,7 +176,7 @@ public async Task CallThrough(Func<Task> task)
/// <summary>
/// Invoked when call fails
/// </summary>
protected internal abstract void CallFails();
protected internal abstract void CallFails(Exception cause);

/// <summary>
/// Invoked when call succeeds
Expand Down

0 comments on commit 9dfdfa0

Please sign in to comment.