##### The following notebook shows the preprocessing steps from the original datasets of trade and bbo files for the 2010 year for 1 asset out of the 29. For the generalisation with the preprocessing of all 29 stocks, refer to the preprocessing.py file.

In [1]:
import pandas as pd
import polars as pl
import os

We extract all trade and bbo files from 2010 to one folder extracted/files with subfolders bbo/{stock} and trade/{stock} for every stock.

In [3]:
import tarfile

tar_path = 'GS.N-2010.tar'
extract_path = './extracted_files'  # Destination folder for extracted files

with tarfile.open(tar_path, 'r') as tar:
    tar.extractall(path=extract_path)
    print("Files extracted to:", extract_path)


Files extracted to: ./extracted_files


In [4]:
# List all files in the current directory ending with .tar
files = [file for file in os.listdir(".") if file.endswith(".tar")]
files_list = []
for file in files:
    files_list.append(file)
    
print(files_list)

['AAPL.OQ-2010.tar', 'AMGN.OQ-2010.tar', 'AXP.N-2010.tar', 'BA.N-2010.tar', 'CAT.N-2010.tar', 'CSCO.OQ-2010.tar', 'CVX.N-2010.tar', 'DOW.N-2010.tar', 'GS.N-2010.tar', 'HD.N-2010.tar', 'IBM.N-2010.tar', 'INTC.OQ-2010.tar', 'JNJ.N-2010.tar', 'JPM.N-2010.tar', 'KO.N-2010.tar', 'MCD.N-2010.tar', 'MMM.N-2010.tar', 'MRK.N-2010.tar', 'MSFT.OQ-2010.tar', 'NKE.N-2010.tar', 'PFE.N-2010.tar', 'PG.N-2010.tar', 'RTX.N-2010.tar', 'TRV.N-2010.tar', 'UNH.N-2010.tar', 'UTX.N-2010.tar', 'V.N-2010.tar', 'VZ.N-2010.tar', 'WBA.OQ-2010.tar', 'WMT.N-2010.tar', 'XOM.N-2010.tar']


In [5]:
# Functions to load bbo and trade files as timeseries dataframes
# Many trade files were unavailable without using ignore_errors=True, and then make the corrections for values of wrong types
def load_bbo(filename,
            tz_exchange="America/New_York",
            only_regular_trading_hours=True,
            hhmmss_open="09:30:00",
            hhmmss_close="16:00:00",
            merge_same_index=True):
    try:
        if filename.endswith("csv") or filename.endswith("csv.gz"):
            DF=pl.read_csv(filename)
        elif filename.endswith("parquet"):    
            DF=pl.read_parquet(filename)
        else:
            print("cannot load file "+filename+" : unknown format")
            return None
        if DF.is_empty():
            print(f"Skipping file: {filename} (empty file)")
            return None
    except:
        print(filename+" cannot be loaded")
        return None

    excel_base_date = pl.datetime(1899, 12, 30)  # Excel starts counting from 1900-01-01, but Polars needs 1899-12-30
    DF = DF.with_columns(
        (pl.col("xltime") * pl.duration(days=1) + excel_base_date).alias("index")
    )
    DF = DF.with_columns(pl.col("index").dt.convert_time_zone(tz_exchange))
    DF.drop("xltime")

    # apply common sense filter
    DF = DF.filter(pl.col("ask-price")>0).filter(pl.col("bid-price")>0).filter(pl.col("ask-price")>pl.col("bid-price"))

    if merge_same_index:
        DF = DF.group_by('index',maintain_order=True).last()   # last quote of the same timestamp
    
    if only_regular_trading_hours:
        hh_open,mm_open,ss_open = [float(x) for x in hhmmss_open.split(":")]
        hh_close,mm_close,ss_close = [float(x) for x in hhmmss_close.split(":")]

        seconds_open=hh_open*3600+mm_open*60+ss_open
        seconds_close=hh_close*3600+mm_close*60+ss_close

        DF = DF.filter(pl.col('index').dt.hour().cast(float)*3600+pl.col('index').dt.minute().cast(float)*60+pl.col('index').dt.second()>=seconds_open,
                       pl.col('index').dt.hour().cast(float)*3600+pl.col('index').dt.minute().cast(float)*60+pl.col('index').dt.second()<=seconds_close)
    
    return DF

