In [None]:
from typing import List, Optional, Callable, Iterator, Tuple
import polars as pl
import pyarrow.parquet as pq
import torch
from torch.utils.data import Dataset, Sampler, DataLoader

import numpy as np

# Utilities

In [149]:
from functools import wraps
from time import time

def timing(f: Callable, *args, **kwargs) -> Callable:
    @wraps(f)
    def wrap(*args, **kwargs):
        ts = time()
        result = f(*args, **kwargs)
        te = time()
        print('func:%r args:[%r, %r] took: %2.4f sec' % \
          (f.__name__, args, kwargs, te-ts))
        return result
    return wrap

# Dataset for the Parquet files

In [231]:
class ParquetDataset(Dataset):
    def __init__(self, file_paths: List[str], batch_size: int = 1000, logging=True):
        self.file_paths = file_paths
        self.file_row_counts: List[int] = []  
        self.file_row_group_counts: List[List[int]] = []
        for f in file_paths:
            pq_file = pq.ParquetFile(f)
            self.file_row_group_counts.append([pq_file.metadata.row_group(i).num_rows for i in range(pq_file.num_row_groups)]) 
            self.file_row_counts.append(sum(self.file_row_group_counts[-1]))

        self.cum_row_counts: List[int] = np.array([sum(self.file_row_counts[:i]) for i in range(len(self.file_row_counts))])
        self.cum_row_group_counts: List[List[int]] = [np.array([sum(c[:i]) for i in range(len(c))]) for c in self.file_row_group_counts]

        self.cum_total_counts: List[Tuple[int, int, int]] = []
        for f in range(len(self.cum_row_counts)):
            for r in range(len(self.cum_row_group_counts[f])):
                self.cum_total_counts.append((f, r, self.cum_row_counts[f] + self.cum_row_group_counts[f][r]))
        self.cum_total_counts_np = np.array([v[2] for v in self.cum_total_counts])

        self.pq_df: Optional[pl.DataFrame] = None
        self.pq_df_idx: Optional[int] = None
        self.pq_df_batch_idx: Optional[int] = None 

        if logging:
            print("Loaded files with rows:")
            for i, file in enumerate(file_paths):
                print(f"\t{self.file_row_counts[i]} : {file}")

            print(f"{len(self)} total samples." )

    def __len__(self) -> int:
        return sum(self.file_row_counts)

    def __getitem__(self, idx: int) -> torch.Tensor:
        pass

    def get_single_row_with_row_group_batching(self, row_idx:int, file_idx: int, row_group_idx: int) -> pl.DataFrame:
        if file_idx != self.pq_df_idx or row_group_idx != self.pq_df_group_idx:
            self.load_pq_file(idx=file_idx, row_group=row_group_idx)

        return self.pq_df.row(row_idx)

    def get_single_row_with_batching(self, row_idx: int, file_idx: int) -> pl.DataFrame:
        if file_idx != self.pq_df_idx:
            self.load_pq_file(idx=file_idx)

        return self.pq_df.row(row_idx)

    def get_single_row(self, idx: int) -> pl.DataFrame:
        # Identify which file and which row within that file corresponds to idx
        file_idx: int = self.calculate_index_from_cumulative_counts(idx, self.cum_row_counts)
        row_idx: int = idx - self.cum_row_counts[file_idx]
        
        # Load data for the required row
        return self.get_row_from_pq_file(file=self.file_paths[file_idx], row_idx=row_idx)
        
    
    def calculate_file_and_row_group_index_from_global_index(self, idx: int) -> Tuple[int, int]:
        # Get the file index for this global index.
        file_index: int = self.calculate_index_from_cumulative_counts(idx, self.cum_row_counts)

        # Get the row index within this file and then the row group within that file.
        row_index = idx - self.cum_row_counts[file_index]
        row_group_idx = self.calculate_index_from_cumulative_counts(
            idx=row_index,
            cumulative_counts=self.cum_row_group_counts[file_index]
        )
        
        # Return a tuple of file idx and row group idx
        return (file_index, row_group_idx)
        
        
    def calculate_index_from_cumulative_counts(self, idx: int, cumulative_counts: np.ndarray) -> int:
        return np.searchsorted(cumulative_counts, idx, side="right") - 1
        
                
    @staticmethod
    def get_row_from_pq_file(file: str, row_idx: int):
        return pl.read_parquet(file, row_index_offset=row_idx, n_rows=1, use_pyarrow=False)

    @staticmethod
    def convert_df_row_to_tensor(row: pl.DataFrame) -> torch.Tensor:
        # TODO
        return row
    
    def load_pq_file(self, idx: int, row_group: Optional[int] = None) -> None:
        if row_group is not None:
            self.pq_df_group_idx = row_group
            self.pq_df = pl.from_arrow(pq.ParquetFile(self.file_paths[idx]).read_row_group(row_group))
        else:
            self.pq_df = pl.read_parquet(self.file_paths[idx])

        self.pq_df_idx = idx

