Skip to content

The framework provides DSL for easy and modular construction of asynchronous processes from simpler constructs. The framework is mostly targeted to IO-bound processes and it is not intended for CPU-bound processes.

master
Switch branches/tags
Go to file
Code

Latest commit

 

Git stats

Files

Permalink
Failed to load latest commit information.
Type
Name
Latest commit message
Commit time
 
 
 
 
 
 
 
 
 
 
 
 

AsyncFlows Framework 0.1.2-SNAPSHOT

Project information

Licensing

The framework is licensed under copyleft MIT license. This could change if the project is migrated to other project groups (Apache, Eclipse, or something else). There is currently no legal framework to accept project contributions in form of the code. However, bug reporting, experimental forking, and other feedback is welcome. The project might migrate to some project groups to solve this problem.

Caution
Currently, project is considered in the Alpha phase and there could be backward incompatible changes even for minor versions.

Code Samples

The code samples in this guide are usually taken from unit tests of the framework. So they are directly executable. If copied in own code, one usually need to resolve some static imports that are used by DSL.

Distribution

The project source repository is currently available on github.

Snapshot releases are available at repository OSS Sonatype repository.

Important
The framework currently requires Java 11.

The latest stable version is 0.1.1 is available from maven central.The following project artifacts are released:

            <!-- Core library -->
            <dependency>
                <groupId>org.asyncflows</groupId>
                <artifactId>asyncflows-core</artifactId>
                <version>0.1.1</version>
            </dependency>
            <!-- IO library -->
            <dependency>
                <groupId>org.asyncflows</groupId>
                <artifactId>asyncflows-io</artifactId>
                <version>0.1.1</version>
            </dependency>
            <!-- HTTP protocol support -->
            <dependency>
                <groupId>org.asyncflows</groupId>
                <artifactId>asyncflows-protocol-http</artifactId>
                <version>0.1.1</version>
            </dependency>

This document contains information about the current version 0.1.2-SNAPSHOT, and it might mention unreleased features. For the document corresponding to the latest stable version refer to the following link.

History

The project has started at 2007 on Java under name AsyncObjects. There were many iterations trying to find suitable DSL syntax for Java. Then there were experimental project branches AsyncGroovy and AsyncScala that were attempts to build DSL using closures, the experience gathered resulted in the current project restart firstly using inner classes, and then using the Java 8 syntax.

Framework Foundations

The concept described in this section are foundations of the framework. While they are foundation, the user of the framework rarely interacts with them directly, so do not assume that code samples here are anything like what you would see in application.Like with real building, foundations are mostly stay hidden from the sight.

Vats

A Vat is Executor that has the following guarantees:

  1. It executes events in order that was sent

  2. It executes only one event at each time

  3. During execution it is possible to get the current Vat

These guarantees allow avoiding a lot of concurrency issues and organize asynchronous processes a lot easier.

The concept of the vat is taken from E programming language, from which many ideas were borrowed in this framework.

While a vat is handling events, it specifies itself in thread context. So it is available with Vat.current(). Asynchronous operations in the framework generally inherit Vat as execution context, unless the executor is specified explicitly.

There is a special cached thread pool with daemon threads that is used for daemon vats Vats.daemonVat().

There are following vats in the core library (there are also some vats in additional libraries):

  • Vat - abstract class for all vats

  • AWTVat - vat over AWT event queue

  • BatchedVat - abstract vat that executes event in batches

  • ExecutorVat - a vat that runs over some executor. Note, that this vat occupies executor thread only when there are events to handle. If there are no events, no threads are occupied. Vat re-schedule itself after a batch of events are processed even if there are still events in the queue in order to give other vats of over the same executor a chance to process their events.

  • SingleThreadVatWithIdle - an abstract vat that occupies one thread and need to periodically poll events from an external source (for example NIO events).

  • SingeThreadVat - a vat that occupies the entire thread and can be stopped. This vat is usually used in unit tests and to start application on the main thread.

For example, the vat could be used like the following, if more high-level constructs could not be used otherwise.

        final Cell<Vat> result = new Cell<>(); // create holder for value
        final SingleThreadVat vat = new SingleThreadVat(null); // create vat
        vat.execute(() -> { // schedule event
            result.setValue(Vat.current()); // save current vat value
            vat.stop(null); // stop vat execution
        });
        assertNull(result.getValue()); // check that it is not executed yet
        vat.runInCurrentThread(); // start vat and execute event
        // vat is stopped
        assertSame(vat, result.getValue()); // get vat value

It is rarely needed to use vat directly.The typical cases are:

  • Application setup

  • Library or framework code

Promise

Promise is similar in a role to CompletableFuture that provides additional restrictions compared with CompletableFuture. It does not support get() operation directly to discourage it, and it does not permit changing result in midway.

A Promise could be wrapped into CompletableFuture, and it could be created from any CompletableStage (including CompletableFuture), when it is needed to integrate with external services. Operations on Promise are created to encourage correct usage of it.

The promise outcome is represented by Outcome class that has Failure and Success subclasses. If promise is not resolved, its outcome is null.

Linked with Promise is AResolver interface, that could act as a listener to a promise, and to specify an outcome for Promise. Only other way to specify an outcome for a promise is to pass it to the constructor of promise.

There are three versions of method that adds listener to promise:

  • listenSync(AResolver) - adds the listener for Promise that is notified in the execution context where promise is resolved. This method should be only used, if listener already has appropriate synchronizations or asynchronous event delivery implemented (for example, a resolver for other promise).

  • listen(AResolver) - adds the listener for Promise that is notified in the context of default executor where a listener is registered.

There are also some utility methods on the promise that help its usage and contain some optimizations.

  • flatMap - converts value when a promise is successful with AFunction

  • flatMapOutcome - converts outcome when promise is resolved with AFunction

  • flatMapFailure - maps failure with AFunction in case if promise failed (like try-catch in Java).

  • finallyDo - execute some code when promise finishes with any outcome (like try-finally in Java).

  • map - converts value when promise is successful with Function

  • mapOutcome - converts outcome when promise is resolved with Function

There are few more utility methods.

These functions are executed immediately, if result is already available. If not, they will be executed after promise is resolved, using the vat associated with the current thread.

Structured Asynchronous Programming

The core concept of the framework is asynchronous operation.Asynchronous operation is a sequence of logically grouped execution of the events in some events loops that that finish with some outcome (or just finish for one-way operations).

Asynchronous operators are static methods that usually return Promise and start with the prefix 'a' (for example aValue).The operations are supposed to be imported using static import to form a DSL in the programming language.

The structured programming constructs are inspired by combining ideas from two sources:

Asynchronous Functions

The most of the operators expect lambdas are arguments. These function interfaces are located at package org.asyncflows.core.function. These functions return Promise.

  • ASupplier - the suppler interface (analog of Supplier)

  • AFunction - the single argument function interface (analog of Function)

  • AFunction2 - the two argument function interface (analog of BiFunction)

  • AFunction3 - the three argument function interface

  • AFunction4 - the four argument function interface

  • AOneWayAction - the one-way action for which result is ignored ('Runnable', but with exception)

Asynchronous Context

While much of the framework functionality is able to work w/o current vat, it is best to provide a context vat. The simplest way to do so is using AsyncContext class to create temporary local context to implement some operation.

Integer i = doAsync(() -> aValue(42));
assertEquals(42, i);

The operation above creates SingeThreadedVat, run it on the current thread, and then stops vat when Promise is done with success or failure. If it is done with success, operation exits with value, otherwise it throws AsyncExecutionException.

Trivial Operations

Trivial operations are just different way to construct promise. Generally, the code should not need to create promise directly, except for few DSL cases. Use promise construction operation instead. All these trivial operations are implemented in Promise class as they are mostly factory methods for it.

aValue(42) // resolved promise that holds specified value
aFailure(new NullPointerException) // failed promise
aNull() // promise holding null
aVoid() // null promise with Void type.
aTrue() // promise holding true
aFalse() // promise holding false
aResolver(r -> r.accept(null, new NullPointerException())) // return promise, and to some things with resolver in body
aNow(()->aValue(a * b)) // evaluate body and return promise (if body failed, return failed promise)
aLater(()->aValue(a * b)) // evaluate on later turn in default vat
aLater(vat, ()->aValue(a * b)) // evaluate on later turn in the specified vat
aNever() // the process that never ends

Note, aNow looks like useless operation, but it is actually used quite often. An expression that returns a promise might result in the following:

  1. Return unresolved promise

  2. Return promise resolved with some outcome (failure or value)

  3. Return null

  4. Throw an exception

