32a7164 Mar 27, 2017
@Froussios @aldoKelvianto

Sequences of coincidence

Rx tries to avoid state outside of the pipeline. However, some things are inherently stateful. A server can be up or down, a mobile device may have access to wifi, a button may be held down. In Rx, we see those as events with a duration and we call them windows. Other events that happen within those windows may need to be treated differently. For example, a mobile device will postpone low-priority network requests while it is using a more expensive channel of communication.

Window

With buffer, we saw an operator that can take a sequence and group values into chunks, based on a variety of overloads. The window operator has a one-to-one relationship with buffer. The main difference is that it doesn't return the groups in buffered chunks. Instead, it returns a sequence of sequences, each sequence corresponding to what would have been a buffer. This means that every emitted observable emits its values as soon as they appear in the source observable, rather than emitting them all at the end of the window. That relationship between buffer and window is immediately apparent from a quick look at the marble diagrams of two corresponding overloads:

With window this becomes:

If you are not already familiar with buffer, I strongly recommend that you begin with that. The overloads and resulting groupings are the same in both operators, but buffer is easier to understand and present examples for. Every buffer overload can be constructed from the window overload with the same arguments, as follows:

source.buffer(...) 
// same as
source.window(...).flatMap(w -> w.toList())

Window by count

You can have windows with a fixed number of elements. Once the window has emitted the required number of elements, the observable terminates and a new one starts.

You can also have skipping and overlapping windows like you do in buffer with window(int count, int skip). When windows overlap they will be emitting values simultaneously, as can be seen in the next example.

Observable
	.merge(
		Observable.range(0, 5)
			.window(3, 1))
	.subscribe(System.out::println);

Output

0
1
1
2
2
2
3
3
3
4
4
4

We can see here that the inner observables are emitting the same item simultaneously. To more clearly see what each observable is emitting, let us format the output in a different way:

Observable.range(0, 5)
	.window(3, 1)
	.flatMap(o -> o.toList())
	.subscribe(System.out::println);

Output

[0, 1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4]
[4]

By turning the inner observables into lists, we see how closely related window is to buffer.
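If you want to check the grouping rule without running RxJava, the sketch below models window(count, skip) in plain Java. It is not how RxJava implements the operator, and WindowByCountModel and windowsByCount are names made up for this illustration. It assumes the source has already completed and only reports what each window would have contained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of window(count, skip). Illustration only, not RxJava code.
class WindowByCountModel {

    // Hypothetical helper: the contents of each window once the source has completed.
    static List<List<Integer>> windowsByCount(List<Integer> source, int count, int skip) {
        List<List<Integer>> windows = new ArrayList<>();
        // A new window opens at every index that is a multiple of skip...
        for (int start = 0; start < source.size(); start += skip) {
            // ...and takes at most count items before it closes.
            int end = Math.min(start + count, source.size());
            windows.add(new ArrayList<>(source.subList(start, end)));
        }
        return windows;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(0, 1, 2, 3, 4);
        // Overlapping windows: prints [[0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4], [4]]
        System.out.println(windowsByCount(source, 3, 1));
        // Skipping windows: prints [[0, 1], [3, 4]]
        System.out.println(windowsByCount(source, 2, 3));
    }
}
```

With a skip larger than count, as in the second call, the value 2 falls between two windows and is dropped.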

Window by time

Rather than having windows of fixed size, you can have windows of a fixed duration in time.

You can construct windows that overlap or skip elements, just like you would with buffer, using the overload

public final Observable<Observable<T>> window(long timespan, long timeshift, java.util.concurrent.TimeUnit unit)

Observable.interval(100, TimeUnit.MILLISECONDS)
	.take(5)
	.window(250, 100, TimeUnit.MILLISECONDS)
	.flatMap(o -> o.toList())
	.subscribe(System.out::println);

Output

[0, 1]
[0, 1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4]
[4]

In this example, a new window begins every 100ms and lasts 250ms. The first window opens at time 0ms and remains open long enough to catch [0, 1] (interval emits the first value at time 100ms). Every subsequent window remains open long enough to catch the next 3 values, except for when the values stop.

Window with signal

Lastly, you can define windows using another observable. Every time your signaling observable emits a value, the old window closes and a new one starts.

Alternatively, to have overlapping windows, you can provide a function that uses the values emitted by your signaling observable to construct another observable that will signal the closing of the window. When that observable terminates, the corresponding window closes.

Here's an example with overlapping windows based on signaling observables:

Observable.interval(100, TimeUnit.MILLISECONDS)
	.take(5)
	.window(
		Observable.interval(100, TimeUnit.MILLISECONDS),
		o -> Observable.timer(250, TimeUnit.MILLISECONDS))
	.flatMap(o -> o.toList())
	.subscribe(System.out::println);

Output

[1, 2]
[2, 3]
[3, 4]
[4]
[]

This example is the same as the previous example: a new window opens every 100ms and lasts 250ms, with the exception that the first window starts at 100ms rather than 0ms. We see a difference in results, however. The window that begins at time 100ms does not catch the value that is emitted at 100ms, and the same goes for every other window. This happens because the interval event that begins the window fires just after the interval event that is the value. Even though the two events are simultaneous in theory, in practice there is no such thing.
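The effect of that ordering can be reproduced with a small plain-Java model. This is only a sketch of the arithmetic, not RxJava code: TimedWindowModel, windows and the openBeforeValue flag are invented for this illustration. The model assumes ideal timing, that value i is emitted at (i + 1) * period, that windows stop opening once the source has completed, and that a window no longer accepts values at the instant it closes.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of overlapping timed windows. Illustration only, not RxJava code.
class TimedWindowModel {

    // openBeforeValue: when a window opens at the same instant a value is emitted,
    // does the window open first (true) or does the value slip past (false)?
    static List<List<Integer>> windows(int valueCount, long period,
                                       long firstOpen, long shift, long span,
                                       boolean openBeforeValue) {
        long lastEmission = valueCount * period; // the source completes here
        List<List<Integer>> result = new ArrayList<>();
        for (long open = firstOpen; open <= lastEmission; open += shift) {
            List<Integer> window = new ArrayList<>();
            for (int i = 0; i < valueCount; i++) {
                long t = (i + 1) * period; // emission time of value i
                boolean afterOpen = t > open || (t == open && openBeforeValue);
                if (afterOpen && t < open + span) {
                    window.add(i);
                }
            }
            result.add(window);
        }
        return result;
    }

    public static void main(String[] args) {
        // Timed overload, first window at 0ms:
        // prints [[0, 1], [0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4], [4]]
        System.out.println(windows(5, 100, 0, 100, 250, true));
        // Signal overload, first window at 100ms, value fires before the opening:
        // prints [[1, 2], [2, 3], [3, 4], [4], []]
        System.out.println(windows(5, 100, 100, 100, 250, false));
    }
}
```

The only difference between the two calls, apart from the time of the first window, is which of two simultaneous events is handled first. A real scheduler makes that choice for you, so it is safer not to rely on it.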

Join

join allows you to pair together items from two sequences. We've already seen zip, which pairs values based on their index. join allows you to pair values based on durations. Let's see the signature first:

public final <TRight,TLeftDuration,TRightDuration,R> Observable<R> join(
    Observable<TRight> right,
    Func1<T,Observable<TLeftDuration>> leftDurationSelector,
    Func1<TRight,Observable<TRightDuration>> rightDurationSelector,
    Func2<T,TRight,R> resultSelector)

join combines two sequences, called "left" and "right". The method is not static and the left sequence is implied to be the one that join is being called on. In the signature, we can see two functions called leftDurationSelector and rightDurationSelector, each of which takes as an argument an item of the respective sequence. They return an observable that defines a duration (i.e. a window), just like in the last overload of window. These windows are used to select values to be paired together. Values that are paired are passed to the resultSelector function, which will combine them into a single value, like a resultSelector in zip does. That value will be emitted by join.

The thing that makes join powerful, but also complicated to understand, is how values are selected to be paired. Every value that arrives in a sequence begins a window for itself. The corresponding duration selector decides when the window for each value will terminate. While the window is open, any value arriving in the opposite sequence will be paired with it. The process is symmetrical for the left and right sequences, so let's just consider a case where the items of only one sequence have windows.

In the first example, the windows in the left sequence never close, while the windows in the right sequence have a duration of 0.

Observable<String> left = 
		Observable.interval(100, TimeUnit.MILLISECONDS)
			.map(i -> "L" + i);
Observable<String> right = 
		Observable.interval(200, TimeUnit.MILLISECONDS)
			.map(i -> "R" + i);

left
	.join(
		right,
		i -> Observable.never(),
		i -> Observable.timer(0, TimeUnit.MILLISECONDS),
		(l,r) -> l + " - " + r
	)
	.take(10)
	.subscribe(System.out::println);

Output

L0 - R0
L1 - R0
L0 - R1
L1 - R1
L2 - R1
L3 - R1
L0 - R2
L1 - R2
L2 - R2
L3 - R2

When a window for a left value never ends, every value from the left sequence will be paired with every value that comes after it from the right sequence. Because here the right sequence has half the frequency of the left sequence, between two right values, two more windows have opened on the left. The first right value is paired with the first 2 left values, the second right value is paired with the first 4 left values, the third with 6 and so on.

Let's change the example and see what happens when left and right emit every 100ms and left windows close after 150ms. What happens then is that every left window remains open long enough to catch two right values: one that is emitted at the same time and another after 100ms.

Observable<String> left = 
		Observable.interval(100, TimeUnit.MILLISECONDS)
			.map(i -> "L" + i);
Observable<String> right = 
		Observable.interval(100, TimeUnit.MILLISECONDS)
			.map(i -> "R" + i);

left
	.join(
		right,
		i -> Observable.timer(150, TimeUnit.MILLISECONDS),
		i -> Observable.timer(0, TimeUnit.MILLISECONDS),
		(l,r) -> l + " - " + r
	)
	.take(10)
	.subscribe(System.out::println);

Output

L0 - R0
L0 - R1
L1 - R1
L1 - R2
L2 - R2
L2 - R3
L3 - R3
L3 - R4
L4 - R4
L4 - R5

Both sequences have windows. Every value of a sequence is paired with:

  • Any older value of the opposite sequence, if the window of the older value is still open
  • Any newer value of the opposite sequence, if the window for this value is still open
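The two rules above can be written down as a small plain-Java model. This is a sketch under stated assumptions, not RxJava's implementation: JoinModel, paired, join and NEVER are names invented here. It assumes both sequences behave like interval (value i arrives at (i + 1) * period) and that every window in a sequence has the same length. It also assumes that values arriving at the same instant are paired, which is what the outputs above show but which a real scheduler does not guarantee.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of how join selects pairs. Illustration only, not RxJava code.
class JoinModel {

    static final long NEVER = Long.MAX_VALUE; // a window that never closes

    // tl, tr: arrival times of a left and a right value.
    // leftWindow, rightWindow: how long each value's window stays open.
    static boolean paired(long tl, long leftWindow, long tr, long rightWindow) {
        if (tl == tr) {
            return true; // assumption: simultaneous values see each other
        }
        if (tl < tr) {
            return tr - tl < leftWindow; // left is older: its window must still be open
        }
        return tl - tr < rightWindow; // right is older: its window must still be open
    }

    static List<String> join(int leftCount, long leftPeriod, long leftWindow,
                             int rightCount, long rightPeriod, long rightWindow) {
        List<String> pairs = new ArrayList<>();
        // Pairs are listed per right value. That matches the emission order
        // only because the right windows in the calls below are zero-length.
        for (int r = 0; r < rightCount; r++) {
            long tr = (r + 1) * rightPeriod;
            for (int l = 0; l < leftCount; l++) {
                long tl = (l + 1) * leftPeriod;
                if (paired(tl, leftWindow, tr, rightWindow)) {
                    pairs.add("L" + l + " - R" + r);
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        // Left windows never close, right windows are 0:
        // prints [L0 - R0, L1 - R0, L0 - R1, L1 - R1, L2 - R1, L3 - R1]
        System.out.println(join(4, 100, NEVER, 2, 200, 0));
        // Left windows last 150ms, right windows are 0:
        // prints [L0 - R0, L0 - R1, L1 - R1, L1 - R2, L2 - R2]
        System.out.println(join(3, 100, 150, 3, 100, 0));
    }
}
```

Both calls reproduce the beginning of the outputs of the two join examples above.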

groupJoin

As soon as it detects a pair, join passes the two values to the result selector and emits the result. groupJoin takes it one step further. Let's start with the signature:

public final <T2,D1,D2,R> Observable<R> groupJoin(
    Observable<T2> right,
    Func1<? super T,? extends Observable<D1>> leftDuration,
    Func1<? super T2,? extends Observable<D2>> rightDuration,
    Func2<? super T,? super Observable<T2>,? extends R> resultSelector)

The signature is the same as join except for the resultSelector. Now the result selector takes an item from the left sequence and an observable of values from the right sequence. That observable will emit every right value that the left value is paired with. The pairing in groupJoin is symmetrical, just like join, but the construction of results isn't. An alternative design for this method could have had the resultSelector take a single GroupedObservable, where the left value is the key and the right values are the emitted items.

Let's revisit our example from join where the windows on the left never close.

Observable<String> left = 
		Observable.interval(100, TimeUnit.MILLISECONDS)
			.map(i -> "L" + i)
			.take(6);
Observable<String> right = 
		Observable.interval(200, TimeUnit.MILLISECONDS)
			.map(i -> "R" + i)
			.take(3);

left
	.groupJoin(
		right,
		i -> Observable.never(),
		i -> Observable.timer(0, TimeUnit.MILLISECONDS),
		(l, rs) -> rs.toList().subscribe(list -> System.out.println(l + ": " + list))
	)
	.subscribe();

Output

L0: [R0, R1, R2]
L1: [R0, R1, R2]
L2: [R1, R2]
L3: [R1, R2]
L4: [R2]
L5: [R2]

In the result selector, we have a left value and an observable of right values. We used that to print all the values from the right that were paired to each left value. If you go back to the example which used join, you'll see that the pairs are the same. What changes is how they are made available to us in the resultSelector.

You can implement join with groupJoin and flatMap:

.join(
	right,
	leftDuration,
	rightDuration,
	(l,r) -> joinResultSelector(l,r)
)
// same as
.groupJoin(
	right,
	leftDuration,
	rightDuration,
	(l, rs) -> rs.map(r -> joinResultSelector(l,r))
)
.flatMap(i -> i)
.flatMap(i -> i)

You can also implement groupJoin with join and groupBy. Doing so would require you to construct tuples as a result and do groupBy on the left part of the tuple. We will leave the code for this example to the reader's appetite for hands-on.
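As a hint for that exercise, here is the grouping step on its own, modelled in plain Java rather than RxJava. GroupByLeftModel and groupByLeft are made-up names, and the tuples are written out by hand in the order the first join example emits them. One caveat to keep in mind: as far as I can tell, groupJoin calls its result selector for every left value, even one that is never paired, whereas tuples produced by join only exist for values that were paired. The two approaches should therefore differ for left values without a match.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of "group the join tuples by their left part".
// Illustration only, not RxJava code.
class GroupByLeftModel {

    // Each tuple is {left, right}. Groups keep the order in which left values first appear.
    static Map<String, List<String>> groupByLeft(String[][] tuples) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String[] tuple : tuples) {
            groups.computeIfAbsent(tuple[0], key -> new ArrayList<>()).add(tuple[1]);
        }
        return groups;
    }

    public static void main(String[] args) {
        // The first six pairs of the join example where left windows never close.
        String[][] tuples = {
            {"L0", "R0"}, {"L1", "R0"},
            {"L0", "R1"}, {"L1", "R1"}, {"L2", "R1"}, {"L3", "R1"}
        };
        // prints {L0=[R0, R1], L1=[R0, R1], L2=[R1], L3=[R1]}
        System.out.println(groupByLeft(tuples));
    }
}
```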

Continue reading

Previous: Testing Rx
Next: Backpressure