Skip to content

Commit

Permalink
fix(subscriptions): unsubscribe correctly when a Subscriber throws du…
Browse files Browse the repository at this point in the history
…ring synchronous dispatch.
  • Loading branch information
trxcllnt authored and benlesh committed Feb 2, 2016
1 parent 55a2f98 commit b1698fe
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 19 deletions.
8 changes: 4 additions & 4 deletions spec/Observable-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ describe('Observable', function () {
expect(unsubscribeCalled).toBe(true);
});

it('should not run unsubscription logic when an error is thrown sending messages synchronously', function () {
it('should run unsubscription logic when an error is thrown sending messages synchronously', function () {
var messageError = false;
var messageErrorValue = false;
var unsubscribeCalled = false;
Expand All @@ -167,7 +167,7 @@ describe('Observable', function () {
}

expect(sub).toBe(undefined);
expect(unsubscribeCalled).toBe(false);
expect(unsubscribeCalled).toBe(true);
expect(messageError).toBe(true);
expect(messageErrorValue).toBe('boo!');
});
Expand Down Expand Up @@ -195,7 +195,7 @@ describe('Observable', function () {

expect(sub).toBe(undefined);
expect(subscriber.isUnsubscribed).toBe(true);
expect(unsubscribeCalled).toBe(false);
expect(unsubscribeCalled).toBe(true);
expect(messageError).toBe(true);
expect(messageErrorValue).toBe('boo!');
});
Expand Down Expand Up @@ -268,7 +268,7 @@ describe('Observable', function () {
}

expect(sub).toBe(undefined);
expect(unsubscribeCalled).toBe(false);
expect(unsubscribeCalled).toBe(true);
expect(messageError).toBe(true);
expect(messageErrorValue).toBe('boo!');
});
Expand Down
7 changes: 7 additions & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ export class Observable<T> implements CoreOperators<T> {
subscriber.add(this._subscribe(subscriber));
}

if (subscriber.syncErrorThrowable) {
subscriber.syncErrorThrowable = false;
if (subscriber.syncErrorThrown) {
throw subscriber.syncErrorValue;
}
}

return subscriber;
}

Expand Down
75 changes: 60 additions & 15 deletions src/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,15 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
static create<T>(next?: (x?: T) => void,
error?: (e?: any) => void,
complete?: () => void): Subscriber<T> {
return new Subscriber(next, error, complete);
const subscriber = new Subscriber(next, error, complete);
subscriber.syncErrorThrowable = false;
return subscriber;
}

public syncErrorValue: any = null;
public syncErrorThrown: boolean = false;
public syncErrorThrowable: boolean = false;

protected isStopped: boolean = false;
protected destination: Observer<any>;

Expand All @@ -33,11 +39,13 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
if (destinationOrNext instanceof Subscriber) {
this.destination = (<Observer<any>> destinationOrNext);
} else {
this.syncErrorThrowable = true;
this.destination = new SafeSubscriber<T>(this, <Observer<any>> destinationOrNext);
}
break;
}
default:
this.syncErrorThrowable = true;
this.destination = new SafeSubscriber<T>(this, <((value: T) => void)> destinationOrNext, error, complete);
break;
}
Expand Down Expand Up @@ -120,41 +128,78 @@ class SafeSubscriber<T> extends Subscriber<T> {

next(value?: T): void {
if (!this.isStopped && this._next) {
this.__tryOrUnsub(this._next, value);
}
}

__tryOrUnsub(fn: Function, value?: any): void {
try {
fn.call(this._context, value);
} catch (err) {
this.unsubscribe();
throw err;
const { _parent } = this;
if (!_parent.syncErrorThrowable) {
this.__tryOrUnsub(this._next, value);
} else if (this.__tryOrSetError(_parent, this._next, value)) {
this.unsubscribe();
}
}
}

error(err?: any): void {
if (!this.isStopped) {
const { _parent } = this;
if (this._error) {
this.__tryOrUnsub(this._error, err);
if (!_parent.syncErrorThrowable) {
this.__tryOrUnsub(this._error, err);
this.unsubscribe();
} else {
this.__tryOrSetError(_parent, this._error, err);
this.unsubscribe();
}
} else if (!_parent.syncErrorThrowable) {
this.unsubscribe();
throw err;
} else {
_parent.syncErrorValue = err;
_parent.syncErrorThrown = true;
this.unsubscribe();
}
this.unsubscribe();
}
}

complete(): void {
if (!this.isStopped) {
const { _parent } = this;
if (this._complete) {
this.__tryOrUnsub(this._complete);
if (!_parent.syncErrorThrowable) {
this.__tryOrUnsub(this._complete);
this.unsubscribe();
} else {
this.__tryOrSetError(_parent, this._complete);
this.unsubscribe();
}
} else {
this.unsubscribe();
}
}
}

private __tryOrUnsub(fn: Function, value?: any): void {
try {
fn.call(this._context, value);
} catch (err) {
this.unsubscribe();
throw err;
}
}

private __tryOrSetError(parent: Subscriber<T>, fn: Function, value?: any): boolean {
try {
fn.call(this._context, value);
} catch (err) {
parent.syncErrorValue = err;
parent.syncErrorThrown = true;
return true;
}
return false;
}

protected _unsubscribe(): void {
const { _parent } = this;
this._context = null;
this._parent = null;
_parent.unsubscribe();
}
}
}

0 comments on commit b1698fe

Please sign in to comment.