In [1]:
import sys, os
import glob
import pandas as pd
import numpy as np
from tqdm.auto import tqdm
from multiprocessing import Pool
from pandas.tseries.offsets import MonthEnd, YearEnd
import scipy.io
import wrds
from functools import reduce
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds
import datetime as dt
import logging

# Params

In [2]:
# Root of the project checkout, relative to the notebook's working directory
main_folder = '../../../../GitHub/High-Freq-ML'

# Data locations, all rooted at main_folder. The trailing slash is kept on
# purpose: later cells build file paths by plain string concatenation
# (e.g. `taq_price_folder + '*.parquet'`, `output_folder + filename`).
crsp_folder = main_folder + '/data/crsp/daily/'
compustat_folder = main_folder + '/data/compustat/daily/'
taq_price_folder = main_folder + '/data/taq/prices/'
output_folder = main_folder + '/data/proc/clean_prices_etfs/'

# Identify Data Files

In [3]:
# Every TAQ price parquet file, plus the distinct file-name prefixes (the part of
# the base name before the first '_' or '.'; presumably the trading date -- verify
# against the actual file naming)
taq_price_files = glob.glob(taq_price_folder + '*.parquet')
taq_price_files_dates = list(
    set(
        path.split('/')[-1].split('_')[0].split('.')[0]
        for path in taq_price_files
    )
)

# Every CRSP daily parquet file; clean_crsp() filters this list by month
crsp_files = glob.glob(crsp_folder + '*.parquet')

# Start logging

In [4]:
# Prepare logging file
log_filename = "clean_prices.log"

# Logging configuration
formatter = logging.Formatter(
    "[{asctime}] — [{funcName:12.12}] — [{levelname:<8s} — Line {lineno:4d}]: {message}",
    style="{",
)
logger = logging.getLogger()

# Re-running this cell must not stack a second FileHandler on the root logger
# (every record would then be written twice), and a handler from a previous run
# has to be closed before the file underneath it is deleted.
for old_handler in list(logger.handlers):
    if (
        isinstance(old_handler, logging.FileHandler)
        and os.path.basename(old_handler.baseFilename) == log_filename
    ):
        logger.removeHandler(old_handler)
        old_handler.close()

# Start every run from an empty log file
if os.path.exists(log_filename):
    os.remove(log_filename)

# Explicit UTF-8: the format string contains em dashes
handler = logging.FileHandler(filename=log_filename, encoding="utf-8")
handler.setFormatter(formatter)
logger.addHandler(handler)

# The root logger defaults to WARNING, which silently discards INFO records such
# as the "Logging started" marker below. Note that this also lets INFO records
# from other libraries reach the file.
logger.setLevel(logging.INFO)
logger.propagate = False
logging.info("Logging started")

# Load Data 

In [5]:
# ## Stock info

# # For reuse
# stock_info_df = pd.read_feather('../../data/keys/stock_universe.feather')
# stock_info_df['jdate'] = pd.to_datetime(stock_info_df['dt'].dt.date) + MonthEnd(0)
# stock_info_df['jdate_ym'] = stock_info_df['jdate'].dt.strftime('%Y%m')

## Delisting Returns

In [6]:
## Delisting returns
# Pull the delisting date, permno and delisting return for every row of
# crsp.msedelist; clean_crsp() later joins this onto the end-of-day rows.
conn = wrds.Connection(wrds_username="sa400")
delist_query = """
                    select DLSTDT, PERMNO, dlret
                   from crsp.msedelist
                    """
crspmsedelist_df = conn.raw_sql(delist_query).assign(
    # 'date' is the merge key used downstream (alongside 'permno')
    date=lambda delist: pd.to_datetime(delist['dlstdt'])
)

Loading library list...
Done


## Lagged ME

In [7]:
%%time
# Lagged market-equity table; permno is cast to int because it is used as a merge
# key (with 'date') inside clean_crsp()
crsp_me_df = pd.read_feather(main_folder + '/data/keys/crsp_me.feather').astype({'permno': int})

CPU times: user 5.11 s, sys: 12.8 s, total: 17.9 s
Wall time: 8.96 s


# Process Data

## Functions

