Skip to content

Commit

Permalink
fix(websocket): no longer throws errors in operators applied to it (#…
Browse files Browse the repository at this point in the history
…3577)

There was an issue where using operators on a WebSocketSubject resulted in errors being thrown
because the source was not set in lift.

This also updates the tests to use pipeable operators.
  • Loading branch information
benlesh committed Apr 20, 2018
1 parent d267b38 commit cb38ddf
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 22 deletions.
62 changes: 40 additions & 22 deletions spec/observables/dom/webSocket-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { expect } from 'chai';
import * as sinon from 'sinon';
import { websocket } from 'rxjs/websocket';
import { map, retry, take, repeat } from 'rxjs/operators';
import { map, retry, take, repeat, takeWhile } from 'rxjs/operators';

declare const __root__: any;

Expand Down Expand Up @@ -51,6 +51,20 @@ describe('websocket', () => {
subject.unsubscribe();
});

it('should allow use of operators and subscribe', () => {
const subject = websocket<string>('ws://mysocket');
const results: any[] = [];

subject.pipe(
map(x => x + '!'),
)
.subscribe(x => results.push(x));

MockWebSocket.lastSocket.triggerMessage(JSON.stringify('ngconf 2018 bug'));

expect(results).to.deep.equal(['ngconf 2018 bug!']);
});

it('receive multiple messages', () => {
const expected = ['what', 'do', 'you', 'do', 'with', 'a', 'drunken', 'sailor?'];
const results: string[] = [];
Expand Down Expand Up @@ -526,13 +540,15 @@ describe('websocket', () => {
const sub1 = socketSubject.multiplex(
() => 'no-op',
() => results.push('A unsub'),
(req: any) => req.id === 'A')
.takeWhile((req: any) => !req.complete)
.subscribe(
() => results.push('A next'),
(e) => results.push('A error ' + e),
() => results.push('A complete')
);
(req: any) => req.id === 'A'
).pipe(
takeWhile((req: any) => !req.complete)
)
.subscribe(
() => results.push('A next'),
(e) => results.push('A error ' + e),
() => results.push('A complete')
);

socketSubject.multiplex(
() => 'no-op',
Expand Down Expand Up @@ -581,24 +597,26 @@ describe('websocket', () => {
socketSubject.multiplex(
() => 'no-op',
() => results.push('A unsub'),
req => req.id === 'A')
.takeWhile(req => !req.complete)
.subscribe(
() => results.push('A next'),
(e) => results.push('A error ' + e),
() => results.push('A complete')
);
req => req.id === 'A'
).pipe(
takeWhile(req => !req.complete)
).subscribe(
() => results.push('A next'),
(e) => results.push('A error ' + e),
() => results.push('A complete')
);

socketSubject.multiplex(
() => 'no-op',
() => results.push('B unsub'),
req => req.id === 'B')
.takeWhile(req => !req.complete)
.subscribe(
() => results.push('B next'),
(e) => results.push('B error ' + e),
() => results.push('B complete')
);
req => req.id === 'B'
).pipe(
takeWhile(req => !req.complete)
).subscribe(
() => results.push('B next'),
(e) => results.push('B error ' + e),
() => results.push('B complete')
);

// Setup socket and send messages
let socket = MockWebSocket.lastSocket;
Expand Down
1 change: 1 addition & 0 deletions src/internal/observable/dom/WebSocketSubject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ export class WebSocketSubject<T> extends AnonymousSubject<T> {
lift<R>(operator: Operator<T, R>): WebSocketSubject<R> {
const sock = new WebSocketSubject<R>(this._config as WebSocketSubjectConfig<any>, <any> this.destination);
sock.operator = operator;
sock.source = this;
return sock;
}

Expand Down

0 comments on commit cb38ddf

Please sign in to comment.