In [0]:
from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder.appName("GoldLayerAggregation").getOrCreate()

# Azure MySQL connection properties
mysql_url = "jdbc:mysql://bizserver.mysql.database.azure.com:3306/silver_db"
mysql_properties = {
    "user": "aftab",
    "password": "Amyra@0457",  # Replace with actual password
    "driver": "com.mysql.cj.jdbc.Driver",
    "sslMode": "REQUIRED"
}

In [0]:
# Load Silver Layer Data from MySQL
silver_df = spark.read.jdbc(
    url=mysql_url,
    table="delivery_data_silver",
    properties=mysql_properties
)

# Show sample data
display(silver_df)

delivery_id,vehicle_id,vehicle_type,driver_name,driver_rating,route_name,delivery_time,distance_covered,delivery_status,distance,fuel_efficiency,fuel_consumed,processed_date
5,383,Truck,Joseph Wilson,4.2,New Rogerton - North Jennifer,12.0,84.86,Failed,693.0,12.96,6.54784,2025-04-03T08:29:54Z
6,313,Van,Elizabeth Stout,1.4,North Susan - Bellport,12.0,500.0,Completed,372.0,13.48,37.092,2025-04-03T08:29:54Z
52,405,Car,Mr. Patrick Adams III,1.4,North John - Robinbury,4.72,500.0,Completed,121.0,9.86,50.7099,2025-04-03T08:29:54Z
93,311,Van,Mrs. Lisa Clark,4.9,South Tylerchester - Luisland,12.0,500.0,Failed,470.0,8.71,57.4053,2025-04-03T08:29:54Z
130,100,Truck,Alexander Marsh,3.8,Joshuabury - Nicolestad,12.0,500.0,Failed,898.0,8.05,62.1118,2025-04-03T08:29:54Z
145,256,Truck,Darlene Turner,1.2,Haasland - East Scottville,5.84,141.12,Failed,60.0,12.5,11.2896,2025-04-03T08:29:54Z
152,302,Van,Albert Lynch,2.2,West Robert - South Andrew,3.38,147.51,Completed,763.0,10.52,14.0219,2025-04-03T08:29:54Z
166,8,Car,Michael Bell,1.0,East Edwinfort - Lake Josephland,1.48,84.71,Failed,812.0,13.59,6.23326,2025-04-03T08:29:54Z
173,247,Bus,Christopher Hahn,4.0,Vegamouth - Port Dawnshire,4.32,500.0,Completed,151.0,7.58,65.9631,2025-04-03T08:29:54Z
175,371,Car,Colleen Russell,1.8,South Rachel - Brettton,4.72,162.58,Failed,214.0,9.14,17.7877,2025-04-03T08:29:54Z


In [0]:
from pyspark.sql.functions import avg, count, current_date, col, first

# Route Optimization Analysis
route_analysis_df = silver_df.groupBy("route_name", "vehicle_id", "driver_name").agg(
    count("delivery_id").alias("total_deliveries"),
    avg("delivery_time").alias("avg_delivery_time"),
    avg("fuel_consumed").alias("avg_fuel_consumed"),
    first("delivery_status").alias("delivery_status")  # Pick one representative status
)

# Fleet Performance
fleet_performance_df = silver_df.groupBy("vehicle_id").agg(
    avg("distance_covered").alias("total_distance"),
    avg("fuel_efficiency").alias("fuel_efficiency")
)

# Driver Performance
driver_performance_df = silver_df.groupBy("driver_name").agg(
    count("delivery_id").alias("total_deliveries_by_driver"),
    avg("delivery_time").alias("avg_delivery_time"),
    avg("driver_rating").alias("driver_rating")
)

# Final Aggregation for Gold Table
final_gold_df = route_analysis_df \
    .join(fleet_performance_df, "vehicle_id", "inner") \
    .join(driver_performance_df, "driver_name", "inner") \
    .withColumn("report_date", current_date())

# Arrange columns in the required order
final_gold_df = final_gold_df.select(
    "route_name", 
    "total_deliveries", 
    route_analysis_df.avg_delivery_time.alias("avg_delivery_time"), 
    "avg_fuel_consumed", 
    "vehicle_id", 
    "total_distance", 
    "fuel_efficiency", 
    "driver_name", 
    "total_deliveries_by_driver",
    "driver_rating",
    "delivery_status",
    "report_date"
)

# Show Final Data
display(final_gold_df)


route_name,total_deliveries,avg_delivery_time,avg_fuel_consumed,vehicle_id,total_distance,fuel_efficiency,driver_name,total_deliveries_by_driver,driver_rating,delivery_status,report_date
Barajaschester - East Lindsey,1,4.16,68.9655,417,500.0,7.25,Raven Jenkins,1,1.8,Completed,2025-04-03
Bergview - Laurenside,1,12.0,74.4048,433,500.0,6.72,Allen Robinson,1,4.5,Completed,2025-04-03
Brandonport - Shannonview,1,12.0,43.3651,260,500.0,11.53,Natalie White,1,4.2,Completed,2025-04-03
Buckborough - Randystad,1,12.0,12.7604,338,125.69,9.85,Amy Garcia,1,4.6,Failed,2025-04-03
Christopherburgh - West Alicia,1,1.93,70.7214,435,500.0,7.07,Heather Bryant,2,3.1,Failed,2025-04-03
Coleside - South Jessica,1,12.0,55.371,44,500.0,9.03,Mary Watson,1,1.9,Completed,2025-04-03
Colleenmouth - North Kelsey,1,3.84,45.5788,42,500.0,10.97,Michael Ramirez,1,4.0,Failed,2025-04-03
Courtneyburgh - Zoeside,1,4.61,8.04025,173,104.875,14.41,Latasha Le,1,1.6,Failed,2025-04-03
Crystalhaven - Michellestad,1,12.0,9.81204,62,79.87,8.14,Robert Hart,1,2.6,Completed,2025-04-03
Derekmouth - New Melissachester,1,12.0,80.5153,380,500.0,6.21,Michael West,1,4.9,Failed,2025-04-03


In [0]:
gold_table = "gold_db.transportation_gold"
mysql_properties = {
    "user": "aftab",
    "password": "Amyra@0457",
    "driver": "com.mysql.jdbc.Driver"
}

final_gold_df.write \
    .mode("overwrite") \
    .jdbc("jdbc:mysql://bizserver.mysql.database.azure.com:3306/gold_db", gold_table, properties=mysql_properties)

In [0]:
spark.conf.set("fs.azure.account.key.bizlakegen.dfs.core.windows.net", 
               "lMKG/jMLOV1tWLzdacgxzcR47JrvpDhZEJ2U1xaiVms5XSG47iLUg3BWBngPkqWMSGIxpDhfUNYB+AStvwR0rg==")

# Now Write Data
gold_path = "abfss://gold@bizlakegen.dfs.core.windows.net/transportation_gold"
final_gold_df.write.mode("overwrite").parquet(gold_path)