# NYC Taxi Fare Model Inference

This notebook demonstrates how to load the trained taxi fare prediction model from Unity Catalog and run inference on new data.


In [None]:
# Setup widgets for model configuration.
# Each entry is (widget name, default value, label passed to the widget).
widget_specs = [
    ("model_name", "mlflow3_demo", "Model Name"),
    ("model_alias", "challenger", "Model Alias"),
    ("catalog", "main", "Catalog"),
    ("schema", "default", "Schema"),
    ("features_table", "main.default.features", "Source Data Table (for batch inference)"),
    ("predictions_table", "main.default.predictions", "Predictions Table Name"),
]

# Register the text widgets in the order listed above
for widget_name, default_value, label in widget_specs:
    dbutils.widgets.text(widget_name, default_value, label)

In [None]:
# Get widget values: each target on the left receives the widget of the
# same name, read in the order listed on the right.
(
    catalog,
    schema,
    model_name,
    model_alias,
    features_table,
    predictions_table,
) = (
    dbutils.widgets.get(widget_name)
    for widget_name in (
        "catalog",
        "schema",
        "model_name",
        "model_alias",
        "features_table",
        "predictions_table",
    )
)

# Construct model URI: <catalog>.<schema>.<model> followed by "@<alias>"
full_model_name = f"{catalog}.{schema}.{model_name}"
model_uri = f"models:/{full_model_name}@{model_alias}"

print(
    f"Model URI: {model_uri}",
    f"Source table for batch inference: {features_table}",
    sep="\n",
)

In [None]:
import mlflow
import mlflow.sklearn
from mlflow import MlflowClient
import pandas as pd
import numpy as np
from pyspark.sql import functions as F
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

# Use Unity Catalog as the MLflow model registry
mlflow.set_registry_uri("databricks-uc")

# Report the MLflow version and the URIs in effect for this session
print(
    f"MLflow version: {mlflow.__version__}",
    f"Registry URI: {mlflow.get_registry_uri()}",
    f"Tracking URI: {mlflow.get_tracking_uri()}",
    sep="\n",
)


In [None]:
# Load the trained model from Unity Catalog
print(f"Loading model from Unity Catalog: {model_uri}")

try:
    # Load through MLflow's sklearn flavor
    loaded_model = mlflow.sklearn.load_model(model_uri)
    print(f"Successfully loaded model: {full_model_name} (alias: {model_alias})")

    # Look up the metadata MLflow holds for the same URI
    model_info = mlflow.models.get_model_info(model_uri)
    print(
        f"\nModel Information:",
        f"  - Run ID: {model_info.run_id}",
        f"  - Model UUID: {model_info.model_uuid}",
        f"  - MLflow Version: {model_info.mlflow_version}",
        sep="\n",
    )

    # The signature is only reported when the attribute is set
    model_signature = model_info.signature
    if model_signature:
        print(
            f"\nModel Signature:",
            f"  - Input Schema: {model_signature.inputs}",
            f"  - Output Schema: {model_signature.outputs}",
            sep="\n",
        )

    # Show the concrete Python class of the loaded object
    print(f"\nModel Type: {type(loaded_model).__name__}")
    print(f"Model ready for inference! 🎯")

except Exception as load_error:
    # Print a hint, then re-raise so the notebook run stops here
    print(f"Error loading model: {str(load_error)}")
    print("Make sure the model exists in Unity Catalog and you have access permissions")
    raise


In [None]:
# Create sample data for single prediction inference
print("🧪 Creating sample data for inference...")

# Feature names in the column order of the sample frame.
# The model expects these features (from training).
sample_feature_names = [
    "trip_distance",
    "pickup_hour",
    "pickup_day_of_week",
    "pickup_month",
    "trip_duration_minutes",
]

# One tuple per sample trip; values line up with sample_feature_names.
# pickup_day_of_week is numbered 1=Sunday, 2=Monday, ..., 7=Saturday.
sample_trip_values = [
    (2.5, 14, 2, 6, 15),    # 2 PM, Monday, June
    (0.8, 8, 3, 12, 8),     # 8 AM rush hour, Tuesday, December
    (15.2, 22, 7, 7, 45),   # 10 PM, Saturday, July
    (5.1, 18, 6, 3, 22),    # 6 PM, Friday, March
]

# Pair every value with its feature name to get one dict per trip
sample_trips = [
    dict(zip(sample_feature_names, values)) for values in sample_trip_values
]

# Convert to DataFrame
sample_df = pd.DataFrame(sample_trips)

print(f"Created {len(sample_trips)} sample trips:")
print(sample_df)
print(f"\nFeature columns: {list(sample_df.columns)}")


