In [1]:
import pandas as pd
import numpy as np
import datetime as dt
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm.auto import tqdm
from tqdm.contrib import concurrent
from multiprocessing import Pool
import itertools
import glob

# Functions

In [2]:
def add_returns(data_df):
    
    # Prices
    data_df['log_price'] = np.log(data_df['price'])
    data_df['return'] = data_df.groupby(['ticker', 'date'])['log_price'].transform(pd.Series.diff)
    
    return data_df

In [3]:
def compute_return_stats(data_df):

    # Return vars
    data_df["return_p2"] = np.power(data_df["return"], 2)
    data_df["return_p4"] = np.power(data_df["return"], 4)
    data_df["return_lag1"] = data_df.groupby(["date"])["return"].shift(1)
    data_df["return_bp"] = np.abs(np.multiply(data_df["return"], data_df["return_lag1"]))

    # BV
    data_df["bipower_var_daily"] = data_df.groupby(["date"])["return_bp"].transform("sum") * (
        78 / 77
    )
    data_df["bipower_var_tod"] = data_df.groupby(["time"])["return_bp"].transform("mean")

    # Jump detection
    alpha = 4
    data_df["tod_correction"] = np.divide(
        data_df["bipower_var_tod"], data_df["bipower_var_tod"].mean()
    )
    data_df["jump_cut"] = (
        alpha * np.sqrt(data_df["bipower_var_daily"] * data_df["tod_correction"]) * ((1 / 78) ** (0.49))
    )
    data_df['is_jump'] = (np.abs(data_df['return']) > data_df['jump_cut']).astype(int)

    ## Filtered returns
    # Realized Semivariance components
    data_df['return_p2_pos'] = data_df['return_p2']*(data_df['return'] > 0)
    data_df['return_p2_neg'] = data_df['return_p2']*(data_df['return'] < 0)
    
    # Signed jump variation
    data_df['sgn_jump_var'] = data_df['return_p2_pos'] - data_df['return_p2_neg'] 
    
    ## Filtered diffusive returns
    # Main
    data_df['return_d'] = data_df['return']*(1-data_df['is_jump'])
    data_df['return_d_pos'] = data_df['return_d']*(data_df['return_d'] > 0)
    data_df['return_d_neg'] = data_df['return_d']*(data_df['return_d'] < 0)
    # Semi
    data_df['return_d_p2'] = np.power(data_df['return_d'], 2)
    data_df['return_d_p4'] = np.power(data_df['return_d'], 4)
    data_df['return_d_p2_pos'] = np.power(data_df['return_d_pos'], 2)
    data_df['return_d_p4_pos'] = np.power(data_df['return_d_pos'], 4)
    data_df['return_d_p2_neg'] = np.power(data_df['return_d_neg'], 2)
    data_df['return_d_p4_neg'] = np.power(data_df['return_d_neg'], 4)

    ## Filtered jump returns
    # Main
    data_df['return_j'] = data_df['return']*(data_df['is_jump'])
    data_df['return_j_pos'] = data_df['return_j']*(data_df['return_j'] > 0)
    data_df['return_j_neg'] = data_df['return_j']*(data_df['return_j'] < 0)
    # Semi
    data_df['return_j_p2'] = np.power(data_df['return_j'], 2)
    data_df['return_j_p4'] = np.power(data_df['return_j'], 4)
    data_df['return_j_p2_pos'] = np.power(data_df['return_j_pos'], 2)
    data_df['return_j_p4_pos'] = np.power(data_df['return_j_pos'], 4)
    data_df['return_j_p2_neg'] = np.power(data_df['return_j_neg'], 2)
    data_df['return_j_p4_neg'] = np.power(data_df['return_j_neg'], 4)
    
    return data_df

