Skip to content

Acting on Futures (actOnFutures operator)

johnmcclean-aol edited this page Oct 13, 2015 · 6 revisions

The actOnFutures Operator ensures that the next operation is performed directly on the Stream of underlying Futures. The standard behaviour is act on results.

Comparing zipWithIndex

The normal behaviour of zipWithIndex is to act on the result of the previous stage.

Where the result is already available

LazyFutureStream.of("a","b","c","d")
                .zipWithIndex()
                .forEach(System.out::println)


["a",1l]
["b",2l]
["c",3l]
["d",4l]

Where the result is realised asyncrhonously

If the values are not already present, but computed or loaded asyncrhonously then completion order will determine index assigned to each result.

new LazyReact().react(()->load("a"),()->load("b"),()->load("c"),()->load("d"))
                .zipWithIndex()
                .forEach(System.out::println)


["c",1l]   <-- c completes first
["b",2l]   <-- b completes second 
["a",3l]   <-- a completes third
["d",4l]   <-- d completes fourth

Where the result is realised asyncrhonously and actOnFutures is used

When actOnFutures is used the index represents the index of the future task (and therefore the original order, or the order of the Futures input into that stage).

new LazyReact().react(()->load("a"),()->load("b"),()->load("c"),()->load("d"))
                .actOnFutures()
                .zipWithIndex()
                .forEach(System.out::println)


["c",3l]   <-- c completes first
["b",2l]   <-- b completes second 
["a",1l]   <-- a completes third
["d",4l]   <-- d completes fourth

Multithreaded reduction

The standard behaviour for reduction (or collection / mutable reduction) in simple-react is that occurs on a single thread. There is an option to batch results and process them in parallel. An alternative to both mechanisms is to make use of the CompletableFuture api to involve multiple threads during reduction.

Methods such as thenCombine or allOf can be used to perform reduction operations on the processing threads (or even resubmit a task to the configured executor, if the async api methods are used).

new LazyReact().react(()->load("a"),()->load("b"),()->load("c"),()->load("d"))
               .actOnFutures()
               .reduce((future1,future2)-> future1.thenCombine(future2, (f1,f2)-> f1+","+f2))

actOnFutures operators

Reverse

Reverse can (efficiently for some Streams) reverse the order of elements in a Stream. When called in actOnFutures the order of the underlying Futures is simply reversed.

 public LazyFutureStream<T> reverse()

LazyFutureStream.of(1, 2, 3)
                .actOnFutures()
                .reverse()
	        .toList();
	        						
//3,2,1   						

Cycle

Cycling repeats a Stream a set number of times (or infinitely). When called in actOnFutures, it is the Stream of underlying futures that is cycled (resulting eventually in a Stream of populated results from those futures).

public LazyFutureStream<T> cycle(int times)

LazyFutureStream.of(1,2,2)
                .actOnFutures()
                .cycle(3)
                .collect(Collectors.toList())
//1,2,2,1,2,2,1,2,2
public LazyFutureStream<T> cycle(int times)

LazyFutureStream.of(1,2,2)
                .actOnFutures()
                .cycle()
                .limit(6)
                .collect(Collectors.toList())

//1,2,2,1,2,2

Copying (duplicate, triplicate, quadriplicate) Streams

A Stream can be lazily duplicated using the copy related operators. When used via actOnFutures the underlying Stream of Futures is duplicated, with Future instances being shared across Streams.

Tuple2<LazyFutureStream<T>, LazyFutureStream<T>> duplicate()

Tuple2<LazyFutureStream<<Integer>, LazyFutureStream<<Integer>> copies = LazyFutureStream.of(1, 2, 3, 4, 5, 6)
                                                                                .actOnFutures()
                                                                                .duplicate();
               

//[1, 2, 3, 4, 5, 6], [1, 2, 3, 4, 5, 6]
Tuple3<LazyFutureStream<T>, LazyFutureStream<T>, LazyFutureStream<T>> triplicate()
Tuple4<LazyFutureStream<T>, LazyFutureStream<T>, LazyFutureStream<T>, LazyFutureStream<T>> quadruplicate() 

Partitioning Streams

Partitioning operators allow a Stream to be split into multiple parts. splitAtHead and headAndTail allow the head and tail to be extracted together. HeadAndTail makes the tail avaiable as a SequenceM and splitAtHead as a LazyFutureStream

Tuple2<Optional<T>, LazyFutureStream<T>> splitAtHead()

LazyFutureStream.of(1,2,3).actOnFutures().splitAtHead()
	 
//Optional[1], LazyFutureStream[2,3]

Tuple2<LazyFutureStream<T>, LazyFutureStream<T>> splitAt(int where)
HeadAndTail<T> headAndTail()
Optional<HeadAndTail<T>> headAndTailOptional()

