Permalink
Fetching contributors…
Cannot retrieve contributors at this time
2572 lines (1961 sloc) 71.4 KB

Working With Streams

cyclops-react provides a lot of helpful classes and utilities to make working with streams of data a breeze. They are

  • StreamUtils : a large collection of static methods for manipulating JDK 8 Streams

  • Streamable : an interface that represents a Stream that can replayed and has a large number of available Stream operations in it’s own middle

  • ReactiveSeq : an advanced sequential stream interface that extends java.util.stream.Stream<T>, org.jooq.lambda.Seq<T>, java.lang.Iterable<T>, org.reactivestreams.Publisher<T> and adds additional operators.

StreamUtils, Streamable and ReactiveSeq share many operators, in general though ReactiveSeq has the larest range of operators.

Important

Some knowledge of the Java 8 Streams API is assumed here. ReactiveSeq provides all of the Stream operations (by inheriting from Stream) and a lot more. The Oracle tutorial for Streams is a good place to start if you are brand new to this concept.

Introduction to StreamUtils

StreamUtils provides a large range of additional operators for standard Java 8 Streams, these include operators for batching & windowing, error handling and retrying, scheduling, asyncrhonous execution, zipping, controlling emissions by time, appending, deleting and rearranging Streams and more!

Example 1. Using StreamUtils with a standard Java Stream

In this example we will delete between element 2 and element 4

List<String> result = StreamUtils.deleteBetween(Stream.of(1,2,3,4,5,6),2,4)
                                .map(it ->it+"!!")
                                .collect(Collectors.toList());

//"1!!","2!!","5!!","6!!"
Note

There is an equivalent StreamUtils class for Javaslang Streams.

Introduction to Streamable

Streamable is a class that represents something that can be Streamed repeatedly, Streamable also has a large number of operators viewable via javadoc.

Streamables can also be constructed lazily from Streams, or even in a similar manner to Streams. E.g.

Example 2. Creating a Streamable
Streamable.fromStream(Stream.of(1,2,3));
Streamable.of(1,2,3);
Streamable.generate(()->"hello world"));
ReactiveSeq.of(1,2,3).toStreamable();

And then converted to a Stream, ReactiveSeq or even manipulated directly like a Stream

Example 3. Using a Streamable
ReactiveSeq<Integer> seq = streamable.ReactiveSeq();
Stream<Integer> stream = streable.stream();

//or even treat the streamable like a stream
Streamable<Integer> initial = Streamable.of(1,2,3);
Streamable<String> nextStage = initial.map(i->"hello"+i);
Important

Streamable works by lazily constructing an intermediate collection that captures the values that pass through each phase of the Stream, and as such is not suitable for infinite or very large Streams. A Streamable can be converted to a standard non-caching Stream at any stage, however, and that should be the approach for any stages where you do not wish to cache the data traveleling through. Streamable#toReactiveSeq() converts to a non-caching Stream that also has a ReactiveSeq#toStreamable() operator, so switching between caching and non-caching contexts is straightforward.

Introduction to ReactiveSeq

ReactiveSeq is a sequential Stream, it can be created in the same manner as standard JDK streams, but offers a large number of powerful additional operators.

Example 4. Creating a ReactiveSeq
ReactiveSeq.fromStream(Stream.of(1,2,3));
ReactiveSeq.fromList(Arrays.list(1,2,3));
ReactiveSeq.of(1,2,3);
ReactiveSeq.generate(()->"hello world"));

HotStreams

Note

HotStreams are streams that are actively flowing. They can be created via the hotstream method on ReactiveSeq or in StreamUtils. They execute on a single thread on a provided executor.

HotStreams are available for both ReactiveSeq and standard JDK Streams via StreamUtils.

Example 5. Creating a HotStream
Executor exec = Executors.newFixedThreadPool(1);
ReactiveSeq.of(1,2,3)
     .peek(v->v+1)
     .peek(System.out::println)
     .connectable(exec);

In this example the Stream will start processing immediately on exec, and we will see

2

3

4

printed out to the console.

For a JDK Stream we could rewrite the code above to

Executor exec = Executors.newFixedThreadPool(1);
StreamUtils.connectable(Stream.of(1,2,3)
     .peek(v->v+1)
     .peek(System.out::println)
     ,exec);

Connecting to a HotStream

Once a HotStream has been created, users can connect to it via the connect operator. This returns another Stream that recieves values from the HotSteam. By default the connected Stream will be a standard 'cold' Stream. That is, the values will begin to accumulate in the transfer queue for the new Stream until a terminal operation is invoked for that Stream.

Multiple Streams can connect to a single HotStream.

Example 6. Connecting to a HotStream
In the example below 5,000 entries will be written out on the HotStreams executing thread, and 100 of those will also be written out on the current thread.

