In [1]:
from IPython.core.display import display, HTML

import pandas as pd
import numpy as np
from scipy import stats
import random
import glob
import os
import gc

from joblib import Parallel, delayed

from sklearn import preprocessing, model_selection
from tqdm import tqdm

import matplotlib.pyplot as plt
import seaborn as sns

path_root = './'
data_dir ='./'
path_submissions = '/'

target_name = 'target'

DEBUG = False


In [2]:
def log_return(list_stock_prices):
    return np.log(list_stock_prices).diff() 

def realized_volatility(series_log_return):
    return np.sqrt(np.sum(series_log_return**2))

def realized_mad(series_log_return):
    return np.mean(np.absolute(series_log_return - np.mean(series_log_return)))

def realized_median_abs_dev(series_log_return):
    return stats.median_absolute_deviation(series_log_return, nan_policy='omit')

def rmspe(y_true, y_pred):
    return  (np.sqrt(np.mean(np.square((y_true - y_pred) / y_true))))

def calc_wap(df):
    wap = (df['bid_price1'] * df['ask_size1'] + df['ask_price1'] * df['bid_size1'])/(df['bid_size1'] + df['ask_size1'])
    return wap

def calc_wap2(df):
    wap = (df['bid_price2'] * df['ask_size2'] + df['ask_price2'] * df['bid_size2'])/(df['bid_size2'] + df['ask_size2'])
    return wap

def count_unique(series):
    return len(np.unique(series))

In [25]:
def preprocessor_book(file_path):
    df = pd.read_parquet(file_path)
    
    df['wap'] = calc_wap(df)
    df['log_return'] = df.groupby('time_id')['wap'].apply(log_return)
    
    df['wap2'] = calc_wap(df)
    df['log_return2'] = df.groupby('time_id')['wap2'].apply(log_return)
    
    df['wap_imbalance'] = abs(df['wap'] - df['wap2'])
    
    df['spread'] = (df['ask_price1'] - df['bid_price1']) / ((df['ask_price1'] + df['bid_price1'])/2)
    
    df['bid_spread'] = df['bid_price1'] - df['bid_price2']
    df['ask_spread'] = df['ask_price1'] - df['ask_price2']
    df['total_volume'] = (df['ask_size1'] + df['ask_size2']) + (df['bid_size1'] + df['bid_size2'])
    df['volume_imbalance'] = abs((df['ask_size1'] + df['ask_size2']) - (df['bid_size1'] + df['bid_size2']))
    
    agg_dict = {
        'log_return':[realized_volatility,realized_mad,realized_median_abs_dev],
        'log_return2':[realized_volatility,realized_mad,realized_median_abs_dev],
        'wap_imbalance':[np.mean],
        'spread':[np.mean],
        'bid_spread':[np.mean],
        'ask_spread':[np.mean],
        'volume_imbalance':[np.mean],
        'total_volume':[np.mean],
        'wap':[np.mean],
    }
    
    
    df_feature = pd.DataFrame(df.groupby(['time_id']).agg(agg_dict)).reset_index()
    
    df_feature.columns = ['_'.join(col) for col in df_feature.columns] #time_id is changed to time_id_
        
    #create row_id
    stock_id = file_path.split('=')[1]
    df_feature['row_id'] = df_feature['time_id_'].apply(lambda x:f'{stock_id}-{x}')
    df_feature['vpin'] = df_feature['volume_imbalance_mean']/df_feature['total_volume_mean']
    df_feature = df_feature.drop(['time_id_'],axis=1)
    
    return df_feature
    

In [18]:
def preprocessor_trade(file_path):
    df = pd.read_parquet(file_path)
    df['log_return'] = df.groupby('time_id')['price'].apply(log_return)
    df['dollar_volume'] = df['price'] * df['size']
    df['side'] = np.sign(df['price'].diff()).fillna(method='bfill')
    df['signed_volume'] = df['size'] * df['side']
    
    
    agg_dict = {
        'log_return':[realized_volatility,realized_mad,realized_median_abs_dev],
        'seconds_in_bucket':[count_unique],
        'size':[np.sum],
        'order_count':[np.mean],
        'dollar_volume':[np.sum],
        'signed_volume':[('sell_volume', lambda x: x[x<0].sum()),('buy_volume', lambda x:x[x>0].sum())]
    }
    
    df_feature = df.groupby('time_id').agg(agg_dict).reset_index()
    
    df_feature.columns = ['_'.join(col) for col in df_feature.columns]

    
    df_feature = df_feature.add_prefix('trade_')
    stock_id = file_path.split('=')[1]
    df_feature['row_id'] = df_feature['trade_time_id_'].apply(lambda x:f'{stock_id}-{x}')
    df_feature = df_feature.drop(['trade_time_id_'],axis=1)
    
    return df_feature

