Blocking Observable Operators

DavidMGross edited this page Jan 29, 2014 · 50 revisions

This section explains the BlockingObservable subclass. A BlockingObservable extends the ordinary Observable class with a set of operators that block the calling thread while they wait for the items emitted by the Observable.

To transform an Observable into a BlockingObservable, use the Observable.toBlockingObservable( ) method or the BlockingObservable.from( ) method.

  • forEach( ) — invoke a function on each item emitted by the Observable; block until the Observable completes
  • forEachFuture( ) — create a FutureTask that will invoke a specified function on each item emitted by an Observable
  • first( ) — block until the Observable emits an item, then return the first item emitted by the Observable
  • firstOrDefault( ) — block until the Observable emits an item or completes, then return the first item emitted by the Observable or a default item if the Observable did not emit an item
  • last( ) — block until the Observable completes, then return the last item emitted by the Observable
  • lastOrDefault( ) — block until the Observable completes, then return the last item emitted by the Observable or a default item if there is no last item
  • mostRecent( ) — returns an iterable that always returns the item most recently emitted by the Observable
  • next( ) — returns an iterable that blocks until the Observable emits another item, then returns that item
  • latest( ) — returns an iterable that blocks until or unless the Observable emits an item that has not been returned by the iterable, then returns that item
  • chunkify( ) — returns an iterable that periodically returns a list of items emitted by the source Observable since the last list
  • single( ) — if the Observable completes after emitting a single item, return that item, otherwise throw an exception
  • singleOrDefault( ) — if the Observable completes after emitting a single item, return that item, otherwise return a default item
  • toFuture( ) — convert the Observable into a Future
  • toIterable( ) — convert the sequence emitted by the Observable into an Iterable
  • getIterator( ) or toIterator( ) — convert the sequence emitted by the Observable into an Iterator

This documentation accompanies its explanations with a modified form of "marble diagrams." Here is how these marble diagrams represent Blocking Observables:


forEach( )

invoke a function on each item emitted by the Observable; block until the Observable completes

The forEach(someFunction) method is the blocking equivalent of subscribe(someFunction). It invokes your function on each item emitted by the Observable, and returns control to you only once the Observable completes. It does not otherwise indicate that the Observable has completed: there is no forEach( ) equivalent of the onError or onCompleted methods.
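
The difference between a non-blocking subscription and a blocking forEach( ) can be modelled with plain JDK classes. The following is a minimal sketch, not RxJava's implementation: the class BlockingForEachSketch, its subscribe helper, and the use of a List as the item source are all hypothetical stand-ins chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

final class BlockingForEachSketch {

    // Hypothetical stand-in for subscribe(): emits on a separate thread, then signals completion.
    static <T> void subscribe(List<T> items, Consumer<T> onNext, Runnable onCompleted) {
        new Thread(() -> {
            try {
                for (T item : items) {
                    onNext.accept(item);
                }
            } finally {
                onCompleted.run(); // always signal, so a waiting caller is released
            }
        }).start();
    }

    // Blocking variant: same callback, but the call returns only after completion.
    static <T> void forEach(List<T> items, Consumer<T> action) {
        CountDownLatch done = new CountDownLatch(1);
        subscribe(items, action, done::countDown);
        try {
            done.await(); // the calling thread waits here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for completion", e);
        }
    }

    // Collects what the action saw; safe to read because forEach has already returned.
    static List<Integer> demo() {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        forEach(Arrays.asList(1, 2, 3), seen::add);
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model the only completion signal the caller receives is the return of forEach itself, which mirrors the absence of onCompleted and onError callbacks described above.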


forEachFuture( )

create a FutureTask that will invoke a specified function on each item emitted by an Observable

The forEachFuture( ) method returns a FutureTask that, when executed, applies a function you specify to each item emitted by the source Observable (or to each item and each notification).


first( ) and firstOrDefault( )

block until the Observable emits an item, then return the first item emitted by the Observable

Use the first( ) method to retrieve the first item emitted by an Observable, at the time the Observable emits it (it will throw an IllegalArgumentException if the source Observable completes without emitting any items).

You can also use this method to retrieve the first item emitted by an Observable that meets some particular condition. To do this, pass a function to first( ) that returns true if the item meets the condition.

The firstOrDefault( ) method is similar to first( ), except that when there is no first item (or no first item that meets the specified condition) it returns a default item that you specify instead of throwing an exception. Specify that default item by passing it as the first parameter to firstOrDefault( ).
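
The contrast between throwing and falling back to a default can be sketched with JDK types only. This is an illustrative model, not RxJava code: the sequence is represented by an already-complete Iterable, so the blocking aspect is not shown, and the class and method signatures below are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.function.Predicate;

final class FirstOrDefaultSketch {

    // Returns the first item that satisfies the condition; throws if there is none.
    static <T> T first(Iterable<T> source, Predicate<T> condition) {
        for (T item : source) {
            if (condition.test(item)) {
                return item;
            }
        }
        throw new IllegalArgumentException("no item meets the condition");
    }

    // Same search, but falls back to defaultValue (the first parameter) instead of throwing.
    static <T> T firstOrDefault(T defaultValue, Iterable<T> source, Predicate<T> condition) {
        for (T item : source) {
            if (condition.test(item)) {
                return item;
            }
        }
        return defaultValue;
    }

    public static void main(String[] args) {
        Integer found = first(Arrays.asList(1, 2, 3), x -> x > 1);
        Integer fallback = firstOrDefault(-1, Collections.<Integer>emptyList(), x -> true);
        System.out.println(found);
        System.out.println(fallback);
    }
}
```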


last( ) and lastOrDefault( )

block until the Observable completes, then return the last item emitted by the Observable

Use the last( ) method to retrieve the last item emitted by an Observable, at the time the Observable completes (it will throw an IllegalArgumentException if the source Observable completes without emitting any items).

You can also use this method to retrieve the last item emitted by an Observable that meets some particular condition. To do this, pass a function to last( ) that returns true if the item meets the condition.

The lastOrDefault( ) method is similar to last( ), except that when there is no last item (or no last item that meets the specified condition) it returns a default item that you specify instead of throwing an exception. Specify that default item by passing it as the first parameter to lastOrDefault( ).


mostRecent( )

returns an iterable that always returns the item most recently emitted by the Observable

The mostRecent( ) method returns an iterable that on each iteration returns the item that was most recently emitted by the underlying Observable (or null if the Observable has not yet emitted an item or has completed without emitting any).
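
A rough JDK-only model of this behaviour keeps the newest item in a shared slot that the iterator reads without waiting. The class MostRecentSketch and its onNext method are hypothetical names for illustration, and completion is deliberately left out of the model (hasNext( ) always returns true here, which is an assumption of the sketch rather than documented behaviour).

```java
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicReference;

final class MostRecentSketch<T> implements Iterable<T> {
    // Holds the newest item; stays null until the source emits something.
    private final AtomicReference<T> latest = new AtomicReference<>();

    // Called by the (hypothetical) source for each emitted item.
    void onNext(T item) {
        latest.set(item);
    }

    @Override
    public Iterator<T> iterator() {
        return new Iterator<T>() {
            @Override
            public boolean hasNext() {
                return true; // assumption: completion is not modelled in this sketch
            }

            @Override
            public T next() {
                return latest.get(); // never blocks; may repeat or skip items
            }
        };
    }

    public static void main(String[] args) {
        MostRecentSketch<String> recent = new MostRecentSketch<>();
        Iterator<String> it = recent.iterator();
        System.out.println(it.next()); // read before anything was emitted
        recent.onNext("a");
        recent.onNext("b");
        System.out.println(it.next()); // read after two emissions
    }
}
```

Because the iterator only samples the slot, a slow consumer can miss items and a fast consumer can see the same item more than once.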


next( )

returns an iterable that blocks until the Observable emits another item, then returns that item

The next( ) method returns an iterable that on each iteration blocks in Iterator.hasNext( ) or Iterator.next( ) until the underlying Observable emits another item; Iterator.next( ) then returns that item.

If the Observable emits an error then Iterator.hasNext( ) will return true and Iterator.next( ) will re-throw the exception.

If the Observable finishes without emitting another item then Iterator.hasNext( ) will return false, and Iterator.next( ) will throw a NoSuchElementException.

If you call only Iterator.next( ), each call blocks until a new item arrives. If you use hasNext( ) and next( ) together, it is hasNext( ) that blocks until the item arrives; further calls to hasNext( ) before the corresponding next( ) return the same result without blocking. Once next( ) has returned the cached item, a subsequent call to hasNext( ) will block again.

If the Iterator reaches the final item emitted by the source Observable, a subsequent call to hasNext( ) will always return false, and a subsequent call to next( ) will throw NoSuchElementException. If the source Observable issues an error, any subsequent hasNext( ) or next( ) will throw the error.
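
The hasNext( )/next( ) caching contract described above can be sketched with a JDK BlockingQueue. This is a simplified model and not RxJava's implementation: the hypothetical class NextIteratorSketch buffers every item in a queue (which the real operator is not stated to do), does not model errors, and does not accept null items.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class NextIteratorSketch<T> implements Iterator<T> {
    private static final Object COMPLETED = new Object(); // end-of-sequence marker
    private final BlockingQueue<Object> signals = new LinkedBlockingQueue<>();
    private Object cached;    // item fetched by hasNext() but not yet handed out
    private boolean finished; // set once the completion marker has been seen

    // Called by the (hypothetical) source.
    void onNext(T item) {
        signals.add(item);
    }

    void onCompleted() {
        signals.add(COMPLETED);
    }

    @Override
    public boolean hasNext() {
        if (finished) {
            return false; // stays false after completion
        }
        if (cached != null) {
            return true; // repeated calls reuse the cached item and do not block
        }
        try {
            Object signal = signals.take(); // blocks until an item or completion arrives
            if (signal == COMPLETED) {
                finished = true;
                return false;
            }
            cached = signal;
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for an item", e);
        }
    }

    @SuppressWarnings("unchecked")
    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T item = (T) cached;
        cached = null; // the following hasNext() has to wait for a new item
        return item;
    }

    public static void main(String[] args) {
        NextIteratorSketch<Integer> it = new NextIteratorSketch<>();
        it.onNext(1);
        it.onNext(2);
        it.onCompleted();
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}
```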


latest( )

returns an iterable that blocks until or unless the Observable emits an item that has not been returned by the iterable, then returns that item

The latest( ) operator is very similar to BlockingObservable.next( ), except that when you call Iterator.next( ) on the resulting Iterator, it will not block if an item has been emitted by the source Observable since the item returned by the previous Iterator.next( ); instead, it will return the most recent such item.
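
One way to picture this behaviour is a single slot that is overwritten by every new item and emptied when it is read. The sketch below uses only the JDK; LatestSketch and its methods are hypothetical names, and completion and errors are not modelled.

```java
final class LatestSketch<T> {
    private T unseen;          // newest item not yet handed to the consumer
    private boolean hasUnseen; // true while the slot holds an unread item

    // Called by the (hypothetical) source for each emitted item.
    synchronized void onNext(T item) {
        unseen = item; // overwrite: older unread items are dropped
        hasUnseen = true;
        notifyAll();
    }

    // Returns the newest unread item, waiting only if there is none.
    synchronized T next() {
        try {
            while (!hasUnseen) {
                wait(); // block only when nothing new has arrived
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for an item", e);
        }
        hasUnseen = false;
        T item = unseen;
        unseen = null;
        return item;
    }

    public static void main(String[] args) {
        LatestSketch<Integer> latest = new LatestSketch<>();
        latest.onNext(1);
        latest.onNext(2);
        Integer item = latest.next(); // does not block: an unread item is available
        System.out.println(item);
    }
}
```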


chunkify( )

returns an iterable that periodically returns a list of items emitted by the source Observable since the last list

The chunkify( ) operator represents a BlockingObservable as an Iterable that, each time you iterate over it, returns a list of the items emitted by the source Observable since the previous iteration. These lists may be empty if no items have been emitted in the meantime.
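
The "everything since the previous iteration" behaviour can be modelled by draining a queue on each call to next( ). This is a JDK-only sketch under stated assumptions: ChunkifySketch and onNext are hypothetical names, completion is not modelled (hasNext( ) always returns true), and null items are not supported.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

final class ChunkifySketch<T> implements Iterable<List<T>> {
    // Items emitted since the last chunk was handed out.
    private final ConcurrentLinkedQueue<T> pending = new ConcurrentLinkedQueue<>();

    // Called by the (hypothetical) source for each emitted item.
    void onNext(T item) {
        pending.add(item);
    }

    @Override
    public Iterator<List<T>> iterator() {
        return new Iterator<List<T>>() {
            @Override
            public boolean hasNext() {
                return true; // assumption: completion is not modelled in this sketch
            }

            @Override
            public List<T> next() {
                List<T> chunk = new ArrayList<>();
                T item;
                while ((item = pending.poll()) != null) {
                    chunk.add(item); // drain whatever has accumulated
                }
                return chunk; // empty if nothing arrived since the last call
            }
        };
    }

    public static void main(String[] args) {
        ChunkifySketch<Integer> chunks = new ChunkifySketch<>();
        Iterator<List<Integer>> it = chunks.iterator();
        chunks.onNext(1);
        chunks.onNext(2);
        System.out.println(it.next());
        System.out.println(it.next());
    }
}
```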


single( ) and singleOrDefault( )

if the Observable completes after emitting a single item, return that item, otherwise throw an exception (or return a default item)

Use the single( ) method to retrieve the only item emitted by an Observable. single( ) will throw an exception if the Observable does not emit exactly one item.

You can also use this method to retrieve the only item emitted by an Observable that meets some particular condition. To do this, pass a function to single( ) that returns true if the item meets the condition. In such a case, single( ) will again throw an exception unless the Observable emits exactly one item that meets the condition.

The singleOrDefault( ) method is similar: it still throws an exception if the underlying Observable emits more than one item, but if the underlying Observable does not emit any items at all, it returns a default item that you specify instead of throwing an exception. Specify that default item by passing it as the first parameter to singleOrDefault( ).
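
The three cases (no items, exactly one item, more than one item) can be sketched with JDK types. This is an illustrative model, not RxJava code: the sequence is an already-complete Iterable, the class SingleSketch and its signatures are hypothetical, and IllegalArgumentException is used because this page's appendix lists "Illegal Argument" for these cases.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;

final class SingleSketch {

    // Returns the only item; throws if there are no items or more than one.
    static <T> T single(Iterable<T> source) {
        Iterator<T> it = source.iterator();
        if (!it.hasNext()) {
            throw new IllegalArgumentException("sequence contains no items");
        }
        T item = it.next();
        if (it.hasNext()) {
            throw new IllegalArgumentException("sequence contains more than one item");
        }
        return item;
    }

    // Falls back to defaultValue only for an empty sequence; more than one item still throws.
    static <T> T singleOrDefault(T defaultValue, Iterable<T> source) {
        Iterator<T> it = source.iterator();
        if (!it.hasNext()) {
            return defaultValue;
        }
        T item = it.next();
        if (it.hasNext()) {
            throw new IllegalArgumentException("sequence contains more than one item");
        }
        return item;
    }

    public static void main(String[] args) {
        Integer only = single(Arrays.asList(42));
        Integer fallback = singleOrDefault(0, Collections.<Integer>emptyList());
        System.out.println(only);
        System.out.println(fallback);
    }
}
```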


transformations: toFuture( ), toIterable( ), and toIterator( )/getIterator( )

transform an Observable into a Future, an Iterable, or an Iterator

Use these methods to transform a Blocking Observable into a Future, an Iterable, or an Iterator. Note that toFuture( ) will only work on Blocking Observables that emit one or fewer items. To convert Blocking Observables that emit two or more items into Futures, instead use .toList( ).toFuture( ) to reduce the items emitted by the Observable to a single (list) item.
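
The reason for collecting into a list first can be illustrated with a JDK CompletableFuture, which can hold only one result. The sketch below is a model under stated assumptions, not RxJava's implementation: ToFutureSketch, toFuture, and toListFuture are hypothetical helpers, the sequence is an already-complete Iterable, and returning null for an empty sequence is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class ToFutureSketch {

    // A Future carries one value, so a sequence with two or more items is rejected.
    static <T> CompletableFuture<T> toFuture(Iterable<T> source) {
        return CompletableFuture.supplyAsync(() -> {
            Iterator<T> it = source.iterator();
            T item = it.hasNext() ? it.next() : null; // assumption: empty sequence yields null
            if (it.hasNext()) {
                throw new IllegalArgumentException("sequence contains more than one item");
            }
            return item;
        });
    }

    // Reduces the whole sequence to a single list item first, then converts that one item.
    static <T> CompletableFuture<List<T>> toListFuture(Iterable<T> source) {
        List<T> all = new ArrayList<>();
        for (T item : source) {
            all.add(item);
        }
        return toFuture(Collections.singletonList(all));
    }

    public static void main(String[] args) {
        List<Integer> items = toListFuture(Arrays.asList(1, 2, 3)).join();
        System.out.println(items);
    }
}
```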


Appendix: similar blocking and non-blocking operators

| operator | result on an Observable that emits multiple items | result on an Observable that emits one item | result on an Observable that emits no items | equivalent in Rx.NET |
| --- | --- | --- | --- | --- |
| Observable.first | the first item | the single item | Illegal Argument | firstAsync |
| BlockingObservable.first | the first item | the single item | Illegal Argument | first |
| Observable.firstOrDefault | the first item | the single item | the default item | firstOrDefaultAsync |
| BlockingObservable.firstOrDefault | the first item | the single item | the default item | firstOrDefault |
| Observable.last | the last item | the single item | Illegal Argument | lastAsync |
| BlockingObservable.last | the last item | the single item | Illegal Argument | last |
| Observable.lastOrDefault | the last item | the single item | the default item | lastOrDefaultAsync |
| BlockingObservable.lastOrDefault | the last item | the single item | the default item | lastOrDefault |
| Observable.single | Illegal Argument | the single item | Illegal Argument | singleAsync |
| BlockingObservable.single | Illegal Argument | the single item | Illegal Argument | single |
| Observable.singleOrDefault | Illegal Argument | the single item | the default item | singleOrDefaultAsync |
| BlockingObservable.singleOrDefault | Illegal Argument | the single item | the default item | singleOrDefault |