# NYC Taxi ML Pipeline - Jupyter + Spark Connect

## End-to-End ML Pipeline with EDA, Feature Selection, and Model Training

This notebook demonstrates:
1. **Spark Connect** connection for distributed data processing
2. **EDA** - Exploratory data analysis with visualizations
3. **Feature Selection** - Correlation analysis (top features ranked by absolute correlation with the target)
4. **Model Training** - sklearn models with proper train/test split
5. **Model Testing** - Cross-validation and performance metrics
6. **Metrics Export** - Push metrics to Prometheus Pushgateway

## 1. Configuration & Imports

In [None]:
import os
import sys
import json
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

# Spark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

# Data processing
import pandas as pd
import numpy as np

# ML
from sklearn.model_selection import train_test_split, cross_val_score, TimeSeriesSplit
from sklearn.ensemble import GradientBoostingRegressor, RandomForestRegressor
from sklearn.linear_model import Ridge
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import mean_absolute_percentage_error, mean_squared_error, r2_score
import joblib

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns

# Metrics export
import requests
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway

print(f"Python: {sys.version}")
print(f"Pandas: {pd.__version__}")
print(f"NumPy: {np.__version__}")

In [None]:
# Configuration
# Every tunable used by the notebook lives in this one dict. Connection
# settings are read from environment variables first and fall back to the
# literal defaults below.
# NOTE(review): the credential defaults look like local-development values;
# confirm they are never relied on in a shared environment.
CONFIG = {
    # Spark Connect
    'spark_connect_host': os.environ.get('SPARK_CONNECT_HOST', 'scenario1-spark-35-connect'),
    'spark_connect_port': os.environ.get('SPARK_CONNECT_PORT', '15002'),

    # MinIO
    'minio_endpoint': os.environ.get('S3_ENDPOINT', 'http://minio.spark-infra.svc.cluster.local:9000'),
    'minio_access_key': os.environ.get('AWS_ACCESS_KEY_ID', 'minioadmin'),
    'minio_secret_key': os.environ.get('AWS_SECRET_ACCESS_KEY', 'minioadmin'),

    # Prometheus
    'pushgateway_url': os.environ.get('PUSHGATEWAY_URL', 'http://prometheus-pushgateway.spark-operations:9091'),

    # ML
    'test_size': 0.2,
    'cv_folds': 5,
    'random_state': 42,

    # Experiment
    'experiment_name': 'nyc_taxi_revenue_prediction',
    'model_version': datetime.now().strftime('%Y%m%d_%H%M'),
}

# Substrings that mark a CONFIG entry as a credential. Cell output is stored
# inside the .ipynb file, so both halves of the access-key / secret-key pair
# are redacted instead of only the secret.
SENSITIVE_KEY_MARKERS = ('secret', 'access_key', 'password', 'token')

print("Configuration:")
for k, v in CONFIG.items():
    if any(marker in k.lower() for marker in SENSITIVE_KEY_MARKERS):
        print(f"  {k}: <redacted>")
    else:
        print(f"  {k}: {v}")

## 2. Spark Connect Session

In [None]:
# Create Spark session via Spark Connect
connect_url = f"sc://{CONFIG['spark_connect_host']}:{CONFIG['spark_connect_port']}"

spark = (
    SparkSession.builder
    .remote(connect_url)
    .appName(f"nyc-taxi-ml-{CONFIG['model_version']}")
    .config("spark.hadoop.fs.s3a.endpoint", CONFIG['minio_endpoint'])
    .config("spark.hadoop.fs.s3a.access.key", CONFIG['minio_access_key'])
    .config("spark.hadoop.fs.s3a.secret.key", CONFIG['minio_secret_key'])
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()
)

# A Spark Connect session is a thin gRPC client with no local JVM, so
# `spark.sparkContext` (and therefore `applicationId`) is not available and
# raises when accessed. Report the endpoint we connected to instead.
print(f"Spark Connect endpoint: {connect_url}")
print(f"Spark Version: {spark.version}")
spark

## 3. Data Loading & EDA

In [None]:
# Read every parquet file under the raw prefix in MinIO into one DataFrame.
raw_path = "s3a://nyc-taxi/raw/*.parquet"
print(f"Loading data from {raw_path}...")

