Skip to content
This repository has been archived by the owner on Oct 18, 2023. It is now read-only.

Observable Utility Operators

DavidMGross edited this page Mar 17, 2014 · 72 revisions

This section explains various utility operators for working with Observables.

  • materialize( ) — convert an Observable into a list of Notifications
  • dematerialize( ) — convert a materialized Observable back into its non-materialized form
  • timestamp( ) — attach a timestamp to every item emitted by an Observable
  • serialize( ) — force an Observable to make synchronous calls and to be well-behaved
  • cache( ) — remember the sequence of items emitted by the Observable and emit the same sequence to future Subscribers
  • observeOn( ) — specify on which Scheduler a Subscriber should observe the Observable
  • subscribeOn( ) — specify which Scheduler an Observable should use when its subscription is invoked
  • parallel( ) — split the work done on the emissions from an Observable into multiple Observables each operating on its own parallel thread
  • doOnEach( ) — register an action to take whenever an Observable emits an item
  • doOnCompleted( ) — register an action to take when an Observable completes successfully
  • doOnError( ) — register an action to take when an Observable completes with an error
  • doOnTerminate( ) — register an action to take when an Observable completes, either successfully or with an error
  • finallyDo( ) — register an action to take when an Observable completes
  • delay( ) — shift the emissions from an Observable forward in time by a specified amount
  • delaySubscription( ) — hold an Subscriber's subscription request for a specified amount of time before passing it on to the source Observable
  • timeInterval( ) — emit the time lapsed between consecutive emissions of a source Observable
  • using( ) — create a disposable resource that has the same lifespan as an Observable
  • single( ) — if the Observable completes after emitting a single item, return that item, otherwise throw an exception
  • singleOrDefault( ) — if the Observable completes after emitting a single item, return that item, otherwise return a default item

materialize( )

convert an Observable into a list of Notifications

A well-formed Observable will invoke its Subscriber’s onNext method zero or more times, and then will invoke either the onCompleted or onError method exactly once. The materialize( ) method converts this series of invocations into a series of items emitted by an Observable, where it emits each such invocation as a Notification object.

For example:

numbers = Observable.from([1, 2, 3]);

