# 6 Thomson-Reuters Tick History intraday data


In [25]:
import pandas as pd
import numpy as np

import re
import os

import pdb

if 'Labs' in os.getcwd():
    os.chdir('..')

print(os.getcwd())

/Users/macos/Financial-Big-Data


### Parallelization

In [26]:
import dask
dask.config.set(scheduler="processes")

@dask.delayed
def load_TRTH_trade(filename,
             tz_exchange="America/New_York",
             only_non_special_trades=True,
             only_regular_trading_hours=True,
             open_time="09:30:00",
             close_time="16:00:00",
             merge_sub_trades=True):
    try:
        if re.search('(csv|csv\\.gz)$',filename):
            DF = pd.read_csv(filename)
        if re.search(r'arrow$',filename):
            DF = pd.read_arrow(filename)
        if re.search('parquet$',filename):
            DF = pd.read_parquet(filename)

    except Exception as e:
     #   print("load_TRTH_trade could not load "+filename)
     #   print(e)
        return None
    
    try:
        DF.shape
    except Exception as e: # DF does not exist
        print("DF does not exist")
        print(e)
        return None

    
    if DF.shape[0]==0:
        return None
    
    if only_non_special_trades:
        DF = DF[DF["trade-stringflag"]=="uncategorized"]

    DF.drop(columns=["trade-rawflag","trade-stringflag"],axis=1,inplace=True)
    
    DF.index = pd.to_datetime(DF["xltime"],unit="d",origin="1899-12-30",utc=True)
    DF.index = DF.index.tz_convert(tz_exchange)  # .P stands for Arca, which is based at New York
    DF.drop(columns="xltime",inplace=True)
    
    if only_regular_trading_hours:
        DF=DF.between_time(open_time,close_time)    # warning: ever heard e.g. about Thanksgivings?
    
    if merge_sub_trades:
           DF=DF.groupby(DF.index).agg(trade_price=pd.NamedAgg(column='trade-price', aggfunc='mean'),
                                       trade_volume=pd.NamedAgg(column='trade-volume', aggfunc='sum'))
    
    return DF



@dask.delayed
def load_TRTH_bbo(filename,
             tz_exchange="America/New_York",
             only_regular_trading_hours=True,
             merge_sub_trades=True):
    try:
        if re.search(r'(csv|csv\.gz)$',filename):
            DF = pd.read_csv(filename)
        if re.search(r'arrow$',filename):
            DF = pd.read_arrow(filename)
        if re.search(r'parquet$',filename):
            DF = pd.read_parquet(filename) 
    except Exception as e:
       # print("load_TRTH_bbo could not load "+filename)
        return None
    
    try:
        DF.shape
    except Exception as e: # DF does not exist
        print("DF does not exist")
        print(e)
        return None

    if DF.shape[0]==0:
        return None
    
        
    DF.index = pd.to_datetime(DF["xltime"],unit="d",origin="1899-12-30",utc=True)
    DF.index = DF.index.tz_convert(tz_exchange)  # .P stands for Arca, which is based at New York
    DF.drop(columns="xltime",inplace=True)
    
    if only_regular_trading_hours:
        DF=DF.between_time("09:30:00","16:00:00")    # ever heard about Thanksgivings?
        
    if merge_sub_trades:
        DF=DF.groupby(DF.index).last()
    

        
    return DF

In [27]:
import vaex

@dask.delayed
def load_merge_trade_bbo(ticker,date,
                         country="US",
                         dirBase="data/raw/TRTH/equities/",
                         suffix="parquet",
                         suffix_save=None,
                         dirSaveBase="data/clean/TRTH/equities/events",
                         saveOnly=False,
                         doSave=False
                        ):
    
    file_trade=dirBase+"/"+country+"/trade/"+ticker+"/"+str(date.date())+"-"+ticker+"-trade."+suffix
    file_bbo=file_trade.replace("trade","bbo")
    trades=load_TRTH_trade(file_trade)
    bbos  =load_TRTH_bbo(file_bbo)
    try:
        trades.shape + bbos.shape
    except:
        return None
    
    events=trades.join(bbos,how="outer")
    
    if doSave:
        dirSave=dirSaveBase+"/"+country+"/events/"+ticker
        if not os.path.isdir(dirSave):
            os.makedirs(dirSave)

        if suffix_save:
            suffix=suffix_save
        
        file_events=dirSave+"/"+str(date.date())+"-"+ticker+"-events"+"."+suffix
       # pdb.set_trace()

        saved=False
        if suffix=="arrow":
            events=vaex.from_pandas(events,copy_index=True)
            events.export_arrow(file_events)
            saved=True
        if suffix=="parquet":
         #   pdb.set_trace()
            events.to_parquet(file_events,use_deprecated_int96_timestamps=True)
            saved=True
            
        if not saved:
            print("suffix "+suffix+" : format not recognized")
            
        if saveOnly:
            return saved
    return events

