# 6 Thomson-Reuters Tick History intraday data


On utilise Dask pour de manière générale ou bien pour convertir le tout en parquet file. Une fois qu'on a une bdd en parquet file, on peut utiliser Vaex pour faire des calculs dessus.

In [1]:
import pandas as pd
import numpy as np

import re
import os

import pdb



### Parallelization

In [2]:
import dask
dask.config.set(scheduler="processes")

#@dask.delayed
def load_TRTH_trade(filename,
             tz_exchange="America/New_York",
             only_non_special_trades=True,
             only_regular_trading_hours=True,
             open_time="09:30:00",
             close_time="16:00:00",
             merge_sub_trades=True):
    try:
        if re.search('(csv|csv\\.gz)$',filename):
            DF = pd.read_csv(filename)
        if re.search(r'arrow$',filename):
            DF = pd.read_arrow(filename)
        if re.search('parquet$',filename):
            DF = pd.read_parquet(filename)

    except Exception as e:
     #   print("load_TRTH_trade could not load "+filename)
     #   print(e)
        return None
    
    try:
        DF.shape
    except Exception as e: # DF does not exist
        print("DF does not exist")
        print(e)
        return None

    
    if DF.shape[0]==0:
        return None
    
    if only_non_special_trades:
        DF = DF[DF["trade-stringflag"]=="uncategorized"]

    DF.drop(columns=["trade-rawflag","trade-stringflag"],axis=1,inplace=True)
    
    DF.index = pd.to_datetime(DF["xltime"],unit="d",origin="1899-12-30",utc=True)
    DF.index = DF.index.tz_convert(tz_exchange)  # .P stands for Arca, which is based at New York
    DF.drop(columns="xltime",inplace=True)
    
    if only_regular_trading_hours:
        DF=DF.between_time(open_time,close_time)    # warning: ever heard e.g. about Thanksgivings?
    
    if merge_sub_trades:
           DF=DF.groupby(DF.index).agg(trade_price=pd.NamedAgg(column='trade-price', aggfunc='mean'),
                                       trade_volume=pd.NamedAgg(column='trade-volume', aggfunc='sum'))
    
    return DF



#@dask.delayed
def load_TRTH_bbo(filename,
             tz_exchange="America/New_York",
             only_regular_trading_hours=True,
             merge_sub_trades=True):
    try:
        if re.search(r'(csv|csv\.gz)$',filename):
            DF = pd.read_csv(filename)
        if re.search(r'arrow$',filename):
            DF = pd.read_arrow(filename)
        if re.search(r'parquet$',filename):
            DF = pd.read_parquet(filename) 
    except Exception as e:
       # print("load_TRTH_bbo could not load "+filename)
        return None
    
    try:
        DF.shape
    except Exception as e: # DF does not exist
        print("DF does not exist")
        print(e)
        return None

    if DF.shape[0]==0:
        return None
    
        
    DF.index = pd.to_datetime(DF["xltime"],unit="d",origin="1899-12-30",utc=True)
    DF.index = DF.index.tz_convert(tz_exchange)  # .P stands for Arca, which is based at New York
    DF.drop(columns="xltime",inplace=True)
    
    if only_regular_trading_hours:
        DF=DF.between_time("09:30:00","16:00:00")    # ever heard about Thanksgivings?
        
    if merge_sub_trades:
        DF=DF.groupby(DF.index).last()
    

        
    return DF

In [5]:
import vaex

