# Wrangling whole directories: trade and bbo



In [None]:
import polars as pl

import os

dirData_raw = "data/raw/TRTH/equities/US/"
dirData_clean = dirData_raw.replace("raw","clean")
if not os.path.isdir(dirData_clean):
    os.makedirs(dirData_clean)


ticker = "SPY.P"

## Column format sanity check

In [2]:
import glob 
allfiles = glob.glob(dirData_raw+"/bbo/"+ticker+"/*.parquet")

all_dtypes = [pl.scan_parquet(myfile).dtypes for myfile in allfiles]

dtypes_DF = pd.DataFrame(all_dtypes)
dtypes_DF = dtypes_DF.astype(str)                 # value_counts() require objects that can be ordered.
print(dtypes_DF)


           0        1      2        3      4
0    Float64  Float64  Int32  Float64  Int32
1    Float64  Float64  Int32  Float64  Int32
2    Float64  Float64  Int32  Float64  Int32
3    Float64  Float64  Int32  Float64  Int32
4    Float64  Float64  Int32  Float64  Int32
..       ...      ...    ...      ...    ...
565  Float64  Float64  Int32  Float64  Int32
566  Float64  Float64  Int32  Float64  Int32
567  Float64  Float64  Int32  Float64  Int32
568  Float64  Float64  Int32  Float64  Int32
569  Float64  Float64  Int32  Float64  Int32

[570 rows x 5 columns]


  all_dtypes = [pl.scan_parquet(myfile).dtypes for myfile in allfiles]


Let us now add a suffix "-bad" to all the files which do not have the same column dtypes as the majority.

Note: some files have strings instead of floats, this is due to a problem in the original .csv file and SHOULD be corrected in a production setting.

In [3]:
import os
most_common_data_types = pd.DataFrame(dtypes_DF.value_counts()).iloc[0].name

for idx in range(len(all_dtypes)):
    my_dtypes=all_dtypes[idx]
    my_dtypes=[str(x) for x in my_dtypes]

    all_good = all([a==b for a,b in zip(my_dtypes,most_common_data_types)])
    if not all_good:
        os.rename(allfiles[idx],allfiles[idx]+"-bad")


Let us do it again for the trade files

In [4]:
import glob 

allfiles = glob.glob(dirData_raw+"/trade/"+ticker+"/*parquet")

all_dtypes = [pl.scan_parquet(myfile).dtypes for myfile in allfiles]

dtypes_DF = pd.DataFrame(all_dtypes)
dtypes_DF = dtypes_DF.astype(str)


import os
most_common_data_types = pd.DataFrame(dtypes_DF.value_counts()).iloc[0].name

for idx in range(len(all_dtypes)):
    my_dtypes=all_dtypes[idx]
    my_dtypes=[str(x) for x in my_dtypes]

    all_good = all([a==b for a,b in zip(my_dtypes,most_common_data_types)])
    if not all_good:
        os.rename(allfiles[idx],allfiles[idx]+"-bad")


  all_dtypes = [pl.scan_parquet(myfile).dtypes for myfile in allfiles]


## Trade and bbo wrangling functions that allow directory-wise scan_parquet

In [5]:
def wrangle_trade(DF,
            tz_exchange="America/New_York",
            only_non_special_trades=True,
            only_regular_trading_hours=True,
            merge_sub_trades=True):
    excel_base_date = pl.datetime(1899, 12, 30)  # Excel starts counting from 1900-01-01, but Polars needs 1899-12-30
    DF = DF.with_columns(
        (pl.col("xltime") * pl.duration(days=1) + excel_base_date).alias("index")
    )
    DF = DF.with_columns(pl.col("index").dt.convert_time_zone(tz_exchange))

    if only_non_special_trades:
        DF=DF.filter(pl.col("trade-stringflag")=="uncategorized")

    DF = DF.drop(["xltime","trade-rawflag","trade-stringflag"])


    if merge_sub_trades:   # average volume-weighted trade price here
        DF=DF.group_by('index',maintain_order=True).agg([(pl.col('trade-price')*pl.col('trade-volume')).sum()/(pl.col('trade-volume').sum()).alias('trade-price'),pl.sum('trade-volume')])        
    
    return DF




