In [110]:
import pandas as pd
import numpy as np
import queue
import requests
import json
import asyncio
import websockets
import nest_asyncio
import gzip
import time
import threading
import matplotlib.pyplot as plt 
from matplotlib.ticker import MaxNLocator
import pickle

In [113]:
# Exceptions that indexing into / converting a decoded payload can raise when
# a frame is not a quote update (subscription acks, heartbeats, empty books).
_PARSE_ERRORS = (LookupError, TypeError, ValueError)


def _float_top_price(levels):
    """Convert the price of the first level of ``levels`` to float in place.

    An empty list is left untouched, so an update that carries only one side
    of the book can still be recorded.
    """
    try:
        top = levels[0]
    except IndexError:
        return
    top[0] = float(top[0])


def _prepare_text_update(exchange, response):
    """Validate / normalise a decoded text frame for ``exchange``.

    Returns True when ``response`` should be queued and False for an exchange
    id this module does not know. Raises one of ``_PARSE_ERRORS`` when the
    payload does not have the expected quote layout.
    """
    if exchange == "gateio":
        result = response["result"]
        # Validation only: the response is queued with its original values.
        float(result["b"])
        float(result["a"])
        return True
    if exchange == "bybit":
        data = response["data"]
        _float_top_price(data["a"])
        _float_top_price(data["b"])
        return True
    if exchange == "bitget":
        book = response["data"][0]
        book["bids"][0][0] = float(book["bids"][0][0])
        book["asks"][0][0] = float(book["asks"][0][0])
        return True
    return False


async def _handle_frame(exchange, websocket, frame, collective_data):
    """Decode one received frame, answer pings and queue quote updates."""
    if isinstance(frame, bytes):
        # Binary frames are gzip-compressed JSON (the HTX feed in this file).
        response = json.loads(gzip.decompress(frame).decode("utf-8"))
        if "ping" in response:
            # Answer the heartbeat directly instead of relying on the quote
            # parsing to fail first.
            pong_message = {"pong": response["ping"]}
            await websocket.send(json.dumps(pong_message))
            print(f"Sent pong msg: {pong_message}")
            return
        try:
            response["local_ts"] = time.time_ns()
            tick = response["tick"]
            # Validation only: make sure a best bid and best ask are present.
            float(tick["bids"][0][0])
            float(tick["asks"][0][0])
        except _PARSE_ERRORS as e:
            print(f"error {e} at htx")
            return
        collective_data.put(response)
        return

    response = json.loads(frame)
    try:
        response["local_ts"] = time.time_ns()
        is_quote = _prepare_text_update(exchange, response)
    except _PARSE_ERRORS as e:
        print(f"error {e} at {exchange} and pass ")
        return
    if is_quote:
        collective_data.put(response)


async def ws_handler(id, url, msg, collective_data, start_event, duration=10):
    """Stream order-book updates from one exchange into a shared queue.

    Waits for ``start_event``, connects to ``url``, sends ``msg`` as the
    subscription request and then pushes every decoded quote update (with a
    ``local_ts`` nanosecond timestamp added) onto ``collective_data`` until
    ``duration`` seconds have elapsed. The connection is re-opened whenever
    the server closes it before the deadline.

    Args:
        id: Exchange identifier ("gateio", "bybit" or "bitget" select the
            text-frame layout; binary frames are always handled as HTX).
        url: WebSocket endpoint.
        msg: JSON-serialisable subscription message.
        collective_data: Queue-like object with a ``put`` method.
        start_event: ``threading.Event`` that releases all workers together.
        duration: Collection time in seconds (default 10).
    """
    print("ready to start")
    # Wait in a worker thread so the event loop itself is never blocked.
    await asyncio.get_running_loop().run_in_executor(None, start_event.wait)
    print("start !")
    await asyncio.sleep(1)  # give the main thread time to get ready

    deadline = time.monotonic() + duration
    while time.monotonic() <= deadline:
        try:
            async with websockets.connect(url) as websocket:
                await websocket.send(json.dumps(msg))
                print(f"Sent: {msg}")

                while True:
                    remaining = deadline - time.monotonic()
                    if remaining < 0:
                        break
                    try:
                        # Bound the wait so a quiet feed cannot keep the
                        # handler alive past the deadline.
                        recv = await asyncio.wait_for(
                            websocket.recv(), timeout=remaining
                        )
                    except asyncio.TimeoutError:
                        break
                    await _handle_frame(id, websocket, recv, collective_data)
            # The inner loop only ends normally once the deadline is reached.
            break
        except websockets.exceptions.ConnectionClosed:
            # Covers both abnormal and clean closes initiated by the server.
            print(f"reconnect for {id}")

    print(f"{id} finish")
    return

