In [None]:
import os,gc
import glob
import numpy as np
import time
import pandas as pd
import polars as pl
import lightgbm as lgb
import xgboost as xgb
import pickle
import kaggle_evaluation.jane_street_inference_server
from sklearn.linear_model import Ridge

import torch
import torch.nn as nn
import torch.nn.functional as F
import tqdm
from pytorch_lightning import (LightningDataModule, LightningModule, Trainer)
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint, Timer

In [None]:
class CONFIG:
    """Static settings shared by every cell of the notebook."""

    seed = 42

    # Column carrying the regression target and the per-row sample weight.
    target_name = "responder_6"
    weight_name = 'weight'

    # Model inputs: 79 raw features followed by the 9 lagged responders.
    feature_names = [f"feature_{i:02d}" for i in range(79)] + [f"responder_{idx}_lag_1" for idx in range(9)]
    # Maps a raw responder column name to its lagged feature name.
    lag_cols_rename = {f"responder_{idx}": f"responder_{idx}_lag_1" for idx in range(9)}
    # Same as feature_names minus feature_09 / feature_10 / feature_11.
    ridge_features = [name for name in feature_names if name not in ('feature_09', 'feature_10', 'feature_11')]

    # Enables the offline simulation cells.
    DEBUG = False

    # Online retraining schedule (counted in days) and the size of the rolling window.
    retrain_every_n_days = 20
    retrain_after_n_days = 20
    retrain_last_n_days_data = 250

    # Columns kept in the rolling cache: row keys, sample weight, then every model input.
    cache_cols = ['date_id', 'time_id', 'symbol_id', 'weight'] + feature_names

    # Hyper-parameters of the LightGBM model that is refit online.
    newmodel_lr = 0.1
    newmodel_num_boost_round = 50

    # Number of top-importance features the online LightGBM model is trained on.
    retrain_feature_num = 45

# Public XGBoost model ("Chinese version")

In [None]:
# Number of trailing training days whose responders are kept as lag history.
lag_ndays = 2

# Responders of the last `lag_ndays` training days (date_id in [1698 - lag_ndays, 1698)).
# date_id is re-based by subtracting 1698 so the history runs from -lag_ndays to -1.
# The original author's assumption: test date_id 0 directly follows the training data,
# and the lags delivered with the first batch carry date_id 0, so the re-based history
# lines up with the lags received at inference time.
history = (
    pl.scan_parquet("/kaggle/input/jane-street-real-time-market-data-forecasting/train.parquet")
    .select(['date_id','time_id','symbol_id'] + [f"responder_{idx}" for idx in range(9)])
    .filter((pl.col("date_id")>=(1698 - lag_ndays))&(pl.col("date_id")<1698))
    .with_columns(date_id = (pl.col("date_id") - pl.lit(1698)).cast(pl.Int16))
    .collect()
)

# Canonical dtypes: frames are cast to these before being concatenated
# (the original author notes that polars raises on dtype mismatches in concat).
history_column_types = {
    'date_id': pl.Int16,
    'time_id': pl.Int16,
    'symbol_id': pl.Int16
}
feature_column_types = {f"feature_{idx:02d}": pl.Float32 for idx in range(79)}
responder_column_types = {f"responder_{idx}": pl.Float32 for idx in range(9)}

history = history.cast(history_column_types).cast(responder_column_types)

# Pickled bundle holding the trained model and the ordered list of its input columns.
# NOTE(review): pickle.load executes arbitrary code; only load trusted artifacts.
model_path = "/kaggle/input/janestreet-public-model/xgb_001.pkl"
with open(model_path, "rb") as fp:
    result = pickle.load(fp)
model_pub_xgb_cn_ver = result["model"]
features_pub_xgb_cn_ver = result["features"]

