Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for Bybit spot websocket endpoints #933

Closed
wants to merge 12 commits into from
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* Update: Okcoin moved to v5 API used by OKX
* Bugfix: InfluxDB none type conversions
* New Exchange: GateIO Futures
* Feature: Added support for Bybit spot orderbook and trade websocket endpoints
* Bugfix: Fix instrument types in symbol parsing on Bitmex
* Bugfix: fix crash issue when init symbol data on Kraken Futures

Expand Down
212 changes: 156 additions & 56 deletions cryptofeed/exchanges/bybit.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from yapic import json

from cryptofeed.connection import AsyncConnection, RestEndpoint, Routes, WebsocketEndpoint
from cryptofeed.defines import BID, ASK, BUY, BYBIT, CANCELLED, CANCELLING, CANDLES, FAILED, FILLED, FUNDING, L2_BOOK, LIMIT, LIQUIDATIONS, MAKER, MARKET, OPEN, PARTIAL, SELL, SUBMITTING, TAKER, TRADES, OPEN_INTEREST, INDEX, ORDER_INFO, FILLS, FUTURES, PERPETUAL
from cryptofeed.defines import BID, ASK, BUY, BYBIT, CANCELLED, CANCELLING, CANDLES, FAILED, FILLED, FUNDING, L2_BOOK, LIMIT, LIQUIDATIONS, MAKER, MARKET, OPEN, PARTIAL, SELL, SUBMITTING, TAKER, TRADES, OPEN_INTEREST, INDEX, ORDER_INFO, FILLS, FUTURES, PERPETUAL, SPOT
from cryptofeed.feed import Feed
from cryptofeed.types import OrderBook, Trade, Index, OpenInterest, Funding, OrderInfo, Fill, Candle, Liquidation

Expand All @@ -27,7 +27,7 @@
class Bybit(Feed):
id = BYBIT
websocket_channels = {
L2_BOOK: 'orderBook_200.100ms',
L2_BOOK: '', # Assigned in self.subscribe
TRADES: 'trade',
FILLS: 'execution',
ORDER_INFO: 'order',
Expand All @@ -38,11 +38,15 @@ class Bybit(Feed):
LIQUIDATIONS: 'liquidation'
}
websocket_endpoints = [
WebsocketEndpoint('wss://stream.bybit.com/realtime', channel_filter=(websocket_channels[L2_BOOK], websocket_channels[TRADES], websocket_channels[INDEX], websocket_channels[OPEN_INTEREST], websocket_channels[FUNDING], websocket_channels[CANDLES], websocket_channels[LIQUIDATIONS]), instrument_filter=('QUOTE', ('USD',)), sandbox='wss://stream-testnet.bybit.com/realtime', options={'compression': None}),
WebsocketEndpoint('wss://stream.bybit.com/realtime_public', channel_filter=(websocket_channels[L2_BOOK], websocket_channels[TRADES], websocket_channels[INDEX], websocket_channels[OPEN_INTEREST], websocket_channels[FUNDING], websocket_channels[CANDLES], websocket_channels[LIQUIDATIONS]), instrument_filter=('QUOTE', ('USDT',)), sandbox='wss://stream-testnet.bybit.com/realtime_public', options={'compression': None}),
WebsocketEndpoint('wss://stream.bybit.com/realtime_private', channel_filter=(websocket_channels[ORDER_INFO], websocket_channels[FILLS]), instrument_filter=('QUOTE', ('USDT',)), sandbox='wss://stream-testnet.bybit.com/realtime_private', options={'compression': None}),
WebsocketEndpoint('wss://stream.bybit.com/realtime', channel_filter=(websocket_channels[L2_BOOK], websocket_channels[TRADES], websocket_channels[INDEX], websocket_channels[OPEN_INTEREST], websocket_channels[FUNDING], websocket_channels[CANDLES], websocket_channels[LIQUIDATIONS]), instrument_filter=("QUOTE", ("USD",)), sandbox="wss://stream-testnet.bybit.com/realtime", options={"compression": None}),
WebsocketEndpoint('wss://stream.bybit.com/realtime_public', instrument_filter=('TYPE', (FUTURES, PERPETUAL)), channel_filter=(websocket_channels[L2_BOOK], websocket_channels[TRADES], websocket_channels[INDEX], websocket_channels[OPEN_INTEREST], websocket_channels[FUNDING], websocket_channels[CANDLES], websocket_channels[LIQUIDATIONS]), sandbox='wss://stream-testnet.bybit.com/realtime_public', options={'compression': None}),
WebsocketEndpoint('wss://stream.bybit.com/realtime_private', instrument_filter=('TYPE', (FUTURES, PERPETUAL)), channel_filter=(websocket_channels[ORDER_INFO], websocket_channels[FILLS]), sandbox='wss://stream-testnet.bybit.com/realtime_private', options={'compression': None}),
WebsocketEndpoint('wss://stream.bybit.com/spot/public/v3', instrument_filter=('TYPE', (SPOT,)), channel_filter=(websocket_channels[L2_BOOK], websocket_channels[TRADES]), sandbox='wss://stream-testnet.bybit.com/spot/public/v3', options={'compression': None}),
]
rest_endpoints = [
RestEndpoint('https://api.bybit.com', routes=Routes('/v2/public/symbols')),
RestEndpoint('https://api.bybit.com', instrument_filter=('TYPE', (SPOT,)), routes=Routes('/spot/v3/public/symbols'))
]
rest_endpoints = [RestEndpoint('https://api.bybit.com', routes=Routes('/v2/public/symbols'))]
valid_candle_intervals = {'1m', '3m', '5m', '15m', '30m', '1h', '2h', '4h', '6h', '1d', '1w', '1M'}
candle_interval_map = {'1m': '1', '3m': '3', '5m': '5', '15m': '15', '30m': '30', '1h': '60', '2h': '120', '4h': '240', '6h': '360', '1d': 'D', '1w': 'W', '1M': 'M'}

