# NYC Taxi Duration Prediction Pipeline
**Author:** Ali Ahmed  
**Role:** Associate ML/MLOps Engineer  
**Contact:** [📧 Email](mailto:ali.ahmed.nour14@gmail.com) | [📱 Phone](tel:+201007871314) | [🔗 LinkedIn](https://www.linkedin.com/in/ali-ahmed-nour/)

**Status:** Development / Production-Ready Simulation

---

## Project Overview
This project implements an end-to-end pipeline for predicting NYC taxi trip duration, built with MLOps practices in mind:
* **Automation Ready:** Modular functions that can later be wired into an orchestrator.
* **Data Versioning Support:** Raw and processed data live in separate tiers, and model artifacts are stamped with the training period and run date, which makes lineage easier to trace.
* **Portability:** `pathlib`-based path management that works across operating systems.

## 🚕 Table of Contents
1. [Environment Setup & Ingestion](#1.-Environment-Setup-&-Ingestion)
    * [1.1 Imports and Setup](#1.1-Imports-and-Setup)
    * [1.2 Global Configuration (The SSoT Layer)](#1.2-Global-Configuration-(The-SSoT-Layer))
    * [1.3 Data Ingestion Engine](#1.3-Data-Ingestion-Engine)
2. [Data Engineering & Preprocessing](#2.-Data-Engineering-&-Preprocessing)
    * [2.1 Feature Engineering Logic](#2.1-Feature-Engineering-Logic)
    * [2.2 Memory Management (SSoT for Data Types)](#2.2-Memory-Management-(SSoT-for-Data-Types))
3. [Data Transformation (Vectorization)](#3.-Data-Transformation-(Vectorization))
4. [Baseline Modeling & Persistence](#4.-Baseline-Modeling-&-Persistence)
    * [4.1 Model Training & Baseline RMSE](#4.1-Model-Training-&-Baseline-RMSE)
    * [4.2 Model Persistence (Serialization)](#4.2-Model-Persistence-(Serialization))
5. [Model Validation (Out-of-Sample)](#5.-Model-Validation-(Out-of-Sample))

## 1. Environment Setup & Ingestion
*(Phase 1: Preparing tools, configurations, and fetching raw data)*

### 1.1 Imports and Setup
In this section, I import all the libraries the pipeline needs. The **XGBoost** and **CatBoost** imports are already in place but commented out; uncommenting them is a quick way to confirm those dependencies resolve before later model iterations rely on them.
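`root_mean_squared_error` only exists in scikit-learn 1.4 and later. If the notebook may run in an older environment, a guarded import like the one below could be used in place of the direct `from sklearn.metrics import root_mean_squared_error` line. This is a sketch, not part of the original notebook; the fallback derives RMSE from `mean_squared_error` and avoids the deprecated `squared=False` flag.

```python
import math

try:
    # scikit-learn >= 1.4 ships a dedicated RMSE function
    from sklearn.metrics import root_mean_squared_error
except ImportError:
    # Older releases: take the square root of the mean squared error
    from sklearn.metrics import mean_squared_error

    def root_mean_squared_error(y_true, y_pred):
        return math.sqrt(mean_squared_error(y_true, y_pred))


print(root_mean_squared_error([1.0, 2.0, 3.0], [2.0, 2.0, 2.0]))  # sqrt(2/3) ≈ 0.8165
```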

```python
from pathlib import Path
import pickle
from datetime import datetime
from typing import TypedDict, List, Dict, Any, cast  # List/Dict/Any/cast are used in Section 5

import pandas as pd

# Optional gradient-boosting libraries for later model iterations (not used yet)
# import xgboost as xgb
# from catboost import CatBoostRegressor
from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression
from sklearn.metrics import root_mean_squared_error  # requires scikit-learn >= 1.4

# Show every column when displaying DataFrames
pd.options.display.max_columns = None
```

### 1.2 Global Configuration (The SSoT Layer)
To avoid hard-coding and keep every stage consistent, I establish a **Single Source of Truth (SSoT)**: a centralized configuration layer for environment-specific variables. Paths are resolved from a project-level `base_dir` with `pathlib.Path` and composed with the `/` operator, so the pipeline stays:

* **Environment Agnostic:** `pathlib` applies the correct path separator on Linux, macOS, and Windows.
* **Scalable & Portable:** All paths (raw, processed, and model artifacts) and the feature schema live in one configuration object, so switching taxi type or time period only requires new arguments to `get_config`.
* **Type-Safe:** `TypedDict` gives the configuration explicit key and value types, so a static checker such as mypy or Pyright can flag a misspelled key or a wrong value type before the code runs.
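To see why composing paths with `/` is portable, the same expression can be rendered with both pure path flavours. On a real machine, `Path` resolves to one of these depending on the host OS. Pure paths never touch the filesystem, so this sketch runs anywhere; the file name is only an example.

```python
from pathlib import PurePosixPath, PureWindowsPath

demo_file = "yellow_tripdata_2025-10.parquet"

# Identical "/" composition; only the path flavour (separator rules) differs
posix_path = PurePosixPath("data") / "raw" / demo_file
windows_path = PureWindowsPath("data") / "raw" / demo_file

print(posix_path)    # data/raw/yellow_tripdata_2025-10.parquet
print(windows_path)  # data\raw\yellow_tripdata_2025-10.parquet
```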

```python
# Project configuration with date-stamped (versioned) model artifacts
class ProjectConfig(TypedDict):
    taxi_type: str
    year: int
    month: int
    data_url: str
    raw_path: str
    processed_path: str
    model_path: str
    categorical_features: list[str]
    numerical_features: list[str]
    all_features: list[str]
    target: str


def get_config(
    taxi_type: str = "yellow", year: int = 2025, month: int = 10
) -> ProjectConfig:
    """
    Centralized configuration with dynamic date versioning for artifacts.
    """
    # Project root: assumes the notebook runs from a first-level subdirectory
    # of the repository (e.g. notebooks/), so the root is the parent of the CWD
    base_dir = Path.cwd().parent

    # 1. Versioning: today's date becomes part of the model file name
    today = datetime.now().strftime("%Y-%m-%d")

    # Tiered data directories (raw/processed) and the model artifact directory
    raw_dir = base_dir / "data" / "raw"
    proc_dir = base_dir / "data" / "processed"
    model_dir = base_dir / "models"

    # Create the directory structure if it does not exist yet
    for d in [raw_dir, proc_dir, model_dir]:
        d.mkdir(parents=True, exist_ok=True)

    file_name = f"{taxi_type}_tripdata_{year:04d}-{month:02d}.parquet"

    # 2. Dynamic model name: includes the training month AND the current date
    model_name = f"model_{taxi_type}_{year:04d}-{month:02d}_v_{today}.bin"

    # SSoT feature sets
    cat_features = ["PULocationID", "DOLocationID"]
    num_features = ["trip_distance"]

    # Build the config explicitly so a type checker can verify it against ProjectConfig
    config: ProjectConfig = {
        "taxi_type": taxi_type,
        "year": year,
        "month": month,
        "data_url": f"https://d37ci6vzurychx.cloudfront.net/trip-data/{file_name}",
        "raw_path": str(raw_dir / file_name),
        "processed_path": str(proc_dir / file_name),
        "model_path": str(model_dir / model_name),
        "categorical_features": cat_features,
        "numerical_features": num_features,
        "all_features": cat_features + num_features,
        "target": "duration",
    }

    return config


# Initialize the training config (October 2025)
cfg = get_config(year=2025, month=10)

print(
    f"LOG: SSoT Config initialized for {cfg['taxi_type']} taxi (Period: {cfg['year']}-{cfg['month']:02d})."
)
print(f"LOG: Model will be saved as: {Path(cfg['model_path']).name}")
```

```text
LOG: SSoT Config initialized for yellow taxi (Period: 2025-10).
LOG: Model will be saved as: model_yellow_2025-10_v_2025-12-29.bin
```


### 1.3 Data Ingestion Engine
In this stage, I implement an **idempotent ingestion engine** for data loading. It fetches the NYC Taxi dataset from the official CloudFront endpoint defined in the SSoT and stores it in the `data/raw` directory.

**Professional Standards Applied:**
* **Idempotency & Caching:** Before downloading, the engine checks whether the file already exists at the `raw_path` defined in the SSoT, avoiding redundant downloads and saving bandwidth and execution time.
* **Separation of Concerns:** Where the data lives and how to fetch it are encapsulated in one function, separate from the main execution flow.
* **Automated Logging:** `print`-based status messages record ingestion progress and the file paths taken from the centralized configuration.
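The cache check treats any existing file at `raw_path` as valid, so a run interrupted during `df.to_parquet` could leave a truncated file that later runs silently reuse. One way to harden the same idempotent pattern is to write to a temporary file and rename it into place atomically. The sketch below is stdlib-only and works on raw bytes; `fetch_with_cache` and `fake_download` are hypothetical helpers, not part of the notebook.

```python
import os
import tempfile
from pathlib import Path
from typing import Callable


def fetch_with_cache(cache_file: Path, fetch: Callable[[], bytes]) -> bytes:
    """Return cached bytes if the file exists; otherwise fetch, write atomically, return."""
    if cache_file.exists():
        return cache_file.read_bytes()

    payload = fetch()
    cache_file.parent.mkdir(parents=True, exist_ok=True)

    # Write to a temporary file in the same directory, then rename it into place.
    # os.replace is atomic when source and destination share a filesystem, so an
    # interrupted run never leaves a half-written file posing as cached data.
    fd, tmp_name = tempfile.mkstemp(dir=cache_file.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as tmp:
            tmp.write(payload)
        os.replace(tmp_name, cache_file)
    except BaseException:
        # Remove the temporary file on any failure, then re-raise
        os.unlink(tmp_name)
        raise
    return payload


# Usage with a fake downloader that counts how often it is called
calls = []


def fake_download() -> bytes:
    calls.append(1)
    return b"parquet-bytes"


with tempfile.TemporaryDirectory() as tmp_dir:
    demo_cache = Path(tmp_dir) / "raw" / "yellow_tripdata_2025-10.parquet"
    first = fetch_with_cache(demo_cache, fake_download)   # downloads
    second = fetch_with_cache(demo_cache, fake_download)  # served from cache

print(len(calls))  # 1
```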

```python
def ingest_data(config: ProjectConfig) -> pd.DataFrame:
    """
    Downloads and loads the parquet data based on SSoT config using Pathlib.
    Includes caching and error handling for production environments.
    """
    # Use Path object for modern path manipulation
    raw_path = Path(config["raw_path"])

    # Check if data exists in local storage
    if not raw_path.exists():
        try:
            print(f"LOG: Downloading data from {config['data_url']}...")
            df = pd.read_parquet(config["data_url"])

            # Ensure the directory structure is ready before saving
            raw_path.parent.mkdir(parents=True, exist_ok=True)
            df.to_parquet(raw_path)
            print(f"LOG: Data successfully cached at: {raw_path.name}")

        except Exception as e:
            print(f"ERROR: Failed to fetch data from cloud. Exception: {e}")
            raise
    else:
        # Load directly from cache if available
        print(f"LOG: Data already exists at {raw_path.name}. Loading locally...")
        df = pd.read_parquet(raw_path)

    return df


# Executing ingestion using the October 2025 SSoT config
df = ingest_data(cfg)
print(f"LOG: Raw data shape: {df.shape}")
```

```text
LOG: Data already exists at yellow_tripdata_2025-10.parquet. Loading locally...
LOG: Raw data shape: (4428699, 20)
```


## 2. Data Engineering & Preprocessing
*(Phase 2: Cleaning, transforming, and optimizing the dataset)*

### 2.1 Feature Engineering Logic
In this stage, I transform raw taxi records into a format suitable for machine learning:
* **SSoT Feature Mapping:** Instead of hard-coding column names, I pull them from the `ProjectConfig`, so a schema change in the source data only needs to be updated in one place.
* **Target Engineering:** I derive the `duration` target (in minutes) from the pickup and dropoff timestamps and mitigate outliers by keeping only trips between 1 and 60 minutes, inclusive.
* **Memory Optimization & Type Integrity:** I convert the location IDs to strings so that `DictVectorizer` one-hot encodes them as discrete categories instead of treating them as a single continuous number. The trade-off is memory: as object-dtype strings (the pandas 2.x result of `astype(str)`), they take more space than the original integer IDs.
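Converting location IDs to strings is what switches `DictVectorizer` from a single numeric column to one-hot encoding. A minimal demonstration on two hypothetical rows (the zone IDs are illustrative only):

```python
from sklearn.feature_extraction import DictVectorizer

# Hypothetical pickup zones, once as integers and once as strings
rows_int = [{"PULocationID": 132}, {"PULocationID": 138}]
rows_str = [{"PULocationID": "132"}, {"PULocationID": "138"}]

# Numeric values stay one continuous column
dv_int = DictVectorizer(sparse=False).fit(rows_int)
# String values become one "<feature>=<value>" indicator column per category
dv_str = DictVectorizer(sparse=False).fit(rows_str)

print(dv_int.feature_names_)  # ['PULocationID']
print(dv_str.feature_names_)  # ['PULocationID=132', 'PULocationID=138']
print(dv_int.transform(rows_int).tolist())  # [[132.0], [138.0]]
print(dv_str.transform(rows_str).tolist())  # [[1.0, 0.0], [0.0, 1.0]]
```

With integer IDs, a linear model would treat zone 138 as "more" than zone 132, which has no geographic meaning.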

```python
def preprocess_data(df: pd.DataFrame, config: ProjectConfig) -> pd.DataFrame:
    """
    Cleans data, derives the target, and enforces SSoT feature types.
    Returns a new DataFrame; the raw input is left unmodified.
    """
    target = config["target"]

    # 1. Target Derivation: duration in minutes (vectorized via the .dt accessor)
    duration = (
        df["tpep_dropoff_datetime"] - df["tpep_pickup_datetime"]
    ).dt.total_seconds() / 60

    # 2. Outlier Mitigation (Operational Range: 1-60 mins, inclusive)
    mask = duration.between(1, 60)
    df = df.loc[mask].copy()
    df[target] = duration[mask]

    # 3. Categorical Integrity: string IDs are one-hot encoded by DictVectorizer
    # Feature names come from the SSoT config
    cat_features = config["categorical_features"]
    df[cat_features] = df[cat_features].astype(str)

    return df


# Start Data Engineering Phase
print("LOG: Starting Data Engineering and Preprocessing...")
df_processed = preprocess_data(df, cfg)

print("LOG: Preprocessing complete.")
print(f"LOG: Final Shape: {df_processed.shape}")
print(f"LOG: Average Duration: {df_processed.duration.mean():.2f} minutes")
```

```text
LOG: Starting Data Engineering and Preprocessing...
LOG: Preprocessing complete.
LOG: Final Shape: (4198802, 21)
LOG: Average Duration: 17.32 minutes
```


### 2.2 Memory Management (SSoT for Data Types)
To handle millions of rows efficiently, I enforce an explicit data type schema for the numerical columns:

* **Vectorized Optimization:** A single `astype("float32")` call converts all numerical columns at once, with no per-row Python loop.
* **Consistency & Footprint:** Training and validation data go through the same casting, and each downcast column uses half the memory of `float64`. Because only two columns (`trip_distance` and `duration`) are downcast, the overall saving is modest (about 2.7% in the run below); most of the footprint sits in the remaining columns, including the string location IDs.
* **SSoT Integration:** The function takes the numerical features and the target from the centralized `ProjectConfig`, so the data schema keeps a single source of truth.
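A small illustration, on hypothetical data, of why the reduction is modest: downcasting halves the bytes of a float column, while Python string IDs cost far more than their integer originals. The `category` comparison is an extra option the notebook does not use.

```python
import numpy as np
import pandas as pd

n = 1_000
# Hypothetical trip distances (float64 by default) and integer location IDs
distance = pd.Series(np.linspace(0.1, 30.0, n))
location_ids = pd.Series(np.arange(1, n + 1, dtype=np.int64) % 265)

f64_bytes = distance.memory_usage(index=False)
f32_bytes = distance.astype("float32").memory_usage(index=False)

# Explicit object dtype holding Python str values
id_strings = pd.Series([str(v) for v in location_ids], dtype=object)

int_bytes = location_ids.memory_usage(index=False, deep=True)
str_bytes = id_strings.memory_usage(index=False, deep=True)
cat_bytes = id_strings.astype("category").memory_usage(index=False, deep=True)

print(f64_bytes, f32_bytes)  # 8000 4000
print(int_bytes, str_bytes, cat_bytes)
```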

```python
def optimize_memory_vectorized(df: pd.DataFrame, config: ProjectConfig) -> pd.DataFrame:
    """
    Optimizes memory usage by downcasting numerical types based on SSoT config.
    """
    # 1. Calculate memory BEFORE optimization
    mem_before = df.memory_usage(deep=True).sum() / 1024**2

    # 2. Identify numerical columns from SSoT (Features + Target)
    # This prevents hard-coding and respects the schema defined in Section 1.2
    num_cols = config["numerical_features"] + [config["target"]]

    # 3. Downcast to float32 (half the bytes of float64; ample precision for miles and minutes)
    df[num_cols] = df[num_cols].astype("float32")

    # 4. Calculate memory AFTER optimization
    mem_after = df.memory_usage(deep=True).sum() / 1024**2
    improvement = ((mem_before - mem_after) / mem_before) * 100

    print("LOG: Memory Optimization Report:")
    print(f"   - BEFORE: {mem_before:.2f} MB")
    print(f"   - AFTER: {mem_after:.2f} MB")
    print(f"   - Improvement Percentage: {improvement:.2f}%")

    return df


# Execute Memory Optimization
df_processed = optimize_memory_vectorized(df_processed, cfg)
```

```text
LOG: Memory Optimization Report:
   - BEFORE: 1183.29 MB
   - AFTER: 1151.25 MB
   - Improvement Percentage: 2.71%
```


## 3. Data Transformation (Vectorization)
*(Phase 3: Converting dataframes into numerical matrices)*

In this stage, I convert the processed categorical and numerical features into a numerical matrix that scikit-learn models can consume:

* **One-Hot Encoding & Efficiency:** `DictVectorizer` one-hot encodes the string location IDs and passes `trip_distance` through unchanged. The result is a **sparse matrix**, which stores only non-zero entries and keeps memory manageable across hundreds of location columns.
* **SSoT Alignment:** The input columns come from the `all_features` list in `ProjectConfig` rather than a manual selection. The column order of the resulting matrix is fixed by `DictVectorizer` itself, which sorts feature names by default.
* **Consistency for Production:** The fitted `DictVectorizer` (`dv`) becomes the **"Source of Truth"** for the feature schema. Validation and production data must go through `dv.transform` (never `fit_transform`), so every dataset maps onto the same columns.
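The fit/transform contract can be seen on two hypothetical rows shaped like `train_dicts`: `fit_transform` fixes a sorted column layout, and `transform` maps later rows onto it. A category never seen during fit is silently ignored, not an error. The names `demo_dv`, `X_demo`, and `X_unseen` are illustrative and chosen so they do not overwrite the notebook's `dv`.

```python
from sklearn.feature_extraction import DictVectorizer

# Hypothetical training rows in the same shape as train_dicts
train_rows = [
    {"PULocationID": "132", "DOLocationID": "236", "trip_distance": 17.5},
    {"PULocationID": "138", "DOLocationID": "236", "trip_distance": 9.2},
]

demo_dv = DictVectorizer(sparse=False)
X_demo = demo_dv.fit_transform(train_rows)
print(demo_dv.feature_names_)
# ['DOLocationID=236', 'PULocationID=132', 'PULocationID=138', 'trip_distance']
print(X_demo.tolist())
# [[1.0, 1.0, 0.0, 17.5], [1.0, 0.0, 1.0, 9.2]]

# An unseen pickup zone is dropped, leaving its one-hot block all zeros
X_unseen = demo_dv.transform(
    [{"PULocationID": "999", "DOLocationID": "236", "trip_distance": 3.0}]
)
print(X_unseen.tolist())  # [[1.0, 0.0, 0.0, 3.0]]
```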

```python
# 1. Feature & Target Selection using SSoT
# Definitions come directly from the config object
features = cfg["all_features"]
target = cfg["target"]

# 2. Convert the DataFrame to a list of dictionaries (one per trip), the input
# format DictVectorizer expects. Note: this materializes millions of Python
# dicts, so it is convenient rather than memory-efficient.
train_dicts = df_processed[features].to_dict(orient="records")

# 3. Fit and transform the vectorizer
# 'dv' is persisted later so the inference pipeline reuses the same mapping
dv = DictVectorizer()
X_train = dv.fit_transform(train_dicts)

# 4. Extract the target vector
y_train = df_processed[target].to_numpy()

print("LOG: Feature Transformation complete.")
print(f"LOG: Training Matrix Shape: {X_train.shape}")
print(f"LOG: DictVectorizer successfully mapped {len(dv.feature_names_)} features.")
```

```text
LOG: Feature Transformation complete.
LOG: Training Matrix Shape: (4198802, 523)
LOG: DictVectorizer successfully mapped 523 features.
```


## 4. Baseline Modeling & Persistence
*(Phase 4: Establishing a performance benchmark and managing artifacts)*

In this stage, I establish a **baseline** for the project and persist the training results following the **Single Source of Truth (SSoT)** architecture:

* **Baseline Establishment & RMSE:** I train a **Linear Regression** model on the features defined in `ProjectConfig` to serve as the performance benchmark. Root Mean Squared Error (RMSE), expressed in minutes, sets the "score to beat". The RMSE reported below is computed on the training data, so it measures fit rather than generalization; the out-of-sample check follows in Section 5.
* **Artifact Synchronization:** I serialize the model and the `DictVectorizer` together into a single binary file. Keeping the preprocessor and the model in one artifact prevents **schema skew** at inference time.
* **SSoT Persistence:** Artifacts are written to the `model_path` defined in the centralized configuration, inside the project's `models/` directory, which keeps runs reproducible and aligned with the rest of the pipeline.
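RMSE is the square root of the mean squared residual. The NumPy sketch below (hypothetical durations) computes it by hand and adds a reference point the notebook does not compute: always predicting the mean gives an RMSE equal to the population standard deviation of the target. A linear model whose RMSE is not clearly below `np.std(y_train)` would add little over that naive predictor.

```python
import numpy as np


def rmse(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Root mean squared error: sqrt(mean((y_true - y_pred) ** 2))."""
    return float(np.sqrt(np.mean((y_true - y_pred) ** 2)))


# Hypothetical trip durations in minutes
y = np.array([6.0, 12.0, 18.0, 24.0])

# Naive benchmark: predict the mean duration for every trip
mean_pred = np.full_like(y, y.mean())

print(rmse(y, mean_pred))  # sqrt(45) ≈ 6.708
print(np.std(y))           # same value: population standard deviation
```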

### 4.1 Model Training & Baseline RMSE

```python
# 1. Model Initialization
# Linear Regression provides a fast, interpretable benchmark
lr = LinearRegression()

# 2. Model Training
# SSoT: training on the feature matrix generated in Section 3
lr.fit(X_train, y_train)

# 3. Evaluation (in-sample)
# Training RMSE measures fit; the out-of-sample score is computed in Section 5
y_pred = lr.predict(X_train)
rmse_train = root_mean_squared_error(y_train, y_pred)

print(f"LOG: Baseline model ({lr.__class__.__name__}) training complete.")
print(f"LOG: Training RMSE for {cfg['taxi_type']} taxi: {rmse_train:.2f} minutes")
```

```text
LOG: Baseline model (LinearRegression) training complete.
LOG: Training RMSE for yellow taxi: 9.55 minutes
```


### 4.2 Model Persistence (Serialization)
In a production environment, it is crucial to save the trained model and the vectorizer to maintain consistency.

* **The "Source of Truth" Bundle:** I save both the `DictVectorizer` (which holds the feature schema) and the `LinearRegression` model as a single binary file using `pickle`.
* **Standardization:** During inference or validation, the exact feature mapping the model was trained on (523 columns in this run) is reused, preventing **Training-Serving Skew**.
* **Reproducibility:** The saved artifact can be loaded in other environments without re-running the training pipeline.
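The value of bundling shows up on reload: one `pickle.load` returns the vectorizer and the model as a pair, and predictions match the in-memory objects. Below is a self-contained round trip on hypothetical toy data, written to a temporary directory instead of the real `models/` folder. Keep in mind that `pickle.load` can execute arbitrary code, so only load artifacts from trusted sources.

```python
import pickle
import tempfile
from pathlib import Path

import numpy as np
from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression

# Hypothetical toy training data
rows = [
    {"PULocationID": "132", "trip_distance": 17.5},
    {"PULocationID": "138", "trip_distance": 9.2},
    {"PULocationID": "161", "trip_distance": 2.1},
]
durations = np.array([40.0, 25.0, 10.0])

toy_dv = DictVectorizer()
toy_model = LinearRegression().fit(toy_dv.fit_transform(rows), durations)

with tempfile.TemporaryDirectory() as tmp_dir:
    bundle_path = Path(tmp_dir) / "model_demo.bin"

    # Save the preprocessor and model together, in a fixed (dv, model) order
    with bundle_path.open("wb") as f_out:
        pickle.dump((toy_dv, toy_model), f_out)

    # Load them back as a unit so they cannot drift apart
    with bundle_path.open("rb") as f_in:
        loaded_dv, loaded_model = pickle.load(f_in)

new_trip = [{"PULocationID": "138", "trip_distance": 5.0}]
original_pred = toy_model.predict(toy_dv.transform(new_trip))
reloaded_pred = loaded_model.predict(loaded_dv.transform(new_trip))
print(np.allclose(original_pred, reloaded_pred))  # True
```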

```python
# 1. Path Resolution from SSoT (as a Path object)
model_path = Path(cfg["model_path"])

# 2. Log whether an existing artifact is overwritten or a new one is created
if model_path.exists():
    print(
        f"INFO: Artifact already exists. Overwriting current version: {model_path.name}"
    )
else:
    print(f"LOG: Creating new artifact: {model_path.name}")

# 3. Serialization: bundle (dv, lr) so the preprocessor and model always travel together
with model_path.open("wb") as f_out:
    pickle.dump((dv, lr), f_out)

# 4. Verification: report the artifact size on disk
artifact_size = model_path.stat().st_size / 1024**2
print(f"LOG: Successfully saved to: {model_path}")
print(f"LOG: Final Artifact size: {artifact_size:.2f} MB")
```

```text
INFO: Artifact already exists. Overwriting current version: model_yellow_2025-10_v_2025-12-29.bin
LOG: Successfully saved to: /mnt/e/VScode/nyc-taxi-duration-mlops/models/model_yellow_2025-10_v_2025-12-29.bin
LOG: Final Artifact size: 0.02 MB
```


## 5. Model Validation (Out-of-Sample)
In this phase, I evaluate the model on unseen data (November 2025).
* **The Goal:** Confirm that the model generalizes to new data, with a validation RMSE close to the training RMSE.
* **The Pipeline:** I reuse `ingest_data`, `preprocess_data`, and `optimize_memory_vectorized`, and call only `transform` on the training-fitted `DictVectorizer`, so the validation data goes through exactly the same transformations as the training data.
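Because `DictVectorizer.transform` silently drops categories it never saw during fit, a similar RMSE does not reveal how many validation trips fall in such zones. A hypothetical helper, `unseen_category_rate`, can quantify that; it is a sketch and not part of the notebook. It relies on the fitted `vocabulary_` mapping and the `separator` parameter (default `"="`).

```python
from typing import Any, Dict, Iterable

from sklearn.feature_extraction import DictVectorizer


def unseen_category_rate(
    fitted_dv: DictVectorizer, rows: Iterable[Dict[str, Any]], feature: str
) -> float:
    """Fraction of rows whose value for a categorical `feature` was not seen during fit."""
    known_columns = fitted_dv.vocabulary_  # maps "feature=value" -> column index
    total = unseen = 0
    for row in rows:
        total += 1
        column_name = f"{feature}{fitted_dv.separator}{row[feature]}"
        if column_name not in known_columns:
            unseen += 1
    return unseen / total if total else 0.0


# Hypothetical example: two zones seen in training, two new zones in validation
check_dv = DictVectorizer().fit([{"PULocationID": "132"}, {"PULocationID": "138"}])
val_rows = [
    {"PULocationID": "132"},
    {"PULocationID": "264"},
    {"PULocationID": "138"},
    {"PULocationID": "265"},
]
print(unseen_category_rate(check_dv, val_rows, "PULocationID"))  # 0.5
```

On the notebook's own objects, `unseen_category_rate(dv, val_dicts, "PULocationID")` could be run after the validation cell.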

```python
# 1. Initialize the validation configuration for November 2025
# Same centralized config logic, applied to out-of-sample data
val_cfg = get_config(year=2025, month=11)

# 2. Ingest validation data
print(f"LOG: Starting validation pipeline for {val_cfg['month']}/{val_cfg['year']}...")
df_val_raw = ingest_data(val_cfg)

# 3. Preprocessing and memory optimization
# Training and validation data undergo identical transformations
df_val = preprocess_data(df_val_raw, val_cfg)
df_val = optimize_memory_vectorized(df_val, val_cfg)

# 4. Feature transformation (CRITICAL: transform only, never fit)
# Reuse the DictVectorizer fitted on training data to prevent feature mismatch.
# cast() only informs the static type checker; it does no runtime conversion or validation.
val_dicts = df_val[val_cfg["all_features"]].to_dict(orient="records")
X_val = dv.transform(cast(List[Dict[str, Any]], val_dicts))
y_val = df_val[val_cfg["target"]].to_numpy()

# 5. Model evaluation
# Predict with the Linear Regression model trained on October data
y_pred_val = lr.predict(X_val)
rmse_val = root_mean_squared_error(y_val, y_pred_val)

print(f"LOG: Validation complete for {val_cfg['taxi_type']} taxi.")
print(f"LOG: Validation RMSE: {rmse_val:.2f} minutes")
print(f"LOG: Training RMSE was: {rmse_train:.2f} minutes")
```

```text
LOG: Starting validation pipeline for 11/2025...
LOG: Data already exists at yellow_tripdata_2025-11.parquet. Loading locally...
LOG: Memory Optimization Report:
   - BEFORE: 1118.86 MB
   - AFTER: 1088.52 MB
   - Improvement Percentage: 2.71%
LOG: Validation complete for yellow taxi.
LOG: Validation RMSE: 9.48 minutes
LOG: Training RMSE was: 9.55 minutes
```
