Skip to content

Commit

Permalink
feat(from): allow Observable.from to handle array-like objects
Browse files Browse the repository at this point in the history
this brings the signature and functionality closer to `Observable.from`
that is in RxJS4.
  • Loading branch information
justinwoo committed Jan 16, 2016
1 parent 52eea95 commit 85cc8af
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 2 deletions.
48 changes: 48 additions & 0 deletions spec/observables/from-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,54 @@ describe('Observable.from', function () {
}, null, done);
}, 300);

it('should handle an ArrayLike', function (done) {
var arrayLike = {
length: 3,
0: 1,
1: 2,
2: 3
};
var expected = [1, 2, 3];
var i = 0;
Observable.from(arrayLike).subscribe(function (x) {
expect(x).toBe(expected[i++]);
}, null, done);
}, 300);

it('should handle an ArrayLike with a mapFn', function (done) {
var arrayLike = {
length: 3,
0: 1,
1: 2,
2: 3
};
var expected = [0, 2, 6];
var i = 0;
var mapFn = function (x, y) {
return x * y;
};
Observable.from(arrayLike, mapFn).subscribe(function (x) {
expect(x).toBe(expected[i++]);
}, null, done);
}, 300);

it('should handle an ArrayLike with a thisArg', function (done) {
var arrayLike = {
length: 3,
0: 1,
1: 2,
2: 3
};
var expected = [123, 123, 123];
var i = 0;
var mapFn = function (x, y) {
return this.thing;
};
Observable.from(arrayLike, mapFn, {thing: 123}).subscribe(function (x) {
expect(x).toBe(expected[i++]);
}, null, done);
});

it('should handle a promise', function (done) {
var promise = Promise.resolve('pinky swear');

Expand Down
19 changes: 17 additions & 2 deletions src/observable/from.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import {isArray} from '../util/isArray';
import {isArrayLike} from '../util/isArrayLike';
import {isFunction} from '../util/isFunction';
import {isPromise} from '../util/isPromise';
import {isScheduler} from '../util/isScheduler';
import {PromiseObservable} from './fromPromise';
import {IteratorObservable} from'./IteratorObservable';
import {ArrayObservable} from './fromArray';
import {ArrayLikeObservable} from './fromArrayLike';

import {Scheduler} from '../Scheduler';
import {SymbolShim} from '../util/SymbolShim';
Expand All @@ -15,19 +19,30 @@ export class FromObservable<T> extends Observable<T> {
super(null);
}

static create<T>(ish: any, scheduler: Scheduler = null): Observable<T> {
static create<T>(ish: any, mapFnOrScheduler: Scheduler | ((x: number, y: any) => T), thisArg?: any, lastScheduler?: Scheduler): Observable<T> {
let scheduler: Scheduler = null;
let mapFn: (x: number, y: any) => T = null;
if (isFunction(mapFnOrScheduler)) {
scheduler = lastScheduler || null;
mapFn = <(x: number, y: any) => T> mapFnOrScheduler;
} else if (isScheduler(scheduler)) {
scheduler = <Scheduler> mapFnOrScheduler;
}

if (ish != null) {
if (typeof ish[SymbolShim.observable] === 'function') {
if (ish instanceof Observable && !scheduler) {
return ish;
}
return new FromObservable(ish, scheduler);
} if (isArray(ish)) {
} else if (isArray(ish)) {
return new ArrayObservable(ish, scheduler);
} else if (isPromise(ish)) {
return new PromiseObservable(ish, scheduler);
} else if (typeof ish[SymbolShim.iterator] === 'function' || typeof ish === 'string') {
return new IteratorObservable<T>(<any>ish, null, null, scheduler);
} else if (isArrayLike(ish)) {
return new ArrayLikeObservable(ish, mapFn, thisArg, scheduler);
}
}

Expand Down
66 changes: 66 additions & 0 deletions src/observable/fromArrayLike.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import {Scheduler} from '../Scheduler';
import {Observable} from '../Observable';
import {Subscriber} from '../Subscriber';
import {Subscription} from '../Subscription';

export class ArrayLikeObservable<T> extends Observable<T> {

private mapFn: (x: number, y: any) => T;

static create<T>(arrayLike: ArrayLike<T>, mapFn: (x: number, y: any) => T, thisArg: any, scheduler?: Scheduler) {
return new ArrayLikeObservable(arrayLike, mapFn, thisArg, scheduler);
}

static dispatch(state: any) {

const { arrayLike, index, count, mapFn, subscriber } = state;

if (index >= count) {
subscriber.complete();
return;
}

const result = mapFn ? mapFn(index, arrayLike[index]) : arrayLike[index];
subscriber.next(result);

if (subscriber.isUnsubscribed) {
return;
}

state.index = index + 1;

(<any> this).schedule(state);
}

// value used if Array has one value and _isScalar
value: any;

constructor(public arrayLike: ArrayLike<T>, mapFn: (x: number, y: any) => T, thisArg: any, public scheduler?: Scheduler) {
super();
if (!scheduler && arrayLike.length === 1) {
this._isScalar = true;
this.value = arrayLike[0];
}
if (mapFn) {
this.mapFn = mapFn.bind(thisArg);
}
}

_subscribe(subscriber: Subscriber<T>): Subscription | Function | void {
let index = 0;
const { arrayLike, mapFn, scheduler } = this;
const count = arrayLike.length;

if (scheduler) {
return scheduler.schedule(ArrayLikeObservable.dispatch, 0, {
arrayLike, index, count, mapFn, subscriber
});
} else {
for (let i = 0; i < count && !subscriber.isUnsubscribed; i++) {
let result = mapFn ? mapFn(i, arrayLike[i]) : arrayLike[i];
subscriber.next(result);
}
subscriber.complete();
}
}
}
1 change: 1 addition & 0 deletions src/util/isArrayLike.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export const isArrayLike = (<T>(x: any): x is ArrayLike<T> => x && typeof x.length === 'number');

0 comments on commit 85cc8af

Please sign in to comment.