Expand All @@ -58,23 +62,45 @@ def _parse_symbol_data(cls, data: dict) -> Tuple[Dict, Dict]:
ret = {}
info = defaultdict(dict)

# PERPETUAL & FUTURES
for symbol in data['result']:
base = symbol['base_currency']
quote = symbol['quote_currency']
for msg in data:
if isinstance(msg['result'], dict):
# SPOT
for symbol in msg['result']['list']:
base = symbol['baseCoin']
quote = symbol['quoteCoin']

stype = SPOT
expiry = None

stype = PERPETUAL
expiry = None
if not symbol['name'].endswith(quote):
stype = FUTURES
year = symbol['name'].replace(base + quote, '')[-2:]
expiry = year + symbol['alias'].replace(base + quote, '')[-4:]
s = Symbol(base, quote, type=stype, expiry_date=expiry)

s = Symbol(base, quote, type=stype, expiry_date=expiry)
# Bybit spot and perps share the same symbol['name'], so
# here it is formed using the base and quote coins, separated
# by a slash. This is consistent with the UI.
ret[s.normalized] = f'{base}/{quote}'
info['tick_size'][s.normalized] = symbol['minPricePrecision']
info['instrument_type'][s.normalized] = stype

ret[s.normalized] = symbol['name']
info['tick_size'][s.normalized] = symbol['price_filter']['tick_size']
info['instrument_type'][s.normalized] = stype
else:
# PERPETUAL & FUTURES
for symbol in msg['result']:
base = symbol['base_currency']
quote = symbol['quote_currency']

stype = PERPETUAL
expiry = None
if not symbol['name'].endswith(quote):
stype = FUTURES
year = symbol['name'].replace(base + quote, '')[-2:]
expiry = year + symbol['alias'].replace(base + quote, '')[-4:]

s = Symbol(base, quote, type=stype, expiry_date=expiry)

ret[s.normalized] = symbol['name']
info['tick_size'][s.normalized] = symbol['price_filter'][
'tick_size'
]
info['instrument_type'][s.normalized] = stype

return ret, info

Expand Down Expand Up @@ -162,18 +188,34 @@ async def message_handler(self, msg: str, conn, timestamp: float):

if "success" in msg:
if msg['success']:
if msg['request']['op'] == 'auth':
LOG.debug("%s: Authenticated successful", conn.uuid)
elif msg['request']['op'] == 'subscribe':
LOG.debug("%s: Subscribed to channels: %s", conn.uuid, msg['request']['args'])
if 'request' in msg:
# PERPETUAL & FUTURES
if msg['request']['op'] == 'auth':
LOG.debug("%s: Authenticated successful", conn.uuid)
elif msg['request']['op'] == 'subscribe':
LOG.debug("%s: Subscribed to channels: %s", conn.uuid, msg['request']['args'])
else:
LOG.warning("%s: Unhandled 'successs' message received", conn.uuid)
else:
LOG.warning("%s: Unhandled 'successs' message received", conn.uuid)
# SPOT
if msg["op"] == "auth":
LOG.debug("%s: Authenticated successful", conn.uuid)
elif msg["op"] == "subscribe":
LOG.debug("%s: Subscribed to channel", conn.uuid)
else:
LOG.warning(
"%s: Unhandled 'successs' message received", conn.uuid
)

else:
LOG.error("%s: Error from exchange %s", conn.uuid, msg)
elif msg["topic"].startswith('trade'):
await self._trade(msg, timestamp)
elif msg["topic"].startswith('orderBook'):
await self._book(msg, timestamp)
elif msg["topic"].startswith('orderbook'):
# Spot orderbook
await self._book(msg, timestamp)
elif msg['topic'].startswith('liquidation'):
await self._liquidation(msg, timestamp)
elif "instrument_info" in msg["topic"]:
Expand All @@ -196,9 +238,20 @@ async def subscribe(self, connection: AsyncConnection):
for pair in connection.subscription[chan]:
sym = str_to_symbol(self.exchange_symbol_to_std_symbol(pair))

if sym.type == SPOT:
# Convert back to standard form
pair = "".join(pair.split("/"))

