In [10]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

# Data Import

## SOLUSDT

### Binance Data

In [None]:
# Initialize an empty list to store DataFrame chunks
df_list = []

# Iterate through each month, read the file, and add it to the list
for i in range(1, 12):
    # Load the data for the specific month
    path_r = '../../data/SOLUSDT/Binance/SOLUSDT-trades-2023-{i:02d}.zip'
    temp_df = pd.read_csv(f'../../data/SOLUSDT/Binance/SOLUSDT-trades-2023-{i:02d}.zip', compression='zip')

    #rename the columns
    temp_df_cols = temp_df.columns
    temp_df = temp_df.rename(columns={
    temp_df_cols[0] : 'trade_id' ,
    temp_df_cols[1] : 'price' , 
    temp_df_cols[2] : 'qty',
    temp_df_cols[3] : 'base_qty' , 
    temp_df_cols[4] : 'time'
})

    # Convert the time to datetime
    temp_df['time'] = pd.to_datetime(temp_df['time'], unit='ms')

    df_list.append(temp_df)

# Concatenate all dataframes at once
binance_df = pd.concat(df_list, ignore_index=True)

In [None]:
binance_df

In [None]:
binance_df = binance_df.drop(['trade_id', 'base_qty'], axis=1)
binance_df = binance_df[['time', 'price', 'qty']]

In [None]:
binance_df.head()

In [None]:
#export data
binance_df.to_csv('../data/SOLUSDT/csv/Binance.csv', index=False)

In [None]:
binance_df.to_csv('../../data/SOLUSDT/csv/Binance.csv', index=False)

### Bybit Data

In [None]:
#Same for Bybit
df_list = []

# Iterate through each month, read the file, and add it to the list
for i in range(1, 12):

    path_r = '../data/SOLUSDT/Bybit/SOLUSDT-2023-{i:02d}.csv.gz'
    path_a = '../../data/SOLUSDT/Bybit/SOLUSDT-2023-{i:02d}.csv.gz'
    print(path_r)
    temp_df = pd.read_csv(f'../data/SOLUSDT/Bybit/SOLUSDT-2023-{i:02d}.csv.gz', compression='gzip')

    temp_df_cols = temp_df.columns
    temp_df = temp_df.rename(columns={
    temp_df_cols[0] : 'trade_id' ,
    temp_df_cols[1] : 'time' , 
    temp_df_cols[2] : 'price',
    temp_df_cols[3] : 'qty' , 
    temp_df_cols[4] : 'side'
})
    # Convert the time to datetime
    temp_df['time'] = pd.to_datetime(temp_df['time'], unit='ms')

    df_list.append(temp_df)

# Concatenate all dataframes at once
bybit_df = pd.concat(df_list, ignore_index=True)

In [None]:
bybit_df

In [None]:
bybit_df = bybit_df.drop(['trade_id','side'], axis=1)

In [None]:
bybit_df.to_csv('../data/SOLUSDT/csv/bybit.csv', index=False)

In [None]:
bybit_df.to_csv('../../data/SOLUSDT/csv/bybit.csv', index=False)

### OKX Data

In [None]:
file = pd.read_csv("../../data/OKX/allspot-trades-2023-01-01.zip", compression='zip',sep=',', encoding='ISO-8859-1')

In [None]:
# Assuming the new column names you provided are in the same order as the existing columns
new_column_names = ['instrument_name', 'trade_id', 'side', 'size', 'price', 'created_time']

# Rename the columns
file.columns = new_column_names

# Display the first few rows of the dataframe to confirm the change
print(file.head())

In [None]:
# Filter the rows where 'instrument_name' contains 'SOL'
filtered_df = file[file['instrument_name'].str.contains('SOL')]

# Get the distinct values
distinct_values = filtered_df['instrument_name'].unique()

# Print the distinct values
for value in distinct_values:
    print(value)

In [None]:
# Filter the DataFrame
file = file[file['instrument_name'].str.contains('SOL-USDT')]

# Display the first few rows of the DataFrame to confirm the change
print(file.describe())

In [None]:
df_list = []