def load_TRTH_trade_file(filename,
            tz_exchange="America/New_York",
            only_non_special_trades=True,
            only_regular_trading_hours=True,
            merge_sub_trades=True):
    try:
        if filename.endswith("csv") or filename.endswith("csv.gz"):
            DF=pl.read_csv(filename)
        elif filename.endswith("parquet"):    
            DF=pl.read_parquet(filename)
        else:
            print("cannot load file "+filename+" : unknown format")
            return None
    except:
        print(filename+" cannot be loaded")
        return None
    
    DF = wrangle_trade(DF,
            tz_exchange=tz_exchange,
            only_non_special_trades=only_non_special_trades,
            only_regular_trading_hours=only_regular_trading_hours,
            merge_sub_trades=merge_sub_trades)

    return DF



In [6]:
def wrangle_bbo(DF,
            tz_exchange="America/New_York",
            only_regular_trading_hours=True,
            hhmmss_open="09:30:00",
            hhmmss_close="16:00:00",
            merge_same_index=True):

    excel_base_date = pl.datetime(1899, 12, 30)  # Excel starts counting from 1900-01-01, but Polars needs 1899-12-30
    DF = DF.with_columns(
        (pl.col("xltime") * pl.duration(days=1) + excel_base_date).alias("index")
    )
    DF = DF.with_columns(pl.col("index").dt.convert_time_zone(tz_exchange))
    DF = DF.drop("xltime")

    # apply common sense filter
    DF = DF.filter(pl.col("ask-price")>0).filter(pl.col("bid-price")>0).filter(pl.col("ask-price")>pl.col("bid-price"))

    if merge_same_index:
        DF = DF.group_by('index',maintain_order=True).last()   # last quote of the same timestamp
    
    if only_regular_trading_hours:
        hh_open,mm_open,ss_open = [int(x) for x in hhmmss_open.split(":")]
        hh_close,mm_close,ss_close = [int(x) for x in hhmmss_close.split(":")]

        seconds_open=hh_open*3600+mm_open*60+ss_open
        seconds_close=hh_close*3600+mm_close*60+ss_close

        DF = DF.filter(pl.col('index').dt.hour().cast(pl.Int32)*3600+pl.col('index').dt.minute().cast(pl.Int32)*60+pl.col('index').dt.second().cast(pl.Int32)>=seconds_open,
                       pl.col('index').dt.hour().cast(pl.Int32)*3600+pl.col('index').dt.minute().cast(pl.Int32)*60+pl.col('index').dt.second().cast(pl.Int32)<=seconds_close)
    return DF

def load_TRTH_bbo_file(filename,
            tz_exchange="America/New_York",
            only_regular_trading_hours=True,
            hhmmss_open="09:30:00",
            hhmmss_close="16:00:00",
            merge_same_index=True):
    try:
        if filename.endswith("csv") or filename.endswith("csv.gz"):
            DF=pl.read_csv(filename)
        elif filename.endswith("parquet"):    
            DF=pl.read_parquet(filename)
        else:
            print("cannot load file "+filename+" : unknown format")
            return None
    except:
        print(filename+" cannot be loaded")
        return None

    DF = wrangle_bbo(DF,
            tz_exchange=tz_exchange,
            only_regular_trading_hours=only_regular_trading_hours,
            hhmmss_open=hhmmss_open,
            hhmmss_close=hhmmss_close,
            merge_same_index=merge_same_index)

    
    return DF



In [7]:
df_bbo = pl.scan_parquet(dirData_raw+"/bbo/"+ticker+"/*parquet")   # lazy loading
df_bbo

In [8]:
df_bbo = wrangle_bbo(df_bbo)
df_bbo   # computation graph completed with that of wrangle_bbo (nothing has been computed)

In [9]:
if not os.path.isdir(dirData_clean+"/bbo"):
    os.makedirs(dirData_clean+"/bbo")
    
df_bbo.collect(streaming=True).write_parquet(dirData_raw+"/bbo/"+ticker+".parquet")   

In [10]:
df_trade = pl.scan_parquet(dirData_raw+"/trade/SPY.P/*parquet")
df_trade = wrangle_trade(df_trade)

if not os.path.isdir(dirData_clean+"/trade"):
    os.makedirs(dirData_clean+"/trade")

df_trade.collect(streaming=True).write_parquet(dirData_clean+"/trade/"+ticker+"_trade.parquet")
