Skip to content

Commit

Permalink
chore(typings): added type signatures for concat/exhaust/merge/switch
Browse files Browse the repository at this point in the history
also adds signatures for the "All", "Map", and "MapTo" variants.
  • Loading branch information
david-driscoll committed Feb 24, 2016
1 parent 9b9b2de commit 212084d
Show file tree
Hide file tree
Showing 26 changed files with 312 additions and 168 deletions.
52 changes: 32 additions & 20 deletions src/CoreOperators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,21 @@ import {TimeoutSignature} from './operator/timeout';
import {TimeoutWithSignature} from './operator/timeoutWith';
import {ToArraySignature} from './operator/toArray';
import {ToPromiseSignature} from './operator/toPromise';
import {CombineAllSignature} from './operator/combineAll';
import {ConcatSignature} from './operator/concat';
import {ConcatAllSignature} from './operator/concatAll';
import {ConcatMapSignature} from './operator/concatMap';
import {ConcatMapToSignature} from './operator/concatMapTo';
import {MapSignature} from './operator/map';
import {MapToSignature} from './operator/mapTo';
import {MergeSignature} from './operator/merge';
import {MergeAllSignature} from './operator/mergeAll';
import {MergeMapSignature} from './operator/mergeMap';
import {MergeMapToSignature} from './operator/mergeMapTo';
import {SwitchSignature} from './operator/switch';
import {SwitchMapSignature} from './operator/switchMap';
import {SwitchMapToSignature} from './operator/switchMapTo';
import {ZipAllSignature} from './operator/zipAll';