If we want to register listener on the result of operation, the fist two cases are not much different. The listener will be called either immediately, or later when promise is resolved. The last two cases are significantly different. They will cause listener to be never called, and the listeners will be never be called. This will case operation to hang up, if these two exceptional cases are not handled specially. The operator aNow reduces these two cases to a promise with failure outcome. So, there is no need to handle these cases specially. This greatly simplifies the code. There still could be problem in case of StackOverflowException or OutOfMemoryError, but most of the asynchronous frameworks will have problems with these failures as well.

Sequential Processes

All sequential controls method now require that they should be running in the context of the vat.

aNow and aSeq Operator

The sequential fow in AsyncFlows is organized using operations on Promise.

The operator aSeq is just alias for aNow operator. It is used to indicate that there is some chain of the sequential operations on promise. It also wraps the first expression that returns promise, so it is shown on the same block level as subsequent operations.

The following test demonstrate its usage:

        final ArrayList<Integer> list = new ArrayList<>();
        final int rc = doAsync(() ->
                aSeq(() -> {
                    list.add(1);
                    return aValue(1);
                }).flatMap(value -> {
                    list.add(value + 1);
                    throw new IllegalStateException();
                }).thenFlatGet(() -> {
                    // never called
                    list.add(-1);
                    return aValue(-1);
                }).flatMapFailure(value -> {
                    assertEquals(IllegalStateException.class, value.getClass());
                    list.add(3);
                    return aValue(42);
                }).finallyDo(() -> {
                    list.add(4);
                    return aVoid();
                }));
        assertEquals(42, rc);
        assertEquals(Arrays.asList(1, 2, 3, 4), list);

Simple Loops

The simplest loop is aSeqWhile.This loop is executed while its body returns true.

        final int rc = doAsync(() -> {
            final int[] sum = new int[1];
            final int[] current = new int[1];
            return aSeqWhile(() -> {
                sum[0] += current[0];
                current[0]++;
                return aBoolean(current[0] <= 4);
            }).thenFlatGet(() -> aValue(sum[0]));
        });
        assertEquals(10, rc);

There is also the Maybe type in the framework that represent the optional value.Differently from Java Optional, the Maybe type could hold any value including null value.It also could be serialized, passed as parameter etc.

It is possible to iterate until the value is available with this aSeqUntilValue loop.

        final int rc = doAsync(() -> {
            final int[] sum = new int[1];
            final int[] current = new int[1];
            return aSeqUntilValue(() -> {
                sum[0] += current[0];
                current[0]++;
                return current[0] <= 4 ? aMaybeEmpty() : aMaybeValue(sum[0]);
            });
        });
        assertEquals(10, rc);

Collections Loops

It is possible to iterate over collections using iterator:

        final int rc = doAsync(() -> {
            final int[] sum = new int[1];
            return aSeqForUnit(Arrays.asList(0, 1, 2, 3, 4), value -> {
                sum[0] += value;
                return aTrue();
            }).thenFlatGet(() -> aValue(sum[0]));
        });
        assertEquals(10, rc);

It is also possible to supply iteration values to collector, but in that case it is not possible to abort the loop:

        final int rc = doAsync(() ->
                aSeqForCollect(Stream.of(1, 2, 3, 4),
                        e -> aValue(e + 1),
                        Collectors.summingInt((Integer e) -> e))
        );
        assertEquals(14, rc);

The more advanced collection processing could be done by the stream framework.

Simultaneous Processes

Sequential execution is not that interesting in asynchronous context. More interesting is case when asynchronous operations overlap. It could happen in the context of the same event loop. AsyncFlows provides a number of methods to organize simultaneous asynchronous activity.

aAll Operator

The simplest form is aAll operator. The operator starts all its branches on the current vat on the current turn and executes the operation map(…​) when all branches are finished. If some branch thrown exception, the operator throws an error, but it will still wait for all branches to complete.

        final Tuple2<String, Integer> rc = doAsync(() ->
                aAll(
                        () -> aValue("The answer")
                ).and(
                        () -> aLater(() -> aValue(42))
                ).map((a, b) -> aValue(Tuple2.of(a, b))));
        assertEquals(Tuple2.of("The answer", 42), rc);

It is possible to return tuple from all arguments directly using Last suffix on the last branch.

        final Tuple2<String, Integer> rc = doAsync(() ->
                aAll(
                        () -> aValue("The answer")
                ).andLast(
                        () -> aLater(() -> aValue(42))
                ));
        assertEquals(Tuple2.of("The answer", 42), rc);

Processing Collections

Basic operation for iterating collection, streams, and iterators is aAllForCollect operators.

        final int rc = doAsync(() ->
                aAllForCollect(Stream.of(1, 2, 3, 4),
                        e -> aValue(e + 1),
                        Collectors.summingInt((Integer e) -> e))
        );
        assertEquals(14, rc);

It processes all branches in interleaving on the current event loop. Then summarize them using supplied collector.

The more advanced collection processing could be done by the stream framework.

Parallel Processes

If aAll is replaced with aPar in the previous section, then we will get parallel operations provided by the framework. By default, each branch is executed on the own new daemon vat. However, is possible to customize execution by providing an implementation of ARunner interface.

        final Tuple2<String, Integer> rc = doAsync(() ->
                aPar(
                        () -> aValue("The answer")
                ).and(
                        () -> aLater(() -> aValue(42))
                ).map((a, b) -> aValue(Tuple2.of(a, b))));
        assertEquals(Tuple2.of("The answer", 42), rc);

This is applicable to all other aAll operators.

Alternative Processing

The alternative processing is done using aAny operator. This operator starts all branches on the current turn and waits for the first branch to complete with error or success. The aAny operator is intended for error handling and querying alternative sources of information.

        int value = doAsync(() ->
                aAny(
                        () -> aLater(() -> aValue(1))
                ).orLast(
                        () -> aValue(2)
                )
        );
        assertEquals(2, value);
        try {
            doAsync(() ->
                    aAny(
                            () -> aLater(() -> aValue(1))
                    ).orLast(
                            () -> aFailure(new RuntimeException())
                    )
            );
            fail("Unreachable");
        } catch (AsyncExecutionException ex) {
            assertEquals(RuntimeException.class, ex.getCause().getClass());
        }

There is also execution mode that the aAny operator tries to wait for successful result if possible.

        int value = doAsync(() ->
                aAny(true,
                        () -> aLater(() -> aValue(1))
                ).orLast(
                        () -> aFailure(new RuntimeException())
                )
        );
        assertEquals(1, value);

The other feature of aAny operator is handling of the branches that did not reach output of aAny operator. This is important when the aAny operator opens resources that are required to be closed.Or when exceptions from failed branches need to be logged.

The sample below demonstrates usage of suppressed(…​) and suppressedFailure(…​) that could be used to receive the abandoned results. This might be used for logging and cleaning up resources. Note, these operations will be called after aAny operator promise will be resolved. In some cases vat might be already stopped at that points, so these operations might be never executed. Use these operations with care or on the vats which cannot be stopped (like daemon vat’s).

        Tuple3<Integer, Throwable, Integer> t = doAsync(
                () -> {
                    Promise<Throwable> failure = new Promise<>();
                    Promise<Integer> suppressed = new Promise<>();
                    return aAll(
                            () -> aAny(true,
                                    () -> aLater(() -> aValue(1))
                            ).or(
                                    () -> aValue(2)
                            ).or(
                                    () -> aFailure(new RuntimeException())
                            ).suppressed(v -> {
                                notifySuccess(suppressed.resolver(), v);
                            }).suppressedFailureLast(ex -> {
                                notifySuccess(failure.resolver(), ex);
                            })
                    ).and(
                            () -> failure
                    ).andLast(
                            () -> suppressed
                    );
                }
        );
        assertEquals(2, t.getValue1().intValue());
        assertEquals(RuntimeException.class, t.getValue2().getClass());
        assertEquals(1, t.getValue3().intValue());

Cancellation

The Cancellation utility class is an application of the aAny operator.

In some cases it is needed to fail the entire process if some operation has failed. For example, if one asynchronous operation has already failed, the related operations need also fail.

For that purpose, framework contains Cancellation utility class. The class monitor results of operations.

Sometimes, an operation returns the resource that require cleanup (for example open connection). In that case ignoring resource is not a valid option. For that purpose there is cleanup operation.

