In [11]:
import os
import polars as pl
import pandas as pd
import numpy as np
from tqdm import tqdm
from time import time

In [12]:
# Feature columns are named feature_00 ... feature_78 (zero-padded to two digits)
FEATURE_COLS = ['feature_{:02d}'.format(idx) for idx in range(79)]

# All locations hang off the parent of the current working directory
PROJ_DIR = os.path.split(os.getcwd())[0]
DATA_DIR, OUTPUT_DIR = (
    os.path.join(PROJ_DIR, folder)
    for folder in ("jane-street-real-time-market-data-forecasting", "feature_enhanced_data")
)

# Make sure the output location is there before anything is written to it
os.makedirs(OUTPUT_DIR, exist_ok=True)

### Add null flags, then forward-fill and backward-fill within each symbol

In [13]:
def handle_nulls(df: pd.DataFrame, feature_cols: list) -> pd.DataFrame:
    """
    Flag and impute missing feature values, symbol by symbol.

    Strategy:
    1. Create null flags for ALL features (not just ones with nulls)
    2. Forward fill within each symbol_id group
    3. Backward fill remaining nulls (handles first rows)
    4. Zero fill whatever is still missing (a feature that is null on
       every row of a symbol has nothing to fill from)

    Args:
        df: Frame containing a 'symbol_id' column and every column named in
            feature_cols. It is modified IN PLACE.
        feature_cols: Names of the feature columns to flag and fill.

    Returns:
        The same df object, with one int8 '<col>_is_null' column added per
        feature and no nulls left in feature_cols.

    Raises:
        KeyError: 'symbol_id' or a feature column is absent. Raised before
            df is touched, so a failed call leaves the input unchanged.
        AssertionError: nulls remain after filling, or the row count changed.

    Notes:
        - Fills follow row order inside each symbol, so rows are assumed to
          already be in chronological order per symbol - confirm with caller.
        - Backward fill copies a later row's value into earlier rows, i.e.
          the filled value comes from the future of the row it lands in.
        - Not idempotent: a second call on the returned frame recomputes the
          flags from already-filled data and overwrites them with zeros.
    """
    print("\n🔍 Processing null values...")

    # Fail fast, before any flag column has been written into the caller's frame
    missing_cols = [col for col in ['symbol_id', *feature_cols] if col not in df.columns]
    if missing_cols:
        raise KeyError(f"handle_nulls: required columns not found: {missing_cols}")

    # Store original row count for validation
    n_rows = len(df)

    # Create null flags for ALL features, regardless of null presence
    for col in tqdm(feature_cols, desc="Creating null flags"):
        # Scan the column once; the mask feeds both the flag and the statistics
        is_null = df[col].isnull()
        df[f'{col}_is_null'] = is_null.astype(np.int8)

        # Log statistics only for columns that actually contain nulls
        null_count = int(is_null.sum())
        if null_count > 0:
            print(f"  {col}: {null_count:,} nulls ({(null_count/n_rows)*100:.2f}%)")

    # First forward fill within each symbol_id group
    print("\n📈 Forward filling values within symbol groups...")
    ffill_start = time()
    df[feature_cols] = df.groupby('symbol_id')[feature_cols].ffill()
    ffill_time = time() - ffill_start
    print(f"Forward fill completed in {ffill_time:.2f} seconds")

    # Handle remaining nulls (first rows of a symbol) with backward fill
    if df[feature_cols].isnull().any().any():
        print("\n⚠️ Backward filling remaining nulls (first rows)...")
        bfill_start = time()
        df[feature_cols] = df.groupby('symbol_id')[feature_cols].bfill()
        bfill_time = time() - bfill_start
        print(f"Backward fill completed in {bfill_time:.2f} seconds")

        # Nulls surviving both passes mean a column is entirely null for some symbol
        if df[feature_cols].isnull().any().any():
            print("\n⚠️ Warning: Some columns still have nulls after forward and backward fill.")
            print("These are likely entire null columns for some symbols.")
            # Nothing to propagate from within the symbol, so fall back to 0
            zero_fill_start = time()
            df[feature_cols] = df[feature_cols].fillna(0)
            zero_fill_time = time() - zero_fill_start
            print(f"Zero fill completed in {zero_fill_time:.2f} seconds")

    # Validate processing
    assert df[feature_cols].isnull().sum().sum() == 0, "Found remaining nulls after processing"
    assert df.shape[0] == n_rows, "Row count changed during processing"

    return df

### Create new data set

In [None]:
# End-to-end run: load train.parquet, flag and fill nulls, then write the result
# as n_partitions parquet files under OUTPUT_DIR/enhanced_train.parquet/.
print("🚀 Starting data enhancement process...")
start_time = time()

# Load training data lazily, prepending a 0-based running row number as "id"
print("Reading training data...")
train = pl.scan_parquet(os.path.join(DATA_DIR, "train.parquet")).\
    select(
        pl.int_range(pl.len(), dtype=pl.UInt64).alias("id"),
        pl.all(),
    )

