# Streaming structuré à l'aide de l'API Python DataFrames

Apache Spark inclut une API de haut niveau dédiée au traitement des stream, [Structured Streaming](http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html). Dans ce notebook, nous examinons rapidement comment utiliser l'API DataFrame pour créer des applications de streaming structuré. Nous voulons calculer des métriques en temps réel comme les décomptes en cours d'exécution et les décomptes fenêtrés sur un flux d'actions horodatées (par exemple, ouvrir, fermer, etc.).

Pour exécuter ce notebook, importez-le et attachez-le à un cluster Spark.

In [None]:
import os
from google.colab import drive
drive.mount('/content/drive')

###Configurer PySpark dans Colab
Installer Pyspark et findspark.
findspark est important ici; Il localisera Spark sur le système et l'importera en tant que bibliothèque standard.

In [None]:
!pip install -q pyspark
!pip install -q findspark
print("installé")

In [None]:
os.environ["SPARK_HOME"] = "/usr/local/lib/python3.9/dist-packages/pyspark"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"

## Sample Data
Nous avons quelques exemples de données d'action sous forme de fichiers dans `events/` que nous allons utiliser pour créer cette application. Examinons le contenu de ce répertoire.

In [None]:
dataDir = '/content/drive/MyDrive/tpspark/events'  #Attention !!! Mettez le chemin qui vous correspond.
os.listdir(dataDir)

Il y a environ 50 fichiers JSON dans le répertoire. Voyons ce que contient chaque fichier JSON.

In [None]:
!head '/content/drive/MyDrive/tpspark/events/file-0.json'

Chaque ligne du fichier contient un enregistrement JSON avec deux champs - 'time' et 'action'. Essayons d'analyser ces fichiers de manière interactive.

#Initialisation / Configuration / démarrage de Spark


In [None]:
import pandas as pd
from google.colab import data_table

# Definition des focntions display pour un meilleur affichage

def display(df, n=100):
  return data_table.DataTable(df.limit(n).toPandas(), include_index=False, num_rows_per_page=10)

def display2(df, n=20):
  pd.set_option('max_columns', None)
  pd.set_option('max_colwidth', None)
  return df.limit(n).toPandas().head(n)

print("display redéfini")

In [None]:
# Initialisation de Spark
import findspark
print("findspark.init() initialise les variables d'environnement pour spark")
findspark.init()

from pyspark.sql import SparkSession
from pyspark import SparkConf

# pour les dataframe et udf
from pyspark.sql import *
from datetime import *

In [None]:

# Démarrage session spark
# --------------------------
def demarrer_spark():
  local = "local[*]"
  appName = "TP"
  configLocale = SparkConf().setAppName(appName).setMaster(local)

  spark = SparkSession.builder.config(conf = configLocale).getOrCreate()
  sc = spark.sparkContext
  sc.setLogLevel("ERROR")


  # On ajuste l'environnement d'exécution des requêtes à la taille du cluster (4 coeurs)
  print("session démarrée, son id est ", sc.applicationId)
  return spark

spark = demarrer_spark()

## Batch/Interactive Processing
La première étape habituelle pour tenter de traiter les données consiste à interroger les données de manière interactive. Définissons un DataFrame statique sur les fichiers et donnons-lui un nom de table.

In [None]:
#from pyspark.sql.types import *

inputPath = "/content/drive/MyDrive/tpspark/events/"

# Puisque nous connaissons déjà le format des données, définissons le schéma pour accélérer le traitement (pas besoin de Spark pour déduire le schéma)
jsonSchema = StructType([ StructField("time", TimestampType(), True), StructField("action", StringType(), True) ])

# DataFrame statique représentant les données dans les fichiers JSON
staticInputDF = (
  spark
    .read
    .schema(jsonSchema)
    .json(inputPath)
)

display(staticInputDF)

Nous pouvons maintenant calculer le nombre d'actions "open" et "close" avec des fenêtres d'une heure. Pour ce faire, nous allons regrouper par la colonne d'action et une fenêtres d'une heure sur la colonne "time".

In [None]:
from pyspark.sql.functions import *      # pour la fonction window()

staticCountsDF = (
  staticInputDF
    .groupBy(
       staticInputDF.action,
       window(staticInputDF.time, "1 hour"))
    .count()
)
staticCountsDF.cache()

# A partir du DF, créer une table vistuelle 'static_counts'
staticCountsDF.createOrReplaceTempView("static_counts")

Nous pouvons maintenant utiliser directement SQL pour interroger la table. Par exemple, voici les décomptes totaux pour toutes les heures.

