In [1]:
%%capture

import polars as pl
import pandas as pd
import numpy as np
import warnings; warnings.filterwarnings(action='ignore')

from gc import collect
from pprint import pprint

In [2]:
TARGET = "responder_6"
FEATURES = [f"feature_{i:02d}" for i in range(79)]

RANDOM_STATE = 42
DEV_START_ID = 4_500_000
VERSION_NUMBER  = "V1_1"

# **DATA LOADING**

Here, we load the data and describe the CV scheme. We don't need to specify the sub-directory paths while importing the datasets; polars knows to import all training components as this is a **hive** dataset. Specifying the train path is enough. Weights parameter is important here — this is a sample weight used in our custom eval-metric.


In [3]:
%%time 

id_col = pl.int_range(pl.len(), dtype=pl.UInt32).alias("id") # Generate an id column
all_cols = pl.all() # Select all columns

# Read the parquet file and select the specified columns
file_path = "/kaggle/input/jane-street-real-time-market-data-forecasting/train.parquet"
train = pl.scan_parquet(file_path).select(id_col, all_cols)

all_col_names = train.collect_schema().names()
cols_of_disinterest = ("weight", "id", "date_id", "time_id", "partition_id")
target_columns, selected_columns = [], []

# Factory for loop to classify train and target column names
for col in all_col_names: 
    if col.startswith("responder"):
        target_columns.append(col)
        
    elif not col.startswith(cols_of_disinterest):
        selected_columns.append(col)
        
sample_weight = train.select(pl.col("weight")).collect().to_series()
collect()

CPU times: user 256 ms, sys: 361 ms, total: 617 ms
Wall time: 647 ms


0

In [4]:
date_column = train.select(pl.col("date_id")).collect()

train_length = date_column.shape[0]
offline_train_length = train_length - DEV_START_ID
last_train_date = date_column.row(offline_train_length)[0]

print(f"Last offline train date = {last_train_date}\n")

train_XY = train.filter(pl.col("date_id").le(last_train_date))
test_XY = train.filter(pl.col("date_id").gt(last_train_date))

Last offline train date = 1577



# **PREPROCESSING**

In [6]:
train_data = train_XY.collect()
#train_subset = train_data.limit(500_000).to_pandas()

# del train_data
# collect()

In [17]:
def weighted_r2(y_true, y_pred, weight):
    """Custom Weighted R^2."""
    
    weighted_r2 = 1 - np.sum(weight * (y_true - y_pred)**2) / np.sum(weight * y_true**2)
    
    return weighted_r2

# **FIT AND PREDICT**

In [21]:
train_data = train_subset
weights = sample_weight.limit(500_000).to_pandas()

train_data = train_data.fillna(0)

X_train, y_train = train_data[FEATURES], train_data[TARGET]

In [22]:
from sklearn.linear_model import Ridge

model = Ridge()
model.fit(X_train, y_train)

train_pred = model.predict(X_train)
test_pred = model.predict(X_test)

print(weighted_r2(y_train, train_pred, weights))
print(weighted_r2(y_test, test_pred, weights))

  return linalg.solve(A, Xy, assume_a="pos", overwrite_a=True).T


0.017258107662200928
-0.008439421653747559


In [None]:
lags_ : pl.DataFrame | None = None

# You can return either a Pandas or Polars dataframe, though Polars is recommended.
# Each batch of predictions (except the very first) must be returned within 10 minutes of the batch features being provided.
def predict(test: pl.DataFrame, lags: pl.DataFrame | None) -> pl.DataFrame | pd.DataFrame:
    """Make a prediction."""
    # All the responders from the previous day are passed in at time_id == 0. We save them in a global variable for access at every time_id.
    # Use them as extra features, if you like.
    global lags_
    
    if lags is not None:
        lags_ = lags

    # Predictions are clipped between -5 and 5.
    predictions = test.select(
        'row_id',
        pl.lit(0.0).clip(-5, 5).alias('responder_6'),
    )

    # The predict function must return a DataFrame
    assert isinstance(predictions, pl.DataFrame | pd.DataFrame)
    # with columns 'row_id', 'responder_6'
    assert predictions.columns == ['row_id', 'responder_6']
    # and as many rows as the test data.
    assert len(predictions) == len(test)

    return predictions

In [None]:
inference_server = kaggle_evaluation.jane_street_inference_server.JSInferenceServer(predict)

if os.getenv('KAGGLE_IS_COMPETITION_RERUN'):
    inference_server.serve()
else:
    inference_server.run_local_gateway(
        (
            '/kaggle/input/jane-street-real-time-market-data-forecasting/test.parquet',
            '/kaggle/input/jane-street-real-time-market-data-forecasting/lags.parquet',
        )
    )