export interface CoreOperators<T> {
buffer: BufferSignature<T>;
Expand All @@ -79,12 +94,12 @@ export interface CoreOperators<T> {
bufferWhen: BufferWhenSignature<T>;
cache: CacheSignature<T>;
catch: CatchSignature<T>;
combineAll: <R>(project?: (...values: Array<any>) => R) => Observable<R>;
combineAll: CombineAllSignature<T>;
combineLatest: CombineLatestSignature<T>;
concat?: <R>(...observables: (Observable<any> | Scheduler)[]) => Observable<R>;
concatAll?: () => Observable<T>;
concatMap?: <R>(project: ((x: T, ix: number) => Observable<any>), projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
concatMapTo?: <R>(observable: Observable<any>, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
concat: ConcatSignature<T>;
concatAll: ConcatAllSignature<T>;
concatMap: ConcatMapSignature<T>;
concatMapTo: ConcatMapToSignature<T>;
count: CountSignature<T>;
dematerialize: DematerializeSignature<T>;
debounce: DebounceSignature<T>;
Expand All @@ -98,10 +113,8 @@ export interface CoreOperators<T> {
filter: FilterSignature<T>;
finally: FinallySignature<T>;
first: FirstSignature<T>;
flatMap?: <R>(project: ((x: T, ix: number) => Observable<any>),
projectResult?: (x: T, y: any, ix: number, iy: number) => R,
concurrent?: number) => Observable<R>;
flatMapTo?: <R>(observable: Observable<any>, projectResult?: (x: T, y: any, ix: number, iy: number) => R, concurrent?: number) => Observable<R>;
flatMap: MergeMapSignature<T>;
flatMapTo: MergeMapToSignature<T>;
groupBy: GroupBySignature<T>;
ignoreElements: IgnoreElementsSignature<T>;
inspect: InspectSignature<T>;
Expand All @@ -110,14 +123,13 @@ export interface CoreOperators<T> {
let: LetSignature<T>;
letBind: LetSignature<T>;
every: EverySignature<T>;
map?: <R>(project: (x: T, ix?: number) => R, thisArg?: any) => Observable<R>;
mapTo?: <R>(value: R) => Observable<R>;
map: MapSignature<T>;
mapTo: MapToSignature<T>;
materialize: MaterializeSignature<T>;
merge?: (...observables: any[]) => Observable<any>;
mergeAll?: (concurrent?: number) => Observable<T>;
mergeMap?: <R>(project: ((x: T, ix: number) => Observable<any>),
projectResult?: (x: T, y: any, ix: number, iy: number) => R, concurrent?: number) => Observable<R>;
mergeMapTo?: <R>(observable: Observable<any>, projectResult?: (x: T, y: any, ix: number, iy: number) => R, concurrent?: number) => Observable<R>;
merge: MergeSignature<T>;
mergeAll: MergeAllSignature<T>;
mergeMap: MergeMapSignature<T>;
mergeMapTo: MergeMapToSignature<T>;
multicast: MulticastSignature<T>;
observeOn: ObserveOnSignature<T>;
partition: PartitionSignature<T>;
Expand All @@ -141,9 +153,9 @@ export interface CoreOperators<T> {
skipWhile: SkipWhileSignature<T>;
startWith: StartWithSignature<T>;
subscribeOn: SubscribeOnSignature<T>;
switch?: () => Observable<T>;
switchMap?: <R>(project: ((x: T, ix: number) => Observable<any>), projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
switchMapTo?: <R>(observable: Observable<any>, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
switch: SwitchSignature<T>;
switchMap: SwitchMapSignature<T>;
switchMapTo: SwitchMapToSignature<T>;
take: TakeSignature<T>;
takeLast: TakeLastSignature<T>;
takeUntil: TakeUntilSignature<T>;
Expand All @@ -161,5 +173,5 @@ export interface CoreOperators<T> {
windowWhen: WindowWhenSignature<T>;
withLatestFrom: WithLatestFromSignature<T>;
zip: ZipSignature<T>;
zipAll?: <R>(project?: (...values: Array<any>) => R) => Observable<R>;
zipAll: ZipAllSignature<T>;
}
53 changes: 32 additions & 21 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,21 @@ import {TimeoutSignature} from './operator/timeout';
import {TimeoutWithSignature} from './operator/timeoutWith';
import {ToArraySignature} from './operator/toArray';
import {ToPromiseSignature} from './operator/toPromise';
import {CombineAllSignature} from './operator/combineAll';
import {ConcatSignature} from './operator/concat';
import {ConcatAllSignature} from './operator/concatAll';
import {ConcatMapSignature} from './operator/concatMap';
import {ConcatMapToSignature} from './operator/concatMapTo';
import {MapSignature} from './operator/map';
import {MapToSignature} from './operator/mapTo';
import {MergeSignature} from './operator/merge';
import {MergeAllSignature} from './operator/mergeAll';
import {MergeMapSignature} from './operator/mergeMap';
import {MergeMapToSignature} from './operator/mergeMapTo';
import {SwitchSignature} from './operator/switch';
import {SwitchMapSignature} from './operator/switchMap';
import {SwitchMapToSignature} from './operator/switchMapTo';
import {ZipAllSignature} from './operator/zipAll';

export type ObservableOrPromise<T> = Observable<T> | Promise<T>;
export type ArrayOrIterator<T> = Iterator<T> | ArrayLike<T>;
Expand Down Expand Up @@ -264,12 +279,12 @@ export class Observable<T> implements CoreOperators<T> {
bufferWhen: BufferWhenSignature<T>;
cache: CacheSignature<T>;
catch: CatchSignature<T>;
combineAll: <R>(project?: (...values: Array<any>) => R) => Observable<R>;
combineAll: CombineAllSignature<T>;
combineLatest: CombineLatestSignature<T>;
concat: <R>(...observables: (Observable<any> | Scheduler)[]) => Observable<R>;
concatAll: () => Observable<any>;
concatMap: <R>(project: ((x: T, ix: number) => Observable<any>), projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
concatMapTo: <R>(observable: Observable<any>, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
concat: ConcatSignature<T>;
concatAll: ConcatAllSignature<T>;
concatMap: ConcatMapSignature<T>;
concatMapTo: ConcatMapToSignature<T>;
count: CountSignature<T>;
dematerialize: DematerializeSignature<T>;
debounce: DebounceSignature<T>;
Expand All @@ -283,10 +298,8 @@ export class Observable<T> implements CoreOperators<T> {
filter: FilterSignature<T>;
finally: FinallySignature<T>;
first: FirstSignature<T>;
flatMap: <R>(project: ((x: T, ix: number) => Observable<any>),
projectResult?: (x: T, y: any, ix: number, iy: number) => R,
concurrent?: number) => Observable<R>;
flatMapTo: <R>(observable: Observable<any>, projectResult?: (x: T, y: any, ix: number, iy: number) => R, concurrent?: number) => Observable<R>;
flatMap: MergeMapSignature<T>;
flatMapTo: MergeMapToSignature<T>;
groupBy: GroupBySignature<T>;
ignoreElements: IgnoreElementsSignature<T>;
inspect: InspectSignature<T>;
Expand All @@ -295,15 +308,13 @@ export class Observable<T> implements CoreOperators<T> {
let: LetSignature<T>;
letBind: LetSignature<T>;
every: EverySignature<T>;
map: <R>(project: (x: T, ix?: number) => R, thisArg?: any) => Observable<R>;
mapTo: <R>(value: R) => Observable<R>;
map: MapSignature<T>;
mapTo: MapToSignature<T>;
materialize: MaterializeSignature<T>;
merge: (...observables: any[]) => Observable<any>;
mergeAll: (concurrent?: any) => Observable<any>;
mergeMap: <R>(project: ((x: T, ix: number) => Observable<any>),
projectResult?: (x: T, y: any, ix: number, iy: number) => R,
concurrent?: number) => Observable<R>;
mergeMapTo: <R>(observable: Observable<any>, projectResult?: (x: T, y: any, ix: number, iy: number) => R, concurrent?: number) => Observable<R>;
merge: MergeSignature<T>;
mergeAll: MergeAllSignature<T>;
mergeMap: MergeMapSignature<T>;
mergeMapTo: MergeMapToSignature<T>;
multicast: MulticastSignature<T>;
observeOn: ObserveOnSignature<T>;
partition: PartitionSignature<T>;
Expand All @@ -327,9 +338,9 @@ export class Observable<T> implements CoreOperators<T> {
skipWhile: SkipWhileSignature<T>;
startWith: StartWithSignature<T>;
subscribeOn: SubscribeOnSignature<T>;
switch: <R>() => Observable<R>;
switchMap: <R>(project: ((x: T, ix: number) => Observable<any>), projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
switchMapTo: <R>(observable: Observable<any>, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
switch: SwitchSignature<T>;
switchMap: SwitchMapSignature<T>;
switchMapTo: SwitchMapToSignature<T>;
take: TakeSignature<T>;
takeLast: TakeLastSignature<T>;
takeUntil: TakeUntilSignature<T>;
Expand All @@ -347,7 +358,7 @@ export class Observable<T> implements CoreOperators<T> {
windowWhen: WindowWhenSignature<T>;
withLatestFrom: WithLatestFromSignature<T>;
zip: ZipSignature<T>;
zipAll: <R>(project?: (...values: Array<any>) => R) => Observable<R>;
zipAll: ZipAllSignature<T>;

/**
* @method Symbol.observable
Expand Down
7 changes: 4 additions & 3 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import {MinSignature} from './operator/min';
import {PairwiseSignature} from './operator/pairwise';
import {TimeIntervalSignature} from './operator/timeInterval';
import {MergeScanSignature} from './operator/mergeScan';
import {SwitchFirstSignature} from './operator/exhaust';
import {SwitchFirstMapSignature} from './operator/exhaustMap';

export interface KitchenSinkOperators<T> extends CoreOperators<T> {
isEmpty?: IsEmptySignature<T>;
Expand All @@ -29,9 +31,8 @@ export interface KitchenSinkOperators<T> extends CoreOperators<T> {
pairwise?: PairwiseSignature<T>;
timeInterval?: TimeIntervalSignature<T>;
mergeScan?: MergeScanSignature<T>;
exhaust?: () => Observable<T>;
exhaustMap?: <R>(project: ((x: T, ix: number) => Observable<any>),
projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
exhaust?: SwitchFirstSignature<T>;
exhaustMap?: SwitchFirstMapSignature<T>;
}

// statics
Expand Down
4 changes: 2 additions & 2 deletions src/add/operator/mergeMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import {Observable} from '../../Observable';
import {mergeMap} from '../../operator/mergeMap';

Observable.prototype.mergeMap = mergeMap;
Observable.prototype.flatMap = mergeMap;
Observable.prototype.mergeMap = <any>mergeMap;
Observable.prototype.flatMap = <any>mergeMap;

export var _void: void;
2 changes: 1 addition & 1 deletion src/add/operator/mergeMapTo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
import {Observable} from '../../Observable';
import {mergeMapTo} from '../../operator/mergeMapTo';

Observable.prototype.mergeMapTo = mergeMapTo;
Observable.prototype.mergeMapTo = <any>mergeMapTo;

export var _void: void;
2 changes: 1 addition & 1 deletion src/observable/BoundNodeCallbackObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export class BoundNodeCallbackObservable<T> extends Observable<T> {
selector: Function | void = undefined,
scheduler?: Scheduler): Function {
return (...args: any[]): Observable<T> => {
return new BoundNodeCallbackObservable(callbackFunc, <any>selector, args, scheduler);
return new BoundNodeCallbackObservable<T>(callbackFunc, <any>selector, args, scheduler);
};
}

Expand Down
6 changes: 3 additions & 3 deletions src/observable/FromObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ export class FromObservable<T> extends Observable<T> {
if (ish instanceof Observable && !scheduler) {
return ish;
}
return new FromObservable(ish, scheduler);
return new FromObservable<T>(ish, scheduler);
} else if (isArray(ish)) {
return new ArrayObservable(ish, scheduler);
return new ArrayObservable<T>(ish, scheduler);
} else if (isPromise(ish)) {
return new PromiseObservable(ish, scheduler);
return new PromiseObservable<T>(ish, scheduler);
} else if (typeof ish[SymbolShim.iterator] === 'function' || typeof ish === 'string') {
return new IteratorObservable<T>(<any>ish, null, null, scheduler);
} else if (isArrayLike(ish)) {
Expand Down
2 changes: 1 addition & 1 deletion src/observable/dom/WebSocketSubject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export class WebSocketSubject<T> extends Subject<T> {
}

static create<T>(urlConfigOrSource: string | WebSocketSubjectConfig): WebSocketSubject<T> {
return new WebSocketSubject(urlConfigOrSource);
return new WebSocketSubject<T>(urlConfigOrSource);
}

constructor(urlConfigOrSource: string | WebSocketSubjectConfig | Observable<T>, destination?: Observer<T>) {
Expand Down
7 changes: 6 additions & 1 deletion src/operator/combineAll.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ import {Observable} from '../Observable';
* most recent values from each collected observable as arguments, in order.
* @returns {Observable} an observable of projected results or arrays of recent values.
*/
export function combineAll<T, R>(project?: (...values: Array<any>) => R): Observable<R> {
export function combineAll<R>(project?: (...values: Array<any>) => R): Observable<R> {
return this.lift(new CombineLatestOperator(project));
}

export interface CombineAllSignature<T> {
(): Observable<T[]>;
<R>(project?: (...values: Array<T>) => R): Observable<R>;
}
33 changes: 28 additions & 5 deletions src/operator/concat.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {Observable} from '../Observable';
import {Observable, ObservableInput} from '../Observable';
import {Scheduler} from '../Scheduler';
import {isScheduler} from '../util/isScheduler';
import {ArrayObservable} from '../observable/ArrayObservable';
Expand All @@ -12,23 +12,46 @@ import {MergeAllOperator} from './mergeAll';
* @params {Scheduler} [scheduler] an optional scheduler to schedule each observable subscription on.
* @returns {Observable} All values of each passed observable merged into a single observable, in order, in serial fashion.
*/
export function concat<T, R>(...observables: Array<Observable<any> | Scheduler>): Observable<R> {
return concatStatic(this, ...observables);
export function concat<T, R>(...observables: Array<ObservableInput<any> | Scheduler>): Observable<R> {
return concatStatic<T, R>(this, ...observables);
}

/* tslint:disable:max-line-length */
export interface ConcatSignature<T> {
(scheduler?: Scheduler): Observable<T>;
<T2>(v2: ObservableInput<T2>, scheduler?: Scheduler): Observable<T | T2>;
<T2, T3>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, scheduler?: Scheduler): Observable<T | T2 | T3>;
<T2, T3, T4>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, scheduler?: Scheduler): Observable<T | T2 | T3 | T4>;
<T2, T3, T4, T5>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, scheduler?: Scheduler): Observable<T | T2 | T3 | T4 | T5>;
<T2, T3, T4, T5, T6>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>, scheduler?: Scheduler): Observable<T | T2 | T3 | T4 | T5 | T6>;
(...observables: Array<ObservableInput<T> | Scheduler>): Observable<T>;
<R>(...observables: Array<ObservableInput<any> | Scheduler>): Observable<R>;
}
/* tslint:enable:max-line-length */

/**
* Joins multiple observables together by subscribing to them one at a time and merging their results
* into the returned observable. Will wait for each observable to complete before moving on to the next.
* @params {...Observable} the observables to concatenate
* @params {Scheduler} [scheduler] an optional scheduler to schedule each observable subscription on.
* @returns {Observable} All values of each passed observable merged into a single observable, in order, in serial fashion.
*/
export function concatStatic<T, R>(...observables: Array<Observable<any> | Scheduler>): Observable<R> {
/* tslint:disable:max-line-length */
export function concatStatic<T>(v1: ObservableInput<T>, scheduler?: Scheduler): Observable<T>;
export function concatStatic<T, T2>(v1: ObservableInput<T>, v2: ObservableInput<T2>, scheduler?: Scheduler): Observable<T | T2>;
export function concatStatic<T, T2, T3>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, scheduler?: Scheduler): Observable<T | T2 | T3>;
export function concatStatic<T, T2, T3, T4>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, scheduler?: Scheduler): Observable<T | T2 | T3 | T4>;
export function concatStatic<T, T2, T3, T4, T5>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, scheduler?: Scheduler): Observable<T | T2 | T3 | T4 | T5>;
export function concatStatic<T, T2, T3, T4, T5, T6>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>, scheduler?: Scheduler): Observable<T | T2 | T3 | T4 | T5 | T6>;
export function concatStatic<T>(...observables: (ObservableInput<T> | Scheduler)[]): Observable<T>;
export function concatStatic<T, R>(...observables: (ObservableInput<any> | Scheduler)[]): Observable<R>;
/* tslint:enable:max-line-length */
export function concatStatic<T, R>(...observables: Array<ObservableInput<any> | Scheduler>): Observable<R> {
let scheduler: Scheduler = null;
let args = <any[]>observables;
if (isScheduler(args[observables.length - 1])) {
scheduler = args.pop();
}

return new ArrayObservable(observables, scheduler).lift(new MergeAllOperator<T | R>(1));
return new ArrayObservable(observables, scheduler).lift(new MergeAllOperator<R>(1));
}
6 changes: 5 additions & 1 deletion src/operator/concatAll.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,9 @@ import {MergeAllOperator} from './mergeAll';
* @returns {Observable} an observable of values merged from the incoming observables.
*/
export function concatAll<T>(): T {
return this.lift(new MergeAllOperator(1));
return this.lift(new MergeAllOperator<T>(1));
}

export interface ConcatAllSignature<T> {
(): T;
}
12 changes: 9 additions & 3 deletions src/operator/concatMap.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {MergeMapOperator} from './mergeMap';
import {Observable} from '../Observable';
import {Observable, ObservableInput} from '../Observable';

/**
* Maps values from the source observable into new Observables, then merges them in a serialized fashion,
Expand All @@ -20,7 +20,13 @@ import {Observable} from '../Observable';
* @returns {Observable} an observable of values merged from the projected Observables as they were subscribed to,
* one at a time. Optionally, these values may have been projected from a passed `projectResult` argument.
*/
export function concatMap<T, R, R2>(project: (value: T, index: number) => Observable<R>,
resultSelector?: (outerValue: T, innerValue: R, outerIndex: number, innerIndex: number) => R2) {
export function concatMap<T, I, R>(project: (value: T, index: number) => ObservableInput<I>,
resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R) {
return this.lift(new MergeMapOperator(project, resultSelector, 1));
}

export interface ConcatMapSignature<T> {
<R>(project: (value: T, index: number) => ObservableInput<R>): Observable<R>;
<I, R>(project: (value: T, index: number) => ObservableInput<I>,
resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R): Observable<R>;
}
Loading

0 comments on commit 212084d

Please sign in to comment.