# Instantiate the dataloader.

In [232]:
def make_train_parquet_path(i: int) -> str:
    return f"/kaggle/input/jane-street-real-time-market-data-forecasting/train.parquet/partition_id={i}/part-0.parquet"
# Setup the file indices to use.
K_MAX_TRAIN_FILES: int = 10
K_TRAIN_FILES: List[str] = [make_train_parquet_path(i) for i in range(K_MAX_TRAIN_FILES)]
K_TEST_FILES: List[str] = ["/kaggle/input/jane-street-real-time-market-data-forecasting/test.parquet/date_id=0/part-0.parquet"]

# K_TRAIN_FILE_INDICES: List[int] = [8]

train_dataset = ParquetDataset(file_paths=K_TRAIN_FILES, logging=True)
# test_dataset = ParquetDataset(file_paths=K_TEST_FILES, logging=True)



Loaded files with rows:
	1944210 : /kaggle/input/jane-street-real-time-market-data-forecasting/train.parquet/partition_id=0/part-0.parquet
	2804247 : /kaggle/input/jane-street-real-time-market-data-forecasting/train.parquet/partition_id=1/part-0.parquet
	3036873 : /kaggle/input/jane-street-real-time-market-data-forecasting/train.parquet/partition_id=2/part-0.parquet
	4016784 : /kaggle/input/jane-street-real-time-market-data-forecasting/train.parquet/partition_id=3/part-0.parquet
	5022952 : /kaggle/input/jane-street-real-time-market-data-forecasting/train.parquet/partition_id=4/part-0.parquet
	5348200 : /kaggle/input/jane-street-real-time-market-data-forecasting/train.parquet/partition_id=5/part-0.parquet
	6203912 : /kaggle/input/jane-street-real-time-market-data-forecasting/train.parquet/partition_id=6/part-0.parquet
	6335560 : /kaggle/input/jane-street-real-time-market-data-forecasting/train.parquet/partition_id=7/part-0.parquet
	6140024 : /kaggle/input/jane-street-real-time-market-da

## (Optional) Run timing analysis to check batched approach vs. naive single-row loading

In [186]:
@timing
def iterate_samples_by_row(step_size: int):
    for i in range(0, len(train_dataset), step_size):
        train_dataset.get_single_row(i)

@timing
def iterate_samples_batched(step_size: int):
    for i in range(0, len(train_dataset), step_size):
        file_idx = train_dataset.calculate_index_from_cumulative_counts(i, train_dataset.cum_row_counts)
        row_idx = i - train_dataset.cum_row_counts[file_idx]
        train_dataset.get_single_row_with_batching(row_idx=row_idx, file_idx=file_idx)

@timing
def iterate_samples_batched_row_group(step_size: int):
    for i in range(0, len(train_dataset), step_size):
        file_idx, row_group_idx = train_dataset.calculate_file_and_row_group_index_from_global_index(i)
        row_idx = i - train_dataset.cum_row_counts[file_idx] - train_dataset.cum_row_group_counts[file_idx][row_group_idx]
        train_dataset.get_single_row_with_row_group_batching(row_idx=row_idx, file_idx=file_idx, row_group_idx=row_group_idx)

# A simple batching schemem is definitely the way to go. We can iterate orders of magnitude more samples
# in the same amount of time it would take to do naive random access.
iterate_samples_by_row(step_size=1000000)
iterate_samples_by_row(step_size=100000)

iterate_samples_batched(step_size=1000)
iterate_samples_batched(step_size=100)

iterate_samples_batched_row_group(step_size=1000)
iterate_samples_batched_row_group(step_size=100)

