Exercice 1

In [None]:
from kafka import KafkaProducer
import json

# Producer that serialises each Python value to UTF-8 encoded JSON.
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

message = {"msg": "Hello Kafka"}
try:
    # send() is asynchronous and only returns a future. Waiting on that future
    # makes a delivery failure raise here instead of being silently dropped,
    # so the confirmation below is only printed for a message that was sent.
    producer.send('weather_stream', message).get(timeout=10)
    producer.flush()
    print("Message envoyé au topic weather_stream")
finally:
    # Always release the producer, even when the send failed.
    producer.close()


Exercice 2

In [None]:
from kafka import KafkaConsumer
import json
import sys

# The topic is taken from the first command-line argument when one is given.
# NOTE(review): inside a Jupyter kernel sys.argv carries the kernel's own launch
# arguments, so this presumably targets execution as a script — confirm.
if len(sys.argv) > 1:
    topic = sys.argv[1]
else:
    topic = 'weather_stream'

# Consumer that decodes each record value from UTF-8 JSON; 'earliest' is the
# offset-reset policy passed to the client.
consumer = KafkaConsumer(
    topic,
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

print(f"Lecture des messages du topic {topic}")

try:
    # Print every record value as it is received.
    for message in consumer:
        print("Message reçu:", message.value)
except KeyboardInterrupt:
    # Same stop behaviour as the producers in this file: Ctrl+C ends cleanly.
    print("Arrêt du consommateur")
finally:
    # Always release the consumer, whatever ended the loop.
    consumer.close()


Exercice 3

In [None]:
import sys
import requests
from kafka import KafkaProducer
import json
import time

def get_weather(lat, lon, timeout=10):
    """Fetch the current weather for a coordinate pair from the Open-Meteo API.

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the caller forever.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        requests.RequestException: On connection failures or timeouts.
    """
    response = requests.get(
        "https://api.open-meteo.com/v1/forecast",
        # Let requests build and encode the query string.
        params={"latitude": lat, "longitude": lon, "current_weather": "true"},
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json()

def run_producer(lat, lon):
    """Publish the current weather of one location to Kafka once a minute.

    Runs until interrupted with Ctrl+C. Each cycle fetches the weather through
    ``get_weather`` and, when a ``current_weather`` section is present, sends
    it with the coordinates to the ``weather_stream`` topic.

    Args:
        lat: Latitude of the location to stream.
        lon: Longitude of the location to stream.
    """
    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    topic = 'weather_stream'

    print(f"Streaming météo pour latitude={lat}, longitude={lon} vers le topic {topic}")

    try:
        while True:
            try:
                weather_data = get_weather(lat, lon)
            except requests.RequestException as exc:
                # A transient API or network failure must not end the stream:
                # report it and try again on the next cycle.
                print(f"Erreur lors de la récupération météo: {exc}")
            else:
                current_weather = weather_data.get('current_weather', {})
                if current_weather:
                    message = {
                        'latitude': lat,
                        'longitude': lon,
                        'weather': current_weather
                    }
                    producer.send(topic, message)
                    producer.flush()
                    print("Message envoyé:", message)
                else:
                    print("Pas de données météo reçues")
            time.sleep(60)
    except KeyboardInterrupt:
        print("Arrêt du producteur")
    finally:
        # Always release the producer, whatever ended the loop.
        producer.close()

if __name__ == "__main__":
    # Expect exactly two arguments after the program name: latitude, longitude.
    if len(sys.argv) != 3:
        print("Usage : python current_weather_producer.py <latitude> <longitude>")
        sys.exit(1)
    try:
        lat = float(sys.argv[1])
        lon = float(sys.argv[2])
    except ValueError:
        # Non-numeric arguments get the usage message rather than a traceback.
        print("Usage : python current_weather_producer.py <latitude> <longitude>")
        sys.exit(1)
    run_producer(lat, lon)


Exercice 4

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, current_timestamp, expr, when
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, TimestampType

spark = (
    SparkSession.builder
    .appName("WeatherStreamTransform")
    .getOrCreate()
)

# Layout of the JSON messages read from the weather_stream topic: coordinates
# plus a nested "weather" object.
schema = StructType([
    StructField("latitude", DoubleType()),
    StructField("longitude", DoubleType()),
    StructField("weather", StructType([
        StructField("temperature", DoubleType()),
        StructField("windspeed", DoubleType()),
        StructField("time", StringType())
    ]))
])

# Streaming read of the raw topic, starting from the earliest offsets.
df_raw = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "weather_stream")
    .option("startingOffsets", "earliest")
    .load()
)

