# Amex 1st Place Pipeline - Step 2: Preprocessing (v5 Memory-Fix)

This notebook's *only* job is to preprocess the data.

**NEW (Memory Fix):** This version loads the large `train_fe.parquet` and `test_fe.parquet` files in **column-wise chunks** to avoid MemoryErrors during loading.

1.  Reads the parquet file *schema* (column names) first.
2.  Loads features in chunks of 500 columns.
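3.  Drops unwanted aggregates (names containing `skew`, `kurt`, `sub_mean`, or `div_mean`) using the schema alone, then applies imputation, label encoding, and dtype downcasting to each chunk.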
4.  Concatenates the processed chunks into a final DataFrame.
5.  Saves the final `train_processed.parquet`, `test_processed.parquet`, and preprocessor files.

In [1]:
import pandas as pd
import numpy as np
import gc
import os
import time
import joblib
import json
import pyarrow.parquet as pq # Import pyarrow
from sklearn.preprocessing import LabelEncoder
from tqdm.auto import tqdm

# --- Define Paths ---
# Inputs are the feature files in FE_DATA_DIR (presumably written by the
# feature-engineering step of this pipeline); processed outputs go to the same dir.
FE_DATA_DIR = '../data_fe/'
TRAIN_PATH_IN = os.path.join(FE_DATA_DIR, 'train_fe.parquet')
TEST_PATH_IN = os.path.join(FE_DATA_DIR, 'test_fe.parquet')

TRAIN_PATH_OUT = os.path.join(FE_DATA_DIR, 'train_processed.parquet')
TEST_PATH_OUT = os.path.join(FE_DATA_DIR, 'test_processed.parquet')

# The train cell saves its fitted encoders and column lists here; the test cell reloads them.
PREPROCESSOR_DIR = './preprocessors/'
# exist_ok=True makes this idempotent and avoids the exists()/makedirs() race.
os.makedirs(PREPROCESSOR_DIR, exist_ok=True)

  from .autonotebook import tqdm as notebook_tqdm


## Step 1: Preprocess Train Data (Chunked)

In [2]:
print(f"--- Starting Preprocessing for {TRAIN_PATH_IN} ---")
start_time = time.time()

# Largest finite float16 value (~65504). Casting anything larger to float16
# silently produces inf, so such columns are kept as float32 instead.
# (float16 also keeps only ~3 significant digits; this guard covers range only.)
float16_max = float(np.finfo(np.float16).max)

# --- 1. Get Column Schema ---
# Only the parquet metadata is read here; no feature data is loaded yet.
print("Loading column schema from parquet file...")
pf = pq.ParquetFile(TRAIN_PATH_IN)
all_cols = pf.schema.names
print(f"Total columns in file: {len(all_cols)}")

# --- 2. Load Essential Columns ---
print("Loading essential columns (customer_ID, target)...")
y_train = pd.read_parquet(TRAIN_PATH_IN, columns=['customer_ID', 'target'])
gc.collect()

# --- 3. Define Feature Lists ---
print("Defining feature lists based on schema...")
# Any column whose name contains one of these markers is dropped entirely.
removal_markers = ('skew', 'kurt', 'sub_mean', 'div_mean')
features_to_remove = [col for col in all_cols if any(marker in col for marker in removal_markers)]
excluded_cols = set(features_to_remove) | {'customer_ID', 'target'}
features_to_process = [col for col in all_cols if col not in excluded_cols]
feature_set = set(features_to_process)
print(f"Original feature count: {len(all_cols) - 2}. Removing {len(features_to_remove)} features. Processing {len(features_to_process)} features.")

original_cat_features = ['B_30', 'B_38', 'D_114', 'D_116', 'D_117', 'D_120', 'D_126', 'D_63', 'D_64', 'D_66', 'D_68']
# '_last' / '_nunique' / '_count' aggregates of the original categoricals stay
# categorical, as do any raw categorical columns still present.
categorical_set = {
    col for col in features_to_process
    if col.endswith(('_last', '_nunique', '_count'))
    and any(cat_feat in col for cat_feat in original_cat_features)
}
categorical_set |= feature_set.intersection(original_cat_features)

# Define object columns based on the 1st place solution's logic (S1/S2):
# the 'last' and 'first' aggregates of the *original* object-type categoricals.
obj_cols_base = original_cat_features
obj_cols = sorted({
    f'{col}_{suffix}'
    for col in obj_cols_base
    for suffix in ('last', 'first')
    if f'{col}_{suffix}' in feature_set
})
print(f"Found {len(obj_cols)} object-type columns to LabelEncode.")

# Label-encoded columns are cast to 'category' too, so record them as
# categorical; the saved lists then match the dtypes actually written.
categorical_set.update(obj_cols)
categorical_cols_for_lgb = sorted(categorical_set)  # sorted -> deterministic JSON

num_cols = [col for col in features_to_process if col not in categorical_set]
print(f"Found {len(num_cols)} numeric columns to downcast.")

# Set lookups: these membership checks run once per column (~7000 times).
obj_set = set(obj_cols)
num_set = set(num_cols)

# --- 4. Load and Process Chunks ---
print("Loading and processing X_train in column-wise chunks...")
encoders = {}
chunk_size = 500
processed_chunks = []

for i in tqdm(range(0, len(features_to_process), chunk_size), desc="Processing X_train chunks"):
    chunk_cols = features_to_process[i:i + chunk_size]

    # Load one chunk of columns
    X_chunk = pd.read_parquet(TRAIN_PATH_IN, columns=chunk_cols)

    for col in chunk_cols:
        X_chunk[col] = X_chunk[col].fillna(0)

        if col in obj_set:
            # Encode the string form so numeric and string categoricals are handled alike.
            le = LabelEncoder()
            X_chunk[col] = le.fit_transform(X_chunk[col].astype(str))
            encoders[col] = le
            X_chunk[col] = X_chunk[col].astype('category')
        elif col in categorical_set:
            X_chunk[col] = X_chunk[col].astype('category')
        elif col in num_set:
            values = X_chunk[col].to_numpy(dtype=np.float64)
            fits_float16 = values.size == 0 or np.abs(values).max() <= float16_max
            X_chunk[col] = values.astype(np.float16 if fits_float16 else np.float32)

    # Collect chunks and concatenate once, instead of re-copying a growing frame per chunk.
    processed_chunks.append(X_chunk)
    del X_chunk
    gc.collect()

# --- 5. Assemble Final Data & Save Artifacts ---
# All chunks and y_train were read from the same file, so rows are aligned
# positionally. This avoids the extra full copy (and any row duplication on a
# non-unique key) that a merge on customer_ID would cause.
# Column order matches the original: customer_ID, features..., target.
train_processed = pd.concat(
    [y_train[['customer_ID']], *processed_chunks, y_train[['target']]],
    axis=1,
)
del processed_chunks
gc.collect()

print("Saving preprocessing artifacts...")
joblib.dump(encoders, os.path.join(PREPROCESSOR_DIR, 'encoders.joblib'))

features_list = [col for col in train_processed.columns if col not in ('customer_ID', 'target')]
column_lists = {
    'all_features': features_list,
    'categorical_cols_for_lgb': [col for col in categorical_cols_for_lgb if col in feature_set],
    'obj_cols_to_encode': obj_cols,
    'numeric_cols_to_downcast': num_cols,
}
with open(os.path.join(PREPROCESSOR_DIR, 'column_lists.json'), 'w') as f:
    json.dump(column_lists, f)

print("Saving processed train features...")
train_processed.to_parquet(TRAIN_PATH_OUT, index=False)
print(f"Processed train data saved to: {TRAIN_PATH_OUT}")
print(f"Total time for train processing: {time.time() - start_time:.2f}s")

del y_train, train_processed, pf; gc.collect()

--- Starting Preprocessing for ../data_fe/train_fe.parquet ---
Loading column schema from parquet file...
Total columns in file: 7007
Loading essential columns (customer_ID, target)...
Defining feature lists based on schema...
Original feature count: 7005. Removing 0 features. Processing 7005 features.
Found 11 object-type columns to LabelEncode.
Found 6906 numeric columns to downcast.
Loading and processing X_train in column-wise chunks...


  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr

Saving preprocessing artifacts...
Saving processed train features...
Processed train data saved to: ../data_fe/train_processed.parquet
Total time for train processing: 182.51s


2

## Step 2: Preprocess Test Data (Chunked)

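Now we apply the same chunked process to the test data. It reuses the encoders and column lists saved from the train set, so nothing is re-fit on test data. Test category values that were never seen during training are mapped to an extra `unseen` label.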

In [3]:
print(f"--- Starting Preprocessing for {TEST_PATH_IN} ---")
start_time = time.time()

# Largest finite float16 value (~65504). Casting anything larger to float16
# silently produces inf, so such columns are kept as float32 instead.
float16_max = float(np.finfo(np.float16).max)

# --- 1. Get Column Schema ---
print("Loading column schema from parquet file...")
pf_test = pq.ParquetFile(TEST_PATH_IN)
all_test_cols = pf_test.schema.names
print(f"Total columns in file: {len(all_test_cols)}")

# --- 2. Load Essential Columns ---
print("Loading essential column (customer_ID)...")
test_ids = pd.read_parquet(TEST_PATH_IN, columns=['customer_ID'])
gc.collect()

# --- 3. Load Feature Lists (from training) ---
print("Loading column lists from training...")
with open(os.path.join(PREPROCESSOR_DIR, 'column_lists.json'), 'r') as f:
    column_lists = json.load(f)
features_to_process = column_lists['all_features']
categorical_cols_for_lgb = column_lists['categorical_cols_for_lgb']
obj_cols = column_lists['obj_cols_to_encode']
num_cols = column_lists['numeric_cols_to_downcast']
# NOTE: joblib unpickles this file; only load encoders produced by the train cell.
encoders = joblib.load(os.path.join(PREPROCESSOR_DIR, 'encoders.joblib'))

# Fail fast with a clear message rather than crashing partway through the chunks.
missing_cols = sorted(set(features_to_process) - set(all_test_cols))
if missing_cols:
    raise KeyError(
        f"{len(missing_cols)} training features are missing from {TEST_PATH_IN}: {missing_cols[:10]}"
    )

# Set lookups: these membership checks run once per column (~7000 times).
obj_set = set(obj_cols)
categorical_set = set(categorical_cols_for_lgb)
num_set = set(num_cols)

# --- 4. Load and Process Chunks ---
print("Loading and processing X_test in column-wise chunks...")
chunk_size = 500
processed_chunks = []
for i in tqdm(range(0, len(features_to_process), chunk_size), desc="Processing X_test chunks"):
    chunk_cols = features_to_process[i:i + chunk_size]

    # Load one chunk of columns
    X_chunk = pd.read_parquet(TEST_PATH_IN, columns=chunk_cols)

    for col in chunk_cols:
        X_chunk[col] = X_chunk[col].fillna(0)

        if col in obj_set:
            if col not in encoders:
                print(f"Warning: Encoder for {col} not found. Skipping encoding.")
                continue
            le = encoders[col]

            # Remap values unseen in training on a string copy. Writing 'unseen'
            # straight into a float column is deprecated in pandas and rejected by
            # newer versions. The in-memory encoder gains an extra 'unseen' class;
            # the file on disk is not modified.
            as_str = X_chunk[col].astype(str)
            unseen_mask = ~as_str.isin(le.classes_)
            if unseen_mask.any():
                if 'unseen' not in le.classes_:
                    le.classes_ = np.append(le.classes_, 'unseen')
                as_str = as_str.mask(unseen_mask, 'unseen')

            X_chunk[col] = le.transform(as_str)
            X_chunk[col] = X_chunk[col].astype('category')
        elif col in categorical_set:
            X_chunk[col] = X_chunk[col].astype('category')
        elif col in num_set:
            values = X_chunk[col].to_numpy(dtype=np.float64)
            fits_float16 = values.size == 0 or np.abs(values).max() <= float16_max
            X_chunk[col] = values.astype(np.float16 if fits_float16 else np.float32)

    # Collect chunks and concatenate once, instead of re-copying a growing frame per chunk.
    processed_chunks.append(X_chunk)
    del X_chunk
    gc.collect()

# Chunks were all read from the same file as test_ids, so rows align positionally.
X_test = pd.concat([test_ids, *processed_chunks], axis=1)
del processed_chunks
gc.collect()

# --- 5. Save the processed test file ---
print("X_test preprocessing complete.")
print(f"Final X_test shape: {X_test.shape}")
print(f"Saving processed test data to {TEST_PATH_OUT}...")
X_test.to_parquet(TEST_PATH_OUT, index=False)
print("Processed test data saved.")

print("\nPreprocessing notebook complete. You can now run the Training & Inference notebook.")
del X_test, test_ids, pf_test; gc.collect()

--- Starting Preprocessing for ../data_fe/test_fe.parquet ---
Loading column schema from parquet file...
Total columns in file: 7006
Loading essential column (customer_ID)...
Loading column lists from training...
Loading and processing X_test in column-wise chunks...


  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr.astype(dtype, copy=True)
  return arr

X_test preprocessing complete.
Final X_test shape: (924621, 7006)
Saving processed test data to ../data_fe/test_processed.parquet...
Processed test data saved.

Preprocessing notebook complete. You can now run the Training & Inference notebook.


2