def predict_public_xgb_chinese_version(test: pl.DataFrame, lags: pl.DataFrame | None):
    """Predict with the public XGBoost model that uses per-symbol lag statistics.

    When `lags` is provided, the module-level `history` is extended with it and
    trimmed to the most recent `lag_ndays` days, and the module-level `lags_infer`
    (mean / std / max / last of each responder per date_id and symbol_id) is rebuilt.
    Batches without `lags` reuse the `lags_infer` built by an earlier call.

    Args:
        test: Feature rows of the current batch; must contain date_id and symbol_id.
        lags: Lagged responders (columns responder_X_lag_1), or None.

    Returns:
        numpy.ndarray of float64 predictions, one per row of `test`.

    Raises:
        RuntimeError: if no lag statistics exist yet and `lags` is None.
    """
    global history
    global lags_infer

    responder_cols = [f"responder_{idx}" for idx in range(9)]
    # Incoming lags are named responder_X_lag_1; history stores them as responder_X.
    lag_cols_rename = {f"{col}_lag_1": col for col in responder_cols}

    def create_agg_list(day, columns):
        """Build mean/std/max/last aggregations, suffixed with the window length."""
        agg_mean_list = [pl.col(c).mean().name.suffix(f"_mean_{day}d") for c in columns]
        agg_std_list = [pl.col(c).std().name.suffix(f"_std_{day}d") for c in columns]
        agg_max_list = [pl.col(c).max().name.suffix(f"_max_{day}d") for c in columns]
        agg_last_list = [pl.col(c).last().name.suffix(f"_last_{day}d") for c in columns]
        return agg_mean_list + agg_std_list + agg_max_list + agg_last_list

    if lags is None and "lags_infer" not in globals():
        # Previously this surfaced as a bare NameError at the join below.
        raise RuntimeError(
            "predict_public_xgb_chinese_version needs `lags` on its first call "
            "to build the lag statistics"
        )

    current_date = test.select("date_id").to_numpy()[:, 0][0]

    if lags is not None:
        # Append the raw lags to the history first.
        lags = lags.rename(lag_cols_rename)
        lags = lags.cast(history_column_types)
        lags = lags.cast(responder_column_types)
        history = pl.concat([history, lags])

        # Keep only the most recent `lag_ndays` days of history.
        history = history.filter(pl.col("date_id") > (current_date - lag_ndays))
        # This XGB model only uses statistics of the 1-day shift.
        agg_list = create_agg_list(1, responder_cols)
        shift_n_data = history.filter(pl.col("date_id") == current_date)
        lags_infer = shift_n_data.group_by(["date_id", "symbol_id"], maintain_order=True).agg(agg_list)

    test = test.cast(history_column_types)
    test = test.cast(feature_column_types)
    # lags_infer is only rebuilt when lags arrive, so batches without lags share it.
    X_test = test.join(lags_infer, on=["date_id", "symbol_id"], how="left")

    # Accumulate into a float64 buffer so the result dtype does not depend on the model.
    preds = np.zeros((X_test.shape[0],))
    preds += model_pub_xgb_cn_ver.predict(X_test[features_pub_xgb_cn_ver].to_pandas().values)
    return preds

# Public XGBoost model ("motono")

In [None]:
# Location of the pickled bundle and a module-level slot caching the loaded model.
_MOTONO_XGB_MODEL_PATH = "/kaggle/input/js-with-lags-trained-xgb/result.pkl"
_motono_xgb_model = None


def _load_motono_xgb_model():
    """Return the pickled XGBoost model, reading it from disk only on first use."""
    global _motono_xgb_model
    if _motono_xgb_model is None:
        # NOTE(review): pickle.load executes arbitrary code; only load trusted artifacts.
        with open(_MOTONO_XGB_MODEL_PATH, "rb") as fp:
            _motono_xgb_model = pickle.load(fp)["model"]
    return _motono_xgb_model


def predict_public_xgb_motono(test_with_lag_features: pl.DataFrame):
    """Predict with the public XGBoost model trained on lagged responders.

    The model used to be unpickled from disk on every call, i.e. once per
    inference batch; it is now loaded once and reused.

    Args:
        test_with_lag_features: Batch containing symbol_id, time_id and every
            column listed in CONFIG.feature_names.

    Returns:
        numpy.ndarray of float64 predictions, one per input row.
    """
    xgb_model = _load_motono_xgb_model()

    xgb_feature_cols = ["symbol_id", "time_id"] + CONFIG.feature_names

    # Accumulate into a float64 buffer so the result dtype does not depend on the model.
    preds = np.zeros((test_with_lag_features.shape[0],))
    preds += xgb_model.predict(test_with_lag_features[xgb_feature_cols].to_pandas())
    return preds

# Neural network ensemble