ReactiveSeq.range(0,Integer.MAX_VALUE)
                    .skip(5000)
                    .peek(System.out::println)
                    .connectable(exec)
                    .connect()
                    .skip(100)
                    .forEach(next->System.out.println("Current thread : " + next);
Data transfer between Streams

HotStreams use a (configurable) transfer queue to transfer data to client Streams.

7eee02ea b658 11e5 8605 4e29116bc0f7

When the connect method is called a new Queue is created (by default an Agrona OneToOneConcurrentArrayQueue if non is provided by the user).

Back pressure

When two Streams have been joined it is possible that the producting Stream may produce data at a rate faster than the consuming queue can handle. Future versions of cyclops-react will offer tighter integration with simple-react, which has a number of strategies for dealing with scenario - but for now it is possible for the consuming Stream to signal back pressure by making use of a blocking queue as the transfer queue between the HotStream and the connected Stream.

Warning

The default transfer queue used by the connect method on a HotStream is an Agrona wait-free, bounded OneToOneConcurrentArrayQueue. If this queue fills up due to a producer out performing the consumer then an illegal state exception will be thrown.

Example 7. Applying Back Pressure
In the example below 5,000 entries will be written out on the HotStreams executing thread, the consuming thread will only emit one per second. This will cause the transfer queue to fill up, and the ReactiveSeq generating the HotStream will crash.

ReactiveSeq.range(0,Integer.MAX_VALUE)
                    .skip(5000)
                    .peek(System.out::println)
                    .connectable(exec)
                    .connect()
                    .onePer(1,TimeUnit.SECONDS)
                    .forEach(next->System.out.println("Current thread : " + next);


Instead we connect and use a BlockingStream as a transfer queue, the producing Stream will ultimately be slowed to the same rate as the consuming Stream.

ReactiveSeq.range(0,Integer.MAX_VALUE)
                    .skip(5000)
                    .peek(System.out::println)
                    .connectable(exec)
                    .connect(new BlockingQueue(400))
                    .onePer(1,TimeUnit.SECONDS)
                    .forEach(next->System.out.println("Current thread : " + next);

reactive-streams

reactive-streams is an api for advanced inter-stream operability. cyclops-react, when simple-react is added to the class path can provide both a reactive-streams publisher and subscriber.

Creating a Subscriber

ReactiveSeq has a static subscriber method that returns a cyclops-react reactive-streams Subscriber. That is a class that can subscribe to any reactive-streams publisher (e.g. an RxJava Observable, Pivotal REACTOR Stream, akka-stream etc).

cyclops-reactSubscriber has a single method ReactiveSeq() that returns a ReactiveSeq instance (remember that ReactiveSeq extenads java.util.stream.Stream - so this also a standard, sequential Java 8 Stream).

Example 8. Creating a reactive-streams Subscriber
cyclops-reactSubscriber sub = ReactiveSeq.subscriber();
sub.ReactiveSeq().toList();

//[]

In this example our subscriber will be zero, as it has not attached to a publisher, so our generated List will also be zero.

Publishing

ReactiveSeq implements reactive-streams Publisher interface, and as such has the reactive-streams api publish method.

Example 9. Connecting a Subscriber to a Publisher
cyclops-reactSubscriber sub = ReactiveSeq.subscriber();
ReactiveSeq.of(1,2,3,4).publish(sub);
sub.ReactiveSeq().toList();

//[1,2,3,4]

In this example our subscriber has connected to a publisher that will send the values 1,2,3,4 in sequence, on request.
Important

Using the reactive-streams functionality in cyclops-react requires that simple-react be included on the classpath.

forEachWithError

since cyclops-react 7.2.0

The forEachWithErrors operator allows users to iterate over a Stream providing a consumer for the elements for the Stream a lá Stream.forEach, and a consumer for the errors produced while processing the Stream.

Example 10. forEachWithError with a ReactiveSeq
List list = new ArrayList<>();
Throwable error = null;
public String load(int i){
   if(i==2)
     throw new RuntimeException();

}
ReactiveSeq.of(1,2,3,4)
         .map(this::load)
         .forEachWithError(  i->list.add(i), e->error=e);

//list =List[1,3,4]
//error = RuntimeException

forEachEvent

since cyclops-react 7.2.0

The forEachEvent operator is similar to forEachWithErrors but also accepts a Runnable that is run when the Stream has been completely consumed.

Example 11. forEachEvent with a ReactiveSeq
Closeable resource;
List list = new ArrayList<>();
Throwable error = null;
public String load(int i){
   if(i==2)
     throw new RuntimeException();

}
ReactiveSeq.of(1,2,3,4)
         .map(this::load)
         .forEachEvent(  i->list.add(i),
                         logger::error,
                         ()->resource.close());

//list =List[1,3,4]
//runtime exception logged
//resource is closed

forEachX

since cyclops-react 7.2.0

forEachX allows users to consume only a specified amount of data from the Stream, returning a reactive-streams Subscription object that in turn allows more data to be consumed as needed.

Example 12. forEachX with a JDK Stream
List list = new ArrayList<>();
Subscription s = StreamUtils.forEachX(Stream.of(1,2,3), 2,  i->list.add(i));
assertThat(list,hasItems(1,2));
assertThat(list.size(),equalTo(2));

s.request(1); //request an additional iterm from the Stream be processed.

assertThat(list,hasItems(1,2,3));
assertThat(list.size(),equalTo(3));

forEachXWithError

since cyclops-react 7.2.0

forEachXWithErrors allows users to consume only a specified amount of data from the Stream, returning a reactive-streams Subscription object that in turn allows more data to be consumed as needed. The forEachXWithErrors operator allows users to iterate over a Stream providing a consumer for the elements for the Stream a lá Stream.forEach, and a consumer for the errors produced while processing the Stream.

Example 13. forEachWithError with a ReactiveSeq
List list = new ArrayList<>();
Throwable error = null;
public String load(int i){
   if(i==2)
     throw new RuntimeException();

}
Subscription s = ReactiveSeq.of(1,2,3,4)
                          .map(this::load)
                          .forEachXWithError( 2, i->list.add(i), e->error=e);

//list =List[1]
//error = RuntimeException

s.request(1);

//list =List[1,3]

s.request(1);

//list =List[1,3,4]

forEachXEvents

forEachXEvents allows users to consume only a specified amount of data from the Stream, returning a reactive-streams Subscription object that in turn allows more data to be consumed as needed. The forEachXEvents operator is similar to forEachXWithErrors but also accepts a Runnable that is run when the Stream has been completely consumed.

Example 14. forEachXEvents with a ReactiveSeq
List list = new ArrayList<>();
Throwable error = null;
Closeable resource;
public String load(int i){
   if(i==2)
     throw new RuntimeException();

}
Subscription s = ReactiveSeq.of(1,2,3,4)
                          .map(this::load)
                          .forEachXEvents( 2, i->list.add(i),   logger::error,
                           ()->resource.close());;

//list =List[1]
//error = RuntimeException
// resource open

s.request(1);

//list =List[1,3]

s.request(1);

//list =List[1,3,4]

s.request(1); //no new elements end of Stream

//list =List[1,3,4]
// resource closed

Reactive Future Operations & Reactive Tasks

The reactive-streams based terminal operations can also be launched asynchronously, first by using the futureOperations operator to provide an Executor that will process the Stream.

Using futureOperations

The futureOperations operator opens up a world of asynchronously executed terminal operations. A large range of terminal operations are provided and for each one a CompletbableFuture is returned.

Example 15. using FutureOperatons with a JDK Stream
Executor exec = Executors.newFixedThreadPool(1);
FutureOperations terminalOps  = StreamUtils.futureOperations(Stream.of(1,2,3), exec);

//execute the collection & Stream evaluation  on the provided executor
CompletableFuture<List> futureList = terminalOps.collect(Collectors.toList());

List result  = list.join();

ReactiveTask

Each of the async Future Operations for reactive-streams (forEachX, forEachEvent etc), return a ReactiveTask object. This allows users to check the status of Stream processing, to cancel it, to request more elements to be processed from the Stream either synchronously or asynchronously.

Example 16. using FutureOperatons with a JDK Stream
List list = new ArrayList<>();
ReactiveTask s = ReactiveSeq.of(1,2,3)
                          .futureOperations(exec)
                          .forEachX( 2,  i->list.add(i));
//wait until first 2 elements are processed
s.block();

//list = List[1,2]

//trigger the remainder of the Stream processing asynchronously
ReactiveTask nextElements = s.requestAllAsync();

//if we wait until it completes
//nextElements.block();
//list = List[1,2,3]

Batching, Windowing and Sliding views

cyclops-react provides a number of different batching and windowing operations, none of which terminate / fully consume the Stream (i.e. they are compatible with infinitely large Streams). The Sliding operator creates a sliding view whereas both batch & window operators return batches of elements and differ only by return type (batch - returns a List, window - returns a Streamable).

Note

jOOλ 0.9.9 provides a large range of windowing functions inspired by SQL windowing operations. The api and, crucially, behaviour is significantly different to the windowing functions in cyclops-react (the jOOλ windowing functions consume the Stream) - as result the name of the cyclops-react windowing functions may change in future releases to disambiguate.

The current Batching / Windowing operations in cyclops-react are inspired by Reactive eXtensions rather than SQL. Like in Reactive eXtensions Batching (or Buffering) differs from Windowing only in terms of the supplied parameter type - a List for Batching and a Streamable for Windowing.

80928004 d160 11e4 85b1 227f9c7652b6

Sliding

Sliding produces a sliding view over a Stream, there are two sliding operators - one that takes just the window size and another that takes window size and the increment to be applied.

Example 17. Creating a sliding view over a Sequence
ReactiveSeq.of(1, 2, 3, 4, 5, 6)
         .sliding(2)
         .toList();

//List[[1,2],[2,3],[3,4],[4,5],[5,6]]
Example 18. A sliding view with StreamUtils and an increment
import static com.aol.cyclops2-react.streams.StreamUtils.sliding;

List<List> list = sliding(Stream.of(1, 2, 3, 4, 5, 6),3, 2)
                        .collect(Collectors.toList());

//[[1, 2, 3], [3, 4, 5], [5, 6]]

Batch / Window by size

Batch / Window by size allows elements to be grouped as they flow through the Stream into Lists or Streamables of the specified size.

Example 19. Batch by size example
ReactiveSeq.of(1,2,3,4,5, 6)
                            .map(n-> n==6? sleep(1) : n)
                            .batchBySize(4)
                            .toList()
//List[[1,2,3,4],[5,6]]
Example 20. Batch by size video

The video shows batching by size on simple-react’s LazyFutureStream which is a parellel implementation of ReactiveSeq

Batch / Window by time

Batch / Window by time group elements into either a List (Batch) or Streamable (Window) based on the time bucket they pass through the Stream.

Example 21. Batch by time example
ReactiveSeq.of(1,2,3,4,5, 6)
         .map(n-> n==6? sleep(1) : n)
         .windowByTime(10,TimeUnit.MICROSECONDS)

//Streamable[[1,2,3,4,5],[6]]
Example 22. Batch by time video

The video shows batching by time on simple-react’s LazyFutureStream which is a parellel implementation of ReactiveSeq

Tip

The idea of batching elements into time buckets might seem absurd if you are used to creating Java 8 Streams from already populated collections. This is can be really useful if you use cyclops-react-streams in conjunction with simple-react, you can connect Streams to collections that are populated asynchronously, for example on recieving a web request an async Queue could be populated that has a processing ReactiveSeq attached.

Batch / Window by size and time

Much like batchBySize groups elements into Lists based on the specified list size, and windowBySize organises streaming elements into Streamables by time bucket- batchBySizeAndTime / windowBySizeAndTime populates Lists (or Streamables) based on which ever criteria is met first. Should the max size be reached the List / Streamable is ready to move down stream, should the max time elaspe - ditto.

Example 23. Window by size and time example
ReactiveSeq.generate(this::loadData)
         .map(this::process)
         .windowByTSizeAndTime(3,1,TimeUnit.SECONDS)

//4th item takes >1 second
//Streamable[[res1,res2,res3],[res4]]

Batch / Window by state

Stateful batching and windowing allows the user to define a BiPredicate that recieves both the current element moving through the Stream and the Streamable from the previous window / batch. Returning true keeps the window / batch open, returning false closes it.

Example 24. Window Statefully example
ReactiveSeq.of(1,2,3,4,5,6)
                .windowStatefullyWhile((s,i)->s.toList().contains(4) ? true : false)
                .toList()
//streamable[1], streamable[2], streamable[3],streamable[4], streamable[5, 6]

Batch / Window while a predicate holds

Batching or Windowing while allows users to keep the window / batch open as long as the predicate holds true.

Example 25. Batch while example
ReactiveSeq.of(1,2,3,4,5,6)
                .batchWhile(i->i%3!=0)
                .toList()

//[1,2,3],[4,5,6]

Batch / Window until a predicate holds

Batching or Windowing while allows users to keep the window / batch open until the predicate holds true. .Batch until example

ReactiveSeq.of(1,2,3,4,5,6)
                .batchUntil(i->i%3==0,()->new ArrayList<>())
                .toList().size()
//[1,2,3],[4,5,6]

jOOλ based windowing

Integrated as of cyclops-react 7.3.0

jOOλ based windowing implements SQL windowing operations for Streams. There is a very good introductory blog article on the subject here 2016 Will be the Year Remembered as When Java Finally Had Window Functions!

Tip

The jOOλ functions are exceptionally powerful and flexible, but also consume the Stream. This means they will not perform as well as the simpler (but less powerful) batchBy, windowBy and sliding functions in cyclops-react. They are also not suitable for use in infinitely large Streams.

Example 26. jOOλ windowing example
// group, order, take max

ReactiveSeq.of(1, 2, 4, 2, 3)
         .window(i -> i % 2, naturalOrder())
         .map(Window::max)
// (1, 2, 4, 4, 3)
Example 27. jOOλ windowing with pretty print

An example from jOOλ windowing blog entry.

System.out.println(
    ReactiveSeq.of("a", "a", "a", "b", "c", "c", "d", "e")
       //create a window
       .window(naturalOrder())
       //produce a table from the window
       .map(w -> tuple(
              w.value(),   // v0
              w.count(),   // v1
              w.median(),  // v2
              w.lead(),    // v3
              w.lag(),     // v4
              w.toString() // v5
       ))
            .format()
);
+----+----+----+---------+---------+----------+
| v0 | v1 | v2 | v3      | v4      | v5       |
+----+----+----+---------+---------+----------+
| a  |  1 | a  | a       | {zero}  | a        |
| a  |  2 | a  | a       | a       | aa       |
| a  |  3 | a  | b       | a       | aaa      |
| b  |  4 | a  | c       | a       | aaab     |
| c  |  5 | a  | c       | b       | aaabc    |
| c  |  6 | a  | d       | c       | aaabcc   |
| d  |  7 | b  | e       | c       | aaabccd  |
| e  |  8 | b  | {zero}  | d       | aaabccde |
+----+----+----+---------+---------+----------+

Stream manipulation

cyclops-react offers many functions for manipulating Streams such as deleteBetween, insertAt and more

Prepending to a Stream

Example 28. Prepending
import static com.aol.cyclops2-react.streams.StreamUtils.prepend;

List<String> result =   prepend(Stream.of(1,2,3),100,200,300)
                                 .map(it ->it+"!!")
                                 .collect(Collectors.toList());

List<String> result =   ReactiveSeq.of(1,2,3)
                                 .prependStream(ReactiveSeq.of(100,200,300))
                                 .map(it ->it+"!!")
                                 .toList();

//["100!!","200!!","300!!","1!!","2!!","3!!"]

Appending to a Stream

Example 29. Appending
List<String> result =   ReactiveSeq.of(1,2,3)
                                 .append(100,200,300)
                                 .map(it ->it+"!!")
                                 .toList();
import static com.aol.cyclops2-react.streams.StreamUtils.appendStream;

List<String> result =   appendStream(Stream.of(1,2,3),ReactiveSeq.of(100,200,300))
                                        .map(it ->it+"!!")
                                        .collect(Collectors.toList());

//["1!!","2!!","3!!","100!!","200!!","300!!"]

Inserting at an index

Example 30. Inserting at index
List<String> result =   ReactiveSeq.of(1,2,3).insertAt(1,100,200,300)
                .map(it ->it+"!!").collect(Collectors.toList());

import static com.aol.cyclops2-react.streams.StreamUtils.insertStreamAt;

List<String> result =   insertStreamAt(Strean.of(1,2,3),1,Stream.of(100,200,300))
                                     .map(it ->it+"!!")
                                     .collect(Collectors.toList());

//["1!!","100!!","200!!","300!!","2!!","3!!"]

Deleting between two indices

The deleteBetween operator allows you to exclude elements between two zero-indexed indices. For example deleteBetween(1,3) deletesBetween the second and fourth element.

Example 31. Deleting between two indices
List<String> result =   ReactiveSeq.of(1,2,3,4,5,6)
                                 .deleteBetween(2,4)
                                 .map(it ->it+"!!")
                                 .toList();

import static com.aol.cyclops2-react.streams.StreamUtils.deleteBetween;

List<String> result =   deleteBetween(Stream.of(1,2,3,4,5,6),2,4)
                                 .map(it ->it+"!!")
                                 .collect(Collectors.toList());

//["1!!","2!!","5!!","6!!"]

SubStream

The subStream operator allows users to extract a smaller subset stream from a larger one. It works in the opposite manner to deleteBetween in that you select two zero-indexed indices between which you would like to keep data.

Example 32. creating a subStream
ReactiveSeq.of(1,2,3,4,5,6).subStream(1,3);


//ReactiveSeq[2,3]

intersperse

The intersperse operator allows a new value to be inserted between every element. .intersperse example

//ReactiveSeq.of(1, 2, 3, 4).intersperse(0);

// (1, 0, 2, 0, 3, 0, 4)

SplitBy

Example 33. splitBy
ReactiveSeq.of(1, 2, 3, 4, 5, 6).splitBy(i -> i % 2 != 0)
//tuple[ReactiveSeq[1,3,5],ReactiveSeq[2,4,6]]

Split At

Example 34. splitAt
ReactiveSeq.of(1, 2, 3, 4, 5, 6).splitAt(2)
//tuple[ReactiveSeq[1,2,3],ReactiveSeq[4,5,6]]

Copy a Stream

Example 35. Duplicate, triplicate and quadruplicate a Stream
 Tuple2<ReactiveSeq, ReactiveSeq> copies =  ReactiveSeq.of(1,2,3,4,5,6).duplicateSequence();
 Tuple3<ReactiveSeq, ReactiveSeq, ReactiveSeq> copies =ReactiveSeq.of(1,2,3,4,5,6).triplicate();
 Tuple4<ReactiveSeq, ReactiveSeq, ReactiveSeq,ReactiveSeq> copies =ReactiveSeq.of(1,2,3,4,5,6).quadruplicate();

Value Extraction

cyclops-react provides many extraction operators, including many that return a Tuple containing a value and an operational Stream (such as splitAt, splitBy, headAndTail returns an object with 2 fields), and others that access a value directly (get, single) - and throw an exception if the element doesn’t exist and some that return optional (elementAt, singleOptional).

get, elementAt

Example 36. splitAtHead, splitAt, get, elementAt
ReactiveSeq<String> helloWorld = ReactiveSeq.of("hello","world","last");
Tuple2<String,ReactiveSeq<String> headAndTail = helloWorld.splitAtHead();
String head = headAndTail._1();
//hello

ReactiveSeq<String> tail =  headAndTail._2();
//[world,last]

splitAt Stream at the specified index.

ReactiveSeq.of(1, 2, 3, 4, 5, 6).splitAt(2)
//tuple[ReactiveSeq[1,2,3],ReactiveSeq[4,5,6]]

Get at 0, this extracts the first value and returns a Stream of the remaining values (as a Tuple2)

ReactiveSeq.of(1,2,3,4).get(0)
//[1],ReactiveSeq[2,3,4]

Get at 1

ReactiveSeq.of(1,2,3,4).get(1)
//[2],ReactiveSeq[1,3,4]

ElementAt returns an optional containing the element at index (if exists) otherwise optional zero

ReactiveSeq.of(1).elementAt(0)
//Optional[1]
ReactiveSeq.of().elementAt(0).isPresent()
//false

Head And Tail Extraction

Example 37. Head and Tail on a Streamable
int head = Streamable.of(1,2,3,4).head();
//1

Streamable<Integer> tail = Streamable.of(1,2,3,4).tail();
//Streamable[2,3,4]
Sieve of Eratosthenes
Example 38. ReactiveSeq based sieve
public void sieveTest(){
    sieve(ReactiveSeq.range(2, 1_000)).forEach(System.out::println);
}

ReactiveSeq sieve(ReactiveSeq s){

    return s.headAndTailOptional().map(ht ->ReactiveSeq.of(ht.head())
                            .appendStream(sieve(ht.tail().filter(n -> n % ht.head() != 0))))
                    .orElse(ReactiveSeq.of());
}
Example 39. Streamable based sieve
public void sieveTest2(){
    sieve(Streamable.range(2, 1_000)).forEach(System.out::println);
}

Streamable sieve(Streamable s){

    return s.size()==0? Streamable.of() : Streamable.of(s.head())
                                           .appendStreamable(sieve(s.tail()
                                                                    .filter(n -> n % s.head() != 0)));
}
Example 40. StreamUtils based sieve
import static com.aol.cyclops2-react.streams.StreamUtils.headAndTailOptional;

 public void sieveTest(){
    sieve(IntStream.range(2, 1_000).boxed()).forEach(System.out::println);
}

Stream sieve(Stream s){

    return headAndTailOptional(s).map(ht ->Stream.concat(Stream.of(ht.head())
                            ,sieve(ht.tail().filter(n -> n % ht.head() != 0))))
                    .orElse(Stream.of());
}

Error handling

Recover

It is possible to recover from an exception thrown earlier in the Stream using the recover operator. It is available on ReactiveSeq, Streamable and StreamUtils. Users can choose to recover differently by Exception type, or globally.

Note

For those using simple-react, this differs from the simple-react only operator OnFail in that it does not provide the element data that failed.

Example 41. Global recover

In this example all exceptions types will be caught and recovered from.

ReactiveSeq.of(1,2,3,4)
                    .map(u->{throw new RuntimeException();})
                    .recover(e->"hello")
                    .firstValue()
//hello
Example 42. Targeted recovery

In this example we only recover from IOExceptions.

ReactiveSeq.of(1,2,3,4)
                    .map(i->i+2)
                    .map(u->throw ExceptionSoftener.throwSoftenedException( new IOException()))
                    .recover(IOException.class,e->"hello")
                    .firstValue()
//hello

Note the use of ExceptionSoftener

Retry

Retry allows a function to be retried. By default retry occurs up to 5 times with an exponential backoff.

Note

simple-react users should note that the implementation in LazyFutureStream is a significantly more advanced asynchronous retry (making use of Tomasz Nurkiewicz async retry library).

Example 43. Retry example
ReactiveSeq.of( 1,  2, 3)
         .retry(this::remoteCall)
         .map(this::continueProcessing)

//if remote call fails, it will be retried with a backoff

LazyFutureStream in simple-react provides a parallel ReactiveSeq implementation.

Retry in simple-react

Scheduling

Scheduling is available for ReactiveSeq streams and via StreamUtils.

Cron Based Scheduling

Example 44. ReactiveSeq example

Send one element of a Stream through every second.

ReactiveSeq.of(1,2,3,4)
     .peek(System.out::println)
     .schedule("* * * * * ?", ex)

This will print 1 2 3 4 With a new line per second.

We can connect to the output of this stream

HotStream connectable = ReactiveSeq.of(1,2,3,4)
                .peek(System.out::println)
                .schedule("* * * * * ?", ex);

And further process the connected Stream, in this case only processing one element per day via the debounce operator

ReactiveSeq.of(1,2,3,4)
     .peek(System.out::println)
     .schedule("* * * * * ?", ex)
     .connect()
     .debounce(1,TimeUnit.DAYS)
     .peek(this::writeToDB)
     .toList()
Example 45. java.util.stream.Stream example

The final example again with JDK 8 via the static methods in StreamUtils.

StreamUtils.debounce(StreamUtils.schedule(Stream.of(1,2,3,4)
                .peek(i->count.incrementAndGet())
                .peek(System.out::println)
                ,"* * * * * ?", ex)
                .connect()
                ,1,TimeUnit.DAYS)
                .peek(this::writeToDB)
                .toList()

Fixed Rate

Example 46. ReactiveSeq example

This time we will execute the Stream every second using a Fixed Rate delimiter

ReactiveSeq.of(1,2,3,4)
     .peek(System.out::println)
     .scheduleFixedRate(1000, ex)
     .connect()
     .debounce(1,TimeUnit.DAYS)
     .peek(this::writeToDB)
     .toList()
Example 47. java.util.stream.Stream example
StreamUtils.debounce(StreamUtils.scheduleFixedRate(Stream.of(1,2,3,4)
                .peek(i->count.incrementAndGet())
                .peek(System.out::println)
                ,1000, ex)
                .connect()
                ,1,TimeUnit.DAYS)
                .peek(this::writeToDB)
                .toList()

Fixed Delay

Example 48. ReactiveSeq example

This time we will execute the Stream every second using a Fixed Delay delimiter

ReactiveSeq.of(1,2,3,4)
     .peek(System.out::println)
     .scheduleFixedDelay(2000, ex) //2 secs after previous element passes through
     .connect()
     .debounce(1,TimeUnit.DAYS)
     .peek(this::writeToDB)
     .toList()
Example 49. java.util.stream.Stream example
StreamUtils.debounce(StreamUtils.scheduleFixedDelay(Stream.of(1,2,3,4)
                .peek(System.out::println)
                ,2000, ex)
                .connect()
                ,1,TimeUnit.DAYS)
                .peek(this::writeToDB)
                .toList()

Time based operators

cyclops-react provides a number of time based operators including - onePer, xPer, jitter, debounce, timestamp & elapsed.

Jitter

Jitter introduces a jitter into the processing of each element, a random delay up to the max threshold specified by the user. .jitter operator

ReactiveSeq.fromIntStream(IntStream.range(0, 1000))
                .map(it -> System.currentTimeMillis())
                .jitter(10_000l)
                .forEach(System.out::println);

//random wait up to 10 seconds between each value being printed
Example 50. jitter in simple-react

simple-react’s LazyFutureStream is a parellel implementation of ReactiveSeq

Fixed Delay Operator

Not to be confused with scheduling fixed delay, the fixed delay operator waits a specified amount of time before processing the next element, but does not require a ScheduledExecutorService and does not create a HotStream, the per element delay is implemented on the Stream’s executing thread when a terminal operation is invoked.

Example 51. fixed delay operator
ReactiveSeq.fromIntStream(IntStream.range(0, 1000))
                .fixedDelay(1l, TimeUnit.MICROSECONDS)
                                .forEach(System.out::println)

//wait 1 second between each value being printed
Example 52. fixed delay in simple-react

simple-react’s LazyFutureStream is a parellel implementation of ReactiveSeq

onePer operator

onePer ensures that only one element is emitted per time period, data is not lost, but rather queued and will be emitted when the next time gate opens. For an operator that drops data see debounce.

Example 53. onePer operator
ReactiveSeq.iterate(0, it -> it + 1)
                .skip(100)
                .onePer(1, TimeUnit.MICROSECONDS)
                .map(seconds -> "hello!")
                .peek(System.out::println)
                .toList();

//one value emitted per second

simple-react’s LazyFutureStream is a parellel implementation of ReactiveSeq

Tip

The xPer operator works in a similar fashion but allows only a specified number of elements through per time period. The elements will be emitted as soon as they are available, which may cause the emissions to bunch at the start of the time period.

debounce

Debounce accepts only one value per time period specified, dropping all other elements that pass through during each alloted time bucket. It acts in contrast to onePer, which doesn’t drop data but leaves it queued to travel onwards once the time deadline is reached.

Example 54. debounce operator
ReactiveSeq.of(1,2,3,4,5,6)
        .debounce(1000,TimeUnit.SECONDS).toList();

// 1
Example 55. debounce in simple-react

simple-react’s LazyFutureStream is a parellel implementation of ReactiveSeq

Timestamp

The timestamp operator maps the elements in the Stream into a Tuple2 containing the element and the timestamp at which it past through the timestamp operator. .timestamp operator

ReactiveSeq.of(1,2,3,4,5)
          .timestamp()

//[1,timestampInMillis],[2,timestampInMillis],[3,timestampInMillis] etc

Elapsed

The elasped operator maps the elements in the Stream into a Tuple2 containing the element and the elapsed time between each emission

Example 56. elapsed operator
ReactiveSeq.of(1,2,3,4,5).elapsed().noneMatch(value->value._2()<0)
=======


=== Zipping

Zipping Streams involves merging elements from multiple Streams into a single Stream of the same number of elements as the smallest Stream to be zipped.
[TIP]

If you are zipping Streams of unequal length and don’t want to lose elements, use zip in conjunction with concat, cycle and limitUntil to cycle a series of end marker elements at the end of each Stream.

Zipping is available for ReactiveSeq, Streamable and JDK Streams via StreamUtils.

==== Zip two Streams

The zip method zips two Streams and returns a ReactiveSeq (or Stream) contain a Stream of Tuple2 elements where one element in the tuple comes from one Stream and the other from the other. .zipping two Streams

ReactiveSeq.of(1,2,3,4,5,6)
         .zip(ReactiveSeqof(100,200,300,400))
         .toList();

//[(1, 100), (2, 200), (3, 300), (4, 400)]

simple-react’s LazyFutureStream is a parellel implementation of ReactiveSeq

The zip methods inherited from jOOλ that ReactiveSeq overrides only accept Seq implementations (which ReactiveSeq extends), if you want to use a JDK 8 Stream or BaseStream see the zipStream methods instead.

==== Zipping with a custom zipper

A number of the cyclops-react zip operators allow a custom zipper to be supplied (typically a BiFunction that allows users to determine how the Streams should be merged).

Stream<List<Integer>> zipped = StreamUtils.zipSequence(Stream.of(1,2,3)
                        ,ReactiveSeq.of(2,3,4),
                            (a,b) -> Arrays.asList(a,b));


//Stream[List[1,2],List[2,3],List[3,4]]

==== Zip three Streams

ReactiveSeq.of(1,2,3,4,5,6)
         .zip3(ReactiveSeq.of(100,200,300,400),ReactiveSeq.of('a','b','c'))
         .toList();
//[(1, 100, a), (2, 200, b), (3, 300, c)]

==== Zip four Streams

ReactiveSeq.of(1,2,3,4,5,6)
         .zip4(ReactiveSeq.of(100,200,300,400),ReactiveSeq.of('a','b','c'),ReactiveSeq.of("hello","world"))
         .toList();
//[(1, 100, a, hello), (2, 200, b, world)]

==== Unzip

The unzip methods take a Stream containing tuples and convert them into a Tuple containing Streams.

ReactiveSeq.unzip(ReactiveSeq.of(Tuple.tuple(1, "a"), Tuple.tuple(2, "b"), Tuple.tuple(3, "c")));

//Tuple2[ReactiveSeq[1,2,3],ReactiveSeq[a,b,c]]

==== zipWithIndex

zipWithIndex creates a Stream of Tuple2 instances, each Tuple2 contains an element from the Stream and it’s 0 bound index.

ReactiveSeq.of('a','b','c')
         .zipWithIndex()

//ReactiveSeq[Tuple['a',0],Tuple['b',1],Tuple['c',2]]
====
.zipWithIndex in simple-react
====
simple-react's LazyFutureStream is a parellel implementation of ReactiveSeq

video::v=aTFz4lhHE-M[youtube]
====


=== Efficient reversal

cyclops-react provides methods to reverse a Stream and other functions that take advantage of reversed order (such as foldRight). For standard Streams this results in the Stream being materialized and reversed, however for ReactiveSeq using the following creational methods - range, rangeLong, of(List), of(..values) all result in Sequences that can be efficiently reversed (and used in scanRight, foldRight etc).

.creating a ReactiveSeq for efficient reversal
====
```java
ReactiveSeq.range(0,Integer.MAX_VALUE);

List list;
ReactiveSeq.fromList(list);

ReactiveSeq.of(1,2,3)
        .reverse()
        .forEach(System.out::println);
```

====
.efficient reversal with a range
====
This also works with rangeLong

[source,java]

ReactiveSeq.range(0,10).skip(8).reverse()

====
.efficient reversal at creation
====
[source,java]

ReactiveSeq.reversedOf(1,2) .toList()

====
.efficient reversal from a List
====
[source,java]

List list= Arrays.asList(1,2);

ReactiveSeq.reversedListOf(list) .toList()

====

=== skip / skip (take / drop) / cycle

The JDK Streams api has operators skip and skip as of Java 8. The naming of these operators is relatively unusual compared with other languages where take / drop is more common. JDK 9 looks set to introduce new operators such as takeWhile & dropWile (maintaining the old skip and skip operators also). cyclops-react offers many of these operators already, although we currently extend (like jOOλ) the JDK 8 naming convention and use limitWhile and skipWhile.

==== LimitTime

The limitTime operator takes values from the Stream while time elapsed is less than the time specified in the method parameter.

.skip time
====
[source,java]

ReactiveSeq.range(1,1_000_000) .peek(i→sleep(i*100)) .skip(1000,TimeUnit.MILLISECONDS) .toList()

====

==== SkipTime

The skipTime operator drops elements from the Stream until the specified time period has elapsed.

.skip time
====
[source,java]

ReactiveSeq.range(1,Integer.MAX_VALUE) .peek(i→sleep(i*100)) .skip(1000,TimeUnit.MILLISECONDS) .toList()

====

==== SkipLast

Skip (drop) the specified number of entries from the end of the stream

.skipLast
====
[source,java]

ReactiveSeq.of(1,2,3,4,5) .skipLast(2) .collect(Collectors.toList());

====
==== LimitLast

Take (include) the last x elements.

[NOTE]
====
The english name limitLast is much less informative than the equiavlent takeLast, this is likely why the naming convention is changing in JDK 9 even at the cost of inconistency.
====

.limitLast
====
[source,java]

ReactiveSeq.of(1,2,3,4,5) .limitLast(2) .collect(Collectors.toList())

====
==== SkipWhile

SkipWhile drops elements from the Stream while the predicate holds, once the predicte returns true all subsequent elements are included

.skipWhile
====
[source,java]

ReactiveSeq.of(1, 2, 3, 4, 5,1).skipWhile(i→i<5);

====
==== LimitWhile

Take elements from the Stream while the predicate holds, once the predicate returns false all subsequent elements are excluded

.limitWhile
====
[source,java]

ReactiveSeq.of(1, 2, 3, 4, 5,6).limitWhile(i→i<5);

====

==== SkipUntil

Drop elements from the Stream until the predicate returns true, after which all elements are included.

.skipUntil
====
[source,java]

ReactiveSeq.of(1, 2, 3, 4, 5).skipUntil(i→i==4);

====
==== LimitUntil

Take elements from the Stream until the predicate returns true, after which all elements are excluded.

.limitUntil
====
[source,java]

ReactiveSeq.of(1, 2, 3, 4, 5).limitWhile(i→i==4);

====

==== Cycle

Repeat the Stream infinitely
.cycle
====
[source,java]

ReactiveSeq.of(1).cycle().skip(6).toList());

====
==== Cycle Times

The cycle operator repeats the Stream the specified number of times.

.cycle (times)
====
[source,java]

ReactiveSeq.of(1,2,2) .cycle(3) .collect(Collectors.toList());

====
==== Cycle Until

Cycle until repeats the Stream until the predicate holds

.cycleUntil
====
[source,java]

MutableInt count =MutableInt.of(0); ReactiveSeq.of(1,2,2) .cycleUntil(next → count.get()>6) .peek(i→ count.mutate(i→i+1)) .collect(Collectors.toList());

====
==== Cycle While

Cycle while repeats the Stream wgile the predicate holds

.cycleWhile
====
[source,java]

MutableInt count =MutableInt.of(0); ReactiveSeq.of(1,2,2) .cycleWhile(next → count++<6) .collect(Collectors.toList());

====

==== Cycle Monoid

Convert to a Stream with the result of a reduction operation repeated specified times.

[NOTE]
====
Monoid is a term from category theory. In Java the signature of Stream reduce is a monoid. In cyclops-react the Monoid class is used to encapsulate the identity value and the accumulating function. There is a Reducers class which has some useful Monoid instances for Integer addition / multiplication, String concatonation etc.
====

.cycleMonoid
====
In this example we count the number of elements in the Stream and then repeat it 4 times
[source,java]

List<Integer> list = ReactiveSeq.of(1,2,2)) .cycle(Reducers.toCountInt(),4) .collect(Collectors.toList());

====

=== flatMap operators / flatten

In addition to inhertiting flatMap from Stream, crossJoin, leftOuterJoin and innerJoin from jOOλ, cyclops-react offers a number of additional flatMap methods that accept a Function that returns a value that can be converted (implicitly)  to Stream.

==== FlatMapFile

The flatMapFile operator Streams the content of the returned File as a String. It is syntax sugar for loading the File to a Stream of Strings inside the function provided to the standard Stream flatMap method.

.flatMapFile
====
[source,java]

file://input.file ={ hello world } ReactiveSeq.of("input.file") .map(getClass().getClassLoader()::getResource) .peek(System.out::println) .map(URL::getFile) .flatMapFile(File::new) .toList();

==== FlatMapURL

The flatMapURL operator Streams the content of the returned URL as a String. It is syntax sugar for loading the URL to a Stream of Strings inside the function provided to the standard Stream flatMap method.

ReactiveSeq.of("input.file")
     .flatMapURL(getClass().getClassLoader()::getResource)
     .toList();
//List["hello","world"]

==== FlatMapCharSequence

The flatMapCharSequence converts the returned CharSequence (such as a String) to a Stream<Characters> inside the flatMap function.

ReactiveSeq.of("input.file")
     .flatMapCharSequence(i->"hello world")
     .toList()
//List['h','e','l','l','o',' ','w','o','r','l','d']

==== FlatMapBufferedReader

The flatMapBufferedReader operator Streams the content of the returned BufferedReader as a String. It is syntax sugar for loading data from the BufferedReader to a Stream of Strings inside the function provided to the standard Stream flatMap method.

ReactiveSeq.of("input.file")
     .map(getClass().getClassLoader()::getResourceAsStream)
     .map(InputStreamReader::new)
     .flatMapBufferedReader(BufferedReader::new)
     .toList()
//List["hello","world"]

==== FlatMapOptional

The flatMapOptional operator converts the returned Optional into a Stream. An zero Optional becomes and zero Stream, and an Optional with one value becomes a Stream with one value.

ReactiveSeq.of(1,2,3,null)
     .flatMapOptional(Optional::ofNullable)
     .collect(Collectors.toList())
//List[1,2,3]

==== FlatMapCompletableFuture

The flatMapCompletableFuture operator converts the returned CompletableFuture into a Stream, by calling the join method. A successfully completed CompletableFuture will become a Stream of one entry, and a failed CompletableFuture will become an zero Stream.

Think about how you start your CompletableFutures, creating them inside the function supplied to flatMap will likely result in syncrhonous blocking behaviour. This is likely only to be truly useful if you can transform futures that have already been kicked off earlier, inside your Stream (perhaps by calling thenApply / thenConsumer inside your flatMap function).

ReactiveSeq.of(1,2,3)
     .flatMapCompletableFuture(i->CompletableFuture.completedFuture(i+2))
    .collect(Collectors.toList())
//List[1,2,3]

ReactiveSeq.of(1,2,3,null)
     .flatMapCollection(i->Arrays.asList(1,2,i))
     .collect(Collectors.toList())
//List[1,2,1,1,2,2,1,2,3]

==== FlatMapCollection

The flatMapCollection operator provides syntax sugger over calling collection.stream() inside your flatMap function. .flatMapCollection

ReactiveSeq.of(1,2,3,null)
     .flatMapCollection(i->Arrays.asList(1,2,i))
     .collect(Collectors.toList())
//List[1,2,1,1,2,2,1,2,3]

FlatMapAnyM

cyclops-react provides an AnyM class that can wrap any monad type (think Stream, Optional, CompletableFuture,List, Try, FeatureToggle and similar fluently flowing classes), and it can also convert any monad type to a Stream. This operator provides syntax sugar conversion inside flatMap from any monad type to a Stream.

Example 57. flatMapAnyM

This example flatMaps a Javaslang Array into a ReactiveSeq

ReactiveSeq.of(1,2,3)
     .flatMapAnyM(i->Javaslang.fromArray(Array.ofAll(i+1,i+2,i+3))
    .collect(Collectors.toList())
//List[2,3,4,3,4,5,4,5,6]

==== flatten The flatten operator flattens a nested Stream one level, importantly it will flatten any supported monad type (Optional, CompletableFuture, List, Set, Stream, Streamable and more).

flatten is not type safe, the same method is available whether the Stream is nested or not, and the client code determines the generic return parameter - which may or may not be accurate.

This example flatMaps a Javaslang Array into a ReactiveSeq

ReactiveSeq.of(Arrays.asList(1,2)).flatten();
//ReactiveSeq(1,  2);

ReactiveSeq.of(Optional.of(1)).flatten();
//ReactiveSeq(1)

==== CrossJoin

crossJoin (inherited from jOOλ) joins two Streams by pairing every possible combination of values from both Streams

ReactiveSeq.of("hello", "goodbye").crossJoin(SeqquenceM.of("world", "day"))

//ReactiveSeq[Tuple["hello", "world"], Tuple["hello", "day"],Tuple["goodbye", "world"], Tuple["goodbye", "day"]]

==== InnerJoin

The innerJoin operator (inherited from jOOλ) joins two Streams in a similar manner to crossJoin but allows a filtering BiPredicate to be applied.

ReactiveSeq<String> stream = SeqquenceM.of("world", "hello");
ReactiveSeq.of("hello", "goodbye").crossJoin(stream,(t, u) -> Objects.equals(t, u))

 //ReactiveSeq[Tuple["hello", "hello"]]

==== LeftOuterJoin

The leftOuterJoin retains all elements from the host ReactiveSeq and joins them with elements in the supplied Stream where the predicate matches, where the predicate fails null is used.

ReactiveSeq<String> stream = SeqquenceM.of("world", "hello");
ReactiveSeq.of("hello", "goodbye").crossJoin(stream,(t, u) -> Objects.equals(t, u))

 //ReactiveSeq[Tuple["hello", "hello"],Tuple["goodbye",null]]

==== RightOuterJoin

The rightOuterJoin retains all elements from the supplied ReactiveSeq and joins them with elements in the host Stream where the predicate matches, where the predicate fails null is used.

ReactiveSeq<String> stream = SeqquenceM.of("world", "hello");
ReactiveSeq.of("hello", "goodbye").crossJoin(stream,(t, u) -> Objects.equals(t, u))

 //ReactiveSeq[Tuple["hello", "hello"],Tuple[null,"world"]]

=== map operators (map / cast)

In addition to the map method in the JDK cyclops-react also provides a cast method (inherited from jOOλ)

==== Cast operator

StreamUtils.cast(Stream.of(1, "a", 2, "b", 3),Integer.class)
// throws ClassCastException

=== for-comprehensions

ReactiveSeq has a number of operators that make it easy to iterate simultanously over multiple Streams generating a new Stream in the process - these are the various overloading versions of forEach2 & forEach3.

Loop over two Streams (one containing 3 values another 10 to create a new Stream of 30 values)

ReactiveSeq.of(1,2,3)
                 .forEach2(a->IntStream.range(0,10),
                         a->b-> a+b)
                 .toList()

//List[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 2, 3, 4, 5, 6, 7, 8,
                         9, 10, 11, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)]