In [4]:
def compute_rv_stats(data_df):

    ## Returns data
    rv_df = (
        data_df.drop(["price", "log_price"], axis=1)
        .groupby(["ticker", "date"])
        .sum()
        .reset_index()
    )

    ## Realized Vol
    rv_df.rename(
        columns = {
            "return_p2": "rv",
            "return_p2_pos": "rs_pos",
            "return_p2_neg": "rs_neg",
            "sgn_jump_var": "sgn_jump_var",
        },
        inplace=True,
    )

    # Lags
    rv_df["rv_lag_1"] = rv_df["rv"].shift(1)
    rv_df["rv_lag_m5"] = rv_df["rv_lag_1"].rolling(5).mean()
    rv_df["rv_lag_m22"] = rv_df["rv_lag_1"].rolling(22).mean()

    rv_df["rv_d_1"] = rv_df["return_d_p2"]
    rv_df["rv_d_5"] = rv_df["rv_d_1"].rolling(5).mean()
    rv_df["rv_d_22"] = rv_df["rv_d_1"].rolling(22).mean()

    # Total jumps
    rv_df["sum_jump_p2_1"] = rv_df["return_j_p2"] * 100
    rv_df["sum_jump_p2_5"] = rv_df["sum_jump_p2_1"].rolling(5).mean()
    rv_df["sum_jump_p2_22"] = rv_df["sum_jump_p2_1"].rolling(22).mean()

    rv_df["sum_jump_p2_pos_1"] = rv_df["return_j_p2_pos"] * 100
    rv_df["sum_jump_p2_pos_5"] = rv_df["sum_jump_p2_pos_1"].rolling(5).mean()
    rv_df["sum_jump_p2_pos_22"] = rv_df["sum_jump_p2_pos_1"].rolling(22).mean()

    rv_df["sum_jump_p2_neg_1"] = rv_df["return_j_p2_neg"] * 100
    rv_df["sum_jump_p2_neg_5"] = rv_df["sum_jump_p2_neg_1"].rolling(5).mean()
    rv_df["sum_jump_p2_neg_22"] = rv_df["sum_jump_p2_neg_1"].rolling(22).mean()

    return rv_df

# Main

## Process Price Files
Get returns and identify jumps in five minute prices

In [5]:
def process_ticker_file(filename):
    
    ticker = filename.split('/')[-1].split('_')[0]
    raw_ticker_df = pd.read_feather(filename)
    
    return ticker, compute_return_stats(add_returns(raw_ticker_df))

In [6]:
%%time
data_files = glob.glob("../data/prices/tickers_clean/*.feather")

with Pool(8) as p:

    for ticker, processed_ticker_df in tqdm(
        p.imap_unordered(process_ticker_file, data_files),
        total=len(data_files),
    ):

        processed_ticker_df.to_feather(
            f"../data/returns/ticker_returns/{ticker}_processed.feather"
        )

HBox(children=(FloatProgress(value=0.0, max=455.0), HTML(value='')))


CPU times: user 1min 18s, sys: 3min 56s, total: 5min 15s
Wall time: 4min 15s


In [7]:
%%time
data_files = glob.glob("../data/prices/tickers_clean/*.feather")

def temp_f(filename):
    
    ticker = filename.split('/')[-1].split('_')[0]
    raw_ticker_df = pd.read_feather(filename)
#     print(raw_ticker_df['ticker'].iloc[0], end = '  -- ')
    temp = np.min(raw_ticker_df.groupby(['date'])['time'].count())
    if temp < 78:
        print(raw_ticker_df['ticker'].iloc[0])
        print(temp)
    
    return None
    
for result in tqdm(
    map(temp_f, data_files),
    total=len(data_files),
):
    continue

HBox(children=(FloatProgress(value=0.0, max=455.0), HTML(value='')))


CPU times: user 6.72 s, sys: 2.48 s, total: 9.2 s
Wall time: 9.98 s


## Process Return Files

Aggregate up return files to RV

In [8]:
def process_ticker_file(filename):
    
    ticker = filename.split('/')[-1].split('_')[0]
    raw_ticker_df = pd.read_feather(filename)
    
    return ticker, compute_rv_stats(raw_ticker_df)

In [9]:
%%time
data_files = glob.glob("../data/returns/ticker_returns/*.feather")

with Pool(8) as p:

    for ticker, processed_ticker_df in tqdm(
        p.imap_unordered(process_ticker_file, data_files),
        total=len(data_files),
    ):

        processed_ticker_df.to_feather(
            f"../data/returns/ticker_daily_rv/{ticker}_daily_rv.feather"
        )