In [8]:
def clean_crsp(yyyymm):
    """Build open (09:30) and close (16:00) CRSP rows for one calendar month.

    Reads the notebook globals ``crsp_files``, ``crsp_me_df`` and
    ``crspmsedelist_df``.

    Parameters
    ----------
    yyyymm : str
        Month as '%Y%m' (e.g. '201301'). Every path in ``crsp_files`` containing
        ``f'/{yyyymm}'`` is loaded.

    Returns
    -------
    pandas.DataFrame
        One open row and one close row per cleaned CRSP record, with columns
        ['datetime', 'date', 'permno', 'cusip', 'price', 'return', 'returnx',
        'dlret', 'meq_close_lag', 'me_close_lag']. Open rows carry the
        close-to-open return, close rows carry the open-to-close return
        (compounded with the delisting return for 'return').
    """

    # Read in data
    crsp_df = pd.concat(
        [pd.read_parquet(x) for x in crsp_files if f'/{yyyymm}' in x], ignore_index=True
    )

    # Clean up columns
    crsp_df.columns = [x.lower() for x in crsp_df.columns]
    crsp_df[['ret', 'retx']] = crsp_df[['ret', 'retx']].apply(pd.to_numeric, errors='coerce')

    # Fix dates
    crsp_df['date'] = pd.to_datetime(crsp_df['date'], format='%Y%m%d')

    # Get close and open prices (sign stripped; a missing open falls back to the close)
    crsp_df['prc'] = np.abs(crsp_df['prc'])
    crsp_df['openprc'] = np.abs(crsp_df['openprc']).fillna(crsp_df['prc'])

    # Catch nonsensical open prices: rule is to find cases where close price is
    # 5 times larger than open price while close-to-close return is less than
    # 90% in magnitude; for those rows the open is replaced by the close.
    crsp_df = crsp_df.sort_values(by='date')
    bad_open_idx = crsp_df.query("prc/openprc > 5 & abs(ret) < 0.90").index
    crsp_df.loc[bad_open_idx, "openprc"] = crsp_df.loc[bad_open_idx, "prc"]

    # Infer close-to-open adjusted overnight returns:
    # (1 + close-to-close) = (1 + close-to-open) * (1 + open-to-close)
    crsp_df['ret_open_close_intraday'] = (crsp_df['prc'] - crsp_df['openprc']) / crsp_df['openprc']
    crsp_df['ret_close_open_adj'] = (1 + crsp_df['ret']) / (1 + crsp_df['ret_open_close_intraday']) - 1
    crsp_df['retx_close_open_adj'] = (1 + crsp_df['retx']) / (1 + crsp_df['ret_open_close_intraday']) - 1

    # Add lagged market equity (only this month's rows of the key table are merged)
    month_start = pd.to_datetime(yyyymm, format='%Y%m')
    crsp_me_subset_df = crsp_me_df.loc[
        crsp_me_df['date'].between(month_start, month_start + MonthEnd(1))
    ]
    crsp_df = crsp_df.merge(crsp_me_subset_df, on=['permno', 'date'], how='left')

    # Hand-cleaning, no easy way to deal with these 'bad' permnos, so just drop
    proc_df = crsp_df.loc[~crsp_df['permno'].isin([90806, 83712, 47387])]
    if int(yyyymm[:4]) < 2018:
        proc_df = proc_df.loc[~proc_df['permno'].isin([15293])]
    if int(yyyymm) == 200301:
        proc_df = proc_df.loc[~proc_df['permno'].isin([87658])]

    # Create dataframes for start and end of the day; delisting returns are only
    # attached to the end-of-day rows
    crsp_df_start = proc_df.copy()
    crsp_df_end = proc_df.copy()
    crsp_df_end = crsp_df_end.merge(crspmsedelist_df, on=['date', 'permno'], how='left')

    crsp_df_start['time'] = '09:30:00'
    crsp_df_end['time'] = '16:00:00'

    crsp_df_start['price'] = crsp_df_start['openprc']
    crsp_df_end['price'] = crsp_df_end['prc']

    crsp_df_start['return'] = crsp_df_start['ret_close_open_adj']
    crsp_df_end['return'] = (
        (1 + crsp_df_end['ret_open_close_intraday'].fillna(0))
        * (1 + crsp_df_end['dlret'].fillna(0))
        - 1
    )

    crsp_df_start['returnx'] = crsp_df_start['retx_close_open_adj']
    crsp_df_end['returnx'] = crsp_df_end['ret_open_close_intraday']

    # Start rows have no 'dlret' column, so it is NaN for them after the concat
    crsphf_df = pd.concat([crsp_df_start, crsp_df_end], ignore_index=True)

    # Add datetime info
    crsphf_df['datetime'] = crsphf_df['date'] + pd.to_timedelta(crsphf_df['time'])

    return crsphf_df[['datetime', 'date', 'permno', 'cusip', 'price', 'return', 'returnx', 'dlret', 'meq_close_lag', 'me_close_lag']]

