Skip to content
This repository has been archived by the owner on Apr 20, 2018. It is now read-only.

Commit

Permalink
Adding switchFirst
Browse files Browse the repository at this point in the history
  • Loading branch information
mattpodwysocki committed Nov 11, 2015
1 parent 0e10e23 commit 3e4f09d
Show file tree
Hide file tree
Showing 9 changed files with 648 additions and 10 deletions.
3 changes: 2 additions & 1 deletion .jshintrc
Expand Up @@ -11,5 +11,6 @@
"boss": true,
"eqnull": true,
"node": true,
"-W030": true
"-W030": true,
"predef": [ "Promise" ]
}
15 changes: 8 additions & 7 deletions src/core/linq/observable/switchfirst.js
Expand Up @@ -6,13 +6,14 @@
}

SwitchFirstObservable.prototype.subscribeCore = function (o) {
var m = new SingleAssignmentDisposable(), g = new CompositeDisposable(),
state = {
hasCurrent: false,
isStopped: false,
o: o,
g: g
};
var m = new SingleAssignmentDisposable(),
g = new CompositeDisposable(),
state = {
hasCurrent: false,
isStopped: false,
o: o,
g: g
};

g.add(m);
m.setDisposable(this.source.subscribe(new SwitchFirstObserver(state)));
Expand Down
2 changes: 1 addition & 1 deletion src/core/perf/operators/switch.js
Expand Up @@ -54,7 +54,7 @@
InnerObserver.prototype.completed = function () {
if (this.parent.latest === this.id) {
this.parent.hasLatest = false;
this.parent.isStopped && this.parent.o.onCompleted();
this.parent.stopped && this.parent.o.onCompleted();
}
};

Expand Down
8 changes: 8 additions & 0 deletions src/modular/observable/flatmaplatest.js
@@ -0,0 +1,8 @@
'use strict';

var FlatMapObservable = require('./flatmapobservable');
var switchLatest = require('./switch');

module.exports = function flatMapLatest (source, selector, resultSelector, thisArg) {
return switchLatest(new FlatMapObservable(source, selector, resultSelector, thisArg));
};
2 changes: 1 addition & 1 deletion src/modular/observable/skipuntil.js
@@ -1,7 +1,7 @@
'use strict';

var AbstractObserver = require('../observer/abstractobserver');
var ObservableBase = require('./observablebase');
var AbstractObserver = require('../observer/abstractobserver');
var BinaryDisposable = require('../binarydisposable');
var SingleAssignmentDisposable = require('../singleassignmentdisposable');
var fromPromise = require('./frompromise');
Expand Down
68 changes: 68 additions & 0 deletions src/modular/observable/switch.js
@@ -0,0 +1,68 @@
'use strict';

var ObservableBase = require('./observablebase');
var AbstractObserver = require('../observer/abstractobserver');
var BinaryDisposable = require('../binarydisposable');
var SerialDisposable = require('../serialdisposable');
var SingleAssignmentDisposable = require('../singleassignmentdisposable');
var fromPromise = require('./frompromise');
var isPromise = require('../helpers/ispromise');
var inherits = require('util').inherits;

function InnerObserver(p, id) {
this._p = p;
this._id = id;
AbstractObserver.call(this);
}

inherits(InnerObserver, AbstractObserver);

InnerObserver.prototype.next = function (x) { this._p._latest === this._id && this._p._o.onNext(x); };
InnerObserver.prototype.error = function (e) { this._p._latest === this._id && this._p._o.onError(e); };
InnerObserver.prototype.completed = function () {
if (this._p._latest === this._id) {
this._p._hasLatest = false;
this._p._stopped && this._p._o.onCompleted();
}
};

function SwitchObserver(o, inner) {
this._o = o;
this._inner = inner;
this._stopped = false;
this._latest = 0;
this._hasLatest = false;
AbstractObserver.call(this);
}

inherits(SwitchObserver, AbstractObserver);

SwitchObserver.prototype.next = function (innerSource) {
var d = new SingleAssignmentDisposable(), id = ++this._latest;
this._hasLatest = true;
this._inner.setDisposable(d);
isPromise(innerSource) && (innerSource = fromPromise(innerSource));
d.setDisposable(innerSource.subscribe(new InnerObserver(this, id)));
};
SwitchObserver.prototype.error = function (e) { this._o.onError(e); };
SwitchObserver.prototype.completed = function () { this._stopped = true; !this._hasLatest && this._o.onCompleted(); };

function SwitchObservable(source) {
this.source = source;
ObservableBase.call(this);
}

inherits(SwitchObservable, ObservableBase);

SwitchObservable.prototype.subscribeCore = function (o) {
var inner = new SerialDisposable(), s = this.source.subscribe(new SwitchObserver(o, inner));
return new BinaryDisposable(s, inner);
};

/**
* Transforms an observable sequence of observable sequences into an observable sequence producing values only from the most recent observable sequence.
* @returns {Observable} The observable sequence that at any point in time produces the elements of the most recent inner observable sequence that has been received.
*/
module.exports = function switch_(source) {
return new SwitchObservable(source);
};
82 changes: 82 additions & 0 deletions src/modular/observable/switchfirst.js
@@ -0,0 +1,82 @@
'use strict';

var ObservableBase = require('./observablebase');
var AbstractObserver = require('../observer/abstractobserver');
var CompositeDisposable = require('../compositedisposable');
var SingleAssignmentDisposable = require('../singleassignmentdisposable');
var fromPromise = require('./frompromise');
var isPromise = require('../helpers/ispromise');
var inherits = require('util').inherits;

function InnerObserver(state, inner) {
this._s = state;
this._i = inner;
AbstractObserver.call(this);
}

inherits(InnerObserver, AbstractObserver);

InnerObserver.prototype.next = function (x) { this._s.o.onNext(x); };
InnerObserver.prototype.error = function (e) { this._s.o.onError(e); };
InnerObserver.prototype.completed = function () {
this._s.g.remove(this._i);
this._s.hasCurrent = false;
this._s.isStopped && this._s.g.length === 1 && this._s.o.onCompleted();
};

function SwitchFirstObserver(state) {
this._s = state;
AbstractObserver.call(this);
}

inherits(SwitchFirstObserver, AbstractObserver);

SwitchFirstObserver.prototype.next = function (x) {
if (!this._s.hasCurrent) {
this._s.hasCurrent = true;
isPromise(x) && (x = fromPromise(x));
var inner = new SingleAssignmentDisposable();
this._s.g.add(inner);
inner.setDisposable(x.subscribe(new InnerObserver(this._s, inner)));
}
};

SwitchFirstObserver.prototype.error = function (e) {
this._s.o.onError(e);
};

SwitchFirstObserver.prototype.completed = function () {
this._s.isStopped = true;
!this._s.hasCurrent && this._s.g.length === 1 && this._s.o.onCompleted();
};

function SwitchFirstObservable(source) {
this.source = source;
ObservableBase.call(this);
}

inherits(SwitchFirstObservable, ObservableBase);

SwitchFirstObservable.prototype.subscribeCore = function (o) {
var m = new SingleAssignmentDisposable(),
g = new CompositeDisposable(),
state = {
hasCurrent: false,
isStopped: false,
o: o,
g: g
};

g.add(m);
m.setDisposable(this.source.subscribe(new SwitchFirstObserver(state)));
return g;
};

/**
* Performs a exclusive waiting for the first to finish before subscribing to another observable.
* Observables that come in between subscriptions will be dropped on the floor.
* @returns {Observable} A exclusive observable with only the results that happen when subscribed.
*/
module.exports = function switchFirst (source) {
return new SwitchFirstObservable(source);
};

0 comments on commit 3e4f09d

Please sign in to comment.