MicroProfile Reactive Streams Operators
Reactive Streams is an integration SPI: it allows two different libraries that provide asynchronous streaming to stream data to and from each other.
Reactive Streams is not, however, designed to be used directly by application developers. The semantics defined by Reactive Streams are very strict and non-trivial to implement correctly, particularly in the area of thread safety. Typically, application developers are expected to use third-party libraries that provide the tools necessary to manipulate and control streams. Examples include Akka Streams, RxJava and Reactor.
However, depending on third-party libraries for this essential, application-developer-facing functionality is not something that MicroProfile can do. MicroProfile specifications are not allowed to depend on anything other than the JDK and other MicroProfile specifications.
Hence, for MicroProfile to provide application developer APIs that use Reactive Streams, MicroProfile must provide its own Reactive Streams manipulation and control library. In future, it is hoped that this library will be adopted by the JDK itself, after a period of suitable incubation in the MicroProfile project. Some JDK maintainers have indicated that this would be a suitable path to get this type of functionality into the JDK.
Influences and History
The naming and scope of this API are inspired by the JDK8 java.util.stream API. The JDK8 stream API, however, does not capture all the typical functionality that an application developer using Reactive Streams would need. For additional API naming and scope, Akka Streams, RxJava and Reactor have been used as inspiration.
MicroProfile Reactive Streams does not contain an implementation itself but only provides the specified API, a TCK and documentation.
The following implementations are available:
MicroProfile Reactive Streams offers a series of builders for creating instances of Reactive Streams Publisher, Subscriber and Processor. Each Subscriber is associated with a CompletionStage of the result of consuming the stream. This may be the result of a reduction on the elements, or in some cases it just indicates the termination of the stream, either as a success or with an error.
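The pairing of a subscriber with a CompletionStage can be illustrated without the MicroProfile API at all. The following is a minimal sketch that uses only the JDK 9 java.util.concurrent.Flow types; SummingSubscriber, completion() and sumOf() are hypothetical names invented for this example and are not part of the specification.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class SummingSubscriberSketch {

    /** Hypothetical subscriber that reduces the stream to a sum and exposes it as a CompletionStage. */
    static final class SummingSubscriber implements Flow.Subscriber<Integer> {
        private final CompletableFuture<Integer> result = new CompletableFuture<>();
        private int sum;

        CompletionStage<Integer> completion() {
            return result;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // no back pressure in this sketch
        }

        @Override
        public void onNext(Integer item) {
            sum += item;
        }

        @Override
        public void onError(Throwable failure) {
            result.completeExceptionally(failure); // the stream terminated with an error
        }

        @Override
        public void onComplete() {
            result.complete(sum); // the stream terminated successfully
        }
    }

    static int sumOf(int... values) {
        SummingSubscriber subscriber = new SummingSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int value : values) {
                publisher.submit(value);
            }
        } // closing the publisher signals onComplete to the subscriber
        return subscriber.completion().toCompletableFuture().join();
    }

    public static void main(String[] args) {
        System.out.println(sumOf(1, 2, 3));
    }
}
```

The caller never touches the subscriber callbacks; it only observes the CompletionStage, which is redeemed with the reduction result or with the failure.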
The API builds a graph from a series of graph stages, which are provided as an SPI. A Reactive Streams engine, which is supplied by implementations of the specification and can also be provided manually by end users, is responsible for building the graph into a running stream.
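The separation between a graph of stages and the engine that runs it can be sketched roughly as follows. This is an illustration of the idea only: Stage, MapStage, FilterStage, Engine and SimpleEngine are hypothetical types made up for this example, they are not the SPI defined by the specification, and the engine here runs synchronously over a list rather than over a real stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.function.Predicate;

class GraphEngineSketch {

    /** Hypothetical stage description: pure data, with no knowledge of how it will be run. */
    interface Stage {}

    static final class MapStage implements Stage {
        final Function<Object, Object> mapper;
        MapStage(Function<Object, Object> mapper) { this.mapper = mapper; }
    }

    static final class FilterStage implements Stage {
        final Predicate<Object> predicate;
        FilterStage(Predicate<Object> predicate) { this.predicate = predicate; }
    }

    /** Hypothetical engine: turns a graph of stages into a running computation. */
    interface Engine {
        CompletionStage<List<Object>> run(List<?> source, List<Stage> graph);
    }

    /** Naive synchronous engine; a different engine could run the same graph asynchronously. */
    static final class SimpleEngine implements Engine {
        @Override
        public CompletionStage<List<Object>> run(List<?> source, List<Stage> graph) {
            List<Object> current = new ArrayList<>(source);
            for (Stage stage : graph) {
                List<Object> next = new ArrayList<>();
                for (Object element : current) {
                    if (stage instanceof MapStage) {
                        next.add(((MapStage) stage).mapper.apply(element));
                    } else if (stage instanceof FilterStage) {
                        if (((FilterStage) stage).predicate.test(element)) {
                            next.add(element);
                        }
                    } else {
                        throw new IllegalArgumentException("Unknown stage: " + stage);
                    }
                }
                current = next;
            }
            return CompletableFuture.completedFuture(current);
        }
    }

    static List<Object> demo() {
        // The graph is built first, as data, and only runs once an engine is given it.
        List<Stage> graph = Arrays.asList(
            new FilterStage(x -> ((Integer) x) % 2 == 1),
            new MapStage(x -> ((Integer) x) * 10));
        return new SimpleEngine().run(Arrays.asList(1, 2, 3, 4, 5), graph)
            .toCompletableFuture().join();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the graph is only a description, the same list of stages can be handed to any engine that understands them.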
Reactive Streams is available in JDK9 in the java.util.concurrent.Flow API. However, MicroProfile is not ready to require a baseline JDK version above 8.
For this reason, the same interfaces provided by https://www.reactive-streams.org in the org.reactivestreams package are used instead.
This does place a dependency from MicroProfile on a third-party library. However, that library is nothing more than the four Reactive Streams interfaces (Publisher, Subscriber, Subscription and Processor), and these have been copied as is into JDK9.
As an interim solution while JDK9 adoption catches up, this has been deemed an acceptable exception to the rule.
The approach documented in MicroProfile Approach to Reactive for ensuring a smooth transition to the JDK9 APIs has been adopted.
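Because the four interfaces were copied into JDK9 unchanged, bridging between the two sets of types is mechanical. The sketch below shows the idea under stated assumptions: RsPublisher, RsSubscriber and RsSubscription are local stand-ins that mirror the shape of the org.reactivestreams interfaces so that the example needs no dependencies, and toFlow() and fromList() are hypothetical helpers. It is not necessarily the transition approach referred to above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class FlowBridgeSketch {

    // Local stand-ins mirroring the shape of the org.reactivestreams interfaces (assumption:
    // declared here only to keep the sketch self-contained).
    interface RsSubscription { void request(long n); void cancel(); }
    interface RsSubscriber<T> {
        void onSubscribe(RsSubscription subscription);
        void onNext(T item);
        void onError(Throwable failure);
        void onComplete();
    }
    interface RsPublisher<T> { void subscribe(RsSubscriber<? super T> subscriber); }

    /** Wraps a stand-in publisher so that it can be consumed through the JDK 9 Flow API. */
    static <T> Flow.Publisher<T> toFlow(RsPublisher<T> publisher) {
        return flowSubscriber -> publisher.subscribe(new RsSubscriber<T>() {
            @Override public void onSubscribe(RsSubscription subscription) {
                flowSubscriber.onSubscribe(new Flow.Subscription() {
                    @Override public void request(long n) { subscription.request(n); }
                    @Override public void cancel() { subscription.cancel(); }
                });
            }
            @Override public void onNext(T item) { flowSubscriber.onNext(item); }
            @Override public void onError(Throwable failure) { flowSubscriber.onError(failure); }
            @Override public void onComplete() { flowSubscriber.onComplete(); }
        });
    }

    /** Naive synchronous publisher of fixed items; not fully spec compliant, for demonstration only. */
    static <T> RsPublisher<T> fromList(List<T> items) {
        return subscriber -> subscriber.onSubscribe(new RsSubscription() {
            int index;
            boolean done;
            @Override public void request(long n) {
                while (n-- > 0 && index < items.size() && !done) {
                    subscriber.onNext(items.get(index++));
                }
                if (index == items.size() && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }
            @Override public void cancel() { done = true; }
        });
    }

    static List<String> demo() {
        List<String> received = new ArrayList<>();
        toFlow(fromList(List.of("a", "b"))).subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) { received.add(item); }
            @Override public void onError(Throwable failure) { received.add("error"); }
            @Override public void onComplete() { received.add("done"); }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Each method on one side maps to the method of the same name on the other, which is what makes the interim dependency cheap to replace later.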
The whole MicroProfile Reactive Streams project can be built via Apache Maven.
$> mvn clean install
The API documentation makes extensive use of marble diagrams to visualize what each operator does. These diagrams are generated using a DSL written specifically for this purpose, rather than creating them in a visual diagram editor, which would make it very difficult to ensure consistency, especially between different contributors.
All the diagrams are declared in mp-rs-ops-marbles.js. This contains a map of all the graphs. Each graph has an array of stages, and stages have zero to many elements. The stages are as follows:
ins - An input stream. It is called ins because that makes the name 3 characters wide, for nice alignment with the other stages. The first parameter may optionally be a map of options; the only option currently read is label, which is used to label the stage.
sub - A sub stream. This will automatically have a label generated for it, in which n is the number of the sub stage, starting from one.
out - An output stream. The difference from ins is that out marbles inherit their colour from the previous ins marble.
op - The operator stage. The first argument must be the operator; this may be followed by zero to many intermediate value marbles.
eff - An effect stream. Used to visualize side effects (eg, callbacks executed by an operator).
res - A result stage. This displays the result that gets redeemed by a CompletionSubscriber when it completes.
Each of the stages supports zero to many marbles, which are declared as follows:
n - an ‘onNext’ marble. A regular element passed through the stream. The colour is computed automatically: each marble in an ins stream gets a new colour, while all other marbles have their colour selected based on the previous ins marble, searching up vertically first, then left. May optionally take an options argument, supporting a link option which indicates that this marble should be linked to another marble (eg, as a feedback loop from the output back to the input).
term - an onComplete marble, indicating successful termination of the stream.
nterm - a hybrid between n and term, allowing a marble to also carry a termination signal. This is used to make it clear that when a certain marble is emitted, that is the last marble and the stream will be terminated immediately.
onError marble, indicating failed termination of the stream.
e - a side effect. Should usually contain an example callback invocation.
i - an intermediate value. This is used for operators that compute intermediate values that are fed back into the operator callbacks with the next element.
r - a result value. This is used with the res stage to indicate the value that gets redeemed by the result.
none - No marble. Inserts a blank space between marbles. Needed to ensure alignment of cause and effect marbles.
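The colour rule for marbles can be sketched as a small grid search. The code below is one interpretation of "searching up vertically first, then left", written in Java rather than in the diagram DSL: stages are rows, marble positions are columns, and a non-ins marble looks upwards in its own column for an ins marble before moving one column to the left and trying again. The class, the method names and the integer colour indices are all assumptions made for this example; the actual implementation in mp-rs-ops-marbles.js may differ.

```java
import java.util.Arrays;

class MarbleColourSketch {

    static final int NONE = -1; // marks a blank position or a marble with no colour source

    /**
     * Assigns a colour index to every marble.
     * isInsStage[row] says whether that row is an ins stage; hasMarble[row][col] says
     * whether a marble sits at that position.
     */
    static int[][] assignColours(boolean[] isInsStage, boolean[][] hasMarble) {
        int[][] colours = new int[hasMarble.length][];
        int nextColour = 0;
        for (int row = 0; row < hasMarble.length; row++) {
            colours[row] = new int[hasMarble[row].length];
            for (int col = 0; col < hasMarble[row].length; col++) {
                if (!hasMarble[row][col]) {
                    colours[row][col] = NONE;
                } else if (isInsStage[row]) {
                    colours[row][col] = nextColour++; // every ins marble gets a new colour
                } else {
                    colours[row][col] = inheritedColour(isInsStage, hasMarble, colours, row, col);
                }
            }
        }
        return colours;
    }

    /** Searches upwards in the same column first, then repeats one column further left. */
    static int inheritedColour(boolean[] isInsStage, boolean[][] hasMarble,
                               int[][] colours, int row, int col) {
        for (int c = col; c >= 0; c--) {
            for (int r = row - 1; r >= 0; r--) {
                if (isInsStage[r] && c < hasMarble[r].length && hasMarble[r][c]) {
                    return colours[r][c];
                }
            }
        }
        return NONE;
    }

    public static void main(String[] args) {
        boolean[] isInsStage = {true, false};
        boolean[][] hasMarble = {
            {true, true, false},  // ins stage: two marbles, then a blank
            {false, true, true}   // out stage: a blank, then two marbles
        };
        // prints [[0, 1, -1], [-1, 1, 1]]
        System.out.println(Arrays.deepToString(assignColours(isInsStage, hasMarble)));
    }
}
```

In the example, the last out marble has no ins marble directly above it, so it falls back to the column on its left and takes that marble's colour.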
After editing or creating a new diagram, you can test your changes by opening them in a browser. Before you do this for the very first time, you need to install the npm dependencies, either by running npm install or by running mvn process-resources (or any lifecycle phase after process-resources, such as install). The build uses the Maven frontend plugin to install Node, install npm, then run npm to install the dependencies. Included in the dependencies is an installation of Chromium, which is used to generate the PNG diagrams for inclusion in the javadocs; this may take a while to download.
Once the dependencies are installed, you can open api/src/docs/js/index.html, which will show you all the rendered diagrams. No generation step is required to view these diagrams; you can simply hit refresh in the browser after making any changes.
We convert the diagrams to SVG, then to PNG, by using Puppeteer, a high level API on top of Chrome running in headless mode. The SVG diagrams are generated in Chrome, and then screenshotted to create the PNGs. This is done automatically by Puppeteer. However, we can’t run this as a part of the regular build because the MicroProfile CI and release server does not have the necessary dependencies to run Chrome. We’ve investigated a variety of alternatives, including different strategies for generating the diagrams, but nothing viable has come up. Unfortunately, installing shared libraries in the Eclipse CBI is too high a maintenance burden for the Eclipse CBI maintainers, so they’ve refused to do it. Consequently, we need to check the diagrams into git, which means that whenever they are changed, they need to be manually regenerated.
To generate the diagrams, run:
mvn -Pmarble-diagrams clean package
The diagrams will be saved to api/src/main/java/org/eclipse/microprofile/reactive/streams/doc-files. From there they can be included in the javadocs using an image tag, eg:
<img src="doc-files/map.png" alt="map marble diagram">
Make sure to include the alt text; the CI build will fail if it’s not there.
You can then view the diagrams in the api docs by opening api/target/apidocs/index.html and navigating to the class that you added the marble diagram to.
Before committing your changes, make sure to use the above command to generate the diagrams, and then check the results into git, including the updated marble-diagram-hashes.json file. As part of the verification of the build, a task checks that the hashes of all the input and output files from the diagram generation process match the hashes recorded when the diagrams were last generated. If you skip the regeneration, this check fails in CI and the change won’t pass PR validation.
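The kind of check described above can be sketched as follows. This is an illustration under assumptions, not the project's actual build task: the hash algorithm (SHA-256 here), the in-memory maps standing in for marble-diagram-hashes.json and the files on disk, and the names sha256(), staleFiles() and example-input.js are all hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class DiagramHashCheckSketch {

    /** Hex-encoded SHA-256 digest; the choice of algorithm is an assumption of this sketch. */
    static String sha256(byte[] content) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            StringBuilder hex = new StringBuilder();
            for (byte b : digest.digest(content)) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // SHA-256 is required on every Java platform
        }
    }

    /** Returns, in name order, the files whose current hash differs from or is missing in the record. */
    static List<String> staleFiles(Map<String, String> recordedHashes, Map<String, byte[]> currentFiles) {
        List<String> stale = new ArrayList<>();
        for (Map.Entry<String, byte[]> file : new TreeMap<>(currentFiles).entrySet()) {
            String recorded = recordedHashes.get(file.getKey());
            if (!sha256(file.getValue()).equals(recorded)) {
                stale.add(file.getKey());
            }
        }
        return stale;
    }

    public static void main(String[] args) {
        byte[] original = "graph-v1".getBytes(StandardCharsets.UTF_8);
        byte[] edited = "graph-v2".getBytes(StandardCharsets.UTF_8);
        Map<String, String> recorded = Map.of("example-input.js", sha256(original));

        System.out.println(staleFiles(recorded, Map.of("example-input.js", original))); // unchanged input
        System.out.println(staleFiles(recorded, Map.of("example-input.js", edited)));   // edited, not regenerated
    }
}
```

A build task following this pattern would fail whenever the returned list is not empty, which is why an edited diagram source without regenerated output is caught.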
Do you want to contribute to this project? Find out how you can help here.