# TP : Spark Streaming

**Objectif :** analyser en temps réel des messages texte envoyés via un socket et calculer le **nombre d’occurrences de chaque mot toutes les 5 secondes**.


## 1) Installer PySpark

Installation de PySpark dans le runtime Colab.

In [1]:
!pip -q install pyspark

## (Optionnel) Masquer le warning de dépréciation

Spark peut afficher un `FutureWarning` indiquant que DStream est déprécié. Ce n'est **pas** une erreur. Ici on le masque pour un affichage plus propre.

In [11]:
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)

## 2) Créer un fichier de messages (pour l'envoi aléatoire)

Conformément à la consigne (lecture aléatoire depuis un fichier), on prépare `messages.txt` :  
- **1 ligne = 1 message**
- le serveur socket choisira **au hasard** une ligne à envoyer.


In [13]:
from pathlib import Path

messages_path = Path("messages.txt")

# Si le fichier n'existe pas, on le crée avec quelques messages d'exemple.
if not messages_path.exists():
    messages_path.write_text(
        "\n".join([
            "spark streaming dstream",
            "spark spark streaming",
            "big data spark",
            "stream processing with spark",
            "hello spark streaming"
        ]) + "\n",
        encoding="utf-8"
    )

print("Fichier utilisé :", messages_path.resolve())
print("Contenu :")
print(messages_path.read_text(encoding="utf-8"))

Fichier utilisé : /content/messages.txt
Contenu :
spark streaming dstream
spark spark streaming
big data spark
stream processing with spark
hello spark streaming



## 3) Créer un serveur socket simulant un flux de données

Le serveur :  
- écoute sur **localhost:9999**  
- accepte une connexion (Spark se connectera automatiquement)  
- envoie **une ligne aléatoire** du fichier toutes les **2 secondes**.


In [4]:
import socket
import time
import threading
import random

HOST = "localhost"
PORT = 9999
SEND_EVERY_SECONDS = 2

def start_socket_server(file_path: str, host: str = HOST, port: int = PORT, interval_s: float = SEND_EVERY_SECONDS):
    # Charger les messages (lignes non vides)
    lines = [ln.strip() for ln in Path(file_path).read_text(encoding="utf-8").splitlines() if ln.strip()]
    if not lines:
        raise ValueError("Le fichier ne contient aucune ligne non vide.")

    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Utile si tu relances la cellule : permet de réutiliser le port rapidement.
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    server.bind((host, port))
    server.listen(1)
    print(f"[SocketServer] En écoute sur {host}:{port} ...")

    conn, addr = server.accept()
    print(f"[SocketServer] Client connecté : {addr}")

    try:
        while True:
            msg = random.choice(lines)
            conn.sendall((msg + "\n").encode("utf-8"))
            print(f"[SocketServer] envoyé -> {msg}")
            time.sleep(interval_s)
    except Exception as e:
        print("[SocketServer] arrêt :", repr(e))
    finally:
        try:
            conn.close()
        except:
            pass
        try:
            server.close()
        except:
            pass

# Lancer le serveur dans un thread (daemon=True => s'arrête avec le runtime)
server_thread = threading.Thread(
    target=start_socket_server,
    kwargs={"file_path": str(messages_path)},
    daemon=True
)
server_thread.start()

print("Serveur socket démarré.")

Serveur socket démarré.


## 4) Créer un `StreamingContext`

Le TP demande un calcul **toutes les 5 secondes** → `batch_sec = 5`.

⚠️ Selon les versions, `ssc.batchDuration` peut ne pas exister (AttributeError).  
On affiche donc **la valeur qu'on a définie** (et on peut aussi lire via `_jssc` si besoin).


In [5]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Si un StreamingContext existait déjà (ex. re-run), on essaie de l'arrêter proprement.
try:
    if "ssc" in globals() and ssc is not None:
        ssc.stop(stopSparkContext=False, stopGraceFully=True)
except Exception:
    pass

# SparkContext unique (Colab)
sc = SparkContext.getOrCreate()
sc.setLogLevel("ERROR")

batch_sec = 5
ssc = StreamingContext(sc, batch_sec)

print("StreamingContext créé avec batchDuration =", batch_sec, "secondes")

# Optionnel : récupérer la durée via l'objet Java (si disponible)
try:
    print("Batch duration (via _jssc) =", ssc._jssc.batchDuration().milliseconds() / 1000, "secondes")
except Exception:
    pass

[SocketServer] En écoute sur localhost:9999 ...
StreamingContext créé avec batchDuration = 5 secondes


## 5) Lire les données sous forme de DStream + WordCount

Pipeline demandé :  
```python
lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda w: (w, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
counts.pprint()
```


In [6]:
# Lecture du flux texte depuis le socket
lines = ssc.socketTextStream(HOST, PORT)

# Transformations : découpe en mots, puis (mot,1), puis somme par mot
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda w: (w, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)

# Affichage dans la console
counts.pprint()

## 6) Démarrer le streaming et afficher les résultats (30 secondes)

Le TP demande :
- `ssc.start()`
- `ssc.awaitTerminationOrTimeout(30)`
- `ssc.stop(stopSparkContext=False)`


In [7]:
# Démarrage
ssc.start()

# Laisser tourner 30 secondes
ssc.awaitTerminationOrTimeout(30)

# Arrêt propre (on garde le SparkContext)
ssc.stop(stopSparkContext=False, stopGraceFully=True)

print("✅ Streaming terminé.")

[SocketServer] Client connecté : ('127.0.0.1', 55458)
[SocketServer] envoyé -> hello spark streaming
[SocketServer] envoyé -> spark spark streaming
[SocketServer] envoyé -> stream processing with spark
[SocketServer] envoyé -> big data spark
[SocketServer] envoyé -> big data spark
-------------------------------------------
Time: 2026-01-21 21:25:30
-------------------------------------------
('hello', 1)
('streaming', 2)
('spark', 3)

-------------------------------------------
Time: 2026-01-21 21:25:35
-------------------------------------------
('with', 1)
('big', 1)
('stream', 1)
('processing', 1)
('spark', 2)
('data', 1)

[SocketServer] envoyé -> hello spark streaming
[SocketServer] envoyé -> big data spark
[SocketServer] envoyé -> spark streaming dstream
-------------------------------------------
Time: 2026-01-21 21:25:40
-------------------------------------------
('big', 2)
('hello', 1)
('streaming', 1)
('data', 2)
('spark', 3)

[SocketServer] envoyé -> big data spark
[SocketS

## Questions (réponses)

**4) Qu’est-ce qu’un micro-batch dans DStream ?**  
Un **micro-batch** est un petit lot de données collectées pendant une fenêtre de temps fixe (ex. **5 secondes**). Spark Streaming traite chaque micro-batch comme un job Spark classique.

**5) Sur quelle structure repose un DStream ?**  
Un DStream repose sur une **séquence de RDD** : à chaque intervalle de batch, Spark crée un **RDD** contenant les données reçues sur cette fenêtre.

**6) Quelle est la durée du batch utilisée ?**  
La durée du batch est **5 secondes** (`StreamingContext(sc, 5)`).