We can also filter inside forEach2

ReactiveSeq.of(2,3)
                 .forEach3(a->IntStream.range(6,9),
                           a->b->IntStream.range(100,105),
                           a->b->c -> a==3,
                           a->b->c-> a+b+c)

//List[109, 110, 111, 112, 113, 110, 111, 112, 113, 114, 111, 112, 113, 114, 115]

==== See also

=== Empty Stream handling

cyclops-react provides a number of useful methods for dealing with the case of an zero Stream (3 of which are inherited from jOOλ - onEmpty, onEmptyThrow and onEmptyGet, and one new one onEmptySwitch).

==== onEmptySwitch operator

This operator allows users to switch to a different Stream lazily defined, if the current one is zero.

ReactiveSeq.of(4,5,6)
         .onEmptySwitch(()->ReactiveSeq.of(1,2,3))
        .toList()
//[4,5,6]
ReactiveSeq.of()
         .onEmptySwitch(()->ReactiveSeq.of(1,2,3))
        .toList()
//[1,2,3]

==== onEmpty operator

This operator allows users to convert to a single valued Stream, if the current one is zero.

ReactiveSeq.of(4,5,6)
         .onEmpty(1)
        .toList()
//[4,5,6]
ReactiveSeq.of()
         .onEmpty(1)
        .toList()
