In [63]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType, IntegerType, TimestampType
from pyspark.sql.functions import col, from_json, avg, min, max, count, desc, current_timestamp, window, to_timestamp
from pyspark.sql import functions as F
from pyspark.sql import Window

In [None]:
spark = (
    SparkSession 
    .builder 
    .appName("Streaming from Kafka") 
    .config("spark.streaming.stopGracefullyOnShutdown", True)
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,com.datastax.spark:spark-cassandra-connector_2.12:3.5.0")
    .config("spark.cassandra.connection.host", "127.0.0.1")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "2g")
    .config("spark.cassandra.auth.username", "cassandra")
    .config("spark.cassandra.auth.password", "cassandra")
    .master("local[*]") 
    .getOrCreate()
)

25/01/31 11:40:38 WARN Utils: Your hostname, Ala resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/01/31 11:40:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ala/.ivy2/cache
The jars for the packages stored in: /home/ala/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
com.datastax.spark#spark-cassandra-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-12a52257-1135-43b2-b610-eba4f3b6a221;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.0 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in cent

In [2]:
spark

In [3]:
schema_vp = StructType([
    StructField("vehicle_type", StringType(), True),
    StructField("nextStop", StringType(), True),
    StructField("VP", StructType([
        StructField("desi", StringType(), True),
        StructField("dir", StringType(), True),
        StructField("oper", IntegerType(), True),
        StructField("veh", IntegerType(), True),
        StructField("tst", StringType(), True),
        StructField("tsi", LongType(), True),
        StructField("spd", DoubleType(), True),
        StructField("hdg", IntegerType(), True),
        StructField("lat", DoubleType(), True),
        StructField("long", DoubleType(), True),
        StructField("acc", DoubleType(), True),
        StructField("dl", IntegerType(), True),
        StructField("odo", IntegerType(), True),  # correction ici
        StructField("drst", IntegerType(), True),
        StructField("oday", StringType(), True),
        StructField("jrn", IntegerType(), True),
        StructField("line", IntegerType(), True),
        StructField("start", StringType(), True),
        StructField("loc", StringType(), True),
        StructField("stop", IntegerType(), True),
        StructField("route", StringType(), True),
        StructField("occu", IntegerType(), True)
    ]))
])

In [4]:
kafka_df_vp = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "vp") \
    .option("startingOffsets","earliest") \
    .option("enable.auto.create.topics", "false") \
    .load()

In [5]:
kafka_json_df_vp = kafka_df_vp.withColumn("value", expr("cast(value as string)"))

In [6]:
streaming_df_vp = kafka_json_df_vp.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema_vp).alias("data"))

In [7]:
streaming_df_vp.printSchema()

root
 |-- data: struct (nullable = true)
 |    |-- vehicle_type: string (nullable = true)
 |    |-- nextStop: string (nullable = true)
 |    |-- VP: struct (nullable = true)
 |    |    |-- desi: string (nullable = true)
 |    |    |-- dir: string (nullable = true)
 |    |    |-- oper: integer (nullable = true)
 |    |    |-- veh: integer (nullable = true)
 |    |    |-- tst: string (nullable = true)
 |    |    |-- tsi: long (nullable = true)
 |    |    |-- spd: double (nullable = true)
 |    |    |-- hdg: integer (nullable = true)
 |    |    |-- lat: double (nullable = true)
 |    |    |-- long: double (nullable = true)
 |    |    |-- acc: double (nullable = true)
 |    |    |-- dl: integer (nullable = true)
 |    |    |-- odo: integer (nullable = true)
 |    |    |-- drst: integer (nullable = true)
 |    |    |-- oday: string (nullable = true)
 |    |    |-- jrn: integer (nullable = true)
 |    |    |-- line: integer (nullable = true)
 |    |    |-- start: string (nullable = true)
 | 