In [9]:
# List of all times to include
all_times = [
    (pd.to_datetime("09:30:00") + pd.Timedelta(f"{x*5}min")).strftime("%H:%M:%S")
    for x in range(0, 79)
]
all_times_taq = [x.replace("09", "9") for x in all_times]


def clean_taq(date):
    """Load one day of TAQ prices and return a cleaned (permno, datetime, price) panel.

    Reads the notebook globals ``taq_price_folder``, ``all_times`` and
    ``all_times_taq``.

    Parameters
    ----------
    date : str
        Day as 'YYYYMMDD'; the file ``taq_price_folder + date + '.parquet'`` is read.

    Returns
    -------
    pandas.DataFrame
        Columns ['permno', 'datetime', 'price'], sorted by permno then datetime,
        with duplicate rows removed. Grid times at which no stock has a price are
        dropped, and stocks whose summed fourth-power log returns exceed 1 are
        removed entirely.
    """

    # Get TAQ files
    taq_df = pd.read_parquet(
        taq_price_folder + date + ".parquet", columns=["permno", "symbol", "date", "time", "price"]
    )

    # Drop "when issued" shares
    taq_df = (
        taq_df.assign(symbol_last_2=taq_df["symbol"].str.slice(-2))
        .query('symbol_last_2 != "WI"')
        .drop("symbol_last_2", axis=1)
        .copy()
    )

    # Drop cases with missing permnos (copy so the column assignment below does
    # not write into a slice)
    taq_df = taq_df.loc[taq_df["permno"].str.isnumeric()].copy()
    taq_df["permno"] = pd.to_numeric(taq_df["permno"]).astype(int)

    # Handle any missing times: expand to the full permno x time grid
    index = pd.MultiIndex.from_product(
        [taq_df["permno"].unique(), all_times_taq], names=["permno", "time"]
    )
    index_df = pd.DataFrame(index=index).reset_index()
    taq_df = taq_df.merge(index_df, on=["permno", "time"], how="right")

    # Add date and a real timestamp for every grid row
    taq_df["date"] = int(date)
    taq_df["datetime"] = pd.to_datetime(taq_df["date"], format="%Y%m%d") + pd.to_timedelta(
        taq_df["time"]
    )

    # Order rows chronologically BEFORE forward filling. Sorting on the
    # 'H:MM:SS' strings (or a category built from them) is lexical and places
    # '9:30:00'..'9:55:00' after '16:00:00', which would fill a missing morning
    # price from the same day's close.
    taq_df = taq_df.sort_values(by=["permno", "datetime"])

    # Catch scenario where there are less than full set of
    # prices in day (Thanksgiving, Christmas, etc)
    valid_times = taq_df[['time', 'price']].dropna(axis=0)['time'].unique()
    if len(valid_times) < len(all_times):
        logging.warning(f'[{date}] This date has only {len(valid_times)} valid times')

    taq_df = taq_df.loc[taq_df['time'].isin(list(valid_times))].copy()

    # Forward fill in entries (within permno, in time order)
    ffill_cols = ["price"]  # 'cusip9', 'symbol', 'ticker_identifier'
    taq_df[ffill_cols] = taq_df.groupby(["permno"])[ffill_cols].ffill()

    # Drop cases where realized quarticity for the day is extremely high, just a
    # crude way of catching very bad data
    taq_df['price_pct_change'] = taq_df.groupby(['permno'])['price'].transform('pct_change').fillna(0)
    taq_df["log_ret_4"] = np.power(np.log(1 + taq_df["price_pct_change"]), 4)
    daily_rq = taq_df.groupby(["permno"])["log_ret_4"].sum()
    taq_df = taq_df[~taq_df["permno"].isin(daily_rq.index[daily_rq > 1])]

    # Sort
    taq_df = taq_df.sort_values(by=["permno", "datetime"]).reset_index(drop=True)

    return taq_df[["permno", "datetime", "price"]].drop_duplicates()

