From a64985a8895b45cbd8b78a269e69f4c5ed05c9a7 Mon Sep 17 00:00:00 2001 From: Joel Rowley Date: Fri, 28 Jan 2022 17:02:20 -0500 Subject: [PATCH] feat: Mark subscribe() and unsubscribe() as async (#678) --- src/client/WebSocketClient.test.ts | 71 ++++++++++++++++++--------- src/client/WebSocketClient.ts | 16 +++--- src/demo/websocket-ticker.ts | 8 +-- src/demo/websocket-unsubscribe-all.ts | 12 ++--- src/demo/websocket-user.ts | 4 +- 5 files changed, 67 insertions(+), 44 deletions(-) diff --git a/src/client/WebSocketClient.test.ts b/src/client/WebSocketClient.test.ts index 8168f0c6..8c7299a3 100644 --- a/src/client/WebSocketClient.test.ts +++ b/src/client/WebSocketClient.test.ts @@ -226,10 +226,10 @@ describe('WebSocketClient', () => { const ws = mockWebSocketResponse(done, channel, statusPayload); - ws.on(WebSocketEvent.ON_MESSAGE_STATUS, message => { + ws.on(WebSocketEvent.ON_MESSAGE_STATUS, async message => { expect(message.currencies[2].details.sort_order).toBe(48); expect(message.products[72].id).toBe('XRP-USD'); - ws.unsubscribe(channel); + await ws.unsubscribe(channel); }); ws.connect(); @@ -243,9 +243,9 @@ describe('WebSocketClient', () => { const ws = mockWebSocketResponse(done, channel, tickerBTCUSD); - ws.on(WebSocketEvent.ON_MESSAGE_TICKER, tickerMessage => { + ws.on(WebSocketEvent.ON_MESSAGE_TICKER, async tickerMessage => { expect(tickerMessage.trade_id).toBe(3526965); - ws.unsubscribe(channel); + await ws.unsubscribe(channel); }); ws.connect(); @@ -259,11 +259,11 @@ describe('WebSocketClient', () => { const ws = mockWebSocketResponse(done, channel, l2snapshotBTCUSD); - ws.on(WebSocketEvent.ON_MESSAGE_L2SNAPSHOT, snapshotMessage => { + ws.on(WebSocketEvent.ON_MESSAGE_L2SNAPSHOT, async snapshotMessage => { expect(snapshotMessage.asks.length).toBe(10); expect(snapshotMessage.asks[0]).toEqual(['47009.28', '0.00100000']); expect(snapshotMessage.bids.length).toBe(10); - ws.unsubscribe(channel); + await ws.unsubscribe(channel); }); ws.connect(); @@ -277,11 +277,11 @@ describe('WebSocketClient', () => { const ws = mockWebSocketResponse(done, channel, l2updateBTCUSD); - ws.on(WebSocketEvent.ON_MESSAGE_L2UPDATE, updateMessage => { + ws.on(WebSocketEvent.ON_MESSAGE_L2UPDATE, async updateMessage => { expect(updateMessage.changes.length).toBe(5); expect(updateMessage.changes[0]).toEqual(['buy', '46961.95', '0.00000000']); expect(updateMessage.changes[1]).toEqual(['sell', '47027.24', '0.04443115']); - ws.unsubscribe(channel); + await ws.unsubscribe(channel); }); ws.connect(); @@ -295,11 +295,11 @@ describe('WebSocketClient', () => { const ws = mockWebSocketResponse(done, channel, fullActivateBTCUSD); - ws.on(WebSocketEvent.ON_MESSAGE_FULL_ACTIVATE, message => { + ws.on(WebSocketEvent.ON_MESSAGE_FULL_ACTIVATE, async message => { expect(message.profile_id).toBe('30000727-d308-cf50-7b1c-c06deb1934fc'); expect(message.private).toBe(true); expect(message.stop_type).toBe('entry'); - ws.unsubscribe(channel); + await ws.unsubscribe(channel); }); ws.connect(); @@ -313,10 +313,10 @@ describe('WebSocketClient', () => { const ws = mockWebSocketResponse(done, channel, fullReceivedLimitBTCUSD); - ws.on(WebSocketEvent.ON_MESSAGE_FULL_RECEIVED, message => { + ws.on(WebSocketEvent.ON_MESSAGE_FULL_RECEIVED, async message => { expect(message.order_type).toBe('limit'); expect(message.order_id).toBe('d50ec984-77a8-460a-b958-66f114b0de9b'); - ws.unsubscribe(channel); + await ws.unsubscribe(channel); }); ws.connect(); @@ -330,10 +330,10 @@ describe('WebSocketClient', () => { const ws = mockWebSocketResponse(done, channel, fullOpenBTCUSD); - ws.on(WebSocketEvent.ON_MESSAGE_FULL_OPEN, message => { + ws.on(WebSocketEvent.ON_MESSAGE_FULL_OPEN, async message => { expect(message.profile_id).toBe(undefined); expect(message.remaining_size).toBe('1.00'); - ws.unsubscribe(channel); + await ws.unsubscribe(channel); }); ws.connect(); @@ -347,11 +347,11 @@ describe('WebSocketClient', () => { const ws = mockWebSocketResponse(done, channel, fullDoneBTCUSD); - ws.on(WebSocketEvent.ON_MESSAGE_FULL_DONE, message => { + ws.on(WebSocketEvent.ON_MESSAGE_FULL_DONE, async message => { expect(message.profile_id).toBe(undefined); expect(message.remaining_size).toBe('0'); expect(message.reason).toBe('filled'); - ws.unsubscribe(channel); + await ws.unsubscribe(channel); }); ws.connect(); @@ -365,10 +365,10 @@ describe('WebSocketClient', () => { const ws = mockWebSocketResponse(done, channel, fullChangeBTCUSD); - ws.on(WebSocketEvent.ON_MESSAGE_FULL_CHANGE, message => { + ws.on(WebSocketEvent.ON_MESSAGE_FULL_CHANGE, async message => { expect(message.new_size).toBe('5.23512'); expect(message.old_size).toBe('12.234412'); - ws.unsubscribe(channel); + await ws.unsubscribe(channel); }); ws.connect(); @@ -382,9 +382,9 @@ describe('WebSocketClient', () => { const ws = mockWebSocketResponse(done, channel, tickerBTCUSD); - ws.on(WebSocketEvent.ON_MESSAGE_TICKER, tickerMessage => { + ws.on(WebSocketEvent.ON_MESSAGE_TICKER, async tickerMessage => { expect(tickerMessage.trade_id).toBe(3526965); - ws.unsubscribe(channel); + await ws.unsubscribe(channel); }); ws.connect(); @@ -400,9 +400,9 @@ describe('WebSocketClient', () => { const ws = mockWebSocketResponse(done, channels, matchesBTCUSD); - ws.on(WebSocketEvent.ON_MESSAGE_MATCHES, message => { + ws.on(WebSocketEvent.ON_MESSAGE_MATCHES, async message => { expect(message.trade_id).toBe(9713921); - ws.unsubscribe(channels); + await ws.unsubscribe(channels); }); ws.connect(); @@ -434,8 +434,8 @@ describe('WebSocketClient', () => { done(); }); - ws.on(WebSocketEvent.ON_OPEN, () => { - ws.subscribe({ + ws.on(WebSocketEvent.ON_OPEN, async () => { + await ws.subscribe({ name: WebSocketChannelName.USER, product_ids: ['BTC-USD'], }); @@ -443,6 +443,29 @@ describe('WebSocketClient', () => { ws.connect(); }); + + it('does not throw an exception when disconnect is called immediately after an awaited subscribe', done => { + const ws = createWebSocketClient(); + + const channel: WebSocketChannel = { + name: WebSocketChannelName.TICKER, + product_ids: ['BTC-USD', 'ETH-USD'], + }; + + ws.on(WebSocketEvent.ON_OPEN, async () => { + await ws.subscribe(channel); + + expect(() => { + ws.disconnect(); + }).not.toThrow(); + }); + + ws.on(WebSocketEvent.ON_CLOSE, () => { + done(); + }); + + ws.connect(); + }); }); describe('unsubscribe', () => { diff --git a/src/client/WebSocketClient.ts b/src/client/WebSocketClient.ts index 92284677..b9faa662 100644 --- a/src/client/WebSocketClient.ts +++ b/src/client/WebSocketClient.ts @@ -481,10 +481,6 @@ export class WebSocketClient extends EventEmitter { } async sendMessage(message: WebSocketRequest): Promise { - if (!this.socket) { - throw new Error(`Failed to send message of type "${message.type}": You need to connect to the WebSocket first.`); - } - /** * Authentication will result in a couple of benefits: * 1. Messages where you're one of the parties are expanded and have more useful fields @@ -498,18 +494,22 @@ export class WebSocketClient extends EventEmitter { }); Object.assign(message, signature); + if (!this.socket) { + throw new Error(`Failed to send message of type "${message.type}": You need to connect to the WebSocket first.`); + } + this.socket.send(JSON.stringify(message)); } - subscribe(channel: WebSocketChannel | WebSocketChannel[]): void { - this.sendMessage({ + async subscribe(channel: WebSocketChannel | WebSocketChannel[]): Promise { + await this.sendMessage({ channels: Array.isArray(channel) ? channel : [channel], type: WebSocketRequestType.SUBSCRIBE, }).finally(() => {}); } - unsubscribe(channel: WebSocketChannelName | WebSocketChannel | WebSocketChannel[]): void { - this.sendMessage({ + async unsubscribe(channel: WebSocketChannelName | WebSocketChannel | WebSocketChannel[]): Promise { + await this.sendMessage({ channels: this.mapChannels(channel), type: WebSocketRequestType.UNSUBSCRIBE, }).finally(() => {}); diff --git a/src/demo/websocket-ticker.ts b/src/demo/websocket-ticker.ts index 286b14d5..25cd49a6 100644 --- a/src/demo/websocket-ticker.ts +++ b/src/demo/websocket-ticker.ts @@ -11,9 +11,9 @@ const channel = { }; // 3. Wait for open WebSocket to send messages -client.ws.on(WebSocketEvent.ON_OPEN, () => { +client.ws.on(WebSocketEvent.ON_OPEN, async () => { // 7. Subscribe to WebSocket channel - client.ws.subscribe([channel]); + await client.ws.subscribe([channel]); }); // 4. Listen to WebSocket subscription updates @@ -26,11 +26,11 @@ client.ws.on(WebSocketEvent.ON_SUBSCRIPTION_UPDATE, subscriptions => { }); // 5. Listen to WebSocket channel updates -client.ws.on(WebSocketEvent.ON_MESSAGE_TICKER, tickerMessage => { +client.ws.on(WebSocketEvent.ON_MESSAGE_TICKER, async tickerMessage => { // 8. Receive message from WebSocket channel console.info(`Received message of type "${tickerMessage.type}".`, tickerMessage); // 9. Unsubscribe from WebSocket channel - client.ws.unsubscribe([ + await client.ws.unsubscribe([ { name: WebSocketChannelName.TICKER, product_ids: [tickerMessage.product_id], diff --git a/src/demo/websocket-unsubscribe-all.ts b/src/demo/websocket-unsubscribe-all.ts index f0396414..f7b1e52e 100644 --- a/src/demo/websocket-unsubscribe-all.ts +++ b/src/demo/websocket-unsubscribe-all.ts @@ -18,11 +18,11 @@ const channels = [ }, ]; -client.ws.on(WebSocketEvent.ON_OPEN, () => { - client.ws.subscribe(channels); +client.ws.on(WebSocketEvent.ON_OPEN, async () => { + await client.ws.subscribe(channels); }); -client.ws.on(WebSocketEvent.ON_SUBSCRIPTION_UPDATE, subscriptions => { +client.ws.on(WebSocketEvent.ON_SUBSCRIPTION_UPDATE, async subscriptions => { const subscriptionCount = subscriptions.channels.length; const uniqueProductIds = new Set(); const productIds = subscriptions.channels.map(subscription => subscription.product_ids); @@ -40,15 +40,15 @@ client.ws.on(WebSocketEvent.ON_SUBSCRIPTION_UPDATE, subscriptions => { break; case 1: console.info(`We will unsubscribe from "${WebSocketChannelName.LEVEL2}" channel...`); - client.ws.unsubscribe(WebSocketChannelName.LEVEL2); + await client.ws.unsubscribe(WebSocketChannelName.LEVEL2); break; case 2: console.info(`We will unsubscribe from "${WebSocketChannelName.HEARTBEAT}" channel...`); - client.ws.unsubscribe(WebSocketChannelName.HEARTBEAT); + await client.ws.unsubscribe(WebSocketChannelName.HEARTBEAT); break; case 3: console.info(`We will unsubscribe from "${WebSocketChannelName.TICKER}" channel...`); - client.ws.unsubscribe(WebSocketChannelName.TICKER); + await client.ws.unsubscribe(WebSocketChannelName.TICKER); break; } }); diff --git a/src/demo/websocket-user.ts b/src/demo/websocket-user.ts index 4a9fc041..5fea926c 100644 --- a/src/demo/websocket-user.ts +++ b/src/demo/websocket-user.ts @@ -16,8 +16,8 @@ client.ws.on(WebSocketEvent.ON_MESSAGE_ERROR, errorMessage => { throw new Error(`${errorMessage.message}: ${errorMessage.reason}`); }); -client.ws.on(WebSocketEvent.ON_OPEN, () => { - client.ws.subscribe(channel); +client.ws.on(WebSocketEvent.ON_OPEN, async () => { + await client.ws.subscribe(channel); }); client.ws.connect();