In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("RealtimeTrafficAnalysis") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2") \
    .getOrCreate()

# spark = SparkSession.builder.appName("RealtimeTrafficAnalysis").getOrCreate()
spark.sparkContext.setLogLevel("WARN")


:: loading settings :: url = jar:file:/opt/bitnami/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /opt/bitnami/spark/.ivy2/cache
The jars for the packages stored in: /opt/bitnami/spark/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3bc29a4a-f3b9-428d-a81b-bd171c0465a3;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.2 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.2 in central
	found org.apache.kafka#kafka-clients;2.6.0 in central
	found com.github.luben#zstd-jni;1.4.8-1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.2 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
downloading https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.1.2/spark-sql-kafka-0-10_2.12-3.1.2.jar ...
	[SUCCESSFUL ] org.apache.spark#spark-sql-kafka-

In [38]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time
import shutil

# Cleanup old checkpoints (optional but good for clean start)
shutil.rmtree("/tmp/checkpoints", ignore_errors=True)

# Stop any running streams
for q in spark.streams.active:
    q.stop()

# Define schema
schema = StructType([
    StructField("sensor_id", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("vehicle_count", IntegerType(), True),
    StructField("average_speed", DoubleType(), True),
    StructField("congestion_level", StringType(), True),
])

# Read from Kafka
kafka_df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka1:29092")
    .option("subscribe", "trafic_data")
    .option("startingOffsets", "earliest")
    .load()
)

parsed_df = kafka_df.selectExpr("CAST(value AS STRING) as json_data") \
    .select(from_json(col("json_data"), schema).alias("data")) \
    .select("data.*") \
    .withColumn("timestamp", to_timestamp("timestamp"))

# Console Output for debug
query = traffic_volume.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .option("checkpointLocation", "/tmp/checkpoints/console_output") \
    .start()

time.sleep(10)
query.stop()


25/05/13 19:29:11 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/05/13 19:29:11 WARN AdminClientConfig: The configuration 'key.deserializer' was supplied but isn't a known config.
25/05/13 19:29:11 WARN AdminClientConfig: The configuration 'value.deserializer' was supplied but isn't a known config.
25/05/13 19:29:11 WARN AdminClientConfig: The configuration 'enable.auto.commit' was supplied but isn't a known config.
25/05/13 19:29:11 WARN AdminClientConfig: The configuration 'max.poll.records' was supplied but isn't a known config.
25/05/13 19:29:11 WARN AdminClientConfig: The configuration 'auto.offset.reset' was supplied but isn't a known config.
25/05/13 19:29:21 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 0, writer: ConsoleWriter[numRows=20, truncate=false]] is aborting.
25/05/13 19:29:21 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch

In [53]:
# Enforce schema validation
df = df.filter(
    col("sensor_id").isNotNull() & 
    col("timestamp").isNotNull() & 
    (col("vehicle_count") >= 0) & 
    (col("average_speed") > 0)
)

In [None]:
# Handle duplicate records using deduplication (sensor_id + timestamp)
deduplicated_df = validated_df.dropDuplicates(["sensor_id", "timestamp"])


In [29]:
# 1. Traffic Volume per Sensor (5-minute window)
traffic_volume = df.groupBy(
    window("timestamp", "1 minutes"),
    "sensor_id"
).agg(sum("vehicle_count").alias("total_vehicle_count"))


traffic_volume.show(truncate=False)