func:'iterate_samples_by_row' args:[(), {'step_size': 1000000}] took: 4.5942 sec
func:'iterate_samples_by_row' args:[(), {'step_size': 100000}] took: 35.9613 sec
func:'iterate_samples_batched' args:[(), {'step_size': 1000}] took: 10.0974 sec
func:'iterate_samples_batched' args:[(), {'step_size': 100}] took: 16.4546 sec
func:'iterate_samples_batched_row_group' args:[(), {'step_size': 1000}] took: 15.8445 sec
func:'iterate_samples_batched_row_group' args:[(), {'step_size': 100}] took: 22.0621 sec


# Create a custom sample for randomly ordering the parquet files and rows within the parquet files

In [233]:
class ParquetBatchedSampler(Sampler[int]):
    r"""Samples elements randomly while batching by Parquet file to minimize disk i/o.
    Randomly orders the N parquet files and then provides random indices into the rows of each parquet file,
    ensuring that each file is fully sampled before movign on the to next.

    Args:
        data_source (Dataset): dataset to sample from
        generator (Generator): Generator used in sampling.
    """

    data_source: ParquetDataset

    def __init__(
        self,
        data_source: ParquetDataset,
        generator=None,
    ) -> None:
        self.data_source = data_source
        self.generator = generator

    @property
    def num_samples(self) -> int:
        # dataset size might change at runtime
        if self._num_samples is None:
            return len(self.data_source)
        return self._num_samples

    def __iter__(self) -> Iterator[int]:

        if self.generator is None:
            seed = int(torch.empty((), dtype=torch.int64).random_().item())
            generator = torch.Generator()
            generator.manual_seed(seed)
        else:
            generator = self.generator

        # Randomly sample from an ordering or partitions, shuffling all of the samples within a partition.
        for i in torch.randperm(len(self.data_source.cum_total_counts), generator=generator):
            file_idx, row_group_idx, offset = self.data_source.cum_total_counts[i]
            yield from offset + torch.randperm(self.data_source.file_row_group_counts[file_idx][row_group_idx])

    def __len__(self) -> int:
        return len(self.data_source)


## (Optional) Verify that the sampler produces samples in random parition order

In [234]:
sampler = ParquetBatchedSampler(data_source=train_dataset)
partition_ordering: List[Tuple[int,int]] = []
num_sampled_per_partition: List[int] = []
curr_partition: Optional[Tuple[int, int]] = None

for i, idx in enumerate(sampler):

    if (i % 1000000 == 0):
        print(f"{i} of {len(sampler)}\r")

    new_idxes = train_dataset.calculate_index_from_cumulative_counts(idx, train_dataset.cum_total_counts_np)

    if curr_partition != new_idxes:
        # update the partition index assuming that once we see a file index and then leave it
        # we'll never see it again. Throw an error if we do see it again.
        if new_idxes in partition_ordering:
            raise ValueError(f"File index {new_idxes} seen out of order! Already seen ordering: {partition_ordering}")
        print(f"found new file {new_idxes}")
        curr_partition = new_idxes
        partition_ordering.append(curr_partition)
        if len(num_sampled_per_partition) > 0:
            num_sampled_per_partition.append(i - num_sampled_per_partition[-1])
        else:
            num_sampled_per_partition.append(i)

print(f"Total elements: {len(sampler)}. Sampled partition ordering: {partition_ordering}")

0 of 47127338
found new file 21
found new file 165
found new file 58
found new file 16
1000000 of 47127338
found new file 32
found new file 50
found new file 18
found new file 59
2000000 of 47127338
found new file 89
found new file 53
found new file 107
found new file 66
3000000 of 47127338
found new file 160
found new file 68
found new file 148
4000000 of 47127338
found new file 124
found new file 33
found new file 125
found new file 26
5000000 of 47127338
found new file 141
found new file 24
found new file 37
found new file 25
6000000 of 47127338
found new file 172
found new file 45
found new file 132
7000000 of 47127338
found new file 19
found new file 140
found new file 4
found new file 57
8000000 of 47127338
found new file 87
found new file 48
found new file 70
found new file 163
9000000 of 47127338
found new file 62
found new file 61
found new file 171
found new file 46
10000000 of 47127338
found new file 49
found new file 10
found new file 30
11000000 of 47127338
found new file 

## (Optional) Benchmarking how long it takes to serve samples from the data loader

In [None]:

