# Data Preprocessing: Trip Duration

Generate code & functions such that conducts data preprocessing(includes feature engineering & data cleaning).

In [1]:
%%time

# Import Standard Libraries
import os
import sys
import warnings
# warnings.filterwarnings("ignore")

# Import Data Handling Libraries
import pandas as pd
import numpy as np
np.random.seed(42)

# Import Date-Time Handling Libraries
from datetime import timedelta
import datetime as dt

# Import Geodetic Libraries
import pyproj
from pyproj import Geod

# Import Data Visualization Libraries
import matplotlib
matplotlib.rcParams["font.size"] = 12
import matplotlib.pyplot as plt
plt.rcParams["figure.figsize"] = [12, 12]  # Set default figure size
import seaborn as sns

# Import Machine Learning Libraries
from sklearn.decomposition import PCA  # Principal Component Analysis
from scipy.stats import skew, kurtosis

# Set random seed for reproducibility in scikit-learn
from sklearn.utils import check_random_state
from sklearn.preprocessing import MinMaxScaler, StandardScaler
rng = check_random_state(42)

# Import Utilities
import time
import gc
from tqdm import tqdm
import joblib
from joblib import Parallel, delayed
import multiprocessing
import json

# Import Custom Modules
from data_loader import *  # Custom data loading functions

CPU times: user 1.53 s, sys: 451 ms, total: 1.98 s
Wall time: 1.35 s


In [2]:
%%time

# Load the dataset
df_train = load_data("train")
df_train.head()

Detected environment: sys
Loading train.csv from: /root/CEGE0042-STDM/data/train.csv
CPU times: user 2.44 s, sys: 586 ms, total: 3.02 s
Wall time: 3.02 s


Unnamed: 0,id,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,store_and_fwd_flag,trip_duration
0,id0458976,2,2016-06-29 18:21:02,2016-06-29 18:39:55,1,-73.862762,40.768822,-73.891701,40.746689,N,1133
1,id0434613,2,2016-04-25 13:03:26,2016-04-25 13:18:13,1,-73.958038,40.783237,-73.97551,40.760853,N,887
2,id3809234,2,2016-05-07 12:36:09,2016-05-07 12:47:35,1,-73.96946,40.785519,-73.989243,40.771748,N,686
3,id1203705,1,2016-05-14 18:44:17,2016-05-14 18:57:55,1,-73.981743,40.736549,-73.998352,40.72644,N,818
4,id1896645,2,2016-04-10 22:51:25,2016-04-10 23:07:16,1,-73.977913,40.752609,-73.975647,40.733139,N,951


In [3]:
df_train.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1166915 entries, 0 to 1166914
Data columns (total 11 columns):
 #   Column              Non-Null Count    Dtype  
---  ------              --------------    -----  
 0   id                  1166915 non-null  object 
 1   vendor_id           1166915 non-null  int64  
 2   pickup_datetime     1166915 non-null  object 
 3   dropoff_datetime    1166915 non-null  object 
 4   passenger_count     1166915 non-null  int64  
 5   pickup_longitude    1166915 non-null  float64
 6   pickup_latitude     1166915 non-null  float64
 7   dropoff_longitude   1166915 non-null  float64
 8   dropoff_latitude    1166915 non-null  float64
 9   store_and_fwd_flag  1166915 non-null  object 
 10  trip_duration       1166915 non-null  int64  
dtypes: float64(4), int64(3), object(4)
memory usage: 97.9+ MB


In [4]:
%%time

# Load the test dataset
df_test = load_data("test")
df_test.head()

Detected environment: sys
Loading test.csv from: /root/CEGE0042-STDM/data/test.csv
CPU times: user 610 ms, sys: 115 ms, total: 725 ms
Wall time: 723 ms


Unnamed: 0,id,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,store_and_fwd_flag,trip_duration
0,id2793718,2,2016-06-08 07:36:19,2016-06-08 07:53:39,1,-73.985611,40.735943,-73.980331,40.760468,N,1040
1,id3485529,2,2016-04-03 12:58:11,2016-04-03 13:11:58,1,-73.978394,40.764351,-73.991623,40.749859,N,827
2,id1816614,2,2016-06-05 02:49:13,2016-06-05 02:59:27,5,-73.989059,40.744389,-73.973381,40.748692,N,614
3,id1050851,2,2016-05-05 17:18:27,2016-05-05 17:32:54,2,-73.990326,40.731136,-73.991264,40.748917,N,867
4,id0140657,1,2016-05-12 17:43:38,2016-05-12 19:06:25,4,-73.789497,40.646675,-73.987137,40.759232,N,4967


In [5]:
df_test.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 291729 entries, 0 to 291728
Data columns (total 11 columns):
 #   Column              Non-Null Count   Dtype  
---  ------              --------------   -----  
 0   id                  291729 non-null  object 
 1   vendor_id           291729 non-null  int64  
 2   pickup_datetime     291729 non-null  object 
 3   dropoff_datetime    291729 non-null  object 
 4   passenger_count     291729 non-null  int64  
 5   pickup_longitude    291729 non-null  float64
 6   pickup_latitude     291729 non-null  float64
 7   dropoff_longitude   291729 non-null  float64
 8   dropoff_latitude    291729 non-null  float64
 9   store_and_fwd_flag  291729 non-null  object 
 10  trip_duration       291729 non-null  int64  
dtypes: float64(4), int64(3), object(4)
memory usage: 24.5+ MB


In [6]:
%%time

# Delete cols that leads to data leakage
del df_train["dropoff_datetime"]
del df_test["dropoff_datetime"]
gc.collect()

CPU times: user 61 ms, sys: 31.3 ms, total: 92.3 ms
Wall time: 90.7 ms


0

In [7]:
%%time

# Define helper function formats time seconds into string
def format_time(seconds):
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    seconds = seconds % 60
    return f"{hours} hour {minutes} min {seconds:.2f} sec"

