41fb244 Apr 3, 2016
@Froussios @Tiltman @FranciscoE-Hudl

Transformation of sequences

In this chapter we will see ways of changing the format of the data. In the real world, an observable may be of any type. It is uncommon for the data to already be in the format we want. More likely, the values need to be expanded, trimmed, evaluated or simply replaced with something else.

This will complete the three basic categories of operations. map and flatMap are the fundamental methods in the third category. In the literature, you will often find them referred to as "bind", for reasons that are beyond the scope of this guide.

  • Ana(morphism) T --> Observable<T>
  • Cata(morphism) Observable<T> --> T
  • Bind Observable<T1> --> Observable<T2>

In the last chapter we introduced an implementation of Subscriber for convenience. We will continue to use it in the examples of this chapter.

class PrintSubscriber extends Subscriber<Object> {
    private final String name;
    public PrintSubscriber(String name) {
        this.name = name;
    }
    @Override
    public void onCompleted() {
        System.out.println(name + ": Completed");
    }
    @Override
    public void onError(Throwable e) {
        System.out.println(name + ": Error: " + e);
    }
    @Override
    public void onNext(Object v) {
        System.out.println(name + ": " + v);
    }
}

map

The basic method for transformation is map (also known as "select" in SQL-inspired systems like LINQ). It takes a transformation function which takes an item and returns a new item of any type. The returned observable is composed of the values returned by the transformation function.

public final <R> Observable<R> map(Func1<? super T,? extends R> func)

In the first example, we will take a sequence of integers and increase each of them by 3:

Observable<Integer> values = Observable.range(0,4);

values
    .map(i -> i + 3)
    .subscribe(new PrintSubscriber("Map"));

Output

Map: 3
Map: 4
Map: 5
Map: 6
Map: Completed

We could have done this without map, for example by using Observable.range(3,4). In the following example, we will do something more practical. The producer will emit numeric values as strings, as many UIs do, and we will then use map to convert them to a more processable integer format.

Observable<Integer> values = 
        Observable.just("0", "1", "2", "3")
            .map(Integer::parseInt);

values.subscribe(new PrintSubscriber("Map"));

Output

Map: 0
Map: 1
Map: 2
Map: 3
Map: Completed

This transformation is simple enough that we could also do it on the subscriber's side, but that would be a bad division of responsibilities. When developing the side of the producer, you want to present things in the neatest and most convenient way possible. You wouldn't dump the raw data and let the consumer figure it out. In our example, since we said that the API produces integers, it should do just that. Transformation operators allow us to convert the initial sequences into the API that we want to expose.

cast and ofType

cast is a shorthand for the transformation of casting the items to a different type. If you had an Observable<Object> that you knew would only emit values of type T, then it is simpler to cast the observable than to do the casting in your lambda functions.

Observable<Object> values = Observable.just(0, 1, 2, 3);

values
    .cast(Integer.class)
    .subscribe(new PrintSubscriber("Map"));

Output

Map: 0
Map: 1
Map: 2
Map: 3
Map: Completed

The cast method will fail if not all of the items can be cast to the specified type.

Observable<Object> values = Observable.just(0, 1, 2, "3");

values
    .cast(Integer.class)
    .subscribe(new PrintSubscriber("Map"));

Output

Map: 0
Map: 1
Map: 2
Map: Error: java.lang.ClassCastException: Cannot cast java.lang.String to java.lang.Integer

If you would rather have such cases ignored, you can use the ofType method. This will filter out items that cannot be cast and then cast the sequence to the desired type.

Observable<Object> values = Observable.just(0, 1, "2", 3);

values
    .ofType(Integer.class)
    .subscribe(new PrintSubscriber("Map"));

Output

Map: 0
Map: 1
Map: 3
Map: Completed
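
The filter-then-cast behaviour described above can be sketched without Rx. The following is only an illustration on a plain list using java.util.stream; the names OfTypeSketch and ofType are made up for this example and are not part of RxJava, and the sketch says nothing about how RxJava implements the operator internally.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class OfTypeSketch {

    // Keeps only the items that are instances of the given type,
    // then casts the survivors to that type.
    static <T> List<T> ofType(List<?> items, Class<T> type) {
        return items.stream()
                .filter(type::isInstance) // drop items that cannot be cast
                .map(type::cast)          // the cast is now safe for every item
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Object> values = Arrays.asList(0, 1, "2", 3);
        System.out.println(ofType(values, Integer.class)); // prints [0, 1, 3]
    }
}
```

The string "2" is dropped silently, which mirrors the output shown above for ofType.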

timestamp and timeInterval

The timestamp and timeInterval methods enable us to enrich our values with information about the asynchronous nature of sequences. timestamp transforms values into the Timestamped<T> type, which contains the original value, along with a timestamp for when the event was emitted.

public final Observable<Timestamped<T>> timestamp()

Here's an example:

Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);

values.take(3)
    .timestamp()
    .subscribe(new PrintSubscriber("Timestamp"));

Output

Timestamp: Timestamped(timestampMillis = 1428611094943, value = 0)
Timestamp: Timestamped(timestampMillis = 1428611095037, value = 1)
Timestamp: Timestamped(timestampMillis = 1428611095136, value = 2)
Timestamp: Completed

The timestamp allows us to see that the items were emitted roughly 100ms apart (Java offers few guarantees on that).

If we are more interested in how much time has passed since the last item, rather than the absolute moment in time when the items were emitted, we can use the timeInterval method.

public final Observable<TimeInterval<T>> timeInterval()

Using timeInterval in the same sequence as before:

Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);

values.take(3)
    .timeInterval()
    .subscribe(new PrintSubscriber("TimeInterval"));

Output

TimeInterval: TimeInterval [intervalInMilliseconds=131, value=0]
TimeInterval: TimeInterval [intervalInMilliseconds=75, value=1]
TimeInterval: TimeInterval [intervalInMilliseconds=100, value=2]
TimeInterval: Completed

The information captured by timestamp and timeInterval is very useful for logging and debugging. It is Rx's way of acquiring information about the asynchronicity of sequences.

materialize and dematerialize

Also useful for logging is materialize. materialize transforms a sequence into its metadata representation.

public final Observable<Notification<T>> materialize()

The Notification type can represent any event, i.e. the emission of a value, an error or completion. Notice in the output below that the emission of the "onCompleted" notification is not the end of the materialized sequence: the sequence itself completes afterwards. Here's an example:

Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);

values.take(3)
    .materialize()
    .subscribe(new PrintSubscriber("Materialize"));

Output

Materialize: [rx.Notification@a4c802e9 OnNext 0]
Materialize: [rx.Notification@a4c802ea OnNext 1]
Materialize: [rx.Notification@a4c802eb OnNext 2]
Materialize: [rx.Notification@18d48ace OnCompleted]
Materialize: Completed

The Notification type contains methods for determining the type of the event, as well as the carried value or Throwable, if any.

dematerialize will reverse the effect of materialize, returning a materialized observable to its normal form.

flatMap

map took one value and returned another, replacing items in the sequence one-for-one. flatMap will replace an item with any number of items, including zero or infinitely many. flatMap's transformation method takes values from the source observable and, for each of them, returns a new observable that emits the new values.

public final <R> Observable<R> flatMap(Func1<? super T,? extends Observable<? extends R>> func)

The observable returned by flatMap will emit all the values emitted by all the observables produced by the transformation function. Values from the same observable will be in order, but they may be interleaved with values from other observables.

Let's start with a simple example, where flatMap is applied on an observable with a single value. values will emit a single value, 2. flatMap will turn it into an observable that emits a range of 2 values starting from 0. The values in this observable are emitted in the final observable.

Observable<Integer> values = Observable.just(2);

values
    .flatMap(i -> Observable.range(0,i))
    .subscribe(new PrintSubscriber("flatMap"));

Output

flatMap: 0
flatMap: 1
flatMap: Completed

When flatMap is applied on an observable with multiple values, each value will produce a new observable. values will emit 1, 2 and 3. The resulting observables will emit the values [0], [0,1] and [0,1,2], respectively. The values will be flattened together into one observable: the one that is returned by flatMap.

Observable<Integer> values = Observable.range(1,3);

values
    .flatMap(i -> Observable.range(0,i))
    .subscribe(new PrintSubscriber("flatMap"));

Output

flatMap: 0
flatMap: 0
flatMap: 1
flatMap: 0
flatMap: 1
flatMap: 2
flatMap: Completed

Much like map, flatMap's input and output types are free to differ. In the next example, we will transform integers into characters:

Observable<Integer> values = Observable.just(1);

values
    .flatMap(i -> 
        Observable.just(
            Character.valueOf((char)(i+64))
    ))
    .subscribe(new PrintSubscriber("flatMap"));

This hasn't helped us more than the map operator. There is one key difference that we can exploit to get more out of the flatMap operator. While every value must result in an Observable, nothing prevents this observable from being empty. We can use that to silently filter the sequence while transforming it at the same time.

Observable<Integer> values = Observable.range(0,30);

values
    .flatMap(i -> {
        if (0 < i && i <= 26)
            return Observable.just(Character.valueOf((char)(i+64)));
        else
            return Observable.empty();
    })
    .subscribe(new PrintSubscriber("flatMap"));

Output

flatMap: A
flatMap: B
flatMap: C
...
flatMap: X
flatMap: Y
flatMap: Z
flatMap: Completed

This example results in the entire alphabet being printed without errors, even though the initial range exceeds that of the alphabet.
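
The same filter-while-transforming idea exists in the synchronous flatMap that functional programmers know. As a rough analogy only, here is a sketch with java.util.stream instead of Rx; the class and method names (FlatMapFilterSketch, letters) are hypothetical and chosen for this example.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

final class FlatMapFilterSketch {

    // Maps 1..26 to 'A'..'Z' and silently drops every other number
    // by returning an empty stream for it.
    static List<Character> letters(int start, int count) {
        return IntStream.range(start, start + count)
                .boxed()
                .flatMap(i -> {
                    if (0 < i && i <= 26) {
                        return Stream.of(Character.valueOf((char) (i + 64)));
                    }
                    return Stream.<Character>empty();
                })
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Same input range as the Rx example: 0 to 29.
        System.out.println(letters(0, 30));
    }
}
```

An empty inner sequence contributes nothing to the result, so the out-of-range values 0, 27, 28 and 29 simply disappear.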

In our examples for flatMap so far, the values were in sequence: first all the values from the first observable, then all the values from the second observable. Though this seems intuitive, especially when coming from a synchronous environment, it is important to note that this is not always the case. The observable returned by flatMap emits values as soon as they are available. It just happened that in our examples, all of the observables had all of their values ready synchronously. To demonstrate, we construct asynchronous observables using the interval method.

Observable.just(100, 150)
    .flatMap(i ->
        Observable.interval(i, TimeUnit.MILLISECONDS)
            .map(v -> i)
    )
    .take(10)
    .subscribe(new PrintSubscriber("flatMap"));

We started with the values 100 and 150, which we used as the interval period for the asynchronous observables created in flatMap. Since interval emits the numbers 0, 1, 2, ... in both cases, we replaced those values with the interval period of each observable, to better distinguish the two.

Output

flatMap: 100
flatMap: 150
flatMap: 100
flatMap: 100
flatMap: 150
flatMap: 100
flatMap: 150
flatMap: 100
flatMap: 100
flatMap: 150
flatMap: Completed

We can see that the two observables are interleaved into one.
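
The order in the output above follows from simple arithmetic: one observable ticks at 100, 200, 300, ... ms and the other at 150, 300, 450, ... ms. The following plain-Java sketch, with no Rx involved, replays that timeline on an idealised clock. The names InterleaveTimeline and firstTicks are hypothetical. The sketch also assumes that when both ticks fall on the same instant (300ms, 600ms) the first period wins. A real run offers no such guarantee, so the order at those points may differ.

```java
import java.util.ArrayList;
import java.util.List;

final class InterleaveTimeline {

    // Returns the period of whichever timer fires next, for the first
    // `count` ticks of two timers running side by side on an ideal clock.
    static List<Integer> firstTicks(int periodA, int periodB, int count) {
        List<Integer> order = new ArrayList<>();
        long nextA = periodA; // time of the next tick of timer A
        long nextB = periodB; // time of the next tick of timer B
        while (order.size() < count) {
            if (nextA <= nextB) { // assumption: ties go to timer A
                order.add(periodA);
                nextA += periodA;
            } else {
                order.add(periodB);
                nextB += periodB;
            }
        }
        return order;
    }

    public static void main(String[] args) {
        // prints [100, 150, 100, 100, 150, 100, 150, 100, 100, 150]
        System.out.println(firstTicks(100, 150, 10));
    }
}
```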

concatMap

Even though flatMap shares its name with a very common operator in functional programming, we saw that it doesn't behave exactly as a functional programmer would expect: flatMap may interleave the supplied sequences. The operator that does not interleave them is called concatMap, because it is related to the concat operator that we will see later.

Observable.just(100, 150)
    .concatMap(i ->
        Observable.interval(i, TimeUnit.MILLISECONDS)
            .map(v -> i)
            .take(3))
    .subscribe(
        System.out::println,
        System.out::println,
        () -> System.out.println("Completed"));

Output

100
100
100
150
150
150
Completed

We can see in the output that the two sequences are kept separate. Note that the concatMap operator only works with terminating sequences: it can't move on to the next sequence before the current sequence terminates. For that reason, we had to limit interval's infinite sequence with take.

flatMapIterable

flatMap and concatMap flatten a sequence of observables, as produced by their selector function, into one observable. We can also flatten a sequence of iterables with flatMapIterable. This is similar to flatMap, only our selector function creates iterables instead.

Consider, instead of Observable.range, a function that produces the range as an iterable.

public static Iterable<Integer> range(int start, int count) {
    List<Integer> list = new ArrayList<>();
    for (int i=start ; i<start+count ; i++) {
        list.add(i);
    }
    return list;
}

Observable.range(1, 3)
    .flatMapIterable(i -> range(1, i))
    .subscribe(System.out::println);

Output

1
1
2
1
2
3

As expected, the 3 iterables that we created are flattened into a single observable sequence.

As an Rx developer, you are advised to present your data as observable sequences and avoid mixing observables with iterables. However, when your data is already in the format of a collection, e.g. because standard Java operations returned them like that, it can be simpler or faster to just use them as they are without converting them first. flatMapIterable also eliminates the need to make a choice about interleaving or not: flatMapIterable doesn't interleave, just like you would expect from a synchronous flatMap.

There is a second overload to flatMapIterable that allows you to combine every value in the iterable with the value that produced the iterable.

Observable.range(1, 3)
    .flatMapIterable(
        i -> range(1, i),
        (ori, rv) -> ori * (Integer) rv)
    .subscribe(System.out::println);

Output

1
2
4
3
6
9

Here, we multiplied every value in the iterable range with the value that seeded the range: [1*1], [1*2, 2*2], [1*3, 2*3, 3*3].

Java's Iterable interface has no map method. It is therefore awkward to transform the iterable inside the selector function, while the seeding value is still in scope (here, the i in i -> range(1, i)). In this example our iterable is just a list, so we could have modified it before returning it. However, if our iterable isn't a collection, we would have to either implement a map for iterables ourselves, or manually collect the modified values into a new collection and return that. This overload of flatMapIterable saves us from having to insert this ugliness in the middle of our pipeline.
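
To give an idea of what "implementing a map for iterables ourselves" would involve, here is a minimal sketch in plain Java. The helper class Iterables and its map method are hypothetical names written for this illustration. They are not part of RxJava or the JDK.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Function;

final class Iterables {

    // Wraps an iterable so that every element is passed through fn
    // lazily, at the moment it is requested. Nothing is copied or stored.
    static <T, R> Iterable<R> map(Iterable<T> source,
                                  Function<? super T, ? extends R> fn) {
        return () -> {
            Iterator<T> it = source.iterator();
            return new Iterator<R>() {
                @Override
                public boolean hasNext() {
                    return it.hasNext();
                }

                @Override
                public R next() {
                    return fn.apply(it.next());
                }
            };
        };
    }

    public static void main(String[] args) {
        int seed = 3; // plays the role of the seeding value i
        Iterable<Integer> scaled = map(Arrays.asList(1, 2, 3), rv -> seed * rv);
        for (int v : scaled) {
            System.out.println(v); // prints 3, 6 and 9 on separate lines
        }
    }
}
```

With such a helper the multiplication could be done inside the selector function itself, at the cost of carrying the extra class around. The two-argument overload of flatMapIterable avoids that.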

The concept of laziness isn't very common in Java, so you may be confused as to what kind of iterable isn't a collection. For the sake of example, consider the following iterable that generates a range lazily. It allows us to iterate over a range by calculating the next value from the previous one. In this way, we save the memory of storing the whole range.

public static class Range implements Iterable<Integer> {

    private static class RangeIterator implements Iterator<Integer> {

        private int next;
        private final int end;

        RangeIterator(int start, int count) {
            this.next = start;
            this.end = start + count;
        }

        @Override
        public boolean hasNext() {
            return next < end;
        }

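        @Override
        public Integer next() {
            // Honour the Iterator contract once the range is exhausted
            if (!hasNext()) {
                throw new java.util.NoSuchElementException();
            }
            return next++;
        }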

    }

    private final int start;
    private final int count;

    public Range(int start, int count) {
        this.start = start;
        this.count = count;
    }

    @Override
    public Iterator<Integer> iterator() {
        return new RangeIterator(start, count);
    }

}

We can now iterate over a range and transform it without the need to store anything.

Observable.range(1, 3)
    .flatMapIterable(
        i -> new Range(1, i),
        (ori, rv) -> ori * (Integer) rv)
    .subscribe(System.out::println);

Output

1
2
4
3
6
9

Continue reading

Previous: Aggregation
Next: Chapter 3 - Taming the sequence