In [1]:
import pandas as pd
import numpy as np
from tqdm import tqdm
import warnings
import os
import zstandard as zstd
import io
import random
import pickle

warnings.filterwarnings("ignore")

In [2]:
conda install zstandard

Channels:
 - defaults
 - conda-forge
Platform: osx-64
Collecting package metadata (repodata.json): done
Solving environment: done

# All requested packages already installed.


Note: you may need to restart the kernel to use updated packages.


In [13]:
def process_csv_files(directory, ticker):
    """
    Convert input csv to dataframes and prepare to process through trading signal.

    Params:
    directory (String): Name of directory that contains list of csv files.

    Returns:
    return (list[DataFrame]): List of pandas DataFrames, one for each trading day.
    """
    
    # Directory containing the CSV files
    
    ret = []
    i = 1
    # Loop through all files in the directory and convert .zst csv to df
    for filename in os.listdir(directory):
        
        if filename.endswith('.zst'):
            filepath = os.path.join(directory, filename)

            try:
                # Open the compressed file in binary mode
                with open(filepath, 'rb') as compressed_file:
                    # Initialize the decompressor
                    dctx = zstd.ZstdDecompressor()
                    
                    # Decompress the file into an in-memory buffer
                    with dctx.stream_reader(compressed_file) as decompressed_stream:
                        text_stream = io.TextIOWrapper(decompressed_stream, encoding='utf-8')
                        print(f"Processing {filename}\n")
                        # Read the decompressed data into a pandas DataFrame
                        df = pd.read_csv(text_stream, parse_dates=['ts_recv', 'ts_event'])
                        print(str(i) + ' read file')
                        df.to_pickle(os.path.join(directory,"day_" + str(i) + ".pkl"))
                        print(str(i) + ' saved file as pickle')
                        i +=1
                        print(f"Processed {filename}\n")
                    
            except Exception as e:
                print(f"Error processing {filename}\n")
                print(e)
                continue
                
            
            
    return 0

In [14]:
def process_csv_files_uncompressed(directory, ticker):
    """
    Convert input csv to dataframes and prepare to process through trading signal.

    Params:
    directory (String): Name of directory that contains list of csv files.

    Returns:
    return (list[DataFrame]): List of pandas DataFrames, one for each trading day.
    """
    
    # Directory containing the CSV files
    
    ret = []
    i = 1
    # Loop through all files in the directory and convert .zst csv to df
    for filename in os.listdir(directory):
        
        if filename.endswith('.csv'):
            filepath = os.path.join(directory, filename)

            try:
                print(f"Processing {filename}\n")
                # Read the decompressed data into a pandas DataFrame
                df = pd.read_csv(filepath, parse_dates=['ts_recv', 'ts_event'])
                print(str(i) + ' read file')
                df.to_pickle(os.path.join(directory,"day_" + str(i) + ".pkl"))
                print(str(i) + ' saved file as pickle')
                i +=1
                print(f"Processed {filename}\n")
                    
            except Exception as e:
                print(f"Error processing {filename}\n")
                print(e)
                continue
                
            
            
    return 0

In [2]:
directory = 'equity-pkl'

dfs = []

for filename in os.listdir(directory):
    if filename.endswith('.pkl'):
        filepath = os.path.join(directory, filename)
        with open(filepath, 'rb') as file:
            data = pickle.load(file)
            dfs.append(data)
            print(f"Loaded {filename}")


Loaded day_19.pkl
Loaded day_18.pkl
Loaded day_20.pkl
Loaded day_21.pkl
Loaded day_22.pkl
Loaded day_9.pkl
Loaded day_8.pkl
Loaded day_6.pkl
Loaded day_7.pkl
Loaded day_5.pkl
Loaded day_4.pkl
Loaded day_1.pkl
Loaded day_3.pkl
Loaded day_2.pkl
Loaded day_13.pkl
Loaded day_12.pkl
Loaded day_10.pkl
Loaded day_11.pkl
Loaded day_15.pkl
Loaded day_14.pkl
Loaded day_16.pkl
Loaded day_17.pkl