In [10]:
def add_value_weights(rm_df):
    """Attach lagged value weights to a merged CRSP/TAQ intraday panel.

    For every row the weight is the previous row's lagged market equity grown by
    the cumulative within-day ``returnx``; rows with no previous row for that
    permno fall back to the lagged market equity itself.

    Parameters
    ----------
    rm_df : pandas.DataFrame
        Must contain 'permno', 'datetime', 'date', 'returnx', 'meq_close_lag',
        'me_close_lag', 'price' and 'crsp_taq_merge_indicator'.

    Returns
    -------
    pandas.DataFrame
        Sorted by permno and datetime with a fresh index, with 'value_wt' (built
        from 'meq_close_lag') and 'value_wt_permno' (built from 'me_close_lag')
        added, and the helper columns, both lagged-ME columns, 'price' and
        'crsp_taq_merge_indicator' removed.
    """

    # Add value weights
    rm_df = rm_df.sort_values(by=["permno", "datetime"]).reset_index(drop=True)

    # Cumulative gross return within each (permno, date); missing returns count as 0
    rm_df["1+retx"] = rm_df["returnx"].fillna(0) + 1
    rm_df["cumretx"] = rm_df.groupby(["permno", "date"])["1+retx"].cumprod()
    rm_df["meq_close_lag_times_cumretx"] = rm_df["meq_close_lag"] * rm_df["cumretx"]
    rm_df["me_close_lag_times_cumretx"] = rm_df["me_close_lag"] * rm_df["cumretx"]

    # Weight = previous row's grown market equity; first row uses the lagged ME
    rm_df["value_wt"] = rm_df.groupby(["permno"])["meq_close_lag_times_cumretx"].shift(1)
    rm_df["value_wt"] = rm_df["value_wt"].fillna(rm_df["meq_close_lag"])

    # Same construction from me_close_lag. The fill must start from
    # value_wt_permno itself; filling from value_wt would discard the shifted
    # me_close_lag series and make the two weight columns identical.
    rm_df["value_wt_permno"] = rm_df.groupby(["permno"])["me_close_lag_times_cumretx"].shift(1)
    rm_df["value_wt_permno"] = rm_df["value_wt_permno"].fillna(rm_df["me_close_lag"])

    rm_df = rm_df.drop(
        [
            "1+retx",
            "cumretx",
            "meq_close_lag_times_cumretx",
            "meq_close_lag",
            "me_close_lag_times_cumretx",
            "me_close_lag",
            "price",
            "crsp_taq_merge_indicator",
        ],
        axis=1,
    )

    return rm_df


def filter_bad_merges(rm_df, close_hour, close_minute):
    """
    Blank out intraday prices of stocks whose TAQ and CRSP records look mismatched.

    A permno is flagged when exactly two of its bar-to-bar price moves are
    simultaneously (a) larger than 5% in magnitude, (b) squared, larger than
    three times the permno's move volatility, and (c) on rows whose
    meq_close_lag exceeds 1e5. The volatility excludes the 09:35 bar and the
    closing bar, i.e. the two bars that sit next to the 09:30 and closing
    prices. For flagged permnos every price except the 09:30 and closing one
    is set to NaN.

    Parameters
    ----------
    rm_df : pandas.DataFrame
        Needs 'permno', 'datetime', 'date', 'price' and 'meq_close_lag'.
    close_hour, close_minute : int
        Clock time of the closing bar.

    Returns
    -------
    pandas.DataFrame
        A copy of ``rm_df`` sorted by permno and datetime, same columns.
    """
    rm_df = rm_df.sort_values(by=["permno", "datetime"])

    bar_time = rm_df["datetime"].dt.time
    open_time = dt.time(9, 30)
    close_time = dt.time(close_hour, close_minute)

    # Bar-to-bar price moves within each permno
    pct_move = rm_df.groupby(["permno"])["price"].pct_change()
    abs_move = np.abs(pct_move)

    # Volatility of the moves, leaving out the first move after 09:30 and the
    # move into the close
    is_seam_bar = (bar_time == dt.time(9, 35)) | (bar_time == close_time)
    move_vol = (
        rm_df.assign(_seamless_move=pct_move.mask(is_seam_bar))
        .groupby(["permno"])["_seamless_move"]
        .transform("std")
    )

    # NOTE(review): this compares a squared return against 3x a standard
    # deviation (not a variance) -- confirm the units are intended.
    suspicious = (
        (abs_move ** 2 > 3 * move_vol)
        & (abs_move > 0.05)
        & (rm_df["meq_close_lag"] > 1e5)
    )

    # Only permnos with exactly two suspicious moves are treated as mismatched
    hits_per_permno = rm_df.loc[suspicious].groupby(["permno"])["datetime"].count()
    flagged_permnos = hits_per_permno.index[hits_per_permno == 2]

    if len(flagged_permnos):
        logging.warning(
            f'[{rm_df.iloc[0]["date"].date()}] Dropping PERMNOs {", ".join(flagged_permnos.astype(str))} '
            + "due to potential mismatches"
        )

    # Drop intradaily prices for those stocks - interpolation will fill in missing later
    is_interior_bar = (bar_time != open_time) & (bar_time != close_time)
    rm_df.loc[is_interior_bar & rm_df["permno"].isin(flagged_permnos), "price"] = np.nan

    return rm_df