In [8]:
flattened_df_vp = streaming_df_vp.select(
    col("data.vehicle_type"),
    col("data.nextStop"),
    col("data.VP.desi"),
    col("data.VP.dir"),
    col("data.VP.oper"),
    col("data.VP.veh"),
    col("data.VP.tst"),
    col("data.VP.tsi"),
    col("data.VP.spd"),
    col("data.VP.hdg"),
    col("data.VP.lat"),
    col("data.VP.long"),
    col("data.VP.acc"),
    col("data.VP.dl"),
    col("data.VP.odo"),
    col("data.VP.drst"),
    col("data.VP.oday"),
    col("data.VP.jrn"),
    col("data.VP.line"),
    col("data.VP.start"),
    col("data.VP.loc"),
    col("data.VP.stop"),
    col("data.VP.route"),
    col("data.VP.occu")
)

In [9]:
flattened_df_vp.printSchema()


root
 |-- vehicle_type: string (nullable = true)
 |-- nextStop: string (nullable = true)
 |-- desi: string (nullable = true)
 |-- dir: string (nullable = true)
 |-- oper: integer (nullable = true)
 |-- veh: integer (nullable = true)
 |-- tst: string (nullable = true)
 |-- tsi: long (nullable = true)
 |-- spd: double (nullable = true)
 |-- hdg: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- acc: double (nullable = true)
 |-- dl: integer (nullable = true)
 |-- odo: integer (nullable = true)
 |-- drst: integer (nullable = true)
 |-- oday: string (nullable = true)
 |-- jrn: integer (nullable = true)
 |-- line: integer (nullable = true)
 |-- start: string (nullable = true)
 |-- loc: string (nullable = true)
 |-- stop: integer (nullable = true)
 |-- route: string (nullable = true)
 |-- occu: integer (nullable = true)



In [10]:
flattened_df_vp_clean = flattened_df_vp.fillna({
    'vehicle_type': 'Unknown',
    'nextStop': 'Unknown',
    'desi': 'Unknown',
    'dir': 'Unknown',
    'oper': 0,
    'veh': 0,
    'tst': 'Unknown',
    'tsi': 0,
    'spd': 0.0,
    'hdg': 0,
    'lat': 0.0,
    'long': 0.0,
    'acc': 0.0,
    'dl': 0,
    'odo': 0,
    'drst': 0,
    'oday': 'Unknown',
    'jrn': 0,
    'line': 0,
    'start': 'Unknown',
    'loc': 'Unknown',
    'stop': 0,
    'route': 'Unknown',
    'occu': 0
})

In [12]:
flattened_df_vp_clean = flattened_df_vp_clean.withColumn("vehicle_type", F.col("vehicle_type").cast(StringType())) \
    .withColumn("nextstop", F.col("nextStop").cast(StringType())) \
    .withColumn("desi", F.col("desi").cast(StringType())) \
    .withColumn("dir", F.col("dir").cast(StringType())) \
    .withColumn("oper", F.col("oper").cast(IntegerType())) \
    .withColumn("veh", F.col("veh").cast(IntegerType())) \
    .withColumn("tst", F.col("tst").cast(StringType())) \
    .withColumn("tsi", F.col("tsi").cast(LongType())) \
    .withColumn("spd", (F.col("spd") * 3.6).cast(DoubleType())) \
    .withColumn("hdg", F.col("hdg").cast(IntegerType())) \
    .withColumn("lat", F.col("lat").cast(DoubleType())) \
    .withColumn("long", F.col("long").cast(DoubleType())) \
    .withColumn("acc", F.col("acc").cast(DoubleType())) \
    .withColumn("dl", F.col("dl").cast(IntegerType())) \
    .withColumn("odo", F.col("odo").cast(IntegerType())) \
    .withColumn("drst", F.col("drst").cast(IntegerType())) \
    .withColumn("oday", F.col("oday").cast(StringType())) \
    .withColumn("jrn", F.col("jrn").cast(IntegerType())) \
    .withColumn("line", F.col("line").cast(IntegerType())) \
    .withColumn("start", F.col("start").cast(StringType())) \
    .withColumn("loc", F.col("loc").cast(StringType())) \
    .withColumn("stop", F.col("stop").cast(IntegerType())) \
    .withColumn("route", F.col("route").cast(StringType())) \
    .withColumn("occu", F.col("occu").cast(IntegerType())).select("*")