@dask.delayed
def load_merge_trade_bbo(ticker,date,
                         country="US",
                         dirBase="data/raw/equities/",
                         suffix="parquet",
                         suffix_save=None,
                         dirSaveBase="data/clean/equities/events",
                         saveOnly=False,
                         doSave=False
                        ):
    
    file_trade=dirBase+"/"+country+"/trade/"+ticker+"/"+str(date.date())+"-"+ticker+"-trade."+suffix
    file_bbo=file_trade.replace("trade","bbo")
    trades=load_TRTH_trade(file_trade)
    bbos  =load_TRTH_bbo(file_bbo)
    try:
        trades.shape + bbos.shape
    except:
        return None
    
    events=trades.join(bbos,how="outer")
    
    if doSave:
        dirSave=dirSaveBase+"/"+country+"/events/"+ticker
        if not os.path.isdir(dirSave):
            os.makedirs(dirSave)

        if suffix_save:
            suffix=suffix_save
        
        file_events=dirSave+"/"+str(date.date())+"-"+ticker+"-events"+"."+suffix
       # pdb.set_trace()

        saved=False
        if suffix=="arrow":
            events=vaex.from_pandas(events,copy_index=True)
            events.export_arrow(file_events)
            saved=True
        if suffix=="parquet":
         #   pdb.set_trace()
            events.to_parquet(file_events,use_deprecated_int96_timestamps=True)
            saved=True
            
        if not saved:
            print("suffix "+suffix+" : format not recognized")
            
        if saveOnly:
            return saved
    return events

In [3]:
from datetime import datetime

#ticker="SPY.P"
ticker = "AA.N"

startDate="2008-01-01" #"2010-01-01"
endDate="2008-12-31" #2010-12-31"

datelist = pd.date_range(startDate,endDate).tolist()

In [6]:
%time allpromises=[load_merge_trade_bbo(ticker,date,saveOnly=True,doSave=True,suffix="parquet",suffix_save="parquet") for date in datelist]

CPU times: total: 31.2 ms
Wall time: 44.9 ms


Thus, it takes almost no time at all to create execution promises. Let us check that we really have promises:

In [10]:
allpromises[124]

Delayed('load_merge_trade_bbo-3f0460e2-a62e-4a48-b452-007597770cc8')

To actually perform a computation, simply call the compute() function

In [11]:
%%time
allpromises[0].compute()

CPU times: total: 46.9 ms
Wall time: 2.09 s


Now, let us load all the files in a parallel way

In [12]:
%%time
alldata=dask.compute(allpromises) 

CPU times: total: 2.08 s
Wall time: 20.3 s


In [16]:
alldata['returns'] = alldata['ask-price'].pct_change()

TypeError: tuple indices must be integers or slices, not str

#### Delayed, other ways


There are alternative ways to delay a function: use dask.delayed(some_function) directly. 

In [17]:
# define the folder paths
data_folder = 'data/raw/trade/AA.N'

# get a list of all the files in the raw folder
#files = os.listdir(data_folder)

import glob #alternative to os
allfiles=glob.glob(data_folder+"/*")

In [20]:
allpromises=[dask.delayed(pd.read_parquet)(fn) for fn in allfiles]

In [22]:
allpromises.compute()

AttributeError: 'list' object has no attribute 'compute'

or defined a delayed version of a function

In [75]:
load_TRTH_trade_delayed=dask.delayed(load_TRTH_trade)

In [76]:
del alldata  # cleanup




 

### Merge trades and bbo data

If one wishes to create a single dataframe, then one can proceeed in the following way. Note that it is not needed if one uses VAEX that can aggregate several files in a single virtual dataframe (see week 8).

In [77]:
import glob

trade_files=glob.glob("Data/raw/TRTH/equities/trade/AA.N/2008*")
trade_files.sort()

allpromises=[load_TRTH_trade(fn) for fn in trade_files]
trades=dask.compute(allpromises)[0]

trades=pd.concat(trades)

In [78]:
trades

Unnamed: 0_level_0,trade_price,trade_volume
xltime,Unnamed: 1_level_1,Unnamed: 2_level_1
2008-01-02 09:30:42.321000192-05:00,36.4550,39500.0
2008-01-02 09:30:42.540999936-05:00,36.4800,1000.0
2008-01-02 09:30:43.261999872-05:00,36.4675,600.0
2008-01-02 09:30:43.882000128-05:00,36.5000,400.0
2008-01-02 09:30:44.991000064-05:00,36.4800,400.0
...,...,...
2008-12-31 15:59:58.225000192-05:00,11.2600,100.0
2008-12-31 15:59:58.387000064-05:00,11.2500,100.0
2008-12-31 15:59:58.431000064-05:00,11.2600,200.0
2008-12-31 15:59:58.999999744-05:00,11.2500,100.0


