In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, count, avg, sum, max, min, round as spark_round,
    when, lit, countDistinct, desc, asc
)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Executive summary: a single row of fleet-wide averages (rounded to 2 dp) and
# extremes computed over every record in df_bronze, displayed and then
# persisted (overwriting) as a gold table.
executive_summary_metrics = [
    spark_round(avg(source_col), 2).alias(out_name)
    for source_col, out_name in [
        ("speed", "avg_speed"),
        ("fuel_level", "avg_fuel"),
        ("engine_temp", "avg_engine_temp"),
    ]
] + [
    # Note: max/min here are the pyspark.sql.functions versions imported above,
    # not the Python builtins.
    max("speed").alias("max_speed"),
    min("fuel_level").alias("min_fuel"),
    max("engine_temp").alias("max_temp"),
]

summary_stats = df_bronze.agg(*executive_summary_metrics)
display(summary_stats)

(
    summary_stats.write
    .mode("overwrite")
    .saveAsTable("traffic.traffic_insights.executive_summary_gold")
)

In [0]:
# Record count and percentage share per `status` value, most frequent first.
# `total_records` stays a notebook-level variable: the alert-distribution cell
# reuses it as its percentage denominator. Computing it triggers a Spark action
# (a full count of df_bronze).
total_records = df_bronze.count()

status_row_count = count("*")
df_status_distribution = (
    df_bronze
    .groupBy("status")
    .agg(
        status_row_count.alias("record_count"),
        spark_round(status_row_count / total_records * 100, 2).alias("percentage"),
    )
    .orderBy(desc("record_count"))
)

display(df_status_distribution)
(
    df_status_distribution.write
    .mode("overwrite")
    .saveAsTable("traffic.traffic_insights.status_gold")
)

In [0]:
# Record count and percentage share per `alert` value, most frequent first.
# Relies on `total_records` from the status-distribution cell, so that cell
# must have been run first.
alert_row_count = count("*")
alert_share_pct = spark_round(alert_row_count / total_records * 100, 2)

df_alert_distribution = (
    df_bronze
    .groupBy("alert")
    .agg(
        alert_row_count.alias("alert_count"),
        alert_share_pct.alias("percentage"),
    )
    .orderBy(desc("alert_count"))
)

display(df_alert_distribution)
(
    df_alert_distribution.write
    .mode("overwrite")
    .saveAsTable("traffic.traffic_insights.alert_distribution_gold")
)

In [0]:
# Number of records for each (city, alert) pair. Rows are ordered by city
# ascending and, within a city, by descending alert count.
df_city_alerts = (
    df_bronze
    .groupBy("city", "alert")
    .agg(count("*").alias("alert_count"))
    .orderBy(col("city"), col("alert_count").desc())
)

display(df_city_alerts)
(
    df_city_alerts.write
    .mode("overwrite")
    .saveAsTable("traffic.traffic_insights.city_alerts_gold")
)

In [0]:
# Per-city profile: record volume, rounded (2 dp) averages and extremes.
# Cities with the most records come first. Unit suffixes in the output
# column names (kmh, percent, c) are carried over as-is from the original
# aliases.
city_summary_metrics = (
    count("*").alias("total_records"),
    spark_round(avg("speed"), 2).alias("avg_speed_kmh"),
    spark_round(avg("fuel_level"), 2).alias("avg_fuel_percent"),
    spark_round(avg("engine_temp"), 2).alias("avg_engine_temp_c"),
    max("speed").alias("max_speed_kmh"),
    min("fuel_level").alias("min_fuel_percent"),
)

df_city_summary = (
    df_bronze
    .groupBy("city")
    .agg(*city_summary_metrics)
    .orderBy(col("total_records").desc())
)

display(df_city_summary)
(
    df_city_summary.write
    .mode("overwrite")
    .saveAsTable("traffic.traffic_insights.city_summary_gold")
)

In [0]:
# Per-vehicle profile: record volume, how many records carry particular
# status/alert values, and rounded (2 dp) averages. Vehicles with the most
# records come first.
# NOTE(review): the status literal "maintenance" is lowercase while the alert
# literals are capitalised. Confirm these match the actual values in
# df_bronze, since a mismatch silently produces a count of 0.
vehicle_flag_counts = [
    # sum/when here are pyspark.sql.functions, not the Python builtin sum.
    sum(when(col(column_name) == expected_value, 1).otherwise(0)).alias(alias_name)
    for column_name, expected_value, alias_name in [
        ("status", "maintenance", "maintenance_count"),
        ("alert", "Overspeed", "overspeed_count"),
        ("alert", "Low Fuel", "low_fuel_count"),
        ("alert", "Normal", "normal_count"),
    ]
]

vehicle_rounded_means = [
    spark_round(avg(source_col), 2).alias(out_name)
    for source_col, out_name in [
        ("speed", "avg_speed_kmh"),
        ("fuel_level", "avg_fuel_percent"),
        ("engine_temp", "avg_engine_temp_c"),
    ]
]

df_vehicle_performance = (
    df_bronze
    .groupBy("vehicle_id")
    .agg(
        count("*").alias("total_records"),
        *vehicle_flag_counts,
        *vehicle_rounded_means,
    )
    .orderBy(desc("total_records"))
)

display(df_vehicle_performance)
(
    df_vehicle_performance.write
    .mode("overwrite")
    .saveAsTable("traffic.traffic_insights.vehicle_performance_gold")
)