In [None]:
from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [1]:
import websocket
import threading
import requests
import json
import datetime
import time
import pandas as pd
import json
import numpy as np
from IPython.display import clear_output

# dYdX

In [None]:
class dYdXSocketOrderbook:
    def __init__(self, instrument): # BTC-USD
        self.socket_opened = False
        
        self.orderbook = {'bid': {}, 'ask': {}}
        
        self.ws = websocket.WebSocketApp(
            'wss://api.dydx.exchange/v3/ws',
            on_open=self._on_open,
            on_close=self._on_close,
            on_error=self._on_error, 
            on_message=self._on_message
        )
        self.wst = threading.Thread(target=self.ws.run_forever)
        self.wst.daemon = True
        self.wst.start()
        
        while not self.socket_opened:
            pass
        
        self.ws.send(json.dumps({'type': 'subscribe', 'channel': 'v3_orderbook', 'id': instrument, 'includeOffsets': True}))
        
    def _on_open(self, ws):
        self.socket_opened = True
        print("dYdX connection opened")


    def _on_close(self, ws):
        self.socket_opened = False
        print("dYdX closed connection")


    def _on_error(self, ws, error):
        print(error)


    def _on_message(self, ws, message):
        message = json.loads(message)
        if message['type'] == 'subscribed': # initialize orderbook
            ask_list = message['contents']['asks']
            bid_list = message['contents']['bids']

            for ask in ask_list:
                self.orderbook['ask'][ask['price']] = [ask['size'], ask['offset']]
            for bid in bid_list:
                self.orderbook['bid'][bid['price']] = [bid['size'], bid['offset']]

        else: # update orderbook
            offset = message['contents']['offset']
            for ask in message['contents']['asks']:
                if ask[0] in self.orderbook['ask']:
                    if offset > self.orderbook['ask'][ask[0]][1]:
                        self.orderbook['ask'][ask[0]] = [ask[1], offset]
                else:
                    self.orderbook['ask'][ask[0]] = [ask[1], offset]

            for bid in message['contents']['bids']:
                if bid[0] in self.orderbook['bid']:
                    if offset > self.orderbook['bid'][bid[0]][1]:
                        self.orderbook['bid'][bid[0]] = [bid[1], offset]
                else:
                    self.orderbook['bid'][bid[0]] = [bid[1], offset]
                    
    def get_bid_ask(self):
        temp = self.orderbook # create snapshot
        try:
            bid = sorted([float(price) for price in temp['bid'] if temp['bid'][price][0] != '0'])[-1]
            ask = sorted([float(price) for price in temp['ask'] if temp['ask'][price][0] != '0'])[0]
        except:
            bid, ask = np.nan, np.nan
        return bid, ask
    
class dYdXSocketFunding:
    def __init__(self):
        self.funding_rate = {}
        self.thread = threading.Thread(target=self.get_funding_rate_continuously)
        self.thread.daemon = True
        self.thread.start()
        
    def get_funding_rate_continuously(self):
        while True:
            res = requests.get('https://api.dydx.exchange/v3/markets')
            message = json.loads(res.text)['markets']
            self.funding_rate = {t: [float(message[t]['nextFundingRate']), datetime_string_to_timestamp(message[t]['nextFundingAt'])] for t in message}
            time.sleep(1)
            
def datetime_string_to_timestamp(t):
    timestamp = datetime.datetime.timestamp(datetime.datetime.strptime(t[:-5], '%Y-%m-%dT%H:%M:%S') + datetime.timedelta(hours=8)) # in UTC time
    return int(timestamp*1000) # seconds to milliseconds

# ApolloX