df_raw = spark.read.format("parquet").load(raw_path)

# Row count of the untouched input; later cells report the cleaned share of it.
total_records = df_raw.count()
print(f"Total records: {total_records:,}")

# Column names and types as read from the parquet files.
print("\nSchema:")
df_raw.printSchema()

In [None]:
# Sample data: print the first five raw rows without truncating wide values,
# so column contents can be inspected before any filtering is applied.
print("Sample data:")
df_raw.show(5, truncate=False)

In [None]:
# Data quality: keep only trips with positive distance and amounts, a
# distance under 100 (outlier cut), and a pickup inside the 2023-2024 window.
pickup_ts = F.col("tpep_pickup_datetime")
is_valid_trip = (
    (F.col("trip_distance") > 0) &
    (F.col("trip_distance") < 100) &  # Remove outliers
    (F.col("fare_amount") > 0) &
    (F.col("total_amount") > 0) &
    (pickup_ts >= "2023-01-01") &
    (pickup_ts < "2025-01-01")
)
df_clean = df_raw.filter(is_valid_trip)

clean_records = df_clean.count()
# Guard the ratio: an empty raw dataset would otherwise raise ZeroDivisionError
# here and hide the real problem (nothing was loaded).
retained_pct = 100 * clean_records / total_records if total_records else 0.0
print(f"Clean records: {clean_records:,} ({retained_pct:.1f}% of raw)")

In [None]:
# Daily statistics for EDA: one row per pickup date, pulled into pandas.
daily_aggregates = [
    F.count("*").alias("trip_count"),
    F.sum("total_amount").alias("total_revenue"),
    F.avg("trip_distance").alias("avg_distance"),
    F.avg("fare_amount").alias("avg_fare"),
]

daily_stats = (
    df_clean
    .withColumn("pickup_date", F.to_date("tpep_pickup_datetime"))
    .groupBy("pickup_date")
    .agg(*daily_aggregates)
    .orderBy("pickup_date")
    .toPandas()
)

print(f"Daily stats: {len(daily_stats)} days")
daily_stats.head()

In [None]:
# EDA Visualizations: a 2x2 grid built from the daily statistics frame.
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
(ax_trips, ax_revenue), (ax_hist, ax_scatter) = axes

# Top row: the two time series share the same layout, so draw them in a loop.
for ax, column, title in (
    (ax_trips, 'trip_count', 'Daily Trip Count'),
    (ax_revenue, 'total_revenue', 'Daily Revenue'),
):
    ax.plot(daily_stats['pickup_date'], daily_stats[column])
    ax.set_title(title)
    ax.set_xlabel('Date')
    ax.tick_params(axis='x', rotation=45)

# Bottom-left: distribution of trips per day.
ax_hist.hist(daily_stats['trip_count'], bins=50, edgecolor='black')
ax_hist.set_title('Trip Count Distribution')
ax_hist.set_xlabel('Trips per Day')

# Bottom-right: daily revenue against the day's average trip distance.
ax_scatter.scatter(daily_stats['avg_distance'], daily_stats['total_revenue'], alpha=0.5)
ax_scatter.set_title('Revenue vs Avg Distance')
ax_scatter.set_xlabel('Avg Distance (miles)')
ax_scatter.set_ylabel('Total Revenue')

plt.tight_layout()
plt.savefig('/tmp/eda_plots.png', dpi=150)
plt.show()

## 4. Feature Engineering

In [None]:
# US holidays as ISO date strings, one list per calendar year covered by the
# data window; ALL_HOLIDAYS is the concatenation used by the holiday flag.
HOLIDAYS_2023 = [
    "2023-01-01",
    "2023-01-16",
    "2023-02-20",
    "2023-05-29",
    "2023-06-19",
    "2023-07-04",
    "2023-09-04",
    "2023-10-09",
    "2023-11-11",
    "2023-11-23",
    "2023-12-25",
]
HOLIDAYS_2024 = [
    "2024-01-01",
    "2024-01-15",
    "2024-02-19",
    "2024-05-27",
    "2024-06-19",
    "2024-07-04",
    "2024-09-02",
    "2024-10-14",
    "2024-11-11",
    "2024-11-28",
    "2024-12-25",
]
ALL_HOLIDAYS = [*HOLIDAYS_2023, *HOLIDAYS_2024]