if self.exchange_channel_to_std(chan) == CANDLES:
c = chan if sym.quote == 'USD' else 'candle'
sub = [f"{c}.{self.candle_interval_map[self.candle_interval]}.{pair}"]
elif self.exchange_channel_to_std(chan) == L2_BOOK:
l2_book_channel = {
SPOT: "orderbook.40",
FUTURES: "orderBook_200.100ms",
PERPETUAL: "orderBook_200.100ms",
}
sub = [f"{l2_book_channel[sym.type]}.{pair}"]
else:
sub = [f"{chan}.{pair}"]

Expand Down Expand Up @@ -342,21 +395,47 @@ async def _trade(self, msg: dict, timestamp: float):
"cross_seq":163261271}]}
"""
data = msg['data']
for trade in data:
if isinstance(trade['trade_time_ms'], str):
ts = int(trade['trade_time_ms'])
else:
ts = trade['trade_time_ms']
if isinstance(data, list):
# PERPETUAL & FUTURES
for trade in data:
if isinstance(trade['trade_time_ms'], str):
ts = int(trade['trade_time_ms'])
else:
ts = trade['trade_time_ms']

t = Trade(
self.id,
self.exchange_symbol_to_std_symbol(trade['symbol']),
BUY if trade['side'] == 'Buy' else SELL,
Decimal(trade['size']),
Decimal(trade['price']),
self.timestamp_normalize(ts),
id=trade['trade_id'],
raw=trade
)
await self.callback(TRADES, t, timestamp)
else:
# SPOT
exchange_symbol = msg["topic"].split(".")[-1]

# Need to match to a symbol with a slash (per spot)
adj_symbol = self.exchange_symbol_to_std_symbol(exchange_symbol)

if "PERP" in adj_symbol:
# Convert to spot symbol
adj_symbol = "/".join(adj_symbol.split("-")[:-1])

ts = data["t"]
side = BUY if data["m"] else SELL
t = Trade(
self.id,
self.exchange_symbol_to_std_symbol(trade['symbol']),
BUY if trade['side'] == 'Buy' else SELL,
Decimal(trade['size']),
Decimal(trade['price']),
self.exchange_symbol_to_std_symbol(adj_symbol),
side,
Decimal(data["q"]),
Decimal(data["p"]),
self.timestamp_normalize(ts),
id=trade['trade_id'],
raw=trade
id=data["v"],
raw=msg,
)
await self.callback(TRADES, t, timestamp)

Expand All @@ -366,35 +445,56 @@ async def _book(self, msg: dict, timestamp: float):
data = msg['data']
delta = {BID: [], ASK: []}

if update_type == 'snapshot':
if msg["topic"].startswith("orderbook"):
# SPOT
if "PERP" in pair:
# Convert to spot symbol
pair = "-".join(pair.split("-")[:-1])

sym = str_to_symbol(pair)

if sym.type == SPOT:
# Spot book snapshot
delta = None

self._l2_book[pair] = OrderBook(self.id, pair, max_depth=self.max_depth)
# the USDT perpetual data is under the order_book key

self._l2_book[pair].book[BID] = {Decimal(price): size for price, size in data["b"]}
self._l2_book[pair].book[ASK] = {Decimal(price): size for price, size in data["a"]}

else:
# PERPETUAL & FUTURES
if update_type == 'snapshot':
delta = None
self._l2_book[pair] = OrderBook(self.id, pair, max_depth=self.max_depth)
# the USDT perpetual data is under the order_book key
if 'order_book' in data:
data = data['order_book']

for update in data:
side = BID if update['side'] == 'Buy' else ASK
self._l2_book[pair].book[side][Decimal(update['price'])] = Decimal(update['size'])
else:
for delete in data['delete']:
side = BID if delete['side'] == 'Buy' else ASK
price = Decimal(delete['price'])
delta[side].append((price, 0))
del self._l2_book[pair].book[side][price]

for utype in ('update', 'insert'):
for update in data[utype]:
for update in data:
side = BID if update['side'] == 'Buy' else ASK
price = Decimal(update['price'])
amount = Decimal(update['size'])
delta[side].append((price, amount))
self._l2_book[pair].book[side][price] = amount
self._l2_book[pair].book[side][Decimal(update['price'])] = Decimal(update['size'])

# timestamp is in microseconds
ts = msg['timestamp_e6']
else:
for delete in data['delete']:
side = BID if delete['side'] == 'Buy' else ASK
price = Decimal(delete['price'])
delta[side].append((price, 0))
del self._l2_book[pair].book[side][price]

for utype in ('update', 'insert'):
for update in data[utype]:
side = BID if update['side'] == 'Buy' else ASK
price = Decimal(update['price'])
amount = Decimal(update['size'])
delta[side].append((price, amount))
self._l2_book[pair].book[side][price] = amount

# timestamp_e6 is in microseconds
ts = msg['timestamp_e6'] if 'timestamp_e6' in msg else msg['data']['t'] * 1e3
if isinstance(ts, str):
ts = int(ts)

await self.book_callback(L2_BOOK, self._l2_book[pair], timestamp, timestamp=ts / 1000000, raw=msg, delta=delta)

async def _order(self, msg: dict, timestamp: float):
Expand Down