Skip to content

Commit 2256e7b

Browse files
vladimakwonoj
authored andcommitted
fix(Observable): introduce Subscribable interface that will be used instead of Observable in input
1 parent 34a0d3c commit 2256e7b

File tree

5 files changed

+29
-25
lines changed

5 files changed

+29
-25
lines changed

src/Observable.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import {PartialObserver} from './Observer';
1+
import {PartialObserver, Observer} from './Observer';
22
import {Operator} from './Operator';
33
import {Subscriber} from './Subscriber';
44
import {Subscription} from './Subscription';
@@ -9,17 +9,21 @@ import {toSubscriber} from './util/toSubscriber';
99
import {IfObservable} from './observable/IfObservable';
1010
import {ErrorObservable} from './observable/ErrorObservable';
1111

12-
export type ObservableOrPromise<T> = Observable<T> | Promise<T>;
12+
export interface Subscribable<T> {
13+
subscribe(observer: Observer<T>): Subscription;
14+
}
15+
16+
export type SubscribableOrPromise<T> = Subscribable<T> | Promise<T>;
1317
export type ArrayOrIterator<T> = Iterator<T> | ArrayLike<T>;
14-
export type ObservableInput<T> = ObservableOrPromise<T> | ArrayOrIterator<T>;
18+
export type ObservableInput<T> = SubscribableOrPromise<T> | ArrayOrIterator<T>;
1519