In [2]:
def moving_average_signal(df, short_window, long_window, b):
    """
    Generate trading signals based on moving average cross-over strategy.

    Params:
    prices (Series): asset prices
    short_window (int): Window size for the short-term moving average
    long_window (int): Window size for the long-term moving average
    b (int): Bandwidth parameter that determines the buy/sell thresholds

    Return:
    return (Series): Series with trading signals (+1 for buy, -1 for sell, 0 for hold)
    """

    # Create midprice column (average of bid and ask prices)
    mid_price = (df['bid_px_00'] + df['ask_px_00']) / 2
    
    # Calculate short-term and long-term moving averages
    short_ma = mid_price.rolling(window=short_window).mean()
    long_ma = mid_price.rolling(window=long_window).mean()
    
    # Define thresholds
    upper_threshold = (1 + b) * long_ma
    lower_threshold = (1 - b) * long_ma
    
    signal = pd.Series(0, index=df.index)

    # Generate buy signals (+1 where short_ma > upper_threshold)
    signal[short_ma > upper_threshold] = 1
    
    # Generate sell signals (-1 where short_ma < lower_threshold)
    signal[short_ma < lower_threshold] = -1

    #Set signal to 0 at the end of the day to ensure no overnight positions
    signal[df.index.time > pd.to_datetime('19:55').time()] = 0

    return signal

In [8]:
import pandas as pd

def execute_trading_signal(bbo_df, signal, order_size=1_000_000, execution_delay='0ms'):
    """
    Execute the trading signal based on the available order book depth and calculate actual P&L with execution delays.
    
    Params:
    bbo_df (DataFrame): DataFrame containing the order book and signals for each day.
    signal (Series): Series containing the generated trading signals.
    order_size (float): Size of the order for each trade (default $1M).
    execution_delay (str): The execution delay (e.g., '0ms', '100ms', '1s') to simulate latency before trade execution.
    
    Returns:
    DataFrame: Updated DataFrame with calculated returns.
    """
    
    # Initialize position and return
    position = 0
    shares_held = 0
    bbo_df['return'] = 0

    # Process MPB-10 data to get 3-level deep order book
    bids = bbo_df[['bid_px_00', 'bid_sz_00', 'bid_px_01', 'bid_sz_01', 'bid_px_02', 'bid_sz_02']].copy()
    asks = bbo_df[['ask_px_00', 'ask_sz_00', 'ask_px_01', 'ask_sz_01', 'ask_px_02', 'ask_sz_02']].copy()

    # Convert the execution delay to a pandas Timedelta for future lookups
    delay_timedelta = pd.to_timedelta(execution_delay)

    for i in signal.index:
        current_signal = signal.loc[i]

        # Calculate the future timestamp for delayed execution
        delayed_timestamp = i + delay_timedelta

        # Ensure the delayed timestamp is within the bounds of the data by finding the next available one
        if delayed_timestamp not in bbo_df.index:
            # Use `searchsorted` to find the index of the next available timestamp
            pos = bbo_df.index.searchsorted(delayed_timestamp)
            
            # If no valid future timestamp is found, skip to the next iteration
            if pos >= len(bbo_df.index):
                print(f"No valid future timestamp available for {i}. Skipping.")
                continue

            delayed_timestamp = bbo_df.index[pos]

        # Buy logic: when signal is +1 and we don't have an open position
        if current_signal == 1 and shares_held == 0:
            shares_bought = 0
            investment = 0

            # Buy up to 3 levels of the ask side until $1M is spent
            for level in range(3):
                ask_price = asks.loc[delayed_timestamp, f'ask_px_0{level}']
                ask_size = asks.loc[delayed_timestamp, f'ask_sz_0{level}']
                level_investment = min(order_size - investment, ask_price * ask_size)
                
                # Calculate number of shares to buy
                level_shares = level_investment / ask_price
                shares_bought += level_shares
                investment += level_investment

                if investment >= order_size:
                    break  # Exit loop when $1M investment is reached

            position += investment
            shares_held += shares_bought

            # Mark the negative cash flow from buying
            bbo_df.loc[i, 'return'] = -investment

        # Sell logic: when signal is -1 and we don't have an open position
        elif current_signal == -1 and shares_held > 0:
            shares_sold = 0
            revenue = 0

            # Sell up to 3 levels of the bid side, until $1M worth of shares are sold
            for level in range(3):
                bid_price = bids.loc[delayed_timestamp, f'bid_px_0{level}']
                bid_size = bids.loc[delayed_timestamp, f'bid_sz_0{level}']
                level_revenue = min(order_size - revenue, bid_price * bid_size)

                level_shares = level_revenue / bid_price
                shares_sold += level_shares
                revenue += level_revenue

                if revenue >= order_size:
                    break

            position -= revenue
            shares_held -= shares_sold

            # Mark the positive cash flow from selling
            bbo_df.loc[i, 'return'] = revenue

        # Exit logic: signal turns 0 and we have an open position
        elif current_signal == 0 and shares_held != 0:
            # Liquidate the position by settling the open trade
            if shares_held > 0:
                bid_price = bids.loc[delayed_timestamp, 'bid_px_00']
                sell_qty = min(shares_held, bids.loc[delayed_timestamp, 'bid_sz_00'])
                revenue = sell_qty * bid_price
                position -= revenue
                shares_held -= sell_qty

                # Mark the positive cash flow from selling
                bbo_df.loc[i, 'return'] = revenue

            elif shares_held < 0:
                ask_price = asks.loc[delayed_timestamp, 'ask_px_00']
                buy_qty = min(abs(shares_held), asks.loc[delayed_timestamp, 'ask_sz_00'])
                investment = buy_qty * ask_price
                position += investment
                shares_held += buy_qty

                # Mark the negative cash flow from buying
                bbo_df.loc[i, 'return'] = -investment

    # Scale the cumulative return
    bbo_df['cumulative_return'] = (bbo_df['return']).cumsum()
    bbo_df['scaled_return'] = bbo_df['cumulative_return'] / 1_000_000

    return bbo_df


