## Creation de la session Spark

In [None]:
from pyspark.sql import SparkSession

spark=(SparkSession
       .builder
       .master("local[*]")
       .appName("Spark streaming Process")
       .config("spark.streaming.stopGracefullyOnShutdown",True)
       .config("spark.jars.packages","org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0")
       .config("spark.sql.shuffle.partitions",8)
       .getOrCreate()
      ) 
spark

# Creation du df qui prend les données depuis Kafka

In [2]:
kafka_df=(
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers","ed-kafka:29092")
    .option("subscribe","device-data-3")
    .option("startingOffsets","earliest")
    .load()

)

# Montre le schema des données prises dans le kafka

In [3]:
kafka_df.printSchema()
#kafka_df.show()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



# Traitement des données

## Transformation du type de 'value' de binary vers string

In [4]:
from pyspark.sql.functions import expr

kafka_json_df = kafka_df.withColumn("value",expr("cast(value as string)"))


In [5]:
kafka_json_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [6]:
#kafka_json_df.show()

## Definition du schema des données venant de kafka

In [7]:
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, LongType

json_schema = (
    StructType(
    [StructField('customerId', StringType(), True), 
    StructField('data', StructType(
        [StructField('devices', 
                     ArrayType(StructType([ 
                        StructField('deviceId', StringType(), True), 
                        StructField('measure', StringType(), True), 
                        StructField('status', StringType(), True), 
                        StructField('temperature', LongType(), True)
                    ]), True), True)
        ]), True), 
    StructField('eventId', StringType(), True), 
    StructField('eventOffset', LongType(), True), 
    StructField('eventPublisher', StringType(), True), 
    StructField('eventTime', StringType(), True)
    ])
)

## Recuperation de la partie 'value'

In [8]:
from pyspark.sql.functions import from_json, col

streaming_df=kafka_json_df.withColumn("values_json",from_json(col("value"),json_schema)).selectExpr("values_json.*")

In [9]:
streaming_df.printSchema()
#streaming_df.show()

root
 |-- customerId: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- devices: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- deviceId: string (nullable = true)
 |    |    |    |-- measure: string (nullable = true)
 |    |    |    |-- status: string (nullable = true)
 |    |    |    |-- temperature: long (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)



## Explosion de la colonne device avec data_device puis de cette meme colonne en  nouvelle colonne 

In [10]:
from pyspark.sql.functions import explode

explode_df=streaming_df.withColumn("data_devices",explode("data.devices"))

In [11]:
explode_df.printSchema()

root
 |-- customerId: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- devices: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- deviceId: string (nullable = true)
 |    |    |    |-- measure: string (nullable = true)
 |    |    |    |-- status: string (nullable = true)
 |    |    |    |-- temperature: long (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)
 |-- data_devices: struct (nullable = true)
 |    |-- deviceId: string (nullable = true)
 |    |-- measure: string (nullable = true)
 |    |-- status: string (nullable = true)
 |    |-- temperature: long (nullable = true)



## Recuperation des nouvelles colonnes

In [None]:
from pyspark.sql.functions import col
flatten_df=(
    explode_df
    .drop("data")
    .withColumn("deviceId",col("data_devices.deviceId"))
    .withColumn("measure",col("data_devices.measure"))
    .withColumn("status",col("data_devices.status"))
    .withColumn("temperature",col("data_devices.temperature"))
    .drop("data_devices")
)   

In [13]:
flatten_df.printSchema()
#flatten_df.show()

root
 |-- customerId: string (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)
 |-- deviceId: string (nullable = true)
 |-- measure: string (nullable = true)
 |-- status: string (nullable = true)
 |-- temperature: long (nullable = true)



## Ecriture en temps reels des données traitées dans un fichier csv

In [14]:
(flatten_df
 .writeStream
 .format("csv")
 .option("path", "data/output/device_data_1.csv")
 .queryName("kafka_table")
 .outputMode("append")
 .trigger(processingTime='10 seconds')
 .option("checkpointLocation","checkpoint_dir_kafka_9")
 .start()
 .awaitTermination())

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 