In [None]:
class ApolloXSocketOrderbook: # https://apollox-finance.gitbook.io/apollox-finance/api/api-documentation#diff.-book-depth-streams
    def __init__(self, instrument):
        self.socket_opened = False
        
        self.orderbook = {'bid': {}, 'ask': {}}
        
        self.ws = websocket.WebSocketApp(
            f'wss://fstream.apollox.finance/stream?streams={instrument.lower()}@depth',
            on_open=self._on_open,
            on_close=self._on_close,
            on_error=self._on_error, 
            on_message=self._on_message
        )
        self.wst = threading.Thread(target=self.ws.run_forever)
        self.wst.daemon = True
        self.wst.start()
        
        while not self.socket_opened:
            pass
        
        self.last_update_id = -1
        self.stream_previous_u = -1
        self.new_snapshot = False
        self.snapshot_url = f'https://fapi.apollox.finance/fapi/v1/depth?symbol={instrument}&limit=1000'
        
    def _on_open(self, ws):
        self.socket_opened = True
        print("ApolloX connection opened")


    def _on_close(self, ws):
        self.socket_opened = False
        print("ApolloX closed connection")


    def _on_error(self, ws, error):
        print(error)


    def _on_message(self, ws, message):
        self.message = message
        
        message = json.loads(message)
        upper_u = message['data']['U']
        lower_u = message['data']['u']
        previous_u = message['data']['pu']
        #print(upper_u, lower_u, self.stream_previous_u, self.last_update_id)
        
        if previous_u != self.stream_previous_u:
            self.stream_previous_u = lower_u
            self.get_snapshot()
            return
        else:
            self.stream_previous_u = lower_u
            
        if self.new_snapshot:
            if not (upper_u <= self.last_update_id and lower_u >= self.last_update_id):
                self.get_snapshot()
                return

        self.new_snapshot = False
        
        for price, size in message['data']['b']:
            price = float(price)
            size = float(size)
            if size == 0:
                if price in self.orderbook['bid']:
                    self.orderbook['bid'].pop(price)
            else:
                self.orderbook['bid'][price] = size

        for price, size in message['data']['a']:
            price = float(price)
            size = float(size)
            if size == 0:
                if price in self.orderbook['ask']:
                    self.orderbook['ask'].pop(price)
            else:
                self.orderbook['ask'][price] = size
                    
    def get_snapshot(self):
        print('snapshot, reset orderbook')
        self.new_snapshot = True
        res = requests.get(self.snapshot_url)
        message = json.loads(res.text)
        
        self.last_update_id = message['lastUpdateId']
        self.orderbook['bid'] = {float(x[0]): float(x[1]) for x in message['bids']}
        self.orderbook['ask'] = {float(x[0]): float(x[1]) for x in message['asks']}
                    
    def get_bid_ask(self):
        try:
            bid = sorted(self.orderbook['bid'].keys())[-1]
            ask = sorted(self.orderbook['ask'].keys())[0]
        except:
            bid, ask = np.nan, np.nan
        return bid, ask
    
class ApolloXSocketFunding:
    def __init__(self):
        self.socket_opened = False
        
        self.funding_rate = {}
        
        self.ws = websocket.WebSocketApp(
            f'wss://fstream.apollox.finance/stream?streams=!markPrice@arr@1s',
            on_open=self._on_open,
            on_close=self._on_close,
            on_error=self._on_error, 
            on_message=self._on_message
        )
        self.wst = threading.Thread(target=self.ws.run_forever)
        self.wst.daemon = True
        self.wst.start()
        
    def _on_open(self, ws):
        self.socket_opened = True
        print("ApolloX connection opened")


    def _on_close(self, ws):
        self.socket_opened = False
        print("ApolloX closed connection")


    def _on_error(self, ws, error):
        print(error)

    def _on_message(self, ws, message):
        message = json.loads(message)['data']
        self.funding_rate = {m['s']: [float(m['r']), m['T']] for m in message}

# Bybit