for i in range(1, 12):
    for j in range(1, 32):
        try:
            temp_df = pd.read_csv(f'../../data/OKX/allspot-trades-2023-{i:02d}-{j:02d}.zip', compression='zip',sep=',', encoding='ISO-8859-1')

            temp_df_cols = temp_df.columns
            temp_df = temp_df.rename(columns={
            temp_df_cols[0] : 'instrument_name' ,
            temp_df_cols[1] : 'trade_id' , 
            temp_df_cols[2] : 'side',
            temp_df_cols[3] : 'size',
            temp_df_cols[4] : 'price' , 
            temp_df_cols[5] : 'time'
        })
            #Take the trades for SOL-USDT
            temp_df = temp_df[temp_df['instrument_name'].str.contains('SOL-USDT')]

            temp_df['time'] = pd.to_datetime(temp_df['time'], unit='ms')

            df_list.append(temp_df)
        except:
            pass

# Concatenate all dataframes at once
okx_df = pd.concat(df_list, ignore_index=True)

In [None]:
okx_df

In [None]:
#delete trade that aren't in 2023
okx_df = okx_df[okx_df['time'].dt.year == 2023]

In [None]:
okx_df = okx_df.drop(['instrument_name','side', 'trade_id'], axis=1)
#rename size to qty
okx_df = okx_df.rename(columns={
    'size' : 'qty'
})
okx_df = okx_df[['time', 'price', 'qty']]

In [None]:
okx_df.to_csv('../data/csv/okx.csv', index=False)

In [None]:
okx_df.to_csv('../../data/csv/okx.csv', index=False)

##

### KuCoin Data

In [None]:
# Initialize an empty list to store DataFrame chunks
df_list = []

# Iterate through each month, read the file, and add it to the list
for i in range(1, 12):
    for j in range(1, 32):
        try:
            # Load the data for the specific month
            temp_df = pd.read_csv(f'../../data/SOLUSDT/KuCoin/SOLUSDT-trades-2023-{i:02d}-{j:02d}.zip', compression='zip')
            #rename the columns
            temp_df_cols = temp_df.columns
            temp_df = temp_df.rename(columns={
            temp_df_cols[0] : 'trade_id' ,
            temp_df_cols[1] : 'time' , 
            temp_df_cols[2] : 'price',
            temp_df_cols[3] : 'qty' , 
            temp_df_cols[4] : 'side'
        })

            # Convert the time to datetime
            temp_df['time'] = pd.to_datetime(temp_df['time'], unit='ms')

            df_list.append(temp_df)
        except:
            pass

# Concatenate all dataframes at once
kuCoin_df = pd.concat(df_list, ignore_index=True)

In [None]:
kuCoin_df

In [None]:
kuCoin_df = kuCoin_df.drop(['trade_id','side'], axis=1)
kuCoin_df = kuCoin_df[['time', 'price', 'qty']]

In [None]:
kuCoin_df.to_csv('../data/SOLUSDT/csv/kuCoin.csv', index=False)

In [None]:
kuCoin_df.to_csv('../../data/SOLUSDT/csv/kuCoin.csv', index=False)

## BTCUSDT

### Binance Data

In [10]:
# Initialize an empty list to store DataFrame chunks
df_list = []

# Iterate through each month, read the file, and add it to the list
for i in range(1, 12):
    # Load the data for the specific month
    path_r = '../../data/BTCUSDT/Binance/BTCUSDT-trades-2023-{i:02d}.zip'
    temp_df = pd.read_csv(f'../../data/BTCUSDT/Binance/BTCUSDT-trades-2023-{i:02d}.zip', compression='zip')

    #rename the columns
    temp_df_cols = temp_df.columns
    temp_df = temp_df.rename(columns={
    temp_df_cols[0] : 'trade_id' ,
    temp_df_cols[1] : 'price' , 
    temp_df_cols[2] : 'qty',
    temp_df_cols[3] : 'base_qty' , 
    temp_df_cols[4] : 'time'
})

    # Convert the time to datetime
    temp_df['time'] = pd.to_datetime(temp_df['time'], unit='ms')

    df_list.append(temp_df)

# Concatenate all dataframes at once
binance_df = pd.concat(df_list, ignore_index=True)

                                                                                

In [11]:
binance_df

DataFrame[2408185868: bigint, 16541.77000000: double, 0.00257000: double, 42.51234890: double, 1672531200001: bigint, True: boolean, True.1: boolean]

