In [1]:
%reload_ext autoreload
%autoreload 2

import glob
import os, gc
from collections import defaultdict

import numpy as numpy
import numpy as np
import numpy.matlib
import pandas as pd
import datatable as dt
from tqdm.auto import tqdm
from sklearn.metrics import r2_score
from numba import njit
from joblib import Parallel, delayed

from utils import *
from numba_functions import *

from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler, PowerTransformer

# TF
import tensorflow as tf
import tensorflow.keras.backend as K
from tensorflow.keras.callbacks import Callback, ReduceLROnPlateau, ModelCheckpoint, EarlyStopping

2021-09-18 02:31:19.977668: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0


In [2]:
N_FOLD = 5
BUCKET_SECONDS = 600          # length of one time_id bucket; seconds_in_bucket spans 0..599
N_MINS = 5                    # number of equal sub-windows each bucket is split into
MIN_SIZE = BUCKET_SECONDS // N_MINS  # seconds per sub-window
# add_feature_min derives window ids as `seconds_in_bucket // MIN_SIZE` and only builds
# features for ids 0..N_MINS-1; a non-divisor would create an extra, partial window
# whose columns are silently left out of the per-window statistics.
assert BUCKET_SECONDS % N_MINS == 0, 'N_MINS must divide BUCKET_SECONDS'

# Solution tag, used in model and dataset file names.
SOL_NAME = '501'
mkdir(f'./models/{SOL_NAME}/')

In [3]:
# CONSTANT
# Precomputed dataset-level constants (not used in this section).
# NOTE(review): MEAN / STD look like normalisation statistics of some target-related
# quantity — confirm which one. EPS is a small numerical tolerance.
MEAN, STD = -5.762330803300896, 0.6339307835941186
EPS = 1e-9

In [4]:
# stock and time id
# Universe of stock ids and time ids, from helpers in the star-imported project modules.
list_stock_id, list_time_id = get_stock_id(), get_time_id()

# Feature generating functions

In [5]:
def unstack_agg(df, agg_col):
    """Widen a two-level-indexed aggregate into one column per inner-level value.

    The second index level (e.g. ``min_id``) becomes the columns, renamed to
    ``f'{agg_col}_{value}'``; the first level is turned back into a regular column.
    """
    wide = df.unstack(level=1)
    renamed = []
    for key in wide.columns:
        renamed.append(f'{agg_col}_{key}')
    wide.columns = renamed
    return wide.reset_index()

def init_feature_df(df_book, stock_id):
    """Create the skeleton feature frame: one row per distinct ``time_id`` of ``df_book``.

    Rows follow the order in which each time_id first appears; every row carries the
    given ``stock_id``. Columns are ``['stock_id', 'time_id']``.
    """
    skeleton = pd.DataFrame({'time_id': df_book['time_id'].unique()})
    skeleton['stock_id'] = stock_id
    return skeleton[['stock_id', 'time_id']]

def add_stats(df, cols, data_name, suffix='', axis=0):
    """Append summary statistics of ``df[cols]`` to ``df`` as new columns.

    Adds ``{data_name}_{suffix}_{stat}`` for stat in mean, std, skew, min,
    q1 (25% quantile), q2 (50%), q3 (75%), max — in that order.

    Args:
        df: frame to read from and write into (modified in place).
        cols: list of column labels to summarise.
        data_name, suffix: parts of the new column names.
        axis: 1 -> one statistic per row across ``cols``;
            0 -> a statistic down the column, intended for a single column,
            whose scalar result is broadcast to every row.

    Returns:
        ``df`` itself, for chaining.
    """
    subset = df[cols]  # slice once; every statistic reads the same columns
    # Only a column-wise reduction of a single column yields a length-1 result that
    # must be unwrapped to a scalar (and broadcast). A row-wise reduction yields one
    # value per row and must be assigned as-is; calling .item() on it would raise
    # whenever df has more than one row.
    as_scalar = axis in (0, 'index') and len(cols) == 1

    def _put(stat, result):
        arr = result.values
        df[f'{data_name}_{suffix}_{stat}'] = arr.item() if as_scalar else arr

    _put('mean', subset.mean(axis=axis))
    _put('std', subset.std(axis=axis))
    _put('skew', subset.skew(axis=axis))
    _put('min', subset.min(axis=axis))
    _put('q1', subset.quantile(q=0.25, axis=axis))
    _put('q2', subset.quantile(q=0.50, axis=axis))
    _put('q3', subset.quantile(q=0.75, axis=axis))
    _put('max', subset.max(axis=axis))
    return df

