# Utilisation de Spark Structured Streaming et des DataFrames Spark en général

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [None]:
# Get the session already attached to the notebook, or create one if none exists.
spark = SparkSession.builder.getOrCreate()

Charger les fichiers situés sous '/databricks-datasets/iot-stream/data-device' dans un DataFrame et les inspecter

Faire les opérations habituelles

In [None]:
# Directory holding the gzipped JSON device files that the cells below load.
inputPath = '/databricks-datasets/iot-stream/data-device'

In [None]:
%fs ls /databricks-datasets/iot-stream/data-device

path,name,size,modificationTime
dbfs:/databricks-datasets/iot-stream/data-device/part-00000.json.gz,part-00000.json.gz,2610922,1532465738000
dbfs:/databricks-datasets/iot-stream/data-device/part-00001.json.gz,part-00001.json.gz,2612478,1532465738000
dbfs:/databricks-datasets/iot-stream/data-device/part-00002.json.gz,part-00002.json.gz,2619023,1532465738000
dbfs:/databricks-datasets/iot-stream/data-device/part-00003.json.gz,part-00003.json.gz,2620016,1532465738000
dbfs:/databricks-datasets/iot-stream/data-device/part-00004.json.gz,part-00004.json.gz,2618699,1532465738000
dbfs:/databricks-datasets/iot-stream/data-device/part-00005.json.gz,part-00005.json.gz,2619772,1532465738000
dbfs:/databricks-datasets/iot-stream/data-device/part-00006.json.gz,part-00006.json.gz,2619027,1532465738000
dbfs:/databricks-datasets/iot-stream/data-device/part-00007.json.gz,part-00007.json.gz,2619832,1532465738000
dbfs:/databricks-datasets/iot-stream/data-device/part-00008.json.gz,part-00008.json.gz,2617893,1532465738000
dbfs:/databricks-datasets/iot-stream/data-device/part-00009.json.gz,part-00009.json.gz,2619764,1532465738000


In [None]:
# Batch (non-streaming) read of every JSON file under inputPath; no schema is supplied,
# so the column types are inferred from the data.
df = spark.read.json(inputPath)

In [None]:
# Print the first rows without truncating long cell values.
df.show(truncate=False)