numbers.materialize().subscribe(
  { if(rx.Notification.Kind.OnNext == it.kind) { println("Next: " + it.value); }
    else if(rx.Notification.Kind.OnCompleted == it.kind) { println("Completed"); }
    else if(rx.Notification.Kind.OnError == it.kind) { println("Error: " + it.exception); } },
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
Next: 1
Next: 2
Next: 3
Completed
Sequence complete

see also:


dematerialize( )

convert a materialized Observable back into its non-materialized form

You can undo the effects of materialize( ) by means of the dematerialize( ) method, which will emit the items from the Observable as though materialize( ) had not been applied to it. The following example dematerializes the materialized Observable from the previous section:

numbers = Observable.from([1, 2, 3]);

numbers.materialize().dematerialize().subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
1
2
3
Sequence complete

see also:


timestamp( )

attach a timestamp to every item emitted by an Observable

The timestamp( ) method converts an Observable that emits items of type T into one that emits objects of type Timestamped<T>, where each such object is stamped with the time at which it was emitted.

def myObservable = Observable.range(1, 1000000).filter({ 0 == (it % 200000) });

myObservable.timestamp().subscribe(
  { println(it.toString()); },               // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
Timestamped(timestampMillis = 1369252582698, value = 200000)
Timestamped(timestampMillis = 1369252582740, value = 400000)
Timestamped(timestampMillis = 1369252582782, value = 600000)
Timestamped(timestampMillis = 1369252582823, value = 800000)
Timestamped(timestampMillis = 1369252582864, value = 1000000)
Sequence complete

see also:


serialize( )

force an Observable to make synchronous calls and to be well-behaved

It is possible for an Observable to invoke its Subscribers' methods asynchronously, perhaps in different threads. This could make an Observable poorly-behaved, in that it might invoke onCompleted or onError before one of its onNext invocations. You can force such an Observable to be well-behaved and synchronous by applying the serialize( ) method to it.

see also:


cache( )

remember the sequence of items emitted by the Observable and emit the same sequence to future Subscribers

By default, an Observable will generate its sequence of emitted items afresh for each new Subscriber that subscribes. You can force it to generate its sequence only once and then to emit this identical sequence to every Subscriber by using the cache( ) method. Compare the behavior of the following two sets of sample code, the first of which does not use cache( ) and the second of which does:

def myObservable = Observable.range(1, 1000000).filter({ 0 == (it % 400000) }).timestamp();

myObservable.subscribe(
  { println(it.toString()); },              // onNext
  { println("Error:" + it.getMessage()); }, // onError
  { println("Sequence complete"); }         // onCompleted
);
myObservable.subscribe(
  { println(it.toString()); },              // onNext
  { println("Error:" + it.getMessage()); }, // onError
  { println("Sequence complete"); }         // onCompleted
);
Timestamped(timestampMillis = 1369252832871, value = 400000)
Timestamped(timestampMillis = 1369252832951, value = 800000)
Sequence complete
Timestamped(timestampMillis = 1369252833074, value = 400000)
Timestamped(timestampMillis = 1369252833154, value = 800000)
Sequence complete
def myObservable = Observable.range(1, 1000000).filter({ 0 == (it % 400000) }).timestamp().cache();

myObservable.subscribe(
  { println(it.toString()); },              // onNext
  { println("Error:" + it.getMessage()); }, // onError
  { println("Sequence complete"); }         // onCompleted
);
myObservable.subscribe(
  { println(it.toString()); },              // onNext
  { println("Error:" + it.getMessage()); }, // onError
  { println("Sequence complete"); }         // onCompleted
);
Timestamped(timestampMillis = 1369252924548, value = 400000)
Timestamped(timestampMillis = 1369252924630, value = 800000)
Sequence complete
Timestamped(timestampMillis = 1369252924548, value = 400000)
Timestamped(timestampMillis = 1369252924630, value = 800000)
Sequence complete

Note that in the second example the timestamps are identical for both of the Subscribers, whereas in the first example they differ.

The cache( ) method will not itself trigger the execution of the source Observable; an initial Subscriber must subscribe to the Observable returned from cache( ) before it will begin emitting items.

see also:


observeOn( )

specify on which Scheduler a Subscriber should observe the Observable

To specify in which Scheduler (thread) the Observable should invoke the Subscribers' onNext( ), onCompleted( ), and onError( ) methods, call the Observable's observeOn( ) method, passing it the appropriate Scheduler.

There is a complication with using observeOn( ) on a PublishSubject or on one of the Observables emitted as a result of the groupBy( ) operator: it is possible that such Observables will begin emitting items before the subscription fully takes hold, and therefore that some of those items will never be observed. For this reason, there is a version of observeOn( ) that takes a buffersize parameter; this parameter allows you to establish a buffer that will collect any items emitted between the time of the subscription and the time the scheduled observer begins observing, so that no items will be lost.

see also:


subscribeOn( )

specify which Scheduler an Observable should use when its subscription is invoked

To specify that the work done by the Observable should be done on a particular Scheduler (thread), call the Observable's subscribeOn( ) method, passing it the appropriate Scheduler. By default (that is, unless you modify the Observable also with observeOn( )) the Observable will invoke the Subscribers' onNext( ), onCompleted( ), and onError( ) methods in this same thread.

see also:


parallel( )

split the work done on the emissions from an Observable into multiple Observables each operating on its own parallel thread

You can use the parallel( ) method to split an Observable into as many Observables as there are available processors, and to do work in parallel on each of these Observables. parallel( ) will then merge the results of these parallel computations back into a single, well-behaved Observable sequence.

see also:


doOnEach( )

register an action to take whenever an Observable emits an item

Use the doOnEach( ) method to register an Action that RxJava will perform each time the Observable emits an item. This action takes the item as a parameter.

There are also doOnEach( ) variants that allow you to register actions to perform if the Observable completes or informs of a throwable. The doOnNext( ) method is equivalent to the version of doOnEach( ) that registers an action to be performed only for each emitted item.

see also:


doOnCompleted( )

register an action to take when an Observable completes successfully

Use the doOnCompleted( ) method to register an Action that RxJava will perform if the Observable completes normally (not by means of an error).

see also:


doOnError( )

register an action to take when an Observable completes with an error

Use the doOnError( ) method to register an Action that RxJava will perform if the Observable terminates with an error. This action takes the Throwable representing the error as a parameter.

see also:


doOnTerminate( )

register an action to take when an Observable completes, either successfully or with an error

Use the doOnTerminate( ) method to register an Action that RxJava will perform just before the Observable calls onComplete or onError.

see also:


finallyDo( )

register an action to take when an Observable completes

You can use the finallyDo( ) method of an Observable to register an action that RxJava will invoke after that Observable invokes either the onCompleted( ) or onError( ) method of its Subscriber.

def numbers = Observable.from([1, 2, 3, 4, 5]);

numbers.finallyDo({ println('Finally'); }).subscribe(
   { println(it); },                          // onNext
   { println("Error: " + it.getMessage()); }, // onError
   { println("Sequence complete"); }          // onCompleted
);
1
2
3
4
5
Sequence complete
Finally

see also:


delay( )

shift the emissions from an Observable forward in time by a specified amount

The delay( ) operator modifies its source Observable by pausing for a particular increment of time (that you specify) before emitting each of the source Observable's items. This has the effect of shifting the entire sequence of items emitted by the Observable forward in time by that specified increment.

Note that delay( ) will not time-shift an onError( ) call in this fashion but it will forward such a call immediately to its subscribers. It will however time shift an onCompleted( ) call.

see also:


delaySubscription( )

hold a Subscriber's subscription request for a specified amount of time before passing it on to the source Observable

The delaySubscription( ) operator shifts waits for a specified period of time after receiving a subscription request before subscribing to the source Observable.

see also:


timeInterval( )

emit the time lapsed between consecutive emissions of a source Observable

The timeInterval( ) operator converts a source Observable into an Observable that emits the amount of time lapsed between consecutive emissions of the source Observable. The first emission is the amount of time lapsed between the time the Subscriber subscribed to the Observable and the time the source Observable emitted its first item. There is no corresponding emission marking the amount of time lapsed between the last emission of the source Observable and the subsequent call to onCompleted( ).

see also:


using( )

create a disposable resource that has the same lifespan as an Observable

Pass the using( ) method two factory functions: the first creates a disposable resource, the second creates an Observable. When a Subscriber subscribes to the resulting Observable, using( ) will use the Observable factory function to create the Observable the Subscriber will observe, while at the same time using the resource factory function to create a resource. When the Subscriber unsubscribes from the Observable, or when the Observable terminates (normally or with an error), using( ) will dispose of the resource it created.

see also:


single( ) and singleOrDefault( )

if the Observable completes after emitting a single item, return that item, otherwise throw an exception (or return a default item)

Use the single( ) method to retrieve the only item emitted by an Observable. single( ) will notify of a NoSuchElementException if the source Observable does not emit exactly one item.

You can also use this method to retrieve the only item emitted by an Observable that meets some particular condition (or null if the Observable method emits no such item). To do this, pass a function to single( ) that returns true if the item meets the condition. In such a case, single( ) will again notify of a NoSuchElementException unless the source Observable emits exactly one item that meets the condition.

The singleOrDefault( ) method is similar, except that while it will still notify of an exception if the underlying Observable emits more than one item, if the underlying Observable does not emit any items at all, rather than notifying of an exception, the Observable returned by singleOrDefault( ) will emit a default item that you specify. Specify that default item by passing it as the first parameter to singleOrDefault( ).

see also:

sidebar

Clone this wiki locally