Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Mark subscribe() and unsubscribe() as async #678

Merged
merged 2 commits into from
Jan 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
71 changes: 47 additions & 24 deletions src/client/WebSocketClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,10 @@ describe('WebSocketClient', () => {

const ws = mockWebSocketResponse(done, channel, statusPayload);

ws.on(WebSocketEvent.ON_MESSAGE_STATUS, message => {
ws.on(WebSocketEvent.ON_MESSAGE_STATUS, async message => {
expect(message.currencies[2].details.sort_order).toBe(48);
expect(message.products[72].id).toBe('XRP-USD');
ws.unsubscribe(channel);
await ws.unsubscribe(channel);
});

ws.connect();
Expand All @@ -243,9 +243,9 @@ describe('WebSocketClient', () => {

const ws = mockWebSocketResponse(done, channel, tickerBTCUSD);

ws.on(WebSocketEvent.ON_MESSAGE_TICKER, tickerMessage => {
ws.on(WebSocketEvent.ON_MESSAGE_TICKER, async tickerMessage => {
expect(tickerMessage.trade_id).toBe(3526965);
ws.unsubscribe(channel);
await ws.unsubscribe(channel);
});

ws.connect();
Expand All @@ -259,11 +259,11 @@ describe('WebSocketClient', () => {

const ws = mockWebSocketResponse(done, channel, l2snapshotBTCUSD);

ws.on(WebSocketEvent.ON_MESSAGE_L2SNAPSHOT, snapshotMessage => {
ws.on(WebSocketEvent.ON_MESSAGE_L2SNAPSHOT, async snapshotMessage => {
expect<number>(snapshotMessage.asks.length).toBe(10);
expect(snapshotMessage.asks[0]).toEqual(['47009.28', '0.00100000']);
expect<number>(snapshotMessage.bids.length).toBe(10);
ws.unsubscribe(channel);
await ws.unsubscribe(channel);
});

ws.connect();
Expand All @@ -277,11 +277,11 @@ describe('WebSocketClient', () => {

const ws = mockWebSocketResponse(done, channel, l2updateBTCUSD);

ws.on(WebSocketEvent.ON_MESSAGE_L2UPDATE, updateMessage => {
ws.on(WebSocketEvent.ON_MESSAGE_L2UPDATE, async updateMessage => {
expect<number>(updateMessage.changes.length).toBe(5);
expect(updateMessage.changes[0]).toEqual(['buy', '46961.95', '0.00000000']);
expect(updateMessage.changes[1]).toEqual(['sell', '47027.24', '0.04443115']);
ws.unsubscribe(channel);
await ws.unsubscribe(channel);
});

ws.connect();
Expand All @@ -295,11 +295,11 @@ describe('WebSocketClient', () => {

const ws = mockWebSocketResponse(done, channel, fullActivateBTCUSD);

ws.on(WebSocketEvent.ON_MESSAGE_FULL_ACTIVATE, message => {
ws.on(WebSocketEvent.ON_MESSAGE_FULL_ACTIVATE, async message => {
expect(message.profile_id).toBe('30000727-d308-cf50-7b1c-c06deb1934fc');
expect(message.private).toBe(true);
expect(message.stop_type).toBe('entry');
ws.unsubscribe(channel);
await ws.unsubscribe(channel);
});

ws.connect();
Expand All @@ -313,10 +313,10 @@ describe('WebSocketClient', () => {

const ws = mockWebSocketResponse(done, channel, fullReceivedLimitBTCUSD);

ws.on(WebSocketEvent.ON_MESSAGE_FULL_RECEIVED, message => {
ws.on(WebSocketEvent.ON_MESSAGE_FULL_RECEIVED, async message => {
expect(message.order_type).toBe('limit');
expect(message.order_id).toBe('d50ec984-77a8-460a-b958-66f114b0de9b');
ws.unsubscribe(channel);
await ws.unsubscribe(channel);
});

ws.connect();
Expand All @@ -330,10 +330,10 @@ describe('WebSocketClient', () => {

const ws = mockWebSocketResponse(done, channel, fullOpenBTCUSD);

ws.on(WebSocketEvent.ON_MESSAGE_FULL_OPEN, message => {
ws.on(WebSocketEvent.ON_MESSAGE_FULL_OPEN, async message => {
expect(message.profile_id).toBe(undefined);
expect(message.remaining_size).toBe('1.00');
ws.unsubscribe(channel);
await ws.unsubscribe(channel);
});

ws.connect();
Expand All @@ -347,11 +347,11 @@ describe('WebSocketClient', () => {

const ws = mockWebSocketResponse(done, channel, fullDoneBTCUSD);

ws.on(WebSocketEvent.ON_MESSAGE_FULL_DONE, message => {
ws.on(WebSocketEvent.ON_MESSAGE_FULL_DONE, async message => {
expect(message.profile_id).toBe(undefined);
expect(message.remaining_size).toBe('0');
expect(message.reason).toBe('filled');
ws.unsubscribe(channel);
await ws.unsubscribe(channel);
});

ws.connect();
Expand All @@ -365,10 +365,10 @@ describe('WebSocketClient', () => {

const ws = mockWebSocketResponse(done, channel, fullChangeBTCUSD);

ws.on(WebSocketEvent.ON_MESSAGE_FULL_CHANGE, message => {
ws.on(WebSocketEvent.ON_MESSAGE_FULL_CHANGE, async message => {
expect(message.new_size).toBe('5.23512');
expect(message.old_size).toBe('12.234412');
ws.unsubscribe(channel);
await ws.unsubscribe(channel);
});

ws.connect();
Expand All @@ -382,9 +382,9 @@ describe('WebSocketClient', () => {

const ws = mockWebSocketResponse(done, channel, tickerBTCUSD);

ws.on(WebSocketEvent.ON_MESSAGE_TICKER, tickerMessage => {
ws.on(WebSocketEvent.ON_MESSAGE_TICKER, async tickerMessage => {
expect(tickerMessage.trade_id).toBe(3526965);
ws.unsubscribe(channel);
await ws.unsubscribe(channel);
});

ws.connect();
Expand All @@ -400,9 +400,9 @@ describe('WebSocketClient', () => {

const ws = mockWebSocketResponse(done, channels, matchesBTCUSD);

ws.on(WebSocketEvent.ON_MESSAGE_MATCHES, message => {
ws.on(WebSocketEvent.ON_MESSAGE_MATCHES, async message => {
expect(message.trade_id).toBe(9713921);
ws.unsubscribe(channels);
await ws.unsubscribe(channels);
});

ws.connect();
Expand Down Expand Up @@ -434,15 +434,38 @@ describe('WebSocketClient', () => {
done();
});

ws.on(WebSocketEvent.ON_OPEN, () => {
ws.subscribe({
ws.on(WebSocketEvent.ON_OPEN, async () => {
await ws.subscribe({
name: WebSocketChannelName.USER,
product_ids: ['BTC-USD'],
});
});

ws.connect();
});

it('does not throw an exception when disconnect is called immediately after an awaited subscribe', done => {
const ws = createWebSocketClient();

const channel: WebSocketChannel = {
name: WebSocketChannelName.TICKER,
product_ids: ['BTC-USD', 'ETH-USD'],
};

ws.on(WebSocketEvent.ON_OPEN, async () => {
await ws.subscribe(channel);

expect(() => {
ws.disconnect();
}).not.toThrow();
});

ws.on(WebSocketEvent.ON_CLOSE, () => {
done();
});

ws.connect();
});
});

describe('unsubscribe', () => {
Expand Down
16 changes: 8 additions & 8 deletions src/client/WebSocketClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -481,10 +481,6 @@ export class WebSocketClient extends EventEmitter {
}

async sendMessage(message: WebSocketRequest): Promise<void> {
if (!this.socket) {
throw new Error(`Failed to send message of type "${message.type}": You need to connect to the WebSocket first.`);
}

/**
* Authentication will result in a couple of benefits:
* 1. Messages where you're one of the parties are expanded and have more useful fields
Expand All @@ -498,18 +494,22 @@ export class WebSocketClient extends EventEmitter {
});
Object.assign(message, signature);

if (!this.socket) {
throw new Error(`Failed to send message of type "${message.type}": You need to connect to the WebSocket first.`);
}

this.socket.send(JSON.stringify(message));
}

subscribe(channel: WebSocketChannel | WebSocketChannel[]): void {
this.sendMessage({
async subscribe(channel: WebSocketChannel | WebSocketChannel[]): Promise<void> {
await this.sendMessage({
channels: Array.isArray(channel) ? channel : [channel],
type: WebSocketRequestType.SUBSCRIBE,
}).finally(() => {});
}

unsubscribe(channel: WebSocketChannelName | WebSocketChannel | WebSocketChannel[]): void {
this.sendMessage({
async unsubscribe(channel: WebSocketChannelName | WebSocketChannel | WebSocketChannel[]): Promise<void> {
await this.sendMessage({
channels: this.mapChannels(channel),
type: WebSocketRequestType.UNSUBSCRIBE,
}).finally(() => {});
Expand Down
8 changes: 4 additions & 4 deletions src/demo/websocket-ticker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ const channel = {
};

// 3. Wait for open WebSocket to send messages
client.ws.on(WebSocketEvent.ON_OPEN, () => {
client.ws.on(WebSocketEvent.ON_OPEN, async () => {
// 7. Subscribe to WebSocket channel
client.ws.subscribe([channel]);
await client.ws.subscribe([channel]);
});

// 4. Listen to WebSocket subscription updates
Expand All @@ -26,11 +26,11 @@ client.ws.on(WebSocketEvent.ON_SUBSCRIPTION_UPDATE, subscriptions => {
});

// 5. Listen to WebSocket channel updates
client.ws.on(WebSocketEvent.ON_MESSAGE_TICKER, tickerMessage => {
client.ws.on(WebSocketEvent.ON_MESSAGE_TICKER, async tickerMessage => {
// 8. Receive message from WebSocket channel
console.info(`Received message of type "${tickerMessage.type}".`, tickerMessage);
// 9. Unsubscribe from WebSocket channel
client.ws.unsubscribe([
await client.ws.unsubscribe([
{
name: WebSocketChannelName.TICKER,
product_ids: [tickerMessage.product_id],
Expand Down
12 changes: 6 additions & 6 deletions src/demo/websocket-unsubscribe-all.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ const channels = [
},
];

client.ws.on(WebSocketEvent.ON_OPEN, () => {
client.ws.subscribe(channels);
client.ws.on(WebSocketEvent.ON_OPEN, async () => {
await client.ws.subscribe(channels);
});

client.ws.on(WebSocketEvent.ON_SUBSCRIPTION_UPDATE, subscriptions => {
client.ws.on(WebSocketEvent.ON_SUBSCRIPTION_UPDATE, async subscriptions => {
const subscriptionCount = subscriptions.channels.length;
const uniqueProductIds = new Set();
const productIds = subscriptions.channels.map(subscription => subscription.product_ids);
Expand All @@ -40,15 +40,15 @@ client.ws.on(WebSocketEvent.ON_SUBSCRIPTION_UPDATE, subscriptions => {
break;
case 1:
console.info(`We will unsubscribe from "${WebSocketChannelName.LEVEL2}" channel...`);
client.ws.unsubscribe(WebSocketChannelName.LEVEL2);
await client.ws.unsubscribe(WebSocketChannelName.LEVEL2);
break;
case 2:
console.info(`We will unsubscribe from "${WebSocketChannelName.HEARTBEAT}" channel...`);
client.ws.unsubscribe(WebSocketChannelName.HEARTBEAT);
await client.ws.unsubscribe(WebSocketChannelName.HEARTBEAT);
break;
case 3:
console.info(`We will unsubscribe from "${WebSocketChannelName.TICKER}" channel...`);
client.ws.unsubscribe(WebSocketChannelName.TICKER);
await client.ws.unsubscribe(WebSocketChannelName.TICKER);
break;
}
});
Expand Down
4 changes: 2 additions & 2 deletions src/demo/websocket-user.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ client.ws.on(WebSocketEvent.ON_MESSAGE_ERROR, errorMessage => {
throw new Error(`${errorMessage.message}: ${errorMessage.reason}`);
});

client.ws.on(WebSocketEvent.ON_OPEN, () => {
client.ws.subscribe(channel);
client.ws.on(WebSocketEvent.ON_OPEN, async () => {
await client.ws.subscribe(channel);
});

client.ws.connect();