Ce notebook de travail est utilisé pour concevoir le temps de parcours réel en utilisant les données du topic Kafka, faire une anti-jointure, et le rebalancer dans une nouvelle topic kafka.

In [5]:
from kafka import KafkaConsumer, TopicPartition
from kafka.consumer.fetcher import log
import json


def get_end_offsets(consumer, topic) -> dict:
    partitions_for_topic = consumer.partitions_for_topic(topic)
    if partitions_for_topic:
        partitions = []
        for partition in consumer.partitions_for_topic(topic):
            partitions.append(TopicPartition(topic, partition))
        # https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html#kafka.KafkaConsumer.end_offsets
        # Get the last offset for the given partitions. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1.
        end_offsets = consumer.end_offsets(partitions)
        return end_offsets

In [13]:
def grab_topic_gare(gare: str = "87271460", num : int = 2):
    last_n_msg = num
    # kafka_server = "35.180.29.24:9092"
    kafka_server = "13.37.146.224:9092"
    # consumer
    consumer = KafkaConsumer(
        bootstrap_servers=kafka_server,
        consumer_timeout_ms=10000)
    end_offsets = get_end_offsets(consumer, 'rer-b-'+gare)
    consumer.assign([*end_offsets])
    for key_partition, value_end_offset in end_offsets.items():
        new_calculated_offset = value_end_offset - last_n_msg
        new_offset = new_calculated_offset if new_calculated_offset >= 0 else 0
        consumer.seek(key_partition, new_offset)

    for msg in consumer:
        return msg

In [14]:
def forgiving_json_deserializer(v):
    if v is None:
        try:
            return json.loads(v.encode('utf-8'))
        except json.decoder.JSONDecodeError:
            log.exception('Unable to decode: %s', v)
        return None

In [80]:
content_1 = grab_topic_gare("87271411", 1)
content_2 = grab_topic_gare("87271411", 2)

In [81]:
import pyspark
from pyspark.sql import *

In [82]:
from pyspark.shell import sc

spark = SparkSession.builder\
    .appName("StructuredNetworkWordCount")\
    .getOrCreate()
df_previous = spark.read.json(sc.parallelize([json.loads(content_1[6])]))
df_latest = spark.read.json(sc.parallelize([json.loads(content_2[6])]))

22/06/25 18:24:10 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [83]:
df_latest.show()

+----------------+---------+----+----+----+------+--------+-------------------+
|            date|direction|etat|miss|mode|   num|    term|      time_recorded|
+----------------+---------+----+----+----+------+--------+-------------------+
|25/06/2022 18:25|        0|    |SOTE|   R|SOTE28|87758722|2022-06-25 18:23:01|
|25/06/2022 18:31|        1|    |PITA|   R|PITA30|87758896|2022-06-25 18:23:01|
|25/06/2022 18:35|        0|    |ILAR|   R|ILAR07|87271528|2022-06-25 18:23:01|
|25/06/2022 18:38|        1|    |EPOU|   R|EPOU11|87001479|2022-06-25 18:23:01|
|25/06/2022 18:40|        1|    |SOTE|   R|SOTE34|87758722|2022-06-25 18:23:01|
|25/06/2022 18:42|        1|    |ILAR|   R|ILAR13|87271528|2022-06-25 18:23:01|
|25/06/2022 18:46|        1|    |PITA|   R|PITA36|87758896|2022-06-25 18:23:01|
|25/06/2022 18:51|        1|    |EPOU|   R|EPOU17|87001479|2022-06-25 18:23:01|
|25/06/2022 18:55|        0|    |SOTE|   R|SOTE40|87758722|2022-06-25 18:23:01|
|25/06/2022 18:56|        1|    |ILAR|  

In [84]:
df_previous.show()

+----------------+---------+----+----+----+------+--------+-------------------+
|            date|direction|etat|miss|mode|   num|    term|      time_recorded|
+----------------+---------+----+----+----+------+--------+-------------------+
|25/06/2022 18:31|        1|    |PITA|   R|PITA30|87758896|2022-06-25 18:24:01|
|25/06/2022 18:35|        0|    |ILAR|   R|ILAR07|87271528|2022-06-25 18:24:01|
|25/06/2022 18:38|        1|    |EPOU|   R|EPOU11|87001479|2022-06-25 18:24:01|
|25/06/2022 18:40|        1|    |SOTE|   R|SOTE34|87758722|2022-06-25 18:24:01|
|25/06/2022 18:42|        1|    |ILAR|   R|ILAR13|87271528|2022-06-25 18:24:01|
|25/06/2022 18:46|        1|    |PITA|   R|PITA36|87758896|2022-06-25 18:24:01|
|25/06/2022 18:51|        1|    |EPOU|   R|EPOU17|87001479|2022-06-25 18:24:01|
|25/06/2022 18:55|        0|    |SOTE|   R|SOTE40|87758722|2022-06-25 18:24:01|
|25/06/2022 18:56|        1|    |ILAR|   R|ILAR19|87271528|2022-06-25 18:24:01|
|25/06/2022 19:01|        0|    |PITA|  

In [86]:
df_final = df_latest.join(df_previous, df_latest.num == df_previous.num, 'left_anti')

In [87]:
df_final.show()

+----------------+---------+----+----+----+------+--------+-------------------+
|            date|direction|etat|miss|mode|   num|    term|      time_recorded|
+----------------+---------+----+----+----+------+--------+-------------------+
|25/06/2022 18:25|        0|    |SOTE|   R|SOTE28|87758722|2022-06-25 18:23:01|
+----------------+---------+----+----+----+------+--------+-------------------+



In [98]:
from pyspark.sql.functions import lit

df_final.withColumn("time_arrived", lit(df_previous.collect()[0][-1])).show()

+----------------+---------+----+----+----+------+--------+-------------------+-------------------+
|            date|direction|etat|miss|mode|   num|    term|      time_recorded|       time_arrived|
+----------------+---------+----+----+----+------+--------+-------------------+-------------------+
|25/06/2022 18:25|        0|    |SOTE|   R|SOTE28|87758722|2022-06-25 18:23:01|2022-06-25 18:24:01|
+----------------+---------+----+----+----+------+--------+-------------------+-------------------+