Let’s consider a case when we have some consumer, and some provider of values. For that purpose, we will use queue components, that will be explained later in that guide. We will assume that provider fail, so consumer might fail to receive expected value that would terminate processing. In that case, we would like to consumer to fail as well. For example:

        ArrayList<Integer> list = new ArrayList<>();
        doAsync(() -> {
            SimpleQueue<Integer> queue = new SimpleQueue<>();
            Cancellation cancellation = new Cancellation();
            return aAll(
                    // () -> aSeqWhile(() -> queue.take().map(t -> {
                    () -> aSeqWhile(() -> cancellation.run(queue::take).map(t -> {
                        if (t == null) {
                            return false;
                        } else {
                            list.add(t);
                            return true;
                        }
                    }))
            ).andLast(
                    () -> aSeq(
                            () -> queue.put(1)
                    ).thenFlatGet(
                            () -> queue.put(2)
                    ).thenFlatGet(
                            // pause
                            () -> aSeqForUnit(rangeIterator(1, 10), t -> aLater(() -> aTrue()))
                    ).thenFlatGet(
                            () -> cancellation.run(() -> aFailure(new RuntimeException()))
                    )
            ).mapOutcome(o -> {
                assertTrue(o.isFailure());
                assertEquals(RuntimeException.class, o.failure().getClass());
                return true;
            });
        });
        assertEquals(Arrays.asList(1, 2), list);

If we do queue reading like in commented out line, the test will hang up, because the consumer will never receive the value, because supplier failed. But, in uncommented line, we wrap call to queue.take() into cancellation runner. This allows us to fail all executions of cancellation that are active or will be active. Inside the call of cancellation.run(…​) there is any operator against common promise, if any of the cancellation.run(…​) fails, that promise fails as well. Otherwise, it stays in unresolved state.

Context Propagation

Some API requires propagation of the context and setting the context for action execution. The best option would be passing it via implicit or explicit arguments, but in some cases it is not practical, particularly in case of integration with different frameworks that rely on thread-local variables to keep contextual information.

To simplify handling of such cases AsyncFlows provides Context API. The context API allows is automatic propagation of context ot most actions.

Basic operation aLater(…​) and aSend(…​) support such propagation, and most of control constructs are using them. So it is recommended to rely on them when you are creating own DSL operations.

Context Object

The class Context is the basic element of the context propagation functionality. The context construction starts with Context.empty() then elements could be added with context.with(…​) and context.withPrivate(…​) entries, and removed with context.without(…​) entries.

The context has two operators for establishing the context: context.setContext() that returns Context.Cleanup that could be used in 'try with resources' Java statement.

final Context test = ...;
try (Cleanup ignored = test.setContext()) {
    action.doIt();
}

The method context.run(…​) that set context runs runnable and then close context cleanup.

test.run(() -> {
    action.doIt();
});

There are private and public entries in the context. The public entries are added using context.with(…​) operator, and they could be later removed with context.without(…​) operation.

Each such entry is associated with key of the type ContextKey. The context values could be added later with the method context.getOrNull(…​) and other get methods.

The keys are created as the following:

    private static final ContextKey<String> TEST_KEY = ContextKey.get(ContextTest.class, "test");

The type parameter of key is the type of the value associated with context. Then the key could be used to access and add context values.

final Context test = Context.empty().with(TEST_KEY, "test");
assertNull(Context.current().getOrNull(TEST_KEY));
test.run(() -> {
    assertEquals("test", Context.current().getOrNull(TEST_KEY));
    test.without(TEST_KEY).run(() -> {
        assertNull(Context.current().getOrNull(TEST_KEY));
    });
    assertEquals("test", Context.current().getOrNull(TEST_KEY));
});
assertNull(Context.current().getOrNull(TEST_KEY));

Context is an immutable object, and each modification of the context return a new instance of the context. However, context entries could contain mutable objects (for example logging MDC) When new instance of the context is established, the old instance is completely rolled back.

Active Context Entries

Some context entries require modification of the thread when context is established: setting thread local state, modifying security context, setting context class loader, joining or leaving transactions, etc.

To support such contextual entries an interface ActiveContextEntry was introduced. When context with such value is activated, the method Cleanup setContextInTheCurrentThread() is invoked. The returned value is used to return to the previous state of the context. The convention is that such state should be equal to the previous state, for example, the previous state of ThreadLocal should be set.

See the definition of ContextClassLoaderEntry for example of typical ActiveContextEntry. As a convention, such entries should provide the static with*(…​) methods that return UnaryOperator<Context> that could be passed to context.transform(…​) method, instead of requiring adding such entries explicitly. This allows hiding implementation details like keys and initialization of initial values. For example:

public static UnaryOperator<Context> withSavedContextClassloader() {
    return withContextClassloader(Thread.currentThread().getContextClassLoader());
}

public static UnaryOperator<Context> withContextClassloader(ClassLoader newClassLoader) {
    return c -> c.with(KEY, new ContextClassLoaderEntry(newClassLoader));
}

Such method could be used later as the following:

final Context test1 = Context.empty().transform(withContextClassloader(classLoader));

Sometimes it is not practical or possible to require creation of the separate key for context entries, as context entries could have own identity (for example, ThreadLocal). Such entries exist only for establishing the thread context, and there is no meaningful textual names for such objects. To support such entries, the interface PrivateContextEntry was introduced. It is possible to add it context (or replace with in a new instance of context), but it is not possible to create a context with such entry removed.

Instead of the explicit key, such entries should implement the method Object identity() that returns identity object for context entry, this identity object will be used as a key (it will be compared by the operator ==). For thread local context entry, such entry will return a reference to thread local itself. See ThreadLocalEntry as an example of such context entry.

The package org.asyncflows.core.context.util contains a number of useful active context entries that could be used as examples.

Asynchronous Operations

While context framework does not depend on the rest of the AsyncFlows framework, and it could be used independently, the AsyncFlows framework integrates with it and provide some ready to use control constructs.

There are two versions of the contextual execution, one that gets ready to use context, and one that updates the context.

doAsync(() -> {
    assertNull(MDC.get("k"));
    return inContext(withMdcEntry("k", "v"), () -> { // updates current context using
        assertEquals("v", MDC.get("k"));             // the function return from withMdcEntry
        return inContext(Context.empty(), () -> { // explicitly passed context
            assertNull(MDC.get("k")); // value is not set, because it is running in empty context
            return aVoid();
        });
    }).thenGet(() -> {
        assertNull(MDC.get("k"));
        return null;
    });
});

It is also possible to create contexts asynchronously, if establishing context requires contacting some external service. The same sample in the asynchronous version:

doAsync(() -> {
    assertNull(MDC.get("k"));
    return inContext(c -> aValue(c.transform(withMdcEntry("k", "v")))).run(() -> aLater(() -> {
        assertEquals("v", MDC.get("k"));
        return inContext(() -> aValue(Context.empty())).run(() -> {
            assertNull(MDC.get("k"));
            return aVoid();
        }).thenGet(() -> {
            assertEquals("v", MDC.get("k"));
            return aVoid();
        });
    })).thenGet(() -> {
        assertNull(MDC.get("k"));
        return null;
    });
});

Safety frames

The asynchronous operations generally do not own data, and many changes could happen to data when there is a simultaneous processing on it.

Generally, the code should be written that data invariant should be maintained while single closure is executed. Other closures represent code that might be executed after something has changed.

If there is no explicit fork like (aPar, aSed/aLater to other vat, calls to components), the mutable data could be assumed to be safe to use from vat as vat context would not switch while operation is in progress. The thread might be different, but there will be write/read barriers for the new thread.

If callback is passed to other Vat, it usually need to be exported in order to be executed in this Vat context with the same safety guarantees using FunctionExporter or other way.

Object-Oriented Programming

As we have seen in the previous section, the framework support rich set of asynchronous operators that support functional and structured asynchronous programming. And, the framework also supports creation of asynchronous components, so normal object-oriented programming could be used as well.

Classes and Interfaces

The asynchronous interface is normal Java interface that has methods that return Promise or void. The other types of methods could present on the interface, but they will not be supported by the runtime, and they will throw an exception. Let’s consider a simple Queue interface:

@Asynchronous
public interface ATestQueue<T> {
    Promise<T> take();
    void put(T element);
}

The method put(…​) is one way, the method is one-way is just for demonstration here. AQueue component in the library returns Promise<Void> because there might be errors on put operations. The method take() returns the Promise as it might need to wait until some value is available. By convention, the interface names start with 'A' to indicate that is an asynchronous interface.