def add_features(df):
    """Derive per-trip temporal, pickup-location and trip-metric columns.

    Args:
        df: Spark DataFrame of taxi trips. The function reads the columns
            ``tpep_pickup_datetime``, ``tpep_dropoff_datetime``,
            ``PULocationID`` and ``trip_distance``.

    Returns:
        The input DataFrame with these columns appended, in order:
        ``pickup_date``, ``hour_of_day``, ``day_of_week``, ``day_of_month``,
        ``month``, ``year``, ``is_weekend``, ``is_rush_hour``, ``is_holiday``,
        ``time_of_day``, ``pickup_borough``, ``trip_duration`` (minutes),
        ``avg_speed`` (distance per hour) and ``is_airport``.
    """
    pickup_ts = "tpep_pickup_datetime"
    hour = F.col("hour_of_day")
    zone = F.col("PULocationID")

    # Calendar parts taken straight from the pickup timestamp.
    calendar_parts = [
        ("pickup_date", F.to_date(pickup_ts)),
        ("hour_of_day", F.hour(pickup_ts)),
        ("day_of_week", F.dayofweek(pickup_ts)),
        ("day_of_month", F.dayofmonth(pickup_ts)),
        ("month", F.month(pickup_ts)),
        ("year", F.year(pickup_ts)),
    ]
    for name, expr in calendar_parts:
        df = df.withColumn(name, expr)

    # 0/1 flags built on the calendar parts. Spark's dayofweek numbers
    # Sunday as 1 and Saturday as 7, hence the [1, 7] weekend test.
    weekend_flag = F.when(F.col("day_of_week").isin([1, 7]), 1).otherwise(0)
    rush_flag = F.when(hour.between(7, 9) | hour.between(17, 19), 1).otherwise(0)
    holiday_flag = F.when(F.col("pickup_date").isin(ALL_HOLIDAYS), 1).otherwise(0)
    day_part = (
        F.when(hour.between(6, 11), "morning")
        .when(hour.between(12, 17), "afternoon")
        .when(hour.between(18, 21), "evening")
        .otherwise("night")
    )
    df = df.withColumn("is_weekend", weekend_flag)
    df = df.withColumn("is_rush_hour", rush_flag)
    df = df.withColumn("is_holiday", holiday_flag)
    df = df.withColumn("time_of_day", day_part)

    # Borough mapping (simplified): contiguous PULocationID ranges, first
    # match wins, anything outside every range becomes "Unknown".
    # NOTE(review): presumably an approximation of the real zone-to-borough
    # lookup - confirm before using the labels for reporting.
    borough_ranges = [
        (1, 10, "Staten Island"),
        (11, 50, "Brooklyn"),
        (51, 150, "Queens"),
        (151, 220, "Manhattan"),
        (221, 265, "Bronx"),
    ]
    borough_expr = None
    for low, high, label in borough_ranges:
        in_range = zone.between(low, high)
        if borough_expr is None:
            borough_expr = F.when(in_range, label)
        else:
            borough_expr = borough_expr.when(in_range, label)
    df = df.withColumn("pickup_borough", borough_expr.otherwise("Unknown"))

    # Trip metrics: duration in minutes, then distance per hour. Trips with
    # a non-positive duration get speed 0 instead of dividing by zero.
    trip_minutes = (
        F.unix_timestamp("tpep_dropoff_datetime") - F.unix_timestamp(pickup_ts)
    ) / 60
    df = df.withColumn("trip_duration", trip_minutes)

    speed = F.when(
        F.col("trip_duration") > 0,
        F.col("trip_distance") / (F.col("trip_duration") / 60),
    ).otherwise(0)
    df = df.withColumn("avg_speed", speed)

    # presumably the pickup-zone IDs of the airports - TODO confirm
    airport_flag = F.when(zone.isin([1, 132, 138]), 1).otherwise(0)
    df = df.withColumn("is_airport", airport_flag)

    return df

# Build the per-trip feature frame from the cleaned trips and preview a few
# of the derived columns.
df_features = add_features(df_clean)
print("Features added successfully")

preview_columns = [
    "pickup_date", "hour_of_day", "day_of_week", "is_weekend",
    "pickup_borough", "trip_distance", "trip_duration", "avg_speed",
]
df_features.select(*preview_columns).show(5)

