Skip to content

Commit

Permalink
feat(async): adds AsyncScheduler and updates appropriate operators.
Browse files Browse the repository at this point in the history
  • Loading branch information
trxcllnt committed Feb 26, 2016
1 parent 6279d6b commit 1aae1b7
Show file tree
Hide file tree
Showing 19 changed files with 76 additions and 36 deletions.
2 changes: 1 addition & 1 deletion spec/observables/interval-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ describe('Observable.interval', function () {
it('should specify default scheduler if incorrect scheduler specified', function () {
var scheduler = Observable.interval(10, jasmine.createSpy('dummy')).scheduler;

expect(scheduler).toBe(Rx.Scheduler.asap);
expect(scheduler).toBe(Rx.Scheduler.async);
});

it('should emit when relative interval set to zero', function () {
Expand Down
3 changes: 3 additions & 0 deletions src/Rx.DOM.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,11 @@ import {EmptyError} from './util/EmptyError';
import {ArgumentOutOfRangeError} from './util/ArgumentOutOfRangeError';
import {ObjectUnsubscribedError} from './util/ObjectUnsubscribedError';
import {asap} from './scheduler/asap';
import {async} from './scheduler/async';
import {queue} from './scheduler/queue';
import {animationFrame} from './scheduler/animationFrame';
import {AsapScheduler} from './scheduler/AsapScheduler';
import {AsyncScheduler} from './scheduler/AsyncScheduler';
import {QueueScheduler} from './scheduler/QueueScheduler';
import {AnimationFrameScheduler} from './scheduler/AnimationFrameScheduler';
import {rxSubscriber} from './symbol/rxSubscriber';
Expand All @@ -136,6 +138,7 @@ import {AjaxRequest, AjaxResponse, AjaxError, AjaxTimeoutError} from './observab
/* tslint:disable:no-var-keyword */
var Scheduler = {
asap,
async,
queue,
animationFrame
};
Expand Down
3 changes: 3 additions & 0 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,10 @@ import {EmptyError} from './util/EmptyError';
import {ObjectUnsubscribedError} from './util/ObjectUnsubscribedError';
import {ArgumentOutOfRangeError} from './util/ArgumentOutOfRangeError';
import {asap} from './scheduler/asap';
import {async} from './scheduler/async';
import {queue} from './scheduler/queue';
import {AsapScheduler} from './scheduler/AsapScheduler';
import {AsyncScheduler} from './scheduler/AsyncScheduler';
import {QueueScheduler} from './scheduler/QueueScheduler';
import {TimeInterval} from './operator/timeInterval';
import {TestScheduler} from './testing/TestScheduler';
Expand All @@ -183,6 +185,7 @@ import {rxSubscriber} from './symbol/rxSubscriber';
/* tslint:disable:no-var-keyword */
var Scheduler = {
asap,
async,
queue
};

Expand Down
3 changes: 3 additions & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,18 @@ import {EmptyError} from './util/EmptyError';
import {ArgumentOutOfRangeError} from './util/ArgumentOutOfRangeError';
import {ObjectUnsubscribedError} from './util/ObjectUnsubscribedError';
import {asap} from './scheduler/asap';
import {async} from './scheduler/async';
import {queue} from './scheduler/queue';
import {AsapScheduler} from './scheduler/AsapScheduler';
import {AsyncScheduler} from './scheduler/AsyncScheduler';
import {QueueScheduler} from './scheduler/QueueScheduler';
import {rxSubscriber} from './symbol/rxSubscriber';
/* tslint:enable:no-unused-variable */

/* tslint:disable:no-var-keyword */
var Scheduler = {
asap,
async,
queue
};

Expand Down
8 changes: 4 additions & 4 deletions src/observable/IntervalObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ import {Subscriber} from '../Subscriber';
import {isNumeric} from '../util/isNumeric';
import {Scheduler} from '../Scheduler';
import {Observable} from '../Observable';
import {asap} from '../scheduler/asap';
import {async} from '../scheduler/async';

export class IntervalObservable extends Observable<number> {
static create(period: number = 0, scheduler: Scheduler = asap): Observable<number> {
static create(period: number = 0, scheduler: Scheduler = async): Observable<number> {
return new IntervalObservable(period, scheduler);
}

Expand All @@ -23,13 +23,13 @@ export class IntervalObservable extends Observable<number> {
(<any> this).schedule(state, period);
}

constructor(private period: number = 0, private scheduler: Scheduler = asap) {
constructor(private period: number = 0, private scheduler: Scheduler = async) {
super();
if (!isNumeric(period) || period < 0) {
this.period = 0;
}
if (!scheduler || typeof scheduler.schedule !== 'function') {
this.scheduler = asap;
this.scheduler = async;
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/observable/TimerObservable.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {isNumeric} from '../util/isNumeric';
import {Scheduler} from '../Scheduler';
import {Observable} from '../Observable';
import {asap} from '../scheduler/asap';
import {async} from '../scheduler/async';
import {isScheduler} from '../util/isScheduler';
import {isDate} from '../util/isDate';
import {Subscription} from '../Subscription';
Expand Down Expand Up @@ -46,7 +46,7 @@ export class TimerObservable extends Observable<number> {
}

if (!isScheduler(scheduler)) {
scheduler = asap;
scheduler = async;
}

this.scheduler = scheduler;
Expand Down
6 changes: 3 additions & 3 deletions src/operator/bufferTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {Subscriber} from '../Subscriber';
import {Observable} from '../Observable';
import {Scheduler} from '../Scheduler';
import {Action} from '../scheduler/Action';
import {asap} from '../scheduler/asap';
import {async} from '../scheduler/async';

/**
* Buffers values from the source for a specific time period. Optionally allows
Expand All @@ -15,14 +15,14 @@ import {asap} from '../scheduler/asap';
* before emitting them and clearing them.
* @param {number} [bufferCreationInterval] the interval at which to start new
* buffers.
* @param {Scheduler} [scheduler] (optional, defaults to `asap` scheduler) The
* @param {Scheduler} [scheduler] (optional, defaults to `async` scheduler) The
* scheduler on which to schedule the intervals that determine buffer
* boundaries.
* @returns {Observable<T[]>} an observable of arrays of buffered values.
*/
export function bufferTime<T>(bufferTimeSpan: number,
bufferCreationInterval: number = null,
scheduler: Scheduler = asap): Observable<T[]> {
scheduler: Scheduler = async): Observable<T[]> {
return this.lift(new BufferTimeOperator<T>(bufferTimeSpan, bufferCreationInterval, scheduler));
}

Expand Down
4 changes: 2 additions & 2 deletions src/operator/debounceTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {Observable} from '../Observable';
import {Subscriber} from '../Subscriber';
import {Scheduler} from '../Scheduler';
import {Subscription} from '../Subscription';
import {asap} from '../scheduler/asap';
import {async} from '../scheduler/async';

/**
* Returns the source Observable delayed by the computed debounce duration,
Expand All @@ -17,7 +17,7 @@ import {asap} from '../scheduler/asap';
* @param {Scheduler} [scheduler] the Scheduler to use for managing the timers that handle the timeout for each item.
* @returns {Observable} an Observable the same as source Observable, but drops items.
*/
export function debounceTime<T>(dueTime: number, scheduler: Scheduler = asap): Observable<T> {
export function debounceTime<T>(dueTime: number, scheduler: Scheduler = async): Observable<T> {
return this.lift(new DebounceTimeOperator(dueTime, scheduler));
}

Expand Down
4 changes: 2 additions & 2 deletions src/operator/delay.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {asap} from '../scheduler/asap';
import {async} from '../scheduler/async';
import {isDate} from '../util/isDate';
import {Operator} from '../Operator';
import {Scheduler} from '../Scheduler';
Expand All @@ -14,7 +14,7 @@ import {Observable} from '../Observable';
* @returns {Observable} an Observable that delays the emissions of the source Observable by the specified timeout or Date.
*/
export function delay<T>(delay: number|Date,
scheduler: Scheduler = asap): Observable<T> {
scheduler: Scheduler = async): Observable<T> {
const absoluteDelay = isDate(delay);
const delayFor = absoluteDelay ? (+delay - scheduler.now()) : Math.abs(<number>delay);
return this.lift(new DelayOperator(delayFor, scheduler));
Expand Down
4 changes: 2 additions & 2 deletions src/operator/inspectTime.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import {asap} from '../scheduler/asap';
import {async} from '../scheduler/async';
import {Operator} from '../Operator';
import {Scheduler} from '../Scheduler';
import {Subscriber} from '../Subscriber';
import {Observable} from '../Observable';
import {Subscription} from '../Subscription';

export function inspectTime<T>(delay: number, scheduler: Scheduler = asap): Observable<T> {
export function inspectTime<T>(delay: number, scheduler: Scheduler = async): Observable<T> {
return this.lift(new InspectTimeOperator(delay, scheduler));
}

Expand Down
4 changes: 2 additions & 2 deletions src/operator/sampleTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ import {Observable} from '../Observable';
import {Operator} from '../Operator';
import {Subscriber} from '../Subscriber';
import {Scheduler} from '../Scheduler';
import {asap} from '../scheduler/asap';
import {async} from '../scheduler/async';

export function sampleTime<T>(delay: number, scheduler: Scheduler = asap): Observable<T> {
export function sampleTime<T>(delay: number, scheduler: Scheduler = async): Observable<T> {
return this.lift(new SampleTimeOperator(delay, scheduler));
}

Expand Down
4 changes: 2 additions & 2 deletions src/operator/throttleTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ import {Operator} from '../Operator';
import {Subscriber} from '../Subscriber';
import {Scheduler} from '../Scheduler';
import {Subscription} from '../Subscription';
import {asap} from '../scheduler/asap';
import {async} from '../scheduler/async';
import {Observable} from '../Observable';

export function throttleTime<T>(delay: number, scheduler: Scheduler = asap): Observable<T> {
export function throttleTime<T>(delay: number, scheduler: Scheduler = async): Observable<T> {
return this.lift(new ThrottleTimeOperator(delay, scheduler));
}

Expand Down
4 changes: 2 additions & 2 deletions src/operator/timeInterval.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ import {Operator} from '../Operator';
import {Observable} from '../Observable';
import {Subscriber} from '../Subscriber';
import {Scheduler} from '../Scheduler';
import {asap} from '../scheduler/asap';
import {async} from '../scheduler/async';

export function timeInterval<T>(scheduler: Scheduler = asap): Observable<TimeInterval<T>> {
export function timeInterval<T>(scheduler: Scheduler = async): Observable<TimeInterval<T>> {
return this.lift(new TimeIntervalOperator(scheduler));
}

Expand Down
4 changes: 2 additions & 2 deletions src/operator/timeout.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {asap} from '../scheduler/asap';
import {async} from '../scheduler/async';
import {isDate} from '../util/isDate';
import {Operator} from '../Operator';
import {Subscriber} from '../Subscriber';
Expand All @@ -7,7 +7,7 @@ import {Observable} from '../Observable';

export function timeout<T>(due: number | Date,
errorToSend: any = null,
scheduler: Scheduler = asap): Observable<T> {
scheduler: Scheduler = async): Observable<T> {
let absoluteTimeout = isDate(due);
let waitFor = absoluteTimeout ? (+due - scheduler.now()) : Math.abs(<number>due);
return this.lift(new TimeoutOperator(waitFor, absoluteTimeout, errorToSend, scheduler));
Expand Down
4 changes: 2 additions & 2 deletions src/operator/timeoutWith.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {Operator} from '../Operator';
import {Subscriber} from '../Subscriber';
import {Scheduler} from '../Scheduler';
import {asap} from '../scheduler/asap';
import {async} from '../scheduler/async';
import {Subscription} from '../Subscription';
import {Observable} from '../Observable';
import {isDate} from '../util/isDate';
Expand All @@ -10,7 +10,7 @@ import {subscribeToResult} from '../util/subscribeToResult';

export function timeoutWith<T, R>(due: number | Date,
withObservable: Observable<R>,
scheduler: Scheduler = asap): Observable<T | R> {
scheduler: Scheduler = async): Observable<T | R> {
let absoluteTimeout = isDate(due);
let waitFor = absoluteTimeout ? (+due - scheduler.now()) : Math.abs(<number>due);
return this.lift(new TimeoutWithOperator(waitFor, absoluteTimeout, withObservable, scheduler));
Expand Down
4 changes: 2 additions & 2 deletions src/operator/windowTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import {Observable} from '../Observable';
import {Subject} from '../Subject';
import {Scheduler} from '../Scheduler';
import {Action} from '../scheduler/Action';
import {asap} from '../scheduler/asap';
import {async} from '../scheduler/async';

export function windowTime<T>(windowTimeSpan: number,
windowCreationInterval: number = null,
scheduler: Scheduler = asap): Observable<Observable<T>> {
scheduler: Scheduler = async): Observable<Observable<T>> {
return this.lift(new WindowTimeOperator<T>(windowTimeSpan, windowCreationInterval, scheduler));
}

Expand Down
10 changes: 10 additions & 0 deletions src/scheduler/AsyncScheduler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import {Action} from './Action';
import {FutureAction} from './FutureAction';
import {Subscription} from '../Subscription';
import {QueueScheduler} from './QueueScheduler';

export class AsyncScheduler extends QueueScheduler {
scheduleNow<T>(work: (x?: any) => Subscription, state?: any): Action {
return new FutureAction(this, work).schedule(state, 0);
}
}
34 changes: 26 additions & 8 deletions src/scheduler/FutureAction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export class FutureAction<T> extends Subscription implements Action {
public id: any;
public state: any;
public delay: number;
private pending: boolean = false;

constructor(public scheduler: Scheduler,
public work: (x?: any) => Subscription | void) {
Expand All @@ -17,8 +18,14 @@ export class FutureAction<T> extends Subscription implements Action {
execute() {
if (this.isUnsubscribed) {
throw new Error('How did did we execute a canceled Action?');
} else {
try {
this.work(this.state);
} catch (e) {
this.unsubscribe();
throw e;
}
}
this.work(this.state);
}

schedule(state?: any, delay: number = 0): Action {
Expand All @@ -30,34 +37,45 @@ export class FutureAction<T> extends Subscription implements Action {

protected _schedule(state?: any, delay: number = 0): Action {

this.delay = delay;
this.state = state;
this.pending = true;
const id = this.id;
this.state = state;

if (id != null) {
this.id = undefined;
root.clearTimeout(id);
// If we've already called setInterval with this delay, don't call it again.
if (id != null && this.delay === delay) {
return this;
}

this.id = root.setTimeout(() => {
this.delay = delay;

if (id != null) {
this.id = null;
root.clearInterval(id);
}

this.id = root.setInterval(() => {
this.pending = false;
const {scheduler} = this;
scheduler.actions.push(this);
scheduler.flush();
if (this.pending === false) {
this.unsubscribe();
}
}, delay);

return this;
}

protected _unsubscribe() {

this.pending = false;
const {id, scheduler} = this;
const {actions} = scheduler;
const index = actions.indexOf(this);

if (id != null) {
this.id = null;
root.clearTimeout(id);
root.clearInterval(id);
}

if (index !== -1) {
Expand Down
3 changes: 3 additions & 0 deletions src/scheduler/async.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import {AsyncScheduler} from './AsyncScheduler';

export const async = new AsyncScheduler();

0 comments on commit 1aae1b7

Please sign in to comment.