Permalink
Switch branches/tags
Nothing to show
Find file Copy path
677 lines (558 sloc) 21.9 KB

Combining sequences

So far, we've seen most of the methods that allow us to create a sequence and transform it into what we want. However, most applications will have more than one source of input. We need a way a of combining sequences. We've already seen a few sequences that use more than one observable. In this chapter, we will see the most important operators that use multiple sequences to produce one.

Concatenation

The most straight-forward combination of sequences is to have one run after the other.

concat

The concat operator concatenates sequences one after the other. There are many overloads to concat, which allow you to provide source observables in different numbers and formats.

public static final <T> Observable<T> concat(
    Observable<? extends Observable<? extends T>> observables)
public static final <T> Observable<T> concat(
    Observable<? extends T> t1,
    Observable<? extends T> t2)
public static final <T> Observable<T> concat(Observable<? extends T> t1,
    Observable<? extends T> t2,
    Observable<? extends T> t3)
public static final <T> Observable<T> concat(Observable<? extends T> t1,
    Observable<? extends T> t2,
    Observable<? extends T> t3,
    Observable<? extends T> t4)
// All the way to 10 observables

Concatenating two (or more) given observables is straightforward.

Observable<Integer> seq1 = Observable.range(0, 3);
Observable<Integer> seq2 = Observable.range(10, 3);

Observable.concat(seq1, seq2)
	.subscribe(System.out::println);

Output

0
1
2
10
11
12

If the number of sequences to be combined is dynamic, you can provide an observable that emits the sequences to be concatenated. In this example, we will use our familiar groupBy to create a sequence that emits words starting with the same letter together.

Observable<String> words = Observable.just(
	"First",
	"Second",
	"Third",
	"Fourth",
	"Fifth",
	"Sixth"
);

Observable.concat(words.groupBy(v -> v.charAt(0)))
	.subscribe(System.out::println);

Output

First
Fourth
Fifth
Second
Sixth
Third

concat behaves like the flattening phase of concatMap. In fact, concatMap is an alias for applying map and then concat.

The concatWith operator is an alternative style of doing concat, which allows you to combine sequences one by one in a chain:

public void exampleConcatWith() {
	Observable<Integer> seq1 = Observable.range(0, 3);
	Observable<Integer> seq2 = Observable.range(10, 3);
	Observable<Integer> seq3 = Observable.just(20);
	
	seq1.concatWith(seq2)
		.concatWith(seq3)
		.subscribe(System.out::println);
}

Output

0
1
2
10
11
12
20

repeat

repeat allows you to concatenate a sequence after itself, either an infinite or a finite number of times. repeat doesn't cache the values to repeat them. When the time comes, it will start a new subscription and dispose of the old one.

public final Observable<T> repeat()
public final Observable<T> repeat(long count)

Its application is very simple

Observable<Integer> words = Observable.range(0,2);

words.repeat(2)
	.subscribe(System.out::println);

Output

0
1
0
1

repeatWhen

If you need more control than repeat gives, you can control when the repetition starts with the repeatWhen operator. The when is defined by an observable that you provide. When the original sequence completes, it waits for the handling observable to emit something (the value is irrelevant) and only then does it repeat. If the handling observable terminates, that means that the repetitions should stop.

It may be useful for the signal to know when a repetition has been completed. repeatWhen provides a special observable that emits void when a repetition terminates. You can use that observable to construct your signal.

public final Observable<T> repeatWhen(
    Func1<? super Observable<? extends java.lang.Void>,? extends Observable<?>> notificationHandler)

The argument of repeatWhen is a function that takes an observable and returns an observable. The types emitted by both objects do not matter. The input is the observable that signals the end of a repetition and the returned observable will be used to signal a restart.

In the next example, we create our version of repeat(n) using repeatWhen.

Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);

values
	.take(2)
	.repeatWhen(ob -> {
		return ob.take(2);
	})
	.subscribe(new PrintSubscriber("repeatWhen"));

Output

repeatWhen: 0
repeatWhen: 1
repeatWhen: 0
repeatWhen: 1
repeatWhen: Completed

Here the repetition happens immediately: ob emits when a repetition has ended, so the returned observable also emits right after a completed repetition. That signal the new repetition to begin.

In the next example, we create sequence that repeats every two seconds, forever.

Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);

values
	.take(5)
	.repeatWhen((ob)-> {
		ob.subscribe();
		return Observable.interval(2, TimeUnit.SECONDS);
	})
	.subscribe(new PrintSubscriber("repeatWhen"));

Note that the sequence repeats every 2 seconds regardless of when it completed. That's because we created an independent interval observable that sends a signal every 2 seconds. In the next chapter, [Time-shifted sequences](/Part 3 - Taming the sequence/5. Time-shifted sequences.md), we will see ways of dealing with sequences in time with more control.

