In [0]:
# Databricks notebook source
# MAGIC %md
# MAGIC # Travel & Transportation Analytics Pipeline
# MAGIC **Architecture:** Bronze → Silver → Gold  
# MAGIC **Use Cases:** Ride-Demand Forecasting, Dynamic Pricing, Route Efficiency, Driver Utilization, Customer Segmentation  
# MAGIC **Source:** Delta table `workspace.default.uber_data`

# COMMAND ----------

# -------------------------------
# Step 0: Libraries & warnings
# -------------------------------
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql.functions import col, hour, dayofweek
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from sklearn.linear_model import LinearRegression
import warnings
warnings.filterwarnings("ignore")

# COMMAND ----------

# -------------------------------
# Step 1: Bronze Layer (Raw Delta table)
# -------------------------------
bronze_df = spark.read.table("workspace.default.uber_data")
bronze_df.show(5)
print("Bronze layer loaded from Delta table!")

# Save as Bronze Delta table (optional)
bronze_df.write.format("delta").mode("overwrite").saveAsTable("workspace.default.bronze_rides")

# COMMAND ----------

# -------------------------------
# Step 2: Silver Layer (Cleaning & enrichment)
# -------------------------------
silver_df = bronze_df.dropna(subset=['pickup_datetime','fare_amount'])
silver_df = silver_df.filter(col('fare_amount') > 0)

# Add features required for analytics
silver_df = silver_df.withColumn("hour", hour(col("pickup_datetime"))) \
                     .withColumn("day_of_week", dayofweek(col("pickup_datetime")))

# Convert to Pandas for downstream analytics
silver_pd = silver_df.toPandas()
if 'key' not in silver_pd.columns:
    silver_pd['key'] = range(len(silver_pd))

# Save Silver Layer
silver_spark = spark.createDataFrame(silver_pd)
silver_spark.write.format("delta").mode("overwrite").saveAsTable("workspace.default.silver_rides")

display(silver_spark.limit(5))
print("Silver layer created!")

# COMMAND ----------

# -------------------------------
# Step 3: Gold Layer (Aggregations for all 5 use cases)
# -------------------------------
silver_pd = silver_spark.toPandas()

# ----- Use Case 1: Ride-Demand Forecasting -----
demand = silver_pd.groupby(['hour','day_of_week']).size().reset_index(name='ride_count')

# ----- Use Case 2: Dynamic Pricing Optimization -----
rides_per_hour = silver_pd.groupby('hour').size().reset_index(name='ride_count')
np.random.seed(42)
rides_per_hour['available_drivers'] = np.random.randint(50,150,len(rides_per_hour))
rides_per_hour['demand_supply_ratio'] = rides_per_hour['ride_count']/rides_per_hour['available_drivers']
avg_fare_hour = silver_pd.groupby('hour')['fare_amount'].mean().reset_index()
pricing_df = pd.merge(rides_per_hour, avg_fare_hour, on='hour')
X = pricing_df[['demand_supply_ratio']]
y = pricing_df['fare_amount']
model = LinearRegression()
model.fit(X, y)
pricing_df['predicted_fare'] = model.predict(X)

# ----- Use Case 3: Route Efficiency Analysis -----
hourly_stats = silver_pd.groupby('hour').agg(
    trip_count=('fare_amount','count'),
    avg_fare=('fare_amount','mean')
).reset_index()
hourly_stats['total_revenue'] = hourly_stats['trip_count']*hourly_stats['avg_fare']

# ----- Use Case 4: Driver Utilization Insights -----
total_drivers = 200
hourly_stats['active_drivers'] = hourly_stats['trip_count'].apply(lambda x: min(x, total_drivers))
hourly_stats['idle_drivers'] = total_drivers - hourly_stats['active_drivers']
hourly_stats['utilization_percent'] = (hourly_stats['active_drivers']/total_drivers)*100

# ----- Use Case 5: Customer Segmentation -----
rider_features = silver_pd.groupby('key').agg(
    trip_count=('fare_amount','count'),
    avg_fare=('fare_amount','mean')
).reset_index()

X_scaled = StandardScaler().fit_transform(rider_features[['trip_count','avg_fare']])
kmeans = KMeans(n_clusters=3, random_state=42)
rider_features['cluster'] = kmeans.fit_predict(X_scaled)

# COMMAND ----------