public class TestQueue<T> implements ATestQueue<T>, ExportableComponent<ATestQueue<T>> {
    private final Deque<T> elements = new LinkedList<>();
    private final Deque<AResolver<T>> resolvers = new LinkedList<>();

    private void invariantCheck() {
        // checks that queue invariant holds
        if(!elements.isEmpty() && !resolvers.isEmpty()) {
            throw new RuntimeException("BUG: one of the collections should be empty");
        }
    }

    @Override
    public Promise<T> take() {
        invariantCheck();
        if (elements.isEmpty()) {
            return aResolver(r -> {
                resolvers.addLast(r);
            });
        } else {
            return aValue(elements.removeFirst());
        }
    }

    @Override
    public void put(final T element) {
        invariantCheck();
        if (resolvers.isEmpty()) {
            elements.addLast(element);
        } else {
            notifySuccess(resolvers.removeFirst(), element);
        }
    }

    @Override
    public ATestQueue<T> export(final Vat vat) {
        return exportTestQueue(vat, this);
    }
}

The basic idea of the implementation is that we have two queues, queue of values and queue of waiters for value. Only one of the queues could contain values at the same time.

The method take() just returns the value if value is available, but if value is not available, it returns not resolved promise and saves resolver to queue of resolvers.

The method put(…​) checks if there is some resolver and if there is, the waiter is notified and value is supplied to requester. Otherwise, the value is saved. If invariant of put method fails, the error will be logged by AsyncFlows framework, but caller will not receive it. This is why one-way methods should be generally avoided.

The class also implements interface ExportableComponent. This interface indicates that class is not safe to use outside of the vat, and it should be generally exported. The proxies could be written manually or APT code generator could be used to generate proxies.

The exporter could be written manually, and would look like this:

    public static <T> ATestQueue<T> exportTestQueue(final ATestQueue<T> service, final Vat vat) {
        return new ATestQueue<T>() {
            @Override
            public Promise<T> take() {
                return aLater(vat, () -> service.take());
            }

            @Override
            public void put(T element) {
                aOneWay(vat, () -> put(element));
            }
        };
    }

Let’s test this method:

        final int rc = doAsync(() -> {
            final ATestQueue<Integer> queue = new TestQueue<Integer>().export();
            return aAll(() -> aSeqForUnit(rangeIterator(0, 10), i -> {
                queue.put(i + 1);
                return aTrue();
            })).and(() -> aSeqForCollect(rangeIterator(0, 10),
                    i -> queue.take(),
                    Collectors.summingInt((Integer i) -> i))
            ).selectValue2();
        });
        assertEquals((11 * 10) / 2, rc);

Asynchronous Proxy Generator

The AsyncFlows framework includes annotation processor for generating proxies. This annotation processor is used for generating proxies for all asynchronous interfaces in the framework.

To enable annotation processor, add it as optional dependency like the following:

        <dependency>
            <groupId>org.asyncflows</groupId>
            <artifactId>asyncflows-apt</artifactId>
            <version>0.1.1</version>
            <optional>true</optional>
        </dependency>

The annotation processor will generate proxies for all interfaces with @Asynchronous annotation.The implementation will be generated only for non-default interface methods.

The generated proxy will look like the following:

@javax.annotation.Generated("org.asyncflows.apt.AsynchronousProxyProcessor")
public final class ATestQueueProxyFactory implements java.util.function.BiFunction<org.asyncflows.core.vats.Vat, java.lang.Object, java.lang.Object>, org.asyncflows.core.util.AsynchronousService {
    public static final ATestQueueProxyFactory INSTANCE = new ATestQueueProxyFactory();

    /**
     * Create a proxy.
     *
     * @param vat     the vat
     * @param service the service to export
     * @param <T> a type parameter
     * @return the exported service
     */
    public static <T> org.asyncflows.core.util.sample.ATestQueue<T> createProxy(org.asyncflows.core.vats.Vat vat, org.asyncflows.core.util.sample.ATestQueue<T> service) {
        return new ATestQueueAsyncProxy<T>(vat, service);
    }

    /**
     * Create a proxy.
     *
     * @param vat     the vat
     * @param service the service to export
     * @param <T> a type parameter
     * @return the exported service
     */
    public <T> org.asyncflows.core.util.sample.ATestQueue<T> export(org.asyncflows.core.vats.Vat vat, org.asyncflows.core.util.sample.ATestQueue<T> service) {
        return createProxy(vat, service);
    }

    @Override
    @SuppressWarnings("unchecked")
    public java.lang.Object apply(org.asyncflows.core.vats.Vat vat, java.lang.Object service) {
        return createProxy(vat, (org.asyncflows.core.util.sample.ATestQueue) service);
    }

    @javax.annotation.Generated("org.asyncflows.apt.AsynchronousProxyProcessor")
    private static final class ATestQueueAsyncProxy<T> implements org.asyncflows.core.util.sample.ATestQueue<T> {
        private final org.asyncflows.core.vats.Vat vat;
        private final org.asyncflows.core.util.sample.ATestQueue<T> service;

        private ATestQueueAsyncProxy(final org.asyncflows.core.vats.Vat vat, final org.asyncflows.core.util.sample.ATestQueue<T> service) {
            java.util.Objects.requireNonNull(vat);
            java.util.Objects.requireNonNull(service);
            this.vat = vat;
            this.service = service;
        }

        @Override
        public int hashCode() {
            return System.identityHashCode(service);
        }

        @Override
        public boolean equals(java.lang.Object o2) {
            return this == o2 || (o2 != null && o2.getClass() == getClass() && ((ATestQueueAsyncProxy)o2).service == this.service);
        }

        @Override
        public org.asyncflows.core.Promise<T> take() {
            return org.asyncflows.core.CoreFlows.aLater(this.vat, () -> this.service.take());
        }

        @Override
        public void put(T element) {
            org.asyncflows.core.CoreFlows.aOneWay(this.vat, () -> this.service.put(element));
        }
    }
}

The rules are the following:

  • The default interface methods are not delegated, and the default implementation is used. These methods are supposed to provide utility services.

  • The methods that are returning Promise are delegated to the Proxy’s vat using aLater operator.

  • The methods that are returning void are delegated to the Proxy’s vat using aOneWay operator.

  • Other methods just throw a UnsupportedOperationException

Garbage Collection Consideration

The framework objects are generally garbage collected by Java. There is no need to perform explicit cleanup for them, if they do not hold any sensitive resources like IO streams.

The object is prevented from garbage collection in the following cases:

  • There is a direct reference to object or its proxy

  • There is an event on the queue that references the object

  • There is a listener registered to some uncompleted promise, that is held by the external listener. This usually means that there is some asynchronous operation is in progress.

Generally, the rules for garbage collection are the same as for normal Java code. But we also need to consider promise chains as a call stack. So references held by promises should be considered as stack references to objects.

The vat object is shared between many AsyncFlows objects and asynchronous operators. The Vat might need to be stopped. However, this usually apply to Vats that occupy thread like SelectorVat or SingleThreadVat. Even for these vats starting/stopping is handled by the utility methods doAsync(…​) and SelectorVatUtil.run(…​).

Concurrency Considerations

It is assumed that asynchronous operations do not invoke blocking functionality. So many simultaneous asynchronous operations will safely take their turns on the single queue. However, it is not always so as some operations require calls of non-asynchronous API or to perform CPU-intensive operations.

CPU-bound operations should be generally delegated to the ForkJoin pool (aForkJoinGet(…​)). IO-bound synchronous operations should be delegated to daemon thread pool (aDaemonGet(…​)). If you are in doubt, just send it to daemon pool. There are utilities that start operations on corresponding pools using vats.These operations do not establish asynchronous context on corresponding pools, so they are quite lightweight and suitable to invocation of some synchronous method.

If asynchronous context need to be established, it is better to use aLater(Vats.daemonVat(), …​) or aLater(Vats.forkJoinVat(), …​). These operations will create a new vats that runs over corresponding pools.

Request Queue

In the queue sample, the asynchronous operations are written in the way, that no new problems will happen if method will be called before some previous method finishes. In Java synchronous code this is usually handled by the statement 'synchronized'. In this framework similar functionality is provided by RequestQueue. The biggest difference from Java synchronization is that nested invocations of request queue are not supported. Other major difference is that this utility class is indendent for use from single vat, so it should not be exposed outside of the asynchronous components.