In [79]:
bbo_files=glob.glob("Data/raw/TRTH/equities/bbo/AA.N/2008*")
bbo_files.sort()

allpromises=[load_TRTH_bbo(fn) for fn in bbo_files]
bbos=dask.compute(allpromises)[0]

bbos=pd.concat(bbos)

In [80]:
bbos

Unnamed: 0_level_0,bid-price,bid-volume,ask-price,ask-volume
xltime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2008-01-02 09:30:41.558999808-05:00,36.43,24,36.47,6
2008-01-02 09:30:41.748000-05:00,36.43,24,36.47,5
2008-01-02 09:30:41.920999936-05:00,36.43,24,36.47,7
2008-01-02 09:30:41.932000-05:00,36.43,24,36.47,1
2008-01-02 09:30:41.963000064-05:00,36.43,24,36.50,2
...,...,...,...,...
2008-12-31 15:59:59.031999488-05:00,11.25,85,11.26,87
2008-12-31 15:59:59.245999616-05:00,11.25,82,11.26,87
2008-12-31 15:59:59.440000-05:00,11.25,82,11.26,77
2008-12-31 15:59:59.503999744-05:00,11.25,82,11.26,77


In [81]:
%time events=trades.join(bbos,how="outer")

CPU times: total: 4.22 s
Wall time: 4.53 s


In [82]:
events.shape

(13103860, 6)

In [83]:
events

Unnamed: 0_level_0,trade_price,trade_volume,bid-price,bid-volume,ask-price,ask-volume
xltime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2008-01-02 09:30:41.558999808-05:00,,,36.43,24.0,36.47,6.0
2008-01-02 09:30:41.748000-05:00,,,36.43,24.0,36.47,5.0
2008-01-02 09:30:41.920999936-05:00,,,36.43,24.0,36.47,7.0
2008-01-02 09:30:41.932000-05:00,,,36.43,24.0,36.47,1.0
2008-01-02 09:30:41.963000064-05:00,,,36.43,24.0,36.50,2.0
...,...,...,...,...,...,...
2008-12-31 15:59:59.245999616-05:00,,,11.25,82.0,11.26,87.0
2008-12-31 15:59:59.432999424-05:00,11.26,1000.0,,,,
2008-12-31 15:59:59.440000-05:00,,,11.25,82.0,11.26,77.0
2008-12-31 15:59:59.503999744-05:00,,,11.25,82.0,11.26,77.0


We are entering into the realms of big data. Let us save this object

In [84]:
# before saving a parquet object, we need to ensure that the columns are in numeric format
events["bid-price"]=events["bid-price"].values.astype("float")
events["bid-volume"]=events["bid-volume"].values.astype("float")
events["ask-price"]=events["ask-price"].values.astype("float")
events["ask-volume"]=events["ask-volume"].values.astype("float")

#so far, one still needs to add the use_deprectated_int96_timestamps option
#create new folder if needed
import os
if not os.path.isdir("Data/clean/TRTH/equities/events"):
    os.makedirs("Data/clean/TRTH/equities/events")
events.to_parquet("Data/clean/TRTH/equities/events/AA.N_2008_events.parquet",use_deprecated_int96_timestamps=True,compression="brotli")


### VAEX to the rescue

In [85]:
try:
    if events.shape:
        del events
except:
    print("")

In [111]:
import vaex

#df=vaex.open("Data/clean/TRTH/equities/events/AA.N/20[01][08]*arrow")
df = vaex.open("Data/clean/TRTH/equities/events/AA.N_2008_events.parquet")



In [None]:
df.export("SPY_2009-2010_events.arrow",compression="brotli")   # 20Gb uncompressed

"For each" in a task ==> use group_by

