In [2]:
import dask.dataframe as dd
import pandas as pd
import os

# Define paths
base_path = "jane-street-real-time-market-data-forecasting/train.parquet/"
output_path = "preprocessed_data/"
global_medians_path = "global_medians.csv"

# Ensure output directory exists
os.makedirs(output_path, exist_ok=True)

# Load global medians, indexed by the CSV's first column (presumably the
# feature/column names -- confirm against how global_medians.csv was written).
# squeeze("columns") collapses only the column axis, so a single-column file
# always yields a Series. A bare squeeze() would return a scalar for a
# one-row file, and fillna() would then apply that one value to every column.
global_medians = pd.read_csv(global_medians_path, index_col=0).squeeze("columns")

# Function to process and save each partition
def preprocess_partition(partition, output_file):
    """Impute missing values in one partition and write it to Parquet.

    Missing entries are replaced via ``fillna`` using the module-level
    ``global_medians`` object, the result is written to ``output_file`` with
    the pyarrow engine, and a confirmation line is printed.

    Args:
        partition: Object exposing ``fillna`` and, on the result,
            ``to_parquet`` (the caller in this file passes a computed
            Dask partition).
        output_file: Destination path of the Parquet file.
    """
    # Keep the caller's object untouched; work on the filled copy.
    filled = partition.fillna(global_medians)
    filled.to_parquet(output_file, engine="pyarrow")
    print(f"Saved preprocessed data to {output_file}")

# Load all partitions as a single Dask DataFrame.
# The inputs live in Hive-style "partition_id=<i>" directories; os.path.join
# avoids the doubled "/" that plain string formatting produced because
# base_path and output_path already end with a separator.
num_partitions = 10
input_files = [
    os.path.join(base_path, f"partition_id={i}", "part-0.parquet")
    for i in range(num_partitions)
]

# split_row_groups=False asks Dask for one partition per input file.
# Without it, a file may be split into several Dask partitions by row group,
# in which case ddf.partitions[i] would no longer be the data of
# partition_id=i and the loop below would save mislabeled, incomplete output.
# NOTE: each .compute() below therefore loads one whole file into memory.
ddf = dd.read_parquet(input_files, engine="pyarrow", split_row_groups=False)

# Fail loudly rather than write files under the wrong partition id.
if ddf.npartitions != num_partitions:
    raise RuntimeError(
        f"Expected {num_partitions} Dask partitions (one per input file), "
        f"got {ddf.npartitions}"
    )

# Process and save partitions one at a time to bound peak memory use.
for i in range(num_partitions):
    output_file = os.path.join(output_path, f"partition_id={i}.parquet")
    preprocess_partition(ddf.partitions[i].compute(), output_file)

print("All partitions successfully preprocessed and saved.")


Saved preprocessed data to preprocessed_data//partition_id=0.parquet
Saved preprocessed data to preprocessed_data//partition_id=1.parquet
Saved preprocessed data to preprocessed_data//partition_id=2.parquet
Saved preprocessed data to preprocessed_data//partition_id=3.parquet
Saved preprocessed data to preprocessed_data//partition_id=4.parquet
Saved preprocessed data to preprocessed_data//partition_id=5.parquet
Saved preprocessed data to preprocessed_data//partition_id=6.parquet
Saved preprocessed data to preprocessed_data//partition_id=7.parquet
Saved preprocessed data to preprocessed_data//partition_id=8.parquet
Saved preprocessed data to preprocessed_data//partition_id=9.parquet
All partitions successfully preprocessed and saved.