1620
/**
1721
* A representation of any set of values over any amount of time. This the most basic building block
1822
* of RxJS.
1923
*
2024
* @class Observable<T>
2125
*/
22-
export class Observable<T> {
26+
export class Observable<T> implements Subscribable<T> {
2327

2428
public _isScalar: boolean = false;
2529

src/operator/debounce.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import {Operator} from '../Operator';
2-
import {Observable, ObservableOrPromise} from '../Observable';
2+
import {Observable, SubscribableOrPromise} from '../Observable';
33
import {Subscriber} from '../Subscriber';
44
import {Subscription} from '../Subscription';
55

@@ -19,16 +19,16 @@ import {subscribeToResult} from '../util/subscribeToResult';
1919
* @method debounce
2020
* @owner Observable
2121
*/
22-
export function debounce<T>(durationSelector: (value: T) => ObservableOrPromise<number>): Observable<T> {
22+
export function debounce<T>(durationSelector: (value: T) => SubscribableOrPromise<number>): Observable<T> {
2323
return this.lift(new DebounceOperator(durationSelector));
2424
}
2525

2626
export interface DebounceSignature<T> {
27-
(durationSelector: (value: T) => ObservableOrPromise<number>): Observable<T>;
27+
(durationSelector: (value: T) => SubscribableOrPromise<number>): Observable<T>;
2828
}
2929

3030
class DebounceOperator<T> implements Operator<T, T> {
31-
constructor(private durationSelector: (value: T) => ObservableOrPromise<number>) {
31+
constructor(private durationSelector: (value: T) => SubscribableOrPromise<number>) {
3232
}
3333

3434
call(subscriber: Subscriber<T>): Subscriber<T> {
@@ -42,7 +42,7 @@ class DebounceSubscriber<T, R> extends OuterSubscriber<T, R> {
4242
private durationSubscription: Subscription = null;
4343

4444
constructor(destination: Subscriber<R>,
45-
private durationSelector: (value: T) => ObservableOrPromise<number>) {
45+
private durationSelector: (value: T) => SubscribableOrPromise<number>) {
4646
super(destination);
4747
}
4848

@@ -63,7 +63,7 @@ class DebounceSubscriber<T, R> extends OuterSubscriber<T, R> {
6363
this.destination.complete();
6464
}
6565

66-
private _tryNext(value: T, duration: ObservableOrPromise<number>): void {
66+
private _tryNext(value: T, duration: SubscribableOrPromise<number>): void {
6767
let subscription = this.durationSubscription;
6868
this.value = value;
6969
this.hasValue = true;

src/operator/inspect.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import {Operator} from '../Operator';
22
import {Subscriber} from '../Subscriber';
3-
import {Observable, ObservableOrPromise} from '../Observable';
3+
import {Observable, SubscribableOrPromise} from '../Observable';
44
import {Subscription} from '../Subscription';
55

66
import {tryCatch} from '../util/tryCatch';
@@ -14,16 +14,16 @@ import {subscribeToResult} from '../util/subscribeToResult';
1414
* @method inspect
1515
* @owner Observable
1616
*/
17-
export function inspect<T>(durationSelector: (value: T) => ObservableOrPromise<any>): Observable<T> {
17+
export function inspect<T>(durationSelector: (value: T) => SubscribableOrPromise<any>): Observable<T> {
1818
return this.lift(new InspectOperator(durationSelector));
1919
}
2020

2121
export interface InspectSignature<T> {
22-
(durationSelector: (value: T) => ObservableOrPromise<any>): Observable<T>;
22+
(durationSelector: (value: T) => SubscribableOrPromise<any>): Observable<T>;
2323
}
2424

2525
class InspectOperator<T> implements Operator<T, T> {
26-
constructor(private durationSelector: (value: T) => ObservableOrPromise<any>) {
26+
constructor(private durationSelector: (value: T) => SubscribableOrPromise<any>) {
2727
}
2828

2929
call(subscriber: Subscriber<T>): Subscriber<T> {
@@ -38,7 +38,7 @@ class InspectSubscriber<T, R> extends OuterSubscriber<T, R> {
3838
private throttled: Subscription;
3939

4040
constructor(destination: Subscriber<T>,
41-
private durationSelector: (value: T) => ObservableOrPromise<any>) {
41+
private durationSelector: (value: T) => SubscribableOrPromise<any>) {
4242
super(destination);
4343
}
4444

src/operator/mergeMapTo.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import {Observable, ObservableInput, ObservableOrPromise} from '../Observable';
1+
import {Observable, ObservableInput, SubscribableOrPromise} from '../Observable';
22
import {Operator} from '../Operator';
33
import {PartialObserver} from '../Observer';
44
import {Subscriber} from '../Subscriber';
@@ -31,7 +31,7 @@ export interface MergeMapToSignature<T> {
3131
// TODO: Figure out correct signature here: an Operator<Observable<T>, R>
3232
// needs to implement call(observer: Subscriber<R>): Subscriber<Observable<T>>
3333
export class MergeMapToOperator<T, I, R> implements Operator<Observable<T>, R> {
34-
constructor(private ish: ObservableOrPromise<I>,
34+
constructor(private ish: SubscribableOrPromise<I>,
3535
private resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R,
3636
private concurrent: number = Number.POSITIVE_INFINITY) {
3737
}
@@ -48,7 +48,7 @@ export class MergeMapToSubscriber<T, I, R> extends OuterSubscriber<T, I> {
4848
protected index: number = 0;
4949

5050
constructor(destination: Subscriber<R>,
51-
private ish: ObservableOrPromise<I>,
51+
private ish: SubscribableOrPromise<I>,
5252
private resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R,
5353
private concurrent: number = Number.POSITIVE_INFINITY) {
5454
super(destination);

src/operator/throttle.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import {Operator} from '../Operator';
2-
import {Observable, ObservableOrPromise} from '../Observable';
2+
import {Observable, SubscribableOrPromise} from '../Observable';
33
import {Subscriber} from '../Subscriber';
44
import {Subscription} from '../Subscription';
55

@@ -13,16 +13,16 @@ import {subscribeToResult} from '../util/subscribeToResult';
1313
* @method throttle
1414
* @owner Observable
1515
*/
16-
export function throttle<T>(durationSelector: (value: T) => ObservableOrPromise<number>): Observable<T> {
16+
export function throttle<T>(durationSelector: (value: T) => SubscribableOrPromise<number>): Observable<T> {
1717
return this.lift(new ThrottleOperator(durationSelector));
1818
}
1919

2020
export interface ThrottleSignature<T> {
21-
(durationSelector: (value: T) => ObservableOrPromise<number>): Observable<T>;
21+
(durationSelector: (value: T) => SubscribableOrPromise<number>): Observable<T>;
2222
}
2323

2424
class ThrottleOperator<T> implements Operator<T, T> {
25-
constructor(private durationSelector: (value: T) => ObservableOrPromise<number>) {
25+
constructor(private durationSelector: (value: T) => SubscribableOrPromise<number>) {
2626
}
2727

2828
call(subscriber: Subscriber<T>): Subscriber<T> {
@@ -34,7 +34,7 @@ class ThrottleSubscriber<T, R> extends OuterSubscriber<T, R> {
3434
private throttled: Subscription;
3535

3636
constructor(protected destination: Subscriber<T>,
37-
private durationSelector: (value: T) => ObservableOrPromise<number>) {
37+
private durationSelector: (value: T) => SubscribableOrPromise<number>) {
3838
super(destination);
3939
}
4040

@@ -45,7 +45,7 @@ class ThrottleSubscriber<T, R> extends OuterSubscriber<T, R> {
4545
}
4646

4747
private tryDurationSelector(value: T): void {
48-
let duration: ObservableOrPromise<number> = null;
48+
let duration: SubscribableOrPromise<number> = null;
4949
try {
5050
duration = this.durationSelector(value);
5151
} catch (err) {
@@ -55,7 +55,7 @@ class ThrottleSubscriber<T, R> extends OuterSubscriber<T, R> {
5555
this.emitAndThrottle(value, duration);
5656
}
5757

58-
private emitAndThrottle(value: T, duration: ObservableOrPromise<number>) {
58+
private emitAndThrottle(value: T, duration: SubscribableOrPromise<number>) {
5959
this.add(this.throttled = subscribeToResult(this, duration));
6060
this.destination.next(value);
6161
}

0 commit comments

Comments
 (0)