
Creating and Subscribing to Simple Observable Sequences

You do not need to implement the Observable class manually to create an observable sequence. Similarly, you do not need to implement Observer to subscribe to a sequence. By installing the Reactive Extensions libraries, you can use the Observable type, which provides many operators for creating a simple sequence with zero, one, or more elements. In addition, RxJS provides an overloaded subscribe method that lets you pass in onNext, onError, and onCompleted handler functions.

Creating a sequence from scratch

Before getting into the many operators, let's look at how to create an Observable from scratch using the Rx.Observable.create method.

First, we need to ensure we reference the core rx.js file.

<script src="rx.js"></script>

Or if we're using Node.js, we can reference it as such:

var Rx = require('rx');

In this example, we simply yield a single value of 42 and then mark the sequence as completed. The return value is optional if no cleanup is required.

var source = Rx.Observable.create(function (observer) {
  // Yield a single value and complete
  observer.onNext(42);
  observer.onCompleted();

  // Any cleanup logic might go here
  return function () {
    console.log('disposed');
  }
});

var subscription = source.subscribe(
  function (x) { console.log('onNext: %s', x); },
  function (e) { console.log('onError: %s', e); },
  function () { console.log('onCompleted'); });

// => onNext: 42
// => onCompleted

subscription.dispose();
// => disposed

For most operations this is overkill, but it shows the basics of how most RxJS operators work.
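To make the mechanics concrete, here is a minimal sketch of a create-like function in plain JavaScript. It is not the RxJS implementation: the name createObservable is hypothetical, and things such as schedulers, error trapping, and automatic teardown on completion are left out. It only shows that the function you pass in runs once per subscription and that its return value becomes the cleanup logic.

```javascript
// Hypothetical stand-in for Rx.Observable.create (an assumption, not RxJS code).
function createObservable(subscribeFn) {
  return {
    subscribe: function (onNext, onError, onCompleted) {
      var stopped = false;
      // Wrap the handlers so nothing is delivered after a terminal message
      var observer = {
        onNext: function (x) { if (!stopped) { onNext(x); } },
        onError: function (e) { if (!stopped) { stopped = true; onError(e); } },
        onCompleted: function () { if (!stopped) { stopped = true; onCompleted(); } }
      };
      var cleanup = subscribeFn(observer);
      return {
        dispose: function () {
          stopped = true;
          // Run the cleanup function at most once
          if (typeof cleanup === 'function') { cleanup(); cleanup = null; }
        }
      };
    }
  };
}

var log = [];
var source = createObservable(function (observer) {
  observer.onNext(42);
  observer.onCompleted();
  observer.onNext(43); // ignored: the sequence has already completed
  return function () { log.push('disposed'); };
});

var subscription = source.subscribe(
  function (x) { log.push('onNext: ' + x); },
  function (e) { log.push('onError: ' + e); },
  function () { log.push('onCompleted'); });

subscription.dispose();
subscription.dispose(); // second call is a no-op

console.log(log.join('\n'));
// => onNext: 42
// => onCompleted
// => disposed
```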

Creating and subscribing to a simple sequence

The following sample uses the range operator of the Observable type to create a simple observable sequence of numbers. The observer subscribes to this sequence using the subscribe method of the Observable class and provides handler functions for onNext, onError and onCompleted. The range operator takes a start value and a count: range(x, y) creates a sequence of y sequential integers beginning with x. In our example, it starts at 1 and produces 5 numbers.

As soon as the subscription happens, the values are sent to the observer. The onNext function then prints out the values.

// Creates an observable sequence of 5 integers, starting from 1
var source = Rx.Observable.range(1, 5);

// Prints out each item
var subscription = source.subscribe(
  function (x) { console.log('onNext: %s', x); },
  function (e) { console.log('onError: %s', e); },
  function () { console.log('onCompleted'); });

// => onNext: 1
// => onNext: 2
// => onNext: 3
// => onNext: 4
// => onNext: 5
// => onCompleted
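Because the second argument to range is a count rather than an end value, it is easy to misread. The following sketch lists the values that a (start, count) pair describes using a plain loop. The helper rangeValues is hypothetical and not part of RxJS.

```javascript
// Hypothetical helper, not part of RxJS: lists the values that a
// (start, count) range describes.
function rangeValues(start, count) {
  var values = [];
  for (var i = 0; i < count; i++) {
    values.push(start + i);
  }
  return values;
}

console.log(rangeValues(1, 5).join(', ')); // => 1, 2, 3, 4, 5
console.log(rangeValues(3, 4).join(', ')); // => 3, 4, 5, 6
```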

When an observer subscribes to an observable sequence, the subscribe method may use asynchronous behavior behind the scenes, depending on the operator. In that case the subscribe call is asynchronous, in that the caller is not blocked until the observation of the sequence completes. This is covered in more detail in the Using Schedulers topic.

Notice that the subscribe method returns a Disposable, so that you can unsubscribe from a sequence and dispose of it easily. When you invoke the dispose method on this Disposable, the observer stops listening to the observable for data. Normally, you do not need to call dispose explicitly unless you need to unsubscribe early, or when the source observable sequence has a longer life span than the observer. Subscriptions in Rx are designed for fire-and-forget scenarios without the use of a finalizer. Note that the default behavior of the Observable operators is to dispose of the subscription as soon as possible (i.e., when an onCompleted or onError message is published). For example, the following code creates a subscription x that listens to both sequences a and b. If a signals an error, x is immediately unsubscribed from b.

var x = Rx.Observable.zip(a, b, function (a1, b1) { return a1 + b1; }).subscribe();
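The early teardown described above can be sketched without RxJS. The names manualSource and zipTwo below are hypothetical and greatly simplified (no completion handling, no schedulers). The sketch only illustrates the idea that an error from one input disposes the subscriptions to both inputs.

```javascript
// Hypothetical hand-driven source (not an RxJS type): values are pushed
// manually so the order of events is easy to follow.
function manualSource(name, log) {
  var observer = null;
  return {
    subscribe: function (obs) {
      observer = obs;
      return { dispose: function () { observer = null; log.push(name + ' unsubscribed'); } };
    },
    next: function (x) { if (observer) { observer.onNext(x); } },
    error: function (e) { if (observer) { observer.onError(e); } }
  };
}

// Sketch of zip-like pairing over two sources with early teardown.
function zipTwo(a, b, selector, observer) {
  var queueA = [], queueB = [], subA, subB, done = false;
  function disposeAll() {
    if (done) { return; }
    done = true;
    if (subA) { subA.dispose(); }
    if (subB) { subB.dispose(); }
  }
  function drain() {
    // Emit one combined value for each complete pair
    while (queueA.length && queueB.length) {
      observer.onNext(selector(queueA.shift(), queueB.shift()));
    }
  }
  function fail(e) { observer.onError(e); disposeAll(); }
  subA = a.subscribe({ onNext: function (x) { queueA.push(x); drain(); }, onError: fail });
  subB = b.subscribe({ onNext: function (x) { queueB.push(x); drain(); }, onError: fail });
  return { dispose: disposeAll };
}

var log = [];
var a = manualSource('a', log);
var b = manualSource('b', log);

var x = zipTwo(a, b, function (a1, b1) { return a1 + b1; }, {
  onNext: function (v) { log.push('onNext: ' + v); },
  onError: function (e) { log.push('onError: ' + e.message); }
});

a.next(1);
b.next(10);                  // pairs with 1
a.error(new Error('boom'));  // tears down both subscriptions
b.next(20);                  // ignored: b no longer has an observer

console.log(log.join('\n'));
// => onNext: 11
// => onError: boom
// => a unsubscribed
// => b unsubscribed
```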

You can also tweak the code sample to use the create method of the Observer type, which creates and returns an observer from the specified onNext, onError, and onCompleted handler functions. You can then pass this observer to the subscribe method of the Observable type. The following sample shows how to do this.

// Creates an observable sequence of 5 integers, starting from 1
var source = Rx.Observable.range(1, 5);

// Create observer
var observer = Rx.Observer.create(
  function (x) { console.log('onNext: %s', x); },
  function (e) { console.log('onError: %s', e); },
  function () { console.log('onCompleted'); });

// Prints out each item
var subscription = source.subscribe(observer);

// => onNext: 1
// => onNext: 2
// => onNext: 3
// => onNext: 4
// => onNext: 5
// => onCompleted

In addition to creating an observable sequence from scratch, you can convert existing Arrays, events, callbacks and promises into observable sequences. The other topics in this section will show you how to do this.

Notice that this topic only shows you a few operators that can create an observable sequence from scratch. To learn more about other LINQ operators, see Querying Observable Sequences using LINQ Operators.

Using a timer

The following sample uses the timer operator to create a sequence. The sequence pushes out the first value after 5 seconds have elapsed, then pushes out subsequent values every 1 second. For illustration purposes, we chain the timestamp operator to the query so that each value pushed out is paired with the time at which it was published. That way, when we subscribe to this source sequence, we receive both its value and its timestamp.

First, we need to ensure we reference the proper files if in the browser. Note that the RxJS NPM Package already includes all operators by default.

<script src="rx.js"></script>
<script src="rx.time.js"></script>

Now on to our example:

console.log('Current time: ' + Date.now());

var source = Rx.Observable.timer(
  5000, /* 5 seconds */
  1000 /* 1 second */)
   .timestamp();

var subscription = source.subscribe(
  function (x) {
    console.log(x.value + ': ' + x.timestamp);
  });

/* Output may be similar to this */
// Current time: 1382560697820
// 0: 1382560702820
// 1: 1382560703820
// 2: 1382560704820

By using the timestamp operator, we have verified that the first item is indeed pushed out 5 seconds after the sequence has started, and that each subsequent item is published 1 second after the previous one.
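The arithmetic behind those timestamps can be checked directly. The helper expectedDueTime below is hypothetical and only computes ideal due times. Real timers usually fire a few milliseconds late, so actual output rarely matches this exactly.

```javascript
// Hypothetical helper (not part of RxJS): ideal due time of the n-th value
// for a timer with an initial delay and a repeat period, in milliseconds.
function expectedDueTime(startTime, dueTime, period, n) {
  return startTime + dueTime + n * period;
}

var start = 1382560697820; // "Current time" from the sample output above
for (var n = 0; n < 3; n++) {
  console.log(n + ': ' + expectedDueTime(start, 5000, 1000, n));
}
// => 0: 1382560702820
// => 1: 1382560703820
// => 2: 1382560704820
```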

Converting Arrays and Iterables to an Observable Sequence

Using the Rx.Observable.from operator, you can convert an array to an observable sequence.

var array = [1,2,3,4,5];

// Converts an array to an observable sequence
var source = Rx.Observable.from(array);

// Prints out each item
var subscription = source.subscribe(
  function (x) { console.log('onNext: %s', x); },
  function (e) { console.log('onError: %s', e); },
  function () { console.log('onCompleted'); });

// => onNext: 1
// => onNext: 2
// => onNext: 3
// => onNext: 4
// => onNext: 5
// => onCompleted

You can also convert array-like objects, that is, objects that have a length property and are indexed by numbers. In this case, we simply use an object with a length of 5, together with a mapping function that turns each index into a value.

var arrayLike = { length: 5 };

// Converts an array-like object to an observable sequence
var source = Rx.Observable.from(arrayLike, function (v, k) { return k + 1; });

// Prints out each item
var subscription = source.subscribe(
  function (x) { console.log('onNext: %s', x); },
  function (e) { console.log('onError: %s', e); },
  function () { console.log('onCompleted'); });

// => onNext: 1
// => onNext: 2
// => onNext: 3
// => onNext: 4
// => onNext: 5
// => onCompleted
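The mapping function in the sample receives each element and its index, in the same (value, index) order as the standard Array.from, which this operator appears to be modeled on. As a sketch that uses only Array.from (so it says nothing about subscription or scheduling), the following shows why an object with nothing but a length yields 1 through 5: every element is undefined, and the result comes entirely from the index.

```javascript
var arrayLike = { length: 5 };

// Each element is undefined, so the mapped value comes from the index k
var values = Array.from(arrayLike, function (v, k) { return k + 1; });
console.log(values.join(', ')); // => 1, 2, 3, 4, 5

// Without real elements, v is always undefined
var kinds = Array.from({ length: 2 }, function (v) { return typeof v; });
console.log(kinds.join(', ')); // => undefined, undefined
```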

In addition, we can use from to convert ES6 Iterable objects such as Map and Set to an observable sequence. In this example, we take a Set and convert it to an observable sequence.

var set = new Set([1,2,3,4,5]);

// Converts a Set to an observable sequence
var source = Rx.Observable.from(set);

// Prints out each item
var subscription = source.subscribe(
  function (x) { console.log('onNext: %s', x); },
  function (e) { console.log('onError: %s', e); },
  function () { console.log('onCompleted'); });

// => onNext: 1
// => onNext: 2
// => onNext: 3
// => onNext: 4
// => onNext: 5
// => onCompleted

We can convert a Map by applying the same technique. Each item in the resulting sequence is a [key, value] pair.

var map = new Map([['key1', 1], ['key2', 2]]);

// Converts a Map to an observable sequence
var source = Rx.Observable.from(map);

// Prints out each item
var subscription = source.subscribe(
  function (x) { console.log('onNext: %s', x); },
  function (e) { console.log('onError: %s', e); },
  function () { console.log('onCompleted'); });

// => onNext: key1, 1
// => onNext: key2, 2
// => onCompleted
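The items come out as key and value together because that is what a Map yields through the standard ES6 iteration protocol. The following sketch walks the iterator by hand, using only built-in JavaScript, to show the [key, value] pairs that a consumer of the iterable would see.

```javascript
var map = new Map([['key1', 1], ['key2', 2]]);

// Ask the Map for its default iterator and pull entries one at a time
var iterator = map[Symbol.iterator]();
var entries = [];
var step = iterator.next();
while (!step.done) {
  // step.value is a two-element array: [key, value]
  entries.push(step.value[0] + ' -> ' + step.value[1]);
  step = iterator.next();
}

console.log(entries.join('; ')); // => key1 -> 1; key2 -> 2
```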

The from method also supports ES6 generators, which may already be in your browser, or coming to a browser near you. This lets us take things such as an infinite Fibonacci generator and convert them to an observable sequence.

function* fibonacci () {
  var fn1 = 1;
  var fn2 = 1;
  while (1){
    var current = fn2;
    fn2 = fn1;
    fn1 = fn1 + current;
    yield current;
  }
}

// Converts a generator to an observable sequence
var source = Rx.Observable.from(fibonacci()).take(5);

// Prints out each item
var subscription = source.subscribe(
  function (x) { console.log('onNext: %s', x); },
  function (e) { console.log('onError: %s', e); },
  function () { console.log('onCompleted'); });

// => onNext: 1
// => onNext: 1
// => onNext: 2
// => onNext: 3
// => onNext: 5
// => onCompleted
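An infinite generator is safe to use here because generators are lazy: a value is only computed when the consumer asks for it. The sketch below demonstrates that with plain iteration. The helper takeFromIterator and the pulled counter are hypothetical additions for illustration, and this is not how take is implemented in RxJS.

```javascript
var pulled = 0; // counts how many values the generator actually computed

function* fibonacci() {
  var fn1 = 1;
  var fn2 = 1;
  while (true) {
    var current = fn2;
    fn2 = fn1;
    fn1 = fn1 + current;
    pulled++;
    yield current;
  }
}

// Hypothetical helper: pull at most `count` values from an iterator.
function takeFromIterator(iterator, count) {
  var out = [];
  while (out.length < count) {
    var step = iterator.next();
    if (step.done) { break; }
    out.push(step.value);
  }
  return out;
}

var firstFive = takeFromIterator(fibonacci(), 5);
console.log(firstFive.join(', ')); // => 1, 1, 2, 3, 5
console.log(pulled);               // => 5
```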

Cold vs. Hot Observables

Cold observables start running upon subscription, i.e., the observable sequence only starts pushing values to observers when subscribe is called. Values are also not shared among subscribers. This is different from hot observables, such as mouse move events or stock tickers, which are already producing values even before a subscription is active. When an observer subscribes to a hot observable sequence, it gets only the values that are emitted after it subscribes. The hot observable sequence is shared among all subscribers, and each subscriber is pushed the next value in the sequence. For example, even if no one has subscribed to a particular stock ticker, the ticker continues to update its value based on market movement. When a subscriber registers interest in this ticker, it automatically receives the next tick.

The following example demonstrates a cold observable sequence. In this example, we use the interval operator to create a simple observable sequence of numbers pumped out at specific intervals, in this case, every 1 second.

Two observers then subscribe to this sequence and print out its values. You will notice that the sequence is reset for each subscriber: the second subscription restarts the sequence from the first value.

First, we need to ensure we reference the proper files if in the browser. Note that the RxJS NPM Package already includes all operators by default.

<script src="rx.lite.js"></script>

And now to the example.

var source = Rx.Observable.interval(1000);

var subscription1 = source.subscribe(
  function (x) { console.log('Observer 1: onNext: ' + x); },
  function (e) { console.log('Observer 1: onError: ' + e.message); },
  function () { console.log('Observer 1: onCompleted'); });

var subscription2 = source.subscribe(
  function (x) { console.log('Observer 2: onNext: ' + x); },
  function (e) { console.log('Observer 2: onError: ' + e.message); },
  function () { console.log('Observer 2: onCompleted'); });

setTimeout(function () {
  subscription1.dispose();
  subscription2.dispose();
}, 5000);

// => Observer 1: onNext: 0
// => Observer 2: onNext: 0
// => Observer 1: onNext: 1
// => Observer 2: onNext: 1
// => Observer 1: onNext: 2
// => Observer 2: onNext: 2
// => Observer 1: onNext: 3
// => Observer 2: onNext: 3

In the following example, we convert the previous cold observable sequence source to a hot one using the publish operator, which returns a ConnectableObservable instance we name hot. The publish operator provides a mechanism to share subscriptions by broadcasting a single subscription to multiple subscribers. The hot variable acts as a proxy by subscribing to source and, as it receives values from source, pushing them to its own subscribers. To establish a subscription to the backing source and start receiving values, we use the ConnectableObservable.prototype.connect method. Since ConnectableObservable inherits from Observable, we can use subscribe to subscribe to this hot sequence even before it starts running.

Notice that in the example, the hot sequence has not been started when subscription1 subscribes to it. Therefore, no value is pushed to the subscriber. After connect is called, values are pushed to subscription1. After a further delay of 3 seconds, subscription2 subscribes to hot and starts receiving values immediately from the current position (2 in the sample output below) onward. The values emitted before it subscribed are not replayed. The output looks like this:

// => Current time: 1382562433256
// => Current Time after 1st subscription: 1382562433260
// => Current Time after connect: 1382562436261
// => Observer 1: onNext: 0
// => Observer 1: onNext: 1
// => Current Time after 2nd subscription: 1382562439262
// => Observer 1: onNext: 2
// => Observer 2: onNext: 2
// => Observer 1: onNext: 3
// => Observer 2: onNext: 3
// => Observer 1: onNext: 4
// => Observer 2: onNext: 4

First, we need to ensure we reference the proper files if in the browser. Note that the RxJS NPM Package already includes all operators by default.

<script src="rx.lite.js"></script>

Now on to the example!

console.log('Current time: ' + Date.now());

// Creates a sequence
var source = Rx.Observable.interval(1000);

// Convert the sequence into a hot sequence
var hot = source.publish();

// No value is pushed to 1st subscription at this point
var subscription1 = hot.subscribe(
  function (x) { console.log('Observer 1: onNext: %s', x); },
  function (e) { console.log('Observer 1: onError: %s', e); },
  function () { console.log('Observer 1: onCompleted'); });

console.log('Current Time after 1st subscription: ' + Date.now());

// Idle for 3 seconds
setTimeout(function () {

  // Hot is connected to source and starts pushing value to subscribers
  hot.connect();

  console.log('Current Time after connect: ' + Date.now());

  // Idle for another 3 seconds
  setTimeout(function () {

    console.log('Current Time after 2nd subscription: ' + Date.now());

    var subscription2 = hot.subscribe(
      function (x) { console.log('Observer 2: onNext: %s', x); },
      function (e) { console.log('Observer 2: onError: %s', e); },
      function () { console.log('Observer 2: onCompleted'); });

  }, 3000);
}, 3000);
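The proxy behavior of publish and connect can also be sketched without RxJS or real timers. Everything below is a hypothetical simplification: manualClock, coldInterval, and publish are illustrative names, and the real ConnectableObservable does considerably more (completion, errors, disconnecting). The sketch only shows a single shared subscription that starts at connect and is broadcast to whoever is subscribed at that moment.

```javascript
// Hypothetical manual clock: tick() fires every registered callback once.
function manualClock() {
  var callbacks = [];
  return {
    onTick: function (fn) {
      callbacks.push(fn);
      return { dispose: function () {
        var i = callbacks.indexOf(fn);
        if (i !== -1) { callbacks.splice(i, 1); }
      } };
    },
    tick: function () { callbacks.slice().forEach(function (fn) { fn(); }); }
  };
}

// Cold interval-like source: each subscribe call starts a fresh counter.
function coldInterval(clock) {
  return {
    subscribe: function (onNext) {
      var count = 0;
      return clock.onTick(function () { onNext(count++); });
    }
  };
}

// Hypothetical publish-like proxy: one shared subscription to `source`,
// started by connect(), broadcast to every current observer.
function publish(source) {
  var observers = [];
  var connection = null;
  return {
    subscribe: function (onNext) {
      observers.push(onNext);
      return { dispose: function () {
        var i = observers.indexOf(onNext);
        if (i !== -1) { observers.splice(i, 1); }
      } };
    },
    connect: function () {
      if (!connection) {
        connection = source.subscribe(function (x) {
          observers.slice().forEach(function (onNext) { onNext(x); });
        });
      }
      return connection;
    }
  };
}

var log = [];
var clock = manualClock();
var hot = publish(coldInterval(clock));

hot.subscribe(function (x) { log.push('Observer 1: ' + x); });
clock.tick();   // not connected yet: nothing is pushed
hot.connect();
clock.tick();   // value 0
clock.tick();   // value 1
hot.subscribe(function (x) { log.push('Observer 2: ' + x); });
clock.tick();   // value 2 goes to both observers

console.log(log.join('\n'));
// => Observer 1: 0
// => Observer 1: 1
// => Observer 1: 2
// => Observer 2: 2
```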

Analogies

It helps to think of cold and hot Observables as movies or performances that one can watch ("subscribe").

  • Cold Observables: movies.
  • Hot Observables: live performances.
  • Hot Observables replayed: live performances recorded on video.

Whenever you watch a movie, your run of the movie is independent of anyone else's run, even though all movie watchers see the same effects. On the other hand, a live performance is shared among multiple viewers. If you arrive late to a live performance, you will simply miss some of it. However, if it was recorded on video (in RxJS this would happen with a BehaviorSubject or a ReplaySubject), you can watch a "movie" of the live performance. A .publish().refCount() live performance is one where the artists quit playing when no one is watching, and start playing again when there is at least one person in the audience.
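The "recorded performance" idea can be sketched in a few lines of plain JavaScript. The replayRecorder below is a hypothetical illustration of the replay concept only. It is not the ReplaySubject implementation and has no buffer limits, completion, or error handling.

```javascript
// Hypothetical replay-style recorder: stores every value and replays the
// recording to each new subscriber before delivering live values.
function replayRecorder() {
  var recorded = [];
  var observers = [];
  return {
    onNext: function (x) {
      recorded.push(x);
      observers.slice().forEach(function (fn) { fn(x); });
    },
    subscribe: function (fn) {
      recorded.forEach(function (x) { fn(x); }); // catch up on the recording
      observers.push(fn);                        // then watch live
    }
  };
}

var seen = [];
var show = replayRecorder();

show.onNext('act 1');
show.onNext('act 2');

// This viewer arrives late but still sees the earlier acts
show.subscribe(function (x) { seen.push(x); });
show.onNext('act 3');

console.log(seen.join(', ')); // => act 1, act 2, act 3
```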