//[1]

==== onEmptyGet operator

This operator allows users to convert to a single valued Stream, with the value lazily supplied, if the current one is zero.

ReactiveSeq.of(4,5,6)
         .onEmptyGet(()->1)
        .toList()
//[4,5,6]
ReactiveSeq.of()
         .onEmptyGet(()->1)
        .toList()
//[1]

==== onEmptyThrow operator

This operator allows users to convert to throw a lazily created exception if the current Stream is zero.

ReactiveSeq.of(4,5,6)
         .onEmptyThrow(()->new RuntimeException("error"))
        .toList()
//[4,5,6]
ReactiveSeq.of()
         .onEmptyThrow(()->new RuntimeException("error"))
        .toList()
//RuntimeException("error");

=== Stream with a single value

cyclops-react provides the single and singleOptional operators allow users to validate that a Stream has a single value or provide a default (via Optional if not).

List<Footballer> players;
Goalkeeper goalie = ReactiveSeq.of(players)
                             .ofType(Goalkeeper.class)
                             .single();
KeyController critical = ReactiveSeq.of(suppliedPlugins)
                                  .ofType(KeyController.class)
                                  .singleOptional() //misconfigured if Optional.zero
                                  .orElse(safeModeController);

====
=== Filtering (filter / remove// ofType)

cyclops-react offers a number of filtering syntax sugare methods including ofType (inherited from jOOλ and remove)

==== OfType operator

Of Type filters the Stream keeping only those elements of the target type.

.ofType
====
[source,java]

ReactiveSeq.of(1, "a", 2, "b",3).ofType(Integer.class)

====

==== Remove operator

The reove operator removes all instances of the provided object

.remove
====
[source,java]

ReactiveSeq.of(1, 2,3).remove(2);

=== scanLeft / scanRight

==== scanLeft

scanLeft performs a non-terminal foldLeft-like operation where the elements in the Stream returned are the intermediate cumulative results. Like reduce and fold the signature of scan matches a Monoid, cyclops-react supports specifying Monoid instances as a parameter (see the Reducers class).

scanLeft starts from the left and applies the supplied function to each value, storing the intermediate cumulative results in the new Stream.

ReactiveSeq.of("a", "b", "c").scanLeft("", String::concat).toList()
//List("", "a", "ab", "abc")

ReactiveSeq.of("a", "ab", "abc").map(str -> str.length()).scanLeft(0, (u, t) -> u + t).toList(),
//List(0, 1, 3, 6)))

ReactiveSeq.of("a", "b", "c").scanLeft(Reducers.toString("")).toList()
//List("", "a", "ab", "abc")

ReactiveSeq.of("a", "ab", "abc").map(str -> str.length()).scanLeft(Reducers.toTotalInt()).toList()
//List(0, 1, 3, 6)));

==== scanRight

scanRight performs a non-terminal foldRight-like operation where the elements in the Stream returned are the intermediate cumulative results. Like reduce and fold the signature of scan matches a Monoid, cyclops-react supports specifying Monoid instances as a parameter (see the Reducers class).

scanRight starts from the middle and applies the supplied function to each value, storing the intermediate cumulative results in the new Stream.

scanRight can take advantage of cyclops-react Efficient Reversability for better performance.

ReactiveSeq.of("a", "b", "c").scanRight("", String::concat).toList()
//List("", "c", "bc", "abc")

ReactiveSeq.of("a", "ab", "abc").map(str -> str.length()).scanRight(0, (t, u) -> u + t).toList()
//List(0, 3, 5, 6)

ReactiveSeq.of("a", "b", "c").scanRight(Reducers.toString("")).toList()
//List("", "c", "bc", "abc")

ReactiveSeq.of("a", "ab", "abc").map(str -> str.length()).scanRight(Reducers.toTotalInt()).toList()
//List(0, 3, 5, 6)

=== Assertions

In addition to operators on java.util.stream.Stream like anyMatch, allMatch and noneMatch, cyclops-react offers operators such as xMatch, endsWith and startsWith.

==== EndsWith Operator

The ends with operator returns true if the Stream ends with the specified iterable or Stream, otherwise it returns false.

ReactiveSeq.of(1,2,3,4,5,6)
                .endsWith(Arrays.asList(5,6))
//true
ReactiveSeq.of(1,2,3,4,5,6)
                .endsWith(Stream.of(5,6))

//true

==== StartsWith Operator

The starts with operator returns true if the Stream starts with the specified iterable or Stream, otherwise it returns false.

ReactiveSeq.of(1,2,3,4,5,6)
                .startsWith(Arrays.asList(5,6))
//false
ReactiveSeq.of(1,2,3,4,5,6)
                .startsWith(Stream.of(1,2))

//true

==== xMatch operator

The xMatch operator returns true if the supplied predicate matches the supplied number of times.

ReactiveSeq.of(1,2,3,5,6,7).xMatch(3, i-> i>4 )
//true

=== foldLeft / foldRight / join / reduce

==== foldLeft

foldLeft performs a terminal reduction operation, that starts with an identity value and the start of the Stream, applying the identiy value and first value to a user supplied accumulation function, the second value is then applied to the result and so on until the end of the Stream when the acummulated result is returned. .foldLeft examples

Streamable.of("hello","world").foldLeft("",(a,b)->a+":"+b);
//"hello:world"

ReactiveSeq.of(1,2,3).foldLeft(0,(a,b)->a+b);
//6

StreamUtils.foldLeft(Stream.of(2,4,5),1,(a,b)->a*b));
//40

