Skip to content

Commit

Permalink
feat(bybit-ws-public): re-subscribe topics on disconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
iam4x committed Apr 12, 2023
1 parent db314bf commit e56a138
Showing 1 changed file with 22 additions and 7 deletions.
29 changes: 22 additions & 7 deletions src/exchanges/bybit/bybit.ws-public.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { proxy } from '@iam4x/valtio/dist/vanilla';
import BigNumber from 'bignumber.js';
import { flatten } from 'lodash';

import type { Candle, OHLCVOptions, OrderBook } from '../../types';
import { BaseWebSocket } from '../base.ws';
Expand All @@ -12,14 +13,24 @@ type MessageHandlers = {
[topic: string]: (json: Data) => void;
};

type SubscribedTopics = {
[id: string]: string[] | string;
};

export class BybitPublicWebsocket extends BaseWebSocket<Bybit> {
topics: SubscribedTopics = {};
messageHandlers: MessageHandlers = {
instrument_info: (d: Data) => this.handleInstrumentInfoEvents(d),
pong: () => this.handlePongEvent(),
};

connectAndSubscribe = () => {
if (!this.isDisposed) {
// add instrument_info topics to subscribe on
this.topics.instrumentInfos = this.parent.store.markets.map(
(m) => `instrument_info.100ms.${m.symbol}`
);

this.ws = new WebSocket(
BASE_WS_URL.public[this.parent.options.testnet ? 'testnet' : 'livenet']
);
Expand All @@ -45,13 +56,8 @@ export class BybitPublicWebsocket extends BaseWebSocket<Bybit> {
};

subscribe = () => {
const payload = {
op: 'subscribe',
args: this.parent.store.markets.map(
(m) => `instrument_info.100ms.${m.symbol}`
),
};

const topics = flatten(Object.values(this.topics));
const payload = { op: 'subscribe', args: topics };
this.ws?.send?.(JSON.stringify(payload));
};

Expand Down Expand Up @@ -132,6 +138,9 @@ export class BybitPublicWebsocket extends BaseWebSocket<Bybit> {
const payload = { op: 'subscribe', args: [topic] };
this.ws?.send?.(JSON.stringify(payload));
this.parent.log(`Switched to [${opts.symbol}:${opts.interval}]`);

// store subscribed topic to re-subscribe on reconnect
this.topics.ohlcv = topic;
}
} else {
setTimeout(() => waitForConnectedAndSubscribe(), 100);
Expand All @@ -147,6 +156,7 @@ export class BybitPublicWebsocket extends BaseWebSocket<Bybit> {
}

delete this.messageHandlers[topic];
delete this.topics.ohlcv;
};
};

Expand Down Expand Up @@ -232,6 +242,9 @@ export class BybitPublicWebsocket extends BaseWebSocket<Bybit> {

const payload = { op: 'subscribe', args: [topic] };
this.ws?.send?.(JSON.stringify(payload));

// store subscribed topic to re-subscribe on reconnect
this.topics.orderBook = topic;
}
} else {
setTimeout(() => waitForConnectedAndSubscribe(), 100);
Expand All @@ -247,6 +260,8 @@ export class BybitPublicWebsocket extends BaseWebSocket<Bybit> {
}

delete this.messageHandlers[topic];
delete this.topics.orderBook;

orderBook.asks = [];
orderBook.bids = [];
};
Expand Down

0 comments on commit e56a138

Please sign in to comment.