In [None]:
class NN(LightningModule):
    """Feed-forward regressor whose output is bounded to (-5, 5) by a scaled tanh.

    Each hidden block is BatchNorm1d -> [SiLU, skipped for the first block] ->
    [Dropout, while dropout rates remain] -> Linear. The head is Linear(., 1)
    followed by Tanh, and `forward` multiplies the result by 5.

    Args:
        input_dim: Number of input features.
        hidden_dims: Width of each hidden Linear layer, in order.
        dropouts: Dropout probability per hidden block (may be shorter than hidden_dims).
        lr: Adam learning rate.
        weight_decay: Adam weight decay.
    """

    def __init__(self, input_dim, hidden_dims, dropouts, lr, weight_decay):
        super().__init__()
        self.save_hyperparameters()
        # NOTE: the order of modules below defines the state_dict keys, so it must
        # not change if existing checkpoints are to keep loading.
        layers = []
        in_dim = input_dim
        for i, hidden_dim in enumerate(hidden_dims):
            layers.append(nn.BatchNorm1d(in_dim))
            if i > 0:
                layers.append(nn.SiLU())
            if i < len(dropouts):
                layers.append(nn.Dropout(dropouts[i]))
            layers.append(nn.Linear(in_dim, hidden_dim))
            in_dim = hidden_dim
        layers.append(nn.Linear(in_dim, 1))
        layers.append(nn.Tanh())
        self.model = nn.Sequential(*layers)
        self.lr = lr
        self.weight_decay = weight_decay
        # (y_hat, y, w) triples collected during validation, consumed at epoch end.
        self.validation_step_outputs = []

    @staticmethod
    def _weighted_zero_mean_r2(y_true, y_pred, weights):
        """Return 1 - sum(w * (y - y_hat)^2) / sum(w * y^2) for numpy arrays."""
        numerator = np.sum(weights * (y_true - y_pred) ** 2)
        denominator = np.sum(weights * y_true ** 2)
        return 1 - numerator / denominator

    def forward(self, x):
        """Map a batch of features to predictions, dropping the trailing unit dimension."""
        return 5 * self.model(x).squeeze(-1)

    def training_step(self, batch):
        """Compute and log the mean of the weight-scaled squared errors for one batch."""
        x, y, w = batch
        y_hat = self(x)
        loss = F.mse_loss(y_hat, y, reduction='none') * w
        loss = loss.mean()
        self.log('train_loss', loss, on_step=False, on_epoch=True, batch_size=x.size(0))
        return loss

    def validation_step(self, batch):
        """Same loss as training; also stores the batch outputs for the epoch-level metric."""
        x, y, w = batch
        y_hat = self(x)
        loss = F.mse_loss(y_hat, y, reduction='none') * w
        loss = loss.mean()
        self.log('val_loss', loss, on_step=False, on_epoch=True, batch_size=x.size(0))
        self.validation_step_outputs.append((y_hat, y, w))
        return loss

    def on_validation_epoch_end(self):
        """Log the weighted zero-mean R² over the whole validation epoch.

        Nothing is logged during the trainer's sanity check or when no
        validation batch was seen. The original implementation called an
        undefined `r2_val` helper here, which raised NameError.
        """
        outputs = self.validation_step_outputs
        if outputs and not self.trainer.sanity_checking:
            prob = torch.cat([out[0] for out in outputs]).cpu().numpy()
            y = torch.cat([out[1] for out in outputs]).cpu().numpy()
            weights = torch.cat([out[2] for out in outputs]).cpu().numpy()
            val_r_square = float(self._weighted_zero_mean_r2(y, prob, weights))
            self.log("val_r_square", val_r_square, prog_bar=True, on_step=False, on_epoch=True)
        self.validation_step_outputs.clear()

    def configure_optimizers(self):
        """Adam, with the learning rate halved after 5 epochs without val_loss improvement."""
        optimizer = torch.optim.Adam(self.parameters(), lr=self.lr, weight_decay=self.weight_decay)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5,
                                                               verbose=True)
        return {
            'optimizer': optimizer,
            'lr_scheduler': {
                'scheduler': scheduler,
                'monitor': 'val_loss',
            }
        }

    def on_train_epoch_end(self):
        """Print every logged metric, formatted to 5 decimals, once per training epoch."""
        if self.trainer.sanity_checking:
            return
        epoch = self.trainer.current_epoch
        metrics = {k: v.item() if isinstance(v, torch.Tensor) else v for k, v in self.trainer.logged_metrics.items()}
        formatted_metrics = {k: f"{v:.5f}" for k, v in metrics.items()}
        print(f"Epoch {epoch}: {formatted_metrics}")