In [None]:
# Aggregate to daily borough level for ML: one row per (pickup_date, borough).
df_daily = (
    df_features
    .groupBy("pickup_date", "pickup_borough")
    .agg(
        F.sum("total_amount").alias("daily_revenue"),
        F.count("*").alias("daily_trips"),
        F.avg("trip_distance").alias("avg_distance"),
        F.avg("trip_duration").alias("avg_duration"),
        F.avg("avg_speed").alias("avg_speed"),
        # is_weekend / is_holiday are derived from pickup_date, so they are
        # constant within a group. max() keeps them as 0/1 day flags; sum()
        # would turn them into the day's trip count and leak daily_trips
        # (and, through it, revenue) into the feature set.
        F.max("is_weekend").alias("is_weekend"),
        F.max("is_holiday").alias("is_holiday"),
        # A genuine count: number of trips flagged is_airport that day.
        F.sum("is_airport").alias("airport_trips"),
    )
    .orderBy("pickup_date", "pickup_borough")
)

# Convert to pandas for ML
df_ml = df_daily.toPandas()
df_ml['pickup_date'] = pd.to_datetime(df_ml['pickup_date'])
df_ml = df_ml.sort_values(['pickup_borough', 'pickup_date'])

# Invariants the downstream lag features rely on.
assert not df_ml.duplicated(['pickup_date', 'pickup_borough']).any(), \
    "expected exactly one row per (pickup_date, pickup_borough)"
assert df_ml[['is_weekend', 'is_holiday']].isin([0, 1]).all().all(), \
    "is_weekend / is_holiday must be 0/1 day flags"

print(f"ML dataset: {len(df_ml)} rows")
print(f"Date range: {df_ml['pickup_date'].min()} to {df_ml['pickup_date'].max()}")
print(f"Boroughs: {df_ml['pickup_borough'].unique().tolist()}")
df_ml.head()

## 5. Feature Selection

In [None]:
# Add lag features and rolling averages
def add_time_series_features(df, target_cols=('daily_revenue', 'daily_trips')):
    """Add per-borough lag and trailing-window features for each target column.

    For every column ``c`` in ``target_cols`` four columns are added:

    * ``c_lag1`` / ``c_lag7`` - value 1 and 7 rows earlier within the same
      borough, rows ordered by ``pickup_date``.
    * ``c_ma7`` / ``c_std7`` - mean and standard deviation over up to the
      7 *preceding* rows of the same borough. The current row is excluded,
      so a feature never contains the value it is later used to predict.

    Lags are row-based, not calendar-based: a missing day in a borough's
    series shifts the lag by one day.

    Args:
        df: pandas DataFrame with ``pickup_borough``, ``pickup_date`` and
            every column named in ``target_cols``. Rows may be in any order.
        target_cols: Iterable of column names to derive features from.

    Returns:
        A copy of ``df`` with the original row order and index plus the new
        columns. Rows without enough history hold NaN in the affected
        columns; the input frame is not modified.
    """
    target_cols = list(target_cols)
    feature_suffixes = ('lag1', 'lag7', 'ma7', 'std7')

    out = df.copy()
    if out.empty:
        # Nothing to compute; still return the full set of feature columns.
        for col in target_cols:
            for suffix in feature_suffixes:
                out[f'{col}_{suffix}'] = np.nan
        return out

    # Work on unique positional labels so results can be aligned back to the
    # caller's rows by label even if the incoming index has duplicates.
    original_index = out.index
    out = out.reset_index(drop=True)

    # Chronological order inside each borough. Results are assigned back by
    # label, so the caller's row order does not have to be sorted.
    ordered = out.sort_values(['pickup_borough', 'pickup_date'], kind='stable')
    borough_key = ordered['pickup_borough']

    for col in target_cols:
        history = ordered[col].groupby(borough_key, sort=False)

        previous = history.shift(1)
        out[f'{col}_lag1'] = previous
        out[f'{col}_lag7'] = history.shift(7)

        # Rolling over the already-shifted series makes the window cover
        # rows t-7 .. t-1 and never row t itself.
        trailing = previous.groupby(borough_key, sort=False).rolling(7, min_periods=1)
        out[f'{col}_ma7'] = trailing.mean().reset_index(level=0, drop=True)
        out[f'{col}_std7'] = trailing.std().reset_index(level=0, drop=True)

    out.index = original_index
    return out

