diff --git a/spec/observables/using-spec.ts b/spec/observables/using-spec.ts index af68344e6e..257c1a9970 100644 --- a/spec/observables/using-spec.ts +++ b/spec/observables/using-spec.ts @@ -1,15 +1,12 @@ import { expect } from 'chai'; -import * as Rx from '../../src/internal/Rx'; +import { using, range, Subscription } from '../../src'; -const Observable = Rx.Observable; -const Subscription = Rx.Subscription; - -describe('Observable.using', () => { +describe('using', () => { it('should dispose of the resource when the subscription is disposed', (done) => { let disposed = false; - const source = Observable.using( + const source = using( () => new Subscription(() => disposed = true), - (resource) => Observable.range(0, 3) + (resource) => range(0, 3) ) .take(2); @@ -26,7 +23,7 @@ describe('Observable.using', () => { const expected = 42; let disposed = false; - const e1 = Observable.using( + const e1 = using( () => new Subscription(() => disposed = true), (resource) => new Promise((resolve: any) => { resolve(expected); })); @@ -43,7 +40,7 @@ describe('Observable.using', () => { const expected = 42; let disposed = false; - const e1 = Observable.using( + const e1 = using( () => new Subscription(() => disposed = true), (resource) => new Promise((resolve: any, reject: any) => { reject(expected); })); @@ -61,7 +58,7 @@ describe('Observable.using', () => { const expectedError = 'expected'; const error = 'error'; - const source = Observable.using( + const source = using( () => { throw expectedError; }, @@ -84,7 +81,7 @@ describe('Observable.using', () => { const error = 'error'; let disposed = false; - const source = Observable.using( + const source = using( () => new Subscription(() => disposed = true), (resource) => { throw error; diff --git a/src/internal/observable/UsingObservable.ts b/src/internal/observable/UsingObservable.ts deleted file mode 100644 index ec751adc73..0000000000 --- a/src/internal/observable/UsingObservable.ts +++ /dev/null @@ -1,87 +0,0 @@ -import { Observable } from '../Observable'; -import { Subscriber } from '../Subscriber'; -import { SubscribableOrPromise, Unsubscribable, TeardownLogic } from '../types'; - -import { subscribeToResult } from '../util/subscribeToResult'; -import { OuterSubscriber } from '../OuterSubscriber'; -/** - * We need this JSDoc comment for affecting ESDoc. - * @extends {Ignored} - * @hide true - */ -export class UsingObservable extends Observable { - /** - * Creates an Observable that uses a resource which will be disposed at the same time as the Observable. - * - * Use it when you catch yourself cleaning up after an Observable. - * - * `using` is a factory operator, which accepts two functions. First function returns a disposable resource. - * It can be an arbitrary object that implements `unsubscribe` method. Second function will be injected with - * that object and should return an Observable. That Observable can use resource object during its execution. - * Both functions passed to `using` will be called every time someone subscribes - neither an Observable nor - * resource object will be shared in any way between subscriptions. - * - * When Observable returned by `using` is subscribed, Observable returned from the second function will be subscribed - * as well. All its notifications (nexted values, completion and error events) will be emitted unchanged by the output - * Observable. If however someone unsubscribes from the Observable or source Observable completes or errors by itself, - * the `unsubscribe` method on resource object will be called. This can be used to do any necessary clean up, which - * otherwise would have to be handled by hand. Note that complete or error notifications are not emitted when someone - * cancels subscription to an Observable via `unsubscribe`, so `using` can be used as a hook, allowing you to make - * sure that all resources which need to exist during an Observable execution will be disposed at appropriate time. - * - * @see {@link defer} - * - * @param {function(): ISubscription} resourceFactory A function which creates any resource object - * that implements `unsubscribe` method. - * @param {function(resource: ISubscription): Observable} observableFactory A function which - * creates an Observable, that can use injected resource object. - * @return {Observable} An Observable that behaves the same as Observable returned by `observableFactory`, but - * which - when completed, errored or unsubscribed - will also call `unsubscribe` on created resource object. - * @static true - * @name using - * @owner Observable - */ - static create(resourceFactory: () => Unsubscribable | void, - observableFactory: (resource: Unsubscribable) => SubscribableOrPromise | void): Observable { - return new UsingObservable(resourceFactory, observableFactory); - } - - constructor(private resourceFactory: () => Unsubscribable | void, - private observableFactory: (resource: Unsubscribable) => SubscribableOrPromise | void) { - super(); - } - - protected _subscribe(subscriber: Subscriber): TeardownLogic { - const { resourceFactory, observableFactory } = this; - - let resource: Unsubscribable; - - try { - resource = resourceFactory(); - return new UsingSubscriber(subscriber, resource, observableFactory); - } catch (err) { - subscriber.error(err); - } - } -} - -class UsingSubscriber extends OuterSubscriber { - constructor(destination: Subscriber, - private resource: Unsubscribable, - private observableFactory: (resource: Unsubscribable) => SubscribableOrPromise | void) { - super(destination); - destination.add(resource); - this.tryUse(); - } - - private tryUse(): void { - try { - const source = this.observableFactory.call(this, this.resource); - if (source) { - this.add(subscribeToResult(this, source)); - } - } catch (err) { - this._error(err); - } - } -} diff --git a/src/internal/observable/using.ts b/src/internal/observable/using.ts index 52406960cd..568d7a89b6 100644 --- a/src/internal/observable/using.ts +++ b/src/internal/observable/using.ts @@ -1,3 +1,63 @@ -import { UsingObservable } from './UsingObservable'; +import { Observable } from '../Observable'; +import { Unsubscribable, ObservableInput } from '../types'; +import { from } from './from'; // from from from! LAWL +import { EMPTY } from './empty'; -export const using = UsingObservable.create; \ No newline at end of file +/** + * Creates an Observable that uses a resource which will be disposed at the same time as the Observable. + * + * Use it when you catch yourself cleaning up after an Observable. + * + * `using` is a factory operator, which accepts two functions. First function returns a disposable resource. + * It can be an arbitrary object that implements `unsubscribe` method. Second function will be injected with + * that object and should return an Observable. That Observable can use resource object during its execution. + * Both functions passed to `using` will be called every time someone subscribes - neither an Observable nor + * resource object will be shared in any way between subscriptions. + * + * When Observable returned by `using` is subscribed, Observable returned from the second function will be subscribed + * as well. All its notifications (nexted values, completion and error events) will be emitted unchanged by the output + * Observable. If however someone unsubscribes from the Observable or source Observable completes or errors by itself, + * the `unsubscribe` method on resource object will be called. This can be used to do any necessary clean up, which + * otherwise would have to be handled by hand. Note that complete or error notifications are not emitted when someone + * cancels subscription to an Observable via `unsubscribe`, so `using` can be used as a hook, allowing you to make + * sure that all resources which need to exist during an Observable execution will be disposed at appropriate time. + * + * @see {@link defer} + * + * @param {function(): ISubscription} resourceFactory A function which creates any resource object + * that implements `unsubscribe` method. + * @param {function(resource: ISubscription): Observable} observableFactory A function which + * creates an Observable, that can use injected resource object. + * @return {Observable} An Observable that behaves the same as Observable returned by `observableFactory`, but + * which - when completed, errored or unsubscribed - will also call `unsubscribe` on created resource object. + */ +export function using(resourceFactory: () => Unsubscribable | void, + observableFactory: (resource: Unsubscribable | void) => ObservableInput | void): Observable { + return new Observable(subscriber => { + let resource: Unsubscribable | void; + + try { + resource = resourceFactory(); + } catch (err) { + subscriber.error(err); + return undefined; + } + + let result: ObservableInput | void; + try { + result = observableFactory(resource); + } catch (err) { + subscriber.error(err); + return undefined; + } + + const source = result ? from(result) : EMPTY; + const subscription = source.subscribe(subscriber); + return () => { + subscription.unsubscribe(); + if (resource) { + resource.unsubscribe(); + } + }; + }); +}