In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from pyspark.sql.functions import col, window, avg

# Explicit schema for the traffic JSON records read by the stream below.
# Fields are appended in order; StructType.add marks each one nullable by
# default, matching a plain StructField(name, type).
traffic_schema = (
    StructType()
    .add("city", StringType())
    .add("road", StringType())
    .add("speed", IntegerType())
    .add("congestion", IntegerType())
    .add("timestamp", TimestampType())
)

# Unity Catalog volume directory the JSON traffic files are dropped into.
input_path = "/Volumes/main/city_insights/input_traffic/"

# Streaming JSON source over the volume, parsed with the explicit schema.
# maxFilesPerTrigger=1 caps each micro-batch at one new file, which is handy
# for demos/testing; raise it (or drop it) for real workloads.
traffic_stream = (
    spark.readStream
    .format("json")
    .schema(traffic_schema)
    .options(maxFilesPerTrigger=1)
    .load(input_path)
)

# Aggregate traffic per city over tumbling event-time windows.
#
# The watermark bounds how long Spark waits for late events before a window
# is treated as final. It is required by the "append" output mode used when
# writing this stream: without a watermark, Structured Streaming rejects a
# streaming aggregation in append mode with an AnalysisException. In append
# mode a window's row is emitted only after the watermark passes the window's
# end, so the newest windows show up once later data has arrived.
window_duration = "5 minutes"
watermark_delay = "10 minutes"  # max tolerated event lateness; tune to the data

agg_traffic = (
    traffic_stream
    .withWatermark("timestamp", watermark_delay)
    .groupBy(
        window(col("timestamp"), window_duration),
        col("city"),
    )
    .agg(
        avg("speed").alias("avg_speed"),
        avg("congestion").alias("avg_congestion"),
    )
)

# Show the result schema (window struct, city, avg_speed, avg_congestion).
# DataFrame has no `dataType` attribute; printSchema works on streaming frames.
agg_traffic.printSchema()

# Write the windowed aggregates to a Unity Catalog Delta table.
# Use the same catalog as the input volume (`main`) with a fully-qualified
# catalog.schema.table name so the target doesn't depend on the session's
# default catalog.
output_table = "main.city_insights.traffic_agg"

# Volume paths must look like /Volumes/<catalog>/<schema>/<volume>/<path>.
# Streaming state gets its own volume so it never mixes with the input files.
# NOTE(review): the `checkpoints` volume name is a suggestion — point this at
# whichever volume you reserve for streaming checkpoints.
spark.sql("CREATE VOLUME IF NOT EXISTS main.city_insights.checkpoints")
checkpoint_path = "/Volumes/main/city_insights/checkpoints/traffic_agg_from_file"

# Append mode on a streaming aggregation requires an event-time watermark on
# the aggregated stream; each window is then written once, when finalized.
# Keep the StreamingQuery handle so the stream can be inspected or stopped.
query = (
    agg_traffic.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_path)
    .toTable(output_table)
)

# For live monitoring, run in another cell:
# display(spark.table(output_table))
# query.status / query.lastProgress report progress; query.stop() ends it.