def load_trade(filename,
            tz_exchange="America/New_York",
            only_non_special_trades=True,
            only_regular_trading_hours=True,
            merge_sub_trades=True):
    try:
        if filename.endswith("csv") or filename.endswith("csv.gz"):
            DF=pl.read_csv(filename, ignore_errors=True)
        elif filename.endswith("parquet"):    
            DF=pl.read_parquet(filename)
        else:
            print("cannot load file "+filename+" : unknown format")
            return None
        if DF.is_empty():
            print(f"Skipping file: {filename} (empty file)")
            return None
    except:
        print(filename+" cannot be loaded")
        return None

    excel_base_date = pl.datetime(1899, 12, 30)  # Excel starts counting from 1900-01-01, but Polars needs 1899-12-30
    DF = DF.with_columns(
        (pl.col("xltime") * pl.duration(days=1) + excel_base_date).alias("index")
    )
    DF = DF.with_columns(pl.col("index").dt.convert_time_zone(tz_exchange))
    DF.drop(["xltime","trade-rawflag","trade-stringflag"])

    if only_non_special_trades:
        DF=DF.filter(pl.col("trade-stringflag")=="uncategorized")

    if DF["trade-price"].dtype != pl.Float64:
                    DF = DF.with_columns(pl.col("trade-price").cast(pl.Float64))
    if DF["trade-volume"].dtype != pl.Float64:
                    DF = DF.with_columns(pl.col("trade-volume").cast(pl.Float64))
                    
    if merge_sub_trades:   # average volume-weighted trade price here
        DF=DF.group_by('index',maintain_order=True).agg([(pl.col('trade-price')*pl.col('trade-volume')).sum()/(pl.col('trade-volume').sum()).alias('trade-price'),pl.sum('trade-volume')])        
    
    return DF

In [6]:
# Here: go through every file for GS (2010), convert the bbo and trade files using function and save them in directory correction
import os
import time

start = time.time()

# Base directory
base_input_dir = "extracted_files/data/extraction/TRTH/raw/equities/US"
output_base_dir = "correction"

# Make bbo and trade subdirectories
categories = {
    "bbo": "bbo/GS.N",
    "trade": "trade/GS.N"
}

# Ensure output directories exist
for category in categories:
    os.makedirs(os.path.join(output_base_dir, category, "GS"), exist_ok=True)

# Process files, skipping the first file in each directory, this is because the first file cannot be loaded without errors in the case of every single asset
for category, relative_path in categories.items():
    input_dir = os.path.join(base_input_dir, relative_path)
    output_dir = os.path.join(output_base_dir, category, "GS")

    file_counter = 0

    for root, _, files in os.walk(input_dir):
        for file in sorted(files):
            if file.endswith(".csv.gz"):
                file_counter += 1
                if file_counter == 1:
                    print(f"Skipping first file: {file}")
                    continue

                input_path = os.path.join(root, file)

                # Apply load_bbo or load_trade depending on category
                if category == "bbo":
                    processed_data = load_bbo(input_path)
                elif category == "trade":
                    processed_data = load_trade(input_path)
                else:
                    continue

                if processed_data is not None:
                    # Save processed data
                    output_file = file.replace(".csv.gz", ".csv")
                    output_path = os.path.join(output_dir, output_file)
                    processed_data.write_csv(output_path)
                    print(f"Processed and saved: {output_path}")

end = time.time()
print(f"Processing took {end - start} seconds")