Another thing to note is the ob.subscribe() statement, which appears to be useless. That is necessary because it forces ob to be created. In the current implementation of repeatWhen, if ob is not subscribed to, then repetitions never begin.

startWith

startWith takes a sequence and concatenates it before the observable it is applied to.

public final Observable<T> startWith(java.lang.Iterable<T> values)
public final Observable<T> startWith(Observable<T> values)
public final Observable<T> startWith(T t1)
public final Observable<T> startWith(T t1, T t2)
public final Observable<T> startWith(T t1, T t2, T t3)
// up to 10 values

Here an example

Observable<Integer> values = Observable.range(0, 3);

values.startWith(-1,-2)
	.subscribe(System.out::println);

Output

-1
-2
0
1
2

startWith is a shorthand for using concat with a just and our source sequence.

Observable.concat(
	Observable.just(-1,-2,-3),
	values)
// Same as
values.startWith(-1,-2,-3)

Concurrent sequences

Observables aren't always emitting values at predictable moments in time. We will now see some operators intended for combining sequences that emit values concurrently.

amb

ambtakes a number of observables and returns the one that emits a value first. The rest are discarded.

public static final <T> Observable<T> amb(
	java.lang.Iterable<? extends Observable<? extends T>> sources)
public static final <T> Observable<T> amb(
	Observable<? extends T> o1,
	Observable<? extends T> o2)
public static final <T> Observable<T> amb(
	Observable<? extends T> o1,
	Observable<? extends T> o2,
	Observable<? extends T> o3)
// Up to 10 observables

In the following example, amb will mirror the second observable, because it waits less to start.

Observable.amb(
		Observable.timer(100, TimeUnit.MILLISECONDS).map(i -> "First"),
		Observable.timer(50, TimeUnit.MILLISECONDS).map(i -> "Second"))
	.subscribe(System.out::println);

Output

Second

It's usefulness may be not be obvious

The amb feature can be useful if you have multiple cheap resources that can provide values, but latency is widely variable. For an example, you may have servers replicated around the world. Issuing a query is cheap for both the client to send and for the server to respond, however due to network conditions the latency is not predictable and varies considerably. Using the Amb operator, you can send the same request out to many servers and consume the result of the first that responds. -Lee Cambell www.introtorx.com

An alternative style of doing amb is the ambWith operator. ambWith allows you to combine the observables one by one in a chain. This is more convenient when using amb in the middle of a chain or operators.

Observable.timer(100, TimeUnit.MILLISECONDS).map(i -> "First")
	.ambWith(Observable.timer(50, TimeUnit.MILLISECONDS).map(i -> "Second"))
	.ambWith(Observable.timer(70, TimeUnit.MILLISECONDS).map(i -> "Third"))
	.subscribe(System.out::println);

Output

Second

merge

merge combines a set of observables into one. The resulting observable emits the values that the source observables emit, as they emit them. This means that values from different sequences can be mixed.

public static final <T> Observable<T> merge(
	java.lang.Iterable<? extends Observable<? extends T>> sequences)
public static final <T> Observable<T> merge(
	java.lang.Iterable<? extends Observable<? extends T>> sequences,
	int maxConcurrent)
public static final <T> Observable<T> merge(
	Observable<? extends Observable<? extends T>> source)
public static final <T> Observable<T> merge(
	Observable<? extends Observable<? extends T>> source,
	int maxConcurrent)
public static final <T> Observable<T> merge(
	Observable<? extends T> t1,
	Observable<? extends T> t2)
public static final <T> Observable<T> merge(
	Observable<? extends T> t1,
	Observable<? extends T> t2,
	Observable<? extends T> t3)
...

The many overloads are different ways of supplying a set of observables to merge. Here an example of what merge does

Observable.merge(
		Observable.interval(250, TimeUnit.MILLISECONDS).map(i -> "First"),
		Observable.interval(150, TimeUnit.MILLISECONDS).map(i -> "Second"))
	.take(10)
	.subscribe(System.out::println);

Output

Second
First
Second
Second
First
Second
Second
First
Second
First

The difference between concat and merge is that merge does not wait for the current observable to terminate before moving to the next. merge subscribes to every observable available to it and emits items as they come. In that way, merge is similar to the flattening part of flatMap.

Like other combinators that are static methods, merge has an alternative that allows you to merge sequences one by one in a chain. The operator is called mergeWith and the behaviour is the same. The following example has the same result as the one above.

Observable.interval(250, TimeUnit.MILLISECONDS).map(i -> "First")
	.mergeWith(Observable.interval(150, TimeUnit.MILLISECONDS).map(i -> "Second"))
	.take(10)
	.subscribe(System.out::println);

