diff --git a/spec/observables/interval-spec.js b/spec/observables/interval-spec.js index 89feaae4ba9..b7061fb8726 100644 --- a/spec/observables/interval-spec.js +++ b/spec/observables/interval-spec.js @@ -12,7 +12,7 @@ describe('Observable.interval', function () { it('should specify default scheduler if incorrect scheduler specified', function () { var scheduler = Observable.interval(10, jasmine.createSpy('dummy')).scheduler; - expect(scheduler).toBe(Rx.Scheduler.asap); + expect(scheduler).toBe(Rx.Scheduler.async); }); it('should emit when relative interval set to zero', function () { diff --git a/src/Rx.DOM.ts b/src/Rx.DOM.ts index d8fe44f357f..f43f74c9cdf 100644 --- a/src/Rx.DOM.ts +++ b/src/Rx.DOM.ts @@ -124,9 +124,11 @@ import {EmptyError} from './util/EmptyError'; import {ArgumentOutOfRangeError} from './util/ArgumentOutOfRangeError'; import {ObjectUnsubscribedError} from './util/ObjectUnsubscribedError'; import {asap} from './scheduler/asap'; +import {async} from './scheduler/async'; import {queue} from './scheduler/queue'; import {animationFrame} from './scheduler/animationFrame'; import {AsapScheduler} from './scheduler/AsapScheduler'; +import {AsyncScheduler} from './scheduler/AsyncScheduler'; import {QueueScheduler} from './scheduler/QueueScheduler'; import {AnimationFrameScheduler} from './scheduler/AnimationFrameScheduler'; import {rxSubscriber} from './symbol/rxSubscriber'; @@ -136,6 +138,7 @@ import {AjaxRequest, AjaxResponse, AjaxError, AjaxTimeoutError} from './observab /* tslint:disable:no-var-keyword */ var Scheduler = { asap, + async, queue, animationFrame }; diff --git a/src/Rx.KitchenSink.ts b/src/Rx.KitchenSink.ts index 5ed26205216..1d85e832f82 100644 --- a/src/Rx.KitchenSink.ts +++ b/src/Rx.KitchenSink.ts @@ -171,8 +171,10 @@ import {EmptyError} from './util/EmptyError'; import {ObjectUnsubscribedError} from './util/ObjectUnsubscribedError'; import {ArgumentOutOfRangeError} from './util/ArgumentOutOfRangeError'; import {asap} from './scheduler/asap'; +import {async} from './scheduler/async'; import {queue} from './scheduler/queue'; import {AsapScheduler} from './scheduler/AsapScheduler'; +import {AsyncScheduler} from './scheduler/AsyncScheduler'; import {QueueScheduler} from './scheduler/QueueScheduler'; import {TimeInterval} from './operator/timeInterval'; import {TestScheduler} from './testing/TestScheduler'; @@ -183,6 +185,7 @@ import {rxSubscriber} from './symbol/rxSubscriber'; /* tslint:disable:no-var-keyword */ var Scheduler = { asap, + async, queue }; diff --git a/src/Rx.ts b/src/Rx.ts index 15ec5b54889..9b5edf2a9c7 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -129,8 +129,10 @@ import {EmptyError} from './util/EmptyError'; import {ArgumentOutOfRangeError} from './util/ArgumentOutOfRangeError'; import {ObjectUnsubscribedError} from './util/ObjectUnsubscribedError'; import {asap} from './scheduler/asap'; +import {async} from './scheduler/async'; import {queue} from './scheduler/queue'; import {AsapScheduler} from './scheduler/AsapScheduler'; +import {AsyncScheduler} from './scheduler/AsyncScheduler'; import {QueueScheduler} from './scheduler/QueueScheduler'; import {rxSubscriber} from './symbol/rxSubscriber'; /* tslint:enable:no-unused-variable */ @@ -138,6 +140,7 @@ import {rxSubscriber} from './symbol/rxSubscriber'; /* tslint:disable:no-var-keyword */ var Scheduler = { asap, + async, queue }; diff --git a/src/observable/IntervalObservable.ts b/src/observable/IntervalObservable.ts index df7ccebe36e..eb331b9de53 100644 --- a/src/observable/IntervalObservable.ts +++ b/src/observable/IntervalObservable.ts @@ -2,10 +2,10 @@ import {Subscriber} from '../Subscriber'; import {isNumeric} from '../util/isNumeric'; import {Scheduler} from '../Scheduler'; import {Observable} from '../Observable'; -import {asap} from '../scheduler/asap'; +import {async} from '../scheduler/async'; export class IntervalObservable extends Observable { - static create(period: number = 0, scheduler: Scheduler = asap): Observable { + static create(period: number = 0, scheduler: Scheduler = async): Observable { return new IntervalObservable(period, scheduler); } @@ -23,13 +23,13 @@ export class IntervalObservable extends Observable { ( this).schedule(state, period); } - constructor(private period: number = 0, private scheduler: Scheduler = asap) { + constructor(private period: number = 0, private scheduler: Scheduler = async) { super(); if (!isNumeric(period) || period < 0) { this.period = 0; } if (!scheduler || typeof scheduler.schedule !== 'function') { - this.scheduler = asap; + this.scheduler = async; } } diff --git a/src/observable/TimerObservable.ts b/src/observable/TimerObservable.ts index 758c05e9022..ce2940a133e 100644 --- a/src/observable/TimerObservable.ts +++ b/src/observable/TimerObservable.ts @@ -1,7 +1,7 @@ import {isNumeric} from '../util/isNumeric'; import {Scheduler} from '../Scheduler'; import {Observable} from '../Observable'; -import {asap} from '../scheduler/asap'; +import {async} from '../scheduler/async'; import {isScheduler} from '../util/isScheduler'; import {isDate} from '../util/isDate'; import {Subscription} from '../Subscription'; @@ -46,7 +46,7 @@ export class TimerObservable extends Observable { } if (!isScheduler(scheduler)) { - scheduler = asap; + scheduler = async; } this.scheduler = scheduler; diff --git a/src/operator/bufferTime.ts b/src/operator/bufferTime.ts index f18bf385716..3b04e34de81 100644 --- a/src/operator/bufferTime.ts +++ b/src/operator/bufferTime.ts @@ -3,7 +3,7 @@ import {Subscriber} from '../Subscriber'; import {Observable} from '../Observable'; import {Scheduler} from '../Scheduler'; import {Action} from '../scheduler/Action'; -import {asap} from '../scheduler/asap'; +import {async} from '../scheduler/async'; /** * Buffers values from the source for a specific time period. Optionally allows @@ -15,14 +15,14 @@ import {asap} from '../scheduler/asap'; * before emitting them and clearing them. * @param {number} [bufferCreationInterval] the interval at which to start new * buffers. - * @param {Scheduler} [scheduler] (optional, defaults to `asap` scheduler) The + * @param {Scheduler} [scheduler] (optional, defaults to `async` scheduler) The * scheduler on which to schedule the intervals that determine buffer * boundaries. * @returns {Observable} an observable of arrays of buffered values. */ export function bufferTime(bufferTimeSpan: number, bufferCreationInterval: number = null, - scheduler: Scheduler = asap): Observable { + scheduler: Scheduler = async): Observable { return this.lift(new BufferTimeOperator(bufferTimeSpan, bufferCreationInterval, scheduler)); } diff --git a/src/operator/debounceTime.ts b/src/operator/debounceTime.ts index e3201ea1c59..be78325c988 100644 --- a/src/operator/debounceTime.ts +++ b/src/operator/debounceTime.ts @@ -3,7 +3,7 @@ import {Observable} from '../Observable'; import {Subscriber} from '../Subscriber'; import {Scheduler} from '../Scheduler'; import {Subscription} from '../Subscription'; -import {asap} from '../scheduler/asap'; +import {async} from '../scheduler/async'; /** * Returns the source Observable delayed by the computed debounce duration, @@ -17,7 +17,7 @@ import {asap} from '../scheduler/asap'; * @param {Scheduler} [scheduler] the Scheduler to use for managing the timers that handle the timeout for each item. * @returns {Observable} an Observable the same as source Observable, but drops items. */ -export function debounceTime(dueTime: number, scheduler: Scheduler = asap): Observable { +export function debounceTime(dueTime: number, scheduler: Scheduler = async): Observable { return this.lift(new DebounceTimeOperator(dueTime, scheduler)); } diff --git a/src/operator/delay.ts b/src/operator/delay.ts index bb710027f7b..e9fa558c39c 100644 --- a/src/operator/delay.ts +++ b/src/operator/delay.ts @@ -1,4 +1,4 @@ -import {asap} from '../scheduler/asap'; +import {async} from '../scheduler/async'; import {isDate} from '../util/isDate'; import {Operator} from '../Operator'; import {Scheduler} from '../Scheduler'; @@ -14,7 +14,7 @@ import {Observable} from '../Observable'; * @returns {Observable} an Observable that delays the emissions of the source Observable by the specified timeout or Date. */ export function delay(delay: number|Date, - scheduler: Scheduler = asap): Observable { + scheduler: Scheduler = async): Observable { const absoluteDelay = isDate(delay); const delayFor = absoluteDelay ? (+delay - scheduler.now()) : Math.abs(delay); return this.lift(new DelayOperator(delayFor, scheduler)); diff --git a/src/operator/inspectTime.ts b/src/operator/inspectTime.ts index 7c23436bb87..0958f9661e5 100644 --- a/src/operator/inspectTime.ts +++ b/src/operator/inspectTime.ts @@ -1,11 +1,11 @@ -import {asap} from '../scheduler/asap'; +import {async} from '../scheduler/async'; import {Operator} from '../Operator'; import {Scheduler} from '../Scheduler'; import {Subscriber} from '../Subscriber'; import {Observable} from '../Observable'; import {Subscription} from '../Subscription'; -export function inspectTime(delay: number, scheduler: Scheduler = asap): Observable { +export function inspectTime(delay: number, scheduler: Scheduler = async): Observable { return this.lift(new InspectTimeOperator(delay, scheduler)); } diff --git a/src/operator/sampleTime.ts b/src/operator/sampleTime.ts index a52a41d7889..1bd24d733ca 100644 --- a/src/operator/sampleTime.ts +++ b/src/operator/sampleTime.ts @@ -2,9 +2,9 @@ import {Observable} from '../Observable'; import {Operator} from '../Operator'; import {Subscriber} from '../Subscriber'; import {Scheduler} from '../Scheduler'; -import {asap} from '../scheduler/asap'; +import {async} from '../scheduler/async'; -export function sampleTime(delay: number, scheduler: Scheduler = asap): Observable { +export function sampleTime(delay: number, scheduler: Scheduler = async): Observable { return this.lift(new SampleTimeOperator(delay, scheduler)); } diff --git a/src/operator/throttleTime.ts b/src/operator/throttleTime.ts index 3f62a463b9a..575f7b79b86 100644 --- a/src/operator/throttleTime.ts +++ b/src/operator/throttleTime.ts @@ -2,10 +2,10 @@ import {Operator} from '../Operator'; import {Subscriber} from '../Subscriber'; import {Scheduler} from '../Scheduler'; import {Subscription} from '../Subscription'; -import {asap} from '../scheduler/asap'; +import {async} from '../scheduler/async'; import {Observable} from '../Observable'; -export function throttleTime(delay: number, scheduler: Scheduler = asap): Observable { +export function throttleTime(delay: number, scheduler: Scheduler = async): Observable { return this.lift(new ThrottleTimeOperator(delay, scheduler)); } diff --git a/src/operator/timeInterval.ts b/src/operator/timeInterval.ts index 5cba3e2c8b4..d1830fe1123 100644 --- a/src/operator/timeInterval.ts +++ b/src/operator/timeInterval.ts @@ -2,9 +2,9 @@ import {Operator} from '../Operator'; import {Observable} from '../Observable'; import {Subscriber} from '../Subscriber'; import {Scheduler} from '../Scheduler'; -import {asap} from '../scheduler/asap'; +import {async} from '../scheduler/async'; -export function timeInterval(scheduler: Scheduler = asap): Observable> { +export function timeInterval(scheduler: Scheduler = async): Observable> { return this.lift(new TimeIntervalOperator(scheduler)); } diff --git a/src/operator/timeout.ts b/src/operator/timeout.ts index 326b0189c2f..36b7c0b363b 100644 --- a/src/operator/timeout.ts +++ b/src/operator/timeout.ts @@ -1,4 +1,4 @@ -import {asap} from '../scheduler/asap'; +import {async} from '../scheduler/async'; import {isDate} from '../util/isDate'; import {Operator} from '../Operator'; import {Subscriber} from '../Subscriber'; @@ -7,7 +7,7 @@ import {Observable} from '../Observable'; export function timeout(due: number | Date, errorToSend: any = null, - scheduler: Scheduler = asap): Observable { + scheduler: Scheduler = async): Observable { let absoluteTimeout = isDate(due); let waitFor = absoluteTimeout ? (+due - scheduler.now()) : Math.abs(due); return this.lift(new TimeoutOperator(waitFor, absoluteTimeout, errorToSend, scheduler)); diff --git a/src/operator/timeoutWith.ts b/src/operator/timeoutWith.ts index 98e0ea1c9a1..f49395ffc9b 100644 --- a/src/operator/timeoutWith.ts +++ b/src/operator/timeoutWith.ts @@ -1,7 +1,7 @@ import {Operator} from '../Operator'; import {Subscriber} from '../Subscriber'; import {Scheduler} from '../Scheduler'; -import {asap} from '../scheduler/asap'; +import {async} from '../scheduler/async'; import {Subscription} from '../Subscription'; import {Observable} from '../Observable'; import {isDate} from '../util/isDate'; @@ -10,7 +10,7 @@ import {subscribeToResult} from '../util/subscribeToResult'; export function timeoutWith(due: number | Date, withObservable: Observable, - scheduler: Scheduler = asap): Observable { + scheduler: Scheduler = async): Observable { let absoluteTimeout = isDate(due); let waitFor = absoluteTimeout ? (+due - scheduler.now()) : Math.abs(due); return this.lift(new TimeoutWithOperator(waitFor, absoluteTimeout, withObservable, scheduler)); diff --git a/src/operator/windowTime.ts b/src/operator/windowTime.ts index bf68d4a4b10..f8a44a24239 100644 --- a/src/operator/windowTime.ts +++ b/src/operator/windowTime.ts @@ -4,11 +4,11 @@ import {Observable} from '../Observable'; import {Subject} from '../Subject'; import {Scheduler} from '../Scheduler'; import {Action} from '../scheduler/Action'; -import {asap} from '../scheduler/asap'; +import {async} from '../scheduler/async'; export function windowTime(windowTimeSpan: number, windowCreationInterval: number = null, - scheduler: Scheduler = asap): Observable> { + scheduler: Scheduler = async): Observable> { return this.lift(new WindowTimeOperator(windowTimeSpan, windowCreationInterval, scheduler)); } diff --git a/src/scheduler/AsyncScheduler.ts b/src/scheduler/AsyncScheduler.ts new file mode 100644 index 00000000000..a92a2a01c97 --- /dev/null +++ b/src/scheduler/AsyncScheduler.ts @@ -0,0 +1,10 @@ +import {Action} from './Action'; +import {FutureAction} from './FutureAction'; +import {Subscription} from '../Subscription'; +import {QueueScheduler} from './QueueScheduler'; + +export class AsyncScheduler extends QueueScheduler { + scheduleNow(work: (x?: any) => Subscription, state?: any): Action { + return new FutureAction(this, work).schedule(state, 0); + } +} diff --git a/src/scheduler/FutureAction.ts b/src/scheduler/FutureAction.ts index 6341f0ed79a..87072411a1c 100644 --- a/src/scheduler/FutureAction.ts +++ b/src/scheduler/FutureAction.ts @@ -8,6 +8,7 @@ export class FutureAction extends Subscription implements Action { public id: any; public state: any; public delay: number; + private pending: boolean = false; constructor(public scheduler: Scheduler, public work: (x?: any) => Subscription | void) { @@ -17,8 +18,14 @@ export class FutureAction extends Subscription implements Action { execute() { if (this.isUnsubscribed) { throw new Error('How did did we execute a canceled Action?'); + } else { + try { + this.work(this.state); + } catch (e) { + this.unsubscribe(); + throw e; + } } - this.work(this.state); } schedule(state?: any, delay: number = 0): Action { @@ -30,20 +37,30 @@ export class FutureAction extends Subscription implements Action { protected _schedule(state?: any, delay: number = 0): Action { - this.delay = delay; - this.state = state; + this.pending = true; const id = this.id; + this.state = state; - if (id != null) { - this.id = undefined; - root.clearTimeout(id); + // If we've already called setInterval with this delay, don't call it again. + if (id != null && this.delay === delay) { + return this; } - this.id = root.setTimeout(() => { + this.delay = delay; + + if (id != null) { this.id = null; + root.clearInterval(id); + } + + this.id = root.setInterval(() => { + this.pending = false; const {scheduler} = this; scheduler.actions.push(this); scheduler.flush(); + if (this.pending === false) { + this.unsubscribe(); + } }, delay); return this; @@ -51,13 +68,14 @@ export class FutureAction extends Subscription implements Action { protected _unsubscribe() { + this.pending = false; const {id, scheduler} = this; const {actions} = scheduler; const index = actions.indexOf(this); if (id != null) { this.id = null; - root.clearTimeout(id); + root.clearInterval(id); } if (index !== -1) { diff --git a/src/scheduler/async.ts b/src/scheduler/async.ts new file mode 100644 index 00000000000..fce4d75f681 --- /dev/null +++ b/src/scheduler/async.ts @@ -0,0 +1,3 @@ +import {AsyncScheduler} from './AsyncScheduler'; + +export const async = new AsyncScheduler();