### Jane Street Real-Time Market Data Forecasting with GRU

The GRU model is trained with all available data except one for testing.

Link to the competition: https://www.kaggle.com/competitions/jane-street-real-time-market-data-forecasting/overview

In [1]:
import numpy as np
import polars as pls
from pathlib import Path

import pytorch_lightning as pl
from pytorch_lightning.loggers import WandbLogger
import torch
from torch.utils.data import DataLoader, Dataset
from torch import nn
#from torch.optim.lr_scheduler import StepLR
#from torchmetrics.functional import r2_score

#import plotly.express as px

import wandb

In [2]:
data_path = "/home/yang/kaggle/jane/data"

In [3]:
# for each training set, we take 20% of the data for validation
#frac_train = 0.8
train_raw_data_num = ["0", "1", "2", "4", "5", "6", "8", "9"]
# a completely new dataset for testing
test_raw_data_num = "7"

In [4]:
train_feature_list = ["time_id", "symbol_id"] + [f"feature_{idx:02d}" for idx in range(79)]

In [5]:
num_features = len(train_feature_list)

In [6]:
sample_testing_data = pls.read_parquet(Path(data_path, "test.parquet", f"date_id=0", "part-0.parquet"))
num_sample_testing_data = len(sample_testing_data)

In [7]:
class TimeseriesDataset(Dataset):
    def __init__(self, df: pls.DataFrame):
        df = df.fill_null(0)
        self.features = torch.tensor(df.select([col for col in df.columns if col in train_feature_list]).to_numpy(), dtype=torch.float32)
        self.target = torch.tensor(df.select(pls.col("responder_6")).to_numpy(), dtype=torch.float32)

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        return self.features[idx], self.target[idx]

In [8]:
class DataModule(pl.LightningDataModule):
    def __init__(self, file_path: str, batch_size: int = 32):
        super().__init__()
        self.file_path = file_path
        self.batch_size = batch_size

    def train_dataloader(self):
        df = pls.read_parquet(self.file_path)  # Adjust for your file format (e.g., CSV, Parquet)
        dataset = TimeseriesDataset(df)
        return DataLoader(dataset, batch_size=self.batch_size, shuffle=False, num_workers=15, multiprocessing_context='fork')

In [9]:
class GRURegressor(pl.LightningModule):
    def __init__(self, input_dim: int, hidden_dim: int = 128, num_layers: int = 2, lr: float = 1e-3):
        super().__init__()
        self.save_hyperparameters()
        self.lr = lr
        # Define GRU layer
        self.gru = nn.GRU(input_dim, hidden_dim, num_layers, batch_first=True)

        # Define a fully connected layer to map GRU outputs to a single value
        self.fc = nn.Linear(hidden_dim, 1)

        self.training_step_outputs = []
        self.validation_step_outputs = []
        self.test_step_outputs = []
        self.criterion = nn.MSELoss()
        #self.criterion = r2_score

    def forward(self, x):
        # Forward pass through GRU
        #_, hidden = self.gru(x)  # hidden is the last hidden state
        outputs, _ = self.gru(x)
        
        # Pass the last hidden state through the fully connected layer
        #output = self.fc(hidden[-1])
        output = self.fc(outputs)
        return output

    def training_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x).squeeze()
        loss = self.criterion(y_hat, y.squeeze())
        self.training_step_outputs.append(loss.item())
        self.log("train_loss", loss)
        return loss
    
    def on_train_epoch_end(self):
        epoch_average = torch.tensor(self.training_step_outputs).mean()
        self.log("training_epoch_average", epoch_average)
        self.training_step_outputs.clear()  # free memory

    def validation_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x).squeeze()
        loss = self.criterion(y_hat, y.squeeze())
        self.validation_step_outputs.append(loss.item())
        self.log("val_loss", loss, prog_bar=True)
        return {"val_loss": loss}

    def on_validation_epoch_end(self):
        avg_val_loss = torch.tensor(self.validation_step_outputs).mean()
        self.log("avg_val_loss", avg_val_loss)
        self.validation_step_outputs.clear()

    def test_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x).squeeze()
        loss = self.criterion(y_hat, y.squeeze())
        self.test_step_outputs.append(loss.item())
        self.log("test_loss", loss)
        return {"test_loss": loss}
    
    def on_test_epoch_end(self):
        epoch_average = torch.tensor(self.test_step_outputs).mean()
        self.log("test_epoch_average", epoch_average)
        self.test_step_outputs.clear()  # free memory

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.lr)

In [None]:
# define hyperparameters and the 
parameters = dict(
    epoch = 2,
    input_dim = num_features,
    hidden_dim = 64,
    num_layers = 2,
    batch_size = 30000,
    #dropout = 0.0,
    learning_rate = 0.1,
    dataset = 'Jane street market data',
    architecture = 'GRU'
)

# initialize weights & biases service
mode = 'online'
#mode = 'disabled'
wandb.init(config=parameters, project='jane_street', entity='git-yang', mode=mode)
config = wandb.config
wandb_logger = WandbLogger(log_model="all")

