# In [None]:
# Step 1: Read raw CSVs into DataFrames
#
# Only the air-quality and energy files are read with schema inference here.
# The traffic file is loaded once, in Step 2, with an explicit schema: reading
# it here as well with inferSchema=True cost an extra pass over the file for a
# DataFrame that was overwritten before it was ever used.

# Air Quality
df_airquality = spark.read.csv(
    "wasbs://raw-sensor-data@smartcitystoragebyqui.blob.core.windows.net/Melbourne_Air_Quality_Data.csv",
    header=True,
    inferSchema=True,
)

# Energy Consumption
df_energy = spark.read.csv(
    "wasbs://raw-sensor-data@smartcitystoragebyqui.blob.core.windows.net/Melbourne_Energy_Consumption_Data.csv",
    header=True,
    inferSchema=True,
)
# Step 2: Define explicit schema for traffic

from pyspark.sql.types import (
    IntegerType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

# Column layout applied to the traffic CSV; every column is declared nullable.
# NOTE(review): avg_speed_kmh is declared as an integer. If the file holds
# fractional speeds they would not parse as IntegerType -- confirm against
# the data.
traffic_schema = (
    StructType()
    .add("sensor_id", StringType(), True)
    .add("timestamp", TimestampType(), True)
    .add("location", StringType(), True)
    .add("vehicle_count", IntegerType(), True)
    .add("avg_speed_kmh", IntegerType(), True)
)

# Load the traffic file with the schema above instead of inferring one.
df_traffic = spark.read.csv(
    "wasbs://raw-sensor-data@smartcitystoragebyqui.blob.core.windows.net/Melbourne_Traffic_Data.csv",
    schema=traffic_schema,
    header=True,
)

# Print the resulting column names and types.
df_traffic.printSchema()

# Step 3: Preview the data

# Print the first five rows of each dataset: air quality, traffic, then energy.
for _preview_df in (df_airquality, df_traffic, df_energy):
    _preview_df.show(5)

# Step 4: GroupBy + Aggregations
from pyspark.sql import functions as F


def _average_by_location(df, value_column, result_column):
    """Return the mean of ``value_column`` per ``location`` as ``result_column``.

    The alias is attached to the aggregate expression itself, so the output
    column name does not depend on Spark's auto-generated ``avg(<column>)``
    label. Renaming that label afterwards with ``withColumnRenamed`` silently
    does nothing when the label does not match.

    Args:
        df: DataFrame containing a ``location`` column and ``value_column``.
        value_column: Name of the column to average.
        result_column: Name given to the averaged column in the result.

    Returns:
        A DataFrame with two columns: ``location`` and ``result_column``.
    """
    return df.groupBy("location").agg(F.avg(value_column).alias(result_column))


# Average AQI
avg_air_quality = _average_by_location(
    df_airquality, "air_quality_index", "Average_Air_Quality"
)
avg_air_quality.show()

# Average Energy Consumption
avg_energy = _average_by_location(
    df_energy, "energy_consumed_kwh", "Average_Energy_Consumption"
)
avg_energy.show()

# Average Traffic Density
avg_traffic_density = _average_by_location(
    df_traffic, "vehicle_count", "Average_Traffic_Density"
)
avg_traffic_density.show()

# Average Speed
avg_speed = _average_by_location(
    df_traffic, "avg_speed_kmh", "Average_Speed_kmh"
)
avg_speed.show()