# Attach the lag / rolling columns, then drop every row that still has a NaN
# (the earliest days of each borough, which lack enough history).
# Note: this rebinds df_ml, so re-running the cell trims further rows.
df_ml = add_time_series_features(df_ml).dropna()

print(f"After adding time series features: {len(df_ml)} rows")
df_ml.head()

In [None]:
# Correlation analysis for feature selection
numeric_cols = df_ml.select_dtypes(include=[np.number]).columns.tolist()
exclude_cols = ['daily_revenue', 'daily_trips']

# Split the numeric columns into plain daily aggregates and history-based
# features. Matching is by substring: any name containing 'lag' or 'ma' is a
# history feature; names containing 'std' and the two targets are left out.
base_features = []
history_features = []
for c in numeric_cols:
    if 'lag' in c or 'ma' in c:
        history_features.append(c)
    elif c not in exclude_cols and 'std' not in c:
        base_features.append(c)
feature_cols = base_features + history_features

# Absolute Pearson correlation of every candidate with the revenue target,
# strongest first.
corr_with_target = df_ml[feature_cols + ['daily_revenue']].corr()['daily_revenue']
correlations = corr_with_target.drop('daily_revenue').abs().sort_values(ascending=False)

print("Feature correlations with daily_revenue:")
print(correlations.head(15))

# Keep the ten most strongly correlated candidates.
SELECTED_FEATURES = list(correlations.index[:10])
print(f"\nSelected features ({len(SELECTED_FEATURES)}): {SELECTED_FEATURES}")

In [None]:
# Feature correlation heatmap: selected features plus both targets.
heatmap_columns = SELECTED_FEATURES + ['daily_revenue', 'daily_trips']

fig, ax = plt.subplots(figsize=(12, 8))
sns.heatmap(
    df_ml[heatmap_columns].corr(),
    annot=True,
    cmap='coolwarm',
    center=0,
    fmt='.2f',
    ax=ax,
)
ax.set_title('Feature Correlation Heatmap')
fig.tight_layout()
fig.savefig('/tmp/correlation_heatmap.png', dpi=150)
plt.show()

## 6. Model Training with Tests

In [None]:
# Prepare data for training
def prepare_training_data(df, borough, target='daily_revenue', test_size=None, features=None):
    """Build a chronological, scaled train/test split for one borough.

    Rows of ``borough`` are ordered by ``pickup_date``; the earliest
    ``1 - test_size`` share becomes the training window and the remainder the
    hold-out, so the model is always evaluated on dates after those it was
    trained on. The scaler is fitted on the training window only and then
    applied to the hold-out.

    Args:
        df: pandas DataFrame with ``pickup_borough``, ``pickup_date``, the
            feature columns and ``target``.
        borough: Value of ``pickup_borough`` to select.
        target: Name of the column to predict.
        test_size: Fraction (0 < test_size < 1) of the most recent rows held
            out for testing. Defaults to ``CONFIG['test_size']``.
        features: Feature column names. Defaults to ``SELECTED_FEATURES``.

    Returns:
        ``(X_train, X_test, y_train, y_test, scaler)`` - scaled feature
        arrays, raw target arrays and the fitted ``StandardScaler``.

    Raises:
        ValueError: If ``test_size`` is outside (0, 1) or the borough has too
            few rows to leave at least one training sample.
    """
    if test_size is None:
        test_size = CONFIG['test_size']
    if not 0 < test_size < 1:
        raise ValueError(f"test_size must be between 0 and 1, got {test_size!r}")
    if features is None:
        features = SELECTED_FEATURES

    borough_df = df[df['pickup_borough'] == borough].sort_values('pickup_date')

    # Time-based split: everything before split_idx is training data.
    split_idx = int(len(borough_df) * (1 - test_size))
    if split_idx == 0:
        # Fail with a readable message instead of the scaler's
        # "0 sample(s)" error on an empty training window.
        raise ValueError(
            f"Not enough rows for borough {borough!r} to build a training set "
            f"({len(borough_df)} rows, test_size={test_size})"
        )

    X = borough_df[list(features)].values
    y = borough_df[target].values

    X_train, X_test = X[:split_idx], X[split_idx:]
    y_train, y_test = y[:split_idx], y[split_idx:]

    # Fit scaling statistics on the training window only to avoid leaking
    # hold-out information into the features.
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)

    return X_train, X_test, y_train, y_test, scaler