In [None]:
%%time
file_paths = [Path(data_path, "train.parquet", f"partition_id={i}", "part-0.parquet") for i in train_raw_data_num]

model = GRURegressor(input_dim=config.input_dim, hidden_dim=config.hidden_dim, num_layers=config.num_layers, lr=config.learning_rate)

#model = GRURegressor.load_from_checkpoint("model_init/mlp_hidden_64_checkpoint.ckpt")
#model = GRURegressor.load_from_checkpoint("model_init/jane_mlp_hidden_32_epoch_30.ckpt")
wandb.watch(model)

for file_path in file_paths:
    print(f"Traing on dataset: {file_path}")

    # Initialize DataModule and model
    datamodule = DataModule(file_path, batch_size=config.batch_size)

    # Training using PyTorch Lightning
    trainer = pl.Trainer(max_epochs=config.epoch, accelerator="auto", devices="auto", logger=wandb_logger)

    # Train with dataframes sequentially
    trainer.fit(model, train_dataloaders=datamodule.train_dataloader())

trainer.save_checkpoint("model_checkpoint.ckpt")

### Evaluation

In [10]:
# Evaluation with testing dataset
def test_dataloader(df: pls.DataFrame, batch_size: int = 10000):
    dataset = TimeseriesDataset(df)
    return DataLoader(dataset, batch_size=batch_size, shuffle=False, num_workers=15, multiprocessing_context='fork')

In [11]:
test_data = pls.read_parquet(Path(data_path, "train.parquet", f"partition_id={test_raw_data_num}", "part-0.parquet"))
data_loader = test_dataloader(test_data, batch_size=10000)

In [None]:
test_results = trainer.test(model, data_loader)

In [15]:
wandb.finish()

## Evaluation using the given metric

In [13]:
# load model
model = GRURegressor.load_from_checkpoint("model_init/jane_gru_hidden_64_layer_2_rmse.ckpt")

In [14]:
def sample_weighted_zero_mean_r2(y_pred, y_truth, weight):
    """
    Zero-mean R-squared metrics.

    Args:
        y_pred: Array of predicted values.
        y_truth: Array of true values.
        weight: Array of sample weights.

    Returns:
        1-corr: Zero-mean R-squared.
    """

    # Ensure weights are valid
    weight = weight if weight is not None else np.ones_like(y_pred)
    
    corr = np.sum((weight * (y_truth - y_pred) ** 2)) / np.sum(weight * y_truth ** 2)
    
    return 1 - corr 

In [16]:
# Assuming your model is already defined as `model`
device = "cuda" if torch.cuda.is_available() else "cpu"

GRURegressor(
  (gru): GRU(81, 64, num_layers=2, batch_first=True)
  (fc): Linear(in_features=64, out_features=1, bias=True)
  (criterion): MSELoss()
)

In [17]:
# Move the model to GPU
model.to(device)

'cuda'

In [None]:
# inference without batch is slow. We will try to accerlate it with batch processing
# %%time

# test_data_subset = test_data.select([col for col in test_data.columns if col in train_feature_list])
# test_data_subset = test_data_subset.fill_null(0)

# model.eval()
# with torch.no_grad():
#     y_pred = model(torch.tensor(test_data_subset.to_numpy(), dtype=torch.float32)).squeeze().numpy()


# score = sample_weighted_zero_mean_r2(y_pred, test_data.select(pls.col("responder_6")).to_numpy()[:,0],
#                                      test_data.select(pls.col("weight")).to_numpy()[:,0])
# score

In [19]:
%%time

all_predictions = []

model.eval()
with torch.no_grad():
    for batch in data_loader:
        x, y = batch
        y_pred = model(x.to(device)).squeeze()
        all_predictions.append(y_pred)

predictions = torch.cat(all_predictions, dim=0).cpu().numpy()

score = sample_weighted_zero_mean_r2(predictions, test_data.select(pls.col("responder_6")).to_numpy()[:,0],
                                     test_data.select(pls.col("weight")).to_numpy()[:,0])
score

In addition, using fork() with Python in general is a recipe for mysterious
deadlocks and crashes.

The most likely reason you are seeing this error is because you are using the
multiprocessing module on Linux, which uses fork() by default. This will be
fixed in Python 3.14. Until then, you want to use the "spawn" context instead.

See https://docs.pola.rs/user-guide/misc/multiprocessing/ for details.

  self.pid = os.fork()
In addition, using fork() with Python in general is a recipe for mysterious
deadlocks and crashes.

The most likely reason you are seeing this error is because you are using the
multiprocessing module on Linux, which uses fork() by default. This will be
fixed in Python 3.14. Until then, you want to use the "spawn" context instead.

See https://docs.pola.rs/user-guide/misc/multiprocessing/ for details.

  self.pid = os.fork()


CPU times: user 4.57 s, sys: 1.29 s, total: 5.86 s
Wall time: 10.5 s


np.float32(-0.0010317564)