In [None]:
# %sql select action, sum(count) as total_count from static_counts group by action
spark.sql(""" select action, sum(count) as total_count from static_counts group by action """)show()

Que dire d'une chronologie des décomptes fenêtrés ?

---



---



In [None]:
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from static_counts order by time, action

## Stream Processing
Maintenant que nous avons analysé les données de manière interactive, convertissons cela en une requête en continu qui se met à jour en continu au fur et à mesure que les données arrivent. Puisque nous n'avons qu'un ensemble statique de fichiers, nous allons émuler un flux à partir d'eux en lisant un fichier à la fois, dans l'ordre chronologique de leur création. La requête que nous devons écrire est à peu près la même que la requête interactive ci-dessus.

In [None]:
from pyspark.sql.functions import *

# Semblable à la définition de staticInputDF ci-dessus, en utilisant simplement `readStream` au lieu de `read
streamingInputDF = (
  spark
    .readStream
    .schema(jsonSchema)               # Définir le schéma des données JSON
    .option("maxFilesPerTrigger", 1)  # Traiter une séquence de fichiers comme un flux en sélectionnant un fichier à la fois
    .json(inputPath)
)

# Même requête que staticInputDF
streamingCountsDF= (
  streamingInputDF
    .groupBy(
      streamingInputDF.action,
      window(streamingInputDF.time, "1 hour"))
    .count()
)


In [None]:

# Ce DF est-il réellement un DF en streaming ?
streamingCountsDF1.isStreaming

Comme vous pouvez le voir, streamingCountsDF est une dataframe streaming (streamingCountsDF.isStreaming 'is true'). Vous pouvez démarrer l'exécution en streaming en définissant le 'sink' (récepteur) et en le démarrant. Dans notre cas, nous voulons interroger les décomptes de manière interactive (mêmes requêtes que ci-dessus), nous allons donc définir l'ensemble complet des décomptes d'une heure dans une table en mémoire.

In [None]:
spark.conf.set("spark.sql.shuffle.partitions", "2")  # garder la taille des "shuffle" petite
query = (
  streamingCountsDF
    .writeStream
    .format("memory")        # mémoire = stocker la table en mémoire
    .queryName("counts")     # counts = nom de la table en mémoire
    .outputMode("complete")  # complet = tous les comptages doivent être dans la table
    .start()
    )

`query` est un handle de la requête de streaming qui s'exécute en arrière-plan. Cette requête récupère en permanence des fichiers et met à jour les décomptes fenêtrés.

Notez l'état de 'uery' dans la cellule ci-dessus. La barre de progression indique que la requête est active.
De plus, si vous développez le `> counts` ci-dessus, vous trouverez le nombre de fichiers qu'ils ont déjà traités.

Attendons un peu que quelques fichiers soient traités, puis interrogeons de manière interactive la table `counts` en mémoire.

In [None]:
from time import sleep
sleep(5)  # attendre un peu que le calcul démarre

In [None]:
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, action

Nous voyons la chronologie des comptages fenêtrés (similaire au comptage statique précédent) s'accumuler. Si nous continuons à exécuter cette requête interactive à plusieurs reprises, nous verrons les derniers décomptes mis à jour que la requête de streaming met à jour en arrière-plan.

In [None]:
sleep(5)  # attendre un peu plus pour que plus de données soient calculées

In [None]:
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, action

In [None]:
sleep(5)  # attendre un peu plus pour que plus de données soient calculées

In [None]:
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, action

::Voyons également le nombre total d'"open" et de "close".

In [None]:
%sql select action, sum(count) as total_count from counts group by action order by action

Si vous continuez à exécuter la requête ci-dessus à plusieurs reprises, vous constaterez toujours que le nombre d'"opens" est supérieur au nombre de "closes", comme prévu dans un flux de données où une "closes" apparaît toujours après l'"open" correspondante. Cela montre que le streaming structuré garantit le **prefix integrity**. Lisez les articles de blog liés ci-dessous si vous voulez en savoir plus.

Notez qu'il n'y a que quelques fichiers, donc en les consommant tous, il n'y aura pas de mises à jour des décomptes. Réexécutez la requête si vous souhaitez interagir à nouveau avec la requête de diffusion en continu.

Enfin, vous pouvez arrêter l'exécution de la requête en arrière-plan, soit en cliquant sur le lien 'Annuler' dans la cellule de la requête, soit en exécutant `query.stop()`. Dans les deux cas, lorsque la requête est arrêtée, le statut de la cellule correspondante ci-dessus sera automatiquement mis à jour vers `TERMINATED`.