Skip to content
This repository has been archived by the owner on May 19, 2019. It is now read-only.

Commit

Permalink
optimized observing with multiple observe calls on a cursor
Browse files Browse the repository at this point in the history
  • Loading branch information
c58 committed Feb 8, 2016
1 parent 0dabca4 commit 4a3c242
Showing 1 changed file with 20 additions and 12 deletions.
32 changes: 20 additions & 12 deletions lib/CursorObservable.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ export class CursorObservable extends Cursor {
constructor(db, query, options) {
super(db, query, options);
this.maybeUpdate = _bind(this.maybeUpdate, this);
this._observers = 0;
this._updateQueue = new PromiseQueue(1);
this._propagateUpdate = debounce(_bind(this._propagateUpdate, this), 0, 0);
this._doUpdate = debounce(
_bind(this._doUpdate, this),
_defaultDebounce,
_defaultBatchSize
);
this._observers = 0;
}

static defaultDebounce() {
Expand Down Expand Up @@ -81,45 +81,53 @@ export class CursorObservable extends Cursor {
* @return {Stopper}
*/
observe(listener, options = {}) {
// Make new wrapper for make possible to observe
// multiple times (for removeListener)
const updateWrapper = (a, b) => this.maybeUpdate(a, b);
// Make possible to obbserver w/o callback
listener = listener || function() {};

this.db.on('insert', updateWrapper);
this.db.on('update', updateWrapper);
this.db.on('remove', updateWrapper);
// Start observing when no observers created
if (this._observers <= 0) {
this.db.on('insert', this.maybeUpdate);
this.db.on('update', this.maybeUpdate);
this.db.on('remove', this.maybeUpdate);
}

// Create observe stopper for current listeners
let running = true;
const self = this;
function stopper() {
if (running) {
self.db.removeListener('insert', updateWrapper);
self.db.removeListener('update', updateWrapper);
self.db.removeListener('remove', updateWrapper);
running = false;
self._observers -= 1;
self.removeListener('update', listener);
self.removeListener('stop', stopper);

running = false;
self._observers -= 1;
// Stop observing a cursor if no more observers
if (self._observers === 0) {
self._latestIds = null;
self._latestResult = null;
this._updatePromise = null;
self.emit('observeStopped');
self.db.removeListener('insert', this.maybeUpdate);
self.db.removeListener('update', this.maybeUpdate);
self.db.removeListener('remove', this.maybeUpdate);
}
}
}

// Start listening for updates and global stop
this._observers += 1;
this.on('update', listener);
this.on('stop', stopper);

// Get first result for observer or initiate
// update at first time
if (!this._updatePromise) {
this.update(true, true);
} else if (this._latestResult !== null) {
listener(this._latestResult);
}

// Wrap returned promise with useful fields
const cursorPromiseMixin = { stop: stopper };
return this._createCursorPromise(
this._updatePromise, cursorPromiseMixin
Expand Down

0 comments on commit 4a3c242

Please sign in to comment.