+------------------------------------------+---------+-------------------+
|window                                    |sensor_id|total_vehicle_count|
+------------------------------------------+---------+-------------------+
|{2025-05-13 14:57:00, 2025-05-13 14:58:00}|105      |530                |
|{2025-05-13 14:59:00, 2025-05-13 15:00:00}|104      |1484               |
|{2025-05-13 15:00:00, 2025-05-13 15:01:00}|103      |433                |
|{2025-05-13 14:57:00, 2025-05-13 14:58:00}|101      |542                |
|{2025-05-13 14:59:00, 2025-05-13 15:00:00}|102      |1414               |
|{2025-05-13 14:59:00, 2025-05-13 15:00:00}|101      |1434               |
|{2025-05-13 14:58:00, 2025-05-13 14:59:00}|105      |1677               |
|{2025-05-13 15:00:00, 2025-05-13 15:01:00}|101      |522                |
|{2025-05-13 14:58:00, 2025-05-13 14:59:00}|101      |1483               |
|{2025-05-13 14:57:00, 2025-05-13 14:58:00}|103      |475                |
|{2025-05-13 14:57:00, 20

In [31]:
# 2. Congestion Hotspots (3 consecutive HIGH congestion levels)
from pyspark.sql.window import Window

window_spec = Window.partitionBy("sensor_id").orderBy("timestamp")
parsed_df = df.withColumn(
    "prev_congestion",
    lag("congestion_level", 1).over(window_spec)
).withColumn(
    "prev2_congestion",
    lag("congestion_level", 2).over(window_spec)
)
congestion_hotspots = parsed_df.filter(
    (col("congestion_level") == "HIGH") &
    (col("prev_congestion") == "HIGH") &
    (col("prev2_congestion") == "HIGH")
).select("sensor_id", "timestamp")

congestion_hotspots.show(truncate=False)

+---------+-------------------+
|sensor_id|timestamp          |
+---------+-------------------+
|101      |2025-05-13T14:57:42|
|101      |2025-05-13T14:59:33|
|102      |2025-05-13T14:58:28|
|102      |2025-05-13T14:59:09|
|102      |2025-05-13T14:59:10|
|102      |2025-05-13T14:59:11|
|103      |2025-05-13T14:57:51|
|104      |2025-05-13T14:57:59|
|104      |2025-05-13T14:58:00|
|104      |2025-05-13T14:58:06|
|104      |2025-05-13T14:59:19|
|104      |2025-05-13T14:59:57|
|104      |2025-05-13T15:00:10|
|104      |2025-05-13T15:00:11|
|105      |2025-05-13T14:57:48|
|105      |2025-05-13T14:57:49|
|105      |2025-05-13T14:58:31|
|105      |2025-05-13T14:58:56|
|105      |2025-05-13T14:58:57|
|105      |2025-05-13T14:58:58|
+---------+-------------------+
only showing top 20 rows



In [32]:
# 3. Average Speed per Sensor (10-minute rolling window)
average_speed = parsed_df.groupBy(
    window("timestamp", "10 minutes", "5 minutes"),
    "sensor_id"
).agg(avg("average_speed").alias("avg_speed"))
average_speed.show(truncate=False)

+------------------------------------------+---------+------------------+
|window                                    |sensor_id|avg_speed         |
+------------------------------------------+---------+------------------+
|{2025-05-13 14:50:00, 2025-05-13 15:00:00}|104      |56.27680851063827 |
|{2025-05-13 14:55:00, 2025-05-13 15:05:00}|105      |53.68239263803681 |
|{2025-05-13 15:00:00, 2025-05-13 15:10:00}|101      |51.91130434782608 |
|{2025-05-13 14:55:00, 2025-05-13 15:05:00}|103      |55.08380368098162 |
|{2025-05-13 15:00:00, 2025-05-13 15:10:00}|103      |57.73545454545455 |
|{2025-05-13 15:00:00, 2025-05-13 15:10:00}|104      |51.16454545454545 |
|{2025-05-13 14:50:00, 2025-05-13 15:00:00}|101      |54.07262411347518 |
|{2025-05-13 14:50:00, 2025-05-13 15:00:00}|102      |58.01             |
|{2025-05-13 14:50:00, 2025-05-13 15:00:00}|103      |54.67007092198583 |
|{2025-05-13 14:55:00, 2025-05-13 15:05:00}|104      |55.58680981595089 |
|{2025-05-13 14:55:00, 2025-05-13 15:0

In [51]:
# **Sudden Speed Drop Detection**: drop > 50% within **2 mins**.
window_spec = Window.partitionBy("sensor_id").orderBy("timestamp")
speed_drop = df.withColumn(
    "prev_speed", 
    lag("average_speed", 1).over(window_spec)
).withColumn(
    "speed_drop_%", 
    round(abs((col("average_speed") - col("prev_speed")) / col("prev_speed") * 100), 2)
).filter(
    col("speed_drop_%") > 50
)
rank_window_spec = Window.partitionBy("sensor_id").orderBy(col("speed_drop_%").desc())
top5_speed_drops = speed_drop.select(
    "*",
    rank().over(rank_window_spec).alias("row_num")
).filter(col("row_num").isin([1,2,3,4])).show(truncate=False)


+---------+-------------------+-------------+-------------+----------------+----------+------------+-------+
|sensor_id|timestamp          |vehicle_count|average_speed|congestion_level|prev_speed|speed_drop_%|row_num|
+---------+-------------------+-------------+-------------+----------------+----------+------------+-------+
|101      |2025-05-13T14:59:39|5            |78.85        |HIGH            |14.78     |433.49      |1      |
|101      |2025-05-13T14:59:25|3            |83.13        |MEDIUM          |16.86     |393.06      |2      |
|101      |2025-05-13T14:58:49|35           |56.35        |MEDIUM          |11.69     |382.04      |3      |
|101      |2025-05-13T14:59:50|39           |63.11        |MEDIUM          |13.98     |351.43      |4      |
|102      |2025-05-13T14:59:28|46           |82.66        |LOW             |11.61     |611.97      |1      |
|102      |2025-05-13T14:57:53|19           |76.71        |MEDIUM          |10.97     |599.27      |2      |
|102      |2025-05-

In [59]:
# **Top 3 Busiest Sensors**: by vehicle count in **last 30 mins**.
from pyspark.sql.functions import *

busiest_sensors = df.groupBy(
    window("timestamp", "2 minutes"), 
    "sensor_id"
).agg(sum("vehicle_count").alias("total_vehicle_count")).orderBy(
    col("total_vehicle_count").desc()
).limit(3)
    
busiest_sensors.show(
    truncate=False
)
busiest_sensors.selectExpr(
    "to_json(struct(*)) AS value"
).write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("topic", "traffic_analysis") \
  .save()

+------------------------------------------+---------+-------------------+
|window                                    |sensor_id|total_vehicle_count|
+------------------------------------------+---------+-------------------+
|{2025-05-13 14:58:00, 2025-05-13 15:00:00}|105      |3086               |
|{2025-05-13 14:58:00, 2025-05-13 15:00:00}|103      |3069               |
|{2025-05-13 14:58:00, 2025-05-13 15:00:00}|102      |3065               |
+------------------------------------------+---------+-------------------+



AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of Structured Streaming + Kafka Integration Guide.

In [60]:

spark.read.format("kafka").load()


AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of Structured Streaming + Kafka Integration Guide.