Similarities and differences with the Java Stream API

The API shares many similarities with the Java Stream API. This similarity is deliberate, to ease adoption of the API. However, there are some differences, which this section highlights.

Asynchronous processing

The goal of the Reactive Streams Operators specification is to define building blocks that enable the asynchronous processing of streams of data. The Java Stream API, on the other hand, provides a synchronous approach to computing a result by analyzing the data conveyed in a stream. Because of this difference, the terminal stages (such as collect, findFirst…) defined by this API return CompletionStage<T> and not T. The returned CompletionStage is completed only once the result has been computed. As an example, here are two versions of the same processing:

List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

// Java Stream version
int sum = list.stream()
  .map(i -> i + 1)
  .mapToInt(i -> i)
  .sum();
// At this point the sum has been computed
System.out.println("sum: " + sum);

// Reactive Streams Operators version
CompletionStage<Integer> future = ReactiveStreams.fromIterable(list)
  .map(i -> i + 1)
  .collect(Collectors.summingInt(i -> i))
  .run();
future.whenComplete((res, err) -> System.out.println("async sum: " + res));

The asynchronous vs. synchronous difference also means that error propagation works differently. With the Java Stream API, the processing can be wrapped in a try/catch construct. In the asynchronous case, the error is propagated into the returned future. In the example above, the function passed to whenComplete receives the result as well as the failure (if any). If the processing throws an exception, the function can react by inspecting the err parameter.
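The contrast between the two error-handling styles can be sketched with plain JDK types. This is only an illustration under stated assumptions: CompletableFuture.supplyAsync stands in for the stage returned by run(), and the names ErrorPropagationSketch, syncOutcome and asyncOutcome are hypothetical, not part of the API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;

public class ErrorPropagationSketch {

    // Synchronous style: the exception surfaces at the call site,
    // so a try/catch around the pipeline is enough.
    static String syncOutcome(List<Integer> list) {
        try {
            int sum = list.stream()
                .map(i -> 10 / i) // throws ArithmeticException when i == 0
                .mapToInt(i -> i)
                .sum();
            return "sum: " + sum;
        } catch (ArithmeticException e) {
            return "failed: " + e.getClass().getSimpleName();
        }
    }

    // Asynchronous style: the exception completes the stage exceptionally
    // and is observed in a callback, not at the call site.
    static CompletionStage<String> asyncOutcome(List<Integer> list) {
        // Assumption: supplyAsync is a stand-in for a stage returned by run().
        CompletionStage<Integer> future = CompletableFuture.supplyAsync(() ->
            list.stream().map(i -> 10 / i).mapToInt(i -> i).sum());

        return future.handle((res, err) -> {
            if (err == null) {
                return "async sum: " + res;
            }
            // The failure may arrive wrapped in a CompletionException.
            Throwable cause = (err instanceof CompletionException && err.getCause() != null)
                ? err.getCause()
                : err;
            return "async failed: " + cause.getClass().getSimpleName();
        });
    }

    public static void main(String[] args) {
        System.out.println(syncOutcome(Arrays.asList(1, 2, 0)));
        // join() blocks here only to keep the demo short.
        System.out.println(asyncOutcome(Arrays.asList(1, 2, 0)).toCompletableFuture().join());
    }
}
```

The sketch uses handle rather than whenComplete so that the callback can turn the failure into a value. whenComplete, as used in the example above, only observes the outcome and passes it on unchanged.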

No parallel processing

Reactive Streams is intrinsically sequential, so none of the parallel processing capabilities of the Java Stream API are supported. As a consequence, the API does not provide a parallel() method. Operations such as findAny are not provided either, because their behavior would be equivalent to that of the provided findFirst method.

Other differences

  • allMatch, anyMatch and noneMatch can be achieved by combining filter and findFirst

  • collect(Collector<? super T,A,R> collector) - the combiner part of the collector is not used because of the sequential nature of Reactive Streams.

  • collect(Supplier<R> supplier, BiConsumer<R,? super T> accumulator, BiConsumer<R,R> combiner) is provided as collect(Supplier<R> supplier, BiConsumer<R,? super T> accumulator). The combiner is omitted because of the sequential nature of Reactive Streams.

  • count is not provided but can be implemented using .collect(Collectors.counting()).

  • findAny is not supported; use findFirst instead. Because of the sequential nature of Reactive Streams, the two methods would have the same semantics.

  • The flatMapTo… and mapTo… primitive variants (such as mapToInt) are not provided. These can easily be replaced by the regular flatMap and map methods, or by methods from Collectors.

  • forEachOrdered is not provided because Reactive Streams mandates ordering, so forEach should be used instead.

  • max and min can be achieved using .collect(Collectors.maxBy(…​)) and .collect(Collectors.minBy(…​))

  • sorted is not supported

  • toArray is not supported, toList can be used instead
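Several of the replacements listed above can be sketched as follows. To keep the example self-contained, it runs on java.util.stream as a stand-in. The same Collectors instances and the same filter/findFirst combination are what the list suggests passing to the reactive API, where the result would arrive in a CompletionStage. The class and method names (ReplacementSketch and its helpers) are hypothetical.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class ReplacementSketch {

    // count: replaced by collect(Collectors.counting())
    static long count(List<Integer> items) {
        return items.stream().collect(Collectors.counting());
    }

    // max / min: replaced by collect(Collectors.maxBy(...)) and collect(Collectors.minBy(...))
    static Optional<Integer> max(List<Integer> items) {
        return items.stream().collect(Collectors.maxBy(Comparator.naturalOrder()));
    }

    static Optional<Integer> min(List<Integer> items) {
        return items.stream().collect(Collectors.minBy(Comparator.naturalOrder()));
    }

    // anyMatch(p): keep the matching elements and check whether a first one exists
    static boolean anyMatch(List<Integer> items, Predicate<Integer> p) {
        return items.stream().filter(p).findFirst().isPresent();
    }

    // allMatch(p): true when no element fails the predicate
    static boolean allMatch(List<Integer> items, Predicate<Integer> p) {
        return !items.stream().filter(p.negate()).findFirst().isPresent();
    }

    // noneMatch(p): true when no element satisfies the predicate
    static boolean noneMatch(List<Integer> items, Predicate<Integer> p) {
        return !anyMatch(items, p);
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(3, 1, 4, 1, 5);
        System.out.println("count: " + count(items));
        System.out.println("max: " + max(items) + ", min: " + min(items));
        System.out.println("any even: " + anyMatch(items, i -> i % 2 == 0));
        System.out.println("all positive: " + allMatch(items, i -> i > 0));
        System.out.println("none negative: " + noneMatch(items, i -> i < 0));
    }
}
```

With the reactive API, findFirst would yield a CompletionStage holding an Optional, so the isPresent check would typically move into a thenApply callback rather than being called directly.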