In [1]:
import pandas as pd
import dask
import re
import glob
import os
import vaex

In [2]:
dask.config.set(scheduler="processes")

#@dask.delayed
def load_TRTH_trade(filename,
             tz_exchange="America/New_York",
             only_non_special_trades=True,
             only_regular_trading_hours=True,
             open_time="09:30:00",
             close_time="16:00:00",
             merge_sub_trades=True):
    try:
        if re.search('(csv|csv\\.gz)$',filename):
            DF = pd.read_csv(filename)
        if re.search(r'arrow$',filename):
            DF = pd.read_arrow(filename)
        if re.search('parquet$',filename):
            DF = pd.read_parquet(filename)

    except Exception as e:
     #   print("load_TRTH_trade could not load "+filename)
     #   print(e)
        return None
    
    try:
        DF.shape
    except Exception as e: # DF does not exist
        print("DF does not exist")
        print(e)
        return None

    
    if DF.shape[0]==0:
        return None
    
    if only_non_special_trades:
        DF = DF[DF["trade-stringflag"]=="uncategorized"]

    DF.drop(columns=["trade-rawflag","trade-stringflag"],axis=1,inplace=True)
    
    DF.index = pd.to_datetime(DF["xltime"],unit="d",origin="1899-12-30",utc=True)
    DF.index = DF.index.tz_convert(tz_exchange)  # .P stands for Arca, which is based at New York
    DF.drop(columns="xltime",inplace=True)
    
    if only_regular_trading_hours:
        DF=DF.between_time(open_time,close_time)    # warning: ever heard e.g. about Thanksgivings?
    
    if merge_sub_trades:
           DF=DF.groupby(DF.index).agg(trade_price=pd.NamedAgg(column='trade-price', aggfunc='mean'),
                                       trade_volume=pd.NamedAgg(column='trade-volume', aggfunc='sum'))
    
    return DF



#@dask.delayed
def load_TRTH_bbo(filename,
             tz_exchange="America/New_York",
             only_regular_trading_hours=True,
             merge_sub_trades=True):
    try:
        if re.search(r'(csv|csv\.gz)$',filename):
            DF = pd.read_csv(filename)
        if re.search(r'arrow$',filename):
            DF = pd.read_arrow(filename)
        if re.search(r'parquet$',filename):
            DF = pd.read_parquet(filename) 
    except Exception as e:
       # print("load_TRTH_bbo could not load "+filename)
        return None
    
    try:
        DF.shape
    except Exception as e: # DF does not exist
        print("DF does not exist")
        print(e)
        return None

    if DF.shape[0]==0:
        return None
    
        
    DF.index = pd.to_datetime(DF["xltime"],unit="d",origin="1899-12-30",utc=True)
    DF.index = DF.index.tz_convert(tz_exchange)  # .P stands for Arca, which is based at New York
    DF.drop(columns="xltime",inplace=True)
    
    if only_regular_trading_hours:
        DF=DF.between_time("09:30:00","16:00:00")    # ever heard about Thanksgivings?
        
    if merge_sub_trades:
        DF=DF.groupby(DF.index).last()
    

        
    return DF

In [3]:
bbo_files=glob.glob("../data/flash_crash_DJIA/US/bbo/*/*")
trade_files=glob.glob("../data/flash_crash_DJIA/US/trade/*/*")
bbo_files.sort()
trade_files.sort()


securities = list(zip(bbo_files, trade_files))

In [4]:
paths=pd.DataFrame(securities, columns=["BBO_file", "TRADE_file"])

In [5]:
paths["ticker"] = paths["BBO_file"].apply(lambda x: re.match(".*\/bbo\/(.*)\..*\/.*", x).groups()[0])

In [6]:
paths["date"] = paths.apply(lambda x: re.match(f".*\/(.*)-{x['ticker']}.*", x["BBO_file"]).groups()[0], axis=1)

In [7]:
paths

Unnamed: 0,BBO_file,TRADE_file,ticker,date
0,../data/flash_crash_DJIA/US/bbo/AAPL.OQ/2010-0...,../data/flash_crash_DJIA/US/trade/AAPL.OQ/2010...,AAPL,2010-01-01
1,../data/flash_crash_DJIA/US/bbo/AAPL.OQ/2010-0...,../data/flash_crash_DJIA/US/trade/AAPL.OQ/2010...,AAPL,2010-01-04
2,../data/flash_crash_DJIA/US/bbo/AAPL.OQ/2010-0...,../data/flash_crash_DJIA/US/trade/AAPL.OQ/2010...,AAPL,2010-01-05
3,../data/flash_crash_DJIA/US/bbo/AAPL.OQ/2010-0...,../data/flash_crash_DJIA/US/trade/AAPL.OQ/2010...,AAPL,2010-01-06
4,../data/flash_crash_DJIA/US/bbo/AAPL.OQ/2010-0...,../data/flash_crash_DJIA/US/trade/AAPL.OQ/2010...,AAPL,2010-01-07
...,...,...,...,...
7825,../data/flash_crash_DJIA/US/bbo/WMT.N/2010-12-...,../data/flash_crash_DJIA/US/trade/WMT.N/2010-1...,WMT,2010-12-27
7826,../data/flash_crash_DJIA/US/bbo/WMT.N/2010-12-...,../data/flash_crash_DJIA/US/trade/WMT.N/2010-1...,WMT,2010-12-28
7827,../data/flash_crash_DJIA/US/bbo/WMT.N/2010-12-...,../data/flash_crash_DJIA/US/trade/WMT.N/2010-1...,WMT,2010-12-29
7828,../data/flash_crash_DJIA/US/bbo/WMT.N/2010-12-...,../data/flash_crash_DJIA/US/trade/WMT.N/2010-1...,WMT,2010-12-30


