Skip to content

Commit

Permalink
feat(from): allow Observable.from to handle array-like objects
Browse files Browse the repository at this point in the history
this brings the signature and functionality closer to `Observable.from`
that is in RxJS4.
  • Loading branch information
justinwoo authored and kwonoj committed Jan 31, 2016
1 parent 2c65ed4 commit 7245005
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 2 deletions.
61 changes: 61 additions & 0 deletions spec/observables/from-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,67 @@ describe('Observable.from', function () {
}, null, done);
}, 300);

it('should handle an ArrayLike', function (done) {
var arrayLike = {
length: 3,
0: 1,
1: 2,
2: 3
};
var expected = [1, 2, 3];
var i = 0;
Observable.from(arrayLike).subscribe(function (x) {
expect(x).toBe(expected[i++]);
}, null, done);
}, 300);

it('should handle an ArrayLike from arguments', function (done) {
function makeArrayLike() {
var expected = [1, 2, 3];
var i = 0;

Observable.from(arguments).subscribe(function (x) {
expect(x).toBe(expected[i++]);
}, null, done);
}

makeArrayLike(1, 2, 3);
}, 300);

it('should handle an ArrayLike with a mapFn', function (done) {
var arrayLike = {
length: 3,
0: 1,
1: 2,
2: 3
};
var expected = [1, 1, 1];
var i = 0;
var mapFn = function (v, k) {
return v - k;
};
Observable.from(arrayLike, mapFn).subscribe(function (x) {
expect(x).toBe(expected[i++]);
}, null, done);
}, 300);

it('should handle an ArrayLike with a thisArg', function (done) {
var arrayLike = {
length: 3,
0: 1,
1: 2,
2: 3
};
var expected = [123, 123, 123];
var i = 0;
var mapFn = function (x, y) {
return this.thing;
};
Observable.from(arrayLike, mapFn, {thing: 123}).subscribe(function (x) {
expect(x).toBe(expected[i++]);
}, null, done);
});

it('should handle a promise', function (done) {
var promise = Promise.resolve('pinky swear');

Expand Down
74 changes: 74 additions & 0 deletions src/observable/ArrayLikeObservable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import {Scheduler} from '../Scheduler';
import {Observable} from '../Observable';
import {ScalarObservable} from './ScalarObservable';
import {EmptyObservable} from './EmptyObservable';
import {Subscriber} from '../Subscriber';
import {Subscription} from '../Subscription';

export class ArrayLikeObservable<T> extends Observable<T> {

private mapFn: (x: any, y: number) => T;

static create<T>(arrayLike: ArrayLike<T>, mapFn: (x: any, y: number) => T, thisArg: any, scheduler?: Scheduler): Observable<T> {
const length = arrayLike.length;
if (length === 0) {
return new EmptyObservable<T>();
} else if (length === 1 && !mapFn) {
return new ScalarObservable<T>(<any>arrayLike[0], scheduler);
} else {
return new ArrayLikeObservable(arrayLike, mapFn, thisArg, scheduler);
}
}

static dispatch(state: any) {
const { arrayLike, index, length, mapFn, subscriber } = state;

if (subscriber.isUnsubscribed) {
return;
}

if (index >= length) {
subscriber.complete();
return;
}

const result = mapFn ? mapFn(arrayLike[index], index) : arrayLike[index];
subscriber.next(result);

state.index = index + 1;

(<any> this).schedule(state);
}

// value used if Array has one value and _isScalar
private value: any;

constructor(private arrayLike: ArrayLike<T>, mapFn: (x: any, y: number) => T, thisArg: any, private scheduler?: Scheduler) {
super();
if (!mapFn && !scheduler && arrayLike.length === 1) {
this._isScalar = true;
this.value = arrayLike[0];
}
if (mapFn) {
this.mapFn = mapFn.bind(thisArg);
}
}

protected _subscribe(subscriber: Subscriber<T>): Subscription | Function | void {
let index = 0;
const { arrayLike, mapFn, scheduler } = this;
const length = arrayLike.length;

if (scheduler) {
return scheduler.schedule(ArrayLikeObservable.dispatch, 0, {
arrayLike, index, length, mapFn, subscriber
});
} else {
for (let i = 0; i < length && !subscriber.isUnsubscribed; i++) {
const result = mapFn ? mapFn(arrayLike[i], i) : arrayLike[i];
subscriber.next(result);
}
subscriber.complete();
}
}
}
20 changes: 18 additions & 2 deletions src/observable/FromObservable.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,49 @@
import {isArray} from '../util/isArray';
import {isFunction} from '../util/isFunction';
import {isPromise} from '../util/isPromise';
import {isScheduler} from '../util/isScheduler';
import {PromiseObservable} from './PromiseObservable';
import {IteratorObservable} from'./IteratorObservable';
import {ArrayObservable} from './ArrayObservable';
import {ArrayLikeObservable} from './ArrayLikeObservable';

import {Scheduler} from '../Scheduler';
import {SymbolShim} from '../util/SymbolShim';
import {Observable} from '../Observable';
import {Subscriber} from '../Subscriber';
import {ObserveOnSubscriber} from '../operator/observeOn';

const isArrayLike = (<T>(x: any): x is ArrayLike<T> => x && typeof x.length === 'number');

export class FromObservable<T> extends Observable<T> {
constructor(private ish: Observable<T> | Promise<T> | Iterator<T> | ArrayLike<T>, private scheduler: Scheduler) {
super(null);
}

static create<T>(ish: any, scheduler: Scheduler = null): Observable<T> {
static create<T>(ish: any, mapFnOrScheduler: Scheduler | ((x: any, y: number) => T), thisArg?: any, lastScheduler?: Scheduler): Observable<T> {
let scheduler: Scheduler = null;
let mapFn: (x: number, y: any) => T = null;
if (isFunction(mapFnOrScheduler)) {
scheduler = lastScheduler || null;
mapFn = <(x: number, y: any) => T> mapFnOrScheduler;
} else if (isScheduler(scheduler)) {
scheduler = <Scheduler> mapFnOrScheduler;
}

if (ish != null) {
if (typeof ish[SymbolShim.observable] === 'function') {
if (ish instanceof Observable && !scheduler) {
return ish;
}
return new FromObservable(ish, scheduler);
} if (isArray(ish)) {
} else if (isArray(ish)) {
return new ArrayObservable(ish, scheduler);
} else if (isPromise(ish)) {
return new PromiseObservable(ish, scheduler);
} else if (typeof ish[SymbolShim.iterator] === 'function' || typeof ish === 'string') {
return new IteratorObservable<T>(<any>ish, null, null, scheduler);
} else if (isArrayLike(ish)) {
return new ArrayLikeObservable(ish, mapFn, thisArg, scheduler);
}
}

Expand Down

0 comments on commit 7245005

Please sign in to comment.