def merge_crsp_taq(date, clean_crsp_date_df, interpolate=True):
    """Combine one day of CRSP open/close rows with TAQ intraday prices.

    Calls ``clean_taq``, ``filter_bad_merges`` and ``add_value_weights`` defined
    elsewhere in this notebook.

    Parameters
    ----------
    date : timestamp-like
        Trading day; must provide ``strftime`` and ``date`` (e.g. pandas.Timestamp).
    clean_crsp_date_df : pandas.DataFrame
        That day's rows of the ``clean_crsp`` output. Not modified.
    interpolate : bool, default True
        If True, remaining missing prices are filled from a straight line in log
        price between each permno's first and last available price; otherwise
        prices are forward- then backward-filled.

    Returns
    -------
    pandas.DataFrame
        One row per (permno, TAQ timestamp) with returns and value weights, as
        produced by ``add_value_weights``.

    Raises
    ------
    NotImplementedError
        If the earliest TAQ timestamp of the day is not 09:30.
    """

    # Work on a private copy: the close timestamp may be rewritten below and the
    # caller typically passes a groupby slice of a larger frame.
    clean_crsp_date_df = clean_crsp_date_df.copy()

    # Get clean TAQ data
    clean_taq_df = clean_taq(date.strftime("%Y%m%d"))

    # Handle any missing times: full permno x timestamp grid for the day
    all_datetimes = pd.to_datetime(clean_taq_df["datetime"].unique())
    index = pd.MultiIndex.from_product(
        [clean_crsp_date_df["permno"].unique(), all_datetimes], names=["permno", "datetime"]
    )
    index_df = pd.DataFrame(index=index).reset_index()

    # Adjust CRSP close time to match close time based on TAQ data
    close_hour = np.max(all_datetimes).hour
    close_minute = np.max(all_datetimes).minute

    if (close_hour != 16) or (close_minute != 0):
        logging.warning(
            f" [{date.date()}] Adjusting market close time to (H={close_hour}, M={close_minute})"
        )
        is_close_row = clean_crsp_date_df["datetime"] == clean_crsp_date_df["datetime"].max()
        clean_crsp_date_df.loc[is_close_row, "datetime"] = clean_crsp_date_df.loc[
            is_close_row, "datetime"
        ].apply(lambda x: x.replace(hour=close_hour, minute=close_minute))

    # A delayed open (first TAQ timestamp other than 09:30) is not supported
    open_hour = np.min(all_datetimes).hour
    open_minute = np.min(all_datetimes).minute

    if (open_hour != 9) or (open_minute != 30):
        logging.warning(
            f"Adjusting market open time for {date.date()} to (H={open_hour}, M={open_minute})"
        )
        raise NotImplementedError(
            f"Market open is delayed on {date.date()} to (H={open_hour}, M={open_minute}); "
            + "Adjusting for this problem has not been implemented"
        )

    # Resample the CRSP data onto the full grid
    resample_df = clean_crsp_date_df.merge(index_df, on=["permno", "datetime"], how="right")

    # Merge with taq
    rm_df = resample_df.merge(
        clean_taq_df,
        on=["permno", "datetime"],
        how="left",
        suffixes=["_crsp", "_taq"],
        indicator="crsp_taq_merge_indicator",
    )
    rm_df["crsp_taq_merge_indicator"] = pd.Categorical(rm_df["crsp_taq_merge_indicator"])

    # Create merged price series: CRSP price where available, TAQ otherwise
    rm_df["price"] = rm_df["price_crsp"].fillna(rm_df["price_taq"])
    rm_df = rm_df.drop(["price_crsp", "price_taq"], axis=1)

    ## Fix missing data
    # Fill in missing columns
    rm_df["date"] = date
    rm_df["meq_close_lag"] = rm_df.groupby(["permno"])["meq_close_lag"].ffill()
    rm_df["me_close_lag"] = rm_df.groupby(["permno"])["me_close_lag"].ffill()

    ## Deal with mismatches between TAQ and CRSP
    # (also leaves rm_df sorted by permno and datetime, which the cumcount below relies on)
    rm_df = filter_bad_merges(rm_df, close_hour, close_minute)

    ## Interpolate log prices and fill in missing prices/returns
    # otherwise just use standard forward filling
    if interpolate:
        rm_df["log_price"] = np.log(rm_df["price"])
        rm_df["log_price_last"] = rm_df.groupby(["permno"])["log_price"].transform("last")
        rm_df["log_price_first"] = rm_df.groupby(["permno"])["log_price"].transform("first")
        rm_df["interp_beta"] = (rm_df["log_price_last"] - rm_df["log_price_first"]) / (
            len(all_datetimes) - 1
        )
        rm_df["count"] = rm_df.groupby(["permno"])["datetime"].cumcount()
        rm_df["log_price_interp"] = rm_df["log_price_first"] + rm_df["count"] * rm_df["interp_beta"]
        rm_df["price"] = rm_df["price"].fillna(np.exp(rm_df["log_price_interp"]))
        rm_df = rm_df.drop(
            [
                "log_price",
                "log_price_last",
                "log_price_first",
                "interp_beta",
                "count",
                "log_price_interp",
            ],
            axis=1,
        )
    else:
        rm_df["price"] = rm_df.groupby(["permno"])["price"].ffill()
        rm_df["price"] = rm_df.groupby(["permno"])["price"].bfill()

    # Add returns: the 09:30 row keeps the CRSP overnight return, every later row
    # gets the bar-to-bar price change
    is_open_row = rm_df["datetime"].dt.time.astype(str) == "09:30:00"
    intraday_ret = rm_df.groupby(["permno"])["price"].pct_change()
    rm_df["return"] = np.where(is_open_row, rm_df["return"], intraday_ret)
    rm_df["returnx"] = np.where(is_open_row, rm_df["returnx"], intraday_ret)

    # Fix last return for delisting
    rm_df["return"] = (1 + rm_df["return"]) * (1 + rm_df["dlret"].fillna(0)) - 1
    rm_df = rm_df.drop(["dlret"], axis=1)

    # Any remaining missing returns will be those originally missing from CRSP
    rm_df["return"] = rm_df["return"].fillna(0)

    # Add value-weights
    rm_df = add_value_weights(rm_df)

    return rm_df

