%md

# Fenêtres temporelles

* On utilise les fenêtres glissantes pour agréger des morceaux de données plutôt que toutes les données (par exemple, toutes les 5 minutes ou toutes les heures)
* On applique le watermarking pour jeter les anciennes données obsolètes que vous n'avez pas l'espace de conserver
* `display` permet de tracez des graphiques en direct (sur databricks)


%md

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Agrégations en Streaming</h2>

Les applications continues nécessitent souvent des décisions quasi en temps réel sur des statistiques agrégées en temps réel.

Quelques exemples incluent
* Agréger les erreurs dans les données des dispositifs IoT par type
* Détecter un comportement anormal dans le fichier journal d'un serveur en agrégeant par pays
* Faire une analyse de comportement sur les messages instantanés via des hashtags.

Cependant, dans le cas des flux, vous ne voulez généralement pas exécuter des agrégations sur l'ensemble du jeu de données.


%md
Exemple agrégations en Streaming

Fichers à examiner : `/mnt/training/sensor-data/accelerometer/time-series-stream.json/`.

Chaque ligne du fichier contient un enregistrement JSON avec deux champs : `time` et `action`

De nouveaux fichiers sont continuellement écrits dans ce répertoire (en streaming).

Théoriquement, ce processus n'a pas de fin.


In [None]:
import os
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip
from pyspark.sql.functions import col, from_json, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Netoyage des variables d'env qui peuvent casser le bind du driver
for var in ["SPARK_LOCAL_IP", "SPARK_DRIVER_BIND_ADDRESS", "SPARK_DRIVER_HOST"]:
    os.environ.pop(var, None)

# Packages Kafka pour Spark (adapter la version si ton pyspark n'est pas 3.5.*)
EXTRA_PACKAGES = [
    "org.apache.spark:spark-sql-kafka-0-10_2.13:3.5.2",
]

builder = (
    SparkSession.builder
    .appName("SmartTech-Streaming-Silver")
    .master("local[*]")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.driver.host", "localhost")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# on passe Kafka via extra_packages
spark = configure_spark_with_delta_pip(builder, extra_packages=EXTRA_PACKAGES).getOrCreate()
spark.sparkContext.setLogLevel("WARN")


In [None]:
# Définir le schéma pour le contenu en streaming
inputPath = "/.../veille/stream_events"
jsonSchema = "time timestamp, action string"


%md

Définir un DataFrame de streaming initial `inputDf` puis `countsDF` qui représente une agrégation 


In [4]:
from pyspark.sql.functions import window, col

inputDF = (spark
  .readStream                                 
  .schema(jsonSchema)                         
  .option("maxFilesPerTrigger", 1)            
  .json(inputPath)                           
)

countsDF = (inputDF
  .groupBy(col("action"),                     # agrège les données par action
           window(col("time"), "1 hour"))     # puis par des fenêtres temporelles d'une heure
  .count()                                    # produit un décompte pour chaque agrégation
  .select(col("window.start").alias("start"), # Transforme le champ en une colonne
          col("action"),                      # Inclut l'action
          col("count"))                       # Inclut le décompte
  .orderBy(col("start"), col("action"))       # Trie par heure de début et par action
)


%md

Résultats requête


In [None]:
#spark.conf.set("spark.sql.shuffle.partitions", sc.defaultParallelism)

#display(countsDF)


# Problème avec la génération de nombreuses fenêtres (Structured Streaming)

Quand on fait une agrégation fenêtrée en Structured Streaming (par exemple `window(time, "1 hour")` + `count()`), Spark doit **maintenir un état (state)** pour **chaque fenêtre** tant qu’il considère que de nouveaux événements peuvent encore arriver pour cette fenêtre.

## Pourquoi ça peut devenir un problème ?
- Chaque fenêtre (ou chaque combinaison de clés + fenêtre) correspond à une portion d’état à conserver.
- **Sans limite temporelle**, Spark doit supposer que des données *en retard* (late data) peuvent arriver indéfiniment pour des fenêtres passées.
- Résultat : le nombre de fenêtres conservées **augmente sans limite**, l’état grossit et on peut observer :
  - ralentissements importants,
  - consommation mémoire élevée côté **executors** (state store),
  - voire des erreurs de type **Out Of Memory**.

> Remarque : l’état est principalement stocké dans le *state store* sur les executors (pas “dans le driver”), même si le driver coordonne l’exécution.

---

## Solutions possibles

### 1) Augmenter la taille des fenêtres (ex: 4 heures)
Augmenter la durée de la fenêtre peut réduire le **nombre** de fenêtres actives à gérer à un instant donné.

Avantage : moins de fenêtres simultanées.  
Limite : si le job tourne longtemps, on accumule quand même un historique potentiellement illimité → l’état continue à croître.

---

### 2) Utiliser un watermark (solution recommandée)
Le **watermark** permet de définir une **limite de retard acceptable** sur la colonne de temps d’événement.

Exemple :
```python
df = (df
  .withWatermark("time", "10 minutes")
  .groupBy(window(col("time"), "1 hour"))
  .count()
)
```

#### Que dit le watermark à Spark ?
> “Je n’attends pas d’événements plus anciens que *max(event_time) - 10 minutes*.”

#### Effets concrets
- Spark peut **finaliser** les fenêtres trop anciennes (au-delà du watermark).
- L’état associé à ces fenêtres peut être **nettoyé / supprimé**.
- Les événements arrivant **après** la limite (trop en retard) peuvent être ignorés (selon le mode de sortie).

Avantage : l’état devient **borné dans le temps**, ce qui stabilise l’utilisation mémoire et améliore la robustesse des agrégations fenêtrées sur de longues durées.

---

## À retenir
- Le watermark sert d’abord à **gérer la donnée en retard** et à **borner l’état**.
- La prévention des erreurs mémoire est une **conséquence** : en permettant le nettoyage des fenêtres anciennes, on évite une croissance infinie de l’état.


%md

Indiquer à Structured Streaming de conserver au maximum 2 heures de données agrégées.


In [None]:
watermarkedDF = (inputDF
  .withWatermark("time", "2 hours")           # limite temporelle  de 2 heures
  .groupBy(col("action"),                     # agrège les données par action
           window(col("time"), "1 hours"))     # puis par des fenêtres temporelles d'une heure
  .count()                                    # produit un décompte pour chaque agrégation
  .select(col("window.start").alias("start"), # Transforme le champ en une colonne
          col("action"),                      # Inclut l'action
          col("count"))                       # Inclut le décompte
  .orderBy(col("start"), col("action"))       # Trie par heure de début et par action
)
display(watermarkedDF)                        # Démarre le flux et l'affiche




DataFrame[start: timestamp, action: string, count: bigint]

25/12/15 16:28:52 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 3000} milliseconds, but spent 9344 milliseconds
25/12/15 16:29:01 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 3000} milliseconds, but spent 8595 milliseconds
25/12/15 16:29:08 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 3000} milliseconds, but spent 7000 milliseconds

