-
Notifications
You must be signed in to change notification settings - Fork 0
/
OrderbookWebocketClient.ts
101 lines (86 loc) · 2.93 KB
/
OrderbookWebocketClient.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import { Subject } from 'rxjs';
import Config from 'core/Config';
import WebSocketClient from 'core/transport/WebSocketClient';
import {
OrderbookSubscribedResponse,
OrderbookFeedType,
OrderbookDeltasResponse,
OrderbookSnapshotResponse,
Orders,
WsMessage
} from 'core/models';
import { updateOrdersWithDeltas, sliceAndSort, descending } from 'core/utils';
/**
 * WebSocket client that subscribes to the order book feed and keeps the
 * current order list up to date from the snapshot and delta messages it receives.
 *
 * Consumers subscribe to `ordersFeed` to be notified whenever the list changes.
 */
class OrderbookWebocketClient extends WebSocketClient {
  /**
   * Stream on which the current order list is pushed each time a snapshot is
   * received or a delta has been applied.
   */
  ordersFeed = new Subject<Orders | undefined>();

  /** Latest known order list; stays undefined until the first snapshot is processed. */
  private _orders: Orders | undefined;

  constructor() {
    super(Config.websocketUrl);
  }

  /**
   * Start the order book subscription.
   *
   * Reuses the connection when `isConnected()` reports one, otherwise waits for
   * `init()` to finish. When a connection is available, the subscribe message is
   * sent and the message handler is attached; otherwise nothing happens.
   */
  start = async () => {
    const isConnected = this.isConnected() ? true : await this.init();

    if (isConnected) {
      const data = {
        event: 'subscribe',
        feed: 'book_ui_1',
        product_ids: ['PI_XBTUSD']
      };

      this.send(JSON.stringify(data));
      this.listenForMessages();
    }
  };

  /**
   * Attach the handler that processes incoming messages.
   *
   * Only two kinds of message change the order list:
   *  - a snapshot, which replaces the list;
   *  - a delta, which is applied to the existing list (and is ignored while no
   *    snapshot has been received yet).
   *
   * Any other message (for example a subscription acknowledgement) is ignored
   * and produces no emission on `ordersFeed`, so subscribers are only notified
   * when the list has actually been replaced or updated.
   */
  listenForMessages = () => {
    this.ws.onmessage = (event) => {
      try {
        const message: WsMessage = JSON.parse(event.data);

        if (this._isInitialList(message)) {
          const snapshot = message as OrderbookSnapshotResponse;

          // Build the list through sliceAndSort, called with 5 and `descending`
          // for both sides (presumably: keep 5 levels, highest first — confirm in core/utils).
          this._orders = {
            asks: sliceAndSort(snapshot.asks, 5, descending),
            bids: sliceAndSort(snapshot.bids, 5, descending)
          };
        } else if (this._orders && this._isDelta(message)) {
          // Deltas are only meaningful relative to an existing list.
          this._orders = updateOrdersWithDeltas(
            this._orders,
            message as OrderbookDeltasResponse
          );
        } else {
          // Not an order book update: leave the list alone and do not notify.
          return;
        }

        // Kept inside the try block so an error thrown by a subscriber is logged
        // here instead of escaping the socket handler.
        this.ordersFeed.next(this._orders);
      } catch (err) {
        console.error('onmessage', err);
      }
    };
  };

  /**
   * Return true if the message carries the initial order list (snapshot feed).
   */
  private _isInitialList = (message: WsMessage) => (
    (message as OrderbookSnapshotResponse).feed === OrderbookFeedType.OrderbookSnapshot
  );

  /**
   * Return true if the message carries deltas: it is on the delta feed and has
   * no `event` field (messages with an `event` field are not order updates).
   */
  private _isDelta = (message: WsMessage) => (
    (message as OrderbookDeltasResponse).feed === OrderbookFeedType.OrderbookDelta
    && !(message as OrderbookSubscribedResponse).event
  );
}
// Shared client instance, created once when this module is loaded.
export const orderbookWebocketClient = new OrderbookWebocketClient();