In [13]:
flattened_df_vp_clean.printSchema()

root
 |-- vehicle_type: string (nullable = false)
 |-- nextstop: string (nullable = false)
 |-- desi: string (nullable = false)
 |-- dir: string (nullable = false)
 |-- oper: integer (nullable = false)
 |-- veh: integer (nullable = false)
 |-- tst: string (nullable = false)
 |-- tsi: long (nullable = false)
 |-- spd: double (nullable = false)
 |-- hdg: integer (nullable = false)
 |-- lat: double (nullable = false)
 |-- long: double (nullable = false)
 |-- acc: double (nullable = false)
 |-- dl: integer (nullable = false)
 |-- odo: integer (nullable = false)
 |-- drst: integer (nullable = false)
 |-- oday: string (nullable = false)
 |-- jrn: integer (nullable = false)
 |-- line: integer (nullable = false)
 |-- start: string (nullable = false)
 |-- loc: string (nullable = false)
 |-- stop: integer (nullable = false)
 |-- route: string (nullable = false)
 |-- occu: integer (nullable = false)



In [15]:

flattened_df_vp_clean = flattened_df_vp_clean.withColumn(
    "tst", F.to_timestamp(F.col("tst"), "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") 
)


In [16]:
#avg_speed_df = flattened_df_vp_clean.groupBy("vehicle_type").agg(avg("spd").alias("avg_speed"))

In [18]:
flattened_df_vp_clean.printSchema() 

root
 |-- vehicle_type: string (nullable = false)
 |-- nextstop: string (nullable = false)
 |-- desi: string (nullable = false)
 |-- dir: string (nullable = false)
 |-- oper: integer (nullable = false)
 |-- veh: integer (nullable = false)
 |-- tst: timestamp (nullable = true)
 |-- tsi: long (nullable = false)
 |-- spd: double (nullable = false)
 |-- hdg: integer (nullable = false)
 |-- lat: double (nullable = false)
 |-- long: double (nullable = false)
 |-- acc: double (nullable = false)
 |-- dl: integer (nullable = false)
 |-- odo: integer (nullable = false)
 |-- drst: integer (nullable = false)
 |-- oday: string (nullable = false)
 |-- jrn: integer (nullable = false)
 |-- line: integer (nullable = false)
 |-- start: string (nullable = false)
 |-- loc: string (nullable = false)
 |-- stop: integer (nullable = false)
 |-- route: string (nullable = false)
 |-- occu: integer (nullable = false)



In [19]:
flattened_df_vp_clean.printSchema()

root
 |-- vehicle_type: string (nullable = false)
 |-- nextstop: string (nullable = false)
 |-- desi: string (nullable = false)
 |-- dir: string (nullable = false)
 |-- oper: integer (nullable = false)
 |-- veh: integer (nullable = false)
 |-- tst: timestamp (nullable = true)
 |-- tsi: long (nullable = false)
 |-- spd: double (nullable = false)
 |-- hdg: integer (nullable = false)
 |-- lat: double (nullable = false)
 |-- long: double (nullable = false)
 |-- acc: double (nullable = false)
 |-- dl: integer (nullable = false)
 |-- odo: integer (nullable = false)
 |-- drst: integer (nullable = false)
 |-- oday: string (nullable = false)
 |-- jrn: integer (nullable = false)
 |-- line: integer (nullable = false)
 |-- start: string (nullable = false)
 |-- loc: string (nullable = false)
 |-- stop: integer (nullable = false)
 |-- route: string (nullable = false)
 |-- occu: integer (nullable = false)



In [20]:
fleet_analysis = flattened_df_vp_clean \
    .withWatermark("tst", "10 minutes") \
    .groupBy(
        F.col("vehicle_type"),
        F.window(F.col("tst"), "5 minutes", "2 minutes")
    ) \
    .agg(
        F.avg("spd").alias("avg_speed"),
        F.count("veh").alias("active_vehicles"),
        F.max("spd").alias("max_speed"),
        F.min("spd").alias("min_speed"),
        F.avg("occu").alias("avg_occupancy")
    ) \
    .select(
        F.col("vehicle_type"),
        F.col("window.start").alias("window_start"),
        F.col("window.end").alias("window_end"),
        F.col("avg_speed"),
        F.col("active_vehicles"),
        F.col("max_speed"),
        F.col("min_speed"),
        F.col("avg_occupancy")
    )


