In [4]:
import os

# Local development only: switch into the project checkout (presumably so the
# repository-local `kaggle_evaluation` package is importable — confirm).
# The directory can be overridden through the JS_PROJECT_DIR environment variable.
# When it does not exist (e.g. on Kaggle, where data lives under /kaggle/input),
# the working directory is left unchanged instead of raising FileNotFoundError.
PROJECT_DIR = os.environ.get(
    "JS_PROJECT_DIR",
    "/media/mldadmin/home/s124mdg34_03/dev/Real-Time-Market-Data-Forecasting",
)
if os.path.isdir(PROJECT_DIR):
    os.chdir(PROJECT_DIR)

In [10]:
import pandas as pd
import polars as pl
import numpy as np
import os
from tqdm.auto import tqdm
from matplotlib import pyplot as plt
import pickle

from sklearn.metrics import r2_score
from lightgbm import LGBMRegressor
import lightgbm as lgb
from xgboost import XGBRegressor
from catboost import CatBoostRegressor
from sklearn.ensemble import VotingRegressor

import warnings
warnings.filterwarnings('ignore')
pd.options.display.max_columns = None

import kaggle_evaluation.jane_street_inference_server

In [11]:
class CONFIG:
    """Notebook-wide settings shared by the data-loading, inference and debug cells."""

    seed = 42
    target_col = "responder_6"
    # Model inputs: ids, the 79 anonymised features and the 9 lag-1 responders.
    feature_cols = ["symbol_id", "time_id"] \
        + [f"feature_{idx:02d}" for idx in range(79)] \
        + [f"responder_{idx}_lag_1" for idx in range(9)]
    categorical_cols = []

    # --- Lag-feature settings (read by the history, predict and debug cells) ---
    # Number of most recent days of responders kept in `history`; with the
    # shift-1 features used here only the latest day is needed.
    lag_ndays = 1
    # Columns taken from train.parquet when rebuilding lags offline (debug cell);
    # matches the id/responder columns selected for `history`.
    lag_cols_original = ["date_id", "time_id", "symbol_id"] \
        + [f"responder_{idx}" for idx in range(9)]
    # Incoming lag columns are assumed to be named `responder_{i}_lag_1`
    # (as in the competition's lags.parquet — confirm); renamed to match `history`.
    lag_cols_rename = {f"responder_{idx}_lag_1": f"responder_{idx}" for idx in range(9)}
    # Responder columns that `create_agg_list` aggregates into lag features.
    lag_target_cols_name = [f"responder_{idx}" for idx in range(9)]
    # When True, the debug cell rebuilds lag features for the full training set.
    debug = False

In [None]:
# Pre-built training / validation splits (with lag-1 responder columns),
# materialised as pandas DataFrames.
train, valid = (
    pl.scan_parquet(parquet_path).collect().to_pandas()
    for parquet_path in (
        "/kaggle/input/js24-preprocessing-create-lags/training.parquet",
        "/kaggle/input/js24-preprocessing-create-lags/validation.parquet",
    )
)
train.shape, valid.shape

In [None]:
# Last date_id in train.parquet; the hidden test set (date_id 0) is assumed to follow it.
LAST_TRAIN_DATE_ID = 1698

# Seed the online responder history with the last `CONFIG.lag_ndays` training days
# before LAST_TRAIN_DATE_ID.
history = pl.scan_parquet(
    "/kaggle/input/jane-street-realtime-marketdata-forecasting/train.parquet"
).select(
    ["date_id", "time_id", "symbol_id"] + [f"responder_{idx}" for idx in range(9)]
).filter(
    (pl.col("date_id") >= LAST_TRAIN_DATE_ID - CONFIG.lag_ndays)
    & (pl.col("date_id") < LAST_TRAIN_DATE_ID)
)
# Re-index history date_ids to -N..-1. The first test batch (date_id=0) delivers
# lags holding date 1698's responders, labelled date_id 0. After the shift, history's
# last day (1697 -> -1) sits directly before those lags, so the two series join up.
history = history.with_columns(
    date_id=(pl.col("date_id") - pl.lit(LAST_TRAIN_DATE_ID)).cast(pl.Int16)
).collect()

# Shared dtype maps: polars `concat` raises when dtypes differ, so history, incoming
# lags and test batches are all cast to these before being combined.
history_column_types = {
    "date_id": pl.Int16,
    "time_id": pl.Int16,
    "symbol_id": pl.Int16,
}
feature_column_types = {f"feature_{idx:02d}": pl.Float32 for idx in range(79)}
responder_column_types = {f"responder_{idx}": pl.Float32 for idx in range(9)}

history = history.cast({**history_column_types, **responder_column_types})
history.tail()

