In [0]:
from pyspark.sql.functions import from_json, col, window, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Fetch the Event Hubs connection string from the Databricks secret scope so the
# credential itself never appears in the notebook source or its outputs.
connectionString = dbutils.secrets.get(
    scope="kv-trafficflow-scope",
    key="eh-inputstream-connstr",
)

# Options handed to the Event Hubs Spark connector (`.format("eventhubs")`).
# The connection string is passed through the connector's JVM-side
# EventHubsUtils.encrypt helper before being placed in the options map.
# NOTE(review): the target hub is usually taken from the EntityPath in the
# connection string; confirm the connector version in use actually reads
# 'eventhubs.eventHubName'.
# No 'eventhubs.consumerGroup' is set, so every streaming query built on these
# options reads through the connector's default consumer group.
ehConf = {
    "eventhubs.connectionString": sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
        connectionString
    ),
    "eventhubs.eventHubName": "inputstream",
}


In [0]:
# Schema of each JSON event carried in the Event Hubs `body` column.
# All fields are nullable; `timestamp` is kept as a string at this stage.
schema = StructType(
    [
        StructField(fieldName, fieldType, True)
        for fieldName, fieldType in (
            ("plate_number", StringType()),
            ("from_zone", StringType()),
            ("to_zone", StringType()),
            ("speed", IntegerType()),
            ("timestamp", StringType()),
        )
    ]
)


In [0]:
# Raw event stream from Event Hubs, configured by `ehConf`.
rawStreamDF = (
    spark.readStream
    .format("eventhubs")
    .options(**ehConf)
    .load()
)

# Cast the `body` column to a string, parse it as JSON against `schema`,
# flatten the parsed struct into top-level columns, and replace the
# `timestamp` string column with a proper timestamp column so it can drive
# event-time windowing.
trafficDF = (
    rawStreamDF
    .select(from_json(col("body").cast("string"), schema).alias("data"))
    .select("data.*")
    .withColumn("timestamp", col("timestamp").cast("timestamp"))
)

# Event-time watermark on `timestamp` with a 15-second delay threshold.
trafficWithWatermarkDF = trafficDF.withWatermark("timestamp", "15 seconds")


In [0]:
# Vehicles per destination zone over 2-minute event-time windows that slide
# every 30 seconds, so each event is counted in four overlapping windows.
zoneCountsDF = (
    trafficWithWatermarkDF
    .groupBy(window(col("timestamp"), "2 minutes", "30 seconds"), "to_zone")
    .count()
    .withColumnRenamed("count", "vehicle_count")
)

# Preview with the `window` struct flattened into start/end, newest windows first.
# NOTE(review): display() on a streaming DataFrame starts its own streaming query,
# i.e. another Event Hubs reader using the same `ehConf`.
display(
    zoneCountsDF
    .select("window.start", "window.end", "to_zone", "vehicle_count")
    .orderBy("window.start", ascending=False)
)


start,end,to_zone,vehicle_count
2026-01-09T23:51:00.000+0000,2026-01-09T23:53:00.000+0000,Centrum,10
2026-01-09T23:51:00.000+0000,2026-01-09T23:53:00.000+0000,Nowa-Huta,16
2026-01-09T23:51:00.000+0000,2026-01-09T23:53:00.000+0000,Podgorze,14
2026-01-09T23:51:00.000+0000,2026-01-09T23:53:00.000+0000,Debniki,12
2026-01-09T23:51:00.000+0000,2026-01-09T23:53:00.000+0000,Kazimierz,15
2026-01-09T23:51:00.000+0000,2026-01-09T23:53:00.000+0000,Bronowice,14
2026-01-09T23:50:30.000+0000,2026-01-09T23:52:30.000+0000,Kazimierz,30
2026-01-09T23:50:30.000+0000,2026-01-09T23:52:30.000+0000,Debniki,20
2026-01-09T23:50:30.000+0000,2026-01-09T23:52:30.000+0000,Podgorze,35
2026-01-09T23:50:30.000+0000,2026-01-09T23:52:30.000+0000,Centrum,26


Databricks visualization. Run in Databricks to view.

In [0]:
# Persist the per-zone sliding-window counts to a Delta table.
#
# Why append mode rather than complete mode:
# - Spark only uses the watermark on `timestamp` to evict old window state in
#   append/update mode.
# - Complete mode must keep every window ever seen in state and rewrites the
#   whole result on every trigger. For an unbounded stream that means
#   ever-growing state, and the watermark has no effect.
# - The Delta sink accepts only append and complete, so append is the mode that
#   keeps state bounded.
#
# In append mode each (window, to_zone) row is written exactly once. That
# happens after the watermark passes the window end, i.e. roughly the 2-minute
# window length plus the 15-second watermark delay after the window opens.
#
# NOTE: if this query previously ran in complete mode, the existing table holds
# full-result snapshots that appended rows would mix with, and the old
# checkpoint may not be reusable. Clear both paths (or use new ones) before
# restarting.
query = (
    zoneCountsDF.writeStream
    .outputMode("append")
    .format("delta")
    .option("checkpointLocation", "/mnt/checkpoints/traffic_counts")
    .queryName("traffic_counts")
    .start("/mnt/deltaoutput/current_traffic")
)


In [0]:
# Displays the streaming aggregate `zoneCountsDF` as-is (unsorted, with the raw
# `window` struct column).
# NOTE(review): this starts yet another streaming query that reads Event Hubs
# through the same `ehConf`, and therefore the same (default) consumer group.
# Other queries on that group here are the display() in the aggregation cell
# and the `traffic_counts` writer. Concurrent readers on one consumer group may
# disconnect each other. Consider a dedicated consumer group per query, or
# inspect the Delta output written by `traffic_counts` instead.
display(zoneCountsDF)