def add_feature_min(df_feature, df, configs):
    """Aggregate ``df`` per (time_id, minute window) and append per-window + summary features.

    For every ``(data_col, agg_func, agg_col)`` in ``configs``:
      * ``df[data_col]`` is aggregated per (time_id, min_id) with the numba engine,
      * the results are widened to ``{agg_col}_0 .. {agg_col}_{N_MINS-1}`` and
        left-joined onto ``df_feature`` by ``time_id``,
      * window columns that never occur in the data are created filled with 0,
      * row-wise statistics over those window columns are added via ``add_stats``.

    Side effect: writes a ``min_id`` column (``seconds_in_bucket // MIN_SIZE``) into ``df``.
    Returns the enlarged feature frame with every NaN replaced by 0.0.

    NOTE(review): a window missing for only *some* time_ids is still NaN when
    ``add_stats`` runs (pandas skips NaN), whereas a window missing for *all* time_ids is
    created as 0 first, so the two "no data" cases feed the statistics differently.
    """
    df['min_id'] = df['seconds_in_bucket'] // MIN_SIZE
    by_minute = df.groupby(['time_id', 'min_id'])
    for data_col, agg_func, agg_col in configs:
        minute_values = by_minute[data_col].agg(agg_func, engine='numba')
        df_feature = df_feature.merge(
            unstack_agg(minute_values, agg_col), on=['time_id'], how='left'
        )
        window_cols = [f'{agg_col}_{k}' for k in range(N_MINS)]
        absent = {col: 0 for col in window_cols if col not in df_feature}
        df_feature = add_stats(
            df_feature.assign(**absent),
            cols=window_cols,
            data_name=agg_col,
            suffix='min',
            axis=1,
        )
    return df_feature.fillna(0.0)

def add_feature_time(df_feature, df, configs):
    """Aggregate ``df`` over each whole time_id and left-join the results onto ``df_feature``.

    For every ``(data_col, agg_func, agg_col)`` in ``configs`` one column named
    ``{agg_col}_time`` is added (numba-engine aggregation of ``df[data_col]`` per time_id).
    Returns the enlarged feature frame with every NaN replaced by 0.0.
    """
    grouped = df.groupby(['time_id'])
    for data_col, agg_func, agg_col in configs:
        per_time = grouped[data_col].agg(agg_func, engine='numba').rename(f'{agg_col}_time')
        df_feature = df_feature.merge(per_time, on=['time_id'], how='left')
    return df_feature.fillna(0.0)

def ffill_book(df_book, n_seconds=600):
    """Expand a sparse order book to one row per second and fill gaps within each time_id.

    Builds the grid ``time_id x seconds_in_bucket in [0, n_seconds)`` for every time_id
    present in ``df_book``, left-joins the book onto it, then fills missing values
    forward within each time_id and afterwards backward within each time_id (covering
    seconds before that time_id's first update).

    Args:
        df_book: book rows with at least ``time_id`` and ``seconds_in_bucket`` columns.
        n_seconds: seconds per time_id bucket (default 600, the former hard-coded value).

    Returns:
        New DataFrame with ``n_unique_time_ids * n_seconds`` rows; columns ``time_id``,
        ``seconds_in_bucket`` followed by the other ``df_book`` columns. Book rows whose
        ``seconds_in_bucket`` lies outside ``[0, n_seconds)`` are dropped. A column that is
        entirely missing within a time_id stays NaN for that time_id.
    """
    time_ids = df_book['time_id'].unique()
    grid = pd.DataFrame({
        'time_id': np.repeat(time_ids, n_seconds),
        'seconds_in_bucket': np.tile(np.arange(n_seconds), len(time_ids)),
    })
    dense = grid.merge(df_book, on=['time_id', 'seconds_in_bucket'], how='left')
    dense = dense.set_index('time_id')
    # Both fills are grouped: a global .bfill() after the grouped ffill would pull values
    # from the next time_id into one whose column has no observation at all.
    dense = dense.groupby(level='time_id').ffill()
    dense = dense.groupby(level='time_id').bfill()
    return dense.reset_index()

# Generating features

In [6]:
# Aggregation specs consumed by add_feature_min / add_feature_time.
# Each entry is (source column, numba aggregation function, output feature prefix).

# Raw (not forward-filled) book.
book_configs = (
    [(col, rv_numba, name) for col, name in [
        ('log_return1', 'B_RV1'),
        ('log_return2', 'B_RV2'),
    ]]
    + [('seconds_in_bucket', count_numba, 'B_NROW')]
    + [(col, sum_numba, name) for col, name in [
        ('bid_vol1', 'B_BVOL1'),
        ('bid_vol2', 'B_BVOL2'),
        ('ask_vol1', 'B_AVOL1'),
        ('ask_vol2', 'B_AVOL2'),
    ]]
)

