Skip to content

Commit d4a9aac

Browse files
benleshjayphelps
authored andcommitted
fix(Observable): errors thrown during subscription are now properly sent down error channel (#2313)
- Fixes `Observable.create(fn)` and `new Observable(fn)` such that any error thrown in `fn` on subscription will be sent to the subscriber's error handler. - Fixes a subject test that was relying on the errant behavior. fixes #1833
1 parent c70a09d commit d4a9aac

File tree

3 files changed

+36
-4
lines changed

3 files changed

+36
-4
lines changed

spec/Observable-spec.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,18 @@ describe('Observable', () => {
2929
source.subscribe(function (x) { expect(x).to.equal(1); }, null, done);
3030
});
3131

32+
it('should send errors thrown in the constructor down the error path', (done) => {
33+
new Observable((observer) => {
34+
throw new Error('this should be handled');
35+
})
36+
.subscribe({
37+
error(err) {
38+
expect(err).to.deep.equal(new Error('this should be handled'));
39+
done();
40+
}
41+
});
42+
});
43+
3244
describe('forEach', () => {
3345
it('should iterate and return a Promise', (done: MochaDone) => {
3446
const expected = [1, 2, 3];
@@ -582,6 +594,18 @@ describe('Observable.create', () => {
582594
});
583595
expect(called).to.be.true;
584596
});
597+
598+
it('should send errors thrown in the passed function down the error path', (done) => {
599+
Observable.create((observer) => {
600+
throw new Error('this should be handled');
601+
})
602+
.subscribe({
603+
error(err) {
604+
expect(err).to.deep.equal(new Error('this should be handled'));
605+
done();
606+
}
607+
});
608+
});
585609
});
586610

587611
/** @test {Observable} */

spec/Subject-spec.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -261,10 +261,8 @@ describe('Subject', () => {
261261
expect(() => {
262262
subject.subscribe(
263263
function (x) { results3.push(x); },
264-
function (e) { results3.push('E'); },
265-
() => { results3.push('C'); }
266264
);
267-
}).to.throw();
265+
}).to.throw(Rx.ObjectUnsubscribedError);
268266

269267
expect(results1).to.deep.equal([1, 2, 3, 4, 5]);
270268
expect(results2).to.deep.equal([3, 4, 5]);

src/Observable.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ export class Observable<T> implements Subscribable<T> {
9595
if (operator) {
9696
operator.call(sink, this.source);
9797
} else {
98-
sink.add(this._subscribe(sink));
98+
sink.add(this._trySubscribe(sink));
9999
}
100100

101101
if (sink.syncErrorThrowable) {
@@ -108,6 +108,16 @@ export class Observable<T> implements Subscribable<T> {
108108
return sink;
109109
}
110110

111+
private _trySubscribe(sink: Subscriber<T>): TeardownLogic {
112+
try {
113+
return this._subscribe(sink);
114+
} catch (err) {
115+
sink.syncErrorThrown = true;
116+
sink.syncErrorValue = err;
117+
sink.error(err);
118+
}
119+
}
120+
111121
/**
112122
* @method forEach
113123
* @param {Function} next a handler for each value emitted by the observable

0 commit comments

Comments
 (0)