%md
**Commentaire :**
* `window` est une colonne spéciale générée par l'agrégation avec window(col("time"), "2 hour"). Elle contient des informations sur la plage temporelle de chaque fenêtre, notamment les champs start (début) et end (fin).
* `col("window.start")` accède au champ start, qui représente l'heure de début de la fenêtre temporelle.



Les données reçues dans les 2 heures suivant le marquage temporel seront toujours traitées. Les données reçues 2 heures _après_ le marquage temporel peuvent ne pas être traitées. Plus les données sont retardées, moins il est probable que le moteur les traite.


In [None]:
#spark.sql("create database if not exists outputDelta2;")


DataFrame[]

In [None]:
# Ecriture du streaming dans une table Delta
checkpointPath = "/.../veille/stream_events/streamevent2.delta.checkpoint"    # Sous-répertoire pour les fichiers de point de contrôle
# Définition de la requête de streaming
streamingQuery = (watermarkedDF                     
  .writeStream                                    
  .queryName("stream_6")                         
  .trigger(processingTime="3 seconds")            
  .format("delta")                             
  .option("checkpointLocation", checkpointPath)  
  .outputMode("complete")                           
  .toTable("outputDelta2.gold2")                           
)


In [None]:
spark.sql("SELECT * FROM outputDelta2.gold").show(truncate=False)

#spark.table("outputDelta2.gold").show(truncate=False)
#spark.sql("SHOW DATABASES").show(truncate=False)
#spark.sql("SHOW TABLES IN outputDelta2").show(truncate=False)


In [None]:
# Liste flux actifs
for stream in spark.streams.active:      # Loop over all active streams
    print(" {} ({})".format(stream.name, stream.id))


 stream_3 (6a751a17-4129-41e2-af3a-173a7e8ed265)


25/12/15 16:14:47 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 3000} milliseconds, but spent 13240 milliseconds
25/12/15 16:14:54 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 3000} milliseconds, but spent 7667 milliseconds
25/12/15 16:15:02 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 3000} milliseconds, but spent 7820 milliseconds
25/12/15 16:15:09 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 3000} milliseconds, but spent 7198 milliseconds
25/12/15 16:15:17 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 3000} milliseconds, but spent 7613 milliseconds
25/12/15 16:15:24 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 3000} milliseconds, but spent 6996 milliseconds
25/12/15 16:15:32 WARN ProcessingTimeExecutor: Current batch is falling behind. The tri

In [16]:
# Arrêter tous les flux
for s in spark.streams.active: 
  s.stop()


25/12/15 16:31:02 ERROR DeltaFileFormatWriter: Aborting job b5953db9-0742-44a4-8295-0f9e6f91786d
java.lang.InterruptedException
	at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1048)
	at scala.concurrent.impl.Promise$DefaultPromise.tryAwait0(Promise.scala:243)
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:255)
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:104)
	at org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:374)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:998)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2484)
	at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.$anonfun$executeWrite$1(DeltaFileFormatWriter.scala:269)
	at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.writeAndCommit(DeltaFileFormatWriter.scala:302)
	at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.executeWrite(D

25/12/15 16:31:02 WARN Shell: Interrupted while joining on: Thread[Thread-560154,5,]
java.lang.InterruptedException
	at java.base/java.lang.Object.wait(Native Method)
	at java.base/java.lang.Thread.join(Thread.java:1313)
	at java.base/java.lang.Thread.join(Thread.java:1381)
	at org.apache.hadoop.util.Shell.joinThread(Shell.java:1103)
	at org.apache.hadoop.util.Shell.runCommand(Shell.java:1063)
	at org.apache.hadoop.util.Shell.run(Shell.java:959)
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1282)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:1377)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:1359)
	at org.apache.hadoop.fs.FileUtil.readLink(FileUtil.java:225)
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileLinkStatusInternal(RawLocalFileSystem.java:1251)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1240)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatus(RawLocalFileS