Skip to content

EagerFutureStream

johnmcclean-aol edited this page Feb 23, 2015 · 15 revisions

#EagerFutureStream

EagerFutureStream is a Java 8 Stream with a twist. Each element in the Stream is a Future task that is executed asynchronously and potentially in parallel.

##Creating a Stream

An EagerFutureStream starts processing as soon as it is created, and there are a number of helpful factory methods. Similarly to Java 8 Stream it can be created via of(T... data).

Stream.of(1,2,3,4)
EagerFutureStream.of(1,2,3,4)

##Asynchronous sequential operation

Calling collect on the above Streams would result in a Collection containing 1,2,3,4. But the process by which that result was arrived at, would be very different. The EagerFutureStream would execute sequentially, but on a separate thread (free-thread concurrency). The standard Stream would operate on the current thread.

##Parallel operation

Stream.of(1,2,3,4).parallel().map(it->it+1)
EagerFutureStream.parallel(1,2,3,4).map(it->it+1)

The operation between these two Streams is also different. The JDK parallel Stream operates by generating substreams that are processed in parallel

When a stream executes in parallel, the Java runtime partitions the stream into multiple substreams. Aggregate operations iterate over and process these substreams in parallel and then combine the results.

EagerFutureStream operates by executing each individual operation independently and responding to asynchronously to a call back when it completes. This allows us to capture and respond to the state of each task when it completes. When can catch and recover from errors, for example. We can retry, if the call fails. Or we could also change the concurrency rules for each stage in the EagerFutureStream.

##All of the Stream methods are available

 EagerFutureStream.of(1,2,3,4)
                  .filter(it -> it<3)
                  .map(num -> num*100)
                  .reduce(0,(acc,next)-> acc+next)

Along with Extra functionality

All of the SimpleReactStream API methods. All of the jOOλ Seq methods.

EagerFutureStream.parallel(1,2,3,4)
                 .retry(id-> loadFromCache(id))
                 .onFail(id-> loadFromDb(id))
                 .capture(id -> logFailure(id))
                 .concat(batchedStreams.flatMap(it -> it.stream())
                 .allOf(Collection<Data> data -> data.stream()
                                                     .reduce(new Aggregate(), (acc,next) -> 
                                                             acc.appendData(next))    
                  ).consume(aggregateData -> addToQueue(aggregateData))

Visualising EagerFutureStream Operations

All ships leave port on Stream creation! Operations act on the results at completion time.

eagerfuturestream limit

limit - limits results of computation, not starts.

eagerfuturestream skip

skip - skips results, not start.

stream retry

retry allows stages in a stream to be replayed for individual tasks.

stream onFail

onFail allows default values, or alternative computations to be used on failure.

eagerfuturestream capture

capture allows logging and similar failure capture to occur.

stream allOf

allOf allows result collection to occur asynchronously, leaving the calling thread unblocked.

Clone this wiki locally