The Reducers class contains a number of cyclops-react Monoid instances, Monoid is a class that maps to the method signature of reduce / foldLeft / foldRight / scanLeft / scanRight. The Reducers class contains canned reduction operations for String concatonation, Immutable List concatonation, Numeric reduction etc.

ReactiveSeq.of("a","b","c").foldLeft(Reducers.toString(""))
//"abc"

The map to type operator accepts a cyclops-react Monoid instance, and uses the mapToType function on that interface to enforce the type accepted by the Monoid.

ReactiveSeq.of(1,2,3).foldLeftMapToType(Reducers.toString(""));
// "123"

==== join

join is a specialised reduction / foldLeft operation for String concatonation.

ReactiveSeq.of("hello","2","world","4").join(",");

ReactiveSeq.of("hello","2","world","4").reduce(Reducers.toString(",");
//",hello,2,world,4"

ReactiveSeq.of(1, 2, 3).join()
//"123"
ReactiveSeq.of(1, 2, 3).join(", ")
//"1, 2, 3"
ReactiveSeq.of(1, 2, 3).join("|", "^", "$")
"^1|2|3$"ReactiveSeq.of(1, 2, 3).join()
//"123"
ReactiveSeq.of(1, 2, 3).join(", ")
//"1, 2, 3"
ReactiveSeq.of(1, 2, 3).join("|", "^", "$")
"^1|2|3$"

==== foldRight

foldRight performs a terminal reduction operation, that starts with an identity value and the end of the Stream, applying the identiy value and the last value to a user supplied accumulation function, the second last value is then applied to the result and so on until the start of the Stream when the acummulated result is returned.

Streamable.of("hello","world").foldRight("",(a,b)->a+":"+b);
//"world:hello"

ReactiveSeq.of(1,2,3).foldRight(0,(a,b)->a+b);
//6

StreamUtils.foldRight(Stream.of(2,4,5),1,(a,b)->a*b));
//40

