In [None]:
# This notebook extracts the raw TRTH tar archives, builds one cleaned trade+quote events DataFrame per asset, and saves it as a parquet file under data/clean/sp100_2004-8/.
import os
import pandas as pd
import gzip
import tarfile
import xlrd
import datetime
import matplotlib.pyplot as plt
import numpy as np
from functions import *
import re
import dask
import vaex
import glob

In [None]:
# Extract every per-ticker tar archive, for both the quote (bbo) and the trade data.
# Layout: data/raw/sp100_2004-8/<kind>/<TICKER>/<TICKER>_<kind>.tar, extracted
# in place into data/raw/sp100_2004-8/<kind>/<TICKER>/.
RAW_DIR = "data/raw/sp100_2004-8"


def _list_ticker_dirs(directory):
    """Return the entries of ``directory`` that are tickers, i.e. not hidden files.

    Filtering (instead of ``list.remove('.DS_Store')``) avoids a ValueError on
    machines that have no macOS '.DS_Store' file.
    """
    return [name for name in get_file_names(directory) if not name.startswith(".")]


# stock_tickers_* hold the ticker sub-folder names for each data kind.
stock_tickers_bbo = _list_ticker_dirs(f"{RAW_DIR}/bbo")
stock_tickers_trade = _list_ticker_dirs(f"{RAW_DIR}/trade")

for data_kind, kind_tickers in (("bbo", stock_tickers_bbo), ("trade", stock_tickers_trade)):
    for ticker_name in kind_tickers:
        ticker_dir = f"{RAW_DIR}/{data_kind}/{ticker_name}/"
        extract_tar(f"{ticker_dir}{ticker_name}_{data_kind}.tar", ticker_dir)

In [None]:
# Make dask's multiprocessing scheduler the default for the dask.compute() calls below.
dask.config.set({"scheduler": "processes"})

@dask.delayed
def load_TRTH_trade(filename,
                    tz_exchange="America/New_York",
                    only_non_special_trades=True,
                    only_regular_trading_hours=True,
                    open_time="09:30:00",
                    close_time="16:00:00",
                    merge_sub_trades=True,
                    verbose=False):
    """Load one TRTH trade file into a DataFrame indexed by exchange-local time.

    Decorated with ``dask.delayed``: calling it returns a lazy task, which is
    evaluated by ``dask.compute``.

    Parameters
    ----------
    filename : str
        Path ending in ``csv``, ``csv.gz``, ``arrow`` or ``parquet``.
    tz_exchange : str
        Time zone the (UTC) timestamps are converted to.
    only_non_special_trades : bool
        Keep only rows whose ``trade-stringflag`` equals ``"uncategorized"``.
    only_regular_trading_hours : bool
        Keep only rows between ``open_time`` and ``close_time`` (via
        ``DataFrame.between_time``, in ``tz_exchange`` local time).
    open_time, close_time : str
        Bounds used when ``only_regular_trading_hours`` is set.
    merge_sub_trades : bool
        Collapse rows that share a timestamp into one row with the mean
        ``trade_price`` and the summed ``trade_volume``.
    verbose : bool
        Print the reason when a file cannot be read (default: silent).

    Returns
    -------
    pandas.DataFrame or None
        None if the file cannot be read, has an unsupported extension, or
        has no rows.
    """
    try:
        if re.search(r'(csv|csv\.gz)$', filename):
            DF = pd.read_csv(filename, engine="pyarrow")
        elif re.search(r'arrow$', filename):
            # pandas has no read_arrow(); Feather v2 is the Arrow IPC file
            # format, so read_feather is pandas' reader for .arrow files.
            DF = pd.read_feather(filename)
        elif re.search(r'parquet$', filename):
            DF = pd.read_parquet(filename)
        else:
            print("DF does not exist: unsupported file extension for " + filename)
            return None
    except Exception as e:
        # Best effort: a missing or unreadable daily file is skipped, not fatal.
        if verbose:
            print("load_TRTH_trade could not load " + filename)
            print(e)
        return None
    if DF.shape[0] == 0:
        return None
    if only_non_special_trades:
        DF = DF[DF["trade-stringflag"] == "uncategorized"]
    # Non-inplace drop: DF may be a filtered slice of the frame read above.
    DF = DF.drop(columns=["trade-rawflag", "trade-stringflag"])
    # xltime is treated as an Excel serial date (days since 1899-12-30) in UTC.
    DF.index = pd.to_datetime(DF["xltime"], unit="d", origin="1899-12-30", utc=True)
    DF.index = DF.index.tz_convert(tz_exchange)  # .P stands for Arca, which is based at New York
    DF = DF.drop(columns="xltime")
    if only_regular_trading_hours:
        # NOTE: fixed daily bounds; early-close sessions (e.g. around
        # Thanksgiving) are not special-cased.
        DF = DF.between_time(open_time, close_time)
    if merge_sub_trades:
        DF = DF.groupby(DF.index).agg(
            trade_price=pd.NamedAgg(column='trade-price', aggfunc='mean'),
            trade_volume=pd.NamedAgg(column='trade-volume', aggfunc='sum'),
        )
    return DF



