Skip to content

Commit

Permalink
fix(Subject): throw ObjectUnsubscribedError when unsubecribed
Browse files Browse the repository at this point in the history
closes #859
  • Loading branch information
kwonoj authored and benlesh committed Feb 9, 2016
1 parent 51add30 commit 29b630b
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 14 deletions.
60 changes: 51 additions & 9 deletions spec/Subject-spec.js
Expand Up @@ -129,9 +129,6 @@ describe('Subject', function () {
subscription1.unsubscribe();

subject.complete();
subject.next(9);
subject.complete();
subject.error(new Error('err'));

subscription2.unsubscribe();

Expand Down Expand Up @@ -179,9 +176,6 @@ describe('Subject', function () {

subscription1.unsubscribe();

subject.error(new Error('err'));
subject.next(9);
subject.complete();
subject.error(new Error('err'));

subscription2.unsubscribe();
Expand Down Expand Up @@ -221,9 +215,6 @@ describe('Subject', function () {
subscription1.unsubscribe();

subject.complete();
subject.next(9);
subject.complete();
subject.error(new Error('err'));

subscription2.unsubscribe();

Expand Down Expand Up @@ -509,6 +500,57 @@ describe('Subject', function () {
source.subscribe(subject);
});

it('should throw ObjectUnsubscribedError when emit after unsubscribed', function () {
var subject = new Rx.Subject();
subject.unsubscribe();

expect(function () {
subject.next('a');
}).toThrow(new Rx.ObjectUnsubscribedError());

expect(function () {
subject.error('a');
}).toThrow(new Rx.ObjectUnsubscribedError());

expect(function () {
subject.complete();
}).toThrow(new Rx.ObjectUnsubscribedError());
});

it('should throw ObjectUnsubscribedError when emit after completed', function () {
var subject = new Rx.Subject();
subject.complete();

expect(function () {
subject.next('a');
}).toThrow(new Rx.ObjectUnsubscribedError());

expect(function () {
subject.error('a');
}).toThrow(new Rx.ObjectUnsubscribedError());

expect(function () {
subject.complete();
}).toThrow(new Rx.ObjectUnsubscribedError());
});

it('should throw ObjectUnsubscribedError when emit after error', function () {
var subject = new Rx.Subject();
subject.error('e');

expect(function () {
subject.next('a');
}).toThrow(new Rx.ObjectUnsubscribedError());

expect(function () {
subject.error('a');
}).toThrow(new Rx.ObjectUnsubscribedError());

expect(function () {
subject.complete();
}).toThrow(new Rx.ObjectUnsubscribedError());
});

describe('asObservable', function () {
it('should hide subject', function () {
var subject = new Rx.Subject();
Expand Down
2 changes: 0 additions & 2 deletions spec/subjects/AsyncSubject-spec.js
Expand Up @@ -70,8 +70,6 @@ describe('AsyncSubject', function () {
expect(observer.results).toEqual([]);
subject.complete();
expect(observer.results).toEqual([2, 'done']);
subject.next(3);
expect(observer.results).toEqual([2, 'done']);
});

it('should not emit values if unsubscribed before complete', function () {
Expand Down
5 changes: 4 additions & 1 deletion spec/subjects/BehaviorSubject-spec.js
Expand Up @@ -96,7 +96,10 @@ describe('BehaviorSubject', function () {

subject.next('foo');
subject.complete();
subject.next('bar');

expect(function () {
subject.next('bar');
}).toThrow(new Rx.ObjectUnsubscribedError());
});

it('should clean out unsubscribed subscribers', function (done) {
Expand Down
19 changes: 17 additions & 2 deletions src/Subject.ts
Expand Up @@ -6,6 +6,9 @@ import {Subscription} from './Subscription';
import {SubjectSubscription} from './subject/SubjectSubscription';
import {rxSubscriber} from './symbol/rxSubscriber';

import {throwError} from './util/throwError';
import {ObjectUnsubscribedError} from './util/ObjectUnsubscribedError';

export class Subject<T> extends Observable<T> implements Observer<T>, Subscription {

static create: Function = <T>(destination: Observer<T>, source: Observable<T>): Subject<T> => {
Expand Down Expand Up @@ -53,10 +56,10 @@ export class Subject<T> extends Observable<T> implements Observer<T>, Subscripti
return subscriber.error(this.errorValue);
} else if (this.hasCompleted) {
return subscriber.complete();
} else if (this.isUnsubscribed) {
throw new Error('Cannot subscribe to a disposed Subject.');
}

this.throwIfUnsubscribed();

const subscription = new SubjectSubscription(this, subscriber);

this.observers.push(subscriber);
Expand All @@ -73,6 +76,8 @@ export class Subject<T> extends Observable<T> implements Observer<T>, Subscripti
}

next(value: T): void {
this.throwIfUnsubscribed();

if (this.isStopped) {
return;
}
Expand All @@ -89,6 +94,8 @@ export class Subject<T> extends Observable<T> implements Observer<T>, Subscripti
}

error(err?: any): void {
this.throwIfUnsubscribed();

if (this.isStopped) {
return;
}
Expand All @@ -105,6 +112,8 @@ export class Subject<T> extends Observable<T> implements Observer<T>, Subscripti
}

complete(): void {
this.throwIfUnsubscribed();

if (this.isStopped) {
return;
}
Expand Down Expand Up @@ -200,6 +209,12 @@ export class Subject<T> extends Observable<T> implements Observer<T>, Subscripti
this.unsubscribe();
}

private throwIfUnsubscribed(): void {
if (this.isUnsubscribed) {
throwError(new ObjectUnsubscribedError());
}
}

[rxSubscriber]() {
return new Subscriber<T>(this);
}
Expand Down

0 comments on commit 29b630b

Please sign in to comment.