✅ Function Purpose:
Input: 'Cleaned_FHVHV.parquet'

Target Variables: base_passenger_fare, trip_time

Output: X_train, X_test, y_train, y_test

✅ Function Workflow:
Load the dataset from Parquet.

Feature engineering:

Add pickup_unix, pickup_day, is_weekend, pickup_bucket, fare_to_pay_ratio, time_to_pickup, zone_pair.

Drop irrelevant columns like timestamps and Borough/ServiceZone.

One-hot encode categorical features: fhvhv_type, trip_category, trip_distance_class, pickup_bucket, zone_pair.

Handle missing & infinite values.

Clip outliers in fare_per_mile and average_speed.

Scale numeric features using StandardScaler.

Split into train/test sets (80/20).

✅ Ram Strategy
🔹 1. Add Row Sampling (limit the dataset size to fit RAM)
Add a row_limit parameter to read only part of the data.

🔹 2. Use Sparse Matrices (to avoid huge dense arrays from one-hot encoding)
Set sparse_output=True in OneHotEncoder and avoid converting it to DataFrame until needed.

🔹 3. Avoid In-Memory Full Export
Avoid exporting large X_processed as a dense DataFrame. Save X_train/X_test only, optionally using sparse format.

| File Name                   | Description                          |
| --------------------------- | ------------------------------------ |
| `FHVHV_X_processed.parquet` | Full features after encoding/scaling |
| `FHVHV_y.parquet`           | Full target column                   |
| `FHVHV_X_train.parquet`     | Training features                    |
| `FHVHV_X_test.parquet`      | Test features                        |
| `FHVHV_y_train.parquet`     | Training target                      |
| `FHVHV_y_test.parquet`      | Test target                          |
| `FHVHV_preprocessor.pkl`    | Saved preprocessor object            |


 # ✅ Part 1: Preprocessing and exporting cleaned feature-ready dataset


In [35]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.model_selection import train_test_split
from scipy import sparse
import joblib

def preprocess_split_export_fhvhv(
    parquet_path="Cleaned_FHVHV.parquet",
    target_col='base_passenger_fare',
    output_prefix='FHVHV',
    row_limit=500_000  # ✅ Limit rows for RAM safety
):
    # Step 1: Load data (with sampling)
    df = pd.read_parquet(parquet_path, engine='pyarrow')
    if row_limit:
        df = df.sample(n=row_limit, random_state=42)

    # Step 2: Feature engineering
    df['pickup_datetime'] = pd.to_datetime(df['pickup_datetime'], errors='coerce')
    df['pickup_hour'] = df['pickup_datetime'].dt.hour
    df['pickup_unix'] = df['pickup_datetime'].astype('int64') // 10**9
    df['pickup_day'] = df['pickup_datetime'].dt.dayofweek
    df['is_weekend'] = df['pickup_day'] >= 5
    df['pickup_bucket'] = pd.cut(df['pickup_hour'], bins=[-1, 6, 12, 18, 24],
                                 labels=['Night', 'Morning', 'Afternoon', 'Evening'])
    df['zone_pair'] = df['PU_Zone'].astype(str) + "_" + df['DO_Zone'].astype(str)
    df['fare_to_pay_ratio'] = df['base_passenger_fare'] / (df['driver_pay'] + 1e-6)
    df['time_to_pickup'] = (
        pd.to_datetime(df['pickup_datetime'], errors='coerce') -
        pd.to_datetime(df['on_scene_datetime'], errors='coerce')
    ).dt.total_seconds()

    # Step 3: Drop irrelevant or unused columns
    drop_cols = [
        'request_datetime', 'on_scene_datetime', 'pickup_datetime', 'dropoff_datetime',
        'pickup_hour', 'dropoff_hour',
        'PU_Borough', 'PU_ServiceZone', 'DO_Borough', 'DO_ServiceZone'
    ]
    df.drop(columns=drop_cols, inplace=True, errors='ignore')

    # Step 4: Clean and filter
    df.replace([np.inf, -np.inf], np.nan, inplace=True)
    df.dropna(inplace=True)
    df = df[(df['fare_per_mile'] > 0) & (df['fare_per_mile'] < 50)]
    df = df[(df['average_speed'] > 0) & (df['average_speed'] < 80)]
    df = df[df[target_col].notna()]

    # Step 5: Separate features and target
    y = df[target_col]
    X = df.drop(columns=[target_col])

    # Step 6: Set categorical flags
    flag_cols = [
        'shared_request_flag', 'shared_match_flag',
        'access_a_ride_flag', 'wav_request_flag', 'wav_match_flag'
    ]
    for col in flag_cols:
        if col in X.columns:
            X[col] = X[col].astype("category")

    # Step 7: Identify column types
    numeric_cols = X.select_dtypes(include=['int64', 'float64']).columns.tolist()
    categorical_cols = X.select_dtypes(include=['object', 'category']).columns.tolist()

    # Step 8: Build preprocessor (sparse)
    preprocessor = ColumnTransformer(transformers=[
        ('num', StandardScaler(), numeric_cols),
        ('cat', OneHotEncoder(handle_unknown='ignore', sparse_output=True), categorical_cols)
    ])

    # Step 9: Fit-transform entire dataset
    X_processed = preprocessor.fit_transform(X)  # will be sparse

    # ✅ Step 10: Export full processed feature set BEFORE split
    sparse.save_npz(f"{output_prefix}_X_processed.npz", X_processed)
    y.to_frame().to_parquet(f"{output_prefix}_y.parquet", index=False)

    # Step 11: Split
    X_train, X_test, y_train, y_test = train_test_split(
        X_processed, y, test_size=0.2, random_state=42
    )

    # Step 12: Export splits
    sparse.save_npz(f"{output_prefix}_X_train.npz", X_train)
    sparse.save_npz(f"{output_prefix}_X_test.npz", X_test)
    y_train.to_frame().to_parquet(f"{output_prefix}_y_train.parquet", index=False)
    y_test.to_frame().to_parquet(f"{output_prefix}_y_test.parquet", index=False)

    # Step 13: Save preprocessor
    joblib.dump(preprocessor, f"{output_prefix}_preprocessor.pkl")

    print(f"✅ Processed and exported {row_limit:,} rows.")
    print("✅ Full processed features exported (before split).")
    print("✅ Train/test sets exported.")
    return X_train, X_test, y_train, y_test, preprocessor

X_train, X_test, y_train, y_test, preprocessor = preprocess_split_export_fhvhv(
    parquet_path='Cleaned_FHVHV.parquet',
    target_col='base_passenger_fare',
    output_prefix='FHVHV',
    row_limit=1000000  # Or lower to avoid memory issues
)


✅ Processed and exported 1,000,000 rows.
✅ Full processed features exported (before split).
✅ Train/test sets exported.