In [None]:
# Run predictions on sample data
print("🔮 Running predictions on sample trips...")

# Lookup tables for the printed descriptions, built once instead of once per
# row. Day numbers follow the sample data's convention (1=Sunday ... 7=Saturday).
days = {1: "Sunday", 2: "Monday", 3: "Tuesday", 4: "Wednesday",
        5: "Thursday", 6: "Friday", 7: "Saturday"}
months = {1: "Jan", 2: "Feb", 3: "Mar", 4: "Apr", 5: "May", 6: "Jun",
          7: "Jul", 8: "Aug", 9: "Sep", 10: "Oct", 11: "Nov", 12: "Dec"}

try:
    # Make predictions
    predictions = loaded_model.predict(sample_df)

    # Keep the inputs untouched; attach the rounded prediction to a copy
    results_df = sample_df.copy()
    results_df['predicted_fare'] = predictions
    results_df['predicted_fare'] = results_df['predicted_fare'].round(2)

    print(f"Predictions completed!")
    print(f"\nPrediction Results:")
    print("=" * 80)

    # Display results with interpretations.
    # itertuples keeps each column's own dtype, whereas iterrows converts a
    # mixed int/float row to float and would print "14.0:00" / "15.0 minutes".
    # enumerate numbers the trips independently of the DataFrame index.
    for trip_number, trip in enumerate(results_df.itertuples(index=False), start=1):
        day_name = days.get(trip.pickup_day_of_week, "Unknown")
        month_name = months.get(trip.pickup_month, "Unknown")

        print(
            f"Trip {trip_number}: {trip.trip_distance} miles "
            f"at {trip.pickup_hour}:00 on {day_name} in {month_name}"
        )
        print(f"  Duration: {trip.trip_duration_minutes} minutes")
        # Always show two decimals so fares read as currency (12.50, not 12.5)
        print(f"  Predicted Fare: ${trip.predicted_fare:.2f}")
        print()

    # Display as table
    print("Summary Table:")
    display(results_df)

    # Some basic analysis of the predicted fares
    avg_fare = results_df['predicted_fare'].mean()
    min_fare = results_df['predicted_fare'].min()
    max_fare = results_df['predicted_fare'].max()

    print(f"Prediction Statistics:")
    print(f"  - Average predicted fare: ${avg_fare:.2f}")
    print(f"  - Minimum predicted fare: ${min_fare:.2f}")
    print(f"  - Maximum predicted fare: ${max_fare:.2f}")

except Exception as e:
    # Report, then re-raise so the notebook run stops on a failed prediction
    print(f"Error during prediction: {str(e)}")
    raise


In [None]:
# Single trip prediction example
print("Single Trip Prediction Example")
print("=" * 40)

# Example: A 3.2-mile trip at 3 PM on a Friday in September, taking 18 minutes
single_trip = dict(
    trip_distance=3.2,
    pickup_hour=15,  # 3 PM
    pickup_day_of_week=6,  # Friday
    pickup_month=9,  # September
    trip_duration_minutes=18,
)

# Wrap the dict in a one-row DataFrame (model expects DataFrame input)
single_trip_df = pd.DataFrame([single_trip])

# predict returns one value per input row; take the only one
single_prediction = loaded_model.predict(single_trip_df)[0]

distance_miles = single_trip['trip_distance']
print(
    f"Trip Details:",
    f"  - Distance: {distance_miles} miles",
    f"  - Time: 3:00 PM on Friday",
    f"  - Month: September",
    f"  - Duration: {single_trip['trip_duration_minutes']} minutes",
    f"\nPredicted Fare: ${single_prediction:.2f}",
    sep="\n",
)

# Calculate fare per mile
fare_per_mile = single_prediction / distance_miles
print(f"Fare per mile: ${fare_per_mile:.2f}")

# Show the input format for reference
print(
    f"\nInput format for API calls:",
    f"Input DataFrame shape: {single_trip_df.shape}",
    f"Required columns: {list(single_trip_df.columns)}",
    sep="\n",
)
display(single_trip_df)


---

## Batch Inference on Real Data

This section demonstrates batch inference using actual data from Unity Catalog.


In [None]:
# Batch inference on real data from Unity Catalog
print(f"Loading data for batch inference from: {features_table}")