In [8]:
@dask.delayed
def load_merge_trade_bbo(ticker,date, bbo_path, trade_path,
                         country="US",
                         dirBase="data/flash_crash_DJIA/US",
                         suffix="parquet",
                         suffix_save=None,
                         dirSaveBase="data/clean/flash_crash_DJIA/US/events",
                         saveOnly=False,
                         doSave=False
                        ):
    

    trades = load_TRTH_trade(trade_path)
    bbos  = load_TRTH_bbo(bbo_path)
    try:
        trades.shape + bbos.shape
    except:
        return None
    
    events=trades.join(bbos,how="outer")
    
    if doSave:
        dirSave=dirSaveBase+"/"+country+"/events/"+ticker
        if not os.path.isdir(dirSave):
            os.makedirs(dirSave)

        if suffix_save:
            suffix=suffix_save
        
        file_events=dirSave+"/"+date+"-"+ticker+"-events"+"."+suffix
       # pdb.set_trace()

        saved=False
        if suffix=="arrow":
            events=vaex.from_pandas(events,copy_index=True)
            events.export_arrow(file_events)
            saved=True
        if suffix=="parquet":
         #   pdb.set_trace()
            events.to_parquet(file_events,use_deprecated_int96_timestamps=True)
            saved=True
            
        if not saved:
            print("suffix "+suffix+" : format not recognized")
            
        if saveOnly:
            return saved
    return events

In [9]:
paths

Unnamed: 0,BBO_file,TRADE_file,ticker,date
0,../data/flash_crash_DJIA/US/bbo/AAPL.OQ/2010-0...,../data/flash_crash_DJIA/US/trade/AAPL.OQ/2010...,AAPL,2010-01-01
1,../data/flash_crash_DJIA/US/bbo/AAPL.OQ/2010-0...,../data/flash_crash_DJIA/US/trade/AAPL.OQ/2010...,AAPL,2010-01-04
2,../data/flash_crash_DJIA/US/bbo/AAPL.OQ/2010-0...,../data/flash_crash_DJIA/US/trade/AAPL.OQ/2010...,AAPL,2010-01-05
3,../data/flash_crash_DJIA/US/bbo/AAPL.OQ/2010-0...,../data/flash_crash_DJIA/US/trade/AAPL.OQ/2010...,AAPL,2010-01-06
4,../data/flash_crash_DJIA/US/bbo/AAPL.OQ/2010-0...,../data/flash_crash_DJIA/US/trade/AAPL.OQ/2010...,AAPL,2010-01-07
...,...,...,...,...
7825,../data/flash_crash_DJIA/US/bbo/WMT.N/2010-12-...,../data/flash_crash_DJIA/US/trade/WMT.N/2010-1...,WMT,2010-12-27
7826,../data/flash_crash_DJIA/US/bbo/WMT.N/2010-12-...,../data/flash_crash_DJIA/US/trade/WMT.N/2010-1...,WMT,2010-12-28
7827,../data/flash_crash_DJIA/US/bbo/WMT.N/2010-12-...,../data/flash_crash_DJIA/US/trade/WMT.N/2010-1...,WMT,2010-12-29
7828,../data/flash_crash_DJIA/US/bbo/WMT.N/2010-12-...,../data/flash_crash_DJIA/US/trade/WMT.N/2010-1...,WMT,2010-12-30


In [14]:
events={}
for ticker in paths.ticker.unique()[:2]:
    all_promises=paths[paths["ticker"]==ticker].apply(lambda x: load_merge_trade_bbo(x['ticker'], x['date'], x['BBO_file'], x['TRADE_file']), axis=1)
    events[ticker] = dask.compute(all_promises.values.tolist())[0]

In [10]:
#all_promises=paths.apply(lambda x: load_merge_trade_bbo(x['ticker'], x['date'], x['BBO_file'], x['TRADE_file']), axis=1)

In [11]:
#events=dask.compute(all_promises.values.tolist())[0]

In [12]:
#pd.concat(events)