N_folds = 5
nn_models = []
# Use the first GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

# The NN checkpoints in this directory were trained without feature_09 / 10 / 11.
nn_features = [item for item in CONFIG.feature_names if item not in ['feature_09','feature_10','feature_11']]
nn_model_root_path = '/kaggle/input/js-my-offline-nn-model-5fold'

# Alternative checkpoint set that uses every feature:
# nn_features = [item for item in CONFIG.feature_names]
# nn_model_root_path = '/kaggle/input/js-xs-nn-trained-model'

# Load every checkpoint onto `device`. The original moved the models to a
# hard-coded "cuda:0", which fails on a CPU-only machine even though `device`
# already provides the fallback.
for checkpoint_path in sorted(glob.glob(nn_model_root_path + r'/*')):
    model = NN.load_from_checkpoint(checkpoint_path, map_location=device)
    nn_models.append(model.to(device))

def predict_nn(test_with_lag_features: pl.DataFrame):
    """Average the predictions of every loaded NN checkpoint for one batch.

    Missing feature values are replaced with 0 before inference.

    Args:
        test_with_lag_features: Batch containing every column in `nn_features`.

    Returns:
        numpy.ndarray with the mean prediction across `nn_models`, one per row.
    """
    features_df = test_with_lag_features[nn_features].to_pandas().fillna(0)
    ensemble_pred = np.zeros((features_df.shape[0],))
    batch = torch.FloatTensor(features_df.values).to(device)

    n_models = len(nn_models)
    with torch.no_grad():
        for fold_model in nn_models:
            # Inference mode: disables dropout and uses BatchNorm running statistics.
            fold_model.eval()
            ensemble_pred += fold_model(batch).cpu().numpy() / n_models

    return ensemble_pred

# My offline XGBoost / LightGBM / CatBoost models

In [None]:
# Inputs of the offline GBDT models: 79 raw features followed by the 9 lagged responders.
my_gbdt_feature_names = [
    *(f"feature_{i:02d}" for i in range(79)),
    *(f"responder_{idx}_lag_1" for idx in range(9)),
]

# Unpickle every file of the dataset whose name contains "model".
# NOTE(review): pickle.load executes arbitrary code; only load trusted artifacts.
my_gbdt_models = []
for file in glob.glob('/kaggle/input/js-xgb-lgb-cat-offline-model-2025-1-12/*model*'):
    print(file)
    with open(file,'rb') as fp:
        loaded_model = pickle.load(fp)
    my_gbdt_models.append(loaded_model)

def predict_my_offline_gbdt(test_with_lag_features: pl.DataFrame):
    """Average the offline GBDT models' predictions and clip them to [-5, 5].

    The original implementation computed the ensemble mean but returned an
    untouched zero-filled buffer, so every prediction was 0.

    Args:
        test_with_lag_features: Batch containing every column in
            `my_gbdt_feature_names`.

    Returns:
        numpy.ndarray with one clipped prediction per row; all zeros when no
        model has been loaded.
    """
    if not my_gbdt_models:
        # Nothing to average: fall back to a neutral prediction.
        return np.zeros((test_with_lag_features.shape[0],))

    feat = test_with_lag_features[my_gbdt_feature_names].to_numpy()

    per_model_preds = [model.predict(feat) for model in my_gbdt_models]
    ensemble_pred = np.mean(per_model_preds, axis=0)
    return np.clip(ensemble_pred, a_min=-5, a_max=5)

In [None]:
# Offline artifacts: the pre-trained LightGBM model and the cached features / labels
# that seed the online retraining.
offline_lgbm_path = '/kaggle/input/lgbm-offline-model/lgbm_model_offline_450iter.json'
offline_cache_path = '/kaggle/input/data-create-create-lags/offline_cache.parquet'
offline_label_path = '/kaggle/input/data-create-create-lags/offline_labels.parquet'

lgbm_model = lgb.Booster(model_file=offline_lgbm_path)

# Positions of the most important features of the offline model, most important first,
# truncated to the number of features used for online retraining.
# NOTE(review): these positions are later applied to CONFIG.feature_names columns,
# which presumably matches the offline model's feature order — confirm.
features_retrain_indexs = np.argsort(lgbm_model.feature_importance())[::-1]
features_retrain_indexs = features_retrain_indexs[:CONFIG.retrain_feature_num]

