Permalink
Switch branches/tags
Nothing to show
Find file Copy path
368 lines (294 sloc) 14.2 KB

Hot and Cold observables

Observable sequences come in two flavours, called "hot" and "cold", that have important differences. In this chapter, we will explain what each type is and what that means for you as an Rx developer.

Cold observables

Cold observables are observables that run their sequence when and if they are subscribed to. They present the sequence from the start to each subscriber. An example of a cold observable would be Observable.interval. Regardless of when it is created and when it is subscribed to, it will generate the same sequence for every subscriber.

Observable<Long> cold = Observable.interval(200, TimeUnit.MILLISECONDS);
		
cold.subscribe(i -> System.out.println("First: " + i));
Thread.sleep(500);
cold.subscribe(i -> System.out.println("Second: " + i));

Output

First: 0
First: 1
First: 2
Second: 0
First: 3
Second: 1
First: 4
Second: 2
...

The two subscribers don't receive the same value at the same time, even though they are both subscribed to the same observable. They do see the same sequence, except that each of them sees it as having begun when they subscribed.

The code samples that we've seen in this guide so far have been cold observables, because cold observables are easier to reason about. Every observable that is created with the Observable.create is a cold observable. That includes all the shorthands that we've seen, such as just, range, timer and from.

Cold observables don't necessarily present the same sequence to each subscriber. If, for example, an observable connects to a database and emits the results of a query, the actual value will depend on the state of the database at the time of subscription. It is the fact that a subscriber will receive the whole query from the start that makes this observable cold.

Hot observables

Hot observables emit values independent of individual subscriptions. They have their own timeline and events occur whether someone is listening or not. An example of this is mouse events. A mouse is generating events regardless of whether there is a subscription listening for them. When a subscription is made, the observer receives current events as they happen. You don't receive and you don't want to receive a recap of everything that the mouse has done since booting the system. When you unsubscribe, it doesn't stop your mouse from generating events either. You're just not receiving them. If you resubscribe, you will again see current events with no recap of what you've missed.

Publish

There are ways to transform cold observables into hot and vice versa. Cold observables become hot with the publish() operator.

public final ConnectableObservable<T> publish()

publish returns a ConnectableObservable<T>, which is an extension of Observable<T> with three additional methods

public final Subscription connect()
public abstract void connect(Action1<? super Subscription> connection)
public Observable<T> refCount()

There is a variant that takes a selector that transforms a sequence before publishing it.

public final <R> Observable<R> publish(Func1<? super Observable<T>,? extends Observable<R>> selector)

The selector can do anything that we've learned to do on observables. The usefulness of this is that a single subscription is made for the selector, which can be reused as much as needed. Without this overload, reusing the observable could lead to multiple subscriptions. There is no way to guarantee that the subscriptions would happen at the same exact time and therefore see the exact same sequence.

This method returns an Observable<T> instead of a ConnectableObservable<T>, so the connection functionality we are about to discuss does not apply there.

connect

The ConnectableObservable will initially emit nothing. When calling connect, it will create a new subscription to its source observable (the one we called publish on). It will begin receiving events and pushing them to its subscribers. All of the subscribers will receive the same events at the same time, as they are practically sharing the same subscription: the one that connect created.

ConnectableObservable<Long> cold = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
cold.connect();

cold.subscribe(i -> System.out.println("First: " + i));
Thread.sleep(500);
cold.subscribe(i -> System.out.println("Second: " + i));	

Output

First: 0
First: 1
First: 2
Second: 2
First: 3
Second: 3
First: 4
Second: 4
First: 5
Second: 5

Disconnecting

As we saw in connect's signature, this method returns a Subscription, just like Observable.subscribe does. You can use that reference to terminate the ConnectableObservable's subscription. That will stop events from being propagated to observers but it will not unsubscribe them from the ConnectableObservable. If you call connect again, the ConnectableObservable will start a new subscription and the old observers will begin receiving values again.

ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
Subscription s = connectable.connect();

connectable.subscribe(i -> System.out.println(i));

Thread.sleep(1000);
System.out.println("Closing connection");
s.unsubscribe();

Thread.sleep(1000);
System.out.println("Reconnecting");
s = connectable.connect();

Output

0
1
2
3
4
Closing connection
Reconnecting
0
1
2
...

When you restart by calling connect again, a new subscription will be created. If the source observable is cold, that means that the whole sequence is restarted.

If instead of terminating the connection, you want to unsubscribe from the hot observable, you can use the Subscription returned by the subscribe method.

ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
Subscription s = connectable.connect();

Subscription s1 = connectable.subscribe(i -> System.out.println("First: " + i));
Thread.sleep(500);
Subscription s2 = connectable.subscribe(i -> System.out.println("Second: " + i));

Thread.sleep(500);
System.out.println("Unsubscribing second");
s2.unsubscribe();

Output

First: 0
First: 1
First: 2
Second: 2
First: 3
Second: 3
First: 4
Second: 4
Unsubscribing second
First: 5
First: 6

refCount

ConnectableObservable.refCount returns Observable<T> that is connected as long as there are subscribers to it.

Observable<Long> cold = Observable.interval(200, TimeUnit.MILLISECONDS).publish().refCount();
		
Subscription s1 = cold.subscribe(i -> System.out.println("First: " + i));
Thread.sleep(500);
Subscription s2 = cold.subscribe(i -> System.out.println("Second: " + i));
Thread.sleep(500);
System.out.println("Unsubscribe second");
s2.unsubscribe();
Thread.sleep(500);
System.out.println("Unsubscribe first");
s1.unsubscribe();