# Kafka delivers the payload as bytes: cast to string, then parse the JSON.
df_json = df_raw.selectExpr("CAST(value AS STRING) as json_str")

df_parsed = df_json.select(from_json(col("json_str"), schema).alias("data")).select("data.*")

# Flatten the nested weather fields and derive the two alert levels.
# Every branch is an explicit condition with no otherwise(): a null reading
# (for example from a message that could not be parsed) matches no branch and
# gets a null level instead of being reported as the highest alert "level_2".
df_weather = df_parsed.select(
    col("latitude"),
    col("longitude"),
    expr("cast(weather.temperature as double)").alias("temperature"),
    expr("cast(weather.windspeed as double)").alias("windspeed"),
    expr("cast(weather.time as timestamp)").alias("event_time")
).withColumn("wind_alert_level", when(col("windspeed") < 10, "level_0")
                .when((col("windspeed") >= 10) & (col("windspeed") <= 20), "level_1")
                .when(col("windspeed") > 20, "level_2")) \
 .withColumn("heat_alert_level", when(col("temperature") < 25, "level_0")
                .when((col("temperature") >= 25) & (col("temperature") <= 35), "level_1")
                .when(col("temperature") > 35, "level_2"))

from pyspark.sql.functions import to_json, struct

# Pack the transformed columns into one JSON string column named "value",
# which is the column written to the Kafka sink below.
df_out = df_weather.select(
    to_json(
        struct(
            "event_time",
            "temperature",
            "windspeed",
            "wind_alert_level",
            "heat_alert_level",
        )
    ).alias("value")
)

# Start the streaming write to the weather_transformed topic.
query = (
    df_out.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("topic", "weather_transformed")
    .option("checkpointLocation", "/tmp/spark_checkpoint_weather_transform")
    .start()
)

# Block until the streaming query stops.
query.awaitTermination()


Exercice 5

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window, count, avg, min, max
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

spark = SparkSession.builder.appName("WeatherAggregates").getOrCreate()

# Fields expected in the JSON records of the weather_transformed topic.
schema = (
    StructType()
    .add("event_time", TimestampType())
    .add("temperature", DoubleType())
    .add("windspeed", DoubleType())
    .add("wind_alert_level", StringType())
    .add("heat_alert_level", StringType())
    .add("city", StringType())
    .add("country", StringType())
)

# Streaming read of the transformed topic, starting from the earliest offsets.
df_raw = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "weather_transformed")
    .option("startingOffsets", "earliest")
    .load()
)

df_json = df_raw.selectExpr("CAST(value AS STRING) as json_str")

df_parsed = (
    df_json
    .select(from_json(col("json_str"), schema).alias("data"))
    .select("data.*")
)

# 5-minute windows sliding every minute, with a 10-minute watermark on
# event_time; one group per window, alert-level pair, city and country.
# min and max here are the Spark SQL functions imported by this cell.
windowed_df = (
    df_parsed
    .withWatermark("event_time", "10 minutes")
    .groupBy(
        window(col("event_time"), "5 minutes", "1 minute"),
        col("wind_alert_level"),
        col("heat_alert_level"),
        col("city"),
        col("country"),
    )
    .agg(
        count("*").alias("count_alerts"),
        avg("temperature").alias("avg_temperature"),
        min("temperature").alias("min_temperature"),
        max("temperature").alias("max_temperature"),
    )
)

# Print updated aggregates to the console without truncating column values.
query = (
    windowed_df.writeStream
    .outputMode("update")
    .format("console")
    .option("truncate", "false")
    .start()
)

query.awaitTermination()

Exercice 6


In [None]:
import sys
import requests
from kafka import KafkaProducer
import json
import time

