In [1]:

# Bronze layer (Fabric): load the raw fleet CSV and persist it as the managed table "bronze_fleet_data".
# Column names come from the CSV header row; column types are inferred by Spark (inferSchema=True).
df = spark.read.csv(
    "Files/bronze/fleet_latest.csv",
    header=True,
    inferSchema=True,
)
(
    df.write
    .mode("overwrite")  # each run replaces the previous Bronze snapshot
    .saveAsTable("bronze_fleet_data")
)


StatementMeta(, 8cc8c10b-420a-4292-8d0f-995e5b438dfd, 3, Finished, Available, Finished)

Silver layer transformation

In [3]:
# Step 1: Load the Bronze table and preview it (show() prints the default 20 rows).
df_bronze = spark.table("bronze_fleet_data")
df_bronze.show()

StatementMeta(, 2c5e343b-8c97-4384-a03c-d07fbd97df28, 5, Finished, Available, Finished)

+----------+-------------+------------------+-------+--------------+-----+-------------------+----------+-----------+---------+----------+------------+-------------+-------------+----------------------+----------------+--------------------+
|vehicle_id|     latitude|         longitude|bearing|current_status|speed|          timestamp|fuel_level|engine_temp|driver_id|alert_flag|vehicle_type|location_name|tire_pressure|route_congestion_level|traffic_incident|         ingested_at|
+----------+-------------+------------------+-------+--------------+-----+-------------------+----------+-----------+---------+----------+------------+-------------+-------------+----------------------+----------------+--------------------+
|     y2056|  42.40427844|      -71.03585853|  193.0|    STOPPED_AT| NULL|2025-07-10 09:23:50|     36.26|      81.19|  DVR4991|         0|       truck|       Quincy|         30.6|                  High|               0|2025-07-10 09:23:...|
|     y3204|  42.35219092|      -71.

In [4]:
# Step 2: Data cleaning & type casting.
# Each listed column is cast to an explicit Spark type, applied in the order below.
from pyspark.sql.functions import col

SILVER_CASTS = {
    "speed": "double",
    "fuel_level": "double",
    "engine_temp": "double",
    "tire_pressure": "double",
    "traffic_incident": "int",
    "alert_flag": "int",
    "timestamp": "timestamp",
    "ingested_at": "timestamp",
}

df_silver = df_bronze
for column_name, spark_type in SILVER_CASTS.items():
    # withColumn on an existing name replaces that column with its casted version.
    df_silver = df_silver.withColumn(column_name, col(column_name).cast(spark_type))



StatementMeta(, 2c5e343b-8c97-4384-a03c-d07fbd97df28, 6, Finished, Available, Finished)

In [5]:
# Step 3: Enrich with derived status fields.
# `when` was previously used without being imported (only `col` was), which fails on a fresh kernel.
from pyspark.sql.functions import col, when

# Thresholds for the derived flags (in whatever units the source CSV uses).
FUEL_LOW_THRESHOLD = 20
TIRE_PRESSURE_LOW_THRESHOLD = 30
CRITICAL_FUEL_THRESHOLD = 15
CRITICAL_ENGINE_TEMP_THRESHOLD = 110

# NOTE(review): a NULL reading makes these comparisons NULL, which `when` treats as false, so
# missing fuel/tire data is labelled "Normal" and is_critical = 0. Confirm that is intended.
df_silver = (
    df_silver
    .withColumn(
        "fuel_status",
        when(col("fuel_level") < FUEL_LOW_THRESHOLD, "Low").otherwise("Normal"),
    )
    .withColumn(
        "tire_status",
        when(col("tire_pressure") < TIRE_PRESSURE_LOW_THRESHOLD, "Low").otherwise("Normal"),
    )
    .withColumn(
        "is_critical",
        when(
            (col("fuel_level") < CRITICAL_FUEL_THRESHOLD)
            & (col("engine_temp") > CRITICAL_ENGINE_TEMP_THRESHOLD),
            1,
        ).otherwise(0),
    )
)


StatementMeta(, 2c5e343b-8c97-4384-a03c-d07fbd97df28, 7, Finished, Available, Finished)

In [6]:
# Step 4: Persist the cast + enriched frame as the Silver Delta table, replacing any previous run.
(
    df_silver.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("silver_fleet_data")
)




StatementMeta(, 2c5e343b-8c97-4384-a03c-d07fbd97df28, 8, Finished, Available, Finished)

Gold layer transformation

In [7]:
# Re-read the persisted Silver table and register it for Spark SQL as the temp view "silver",
# which the Gold queries below select FROM.
df_silver = spark.table("silver_fleet_data")
df_silver.createOrReplaceTempView("silver")


StatementMeta(, 2c5e343b-8c97-4384-a03c-d07fbd97df28, 9, Finished, Available, Finished)