The Reducers class contains a number of cyclops-react Monoid instances, Monoid is a class that maps to the method signature of reduce / foldLeft / foldRight / scanLeft / scanRight. The Reducers class contains canned reduction operations for String concatonation, Immutable List concatonation, Numeric reduction etc.

ReactiveSeq.of("a","b","c").foldRight(Reducers.toString(""))
//"cba"

The map to type operator accepts a cyclops-react Monoid instance, and uses the mapToType function on that interface to enforce the type accepted by the Monoid.

ReactiveSeq.of(1,2,3).foldRightMapToType(Reducers.toString(""));
// "321"

==== Reduce

The reduce operator is a foldLeft like operator, with some functions inherited from java.util.stream.Stream.

The mapReduce operator incorporates a mapToType operation with reduction.

ReactiveSeq.of("hello","2","world","4").mapReduce(Reducers.toCountInt())
//4

ReactiveSeq.of("one","two","three","four").mapReduce(this::toInt,Reducers.toTotalInt())
//10

We can reduce a Stream using multiple monoids / reducers at once.

Monoid sum = Monoid.of(0,(a,b)->a+b);
Monoid mult = Monoid.of(1,(a,b)->a*b);
List<Integer> result = ReactiveSeq.of(1,2,3,4)).reduce(Arrays.asList(sum,mult) );
//List[10,24]

