In [0]:
# AUTHENTICATION FOR S3 ACCESS
#
# Credentials are read from the environment instead of being hardcoded in the
# notebook. SECURITY: the access-key pair previously committed in this cell must
# be treated as compromised and rotated/deactivated in AWS IAM.
# On Databricks, prefer dbutils.secrets.get(scope=..., key=...) or an instance
# profile over environment variables.
import os


def _require_env(name):
    """Return the non-empty value of environment variable `name` or fail loudly."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Environment variable {name} must be set for S3 access")
    return value


spark.conf.set("spark.hadoop.fs.s3a.aws.credentials.provider","org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")

_hadoop_conf = spark._jsc.hadoopConfiguration()
# Also set the provider directly on the Hadoop configuration (where the keys
# live): runtime spark.conf entries with the "spark.hadoop." prefix are not
# guaranteed to reach the S3A connector once the session has started.
_hadoop_conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
_hadoop_conf.set("fs.s3a.access.key", _require_env("AWS_ACCESS_KEY_ID"))
_hadoop_conf.set("fs.s3a.secret.key", _require_env("AWS_SECRET_ACCESS_KEY"))
_hadoop_conf.set("fs.s3a.endpoint", "s3.eu-north-1.amazonaws.com")


In [0]:

# GOLD LAYER

# Spark's min/max are used through the module alias `F`: importing them by bare
# name would shadow Python's builtin min()/max() for every later notebook cell.
from pyspark.sql import functions as F
from pyspark.sql.functions import col, window, avg, count

spark.sql("USE CATALOG hive_metastore")
spark.sql("USE traffic_db")

silver_df = spark.table("traffic_silver")
display(silver_df.limit(5))
print(" Silver Data Loaded Successfully")


# Gold Aggregations (Hourly Metrics)
# One output row per road_segment_id per 1-hour tumbling window on event_time.

gold_df = (
    silver_df
    .groupBy(
        col("road_segment_id"),
        window(col("event_time"), "1 hour").alias("time_window")
    )
    .agg(
        count("*").alias("records"),
        avg("avg_speed_kmph").alias("avg_speed"),
        F.min("avg_speed_kmph").alias("min_speed"),
        F.max("avg_speed_kmph").alias("max_speed"),
        avg("density_veh_per_km").alias("avg_density"),
        avg("flow_veh_per_hr").alias("avg_flow"),
        avg("occupancy_pct").alias("avg_occupancy"),
        avg("avg_wait_time_s").alias("avg_wait_time")
    )
    # Flatten the window struct into explicit start/end columns.
    .select(
        "road_segment_id",
        col("time_window.start").alias("window_start"),
        col("time_window.end").alias("window_end"),
        "records", "avg_speed", "min_speed", "max_speed",
        "avg_density", "avg_flow", "avg_occupancy", "avg_wait_time"
    )
)

display(gold_df.limit(20))
print("Gold Aggregation Completed")


# Save to S3 as GOLD DELTA
# NOTE(review): credentials are configured under fs.s3a.*; that applies to
# s3:// paths on Databricks, but plain Hadoop/Spark needs s3a:// — confirm.

gold_path = "s3://traffic-data-monitoring-project/gold/traffic_hourly_metrics/"

# Full rewrite of the gold table on every run.
(gold_df.write
     .mode("overwrite")
     .format("delta")
     .save(gold_path))

# Register the Delta location as a metastore table (no-op if it already exists).
spark.sql(f"""
CREATE TABLE IF NOT EXISTS traffic_gold_hourly
USING DELTA
LOCATION '{gold_path}'
""")

display(spark.table("traffic_gold_hourly").limit(20))
print("🥇 GOLD TABLE READY")
print("✔ Gold job completed")

In [0]:
# Load Gold Data
# Select the catalog explicitly (as the gold-layer cell does) so traffic_db is
# resolved in hive_metastore even when this cell is run on its own in a fresh
# session whose default catalog differs.
spark.sql("USE CATALOG hive_metastore")
spark.sql("USE traffic_db")
gold_df = spark.table("traffic_gold_hourly")

display(gold_df.limit(20))


In [0]:
import pandas as pd

# Collect the gold table onto the driver as pandas, coercing window_start to a
# pandas datetime so time-axis plots order and format it correctly.
pd_df = (
    gold_df.toPandas()
    .assign(window_start=lambda frame: pd.to_datetime(frame["window_start"]))
)


In [0]:
import seaborn as sns
import matplotlib.pyplot as plt

# Pairwise correlations between the numeric gold metrics (non-numeric columns skipped).
feature_corr = pd_df.corr(numeric_only=True)

fig, ax = plt.subplots(figsize=(10, 6))
sns.heatmap(feature_corr, annot=True, cmap="coolwarm", ax=ax)
ax.set_title("Traffic Feature Correlation Heatmap")
plt.show()


In [0]:
# TOP 10
# "Busiest" = largest total record count summed over all hourly windows.
records_per_road = pd_df.groupby("road_segment_id")["records"].sum()
busiest_roads = records_per_road.sort_values(ascending=False).head(10)
top10 = busiest_roads.index.tolist()

df_top = pd_df.loc[pd_df["road_segment_id"].isin(top10)]
print("Selected Top Roads:", top10)



In [0]:
import matplotlib.pyplot as plt

fig, ax = plt.subplots(figsize=(12, 6))

# One line per busiest road; top10 holds at most 10 ids (fewer if the data has fewer roads).
for road_id in top10:
    road_rows = df_top.loc[df_top["road_segment_id"] == road_id].sort_values("window_start")
    ax.plot(road_rows["window_start"], road_rows["avg_speed"], label=road_id)

ax.set_title("Average Speed Trend of Top 10 Busiest Roads")
ax.set_xlabel("Time")
ax.set_ylabel("Avg Speed (km/h)")
ax.legend()
ax.grid(True)
plt.show()


In [0]:
# Same busiest-road ranking as the earlier "TOP 10" cell, but df_top is taken
# as an explicit copy so it is independent of pd_df.
road_totals = pd_df.groupby("road_segment_id")["records"].sum()
top10 = road_totals.sort_values(ascending=False).index[:10].tolist()

in_top10 = pd_df["road_segment_id"].isin(top10)
df_top = pd_df.loc[in_top10].copy()
print("Selected Roads:", top10)


In [0]:
# Collapse the hourly windows to one mean avg_speed per road per calendar day,
# giving a less noisy trend than the hourly series.
df_smooth = (
    df_top
    .assign(date=pd.to_datetime(df_top["window_start"]).dt.date)
    .groupby(["road_segment_id", "date"])["avg_speed"]
    .mean()
    .reset_index()
)


In [0]:
import matplotlib.pyplot as plt

fig, ax = plt.subplots(figsize=(12, 6))

# One daily-mean line (with point markers) per road present in df_smooth.
for segment in df_smooth["road_segment_id"].unique():
    daily_rows = df_smooth.loc[df_smooth["road_segment_id"] == segment]
    ax.plot(daily_rows["date"], daily_rows["avg_speed"], marker="o", label=segment)

ax.set_title("Avg Speed Trend (Top 10 Roads - Smoothed View)")
ax.set_xlabel("Date")
ax.set_ylabel("Avg Speed (kmph)")
ax.legend()
ax.grid(True)
plt.show()


In [0]:
import pandas as pd
import matplotlib.pyplot as plt

# Convert Spark DF to Pandas, normalising window_start the same way the earlier
# load cell does so pd_df has identical dtypes whichever cell ran last.
pd_df = gold_df.toPandas()
pd_df["window_start"] = pd.to_datetime(pd_df["window_start"])

# Mean of the hourly avg_speed values per road (each hourly window weighted
# equally, not by its record count). Roads with no speed data are dropped so
# they cannot take a slot in the top 10 or produce NaN bars/limits.
speed_avg = (pd_df.groupby("road_segment_id")["avg_speed"]
             .mean()
             .dropna()
             .sort_values(ascending=False)
             .head(10))

if speed_avg.empty:
    print("No avg_speed data available to plot")
else:
    fig, ax = plt.subplots(figsize=(10, 6))
    # Cast ids to str so bars sit on a categorical axis; numeric ids would
    # otherwise be placed at their numeric values on a continuous x axis.
    bars = ax.bar(speed_avg.index.astype(str), speed_avg.values,
                  color="skyblue", edgecolor="black")

    # Add value labels on top of bars
    for bar in bars:
        yval = bar.get_height()
        ax.text(bar.get_x() + bar.get_width() / 2, yval + 0.05, f"{yval:.2f}",
                ha="center", fontsize=10)

    # Ranking here is by average speed, not by traffic volume like the other
    # "Top 10" charts, so the title says so explicitly.
    ax.set_title("Avg Speed Comparison — Top 10 Roads by Average Speed")
    ax.set_ylabel("Average Speed (kmph)")
    ax.tick_params(axis="x", labelrotation=45)
    ax.set_ylim(speed_avg.min() - 1, speed_avg.max() + 1)  # tighter view to show variation
    ax.grid(axis="y", linestyle="--", alpha=0.6)
    plt.show()