System.out.println("First connection again");
Thread.sleep(500);
s1 = cold.subscribe(i -> System.out.println("First: " + i));

Output

First: 0
First: 1
First: 2
Second: 2
First: 3
Second: 3
Unsubscribe second
First: 4
First: 5
First: 6
Unsubscribe first
First connection again
First: 0
First: 1
First: 2
First: 3
First: 4

We see here that the sequence doesn't start until there are subscribers to refCount. If they all go away, the connection stops. If more come later, a new connection starts.

replay

public final ConnectableObservable<T> replay()

replay resembles the ReplaySubject. Upon connection, it will begin collecting values. Once a new observer subscribes to the observable, it will have all the collected values replayed to it. Once it has caught up, it will receive values in parallel to every other observer.

ConnectableObservable<Long> cold = Observable.interval(200, TimeUnit.MILLISECONDS).replay();
Subscription s = cold.connect();

System.out.println("Subscribe first");
Subscription s1 = cold.subscribe(i -> System.out.println("First: " + i));
Thread.sleep(700);
System.out.println("Subscribe second");
Subscription s2 = cold.subscribe(i -> System.out.println("Second: " + i));
Thread.sleep(500);

Output

Subscribe first
First: 0
First: 1
First: 2
Subscribe second
Second: 0
Second: 1
Second: 2
First: 3
Second: 3

replay returns an ConnectableObservable like publish, so we can use the same ways to unsubscribe or create a refCount observable.

There are 8 overloads for replay.

ConnectableObservable<T> replay()
<R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector)
<R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector, int bufferSize)
<R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector, int bufferSize, long time, java.util.concurrent.TimeUnit unit)
<R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector, long time, java.util.concurrent.TimeUnit unit)
ConnectableObservable<T> replay(int bufferSize)
ConnectableObservable<T> replay(int bufferSize, long time, java.util.concurrent.TimeUnit unit)
ConnectableObservable<T> replay(long time, java.util.concurrent.TimeUnit unit)

They are different ways of providing one or more of 3 parameters: bufferSize, selector and time (plus unit for time).

  • bufferSize determines the maximum amount of items to be stored and replayed. Upon subscription, the observable will replay the last bufferSize number of items. Older items are forgotten. This is useful for conserving memory.
  • time, unit determines how old an element can be before being forgotten. Upon subscription, the observable will replay items that are newer than time.
  • selector will transform the replayed observable, in the same way that publish(selector) works.

Here's an example with bufferSize

ConnectableObservable<Long> source = Observable.interval(1000, TimeUnit.MILLISECONDS)
	.take(5)
	.replay(2);

source.connect();
Thread.sleep(4500);
source.subscribe(System.out::println);

Output

2
3
4

When we connect, the source begins emitting the sequence 0,1,2,3,4 in 1s intervals. We sleep for 4.5s before subscribing, which means that the source has emitted 0,1,2,3. 0 and 1 have fallen off the buffer, so only 2 and 3 are replayed. When 4 is emitted, we receive it normally.

When providing a time window, items fall off the buffer based on time

ConnectableObservable<Long> source = Observable.interval(1000, TimeUnit.MILLISECONDS)
	.take(5)
	.replay(2000, TimeUnit.MILLISECONDS);

source.connect();
Thread.sleep(4500);
source.subscribe(System.out::println);

Output

2
3
4

cache

The cache operator has a similar function to replay, but hides away the ConnectableObservable and removes the managing of subscriptions. The internal ConnectableObservable is subscribed to when the first observer arrives. Subsequent subscribers have the previous values replayed to them from the cache and don't result in a new subscription to the source observable.

public final Observable<T> cache()
public final Observable<T> cache(int capacity)

Observable<Long> obs = Observable.interval(100, TimeUnit.MILLISECONDS)
	.take(5)
	.cache();

Thread.sleep(500);
obs.subscribe(i -> System.out.println("First: " + i));
Thread.sleep(300);
obs.subscribe(i -> System.out.println("Second: " + i));

Outout

First: 0
First: 1
First: 2
Second: 0
Second: 1
Second: 2
First: 3
Second: 3
First: 4
Second: 4

In this example, we see that the sequence begins not when the observable was created, but when the first subscriber arrived 500ms later. The second subscribers caught up with earlier values upon subscription and received future values normally.

An important thing to note is that the internal ConnectableObservable doesn't unsubscribe if all the subscribers go away, like refCount would. Once the first subscriber arrives, the source observable will be observed and cached once and for all. This matters because we can't walk away from an infinite observable anymore. Values will continue to cached until the source terminates or we run out of memory. The overload that specifies capacity isn't a solution either, as the capacity is received as a hint for optimisation and won't actually restrict the size of our cache.

Observable<Long> obs = Observable.interval(100, TimeUnit.MILLISECONDS)
	.take(5)
	.doOnNext(System.out::println)
	.cache()
	.doOnSubscribe(() -> System.out.println("Subscribed"))
	.doOnUnsubscribe(() -> System.out.println("Unsubscribed"));

Subscription subscription = obs.subscribe();
Thread.sleep(150);
subscription.unsubscribe();

Outout

Subscribed
0
Unsubscribed
1
2
3
4

In this example, doOnNext prints the values as they are produced and cached from the source observable, while doOnSubscribe and doOnUnsubscribe show the subscribers after the caching. We see that the emission of values begins with the first subscription but ignores the fact that we unsubscribed.

Multicast

The share method is an alias for Observable.publish().refCount(). It allows your subscribers to share a subscription, which is kept for as long as there are subscribers.

Continue reading

Previous Next
Time-shifted sequences Custom operators