def geocode_city(city_name, timeout=10):
    """Resolve a city name to coordinates with the Open-Meteo geocoding API.

    Only the first match returned by the API is used.

    Args:
        city_name: Name of the city to look up.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        A dict with the keys ``latitude``, ``longitude``, ``city`` and
        ``country``; ``country`` falls back to ``"Unknown"`` when the API
        result has no such field.

    Raises:
        ValueError: If the API returns no result for ``city_name``.
        requests.HTTPError: If the API answers with an error status.
        requests.RequestException: On connection failures or timeouts.
    """
    response = requests.get(
        "https://geocoding-api.open-meteo.com/v1/search",
        # Passing the name through params URL-encodes it, so names containing
        # characters such as '&' or '#' no longer corrupt the query string.
        params={"name": city_name, "count": 1},
        timeout=timeout,
    )
    response.raise_for_status()
    results = response.json().get("results")
    if not results:
        raise ValueError(f"Aucune correspondance trouvée pour la ville '{city_name}'")
    data = results[0]
    return {
        "latitude": data["latitude"],
        "longitude": data["longitude"],
        "city": data["name"],
        "country": data.get("country", "Unknown")
    }

def get_weather(lat, lon, timeout=10):
    """Fetch the current weather for a coordinate pair from the Open-Meteo API.

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the caller forever.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        requests.RequestException: On connection failures or timeouts.
    """
    response = requests.get(
        "https://api.open-meteo.com/v1/forecast",
        # Let requests build and encode the query string.
        params={"latitude": lat, "longitude": lon, "current_weather": "true"},
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json()

def run_producer(city_name):
    """Publish the current weather of a city to Kafka once a minute.

    The city is resolved once with ``geocode_city``. The loop then runs until
    interrupted with Ctrl+C, sending each ``current_weather`` reading with the
    city, country and coordinates to the ``weather_stream`` topic.

    Args:
        city_name: Name of the city to stream weather for.

    Raises:
        ValueError: Propagated from ``geocode_city`` when the city is unknown.
    """
    coords = geocode_city(city_name)
    lat = coords["latitude"]
    lon = coords["longitude"]
    city = coords["city"]
    country = coords["country"]

    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    topic = 'weather_stream'

    print(f"Streaming météo pour {city}, {country} (lat={lat}, lon={lon}) vers le topic {topic}")

    try:
        while True:
            try:
                weather_data = get_weather(lat, lon)
            except requests.RequestException as exc:
                # A transient API or network failure must not end the stream:
                # report it and try again on the next cycle.
                print(f"Erreur lors de la récupération météo: {exc}")
            else:
                current_weather = weather_data.get('current_weather', {})
                if current_weather:
                    message = {
                        'city': city,
                        'country': country,
                        'latitude': lat,
                        'longitude': lon,
                        'weather': current_weather
                    }
                    producer.send(topic, message)
                    producer.flush()
                    print("Message envoyé:", message)
                else:
                    print("Pas de données météo reçues")
            time.sleep(60)
    except KeyboardInterrupt:
        print("Arrêt du producteur")
    finally:
        # Always release the producer, whatever ended the loop.
        producer.close()

if __name__ == "__main__":
    # Exactly one argument is expected after the program name: the city name.
    if len(sys.argv) == 2:
        city_name = sys.argv[1]
        run_producer(city_name)
    else:
        print("Usage : python current_weather_producer.py <nom_ville>")
        sys.exit(1)


Exercice 7

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

spark = SparkSession.builder.appName("KafkaToHDFS").getOrCreate()

# Fields expected in the JSON records of the weather_transformed topic.
schema = (
    StructType()
    .add("event_time", TimestampType())
    .add("temperature", DoubleType())
    .add("windspeed", DoubleType())
    .add("wind_alert_level", StringType())
    .add("heat_alert_level", StringType())
    .add("city", StringType())
    .add("country", StringType())
)

# Streaming read of the transformed topic, starting from the earliest offsets.
df_raw = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "weather_transformed")
    .option("startingOffsets", "earliest")
    .load()
)

df_json = df_raw.selectExpr("CAST(value AS STRING) as json_str")

df_parsed = (
    df_json
    .select(from_json(col("json_str"), schema).alias("data"))
    .select("data.*")
)

hdfs_base_path = "/hdfs-data"

# Append the parsed records as JSON files under the base path, partitioned by
# country and then city.
query = (
    df_parsed.writeStream
    .format("json")
    .option("path", hdfs_base_path)
    .option("checkpointLocation", "/tmp/spark_checkpoint_kafka_hdfs")
    .partitionBy("country", "city")
    .outputMode("append")
    .start()
)

