
## Traitement de flux structuré avec l'API DataFrame Python
Apache Spark inclut une API de traitement de flux de haut niveau : Structured Streaming.
<br>Dans ce TP, nous jetons un coup d'œil rapide l'utilisation de l'API DataFrame pour construire des applications Structured Streaming.
<br>Dans un premier temps, nous réviserons les éléments vus précédemment pour explorer rapidement les données.
<br>Dans un second temps, nous calculerons des métriques en temps réel (dans notre exemple, ce seront des "running counts" et "windowed counts" sur un flux d'actions horodatées).


Nous allons utiliser les 50 fichiers du dossier events. Chaque ligne de fichier contient un enregistrement json avec deux champs : `time` and `action`. Nous allons analyser ces fichiers de manière interactive.

In [68]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Lab5StreamingApp") \
    .config("spark.eventLog.enabled", "true") \
    .config("spark.eventLog.dir", "../../spark-logs") \
    .getOrCreate()

In [69]:
from pyspark.sql.functions import col

## Traitement par lots/interactif
La première étape pour tenter de traiter les données est de les interroger de manière interactive.
Nous allons définir un DataFrame statique sur les fichiers.

In [41]:
input_path = "../../data/events"

Définir le schéma (au lieu de demander à Spark de l'inférer).

In [42]:
from pyspark.sql.types import StructType, TimestampType, StringType, StructField

json_schema = StructType(
    [
        StructField("time", TimestampType(), True),
        StructField("action", StringType(), True),
    ]
)

In [43]:
event_df = spark.read.json(input_path, schema=json_schema)
event_df.printSchema()

root
 |-- time: timestamp (nullable = true)
 |-- action: string (nullable = true)



In [44]:
event_df.toPandas()

Unnamed: 0,time,action
0,2016-07-28 04:19:28,Close
1,2016-07-28 04:19:28,Close
2,2016-07-28 04:19:29,Open
3,2016-07-28 04:19:31,Close
4,2016-07-28 04:19:31,Open
...,...,...
99995,2016-07-26 04:16:53,Close
99996,2016-07-26 04:17:01,Open
99997,2016-07-26 04:17:06,Open
99998,2016-07-26 04:17:10,Open


Calculer le nombre d'actions "open" et "close" dans des fenêtres d'une heure.
<br>Indication : utiliser un GROUP BY sur la colonne `action` et des fenêtres (windows) sur la colonne `time`.
<br>Réaliser cette tâche en DSL.
<br>Mettre en cache (en mémoire) le DataFrame obtenu.

In [45]:
from pyspark.sql.functions import window

static_counts_df = event_df.groupBy(
    col("action"),
    window(col("time"), "1 hour")
).count().cache()

In [46]:
static_counts_df.toPandas()

Unnamed: 0,action,window,count
0,Close,"(2016-07-26 13:00:00, 2016-07-26 14:00:00)",1028
1,Open,"(2016-07-27 04:00:00, 2016-07-27 05:00:00)",995
2,Open,"(2016-07-26 11:00:00, 2016-07-26 12:00:00)",991
3,Open,"(2016-07-26 10:00:00, 2016-07-26 11:00:00)",1007
4,Close,"(2016-07-27 03:00:00, 2016-07-27 04:00:00)",1025
...,...,...,...
99,Open,"(2016-07-26 04:00:00, 2016-07-26 05:00:00)",999
100,Close,"(2016-07-26 03:00:00, 2016-07-26 04:00:00)",344
101,Open,"(2016-07-26 03:00:00, 2016-07-26 04:00:00)",1001
102,Open,"(2016-07-26 02:00:00, 2016-07-26 03:00:00)",179


<br>Déterminer le nombre total d'actions "open" et d'actions "close" (sur l'ensemble de la période).
<br>Réaliser cette tâche en SQL.

In [47]:
static_counts_df.createOrReplaceTempView("static_count_view")

static_total_df = spark.sql(
    """
    SELECT action, sum(count) as total_count
    FROM static_count_view
    GROUP BY action
    """
)
static_total_df.show()

+------+-----------+
|action|total_count|
+------+-----------+
|  Open|      50000|
| Close|      50000|
+------+-----------+



Exécuter la commande suivante pour obtenir un visuel des actions dans le temps avec un affichage plus lisible de la plage d'heure.

In [48]:
from pyspark.sql.functions import date_format, col

df_result = static_counts_df.select(
    "action",
    date_format(col("window").start, "MMM-dd HH:mm").alias("time"),
    "count"
).orderBy("time", "action")

In [49]:
df_result.toPandas()

Unnamed: 0,action,time,count
0,Close,Jul-26 02:00,11
1,Open,Jul-26 02:00,179
2,Close,Jul-26 03:00,344
3,Open,Jul-26 03:00,1001
4,Close,Jul-26 04:00,815
...,...,...,...
99,Open,Jul-28 03:00,996
100,Close,Jul-28 04:00,960
101,Open,Jul-28 04:00,825
102,Close,Jul-28 05:00,671


Vous devriez remarquer que les actions de fermeture suivent les actions d'ouverture correspondantes (il y a plus d'"open" au début et plus de "close" à la fin).

### Traitement de flux (stream processing)
Maintenant que nous avons analysé les données de manière interactive, nous allons créer une requête de traitement de flux qui se met à jour en continu, à mesure que les données arrivent.
<br>Étant donné que nous avons simplement un ensemble statique de fichiers, nous allons émuler un flux à partir d'eux en lisant un fichier à la fois, dans l'ordre chronologique de leur création.
<br> A noter que le code que nous allons écrire est à peu près le même que précédemment. Merci l'API de Spark !

Adapter le code créé précédemment pour calculer le nombre d'actions "open" et "close", en utilisant readStream à la place de read. Utiliser l'option maxFilesPerTrigger afin de spécifier qu'on ne prend qu'un ficher à la fois (les options doivent être placées avant le chargement effectif à partir de la méthode json).

In [70]:
streaming_event_df = spark.readStream.option("maxFilesPerTrigger", 1).json(input_path, schema=json_schema)

In [71]:
streaming_counts_df = streaming_event_df.groupBy(
    col("action"),
    window(col("time"), "1 hour")
).count()

In [52]:
streaming_counts_df.show()

AnalysisException: Queries with streaming sources must be executed with writeStream.start();
FileSource[../../data/events]

En fouillant (avec le nez fin) la documentation des DataFrames (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/dataframe.html), vérifier si le DataFrame que nous venons de créer est bien un DataFrame de streaming.

In [53]:
streaming_counts_df.isStreaming

True

In [54]:
static_counts_df.isStreaming

False

<br>Vous pouvez démarrer le calcul en continu en définissant la destination (le "sink") et en le démarrant.
<br>Dans notre cas, nous voulons interroger de manière interactive les comptages (mêmes requêtes que ci-dessus), nous allons donc définir l'ensemble complet des comptages d'une heure dans une table en mémoire (pour cela il faudra utiliser format avec l'option "memory").
<br>Utiliser la fonction writeStream pour démarrer la requête.
<br>Donner un nom à la table in-memory, afin de pouvoir y faire référence dans la suite. Pour cela, utiliser queryName.
<br>Choisir un outputMode "complete".

In [72]:
query = streaming_counts_df.writeStream.format("memory").queryName("streaming_counts").outputMode("complete").start()

`query` est une référence à la requête de streaming qui s'exécute en arrière-plan. Cette requête récupère continuellement les fichiers et met à jour les comptages fenêtrés. A noter que nous ne faisons que simuler une arrivée continue de fichiers, grâce à l'option maxFilesPerTrigger. Nous n'avons pas précisé d'intervalle de temps entre chaque trigger (il est possible de le paramétrer).

Si vous travaillez avec l'IDE de Databricks, vous pouvez alors remarquer le statut de la requête dans la cellule ci-dessus (la barre de progression montrerait que la requête est active). Dans ce cas, développer `> counts` permet alors de retrouver le nombre de fichiers traités.

Dans tous les cas, l'attribut isActive permet de vérifier si une query est active. Vérifier que notre query est bien active.

In [73]:
query.isActive

True

Attendre un peu (physiquement ou en utilisant la fonction sleep de la bibliothèque time) puis lancer la requête sql permettant de visualiser les comptages dans le temps sur le DataFrame de streaming. Ne pas oublier que nous avons donné un nom particulier à la table in-memory.

In [74]:
df_result = static_counts_df.select(
    "action",
    date_format(col("window").start, "MMM-dd HH:mm").alias("time"),
    "count"
).orderBy("time", "action")

In [75]:
sql_query = """
    SELECT action, date_format(window.start, "MMM-dd HH:mm") as time, count
    FROM streaming_counts
    ORDER BY time, action
"""

In [76]:
spark.sql(sql_query).toPandas()

Unnamed: 0,action,time,count
0,Close,Jul-26 02:00,11
1,Open,Jul-26 02:00,179
2,Close,Jul-26 03:00,344
3,Open,Jul-26 03:00,1001
4,Close,Jul-26 04:00,176
5,Open,Jul-26 04:00,289


Continuer à exécuter la requête de manière itérative.

In [77]:
spark.sql(sql_query).toPandas()

Unnamed: 0,action,time,count
0,Close,Jul-26 02:00,11
1,Open,Jul-26 02:00,179
2,Close,Jul-26 03:00,344
3,Open,Jul-26 03:00,1001
4,Close,Jul-26 04:00,815
5,Open,Jul-26 04:00,999
6,Close,Jul-26 05:00,323
7,Open,Jul-26 05:00,328
8,Close,Jul-26 13:00,699
9,Open,Jul-26 13:00,656


Visualiser le nombre total d'actions "open" et "close".

In [93]:
streaming_total_df = spark.sql(
    """
    SELECT action, sum(count) as total_count
    FROM streaming_counts
    GROUP BY action
    """
)
streaming_total_df.toPandas()

Unnamed: 0,action,total_count
0,Open,50503
1,Close,51497


Si vous exécutez la requête ci-dessus de manière répétée, vous constaterez que le nombre d'actions "open" est bien en permanence plus élevé que le nombre d'actions "close", comme anticipable dans un flux de données où une fermeture apparaît toujours après l'ouverture qui lui correspondant. Cela montre que Structured Streaming garantit l'"intégrité du préfixe" (prefix integrity).

Arrêter la requête en exécutant query.stop(). Vérifier le statut de la cellule correspondante à la requête.
<br>Ici bien entendu, nous n'avons que quelques fichiers, et il est possible qu'au moment où vous arrivez à cette cellule, ils soient tous consommés. Dans ce cas, il n'y a plus de mise à jour des comptages. Mais il faut quand même arrêter notre requête proprement !

In [90]:
query.stop()

Arrêter la requête n'arrête pas la session Spark. Arrêter la session Spark.

In [95]:
spark.stop()