# Test with one borough: build the split for Manhattan as a sanity check.
# The resulting X_train / X_test / y_train / y_test / scaler globals are the
# inputs of the model-comparison cell that follows.
borough = 'Manhattan'
X_train, X_test, y_train, y_test, scaler = prepare_training_data(df_ml, borough)

# Shapes are (rows, number of selected features); the range is the unscaled target.
print(f"Training data shape: {X_train.shape}")
print(f"Test data shape: {X_test.shape}")
print(f"Target range: [{y_train.min():.2f}, {y_train.max():.2f}]")

In [None]:
# Model comparison with cross-validation
def evaluate_model(model, X_train, y_train, X_test, y_test, model_name, n_splits=None):
    """Cross-validate a regressor on the training window, then score the hold-out.

    Args:
        model: Unfitted scikit-learn style regressor. It is fitted in place
            on the full training data.
        X_train, y_train: Training features and target, in time order.
        X_test, y_test: Hold-out features and target.
        model_name: Label stored under the ``'model'`` key of the result.
        n_splits: Number of ``TimeSeriesSplit`` folds. Defaults to
            ``CONFIG['cv_folds']``.

    Returns:
        dict with keys ``model`` (the label), ``cv_rmse`` (mean of the
        per-fold RMSE), ``test_rmse``, ``test_mape``, ``test_r2`` and
        ``model_obj`` (the fitted estimator).
    """
    if n_splits is None:
        n_splits = CONFIG['cv_folds']

    # Cross-validation on training data. TimeSeriesSplit keeps every
    # validation fold strictly after its training fold.
    cv = TimeSeriesSplit(n_splits=n_splits)
    cv_scores = cross_val_score(model, X_train, y_train, cv=cv, scoring='neg_mean_squared_error')
    # Scores are negated MSE; flip the sign before taking the root.
    cv_rmse = np.sqrt(-cv_scores).mean()

    # Train on full training data
    model.fit(X_train, y_train)

    # Test predictions
    y_pred = model.predict(X_test)

    # Test metrics
    test_rmse = np.sqrt(mean_squared_error(y_test, y_pred))
    test_mape = mean_absolute_percentage_error(y_test, y_pred)
    test_r2 = r2_score(y_test, y_pred)

    return {
        'model': model_name,
        'cv_rmse': cv_rmse,
        'test_rmse': test_rmse,
        'test_mape': test_mape,
        'test_r2': test_r2,
        'model_obj': model
    }

# Compare models: three candidate regressors evaluated on the same split.
# Seeds come from CONFIG so the whole notebook shares one random_state.
models_to_test = [
    (GradientBoostingRegressor(n_estimators=100, max_depth=5, random_state=CONFIG['random_state']), 'GradientBoosting'),
    (RandomForestRegressor(n_estimators=100, max_depth=5, random_state=CONFIG['random_state']), 'RandomForest'),
    (Ridge(alpha=1.0), 'Ridge'),
]

results = []
for model, name in models_to_test:
    result = evaluate_model(model, X_train, y_train, X_test, y_test, name)
    results.append(result)
    print(f"{name}: CV_RMSE={result['cv_rmse']:.2f}, Test_RMSE={result['test_rmse']:.2f}, "
          f"Test_MAPE={result['test_mape']:.4f}, Test_R2={result['test_r2']:.4f}")

# Select best model on the cross-validated score. Choosing on the hold-out
# RMSE would tune the decision to the test set and make the reported test
# metrics optimistic; the hold-out numbers above stay a report only.
best_result = min(results, key=lambda x: x['cv_rmse'])
print(f"\nBest model: {best_result['model']}")

In [None]:
# Train final models for all boroughs
MIN_TRAIN_SAMPLES = 50  # boroughs with fewer training rows are skipped

RESULT_COLUMNS = [
    'borough', 'mape_revenue', 'rmse_revenue', 'mape_trips', 'rmse_trips',
    'train_samples', 'test_samples',
]


