From 2256e7bcb2d8c450fd7e90859acaad1755a561cc Mon Sep 17 00:00:00 2001 From: Vladimir Matveev Date: Tue, 15 Mar 2016 16:24:05 -0700 Subject: [PATCH] fix(Observable): introduce Subscribable interface that will be used instead of Observable in input --- src/Observable.ts | 12 ++++++++---- src/operator/debounce.ts | 12 ++++++------ src/operator/inspect.ts | 10 +++++----- src/operator/mergeMapTo.ts | 6 +++--- src/operator/throttle.ts | 14 +++++++------- 5 files changed, 29 insertions(+), 25 deletions(-) diff --git a/src/Observable.ts b/src/Observable.ts index f73d121953..afc469625b 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -1,4 +1,4 @@ -import {PartialObserver} from './Observer'; +import {PartialObserver, Observer} from './Observer'; import {Operator} from './Operator'; import {Subscriber} from './Subscriber'; import {Subscription} from './Subscription'; @@ -9,9 +9,13 @@ import {toSubscriber} from './util/toSubscriber'; import {IfObservable} from './observable/IfObservable'; import {ErrorObservable} from './observable/ErrorObservable'; -export type ObservableOrPromise = Observable | Promise; +export interface Subscribable { + subscribe(observer: Observer): Subscription; +} + +export type SubscribableOrPromise = Subscribable | Promise; export type ArrayOrIterator = Iterator | ArrayLike; -export type ObservableInput = ObservableOrPromise | ArrayOrIterator; +export type ObservableInput = SubscribableOrPromise | ArrayOrIterator; /** * A representation of any set of values over any amount of time. This the most basic building block @@ -19,7 +23,7 @@ export type ObservableInput = ObservableOrPromise | ArrayOrIterator; * * @class Observable */ -export class Observable { +export class Observable implements Subscribable { public _isScalar: boolean = false; diff --git a/src/operator/debounce.ts b/src/operator/debounce.ts index d999d70558..585505ac18 100644 --- a/src/operator/debounce.ts +++ b/src/operator/debounce.ts @@ -1,5 +1,5 @@ import {Operator} from '../Operator'; -import {Observable, ObservableOrPromise} from '../Observable'; +import {Observable, SubscribableOrPromise} from '../Observable'; import {Subscriber} from '../Subscriber'; import {Subscription} from '../Subscription'; @@ -19,16 +19,16 @@ import {subscribeToResult} from '../util/subscribeToResult'; * @method debounce * @owner Observable */ -export function debounce(durationSelector: (value: T) => ObservableOrPromise): Observable { +export function debounce(durationSelector: (value: T) => SubscribableOrPromise): Observable { return this.lift(new DebounceOperator(durationSelector)); } export interface DebounceSignature { - (durationSelector: (value: T) => ObservableOrPromise): Observable; + (durationSelector: (value: T) => SubscribableOrPromise): Observable; } class DebounceOperator implements Operator { - constructor(private durationSelector: (value: T) => ObservableOrPromise) { + constructor(private durationSelector: (value: T) => SubscribableOrPromise) { } call(subscriber: Subscriber): Subscriber { @@ -42,7 +42,7 @@ class DebounceSubscriber extends OuterSubscriber { private durationSubscription: Subscription = null; constructor(destination: Subscriber, - private durationSelector: (value: T) => ObservableOrPromise) { + private durationSelector: (value: T) => SubscribableOrPromise) { super(destination); } @@ -63,7 +63,7 @@ class DebounceSubscriber extends OuterSubscriber { this.destination.complete(); } - private _tryNext(value: T, duration: ObservableOrPromise): void { + private _tryNext(value: T, duration: SubscribableOrPromise): void { let subscription = this.durationSubscription; this.value = value; this.hasValue = true; diff --git a/src/operator/inspect.ts b/src/operator/inspect.ts index 20ba7fb09b..8ba5f5483d 100644 --- a/src/operator/inspect.ts +++ b/src/operator/inspect.ts @@ -1,6 +1,6 @@ import {Operator} from '../Operator'; import {Subscriber} from '../Subscriber'; -import {Observable, ObservableOrPromise} from '../Observable'; +import {Observable, SubscribableOrPromise} from '../Observable'; import {Subscription} from '../Subscription'; import {tryCatch} from '../util/tryCatch'; @@ -14,16 +14,16 @@ import {subscribeToResult} from '../util/subscribeToResult'; * @method inspect * @owner Observable */ -export function inspect(durationSelector: (value: T) => ObservableOrPromise): Observable { +export function inspect(durationSelector: (value: T) => SubscribableOrPromise): Observable { return this.lift(new InspectOperator(durationSelector)); } export interface InspectSignature { - (durationSelector: (value: T) => ObservableOrPromise): Observable; + (durationSelector: (value: T) => SubscribableOrPromise): Observable; } class InspectOperator implements Operator { - constructor(private durationSelector: (value: T) => ObservableOrPromise) { + constructor(private durationSelector: (value: T) => SubscribableOrPromise) { } call(subscriber: Subscriber): Subscriber { @@ -38,7 +38,7 @@ class InspectSubscriber extends OuterSubscriber { private throttled: Subscription; constructor(destination: Subscriber, - private durationSelector: (value: T) => ObservableOrPromise) { + private durationSelector: (value: T) => SubscribableOrPromise) { super(destination); } diff --git a/src/operator/mergeMapTo.ts b/src/operator/mergeMapTo.ts index df0dc0f56e..4f5edd79cb 100644 --- a/src/operator/mergeMapTo.ts +++ b/src/operator/mergeMapTo.ts @@ -1,4 +1,4 @@ -import {Observable, ObservableInput, ObservableOrPromise} from '../Observable'; +import {Observable, ObservableInput, SubscribableOrPromise} from '../Observable'; import {Operator} from '../Operator'; import {PartialObserver} from '../Observer'; import {Subscriber} from '../Subscriber'; @@ -31,7 +31,7 @@ export interface MergeMapToSignature { // TODO: Figure out correct signature here: an Operator, R> // needs to implement call(observer: Subscriber): Subscriber> export class MergeMapToOperator implements Operator, R> { - constructor(private ish: ObservableOrPromise, + constructor(private ish: SubscribableOrPromise, private resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R, private concurrent: number = Number.POSITIVE_INFINITY) { } @@ -48,7 +48,7 @@ export class MergeMapToSubscriber extends OuterSubscriber { protected index: number = 0; constructor(destination: Subscriber, - private ish: ObservableOrPromise, + private ish: SubscribableOrPromise, private resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R, private concurrent: number = Number.POSITIVE_INFINITY) { super(destination); diff --git a/src/operator/throttle.ts b/src/operator/throttle.ts index b773cec244..90cae2853f 100644 --- a/src/operator/throttle.ts +++ b/src/operator/throttle.ts @@ -1,5 +1,5 @@ import {Operator} from '../Operator'; -import {Observable, ObservableOrPromise} from '../Observable'; +import {Observable, SubscribableOrPromise} from '../Observable'; import {Subscriber} from '../Subscriber'; import {Subscription} from '../Subscription'; @@ -13,16 +13,16 @@ import {subscribeToResult} from '../util/subscribeToResult'; * @method throttle * @owner Observable */ -export function throttle(durationSelector: (value: T) => ObservableOrPromise): Observable { +export function throttle(durationSelector: (value: T) => SubscribableOrPromise): Observable { return this.lift(new ThrottleOperator(durationSelector)); } export interface ThrottleSignature { - (durationSelector: (value: T) => ObservableOrPromise): Observable; + (durationSelector: (value: T) => SubscribableOrPromise): Observable; } class ThrottleOperator implements Operator { - constructor(private durationSelector: (value: T) => ObservableOrPromise) { + constructor(private durationSelector: (value: T) => SubscribableOrPromise) { } call(subscriber: Subscriber): Subscriber { @@ -34,7 +34,7 @@ class ThrottleSubscriber extends OuterSubscriber { private throttled: Subscription; constructor(protected destination: Subscriber, - private durationSelector: (value: T) => ObservableOrPromise) { + private durationSelector: (value: T) => SubscribableOrPromise) { super(destination); } @@ -45,7 +45,7 @@ class ThrottleSubscriber extends OuterSubscriber { } private tryDurationSelector(value: T): void { - let duration: ObservableOrPromise = null; + let duration: SubscribableOrPromise = null; try { duration = this.durationSelector(value); } catch (err) { @@ -55,7 +55,7 @@ class ThrottleSubscriber extends OuterSubscriber { this.emitAndThrottle(value, duration); } - private emitAndThrottle(value: T, duration: ObservableOrPromise) { + private emitAndThrottle(value: T, duration: SubscribableOrPromise) { this.add(this.throttled = subscribeToResult(this, duration)); this.destination.next(value); }