Breaker

Richard Hightower edited this page Apr 13, 2016 · 5 revisions

A Breaker is short for Circuit Breaker. The idea behind the breaker is to wrap access to a service so that errors can be tracked and the circuit breaker can open once the number of errors exceeds a limit. Like all things in Reakt, there is an interface for Breaker that defines the contract, and other implementations can get creative about how they detect that the breaker has tripped.
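To make the idea concrete, here is a minimal, self-contained sketch of an error-counting breaker. `CountingBreaker` and its constructor are hypothetical names made up for this illustration; this is not the Reakt implementation, only one way the contract described above could be satisfied.

```java
import java.util.function.Consumer;

/**
 * Hypothetical, stripped-down breaker used only for illustration.
 * It is NOT the Reakt implementation.
 */
final class CountingBreaker<T> {
    private final T service;      // wrapped service; null means "no service"
    private final int maxErrors;  // errors allowed before the circuit opens
    private int errors;

    CountingBreaker(T service, int maxErrors) {
        this.service = service;
        this.maxErrors = maxErrors;
    }

    boolean isBroken() {
        return service == null || errors >= maxErrors;
    }

    /** Runs the consumer only while the circuit is closed; counts failures. */
    CountingBreaker<T> ifOk(Consumer<? super T> consumer) {
        if (!isBroken()) {
            try {
                consumer.accept(service);
            } catch (RuntimeException ex) {
                errors++;          // track the failure, then rethrow it
                throw ex;
            }
        }
        return this;
    }

    /** Runs the fallback only while the circuit is open. */
    CountingBreaker<T> ifBroken(Runnable fallback) {
        if (isBroken()) {
            fallback.run();
        }
        return this;
    }

    int errorCount() {
        return errors;
    }

    public static void main(String[] args) {
        CountingBreaker<String> breaker = new CountingBreaker<>("db", 2);
        for (int i = 0; i < 3; i++) {
            try {
                breaker.ifOk(s -> { throw new IllegalStateException("ack"); });
            } catch (IllegalStateException expected) {
                // the breaker has already counted this error
            }
        }
        breaker.ifBroken(() -> System.out.println(
                "circuit open after " + breaker.errorCount() + " errors"));
    }
}
```

Once the limit is reached, the consumer is no longer invoked, so the error count stops growing and only the `ifBroken` branch runs.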

Database is currently down

    /**
     * Reference to the DB session, which is connected asynchronously.
     */
    private Breaker<Session> sessionBreaker = Breaker.broken();

//.. async connect to db and monitor connection health...

    private synchronized Breaker<Session> sessionBreaker() {
        return sessionBreaker;
    }
...
sessionBreaker()
     .ifOk(session -> {

        final ResultSetFuture resultSetFuture = session.executeAsync(
           QueryBuilder.insertInto(RESULTS_TABLE)
              .value("sequence", sequence.incrementAndGet())
              .value("rawData", snapshot.getSnapshot())
        );

       createPromiseFromResultSetFutureForStore(resultSetFuture, "Storing results snapshot");

     })
    .ifBroken(() -> callback.reject("Database connection is down, cannot store snapshot at the moment"));

Then you can add code to recover from this state using the reactor.
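One possible shape for that recovery code is sketched below. Everything in it is an assumption made for illustration: `RecoveringSession`, `MiniBreaker`, `recoverIfBroken` and the `Supplier`-based connector are hypothetical stand-ins, and a plain method call takes the place of whatever periodic task the reactor would schedule.

```java
import java.util.function.Supplier;

/** Hypothetical holder that swaps in a fresh breaker when the old one is broken. */
final class RecoveringSession {

    /** Minimal stand-in for a breaker: a null value means the circuit is open. */
    static final class MiniBreaker<T> {
        private final T value;

        MiniBreaker(T value) {
            this.value = value;
        }

        boolean isBroken() {
            return value == null;
        }
    }

    // Starts broken, like the session breaker in the listing above.
    private MiniBreaker<String> sessionBreaker = new MiniBreaker<>(null);

    // Hypothetical connect routine; returns null when the connection attempt fails.
    private final Supplier<String> connector;

    RecoveringSession(Supplier<String> connector) {
        this.connector = connector;
    }

    synchronized MiniBreaker<String> sessionBreaker() {
        return sessionBreaker;
    }

    /** Meant to be called periodically, for example from a repeating task. */
    synchronized void recoverIfBroken() {
        if (sessionBreaker.isBroken()) {
            String session = connector.get();                 // try to reconnect
            if (session != null) {
                sessionBreaker = new MiniBreaker<>(session);  // close the circuit again
            }
        }
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // Fails on the first attempt, succeeds on the second.
        RecoveringSession holder = new RecoveringSession(
                () -> ++attempts[0] < 2 ? null : "session-" + attempts[0]);
        holder.recoverIfBroken();
        System.out.println("broken after first attempt: " + holder.sessionBreaker().isBroken());
        holder.recoverIfBroken();
        System.out.println("broken after second attempt: " + holder.sessionBreaker().isBroken());
    }
}
```

Replacing the whole breaker, rather than resetting it in place, means callers that already hold the old instance keep seeing a consistent broken state.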

Here is a simple test to show how a broken breaker works.

Simple test showing how a broken breaker works

        AtomicBoolean okCalled = new AtomicBoolean();

        AtomicBoolean brokenCalled = new AtomicBoolean();

        final Breaker<Object> broken = Breaker.broken();
        broken.ifOk(o -> okCalled.set(true));

        broken.ifBroken(() -> brokenCalled.set(true));

        assertFalse(okCalled.get());
        assertTrue(broken.isBroken());
        assertTrue(brokenCalled.get());
        assertFalse(broken.isOk());

        broken.cleanup(o -> {
        });

Here is a simple test to show how an operational breaker works.

Simple test to show how an operational breaker works.

        AtomicBoolean okCalled = new AtomicBoolean();
        AtomicBoolean brokenCalled = new AtomicBoolean();
        final Breaker<Object> ok = Breaker.operational(new Object());
        ok.ifOk(o -> okCalled.set(true));
        ok.ifBroken(() -> brokenCalled.set(true));


        assertEquals(0, ok.errorCount());
        assertTrue(okCalled.get());
        assertFalse(ok.isBroken());
        assertTrue(ok.isOk());

        try {
            ok.ifOk(o -> {
                throw new IllegalStateException("ack");
            });
            fail();
        } catch (Exception ex) {
            // expected: the breaker counts the error and rethrows it
        }

        assertEquals(1, ok.errorCount());

The idea is that there can be context-aware service wrappers for certain types of breaker that can detect when (and if) the breaker is open.
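As a rough sketch of what a context-aware breaker might look like, the example below opens the circuit only when an error matches a caller-supplied predicate. `PredicateBreaker` and the choice of `UncheckedIOException` as the "fatal" error are assumptions for illustration, not part of Reakt.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Hypothetical breaker that opens as soon as a "fatal" error is seen. */
final class PredicateBreaker<T> {
    private final T service;
    private final Predicate<Throwable> isFatal; // decides which errors blow the circuit
    private boolean broken;
    private long errors;

    PredicateBreaker(T service, Predicate<Throwable> isFatal) {
        this.service = service;
        this.isFatal = isFatal;
    }

    boolean isBroken() {
        return broken;
    }

    long errorCount() {
        return errors;
    }

    /** Runs the consumer while the circuit is closed; every error is counted. */
    PredicateBreaker<T> ifOk(Consumer<? super T> consumer) {
        if (!broken) {
            try {
                consumer.accept(service);
            } catch (RuntimeException ex) {
                errors++;
                if (isFatal.test(ex)) {
                    broken = true;   // only matching errors open the circuit
                }
                throw ex;
            }
        }
        return this;
    }

    public static void main(String[] args) {
        PredicateBreaker<String> breaker =
                new PredicateBreaker<>("db", ex -> ex instanceof UncheckedIOException);
        try {
            breaker.ifOk(s -> { throw new IllegalArgumentException("bad input"); });
        } catch (IllegalArgumentException ignored) {
            // counted, but not fatal
        }
        System.out.println("broken after bad input: " + breaker.isBroken());
        try {
            breaker.ifOk(s -> { throw new UncheckedIOException(new IOException("reset")); });
        } catch (UncheckedIOException ignored) {
            // fatal: the circuit is now open
        }
        System.out.println("broken after I/O failure: " + breaker.isBroken());
    }
}
```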

You can also specify a limit of allowed errors as follows:

Simple test showing the allowed error limit for the breaker

        AtomicBoolean okCalled = new AtomicBoolean();
        AtomicBoolean brokenCalled = new AtomicBoolean();
        final Breaker<Object> ok = Breaker.operational(new Object(), 10);
        ok.ifOk(o -> okCalled.set(true));
        ok.ifBroken(() -> brokenCalled.set(true));


        assertEquals(0, ok.errorCount());
        assertTrue(okCalled.get());
        assertFalse(ok.isBroken());
        assertTrue(ok.isOk());

        try {
            ok.ifOk(o -> {
                throw new IllegalStateException("ack");
            });
            fail();
        } catch (Exception ex) {
            // expected: the breaker counts the error and rethrows it
        }

        assertEquals(1, ok.errorCount());

        assertFalse(ok.isBroken());


        for (int index = 0; index < 20; index++) {
            try {
                ok.ifOk(o -> {
                    throw new IllegalStateException("ack");
                });
            } catch (Exception ex) {
                // expected while the breaker is still operational
            }
        }

        assertEquals(10, ok.errorCount());

        assertTrue(ok.isBroken());

A fuller example is coming. We are still refining the Breaker interface, but we found that we needed it a lot: we had been using the Expected interface (and before that Optional) to do what we now do with the Breaker.

Here is the full API for the Breaker interface.

Breaker

package io.advantageous.reakt;

import io.advantageous.reakt.impl.BreakerImpl;

import java.util.function.Consumer;


/**
 * Represents a Circuit Breaker.
 * The contained service can be broken (open circuit) or operational (closed circuit).
 * <p>
 * This represents a service which may or may not be available.
 * </p>
 * We were using Expected a lot where we really wanted something like a Breaker.
 * <p>
 * This could be extended to blow the circuit with different conditions by providing
 * your own Breaker.
 * </p>
 * <p>
 * Also we want to use interfaces for all core concepts.
 * </p>
 * In addition, we wanted callbacks for ifBroken and ifOperational.
 * <p>
 * If a service is active and healthy, {@code isOperational()} will return {@code true}.
 * If a service is not healthy or not working, {@code isBroken()} will return {@code true}.
 * </p>
 * This is heavily modeled after {@code Expected}.
 */
public interface Breaker<T> {
    /**
     * Common instance for {@code broken()}.
     */
    Breaker OPENED = new BreakerImpl<>();

    /**
     * Returns a broken {@code Breaker} instance.  No service is present for this
     * value.
     *
     * @param <T> Type of the non-existent value
     * @return a broken {@code Breaker}
     */
    static <T> Breaker<T> broken() {
        @SuppressWarnings("unchecked")
        Breaker<T> t = OPENED;
        return t;
    }

    /**
     * Returns a {@code Breaker} using the specified present value, which must not be null.
     *
     * @param <T>   the class of the value
     * @param value the value to be present. Must be non-null
     * @return an operational {@code Breaker} with the value present
     * @throws NullPointerException if value is null
     */
    static <T> Breaker<T> operational(T value) {
        return new BreakerImpl<>(value);
    }

    /**
     * Returns a {@code Breaker} using the specified present value, which must not be null.
     *
     * @param <T>            the class of the value
     * @param value          the value to be present. Must be non-null
     * @param maxErrorsCount max error count
     * @return an operational {@code Breaker} with the value present
     * @throws NullPointerException if value is null
     */
    static <T> Breaker<T> operational(T value, final int maxErrorsCount) {
        return new BreakerImpl<>(value, maxErrorsCount);
    }


    /**
     * Return {@code true} if the service is broken, otherwise {@code false}.
     *
     * @return {@code true} if the service is broken, otherwise {@code false}
     */
    boolean isBroken();


    /**
     * Return {@code true} if the service is working, otherwise {@code false}.
     *
     * @return {@code true} if the service is working, otherwise {@code false}.
     */
    boolean isOperational();

    /**
     * Short version of isOperational.
     *
     * @return ok
     */
    default boolean isOk() {
        return isOperational();
    }

    /**
     * If a service is believed to be working, invoke the consumer with the value.
     * <p>
     * This tracks errors thrown by the consumer.
     *
     * @param consumer executed if a value is present
     * @return this, fluent API
     * @throws NullPointerException if value is present and {@code consumer} is
     *                              null
     */
    Breaker<T> ifOperational(Consumer<? super T> consumer);


    /**
     * Short version of ifOperational.
     * If a service is believed to be working, invoke the consumer with the value.
     *
     * @param consumer executed if a value is present
     * @return this, fluent API
     * @throws NullPointerException if value is present and {@code consumer} is
     *                              null
     */
    default Breaker<T> ifOk(Consumer<? super T> consumer) {
        return ifOperational(consumer);
    }

    /**
     * If a service is broken, invoke the runnable.
     *
     * @param runnable executed if a value is not present
     * @return this, fluent API
     */
    Breaker<T> ifBroken(Runnable runnable);

    /**
     * If a service is broken but present, invoke the consumer.
     * This is used to do clean up, like closing a connection.
     *
     * @param consumer executed if the service is broken and a value is present
     * @return this, fluent API
     */
    Breaker<T> cleanup(Consumer<? super T> consumer);

    /**
     * @return number of errors detected.
     */
    long errorCount();

}
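The `cleanup` contract (run the consumer only when the service is broken but a value is still present) can be sketched as follows. `CleanupSketch`, `MiniBreaker`, `FakeConnection` and `markBroken` are hypothetical names used only for this illustration; the real `BreakerImpl` may decide brokenness differently.

```java
import java.util.function.Consumer;

final class CleanupSketch {

    /** Hypothetical connection type, used only for this illustration. */
    static final class FakeConnection {
        boolean closed;

        void close() {
            closed = true;
        }
    }

    /** Minimal stand-in: holds a value plus a flag saying the circuit is open. */
    static final class MiniBreaker<T> {
        private final T value;
        private boolean broken;

        MiniBreaker(T value) {
            this.value = value;
        }

        void markBroken() {
            broken = true;
        }

        boolean isBroken() {
            return broken || value == null;
        }

        /** Invokes the consumer only when the circuit is open and a value exists. */
        MiniBreaker<T> cleanup(Consumer<? super T> consumer) {
            if (isBroken() && value != null) {
                consumer.accept(value);
            }
            return this;
        }
    }

    public static void main(String[] args) {
        FakeConnection connection = new FakeConnection();
        MiniBreaker<FakeConnection> breaker = new MiniBreaker<>(connection);
        breaker.cleanup(FakeConnection::close);  // circuit closed: nothing happens
        breaker.markBroken();
        breaker.cleanup(FakeConnection::close);  // circuit open: connection is closed
        System.out.println("closed = " + connection.closed);
    }
}
```

This is why calling `cleanup` on a breaker created with `Breaker.broken()` is a safe no-op: there is no value to hand to the consumer.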