In [None]:
# Parameters of the LightGBM model that is refit online.
params = {
    'objective': 'regression',
    'device': 'gpu',
    # 'l2' is the squared-error metric (mean squared error, not its root).
    'metric': 'l2',
    # Gradient boosted decision trees.
    'boosting_type': 'gbdt',
    'colsample_bytree': 0.8,
    # NOTE(review): row subsampling presumably needs a bagging frequency to take
    # effect in LightGBM — confirm against the parameter documentation.
    'subsample': 0.8,
    'num_leaves': 31,
    # Regularisation left disabled by the original author:
    # 'reg_alpha': 0.1,
    # 'reg_lambda': 1.0,
    'learning_rate': CONFIG.newmodel_lr,
    'n_estimators': CONFIG.newmodel_num_boost_round,
}

# predict function

In [None]:
# ---- Mutable state shared across predict() calls ----
# Batches received since the last retrain.
cache_list = []
# Total number of days seen so far.
day_count = 0
# Days counter that is reset after each retrain.
train_counter = 0
# Models refit online; None until the first retrain.
new_lgbm_model = None
online_ridge_model = None
# Most recent lags frame, and its last record per (date_id, symbol_id).
lags_ : pl.DataFrame | None = None
lags_last : pl.DataFrame | None = None

# Rolling feature cache and label store, seeded from the offline parquet files
# (rows with date_id > -300 only).
cache = None
labels : pl.DataFrame | None = None

id_column_types = {
    'date_id': pl.Int16,
    'time_id': pl.Int16,
    'symbol_id': pl.Int16
}
cache = (
    pl.scan_parquet(offline_cache_path)
    .filter(pl.col("date_id") > -300)
    .collect()
    .cast(id_column_types)
)
labels = (
    pl.scan_parquet(offline_label_path)
    .filter(pl.col("date_id") > -300)
    .collect()
    .cast(id_column_types)
)

def _retrain_online_models(id_column_types):
    """Fold the buffered batches into the cache and refit the online LightGBM and Ridge models.

    Updates the module-level `cache`, `cache_list`, `new_lgbm_model`,
    `online_ridge_model` and `train_counter`.

    Args:
        id_column_types: dtype mapping applied to the key columns before concatenation.
    """
    global cache
    global cache_list
    global new_lgbm_model
    global online_ridge_model
    global train_counter

    print("Start retraining")
    # Merge the old cache with the batches buffered since the last retrain.
    if cache is not None:
        print('cache不是none')
        cache_update = pl.concat(cache_list, rechunk=True)
        cache_update = cache_update.select(CONFIG.cache_cols).cast(id_column_types)
        cache = cache.select(CONFIG.cache_cols).cast(id_column_types)
        cache = pl.concat([cache, cache_update], rechunk=True)
    else:
        print('cache是none！！！！')
        cache = pl.concat(cache_list, rechunk=True)

    # Keep only the most recent CONFIG.retrain_last_n_days_data days.
    print('cache最小最大date_id',np.min(cache["date_id"].to_numpy()),np.max(cache["date_id"].to_numpy()))
    days = np.sort(np.unique(cache["date_id"].to_numpy()))
    days = days[-CONFIG.retrain_last_n_days_data:]
    min_day = np.min(days)
    print('cache最小天：', min_day)
    cache = cache.filter(pl.col("date_id") >= min_day)
    print('cache shape:',cache.shape)

    # Labels arrive as lags one day later, so shift them back onto the day they describe.
    df = labels.with_columns(
        (pl.col("date_id") -1).alias("date_id")
    )
    df = df.filter(pl.col("date_id") >= min_day)

    train = cache.join(df, on=["date_id", "symbol_id", "time_id"],  how="left")
    # Rows without a label yet (at least the current batch) would otherwise be
    # trained on as if their target were 0, so drop them.
    train = train.filter(pl.col(CONFIG.target_name).is_not_null())

    if train.height == 0:
        # Refitting on an empty frame would fail; keep whatever models exist.
        print("No labelled rows available for retraining; keeping the previous online models")
    else:
        X_train = train[CONFIG.feature_names].to_numpy()[:,features_retrain_indexs]
        y_train = train[CONFIG.target_name].to_numpy().flatten()

        start_tim = time.time()
        train_data = lgb.Dataset(X_train, label=y_train)
        del X_train,y_train
        gc.collect()
        print('构建lgb dataset耗时：',time.time()-start_tim)
        new_lgbm_model = lgb.train(
            params,
            train_data,
            num_boost_round= CONFIG.newmodel_num_boost_round,
        )

        # Ridge cannot handle missing values, hence the zero fill.
        X_train_ridge =  train[CONFIG.ridge_features].to_pandas().fillna(0).values
        y_train_ridge = train[CONFIG.target_name].to_pandas().fillna(0).values.flatten()
        del train
        gc.collect()
        online_ridge_model = Ridge()
        online_ridge_model.fit(X_train_ridge,y_train_ridge)

    # Reset the counter, otherwise every later batch of the same day would retrain again.
    train_counter = 1
    # The buffered batches now live in the cache.
    cache_list = []