+--------------+---------+------+------------+---------+--------------------------+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|calories_burnt|device_id|id    |miles_walked|num_steps|timestamp                 |user_id|value                                                                                                                                                                    |
+--------------+---------+------+------------+---------+--------------------------+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|250.7         |5        |950000|2.507       |5014     |2018-07-22 06:44:25.732267|24     |{"user_id": 24, "calories_burnt": 250.6999969482422, "num_steps": 5014, "miles_walked": 2.506999969482422, "time_stamp": "2

Afficher le schéma et créer une structure avec ce schéma  
Ceci est nécessaire pour créer un readStream, pour lequel le schéma doit être fourni

In [None]:
# Print the inferred column names and types.
df.printSchema()

root
 |-- calories_burnt: double (nullable = true)
 |-- device_id: long (nullable = true)
 |-- id: long (nullable = true)
 |-- miles_walked: double (nullable = true)
 |-- num_steps: long (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- user_id: long (nullable = true)
 |-- value: string (nullable = true)



In [None]:
# Explicit schema for the device records. Every column is nullable.
# "timestamp" is declared as TimestampType so that it can be used for
# watermarking and time windows.
jsonSchema = (
  StructType()
    .add("calories_burnt", DoubleType(), True)
    .add("device_id", LongType(), True)
    .add("id", LongType(), True)
    .add("miles_walked", DoubleType(), True)
    .add("num_steps", LongType(), True)
    .add("timestamp", TimestampType(), True)
    .add("user_id", LongType(), True)
    .add("value", StringType(), True)
)

Créer un objet readStream et relire les fichiers ; utiliser le schéma créé précédemment

In [None]:
# Streaming reader over the same JSON files, using the explicit schema defined earlier.
# The former .format("file") call was removed: "file" is not a data source name, and
# .json() selects the JSON source itself, so the call was only misleading.
streamingInputDF = (
  spark
    .readStream
    .schema(jsonSchema)               # the schema is supplied explicitly instead of being inferred
    .option("maxFilesPerTrigger", 1)  # treat a sequence of files as a stream by picking one file at a time
    .json(inputPath)
)

Créer la requête liée au readStream  

Utiliser une fenêtre (choisir la durée et la séparation) et grouper par cette fenêtre et l'id de l'appareil  

Agréger ensemble min, max, moyenne, somme de calories_burnt, miles_walked, num_steps

In [None]:
# Per-device max / min / average / sum of calories, miles and steps over one-hour
# windows (window length equals the slide, so the windows do not overlap).
# max, min and sum are the pyspark.sql.functions versions brought in by the star
# import at the top of the notebook, not the Python builtins.
streamingAggDF = (
  streamingInputDF
    .withWatermark("timestamp", "1 minutes")  # the watermark column is given by name
    .groupBy(
      window("timestamp", "1 hour", "1 hour"),
      "device_id"
    )
    .agg(
      max("calories_burnt").alias("max_cal"),
      max("miles_walked").alias("max_miles"),
      max("num_steps").alias("max_steps"),
      min("calories_burnt").alias("min_cal"),
      min("miles_walked").alias("min_miles"),
      min("num_steps").alias("min_steps"),
      avg("calories_burnt").alias("avg_cal"),
      avg("miles_walked").alias("avg_miles"),
      avg("num_steps").alias("avg_steps"),  # alias was misspelled "avg_strps"
      sum("calories_burnt").alias("sum_cal"),
      sum("miles_walked").alias("sum_miles"),
      sum("num_steps").alias("sum_steps")
    )
)

Assurez-vous que la source est en streaming

In [None]:
# True when the DataFrame is backed by a streaming source.
streamingAggDF.isStreaming

Out[20]: True

Créer l'objet writeStream avec format "memory"  
Ceci sera le data sink

In [None]:
# Start the aggregation and expose its result as an in-memory table for SQL queries.
query = (
  streamingAggDF
    .writeStream
    .queryName("aggs")       # name under which the in-memory table is queried
    .outputMode("complete")  # complete = the whole aggregated result is kept in the table
    .format("memory")        # memory = store the result in an in-memory table
    .start()
)

Faire des requêtes répétées sur le data sink pour s'assurer qu'il reçoit des données  
Choisir n'importe quelle requête; compter le nombre d'éléments suffit

In [None]:
# Row count of the in-memory "aggs" table; rerun to check that the stream is feeding it.
spark.sql("select count(*) from aggs").show()

+--------+
|count(1)|
+--------+
|    2440|
+--------+



In [None]:
# Show the first rows of the in-memory "aggs" table.
spark.sql("select * from aggs").show()

+--------------------+---------+---------+---------+---------+---------+---------+---------+------------------+------------------+-----------------+------------------+------------------+---------+
|              window|device_id|  max_cal|max_miles|max_steps|  min_cal|min_miles|min_steps|           avg_cal|         avg_miles|        avg_strps|           sum_cal|         sum_miles|sum_steps|
+--------------------+---------+---------+---------+---------+---------+---------+---------+------------------+------------------+-----------------+------------------+------------------+---------+
|{2018-07-20 20:00...|        9|    575.8|    5.758|    11516|60.000004|      0.6|     1200|325.69603268253974| 3.256960317460317|6513.920634920635| 41037.70011800001|410.37699999999995|   820754|
|{2018-07-24 12:00...|        9|   577.65|   5.7765|    11553|    55.65|   0.5565|     1113|313.73269253846155|3.1373269230769227|6274.653846153846|       40785.25003|407.85249999999996|   815705|
|{2018-07-21 04

In [None]:
# Count again to compare with the earlier count of the "aggs" table.
spark.sql("select count(*) from aggs").show()

+--------+
|count(1)|
+--------+
|    2440|
+--------+



Arrêter la requête soit avec une instruction, soit en cliquant sur la cellule

In [None]:
# Stop the streaming query that feeds the in-memory "aggs" table.
query.stop()

Créer un autre objet writeStream avec la même requête  
Cette fois on va écrire sur des fichiers  
Le mode n'est pas "complete" mais "append"  
Donc n'oubliez pas le watermark  
Attention : withWatermark(df.time...) ne marche pas, il faut utiliser withWatermark("time"...)

In [None]:
# Rebuild the aggregation as a row count per one-hour window and device.
# The watermark column is passed by name.
streamingAggDF = (
  streamingInputDF
    .withWatermark("timestamp", "1 hours")
    .groupBy(window("timestamp", "1 hours", "1 hours"), "device_id")
    .count()
)

Sauvegarder le data sink dans un répertoire en utilisant un format adapté ; essayer différents formats.
Est-ce que csv marche ? json ? parquet ?

In [None]:
%fs rm -r /FileStore/tables/test_streaming_cp

In [None]:
%fs rm -r /FileStore/tables/test_streaming_csv

In [None]:
%fs rm -r /FileStore/tables/test_streaming_aggs

In [None]:
%fs rm -r /FileStore/tables/aggs_parquet

In [None]:
%fs rm -r /FileStore/tables/aggs

In [None]:
%fs rm -r /FileStore/tables/aggs_csv

In [None]:
# Write the windowed counts as JSON files in append mode (stated explicitly here).
# The checkpoint shares the output directory, so removing outPath in the cleanup
# cells deletes both.
outPath = '/FileStore/tables/aggs'

query = (
  streamingAggDF
    .writeStream
    .format("json")
    .outputMode("append")
    .option("checkpointLocation", outPath)
    .option("path", outPath)
    .start()
)

# Block until everything currently available in the source has been processed, then
# stop the query. The next cell reads outPath as a batch, and later cells rebind
# `query`, which would otherwise leave this stream running with no handle to stop it.
query.processAllAvailable()
query.stop()

Lire les données enregistrées dans une base de données et les explorer ; essayer différentes requêtes sur le DataFrame

In [None]:
# Batch read of the JSON files written under outPath.
df = spark.read.json(outPath)

In [None]:
# Print the first rows of the re-loaded data without truncating long cell values.
df.show(truncate=False)

+-----+---------+----------------------------------------------------+
|count|device_id|window                                              |
+-----+---------+----------------------------------------------------+
|43   |18       |{2018-07-20T05:00:00.000Z, 2018-07-20T04:00:00.000Z}|
|87   |9        |{2018-07-20T20:00:00.000Z, 2018-07-20T19:00:00.000Z}|
|34   |6        |{2018-07-20T16:00:00.000Z, 2018-07-20T15:00:00.000Z}|
|46   |16       |{2018-07-20T08:00:00.000Z, 2018-07-20T07:00:00.000Z}|
|37   |17       |{2018-07-21T13:00:00.000Z, 2018-07-21T12:00:00.000Z}|
|40   |20       |{2018-07-20T17:00:00.000Z, 2018-07-20T16:00:00.000Z}|
|42   |20       |{2018-07-23T10:00:00.000Z, 2018-07-23T09:00:00.000Z}|
|40   |8        |{2018-07-21T05:00:00.000Z, 2018-07-21T04:00:00.000Z}|
|36   |16       |{2018-07-20T01:00:00.000Z, 2018-07-20T00:00:00.000Z}|
|55   |15       |{2018-07-24T13:00:00.000Z, 2018-07-24T12:00:00.000Z}|
|44   |17       |{2018-07-20T05:00:00.000Z, 2018-07-20T04:00:00.000Z}|
|46   

In [None]:
# Same stream, this time with the CSV format, to find out whether CSV works as a sink.
# NOTE(review): the aggregated frame has a struct-typed `window` column, which the CSV
# writer is expected to reject. Confirm by inspecting query.exception() after start.
outPath = '/FileStore/tables/aggs_csv'

query = (
  streamingAggDF
    .writeStream
    .format("csv")
    .outputMode("append")                   # append mode, stated explicitly
    .option("path", outPath)
    .option("checkpointLocation", outPath)  # checkpoint shares the output directory
    .start()
)

In [None]:
# Write the windowed counts as Parquet files in append mode (stated explicitly here).
# The checkpoint shares the output directory, so removing outPath in the cleanup
# cells deletes both.
outPath = '/FileStore/tables/aggs_parquet'

query = (
  streamingAggDF
    .writeStream
    .format("parquet")
    .outputMode("append")
    .option("checkpointLocation", outPath)
    .option("path", outPath)
    .start()
)

# Block until everything currently available in the source has been processed, then
# stop the query. The next cell reads outPath as a batch, and the cleanup cells delete
# this directory, which should not happen while the stream is still writing to it.
query.processAllAvailable()
query.stop()

In [None]:
# Batch read of the Parquet files written under outPath.
df1 = spark.read.parquet(outPath)

In [None]:
# Print the first rows of the Parquet data without truncating long cell values.
df1.show(truncate=False)

+------------------------------------------+---------+-----+
|window                                    |device_id|count|
+------------------------------------------+---------+-----+
|{2018-07-20 04:00:00, 2018-07-20 05:00:00}|18       |43   |
|{2018-07-20 19:00:00, 2018-07-20 20:00:00}|9        |87   |
|{2018-07-20 15:00:00, 2018-07-20 16:00:00}|6        |34   |
|{2018-07-20 07:00:00, 2018-07-20 08:00:00}|16       |46   |
|{2018-07-21 12:00:00, 2018-07-21 13:00:00}|17       |37   |
|{2018-07-20 16:00:00, 2018-07-20 17:00:00}|20       |40   |
|{2018-07-23 09:00:00, 2018-07-23 10:00:00}|20       |42   |
|{2018-07-21 04:00:00, 2018-07-21 05:00:00}|8        |40   |
|{2018-07-20 00:00:00, 2018-07-20 01:00:00}|16       |36   |
|{2018-07-24 12:00:00, 2018-07-24 13:00:00}|15       |55   |
|{2018-07-20 04:00:00, 2018-07-20 05:00:00}|17       |44   |
|{2018-07-20 22:00:00, 2018-07-20 23:00:00}|8        |46   |
|{2018-07-24 14:00:00, 2018-07-24 15:00:00}|4        |38   |
|{2018-07-19 21:00:00, 2

In [None]:
%fs rm -r /FileStore/tables/test_streaming_cp

In [None]:
%fs rm -r /FileStore/tables/test_streaming_csv

In [None]:
%fs rm -r /FileStore/tables/test_streaming_aggs

In [None]:
%fs rm -r /FileStore/tables/aggs_parquet


In [None]:
%fs rm -r /FileStore/tables/aggs

In [None]:
%fs rm -r /FileStore/tables/aggs_csv