In [93]:
import os
import pandas as pd
import numpy as np
import lightgbm as lgb
from tqdm import tqdm 
from glob import glob
import joblib
from multiprocessing import Pool, cpu_count
import warnings
warnings.filterwarnings('ignore')

In [90]:
# Kaggle paths (swap in when running inside the competition kernel):
# input_dir = '../input/optiver-realized-volatility-prediction'
# model_dir = '../input/optiver-realized-volatility-prediction'
input_dir = '../data'    # book_/trade_ parquet folders and the stock-encoding pickle are read from here
output_dir = './'
model_dir = '../model'   # searched for '*model*.pkl' files at prediction time
# NOTE(review): `features_name_dir` is read when loading 'features_name.pkl'
# but was not assigned anywhere in the notebook, so a fresh
# "Restart & Run All" failed with NameError. It is assumed here that the file
# sits next to the saved models - TODO confirm the real location.
features_name_dir = model_dir

In [8]:
def load_book(stock_id=0, data_type='train'):
    """Load the order-book data of a single stock.

    Reads the parquet partition
    ``<input_dir>/book_<data_type>.parquet/stock_id=<stock_id>``.

    Parameters
    ----------
    stock_id : value used both to locate the partition and to fill the
        ``stock_id`` column.
    data_type : dataset prefix inserted into the folder name
        (the notebook uses 'train' and 'test').

    Returns
    -------
    pandas.DataFrame
        The partition with a ``stock_id`` column cast to int8 and
        ``seconds_in_bucket`` cast to int32.
    """
    partition = 'book_{}.parquet/stock_id={}'.format(data_type, stock_id)
    frame = pd.read_parquet(os.path.join(input_dir, partition))

    # Tag every row with the stock it was loaded for, then narrow the dtypes.
    frame['stock_id'] = stock_id
    frame = frame.astype({
        'stock_id': np.int8,
        'seconds_in_bucket': np.int32,
    })
    return frame


def load_trade(stock_id=0, data_type='train'):
    """Load the trade data of a single stock.

    Reads the parquet partition
    ``<input_dir>/trade_<data_type>.parquet/stock_id=<stock_id>``.

    Parameters
    ----------
    stock_id : value used both to locate the partition and to fill the
        ``stock_id`` column.
    data_type : dataset prefix inserted into the folder name
        (the notebook uses 'train' and 'test').

    Returns
    -------
    pandas.DataFrame
        The partition with ``stock_id`` cast to int8 and ``order_count`` /
        ``seconds_in_bucket`` cast to int32.
    """
    partition = 'trade_{}.parquet/stock_id={}'.format(data_type, stock_id)
    frame = pd.read_parquet(os.path.join(input_dir, partition))

    # Tag every row with the stock it was loaded for, then narrow the dtypes.
    frame['stock_id'] = stock_id
    frame = frame.astype({
        'stock_id': np.int8,
        'order_count': np.int32,
        'seconds_in_bucket': np.int32,
    })
    return frame
def log_return(list_stock_prices):
    """Log return between consecutive prices.

    Takes the natural logarithm of the prices and then differences adjacent
    entries via ``.diff()``, so the input must be a pandas object and the
    first entry of the result is NaN.
    """
    log_prices = np.log(list_stock_prices)
    return log_prices.diff()


def realized_volatility(series_log_return):
    """Realized volatility: square root of the sum of squared log returns."""
    squared_returns = series_log_return ** 2
    return np.sqrt(np.sum(squared_returns))


def fix_jsonerr(df):
    """Sanitize the column labels of ``df``.

    Each label is converted to ``str`` and every character that is not
    alphanumeric is replaced by an underscore. The frame is renamed in place
    and the same object is returned.
    """
    def _sanitize(label):
        return "".join(ch if ch.isalnum() else "_" for ch in str(label))

    df.columns = [_sanitize(label) for label in df.columns]
    return df

In [78]:
# Feature engineering
def feature_row(book):
    """Add row-level order-book features to ``book``.

    New columns (all prefixed ``book_``): the weighted average price of both
    book levels, their mean and difference, a relative price spread, the
    bid/ask level spreads, the total volume and the volume imbalance.

    The frame is modified in place and returned.
    """
    # Weighted average price (WAP) of each of the two book levels.
    for level in (1, 2):
        bid_px = book[f'bid_price{level}']
        ask_px = book[f'ask_price{level}']
        bid_sz = book[f'bid_size{level}']
        ask_sz = book[f'ask_size{level}']
        book[f'book_wap{level}'] = (bid_px * ask_sz + ask_px * bid_sz) / (bid_sz + ask_sz)

    # Mean and difference of the two WAPs.
    wap1, wap2 = book['book_wap1'], book['book_wap2']
    book['book_wap_mean'] = (wap1 + wap2) / 2
    book['book_wap_diff'] = wap1 - wap2

    # Price-based spreads.
    bid1, bid2 = book['bid_price1'], book['bid_price2']
    ask1, ask2 = book['ask_price1'], book['ask_price2']
    book['book_price_spread'] = (ask1 - bid1) / (ask1 + bid1)
    book['book_bid_spread'] = bid1 - bid2
    book['book_ask_spread'] = ask1 - ask2

    # Size-based features.
    ask_depth = book['ask_size1'] + book['ask_size2']
    book['book_total_volume'] = ask_depth + book['bid_size1'] + book['bid_size2']
    book['book_volume_imbalance'] = ask_depth - (book['bid_size1'] + book['bid_size2'])
    return book