In [8]:
# Gold: one row per (vehicle_id, vehicle_type) with the record count, 2-dp averages of speed,
# fuel level and engine temperature, and MAX(alert_flag) (1 if any record was flagged,
# assuming alert_flag holds 0/1).
VEHICLE_SUMMARY_SQL = """
SELECT 
    vehicle_id,
    vehicle_type,
    COUNT(*) AS total_records,
    ROUND(AVG(speed), 2) AS avg_speed,
    ROUND(AVG(fuel_level), 2) AS avg_fuel_level,
    ROUND(AVG(engine_temp), 2) AS avg_engine_temp,
    MAX(alert_flag) AS was_alert_raised
FROM silver
GROUP BY vehicle_id, vehicle_type
"""

df_vehicle_summary = spark.sql(VEHICLE_SUMMARY_SQL)
(
    df_vehicle_summary.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold_vehicle_summary")
)


StatementMeta(, 2c5e343b-8c97-4384-a03c-d07fbd97df28, 10, Finished, Available, Finished)

In [9]:
# Gold: one row per driver_id with the record count, 2-dp averages of speed, engine temperature
# and fuel level, and the sum of alert_flag across the driver's records.
# NOTE(review): trip_count is COUNT(*) over telemetry rows, not a count of distinct trips.
DRIVER_SUMMARY_SQL = """
SELECT 
    driver_id,
    COUNT(*) AS trip_count,
    ROUND(AVG(speed), 2) AS avg_speed,
    ROUND(AVG(engine_temp), 2) AS avg_temp,
    ROUND(AVG(fuel_level), 2) AS avg_fuel,
    SUM(alert_flag) AS total_alerts
FROM silver
GROUP BY driver_id
"""

df_driver_summary = spark.sql(DRIVER_SUMMARY_SQL)
(
    df_driver_summary.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold_driver_summary")
)



StatementMeta(, 2c5e343b-8c97-4384-a03c-d07fbd97df28, 11, Finished, Available, Finished)

In [10]:
# Gold: congestion and incident metrics per location_name.
# route_congestion_level is a text label (e.g. "High" in the Bronze preview) and is not cast in
# Silver. AVG() on it implicitly casts the label to double, which yields NULL for every row
# (or an error under ANSI mode). Map the labels to an ordinal score before averaging.
# NOTE(review): only "High" is visible in the sample data. The Low=1 / Medium=2 / High=3 scale
# is an assumption; any other label maps to NULL and is ignored by AVG. Confirm against the source.
df_location_congestion = spark.sql("""
SELECT
    location_name,
    COUNT(*) AS records,
    ROUND(AVG(speed), 2) AS avg_speed,
    ROUND(AVG(
        CASE lower(trim(route_congestion_level))
            WHEN 'low' THEN 1
            WHEN 'medium' THEN 2
            WHEN 'high' THEN 3
        END
    ), 2) AS avg_congestion,
    SUM(traffic_incident) AS total_incidents
FROM silver
GROUP BY location_name
""")
df_location_congestion.write.mode("overwrite").format("delta").saveAsTable("gold_location_congestion")


StatementMeta(, 2c5e343b-8c97-4384-a03c-d07fbd97df28, 12, Finished, Available, Finished)

In [11]:
# Gold: daily totals for Power BI KPI cards / line charts (critical events and record count per day).
from pyspark.sql import functions as F
from pyspark.sql.functions import to_date

# NOTE(review): days are bucketed by ingested_at (load time), not the event `timestamp`.
# Switch the source column if the trend should follow when events actually occurred.
df_daily = df_silver.withColumn("date", to_date("ingested_at"))

# Name the aggregates explicitly. The dict form of agg() produces generated names such as
# "sum(is_critical)", and withColumnRenamed silently does nothing if such a name does not match.
df_health = df_daily.groupBy("date").agg(
    F.sum("is_critical").alias("total_critical_events"),
    F.count("vehicle_id").alias("total_records"),
)

df_health.write.mode("overwrite").format("delta").saveAsTable("gold_daily_health_summary")



StatementMeta(, 2c5e343b-8c97-4384-a03c-d07fbd97df28, 13, Finished, Available, Finished)

In [12]:
# Gold: latest record per vehicle (for the real-time map view / dashboard).
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

# Newest event first. NULL timestamps sort last, so they are picked only when a vehicle has no
# timestamped rows. ingested_at breaks ties between rows sharing a timestamp; without it,
# row_number() picks one arbitrarily and the snapshot can differ between runs on identical data.
# Rows that tie on both columns are still ordered arbitrarily.
windowSpec = Window.partitionBy("vehicle_id").orderBy(
    col("timestamp").desc_nulls_last(),
    col("ingested_at").desc_nulls_last(),
)

df_latest = (
    df_silver
    .withColumn("rn", row_number().over(windowSpec))
    .filter(col("rn") == 1)
    .drop("rn")
)

df_latest.write.mode("overwrite").format("delta").saveAsTable("gold_fleet_status_snapshot")


StatementMeta(, 2c5e343b-8c97-4384-a03c-d07fbd97df28, 14, Finished, Available, Finished)