def send_websocket_request(thread_id, ws_url, message, df, start_event):
    """Thread entry point: drive ``ws_handler`` to completion on a new event loop.

    All arguments are forwarded to ``ws_handler`` unchanged, in the same order.
    """
    handler_coro = ws_handler(thread_id, ws_url, message, df, start_event)
    asyncio.run(handler_coro)


Build a WebSocket connection for each exchange

In [118]:
# Collect order-book updates from four exchanges.
# One worker thread per exchange runs send_websocket_request; each worker
# stops by itself after the duration configured in ws_handler. The main thread
# periodically drains the shared queue into pickle files until every worker
# has finished.
from pathlib import Path

nest_asyncio.apply()

start_event = threading.Event()
ts = time.time_ns()
target_currency = "BTC"
base_currency = "USDT"
threads = []
ids = ["bitget", "htx", "gateio", "bybit"]
urls = [
    "wss://ws.bitget.com/v2/ws/public",
    "wss://api.hbdm.com/linear-swap-ws",
    "wss://fx-ws.gateio.ws/v4/ws/usdt",
    "wss://stream.bybit.com/v5/public/linear",
]
# Subscription messages, in the same order as ids / urls.
msgs = [
    {"op": "subscribe", "args": [{"instType": f"{base_currency}-FUTURES", "channel": "books1", "instId": f"{target_currency}{base_currency}"}]},
    {"sub": f"market.{target_currency}-{base_currency}.depth.step0", "id": "test0"},
    {"time": ts, "channel": "futures.book_ticker", "event": "subscribe", "payload": [f"{target_currency}_{base_currency}"]},
    {"op": "subscribe", "args": [f"orderbook.1.{target_currency}{base_currency}"]},
]
dfs = [pd.DataFrame(columns=["local_ts", "bids", "asks"]) for _ in range(4)]  # data storage
sync_queues = queue.Queue(maxsize=10000)  # cross thread sharing queue

data_dir = Path("/home/timchen/perp_arb_analysis/data")
data_dir.mkdir(parents=True, exist_ok=True)  # open() below would fail otherwise
flush_period = 5  # seconds between two dumps of the queue


def _flush_queue_to_file(pending, path):
    """Pickle every item currently in ``pending`` to ``path``.

    Nothing is written (and no file is created) when the queue is empty.
    Returns the number of items written.
    """
    if pending.empty():
        return 0
    written = 0
    with open(path, "wb") as binary_file:
        while True:
            try:
                data = pending.get_nowait()
            except queue.Empty:
                break
            pickle.dump(data, binary_file)  # one pickle record per item
            print(f"Written to file: {data}")
            written += 1
    return written


# worker threads collecting data
for exchange_id, url, msg in zip(ids, urls, msgs):
    thread = threading.Thread(
        target=send_websocket_request,
        args=(exchange_id, url, msg, sync_queues, start_event),
    )
    threads.append(thread)
    thread.start()

print("All threads are ready. Starting in 1 seconds...")
time.sleep(1)  # Optional delay before starting
start_event.set()  # Signal threads to start

start_time = int(time.time())
try:
    # Stop polling once every worker is done instead of looping forever.
    while any(thread.is_alive() for thread in threads):
        time.sleep(flush_period)  # collecting period
        end_time = int(time.time())
        _flush_queue_to_file(
            sync_queues, data_dir / f"data_from_{start_time}_to_{end_time}.bin"
        )
        start_time = end_time

except KeyboardInterrupt:
    print("stop by user")
except Exception:
    print("unexpected error")
    raise


for thread in threads:
    thread.join()

# Items queued after the last periodic flush would otherwise be lost.
end_time = int(time.time())
_flush_queue_to_file(
    sync_queues, data_dir / f"data_from_{start_time}_to_{end_time}.bin"
)

print("All threads have finished.")

ready to start
ready to start
ready to start
ready to start
All threads are ready. Starting in 1 seconds...
start !start !
start !
start !