## Main

In [14]:
# # Remove existing files
# [os.remove(x) for x in tqdm(glob.glob(output_folder + '*'))];

In [15]:
%%time
def helper_func(df_group):
    """Unpack one ``(key, frame)`` item of a groupby-by-date and merge it with TAQ.

    pandas >= 2.0 yields a 1-tuple key when the groupby was given a one-element
    list (``groupby(["date"])``); the key is unwrapped so ``merge_crsp_taq``
    always receives the bare timestamp.
    """
    group_key, crsp_day_df = df_group
    date = group_key[0] if isinstance(group_key, tuple) else group_key
    return merge_crsp_taq(date, crsp_day_df, interpolate = True)

def process_date(yyyymm):
    """Clean one month of CRSP, merge each day with TAQ and write one parquet per day.

    Parameters
    ----------
    yyyymm : str
        Month as '%Y%m', passed through to ``clean_crsp``.

    Returns
    -------
    (list, pyarrow.Schema or None)
        One single-day metadata list per written file (as collected by
        ``pq.write_table``), and the schema of the last table written (None if
        nothing was written).
    """

    # Clean CRSP dataframe for the month
    clean_crsp_df = clean_crsp(yyyymm)

    # Go through each date and infill TAQ prices. Grouping by the bare column
    # name (not a one-element list) keeps each key a scalar timestamp on every
    # pandas version.
    df_list = []
    for df in tqdm(map(helper_func, clean_crsp_df.groupby("date"))):
        df_list.append(df)

    # Concatenate all files for the month
    final_df = pd.concat(df_list, ignore_index = True)

    # Save
    metadata_collector = []
    schema = None

    for day, df in final_df.groupby("date"):

        mc = []
        table = pa.Table.from_pandas(df)
        schema = table.schema
        filename = pd.to_datetime(day).strftime('%Y%m%d') + '.parquet'
        pq.write_table(table, output_folder + filename, metadata_collector=mc)
        mc[-1].set_file_path(filename)
        metadata_collector.append(mc)

    return metadata_collector, schema