@dask.delayed
def load_TRTH_bbo(filename,
                  tz_exchange="America/New_York",
                  only_regular_trading_hours=True,
                  merge_sub_trades=True,
                  open_time="09:30:00",
                  close_time="16:00:00",
                  verbose=False):
    """Load one TRTH best-bid/offer (quote) file into a DataFrame indexed by exchange-local time.

    Decorated with ``dask.delayed``: calling it returns a lazy task, which is
    evaluated by ``dask.compute``.

    Parameters
    ----------
    filename : str
        Path ending in ``csv``, ``csv.gz``, ``arrow`` or ``parquet``.
    tz_exchange : str
        Time zone the (UTC) timestamps are converted to.
    only_regular_trading_hours : bool
        Keep only rows between ``open_time`` and ``close_time`` (via
        ``DataFrame.between_time``, in ``tz_exchange`` local time).
    merge_sub_trades : bool
        Collapse rows that share a timestamp into one, keeping the last
        non-null value of each column (``groupby(...).last()``).
    open_time, close_time : str
        Trading-hours bounds. The defaults reproduce the previously
        hard-coded 09:30-16:00 window, matching ``load_TRTH_trade``.
    verbose : bool
        Print the reason when a file cannot be read (default: silent).

    Returns
    -------
    pandas.DataFrame or None
        None if the file cannot be read, has an unsupported extension, or
        has no rows.
    """
    try:
        if re.search(r'(csv|csv\.gz)$', filename):
            DF = pd.read_csv(filename)
        elif re.search(r'arrow$', filename):
            # pandas has no read_arrow(); Feather v2 is the Arrow IPC file
            # format, so read_feather is pandas' reader for .arrow files.
            DF = pd.read_feather(filename)
        elif re.search(r'parquet$', filename):
            DF = pd.read_parquet(filename)
        else:
            print("DF does not exist: unsupported file extension for " + filename)
            return None
    except Exception as e:
        # Best effort: a missing or unreadable daily file is skipped, not fatal.
        if verbose:
            print("load_TRTH_bbo could not load " + filename)
            print(e)
        return None
    if DF.shape[0] == 0:
        return None
    # xltime is treated as an Excel serial date (days since 1899-12-30) in UTC.
    DF.index = pd.to_datetime(DF["xltime"], unit="d", origin="1899-12-30", utc=True)
    DF.index = DF.index.tz_convert(tz_exchange)  # .P stands for Arca, which is based at New York
    DF = DF.drop(columns="xltime")
    if only_regular_trading_hours:
        # NOTE: fixed daily bounds; early-close sessions (e.g. around
        # Thanksgiving) are not special-cased.
        DF = DF.between_time(open_time, close_time)
    if merge_sub_trades:
        DF = DF.groupby(DF.index).last()
    return DF

@dask.delayed
def load_merge_trade_bbo(ticker, date,
                         dirBase="data/raw/sp100_2004-8/",
                         suffix="csv.gz",
                         suffix_save=None,
                         dirSaveBase="data/clean/sp100_2004-8/events",
                         saveOnly=False,
                         doSave=False
                         ):
    """Load one ticker-day of trades and quotes, outer-join them on time, optionally save.

    Parameters
    ----------
    ticker : str
        Ticker sub-folder name under ``<dirBase>/trade`` and ``<dirBase>/bbo``.
    date : datetime-like
        Day to load. ``date.date()`` is used to build file names, so a
        ``datetime.datetime`` or ``pandas.Timestamp`` is expected.
    dirBase : str
        Root directory containing ``trade/<ticker>/`` and ``bbo/<ticker>/``.
    suffix : str
        Extension of the raw daily files (``csv.gz``, ``arrow`` or ``parquet``).
    suffix_save : str or None
        Output format (``arrow`` or ``parquet``); defaults to ``suffix``.
    dirSaveBase : str
        Output root; files are written under ``<dirSaveBase>/events/<ticker>/``.
    saveOnly : bool
        When saving, return only whether a file was written.
    doSave : bool
        Write the joined events to disk.

    Returns
    -------
    The joined events (a pandas DataFrame, or a vaex DataFrame after an
    ``arrow`` save), a bool when ``doSave and saveOnly``, or None when either
    input file could not be loaded.
    """
    day = str(date.date())
    # Build each path explicitly: str.replace("trade", "bbo") on the trade path
    # would also rewrite any other "trade" substring in dirBase or the ticker.
    file_trade = os.path.join(dirBase, "trade", ticker, f"{day}-{ticker}-trade.{suffix}")
    file_bbo = os.path.join(dirBase, "bbo", ticker, f"{day}-{ticker}-bbo.{suffix}")

    # The loaders are dask.delayed, so calling them only builds lazy tasks.
    # Evaluate them here, in-process, so the rest of this function works on real
    # DataFrames. Otherwise trades/bbos are Delayed proxies: the None check can
    # never fire, and to_parquet/vaex never receive actual data.
    trades, bbos = dask.compute(load_TRTH_trade(file_trade),
                                load_TRTH_bbo(file_bbo),
                                scheduler="sync")
    if trades is None or bbos is None:
        return None

    events = trades.join(bbos, how="outer")

    if doSave:
        # NOTE(review): the default dirSaveBase already ends in "events", so the
        # default output folder is .../events/events/<ticker>. It is kept as-is
        # so that existing outputs stay where they are.
        dirSave = os.path.join(dirSaveBase, "events", ticker)
        # exist_ok avoids a FileExistsError race when several worker processes
        # create the same folder at once.
        os.makedirs(dirSave, exist_ok=True)

        if suffix_save:
            suffix = suffix_save

        file_events = os.path.join(dirSave, f"{day}-{ticker}-events.{suffix}")

        saved = False
        if suffix == "arrow":
            events = vaex.from_pandas(events, copy_index=True)
            events.export_arrow(file_events)
            saved = True
        elif suffix == "parquet":
            events.to_parquet(file_events, use_deprecated_int96_timestamps=True)
            saved = True
        else:
            print("suffix " + suffix + " : format not recognized")

        if saveOnly:
            return saved
    return events