# Each batch of predictions (except the very first) must be returned within 1 minute of the batch features being provided.
def predict(test: pl.DataFrame, lags: pl.DataFrame | None) -> pl.DataFrame | pd.DataFrame:
    """Make a prediction for one batch.

    Blends the offline models (two public XGBoost models and the NN ensemble)
    and, once the online models have been fit, mixes in the online LightGBM and
    Ridge predictions. When `lags` is provided, it is stored as the previous
    day's labels and as the lag features for the batches that follow.

    Args:
        test: Feature rows of the current batch, including `row_id`.
        lags: Lagged responders, or None when none are delivered with this batch.

    Returns:
        polars.DataFrame with columns `row_id` and `responder_6`, where
        predictions are clipped to [-5, 5].
    """
    global day_count
    global lags_
    global labels
    global train_counter
    global lags_last

    # This model maintains its own responder history, so it receives the raw inputs.
    pred_public_xgb_chinese_ver = predict_public_xgb_chinese_version(test.clone(),lags)

    id_column_types = {
        'date_id': pl.Int16,
        'time_id': pl.Int16,
        'symbol_id': pl.Int16
    }
    test = test.cast(id_column_types)

    if lags is not None:
        # Keep the lags in module state: later batches may arrive without them.
        lags = lags.cast(id_column_types)
        lags_ = lags
        day_count += 1
        train_counter += 1

        # Store the ground truth carried by the lags.
        update_labels = lags_["date_id", "symbol_id", "time_id","responder_6_lag_1"]
        update_labels = update_labels.rename({"responder_6_lag_1": "responder_6"})
        if labels is not None:
            labels = pl.concat([labels, update_labels], rechunk=True)
        else:
            labels = update_labels

        # Lag features: the last record of each (date_id, symbol_id) group.
        lags_last = lags.group_by(["date_id", "symbol_id"], maintain_order=True).last()
        lags_last = lags_last.drop(["time_id"])

    test = test.join(lags_last, on=["date_id", "symbol_id"],  how="left")

    # Offline models.
    pred_public_xgb_motono = predict_public_xgb_motono(test)
    pred_nn = predict_nn(test)
    # pred_my_gbdt = predict_my_offline_gbdt(test)

    cache_list.append(test)

    # Refit the online models every CONFIG.retrain_every_n_days days, once
    # CONFIG.retrain_after_n_days days have been seen.
    if train_counter % CONFIG.retrain_every_n_days == 0 and day_count>=CONFIG.retrain_after_n_days:
        _retrain_online_models(id_column_types)

    y_pred_offline = 0.325*pred_public_xgb_chinese_ver + 0.175*pred_public_xgb_motono + 0.5*pred_nn
    if new_lgbm_model is not None:
        X = test[CONFIG.feature_names].to_numpy()
        y_pred_online = new_lgbm_model.predict(X[:,features_retrain_indexs],num_iteration=new_lgbm_model.best_iteration)
        if online_ridge_model is not None:
            y_pred_ridge = online_ridge_model.predict(test[CONFIG.ridge_features].to_pandas().fillna(0).values)
            y_pred_online = 0.8*y_pred_online + 0.2*y_pred_ridge
        y_pred = (0.45*y_pred_online+0.55*y_pred_offline)
    else:
        y_pred = y_pred_offline

    predictions = test.select('row_id').with_columns(
        pl.Series(
            name   = 'responder_6',
            values = np.clip(y_pred, a_min = -5, a_max = 5),
            dtype  = pl.Float64,
        )
    )

    # Sanity checks: exactly these two columns, one row per input row.
    assert predictions.columns == ['row_id', 'responder_6']
    assert len(predictions) == len(test)

    return predictions