In [21]:
fleet_analysis.printSchema()

root
 |-- vehicle_type: string (nullable = false)
 |-- window_start: timestamp (nullable = true)
 |-- window_end: timestamp (nullable = true)
 |-- avg_speed: double (nullable = true)
 |-- active_vehicles: long (nullable = false)
 |-- max_speed: double (nullable = true)
 |-- min_speed: double (nullable = true)
 |-- avg_occupancy: double (nullable = true)



In [22]:
flattened_df_vp_clean = flattened_df_vp_clean.filter(flattened_df_vp_clean.tst.isNotNull())

In [23]:
flattened_df_vp_clean.printSchema()

root
 |-- vehicle_type: string (nullable = false)
 |-- nextstop: string (nullable = false)
 |-- desi: string (nullable = false)
 |-- dir: string (nullable = false)
 |-- oper: integer (nullable = false)
 |-- veh: integer (nullable = false)
 |-- tst: timestamp (nullable = true)
 |-- tsi: long (nullable = false)
 |-- spd: double (nullable = false)
 |-- hdg: integer (nullable = false)
 |-- lat: double (nullable = false)
 |-- long: double (nullable = false)
 |-- acc: double (nullable = false)
 |-- dl: integer (nullable = false)
 |-- odo: integer (nullable = false)
 |-- drst: integer (nullable = false)
 |-- oday: string (nullable = false)
 |-- jrn: integer (nullable = false)
 |-- line: integer (nullable = false)
 |-- start: string (nullable = false)
 |-- loc: string (nullable = false)
 |-- stop: integer (nullable = false)
 |-- route: string (nullable = false)
 |-- occu: integer (nullable = false)



In [41]:
#window_spec = Window.partitionBy("line")

In [54]:
df_analysis_traffic = flattened_df_vp_clean \
    .withWatermark("tst", "10 minutes") \
    .groupBy(F.window(F.col("tst"), "5 minutes", "2 minutes"), "line") \
    .agg(
        avg("spd").alias("vitesse_moyenne"),
        min("spd").alias("vitesse_min"),
        max("spd").alias("vitesse_max"),
        avg("dl").alias("retard_moyen"),
        count("veh").alias("nb_vehicules_actifs")
    ).select(
        F.col("line"),
        F.col("window.start").alias("window_start"),
        F.col("window.end").alias("window_end"),
        F.col("vitesse_moyenne"),
        F.col("vitesse_min"),
        F.col("vitesse_max"),
        F.col("retard_moyen"),
        F.col("nb_vehicules_actifs")
    )

In [55]:
df_analysis_traffic.printSchema()

root
 |-- line: integer (nullable = false)
 |-- window_start: timestamp (nullable = true)
 |-- window_end: timestamp (nullable = true)
 |-- vitesse_moyenne: double (nullable = true)
 |-- vitesse_min: double (nullable = true)
 |-- vitesse_max: double (nullable = true)
 |-- retard_moyen: double (nullable = true)
 |-- nb_vehicules_actifs: long (nullable = false)



In [56]:
df_analysis_traffic = df_analysis_traffic.withColumn("nb_vehicules_actifs", col("nb_vehicules_actifs").cast("int"))

In [60]:
df_analysis_traffic.printSchema()

root
 |-- line: integer (nullable = false)
 |-- window_start: timestamp (nullable = true)
 |-- window_end: timestamp (nullable = true)
 |-- vitesse_moyenne: double (nullable = true)
 |-- vitesse_min: double (nullable = true)
 |-- vitesse_max: double (nullable = true)
 |-- retard_moyen: double (nullable = true)
 |-- nb_vehicules_actifs: integer (nullable = false)



