Skip to content

Commit

Permalink
Merge pull request #1976 from blesh/webSocketFix
Browse files Browse the repository at this point in the history
fix(WebSocketSubject): ensure all internal state properly reset
  • Loading branch information
jayphelps committed Oct 10, 2016
2 parents 5e28e52 + 62d242e commit 418d597
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 29 deletions.
97 changes: 75 additions & 22 deletions spec/observables/dom/webSocket-spec.ts
Expand Up @@ -394,6 +394,81 @@ describe('Observable.webSocket', () => {
});

describe('multiplex', () => {
it('should be retryable', () => {
const results = [];
const subject = Observable.webSocket('ws://websocket');
const source = subject.multiplex(() => {
return { sub: 'foo'};
}, () => {
return { unsub: 'foo' };
}, function (value: any) {
return value.name === 'foo';
});

source
.retry(1)
.map((x: any) => x.value)
.take(2)
.subscribe((x: any) => {
results.push(x);
});

const socket = MockWebSocket.lastSocket;
socket.open();

expect(socket.lastMessageSent).to.deep.equal({ sub: 'foo' });
socket.triggerClose({ wasClean: false }); // Bad connection

const socket2 = MockWebSocket.lastSocket;
expect(socket2).not.to.equal(socket);

socket2.open();
expect(socket2.lastMessageSent).to.deep.equal({ sub: 'foo' });

socket2.triggerMessage(JSON.stringify({ name: 'foo', value: 'test' }));
socket2.triggerMessage(JSON.stringify({ name: 'foo', value: 'this' }));

expect(results).to.deep.equal(['test', 'this']);
});

it('should be repeatable', () => {
const results = [];
const subject = Observable.webSocket('ws://websocket');
const source = subject.multiplex(() => {
return { sub: 'foo'};
}, () => {
return { unsub: 'foo' };
}, function (value: any) {
return value.name === 'foo';
});

source
.repeat(2)
.map((x: any) => x.value)
.subscribe((x: any) => {
results.push(x);
});

const socket = MockWebSocket.lastSocket;
socket.open();

expect(socket.lastMessageSent).to.deep.equal({ sub: 'foo' }, 'first multiplexed sub');
socket.triggerMessage(JSON.stringify({ name: 'foo', value: 'test' }));
socket.triggerMessage(JSON.stringify({ name: 'foo', value: 'this' }));
socket.triggerClose({ wasClean: true });

const socket2 = MockWebSocket.lastSocket;
expect(socket2).not.to.equal(socket, 'a new socket was not created');

socket2.open();
expect(socket2.lastMessageSent).to.deep.equal({ sub: 'foo' }, 'second multiplexed sub');
socket2.triggerMessage(JSON.stringify({ name: 'foo', value: 'test' }));
socket2.triggerMessage(JSON.stringify({ name: 'foo', value: 'this' }));
socket2.triggerClose({ wasClean: true });

expect(results).to.deep.equal(['test', 'this', 'test', 'this'], 'results were not equal');
});

it('should multiplex over the websocket', () => {
const results = [];
const subject = Observable.webSocket('ws://websocket');
Expand Down Expand Up @@ -432,28 +507,6 @@ describe('Observable.webSocket', () => {
(<any>socket.close).restore();
});

it('should work in combination with retry (issue #1466)', () => {
const error = { wasClean: false};
const results = [];

const subject = Observable.webSocket(<any>{url: 'ws://mysocket'})
.multiplex(
() => results.push('sub'),
() => results.push('unsub'),
() => true)
.retry(1);

subject.subscribe(
() => results.push('next'),
(e) => results.push(e));

let socket = MockWebSocket.lastSocket;

socket.triggerClose(error);

expect(results).to.deep.equal(['sub', 'unsub', 'sub', error, 'unsub']);
});

it('should not close the socket until all subscriptions complete', () => {
const socketSubject = Rx.Observable.webSocket(<any>{url: 'ws://mysocket'});
const results = [];
Expand Down
24 changes: 17 additions & 7 deletions src/observable/dom/WebSocketSubject.ts
Expand Up @@ -78,6 +78,14 @@ export class WebSocketSubject<T> extends AnonymousSubject<T> {
return sock;
}

private _resetState() {
this.socket = null;
if (!this.source) {
this.destination = new ReplaySubject();
}
this._output = new Subject<T>();
}

// TODO: factor this out to be a proper Operator/Subscriber implementation and eliminate closures
multiplex(subMsg: () => any, unsubMsg: () => any, messageFilter: (value: T) => boolean) {
const self = this;
Expand Down Expand Up @@ -155,17 +163,15 @@ export class WebSocketSubject<T> extends AnonymousSubject<T> {
observer.error(new TypeError('WebSocketSubject.error must be called with an object with an error code, ' +
'and an optional reason: { code: number, reason: string }'));
}
this.destination = new ReplaySubject();
this.socket = null;
this._resetState();
},
( ) => {
const closingObserver = this.closingObserver;
if (closingObserver) {
closingObserver.next(undefined);
}
socket.close();
this.destination = new ReplaySubject();
this.socket = null;
this._resetState();
}
);

Expand All @@ -174,9 +180,13 @@ export class WebSocketSubject<T> extends AnonymousSubject<T> {
}
};

socket.onerror = (e: Event) => observer.error(e);
socket.onerror = (e: Event) => {
this._resetState();
observer.error(e);
};

socket.onclose = (e: CloseEvent) => {
this._resetState();
const closeObserver = this.closeObserver;
if (closeObserver) {
closeObserver.next(e);
Expand Down Expand Up @@ -212,8 +222,8 @@ export class WebSocketSubject<T> extends AnonymousSubject<T> {
const { socket } = this;
if (this._output.observers.length === 0 && socket && socket.readyState === 1) {
socket.close();
this.socket = null;
}
this._resetState();
});
return subscription;
}
Expand All @@ -222,7 +232,7 @@ export class WebSocketSubject<T> extends AnonymousSubject<T> {
const { source, socket } = this;
if (socket && socket.readyState === 1) {
socket.close();
this.socket = null;
this._resetState();
}
super.unsubscribe();
if (!source) {
Expand Down

0 comments on commit 418d597

Please sign in to comment.