# Per-second forward-filled book: every column is averaged.
book_configs_ffill = [
    (col, mean_numba, name) for col, name in [
        ('bid_price1', 'B_BP1'),
        ('bid_price2', 'B_BP2'),
        ('ask_price1', 'B_AP1'),
        ('ask_price2', 'B_AP2'),
        ('bid_size1', 'B_BS1'),
        ('bid_size2', 'B_BS2'),
        ('ask_size1', 'B_AS1'),
        ('ask_size2', 'B_AS2'),
        # derived price-difference / spread columns (built in gen_df_feature)
        ('price1_diff', 'Z_P1-DIFF'),
        ('price2_diff', 'Z_P2-DIFF'),
        ('price1_dabs', 'Z_P1-DABS'),
        ('price2_dabs', 'Z_P2-DABS'),
        ('price_spread1', 'Z_SPREAD1'),
    ]
]

# Trades.
trade_configs = [
    (col, sum_numba, name) for col, name in [
        ('vol', 'T_VOL'),
        ('order_count', 'T_OC'),
        ('size', 'T_SIZE'),
    ]
] + [('seconds_in_bucket', count_numba, 'T_NROW')]

In [None]:
# [f[-1] for f in book_configs+book_configs_ffill+trade_configs] + ['Z_RATIO']

In [7]:
def gen_df_feature(stock_id):
    """Build all features for one stock: one row per time_id found in its book file.

    Steps:
      1. Raw book: ``wap1`` / ``wap2`` (via ``calc_wap_njit``), their per-time_id log
         returns (``calc_log_return``) and price*size ``*_vol*`` columns, aggregated with
         ``book_configs``.
      2. Book expanded to one row per second and filled (``ffill_book``), plus
         price-difference / spread columns, aggregated with ``book_configs_ffill``.
      3. Trades: price*size ``vol``, aggregated with ``trade_configs``.
      4. ``Z_RATIO``: sum of squared ``log_return1`` over book rows matching a trade row's
         (time_id, seconds_in_bucket), divided by the same sum over all book rows of that
         time_id; 0 where the ratio is undefined or the time_id has no trades.

    Every config is aggregated per minute window plus summary stats
    (``add_feature_min``) and over the whole time_id (``add_feature_time``).

    Args:
        stock_id: id passed to ``get_path_by_id`` to locate the book / trade parquet files.

    Returns:
        DataFrame with ``stock_id``, ``time_id`` and the feature columns; rows follow the
        order in which time_ids first appear in the book file.
    """
    keys = ['time_id', 'seconds_in_bucket']

    # -----------------------------------------------------------------
    # Book data (no ffill)
    book_parquet_path = get_path_by_id('book', stock_id)
    df_book = pd.read_parquet(book_parquet_path)
    df_book_ff = df_book.copy()  # untouched copy for the forward-filled pass below
    df_feature = init_feature_df(df_book, stock_id)
    # add wap and log_return
    df_book['wap1'] = calc_wap_njit(
        df_book.bid_price1.values,
        df_book.ask_price1.values,
        df_book.bid_size1.values,
        df_book.ask_size1.values
    )
    # level-2 prices, with level-1 + level-2 sizes as the size inputs
    df_book['wap2'] = calc_wap_njit(
        df_book.bid_price2.values,
        df_book.ask_price2.values,
        df_book.bid_size1.values + df_book.bid_size2.values,
        df_book.ask_size1.values + df_book.ask_size2.values
    )
    # group_keys=False keeps the applied result on df_book's original row index so it can
    # be assigned back as a column (pandas >= 2.0 otherwise prepends time_id to the index).
    df_book['log_return1'] = (
        df_book.groupby(['time_id'], group_keys=False)['wap1'].apply(calc_log_return).fillna(0)
    )
    df_book['log_return2'] = (
        df_book.groupby(['time_id'], group_keys=False)['wap2'].apply(calc_log_return).fillna(0)
    )
    # add vols (price * size)
    df_book['bid_vol1'] = prod_njit(df_book['bid_price1'].values, df_book['bid_size1'].values)
    df_book['bid_vol2'] = prod_njit(df_book['bid_price2'].values, df_book['bid_size2'].values)
    df_book['ask_vol1'] = prod_njit(df_book['ask_price1'].values, df_book['ask_size1'].values)
    df_book['ask_vol2'] = prod_njit(df_book['ask_price2'].values, df_book['ask_size2'].values)
    # generate book features
    df_feature = add_feature_min(df_feature, df_book, book_configs)
    df_feature = add_feature_time(df_feature, df_book, book_configs)

    # -----------------------------------------------------------------
    # Book data (ffill)
    df_book_ff = ffill_book(df_book_ff)
    # new features
    df_book_ff['price1_diff'] = df_book_ff['ask_price1'] - df_book_ff['bid_price1']
    df_book_ff['price2_diff'] = df_book_ff['ask_price2'] - df_book_ff['bid_price2']
    df_book_ff['price1_dabs'] = df_book_ff['price1_diff'].abs()
    df_book_ff['price2_dabs'] = df_book_ff['price2_diff'].abs()
    df_book_ff['price_spread1'] = (
        (df_book_ff['ask_price1'] - df_book_ff['bid_price1'])
        / (df_book_ff['ask_price1'] + df_book_ff['bid_price1'])
    )
    # generate book features
    df_feature = add_feature_min(df_feature, df_book_ff, book_configs_ffill)
    df_feature = add_feature_time(df_feature, df_book_ff, book_configs_ffill)

    # -----------------------------------------------------------------
    # Trade data
    trade_parquet_path = get_path_by_id('trade', stock_id)
    df_trade = pd.read_parquet(trade_parquet_path)
    # add vol (price * size)
    df_trade['vol'] = prod_njit(df_trade['price'].values, df_trade['size'].values)
    # generate trade features
    df_feature = add_feature_min(df_feature, df_trade, trade_configs)
    df_feature = add_feature_time(df_feature, df_trade, trade_configs)

    # -----------------------------------------------------------------
    # Combined feature
    # Only the join keys and log_return1 are needed; merging the full frames would also
    # duplicate the `min_id` column both frames received from add_feature_min.
    book_lr = df_book[keys + ['log_return1']]
    traded = df_trade[keys].merge(book_lr, on=keys, how='left')
    # Grouped sums skip NaN (trade seconds with no matching book row), like the
    # previous np.sum over each group.
    traded_sq = traded['log_return1'].pow(2).groupby(traded['time_id']).sum()
    total_sq = book_lr['log_return1'].pow(2).groupby(book_lr['time_id']).sum()
    # Align on time_id explicitly: df_feature is ordered by first appearance, whereas the
    # division result is indexed by the sorted union of time_ids, so a positional
    # assignment could silently misalign rows (or fail on a length mismatch).
    ratio = traded_sq / total_sq
    df_feature['Z_RATIO'] = df_feature['time_id'].map(ratio).fillna(0.0)
    return df_feature