The basic method of RequestQueue is run(ASupplier<T>), this method has some utility variants like runSeqWhile(…​). This method executes method if request queue is empty and no method is executing currently, and suspends execution putting it to the queue if there is some execution in progress. So it is some kind of private event queue, but more flexible. There are also suspend/resume utility methods that are analogs of Java wait/notify.

As example, lets consider Semaphore implementation similar to Java Semaphore class.

public interface ASemaphore {
    void release(int permits);
    void release();
    Promise<Void> acquire();
    Promise<Void> acquire(int permits);
}

The class in the library is implemented like the following:

public final class Semaphore implements ASemaphore, ExportableComponent<ASemaphore> {
    private final RequestQueue requests = new RequestQueue();
    private int permits;

    public Semaphore(final int permits) {
        this.permits = permits;
    }

    @Override
    public void release(final int releasedPermits) {
        if (releasedPermits <= 0) {
            return;
        }
        permits += releasedPermits;
        requests.resume();
    }

    @Override
    public void release() {
        release(1);
    }

    @Override
    public Promise<Void> acquire() {
        return acquire(1);
    }

    @Override
    public Promise<Void> acquire(final int requestedPermits) {
        if (requestedPermits <= 0) {
            return aFailure(new IllegalArgumentException("The requestedPermits must be positive: " + requestedPermits));
        }
        return requests.runSeqWhile(() -> {
            if (requestedPermits <= permits) {
                permits -= requestedPermits;
                return aFalse();
            } else {
                return requests.suspendThenTrue();
            }
        });
    }

    @Override
    public ASemaphore export(final Vat vat) {
        return UtilExporter.export(vat, this);
    }
}

The method acquire(…​) needs to be ordered to implement FIFO ordering. Some parts of the method do not need to be protected, and we can check input as we please. The rest of method is the protected loop. In the loop we check if there are permits available, and if they are, we just stop loop and this cause promise returned by run method to resolve as well. However, if they are not available, we suspend execution, and we repeat operation when suspend ends.

The operation release(…​) does not need to be ordered. So it is not protected by request queue. The release method invokes requests.resume() to notify acquire(…​) requests that new permits were added. The promise returned from suspend resolves on it, and the acquisition loop continues. New amount of permits might be sufficient or not. It is decided in the context of the operation acquire(…​). If there is no acquire operation pending, the resume operation is doing nothing.

Let’s see how it works in test:

        final ArrayList<Integer> result = new ArrayList<>();
        final Void t = doAsync(() -> {
            final ASemaphore semaphore = new Semaphore(0).export();
            //noinspection Convert2MethodRef
            return aAll(() ->
                            aSeq(
                                    () -> semaphore.acquire().listen(o -> result.add(1))
                            ).thenFlatGet(
                                    () -> semaphore.acquire(3).listen(o -> result.add(2))
                            ).thenFlatGet(
                                    () -> semaphore.acquire().listen(o -> result.add(3))
                            )
            ).andLast(() ->
                    aSeq(
                            () -> aForRange(0, 10).toVoid()
                    ).thenFlatGet(() -> {
                        result.add(-1);
                        semaphore.release(2);
                        return aVoid();
                    }).thenFlatGet(
                            () -> aForRange(0, 10).toVoid()
                    ).thenFlatGet(() -> {
                        result.add(-2);
                        semaphore.release();
                        return aVoid();
                    }).thenFlatGet(
                            () -> aForRange(0, 10).toVoid()
                    ).thenFlatGet(() -> {
                        result.add(-3);
                        semaphore.release(3);
                        return aVoid();
                    }));
        });
        assertSame(null, t);
        assertEquals(Arrays.asList(-1, 1, -2, -3,  2, 3), result);

Troubleshooting

Logging

The framework uses slf4j for logging. All exceptions that are received during listener notification are logged on the debug level. If you do not receive some events for some reason, you could try enabling debug logging for the framework.

A good logging could greatly help troubleshooting the applications.

Trace Provider

The execution trace of asynchronous operations is difficult to record. In the framework, it is possible to enable call tracing for the application using system property:

org.asyncflows.core.trace.provider=EXCEPTION

If this property is enabled, the stack trace will look like the following:

java.lang.IllegalStateException: Test
	at org.asyncflows.core.CoreFlowsTest.lambda$null$3(CoreFlowsTest.java:51)
	at org.asyncflows.core.CoreFlows.aNow(CoreFlows.java:191)
	at org.asyncflows.core.CoreFlows.lambda$null$2(CoreFlows.java:256)
	at org.asyncflows.core.vats.BatchedVat.runBatch(BatchedVat.java:148)
	at org.asyncflows.core.vats.SingleThreadVatWithIdle.runInCurrentThread(SingleThreadVatWithIdle.java:63)
	at org.asyncflows.core.AsyncContext.doAsyncOutcome(AsyncContext.java:69)
	at org.asyncflows.core.AsyncContext.doAsync(AsyncContext.java:82)
	... 55 more
	Suppressed: org.asyncflows.core.trace.PromiseTraceExceptionProvider$PromiseTraceException
		at org.asyncflows.core.PromiseTraceExceptionProvider.recordTrace(PromiseTraceExceptionProvider.java:102)
		at org.asyncflows.core.Promise.<init>(Promise.java:92)
		at org.asyncflows.core.CoreFlows.aResolver(CoreFlows.java:171)
		at org.asyncflows.core.CoreFlows.aLater(CoreFlows.java:255)
		at org.asyncflows.core.CoreFlows.aLater(CoreFlows.java:268)
		at org.asyncflows.core.CoreFlowsTest.lambda$testThrowLater$4(CoreFlowsTest.java:50)
		at org.asyncflows.core.CoreFlows.aNow(CoreFlows.java:191)
		at org.asyncflows.core.AsyncContext.lambda$doAsyncOutcome$1(AsyncContext.java:65)
		... 59 more

The exception org.asyncflows.core.PromiseTraceExceptionProvider$PromiseTraceException is entry created by the exception trace provider.This provider is quite expensive from CPU perspective as it creates an exception for each unresolved promise, so it is suggested to use it only during problem investigation.

This feature is experimental.It is also possible to write own trace providers.Refer to interface PromiseTraceProvider for more information.

Debugging Considerations

When debugging, the stack trace is not available directly, but it is still possible to examine asynchronous stack by starting from resolvers passed from upper contexts. The Java saves variables in Java objects referenced by lambdas. If trace feature is enabled, it is also possible to find out stack trace for location where promise was created.

So the debugging is more difficult, but it is still possible using framework.

Library

Streams

Streams library is similar to Java stream library, but there are some key differences.The first obvious difference is that asynchronous streams provide asynchronous stream access operations.The second difference is API design.

Pull Streams

Asynchronous streams provide two lean interfaces and there is no intention to provide additional operations here.

public interface AStream<T> extends ACloseable {
    Promise<Maybe<T>> next();
}

public interface ASink<T> extends ACloseable {
    Promise<Void> put(T value);
    Promise<Void> fail(Throwable error);
    Promise<Void> finished();
}

The stream operations like map, flatMap, filter, and others are provided by stream builders.Work with StreamBuilder typically starts with some AsyncStreams class method like aForRange or aForStream.Stream building starts in pull mode.So all elements will be processed sequentially.The stream builder supports typical stream operations like map, filter, flatMap, leftFold, and collect.These operations accept asynchronous operations instead of synchronous ones.

        final int rc = doAsync(() ->
                aForRange(0, 11)
                        .filter(i -> aBoolean(i % 2 == 0))
                        .map(i -> aValue(i / 2))
                        .collect(Collectors.summingInt(e -> e))
        );
        assertEquals(15, rc);

Some methods also have the variant Sync that accepts Java functional interfaces.

        final int rc = doAsync(() ->
                aForRange(0, 11)
                        .filterSync(i -> i % 2 == 0)
                        .mapSync(i -> i / 2)
                        .collect(Collectors.summingInt(e -> e))
        );
        assertEquals(15, rc);

It is also possible to specify processing window. This window is basically prefetch buffer for a sequential stream. If several stages take long time, it is reasonable to start processing next records at advance up to specified limit. The example below specifies that exactly one element is pre-fetched. The sample is also shows usage of process(…​) method that could be used to implement reusable parts of processing pipeline.

        final Function<StreamBuilder<Integer>, StreamBuilder<Integer>> delay =
                s -> s.map(a -> aForRange(0, 10).toVoid().thenValue(a));
        List<Integer> result = new ArrayList<>();
        final int rc = doAsync(() ->
                aForRange(0, 10)
                        .filter(i -> aBoolean(i % 2 == 0))
                        .mapSync(a -> {
                            result.add(a);
                            return a;
                        })
                        .window(1)
                        .process(delay)
                        .mapSync(a -> {
                            result.add(-a);
                            return a;
                        })
                        .map(i -> aValue(i / 2))
                        .collect(Collectors.summingInt(e -> e))
        );
        assertEquals(10, rc);
        assertEquals(Arrays.asList(0, 2, -0, 4, -2, 6, -4, 8, -6, -8), result);

