Skip to content

Commit

Permalink
Merge 4e31794 into dca6504
Browse files Browse the repository at this point in the history
  • Loading branch information
trxcllnt committed Feb 13, 2016
2 parents dca6504 + 4e31794 commit 0388b7b
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 0 deletions.
29 changes: 29 additions & 0 deletions spec/observables/using-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/* globals describe, it */
var Rx = require('../../dist/cjs/Rx.KitchenSink');
var Observable = Rx.Observable;
var Subscription = Rx.Subscription;

describe('Observable.using', function () {
it('should dispose of the resource when the subscription is disposed', function (done) {
var disposed = false;
var source = Observable.using(
function () {
return new Subscription(function () {
disposed = true;
});
},
function (resource) {
return Observable.range(0, 3);
}
)
.take(2);

source.subscribe();

if (disposed) {
done();
} else {
done.fail('disposed should be true but was false');
}
});
});
2 changes: 2 additions & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import {raceStatic} from './operator/race';
import {RangeObservable} from './observable/RangeObservable';
import {NeverObservable} from './observable/NeverObservable';
import {ErrorObservable} from './observable/ErrorObservable';
import {UsingObservable} from './observable/UsingObservable';
import {AjaxCreationMethod} from './observable/dom/AjaxObservable';
import {WebSocketSubject} from './observable/dom/WebSocketSubject';

Expand Down Expand Up @@ -201,6 +202,7 @@ export class Observable<T> implements CoreOperators<T> {
static range: typeof RangeObservable.create;
static throw: typeof ErrorObservable.create;
static timer: typeof TimerObservable.create;
static using: typeof UsingObservable.create;
static webSocket: typeof WebSocketSubject.create;
static zip: typeof zipStatic;

Expand Down
1 change: 1 addition & 0 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import './add/observable/never';
import './add/observable/range';
import './add/observable/throw';
import './add/observable/timer';
import './add/observable/using';
import './add/observable/zip';

// Operators
Expand Down
6 changes: 6 additions & 0 deletions src/add/observable/using.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import {Observable} from '../../Observable';
import {UsingObservable} from '../../observable/UsingObservable';

Observable.using = UsingObservable.create;

export var _void: void;
50 changes: 50 additions & 0 deletions src/observable/UsingObservable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import {Observable} from '../Observable';
import {Subscriber} from '../Subscriber';
import {Subscription} from '../Subscription';

export class UsingObservable<T> extends Observable<T> {

static create<T>(resourceFactory: () => Subscription,
observableFactory: (resource: Subscription) => Observable<T>): Observable<T> {
return new UsingObservable<T>(resourceFactory, observableFactory);
}

constructor(private resourceFactory: () => Subscription,
private observableFactory: (resource: Subscription) => Observable<T>) {
super();
}

protected _subscribe(subscriber: Subscriber<T>): Subscription | Function | void {

const { resourceFactory, observableFactory } = this;

let resource: Subscription,
source: Observable<T>,
error: any, errorHappened = false;

try {
resource = resourceFactory();
} catch (e) {
error = e;
errorHappened = true;
}

if (errorHappened) {
subscriber.error(error);
} else {
subscriber.add(resource);
try {
source = observableFactory(resource);
} catch (e) {
error = e;
errorHappened = true;
}

if (errorHappened) {
subscriber.error(error);
} else {
return source.subscribe(subscriber);
}
}
}
}

0 comments on commit 0388b7b

Please sign in to comment.