# handle_nulls works on pandas, so the lazy query is materialised and converted here
print("Converting to pandas DataFrame...")
with tqdm(total=1, desc="Loading data") as pbar:
    train_df = train.collect().to_pandas()
    pbar.update(1)

# Record initial shape and memory usage
initial_shape = train_df.shape
initial_mem = train_df.memory_usage().sum() / 1024 / 1024
print(f"\nInitial DataFrame shape: {initial_shape}")
print(f"Initial memory usage: {initial_mem:.2f} MB")

# Process the data (handle_nulls mutates train_df in place and returns it)
enhanced_train = handle_nulls(train_df, FEATURE_COLS)

# Convert back to Polars for efficient partitioned saving
print("\n💾 Converting to Polars DataFrame for partitioned saving...")
pl_enhanced = pl.from_pandas(enhanced_train)

# Calculate partition sizes
n_partitions = 20
rows_per_partition = len(pl_enhanced) // n_partitions
print(f"Saving {n_partitions} partitions with ~{rows_per_partition:,} rows each")

# Create output directory structure
output_dir = os.path.join(OUTPUT_DIR, "enhanced_train.parquet")
os.makedirs(output_dir, exist_ok=True)

# Save partitioned data
print("\n💾 Saving partitioned enhanced data...")
for i in tqdm(range(n_partitions), desc="Saving partitions"):
    start_idx = i * rows_per_partition
    # Every partition holds rows_per_partition rows except the last, which takes
    # all remaining rows (length=None slices to the end of the frame). Deciding
    # on the partition index rather than on the truthiness of an end offset
    # keeps this correct when rows_per_partition is 0 (fewer rows than
    # partitions): the leading partitions are then empty instead of each one
    # receiving a full copy of the data.
    is_last = i == n_partitions - 1
    partition = pl_enhanced.slice(start_idx, None if is_last else rows_per_partition)

    partition_path = os.path.join(output_dir, f"partition_id={i}")
    os.makedirs(partition_path, exist_ok=True)
    partition.write_parquet(os.path.join(partition_path, "part-0.parquet"))

end_time = time()
print(f"\n✨ Processing completed in {end_time - start_time:.2f} seconds")

# Print sample of changes
print("\nSample of enhanced data (first 5 rows of first feature):")
sample = pl_enhanced.select(['feature_00', 'feature_00_is_null']).head(5)
print(sample)

🚀 Starting data enhancement process...
Reading training data...
Converting to pandas DataFrame...


Loading data: 100%|██████████| 1/1 [00:56<00:00, 56.15s/it]



Initial DataFrame shape: (47127338, 94)
Initial memory usage: 16584.38 MB

🔍 Processing null values...


Creating null flags:   1%|▏         | 1/79 [00:00<00:46,  1.66it/s]

  feature_00: 3,182,052 nulls (6.75%)


Creating null flags:   3%|▎         | 2/79 [00:02<01:32,  1.20s/it]

  feature_01: 3,182,052 nulls (6.75%)


Creating null flags:   4%|▍         | 3/79 [00:03<01:50,  1.46s/it]

  feature_02: 3,182,052 nulls (6.75%)


Creating null flags:   5%|▌         | 4/79 [00:05<01:45,  1.41s/it]

  feature_03: 3,182,052 nulls (6.75%)


Creating null flags:   6%|▋         | 5/79 [00:06<01:44,  1.41s/it]

  feature_04: 3,182,052 nulls (6.75%)


Creating null flags:  13%|█▎        | 10/79 [00:11<00:55,  1.23it/s]

  feature_08: 300,247 nulls (0.64%)


Creating null flags:  22%|██▏       | 17/79 [00:12<00:16,  3.71it/s]

  feature_15: 1,209,299 nulls (2.57%)
  feature_16: 261 nulls (0.00%)


Creating null flags:  23%|██▎       | 18/79 [00:13<00:14,  4.23it/s]

  feature_17: 201,838 nulls (0.43%)


Creating null flags:  24%|██▍       | 19/79 [00:16<01:07,  1.13s/it]

  feature_18: 226 nulls (0.00%)


Creating null flags:  25%|██▌       | 20/79 [00:22<02:32,  2.59s/it]

  feature_19: 226 nulls (0.00%)


Creating null flags:  28%|██▊       | 22/79 [00:38<04:55,  5.18s/it]

  feature_21: 8,435,985 nulls (17.90%)


Creating null flags:  35%|███▌      | 28/79 [00:46<01:03,  1.24s/it]

  feature_26: 8,435,985 nulls (17.90%)
  feature_27: 8,435,985 nulls (17.90%)


Creating null flags:  42%|████▏     | 33/79 [00:47<00:15,  2.93it/s]

  feature_31: 8,435,985 nulls (17.90%)
  feature_32: 478,457 nulls (1.02%)


Creating null flags:  44%|████▍     | 35/79 [00:47<00:10,  4.11it/s]

  feature_33: 478,457 nulls (1.02%)


Creating null flags:  48%|████▊     | 38/79 [00:48<00:13,  3.13it/s]

  feature_37: 849 nulls (0.00%)