In [28]:
def preprocessor(list_stock_ids, is_train = True):
    from joblib import Parallel, delayed # parallel computing to save time
    df = pd.DataFrame()
    
    def for_joblib(stock_id):
        if is_train:
            file_path_book = data_dir + "book_train.parquet/stock_id=" + str(stock_id)
            file_path_trade = data_dir + "trade_train.parquet/stock_id=" + str(stock_id)
        else:
            file_path_book = data_dir + "book_test.parquet/stock_id=" + str(stock_id)
            file_path_trade = data_dir + "trade_test.parquet/stock_id=" + str(stock_id)
            
        df_tmp = pd.merge(preprocessor_book(file_path_book),preprocessor_trade(file_path_trade),on='row_id',how='left')
     
        return pd.concat([df,df_tmp])
    
    df = Parallel(n_jobs=-1, verbose=1)(
        delayed(for_joblib)(stock_id) for stock_id in list_stock_ids
        )

    df =  pd.concat(df,ignore_index = True)
    return df

In [29]:
train = pd.read_csv(os.path.join(data_dir,'train.csv'))

In [30]:
def cusum_filter(raw_time_series, threshold, time_stamps=True):
    """
    Advances in Financial Machine Learning, Snippet 2.4, page 39.

    The Symmetric Dynamic/Fixed CUSUM Filter.

    The CUSUM filter is a quality-control method, designed to detect a shift in the mean value of a measured quantity
    away from a target value. The filter is set up to identify a sequence of upside or downside divergences from any
    reset level zero. We sample a bar t if and only if S_t >= threshold, at which point S_t is reset to 0.

    One practical aspect that makes CUSUM filters appealing is that multiple events are not triggered by raw_time_series
    hovering around a threshold level, which is a flaw suffered by popular market signals such as Bollinger Bands.
    It will require a full run of length threshold for raw_time_series to trigger an event.

    Once we have obtained this subset of event-driven bars, we will let the ML algorithm determine whether the occurrence
    of such events constitutes actionable intelligence. Below is an implementation of the Symmetric CUSUM filter.

    Note: As per the book this filter is applied to closing prices but we extended it to also work on other
    time series such as volatility.

    :param raw_time_series: (pd.Series) Close prices (or other time series, e.g. volatility).
    :param threshold: (float or pd.Series) When the abs(change) is larger than the threshold, the function captures
                      it as an event, can be dynamic if threshold is pd.Series
    :param time_stamps: (bool) Default is to return a DateTimeIndex, change to false to have it return a list.
    :return: (datetime index vector) Vector of datetimes when the events occurred. This is used later to sample.
    """

    t_events = []
    s_pos = 0
    s_neg = 0

    # log returns
    raw_time_series = pd.DataFrame(raw_time_series)  # Convert to DataFrame
    raw_time_series.columns = ['price']
    raw_time_series['log_ret'] = raw_time_series.price.apply(np.log).diff()
    if isinstance(threshold, (float, int)):
        raw_time_series['threshold'] = threshold
    elif isinstance(threshold, pd.Series):
        raw_time_series.loc[threshold.index, 'threshold'] = threshold
    else:
        raise ValueError('threshold is neither float nor pd.Series!')

    raw_time_series = raw_time_series.iloc[1:]  # Drop first na values

    # Get event time stamps for the entire series
    for tup in raw_time_series.itertuples():
        thresh = tup.threshold
        pos = float(s_pos + tup.log_ret)
        neg = float(s_neg + tup.log_ret)
        s_pos = max(0.0, pos)
        s_neg = min(0.0, neg)

        if s_neg < -thresh:
            s_neg = 0
            t_events.append(tup.Index)

        elif s_pos > thresh:
            s_pos = 0
            t_events.append(tup.Index)

    # Return DatetimeIndex or list
    if time_stamps:
        event_timestamps = pd.DatetimeIndex(t_events)
        return event_timestamps

    return t_events



In [34]:
filtered_train_ids = cusum_filter(train['target'],.1,False) 

In [35]:
train = train.iloc[filtered_train_ids]

In [36]:
%%time
train_ids = train.stock_id.unique()

df_train = preprocessor(list_stock_ids=train_ids, is_train=True)

print(f'train shape {df_train.shape}')
display(df_train.head())
display(df_train.tail())

[Parallel(n_jobs=-1)]: Using backend LokyBackend with 8 concurrent workers.
[Parallel(n_jobs=-1)]: Done  34 tasks      | elapsed:  1.8min


train shape (428932, 24)


[Parallel(n_jobs=-1)]: Done 112 out of 112 | elapsed:  5.6min finished