CPU times: user 2 µs, sys: 1e+03 ns, total: 3 µs
Wall time: 13.4 µs


## Feature Extraction

### PCA in Longitudes & Latitudes

In [8]:
%%time

def apply_pca_to_coords(train, test, random_seed=42):
    """
    Applies PCA transformation to pickup and dropoff coordinates for train and test datasets.

    The PCA is fitted **only on the training data** to prevent data leakage.

    Parameters:
        train (pd.DataFrame): The training dataset.
        test (pd.DataFrame): The testing dataset.
        random_seed (int): Random seed for reproducibility.

    Returns:
        None: Modifies train and test DataFrames in place.
    """

    # Fit PCA only on training data
    coords_train = np.vstack((
        train[["pickup_latitude", "pickup_longitude"]].values,
        train[["dropoff_latitude", "dropoff_longitude"]].values
    ))

    pca = PCA(whiten=True, random_state=random_seed).fit(coords_train)

    # Apply transformation to train dataset
    train_coords_pickup = train[["pickup_latitude", "pickup_longitude"]].values
    train_coords_dropoff = train[["dropoff_latitude", "dropoff_longitude"]].values
    train.loc[:, "pickup_pca0"] = pca.transform(train_coords_pickup)[:, 0]
    train.loc[:, "pickup_pca1"] = pca.transform(train_coords_pickup)[:, 1]
    train.loc[:, "dropoff_pca0"] = pca.transform(train_coords_dropoff)[:, 0]
    train.loc[:, "dropoff_pca1"] = pca.transform(train_coords_dropoff)[:, 1]

    # Apply the same transformation to test dataset to avoid data leakage
    test_coords_pickup = test[["pickup_latitude", "pickup_longitude"]].values
    test_coords_dropoff = test[["dropoff_latitude", "dropoff_longitude"]].values
    test.loc[:, "pickup_pca0"] = pca.transform(test_coords_pickup)[:, 0]
    test.loc[:, "pickup_pca1"] = pca.transform(test_coords_pickup)[:, 1]
    test.loc[:, "dropoff_pca0"] = pca.transform(test_coords_dropoff)[:, 0]
    test.loc[:, "dropoff_pca1"] = pca.transform(test_coords_dropoff)[:, 1]

# Example usage:
apply_pca_to_coords(df_train, df_test, random_seed=42)

CPU times: user 885 ms, sys: 851 ms, total: 1.74 s
Wall time: 341 ms


### Distance

In [9]:
%%time

# Define WGS84 ellipsoid
geod = Geod(ellps="WGS84")

# Compute great-circle distance in kilometers
df_train["euclidean_distance"] = df_train.apply(
    lambda row: geod.inv(row["pickup_longitude"], row["pickup_latitude"],
                         row["dropoff_longitude"], row["dropoff_latitude"])[2] / 1000, axis=1
)

# Compute great-circle distance in kilometers
df_test["euclidean_distance"] = df_test.apply(
    lambda row: geod.inv(row["pickup_longitude"], row["pickup_latitude"],
                         row["dropoff_longitude"], row["dropoff_latitude"])[2] / 1000, axis=1
)

CPU times: user 16.3 s, sys: 787 ms, total: 17.1 s
Wall time: 16.5 s


### Datetime Feature

In [10]:
%%time

def generate_datetime_features(df):
    """
    Generate detailed date-time features for pickups and modify the DataFrame in place.

    Parameters:
        df (pd.DataFrame): The DataFrame containing the datetime column.
    
    Returns:
        None (Modifies df in place)
    """
    # Convert to datetime format
    pickup_times = pd.to_datetime(df["pickup_datetime"])

    # Extract relevant time features as integers
    df["pickup_hour_of_day"] = (pickup_times.dt.hour * 60 + pickup_times.dt.minute) // 60  # Integer division

    df["day_of_week"] = pickup_times.dt.weekday.astype(int)
    df["hour_of_week"] = (df["day_of_week"] * 24 + df["pickup_hour_of_day"]).astype(int)

    df["month_of_year"] = pickup_times.dt.month.astype(int)
    df["day_of_year"] = pickup_times.dt.dayofyear.astype(int)
    df["week_of_year"] = pickup_times.dt.isocalendar().week.astype(int)
    df["hour_of_year"] = (df["day_of_year"] * 24 + df["pickup_hour_of_day"]).astype(int)

generate_datetime_features(df_train)
generate_datetime_features(df_test)

CPU times: user 1.13 s, sys: 23.7 ms, total: 1.15 s
Wall time: 1.15 s


### NYC Weather

In [11]:
%%time

def merge_weather_data(df):
    """
    Merges weather data with a given dataframe (train or test) based on the pickup date.
    
    Parameters:
        df (pd.DataFrame): The train or test dataframe containing 'pickup_datetime'.
    
    Returns:
        pd.DataFrame: The merged dataframe with only the intermediate weather data columns.
    """

    # Load NYC weather data to enrich information
    weather_data = pd.read_csv(os.path.join("utils", "weather_data_nyc_centralpark_2016.csv"), low_memory=False)
    weather_data["date"] = pd.to_datetime(weather_data["date"], format="%d-%m-%Y")

    # Ensure datetime consistency
    weather_data["date"] = weather_data["date"].dt.date
    df["pickup_date"] = pd.to_datetime(df["pickup_datetime"]).dt.date

    # Handle trace values in precipitation, snow fall, and snow depth columns
    weather_data["r_depth"] = weather_data["precipitation"].apply(lambda x: 0.01 if x == "T" else float(x))  # rain depth
    weather_data["s_fall"] = weather_data["snow fall"].apply(lambda x: 0.01 if x == "T" else float(x))  # snow fall
    weather_data["s_depth"] = weather_data["snow depth"].apply(lambda x: 0.01 if x == "T" else float(x))  # snow depth

    # Calculate total precipitation, and snow/rain indicators
    weather_data["all_precip"] = weather_data["s_fall"] + weather_data["r_depth"]
    weather_data["has_snow"] = (weather_data["s_fall"] > 0) | (weather_data["s_depth"] > 0)
    weather_data["has_rain"] = weather_data["r_depth"] > 0

    # Copy temperature columns
    weather_data["max_temp"] = weather_data["maximum temperature"]
    weather_data["min_temp"] = weather_data["minimum temperature"]

    # Select only the newly created columns
    weather_data = weather_data[["date", "r_depth", "s_fall", "s_depth", "all_precip", "has_snow", "has_rain", "max_temp", "min_temp"]]

    # Merge the datasets on the date
    df = df.merge(weather_data, left_on="pickup_date", right_on="date", how="left")

    return df