mergeDelayError

With merge, as soon as any of the source sequences fails, the merged sequence fails as well. An alternative to that behaviour is mergeDelayError, which will postpone the emission of an error and continue to merge values from sequences that haven't failed.

public static final <T> Observable<T> mergeDelayError(
	Observable<? extends Observable<? extends T>> source)
public static final <T> Observable<T> mergeDelayError(
	Observable<? extends T> t1,
        Observable<? extends T> t2)
public static final <T> Observable<T> mergeDelayError(
	Observable<? extends T> t1,
	Observable<? extends T> t2,
	Observable<? extends T> t3)
...

In the next example, we merge two observables which emit every 100ms. One fails early while the other observable continues to complete.

Observable<Long> failAt200 = 
	Observable.concat(
		Observable.interval(100, TimeUnit.MILLISECONDS).take(2),
		Observable.error(new Exception("Failed")));
Observable<Long> completeAt400 = 
	Observable.interval(100, TimeUnit.MILLISECONDS)
		.take(4);

Observable.mergeDelayError(failAt200, completeAt400)
	.subscribe(
		System.out::println,
		System.out::println);

Output

0
0
1
1
2
3
java.lang.Exception: Failed

In the beginning, both observables emit the same value. After value 1, the first sequence fails, and the merged sequence continues with values only from the second sequence.

When merging more than two sequences, the merged sequence will go on until all of the sources have terminated, successfully or with an error. If more than one sequences fail, the error in the merged sequence will be of type CompositeException

Observable<Long> failAt200 = 
Observable.concat(
	Observable.interval(100, TimeUnit.MILLISECONDS).take(2),
		Observable.error(new Exception("Failed")));
Observable<Long> failAt300 = 
	Observable.concat(
		Observable.interval(100, TimeUnit.MILLISECONDS).take(3),
		Observable.error(new Exception("Failed")));
Observable<Long> completeAt400 = 
	Observable.interval(100, TimeUnit.MILLISECONDS)
		.take(4);
		
Observable.mergeDelayError(failAt200, failAt300, completeAt400)
	.subscribe(
		System.out::println,
		System.out::println);

Output

0
0
0
1
1
1
2
2
3
rx.exceptions.CompositeException: 2 exceptions occurred. 

switchOnNext

The switchOnNext operator takes an observable that emits observables. The returned observable emits items from the most recent observable. As soon as a new observable comes, the old one is discarded and values from the newer one are emitted.

Observable.switchOnNext(
	Observable.interval(100, TimeUnit.MILLISECONDS)
		.map(i -> 
			Observable.interval(30, TimeUnit.MILLISECONDS)
				.map(i2 -> i)
		)
	)
	.take(9)
	.subscribe(System.out::println);

Output

0
0
0
1
1
1
2
2
2

This example may be a bit confusing. What we've done is creating an observable that creates a new observable every 100ms. Every created observable emits its number in the sequence every 30ms. After 100ms, each of those observables has had enough time to emit its number 3 times. Then a new observable is created, which causes them to be replaced by the new one.

switchMap

Where flatMap internally uses merge to combine the generated sequences and concatMap uses concat, there is switchMap to use switchOnNext for the flattening phase.

public final <R> Observable<R> switchMap(Func1<? super T,? extends Observable<? extends R>> func)

Every value from the source sequence is mapped through func to an observable. The values from the generated observable are emitted by the returned observable. Every time a new value arrives, func generates a new observable and switches to it, dropping the old one. The example we showed for switchOnNext can also be implemented with switchMap:

Observable.interval(100, TimeUnit.MILLISECONDS)
	.switchMap(i -> 
		Observable.interval(30, TimeUnit.MILLISECONDS)
			.map(l -> i))
	.take(9)
	.subscribe(System.out::println);

Output

0
0
0
1
1
1
2
2
2

Pairing sequences

So far, we've seen operators which, in one way or the other, flattened multiple sequences into one of the same type. The next operators put the source sequences side-by-side and use the values to create a composite value.

zip

zip is a very basic function out of functional programming. It takes two or more sequences and matches their values one-to-one by index. A function is required to combine the values. Unlike what you might expect from other environments, in RxJava zip doesn't default to combining all the values in a tuple.

In the next example, we have two sources that emit items at different rates.

Observable.zip(
		Observable.interval(100, TimeUnit.MILLISECONDS)
			.doOnNext(i -> System.out.println("Left emits " + i)),
		Observable.interval(150, TimeUnit.MILLISECONDS)
			.doOnNext(i -> System.out.println("Right emits " + i)),
		(i1,i2) -> i1 + " - " + i2)
	.take(6)
	.subscribe(System.out::println);