process_date('201301')

0it [00:00, ?it/s]

KeyboardInterrupt: 

In [12]:
%%time
def helper_func(df_group):
    """Unpack one ``(key, frame)`` item of a groupby-by-date and merge it with TAQ.

    pandas >= 2.0 yields a 1-tuple key when the groupby was given a one-element
    list (``groupby(["date"])``); the key is unwrapped so ``merge_crsp_taq``
    always receives the bare timestamp.
    """
    group_key, crsp_day_df = df_group
    date = group_key[0] if isinstance(group_key, tuple) else group_key
    return merge_crsp_taq(date, crsp_day_df, interpolate = True)

def process_date(yyyymm):
    """Clean one month of CRSP, merge each day with TAQ and write one parquet per day.

    Parameters
    ----------
    yyyymm : str
        Month as '%Y%m', passed through to ``clean_crsp``.

    Returns
    -------
    (list, pyarrow.Schema or None)
        One single-day metadata list per written file (as collected by
        ``pq.write_table``), and the schema of the last table written (None if
        nothing was written).
    """

    # Clean CRSP dataframe for the month
    clean_crsp_df = clean_crsp(yyyymm)

    # Go through each date and infill TAQ prices. Grouping by the bare column
    # name (not a one-element list) keeps each key a scalar timestamp on every
    # pandas version.
    df_list = [helper_func(day_group) for day_group in clean_crsp_df.groupby("date")]

    # Concatenate all files for the month
    final_df = pd.concat(df_list, ignore_index = True)

    # Save
    metadata_collector = []
    schema = None

    for day, df in final_df.groupby("date"):

        mc = []
        table = pa.Table.from_pandas(df)
        schema = table.schema
        filename = pd.to_datetime(day).strftime('%Y%m%d') + '.parquet'
        pq.write_table(table, output_folder + filename, metadata_collector=mc)
        mc[-1].set_file_path(filename)
        metadata_collector.append(mc)

    return metadata_collector, schema


## Setup
# Pyarrow: per-file metadata returned by every process_date() call
metadata_collector = []

# Logging
log = []

# List of months to process, '199601' .. '202012' (300 entries). Built from a
# monthly PeriodIndex instead of date_range(freq='1m'): the lowercase 'm'
# month-end alias is deprecated in recent pandas, while period frequency 'M'
# is not.
yyyymm_list = np.sort(
    list(pd.period_range('1996-01', '2020-12', freq='M').strftime('%Y%m'))
)

# Parallel process each month (results arrive in completion order)
with Pool(11) as p:
    for mc, schema in tqdm(p.imap_unordered(process_date, yyyymm_list), total = len(yyyymm_list)):

        # Add to lists
        metadata_collector += mc

  0%|          | 0/300 [00:00<?, ?it/s]

Process ForkPoolWorker-8:
Process ForkPoolWorker-10:


KeyboardInterrupt: 

# Check output

## Check volatility
* Compute fourth moment of returns for each stock
* Check stocks that exceed a certain threshold

In [None]:
# NOTE(review): this reads '../../data/proc/clean_prices/', whereas the pipeline
# above writes to `output_folder` (.../clean_prices_etfs/) -- confirm the path.
file_list = glob.glob('../../data/proc/clean_prices/*')
df_list = []