# Apply function to train and test datasets
df_train = merge_weather_data(df_train)
df_test = merge_weather_data(df_test)

CPU times: user 1.01 s, sys: 236 ms, total: 1.24 s
Wall time: 1.24 s


## Data Cleaning

### Location Outlier

In [12]:
%%time

def filter_by_nyc_boundary(df, geojson_path):
    """
    Filters pickup and dropoff locations to keep only those within the New York City boundary.

    Parameters:
        df (pd.DataFrame): The DataFrame containing pickup and dropoff coordinates.
        geojson_path (str): Path to the GeoJSON file defining NYC boundaries.

    Returns:
        pd.DataFrame: Filtered DataFrame with locations inside the NYC bounding box.
    """
    # Load the GeoJSON file
    with open(geojson_path, "r") as f:
        geojson_data = json.load(f)

    # Extract NYC boundary coordinates where NAME is "New York"
    nyc_coords = []
    for feature in geojson_data["features"]:
        if feature["properties"].get("NAME") == "New York":
            for polygon in feature["geometry"]["coordinates"]:  # Loop through MultiPolygon
                for ring in polygon:  # Each polygon has a ring of coordinates
                    nyc_coords.extend(ring)

    # Compute NYC bounding box (min/max latitudes & longitudes)
    min_long = min(lon for lon, lat in nyc_coords)
    max_long = max(lon for lon, lat in nyc_coords)
    min_lat = min(lat for lon, lat in nyc_coords)
    max_lat = max(lat for lon, lat in nyc_coords)

    # Count records before filtering
    initial_count = len(df)

    # Filter data based on bounding box
    mask = (
        (df["pickup_longitude"].between(min_long, max_long))
        & (df["pickup_latitude"].between(min_lat, max_lat))
        & (df["dropoff_longitude"].between(min_long, max_long))
        & (df["dropoff_latitude"].between(min_lat, max_lat))
    )

    filtered_df = df[mask]

    # Count records after filtering
    final_count = len(filtered_df)
    dropped_count = initial_count - final_count

    print(f"Records before filtering: {initial_count}")
    print(f"Records after filtering: {final_count}")
    print(f"Records dropped: {dropped_count}\n")

    return filtered_df


# Apply function to df_train and df_test
df_train = filter_by_nyc_boundary(df_train, "utils/gz_2010_us_040_00_5m.json")
df_test = filter_by_nyc_boundary(df_test, "utils/gz_2010_us_040_00_5m.json")

Records before filtering: 1166915
Records after filtering: 1166864
Records dropped: 51

Records before filtering: 291729
Records after filtering: 291713
Records dropped: 16

CPU times: user 458 ms, sys: 116 ms, total: 574 ms
Wall time: 571 ms


### Distance & Duration Outlier

In [13]:
%%time

def filter_by_range(df, column, percentile_threshold=0.15, lower_bound=None, upper_bound=None):
    """
    Filters trips based on a given column (e.g., Euclidean distance or trip duration) 
    using percentile thresholds and absolute bounds.

    Parameters:
        df (pd.DataFrame): The DataFrame containing the column to filter.
        column (str): The column to apply filtering on (e.g., 'euclidean_distance', 'trip_duration').
        percentile_threshold (float, optional): The percentile threshold to remove outliers.
                                                Defaults to 0.15 (removes 0.15% lowest and highest values).
        lower_bound (float, optional): Absolute minimum value to keep. Defaults to None (not applied).
        upper_bound (float, optional): Absolute maximum value to keep. Defaults to None (not applied).

    Returns:
        pd.DataFrame: Filtered DataFrame with values within the specified range.
    """
    if column not in df.columns:
        raise ValueError(f"Column '{column}' not found in DataFrame.")
    
    initial_count = len(df)

    # Calculate percentile-based limits
    lower_lim = np.percentile(df[column], percentile_threshold)
    upper_lim = np.percentile(df[column], 100 - percentile_threshold)

    # Apply filtering
    filtered_df = df[df[column].between(lower_lim, upper_lim)]

    filtered_count = len(filtered_df)
    dropped_count = initial_count - filtered_count

    print(f"Records before filtering ({column}): {initial_count}")
    print(f"Records after filtering ({column}): {filtered_count}")
    print(f"Records dropped due to percentile limits ({column}): {dropped_count}")
    print(f"Applied percentile limits ({column}): {lower_lim:.4f} - {upper_lim:.4f}")
    
    if lower_bound is not None:
        filtered_df = filtered_df[filtered_df[column] > lower_bound]
        print(f"Applied lower bound after filtering ({column}): {lower_bound}")
    if upper_bound is not None:
        filtered_df = filtered_df[filtered_df[column] < upper_bound]
        print(f"Applied upper bound after filtering ({column}): {upper_bound}")
        
    final_count = len(filtered_df)
    dropped_count = initial_count - final_count
    print(f"Total records dropped due to {column} outliers: {dropped_count}")

    return filtered_df