query.awaitTermination()

Exercice 8

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, count, avg, min, max
import matplotlib.pyplot as plt

spark = SparkSession.builder.appName("WeatherLogsVisualization").getOrCreate()

# The glob points at individual files, which on its own makes Spark skip
# partition discovery. Setting basePath restores the partition columns encoded
# in the directory names (the streaming writer in this notebook partitions
# /hdfs-data by country and city), so they are available in df below.
df_logs = (
    spark.read
    .option("basePath", "/hdfs-data")
    .json("/hdfs-data/**/**/*.json")
)

# Make sure event_time is a timestamp column before plotting against it.
df = df_logs.withColumn("event_time", to_timestamp("event_time"))

# Temperature readings in chronological order, collected to pandas for plotting.
temp_time_df = df.select("event_time", "temperature").orderBy("event_time")
pandas_temp_time = temp_time_df.toPandas()

fig, ax = plt.subplots(figsize=(10,4))
ax.plot(
    pandas_temp_time["event_time"],
    pandas_temp_time["temperature"],
    label="Température (°C)",
)
ax.set_title("Évolution de la température au fil du temps")
ax.set_xlabel("Temps")
ax.set_ylabel("Température")
plt.xticks(rotation=45)
ax.legend()
plt.tight_layout()
plt.show()

# Wind speed readings in chronological order, collected to pandas for plotting.
windspeed_time_df = df.select("event_time", "windspeed").orderBy("event_time")
pandas_windspeed_time = windspeed_time_df.toPandas()

plt.figure(figsize=(10,4))
# The producers in this notebook call Open-Meteo without a wind speed unit
# parameter, so the values are in the API's default unit, km/h, not m/s.
plt.plot(pandas_windspeed_time["event_time"], pandas_windspeed_time["windspeed"], label="Vitesse du vent (km/h)", color="orange")
plt.title("Évolution de la vitesse du vent au fil du temps")
plt.xlabel("Temps")
plt.ylabel("Vitesse du vent")
plt.xticks(rotation=45)
plt.legend()
plt.tight_layout()
plt.show()

# Number of records for every (wind level, heat level) pair.
alerts_count_df = df.groupBy("wind_alert_level", "heat_alert_level").count()
alerts_count_pd = alerts_count_df.toPandas()

# Collapse the pairwise counts into one total per level for each alert type.
# Plotting the pairwise rows directly would draw one bar per pair, repeat the
# level labels and overlay the two series on the same positions.
wind_counts = (
    alerts_count_pd.groupby("wind_alert_level")["count"].sum()
    .rename("Alertes Vent")
    .rename_axis("level")
)
heat_counts = (
    alerts_count_pd.groupby("heat_alert_level")["count"].sum()
    .rename("Alertes Chaleur")
    .rename_axis("level")
)
# Outer join so that a level present for only one alert type still gets a bar
# group; the missing side is shown as 0.
alerts_by_level = (
    wind_counts.to_frame()
    .join(heat_counts.to_frame(), how="outer")
    .fillna(0)
    .sort_index()
)

fig, ax = plt.subplots(figsize=(8,4))
alerts_by_level.plot.bar(ax=ax, color=['blue', 'red'], alpha=0.7)
plt.title("Nombre d’alertes vent et chaleur par niveau")
plt.xlabel("Niveau d'alerte")
plt.ylabel("Nombre d'alertes")
plt.xticks(rotation=0)
plt.legend()
plt.tight_layout()
plt.show()

# This chart needs both columns; when either is missing it is skipped.
if {"country", "weather_code"}.issubset(df.columns):
    from pyspark.sql.window import Window
    from pyspark.sql.functions import row_number, desc

    # Occurrences of each weather code per country, ranked within each country
    # from most to least frequent; only the top-ranked row is kept.
    weather_freq_df = df.groupBy("country", "weather_code").count()
    window_spec = Window.partitionBy("country").orderBy(desc("count"))
    weather_ranked = weather_freq_df.withColumn("rank", row_number().over(window_spec))
    top_weather_per_country = weather_ranked.where(col("rank") == 1).toPandas()

    plt.figure(figsize=(12,6))
    plt.bar(
        top_weather_per_country["country"],
        top_weather_per_country["count"],
    )
    plt.title("Code météo le plus fréquent par pays")
    plt.xlabel("Pays")
    plt.ylabel("Nombre d'occurrences")
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()
else:
    print("Colonnes 'country' ou 'weather_code' absentes. Skipping code météo fréquent par pays.")