'All' Streams

The all stream process values in the same way, but the difference is that all steps between .all() call and final processing of values (or switch to pull()) are always processed, even in case of failures.This allows to ensure processing of group of objects even in case of failures.For example, to close a collection of streams, even if close operation on some of them fail.

Like for aAll* operators, the processing done is parallel for all elements.However, it is possible to limit amount of parallel processing using .window(n) call.In that case only several elements will be processed at the same time.This might be useful if the task is taxing on resources.

        final int rc = doAsync(() ->
                aForRange(0, 11)
                        .all(2)
                        .filterSync(i -> i % 2 == 0)
                        .mapSync(i -> i / 2)
                        .collect(Collectors.summingInt(e -> e))
        );
        assertEquals(15, rc);

Note, while each stage is parallel, the current implementation waits until previous element was passed to next stage before passing element to next stage. This might introduce delays to processing, but maintain the same order as pull stream processing. More optimized solution might be developed later.

Working with resources

Stream is closeable resource, and it is possible to work with a stream and other closeable resources with aTry statement similar to Java language try statement. The try statement accepts resource references, promises for resource references, and actions that open resources. Then it closes resource after it has been used. Let’s define a simple resource.

    public static class SampleResource implements ACloseable, ExportableComponent<ACloseable> {
        private final Cell<Boolean> closed;

        public SampleResource(final Cell<Boolean> closed) {
            this.closed = closed;
        }

        @Override
        public Promise<Void> close() {
            closed.setValue(true);
            return aVoid();
        }

        @Override
        public ACloseable export(final Vat vat) {
            return () -> ResourceUtil.closeResource(vat, SampleResource.this);
        }
    }

This resource just support close action.Also, to support work with resources there are classes CloseableBase and ChainedCloseableBase that simplify creating resource wrappers.Now, we could try different options of working with resources:

        final Cell<Boolean> r1 = new Cell<>(false);
        final Cell<Boolean> r2 = new Cell<>(false);
        final Cell<Boolean> r3 = new Cell<>(false);
        doAsync(() -> aTry(
                () -> aValue(new SampleResource(r1).export())
        ).andChain(
                value -> aValue(new SampleResource(r2).export())
        ).andChainSecond(
                value -> aValue(new SampleResource(r3).export())
        ).run((value1, value2, value3) -> aVoid()));
        assertTrue(r1.getValue());
        assertTrue(r2.getValue());
        assertTrue(r3.getValue());

Up to three resources could be opened with one aTry operator. However, it is also possible to nest aTry operators, so previously opened resources are accessible in lexical scope.

IO Library

Core IO

The IO library is also built upon lean interfaces and different operations built upon it. The following are core interfaces of the library:

public interface AInput<B extends Buffer> extends ACloseable {
    Promise<Integer> read(B buffer);
}
public interface AOutput<B extends Buffer> extends ACloseable {
    Promise<Void> write(B buffer);
    Promise<Void> flush();
}
public interface AChannel<B extends Buffer> extends ACloseable {
    Promise<AInput<B>> getInput();
    Promise<AOutput<B>> getOutput();
}

As you could see, these interfaces are suitable for both character IO and byte IO. Some operations that work with these interfaces are [generic](asyncflows-io/src/main/java/org/asyncflows/io/IOUtil.java).

The following functionality is supported out of the box:

Network Library

There are two implementations of socket library based on traditional blocking sockets and selector library. The later an implementation based on asynchronous sockets is planned to be tested.

Implementation based on traditional blocking sockets API sometimes hangs on Windows, so it is not recommended to use if runtime also supports selector sockets. This implementation is left only backward compatibility with non-complete Java runtimes.

The sockets are just byte channels with few additional operators, and they support the same operations. However, there are few additional operations.

public interface ASocket extends AChannel<ByteBuffer> {
    Promise<Void> setOptions(SocketOptions options);
    Promise<Void> connect(SocketAddress address);
    Promise<SocketAddress> getRemoteAddress();
    Promise<SocketAddress> getLocalAddress();
}
public interface AServerSocket extends ACloseable {
    Promise<SocketAddress> bind(SocketAddress address, int backlog);
    Promise<SocketAddress> bind(SocketAddress address);
    Promise<Void> setDefaultOptions(SocketOptions options);
    Promise<SocketAddress> getLocalSocketAddress();
    Promise<ASocket> accept();
}
public interface ASocketFactory {
    Promise<ASocket> makeSocket();
    Promise<AServerSocket> makeServerSocket();
    Promise<ADatagramSocket> makeDatagramSocket();
}
public interface ADatagramSocket extends ACloseable {
    Promise<Void> setOptions(SocketOptions options);
    Promise<Void> connect(SocketAddress address);
    Promise<Void> disconnect();
    Promise<SocketAddress> getRemoteAddress();
    Promise<SocketAddress> getLocalAddress();
    Promise<SocketAddress> bind(SocketAddress address);
    Promise<Void> send(ByteBuffer buffer);
    Promise<Void> send(SocketAddress address, ByteBuffer buffer);
    Promise<SocketAddress> receive(ByteBuffer buffer);
}

These interfaces could be used in the way similar to traditional synchronous code. See echo server and echo client as examples.

TLS support

TLS implementation relies on Java SSLEngine for asynchronous processing, so it follows all restrictions enforced by it.Note, SSL protocols are not supported by Java’s SSLEngine anymore, so the framework stick with TLS name.

The TLS implementation is just a ASocketFactory that wraps other socket factory.Interfaces are the same as for sockets with two additional operations on the socket:

public interface ATlsSocket extends ASocket {
    Promise<Void> handshake();
    Promise<SSLSession> getSession();
}

First one allows initiating handshake, the second one allows accessing session and examining certificates.

There are no TLS related parameters on TlsSocket factory, instead there are a factory methods for SSLEngine which allow configuring needed parameters for SSLEngine before using it in the processing:

public class TlsSocketFactory implements ASocketFactory, ExportableComponent<ASocketFactory> {
    public void setServerEngineFactory(final AFunction<SocketAddress, SSLEngine> serverEngineFactory) {
       ...
    }
    public void setClientEngineFactory(final AFunction<SocketAddress, SSLEngine> clientEngineFactory) {
        ...
    }
}

These factories need to configure TLS parameters basing on SocketAddress.It is expected, that different TlsSocketFactory instances will be used for different security contexts.

HTTP 1.1 support

The framework provides experimental support for HTTP 1.1 protocol on client and server side. The code is currently more like low-level protocol implementation rather than ready to use application server.The neither side is finished, but it could be experimented with. HTTPS is not implemented at the moment.

See [unit test](asyncflows-protocol-http/src/test/java/org/asyncflows/protocol/http/core) for sample code.

How framework addresses the typical problems

Back pressure

Many asynchronous libraries have a back pressure problem. When one source of data provides more data than consumer might consume. Some frameworks did not have a solution for the problem (like Netty before 4.0), some introduce unnatural solutions like disabling/enabling reading (like Vert.x and modern Netty), some hide it inside framework (like Akka), or provide a separate event listeners for channels (like Apache HttpCore Async 5.x).

However, there is no such problem with synchronous io in Java, as streams block if nothing could be written to it:

long length = 0;
byte[] b = new byte[4096]
while(true)  {
   int c = in.read(b)
   if(c < 0) {
      break;
   }
   length += c;
   out.write(b, 0, c);
}
return length;

That is practically all.Back pressure propagates naturally via blocking.No more data will be read, if write is not complete.If there is error, it will be propagated to caller.

The framework provides practically the same approach.There is no explicit backpressure control. The output stream is accepting request, and return to caller when it is finished processing it, including sending data to downstream.

    public final Promise<Long> copy(final AInput<ByteBuffer> input, final AOutput<ByteBuffer> output, int bufferSize) {
        ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
        final long[] result = new long[1];
        return aSeqWhile(
                () -> input.read(buffer).flatMap(value -> {
                    if (isEof(value)) {
                        return aFalse();
                    } else {
                        result[0] += +value;
                        buffer.flip();
                        return output.write(buffer).thenFlatGet(() -> {
                            buffer.compact();
                            return aTrue();
                        });
                    }
                })
        ).thenGet(() -> result[0]);
    }