=== conversions

=== toList / toSet / toMap

The are are a number of convenience methods for creating collections.

ReactiveSeq.of(1,2,3).toList()
//List[1,2,3]

ReactiveSeq.of(1,2,3).toList(()-> new LinkedList())
//LinkedList[1,2,3]

ReactiveSeq.of(1,2,3,1,2,3).toSet()
//Set[1,2,3]

ReactiveSeq.of(1,2,3).toMap(v->"key:"+v,v->v)
//Map["key:1":1,"key:2":2,"key:3":3]

=== toLazyCollection / toConcurrentLazyCollection

cyclops-react provides operators to lazily create a collection from the Stream. The values are only pulled through the Stream as the lazy collection is used. toConcurrentLazyCollection synchronizes iteration through the underlying Stream.

Collection<Integer> col = ReactiveSeq.of(1,2,3,4,5)
                                            .peek(System.out::println)
                                            .toLazyCollection();

col.forEach(System.out::println);
Collection<Integer> col = ReactiveSeq.of(1,2,3,4,5)
                                   .peek(System.out::println)
                                   .toConcurrentLazyCollection();

col.forEach(System.out::println);

=== toCompletableFuture / toOptional

Optional<List<String>> stream = ReactiveSeq.of("hello","world")
                                                .toOptional();