In [None]:

def _load_all_days(loader, ticker_dir):
    """Run ``loader`` on every ``*.csv.gz`` file in ``ticker_dir`` via dask; concat the results, or None if nothing loaded."""
    day_files = sorted(glob.glob(os.path.join(ticker_dir, "*.csv.gz")))
    loaded = dask.compute([loader(fn) for fn in day_files])[0]
    frames = [frame for frame in loaded if frame is not None]
    if not frames:
        # pd.concat raises on an empty list or an all-None list.
        return None
    return pd.concat(frames)


def data_to_parquet(ticker,
                    raw_dir="data/raw/sp100_2004-8",
                    clean_dir="data/clean/sp100_2004-8"):
    """Build the cleaned trade+quote event table for one ticker and cache it as parquet.

    Loads every daily ``*.csv.gz`` file under ``<raw_dir>/trade/<ticker>/`` and
    ``<raw_dir>/bbo/<ticker>/`` with ``dask.compute``. It then outer-joins trades
    and quotes on their timestamps, gives every row the prevailing quote, and
    keeps only rows that carry a trade. The result is written to
    ``<clean_dir>/<ticker>.parquet``.

    Does nothing if that parquet file already exists. Prints a message and
    skips the ticker if no trade or quote file could be loaded.
    """
    out_path = os.path.join(clean_dir, f"{ticker}.parquet")
    # Must be `not`, not `~`: ~True == -2 and ~False == -1 are both truthy,
    # so a bitwise-not guard never skips tickers that are already converted.
    if os.path.exists(out_path):
        return

    trades = _load_all_days(load_TRTH_trade, os.path.join(raw_dir, "trade", ticker))
    bbos = _load_all_days(load_TRTH_bbo, os.path.join(raw_dir, "bbo", ticker))
    if trades is None or bbos is None:
        print(f"data_to_parquet: no usable trade or bbo data for {ticker}; skipped")
        return

    events = trades.join(bbos, how="outer")
    # '()' appears as a missing-value marker in the raw quote columns.
    events = events.replace('()', np.nan)

    quote_cols = ['ask-price', 'bid-price', 'ask-volume', 'bid-volume']
    # Fill each gap with the last known quote (forward-fill), then back-fill
    # only the leading gap before the first quote. Back-filling first would
    # stamp trades with the *next* quote, i.e. look-ahead.
    # NOTE(review): the fill runs over all concatenated days, so it can carry
    # a quote across a day boundary.
    events[quote_cols] = events[quote_cols].ffill().bfill()

    events = (
        events
        .dropna(subset=['trade_price'])
        .astype({col: "float" for col in quote_cols})
    )

    os.makedirs(clean_dir, exist_ok=True)
    events.to_parquet(out_path)

In [None]:
# Convert every ticker folder under the raw trade directory. Hidden entries
# (e.g. macOS '.DS_Store') and the tickers below are skipped. Filtering, unlike
# list.remove(), does not raise when an entry is absent.
# NOTE(review): the reason MSFT.O and ORCL.N are excluded is not recorded here; document it.
EXCLUDED_TICKERS = {'MSFT.O', 'ORCL.N'}

tickers = [
    name for name in get_file_names("data/raw/sp100_2004-8/trade/")
    if not name.startswith(".") and name not in EXCLUDED_TICKERS
]

for ticker in tickers:
    data_to_parquet(ticker)