def _fit_and_score(X_train, y_train, X_test, y_test):
    """Fit a gradient-boosting regressor and return (model, test MAPE, test RMSE)."""
    model = GradientBoostingRegressor(
        n_estimators=100, max_depth=5, random_state=CONFIG['random_state']
    )
    model.fit(X_train, y_train)
    predictions = model.predict(X_test)
    mape = mean_absolute_percentage_error(y_test, predictions)
    rmse = np.sqrt(mean_squared_error(y_test, predictions))
    return model, mape, rmse


trained_models = {}
training_results = []

for borough in df_ml['pickup_borough'].unique():
    if borough == 'Unknown':
        continue

    print(f"\nTraining models for {borough}...")

    # Check the row count before building the split: with too few rows the
    # training window can be empty and the scaler inside
    # prepare_training_data raises before the size check below is reached.
    borough_rows = int((df_ml['pickup_borough'] == borough).sum())
    if borough_rows < MIN_TRAIN_SAMPLES:
        print(f"  Skipping {borough} - insufficient data ({borough_rows} samples)")
        continue

    # Revenue model
    X_train, X_test, y_train, y_test, scaler = prepare_training_data(df_ml, borough, 'daily_revenue')

    if len(X_train) < MIN_TRAIN_SAMPLES:
        print(f"  Skipping {borough} - insufficient data ({len(X_train)} samples)")
        continue

    model_revenue, mape_rev, rmse_rev = _fit_and_score(X_train, y_train, X_test, y_test)

    # Trips model: same rows and features, different target. Its scaler is
    # fitted on the same feature values as the revenue scaler, so only one
    # of the two is kept.
    X_train_t, X_test_t, y_train_t, y_test_t, _ = prepare_training_data(df_ml, borough, 'daily_trips')
    model_trips, mape_trips, rmse_trips = _fit_and_score(X_train_t, y_train_t, X_test_t, y_test_t)

    trained_models[borough] = {
        'revenue_model': model_revenue,
        'trips_model': model_trips,
        'scaler': scaler,
        'features': SELECTED_FEATURES
    }

    training_results.append({
        'borough': borough,
        'mape_revenue': mape_rev,
        'rmse_revenue': rmse_rev,
        'mape_trips': mape_trips,
        'rmse_trips': rmse_trips,
        'train_samples': len(X_train),
        'test_samples': len(X_test)
    })

    print(f"  Revenue: MAPE={mape_rev:.4f}, RMSE={rmse_rev:.2f}")
    print(f"  Trips: MAPE={mape_trips:.4f}, RMSE={rmse_trips:.2f}")

# Results summary. Explicit columns keep the frame well-formed even when
# every borough was skipped, so later cells can still select from it.
results_df = pd.DataFrame(training_results, columns=RESULT_COLUMNS)
print("\n" + "="*60)
print("TRAINING RESULTS SUMMARY")
print("="*60)
print(results_df.to_string(index=False))

## 7. Save Models to MinIO

In [None]:
import boto3
from io import BytesIO

MODEL_BUCKET = 'ml-models'
MODEL_PREFIX = f"taxi-predictor/{CONFIG['model_version']}"

s3 = boto3.client(
    's3',
    endpoint_url=CONFIG['minio_endpoint'],
    aws_access_key_id=CONFIG['minio_access_key'],
    aws_secret_access_key=CONFIG['minio_secret_key']
)

# Save each borough's models as one joblib artifact per borough.
for borough, models in trained_models.items():
    model_data = {
        'revenue_model': models['revenue_model'],
        'trips_model': models['trips_model'],
        'scaler': models['scaler'],
        'features': models['features'],
        'borough': borough,
        'version': CONFIG['model_version'],
        'trained_at': datetime.now().isoformat()
    }

    # joblib has no `dumps`; serialise into an in-memory buffer with
    # `joblib.dump`, which accepts a file object, and upload the bytes.
    buffer = BytesIO()
    joblib.dump(model_data, buffer)
    key = f"{MODEL_PREFIX}/{borough.lower().replace(' ', '_')}.joblib"

    s3.put_object(Bucket=MODEL_BUCKET, Key=key, Body=buffer.getvalue())
    print(f"Saved {borough} model to s3a://ml-models/{key}")