# Apply function to filter both euclidean_distance and trip_duration
df_train = filter_by_range(df_train, "euclidean_distance", 0.15, lower_bound=0.1)
print()
df_train = filter_by_range(df_train, "trip_duration", 0.15, lower_bound=300)

Records before filtering (euclidean_distance): 1166864
Records after filtering (euclidean_distance): 1165113
Records dropped due to percentile limits (euclidean_distance): 1751
Applied percentile limits (euclidean_distance): 0.0000 - 23.3741
Applied lower bound after filtering (euclidean_distance): 0.1
Total records dropped due to euclidean_distance outliers: 12369

Records before filtering (trip_duration): 1154495
Records after filtering (trip_duration): 1151066
Records dropped due to percentile limits (trip_duration): 3429
Applied percentile limits (trip_duration): 54.0000 - 7331.2590
Applied lower bound after filtering (trip_duration): 300
Total records dropped due to trip_duration outliers: 172332
CPU times: user 797 ms, sys: 176 ms, total: 973 ms
Wall time: 969 ms


### Spatial & Temporal Aggregation

In [14]:
%%time

def bin_coordinates(df, precision=2):
    """Bins latitude and longitude to a specified precision."""
    df.loc[:, "pickup_lat_bin"] = np.round(df["pickup_latitude"], precision)
    df.loc[:, "pickup_long_bin"] = np.round(df["pickup_longitude"], precision)
    df.loc[:, "dropoff_lat_bin"] = np.round(df["dropoff_latitude"], precision)
    df.loc[:, "dropoff_long_bin"] = np.round(df["dropoff_longitude"], precision)

bin_coordinates(df_train)
bin_coordinates(df_test)

CPU times: user 26.4 ms, sys: 329 µs, total: 26.8 ms
Wall time: 25.6 ms


In [15]:
%%time

def compute_spatial_aggregations(df, min_trips=100):
    """Computes trip counts for different spatial aggregations."""
    groupings = [
        ["pickup_lat_bin", "pickup_long_bin", "dropoff_lat_bin", "dropoff_long_bin"],
        ["pickup_lat_bin", "pickup_long_bin"],
        ["dropoff_lat_bin", "dropoff_long_bin"]
    ]
    
    for groupby_cols in groupings:
        col_name = "cnt_coords_bin_" + "".join(set([col[0] for col in groupby_cols]))
        
        # Compute trip counts and store in a dictionary for fast lookup
        counts = df.groupby(groupby_cols).size().to_dict()
        
        # Apply counts to create a new column in the dataframe
        df[col_name] = df[groupby_cols].apply(lambda row: counts.get(tuple(row), 0), axis=1)
        
        # Apply filtering based on min_trips
        df[col_name] = df[col_name].where(df[col_name] >= min_trips, 0)

compute_spatial_aggregations(df_train)
compute_spatial_aggregations(df_test)

CPU times: user 17.4 s, sys: 111 ms, total: 17.5 s
Wall time: 17.5 s


In [16]:
%%time

def process_chunk(chunk, df_ref):
    """
    Process a chunk of the DataFrame to compute spatial-temporal features.
    
    Args:
        chunk: A subset of the main DataFrame.
        df_ref: Reference DataFrame for aggregation calculations.
    
    Returns:
        DataFrame with computed features for the chunk.
    """
    # Ensure pickup_datetime is in datetime format
    chunk["pickup_datetime"] = pd.to_datetime(chunk["pickup_datetime"])
    df_ref["pickup_datetime"] = pd.to_datetime(df_ref["pickup_datetime"])
    
    # Create temporary columns for time calculations
    df_ref = df_ref.copy()
    df_ref["pickup_hour"] = df_ref["pickup_datetime"].dt.floor("H")
    
    # Add new columns with default values
    chunk["cnt_prev_1h"] = 0
    chunk["cnt_mean_prev_3h_pickups"] = 0.0
    chunk["cnt_mean_prev_3h_dropoffs"] = 0.0
    
    for idx, row in chunk.iterrows():
        # Get current trip attributes
        current_time = row["pickup_datetime"]
        current_hour = current_time.floor("H")
        pl_bin = row["pickup_lat_bin"]
        plon_bin = row["pickup_long_bin"]
        dl_bin = row["dropoff_lat_bin"]
        dlon_bin = row["dropoff_long_bin"]
        
        # Calculate 1-hour window
        t1_start = current_hour - pd.Timedelta(hours=1)
        t1_end = current_hour
        
        # Calculate 3-hour average window (T-4h to T-1h)
        t3_start = current_hour - pd.Timedelta(hours=4)
        t3_end = current_hour - pd.Timedelta(hours=1)
        
        # Get reference data subsets
        ref_1h = df_ref[
            (df_ref["pickup_hour"] >= t1_start) & 
            (df_ref["pickup_hour"] < t1_end)
        ]
        
        ref_3h = df_ref[
            (df_ref["pickup_hour"] >= t3_start) & 
            (df_ref["pickup_hour"] < t3_end)
        ]
        
        # Calculate 1-hour total count
        chunk.at[idx, "cnt_prev_1h"] = len(ref_1h)
        
        # Calculate 3-hour spatial averages
        pickup_count = len(ref_3h[
            (ref_3h["pickup_lat_bin"] == pl_bin) &
            (ref_3h["pickup_long_bin"] == plon_bin)
        ])
        
        dropoff_count = len(ref_3h[
            (ref_3h["dropoff_lat_bin"] == dl_bin) &
            (ref_3h["dropoff_long_bin"] == dlon_bin)
        ])
        
        chunk.at[idx, "cnt_mean_prev_3h_pickups"] = pickup_count / 3
        chunk.at[idx, "cnt_mean_prev_3h_dropoffs"] = dropoff_count / 3
    
    return chunk