In [None]:
def predict(test: pl.DataFrame, lags: pl.DataFrame | None) -> pl.DataFrame | pd.DataFrame:
    """Score one test batch served by the Kaggle inference gateway.

    Args:
        test: Feature rows of the current batch. Must contain ``row_id``,
            ``date_id``, ``symbol_id`` and the model's feature columns.
        lags: Previous-day responders, or ``None``. When given, they are renamed via
            ``CONFIG.lag_cols_rename`` and appended to the module-level ``history``.
            The per-symbol lag features ``lags_infer`` are then rebuilt. When
            ``None``, the ``lags_infer`` built by an earlier call is reused.

    Returns:
        A polars DataFrame with columns ``row_id`` and ``responder_6``
        (predictions clipped to [-5, 5]), one row per row of ``test``.

    Raises:
        RuntimeError: if called with ``lags=None`` before any call supplied lags.
    """
    global history
    global lags_infer

    current_date = test.select("date_id").to_numpy()[:, 0][0]

    if lags is not None:
        # Append the raw lags to `history`, normalising names and dtypes first
        # so the polars concat does not fail on a schema mismatch.
        lags = lags.rename(CONFIG.lag_cols_rename)
        lags = lags.cast(history_column_types)
        lags = lags.cast(responder_column_types)
        history = pl.concat([history, lags])

        # Keep only the most recent `CONFIG.lag_ndays` days of history.
        history = history.filter(pl.col("date_id") > (current_date - CONFIG.lag_ndays))

        # Build the lag features shared by every batch of this date_id.
        # Rows labelled `current_date` are the lags just received, i.e. a 1-day shift,
        # so a shift of N days would read `date_id == current_date - (N - 1)`.
        # A rolling N-day window would read `date_id > current_date - N` and aggregate.
        agg_list = create_agg_list(1, CONFIG.lag_target_cols_name)
        shift_n_data = history.filter(pl.col("date_id") == current_date)
        lags_infer = shift_n_data.group_by(["date_id", "symbol_id"], maintain_order=True).agg(agg_list)
    elif "lags_infer" not in globals():
        raise RuntimeError(
            "predict() was called without lags before any lag features were built"
        )

    test = test.cast(history_column_types)
    test = test.cast(feature_column_types)
    # All batches of one date_id reuse the same `lags_infer`; it is rebuilt only
    # on calls that receive lags.
    X_test = test.join(lags_infer, on=["date_id", "symbol_id"], how="left")

    # NOTE(review): `model` and `features` are not defined anywhere in this notebook;
    # presumably a trained regressor and CONFIG.feature_cols — confirm.
    preds = np.asarray(model.predict(X_test[features].to_pandas().values), dtype=np.float64)
    preds = np.clip(preds, a_min=-5, a_max=5)

    predictions = (
        test.select('row_id').with_columns(
            pl.Series(name='responder_6', values=preds, dtype=pl.Float64)
        )
    )

    assert isinstance(predictions, pl.DataFrame | pd.DataFrame)
    assert list(predictions.columns) == ['row_id', 'responder_6']
    assert len(predictions) == len(test)

    return predictions

In [None]:
# Wrap `predict` in the competition's inference server.
inference_server = kaggle_evaluation.jane_street_inference_server.JSInferenceServer(predict)

# Outside the scoring rerun, replay the public test/lags files through a local
# gateway; during the rerun, Kaggle's gateway drives `predict` via serve().
is_scoring_rerun = os.getenv('KAGGLE_IS_COMPETITION_RERUN')
if not is_scoring_rerun:
    local_inputs = (
        '/kaggle/input/jane-street-realtime-marketdata-forecasting/test.parquet',
        '/kaggle/input/jane-street-realtime-marketdata-forecasting/lags.parquet',
    )
    inference_server.run_local_gateway(local_inputs)
else:
    inference_server.serve()

In [None]:
if CONFIG.debug:
    # Offline check: rebuild the lag features for the whole training set in the
    # same way `predict` builds them online, and attach them to `train`.
    train = pl.read_parquet("/kaggle/input/jane-street-realtime-marketdata-forecasting/train.parquet")
    lags = train.select(pl.col(CONFIG.lag_cols_original))
    # Relabel each row with the date_id on which it is served as a lag
    # (day d's responders arrive with day d + 1), mirroring the gateway.
    lags = lags.with_columns(date_id=pl.col("date_id") + 1)

    date_ids = lags.select("date_id").unique().to_series()
    agg_list = create_agg_list(1, CONFIG.lag_target_cols_name)

    # `predict` treats the lags labelled `current_date` as the 1-day shift, so a
    # shift of N days must read the rows labelled `date_id - (N - 1)`.
    # The previous `date_id - N` was one day older than what inference uses.
    # For a rolling N-day window instead, filter with
    #   (pl.col("date_id") > date_id - CONFIG.lag_ndays) & (pl.col("date_id") <= date_id)
    shift_offset = CONFIG.lag_ndays - 1

    result = []
    for date_id in tqdm(date_ids, total=len(date_ids)):
        lags_ = lags.filter(pl.col("date_id") == date_id - shift_offset)
        if lags_.is_empty():
            # No source day available for this date (start of the period).
            continue
        # Stamp the window with the target date_id so it merges onto that day's rows.
        # For example, a 3-day rolling window for day 10 covers rows labelled 8, 9
        # and 10; all of them are relabelled 10.
        lags_ = lags_.with_columns(date_id=date_id)
        lags_ = lags_.group_by(["date_id", "symbol_id"], maintain_order=True).agg(agg_list)
        result.append(lags_)

    lag_Ndays = pl.concat(result).sort("date_id")
    lag_Ndays = lag_Ndays.cast({"date_id": pl.Int16})

    train = train.join(lag_Ndays, on=["date_id", "symbol_id"], how="left")