There are more code as asynchronous operations need to be handled and working with buffers is more complex than with arrays, but still it is very similar to what is written for synchronous streams.

Such way of handling back pressure does not necessary limit parallelism.It is possible to use features of the framework to ensure that reads and writes are done in parallel when it makes sense.

    public static Promise<Long> copy(final AInput<ByteBuffer> input, final AOutput<ByteBuffer> output, int buffers, int bufferSize) {
        final SimpleQueue<ByteBuffer> readQueue = new SimpleQueue<>();
        final SimpleQueue<ByteBuffer> writeQueue = new SimpleQueue<>();
           final Cancellation cancellation = cancellation();
        for (int i = 0; i < buffers; i++) {
            readQueue.put(ByteBuffer.allocate(bufferSize));
        }
        final long[] result = new long[1];
        return aAll(
                () -> aSeqWhile(
                           () -> cancellation.run(readQueue::take).flatMap(
                                   b -> cancellation.run(() -> input.read(b)).flatMap(c -> {
                            if (isEof(c)) {
                                writeQueue.put(null);
                                return aFalse();
                            } else {
                                result[0] += c;
                                writeQueue.put(b);
                                return aTrue();
                            }
                        }))
                )
        ).and(
                () -> aSeqWhile(
                           () -> cancellation.run(writeQueue::take).flatMap(b -> {
                            if(b == null) {
                                return aFalse();
                            } else {
                                b.flip();
                                   return cancellation.run(() -> output.write(b)).thenGet(() -> {
                                    b.compact();
                                    readQueue.put(b);
                                    return true;
                                });
                            }
                        })
                )
        ).map((a, b) -> aValue(result[0]));
    }

In the provided sample, the read operation uses buffers to read when available, and writes when buffer with data is available. So if writes are slower or reads are slower, the algorithm will adapt to the speed. This algorithm makes sense with no more than four buffers, as one buffer is for reading, one for writing, and two are in flight over the queue.

Control flow inversion

Most of asynchronous libraries require inversion of control flow. Most of asynchronous frameworks use concepts like decoders and encoders. These are two poor things that have to implement explicit tracking of the current state of reading or writing. If there is a recursive state like xml or json, they have to keep the explicit stack of state.

The biggest problem with such approach is that such code is not readable as state of the process does not match state of the code. This is exactly the same problem that is mentioned is the famous article Edsger W. Dijkstra "Go To Statement Considered Harmful". There is excellent analysis of that article that translates the article to more modern context: David R. Tribble "Go To Statement Considered Harmful: A Retrospective". Control flow inversion causes the same problem as it was described by Edsger W. Dijkstra:

"" My second remark is that our intellectual powers are rather geared to master static relations and that our powers to visualize processes evolving in time are relatively poorly developed. For that reason we should do (as wise programmers aware of our limitations) our utmost to shorten the conceptual gap between the static program and the dynamic process, to make the correspondence between the program (spread out in text space) and the process (spread out in time) as trivial as possible. ""
— Edsger W. Dijkstra
Go To Statement Considered Harmful

It is very hard to understand what is happening in the process and to what states it could go by analysis of the code.It is much simpler when control flow is evident from the code structure.AsyncFlows library provide such flow.

I would say that direct event sending to some queue or actor is similar to "go to" operator in programming languages.At least it has the same properties.

"" The unbridled use of the go to statement has an immediate consequence that it becomes terribly hard to find a meaningful set of coordinates in which to describe the process progress.Usually, people take into account as well the values of some well chosen variables, but this is out of the question because it is relative to the progress that the meaning of these values is to be understood!With the go to statement one can, of course, still describe the progress uniquely by a counter counting the number of actions performed since program start (viz. a kind of normalized clock).The difficulty is that such a coordinate, although unique, is utterly unhelpful.In such a coordinate system it becomes an extremely complicated affair to define all those points of progress where, say, n equals the number of persons in the room minus one! ""
— Edsger W. Dijkstra
Go To Statement Considered Harmful

If we have event handlers, that are to keep own state, we also do no have a context, that helps us to understand context.We need to consider all events to be possible at every moment of time.The pain is real. For example of pain of Actor programming paradigm cased by event sending in Erlang context, one could watch the presentation Death by Accidental Complexity.While presentation is using Erlang sample, the problems described are common for many other technologies, particularly Actor-based.

"" * The whole matrix needs to be revisited if messages/features are added or removed * What we do in each cell is by no means obvious - depends on history * What to do when unexpected message arrives in a transition state is practically never specified (we must invent some reasonable response.) * Abstraction is broken, encapsulation is broken * Code reuse becomes practically impossible ""
— Ulf Wiger
Death by Accidental Complexity (slide Apparent Problems at 26:49)

The core of the problem is the same as what was described by Edsger W. Dijkstra for "go to": the code structure does not mach control flow structure, so we could not reason about application state by reading code. The solution to the problem is also the same: structured asynchronous programming. While Ulf Wiger identifies the problem correctly in the presentation, the proposed solution looks like poor man semi-structured programming using event filtering.

Inter-Process Communications

The AsyncFlows framework is intended to implement control flow inside the application. There is no special means to organize inter-process communications. However, the libraries could be used to organize such communications. For example, JAX-RS 2.0 supports asynchronous invocations in client and server contexts.

The most of inter-process communication protocols are currently based on the language and application independent meta-protocols, where exact choices it depends on the context. Most popular now are HTTP-based protocols, and additional transports like Web Sockets are also getting popular in some contexts. In some specific situations even memory-mapped files works well. As for message formats, there is a wide range of them starting from XML and JSON to ASN.1 and protobuf.

Prescribing a specific solution is not practical in the current situation. The framework is designed in the way that allows implementing most of such solutions over it. If there is a ready to use asynchronous API, the framework might reuse it with some wrappers.

Java EE support

TBD

Spring Framework

TBD

Servlets

TBD

JAX-RS

TBD

Java EE Security Manager

The framework uses an own thread pool in some cases, and it could be incompatible with Java EE when a security manager is enabled. Turn off security manager or add appropriate permissions for your application. Also, the contextual security checks are not so valid in the asynchronous context, and they could be break important assumptions about security if Java EE components are called.

The contextual security information like active user should be passed as parameters, and it needs to be reestablished before invocation of Java EE functionality that requires it (for example Hibernate audit support). Some technologies have support for re-establishing context (like AsyncContext.run(Runnable) for servlets).

Comparison with other frameworks

Actors (Akka)

Comparing with Scala actors, there are the following key points of difference.

  1. In the AsyncFlows framework, component and event queue are separated and one queue could support many small components. Practically, there is at least one one asynchronous micro-component for each asynchronous operation.In Scala, there are only one asynchronous component for each event queue.This leads to problems with resource management as state of component need to be tracked.

  2. Event dispatch in Akka is done explicitly and each queue supports only closed set of events. There is no interfaces for components and even returning result is different each time.(TypedActors try to solve problem of explicit dispatch, but introduce own set of the problems due to blocking calls, and also still support only closed set of events). AsyncFlows support open set of events, as they translate to Runnable anyway.As many components could leave

  3. Actors are heavy-weight as they are integrated with event queue.They also need to be deleted explicitly to free resources.By comparison, AsyncFlows do not manage components explicitly, as they could garbage collected normally. Some Vats needs to be managed explicitly, but these vats are usually used as application starting point in the main thread or have the same lifetime as application (NIO). ExecutorVat does not need to be explicitly stopped (the underlying executor needs to be stopped, but daemon executor creates and frees threads as needed and does not need to be stopped).

  4. As Akka Actors work with event queue directly, it is possible handle events not in the order they were sent to actor. AsyncFlows insists on handling events in the order they are received by a vat.Reordering of event handling still could be done by utility classes like RequestQueue.

Generally, AsyncFlows support more flexible management of asynchronous components, and their relationship to event queues. Also, AsyncFlows support running the entire network application in the single thread, while Akka requires multiple threads by design.

CompletableFuture

Java’s CompletableFuture is similar to AsyncFlows Promise. CompletableFuture has a lot of utility methods that implement much of functionality similar to provided by the AsyncFlows framework. However, AsyncObjects Framework shifts this functionality from Promise to operators that are built upon Promise (operation builders, static methods). The difference is small, but it greatly affects usability as AsyncFlows does not need a lot of methods since many methods could be replaced by combination of existing method.