def compute_spatial_temporal_aggregation_parallel(df, df_ref, n_jobs):
    """
    Compute spatial-temporal aggregated features for taxi trips in parallel.
    
    Args:
        df: DataFrame to compute features for (must contain pickup/dropoff bins and timestamps).
        df_ref: Reference DataFrame used for aggregation calculations.
        n_jobs: Number of parallel jobs to run. Default is -1 (use all available cores).
    
    Returns:
        DataFrame with computed features.
    """
    
    # Determine chunk size based on available cores
    num_chunks = min(n_jobs, len(df))
    chunk_size = len(df) // num_chunks if num_chunks > 0 else len(df)
    print(f"Available CPU Core: {n_jobs} | Chunk Size: {chunk_size}")
    
    # Split the DataFrame into chunks for parallel processing
    chunks = [df.iloc[i:i + chunk_size] for i in range(0, len(df), chunk_size)]
    
    # Process chunks in parallel
    results = Parallel(n_jobs=n_jobs)(
        delayed(process_chunk)(chunk, df_ref) for chunk in chunks
    )
    
    # Combine results into a single DataFrame
    return pd.concat(results, ignore_index=True)

CPU times: user 4 µs, sys: 1 µs, total: 5 µs
Wall time: 13.8 µs


In [17]:
%%time

# For training data:
df_train = compute_spatial_temporal_aggregation_parallel(df_train, df_train, 8)

Available CPU Core: 8 | Chunk Size: 122770
CPU times: user 28.3 s, sys: 13.5 s, total: 41.8 s
Wall time: 35min 17s


In [18]:
%%time

# For test data:
combined_ref = pd.concat([df_train, df_test], ignore_index=True)
df_test = compute_spatial_temporal_aggregation_parallel(df_test, combined_ref, 8)

Available CPU Core: 8 | Chunk Size: 36464
CPU times: user 38.9 s, sys: 10.6 s, total: 49.5 s
Wall time: 13min 58s


### Drop Redundant Columns

In [19]:
df_train.columns

Index(['id', 'vendor_id', 'pickup_datetime', 'passenger_count',
       'pickup_longitude', 'pickup_latitude', 'dropoff_longitude',
       'dropoff_latitude', 'store_and_fwd_flag', 'trip_duration',
       'pickup_pca0', 'pickup_pca1', 'dropoff_pca0', 'dropoff_pca1',
       'euclidean_distance', 'pickup_hour_of_day', 'day_of_week',
       'hour_of_week', 'month_of_year', 'day_of_year', 'week_of_year',
       'hour_of_year', 'pickup_date', 'date', 'r_depth', 's_fall', 's_depth',
       'all_precip', 'has_snow', 'has_rain', 'max_temp', 'min_temp',
       'pickup_lat_bin', 'pickup_long_bin', 'dropoff_lat_bin',
       'dropoff_long_bin', 'cnt_coords_bin_dp', 'cnt_coords_bin_p',
       'cnt_coords_bin_d', 'cnt_prev_1h', 'cnt_mean_prev_3h_pickups',
       'cnt_mean_prev_3h_dropoffs'],
      dtype='object')

In [20]:
%%time

# Delete redundant, intermediate columns
df_train.drop(columns=[
    "pickup_datetime", "pickup_date", "date",
    "pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude",
    "pickup_lat_bin", "pickup_long_bin", "dropoff_lat_bin", "dropoff_long_bin"
], inplace=True)

gc.collect()

# Reorganize the columns to make `trip_duration` the target column in the end
df_train = df_train[[col for col in df_train.columns if col != "trip_duration"] + ["trip_duration"]]

df_train.columns

CPU times: user 532 ms, sys: 360 ms, total: 892 ms
Wall time: 891 ms


Index(['id', 'vendor_id', 'passenger_count', 'store_and_fwd_flag',
       'pickup_pca0', 'pickup_pca1', 'dropoff_pca0', 'dropoff_pca1',
       'euclidean_distance', 'pickup_hour_of_day', 'day_of_week',
       'hour_of_week', 'month_of_year', 'day_of_year', 'week_of_year',
       'hour_of_year', 'r_depth', 's_fall', 's_depth', 'all_precip',
       'has_snow', 'has_rain', 'max_temp', 'min_temp', 'cnt_coords_bin_dp',
       'cnt_coords_bin_p', 'cnt_coords_bin_d', 'cnt_prev_1h',
       'cnt_mean_prev_3h_pickups', 'cnt_mean_prev_3h_dropoffs',
       'trip_duration'],
      dtype='object')

In [21]:
df_train.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 982163 entries, 0 to 982162
Data columns (total 31 columns):
 #   Column                     Non-Null Count   Dtype  
---  ------                     --------------   -----  
 0   id                         982163 non-null  object 
 1   vendor_id                  982163 non-null  int64  
 2   passenger_count            982163 non-null  int64  
 3   store_and_fwd_flag         982163 non-null  object 
 4   pickup_pca0                982163 non-null  float64
 5   pickup_pca1                982163 non-null  float64
 6   dropoff_pca0               982163 non-null  float64
 7   dropoff_pca1               982163 non-null  float64
 8   euclidean_distance         982163 non-null  float64
 9   pickup_hour_of_day         982163 non-null  int32  
 10  day_of_week                982163 non-null  int64  
 11  hour_of_week               982163 non-null  int64  
 12  month_of_year              982163 non-null  int64  
 13  day_of_year                98

In [22]:
df_train.describe()