def helper_func(file):
    """Return, for one parquet file, the sum of log(1+return)**4 per (permno, date)."""
    day_df = pd.read_parquet(file)
    fourth_power = np.log(1 + day_df['return']) ** 4
    return (
        day_df.assign(return4=fourth_power)
        .groupby(['permno', 'date'])['return4']
        .sum()
        .reset_index()
    )


# Scan all files in parallel; results arrive in completion order
with Pool(11) as p:
    for output_df in tqdm(p.imap_unordered(helper_func, file_list), total = len(file_list)):
        df_list.append(output_df)

In [None]:
# Concatenate all data and add stock info
concat_df = pd.concat(df_list)
stock_info_df = pd.read_feather('../../data/keys/stock_universe.feather')

# Sum return4 per permno. The column is selected explicitly: a bare
# groupby().sum() would also try to sum the 'date' column, which raises for
# datetime data on pandas 2.x (older versions silently dropped it).
concat_sum_df = (
    concat_df.groupby(["permno"])[["return4"]]
    .sum()
    .reset_index()
    # last stock_universe row per permno supplies the descriptive columns
    .merge(stock_info_df.groupby(["permno"]).last().reset_index(), on=["permno"], how="left")
)

In [None]:
# Check distribution of r4: cap values at 1 so the histogram is not dominated
# by the extreme tail
concat_df['return4_winsor'] = concat_df['return4'].clip(upper=1)
concat_df['return4_winsor'].hist(bins = 30)

# Share of (permno, date) rows whose uncapped value exceeds 1
share_high_rq = np.mean(concat_df['return4'] > 1)
print(f"Percent of stocks with very high RQ: {share_high_rq:.5%}")

In [None]:
# Check stocks with largest RQ: the 50 permnos with the highest summed return4,
# restricted to those with meq_max above 100000
(
    concat_sum_df.sort_values(by="return4")
    .tail(50)[["permno", "return4", "shrcd", "exchcd", "meq_max"]]
    .query("meq_max > 100000")
)

In [None]:
file_list = glob.glob("../../data/proc/clean_prices/*")
df_prices_list = []

# Permnos worth a closer look: very large summed return4 and meq_max above 100000
permno_select = concat_sum_df.query("return4 > 10 & meq_max > 100000")["permno"].unique()


def helper_func(file):
    """Load the return/value-weight columns of one file, keeping only `permno_select`."""
    day_df = pd.read_parquet(file, columns = ['datetime', 'permno', 'return', 'value_wt'])
    return day_df.loc[day_df["permno"].isin(permno_select)]


# Scan all files in parallel; results arrive in completion order
with Pool(11) as p:
    for output_df in tqdm(p.imap_unordered(helper_func, file_list), total=len(file_list)):
        df_prices_list.append(output_df)

In [None]:
# Concatenate the per-file price subsets for the selected permnos.
# (No stock info is merged here; the commented block below is the aggregation
# from the earlier cell, kept for reference.)
concat_prices_df = pd.concat(df_prices_list)
# stock_info_df = pd.read_feather('../../data/keys/stock_universe.feather')
# concat_sum_df = (
#     concat_df.groupby(["permno"])
#     .sum()
#     .reset_index()
#     .merge(stock_info_df.groupby(["permno"]).last().reset_index(), on=["permno"], how="left")
# )

In [None]:
# Number of permnos selected for closer inspection
len(permno_select)

In [None]:
# Permno inspected in the cells below (the same permno that clean_crsp() drops
# by hand for 200301)
permno_check = 87658

In [None]:
# Absolute price of the inspected permno over 'dt', from the stock-universe table
stock_info_df.assign(prc_abs = np.abs(stock_info_df['prc'])).query('permno == @permno_check').plot('dt', 'prc_abs')

In [None]:
# Cumulative log return of the inspected permno across all loaded timestamps
concat_prices_df.query('permno== @permno_check').set_index('datetime')['return'].dropna().apply(lambda x: np.log(1+x)).cumsum().plot()

In [None]:
# Value weight of the inspected permno over time
concat_prices_df.query('permno== @permno_check').set_index('datetime')['value_wt'].dropna().plot()

In [None]:
# Rows of the inspected permno with a return larger than 90% in magnitude
concat_prices_df.query('permno== @permno_check').query('abs(`return`) > 0.9')