# Trade Aggregation
### Date: May 20, 2024

-----------

## Introduction
<a id="Introduction"></a>

This notebook aggregates the partial fills in the Bybit `trades` table into single trades.

## Table-of-contents

1. [Introduction](#Introduction)
2. [Table of Contents](#Table-of-contents)
3. [Import Libraries](#Import-Libraries)
4. [Data Dictionary](#Data-Dictionary)
5. [Load Data](#Load-Data)
   - [Trades Table](#Trades-Table)
   - [Time Window](#Time-Window)
   - [Aggregation](#Aggregation)
6. [Links](#Links)
7. [Slippage Calculation and Analysis](#Slippage-Calculation-and-Analysis)
8. [Implementation of Trading Strategies](#Implementation-of-Trading-Strategies)
9. [Results and Discussion](#Results-and-Discussion)
10. [Conclusion](#Conclusion)
11. [References](#References)
12. [Appendix](#Appendix)

## Import-Libraries

In [None]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import ast
import json
from scipy import stats
import os
import sys
from pathlib import Path

import polars as pl
import mysql.connector
import dask.dataframe as dd
from sqlalchemy import create_engine

In [None]:
#pd.set_option('display.max_rows', None)
#pd.set_option('display.max_columns', None)
#pd.set_option('display.max_colwidth', None)

## Data-Dictionary

The Bybit V5 public WebSocket response documentation:  
https://bybit-exchange.github.io/docs/v5/websocket/public/
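The notebook reads each trade's timestamp with `ast.literal_eval(row.info)['T']`. Below is a minimal sketch of that parsing, assuming the `info` column holds the Python repr of one entry of the `data` array from Bybit's `publicTrade` topic. The field names follow the V5 docs linked above (`T` timestamp in ms, `s` symbol, `S` side, `v` size, `p` price, `i` trade ID). The values are made up for illustration.

```python
import ast

# Hypothetical `info` value: field names follow Bybit's V5 publicTrade message,
# the values are invented for this example
info_str = "{'T': 1716163200123, 's': 'BTCUSDT', 'S': 'Buy', 'v': '0.010', 'p': '66000.50', 'i': 'a1b2c3'}"

info = ast.literal_eval(info_str)  # safely evaluates a Python literal (no arbitrary code)
trade_ts_ms = info['T']            # trade timestamp in milliseconds
side = info['S']
price = float(info['p'])           # price and size arrive as strings
size = float(info['v'])

print(trade_ts_ms, side, price, size)
```

`ast.literal_eval` is used instead of `json.loads` because a Python repr uses single quotes and `True`/`False`, which are not valid JSON.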

## Load-Data

The first issue is how to work with a dataset of this size and combine the `trades` and `orderbook` tables. Pandas loads an entire table into memory, which is not practical for a dataset this large, so we use Dask and Polars instead.
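`dd.read_sql_table` avoids loading the whole table at once by splitting the index column into `npartitions` ranges and reading each range with its own query. The sketch below illustrates that idea in plain Python with a hypothetical `split_id_range` helper that turns an `id` range into contiguous half-open chunks. It is a simplified illustration, not Dask's implementation.

```python
def split_id_range(min_id, max_id, npartitions):
    """Split the inclusive id range [min_id, max_id] into npartitions contiguous chunks.

    Returns (lower, upper, is_last) tuples. Each chunk covers lower <= id < upper,
    except the last one, which also includes upper so that max_id is not dropped.
    """
    span = max_id - min_id
    # Integer boundaries spaced as evenly as possible
    bounds = [min_id + (i * span) // npartitions for i in range(npartitions)] + [max_id]
    return [(bounds[i], bounds[i + 1], i == npartitions - 1) for i in range(npartitions)]


for lower, upper, is_last in split_id_range(1, 1_000, 4):
    op = "<=" if is_last else "<"
    # Each chunk would be read with its own query
    print(f"SELECT * FROM trades WHERE id >= {lower} AND id {op} {upper}")
```

Only the chunks currently being processed need to be in memory, which is why raising `npartitions` lowers peak memory at the cost of more queries.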

In [None]:
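db_name = "data_crypto"
db_user = "root"
db_password = "root"
db_host = "localhost"
db_port = 3306

# SQLAlchemy URIs: `mysql://` uses the default MySQL driver (mysqlclient),
# `mysql+pymysql://` uses PyMySQL
uri = f"mysql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}"
uri_2 = f"mysql+pymysql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}"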

### Trades Table

In [None]:
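table_name = "trades"
index_col = "id"

# Dask expects the connection as a URI string (not a SQLAlchemy engine)
# so that each partition can open its own connection
dask_df = dd.read_sql_table(
    table_name=table_name,
    con=uri,
    index_col=index_col,
    npartitions=400  # Adjust as needed; 400 worked with 16 GB of memory
)

# Sort by creation time within each partition. This does not reorder rows across
# partitions; the overall order relies on `id` increasing with `created_at`
dask_df = dask_df.map_partitions(lambda df: df.sort_values(by='created_at'))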
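`map_partitions(... sort_values ...)` sorts each partition on its own. The result is globally ordered only if no partition contains a `created_at` earlier than the last value of the previous partition, which should hold if `id` grows with insertion time (an assumption about this table). A small stdlib check of that condition, using a hypothetical `is_globally_sorted` helper and toy partitions:

```python
def is_globally_sorted(partitions):
    """Return True if sorting each partition on its own yields one globally sorted sequence.

    `partitions` is a list of lists of timestamps, one inner list per partition.
    """
    # What a per-partition sort produces (empty partitions are skipped)
    sorted_parts = [sorted(p) for p in partitions if p]
    # Adjacent partitions must not overlap: the last value of one <= the first of the next
    return all(a[-1] <= b[0] for a, b in zip(sorted_parts, sorted_parts[1:]))


print(is_globally_sorted([[3, 1, 2], [5, 4], [6]]))  # True: partitions cover disjoint ranges
print(is_globally_sorted([[3, 1, 7], [5, 4], [6]]))  # False: 7 would come before 4 and 5
```

If the condition does not hold, `dask_df.set_index('created_at')` is one way to get a global order, at the cost of a full shuffle.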

### Time Window

Crypto exchanges such as Bybit generally run maker-taker markets: every market order is matched against resting limit orders by price-time priority. The time and sales (called "recent trades" on some exchanges) lists the resulting trades. When a large market order cannot be filled completely at the best price, the remainder fills at the next-best price level, then the third, and so on. Each of these partial fills is listed as a separate trade in the time and sales. These are the trades we want to aggregate back together.
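To make the partial-fill mechanism concrete, here is a minimal sketch of a market buy walking a hypothetical ask side of the book. Each price level it touches produces a separate fill. A real matching engine also splits fills per resting order within a level; this sketch keeps one fill per level for simplicity. `walk_book` and the book values are hypothetical.

```python
def walk_book(asks, qty):
    """Fill a market buy of `qty` against `asks`, a list of (price, size) sorted best-first.

    Returns the list of (price, filled_size) fills, one per price level touched.
    """
    fills = []
    remaining = qty
    for price, size in asks:
        if remaining <= 0:
            break
        filled = min(size, remaining)  # take what this level offers, up to what is left
        fills.append((price, filled))
        remaining -= filled
    if remaining > 0:
        raise ValueError("not enough liquidity to fill the order")
    return fills


asks = [(100.0, 2), (100.5, 3), (101.0, 5)]  # best ask first
print(walk_book(asks, 6))  # [(100.0, 2), (100.5, 3), (101.0, 1)]
```

The three fills share the same side and are executed together, which is the signature the aggregation looks for in the time and sales.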

The only information these partial fills share is the trade direction and a very small time gap between consecutive fills. The distribution of time deltas shows whether there are noticeable gaps that suggest an optimal window, or whether the distribution is fairly uniform.

In [None]:
timestamps = []       # trade timestamp 'T' (ms) from the raw websocket payload
time_diffs = []       # ms since the previous trade (recorded as 0 when the side switches)
trade_sides = []
direction_match = []  # 1 = same side as previous trade, 0 = side switched, "N/A" = first row

last = None       # previous row
last_info = None  # parsed `info` of the previous row, so it is not parsed twice

for row in dask_df.itertuples(name="Trade"):
    new_info = ast.literal_eval(row.info)  # `info` is stored as a dict literal string
    trade_sides.append(row.trade_side)
    timestamps.append(new_info['T'])

    if last is None:
        # First trade: nothing to compare against
        time_diffs.append(0)
        direction_match.append("N/A")
    elif last.trade_side == row.trade_side:
        time_diffs.append(new_info['T'] - last_info['T'])
        direction_match.append(1)
    else:
        # NOTE: the actual gap is not measured when the side switches; 0 is recorded instead
        time_diffs.append(0)
        direction_match.append(0)

    last, last_info = row, new_info
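The loop above records 0 instead of the real gap whenever the trade side switches. Below is a sketch that measures the actual gap for every trade, so both direction groups hold real time differences. `time_deltas` is a hypothetical helper that operates on plain `(timestamp_ms, side)` tuples rather than the Dask rows.

```python
def time_deltas(trades):
    """Compute the gap to the previous trade and whether the side matched.

    `trades` is a list of (timestamp_ms, side) tuples in time order.
    Returns (time_diffs, direction_match); the first trade gets None in both.
    """
    time_diffs, direction_match = [], []
    prev = None
    for ts, side in trades:
        if prev is None:
            time_diffs.append(None)
            direction_match.append(None)
        else:
            time_diffs.append(ts - prev[0])               # actual gap, whatever the side
            direction_match.append(int(side == prev[1]))  # 1 = same side, 0 = switched
        prev = (ts, side)
    return time_diffs, direction_match


trades = [(1000, 'Buy'), (1000, 'Buy'), (1003, 'Sell'), (1250, 'Sell')]
print(time_deltas(trades))  # ([None, 0, 3, 247], [None, 1, 0, 1])
```

In pandas, `Series.diff()` and a comparison against `Series.shift()` give the same columns in vectorized form, apart from how the first row is filled.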

Plot the time deltas of trades that share the same direction as the previous trade against those that switch direction.  

In [None]:
df = pd.DataFrame({
    'Timestamp': pd.to_datetime(timestamps, unit='ms'),
    'TimeDiff': time_diffs,
    'TradeSide': trade_sides,
    'DirectionMatch': direction_match
})

# Drop the first row: it has no previous trade (DirectionMatch is "N/A")
df_filtered = df.drop(df.index[0])

same_direction = df_filtered[df_filtered['DirectionMatch'] == 1]
different_direction = df_filtered[df_filtered['DirectionMatch'] == 0]

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6), sharey=True)
ax1.hist(same_direction['TimeDiff'], bins=20, color='blue')
ax1.set_title('Same Direction')
ax1.set_xlabel('Time Difference (ms)')
ax1.set_ylabel('Frequency')

ax2.hist(different_direction['TimeDiff'], bins=20, color='blue')
ax2.set_title('Different Direction')
ax2.set_xlabel('Time Difference (ms)')

plt.suptitle('Histogram of Time Differences by Trade Direction Match')
plt.show()

The plot does not reveal a time window that separates same-direction from different-direction trades. Note that the loop above records a time difference of 0 whenever the trade side switches, so the low deltas in the "Different Direction" panel come from that convention rather than from the data. Same-direction time deltas, by contrast, can exceed a few seconds. Next, we check whether the time differences differ between buy and sell trades.

In [None]:
buy_df = df[df['TradeSide'].str.lower() == 'buy']
sell_df = df[df['TradeSide'].str.lower() == 'sell']

plt.figure(figsize=(6, 4))

plt.scatter(buy_df['Timestamp'], buy_df['TimeDiff'], color='blue', label='Buy', marker='o')
plt.scatter(sell_df['Timestamp'], sell_df['TimeDiff'], color='red', label='Sell', marker='x')

plt.xlabel('Timestamp')
plt.ylabel('Time Difference (ms)')
plt.title('Time Differences Between Trades')
plt.xticks(rotation=45)
plt.legend()
plt.show()

There is no noticeable difference between buy and sell trades, except at the end of the dataset.

Since there is no major difference by trade side or direction, we can look at all the time deltas at a finer time scale. 

In [None]:
# Drop the first row, which has no previous trade
df_filtered = df.drop(df.index[0])

# Zoom in on time differences below 10 ms
df_under_10ms = df_filtered[df_filtered['TimeDiff'] < 10]

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 6))

counts1, bins1, _ = ax1.hist(df_filtered['TimeDiff'], bins=50, edgecolor='black')
ax1.set_xlabel('Time Difference (ms)')
ax1.set_ylabel('Frequency')
ax1.set_title('Histogram of Time Differences Between Trades')

ax2.hist(df_under_10ms['TimeDiff'], bins=50, edgecolor='black')
ax2.set_xlabel('Time Difference (ms)')
ax2.set_ylabel('Frequency')
ax2.set_title('Histogram of Time Differences (Less than 10 ms) Between Trades')

# Annotate the first bar (smallest time differences) of the full histogram
bar_index = 0
x = bins1[bar_index] + (bins1[bar_index + 1] - bins1[bar_index]) / 2  # Center of the bar
y = counts1[bar_index]  # Height of the first bar
ax1.annotate("Partial Fills", (x, y), textcoords="offset points", xytext=(50, 0),
             ha='center', arrowprops=dict(arrowstyle="->"))

plt.show()

In [None]:
mean = np.mean(df['TimeDiff'])
median = np.median(df['TimeDiff'])
mode = stats.mode(df['TimeDiff'])

# Print results
print("Mean:", mean)
print("Median:", median)
print("Mode:", mode)
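To turn the histograms into a concrete window choice, it helps to look at the fraction of time differences below each candidate window (an empirical CDF). Below is a stdlib sketch with a hypothetical `share_below` helper and made-up deltas. In the notebook, the input would be the `TimeDiff` values, for example `df_filtered['TimeDiff'].tolist()`.

```python
from bisect import bisect_left


def share_below(deltas, windows):
    """For each window w, return the fraction of deltas strictly below w."""
    ordered = sorted(deltas)
    n = len(ordered)
    # bisect_left counts how many sorted values are < w
    return {w: bisect_left(ordered, w) / n for w in windows}


deltas_ms = [0, 0, 0, 0, 0, 0, 1, 2, 15, 800]  # hypothetical gaps between trades
print(share_below(deltas_ms, [0.4, 1, 10, 100]))  # {0.4: 0.6, 1: 0.6, 10: 0.8, 100: 0.9}
```

With integer millisecond timestamps, 0.4 and 1 give the same share: any window at or below 1 ms only captures trades with identical timestamps.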

A time window of 0.4 ms should work as a good starting point. Partial fills of larger trades make up the majority of the time and sales data and have by far the smallest changes in timestamp, being essentially simultaneous at millisecond accuracy. Because the trade timestamps (`T`) are in whole milliseconds, any window below 1 ms groups only trades that share the same timestamp. The first bar is therefore very likely made up of the partial fills we are trying to aggregate, so we can expect a fairly accurate aggregation.
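The grouping rule can be stated compactly: a trade starts a new group when its side differs from the previous trade or when the gap to the previous trade reaches the window; otherwise it joins the current group. Below is a stdlib sketch of that rule with a hypothetical `assign_groups` helper and timestamps in ms.

```python
def assign_groups(trades, window_ms):
    """Assign a group id to each (timestamp_ms, side) trade, in time order.

    A new group starts on a side change or when the gap to the previous trade
    is >= window_ms, the same break condition used by CustomStack below.
    """
    group_ids = []
    group = -1
    prev = None
    for ts, side in trades:
        if prev is None or side != prev[1] or ts - prev[0] >= window_ms:
            group += 1  # start a new aggregated trade
        group_ids.append(group)
        prev = (ts, side)
    return group_ids


trades = [(1000, 'Buy'), (1000, 'Buy'), (1000, 'Buy'),
          (1000, 'Sell'), (1001, 'Sell'), (1001, 'Sell')]
print(assign_groups(trades, 0.4))  # [0, 0, 0, 1, 2, 2]
```

In pandas, `(side.ne(side.shift()) | ts.diff().ge(window)).cumsum()` should produce the same grouping (with ids starting at 1), which can then be passed to `groupby`.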

### Aggregation

Define a custom stack class that buffers consecutive trades and returns the aggregated data once a run of partial fills ends.

In [None]:
class CustomStack(list):
    """Stack that buffers consecutive trades belonging to the same aggregated trade."""

    def __init__(self, time_window):
        super().__init__()
        # Maximum gap between consecutive trades, in the same units as `created_at`
        self.time_window = time_window

    def append(self, trade):
        """
        Push a trade onto the stack.

        Returns None while the trade continues the current run (same side, within the
        time window). When the trade breaks the run, returns a one-row DataFrame with
        the aggregated run and starts a new run with the incoming trade.
        """

        # If the stack is empty, append without checks
        if not self:
            super().append(trade)
            return None

        # Ignore trades that are not of the same exchange / symbol in case the incoming data is mixed
        if trade.exchange != self[-1].exchange or trade.symbol != self[-1].symbol:
            return None

        # If the trade is on a different side or outside the time window, the current
        # run is complete: aggregate it, clear the stack, and start a new run with this trade
        if (trade.trade_side != self[-1].trade_side) or (trade.created_at - self[-1].created_at >= self.time_window):
            data = self.aggregate()
            self.clear()
            super().append(trade)
            return data

        # Otherwise the trade is another partial fill of the current run
        super().append(trade)
        return None

    def is_empty(self):
        """Check if the stack is empty"""
        return not self

    def top(self):
        """Return the most recent item at the top of the stack"""
        if not self.is_empty():
            return self[-1]
        raise IndexError("Stack is empty")

    def size(self):
        """Return the size of the stack"""
        return len(self)

    def aggregate(self):
        '''
        Aggregate the buffered trades into a one-row DataFrame. Exchange, symbol and
        trade side are shared by every trade in the run and taken from the last one;
        the per-fill fields are stored as JSON arrays.
        '''
        trade_ids_json = json.dumps([trade.trade_id for trade in self])
        executed_prices_json = json.dumps([trade.executed_price for trade in self])
        base_amounts_json = json.dumps([trade.base_amount for trade in self])
        costs_json = json.dumps([trade.cost for trade in self])
        infos_json = json.dumps([ast.literal_eval(trade.info) for trade in self])
        date_times_json = json.dumps([trade.date_time.strftime('%Y-%m-%d %H:%M:%S') for trade in self])
        created_ats_json = json.dumps([trade.created_at for trade in self])

        df = pd.DataFrame({
            'exchange': self[-1].exchange,
            'symbol': self[-1].symbol,
            'trade_id': trade_ids_json,
            'trade_side': self[-1].trade_side,
            'executed_price': executed_prices_json,
            'base_amount': base_amounts_json,
            'cost': costs_json,
            'info': infos_json,
            'date_time': date_times_json,
            'created_at': created_ats_json
        }, index=[0])

        return df
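Two details matter for a streaming aggregator like this one. First, the trade that breaks a run must start the next run rather than be discarded. Second, the last run must be flushed once the input ends. Below is a self-contained sketch of that pattern as a generator. `aggregate_runs` and `_summarize` are hypothetical. Trades are plain dicts instead of Dask rows, the exchange/symbol check is omitted, and the per-fill fields are JSON-encoded as in `aggregate`.

```python
import json


def _summarize(run):
    """Collapse one run of trades into a single row with JSON-encoded per-fill fields."""
    return {
        'side': run[-1]['side'],
        'executed_price': json.dumps([t['price'] for t in run]),
        'base_amount': json.dumps([t['amount'] for t in run]),
        'created_at': json.dumps([t['created_at'] for t in run]),
    }


def aggregate_runs(trades, time_window):
    """Yield one aggregated row per run of same-side trades within `time_window`."""
    run = []
    for trade in trades:
        if run and (trade['side'] != run[-1]['side']
                    or trade['created_at'] - run[-1]['created_at'] >= time_window):
            yield _summarize(run)
            run = []          # the breaking trade starts the next run below
        run.append(trade)
    if run:                   # flush the final run once the input is exhausted
        yield _summarize(run)


trades = [
    {'side': 'Buy', 'created_at': 1000, 'price': 100.0, 'amount': 2},
    {'side': 'Buy', 'created_at': 1000, 'price': 100.5, 'amount': 3},
    {'side': 'Sell', 'created_at': 1005, 'price': 100.0, 'amount': 1},
]
for row in aggregate_runs(trades, 0.5):
    print(row)
```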

Run the aggregation

In [None]:
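stack = CustomStack(0.5)  # time window, in the same units as `created_at`
file_name = 'aggregates'

# Get the current working directory (assuming this is run in a Jupyter Notebook from 'notebooks')
current_dir = Path(os.getcwd())
print("Current directory:", current_dir)

# Go one level up from 'notebooks' to the 'liquidity' project root
project_root = current_dir.parents[0]
print("Project root:", project_root)

# Define the path to the 'data/processed' directory
target_path = project_root / 'data' / 'processed'
print("Target path for Parquet files:", target_path)

# Parquet dataset directory for the aggregated trades
filepath = target_path / file_name
print("Full file path:", filepath)

batch = []           # aggregated rows waiting to be written
batch_size = 10_000  # rows per Parquet write; adjust to available memory

def write_batch(rows):
    """Append a batch of one-row DataFrames to the Parquet dataset."""
    ddf = dd.from_pandas(pd.concat(rows, ignore_index=True), npartitions=1)
    ddf.to_parquet(filepath.as_posix(), engine='pyarrow', write_index=False, append=True)
    print("wrote", len(rows), "aggregated trades")

for row in dask_df.itertuples(name="Trade"):
    result = stack.append(row)
    if result is not None:
        batch.append(result)
        if len(batch) >= batch_size:
            write_batch(batch)
            batch = []

# The last run of trades is still on the stack when the loop ends: aggregate it too
if not stack.is_empty():
    batch.append(stack.aggregate())
    stack.clear()
if batch:
    write_batch(batch)

print("finished")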

### Links
- https://blog.dask.org/2021/11/02/choosing-dask-chunk-sizes#rough-rules-of-thumb
- https://docs.dask.org/en/stable/best-practices.html
- https://docs.dask.org/en/stable/dashboard.html
- https://www.coiled.io/blog/reducing-dask-memory-usage
- https://bicortex.com/data-analysis-with-dask-a-python-scale-out-parallel-computation-framework-for-big-data/
- https://www.architecture-performance.fr/ap_blog/reading-a-sql-table-by-chunks-with-pandas/