In [8]:
# One gen_df_feature call per stock on all cores. tqdm wraps the *input* iterator, so
# the bar counts tasks handed to joblib rather than tasks finished.
list_dfs = Parallel(n_jobs=-1)(
    delayed(gen_df_feature)(stock_id) for stock_id in tqdm(list_stock_id)
)
df_train = (
    pd.concat(list_dfs, ignore_index=True)
    .sort_values(['stock_id', 'time_id'], ignore_index=True)
)

100%|██████████| 112/112 [04:00<00:00,  2.15s/it]


In [9]:
# Combine features with the target: every (stock_id, time_id) must match exactly one
# target row (enforced by validate='one_to_one'); unmatched rows are dropped (inner join).
df_target = dt.fread('./dataset/train.csv').to_pandas()
df_train = df_train.merge(
    df_target,
    on=['stock_id', 'time_id'],
    how='inner',
    validate='one_to_one',
)
fea_cols = [col for col in df_train.columns if col.startswith(('B_', 'T_', 'Z_'))]
# Persist the untransformed features together with the target.
dt.Frame(df_train).to_csv(f'./dataset/train_{SOL_NAME}_LGB.csv')

# Data Preprocessing

In [6]:
# Reload the saved feature table so preprocessing does not depend on the in-memory
# results of the feature-generation cells.
df_train = dt.fread(f'./dataset/train_{SOL_NAME}_LGB.csv').to_pandas()
fea_cols = [col for col in df_train.columns if col.startswith(('B_', 'T_', 'Z_'))]

In [7]:
# Standardise and then power-transform (default PowerTransformer settings) every feature
# column, overwriting them in df_train; the fitted pipeline is pickled for reuse.
# NOTE(review): the pipeline is fitted on all rows before any CV split (N_FOLD) —
# confirm this is acceptable for validation.
pipe = make_pipeline(StandardScaler(), PowerTransformer())
transformed = pipe.fit_transform(df_train[fea_cols].values)
df_train[fea_cols] = transformed
save_pickle(pipe, f'./models/{SOL_NAME}/pipe.pkl')

Done!


In [8]:
# Persist the transformed features for the NN models. Clipping is currently disabled
# (hence "noclip" in the file name); the disabled variant was:
# df_train[fea_cols] = df_train[fea_cols].clip(-10, 10, axis=1)
nn_csv_path = f'./dataset/train_{SOL_NAME}_NN_noclip.csv'
dt.Frame(df_train).to_csv(nn_csv_path)
gc.collect()

0