Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
feat: Mark subscribe() and unsubscribe() as async (#678)
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoelr committed Jan 28, 2022
1 parent d48e45e commit a64985a
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 44 deletions.
71 changes: 47 additions & 24 deletions src/client/WebSocketClient.test.ts
Expand Up @@ -226,10 +226,10 @@ describe('WebSocketClient', () => {

const ws = mockWebSocketResponse(done, channel, statusPayload);

ws.on(WebSocketEvent.ON_MESSAGE_STATUS, message => {
ws.on(WebSocketEvent.ON_MESSAGE_STATUS, async message => {
expect(message.currencies[2].details.sort_order).toBe(48);
expect(message.products[72].id).toBe('XRP-USD');
ws.unsubscribe(channel);
await ws.unsubscribe(channel);
});

ws.connect();
Expand All @@ -243,9 +243,9 @@ describe('WebSocketClient', () => {

const ws = mockWebSocketResponse(done, channel, tickerBTCUSD);

ws.on(WebSocketEvent.ON_MESSAGE_TICKER, tickerMessage => {
ws.on(WebSocketEvent.ON_MESSAGE_TICKER, async tickerMessage => {
expect(tickerMessage.trade_id).toBe(3526965);
ws.unsubscribe(channel);
await ws.unsubscribe(channel);
});

ws.connect();
Expand All @@ -259,11 +259,11 @@ describe('WebSocketClient', () => {

const ws = mockWebSocketResponse(done, channel, l2snapshotBTCUSD);

ws.on(WebSocketEvent.ON_MESSAGE_L2SNAPSHOT, snapshotMessage => {
ws.on(WebSocketEvent.ON_MESSAGE_L2SNAPSHOT, async snapshotMessage => {
expect<number>(snapshotMessage.asks.length).toBe(10);
expect(snapshotMessage.asks[0]).toEqual(['47009.28', '0.00100000']);
expect<number>(snapshotMessage.bids.length).toBe(10);
ws.unsubscribe(channel);
await ws.unsubscribe(channel);
});

ws.connect();
Expand All @@ -277,11 +277,11 @@ describe('WebSocketClient', () => {

const ws = mockWebSocketResponse(done, channel, l2updateBTCUSD);

ws.on(WebSocketEvent.ON_MESSAGE_L2UPDATE, updateMessage => {
ws.on(WebSocketEvent.ON_MESSAGE_L2UPDATE, async updateMessage => {
expect<number>(updateMessage.changes.length).toBe(5);
expect(updateMessage.changes[0]).toEqual(['buy', '46961.95', '0.00000000']);
expect(updateMessage.changes[1]).toEqual(['sell', '47027.24', '0.04443115']);
ws.unsubscribe(channel);
await ws.unsubscribe(channel);
});

ws.connect();
Expand All @@ -295,11 +295,11 @@ describe('WebSocketClient', () => {

const ws = mockWebSocketResponse(done, channel, fullActivateBTCUSD);

ws.on(WebSocketEvent.ON_MESSAGE_FULL_ACTIVATE, message => {
ws.on(WebSocketEvent.ON_MESSAGE_FULL_ACTIVATE, async message => {
expect(message.profile_id).toBe('30000727-d308-cf50-7b1c-c06deb1934fc');
expect(message.private).toBe(true);
expect(message.stop_type).toBe('entry');
ws.unsubscribe(channel);
await ws.unsubscribe(channel);
});

ws.connect();
Expand All @@ -313,10 +313,10 @@ describe('WebSocketClient', () => {

const ws = mockWebSocketResponse(done, channel, fullReceivedLimitBTCUSD);

ws.on(WebSocketEvent.ON_MESSAGE_FULL_RECEIVED, message => {
ws.on(WebSocketEvent.ON_MESSAGE_FULL_RECEIVED, async message => {
expect(message.order_type).toBe('limit');
expect(message.order_id).toBe('d50ec984-77a8-460a-b958-66f114b0de9b');
ws.unsubscribe(channel);
await ws.unsubscribe(channel);
});

ws.connect();
Expand All @@ -330,10 +330,10 @@ describe('WebSocketClient', () => {

const ws = mockWebSocketResponse(done, channel, fullOpenBTCUSD);

ws.on(WebSocketEvent.ON_MESSAGE_FULL_OPEN, message => {
ws.on(WebSocketEvent.ON_MESSAGE_FULL_OPEN, async message => {
expect(message.profile_id).toBe(undefined);
expect(message.remaining_size).toBe('1.00');
ws.unsubscribe(channel);
await ws.unsubscribe(channel);
});

ws.connect();
Expand All @@ -347,11 +347,11 @@ describe('WebSocketClient', () => {

const ws = mockWebSocketResponse(done, channel, fullDoneBTCUSD);

ws.on(WebSocketEvent.ON_MESSAGE_FULL_DONE, message => {
ws.on(WebSocketEvent.ON_MESSAGE_FULL_DONE, async message => {
expect(message.profile_id).toBe(undefined);
expect(message.remaining_size).toBe('0');
expect(message.reason).toBe('filled');
ws.unsubscribe(channel);
await ws.unsubscribe(channel);
});

ws.connect();
Expand All @@ -365,10 +365,10 @@ describe('WebSocketClient', () => {

const ws = mockWebSocketResponse(done, channel, fullChangeBTCUSD);

ws.on(WebSocketEvent.ON_MESSAGE_FULL_CHANGE, message => {
ws.on(WebSocketEvent.ON_MESSAGE_FULL_CHANGE, async message => {
expect(message.new_size).toBe('5.23512');
expect(message.old_size).toBe('12.234412');
ws.unsubscribe(channel);
await ws.unsubscribe(channel);
});

ws.connect();
Expand All @@ -382,9 +382,9 @@ describe('WebSocketClient', () => {

const ws = mockWebSocketResponse(done, channel, tickerBTCUSD);

ws.on(WebSocketEvent.ON_MESSAGE_TICKER, tickerMessage => {
ws.on(WebSocketEvent.ON_MESSAGE_TICKER, async tickerMessage => {
expect(tickerMessage.trade_id).toBe(3526965);
ws.unsubscribe(channel);
await ws.unsubscribe(channel);
});

ws.connect();
Expand All @@ -400,9 +400,9 @@ describe('WebSocketClient', () => {

const ws = mockWebSocketResponse(done, channels, matchesBTCUSD);

ws.on(WebSocketEvent.ON_MESSAGE_MATCHES, message => {
ws.on(WebSocketEvent.ON_MESSAGE_MATCHES, async message => {
expect(message.trade_id).toBe(9713921);
ws.unsubscribe(channels);
await ws.unsubscribe(channels);
});

ws.connect();
Expand Down Expand Up @@ -434,15 +434,38 @@ describe('WebSocketClient', () => {
done();
});

ws.on(WebSocketEvent.ON_OPEN, () => {
ws.subscribe({
ws.on(WebSocketEvent.ON_OPEN, async () => {
await ws.subscribe({
name: WebSocketChannelName.USER,
product_ids: ['BTC-USD'],
});
});

ws.connect();
});

it('does not throw an exception when disconnect is called immediately after an awaited subscribe', done => {
const ws = createWebSocketClient();

const channel: WebSocketChannel = {
name: WebSocketChannelName.TICKER,
product_ids: ['BTC-USD', 'ETH-USD'],
};

ws.on(WebSocketEvent.ON_OPEN, async () => {
await ws.subscribe(channel);

expect(() => {
ws.disconnect();
}).not.toThrow();
});

ws.on(WebSocketEvent.ON_CLOSE, () => {
done();
});

ws.connect();
});
});

describe('unsubscribe', () => {
Expand Down
16 changes: 8 additions & 8 deletions src/client/WebSocketClient.ts
Expand Up @@ -481,10 +481,6 @@ export class WebSocketClient extends EventEmitter {
}

async sendMessage(message: WebSocketRequest): Promise<void> {
if (!this.socket) {
throw new Error(`Failed to send message of type "${message.type}": You need to connect to the WebSocket first.`);
}

/**
* Authentication will result in a couple of benefits:
* 1. Messages where you're one of the parties are expanded and have more useful fields
Expand All @@ -498,18 +494,22 @@ export class WebSocketClient extends EventEmitter {
});
Object.assign(message, signature);

if (!this.socket) {
throw new Error(`Failed to send message of type "${message.type}": You need to connect to the WebSocket first.`);
}

this.socket.send(JSON.stringify(message));
}

subscribe(channel: WebSocketChannel | WebSocketChannel[]): void {
this.sendMessage({
async subscribe(channel: WebSocketChannel | WebSocketChannel[]): Promise<void> {
await this.sendMessage({
channels: Array.isArray(channel) ? channel : [channel],
type: WebSocketRequestType.SUBSCRIBE,
}).finally(() => {});
}

unsubscribe(channel: WebSocketChannelName | WebSocketChannel | WebSocketChannel[]): void {
this.sendMessage({
async unsubscribe(channel: WebSocketChannelName | WebSocketChannel | WebSocketChannel[]): Promise<void> {
await this.sendMessage({
channels: this.mapChannels(channel),
type: WebSocketRequestType.UNSUBSCRIBE,
}).finally(() => {});
Expand Down
8 changes: 4 additions & 4 deletions src/demo/websocket-ticker.ts
Expand Up @@ -11,9 +11,9 @@ const channel = {
};

// 3. Wait for open WebSocket to send messages
client.ws.on(WebSocketEvent.ON_OPEN, () => {
client.ws.on(WebSocketEvent.ON_OPEN, async () => {
// 7. Subscribe to WebSocket channel
client.ws.subscribe([channel]);
await client.ws.subscribe([channel]);
});

// 4. Listen to WebSocket subscription updates
Expand All @@ -26,11 +26,11 @@ client.ws.on(WebSocketEvent.ON_SUBSCRIPTION_UPDATE, subscriptions => {
});

// 5. Listen to WebSocket channel updates
client.ws.on(WebSocketEvent.ON_MESSAGE_TICKER, tickerMessage => {
client.ws.on(WebSocketEvent.ON_MESSAGE_TICKER, async tickerMessage => {
// 8. Receive message from WebSocket channel
console.info(`Received message of type "${tickerMessage.type}".`, tickerMessage);
// 9. Unsubscribe from WebSocket channel
client.ws.unsubscribe([
await client.ws.unsubscribe([
{
name: WebSocketChannelName.TICKER,
product_ids: [tickerMessage.product_id],
Expand Down
12 changes: 6 additions & 6 deletions src/demo/websocket-unsubscribe-all.ts
Expand Up @@ -18,11 +18,11 @@ const channels = [
},
];

client.ws.on(WebSocketEvent.ON_OPEN, () => {
client.ws.subscribe(channels);
client.ws.on(WebSocketEvent.ON_OPEN, async () => {
await client.ws.subscribe(channels);
});

client.ws.on(WebSocketEvent.ON_SUBSCRIPTION_UPDATE, subscriptions => {
client.ws.on(WebSocketEvent.ON_SUBSCRIPTION_UPDATE, async subscriptions => {
const subscriptionCount = subscriptions.channels.length;
const uniqueProductIds = new Set();
const productIds = subscriptions.channels.map(subscription => subscription.product_ids);
Expand All @@ -40,15 +40,15 @@ client.ws.on(WebSocketEvent.ON_SUBSCRIPTION_UPDATE, subscriptions => {
break;
case 1:
console.info(`We will unsubscribe from "${WebSocketChannelName.LEVEL2}" channel...`);
client.ws.unsubscribe(WebSocketChannelName.LEVEL2);
await client.ws.unsubscribe(WebSocketChannelName.LEVEL2);
break;
case 2:
console.info(`We will unsubscribe from "${WebSocketChannelName.HEARTBEAT}" channel...`);
client.ws.unsubscribe(WebSocketChannelName.HEARTBEAT);
await client.ws.unsubscribe(WebSocketChannelName.HEARTBEAT);
break;
case 3:
console.info(`We will unsubscribe from "${WebSocketChannelName.TICKER}" channel...`);
client.ws.unsubscribe(WebSocketChannelName.TICKER);
await client.ws.unsubscribe(WebSocketChannelName.TICKER);
break;
}
});
Expand Down
4 changes: 2 additions & 2 deletions src/demo/websocket-user.ts
Expand Up @@ -16,8 +16,8 @@ client.ws.on(WebSocketEvent.ON_MESSAGE_ERROR, errorMessage => {
throw new Error(`${errorMessage.message}: ${errorMessage.reason}`);
});

client.ws.on(WebSocketEvent.ON_OPEN, () => {
client.ws.subscribe(channel);
client.ws.on(WebSocketEvent.ON_OPEN, async () => {
await client.ws.subscribe(channel);
});

client.ws.connect();

0 comments on commit a64985a

Please sign in to comment.