In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col, regexp_extract, reverse, regexp_replace
from pyspark.sql.types import FloatType

In [2]:
mongo_url = "mongodb://192.168.1.7:27018/streaming.calls"

In [3]:
# !pip install elasticsearch-hadoop

In [4]:
os.environ[
        'PYSPARK_SUBMIT_ARGS'] = '--packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.0,org.elasticsearch:elasticsearch-hadoop:7.11.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell'

In [5]:
spark = SparkSession.\
        builder.\
        appName("pyspark-streaming").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "512m").\
        config("spark.eventLog.enabled", "true").\
        config("spark.mongodb.output.uri", mongo_url).\
        config("spark.mongodb.input.uri", mongo_url).\
        config("spark.eventLog.dir", "file:///opt/workspace/events").\
        getOrCreate()      

Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/local/lib/python3.9/dist-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
org.elasticsearch#elasticsearch-hadoop added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b0d13bb6-fdf9-4785-8d31-e3c91f98e34c;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.0 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
	found org.elasticsearch#elasticsearch-hadoop;7.11.0 in central
	found commons-logging#commons-logging;1.1.1 in central
	found commons-httpclient#commons-httpclient;3.0.1 in central
	found commons-codec#commons-c

In [6]:
def write_row_in_mongo(batch_df, batch_id):
    batch_df.write.format("mongo").mode("append").save()
    pass

In [7]:
kafka_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka1:9092") \
        .option("subscribe", "CDR_APPEL") \
        .option("startingOffsets", "earliest") \
        .load() \
        .selectExpr("cast(key as string) key", "cast(value as string) value", "timestamp")

In [8]:
# kafka_df.show()

In [9]:
data_stream = kafka_df.select(
    split(kafka_df.value, "\\|").alias("data")
)

In [10]:
# data_stream.show()

In [11]:
# Créer la structure de données
schema = "Anumber STRING, Bnumber STRING, CallDate STRING, SiteName STRING, Coordinate STRING"
data_df = data_stream.selectExpr("data[0] AS Anumber", "data[1] AS Bnumber", "data[2] AS CallDate", "data[3] AS SiteName", "data[4] AS Coordinate")

In [12]:
# data_df.show()

In [13]:
# Filtrer les données pour ignorer la ligne avec certaine Anumber
filtered_stream = data_df.filter(~col("Anumber").contains("Anumber"))

In [14]:
# filtered_stream.show()

In [15]:
# Dans cet exemple, nous remplaçons la virgule par un espace
replace_stream = filtered_stream.withColumn("Coordinate", regexp_replace(col("Coordinate"), "\\{", ""))
replace_stream = replace_stream.withColumn("Coordinate", regexp_replace(col("Coordinate"), "\\}", ""))
# replace_stream.show()

In [16]:
filtered_stream = replace_stream.withColumn("Coordinate", reverse(col("Coordinate")))
# filtered_stream.show()

In [17]:
# Split the "name" column into first_name and last_name columns
split_df = filtered_stream.withColumn("split_values", split(col("Coordinate"), ",")) \
             .withColumn("Latitude", col("split_values")[0]) \
             .withColumn("Longitude", col("split_values")[1]) \
             .drop("split_values")\
             .drop("Coordinate")

# split_df.show()

In [19]:
# converted_df = split_df.withColumn("Latitude", col("Latitude").cast(FloatType()))
# converted_df = converted_df.withColumn("Longitude", col("Longitude").cast(FloatType()))

In [18]:
(split_df
     .writeStream
     .queryName("streaming")
     .foreachBatch(write_row_in_mongo)
     .start()
     .awaitTermination())

23/08/09 21:44:10 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-4ea7c2a8-0c04-4984-a798-069d084ac07b. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/08/09 23:19:39 ERROR TaskSchedulerImpl: Lost executor 1 on 172.24.0.10: worker lost
23/08/09 23:19:39 ERROR TaskSchedulerImpl: Lost executor 0 on 172.24.0.11: worker lost
23/08/09 23:53:05 WARN KafkaOffsetReader: Error in attempt 1 getting Kafka offsets: 
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition CDR_APPEL-0 could be determined
23/08/10 00:47:49 ERROR TaskSchedulerImpl: Lost executor 2 on 172.24.0.10: worker lost
23/08/10 00:47:49 ERROR TaskSchedulerImpl: Lost executor 3 on 172.24.0.11: worker lost
23/08/10 01:09:41 ERROR TaskSchedulerIm

KeyboardInterrupt: 