Unnamed: 0,vendor_id,passenger_count,pickup_pca0,pickup_pca1,dropoff_pca0,dropoff_pca1,euclidean_distance,pickup_hour_of_day,day_of_week,hour_of_week,...,all_precip,max_temp,min_temp,cnt_coords_bin_dp,cnt_coords_bin_p,cnt_coords_bin_d,cnt_prev_1h,cnt_mean_prev_3h_pickups,cnt_mean_prev_3h_dropoffs,trip_duration
count,982163.0,982163.0,982163.0,982163.0,982163.0,982163.0,982163.0,982163.0,982163.0,982163.0,...,982163.0,982163.0,982163.0,982163.0,982163.0,982163.0,982163.0,982163.0,982163.0,982163.0
mean,1.534791,1.669296,-0.001696,-0.03929,-0.003674,-0.019589,3.859115,13.700199,3.036553,86.577471,...,0.146849,59.544171,43.748533,968.38096,30085.929062,23902.928276,270.125312,8.146983,6.537344,945.804442
std,0.498788,1.314079,0.513579,0.823351,0.480059,0.939661,3.967002,6.351028,1.941386,46.491919,...,1.050329,16.925423,14.797519,1081.646917,18835.605582,17550.652579,81.562234,6.86924,6.392805,641.938199
min,1.0,0.0,-5.617779,-7.29959,-5.926344,-7.232434,0.100036,0.0,0.0,0.0,...,0.0,15.0,-1.0,0.0,0.0,0.0,0.0,0.0,0.0,301.0
25%,1.0,1.0,-0.079142,-0.405583,-0.141495,-0.468246,1.547194,9.0,1.0,45.0,...,0.0,46.0,32.0,156.0,15864.0,9226.0,241.0,2.666667,1.333333,509.0
50%,2.0,1.0,0.106446,0.059886,0.083917,0.070701,2.460965,14.0,3.0,88.0,...,0.0,59.0,44.0,685.0,27178.0,21705.0,280.0,6.333333,4.666667,760.0
75%,2.0,2.0,0.25212,0.424876,0.241034,0.472858,4.407869,19.0,5.0,126.0,...,0.04,73.0,54.0,1357.0,45476.0,35690.0,322.0,12.333333,10.0,1167.0
max,2.0,6.0,4.688035,11.410693,4.104426,9.376627,23.370427,23.0,6.0,167.0,...,29.61,92.0,73.0,5687.0,62262.0,54389.0,462.0,41.0,40.333333,7331.0


In [23]:
df_test.columns

Index(['id', 'vendor_id', 'pickup_datetime', 'passenger_count',
       'pickup_longitude', 'pickup_latitude', 'dropoff_longitude',
       'dropoff_latitude', 'store_and_fwd_flag', 'trip_duration',
       'pickup_pca0', 'pickup_pca1', 'dropoff_pca0', 'dropoff_pca1',
       'euclidean_distance', 'pickup_hour_of_day', 'day_of_week',
       'hour_of_week', 'month_of_year', 'day_of_year', 'week_of_year',
       'hour_of_year', 'pickup_date', 'date', 'r_depth', 's_fall', 's_depth',
       'all_precip', 'has_snow', 'has_rain', 'max_temp', 'min_temp',
       'pickup_lat_bin', 'pickup_long_bin', 'dropoff_lat_bin',
       'dropoff_long_bin', 'cnt_coords_bin_dp', 'cnt_coords_bin_p',
       'cnt_coords_bin_d', 'cnt_prev_1h', 'cnt_mean_prev_3h_pickups',
       'cnt_mean_prev_3h_dropoffs'],
      dtype='object')

In [24]:
%%time

# Delete redundant, intermediate columns
df_test.drop(columns=[
    "pickup_datetime", "pickup_date", "date",
    "pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude",
    "pickup_lat_bin", "pickup_long_bin", "dropoff_lat_bin", "dropoff_long_bin"
], inplace=True)

gc.collect()

# Reorganize the columns to make `trip_duration` the target column in the end
df_test = df_test[[col for col in df_test.columns if col != "trip_duration"] + ["trip_duration"]]

df_test.columns

CPU times: user 303 ms, sys: 221 ms, total: 524 ms
Wall time: 523 ms


Index(['id', 'vendor_id', 'passenger_count', 'store_and_fwd_flag',
       'pickup_pca0', 'pickup_pca1', 'dropoff_pca0', 'dropoff_pca1',
       'euclidean_distance', 'pickup_hour_of_day', 'day_of_week',
       'hour_of_week', 'month_of_year', 'day_of_year', 'week_of_year',
       'hour_of_year', 'r_depth', 's_fall', 's_depth', 'all_precip',
       'has_snow', 'has_rain', 'max_temp', 'min_temp', 'cnt_coords_bin_dp',
       'cnt_coords_bin_p', 'cnt_coords_bin_d', 'cnt_prev_1h',
       'cnt_mean_prev_3h_pickups', 'cnt_mean_prev_3h_dropoffs',
       'trip_duration'],
      dtype='object')

In [25]:
df_test.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 291713 entries, 0 to 291712
Data columns (total 31 columns):
 #   Column                     Non-Null Count   Dtype  
---  ------                     --------------   -----  
 0   id                         291713 non-null  object 
 1   vendor_id                  291713 non-null  int64  
 2   passenger_count            291713 non-null  int64  
 3   store_and_fwd_flag         291713 non-null  object 
 4   pickup_pca0                291713 non-null  float64
 5   pickup_pca1                291713 non-null  float64
 6   dropoff_pca0               291713 non-null  float64
 7   dropoff_pca1               291713 non-null  float64
 8   euclidean_distance         291713 non-null  float64
 9   pickup_hour_of_day         291713 non-null  int32  
 10  day_of_week                291713 non-null  int64  
 11  hour_of_week               291713 non-null  int64  
 12  month_of_year              291713 non-null  int64  
 13  day_of_year                29

In [26]:
df_test.describe()