Skipping first file: 2010-01-01-GS.N-bbo.csv.gz
Processed and saved: correction\bbo\GS\2010-01-04-GS.N-bbo.csv
Processed and saved: correction\bbo\GS\2010-01-05-GS.N-bbo.csv
Processed and saved: correction\bbo\GS\2010-01-06-GS.N-bbo.csv
Processed and saved: correction\bbo\GS\2010-01-07-GS.N-bbo.csv
Processed and saved: correction\bbo\GS\2010-01-08-GS.N-bbo.csv
Processed and saved: correction\bbo\GS\2010-01-11-GS.N-bbo.csv
Processed and saved: correction\bbo\GS\2010-01-12-GS.N-bbo.csv
Processed and saved: correction\bbo\GS\2010-01-13-GS.N-bbo.csv
Processed and saved: correction\bbo\GS\2010-01-14-GS.N-bbo.csv
Processed and saved: correction\bbo\GS\2010-01-15-GS.N-bbo.csv
Skipping file: extracted_files/data/extraction/TRTH/raw/equities/US\bbo/GS.N\2010-01-18-GS.N-bbo.csv.gz (empty file)
Processed and saved: correction\bbo\GS\2010-01-19-GS.N-bbo.csv
Processed and saved: correction\bbo\GS\2010-01-20-GS.N-bbo.csv
Processed and saved: correction\bbo\GS\2010-01-21-GS.N-bbo.csv
Processed and sa

In [10]:
# Merge for all trade and bbo files that exist as processed for same dates
# Get all pairs of trade and bbo files per date where both exist

# Base directories
bbo_dir = "correction/bbo/GS"
trade_dir = "correction/trade/GS"
joined_dir = "correction/joined/GS"

# Ensure the joined directory exists
os.makedirs(joined_dir, exist_ok=True)

# Get the dates from the filenames
def extract_date(filename, suffix):
    if filename.endswith(suffix):
        parts = filename.split("-")
        if len(parts) >= 3: 
            return "-".join(parts[:3])
    return None

# Get all dates and corresponding trade and bbo files
bbo_dates = []
for file in os.listdir(bbo_dir):
    if file.endswith("-bbo.csv"):
        date = extract_date(file, "-bbo.csv")
        if date:
            bbo_dates.append((date, file))

trade_dates = []
for file in os.listdir(trade_dir):
    if file.endswith("-trade.csv"):
        date = extract_date(file, "-trade.csv")
        if date:
            trade_dates.append((date, file))

# Use of dictionaries for date matching
bbo_dict = {date: file for date, file in bbo_dates}
trade_dict = {date: file for date, file in trade_dates}

common_dates = set(bbo_dict.keys()).intersection(trade_dict.keys())

# Make the join using the optimized fill_forward scheme for every date
for date in common_dates:
    bbo_file = os.path.join(bbo_dir, bbo_dict[date])
    trade_file = os.path.join(trade_dir, trade_dict[date])

    try:
        # Read both files
        bbo_df = pl.read_csv(bbo_file)
        trade_df = pl.read_csv(trade_file)

        bbo_df = bbo_df.with_columns(
            pl.col("index")
            .str.replace(r"[-+]\d{4}$", "")  # Remove the timezone offset with regex
            .str.strptime(pl.Datetime, format="%Y-%m-%dT%H:%M:%S%.6f", strict=False)
            .alias("index")
        )

        trade_df = trade_df.with_columns(
            pl.col("index")
            .str.replace(r"[-+]\d{4}$", "")  # Remove the timezone offset with regex
            .str.strptime(pl.Datetime, format="%Y-%m-%dT%H:%M:%S%.6f", strict=False)
            .alias("index")
        )

        events = bbo_df.join(trade_df,on='index',how="full",coalesce=True).sort('index')

        events = events.with_columns(pl.col('index').dt.date().alias('date'))


        events = events.with_columns([pl.col("bid-price").forward_fill().over([pl.col("date")]), pl.col("bid-volume").forward_fill().over([pl.col("date")]), pl.col("ask-price").forward_fill().over([pl.col("date")]), pl.col("ask-volume").forward_fill().over([pl.col("date")]), pl.col("trade-price").forward_fill().over([pl.col("date")]), pl.col("trade-volume").forward_fill().over([pl.col("date")])])

        # Save the joined DataFrame
        events_file = os.path.join(joined_dir, f"{date}-GS.N-joined.csv")
        events.write_csv(events_file)

        print(f"Joined and saved: {events_file}")
    except Exception as e:
        print(f"Error processing files for date {date}: {e}")

print(f"\nTotal number of joined files: {len(common_dates)}")