Output

Left emits 0
Right emits 0
0 - 0
Left emits 1
Right emits 1
Left emits 2
1 - 1
Left emits 3
Right emits 2
2 - 2
Left emits 4
Left emits 5
Right emits 3
3 - 3
Left emits 6
Right emits 4
4 - 4
Left emits 7
Right emits 5
Left emits 8
5 - 5

As we can see, zip matched values based on index.

zip has multiple overloads for zipping more than two sequences together.

public static final <R> Observable<R> zip(
    java.lang.Iterable<? extends Observable<?>> ws,
    FuncN<? extends R> zipFunction)
public static final <R> Observable<R> zip(
    Observable<? extends Observable<?>> ws,
    FuncN<? extends R> zipFunction)
public static final <T1,T2,R> Observable<R> zip(
    Observable<? extends T1> o1,
    Observable<? extends T2> o2,
    Func2<? super T1,? super T2,? extends R> zipFunction)
public static final <T1,T2,T3,R> Observable<R> zip(
    Observable<? extends T1> o1,
    Observable<? extends T2> o2,
    Observable<? extends T3> o3,
    Func3<? super T1,? super T2,? super T3,? extends R> zipFunction)
/// etc

When zipping more than two sequences, the operator will wait until all of the sources have emitted the next value before it emits the next zipped value. In the next example, we add another source with its own frequency again.

Observable.zip(
		Observable.interval(100, TimeUnit.MILLISECONDS),
		Observable.interval(150, TimeUnit.MILLISECONDS),
		Observable.interval(050, TimeUnit.MILLISECONDS),
		(i1,i2,i3) -> i1 + " - " + i2 + " - " + i3)
	.take(6)
	.subscribe(System.out::println);

Output

0 - 0 - 0
1 - 1 - 1
2 - 2 - 2
3 - 3 - 3
4 - 4 - 4
5 - 5 - 5

The zipped sequence terminates when any of the source sequences terminates successfully. Further values from the other sequences will be ignored. We can see that in the next example, where we zip sequences of different sizes and count the elements in the zipped sequence.

Observable.zip(
		Observable.range(0, 5),
		Observable.range(0, 3),
		Observable.range(0, 8),
		(i1,i2,i3) -> i1 + " - " + i2 + " - " + i3)
	.count()
	.subscribe(System.out::println);

Output

3

The zipped sequence contains as many elements as the shortest source sequence.

There is also the zipWith operator, which is an alternative style of zipping 2 sequences. zipWith allows you to zip in a chain, but it can be inconvenient for zipping more that two sequences.

Observable.interval(100, TimeUnit.MILLISECONDS)
	.zipWith(
		Observable.interval(150, TimeUnit.MILLISECONDS), 
		(i1,i2) -> i1 + " - " + i2)
	.take(6)
	.subscribe(System.out::println);

Output

0 - 0
1 - 1
2 - 2
3 - 3
4 - 4
5 - 5

The zipWith also has an overload that allows you to zip your observable sequence with an iterable.

Observable.range(0, 5)
	.zipWith(
		Arrays.asList(0,2,4,6,8),
		(i1,i2) -> i1 + " - " + i2)
	.subscribe(System.out::println);

Output

0 - 0
1 - 2
2 - 4
3 - 6
4 - 8

combineLatest

Where zip uses indices, combineLatest will use time. Every time one of the observables being combined emits a value, that value is combined with the latest value by the other observable. Once again, a function is required to combine the values.

Observable.combineLatest(
		Observable.interval(100, TimeUnit.MILLISECONDS)
			.doOnNext(i -> System.out.println("Left emits")),
		Observable.interval(150, TimeUnit.MILLISECONDS)
			.doOnNext(i -> System.out.println("Right emits")),
		(i1,i2) -> i1 + " - " + i2
	)
	.take(6)
	.subscribe(System.out::println);

Output

Left emits
Right emits
0 - 0
Left emits
1 - 0
Left emits
2 - 0
Right emits
2 - 1
Left emits
3 - 1
Right emits
3 - 2

As we can see, combineLatest first it waits for every sequence to have a value. After that, every value emitted by either observable results in a combined value being emitted.

Just like every combinator we've seen in this chapter, there are overloads that allow to combine more than two sequences.

I like to think of combineLatest as one event occuring in the context of another. combineLatest is very useful when consuming input from GUIs, where multiple stateful GUI controls affect the same output. Imagine a text input field, a paragraph that echoes the text and a checkbox that signals to capitalise it or not. Everytime the text field or the checkbox changes, combineLatest will combine the text with the decision to capitalise it or not. The end result is ready to be written to the output.

Continue reading

Previous Next
Advanced error handling Time-shifted sequences