In [2]:
import threading
import socket
import time
import requests
import json

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

In [3]:
# === CONFIGURATION ===
API_KEY = "a44791367a5c3a1336e2a70a7d9a6b8d"
CITY = "Dakar"
INTERVAL = 10  # secondes
HOST = "localhost"
PORT = 9999

In [4]:
# === FONCTION DE RÉCUPÉRATION DES DONNÉES MÉTÉO ===
def fetch_weather_data():
    url = f"http://api.openweathermap.org/data/2.5/weather?q={CITY}&appid={API_KEY}&units=metric"
    response = requests.get(url)
    data = response.json()
    if response.status_code == 200:
        weather_info = {
            "city": data["name"],
            "temp": data["main"]["temp"],
            "humidity": data["main"]["humidity"],
            "description": data["weather"][0]["description"],
            "timestamp": time.strftime('%Y-%m-%d %H:%M:%S')
        }
        return json.dumps(weather_info)
    else:
        return json.dumps({"error": "Erreur API", "message": data.get("message", "")})

In [5]:
# === THREAD SOCKET POUR ENVOYER LES DONNÉES EN TEMPS RÉEL ===
def start_socket_server():
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.bind((HOST, PORT))
    server_socket.listen(1)
    print(f"✅ Socket server actif sur {HOST}:{PORT}, en attente de Spark...")
    client_socket, addr = server_socket.accept()
    print(f"💡 Spark Streaming connecté depuis {addr}")

    while True:
        weather_json = fetch_weather_data()
        print(f"📡 Envoi météo : {weather_json}")
        client_socket.send((weather_json + "\n").encode("utf-8"))
        time.sleep(INTERVAL)

In [6]:
# === DÉMARRAGE DU THREAD SOCKET ===
threading.Thread(target=start_socket_server, daemon=True).start()

In [7]:
# === CONFIGURATION SPARK STREAMING ===
spark = SparkSession.builder.appName("StreamingMeteo").getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, INTERVAL)

✅ Socket server actif sur localhost:9999, en attente de Spark...




In [8]:
# === CRÉATION DU FLUX DE DONNÉES DEPUIS LE SOCKET ===
lines = ssc.socketTextStream(HOST, PORT)

In [9]:
# === TRAITEMENT DES DONNÉES MÉTÉO EN TEMPS RÉEL ===
def traiter_ligne(rdd):
    if not rdd.isEmpty():
        df = spark.read.json(rdd)
        df.select("timestamp", "city", "temp", "humidity", "description").show(truncate=False)

lines.foreachRDD(traiter_ligne)

In [11]:
# === LANCEMENT DU STREAMING ===
ssc.start()
ssc.awaitTermination()

📡 Envoi météo : {"city": "Dakar", "temp": 27.92, "humidity": 77, "description": "scattered clouds", "timestamp": "2025-07-15 19:20:16"}
+-------------------+-----+-----+--------+----------------+
|timestamp          |city |temp |humidity|description     |
+-------------------+-----+-----+--------+----------------+
|2025-07-15 19:20:16|Dakar|27.92|77      |scattered clouds|
+-------------------+-----+-----+--------+----------------+

📡 Envoi météo : {"city": "Dakar", "temp": 27.92, "humidity": 78, "description": "scattered clouds", "timestamp": "2025-07-15 19:20:26"}
+-------------------+-----+-----+--------+----------------+
|timestamp          |city |temp |humidity|description     |
+-------------------+-----+-----+--------+----------------+
|2025-07-15 19:20:26|Dakar|27.92|78      |scattered clouds|
+-------------------+-----+-----+--------+----------------+

📡 Envoi météo : {"city": "Dakar", "temp": 27.92, "humidity": 77, "description": "scattered clouds", "timestamp": "2025-07-15 1

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/socket.py", line 718, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
ssc.stop()