In [None]:
class BybitSocketOrderbook:
    def __init__(self, instrument): 
        self.socket_opened = False
        
        self.orderbook = {'bid': {}, 'ask': {}}
        
        self.ws = websocket.WebSocketApp(
            'wss://stream.bybit.com/v5/public/linear',
            on_open=self._on_open,
            on_close=self._on_close,
            on_error=self._on_error, 
            on_message=self._on_message
        )
        self.wst = threading.Thread(target=self.ws.run_forever)
        self.wst.daemon = True
        self.wst.start()
        
        while not self.socket_opened:
            pass
        
        self.ws.send(json.dumps({"req_id": "test", "op": "subscribe", "args": [f'orderbook.50.{instrument}']}))
        
    def _on_open(self, ws):
        self.socket_opened = True
        print("Bybit connection opened")


    def _on_close(self, ws):
        self.socket_opened = False
        print("Bybit closed connection")


    def _on_error(self, ws, error):
        print(error)


    def _on_message(self, ws, message):
        message = json.loads(message)

        if message["type"] == "snapshot": # initialize orderbook (or overwrite it if it already exists)
            self.orderbook['bid'] = {float(x[0]): float(x[1]) for x in message['data']['b']}
            self.orderbook['ask'] = {float(x[0]): float(x[1]) for x in message['data']['a']}
        else: # Update the order book message with the new delta update
            for price, size in message['data']['b']:
                price = float(price)
                size = float(size)
                if size == 0:
                    if price in self.orderbook['bid']:
                        self.orderbook['bid'].pop(price)
                else:
                    self.orderbook['bid'][price] = size

            for price, size in message['data']['a']:
                price = float(price)
                size = float(size)
                if size == 0:
                    if price in self.orderbook['ask']:
                        self.orderbook['ask'].pop(price)
                else:
                    self.orderbook['ask'][price] = size
        
    def get_bid_ask(self):
        try:
            bid = sorted(self.orderbook['bid'].keys())[-1] # highest bid
            ask = sorted(self.orderbook['ask'].keys())[0] # lowest ask
        except:
            bid, ask = np.nan, np.nan
        return bid, ask
    
    
class BybitSocketFunding:
    def __init__(self, instrument):
        self.socket_opened = False
        self.funding_rate = []
        
        self.ws = websocket.WebSocketApp(
           'wss://stream.bybit.com/v5/public/linear',
            on_open=self._on_open,
            on_close=self._on_close,
            on_error=self._on_error, 
            on_message=self._on_message
        )
        self.wst = threading.Thread(target=self.ws.run_forever)
        self.wst.daemon = True
        self.wst.start()

        while not self.socket_opened:
            pass
        
        self.ws.send(json.dumps({"req_id": "test", "op": "subscribe", "args": [f'tickers.{instrument}']}))
        
    def _on_open(self, ws):
        self.socket_opened = True
        print("Bybit connection opened")


    def _on_close(self, ws):
        self.socket_opened = False
        print("Bybit closed connection")

    def _on_error(self, ws, error):
        print(error)
        
    def _on_message(self, ws, message):
        message = json.loads(message)['data']['fundingRate']
        self.funding_rate = [message]


In [None]:
dydx_socket_orderbook = dYdXSocketOrderbook('BTC-USD')
dydx_socket_funding = dYdXSocketFunding()
apollox_socket_orderbook = ApolloXSocketOrderbook('BTCUSDT')
apollox_socket_funding = ApolloXSocketFunding()
bybit_socket_orderbook = BybitSocketOrderbook('BTCUSDT')
bybit_socket_funding = BybitSocketFunding('BTCUSDT')

In [None]:
data = pd.DataFrame(columns = [
    'dydx_bid', 'dydx_ask', 'dydx_predicted_funding_rate', 'dydx_next_funding_rate_time',
    'apollox_bid', 'apollox_ask', 'apollox_predicted_funding_rate', 'apollox_next_funding_rate_time',
    'bybit_bid', 'bybit_ask', 'bybit_predicted_funding_rate'
])
filename = str(datetime.datetime.now()).replace(':', ';')[:-7]

while True:
    data.loc[str(datetime.datetime.now()).replace(':', ';')[:-7]] = [
        *dydx_socket_orderbook.get_bid_ask(), #tuple (bid, ask)
        *dydx_socket_funding.funding_rate['BTC-USD'], #list [funding rate, funding time], original dict: {'BTC-USD': [funding rate, next funding rate time]}
        *apollox_socket_orderbook.get_bid_ask(),
        *apollox_socket_funding.funding_rate['BTCUSDT'],
        *bybit_socket_orderbook.get_bid_ask(),
        *bybit_socket_funding.funding_rate
    ]
    
    data.to_csv(rf"C:\Users\User\Desktop\dydx_cafg\live data\{filename}.csv")
    
    if len(data) >= 3600:
        filename = str(datetime.datetime.now()).replace(':', ';')[:-7]
        data = pd.DataFrame(columns = [
            'dydx_bid', 'dydx_ask', 'dydx_predicted_funding_rate', 'dydx_next_funding_rate_time',
            'apollox_bid', 'apollox_ask', 'apollox_predicted_funding_rate', 'apollox_next_funding_rate_time'
        ])
    
    time.sleep(1)