
Reactor

Richard Hightower edited this page Jul 24, 2016 · 4 revisions

The Reactor is an interface that enables:

  • callbacks that execute in the caller's thread (thread-safe, async callbacks)
  • tasks that run in the caller's thread
  • repeating tasks that run in the caller's thread
  • one-shot tasks that run in the caller's thread after a time period

The Reakt Reactor is a lot like the QBit Reactor or the Vert.x context. It lets you schedule tasks that run in the actor's or verticle's thread.

The Reakt Reactor creates replay promises. Replay promises execute in the same thread as the caller. They are "replayed" in the caller's thread.
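To make the "replay" idea concrete, here is a minimal sketch that assumes nothing about Reakt's internals: a result produced on a worker thread is only queued, and the handler runs later on whichever thread calls process(). The names ReplaySketch, ReplayQueue, resolve, and demo are hypothetical and are not part of the Reakt API.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

/** Hypothetical illustration of "replaying" a result on the caller's thread; not Reakt code. */
final class ReplaySketch {

    /** Accepts results from any thread and replays them on the thread that calls process(). */
    static final class ReplayQueue<T> {
        private final ConcurrentLinkedQueue<T> results = new ConcurrentLinkedQueue<>();
        private final Consumer<T> handler;

        ReplayQueue(Consumer<T> handler) {
            this.handler = handler;
        }

        /** Safe to call from any thread: it only enqueues and never runs the handler. */
        void resolve(T value) {
            results.add(value);
        }

        /** Called by the owning thread: runs the handler once per queued result. */
        int process() {
            int count = 0;
            T value;
            while ((value = results.poll()) != null) {
                handler.accept(value);
                count++;
            }
            return count;
        }
    }

    /** Resolves on a worker thread, replays on the caller; returns {callerThread, handlerThread}. */
    static String[] demo() {
        final String[] handlerThread = new String[1];
        final ReplayQueue<String> queue = new ReplayQueue<String>(
                value -> handlerThread[0] = Thread.currentThread().getName());

        final Thread worker = new Thread(() -> queue.resolve("done"), "worker");
        worker.start();
        try {
            worker.join(); // the result is queued now, but the handler has not run yet
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        queue.process(); // the handler runs here, on the caller's thread
        return new String[]{Thread.currentThread().getName(), handlerThread[0]};
    }

    public static void main(String[] args) {
        String[] names = demo();
        System.out.println("caller=" + names[0] + " handler=" + names[1]);
    }
}
```

Because the handler only ever runs inside process(), any state it touches is accessed by a single thread, which is what makes such callbacks thread safe without locks.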

QBit implements a service actor model (similar to Akka type actors), and Vert.x implements a Reactor model (like Node.js).

QBit, for example, ensures that all method calls are queued and handled by the service/actor thread. You can also use the QBit Reactor to ensure that callbacks happen on the same thread as the caller. This allows your callbacks to be thread safe. The Reakt Reactor is a drop-in replacement for the QBit Reactor, except that the Reakt Reactor uses Reakt Promises, async Results, and Callbacks. QBit 2 and Conekt will use Reakt's API and not their own.

You can use the Reakt Reactor with RxJava, Vert.x, Spring Reactor, and other similar-minded projects to manage one-off tasks, repeating tasks, and callbacks on the same thread as the caller (which you do not always need to do).

The Reactor is just an interface, so you could replace the default implementation with an optimized version.

Reactor Methods of note

Here is a high-level list of Reactor methods.

  • addRepeatingTask(interval, runnable) add a task that repeats every interval
  • runTaskAfter(afterInterval, runnable) run a task once after an interval expires
  • deferRun(runnable) run a task on this thread as soon as possible
  • static reactor(...) create a reactor
  • all(...) create a promise that does not complete until all of the given promises complete (you can pass a timeout)
  • any(...) create a promise that does not complete until one of the given promises completes (you can pass a timeout)
  • process() process all pending tasks and callbacks
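The scheduling methods in the list above can be sketched as follows. This is a hypothetical, single-threaded illustration and not Reakt's ReactorImpl: the class name MiniReactor, the Timed helper, and the use of a LongSupplier as a stand-in for the time source are all assumptions made for the example. A fake clock is injected so the behavior is deterministic.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.LongSupplier;

/** Hypothetical single-threaded sketch of the scheduling methods; not Reakt's ReactorImpl. */
final class MiniReactor {

    /** A task plus the time (in milliseconds) at which it is next due. */
    private static final class Timed {
        final long intervalMs;
        final Runnable task;
        long nextRunMs;

        Timed(long intervalMs, Runnable task, long nextRunMs) {
            this.intervalMs = intervalMs;
            this.task = task;
            this.nextRunMs = nextRunMs;
        }
    }

    private final LongSupplier clock; // stand-in for a time source (assumption)
    private final Queue<Runnable> deferred = new ArrayDeque<>();
    private final List<Timed> oneShot = new ArrayList<>();
    private final List<Timed> repeating = new ArrayList<>();

    MiniReactor(LongSupplier clock) {
        this.clock = clock;
    }

    void addRepeatingTask(Duration interval, Runnable runnable) {
        long ms = interval.toMillis();
        repeating.add(new Timed(ms, runnable, clock.getAsLong() + ms));
    }

    void runTaskAfter(Duration afterInterval, Runnable runnable) {
        long ms = afterInterval.toMillis();
        oneShot.add(new Timed(ms, runnable, clock.getAsLong() + ms));
    }

    void deferRun(Runnable runnable) {
        deferred.add(runnable);
    }

    /** Runs everything that is due, on the thread that calls process(). */
    void process() {
        long now = clock.getAsLong();
        // Drain only the tasks queued before this call, so a task that defers
        // another task cannot keep this loop running forever.
        for (int i = deferred.size(); i > 0; i--) {
            deferred.poll().run();
        }
        // Iterate over copies so that tasks may schedule more work while running.
        for (Timed t : new ArrayList<>(oneShot)) {
            if (now >= t.nextRunMs) {
                oneShot.remove(t);
                t.task.run();
            }
        }
        for (Timed t : new ArrayList<>(repeating)) {
            if (now >= t.nextRunMs) {
                t.nextRunMs = now + t.intervalMs;
                t.task.run();
            }
        }
    }

    /** Drives the reactor with a fake clock; returns {deferred, oneShot, repeating} run counts. */
    static int[] demo() {
        long[] now = {0};
        int[] counts = new int[3];
        MiniReactor reactor = new MiniReactor(() -> now[0]);
        reactor.deferRun(() -> counts[0]++);
        reactor.runTaskAfter(Duration.ofMillis(100), () -> counts[1]++);
        reactor.addRepeatingTask(Duration.ofMillis(50), () -> counts[2]++);
        for (long t = 0; t <= 150; t += 50) { // process at 0, 50, 100, and 150 ms
            now[0] = t;
            reactor.process();
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] c = demo();
        System.out.println(c[0] + " " + c[1] + " " + c[2]);
    }
}
```

In this sketch nothing runs until the owning thread calls process(), so the caller decides when tasks execute. The sketch is not safe to call from other threads. A production version would at least need a thread-safe queue behind deferRun.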

Here is the Reactor interface that can be implemented by anyone.

Reactor

package io.advantageous.reakt.reactor;

import io.advantageous.reakt.promise.Promise;
import io.advantageous.reakt.reactor.impl.ReactorImpl;

import java.time.Duration;
import java.util.List;

/**
 * Ensures that tasks, repeating tasks and callbacks run in the caller's thread.
 * Used with actor service models like QBit, Vert.x, etc.
 */
public interface Reactor {

    /**
     * Creates a default reactor.
     *
     * @return a reactor
     */
    static Reactor reactor() {
        return new ReactorImpl(Duration.ofSeconds(30), System::currentTimeMillis);
    }

    /**
     * Creates a default reactor with timeout.
     *
     * @param timeout timeout
     * @return a reactor
     */
    static Reactor reactor(final Duration timeout) {
        return new ReactorImpl(timeout, System::currentTimeMillis);
    }

    /**
     * Creates a default reactor with timeout and time source.
     *
     * @param timeout    timeout
     * @param timeSource time source
     * @return a reactor
     */
    static Reactor reactor(final Duration timeout, final TimeSource timeSource) {
        return new ReactorImpl(timeout, timeSource);
    }

    /**
     * Create a promise.
     * After you create a promise you register its then(...) and catchError(...) and then you use it to
     * handle a callback.
     * <p>
     * Creates a replay promise that is managed by this Reactor.
     *
     * @param <T> type of result
     * @return new promise
     */
    <T> Promise<T> promise();

    /**
     * All promises must complete.
     *
     * @param promises promises
     * @return return containing promise
     */
    Promise<Void> all(final Promise<?>... promises);


    /**
     * All promises must complete.
     *
     * @param timeout  timeout
     * @param promises promises
     * @return return containing promise
     */
    Promise<Void> all(final Duration timeout,
                      final Promise<?>... promises);

    /**
     * All promises must complete.
     *
     * @param promises promises
     * @param <T>      types of promise
     * @return return containing promise
     */
    <T> Promise<Void> all(final List<Promise<T>> promises);

    /**
     * All promises must complete.
     *
     * @param timeout  timeout
     * @param promises promises
     * @param <T>      types of promise
     * @return return containing promise
     */
    <T> Promise<Void> all(final Duration timeout,
                          final List<Promise<T>> promises);

    /**
     * Any one of the promises must complete.
     *
     * @param promises promises
     * @return return containing promise
     */
    Promise<Void> any(final Promise<?>... promises);


    /**
     * Any one of the promises must complete.
     *
     * @param timeout  timeout
     * @param promises promises
     * @return return containing promise
     */
    Promise<Void> any(final Duration timeout,
                      final Promise<?>... promises);

    /**
     * Any one of the promises must complete.
     *
     * @param promises promises
     * @param <T>      types of promise
     * @return return containing promise
     */
    <T> Promise<Void> any(final List<Promise<T>> promises);

    /**
     * Any one of the promises must complete.
     *
     * @param timeout  timeout
     * @param promises promises
     * @param <T>      types of promise
     * @return return containing promise
     */
    <T> Promise<Void> any(final Duration timeout,
                          final List<Promise<T>> promises);


    /**
     * Add a repeating task that will run every interval
     *
     * @param interval duration of interval
     * @param runnable runnable to run.
     */
    void addRepeatingTask(final Duration interval, final Runnable runnable);

    /**
     * Add a task that will run once after the interval.
     *
     * @param afterInterval duration of interval
     * @param runnable      runnable to run.
     */
    void runTaskAfter(final Duration afterInterval, final Runnable runnable);

    /**
     * Run on this Reactor's thread as soon as you can.
     *
     * @param runnable runnable
     */
    void deferRun(Runnable runnable);

    /**
     * Allows the reactor to process its tasks, and promises (callbacks).
     */
    void process();
}
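
The all(...) and any(...) methods declared above combine several promises into one. As a rough analogy only, the same "wait for all" versus "wait for any" semantics can be observed with the JDK's CompletableFuture.allOf and CompletableFuture.anyOf. This sketch does not use Reakt promises, and the class name AllAnySketch and its demo method are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

/** Analogy only: uses the JDK's CompletableFuture, not Reakt promises. */
final class AllAnySketch {

    /** Returns {allDoneBefore, anyDoneAfterFirst, allDoneAfterFirst, allDoneAfterSecond}. */
    static boolean[] demo() {
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<Integer> second = new CompletableFuture<>();

        // Completes only after every input has completed.
        CompletableFuture<Void> all = CompletableFuture.allOf(first, second);
        // Completes as soon as any one input has completed.
        CompletableFuture<Object> any = CompletableFuture.anyOf(first, second);

        boolean allDoneBefore = all.isDone();

        first.complete("a");
        boolean anyDoneAfterFirst = any.isDone();
        boolean allDoneAfterFirst = all.isDone();

        second.complete(2);
        boolean allDoneAfterSecond = all.isDone();

        return new boolean[]{allDoneBefore, anyDoneAfterFirst, allDoneAfterFirst, allDoneAfterSecond};
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2] + " " + r[3]);
    }
}
```

The Reactor versions differ in that the combined promise is managed by the reactor, so its handlers run on the reactor's thread, and they optionally take a timeout.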