Skip to content

Commit b1698fe

Browse files
trxcllntbenlesh
authored andcommitted
fix(subscriptions): unsubscribe correctly when a Subscriber throws during synchronous dispatch.
1 parent 55a2f98 commit b1698fe

File tree

3 files changed

+71
-19
lines changed

3 files changed

+71
-19
lines changed

spec/Observable-spec.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ describe('Observable', function () {
146146
expect(unsubscribeCalled).toBe(true);
147147
});
148148

149-
it('should not run unsubscription logic when an error is thrown sending messages synchronously', function () {
149+
it('should run unsubscription logic when an error is thrown sending messages synchronously', function () {
150150
var messageError = false;
151151
var messageErrorValue = false;
152152
var unsubscribeCalled = false;
@@ -167,7 +167,7 @@ describe('Observable', function () {
167167
}
168168

169169
expect(sub).toBe(undefined);
170-
expect(unsubscribeCalled).toBe(false);
170+
expect(unsubscribeCalled).toBe(true);
171171
expect(messageError).toBe(true);
172172
expect(messageErrorValue).toBe('boo!');
173173
});
@@ -195,7 +195,7 @@ describe('Observable', function () {
195195

196196
expect(sub).toBe(undefined);
197197
expect(subscriber.isUnsubscribed).toBe(true);
198-
expect(unsubscribeCalled).toBe(false);
198+
expect(unsubscribeCalled).toBe(true);
199199
expect(messageError).toBe(true);
200200
expect(messageErrorValue).toBe('boo!');
201201
});
@@ -268,7 +268,7 @@ describe('Observable', function () {
268268
}
269269

270270
expect(sub).toBe(undefined);
271-
expect(unsubscribeCalled).toBe(false);
271+
expect(unsubscribeCalled).toBe(true);
272272
expect(messageError).toBe(true);
273273
expect(messageErrorValue).toBe('boo!');
274274
});

src/Observable.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,13 @@ export class Observable<T> implements CoreOperators<T> {
118118
subscriber.add(this._subscribe(subscriber));
119119
}
120120

121+
if (subscriber.syncErrorThrowable) {
122+
subscriber.syncErrorThrowable = false;
123+
if (subscriber.syncErrorThrown) {
124+
throw subscriber.syncErrorValue;
125+
}
126+
}
127+
121128
return subscriber;
122129
}
123130

src/Subscriber.ts

Lines changed: 60 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,15 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
99
static create<T>(next?: (x?: T) => void,
1010
error?: (e?: any) => void,
1111
complete?: () => void): Subscriber<T> {
12-
return new Subscriber(next, error, complete);
12+
const subscriber = new Subscriber(next, error, complete);
13+
subscriber.syncErrorThrowable = false;
14+
return subscriber;
1315
}
1416

17+
public syncErrorValue: any = null;
18+
public syncErrorThrown: boolean = false;
19+
public syncErrorThrowable: boolean = false;
20+
1521
protected isStopped: boolean = false;
1622
protected destination: Observer<any>;
1723

@@ -33,11 +39,13 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
3339
if (destinationOrNext instanceof Subscriber) {
3440
this.destination = (<Observer<any>> destinationOrNext);
3541
} else {
42+
this.syncErrorThrowable = true;
3643
this.destination = new SafeSubscriber<T>(this, <Observer<any>> destinationOrNext);
3744
}
3845
break;
3946
}
4047
default:
48+
this.syncErrorThrowable = true;
4149
this.destination = new SafeSubscriber<T>(this, <((value: T) => void)> destinationOrNext, error, complete);
4250
break;
4351
}
@@ -120,41 +128,78 @@ class SafeSubscriber<T> extends Subscriber<T> {
120128

121129
next(value?: T): void {
122130
if (!this.isStopped && this._next) {
123-
this.__tryOrUnsub(this._next, value);
124-
}
125-
}
126-
127-
__tryOrUnsub(fn: Function, value?: any): void {
128-
try {
129-
fn.call(this._context, value);
130-
} catch (err) {
131-
this.unsubscribe();
132-
throw err;
131+
const { _parent } = this;
132+
if (!_parent.syncErrorThrowable) {
133+
this.__tryOrUnsub(this._next, value);
134+
} else if (this.__tryOrSetError(_parent, this._next, value)) {
135+
this.unsubscribe();
136+
}
133137
}
134138
}
135139

136140
error(err?: any): void {
137141
if (!this.isStopped) {
142+
const { _parent } = this;
138143
if (this._error) {
139-
this.__tryOrUnsub(this._error, err);
144+
if (!_parent.syncErrorThrowable) {
145+
this.__tryOrUnsub(this._error, err);
146+
this.unsubscribe();
147+
} else {
148+
this.__tryOrSetError(_parent, this._error, err);
149+
this.unsubscribe();
150+
}
151+
} else if (!_parent.syncErrorThrowable) {
152+
this.unsubscribe();
153+
throw err;
154+
} else {
155+
_parent.syncErrorValue = err;
156+
_parent.syncErrorThrown = true;
157+
this.unsubscribe();
140158
}
141-
this.unsubscribe();
142159
}
143160
}
144161

145162
complete(): void {
146163
if (!this.isStopped) {
164+
const { _parent } = this;
147165
if (this._complete) {
148-
this.__tryOrUnsub(this._complete);
166+
if (!_parent.syncErrorThrowable) {
167+
this.__tryOrUnsub(this._complete);
168+
this.unsubscribe();
169+
} else {
170+
this.__tryOrSetError(_parent, this._complete);
171+
this.unsubscribe();
172+
}
173+
} else {
174+
this.unsubscribe();
149175
}
176+
}
177+
}
178+
179+
private __tryOrUnsub(fn: Function, value?: any): void {
180+
try {
181+
fn.call(this._context, value);
182+
} catch (err) {
150183
this.unsubscribe();
184+
throw err;
151185
}
152186
}
153187

188+
private __tryOrSetError(parent: Subscriber<T>, fn: Function, value?: any): boolean {
189+
try {
190+
fn.call(this._context, value);
191+
} catch (err) {
192+
parent.syncErrorValue = err;
193+
parent.syncErrorThrown = true;
194+
return true;
195+
}
196+
return false;
197+
}
198+
154199
protected _unsubscribe(): void {
155200
const { _parent } = this;
156201
this._context = null;
157202
this._parent = null;
158203
_parent.unsubscribe();
159204
}
160-
}
205+
}

0 commit comments

Comments
 (0)