There was an experimental version of the framework that used CompletableFuture as foundation instead of promise. However, this version proved to be less usable, as it is more complex to listen for events, for example it is not possible just to listen to CompletableFuture w/o creating another completable future. Also, the defaults for execution context are different. The framework defaults to the current Vat. The CompletableFuture defaults to ForkJoin pool. This pool is generally not acceptable for IO operations, and IO could block it for indefinite time. Small errors could lead to application blocking. Practically all invocations on CompletableFuture required explicit specification of target vat.

AsyncFlows also has a lot of utility methods, that do not make sense as CompletableFuture API. For example, loops, request queues, cancellation.

Also, CompletableFuture does not have component model. It is just a single class w/o larger contexts. When and how asynchronous method is executed is left up to component designer.

Netty

The netty is organized as multi-stage event processing.It works very well when uniform processing is needed. The problem is that most of processing that is needed is non-uniform.There are generally recursive logical asynchronous processes built upon event streams.Netty requires implementing such processes using explicit stacks and other means.

In contrast, AsyncFlows allows to freely use recursion when needed, just like in normal synchronous code. There is no need for inversion of control.

Up to recent versions of Netty, the netty did not support back pressure regulation, and because of event notification approach, there were no natural way to specify it.The current way is still cumbersome.

On other hand, netty contains implementation of many network protocols.And it makes sense to reuse these implementations from AsyncFlows.There is plan to create a library that access Netty channels from AsyncFlows framework.

Reactive Programming

The reactive programming is higher-level and more narrow paradigm than what is targeted by this framework. So it does not make sense to compare them directly. However, the concepts from reactive programming could be relatively easily implemented using framework constructs. The reactive programming mixes several concepts together, i.e. data stream processing and tracking/propagating changes. These are somewhat different tasks, and have different data processing needs, for example with tracking changes there is no problem to drop intermediate changes, but for processing data streams this might be not acceptable.

The data stream processing is covered by stream library in AsyncFlows.

The event processing is not covered in standard library yet, but it could be implemented using standard means of asynchronous component development, like it is done in sample tracker library. The demo reproduces some typical scenarios. Java 9 flows are more oriented to similar task, and there might be some integration in the future.

Other Programming Languages

The framework relies on Java 8 functional interfaces to create DSL. So if other language supports them in reasonable way, it is possible to use this DSL language in similar way.

Groovy

Groovy since version 2.4 supports java functional interfaces using closure syntax.However, sometimes more type annotations are needed, to specify parameter types if type checking is wanted.The syntax actually looks more nice for groovy.

        def t = doAsync {
            def failure = new Promise<Throwable>();
            def suppressed = new Promise<Integer>();
            aAll {
                aAny(true) {
                    aLater { aValue(1) }
                } or {
                    aValue(2)
                } or {
                    aFailure(new RuntimeException())
                } suppressed {
                    notifySuccess(suppressed.resolver(), it)
                } suppressedFailureLast {
                    notifySuccess(failure.resolver(), it);
                }
            } and {
                failure
            } andLast {
                suppressed
            }
        }
        assertEquals(2, t.getValue1().intValue());
        assertEquals(RuntimeException.class, t.getValue2().getClass());
        assertEquals(1, t.getValue3().intValue());

There is much less visual noise in groovy version than in Java version of the same test. The Groovy is a good choice of using with the framework if there is no special concerns about performance.

Note, Groovy currently implements lambdas using inner classes, so more classes are generated comparing to Java 8 code. This might lead to higher application start time.

Kotlin

The Kotlin language also has compact syntax that support DSL creation.It is also possible to write a compact code with much less visual noise in Kotlin as well.

        val t = doAsync {
            val failure = Promise<Throwable>()
            val suppressed = Promise<Int>()
            aAll {
                aAny(true) {
                    aLater { aValue(1) }
                }.or {
                    aFailure(RuntimeException())
                }.or {
                    aValue(2)
                }.suppressed { v ->
                    notifySuccess(suppressed.resolver(), v)
                }.suppressedFailureLast { ex ->
                    notifySuccess<Throwable>(failure.resolver(), ex)
                }
            }.and {
                failure
            }.andLast {
                suppressed
            }
        }
        assertEquals(2, t.value1)
        assertEquals(RuntimeException::class.java, t.value2.javaClass)
        assertEquals(1, t.value3)

So Kotlin is also good language to write structured asynchronous code if you project allows for it.

Note, Kotlin currently implement lambdas using inner classes, so more classes are generated comparing to Java 8 code.This might lead to higher application start time.

Kotlin Coroutines

Caution
Obsolete, it needs to be revised with flows and coroutines in 1.4.x.

The Kotlin Corountines is an experimental feature similar to C# async support, and there are some similar problems and advantages.

The extension is implemented as compiler extension with support library.

  1. There is no explicit safety frames. It is not clear from lexical scope what code can execute w/o interleaving with other code. In AsyncFlows, safe frame boundaries are more explicit.

  2. It is not always clear in what thread the code will be executed.In coroutines there is only one point for specifying context launch(context){…​}, but after that each component is on its own. Controlling execution context looks like quite complex.Controlling and clear understanding of the execution context is important in the following aspects:

    • Some code requires specific execution context to be used (For example for using with NIO Selectors or AWT/Swing components)

    • Some code is either CPU-bound (so it should be go to ForkJoin), and some code is blocking and IO-bound(so it should go to some unlimited thread pool).AsyncFlows solves it by the following means:

      • The context normally is inherited from parent for asynchronous operation

      • There are ways to change context explicitly (aSend, aLater, aPar)

      • Each component has own context declaring during exporting, that is reestablished on each call.

  3. Coroutines provide very compact syntax for sequential operations, i.e. waiting and resuming until some ready to continue. But coroutines provide little support from combining simultaneous operations (aAll*, aAny*, aPar*). There is practically only fork operation. partial support is provided by contextual await() operations. There is no support yet for combining them in the code explicitly. The problem could be fixed by providing a richer library with operators similar to AsyncFlows.

  4. Base concurrency abstractions looks like more complex than in AsyncFlows. Concurrency context combines continuation scheduling, context variables, and many resume/suspend etc. Practically these are orthogonal aspects, and they may be decoupled (and they are decoupled in AsyncFlows):

    • Scheduling actions: Vat

    • Resuming/Suspending: Promise

    • Contextual variables: Components and Asynchronous operations with lexical scope

  5. Context combinators could provide more interesting methods of integration with legacy frameworks like Java EE. Some of these ideas could be also implemented in AsyncFlows with minor refactoring the current Vat API.

Also, coroutines are bound to Kotlin with compiler support. So it is hard to write library code that is intended to be used by other programming languages. AsyncFlows is designed as mostly language-agnostic, and if language provides a reasonable integration with JVM, it is likely that AsyncFlows could be used with it.

Some library extension might be done in the future to integrate with Kotlin coroutines, so it might be possible to get advantages of both approaches.

Scala

The Scala is not directly supported as it wraps Java types and this causes multiple problems in different places, if AsyncFlows used directly. So for the Scala adapters needed and support for scala collections needs to be implemented. Some code could be executed directly, but it is less usable than in other languages.

Generally, the framework ideas are well compatible with Scala, and few first research versions of the framework were implemented in Scala.This Java version is based on ideas from Scala version. And Java 8 finally allows more compact syntax to be used.

The future versions of the framework might provide Scala support again after the framework stabilization.However, comparing to Kotlin and Groovy, there is not so big productivity increase and there even some additional complications cased by features of Scala language. So this feature has low priority.There is previous iteration of scala adapter at this link.

In the old sample code, control flow looked like the following:

    val list = new ListBuffer[Integer]
    val rc: Int = doAsync {
      aSeq {
        list += 1
        1
      } map { value =>
        list += value + 1
        throw new IllegalStateException
      } thenDo {
        list += -1
        aValue(-1)
      } failed {
        case value: IllegalStateException =>
          list += 3
          42
      } finallyDo {
        list += 4
      }
    }

    assertEquals(42, rc)
    assertEquals(List(1, 2, 3, 4), list)

It looks the same as Groovy version (but with better typing), and it a bit cleaner than Kotlin version.

About

The framework provides DSL for easy and modular construction of asynchronous processes from simpler constructs. The framework is mostly targeted to IO-bound processes and it is not intended for CPU-bound processes.

Topics

Resources

License

Packages

No packages published