In [None]:
binance_df = binance_df.drop(['trade_id', 'base_qty'], axis=1)
binance_df = binance_df[['time', 'price', 'qty']]

In [15]:
# Define a function to calculate the volume-weighted price and total quantity
def calculate_vwp_and_qty(df, rounded_to='S'):
    # Create a new column 'volume' that represents the volume of each trade
    df['volume'] = df['price'] * df['qty']
    
    # Group by 'time' rounded down to the nearest second
    grouped = df.groupby(df['time'].dt.floor(rounded_to))
    
    # Calculate the volume-weighted price and total quantity for each group
    vwp = (grouped['volume']).sum() / grouped['qty'].sum()
    total_qty = grouped['qty'].sum()
    
    # Return a DataFrame with the 'time', 'price', and 'quantity' columns
    return pd.DataFrame({'time': vwp.index, 'price': vwp.values, 'qty': total_qty.values})

In [None]:
binance_df = calculate_vwp_and_qty(binance_df, 'S')

In [None]:
binance_df

In [None]:
binance_df.to_csv('../../data/BTCUSDT/csv/binance_arbitrage.csv', index=False)

### Bybit Data

In [None]:
df_list = []

for i in range(1, 12):
    path_r = '../data/BTCUSDT/Bybit/BTCUSDT-2023-{i:02d}.csv.gz'
    path_a = '../../data/BTCUSDT/Bybit/BTCUSDT-2023-{i:02d}.csv.gz'
    temp_df = pd.read_csv(f'../../data/BTCUSDT/Bybit/BTCUSDT-2023-{i:02d}.csv.gz', compression='gzip')

    temp_df_cols = temp_df.columns
    temp_df = temp_df.rename(columns={
    temp_df_cols[0] : 'trade_id' ,
    temp_df_cols[1] : 'time' , 
    temp_df_cols[2] : 'price',
    temp_df_cols[3] : 'qty' , 
    temp_df_cols[4] : 'side'
})
    temp_df['time'] = pd.to_datetime(temp_df['time'], unit='ms')

    df_list.append(temp_df)

bybit_df = pd.concat(df_list, ignore_index=True)

In [None]:
bybit_df 

In [None]:
bybit_df = bybit_df.drop(['trade_id','side'], axis=1)

In [None]:
bybit_df = calculate_vwp_and_qty(bybit_df, 'S')

In [None]:
bybit_df

In [None]:
bybit_df.to_csv('../../data/BTCUSDT/csv/bybit_arbitrage.csv', index=False)

### KuCoin Data

In [None]:
df_list = []

for i in range(1, 12):
    for j in range(1, 32):
        try:
            temp_df = pd.read_csv(f'../../data/BTCUSDT/KuCoin/BTCUSDT-trades-2023-{i:02d}-{j:02d}.zip', compression='zip')
            temp_df_cols = temp_df.columns
            temp_df = temp_df.rename(columns={
            temp_df_cols[0] : 'trade_id' ,
            temp_df_cols[1] : 'time' , 
            temp_df_cols[2] : 'price',
            temp_df_cols[3] : 'qty' , 
            temp_df_cols[4] : 'side'
        })

            temp_df['time'] = pd.to_datetime(temp_df['time'], unit='ms')

            df_list.append(temp_df)
        except:
            pass

kuCoin_df = pd.concat(df_list, ignore_index=True)

In [None]:
kuCoin_df = kuCoin_df.drop(['trade_id','side'], axis=1)
kuCoin_df = kuCoin_df[['time', 'price', 'qty']]

In [None]:
kuCoin_df = calculate_vwp_and_qty(kuCoin_df, 'S')

In [None]:
kuCoin_df

In [None]:
kuCoin_df.to_csv('../../data/BTCUSDT/csv/kuCoin_arbitrage.csv', index=False)

### OKX Data

In [2]:
df_list = []