try:
    # Read the source table named by the features_table widget
    batch_df = spark.table(features_table)

    print(f"Loaded {batch_df.count():,} rows from {features_table}")

    # Column expressions reused by the projection below
    pickup_ts = F.col("tpep_pickup_datetime")
    dropoff_ts = F.col("tpep_dropoff_datetime")
    # Seconds between pickup and dropoff, converted to minutes
    duration_minutes = (F.unix_timestamp(dropoff_ts) - F.unix_timestamp(pickup_ts)) / 60

    # Prepare features (same feature engineering as training)
    projected_trips = batch_df.select(
        # Original columns for reference
        pickup_ts,
        dropoff_ts,
        F.col("fare_amount").alias("actual_fare"),
        # Features for prediction
        F.col("trip_distance"),
        F.hour(pickup_ts).alias("pickup_hour"),
        F.dayofweek(pickup_ts).alias("pickup_day_of_week"),
        F.month(pickup_ts).alias("pickup_month"),
        duration_minutes.alias("trip_duration_minutes"),
    )

    # Apply same filters as training: strictly positive, bounded fare,
    # distance and duration
    fare = F.col("fare_amount")
    distance = F.col("trip_distance")
    duration = F.col("trip_duration_minutes")
    is_valid_trip = (
        (fare > 0)
        & (fare < 1000)
        & (distance > 0)
        & (distance < 100)
        & (duration > 0)
        & (duration < 300)
    )

    # Limit for demo purposes
    batch_features = projected_trips.filter(is_valid_trip).limit(1000)

    print(f"After filtering: {batch_features.count():,} rows ready for inference")
    print("\nSample of prepared data:")
    display(batch_features.limit(5))

except Exception as batch_error:
    # Print a hint, then re-raise so the notebook run stops here
    print(f"Error loading batch data: {str(batch_error)}")
    print("Make sure the source table exists and contains the expected columns")
    raise


In [None]:
# Run batch predictions
print("Running batch inference...")

# Convert to Pandas for model prediction (sample for performance)
batch_sample = batch_features.sample(fraction=0.1, seed=42).toPandas()

# A 10% sample of a small filtered set can be empty; fail here with a clear
# message rather than with an opaque error inside predict().
if batch_sample.empty:
    raise ValueError(
        f"No rows were sampled from {features_table} for batch inference; "
        "check that the table has rows passing the filters or raise the sample fraction"
    )

print(f"Running inference on {len(batch_sample):,} samples...")

# Select feature columns for prediction
feature_columns = ['trip_distance', 'pickup_hour', 'pickup_day_of_week',
                   'pickup_month', 'trip_duration_minutes']

X_batch = batch_sample[feature_columns]

# Make batch predictions (one value per row of X_batch)
batch_predictions = loaded_model.predict(X_batch)

# Add predictions to the dataframe, rounded to cents
batch_sample['predicted_fare'] = batch_predictions
batch_sample['predicted_fare'] = batch_sample['predicted_fare'].round(2)

# Calculate prediction vs actual differences.
# fare_difference is signed (positive = over-prediction); the two error
# columns are derived from its absolute value.
batch_sample['fare_difference'] = (batch_sample['predicted_fare'] - batch_sample['actual_fare']).round(2)
batch_sample['absolute_error'] = batch_sample['fare_difference'].abs().round(2)
batch_sample['percentage_error'] = (batch_sample['fare_difference'].abs() / batch_sample['actual_fare'] * 100).round(1)

print("Batch predictions completed!")

# Display results
print(f"\nBatch Inference Results (sample of {len(batch_sample):,} trips):")
result_columns = ['trip_distance', 'pickup_hour', 'trip_duration_minutes',
                  'actual_fare', 'predicted_fare', 'fare_difference', 'percentage_error']
display(batch_sample[result_columns].head(10))


In [None]:
# Get model version by alias using MLflow API:
# look up the registry entry for the alias, then keep its version field
client = MlflowClient()
aliased_model_version = client.get_model_version_by_alias(full_model_name, model_alias)
model_version = aliased_model_version.version
print(f"Model version for alias '{model_alias}': {model_version}")

In [None]:
# Add metadata before saving predictions
print("Adding prediction metadata...")

# One timestamp shared by every row of this batch
current_timestamp = datetime.now()

# Column name -> constant value stamped on every prediction row.
# Insertion order determines the order of the new columns.
prediction_metadata = {
    'prediction_timestamp': current_timestamp,
    'model_id': model_version,
    'model_name': full_model_name,
    'model_run_id': model_info.run_id,
}
for metadata_column, metadata_value in prediction_metadata.items():
    batch_sample[metadata_column] = metadata_value

print(
    f"Metadata added:",
    f"  - Prediction timestamp: {current_timestamp}",
    f"  - Model version: {model_version}",
    f"  - Model name: {full_model_name}",
    f"  - Model run ID: {model_info.run_id}",
    sep="\n",
)