Sent: {'time': 1733150872904406131, 'channel': 'futures.book_ticker', 'event': 'subscribe', 'payload': ['BTC_USDT']}
Sent: {'sub': 'market.BTC-USDT.depth.step0', 'id': 'test0'}
error 'b' at gateio and pass 
Sent: {'op': 'subscribe', 'args': ['orderbook.1.BTCUSDT']}
error 'tick' at htx
error 'data' at bybit and pass 
Sent: {'op': 'subscribe', 'args': [{'instType': 'USDT-FUTURES', 'channel': 'books1', 'instId': 'BTCUSDT'}]}
error 'data' at bitget and pass 
error 'tick' at htx
Sent pong msg: {'pong': 1733150876851}
Written to file: {'ch': 'market.BTC-USDT.depth.step0', 'ts': 1733150873578, 'tick': {'mrid': 100044068130009, 'id': 1733150873, 'bids': [[96649.8, 125], [96647, 16], [96645.6, 1], [96642.8, 174], [96640.8, 1], [96640.1, 653], [96640, 7], [96635.9, 1], [96634.2, 400], [96633.7, 77], [96631.6, 651], [96631.1, 1], [96630.4, 3], [96629.4, 1], [

In [None]:
# Replay one capture file. The file is read as a sequence of consecutive
# pickle records, so keep loading until the stream reports end-of-file.
capture_path = "/home/timchen/perp_arb_analysis/data/data_from_1733150652_to_1733150657.bin"
with open(capture_path, "rb") as binary_file:
    try:
        while True:
            data = pickle.load(binary_file)  # next record
            print(f"Read from file: {data}")
    except EOFError:
        # Raised by pickle.load once no further record is available.
        pass

In [None]:
# Print everything that is still waiting in the shared queue.
# qsize() is only approximate, so a blocking get() per counted item could hang
# if the queue shrinks in the meantime; drain with get_nowait() instead.
while True:
    try:
        curr = sync_queues.get_nowait()
    except queue.Empty:
        break
    print(curr)

In [None]:
# check data
# for id, q in zip(ids, prod_data_queues):
#     print(id)
#     print(q.qsize())
# for i in range(4):
#     for j in range(prod_data_queues[i].qsize()) :
#         curr = prod_data_queues[i].get()    
#         if curr["bids"] != 1.0 : 
#             dfs[i] = dfs[i].append(curr, ignore_index=True)
    
#     dfs[i] = dfs[i].set_index("local_ts")
#     print(dfs[i].shape)
#     print("next")


In [None]:
# Print every value of the "bids" column of the first frame, one per line.
first_frame_bids = dfs[0]["bids"]
for bid in first_frame_bids:
    print(bid)

In [None]:
# Plot, for every pair of exchanges, the two bid/ask comparisons and the
# ask/bid ratio between them.
def _finish_axis(ax, title, legend_loc=None):
    """Apply the title, legend, tick density, label rotation and grid shared by all panels."""
    ax.set_title(title)
    if legend_loc is None:
        ax.legend()
    else:
        ax.legend(loc=legend_loc)
    ax.xaxis.set_major_locator(MaxNLocator(25))  # Show up to 25 x-ticks
    ax.yaxis.set_major_locator(MaxNLocator(25))  # Show up to 25 y-ticks
    ax.tick_params(axis='x', rotation=45)
    ax.grid(True, linestyle="--", alpha=0.5)


for i in range(len(ids)):
    for j in range(i + 1, len(ids)):
        print(ids[i], ids[j])

        df = dfs[i]
        df2 = dfs[j]

        # Create subplots
        fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(21, 7))  # 1 row, 3 columns

        # First subplot: bid of exchange i against ask of exchange j
        ax1.plot(df["bids"], label=f"bid at {ids[i]}", linestyle='-', color='blue')
        ax1.plot(df2["asks"], label=f"ask at {ids[j]}", linestyle='--', color='red')
        _finish_axis(ax1, f"bid of {ids[i]} and ask of {ids[j]} analysis")

        # Second subplot: bid of exchange j against ask of exchange i
        ax2.plot(df2["bids"], label=f"bid at {ids[j]}", linestyle='-', color='red')
        ax2.plot(df["asks"], label=f"ask at {ids[i]}", linestyle='--', color='blue')
        _finish_axis(ax2, f"bid of {ids[j]} and ask of {ids[i]} analysis")

        # Third subplot: ratio of exchange i asks to exchange j bids.
        # Columns are selected by name so the result does not depend on
        # whether "local_ts" is still a column or already the index, and both
        # series are cut to the common length so the division cannot fail on
        # frames of different size.
        # NOTE(review): rows are paired by position, not by timestamp - confirm
        # this is the intended alignment.
        spread_len = min(df.shape[0], df2.shape[0])
        asks_i = df["asks"].to_numpy(dtype=float)[:spread_len]
        bids_j = df2["bids"].to_numpy(dtype=float)[:spread_len]
        spread1 = asks_i / bids_j

        ax3.plot(spread1, label=f"{ids[i]} asks / {ids[j]} bids", linestyle='-', color='blue')
        if spread_len:
            # Horizontal line at the mean of the ratio (skipped when there is no data).
            mean1 = round(spread1.mean(), 5)
            ax3.axhline(y=mean1, color="red", linestyle="-", label=f"y=mean of spread-1:{mean1}")
        _finish_axis(ax3, f"spread of {ids[j]} and {ids[i]} analysis", legend_loc="lower right")

        # Show the figure
        plt.tight_layout()
        plt.show()