@timing
def iterate_dataloader(num_workers=24, batch_size=64, prefetch_factor=10):
    dl = DataLoader(train_dataset, batch_size=batch_size, num_workers=num_workers, pin_memory=True, prefetch_factor=prefetch_factor, sampler=ParquetBatchedSampler(data_source=train_dataset))

iterate_dataloader()

TypeError: 'module' object is not callable

# Prepare data

In [None]:
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
# Separate features and responders
features = sample_df.filter(regex='^feature_')
responders = sample_df.filter(regex='^responder_')
# Convert to numpy arrays for TensorFlow
X = features.values  # Features for input
y = responders.values  # Responders for output
X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)
y = np.nan_to_num(y, nan=0.0, posinf=0.0, neginf=0.0)

# Build the Autoencoder Model

In [None]:
# Define the number of input and output nodes
input_dim = X.shape[1]  # Number of features (79)
output_dim = y.shape[1]  # Number of responders (9)
# Define the model
model = models.Sequential([
    layers.Input(shape=(input_dim,)),  # Input layer
    layers.Dense(64, activation='relu'),  # Encoder
    layers.Dense(32, activation='relu'),  # Bottleneck layer (compression)
    layers.Dense(64, activation='relu'),  # Decoder
    layers.Dense(output_dim, activation='linear')  # Output layer for responders
])
model.compile(optimizer='adam', loss='mse')

# Train Autoencoder Model

In [None]:
from tensorflow.keras.callbacks import LearningRateScheduler
def step_decay(epoch):
    initial_lr = 0.01
    drop = 0.5
    epochs_drop = 5
    lr = initial_lr * (drop ** (epoch // epochs_drop))
    return lr
lr_scheduler = LearningRateScheduler(step_decay)

In [None]:
from tensorflow.keras.callbacks import EarlyStopping
# Define EarlyStopping
early_stopping = EarlyStopping(
    monitor='val_loss',    # Monitor validation loss
    patience=10,            # Number of epochs to wait for improvement
    min_delta=0.001,       # Minimum change to qualify as an improvement
    restore_best_weights=True  # Restore weights from the best epoch
)

history = model.fit(
    X, y,
    epochs=50,
    batch_size=32,
    validation_split=0.2,
    callbacks=[early_stopping, lr_scheduler]
)

In [None]:
model.save("/kaggle/working/model.keras")

# Submission

See [Jane Street RMF Demo Submission](https://www.kaggle.com/code/ryanholbrook/jane-street-rmf-demo-submission) for details

In [None]:
import os
import polars as pl
import kaggle_evaluation.jane_street_inference_server

In [None]:
import polars as pl
import numpy as np
# Assuming `model` is your trained model
# Assuming features required by the model are named 'feature_00', 'feature_01', etc.
def predict(test: pl.DataFrame, lags: pl.DataFrame | None) -> pl.DataFrame | pd.DataFrame:
    """Make a prediction."""
    global lags_
    if lags is not None:
        lags_ = lags
    # Extract the features for the model input
    feature_columns = [col for col in test.columns if col.startswith("feature_")]
    features = test.select(feature_columns).to_numpy()  # Convert to numpy array for model input
    features = np.nan_to_num(features, nan=0.0, posinf=0.0, neginf=0.0)
    # Generate predictions using the model
    model_predictions = model.predict(features)
    responder_6_predictions = model_predictions[:, 6]  # Assuming responder_6 is at index 6
    # Create a new Polars DataFrame with row_id and responder_6 predictions
    predictions = test.select("row_id").with_columns(
        pl.Series("responder_6", responder_6_predictions)
    )
    # Ensure the output format and length requirements
    if isinstance(predictions, pl.DataFrame):
        assert predictions.columns == ['row_id', 'responder_6']
    elif isinstance(predictions, pd.DataFrame):
        assert (predictions.columns == ['row_id', 'responder_6']).all()
    else:
        raise TypeError('The predict function must return a DataFrame')
    
    assert len(predictions) == len(test)
    return predictions

In [None]:
inference_server = kaggle_evaluation.jane_street_inference_server.JSInferenceServer(predict)
if os.getenv('KAGGLE_IS_COMPETITION_RERUN'):
    inference_server.serve()
else:
    inference_server.run_local_gateway(
        (
            '/kaggle/input/jane-street-real-time-market-data-forecasting/test.parquet',
            '/kaggle/input/jane-street-real-time-market-data-forecasting/lags.parquet',
        )
    )