Unnamed: 0,vendor_id,passenger_count,pickup_pca0,pickup_pca1,dropoff_pca0,dropoff_pca1,euclidean_distance,pickup_hour_of_day,day_of_week,hour_of_week,...,all_precip,max_temp,min_temp,cnt_coords_bin_dp,cnt_coords_bin_p,cnt_coords_bin_d,cnt_prev_1h,cnt_mean_prev_3h_pickups,cnt_mean_prev_3h_dropoffs,trip_duration
count,291713.0,291713.0,291713.0,291713.0,291713.0,291713.0,291713.0,291713.0,291713.0,291713.0,...,291713.0,291713.0,291713.0,291713.0,291713.0,291713.0,291713.0,291713.0,291713.0,291713.0
mean,1.534351,1.667187,-0.00136,-0.010228,-0.003101,0.015176,3.439431,13.617384,3.053823,86.909147,...,0.148745,59.40122,43.615135,315.223463,8805.198424,7147.937127,346.38746,10.321529,8.341479,960.21541
std,0.498819,1.316759,0.502199,0.823313,0.481242,0.930334,3.95634,6.389152,1.953361,46.795565,...,1.042237,16.947452,14.8369,354.867913,5423.045435,4993.007779,107.054067,8.663103,7.952818,3254.264232
min,1.0,0.0,-9.752336,-7.094127,-14.335127,-7.094127,0.0,0.0,0.0,0.0,...,0.0,15.0,-1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0
25%,1.0,1.0,-0.095278,-0.374895,-0.148776,-0.421688,1.233456,9.0,1.0,45.0,...,0.0,46.0,32.0,0.0,4518.0,3001.0,310.0,3.333333,1.666667,396.0
50%,2.0,1.0,0.10157,0.088641,0.079633,0.0968,2.094199,14.0,3.0,89.0,...,0.0,59.0,44.0,216.0,8456.0,6835.0,361.0,8.0,6.333333,663.0
75%,2.0,2.0,0.246469,0.462763,0.238757,0.50992,3.872186,19.0,5.0,128.0,...,0.04,73.0,54.0,479.0,13009.0,10488.0,415.0,15.666667,12.666667,1077.0
max,2.0,7.0,6.016428,27.156156,9.766639,27.055565,115.120552,23.0,6.0,167.0,...,29.61,92.0,73.0,1642.0,18041.0,15425.0,572.0,52.333333,52.0,86391.0


In [27]:
%%time

def recommend_scaling_methods(df):
    """
    Recommend normalization methods based on skewness and kurtosis of the data.
    Returns a dictionary where keys are column names and values are the recommended normalization methods.
    """
    scaling_recommendations = {}
    
    for column in df.select_dtypes(include=["int64", "float64", "int32", "float32"]).columns:
        # Skip 'vendor_id' column as it is binary & categorical
        if column == "vendor_id":
            continue
        
        # Check if the column contains mixed non-numeric values
        try:
            pd.to_numeric(df[column], errors="raise")
            is_mixed = False  # No mixed non-numeric values
        except ValueError:
            is_mixed = True  # Contains non-numeric values

        if is_mixed:
            print(f"Column '{column}' contains non-numeric data and cannot be normalized.")
            scaling_recommendations[column] = None
            continue
        
        # Compute skewness and kurtosis
        col_skewness = skew(df[column].dropna())  # Calculate skewness
        col_kurtosis = kurtosis(df[column].dropna())  # Calculate kurtosis
        
        # Count unique values and display up to 10 unique values
        unique_values = df[column].dropna().unique()
        unique_count = len(unique_values)
        print(f"Column '{column}' has {unique_count} unique values, first 10: {unique_values[:10]}")
        
        # Determine the appropriate normalization method based on skewness and kurtosis
        if abs(col_skewness) < 0.5 and abs(col_kurtosis) < 3:
            scaling_recommendations[column] = "StandardScaler"
            print(f"Column '{column}' is approximately normally distributed. Recommended: StandardScaler.")
        else:
            scaling_recommendations[column] = "MinMaxScaler"
            print(f"Column '{column}' is not normally distributed. Recommended: MinMaxScaler.")
    
    return scaling_recommendations

CPU times: user 4 µs, sys: 0 ns, total: 4 µs
Wall time: 15.5 µs


In [28]:
scaling_recommendations = recommend_scaling_methods(df_train)
scaling_recommendations

Column 'passenger_count' has 7 unique values, first 10: [1 4 3 2 5 6 0]
Column 'passenger_count' is not normally distributed. Recommended: MinMaxScaler.
Column 'pickup_pca0' has 941889 unique values, first 10: [-1.46272668 -0.22378877 -0.07564223  0.11820193  0.05723002  0.20927621
  0.31676871  0.13715185 -0.30037711 -0.13259448]
Column 'pickup_pca0' is not normally distributed. Recommended: MinMaxScaler.
Column 'pickup_pca1' has 941889 unique values, first 10: [ 0.34018641  0.89645261  0.97908039 -0.41451582  0.04281858 -0.12385815
 -0.51255053  0.66609366  0.72419467  1.59961465]
Column 'pickup_pca1' is not normally distributed. Recommended: MinMaxScaler.
Column 'dropoff_pca0' has 962683 unique values, first 10: [-1.06862767  0.02020165  0.19286223  0.34265458  0.04060286  0.45233916
  0.22020595 -0.06132744  0.25546429 -0.25607666]
