Thanks to these great works:
  - [motono0223: js24-train-gbdt-model-with-lags-singlemodel](https://www.kaggle.com/code/motono0223/js24-train-gbdt-model-with-lags-singlemodel)
  - [yuanzhezhou: jane-street-baseline-lgb-xgb-and-catboost](https://www.kaggle.com/code/yuanzhezhou/jane-street-baseline-lgb-xgb-and-catboost)

这篇notebook主要包含如下内容:
  - 如何建立滞后N个date_id的(shift, rolling)特征工程
  - 如何动态存储最近N个date_id的数据以更新lags特征

这里用到的模型是一个加入lag前一天特征的XGBoost单折模型, 在LB取得了0.0052得分.

在每个新的date_id的time_id=0到来时，会给出当前date_id前一天的lags标签。比如当date_id=100时，会给出date_id=99的标签，但这个lags数据里的date_id仍是100(为了和当前主数据对齐)；需要将这个lags并入动态存储的最近N天历史数据中，以构造更多的shift和rolling特征。

In [1]:
import pandas as pd
import polars as pl
import numpy as np
import os, gc
from tqdm.auto import tqdm
from matplotlib import pyplot as plt
import pickle

from sklearn.metrics import r2_score
from lightgbm import LGBMRegressor
import lightgbm as lgb
from xgboost import XGBRegressor
import xgboost as xgb
from catboost import CatBoostRegressor
from sklearn.ensemble import VotingRegressor

import warnings
# Silence every warning category for the rest of the notebook.
warnings.filterwarnings('ignore')
# None removes pandas' cap on the number of columns shown when displaying a frame.
pd.options.display.max_columns = None

import kaggle_evaluation.jane_street_inference_server

# Polars display limits: rows and columns printed per table, and how many
# elements of a list-typed cell are shown.
pl.Config.set_tbl_rows(100)
pl.Config.set_tbl_cols(400)
pl.Config.set_fmt_table_cell_list_len(5)

polars.config.Config

# Configurations

In [2]:
class CONFIG:
    """Static settings shared by the lag-feature cells of this notebook."""

    # Gates the training-side feature-construction demo at the end of the file.
    debug = False
    seed = 42
    target_col = "responder_6"
    # The nine responder column names, responder_0 .. responder_8.
    lag_target_cols_name = [f"responder_{idx}" for idx in range(9)]
    # Maps each "responder_<i>_lag_1" column back to plain "responder_<i>".
    lag_cols_rename = {f"{name}_lag_1": name for name in lag_target_cols_name}
    # Key columns followed by the responders (a separate list object).
    lag_cols_original = ["date_id", "time_id", "symbol_id"] + lag_target_cols_name
    model_path = "/kaggle/input/janestreet-public-model/xgb_001.pkl"
    # Number of most recent date_ids kept in the rolling history.
    lag_ndays = 4

In [3]:
def create_agg_list(day, columns):
    """Build mean/std/max/last aggregation expressions for every column.

    Args:
        day: Window label embedded in each output name, e.g. ``_mean_<day>d``.
        columns: Names of the columns to aggregate.

    Returns:
        A flat list of polars expressions ordered by statistic: all means,
        then all stds, all maxes and all lasts, each in ``columns`` order.
    """
    # Each entry pairs an aggregation with the suffix appended to the column name.
    stat_specs = (
        (lambda expr: expr.mean(), f"_mean_{day}d"),
        (lambda expr: expr.std(), f"_std_{day}d"),
        (lambda expr: expr.max(), f"_max_{day}d"),
        (lambda expr: expr.last(), f"_last_{day}d"),
    )

    expressions = []
    for aggregate, suffix in stat_specs:
        for column in columns:
            expressions.append(aggregate(pl.col(column)).name.suffix(suffix))
    return expressions

# Load model

In [4]:
# Load the pickled training artifact and pull out the fitted model together
# with the feature list later used to select the model's input columns.
# NOTE: pickle.load can execute arbitrary code; only load files you trust.
with open(CONFIG.model_path, "rb") as model_file:
    result = pickle.load(model_file)

model, features = result["model"], result["features"]
print(len(features))

116


在对test进行推理前需要提前构造好历史数据，需要注意的地方如下：
  - 假如用到了lags N天的特征，需要先存储train中最后N天的数据.
  - test数据中的date_id是从0开始计数的，需要预先调整train与test的date_id一致.
  - 目前不确定test数据是否紧随train之后，这里先假设紧随，并将train最后N天的date_id改为(-N, -N+1, ..., -2, -1)，以便与test的date_id衔接.

In [5]:
# Seed the rolling history with the responders of the CONFIG.lag_ndays train
# days that precede date_id 1698.
#
# The date_ids are shifted to -N..-1. Assuming test date_id=0 directly follows
# train date_id=1698, the lags delivered with the first batch should be the
# responders of train date_id=1698 (but labelled date_id=0). With the last
# stored date_id 1697 mapped to -1, the history lines up with the lags
# received at inference time.
history = (
    pl.scan_parquet(
        "/kaggle/input/jane-street-realtime-marketdata-forecasting/train.parquet"
    )
    .select(['date_id', 'time_id', 'symbol_id'] + [f"responder_{idx}" for idx in range(9)])
    .filter(
        (pl.col("date_id") >= (1698 - CONFIG.lag_ndays))
        & (pl.col("date_id") < 1698)
    )
    .with_columns(date_id=(pl.col("date_id") - pl.lit(1698)).cast(pl.Int16))
    .collect()
)

# Canonical dtypes for every column family, so that frames can be concatenated
# later (polars raises on concat when dtypes do not match).
history_column_types = {
    key: pl.Int16 for key in ('date_id', 'time_id', 'symbol_id')
}
feature_column_types = {f"feature_{idx:02d}": pl.Float32 for idx in range(79)}
responder_column_types = {f"responder_{idx}": pl.Float32 for idx in range(9)}

history = history.cast(history_column_types).cast(responder_column_types)
history.tail()

date_id,time_id,symbol_id,responder_0,responder_1,responder_2,responder_3,responder_4,responder_5,responder_6,responder_7,responder_8
i16,i16,i16,f32,f32,f32,f32,f32,f32,f32,f32,f32
-1,967,34,0.501321,0.905332,-0.819582,-0.564046,-0.223018,-0.283954,-0.045938,0.009797,-0.102538
-1,967,35,-1.113053,0.69719,-1.619031,-1.222743,-0.706082,-0.291133,0.167733,0.099704,0.32461
-1,967,36,-1.019353,-0.460962,-2.026678,-0.848606,-0.305448,-1.256913,-0.109359,-0.027474,-0.253956
-1,967,37,0.23585,0.556479,0.618944,-0.243765,-0.108361,-0.260777,-0.486923,-0.275566,-1.020708
-1,967,38,0.542563,0.513193,0.814393,0.032767,0.025435,0.311465,-0.044797,0.011133,-0.0793


In [6]:

def predict(test: pl.DataFrame, lags: pl.DataFrame | None) -> pl.DataFrame | pd.DataFrame:
    """Score one batch and, when lags arrive, refresh the lag features.

    Args:
        test: Batch to score. Must contain ``row_id``, ``date_id``,
            ``symbol_id`` and the feature columns.
        lags: Lagged responders delivered with the batch, or ``None``. When
            given, they are appended to the global ``history`` and the global
            ``lags_infer`` frame is rebuilt from them.

    Returns:
        A polars DataFrame with columns ``row_id`` and ``responder_6``, the
        predictions clipped to [-5, 5].

    Raises:
        RuntimeError: If no lag features have been built yet, i.e. ``lags``
            is ``None`` and no earlier call supplied lags.
    """
    global history
    global lags_infer

    # The first row's date_id is taken as the date of the whole batch
    # (assumes a batch never spans several date_ids).
    current_date = test["date_id"][0]

    if lags is not None:

        # Store the raw lags in history first, using history's column names
        # and dtypes so the concat succeeds.
        lags = lags.rename(CONFIG.lag_cols_rename)
        lags = lags.cast(history_column_types)
        lags = lags.cast(responder_column_types)

        history = pl.concat([history, lags])

        # Keep only the most recent N days of history.
        history = history.filter(pl.col("date_id") > (current_date - CONFIG.lag_ndays))
        # Build the features shared by every batch of the current date_id.
        # For an N-day shift, take:
        # -- history.filter(pl.col("date_id") == (current_date - N))
        # For an N-day rolling statistic, aggregate over:
        # -- history.filter(pl.col("date_id") >= (current_date - N))

        # The XGB model used here only relies on 1-day-shift statistics.
        agg_list = create_agg_list(1, CONFIG.lag_target_cols_name)
        shift_n_data = history.filter(pl.col("date_id") == current_date)
        lags_infer = shift_n_data.group_by(["date_id", "symbol_id"], maintain_order=True).agg(agg_list)
    elif "lags_infer" not in globals():
        # Without this guard the join below would fail with a bare NameError.
        raise RuntimeError(
            "predict() was called with lags=None before any lag features were built"
        )

    test = test.cast(history_column_types)
    test = test.cast(feature_column_types)
    # All batches of one date_id share the same lags_infer; such statistics
    # are built once, when the lags for that date_id arrive.
    X_test = test.join(lags_infer, on=["date_id", "symbol_id"], how="left")

    preds = np.zeros((X_test.shape[0],))
    preds += model.predict(X_test[features].to_pandas().values)
    preds = np.clip(preds, a_min=-5, a_max=5)

    predictions = (
        test.select('row_id').with_columns(
            pl.Series(name='responder_6', values=preds, dtype=pl.Float64)
        )
    )

    assert isinstance(predictions, pl.DataFrame | pd.DataFrame)
    assert list(predictions.columns) == ['row_id', 'responder_6']
    assert len(predictions) == len(test)

    return predictions

In [7]:
# Wrap predict() in the competition's inference server.
inference_server = kaggle_evaluation.jane_street_inference_server.JSInferenceServer(predict)

if not os.getenv('KAGGLE_IS_COMPETITION_RERUN'):
    # Outside a competition rerun, drive predict() through the local gateway
    # using the sample test and lags files.
    local_data_paths = (
        '/kaggle/input/jane-street-realtime-marketdata-forecasting/test.parquet',
        '/kaggle/input/jane-street-realtime-marketdata-forecasting/lags.parquet',
    )
    inference_server.run_local_gateway(local_data_paths)
else:
    inference_server.serve()

下面是训练时可以参考的shift N与rolling N的构造方法：

In [8]:
if CONFIG.debug:

    # Reference recipe for building shifted lag features on the training set.
    train = pl.read_parquet("/kaggle/input/jane-street-realtime-marketdata-forecasting/train.parquet")
    lags = train.select(pl.col(CONFIG.lag_cols_original))
    # Relabel each day's responders with the next date_id, so the rows stored
    # under date_id d hold the responders of day d-1.
    lags = lags.with_columns(date_id = pl.col("date_id") + 1)

    date_ids = lags.select("date_id").unique().to_series()
    agg_list = create_agg_list(1, CONFIG.lag_target_cols_name)
    # NOTE(review): the aggregates are suffixed "_1d" while the filter below
    # offsets by CONFIG.lag_ndays, and predict() aggregates the rows whose
    # date_id equals the current date - confirm which shift training should use.

    result = []
    for date_id in tqdm(date_ids, total=len(date_ids)):
        try:
            # Shift by N days: take the single day N date_ids back.
            # For an N-day rolling window, filter instead on
            #   (pl.col("date_id") > date_id - CONFIG.lag_ndays) & (pl.col("date_id") <= date_id)
            # (the original computed that window and then immediately
            # overwrote it with the shift filter, so it never had any effect).
            lags_ = lags.filter(pl.col("date_id") == date_id - CONFIG.lag_ndays)
            # Relabel to the target date_id so the result can be joined onto
            # train; e.g. a 3-day rolling window for day 10 covers date_ids
            # 8, 9, 10 and is relabelled as 10.
            lags_ = lags_.with_columns(date_id=date_id)
            lags_ = lags_.group_by(["date_id", "symbol_id"], maintain_order=True).agg(agg_list)
        except Exception as exc:
            # Stay best-effort, but report the failure rather than hiding it
            # (and no longer swallow KeyboardInterrupt / SystemExit).
            print(f"skipping date_id={date_id}: {exc!r}")
            continue
        result.append(lags_)

    lag_Ndays = pl.concat(result).sort("date_id")
    # Bring date_id back to Int16 before joining onto train.
    lag_Ndays = lag_Ndays.cast({"date_id": pl.Int16})

    train = train.join(lag_Ndays, on=["date_id", "symbol_id"],  how="left")