Joined and saved: correction/joined/GS\2010-01-14-GS.N-joined.csv
Joined and saved: correction/joined/GS\2010-06-03-GS.N-joined.csv
Joined and saved: correction/joined/GS\2010-05-06-GS.N-joined.csv
Joined and saved: correction/joined/GS\2010-05-05-GS.N-joined.csv
Joined and saved: correction/joined/GS\2010-07-07-GS.N-joined.csv
Joined and saved: correction/joined/GS\2010-08-05-GS.N-joined.csv
Joined and saved: correction/joined/GS\2010-08-06-GS.N-joined.csv
Joined and saved: correction/joined/GS\2010-11-16-GS.N-joined.csv
Joined and saved: correction/joined/GS\2010-10-26-GS.N-joined.csv
Joined and saved: correction/joined/GS\2010-11-22-GS.N-joined.csv
Joined and saved: correction/joined/GS\2010-09-03-GS.N-joined.csv
Joined and saved: correction/joined/GS\2010-02-19-GS.N-joined.csv
Joined and saved: correction/joined/GS\2010-10-05-GS.N-joined.csv
Joined and saved: correction/joined/GS\2010-11-10-GS.N-joined.csv
Joined and saved: correction/joined/GS\2010-01-06-GS.N-joined.csv
Joined and

In [13]:
# Ensure all files have same datatypes for their rows
# Changed to parquet to ensure dtypes correct, since in csv types inferred from polars so inconsistent

# Base directory
joined_dir = "correction/joined/GS"

# Get all files from directory
files = [os.path.join(joined_dir, file) for file in os.listdir(joined_dir) if file.endswith(".csv")]

# Check and update each file's schema
for file in sorted(files):
    df = pl.read_csv(file)

    # Replace NaN values with a default value, use of 0 because no other trade will have 0, meaning we can safely switch back to nan for these values
    df = df.with_columns(pl.col("trade-price").fill_null("0.0"))
    df = df.with_columns(pl.col("trade-volume").fill_null("0"))

    # Check and convert column to Float64 if not already
    if df["trade-price"].dtype != pl.Float64:
        df = df.with_columns(pl.col("trade-price").cast(pl.Float64))

    if df["trade-volume"].dtype != pl.Int64:
        df = df.with_columns(pl.col("trade-volume").cast(pl.Float64))

    # Revert the default value back to NaN
    df = df.with_columns(
        pl.when(pl.col("trade-price") == 0.0).then(None).otherwise(pl.col("trade-price")).alias("trade-price")
    )
    df = df.with_columns(
        pl.when(pl.col("trade-volume") == 0.0).then(None).otherwise(pl.col("trade-volume")).alias("trade-volume")
    )

    parquet_file = file.replace(".csv", ".parquet")
    df.write_parquet(parquet_file)
    print(f"File updated and saved: {parquet_file}")

    # Remove the original CSV file
    os.remove(file)
    print(f"Original CSV file removed: {file}")

print("Processing complete.")

File updated and saved: correction/joined/GS\2010-01-04-GS.N-joined.parquet
Original CSV file removed: correction/joined/GS\2010-01-04-GS.N-joined.csv
File updated and saved: correction/joined/GS\2010-01-05-GS.N-joined.parquet
Original CSV file removed: correction/joined/GS\2010-01-05-GS.N-joined.csv
File updated and saved: correction/joined/GS\2010-01-06-GS.N-joined.parquet
Original CSV file removed: correction/joined/GS\2010-01-06-GS.N-joined.csv
File updated and saved: correction/joined/GS\2010-01-07-GS.N-joined.parquet
Original CSV file removed: correction/joined/GS\2010-01-07-GS.N-joined.csv
File updated and saved: correction/joined/GS\2010-01-08-GS.N-joined.parquet
Original CSV file removed: correction/joined/GS\2010-01-08-GS.N-joined.csv
File updated and saved: correction/joined/GS\2010-01-11-GS.N-joined.parquet
Original CSV file removed: correction/joined/GS\2010-01-11-GS.N-joined.csv
File updated and saved: correction/joined/GS\2010-01-12-GS.N-joined.parquet
Original CSV file 

In [14]:
# Method to get all stock names from the tar file names