def feature_agg(book, trade):
    """Aggregate book and trade rows to one row per (time_id, stock_id).

    * Every ``book_*`` column is reduced to the realized volatility of its
      log returns.
    * The trade columns price/size/order_count/seconds_in_bucket are reduced
      with sum, mean, std, max and min.

    The two-level column labels produced by ``agg`` are flattened by
    concatenating both levels, and the trade aggregates are left-joined onto
    the book aggregates.
    """
    group_keys = ['time_id', 'stock_id']
    book_cols = [col for col in book.columns if col.startswith('book_')]
    trade_cols = ['price', 'size', 'order_count', 'seconds_in_bucket']
    trade_stats = ['sum', 'mean', 'std', 'max', 'min']

    trade_agg = trade.groupby(group_keys)[trade_cols].agg(trade_stats).reset_index()

    # The aggregator is deliberately an anonymous lambda: its '<lambda>' name
    # becomes part of the flattened column labels.
    book_agg = book.groupby(group_keys)[book_cols].agg(
        [lambda x: realized_volatility(log_return(x))]).reset_index()

    # Flatten the (column, aggregation) label pairs into single strings.
    book_agg.columns = ["".join(pair).strip() for pair in book_agg.columns.values]
    trade_agg.columns = ["".join(pair).strip() for pair in trade_agg.columns.values]

    return book_agg.merge(trade_agg, how='left', on=group_keys)

def gen_data_test(stock_id=0):
    """Build the aggregated feature frame of one stock from the test data.

    Loads the 'test' book and trade partitions of ``stock_id``, orders both
    by (time_id, seconds_in_bucket), adds the row-level book features and
    returns the per-(time_id, stock_id) aggregation.
    """
    sort_keys = ['time_id', 'seconds_in_bucket']
    book = load_book(stock_id, 'test').sort_values(by=sort_keys)
    trade = load_trade(stock_id, 'test').sort_values(by=sort_keys)

    return feature_agg(feature_row(book), trade)

def gen_data_multi(stock_lst, data_type='train'):
    """Generate the feature frames of many stocks in parallel and stack them.

    Parameters
    ----------
    stock_lst : sized collection of stock ids; each one is handed to the
        per-stock generator in a worker process.
    data_type : 'train' (uses ``gen_data_train``) or 'test' (uses
        ``gen_data_test``).

    Returns
    -------
    pandas.DataFrame
        Row-wise concatenation of the per-stock frames, in ``stock_lst`` order.

    Raises
    ------
    ValueError
        If ``data_type`` is not 'train'/'test' or ``stock_lst`` is empty.
    """
    # Validate before spawning a process pool; previously an unknown
    # data_type only failed afterwards with an unbound `feature_dfs`.
    if data_type not in ('train', 'test'):
        raise ValueError(
            "data_type must be 'train' or 'test', got {!r}".format(data_type))
    if len(stock_lst) == 0:
        raise ValueError('stock_lst is empty: no stocks to generate features for')

    # Resolve the worker lazily so that only the requested generator has to
    # exist (an inference-only notebook need not define `gen_data_train`).
    if data_type == 'train':
        worker = gen_data_train
    else:
        worker = gen_data_test

    with Pool(cpu_count()) as p:
        feature_dfs = list(
            tqdm(p.imap(worker, stock_lst), total=len(stock_lst)))
    df_ret = pd.concat(feature_dfs)
    return df_ret