Exercice 9

In [None]:
import requests
import json
from kafka import KafkaProducer
from datetime import datetime, timedelta
import time
from pyspark.sql import SparkSession

def fetch_weather_history(lat, lon, start_date, end_date, timeout=60):
    """Fetch hourly temperature and wind speed history from the Open-Meteo archive API.

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.
        start_date: First day of the range, sent as ``start_date``.
        end_date: Last day of the range, sent as ``end_date``.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the caller forever.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        requests.RequestException: On connection failures or timeouts.
    """
    response = requests.get(
        "https://archive-api.open-meteo.com/v1/archive",
        # Let requests build and encode the query string.
        params={
            "latitude": lat,
            "longitude": lon,
            "start_date": start_date,
            "end_date": end_date,
            "hourly": "temperature_2m,windspeed_10m",
            "timezone": "Europe/Paris",
        },
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json()

# Producer that serialises each Python value to UTF-8 encoded JSON.
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

topic = "weather_history_raw"
city = "Paris"
country = "France"
latitude = 48.8566
longitude = 2.3522

start_date = "2013-01-01"
end_date = "2023-01-01"

print(f"Récupération historique météo pour {city} ({latitude}, {longitude}) de {start_date} à {end_date}")

try:
    data = fetch_weather_history(latitude, longitude, start_date, end_date)

    # The "hourly" section holds parallel lists; a missing list becomes empty,
    # in which case zip() yields nothing and no message is sent.
    hourly_data = data.get("hourly", {})
    temp_list = hourly_data.get("temperature_2m", [])
    wind_list = hourly_data.get("windspeed_10m", [])
    times = hourly_data.get("time", [])

    # One Kafka message per hourly reading.
    for t, temp, wind in zip(times, temp_list, wind_list):
        message = {
            "city": city,
            "country": country,
            "latitude": latitude,
            "longitude": longitude,
            "timestamp": t,
            "temperature": temp,
            "windspeed": wind
        }
        producer.send(topic, message)

        # Short pause between sends, presumably to throttle the replay —
        # TODO confirm it is needed.
        time.sleep(0.01)

    producer.flush()
    print("Tous les messages historiques envoyés dans Kafka")
finally:
    # Release the producer even when the fetch or a send raised.
    producer.close()

spark = SparkSession.builder.appName("HistoryKafkaToHDFS").getOrCreate()

# Batch (non-streaming) read of the history topic from its earliest offsets.
# NOTE(review): this reads from kafka:9092 while the producer in this cell
# writes to localhost:9092 — confirm both addresses reach the same broker.
df_raw = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", topic)
    .option("startingOffsets", "earliest")
    .load()
)

from pyspark.sql.functions import from_json, col, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Fields of the JSON messages published on the history topic.
schema = (
    StructType()
    .add("city", StringType())
    .add("country", StringType())
    .add("latitude", DoubleType())
    .add("longitude", DoubleType())
    .add("timestamp", StringType())
    .add("temperature", DoubleType())
    .add("windspeed", DoubleType())
)

# Decode the Kafka value bytes as a string and parse the JSON.
df_parsed = (
    df_raw
    .select(from_json(col("value").cast("string"), schema).alias("data"))
    .select("data.*")
)

# Replace the string "timestamp" field with a typed event_time column.
df_final = (
    df_parsed
    .withColumn("event_time", to_timestamp("timestamp"))
    .drop("timestamp")
)

# NOTE(review): the path already contains country and city, and partitionBy
# adds country=/city= sub-directories below it. The root is also "/hdfsdata",
# while the other cells use "/hdfs-data" — confirm both are intended.
hdfs_path = f"/hdfsdata/{country}/{city}/weather_history_raw"

(
    df_final.write
    .mode("overwrite")
    .partitionBy("country", "city")
    .json(hdfs_path)
)

print(f"Données historiques sauvegardées dans HDFS sous {hdfs_path}")