//Optional[List["hello","world"]]

 CompletableFuture<List<String>> cf = ReactiveSeq.of("hello","world")
                                            .toCompletableFuture();

//CompletableFuture[List["hello","world"]]

=== anyM / toStreamable

The anyM operator wraps the ReactiveSeq in the cyclops-react anyM wrapper class that can provide a common api over any monad type.

A monad is a fluent, generic wrapper type (that obeys the monad laws) - like Stream, Optional, CompleteableFuture in Java as well as Try & FeatureToggle in cyclops-react.

AnyM keeps type information for the underlying data - so if we call anyM() on a ReactiveSeq of integers the result is an AnyM<Integer>. AnyM facilates writing common code that can accept any monad type.

The toStreamable operator converts the ReactiveSeq to a caching Streamable.

Streamable<String> caching = ReactiveSeq.of("hello","world")
                                                .toStreamable();

caching.forEach(System.out::println);
caching.forEach(System.out::println);

//can print the contents of the Stream twice.

AnyM<String> anyM = ReactiveSeq.of("hello","world")
                                            .anyM();


anyM.map(v->v+"!");
//AnyM[ReactiveSeq["hello!","world!"]]

==== Combinations & Permutations

Comintations & permutations return all the cominations and permutations of values within a Stream respectively.

These opertors make use of the caching streamable so is not suitable for infinte Streams.

Streamable.of(1,2,3).combinations(2)

//Streamable[Streamable[1,2],Streamable[1,3],Streamable[2,3]]
Streamable.of(1, 2, 3).permutations()

//Streamable[Streamable[1, 2, 3],Streamable[1, 3, 2], Streamable[2, 1, 3], Streamable[2, 3, 1], Streamable[3, 1, 2], Streamable[3, 2, 1]

=== Async terminal operations

The futureOperations operator takes an Executor, and returns the set of available asynchronous terminal operations, each of which returns a CompletableFuture. The Stream will be executed on a single thread from the supplied executor.

These methods are available via ReactiveSeq or to plain JDK 8 Streams via com.aol.cyclops2-react.streams.StreamUtils, for Javaslang Streams use com.aol.cyclops2-react.javaslang.streams.StreamUtils.

The available asynchronous terminal operations as of cylcops 7.1.0 are detailed in the FutureOperations javadoc

==== FutureOperations

Terminal operations can now all be called asynchronously e.g.

        CompletableFuture size = ReactiveSeq.of(1,2,3,4)
                                                          .futureOperations(exec)
                                                          .count();

Available operations

  • public CompletableFuture<List<T>> toList()

Asynchronously perform a mutable reduction to a JDK List

 CompletableFuture<List<Data>> myList = ReactiveSeq.of(1,2,3,4)
                                                    .map(this::loadFromDb)
                                                       .futureOperations(getExecutor())

                                                    .toList();
  • public CompletableFuture<Set<T>> toSet()

Asynchronously perform a mutable reduction to a JDK Set

CompletableFuture<Set<Data>> myList = ReactiveSeq.of(1,2,3,4)
                                                            .map(this::loadFromDb)
                                                            .futureOperations(getExecutor())
                                                            .toSet();
  • public <U extends Comparable<U>> CompletableFuture<Optional<T>> minBy(Function<T, U> function) Asynchronously capture the minimum value in this stream using the provided function

    CompletableFuture<Optional> min =  ReactiveSeq.of(1, 2, 3, 4, 5, 6)
                                                                      .futureOperations(exec)
                                                                      .minBy(t -> Math.abs(t - 5));
    //min CompletableFuture[Optional[5]]  //5-5 =0
    
    * public <U extends Comparable<U>> CompletableFuture<Optional<T>>
    maxBy(Function<T, U> function) Asynchronously capture the maximum value
    in this stream using the provided function
    
    CompletableFuture<Optional> max =  ReactiveSeq.of(1, 2, 3, 4, 5, 6)
                                                                      .futureOperations(exec)
                                                                      .maxBy(t -> Math.abs(t - 5));
    //min CompletableFuture[Optional[1]]  //Math.abs(1-5) =4
    
    * public <R, A> CompletableFuture<R> collect(Collector<? super T, A, R>
    collector) Asynchronously perform a Stream collection ```java
    CompletableFuture> list = ReactiveSeq.of(1,2,3,4,5)
    .futureOperations(exec) .collect(Collectors.toList());
    
    //CompletableFuture[1,2,3,4,5]