# ----- Save Gold Layer Tables -----
spark.createDataFrame(demand).write.format("delta").mode("overwrite").saveAsTable("workspace.default.gold_demand")
spark.createDataFrame(pricing_df).write.format("delta").mode("overwrite").saveAsTable("workspace.default.gold_pricing")
spark.createDataFrame(hourly_stats).write.format("delta").mode("overwrite").saveAsTable("workspace.default.gold_route_driver")
spark.createDataFrame(rider_features).write.format("delta").mode("overwrite").saveAsTable("workspace.default.gold_customers")

print("Gold layer created for all 5 use cases!")

# COMMAND ----------

# ----- Preview Gold Tables -----
display(spark.read.table("workspace.default.gold_demand").limit(5))
display(spark.read.table("workspace.default.gold_pricing").limit(5))
display(spark.read.table("workspace.default.gold_route_driver").limit(5))
display(spark.read.table("workspace.default.gold_customers").limit(5))


+-------------------+-----------+-------------------+------------------+-----------------+------------------+-----------------+---------------+
|                key|fare_amount|    pickup_datetime|  pickup_longitude|  pickup_latitude| dropoff_longitude| dropoff_latitude|passenger_count|
+-------------------+-----------+-------------------+------------------+-----------------+------------------+-----------------+---------------+
|2015-05-07 19:52:06|        7.5|2015-05-07 19:52:06|-73.99981689453125|40.73835372924805|   -73.99951171875|40.72321701049805|              1|
|2009-07-17 20:04:56|        7.7|2009-07-17 20:04:56|        -73.994355|        40.728225|         -73.99471|        40.750325|              1|
|2009-08-24 21:45:00|       12.9|2009-08-24 21:45:00|        -74.005043|         40.74077|        -73.962565|        40.772647|              1|
|2009-06-26 08:22:21|        5.3|2009-06-26 08:22:21|        -73.976124|        40.790844|        -73.965316|        40.803349|         

key,fare_amount,pickup_datetime,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count,hour,day_of_week
2015-05-07T19:52:06.000Z,7.5,2015-05-07T19:52:06.000Z,-73.99981689453125,40.73835372924805,-73.99951171875,40.72321701049805,1,19,5
2009-07-17T20:04:56.000Z,7.7,2009-07-17T20:04:56.000Z,-73.994355,40.728225,-73.99471,40.750325,1,20,6
2009-08-24T21:45:00.000Z,12.9,2009-08-24T21:45:00.000Z,-74.005043,40.74077,-73.962565,40.772647,1,21,2
2009-06-26T08:22:21.000Z,5.3,2009-06-26T08:22:21.000Z,-73.976124,40.790844,-73.965316,40.803349,3,8,6
2014-08-28T17:47:00.000Z,16.0,2014-08-28T17:47:00.000Z,-73.925023,40.744085,-73.97308199999999,40.761247,5,17,5


Silver layer created!
Gold layer created for all 5 use cases!


hour,day_of_week,ride_count
0,1,1791
0,2,616
0,3,667
0,4,840
0,5,1015


hour,ride_count,available_drivers,demand_supply_ratio,fare_amount,predicted_fare
0,7844,101,77.66336633663366,11.656279959204488,11.816779866999765
1,5908,142,41.6056338028169,11.663276912660798,12.139615456976184
2,4421,64,69.078125,11.452479077131873,11.893646082849658
3,3254,121,26.89256198347108,11.733678549477563,12.271345959350654
4,2364,110,21.49090909090909,14.14200084602369,12.31970856220658


hour,trip_count,avg_fare,total_revenue,active_drivers,idle_drivers,utilization_percent
0,7844,11.656279959204488,91431.86,200,0,100.0
1,5908,11.663276912660798,68906.64,200,0,100.0
2,4421,11.452479077131873,50631.41,200,0,100.0
3,3254,11.733678549477563,38181.39,200,0,100.0
4,2364,14.14200084602369,33431.69,200,0,100.0


key,trip_count,avg_fare,cluster
2009-10-28T08:19:01.000Z,1,4.9,0
2009-10-28T08:24:00.000Z,1,10.5,0
2009-10-28T08:27:00.000Z,1,7.3,0
2009-10-28T08:45:00.000Z,1,8.5,0
2009-10-28T09:03:00.000Z,1,5.3,0