# Show updated columns
print(
    f"\nUpdated columns ({len(batch_sample.columns)} total):",
    f"New metadata columns: prediction_timestamp, model_id (model_version), model_name, model_run_id",
    f"All columns: {list(batch_sample.columns)}",
    sep="\n",
)


In [None]:
# Analyze batch inference performance
print("Batch Inference Performance Analysis")
print("=" * 50)


def _share_within(limit):
    """Return the percentage of sampled trips whose absolute error is <= limit."""
    return (batch_sample['absolute_error'] <= limit).mean() * 100


# Calculate performance metrics from the per-trip error columns
mae = batch_sample['absolute_error'].mean()
rmse = np.sqrt(batch_sample['fare_difference'].pow(2).mean())
mean_percentage_error = batch_sample['percentage_error'].mean()
median_percentage_error = batch_sample['percentage_error'].median()

print(
    f"Performance Metrics:",
    f"  - Mean Absolute Error (MAE): ${mae:.2f}",
    f"  - Root Mean Square Error (RMSE): ${rmse:.2f}",
    f"  - Mean Percentage Error: {mean_percentage_error:.1f}%",
    f"  - Median Percentage Error: {median_percentage_error:.1f}%",
    sep="\n",
)

# Accuracy within ranges
within_1_dollar = _share_within(1.0)
within_2_dollars = _share_within(2.0)
within_5_dollars = _share_within(5.0)

print(
    f"\nAccuracy Ranges:",
    f"  - Predictions within $1.00: {within_1_dollar:.1f}%",
    f"  - Predictions within $2.00: {within_2_dollars:.1f}%",
    f"  - Predictions within $5.00: {within_5_dollars:.1f}%",
    sep="\n",
)

# Show distribution of errors
print(f"\nError Distribution:")
print(batch_sample['absolute_error'].describe())

# Find best and worst predictions: the three rows with the smallest and
# largest absolute error, restricted to the columns worth comparing
comparison_columns = ['trip_distance', 'actual_fare', 'predicted_fare', 'absolute_error']
best_predictions = batch_sample.nsmallest(3, 'absolute_error')[comparison_columns]
worst_predictions = batch_sample.nlargest(3, 'absolute_error')[comparison_columns]

print(f"\nBest Predictions (lowest error):")
display(best_predictions)

print(f"\nWorst Predictions (highest error):")
display(worst_predictions)

## Prediction Table

The prediction table includes these important metadata columns:

- **`prediction_timestamp`**: When the prediction was made
- **`model_id`**: Version of the registered model that the alias resolved to when the predictions were made (e.g., "1", "2")
- **`model_name`**: Full Unity Catalog model name
- **`model_run_id`**: MLflow run ID associated with the model, kept for end-to-end traceability (the quality monitor does not require this column)


In [None]:
# Save predictions back to Unity Catalog
print("Save predictions to Unity Catalog")

try:
    # Convert back to Spark DataFrame and save
    predictions_spark_df = spark.createDataFrame(batch_sample)

    # Show what is about to be written
    print(f"Would save {batch_sample.shape[0]:,} predictions to: {predictions_table}")
    print(f"Columns to save: {list(batch_sample.columns)}")

    # Ask the catalog whether the table exists instead of inferring it from
    # a failed query: an unrelated error (permissions, transient failure)
    # must never be mistaken for "table missing".
    if spark.catalog.tableExists(predictions_table):
        action_verb = "appended to"
        print(f"Table {predictions_table} exists - appending new predictions...")

        # Check current row count before appending
        current_count = spark.table(predictions_table).count()
        print(f"Current table has {current_count:,} rows")
    else:
        action_verb = "saved to new table"
        print(f"Creating new predictions table: {predictions_table}")

    # Always append: saveAsTable creates the table when it is missing, and
    # append mode cannot wipe predictions that are already stored (the
    # previous "overwrite" fallback could).
    write_mode = "append"
    predictions_spark_df.write.mode(write_mode).saveAsTable(predictions_table)
    print(f"Predictions {action_verb} {predictions_table}")

    # Show final row count
    final_count = spark.table(predictions_table).count()
    print(f"Table now contains {final_count:,} total predictions")

except Exception as e:
    # Report and re-raise, matching the other cells: a failed save must stop
    # the run rather than fall through to the success summary below.
    print(f"Error saving predictions: {str(e)}")
    raise

# Only reached when the predictions were written successfully
print(f"\nInference Summary:")
print(f"  - Model loaded from: {model_uri}")
print(f"  - Processed {len(batch_sample):,} trips")
print(f"  - Average prediction error: ${mae:.2f}")
print(f"  - Model ready for production use!")