HBox(children=(FloatProgress(value=0.0, max=471.0), HTML(value='')))


CPU times: user 9.78 s, sys: 2.78 s, total: 12.6 s
Wall time: 28.4 s


## Double Check

In [10]:
test_df = pd.read_feather('../data/returns/ticker_daily_rv/AAP_daily_rv.feather')
test_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1447 entries, 0 to 1446
Data columns (total 48 columns):
 #   Column              Non-Null Count  Dtype         
---  ------              --------------  -----         
 0   ticker              1447 non-null   object        
 1   date                1447 non-null   datetime64[ns]
 2   return              1447 non-null   float64       
 3   rv                  1447 non-null   float64       
 4   return_p4           1447 non-null   float64       
 5   return_lag1         1447 non-null   float64       
 6   return_bp           1447 non-null   float64       
 7   bipower_var_daily   1447 non-null   float64       
 8   bipower_var_tod     1447 non-null   float64       
 9   tod_correction      1447 non-null   float64       
 10  jump_cut            1447 non-null   float64       
 11  is_jump             1447 non-null   int64         
 12  rs_pos              1447 non-null   float64       
 13  rs_neg              1447 non-null   float64     

In [11]:
test_df['date'].unique()

array(['2015-01-02T00:00:00.000000000', '2015-01-05T00:00:00.000000000',
       '2015-01-06T00:00:00.000000000', ...,
       '2020-09-28T00:00:00.000000000', '2020-09-29T00:00:00.000000000',
       '2020-09-30T00:00:00.000000000'], dtype='datetime64[ns]')

### AMZN Check

In [12]:
pd.read_feather('../data/returns/ticker_returns/NFLX_processed.feather').info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 112866 entries, 0 to 112865
Data columns (total 37 columns):
 #   Column             Non-Null Count   Dtype         
---  ------             --------------   -----         
 0   date               112866 non-null  datetime64[ns]
 1   time               112866 non-null  object        
 2   ticker             112866 non-null  object        
 3   datetime           112866 non-null  datetime64[ns]
 4   price              112866 non-null  float64       
 5   log_price          112866 non-null  float64       
 6   return             111419 non-null  float64       
 7   return_p2          111419 non-null  float64       
 8   return_p4          111419 non-null  float64       
 9   return_lag1        109972 non-null  float64       
 10  return_bp          109972 non-null  float64       
 11  bipower_var_daily  112866 non-null  float64       
 12  bipower_var_tod    109972 non-null  float64       
 13  tod_correction     109972 non-null  float64 

In [13]:
amzn_prices_old_df = pd.read_feather('../data/temp/AMZN_prices.feather')

In [14]:
amzn_prices_new_df = pd.read_feather(
    "../data/returns/tickers_processed/AMZN_processed.feather"
)

FileNotFoundError: [Errno 2] Failed to open local file '../data/returns/tickers_processed/AMZN_processed.feather'. Detail: [errno 2] No such file or directory

In [None]:
amzn_prices_old_df

In [None]:
merged_df = amzn_prices_new_df.merge(amzn_prices_old_df, how = 'inner', on = ['ticker', 'date', 'datetime'])

In [None]:
merged_df.head()

In [None]:
merged_df.columns

In [None]:
import matplotlib.pyplot as plt

In [None]:
check_cols = [y for y in merged_df.columns if '_x' in y]

In [None]:
for check_col  in check_cols:
    print(check_col[:-2])
    try: 
        print(np.max(np.abs(merged_df[check_col].astype(float) - merged_df[check_col[:-2] + '_y'].astype(float))))
    except:
        print('')

In [None]:
np.max(np.abs(merged_df['price_x'] - merged_df['price_y']))

In [None]:
np.max(np.abs(merged_df['return_x'] - merged_df['return_y']))

In [None]:
np.max(np.abs(merged_df['return_x'] - merged_df['return_y']))

In [None]:
plt.plot(merged_df['price_x'] - merged_df['price_y'])