Zipping Streams

The zipping operators in actOnFutures allow two or more Streams to be combined element by element, but by operating on the underlying Futures.

<R> LazyFutureStream<Tuple2<T, R>> zipLfs(LazyFutureStream<R> other)
<R,T2> LazyFutureStream<R> zipLfs(LazyFutureStream<T2> other, BiFunction<CompletableFuture<T>,CompletableFuture<T2>,CompletableFuture<R>> combiner)
<R> LazyFutureStream<Tuple2<T, R>> zip(Stream<R> other)
<S, U> LazyFutureStream<Tuple3<T, S, U>> zip3(
			Stream<? extends S> second, Stream<? extends U> third)
 <S, U> LazyFutureStream<Tuple3<T, S, U>> zip3Lfs(
			LazyFutureStream<? extends S> second, LazyFutureStream<? extends U> third)
<T2, T3, T4> LazyFutureStream<Tuple4<T, T2, T3, T4>> zip4(
			Stream<T2> second, Stream<T3> third, Stream<T4> fourth)
<T2, T3, T4> SequenceM<Tuple4<T, T2, T3, T4>> zip4Lfs(
			LazyFutureStream<T2> second, LazyFutureStream<T3> third, LazyFutureStream<T4> fourth)

LazyFutureStream<Tuple2<T, Long>> zipWithIndex()

Windowing and Grouping operations

These enable data in a Stream to be grouped into either simple batches or sliding windows - by size, time, data or state.

LazyFutureStream<List<T>> sliding(int windowSize) 
LazyFutureStream<List<T>> sliding(int windowSize,
			int increment) 
LazyFutureStream<List<T>> grouped(int groupSize)
<R> LazyFutureStream<R> thenCombine(BiFunction<T, T, R> combiner)

Skipping and limiting operations

LazyFutureStream<T> skip(long n)
LazyFutureStream<T> limit(long maxSize)
LazyFutureStream<T> skipLast(int num)
 LazyFutureStream<T> limitLast(int num)

Stream manipulation operations

LazyFutureStream<T> intersperse(T value)
LazyFutureStream<T> intersperse(CompletableFuture<T> value)
LazyFutureStream<T> appendStream(Stream<T> stream) 
LazyFutureStream<T> shuffle()
LazyFutureStream<T> shuffle(Random random)
LazyFutureStream<T> slice(long from, long to)
LazyFutureStream<T> appendStreamFutures(Stream<CompletableFuture<T>> stream)
LazyFutureStream<T> prependStream(Stream<T> stream)
LazyFutureStream<T> prependStreamFutures(
			Stream<CompletableFuture<T>> stream) 
LazyFutureStream<T> append(T... values)
 LazyFutureStream<T> appendFutures(CompletableFuture<T>... values)
LazyFutureStream<T> prepend(T... values) 
LazyFutureStream<T> prependFutures(CompletableFuture<T>... values)
LazyFutureStream<T> insertAt(int pos, T... values)
LazyFutureStream<T> deleteBetween(int start, int end)
LazyFutureStream<T> insertStreamAt(int pos, Stream<T> stream)
LazyFutureStream<T> insertStreamFuturesAt(int pos,
			Stream<CompletableFuture<T>> stream)
 LazyFutureStream<T> concat(Stream<T> other)
LazyFutureStream<T> concat(T other)
LazyFutureStream<T> concat(T... other)
LazyFutureStream<T> concatFutures(CompletableFuture<T>... other)
LazyFutureStream<T> concatStreamFutures(Stream<CompletableFuture<T>> other)

Terminal operations

Optional<T> elementAt(long index)
Tuple2<T, LazyFutureStream<T>> get(long index)
Set<CompletableFuture<T>> toSet()
List<CompletableFuture<T>> toList()
<C extends Collection<CompletableFuture<T>>> C toCollection(
			Supplier<C> collectionFactory)
Optional<CompletableFuture<T>> reduce(BinaryOperator<CompletableFuture<T>> accumulator)
CompletableFuture<T> foldRight(CompletableFuture<T> identity,  BinaryOperator<CompletableFuture<T>> accumulator)
CompletableFuture<T> foldLeft(CompletableFuture<T> identity,  BinaryOperator<CompletableFuture<T>> accumulator)
CompletableFuture<T> reduce(CompletableFuture<T> identity, BinaryOperator<CompletableFuture<T>> accumulator)
<U> CompletableFuture<U> reduce(CompletableFuture<U> identity,
            BiFunction<CompletableFuture<U>, ? super CompletableFuture<T>, CompletableFuture<U>> accumulator,
            BinaryOperator<CompletableFuture<U>> combiner)
<R, A> R collect(Collector<? super CompletableFuture<T>, A, R> collector)
Clone this wiki locally