In [1]:
import os

import pandas as pd
import polars as pl

import kaggle_evaluation.jane_street_inference_server

The evaluation API requires that you set up a server which will respond to inference requests. We have already defined the server; you just need to write the predict function. When we evaluate your submission on the hidden test set, the client defined in `jane_street_gateway` will run in a different container with direct access to the hidden test set and hand off the data timestep by timestep.



Your code will always have access to the published copies of the files.

In [2]:
import pickle
import time
import xgboost as xgb
import polars as pl
import numpy as np

# Global variables
lags_ : pl.DataFrame | None = None

# Use dictionaries for efficient data access
history_cache = {}
lags_cache = {}

train_flag = False
training_data = None
combined_training_data = pl.DataFrame()
train_buffer = 0
BUFFER_LIMIT = 1

train_runs = 0
TRAIN_RUN_LIMIT = 0

loaded_model = None

# Feature columns
feature_columns = ['responder_3_lag_1', 'responder_8_lag_1', 'responder_7_lag_1', 'responder_4_lag_1', 'responder_5_lag_1',
        'responder_0_lag_1', 'responder_2_lag_1', 'responder_1_lag_1', 
        'feature_06', 'feature_60', 'feature_49', 'feature_04', 'feature_07', 
        'feature_58', 'feature_59', 'feature_47', 'feature_51', 'feature_36', 
        'feature_52', 'feature_68', 'feature_13', 'feature_02', 'feature_05', 
        'feature_41', 'feature_01', 'time_id', 'feature_54', 'feature_40', 
        'feature_03', 'feature_55', 'feature_08', 'feature_19', 'feature_48', 
        'feature_00', 'feature_71', 'feature_66', 'feature_45']

def predict(test: pl.DataFrame, lags: pl.DataFrame | None) -> pl.DataFrame:
    global lags_, loaded_model, history_cache, lags_cache, train_flag, training_data, train_buffer, combined_training_data, BUFFER_LIMIT, feature_columns, train_runs, TRAIN_RUN_LIMIT

    # Extract date_id from test data
    date_id = test['date_id'][0]

    if lags is not None:
        lags_ = lags
        lags_cache[date_id] = lags
        train_flag = True

    if loaded_model is None:
        # Load the model once
        with open("/kaggle/input/xgboostregressor/other/default/1/saved_model_xgboost (2).pkl", "rb") as f:
            loaded_model = pickle.load(f)

    # Join test with lags_
    if lags_ is not None:
        combined_data = test.join(lags_, on=["date_id", "time_id", "symbol_id"], how="left")
    else:
        combined_data = test

    combined_data = combined_data.fill_null(0)  # Fill nulls with zeros

    # Prepare the input features
    X_pred = combined_data.select(feature_columns).to_numpy()
    X_pred = np.nan_to_num(X_pred)  # Replace any remaining NaNs with zeros

    # Get predictions
    predictions = loaded_model.predict(X_pred)

    # Add predictions to combined_data
    combined_data = combined_data.with_columns(pl.Series(name="responder_6", values=predictions))

    # Prepare the output
    predictions_df = combined_data.select('row_id', 'responder_6')

    # Update history_cache
    history_cache[date_id] = test

    # Training
    if train_flag:
        # Only allow model training once we have enough data
        if len(history_cache) <= 1:
            train_flag = False
        else:
            start = time.time()

            prev_date_id = date_id - 1

            # Ground truths: lags_ with date_id - 1
            ground_truths = lags_.with_columns(
                (pl.col("date_id") - 1).alias("date_id")
            )

            # Get previous test data
            if prev_date_id in history_cache:
                prev_test_data = history_cache[prev_date_id]

                training_data = ground_truths.join(prev_test_data, on=["date_id", "time_id","symbol_id"], how="left")

                # Rename columns
                rename_mapping = {f"responder_{i}_lag_1": f"responder_{i}" for i in range(9)}
                training_data = training_data.rename(rename_mapping)

                # Also join with previous lags data
                if prev_date_id in lags_cache:
                    prev_lags_data = lags_cache[prev_date_id]
                    training_data = training_data.join(prev_lags_data, on=["date_id", "time_id", "symbol_id"], how="left")

                # Accumulate training data
                if combined_training_data.is_empty():
                    combined_training_data = training_data
                else:
                    combined_training_data = combined_training_data.vstack(training_data)

                train_buffer += 1

                if train_buffer >= BUFFER_LIMIT and train_runs < TRAIN_RUN_LIMIT:
                    # Prepare training data
                    # X_train = combined_training_data.select(feature_columns).to_numpy()
                    # X_train = np.nan_to_num(X_train)
                    X_train = combined_training_data.select(feature_columns).to_pandas()

                    # print(X_train.columns)

                    # y_train = combined_training_data.select(["responder_6"]).to_numpy().ravel()
                    y_train = combined_training_data.select(["responder_6"])

                    # Continue training the model
                    loaded_model.fit(X_train, y_train, xgb_model=loaded_model)

                    # Reset training data
                    combined_training_data = pl.DataFrame()
                    train_buffer = 0
                    train_runs += 1

                # Remove old data
                del history_cache[prev_date_id]
                del lags_cache[prev_date_id]

            end = time.time()
            print(f"Total time taken: {end-start}")
            train_flag = False

    # Ensure the output DataFrame has the correct columns
    if isinstance(predictions_df, pl.DataFrame):
        assert predictions_df.columns == ['row_id', 'responder_6']
    else:
        raise TypeError('The predict function must return a Polars DataFrame')

    # Confirm has as many rows as the test data.
    assert len(predictions_df) == len(test)

    return predictions_df

When your notebook is run on the hidden test set, inference_server.serve must be called within 15 minutes of the notebook starting or the gateway will throw an error. If you need more than 15 minutes to load your model you can do so during the very first `predict` call, which does not have the usual 1 minute response deadline.

In [3]:
inference_server = kaggle_evaluation.jane_street_inference_server.JSInferenceServer(predict)

if os.getenv('KAGGLE_IS_COMPETITION_RERUN'):
    inference_server.serve()
else:
    inference_server.run_local_gateway(
        (
            '/kaggle/input/jane-street-real-time-market-data-forecasting/test.parquet',
            '/kaggle/input/jane-street-real-time-market-data-forecasting/lags.parquet',
        )
    )



In [4]:
import os
if os.path.isfile('submission.parquet'):
    pl_sub = pl.read_parquet('submission.parquet')
    display(pl_sub)

row_id,responder_6
i64,f32
0,0.138778
1,0.11631
2,0.208794
3,0.14146
4,0.015718
…,…
34,0.10175
35,0.417461
36,0.061081
37,0.102887
