LazyFutureStream

Ophir LOJKINE edited this page Dec 5, 2017 · 6 revisions

LazyFutureStream is a Java 8 Stream with a twist. Each element in the Stream is a Future task that is executed asynchronously, lazily, and potentially in parallel.

Creating a Stream

A LazyFutureStream behaves like a standard Java 8 Stream on creation: it does not start executing asynchronous tasks until a terminal operation is invoked. There are a number of helpful factory methods. As with a Java 8 Stream, one can be created via of(T... data).

Stream.of(1,2,3,4)
LazyFutureStream.of(1,2,3,4)

Asynchronous sequential operation

Stream.of(1,2,3,4).map(it->it+1)
LazyFutureStream.of(1,2,3,4).map(it->it+1)

Calling collect on either of the above Streams would produce a Collection containing 2,3,4,5, but the way that result is produced is very different. The LazyFutureStream executes the non-terminal steps sequentially, but on a separate thread (free-thread concurrency), whereas the standard Stream operates on the current thread. collect is a JDK 8 Stream method that blocks the current thread; LazyFutureStream can instead collect on a separate thread using allOf.
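
The difference can be sketched with plain JDK classes. This is not the LazyFutureStream implementation: it assumes one CompletableFuture per element, and the class and method names (AsyncSequentialSketch, incrementAll) are hypothetical. It only illustrates mapping off the calling thread and gathering results with CompletableFuture.allOf rather than a blocking collect.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class AsyncSequentialSketch {

    // Maps each element on the supplied executor and returns a future of the
    // collected results, so the caller is not blocked while the work runs.
    static CompletableFuture<List<Integer>> incrementAll(List<Integer> input, ExecutorService executor) {
        List<CompletableFuture<Integer>> futures = input.stream()
                .map(i -> CompletableFuture.supplyAsync(() -> i + 1, executor))
                .collect(Collectors.toList());

        // allOf completes once every element future has completed.
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(done -> futures.stream()
                        .map(CompletableFuture::join) // already complete, so this does not block
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        // A single worker thread keeps execution sequential but off the caller.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<List<Integer>> result = incrementAll(Arrays.asList(1, 2, 3, 4), executor);
            System.out.println(result.join()); // prints [2, 3, 4, 5]
        } finally {
            executor.shutdown();
        }
    }
}
```

The join in main is there only so the demo can print; a caller that wants to stay unblocked would attach a callback to the returned future instead.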

Parallel operation

Stream.of(1,2,3,4).parallel().map(it->it+1).collect(Collectors.toList())
LazyFutureStream.parallel(1,2,3,4).map(it->it+1).collect(Collectors.toList())

These two Streams also operate differently. The JDK parallel Stream works by generating substreams that are processed in parallel:

When a stream executes in parallel, the Java runtime partitions the stream into multiple substreams. Aggregate operations iterate over and process these substreams in parallel and then combine the results.

LazyFutureStream operates by executing each individual operation independently and responding asynchronously to a callback when it completes. This allows us to capture and respond to the state of each task when it completes: we can catch and recover from errors, for example, or retry if a call fails. We can also change the concurrency rules for each stage in the LazyFutureStream, and send multiple map requests to the same stage, forking the Stream in multiple directions.
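
A minimal sketch of per-task callbacks, again using only JDK CompletableFuture rather than the LazyFutureStream API. The load method and its failure for id 3 are hypothetical stand-ins for a remote call; the point is that a failure is handled for the one task that failed, without affecting the others.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class PerTaskRecoverySketch {

    // Hypothetical lookup that fails for one id, standing in for a remote call.
    static String load(int id) {
        if (id == 3) {
            throw new IllegalStateException("lookup failed for " + id);
        }
        return "value-" + id;
    }

    // Each element is its own task; a failure is recovered for that task only.
    static List<String> loadAll(List<Integer> ids) {
        List<CompletableFuture<String>> tasks = ids.stream()
                .map(id -> CompletableFuture
                        .supplyAsync(() -> load(id))
                        // Callback that runs only if this task completed exceptionally.
                        .exceptionally(error -> "default-" + id))
                .collect(Collectors.toList());

        return tasks.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(loadAll(Arrays.asList(1, 2, 3, 4)));
        // prints [value-1, value-2, default-3, value-4]
    }
}
```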

Infinite processing

LazyFutureStreams can be very large, potentially infinite in fact. LazyFutureStream provides some functions that help manage that fact.

run

Run allows the triggering terminal operation of a LazyFutureStream to run on a separate thread. This means the LazyFutureStream can be configured so as to run constantly while the current thread remains unblocked.

   LazyFutureStream.generate( () -> readNextRecord())
                   .map(record -> process(record))
                   .consume(record -> save(record))
                   .run()
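
The idea of triggering the terminal operation away from the calling thread can be sketched with JDK classes alone. This is an illustration under assumptions, not how run is implemented: the counter-based record source, the "record-" processing step, and the limit that makes the demo terminate are all hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class BackgroundRunSketch {

    // Starts consuming a generated stream on another thread and returns at once.
    // The limit stands in for a real stop condition so the example terminates.
    static CompletableFuture<Void> runInBackground(int records, List<String> sink) {
        AtomicInteger counter = new AtomicInteger();
        return CompletableFuture.runAsync(() ->
                Stream.generate(counter::incrementAndGet) // hypothetical record source
                        .limit(records)
                        .map(n -> "record-" + n)          // hypothetical processing step
                        .forEach(sink::add));             // terminal operation runs off the caller
    }

    public static void main(String[] args) {
        List<String> saved = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> running = runInBackground(3, saved);
        System.out.println("caller thread is free while the stream runs");
        running.join(); // wait only so the demo can print what was saved
        System.out.println(saved);
    }
}
```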

Limiting infinite streams

As of SimpleReact v0.4, some care needs to be taken when using limit with infinite LazyFutureStreams. Some LazyFutureStream methods are implemented by populating an asynchronous queue and then reading data sequentially in a different stream from there. If the desired effect is constant processing of Streaming data, this should have no impact.

But where users only want to process some portion of the LazyFutureStream, by applying a limit, that limit needs to be applied in the Stream before (rather than after) affected operations such as flatMap. With the exception of flatMap and merge which return LazyFutureStream, all impacted operations return Seq instead of LazyFutureStream. We intend to add auto-closing for infinite producers when such methods are called in the future, but for now, if you wish to apply a limit to a LazyFutureStream, please do so before calling a function with return type Seq (or flatMap / merge).
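
The ordering advice can be illustrated with a plain JDK Stream (used here as a stand-in; the class name is hypothetical and the queue-backed behaviour of LazyFutureStream is not reproduced). Applying limit before flatMap bounds the number of source elements that are ever expanded.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LimitBeforeFlatMapSketch {

    // Bounds the infinite source first, then expands each remaining element.
    static List<Integer> firstExpanded(int sourceElements) {
        return Stream.iterate(1, n -> n + 1)        // infinite source: 1, 2, 3, ...
                .limit(sourceElements)               // limit applied before flatMap
                .flatMap(n -> Stream.of(n, n * 10))  // each element becomes two results
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstExpanded(3)); // prints [1, 10, 2, 20, 3, 30]
    }
}
```

Note that the limit here counts source elements, not results: three source elements produce six values after flatMap.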

All of the Stream methods are available

e.g.

   LazyFutureStream.of(1,2,3,4)
                  .filter(it -> it<3)
                  .map(num -> num*100)
                  .reduce(0,(acc,next)-> acc+next)

Along with Extra functionality

All of the SimpleReactStream API methods. All of the jOOλ Seq methods.

LazyFutureStream.parallel(1,2,3,4)
                 .retry(id-> loadFromCache(id))
                 .onFail(id-> loadFromDb(id))
                 .capture(id -> logFailure(id))
                 .concat(batchedStreams.flatMap(it -> it.stream()))
                 .allOf((Collection<Data> data) -> data.stream()
                                                     .reduce(new Aggregate(), (acc,next) -> 
                                                             acc.appendData(next))    
                  ).consume(aggregateData -> addToQueue(aggregateData))

To split a LazyFutureStream

 LazyFutureStream<Integer> stream = LazyFutureStream.of(1,2,3,4)
                                            .map(it->it*1000000);
 List<String> toStrings = stream.map(it-> it+"!")
                                .block();
 List<Date> toDates = stream.map(Date::new)
                            .block();
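
Forking one stage into two branches has a close analogue in JDK CompletableFuture, sketched below. It is an analogy rather than the LazyFutureStream mechanism, and the class and method names (ForkSketch, scaled) are hypothetical. A JDK Stream cannot be reused this way, but a future can have any number of dependents.

```java
import java.util.Date;
import java.util.concurrent.CompletableFuture;

public class ForkSketch {

    // Shared upstream stage, computed once on another thread.
    static CompletableFuture<Integer> scaled(int value) {
        return CompletableFuture.supplyAsync(() -> value * 1_000_000);
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> stage = scaled(2);

        // Two independent branches hang off the same stage.
        CompletableFuture<String> asString = stage.thenApply(n -> n + "!");
        CompletableFuture<Date> asDate = stage.thenApply(n -> new Date(n.longValue()));

        System.out.println(asString.join());         // prints 2000000!
        System.out.println(asDate.join().getTime()); // prints 2000000
    }
}
```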

Another example

  LazyFutureStream<Data> dataStream = lazyReact.fromStreamNoCompletableFutures(
                                            data.stream())
                                            .map(it -> cleanUserData(it));

  dataStream.retry(data -> confirmSaved(data))
                           .onFail(e-> writeToFailover(e.getValue()))
                           .capture(e -> logFailure(e.getValue()));
                           // <-- ensure data saved on separate thread

  List<SavedData> response = dataStream.map(data -> confirmSaved(data))
                            .block(); //<-- responsive save attempt

Asynchronous core operations and Synchronous support operations

Some methods in LazyFutureStream operate both synchronously and asynchronously. For example, merge synchronously merges two streams, but it operates synchronously on an immediately available Stream of CompletableFutures. The results of those futures, the actual results of the merge, are populated asynchronously.

Operations run as asynchronously as possible with LazyFutureStreams. For example, reduce operations will start processing as soon as the first future results start flowing into that phase, while Futures from much earlier phases may still be waiting to be populated as processing or I/O operations continue on a separate thread.
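
The split between synchronous wiring and asynchronous results can be sketched with JDK CompletableFuture. This is an assumption-laden illustration, not the library's reduce: the hypothetical sum method builds the whole reduction immediately over the futures themselves, and the total is only populated as the inputs complete.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AsyncReduceSketch {

    // Wires up the reduction straight away; each step fires as soon as the
    // running total and the next input are both available.
    static CompletableFuture<Integer> sum(List<CompletableFuture<Integer>> inputs) {
        CompletableFuture<Integer> total = CompletableFuture.completedFuture(0);
        for (CompletableFuture<Integer> input : inputs) {
            total = total.thenCombine(input, Integer::sum);
        }
        return total;
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> first = new CompletableFuture<>();
        CompletableFuture<Integer> second = new CompletableFuture<>();

        CompletableFuture<Integer> total = sum(Arrays.asList(first, second));
        System.out.println(total.isDone()); // prints false: wiring is done, results are not

        first.complete(1);
        second.complete(2);
        System.out.println(total.join()); // prints 3
    }
}
```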

Visualising LazyFutureStream Operations

The LazyFutureStream flow is triggered by a terminal operation, and can be managed and tuned for performance via configuration options and the 'run' method.

limit - limits computations started. When a limit is applied to a LazyFutureStream it is applied to the tasks before they start. For versions before the planned SimpleReact v0.5, specifying a limit early in a LazyFutureStream that you don’t want to run constantly is recommended. v0.5 will include the ability for consuming threads to auto-close producing threads and reduce the need for programmatic management of LazyFutureStreams by users.

skip - For LazyFutureStream, specifying a skip will skip the first X tasks specified in the previous stage.

map/then converts input data in one format to output data in another.

retry allows stages in a stream to be replayed for individual tasks.

onFail allows default values, or alternative computations, to be used on failure.

capture allows logging and similar failure capture to occur.

allOf allows result collection to occur asynchronously, leaving the calling thread unblocked.

flatMap splits a single result into multiple tasks by returning a Stream from the flatMap method.

zip merges two streams by taking the next available result from each stream. For sequential streams, this will be determined by start order; for parallel streams, order is non-deterministic.

toQueue creates a new simplereact.async.Queue that is populated asynchronously by the current Stream. Another Stream (Consumer) can be created from the Queue by calling queue.toStream().
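
The producer and consumer hand-off behind toQueue can be sketched with a JDK BlockingQueue. This does not use the SimpleReact Queue class: the END marker, the doubling step, and the class name are hypothetical, and a real queue would need its own way to signal closure.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueHandOffSketch {

    // Hypothetical end-of-data marker; assumed never to appear in real input.
    private static final Integer END = Integer.MIN_VALUE;

    static List<Integer> produceAndConsume(List<Integer> input) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

        // Producer: populates the queue asynchronously on another thread.
        CompletableFuture.runAsync(() -> {
            input.forEach(queue::add);
            queue.add(END);
        });

        // Consumer: reads sequentially from the queue until the marker arrives.
        List<Integer> consumed = new ArrayList<>();
        for (Integer next = queue.take(); !next.equals(END); next = queue.take()) {
            consumed.add(next * 2);
        }
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(produceAndConsume(Arrays.asList(1, 2, 3))); // prints [2, 4, 6]
    }
}
```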