In [None]:
return_sig_1 = []
return_sig_2 = []
return_sig_3 = []

directory = 'equity-pkl'

def extract_day(filename):
    return int(filename.split('_')[1].split('.')[0])

# Get all .pkl files in the directory and sort by day number
file_list = [f for f in os.listdir(directory) if f.endswith('.pkl')]
file_list_sorted = sorted(file_list, key=extract_day)

for filename in file_list_sorted:
    print('Processing ' + filename)
    
    filepath = os.path.join(directory, filename)
    
    with open(filepath, 'rb') as file:
        df = pickle.load(file)
        
        if 'ts_event' in list(df.columns):
            df = df.set_index('ts_event')
                    
        bbo_df = df.between_time('13:40', '19:55', inclusive='left')
        tkr_df = bbo_df[bbo_df['symbol'] == 'ANF'].resample('100ms').last().ffill()

        signal_1 = moving_average_signal(tkr_df, 50, 500, 0.0015)  # No delay
        signal_2 = moving_average_signal(tkr_df, 100, 1000, 0.0025) # 100ms
        signal_3 = moving_average_signal(tkr_df, 500, 5000, 0.0035) # 1s

        print('Successfully generated trading signal. Sending to generate returns.')
        
        # Execute signals with the corresponding delays
        returns_0ms = execute_trading_signal(tkr_df, signal_1, execution_delay='0ms')
        returns_100ms = execute_trading_signal(tkr_df, signal_2, execution_delay='10ms')
        returns_1s = execute_trading_signal(tkr_df, signal_3, execution_delay='100ms')
        
        print('Returns successfully generated for ' + filename, '\n')

        # Extract the cumulative returns
        ret_0ms = returns_0ms["cumulative_return"].tail(1).values[0]
        ret_100ms = returns_100ms["cumulative_return"].tail(1).values[0]
        ret_1s = returns_1s["cumulative_return"].tail(1).values[0]

        # Append to respective return lists
        return_sig_1.append(ret_0ms)
        return_sig_2.append(ret_100ms)
        return_sig_3.append(ret_1s)

        print(f'Delay 0ms return: {ret_0ms}')
        print(f'Delay 100ms return: {ret_100ms}')
        print(f'Delay 1s return: {ret_1s}\n')


Processing day_1.pkl
Successfully generated trading signal. Sending to generate returns.
No valid future timestamp available for 2024-08-23 19:54:55.900000+00:00. Skipping.
No valid future timestamp available for 2024-08-23 19:54:55.900000+00:00. Skipping.
Returns successfully generated for day_1.pkl 

Delay 0ms return: -209.39000000000397
Delay 100ms return: -209.39000000000397
Delay 1s return: -209.39000000000397

Processing day_2.pkl
Successfully generated trading signal. Sending to generate returns.
No valid future timestamp available for 2024-08-30 19:54:59.900000+00:00. Skipping.
No valid future timestamp available for 2024-08-30 19:54:59.900000+00:00. Skipping.
Returns successfully generated for day_2.pkl 

Delay 0ms return: -915.0300000000439
Delay 100ms return: -915.0300000000439
Delay 1s return: -915.0300000000439

Processing day_3.pkl
Successfully generated trading signal. Sending to generate returns.
No valid future timestamp available for 2024-08-22 19:54:59+00:00. Skippin