# Save training results: a local copy under /tmp and the same CSV in the bucket.
results_df.to_csv('/tmp/training_results.csv', index=False)
results_csv = results_df.to_csv(index=False)
s3.put_object(
    Bucket=MODEL_BUCKET,
    Key=f"{MODEL_PREFIX}/training_results.csv",
    Body=results_csv.encode('utf-8')
)
print(f"\nSaved training results to s3a://ml-models/taxi-predictor/{CONFIG['model_version']}/training_results.csv")

## 8. Push Metrics to Prometheus

In [None]:
def push_metrics_to_prometheus(results_df, config):
    """Publish the per-borough training metrics to a Prometheus Pushgateway.

    One gauge is registered per metric column; each row of ``results_df``
    sets one labelled sample (borough, experiment, version) on every gauge.

    Args:
        results_df: DataFrame with the columns ``borough``, ``mape_revenue``,
            ``mape_trips``, ``rmse_revenue``, ``rmse_trips`` and
            ``train_samples``.
        config: Mapping providing ``experiment_name``, ``model_version`` and
            ``pushgateway_url``.

    Returns:
        True when the push succeeded, False when it raised. The failure is
        reported as a printed warning and is not re-raised.
    """
    registry = CollectorRegistry()
    label_names = ['borough', 'experiment', 'version']

    # (results_df column, metric name, help text), in registration order.
    metric_specs = [
        ('mape_revenue', 'ml_training_mape_revenue', 'MAPE for revenue prediction'),
        ('mape_trips', 'ml_training_mape_trips', 'MAPE for trips prediction'),
        ('rmse_revenue', 'ml_training_rmse_revenue', 'RMSE for revenue prediction'),
        ('rmse_trips', 'ml_training_rmse_trips', 'RMSE for trips prediction'),
        ('train_samples', 'ml_training_samples', 'Number of training samples'),
    ]
    gauges = {
        column: Gauge(metric_name, help_text, label_names, registry=registry)
        for column, metric_name, help_text in metric_specs
    }

    # One labelled sample per borough on every gauge.
    for _, row in results_df.iterrows():
        label_values = {
            'borough': row['borough'].lower().replace(' ', '_'),
            'experiment': config['experiment_name'],
            'version': config['model_version']
        }
        for column, gauge in gauges.items():
            gauge.labels(**label_values).set(row[column])

    # Best-effort push: an unreachable gateway must not fail the notebook.
    try:
        push_to_gateway(
            config['pushgateway_url'],
            job=f"nyc_taxi_ml_{config['model_version']}",
            registry=registry
        )
    except Exception as e:
        print(f"Warning: Could not push metrics: {e}")
        return False
    else:
        print(f"Metrics pushed to {config['pushgateway_url']}")
        return True

# Push metrics. The call is the cell's last expression, so the notebook
# displays its return value: True when the push succeeded, False otherwise.
push_metrics_to_prometheus(results_df, CONFIG)

## 9. Summary

In [None]:
# Final report: experiment identity, data volumes, per-borough results and
# the output locations, framed by 60-character rules.
rule = "="*60

header_lines = [
    rule,
    "NYC TAXI ML PIPELINE - NOTEBOOK SUMMARY",
    rule,
    f"\nExperiment: {CONFIG['experiment_name']}",
    f"Version: {CONFIG['model_version']}",
    f"\nData:",
    f"  - Raw records: {total_records:,}",
    f"  - Clean records: {clean_records:,}",
    f"  - ML dataset rows: {len(df_ml)}",
    f"  - Features: {len(SELECTED_FEATURES)}",
    f"\nModels trained: {len(trained_models)} boroughs",
    f"\nResults:",
]
print("\n".join(header_lines))

print(results_df[['borough', 'mape_revenue', 'mape_trips', 'train_samples']].to_string(index=False))

footer_lines = [
    f"\nModels saved to: s3a://ml-models/taxi-predictor/{CONFIG['model_version']}/",
    f"\nMetrics pushed to: {CONFIG['pushgateway_url']}",
    rule,
]
print("\n".join(footer_lines))

In [None]:
# Cleanup: stop the Spark session created earlier in the notebook. Any cell
# that uses `spark` needs the session cell re-run after this point.
spark.stop()
print("Spark session closed.")