Duplex Node.js stream that replicates live changes to and from JavaScript objects.





This module is installed via npm:

$ npm install observestream

Example Usage

When used in conjunction with LivelyStream, ObserveStream replicates data between the database that the LivelyStream points to and the local JavaScript object that the ObserveStream points to:

// require the observestream class
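var ObserveStream = require('observestream');
// LivelyStream and MemLively are assumed here to come from the
// 'livelystream' and 'memlively' packages
var LivelyStream = require('livelystream');
var MemLively = require('memlively');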

// a database to replicate to/from
var memdb = new MemLively();

// bind the database to the lively stream
var ls = new LivelyStream(memdb);

// scope will contain the local javascript versions of the data in the database
var scope = {};

// Watch for any changes on scope.target and replicate to the 'eugene' key in
// the remote database
var os = new ObserveStream('eugene', scope, 'target', {});

// Connect the database to the observestream to do two-way replication
ls.pipe(os).pipe(ls);

// Any changes made to the database should eventually replicate
// to scope.target
memdb.put('eugene', { name: 'Eugene', number: 42 }, function () {});

// Any changes made to the local scope.target will replicate to the database
scope.target.name = 'Susan';


ObserveStream(key, scope, path, initialValue[, options])

Constructs a Duplex ObserveStream.

  • key - This is the key to listen to in the remote database, which will be bound to the local scope variable.
  • scope - This is the scope object (similar to a scope in angularjs) which contains the local javascript object that is to be replicated by the upstream database through LivelyStream. The object to be watched and replicated is scope[path]. So if the path is 'foo', then the object to be replicated would be scope['foo'].
  • path - The property of the scope object which will contain the local JavaScript object to be replicated to and from the database attached to the LivelyStream.
  • initialValue - This is the initial value that will be stored in the remote database and used locally, if there is nothing in the database for the key.
  • options - An optional options object that accepts the following options:
    • nextTurn (function) - A function used to schedule each poll for changes on the monitored object. By default, this is a function that does setTimeout(fn, 0). You can switch to setImmediate by passing setImmediate as the nextTurn property of the options object. Note that doing this will increase CPU utilization considerably.
    • observejs (boolean) - Defaults to false. When true, observejs is used to detect changes on the watched object. In practice this wraps the object in a series of getters and setters to detect changes. The limitation of this method is that it can't detect when a property has been deleted, which is why polling with the nextTurn function is the default. There may also be times when wrapping an object in getters and setters causes issues. If you can use this method of change detection, it will be much more CPU friendly. ObserveJS may be rewritten to use the new ECMAScript Object.observe feature, which would be the best of both worlds. However, this won't work in older browsers or node.js implementations.
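
The deletion limitation of getter/setter wrapping can be seen in a minimal sketch. This is not the observejs implementation; watchWithAccessors is a hypothetical helper written only to illustrate the technique with Object.defineProperty:

```javascript
// Hypothetical helper (not part of observejs or observestream): wrap every
// existing own property of `obj` in a getter/setter pair and call
// onChange(key, value) whenever one of them is assigned.
function watchWithAccessors(obj, onChange) {
  Object.keys(obj).forEach(function (key) {
    var value = obj[key];
    Object.defineProperty(obj, key, {
      enumerable: true,
      configurable: true,
      get: function () { return value; },
      set: function (next) {
        value = next;
        onChange(key, next);
      }
    });
  });
  return obj;
}

var seen = [];
var target = watchWithAccessors({ name: 'Eugene', old: true }, function (key, value) {
  seen.push([key, value]);
});

target.name = 'Susan'; // goes through the setter, so it is recorded
delete target.old;     // removes the accessor itself; no callback fires

console.log(seen); // [ [ 'name', 'Susan' ] ]
```

The assignment is observed, but the delete leaves no trace in seen, which is the gap that polling covers.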


// Scope object which will hold the data to be watched
var scope = {}, os;

// Watch for any changes on scope.target
os = new ObserveStream('my key', scope, 'target', {});

// Use observejs to detect changes of the object
os = new ObserveStream('my key', scope, 'target', {}, { observejs: true });

// Use setImmediate for polling
os = new ObserveStream('my key', scope, 'target', {}, { nextTurn: setImmediate });
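
To make the role of nextTurn more concrete, here is a rough sketch of a polling loop driven by a pluggable scheduler. It is an illustration under assumptions, not ObserveStream's internals: pollForChanges is a hypothetical helper, and comparing JSON snapshots is just one simple way to notice a change.

```javascript
// Hypothetical helper (not part of observestream): re-check scope[path] once
// per turn and call onChange when its JSON snapshot differs from the last one.
function pollForChanges(scope, path, onChange, nextTurn) {
  // Default scheduler as described for the nextTurn option: setTimeout(fn, 0)
  nextTurn = nextTurn || function (fn) { return setTimeout(fn, 0); };
  var last = JSON.stringify(scope[path]);
  var stopped = false;

  function check() {
    if (stopped) return;
    var current = JSON.stringify(scope[path]);
    if (current !== last) {
      last = current;
      onChange(scope[path]);
    }
    nextTurn(check); // schedule the next poll
  }

  nextTurn(check);
  return function stop() { stopped = true; };
}

// Usage: the change (including a deleted property) is noticed on a later turn
var scope = { target: { name: 'Eugene', old: true } };
var stop = pollForChanges(scope, 'target', function (value) {
  console.log('changed:', value);
  stop();
});
delete scope.target.old;
```

Because the loop reschedules itself every turn, a faster scheduler such as setImmediate means more comparisons per second, which is consistent with the CPU warning above.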

Outbound 'data' Events emitted by ObserveStream

The ObserveStream emits 'data' events with the following format:

listen events

When the ObserveStream is first piped to a LivelyStream, it emits a listen event to tell the remote database which key to bind to and which initial value to use if the key is not present in the remote database.


['listen', { key: 'my key', initialValue: {} }]

change events

Any time the watched object changes, a change event is emitted. The change is in changeset object diff format. For example:

['change', [
    { type: 'put', key: ['name'], value: 'Eugene' },
    { type: 'put', key: ['number'], value: 42 },
    { type: 'del', key: ['old'] } ] ]
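
As an illustration of how a message in this shape could be derived from two versions of an object, here is a minimal sketch. diffObjects is a hypothetical helper that only mimics the put/del format shown above; the actual changeset output may differ in details such as ordering or array handling.

```javascript
function isPlainObject(v) {
  return v !== null && typeof v === 'object' && !Array.isArray(v);
}

// Hypothetical helper: produce put/del operations that turn oldObj into
// newObj. Each key is an array path, so nested properties are supported.
function diffObjects(oldObj, newObj, prefix) {
  prefix = prefix || [];
  var ops = [];
  Object.keys(newObj).forEach(function (k) {
    var path = prefix.concat(k);
    var a = oldObj[k];
    var b = newObj[k];
    if (isPlainObject(a) && isPlainObject(b)) {
      // Recurse so that only the changed leaves are reported
      ops = ops.concat(diffObjects(a, b, path));
    } else if (JSON.stringify(a) !== JSON.stringify(b)) {
      ops.push({ type: 'put', key: path, value: b });
    }
  });
  Object.keys(oldObj).forEach(function (k) {
    if (!Object.prototype.hasOwnProperty.call(newObj, k)) {
      ops.push({ type: 'del', key: prefix.concat(k) });
    }
  });
  return ops;
}

var before = { name: 'Susan', old: true };
var after = { name: 'Eugene', number: 42 };
var message = ['change', diffObjects(before, after)];
console.log(JSON.stringify(message));
```

For these two objects the sketch yields the same three operations as the example above: two puts followed by one del.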

Inbound events consumed by ObserveStream to change the watched object

Initial value events

The very first event that the LivelyStream fires is the value event. The ObserveStream expects this event and uses it to set the initial value of the monitored object.

For example, if the initial value in the database is 'my value', then the first event emitted would be:

['value', 'my value']

change events

When piped from a stream such as LivelyStream, the inbound stream can write change events that modify the local object being watched at scope[path].

The format of these events is the same as the change event listed above.


['change', [
    { type: 'put', key: ['name'], value: 'Eugene' },
    { type: 'put', key: ['number'], value: 42 },
    { type: 'del', key: ['old'] } ] ]
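
A consumer of these inbound messages might handle them roughly as follows. This is a sketch under assumptions rather than ObserveStream's actual code: handleInbound and applyOps are hypothetical names, and the sketch only covers the 'value' and 'change' message shapes documented above.

```javascript
// Hypothetical helper: apply changeset-style put/del operations in place.
function applyOps(target, ops) {
  ops.forEach(function (op) {
    var parent = target;
    // Walk to the object that holds the final key segment
    for (var i = 0; i < op.key.length - 1; i++) {
      var segment = op.key[i];
      if (parent[segment] === null || typeof parent[segment] !== 'object') {
        if (op.type === 'del') return; // nothing to delete at this path
        parent[segment] = {};
      }
      parent = parent[segment];
    }
    var last = op.key[op.key.length - 1];
    if (op.type === 'put') {
      parent[last] = op.value;
    } else if (op.type === 'del') {
      delete parent[last];
    }
  });
  return target;
}

// Hypothetical dispatcher for the two inbound message types
function handleInbound(scope, path, message) {
  var type = message[0];
  var payload = message[1];
  if (type === 'value') {
    scope[path] = payload;           // initial value replaces the object
  } else if (type === 'change') {
    applyOps(scope[path], payload);  // later changes are applied in place
  }
}

var scope = {};
handleInbound(scope, 'target', ['value', { name: 'Susan', old: true }]);
handleInbound(scope, 'target', ['change', [
  { type: 'put', key: ['name'], value: 'Eugene' },
  { type: 'put', key: ['number'], value: 42 },
  { type: 'del', key: ['old'] }
]]);
console.log(scope.target); // { name: 'Eugene', number: 42 }
```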