Skip to content

Commit

Permalink
docs: Document and show named circuitbreaker as the main option (#32417)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Enno Runne <458526+ennru@users.noreply.github.com>
  • Loading branch information
johanandren and ennru committed May 22, 2024
1 parent 902efad commit 69cf92f
Show file tree
Hide file tree
Showing 6 changed files with 412 additions and 249 deletions.
57 changes: 33 additions & 24 deletions akka-docs/src/main/paradox/common/circuitbreaker.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,39 +65,38 @@ The Akka library provides an implementation of a circuit breaker called

### Initialization

Here's how a @apidoc[CircuitBreaker] would be configured for:
Here's how a named @apidoc[CircuitBreaker] is configured with the name `data-access`:

* 5 maximum failures
* a call timeout of 10 seconds
* a reset timeout of 1 minute

@@snip [application.conf](/akka-docs/src/test/scala/docs/circuitbreaker/CircuitBreakerDocSpec.scala) { #config }

The circuit breaker is created on first access with the same name, subsequent lookups will return the same circuit breaker
instance. Looking up the circuit breaker and using it looks like this:

Scala
: @@snip [CircuitBreakerDocSpec.scala](/akka-docs/src/test/scala/docs/circuitbreaker/CircuitBreakerDocSpec.scala) { #imports1 #circuit-breaker-initialization }
: @@snip [CircuitBreakerDocSpec.scala](/akka-docs/src/test/scala/docs/circuitbreaker/CircuitBreakerDocSpec.scala) { #circuit-breaker-initialization }

Java
: @@snip [DangerousJavaActor.java](/akka-docs/src/test/java/jdocs/circuitbreaker/DangerousJavaActor.java) { #imports1 #circuit-breaker-initialization }
: @@snip [DangerousJavaActor.java](/akka-docs/src/test/java/jdocs/circuitbreaker/CircuitBreakerDocTest.java) { #circuit-breaker-initialization }

### Future & Synchronous based API

Once a circuit breaker actor has been initialized, interacting with that actor is done by either using the Future based API or the synchronous API. Both of these APIs are considered `Call Protection` because whether synchronously or asynchronously, the purpose of the circuit breaker is to protect your system from cascading failures while making a call to another service. In the future based API, we use the @scala[@scaladoc[withCircuitBreaker](akka.pattern.CircuitBreaker#withCircuitBreaker[T](body:=%3Escala.concurrent.Future[T]):scala.concurrent.Future[T])]@java[@javadoc[callWithCircuitBreakerCS](akka.pattern.CircuitBreaker#callWithCircuitBreakerCS(java.util.concurrent.Callable))] which takes an asynchronous method (some method wrapped in a @scala[@scaladoc[Future](scala.concurrent.Future)]@java[@javadoc[CompletionState](java.util.concurrent.CompletionStage)]), for instance a call to retrieve data from a database, and we pipe the result back to the sender. If for some reason the database in this example isn't responding, or there is another issue, the circuit breaker will open and stop trying to hit the database again and again until the timeout is over.
Once a circuit breaker actor has been initialized, interacting with that actor is done by either using the Future based API or the synchronous API. Both of these APIs are considered `Call Protection` because whether synchronously or asynchronously, the purpose of the circuit breaker is to protect your system from cascading failures while making a call to another service.

The Synchronous API would also wrap your call with the circuit breaker logic, however, it uses the @scala[@scaladoc[withSyncCircuitBreaker](akka.pattern.CircuitBreaker#withSyncCircuitBreaker[T](body:=%3ET):T)]@java[@javadoc[callWithSyncCircuitBreaker](akka.pattern.CircuitBreaker#callWithSyncCircuitBreaker(java.util.concurrent.Callable))] and receives a method that is not wrapped in a @scala[@scaladoc[Future](scala.concurrent.Future)]@java[@javadoc[CompletionState](java.util.concurrent.CompletionStage)].
In the future based API, we use the @scala[@scaladoc[withCircuitBreaker](akka.pattern.CircuitBreaker#withCircuitBreaker[T](body:=%3Escala.concurrent.Future[T]):scala.concurrent.Future[T])]@java[@javadoc[callWithCircuitBreakerCS](akka.pattern.CircuitBreaker#callWithCircuitBreakerCS(java.util.concurrent.Callable))] which takes an asynchronous method (some method wrapped in a @scala[@scaladoc[Future](scala.concurrent.Future)]@java[@javadoc[CompletionState](java.util.concurrent.CompletionStage)]), for instance a call to retrieve data from a service, and we pipe the result back to the sender. If for some reason the service in this example isn't responding, or there is another issue, the circuit breaker will open and stop trying to hit the service again and again until the timeout is reached.

Scala
: @@snip [CircuitBreakerDocSpec.scala](/akka-docs/src/test/scala/docs/circuitbreaker/CircuitBreakerDocSpec.scala) { #circuit-breaker-usage }

Java
: @@snip [DangerousJavaActor.java](/akka-docs/src/test/java/jdocs/circuitbreaker/DangerousJavaActor.java) { #circuit-breaker-usage }
: @@snip [CircuitBreakerDocTest.java](/akka-docs/src/test/java/jdocs/circuitbreaker/CircuitBreakerDocTest.java) { #circuit-breaker-usage }

@@@ note
The Synchronous API would also wrap your call with the circuit breaker logic, however, it uses the @scala[@scaladoc[withSyncCircuitBreaker](akka.pattern.CircuitBreaker#withSyncCircuitBreaker[T](body:=%3ET):T)]@java[@javadoc[callWithSyncCircuitBreaker](akka.pattern.CircuitBreaker#callWithSyncCircuitBreaker(java.util.concurrent.Callable))] and receives a method that is not wrapped in a @scala[`Future`]@java[`CompletionState`].

Using the @scala[@apidoc[CircuitBreaker](CircuitBreaker$)'s companion object @scaladoc[apply](akka.pattern.CircuitBreaker$#apply(scheduler:akka.actor.Scheduler,maxFailures:Int,callTimeout:scala.concurrent.duration.FiniteDuration,resetTimeout:scala.concurrent.duration.FiniteDuration):akka.pattern.CircuitBreaker)]@java[@javadoc[CircuitBreaker.create](akka.pattern.CircuitBreaker#create(akka.actor.Scheduler,int,java.time.Duration,java.time.Duration))] method
will return a @apidoc[CircuitBreaker] where callbacks are executed in the caller's thread.
This can be useful if the asynchronous @scala[@scaladoc[Future](scala.concurrent.Future)]@java[@javadoc[CompletionState](java.util.concurrent.CompletionStage)] behavior is unnecessary, for
example invoking a synchronous-only API.

@@@
The `CircuitBreaker` will execute all callbacks on the default system dispatcher.

### Control failure count explicitly

Expand All @@ -112,33 +111,43 @@ All methods above accept an argument `defineFailureFn`
Type of `defineFailureFn`: @scala[@scaladoc[Try[T]](scala.util.Try) => @scaladoc[Boolean](scala.Boolean)]@java[@javadoc[BiFunction](java.util.function.BiFunction)[@javadoc[Optional[T]](java.util.Optional), @javadoc[Optional](java.util.Optional)[@javadoc[Throwable](java.lang.Throwable)], @javadoc[Boolean](java.lang.Boolean)]]

@scala[This is a function which takes in a @scaladoc[Try[T]](scala.util.Try) and returns a @scaladoc[Boolean](scala.Boolean). The @scaladoc[Try[T]](scala.util.Try) correspond to the @scaladoc[Future[T]](scala.concurrent.Future) of the protected call.]
@java[The response of a protected call is modelled using @javadoc[Optional[T]](java.util.Optional) for a successful return value and @javadoc[Optional](java.util.Optional)[@javadoc[Throwable](java.lang.Throwable)] for exceptions.] This function should return `true` if the call should increase failure count, else false.
@java[The response of a protected call is modelled using @javadoc[Optional[T]](java.util.Optional) for a successful return value and @javadoc[Optional](java.util.Optional)[@javadoc[Throwable](java.lang.Throwable)] for exceptions.] This function should return `true` if the result of a call should increase the failure count, or `false` to not affect the count.

Scala
: @@snip [CircuitBreakerDocSpec.scala](/akka-docs/src/test/scala/docs/circuitbreaker/CircuitBreakerDocSpec.scala) { #even-no-as-failure }

Java
: @@snip [EvenNoFailureJavaExample.java](/akka-docs/src/test/java/jdocs/circuitbreaker/EvenNoFailureJavaExample.java) { #even-no-as-failure }
: @@snip [CircuitBreakerDocTest.java](/akka-docs/src/test/java/jdocs/circuitbreaker/CircuitBreakerDocTest.java) { #even-no-as-failure }

### Low level API

Instead of looking up a configured circuit breaker by name, it is also possible to construct it in the source code:

Scala
: @@snip [CircuitBreakerDocSpec.scala](/akka-docs/src/test/scala/docs/circuitbreaker/CircuitBreakerDocSpec.scala) { #manual-construction }

Java
: @@snip [CircuitBreakerDocTest.java](/akka-docs/src/test/java/jdocs/circuitbreaker/CircuitBreakerDocTest.java) { #manual-construction }

This also allows for creating the circuit breaker with a specific execution context to run its callbacks on.

The low-level API allows you to describe the behavior of the @apidoc[CircuitBreaker](CircuitBreaker) in detail, including deciding what to return to the calling @apidoc[Actor](Actor) in case of success or failure. This is especially useful when expecting the remote call to send a reply.
@apidoc[CircuitBreaker](CircuitBreaker) doesn't support `Tell Protection` (protecting against calls that expect a reply) natively at the moment.
Thus you need to use the low-level power-user APIs, @apidoc[succeed](CircuitBreaker) {scala="#succeed():Unit" java="#succeed()"} and @apidoc[fail](CircuitBreaker) {scala="#fail():Unit" java="#fail()"} methods, as well as @apidoc[isClosed](CircuitBreaker) {scala="#isClosed:Boolean" java="#isClosed()"}, @apidoc[isOpen](CircuitBreaker) {scala="#isOpen:Boolean" java="#isOpen()"}, @apidoc[isHalfOpen](CircuitBreaker) {scala="#isHalfOpen:Boolean" java="#isHalfOpen()"} to implement it.
Thus, you need to use the low-level power-user APIs, @apidoc[succeed](CircuitBreaker) {scala="#succeed():Unit" java="#succeed()"} and @apidoc[fail](CircuitBreaker) {scala="#fail():Unit" java="#fail()"} methods, as well as @apidoc[isClosed](CircuitBreaker) {scala="#isClosed:Boolean" java="#isClosed()"}, @apidoc[isOpen](CircuitBreaker) {scala="#isOpen:Boolean" java="#isOpen()"}, @apidoc[isHalfOpen](CircuitBreaker) {scala="#isHalfOpen:Boolean" java="#isHalfOpen()"} to implement it.

As can be seen in the examples below, a `Tell Protection` pattern could be implemented by using the @apidoc[succeed](CircuitBreaker) {scala="#succeed():Unit" java="#succeed()"} and @apidoc[fail](CircuitBreaker) {scala="#fail():Unit" java="#fail()"} methods, which would count towards the @apidoc[CircuitBreaker](CircuitBreaker) counts.
In the example, a call is made to the remote service if the @apidoc[breaker.isClosed](CircuitBreaker) {scala="#isClosed:Boolean" java="#isClosed()"}.
In the example, a call is made to the remote service if the breaker is closed or half open.
Once a response is received, the @apidoc[succeed](CircuitBreaker) {scala="#succeed():Unit" java="#succeed()"} method is invoked, which tells the @apidoc[CircuitBreaker](CircuitBreaker) to keep the breaker closed.
On the other hand, if an error or timeout is received we trigger a @apidoc[fail](CircuitBreaker) {scala="#fail():Unit" java="#fail()"}, and the breaker accrues this failure towards its count for opening the breaker.

@@@ note

The below example doesn't make a remote call when the state is *HalfOpen*. Using the power-user APIs, it is your responsibility to judge when to make remote calls in *HalfOpen*.

@@@

Scala
: @@snip [CircuitBreakerDocSpec.scala](/akka-docs/src/test/scala/docs/circuitbreaker/CircuitBreakerDocSpec.scala) { #circuit-breaker-tell-pattern }

Java
: @@snip [TellPatternJavaActor.java](/akka-docs/src/test/java/jdocs/circuitbreaker/TellPatternJavaActor.java) { #circuit-breaker-tell-pattern }
: @@snip [CircuitBreakerDocTest.java](/akka-docs/src/test/java/jdocs/circuitbreaker/CircuitBreakerDocTest.java) { #circuit-breaker-tell-pattern }

@@@ note

This example always makes remote calls when the state is *HalfOpen*. Using the power-user APIs, it is your responsibility to judge when to make remote calls in *HalfOpen*.

@@@
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
/*
* Copyright (C) 2023 Lightbend Inc. <https://www.lightbend.com>
*/

package jdocs.circuitbreaker;

import akka.Done;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import akka.pattern.CircuitBreaker;
import akka.pattern.StatusReply;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;

public class CircuitBreakerDocTest {

// note: config sample is in Scala CircuitBreakerDocSpec

interface ThirdPartyWebService {
CompletionStage<Done> call(String id, String value);
}

static // #circuit-breaker-usage
class DataAccess extends AbstractBehavior<DataAccess.Command> {

public interface Command {}

public static class Handle implements Command {
final String value;
final ActorRef<StatusReply<Done>> replyTo;

public Handle(String value, ActorRef<StatusReply<Done>> replyTo) {
this.value = value;
this.replyTo = replyTo;
}
}

private final class HandleFailed implements Command {
final Throwable failure;
final ActorRef<StatusReply<Done>> replyTo;

public HandleFailed(Throwable failure, ActorRef<StatusReply<Done>> replyTo) {
this.failure = failure;
this.replyTo = replyTo;
}
}

private final class HandleSuceeded implements Command {
final ActorRef<StatusReply<Done>> replyTo;

public HandleSuceeded(ActorRef<StatusReply<Done>> replyTo) {
this.replyTo = replyTo;
}
}

private final class CircuitBreakerStateChange implements Command {
final String newState;

public CircuitBreakerStateChange(String newState) {
this.newState = newState;
}
}

public static Behavior<Command> create(String id, ThirdPartyWebService service) {
return Behaviors.setup(
context -> {
// #circuit-breaker-initialization
CircuitBreaker circuitBreaker =
CircuitBreaker.lookup("data-access", context.getSystem());
// #circuit-breaker-initialization
return new DataAccess(context, id, service, circuitBreaker);
});
}

private final String id;
private final ThirdPartyWebService service;
private final CircuitBreaker circuitBreaker;

public DataAccess(
ActorContext<Command> context,
String id,
ThirdPartyWebService service,
CircuitBreaker circuitBreaker) {
super(context);
this.id = id;
this.service = service;
this.circuitBreaker = circuitBreaker;
}

@Override
public Receive<Command> createReceive() {
return newReceiveBuilder()
.onMessage(Handle.class, this::onHandle)
.onMessage(HandleSuceeded.class, this::onHandleSucceeded)
.onMessage(HandleFailed.class, this::onHandleFailed)
.build();
}

private Behavior<Command> onHandle(Handle handle) {
CompletionStage<Done> futureResult =
circuitBreaker.callWithCircuitBreakerCS(() -> service.call(id, handle.value));
getContext()
.pipeToSelf(
futureResult,
(done, throwable) -> {
if (throwable != null) {
return new HandleFailed(throwable, handle.replyTo);
} else {
return new HandleSuceeded(handle.replyTo);
}
});
return this;
}

private Behavior<Command> onHandleSucceeded(HandleSuceeded handleSuceeded) {
handleSuceeded.replyTo.tell(StatusReply.ack());
return this;
}

private Behavior<Command> onHandleFailed(HandleFailed handleFailed) {
getContext().getLog().warn("Failed to call web service", handleFailed.failure);
handleFailed.replyTo.tell(StatusReply.error("Dependency service not available"));
return this;
}

// #circuit-breaker-usage
public int luckyNumber() {
// #even-no-as-failure
BiFunction<Optional<Integer>, Optional<Throwable>, Boolean> evenNoAsFailure =
(result, err) -> (result.isPresent() && result.get() % 2 == 0);

// this will return 8888 and increase failure count at the same time
return circuitBreaker.callWithSyncCircuitBreaker(() -> 8888, evenNoAsFailure);
// #even-no-as-failure
}
// #circuit-breaker-usage
}
// #circuit-breaker-usage

public static class OtherActor {
public interface Command {}

public static class Call implements Command {
public final String payload;
public final ActorRef<StatusReply<Done>> replyTo;

public Call(String payload, ActorRef<StatusReply<Done>> replyTo) {
this.payload = payload;
this.replyTo = replyTo;
}
}
}

public // #circuit-breaker-tell-pattern
static class CircuitBreakingIntermediateActor
extends AbstractBehavior<CircuitBreakingIntermediateActor.Command> {

public interface Command {}

public static class Call implements Command {
final String payload;
final ActorRef<StatusReply<Done>> replyTo;

public Call(String payload, ActorRef<StatusReply<Done>> replyTo) {
this.payload = payload;
this.replyTo = replyTo;
}
}

private class OtherActorReply implements Command {
final Optional<Throwable> failure;
final ActorRef<StatusReply<Done>> originalReplyTo;

public OtherActorReply(
Optional<Throwable> failure, ActorRef<StatusReply<Done>> originalReplyTo) {
this.failure = failure;
this.originalReplyTo = originalReplyTo;
}
}

private class BreakerOpen implements Command {}

private final ActorRef<OtherActor.Command> target;
private final CircuitBreaker breaker;

public CircuitBreakingIntermediateActor(
ActorContext<Command> context, ActorRef<OtherActor.Command> targetActor) {
super(context);
this.target = targetActor;
// #manual-construction
breaker =
CircuitBreaker.create(
getContext().getSystem().classicSystem().getScheduler(),
// maxFailures
5,
// callTimeout
Duration.ofSeconds(10),
// resetTimeout
Duration.ofMinutes(1))
.addOnOpenListener(() -> context.getSelf().tell(new BreakerOpen()));
// #manual-construction
}

@Override
public Receive<Command> createReceive() {
return newReceiveBuilder()
.onMessage(Call.class, this::onCall)
.onMessage(OtherActorReply.class, this::onOtherActorReply)
.onMessage(BreakerOpen.class, this::breakerOpened)
.build();
}

private Behavior<Command> onCall(Call call) {
if (breaker.isClosed() || breaker.isHalfOpen()) {
getContext()
.askWithStatus(
Done.class,
target,
Duration.ofSeconds(11),
(replyTo) -> new OtherActor.Call(call.payload, replyTo),
(done, failure) -> new OtherActorReply(Optional.ofNullable(failure), call.replyTo));
} else {
call.replyTo.tell(StatusReply.error("Service unavailable"));
}
return this;
}

private Behavior<Command> onOtherActorReply(OtherActorReply otherActorReply) {
if (otherActorReply.failure.isPresent()) {
breaker.fail();
getContext().getLog().warn("Service failure", otherActorReply.failure.get());
otherActorReply.originalReplyTo.tell(StatusReply.error("Service unavailable"));
} else {
breaker.succeed();
otherActorReply.originalReplyTo.tell(StatusReply.ack());
}
return this;
}

private Behavior<Command> breakerOpened(BreakerOpen breakerOpen) {
getContext().getLog().warn("Circuit breaker open");
return this;
}
}
// #circuit-breaker-tell-pattern
}
Loading

0 comments on commit 69cf92f

Please sign in to comment.