# debug

In [None]:
def makelag(date_id):
    """
    Build the lag frame from the responders of the previous day.

    Args:
    date_id (int): date_id of the previous day

    Returns:
    pl.DataFrame: key columns and responders of that day, with its columns
    renamed to those of `lag_sample`
    """
    # NOTE(review): the returned rows keep the previous day's date_id, while
    # predict() joins the lags on date_id — confirm this alignment is intended.
    key_cols = ["date_id", "time_id", "symbol_id"]
    responder_cols = [name for name in train.columns if "responder" in name]

    day_query = alltraindata.filter(pl.col("date_id")==date_id).select(key_cols + responder_cols)
    lag = day_query.collect()
    lag.columns = lag_sample.columns

    return lag

def weighted_zero_mean_r2(y_true, y_pred, weights):
    """
    Sample-weighted R-squared measured against a zero baseline.

    Computes 1 - sum(w * (y_true - y_pred)^2) / sum(w * y_true^2).

    Parameters:
    y_true (numpy.ndarray): Ground-truth values for responder_6.
    y_pred (numpy.ndarray): Predicted values for responder_6.
    weights (numpy.ndarray): Sample weight vector.

    Returns:
    float: The weighted zero-mean R-squared score.
    """
    residual = y_true - y_pred
    weighted_sq_error = np.sum(weights * residual**2)
    weighted_sq_target = np.sum(weights * y_true**2)

    return 1 - weighted_sq_error / weighted_sq_target


if CONFIG.DEBUG:
    # Sample lags file: only its column names are used (by makelag).
    lag_sample = pl.read_parquet("/kaggle/input/jane-street-real-time-market-data-forecasting/lags.parquet/date_id=0/part-0.parquet")
    alltraindata = pl.scan_parquet("/kaggle/input/jane-street-real-time-market-data-forecasting/train.parquet")

    # Simulate on the training rows with date_id > 1698 - nb_days.
    nb_days = 100
    train = alltraindata.filter(pl.col("date_id")>1698-nb_days).collect()
    # Add a running row_id, as predict() expects one.
    train = train.with_columns(pl.Series(range(len(train))).alias("row_id"))


## Step 1 The data is split by day using group_by.
if CONFIG.DEBUG:
    # Reset the per-run state; the offline cache and labels are kept as loaded.
    cache_list = []
    day_count = 0          # total number of days seen
    train_counter = 0      # reset after each retrain
    new_lgbm_model = None
    lags_ : pl.DataFrame | None = None

    all_submission_dataframe = []
    for num_days, df_per_day in train.group_by("date_id",maintain_order=True):
        ## Step 2 Each day is split by time_id; lags are only generated for time_id == 0.
        for time_id, test in df_per_day.group_by("time_id",maintain_order=True):
            lag = makelag(num_days[0] - 1) if time_id[0] == 0 else None

            start_time = time.time()
            submission_dataframe = predict(test, lag)
            elapsed = time.time() - start_time
            # Report batches slower than 10 seconds.
            if elapsed>10:
                print(num_days,time_id)
                print('耗时：{}'.format(elapsed))
            all_submission_dataframe.append(submission_dataframe)

    all_submission_dataframe = pl.concat(all_submission_dataframe)

    # Score the simulated submission against the true responder_6.
    print(
        weighted_zero_mean_r2(
            train.select("responder_6").to_numpy().reshape(-1),
            all_submission_dataframe.select("responder_6").to_numpy().reshape(-1),
            train.select("weight").to_numpy().reshape(-1),
        )
    )

In [None]:
# Wrap predict() in the competition's inference server.
inference_server = kaggle_evaluation.jane_street_inference_server.JSInferenceServer(predict)

if not os.getenv('KAGGLE_IS_COMPETITION_RERUN'):
    # Outside a competition rerun: run the local gateway on the bundled sample files.
    inference_server.run_local_gateway(
        (
            '/kaggle/input/jane-street-real-time-market-data-forecasting/test.parquet',
            '/kaggle/input/jane-street-real-time-market-data-forecasting/lags.parquet',
        )
    )
else:
    # Competition rerun: serve batches to the competition gateway.
    inference_server.serve()