for i in range(1, 12):
    for j in range(1, 32):
        try:
            temp_df = pd.read_csv(f'../../data/OKX/allspot-trades-2023-{i:02d}-{j:02d}.zip', compression='zip',sep=',', encoding='ISO-8859-1')
            temp_df_cols = temp_df.columns
            temp_df = temp_df.rename(columns={
            temp_df_cols[0] : 'instrument_name' ,
            temp_df_cols[1] : 'trade_id' , 
            temp_df_cols[2] : 'side',
            temp_df_cols[3] : 'size',
            temp_df_cols[4] : 'price' , 
            temp_df_cols[5] : 'time'
        })
            
            temp_df = temp_df[temp_df['instrument_name'].str.contains('BTC-USDT')]
            temp_df['time'] = pd.to_datetime(temp_df['time'], unit='ms')

            df_list.append(temp_df)
        except:
            pass

okx_df = pd.concat(df_list, ignore_index=True)

In [3]:
okx_df = okx_df[okx_df['time'].dt.year == 2023]

In [4]:
okx_df = okx_df.drop(['instrument_name','side', 'trade_id'], axis=1)
okx_df = okx_df.rename(columns={
    'size' : 'qty'
})
okx_df = okx_df[['time', 'price', 'qty']]

In [7]:
okx_df = calculate_vwp_and_qty(okx_df, 'S')

In [9]:
okx_df

Unnamed: 0,time,price,qty
0,2023-01-01 00:00:02,16546.195307,0.009271
1,2023-01-01 00:00:04,16546.100000,0.029228
2,2023-01-01 00:00:05,16546.200000,0.000604
3,2023-01-01 00:00:06,16546.152305,0.001155
4,2023-01-01 00:00:07,16546.200000,0.006044
...,...,...,...
11435558,2023-11-30 15:59:54,37610.173981,0.122344
11435559,2023-11-30 15:59:56,37611.350620,0.001443
11435560,2023-11-30 15:59:57,37612.900000,0.010400
11435561,2023-11-30 15:59:58,37612.900000,0.020280


In [8]:
okx_df.to_csv('../../data/BTCUSDT/csv/okx_arbitrage.csv', index=False)

## SOLBTC

### Binance Data

In [12]:
# Initialize an empty list to store DataFrame chunks
df_list = []

# Iterate through each month, read the file, and add it to the list
for i in range(1, 12):
    # Load the data for the specific month
    path_r = '../../data/SOLBTC/SOLBTC-trades-2023-{i:02d}.zip'
    temp_df = pd.read_csv(f'../../data/SOLBTC/SOLBTC-trades-2023-{i:02d}.zip', compression='zip')

    #rename the columns
    temp_df_cols = temp_df.columns
    temp_df = temp_df.rename(columns={
    temp_df_cols[0] : 'trade_id' ,
    temp_df_cols[1] : 'price' , 
    temp_df_cols[2] : 'qty',
    temp_df_cols[3] : 'base_qty' , 
    temp_df_cols[4] : 'time'
})

    # Convert the time to datetime
    temp_df['time'] = pd.to_datetime(temp_df['time'], unit='ms')

    df_list.append(temp_df)

# Concatenate all dataframes at once
binance_df = pd.concat(df_list, ignore_index=True)

In [13]:
binance_df = binance_df.drop(['trade_id', 'base_qty'], axis=1)
binance_df = binance_df[['time', 'price', 'qty']]

In [16]:
binance_df = calculate_vwp_and_qty(binance_df, 'S')

In [17]:
binance_df

Unnamed: 0,time,price,qty
0,2023-01-01 00:00:00,0.000603,26.78
1,2023-01-01 00:00:06,0.000603,6.77
2,2023-01-01 00:00:07,0.000603,57.95
3,2023-01-01 00:00:12,0.000603,30.00
4,2023-01-01 00:00:16,0.000602,30.00
...,...,...,...
4776890,2023-11-30 23:59:32,0.001570,9.63
4776891,2023-11-30 23:59:37,0.001570,1.39
4776892,2023-11-30 23:59:52,0.001571,0.82
4776893,2023-11-30 23:59:56,0.001571,109.04


In [20]:
binance_df.to_csv('../../data/SOLBTC/csv/binance_arbitrage.csv', index=False)

24/01/25 18:46:37 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 994492 ms exceeds timeout 120000 ms
24/01/25 18:46:37 WARN SparkContext: Killing executors is not supported by current scheduler.
24/01/25 18:46:40 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
	at o