In [34]:
from datetime import datetime

ticker="LMT.N"

startDate="2010-01-01"
endDate="2010-12-31"

datelist = pd.date_range(startDate,endDate).tolist()

In [35]:
%time allpromises=[load_merge_trade_bbo(ticker,date,saveOnly=True,doSave=True,suffix="parquet",suffix_save="parquet") for date in datelist]

CPU times: user 41.1 ms, sys: 5.38 ms, total: 46.5 ms
Wall time: 54.8 ms


Thus, it takes almost no time at all to create execution promises. Let us check that we really have promises:

In [36]:
allpromises[0]

Delayed('load_merge_trade_bbo-cc3a453f-a763-4f3c-ae64-52b1f6e6f99d')

To actually perform a computation, simply call the compute() function

In [37]:
%%time
allpromises[0].compute()

CPU times: user 10.5 ms, sys: 9.69 ms, total: 20.2 ms
Wall time: 4.41 s


True

Now, let us load all the files in a parallel way

In [38]:
%%time
alldata=dask.compute(allpromises) 

CPU times: user 1.03 s, sys: 51 ms, total: 1.08 s
Wall time: 7.86 s


In [39]:
alldata

([True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,
  True,


#### Delayed, other ways


There are alternative ways to delay a function: use dask.delayed(some_function) directly. 

In [10]:
allpromises=[dask.delayed(pd.read_csv)(fn) for fn in alldata]

or defined a delayed version of a function

In [11]:
load_TRTH_trade_delayed=dask.delayed(load_TRTH_trade)

In [12]:
del alldata  # cleanup




 

### Merge trades and bbo data

If one wishes to create a single dataframe, then one can proceeed in the following way. Note that it is not needed if one uses VAEX that can aggregate several files in a single virtual dataframe (see week 8).

In [51]:
import glob

trade_files=glob.glob("data/raw/TRTH/equities/US/trade/LMT.N/2005*")
trade_files.sort()

allpromises=[load_TRTH_trade(fn) for fn in trade_files]
trades=dask.compute(allpromises)[0]

trades=pd.concat(trades)

ValueError: No objects to concatenate

In [None]:
bbo_files=glob.glob("data/raw/TRTH/equities/US/bbo/SPY.P/2009*")
bbo_files.sort()

allpromises=[load_TRTH_bbo(fn) for fn in bbo_files]
bbos=dask.compute(allpromises)[0]

bbos=pd.concat(bbos)

ValueError: No objects to concatenate

In [None]:
%time events=trades.join(bbos,how="outer")

AttributeError: 'list' object has no attribute 'join'

In [None]:
events.shape

NameError: name 'events' is not defined

We are entering into the realms of big data. Let us save this object

In [None]:
# before saving a parquet object, we need to ensure that the columns are in numeric format
events["bid-price"]=events["bid-price"].values.astype("float")
events["bid-volume"]=events["bid-volume"].values.astype("float")
events["ask-price"]=events["ask-price"].values.astype("float")
events["ask-volume"]=events["ask-volume"].values.astype("float")

#so far, one still needs to add the use_deprectated_int96_timestamps option
events.to_parquet("SPY_2009_events.parquet",use_deprecated_int96_timestamps=True,compression="brotli")

### VAEX to the rescue

In [None]:
try:
    if events.shape:
        del events
except:
    print("")




In [53]:
import vaex

df=vaex.open("data/clean/TRTH/equities/events/US/events/LMT.N/20[04][08]*arrow")
df

OSError: File pattern did not match anything data/clean/TRTH/equities/events/US/events/LMT.N/20[04][08]*arrow

In [None]:
df.export("SPY_2009-2010_events.arrow",compression="brotli")   # 20Gb uncompressed