Tasks of Week 7:
Choose a US asset from the intraday data db (data sur disque dur), use all the les
 
Plot the average spread (askbid) spread per bar of 5
minutes
Same thing for the relative spread (askbid)/mid price
Plot the average time between two transactions per bar of
5 minutes
Use a median instead. Do you see any difference?
Compare with a European equity. What happens at 14:30
or 15:30?

In [123]:
#load clean (intraday) data
df = vaex.open("Data/clean/TRTH/equities/events/AA.N_2008_events.parquet")
#df['xltime'] = df['xltime'].astype('float')

In [124]:
#Concert to pandas dataframe
df = df.to_pandas_df()
#to date time
df['xltime'] = pd.to_datetime(df['xltime'], unit='d', origin='1899-12-30')
df.head()


ValueError: '0          2008-01-02 14:30:41.558999808
1          2008-01-02 14:30:41.748000000
2          2008-01-02 14:30:41.920999936
3          2008-01-02 14:30:41.932000000
4          2008-01-02 14:30:41.963000064
                        ...             
13103855   2008-12-31 20:59:59.245999616
13103856   2008-12-31 20:59:59.432999424
13103857   2008-12-31 20:59:59.440000000
13103858   2008-12-31 20:59:59.503999744
13103859   2008-12-31 20:59:59.804999936
Name: xltime, Length: 13103860, dtype: datetime64[ns]' is not compatible with origin='1899-12-30'; it must be numeric with a unit specified

In [121]:
df['mid_pprice'] = (df['bid-price'] + df['ask-price'])/2

df.set_index

In [None]:
#Plotting the signature plot
#sampling_interval
dt = 5 #seconds


In [116]:

##DOES NOT WORK 
from vaex import column


df['spread'] = df['ask-price'] - df['bid-price']

print(type(df)) # it is a vaex dataframe
#group_by bars of 5 minutes
df['seconds'] = (df['xltime'] % 1) * 86400
step = 5
df['div_delta'] = df['seconds'] // step 
print(type(df['div_delta']))



<class 'vaex.dataframe.DataFrameLocal'>
<class 'vaex.expression.Expression'>


ValueError: #           trade_price    trade_volume    bid-price    bid-volume    ask-price    ask-volume    spread                div_delta    seconds    xltime
0           --             --              36.43        24.0          36.47        6.0           0.03999999999999915   error        0.0        1.1992842415589998e+18
1           --             --              36.43        24.0          36.47        5.0           0.03999999999999915   error        0.0        1.199284241748e+18
2           --             --              36.43        24.0          36.47        7.0           0.03999999999999915   error        0.0        1.199284241921e+18
3           --             --              36.43        24.0          36.47        1.0           0.03999999999999915   error        0.0        1.199284241932e+18
4           --             --              36.43        24.0          36.5         2.0           0.07000000000000028   error        0.0        1.199284241963e+18
...         ...            ...             ...          ...           ...          ...           ...                   ...          ...        ...
13,103,855  --             --              11.25        82.0          11.26        87.0          0.009999999999999787  error        0.0        1.2307571992459996e+18
13,103,856  11.26          1000.0          --           --            --           --            --                    error        0.0        1.2307571994329994e+18
13,103,857  --             --              11.25        82.0          11.26        77.0          0.009999999999999787  error        0.0        1.23075719944e+18
13,103,858  --             --              11.25        82.0          11.26        77.0          0.009999999999999787  error        0.0        1.2307571995039997e+18
13,103,859  --             --              11.25        82.0          11.26        92.0          0.009999999999999787  error        0.0        1.230757199805e+18 is not of string or Expression type, but <class 'vaex.dataframe.DataFrameLocal'>

.resample('5T'): This part of the code is responsible for the resampling process. It takes the DataFrame df and resamples it based on a time interval of 5 minutes. The '5T' is a string format specifying a 5-minute interval. The 'T' stands for "minutes." You can use other frequency strings to specify different time intervals (e.g., 'H' for hours, 'D' for days, etc.).