In [0]:
# Point the session at the lab catalog and its raw schema so unqualified
# names in later statements resolve there.
for statement in ("USE CATALOG traffic_lab", "USE raw"):
    spark.sql(statement)

# List the files in the volume to confirm the exact path of the source CSV.
volume_listing = dbutils.fs.ls("/Volumes/traffic_lab/raw/traffic_files/")
display(volume_listing)


path,name,size,modificationTime
dbfs:/Volumes/traffic_lab/raw/traffic_files/Metro_Interstate_Traffic_Volume.csv,Metro_Interstate_Traffic_Volume.csv,3237208,1764799651000


In [0]:
# Load the source CSV from the volume and persist it as the raw (bronze) table.
file_path = "/Volumes/traffic_lab/raw/traffic_files/Metro_Interstate_Traffic_Volume.csv"

# Explicit schema instead of inferSchema: inference needs an extra pass over
# the file and lets the table's column types change whenever the data does.
# The types below are the ones DESCRIBE TABLE reports for this table.
raw_schema = (
    "holiday STRING, temp DOUBLE, rain_1h DOUBLE, snow_1h DOUBLE, "
    "clouds_all INT, weather_main STRING, weather_description STRING, "
    "date_time TIMESTAMP, traffic_volume INT"
)

df_raw = (
    spark.read
    .option("header", "true")
    # With a fixed schema the default PERMISSIVE mode would silently turn
    # unparseable values into NULLs; fail loudly instead.
    .option("mode", "FAILFAST")
    .schema(raw_schema)
    .csv(file_path)
)

# Fully qualified name so the write does not depend on which catalog an
# earlier cell happened to select.
df_raw.write.mode("overwrite").saveAsTable("traffic_lab.raw.metro_interstate_raw")


In [0]:
# Sanity-check the raw table: total row count first, then a five-row sample.
raw_checks = (
    "SELECT COUNT(*) FROM raw.metro_interstate_raw",
    "SELECT * FROM raw.metro_interstate_raw LIMIT 5",
)
for check_sql in raw_checks:
    spark.sql(check_sql).show()


+--------+
|COUNT(*)|
+--------+
|   48204|
+--------+

+-------+------+-------+-------+----------+------------+-------------------+-------------------+--------------+
|holiday|  temp|rain_1h|snow_1h|clouds_all|weather_main|weather_description|          date_time|traffic_volume|
+-------+------+-------+-------+----------+------------+-------------------+-------------------+--------------+
|   None|288.28|    0.0|    0.0|        40|      Clouds|   scattered clouds|2012-10-02 09:00:00|          5545|
|   None|289.36|    0.0|    0.0|        75|      Clouds|      broken clouds|2012-10-02 10:00:00|          4516|
|   None|289.58|    0.0|    0.0|        90|      Clouds|    overcast clouds|2012-10-02 11:00:00|          4767|
|   None|290.13|    0.0|    0.0|        90|      Clouds|    overcast clouds|2012-10-02 12:00:00|          5026|
|   None|291.14|    0.0|    0.0|        75|      Clouds|      broken clouds|2012-10-02 13:00:00|          4918|
+-------+------+-------+-------+----------+-----

In [0]:
# Re-select the catalog and schema so the unqualified table name below
# resolves to the raw schema.
spark.sql("USE CATALOG traffic_lab")
spark.sql("USE raw")

# Column names and data types of the raw table.
raw_schema_df = spark.sql("DESCRIBE TABLE metro_interstate_raw")
raw_schema_df.show()

# Earliest and latest timestamp, plus the total number of rows.
time_range_sql = """
SELECT
  MIN(date_time) AS min_ts,
  MAX(date_time) AS max_ts,
  COUNT(*) AS row_count
FROM metro_interstate_raw
"""
spark.sql(time_range_sql).show()

# Average traffic volume for each hour of the day.
hourly_profile_sql = """
SELECT
  HOUR(date_time) AS hour_of_day,
  AVG(traffic_volume) AS avg_volume
FROM metro_interstate_raw
GROUP BY HOUR(date_time)
ORDER BY hour_of_day
"""
spark.sql(hourly_profile_sql).show()


+-------------------+---------+-------+
|           col_name|data_type|comment|
+-------------------+---------+-------+
|            holiday|   string|   NULL|
|               temp|   double|   NULL|
|            rain_1h|   double|   NULL|
|            snow_1h|   double|   NULL|
|         clouds_all|      int|   NULL|
|       weather_main|   string|   NULL|
|weather_description|   string|   NULL|
|          date_time|timestamp|   NULL|
|     traffic_volume|      int|   NULL|
+-------------------+---------+-------+

+-------------------+-------------------+---------+
|             min_ts|             max_ts|row_count|
+-------------------+-------------------+---------+
|2012-10-02 09:00:00|2018-09-30 23:00:00|    48204|
+-------------------+-------------------+---------+

+-----------+------------------+
|hour_of_day|        avg_volume|
+-----------+------------------+
|          0| 834.7810505645557|
|          1|  516.448999511957|
|          2|388.35364041604754|
|          3| 371.09

In [0]:

# First version of the silver table: timestamp and volume plus calendar
# columns derived from the timestamp. Spark SQL's DAYOFWEEK numbers Sunday
# as 1 and Saturday as 7, which is why (1,7) marks the weekend.
silver_hourly_sql = """
CREATE OR REPLACE TABLE silver.metro_interstate_hourly AS
SELECT
  date_time,
  traffic_volume,
  HOUR(date_time) AS hour_of_day,
  DAYOFWEEK(date_time) AS day_of_week,
  MONTH(date_time) AS month,
  CASE WHEN DAYOFWEEK(date_time) IN (1,7) THEN 1 ELSE 0 END AS is_weekend
FROM raw.metro_interstate_raw
"""
spark.sql(silver_hourly_sql)

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
# Show the first rows of the silver table, including the derived calendar columns.
silver_preview = spark.sql("SELECT * FROM silver.metro_interstate_hourly LIMIT 5")
silver_preview.show()

+-------------------+--------------+-----------+-----------+-----+----------+
|          date_time|traffic_volume|hour_of_day|day_of_week|month|is_weekend|
+-------------------+--------------+-----------+-----------+-----+----------+
|2012-10-02 09:00:00|          5545|          9|          3|   10|         0|
|2012-10-02 10:00:00|          4516|         10|          3|   10|         0|
|2012-10-02 11:00:00|          4767|         11|          3|   10|         0|
|2012-10-02 12:00:00|          5026|         12|          3|   10|         0|
|2012-10-02 13:00:00|          4918|         13|          3|   10|         0|
+-------------------+--------------+-----------+-----------+-----+----------+



In [0]:
# Show the raw table's column names and data types.
raw_columns = spark.sql("DESCRIBE TABLE raw.metro_interstate_raw")
raw_columns.show()


+-------------------+---------+-------+
|           col_name|data_type|comment|
+-------------------+---------+-------+
|            holiday|   string|   NULL|
|               temp|   double|   NULL|
|            rain_1h|   double|   NULL|
|            snow_1h|   double|   NULL|
|         clouds_all|      int|   NULL|
|       weather_main|   string|   NULL|
|weather_description|   string|   NULL|
|          date_time|timestamp|   NULL|
|     traffic_volume|      int|   NULL|
+-------------------+---------+-------+



In [0]:
# Rebuild the silver table, this time carrying the holiday and weather columns
# alongside the calendar columns derived from the timestamp. This replaces
# any existing table of the same name.
silver_hourly_sql = """
CREATE OR REPLACE TABLE silver.metro_interstate_hourly AS
SELECT
  date_time,
  traffic_volume,
  HOUR(date_time) AS hour_of_day,
  DAYOFWEEK(date_time) AS day_of_week,
  MONTH(date_time) AS month,
  CASE WHEN DAYOFWEEK(date_time) IN (1,7) THEN 1 ELSE 0 END AS is_weekend,
  holiday,
  temp,
  rain_1h,
  snow_1h,
  clouds_all,
  weather_main
FROM raw.metro_interstate_raw
"""
spark.sql(silver_hourly_sql)

# Confirm the widened table by showing its first rows.
silver_preview = spark.sql("SELECT * FROM silver.metro_interstate_hourly LIMIT 5")
silver_preview.show()


+-------------------+--------------+-----------+-----------+-----+----------+-------+------+-------+-------+----------+------------+
|          date_time|traffic_volume|hour_of_day|day_of_week|month|is_weekend|holiday|  temp|rain_1h|snow_1h|clouds_all|weather_main|
+-------------------+--------------+-----------+-----------+-----+----------+-------+------+-------+-------+----------+------------+
|2012-10-02 09:00:00|          5545|          9|          3|   10|         0|   None|288.28|    0.0|    0.0|        40|      Clouds|
|2012-10-02 10:00:00|          4516|         10|          3|   10|         0|   None|289.36|    0.0|    0.0|        75|      Clouds|
|2012-10-02 11:00:00|          4767|         11|          3|   10|         0|   None|289.58|    0.0|    0.0|        90|      Clouds|
|2012-10-02 12:00:00|          5026|         12|          3|   10|         0|   None|290.13|    0.0|    0.0|        90|      Clouds|
|2012-10-02 13:00:00|          4918|         13|          3|   10|   

In [0]:
# Aggregate the hourly silver table to one row per calendar day.
#
# any_holiday: the "no holiday" value in this data is the literal string
# 'None', so a plain MAX(holiday) picks the lexicographically largest string
# and returns 'None' whenever a day mixes 'None' rows with a holiday whose
# name sorts before it (e.g. 'Christmas Day'). Prefer any non-'None' value,
# and fall back to MAX(holiday) so days without a holiday keep their
# previous result.
#
# NOTE(review): total_traffic_volume is a SUM over whatever hourly rows exist
# for the day, so days with missing or repeated hours are not directly
# comparable - confirm the per-day row counts in silver.
spark.sql("""
CREATE OR REPLACE TABLE gold.metro_interstate_daily AS
SELECT
  DATE(date_time) AS date,
  AVG(traffic_volume) AS avg_traffic_volume,
  SUM(traffic_volume) AS total_traffic_volume,
  AVG(temp) AS avg_temp,
  MAX(rain_1h) AS max_rain_1h,
  MAX(snow_1h) AS max_snow_1h,
  AVG(clouds_all) AS avg_clouds_all,
  COALESCE(MAX(CASE WHEN holiday <> 'None' THEN holiday END), MAX(holiday)) AS any_holiday
FROM silver.metro_interstate_hourly
GROUP BY DATE(date_time)
ORDER BY date
""")

# Show the first rows of the daily table.
spark.sql("SELECT * FROM gold.metro_interstate_daily LIMIT 5").show()


+----------+------------------+--------------------+------------------+-----------+-----------+------------------+-----------+
|      date|avg_traffic_volume|total_traffic_volume|          avg_temp|max_rain_1h|max_snow_1h|    avg_clouds_all|any_holiday|
+----------+------------------+--------------------+------------------+-----------+-----------+------------------+-----------+
|2012-10-02| 4219.266666666666|               63289|290.40333333333336|        0.0|        0.0|29.133333333333333|       None|
|2012-10-03|           3317.25|               66345|          286.4135|        0.0|        0.0|              3.85|       None|
|2012-10-04|3747.4583333333335|               89939|          289.3575|        0.0|        0.0|16.708333333333332|       None|
|2012-10-05| 4242.545454545455|               93336| 282.0781818181818|        0.0|        0.0|              75.0|       None|
|2012-10-06|3256.9565217391305|               74910|277.74608695652176|        0.0|        0.0| 61.652173913043

In [0]:
# Make gold the active schema in the lab catalog.
for statement in ("USE CATALOG traffic_lab", "USE gold"):
    spark.sql(statement)

# Load the daily table sorted by date, then inspect its schema and first rows.
daily_table = spark.table("gold.metro_interstate_daily")
df_daily = daily_table.orderBy("date")
df_daily.printSchema()
df_daily.show(5)


root
 |-- date: date (nullable = true)
 |-- avg_traffic_volume: double (nullable = true)
 |-- total_traffic_volume: long (nullable = true)
 |-- avg_temp: double (nullable = true)
 |-- max_rain_1h: double (nullable = true)
 |-- max_snow_1h: double (nullable = true)
 |-- avg_clouds_all: double (nullable = true)
 |-- any_holiday: string (nullable = true)

+----------+------------------+--------------------+------------------+-----------+-----------+------------------+-----------+
|      date|avg_traffic_volume|total_traffic_volume|          avg_temp|max_rain_1h|max_snow_1h|    avg_clouds_all|any_holiday|
+----------+------------------+--------------------+------------------+-----------+-----------+------------------+-----------+
|2012-10-02| 4219.266666666666|               63289|290.40333333333336|        0.0|        0.0|29.133333333333333|       None|
|2012-10-03|           3317.25|               66345|          286.4135|        0.0|        0.0|              3.85|       None|
|2012-10-0

In [0]:
import pandas as pd

# Collect the daily table to pandas and keep only rows with no missing values.
pdf = df_daily.toPandas().dropna()

# Simple time-based split: df_daily was sorted by date, so the first 80% of
# rows form the training set and the last 20% the test set.
split_idx = int(0.8 * len(pdf))
train, test = pdf.iloc[:split_idx], pdf.iloc[split_idx:]


In [0]:
# Weather-only predictors used by the first model.
feature_cols = ["avg_temp", "max_rain_1h", "max_snow_1h", "avg_clouds_all"]

# Feature matrix and target (daily total volume) over every row of pdf.
X, y = pdf[feature_cols], pdf["total_traffic_volume"]


In [0]:
# Slice features and target out of each side of the time-based split.
target_col = "total_traffic_volume"
X_train, y_train = train[feature_cols], train[target_col]
X_test, y_test = test[feature_cols], test[target_col]


In [0]:
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, r2_score

# Random forest on the current feature set; the fixed random_state keeps
# results repeatable and n_jobs=-1 uses all available cores.
rf_params = {"n_estimators": 100, "random_state": 42, "n_jobs": -1}
model = RandomForestRegressor(**rf_params)
model.fit(X_train, y_train)

# Evaluate on the held-out rows: mean absolute error and R².
y_pred = model.predict(X_test)
mae, r2 = mean_absolute_error(y_test, y_pred), r2_score(y_test, y_pred)
mae, r2


(23277.09252688172, -0.1704175949964637)

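We used only weather features (avg_temp, max_rain_1h, etc.). That ignores the biggest drivers of traffic: time patterns (month, weekday/weekend, seasonality). So the model can’t learn “Mondays are heavier than Sundays,” etc. A negative R² means the model predicts the test period worse than simply predicting the mean daily volume, which is what we would expect when the strongest predictors are missing.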

Next step: add time features into the model
Let’s bring in month and a simplified is_weekend into the gold table, then into the model.

In [0]:
# Rebuild the daily gold table with two calendar features (month, is_weekend)
# added to the weather and volume aggregates.
#
# any_holiday: the "no holiday" value in this data is the literal string
# 'None', so a plain MAX(holiday) picks the lexicographically largest string
# and returns 'None' whenever a day mixes 'None' rows with a holiday whose
# name sorts before it (e.g. 'Christmas Day'). Prefer any non-'None' value,
# and fall back to MAX(holiday) so days without a holiday keep their
# previous result.
#
# month / is_weekend are constant within a calendar day, so MAX simply
# carries the single per-day value through the GROUP BY.
spark.sql("""
CREATE OR REPLACE TABLE traffic_lab.gold.metro_interstate_daily AS
SELECT
  DATE(date_time) AS date,
  AVG(traffic_volume) AS avg_traffic_volume,
  SUM(traffic_volume) AS total_traffic_volume,
  AVG(temp) AS avg_temp,
  MAX(rain_1h) AS max_rain_1h,
  MAX(snow_1h) AS max_snow_1h,
  AVG(clouds_all) AS avg_clouds_all,
  COALESCE(MAX(CASE WHEN holiday <> 'None' THEN holiday END), MAX(holiday)) AS any_holiday,
  MAX(MONTH(date_time)) AS month,
  MAX(CASE WHEN DAYOFWEEK(date_time) IN (1,7) THEN 1 ELSE 0 END) AS is_weekend
FROM  traffic_lab.silver.metro_interstate_hourly
GROUP BY DATE(date_time)
ORDER BY date
""")


DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
# Reload the rebuilt daily table, sorted by date, and collect it to pandas.
df_daily = spark.table("traffic_lab.gold.metro_interstate_daily").orderBy("date")
pdf_collected = df_daily.toPandas()
pdf = pdf_collected.dropna()  # keep only rows with no missing values

# Time-based split: earliest 80% of rows for training, latest 20% for testing.
split_idx = int(0.8 * len(pdf))
train, test = pdf.iloc[:split_idx], pdf.iloc[split_idx:]


In [0]:
# Predictors for the second model: the four weather aggregates plus the two
# calendar features from the rebuilt gold table.
weather_features = ["avg_temp", "max_rain_1h", "max_snow_1h", "avg_clouds_all"]
calendar_features = ["month", "is_weekend"]
feature_cols = weather_features + calendar_features

# Slice features and target out of each side of the time-based split.
target_col = "total_traffic_volume"
X_train, y_train = train[feature_cols], train[target_col]
X_test, y_test = test[feature_cols], test[target_col]


In [0]:
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, r2_score

# Refit the random forest on the current feature set with 200 trees; the
# fixed random_state keeps results repeatable and n_jobs=-1 uses all cores.
rf_params = {"n_estimators": 200, "random_state": 42, "n_jobs": -1}
model = RandomForestRegressor(**rf_params)
model.fit(X_train, y_train)

# Evaluate on the held-out rows: mean absolute error and R².
y_pred = model.predict(X_test)
mae, r2 = mean_absolute_error(y_test, y_pred), r2_score(y_test, y_pred)
mae, r2


(19405.458911290323, 0.08206851625811507)

### Adding tolls and revenue logic

In [0]:
# Scenario analysis needs a volume for every day, so score the whole frame
# (training and test rows alike) rather than only the held-out slice.
X_all = pdf.loc[:, feature_cols]
all_day_predictions = model.predict(X_all)
pdf["predicted_volume"] = all_day_predictions


In [0]:
# Share of the predicted volume attributed to cars and to trucks.
car_share, truck_share = 0.85, 0.15

# Flat per-vehicle tolls for the base scenario.
base_car_toll, base_truck_toll = 3.0, 7.0

# Split each day's predicted volume by vehicle class.
predicted = pdf["predicted_volume"]
pdf["cars_base"] = predicted * car_share
pdf["trucks_base"] = predicted * truck_share

# Base revenue = car revenue + truck revenue at the flat tolls.
car_revenue = pdf["cars_base"] * base_car_toll
truck_revenue = pdf["trucks_base"] * base_truck_toll
pdf["revenue_base"] = car_revenue + truck_revenue


In [0]:
weekend_factor = 1.20  # 20% higher tolls on weekends

# Per-day toll columns: the base toll everywhere, scaled up on weekend days.
is_weekend_day = pdf["is_weekend"] == 1
toll_plan = (
    ("car_toll_weekend", base_car_toll),
    ("truck_toll_weekend", base_truck_toll),
)
for toll_col, base_toll in toll_plan:
    pdf[toll_col] = base_toll
    pdf.loc[is_weekend_day, toll_col] = base_toll * weekend_factor

# Revenue with the weekend surcharge, using the base-scenario volumes.
car_revenue = pdf["cars_base"] * pdf["car_toll_weekend"]
truck_revenue = pdf["trucks_base"] * pdf["truck_toll_weekend"]
pdf["revenue_weekend_boost"] = car_revenue + truck_revenue


In [0]:
import pandas as pd
# Calendar year of each row, used as the grouping key.
pdf["year"] = pd.to_datetime(pdf["date"]).dt.year

# Yearly totals for the base and weekend-boost revenue columns.
revenue_cols = ["revenue_base", "revenue_weekend_boost"]
annual_rev = pdf.groupby("year", as_index=False)[revenue_cols].sum()
annual_rev


Unnamed: 0,year,revenue_base,revenue_weekend_boost
0,2012,28286710.0,29631460.0
1,2013,102565800.0,107187300.0
2,2014,59010390.0,61653190.0
3,2015,52927920.0,55128980.0
4,2016,107192800.0,111973600.0
5,2017,115910500.0,121207800.0
6,2018,79440170.0,82906190.0


### Why model demand response (elasticity)?

In reality, drivers react when tolls change. If we increase prices, some users divert to other routes or travel at different times.  
Ignoring this and assuming traffic volume stays fixed would **overstate** revenue gains from higher tolls.

To keep the analysis realistic, I introduce a simple **price elasticity of demand**:

- A 20% toll increase on weekends causes a small percentage drop in weekend traffic (with the elasticity of −0.2 used below, a 20% increase implies a 4% drop in volume).
- This lets me compare:
  - Base case: flat tolls, higher volumes.
  - Scenario: higher weekend tolls, slightly lower weekend volumes but potentially higher total revenue.

This is closer to how Traffic & Revenue (T&R) analysts evaluate pricing strategies in practice.


In [0]:
import numpy as np

elasticity = -0.2  # demand elasticity wrt price

# Fractional toll change per day: 0.20 (+20%) on weekend days, 0.0 otherwise.
pdf["pct_change_toll"] = np.where(pdf["is_weekend"] == 1, 0.20, 0.0)

# Fractional volume change = elasticity * fractional toll change.
pdf["pct_change_volume"] = elasticity * pdf["pct_change_toll"]

# Predicted volume scaled by the demand response to the weekend surcharge.
volume_multiplier = 1 + pdf["pct_change_volume"]
pdf["volume_weekend_elastic"] = pdf["predicted_volume"] * volume_multiplier


In [0]:
# Share of the volume attributed to cars and to trucks.
car_share, truck_share = 0.85, 0.15

# base tolls
base_car_toll, base_truck_toll = 3.0, 7.0

# weekend tolls (same as before): base toll everywhere, scaled up on weekends
weekend_factor = 1.20
is_weekend_day = pdf["is_weekend"] == 1
toll_plan = (
    ("car_toll_weekend", base_car_toll),
    ("truck_toll_weekend", base_truck_toll),
)
for toll_col, base_toll in toll_plan:
    pdf[toll_col] = base_toll
    pdf.loc[is_weekend_day, toll_col] = base_toll * weekend_factor

# Base scenario (unchanged): predicted volume at flat tolls
predicted = pdf["predicted_volume"]
pdf["cars_base"] = predicted * car_share
pdf["trucks_base"] = predicted * truck_share
base_car_revenue = pdf["cars_base"] * base_car_toll
base_truck_revenue = pdf["trucks_base"] * base_truck_toll
pdf["revenue_base"] = base_car_revenue + base_truck_revenue

# Weekend-boost scenario with elasticity: reduced volume at the weekend tolls
elastic_volume = pdf["volume_weekend_elastic"]
pdf["cars_weekend_elastic"] = elastic_volume * car_share
pdf["trucks_weekend_elastic"] = elastic_volume * truck_share
elastic_car_revenue = pdf["cars_weekend_elastic"] * pdf["car_toll_weekend"]
elastic_truck_revenue = pdf["trucks_weekend_elastic"] * pdf["truck_toll_weekend"]
pdf["revenue_weekend_elastic"] = elastic_car_revenue + elastic_truck_revenue


In [0]:
# Calendar year of each row, used as the grouping key.
pdf["year"] = pd.to_datetime(pdf["date"]).dt.year

# Yearly totals for the base and elasticity-adjusted revenue columns.
scenario_cols = ["revenue_base", "revenue_weekend_elastic"]
annual_rev_elastic = pdf.groupby("year", as_index=False)[scenario_cols].sum()
annual_rev_elastic


Unnamed: 0,year,revenue_base,revenue_weekend_elastic
0,2012,28286710.0,29308720.0
1,2013,102565800.0,106078100.0
2,2014,59010390.0,61018920.0
3,2015,52927920.0,54600730.0
4,2016,107192800.0,110826300.0
5,2017,115910500.0,119936500.0
6,2018,79440170.0,82074350.0


In [0]:
# Destination file inside the volume.
output_path = "/Volumes/traffic_lab/raw/traffic_files/traffic_daily_export.csv"

# Columns to export: date, predicted and actual volume, weekend flag,
# both revenue scenarios, and year.
export_columns = [
    "date",
    "predicted_volume",
    "total_traffic_volume",
    "is_weekend",
    "revenue_base",
    "revenue_weekend_elastic",
    "year",
]
pdf_daily_export = pdf[export_columns]

# Write without the pandas index so the file holds only the listed columns.
pdf_daily_export.to_csv(output_path, index=False)


In [0]:
# List the volume again to confirm the export file was written.
volume_listing = dbutils.fs.ls("/Volumes/traffic_lab/raw/traffic_files/")
display(volume_listing)


path,name,size,modificationTime
dbfs:/Volumes/traffic_lab/raw/traffic_files/Metro_Interstate_Traffic_Volume.csv,Metro_Interstate_Traffic_Volume.csv,3237208,1764799651000
dbfs:/Volumes/traffic_lab/raw/traffic_files/traffic_daily_export.csv,traffic_daily_export.csv,116931,1764804617000