In [61]:
df_analysis_traffic.writeStream \
    .format("org.apache.spark.sql.cassandra") \
    .option("keyspace", "test") \
    .option("table", "traffic_analysis") \
    .outputMode("append") \
    .option("confirm.truncate", "true") \
    .option("checkpointLocation", "checkpoint/check__Line") \
    .start() \
    .awaitTermination()

25/01/31 12:12:29 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/01/31 12:12:29 WARN StreamingQueryManager: Stopping existing streaming query [id=3b721cae-39bc-4124-8980-8f221730d012, runId=91154929-a4ce-43e8-8469-c8e386f64869], as a new run is being started.
25/01/31 12:12:29 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
25/01/31 12:12:36 WARN HDFSBackedStateStoreProvider: The state for version 14 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
25/01/31 12:12:36 WARN HDFSBackedStateStoreProvider: The state for version 14 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
25/01/31 12:12:36 W

-------------------------------------------
Batch: 31
-------------------------------------------
+----+------------+----------+---------------+-----------+-----------+------------+-------------------+
|line|window_start|window_end|vitesse_moyenne|vitesse_min|vitesse_max|retard_moyen|nb_vehicules_actifs|
+----+------------+----------+---------------+-----------+-----------+------------+-------------------+
+----+------------+----------+---------------+-----------+-----------+------------+-------------------+



25/01/31 12:12:37 WARN HDFSBackedStateStoreProvider: The state for version 14 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
25/01/31 12:12:37 WARN HDFSBackedStateStoreProvider: The state for version 14 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
25/01/31 12:12:37 WARN HDFSBackedStateStoreProvider: The state for version 14 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
25/01/31 12:12:37 WARN HDFSBackedStateStoreProvider: The state for version 14 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
25/01/31 12:12:37 WARN HDFSBackedStateStoreProvider: The state for version 14 doesn't exist in loadedMaps. Reading s

-------------------------------------------
Batch: 32
-------------------------------------------
+----+-------------------+-------------------+------------------+--------------------+------------------+-------------------+-------------------+
|line|       window_start|         window_end|   vitesse_moyenne|         vitesse_min|       vitesse_max|       retard_moyen|nb_vehicules_actifs|
+----+-------------------+-------------------+------------------+--------------------+------------------+-------------------+-------------------+
| 945|2025-01-31 10:50:00|2025-01-31 10:55:00|              25.5|               6.516|            40.464| -81.11111111111111|                  9|
| 221|2025-01-31 10:50:00|2025-01-31 10:55:00| 35.74600000000001|               6.264|            77.652| -68.61111111111111|                 18|
| 964|2025-01-31 10:50:00|2025-01-31 10:55:00|18.801391304347824|                 0.0|37.943999999999996| -78.08695652173913|                 23|
| 941|2025-01-31 10:55:00|



-------------------------------------------
Batch: 33
-------------------------------------------
+----+------------+----------+---------------+-----------+-----------+------------+-------------------+
|line|window_start|window_end|vitesse_moyenne|vitesse_min|vitesse_max|retard_moyen|nb_vehicules_actifs|
+----+------------+----------+---------------+-----------+-----------+------------+-------------------+
+----+------------+----------+---------------+-----------+-----------+------------+-------------------+



                                                                                

-------------------------------------------
Batch: 34
-------------------------------------------
+----+------------+----------+---------------+-----------+-----------+------------+-------------------+
|line|window_start|window_end|vitesse_moyenne|vitesse_min|vitesse_max|retard_moyen|nb_vehicules_actifs|
+----+------------+----------+---------------+-----------+-----------+------------+-------------------+
+----+------------+----------+---------------+-----------+-----------+------------+-------------------+



ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/home/ala/.local/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/ala/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [51]:
fleet_analysis.writeStream \
    .format("org.apache.spark.sql.cassandra") \
    .option("keyspace", "test") \
    .option("table", "fleet_performance") \
    .outputMode("append") \
    .option("checkpointLocation", "checkpoint/spark-avrge") \
    .start() \
    .awaitTermination()

25/01/30 22:48:15 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/01/30 22:48:15 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/home/ala/.local/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/ala/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

                                                                                