Unnamed: 0,log_return_realized_volatility,log_return_realized_mad,log_return_realized_median_abs_dev,log_return2_realized_volatility,log_return2_realized_mad,log_return2_realized_median_abs_dev,wap_imbalance_mean,spread_mean,bid_spread_mean,ask_spread_mean,...,vpin,trade_log_return_realized_volatility,trade_log_return_realized_mad,trade_log_return_realized_median_abs_dev,trade_seconds_in_bucket_count_unique,trade_size_sum,trade_order_count_mean,trade_dollar_volume_sum,trade_signed_volume_sell_volume,trade_signed_volume_buy_volume
0,0.004499,0.000157,5.3e-05,0.004499,0.000157,5.3e-05,0.0,0.000852,0.000176,-0.000151,...,0.416987,0.002006,0.000271,0.000345,40,3179.0,2.75,3190.139181,-1328.0,1850.0
1,0.001204,3.8e-05,5e-06,0.001204,3.8e-05,5e-06,0.0,0.000394,0.000142,-0.000135,...,0.345242,0.000901,0.000133,0.000149,30,1289.0,1.9,1289.353432,-543.0,740.0
2,0.002369,9.2e-05,2e-05,0.002369,9.2e-05,2e-05,0.0,0.000725,0.000197,-0.000198,...,0.339653,0.001961,0.000298,0.000307,25,2161.0,2.72,2158.608928,-1671.0,490.0
3,0.002574,0.000113,1.2e-05,0.002574,0.000113,1.2e-05,0.0,0.00086,0.00019,-0.000108,...,0.335924,0.001561,0.000321,0.000383,15,1962.0,3.933333,1959.605547,-1127.0,819.0
4,0.001894,6.8e-05,1.1e-05,0.001894,6.8e-05,1.1e-05,0.0,0.000397,0.000191,-0.000109,...,0.360836,0.000871,0.000138,0.000158,22,1791.0,4.045455,1790.254496,-1092.0,696.0


Unnamed: 0,log_return_realized_volatility,log_return_realized_mad,log_return_realized_median_abs_dev,log_return2_realized_volatility,log_return2_realized_mad,log_return2_realized_median_abs_dev,wap_imbalance_mean,spread_mean,bid_spread_mean,ask_spread_mean,...,vpin,trade_log_return_realized_volatility,trade_log_return_realized_mad,trade_log_return_realized_median_abs_dev,trade_seconds_in_bucket_count_unique,trade_size_sum,trade_order_count_mean,trade_dollar_volume_sum,trade_signed_volume_sell_volume,trade_signed_volume_buy_volume
428927,0.003691,0.00011,2.9e-05,0.003691,0.00011,2.9e-05,0.0,0.000878,9.1e-05,-0.000202,...,0.398081,0.002171,0.000297,0.000409,37,2570.0,2.783784,2568.838117,-1243.0,1227.0
428928,0.004104,0.00017,8.9e-05,0.004104,0.00017,8.9e-05,0.0,0.000706,0.000126,-0.000142,...,0.618842,0.00218,0.000225,0.000213,43,2323.0,3.418605,2327.828627,-1048.0,1190.0
428929,0.003118,9.7e-05,1.6e-05,0.003118,9.7e-05,1.6e-05,0.0,0.000739,0.000189,-0.000192,...,0.730856,0.001921,0.000261,0.000332,35,3740.0,2.8,3742.254714,-2872.0,770.0
428930,0.003661,0.00011,8.1e-05,0.003661,0.00011,8.1e-05,0.0,0.00053,0.000143,-0.000134,...,0.341578,0.002051,0.000182,0.000224,80,9389.0,2.925,9406.795437,-3365.0,5569.0
428931,0.002091,8.9e-05,5.7e-05,0.002091,8.9e-05,5.7e-05,0.0,0.000432,0.000109,-0.000159,...,0.333969,0.001041,0.000136,0.00013,36,5325.0,3.0,5326.415054,-3990.0,1334.0


Wall time: 5min 34s


In [37]:
df_train = df_train.iloc[filtered_train_ids]

In [38]:
df_train.columns

Index(['log_return_realized_volatility', 'log_return_realized_mad',
       'log_return_realized_median_abs_dev', 'log_return2_realized_volatility',
       'log_return2_realized_mad', 'log_return2_realized_median_abs_dev',
       'wap_imbalance_mean', 'spread_mean', 'bid_spread_mean',
       'ask_spread_mean', 'volume_imbalance_mean', 'total_volume_mean',
       'wap_mean', 'row_id', 'vpin', 'trade_log_return_realized_volatility',
       'trade_log_return_realized_mad',
       'trade_log_return_realized_median_abs_dev',
       'trade_seconds_in_bucket_count_unique', 'trade_size_sum',
       'trade_order_count_mean', 'trade_dollar_volume_sum',
       'trade_signed_volume_sell_volume', 'trade_signed_volume_buy_volume'],
      dtype='object')

In [39]:
train['row_id'] = train['stock_id'].astype(str) + '-' + train['time_id'].astype(str)
train = train[['row_id','target']]
df_train = train.merge(df_train, on = ['row_id'], how = 'left')

In [40]:
df_train.to_csv('train_processed_v2.csv',index=False)