Column 'dropoff_pca0' is not normally distributed. Recommended: MinMaxScaler.
Column 'dropoff_pca1' has 962683 unique values, first 10: [-0.25515351  0.

{'passenger_count': 'MinMaxScaler',
 'pickup_pca0': 'MinMaxScaler',
 'pickup_pca1': 'MinMaxScaler',
 'dropoff_pca0': 'MinMaxScaler',
 'dropoff_pca1': 'MinMaxScaler',
 'euclidean_distance': 'MinMaxScaler',
 'pickup_hour_of_day': 'StandardScaler',
 'day_of_week': 'StandardScaler',
 'hour_of_week': 'StandardScaler',
 'month_of_year': 'StandardScaler',
 'day_of_year': 'StandardScaler',
 'week_of_year': 'MinMaxScaler',
 'hour_of_year': 'StandardScaler',
 'r_depth': 'MinMaxScaler',
 's_fall': 'MinMaxScaler',
 's_depth': 'MinMaxScaler',
 'all_precip': 'MinMaxScaler',
 'max_temp': 'StandardScaler',
 'min_temp': 'StandardScaler',
 'cnt_coords_bin_dp': 'MinMaxScaler',
 'cnt_coords_bin_p': 'StandardScaler',
 'cnt_coords_bin_d': 'StandardScaler',
 'cnt_prev_1h': 'MinMaxScaler',
 'cnt_mean_prev_3h_pickups': 'MinMaxScaler',
 'cnt_mean_prev_3h_dropoffs': 'MinMaxScaler',
 'trip_duration': 'MinMaxScaler'}

### One-Hot Encoding Categorical Data

In [29]:
%%time

# Convert boolean columns to integers
bool_columns = ["has_snow", "has_rain"]
df_train[bool_columns] = df_train[bool_columns].astype(int)
df_test[bool_columns] = df_test[bool_columns].astype(int)
    
# Process the vendor_id column
if "vendor_id" in df_train.columns:
    df_train["vendor_id"] = df_train["vendor_id"] - 1
    
if "vendor_id" in df_test.columns:
    df_test["vendor_id"] = df_test["vendor_id"] - 1
    
# Progress the flag column
df_train["store_and_fwd_flag"] = df_train["store_and_fwd_flag"].apply(lambda x: 0 if x == "Y" else 1)
df_test["store_and_fwd_flag"] = df_test["store_and_fwd_flag"].apply(lambda x: 0 if x == "Y" else 1)

CPU times: user 302 ms, sys: 30.4 ms, total: 332 ms
Wall time: 331 ms


### Standardization & Normalization

In [30]:
%%time

def scale_data(df_train: pd.DataFrame, df_test: pd.DataFrame, scaling_dict: dict):
    """
    Scales the training and testing data based on the provided scaling dictionary.
    
    Parameters:
    df_train (pd.DataFrame): Training dataset
    df_test (pd.DataFrame): Testing dataset
    scaling_dict (dict): Dictionary specifying the scaler type for each column
    
    Returns:
    tuple: Transformed df_train, df_test, and the scaler used for 'trip_duration' if present
    """
    
    # Initialize variable to store scaler for 'trip_duration'
    trip_duration_scaler = None
    
    for col, scaler_name in scaling_dict.items():
        if scaler_name == "MinMaxScaler":
            scaler = MinMaxScaler()
        elif scaler_name == "StandardScaler":
            scaler = StandardScaler()
        else:
            raise ValueError(f"Unsupported scaler: {scaler_name}")
            
        # Apply scaling if column exists in both train and test datasets
        if col in df_train.columns and col in df_test.columns:
            # Fit and transform training data
            df_train[col] = scaler.fit_transform(df_train[[col]])
            
            # Transform test data using the same scaler
            df_test[col] = scaler.transform(df_test[[col]])
        
            # Store the scaler used for 'trip_duration' if it exists
            if col == "trip_duration":
                trip_duration_scaler = scaler
        else:
            raise ValueError(f"Unsupported column: {col}")
    
    return df_train, df_test, trip_duration_scaler

df_train, df_test, trip_duration_scaler = scale_data(
    df_train, df_test, scaling_recommendations
)

CPU times: user 440 ms, sys: 54.5 ms, total: 495 ms
Wall time: 492 ms


In [31]:
%%time

df_train.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 982163 entries, 0 to 982162
Data columns (total 31 columns):
 #   Column                     Non-Null Count   Dtype  
---  ------                     --------------   -----  
 0   id                         982163 non-null  object 
 1   vendor_id                  982163 non-null  int64  
 2   passenger_count            982163 non-null  float64
 3   store_and_fwd_flag         982163 non-null  int64  
 4   pickup_pca0                982163 non-null  float64
 5   pickup_pca1                982163 non-null  float64
 6   dropoff_pca0               982163 non-null  float64
 7   dropoff_pca1               982163 non-null  float64
 8   euclidean_distance         982163 non-null  float64
 9   pickup_hour_of_day         982163 non-null  float64
 10  day_of_week                982163 non-null  float64
 11  hour_of_week               982163 non-null  float64
 12  month_of_year              982163 non-null  float64
 13  day_of_year                98

In [32]:
df_test.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 291713 entries, 0 to 291712
Data columns (total 31 columns):
 #   Column                     Non-Null Count   Dtype  
---  ------                     --------------   -----  
 0   id                         291713 non-null  object 
 1   vendor_id                  291713 non-null  int64  
 2   passenger_count            291713 non-null  float64
 3   store_and_fwd_flag         291713 non-null  int64  
 4   pickup_pca0                291713 non-null  float64
 5   pickup_pca1                291713 non-null  float64
 6   dropoff_pca0               291713 non-null  float64
 7   dropoff_pca1               291713 non-null  float64
 8   euclidean_distance         291713 non-null  float64
 9   pickup_hour_of_day         291713 non-null  float64
 10  day_of_week                291713 non-null  float64
 11  hour_of_week               291713 non-null  float64
 12  month_of_year              291713 non-null  float64
 13  day_of_year                29

### Save Data in parquet

In [33]:
%%time

# Ensure the 'prep' directory exists
os.makedirs(os.path.join("data", "prep"), exist_ok=True)

# Save to Parquet format
df_train.to_parquet("data/prep/df_train.parquet", index=False)
df_test.to_parquet("data/prep/df_test.parquet", index=False)

print("df_train and df_test saved to 'prep' directory as Parquet files.")

df_train and df_test saved to 'prep' directory as Parquet files.
CPU times: user 1.11 s, sys: 265 ms, total: 1.37 s
Wall time: 1.3 s


---