def gen_data_encoding(df_ret, df_label, data_type='train'):
    """Attach stock_id and time_id aggregate encodings of the volatility features.

    The encoded features are the columns of ``df_ret`` whose name contains
    both 'lambda' and 'wap'. For each of them mean/std/max/min are computed

    * per ``time_id`` from ``df_ret`` itself (train and test), and
    * per ``stock_id`` from ``df_ret`` in 'train' mode. In 'test' mode the
      stock-level encoding is NOT derived from the test data; it is loaded
      from the pickle ``<input_dir>/20210805.pkl`` instead.

    Parameters
    ----------
    df_ret : per-(time_id, stock_id) feature frame.
    df_label : label frame merged on ['stock_id', 'time_id'] in 'train' mode;
        unused in 'test' mode (may be None).
    data_type : 'train' or 'test'.

    Returns
    -------
    pandas.DataFrame
        ``df_ret`` with sanitized column names, both encodings (and the
        labels in 'train' mode) merged in, +/-inf replaced by NaN and the
        remaining gaps forward-filled.

    Raises
    ------
    ValueError
        If ``data_type`` is neither 'train' nor 'test' (this used to return
        None silently).
    """
    if data_type not in ('train', 'test'):
        raise ValueError(
            "data_type must be 'train' or 'test', got {!r}".format(data_type))

    agg_funcs = ['mean', 'std', 'max', 'min']
    vol_feats = [f for f in df_ret.columns if 'lambda' in f and 'wap' in f]

    # --- stock_id encoding -------------------------------------------------
    if data_type == 'train':
        stock_df = df_ret.groupby('stock_id')[vol_feats].agg(
            agg_funcs).reset_index()
        # The labels are (feature, agg) tuples here; their string form is
        # suffixed and then sanitized. Keep this exactly as is: the resulting
        # names are the feature names seen by the model.
        stock_df.columns = ['stock_id'] + [
            f'{f}_stock' for f in stock_df.columns.values.tolist()[1:]
        ]
        stock_df = fix_jsonerr(stock_df)
    else:
        # NOTE(security): read_pickle executes arbitrary code embedded in the
        # file - only load a pickle produced by a trusted training run.
        stock_df = pd.read_pickle(os.path.join(input_dir, '20210805.pkl'))

    # --- time_id encoding --------------------------------------------------
    time_df = df_ret.groupby('time_id')[vol_feats].agg(agg_funcs).reset_index()
    time_df.columns = ['time_id'] + [
        f'{f}_time' for f in time_df.columns.values.tolist()[1:]
    ]

    df_ret = df_ret.merge(time_df, how='left', on='time_id')

    # Sanitize the column names before the frame is handed to LightGBM.
    df_ret = fix_jsonerr(df_ret)

    df_ret = df_ret.merge(stock_df, how='left', on='stock_id')
    if data_type == 'train':
        df_ret = df_ret.merge(df_label, how='left', on=['stock_id', 'time_id'])

    # `.ffill()` replaces the deprecated `fillna(method='ffill')`.
    return df_ret.replace([np.inf, -np.inf], np.nan).ffill()

In [121]:
# Every entry of the test book dataset is a `stock_id=<id>` partition folder;
# the text after the last '=' of the folder name is the stock id (kept as str).
path_lst = glob(os.path.join(input_dir, 'book_test.parquet/*'))
stock_lst = [os.path.basename(path).rpartition('=')[2] for path in path_lst]

In [80]:
# Build the aggregated test features, one parallel task per stock id.
df_ret_test = gen_data_multi(stock_lst=stock_lst, data_type='test')

100%|██████████| 1/1 [00:00<00:00,  9.54it/s]


In [88]:
# There are no labels for the test set, so df_label is None.
df_all_test = gen_data_encoding(df_ret_test, df_label=None, data_type='test')

In [101]:
# Flat list of the column names passed to every model at prediction time.
# `features_name_dir` has to be assigned before this cell runs.
feature_name = (
    pd.read_pickle(os.path.join(features_name_dir, 'features_name.pkl'))
    .values
    .reshape(-1)
    .tolist()
)

In [114]:
# Ensemble prediction: plain average over every saved model in `model_dir`.
# Sorting makes the accumulation order (and thus the float result) reproducible.
model_lst = sorted(glob(os.path.join(model_dir, '*model*.pkl')))
if not model_lst:
    # Previously this case surfaced later as a NameError on the loop index.
    raise FileNotFoundError(
        "no '*model*.pkl' files found in {!r}".format(model_dir))

y_preds = np.zeros(len(df_all_test))
test_features = df_all_test[feature_name]  # same slice for every model
for model_path in tqdm(model_lst):
    # NOTE(security): joblib.load unpickles the file and can execute arbitrary
    # code - only load models produced by a trusted training run.
    model = joblib.load(model_path)
    y_preds += model.predict(test_features)
y_preds /= len(model_lst)

10it [00:02,  4.39it/s]


In [115]:
# row_id is built as "<stock_id>-<time_id>"; target is the averaged prediction.
df_all_test['row_id'] = (
    df_all_test['stock_id'].astype(str)
    + '-'
    + df_all_test['time_id'].astype(str)
)
df_all_test['target'] = y_preds

In [117]:
# Keep only the two columns written to the submission file.
submit = df_all_test.loc[:, ['row_id', 'target']]

In [118]:
# Write the submission to the working directory, without the index column.
submit.to_csv(path_or_buf='submission.csv', index=False)

In [120]:
# Preview the first rows of the submission.
submit.head(n=5)

Unnamed: 0,row_id,target
0,0-4,0.001096