Creating null flags:  51%|█████     | 40/79 [00:49<00:16,  2.34it/s]

  feature_39: 4,300,649 nulls (9.13%)


Creating null flags:  52%|█████▏    | 41/79 [00:50<00:20,  1.90it/s]

  feature_40: 67,856 nulls (0.14%)


Creating null flags:  53%|█████▎    | 42/79 [00:51<00:21,  1.73it/s]

  feature_41: 1,093,012 nulls (2.32%)


Creating null flags:  54%|█████▍    | 43/79 [00:51<00:20,  1.79it/s]

  feature_42: 4,300,649 nulls (9.13%)


Creating null flags:  56%|█████▌    | 44/79 [00:52<00:20,  1.75it/s]

  feature_43: 67,856 nulls (0.14%)


Creating null flags:  57%|█████▋    | 45/79 [00:53<00:28,  1.18it/s]

  feature_44: 1,093,012 nulls (2.32%)


Creating null flags:  58%|█████▊    | 46/79 [00:54<00:31,  1.04it/s]

  feature_45: 317,163 nulls (0.67%)


Creating null flags:  59%|█████▉    | 47/79 [00:55<00:30,  1.04it/s]

  feature_46: 317,163 nulls (0.67%)


Creating null flags:  61%|██████    | 48/79 [00:56<00:26,  1.17it/s]

  feature_47: 87 nulls (0.00%)


Creating null flags:  65%|██████▍   | 51/79 [00:59<00:25,  1.09it/s]

  feature_50: 4,254,098 nulls (9.03%)


Creating null flags:  66%|██████▌   | 52/79 [01:00<00:25,  1.07it/s]

  feature_51: 13,805 nulls (0.03%)


Creating null flags:  67%|██████▋   | 53/79 [01:01<00:28,  1.10s/it]

  feature_52: 1,044,898 nulls (2.22%)


Creating null flags:  68%|██████▊   | 54/79 [01:02<00:29,  1.19s/it]

  feature_53: 4,254,098 nulls (9.03%)


Creating null flags:  70%|██████▉   | 55/79 [01:03<00:26,  1.09s/it]

  feature_54: 13,805 nulls (0.03%)


Creating null flags:  71%|███████   | 56/79 [01:04<00:21,  1.08it/s]

  feature_55: 1,044,898 nulls (2.22%)


Creating null flags:  72%|███████▏  | 57/79 [01:05<00:22,  1.02s/it]

  feature_56: 226 nulls (0.00%)


Creating null flags:  73%|███████▎  | 58/79 [01:06<00:22,  1.09s/it]

  feature_57: 226 nulls (0.00%)


Creating null flags:  75%|███████▍  | 59/79 [01:07<00:18,  1.07it/s]

  feature_58: 478,452 nulls (1.02%)


Creating null flags:  80%|███████▉  | 63/79 [01:10<00:11,  1.41it/s]

  feature_62: 292,827 nulls (0.62%)


Creating null flags:  81%|████████  | 64/79 [01:11<00:09,  1.59it/s]

  feature_63: 227,566 nulls (0.48%)


Creating null flags:  82%|████████▏ | 65/79 [01:12<00:10,  1.33it/s]

  feature_64: 237,663 nulls (0.50%)


Creating null flags:  84%|████████▎ | 66/79 [01:13<00:13,  1.07s/it]

  feature_65: 317,163 nulls (0.67%)


Creating null flags:  85%|████████▍ | 67/79 [01:18<00:23,  1.98s/it]

  feature_66: 317,163 nulls (0.67%)


Creating null flags:  94%|█████████▎| 74/79 [01:47<00:16,  3.37s/it]

  feature_73: 483,759 nulls (1.03%)


Creating null flags:  95%|█████████▍| 75/79 [01:47<00:09,  2.45s/it]

  feature_74: 483,759 nulls (1.03%)


Creating null flags:  96%|█████████▌| 76/79 [01:47<00:05,  1.82s/it]

  feature_75: 58,430 nulls (0.12%)


Creating null flags:  97%|█████████▋| 77/79 [01:48<00:02,  1.40s/it]

  feature_76: 58,430 nulls (0.12%)


Creating null flags:  99%|█████████▊| 78/79 [01:48<00:01,  1.08s/it]

  feature_77: 20,043 nulls (0.04%)


Creating null flags: 100%|██████████| 79/79 [01:48<00:00,  1.38s/it]

  feature_78: 20,043 nulls (0.04%)

📈 Forward filling values within symbol groups...





Forward fill completed in 288.62 seconds

⚠️ Backward filling remaining nulls (first rows)...
Backward fill completed in 216.95 seconds

💾 Saving enhanced data to /monfs01/projects/ys68/JaneStreet-Kaggle/feature_enhanced_data/enhanced_train.parquet

✨ Processing completed in 922.12 seconds

Sample of enhanced data (first 5 rows of first feature):
   feature_00  feature_00_is_null
0    1.161135                   1
1    1.157178                   1
2    2.085530                   1
3    1.237014                   1
4    1.358605                   1
