Skip to content
This repository has been archived by the owner on Apr 20, 2018. It is now read-only.

Commit

Permalink
startWith
Browse files Browse the repository at this point in the history
  • Loading branch information
mattpodwysocki committed Feb 26, 2016
1 parent 2cceb47 commit 5fc6c6d
Show file tree
Hide file tree
Showing 4 changed files with 218 additions and 0 deletions.
10 changes: 10 additions & 0 deletions src/core/concurrency/defaultscheduler.js
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,16 @@
return new BinaryDisposable(disposable, new LocalClearDisposable(id));
};

function scheduleLongRunning(state, action, disposable) {
return function () { action(state, disposable); };
}

DefaultScheduler.prototype.scheduleLongRunning = function (state, action) {
var disposable = disposableCreate(noop);
scheduleMethod(scheduleLongRunning(state, action, disposable));
return disposable;
};

return DefaultScheduler;
}(Scheduler));

Expand Down
22 changes: 22 additions & 0 deletions src/modular/observable/startwith.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
'use strict';

var concat = require('./concat');
var fromArray = require('./fromarray');
var isScheduler = require('../scheduler').isScheduler;

global._Rx || (global._Rx = {});
if (!global._Rx.immediateScheduler) {
require('../scheduler/immediatescheduler');
}

module.exports = function startWith () {
var source = arguments[0], scheduler, start = 1;
if (isScheduler(arguments[1])) {
scheduler = arguments[1];
start = 2;
} else {
scheduler = global._Rx.immediateScheduler;
}
for(var args = [], i = start, len = arguments.length; i < len; i++) { args.push(arguments[i]); }
return concat(fromArray(args, scheduler), source);
};
11 changes: 11 additions & 0 deletions src/modular/scheduler/defaultscheduler.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ var BinaryDisposable = require('../binarydisposable');
var SingleAssignmentDisposable = require('../singleassignmentdisposable');
var Scheduler = require('../scheduler');
var isFunction = require('../helpers/isfunction');
var noop = require('../helpers/noop');
var tryCatchUtils = require('../internal/trycatchutils');
var tryCatch = tryCatchUtils.tryCatch, thrower = tryCatchUtils.thrower;
var inherits = require('inherits');
Expand Down Expand Up @@ -168,6 +169,16 @@ DefaultScheduler.prototype._scheduleFuture = function (state, dueTime, action) {
return new BinaryDisposable(disposable, new ClearDisposable(global.clearTimeout, id));
};

function scheduleLongRunning(state, action, disposable) {
return function () { action(state, disposable); };
}

DefaultScheduler.prototype.scheduleLongRunning = function (state, action) {
var disposable = Disposable.create(noop);
scheduleMethod(scheduleLongRunning(state, action, disposable));
return disposable;
};

global._Rx || (global._Rx = {});
global._Rx.defaultScheduler = new DefaultScheduler();

Expand Down
175 changes: 175 additions & 0 deletions src/modular/test/startwith.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
'use strict';

var test = require('tape');
var Observable = require('../observable');
var TestScheduler = require('../testing/testscheduler');
var reactiveAssert = require('../testing/reactiveassert');
var ReactiveTest = require('../testing/reactivetest');
var onNext = ReactiveTest.onNext,
onError = ReactiveTest.onError,
onCompleted = ReactiveTest.onCompleted;

Observable.addToPrototype({
startWith: require('../observable/startwith')
});

global._Rx || (global._Rx = {});
if (!global._Rx.currentThreadScheduler) {
require('../scheduler/currentthreadscheduler');
}

test('Observable#startWith normal', function (t) {
var scheduler = new TestScheduler();

var xs = scheduler.createHotObservable(
onNext(150, 1),
onNext(220, 2),
onCompleted(250)
);

var results = scheduler.startScheduler(function () {
return xs.startWith(1);
});

reactiveAssert(t, results.messages, [
onNext(200, 1),
onNext(220, 2),
onCompleted(250)
]);

t.end();
});

test('Observable#startWith never', function (t) {
var scheduler = new TestScheduler();

var xs = scheduler.createHotObservable(
onNext(150, 1)
);

var results = scheduler.startScheduler(function () {
return xs.startWith(scheduler, 1);
});

reactiveAssert(t, results.messages, [
onNext(201, 1)
]);

t.end();
});

test('Observable#startWith empty', function (t) {
var scheduler = new TestScheduler();

var xs = scheduler.createHotObservable(
onNext(150, 1),
onCompleted(250)
);

var results = scheduler.startScheduler(function () {
return xs.startWith(scheduler, 1);
});

reactiveAssert(t, results.messages, [
onNext(201, 1),
onCompleted(250)
]);

t.end();
});

test('Observable#startWith one', function (t) {
var scheduler = new TestScheduler();

var xs = scheduler.createHotObservable(
onNext(150, 1),
onNext(220, 2),
onCompleted(250)
);

var results = scheduler.startScheduler(function () {
return xs.startWith(scheduler, 1);
});

reactiveAssert(t, results.messages, [
onNext(201, 1),
onNext(220, 2),
onCompleted(250)
]);

t.end();
});

test('Observable#startWith multiple', function (t) {
var scheduler = new TestScheduler();

var xs = scheduler.createHotObservable(
onNext(150, 1),
onNext(220, 4),
onCompleted(250)
);

var results = scheduler.startScheduler(function () {
return xs.startWith(scheduler, 1, 2, 3);
});

reactiveAssert(t, results.messages, [
onNext(201, 1),
onNext(202, 2),
onNext(203, 3),
onNext(220, 4),
onCompleted(250)
]);

t.end();
});

test('Observable#startWith error', function (t) {
var error = new Error();

var scheduler = new TestScheduler();

var xs = scheduler.createHotObservable(
onNext(150, 1),
onError(250, error)
);

var results = scheduler.startScheduler(function () {
return xs.startWith(scheduler, 1, 2, 3);
});

reactiveAssert(t, results.messages, [
onNext(201, 1),
onNext(202, 2),
onNext(203, 3),
onError(250, error)
]);

t.end();
});

test('Observable#startWith is unaffected by currentThread scheduler', function (t) {
var scheduler = new TestScheduler();

var xs = scheduler.createHotObservable(
onNext(150, 1),
onNext(220, 2),
onCompleted(250)
);

var results;

global._Rx.currentThreadScheduler.schedule(null, function () {
results = scheduler.startScheduler(function () {
return xs.startWith(scheduler, 1);
});
});

reactiveAssert(t, results.messages, [
onNext(201, 1),
onNext(220, 2),
onCompleted(250)
]);

t.end();
});

0 comments on commit 5fc6c6d

Please sign in to comment.