tar_paths = ['AAPL.OQ-2010.tar', 'AMGN.OQ-2010.tar', 'AXP.N-2010.tar', 'BA.N-2010.tar', 'CAT.N-2010.tar', 'CSCO.OQ-2010.tar', 'CVX.N-2010.tar', 'DOW.N-2010.tar', 'GS.N-2010.tar', 'HD.N-2010.tar', 'IBM.N-2010.tar', 'INTC.OQ-2010.tar', 'JNJ.N-2010.tar', 'JPM.N-2010.tar', 'KO.N-2010.tar', 'MCD.N-2010.tar', 'MMM.N-2010.tar', 'MRK.N-2010.tar', 'MSFT.OQ-2010.tar', 'NKE.N-2010.tar', 'PFE.N-2010.tar', 'PG.N-2010.tar', 'RTX.N-2010.tar', 'TRV.N-2010.tar', 'UNH.N-2010.tar', 'UTX.N-2010.tar', 'V.N-2010.tar', 'VZ.N-2010.tar', 'WBA.OQ-2010.tar', 'WMT.N-2010.tar', 'XOM.N-2010.tar']
extract_path = './extracted_files'

def get_stocks_names(paths_list):
    stocks = []
    for path in paths_list:
        stocks.append(path.split('.')[0])
    return stocks

stocks = get_stocks_names(tar_paths)
print(stocks)

['AAPL', 'AMGN', 'AXP', 'BA', 'CAT', 'CSCO', 'CVX', 'DOW', 'GS', 'HD', 'IBM', 'INTC', 'JNJ', 'JPM', 'KO', 'MCD', 'MMM', 'MRK', 'MSFT', 'NKE', 'PFE', 'PG', 'RTX', 'TRV', 'UNH', 'UTX', 'V', 'VZ', 'WBA', 'WMT', 'XOM']


In [15]:
# Merge all dates to get one df for the entire year 2010 

# Define base directories
joined_dir = "correction/joined/GS"
output_dir = "correction/final_yearly" 
output_file = os.path.join(output_dir, "GS_combined.parquet")

# Ensure directory exists
os.makedirs(output_dir, exist_ok=True)

# Collect all files in the directory
files = [os.path.join(joined_dir, file) for file in os.listdir(joined_dir) if file.endswith(".parquet")]

# Initialize polar df to put all trades of one year in same df
combined_df = pl.DataFrame()

# Process each file
for file in sorted(files):
    try:
        df = pl.read_parquet(file)
        # Append to combined DataFrame
        combined_df = pl.concat([combined_df, df], how="vertical")
    except Exception as e:
        print(f"Error reading file {file}: {e}")

# Ensure the combined DataFrame is sorted by date
combined_df = combined_df.sort("index")

# Save to parquet file
combined_df.write_parquet(output_file)

print(f"All files combined and saved to: {output_file}")

All files combined and saved to: correction/final_yearly\GS_combined.parquet


In [41]:
GS = pl.read_parquet(output_file)
GS

index,xltime,bid-price,bid-volume,ask-price,ask-volume,trade-price,trade-volume,date
str,f64,f64,i64,f64,i64,f64,i64,str
"""2010-01-04T09:30:28.904000""",40182.604501,170.17,1,170.25,3,,,"""2010-01-04"""
"""2010-01-04T09:30:28.914000""",40182.604501,170.17,1,170.25,2,170.25,300,"""2010-01-04"""
"""2010-01-04T09:30:28.925000""",40182.604501,170.05,3,170.33,10,170.081111,900,"""2010-01-04"""
"""2010-01-04T09:30:28.935000""",40182.604502,170.05,1,170.33,10,170.258,1000,"""2010-01-04"""
"""2010-01-04T09:30:28.945000""",40182.604502,170.05,1,170.44,4,170.283333,1200,"""2010-01-04"""
…,…,…,…,…,…,…,…,…
"""2010-12-31T15:59:57.783000""",40543.874974,168.17,2,168.22,2,168.22,252,"""2010-12-31"""
"""2010-12-31T15:59:58.238000""",40543.87498,168.17,2,168.23,40,168.22,252,"""2010-12-31"""
"""2010-12-31T15:59:58.240000""",,168.17,2,168.23,40,168.223034,356,"""2010-12-31"""
"""2010-12-31T15:59:59.076999""",40543.874989,168.17,1,168.23,40,168.223034,356,"""2010-12-31"""


We have a parquet for all trades in 2010 for the GS asset. We can see it has initial null values because of the fill_forward method between bbo and trade files. The preprocessing