In [48]:
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.functions import col , when
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
import pickle


spark = SparkSession.builder.appName("PredictionSession").config("spark.local.dir", "C:/temp").config("spark.executor.memory", "4g").config("spark.driver.memory", "4g").getOrCreate()

dataset_path = "../data/Transformed_GlobalFireBurnedArea_pandas.csv"
df = spark.read.csv(dataset_path, header=True, inferSchema=True)

# Afficher les 5 premières lignes des données initiales
print("===== Données Initiales =====")
df.show(5)

===== Données Initiales =====
+--------+-----------+----------+-----------+-------------+-----------+--------------------+----------+-------------------+------+-------------+
|      ID|Initialdate| Finaldate|    Area_ha|      Area_m2|   Area_Km2|         CountryName| Continent|             Region|Season|Duration_days|
+--------+-----------+----------+-----------+-------------+-----------+--------------------+----------+-------------------+------+-------------+
|25078590| 2022-01-09|2022-02-06|50232.10763|5.023210763E8|502.3210763|               Ghana|    Africa|     Western Africa|Winter|           28|
|25079092| 2022-01-11|2022-02-08|82380.29538|8.238029538E8|823.8029538|             Nigeria|    Africa|     Western Africa|Winter|           28|
|25079113| 2022-01-11|2022-02-03|36851.12748|3.685112748E8|368.5112748|             Nigeria|    Africa|     Western Africa|Winter|           23|
|25083241| 2022-01-03|2022-02-12|43303.63519|4.330363519E8|433.0363519|             Nigeria|    Afri

In [49]:
df = df.withColumn(
    "severity",
    when(col("Area_Km2") < 50, "Low")
    .when((col("Area_Km2") >= 50) & (col("Area_Km2") < 1000), "Medium")
    .otherwise("High")
)

# Compter le nombre d'incendies par catégorie
df.groupBy("severity").count().show()


+--------+------+
|severity| count|
+--------+------+
|  Medium|  6668|
|     Low|312489|
|    High|   121|
+--------+------+



In [50]:
indexer = StringIndexer(
    inputCols=["severity", "Season", "CountryName", "Region", "Continent"],
    outputCols=["severity_index", "Season_index", "CountryName_index", "Region_index", "Continent_index"]
)
df = indexer.fit(df).transform(df)
print("===== Après encodage des colonnes catégoriques =====")
df.select("severity", "severity_index", "Season", "Season_index").show(5)

===== Après encodage des colonnes catégoriques =====
+--------+--------------+------+------------+
|severity|severity_index|Season|Season_index|
+--------+--------------+------+------------+
|  Medium|           1.0|Winter|         2.0|
|  Medium|           1.0|Winter|         2.0|
|  Medium|           1.0|Winter|         2.0|
|  Medium|           1.0|Winter|         2.0|
|  Medium|           1.0|Winter|         2.0|
+--------+--------------+------+------------+
only showing top 5 rows



In [51]:
df.groupBy("Season").count().show()

+------+------+
|Season| count|
+------+------+
|Winter| 79605|
|Spring| 50249|
|Summer|108957|
|Autumn| 80467|
+------+------+



In [52]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col, when

# Vérifier et gérer les valeurs nulles dans les colonnes
df = df.withColumn("Season_index", when(col("Season_index").isNull(), 0).otherwise(col("Season_index")))
df = df.withColumn("Duration_days", when(col("Duration_days").isNull(), 0).otherwise(col("Duration_days").cast("double")))
df = df.withColumn("CountryName_index", when(col("CountryName_index").isNull(), 0).otherwise(col("CountryName_index")))
df = df.withColumn("Region_index", when(col("Region_index").isNull(), 0).otherwise(col("Region_index")))
df = df.withColumn("Continent_index", when(col("Continent_index").isNull(), 0).otherwise(col("Continent_index")))

# Vérifier le schéma après gestion des valeurs nulles
print("Schéma après gestion des valeurs nulles :")
df.printSchema()

# Définir les colonnes d'entrée pour VectorAssembler
feature_cols = ["Season_index", "Duration_days", "CountryName_index", "Region_index", "Continent_index"]

# Initialiser VectorAssembler
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Appliquer la transformation
df = assembler.transform(df)

# Afficher les résultats
print("===== Après assemblage des caractéristiques =====")
df.select("features", "severity_index", "Area_Km2").show(5, truncate=False)


Schéma après gestion des valeurs nulles :
root
 |-- ID: integer (nullable = true)
 |-- Initialdate: date (nullable = true)
 |-- Finaldate: date (nullable = true)
 |-- Area_ha: double (nullable = true)
 |-- Area_m2: double (nullable = true)
 |-- Area_Km2: double (nullable = true)
 |-- CountryName: string (nullable = true)
 |-- Continent: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Season: string (nullable = true)
 |-- Duration_days: double (nullable = true)
 |-- severity: string (nullable = false)
 |-- severity_index: double (nullable = false)
 |-- Season_index: double (nullable = false)
 |-- CountryName_index: double (nullable = false)
 |-- Region_index: double (nullable = false)
 |-- Continent_index: double (nullable = false)

===== Après assemblage des caractéristiques =====
+-----------------------+--------------+-----------+
|features               |severity_index|Area_Km2   |
+-----------------------+--------------+-----------+
|[2.0,28.0,17.0,2.0,0.0]|1.0 

In [53]:
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)
print("===== Données d'entraînement =====")
train_data.show(5, truncate=False)
print("===== Données de test =====")
test_data.show(5, truncate=False)



===== Données d'entraînement =====
+--------+-----------+----------+-----------+-------------+-----------+---------------------------+------------+---------------------+------+-------------+--------+--------------+------------+-----------------+------------+---------------+------------------------+
|ID      |Initialdate|Finaldate |Area_ha    |Area_m2      |Area_Km2   |CountryName                |Continent   |Region               |Season|Duration_days|severity|severity_index|Season_index|CountryName_index|Region_index|Continent_index|features                |
+--------+-----------+----------+-----------+-------------+-----------+---------------------------+------------+---------------------+------+-------------+--------+--------------+------------+-----------------+------------+---------------+------------------------+
|25036465|2022-01-09 |2022-01-14|214.8744509|2148744.509  |2.148744509|   United States of America|    Americas|     Northern America|Winter|5.0          |Low     |0.0   

In [54]:


# Entraîner le modèle de classification
print("===== Objectif 1 : Prédiction de la gravité =====")
rf_classifier = RandomForestClassifier(labelCol="severity_index", featuresCol="features",maxBins=150)
rf_model = rf_classifier.fit(train_data)


===== Objectif 1 : Prédiction de la gravité =====


In [55]:
import os

# Créer un dossier pour sauvegarder les modèles (si ce n'est pas déjà fait)
model_save_dir = 'modele_sauvegarder'
os.makedirs(model_save_dir, exist_ok=True)

# Sauvegarder le modèle Spark ML
rf_model.write().overwrite().save(os.path.join(model_save_dir, 'rf_model_classification'))
print("Le modèle de classification a été sauvegardé sous 'rf_model_classification'.")

Le modèle de classification a été sauvegardé sous 'rf_model_classification'.


In [56]:
# Faire des prédictions
predictions_classifier = rf_model.transform(test_data)
print("===== Prédictions pour la classification =====")
predictions_classifier.select("severity", "severity_index", "prediction", "probability").show(5)

# Évaluer les performances
classification_evaluator = MulticlassClassificationEvaluator(
    labelCol="severity_index", predictionCol="prediction", metricName="accuracy"
)
accuracy = classification_evaluator.evaluate(predictions_classifier)
print(f"Précision du modèle de classification : {accuracy:.2f}")

===== Prédictions pour la classification =====
+--------+--------------+----------+--------------------+
|severity|severity_index|prediction|         probability|
+--------+--------------+----------+--------------------+
|     Low|           0.0|       0.0|[0.98794056009154...|
|     Low|           0.0|       0.0|[0.98794056009154...|
|     Low|           0.0|       0.0|[0.98794056009154...|
|     Low|           0.0|       0.0|[0.98794056009154...|
|     Low|           0.0|       0.0|[0.98794056009154...|
+--------+--------------+----------+--------------------+
only showing top 5 rows

Précision du modèle de classification : 0.98


In [57]:
import sys
import json
from pyspark.ml.classification import RandomForestClassificationModel
from kafka import KafkaProducer

# Patch kafka-python for Python 3.12 and later versions
if sys.version_info >= (3, 12, 0):
    import six
    sys.modules['kafka.vendor.six.moves'] = six.moves

# 1. Load the saved ML model
model_save_dir = 'modele_sauvegarder'
rf_model_path = f"{model_save_dir}/rf_model_classification"
rf_model = RandomForestClassificationModel.load(rf_model_path)
print(f"Model loaded from {rf_model_path}")

# 2. Use `test_data` defined earlier in the notebook (already split)
print("===== Test Data =====")
test_data.show(5, truncate=False)

# 3. Make predictions on the test data
predictions_classifier = rf_model.transform(test_data)
print("===== Predictions for Classification =====")
predictions_classifier.select("severity", "severity_index", "prediction", "probability").show(5)

# 4. Configure Kafka Producer
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda x: json.dumps(x).encode('utf-8')
)

# 5. Send predictions to the Kafka topic
for row in predictions_classifier.collect():
    message = {
        'severity': row['severity'],
        'severity_index': row['severity_index'],
        'prediction': row['prediction'],
        'probability': row['probability'].toArray().tolist()  # Convert the vector to a list
    }
    producer.send('severity-predictions', value=message)

# 6. Close the Producer after sending the messages
producer.flush()
producer.close()
print("Predictions have been sent to Kafka topic.")

Model loaded from modele_sauvegarder/rf_model_classification
===== Test Data =====
+--------+-----------+----------+-----------+-----------+-----------+---------------------------+------------+---------------------+------+-------------+--------+--------------+------------+-----------------+------------+---------------+-----------------------+
|ID      |Initialdate|Finaldate |Area_ha    |Area_m2    |Area_Km2   |CountryName                |Continent   |Region               |Season|Duration_days|severity|severity_index|Season_index|CountryName_index|Region_index|Continent_index|features               |
+--------+-----------+----------+-----------+-----------+-----------+---------------------------+------------+---------------------+------+-------------+--------+--------------+------------+-----------------+------------+---------------+-----------------------+
|25036472|2022-01-03 |2022-01-07|107.3939977|1073939.977|1.073939977|   United States of America|    Americas|     Northern America

In [58]:
from kafka import KafkaConsumer
import json

# Configurer le Kafka Consumer
consumer = KafkaConsumer(
    'severity-predictions',  # Nom du topic
    bootstrap_servers='localhost:9092',
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
    enable_auto_commit=True,
    auto_offset_reset='earliest'  # Lire les messages depuis le début
)

# Limite des lignes à lire
max_rows = 5
count = 0

print("Lecture des messages depuis le topic 'severity-predictions'...")

# Lire les messages depuis Kafka en boucle
while count < max_rows:
    # Poller les messages (timeout de 1 seconde)
    messages = consumer.poll(timeout_ms=1000, max_records=5)

    # Vérifier s'il y a des messages
    if messages:
        for topic_partition, msgs in messages.items():
            for message in msgs:
                # Afficher les données reçues
                print(f"Données reçues : {message.value}")

                # Incrémenter le compteur
                count += 1

                # Vérifier si la limite est atteinte
                if count >= max_rows:
                    print(f"Limite de {max_rows} messages atteinte. Arrêt de la consommation.")
                    break
    else:
        print("Aucun message reçu, réessayer...")


Lecture des messages depuis le topic 'severity-predictions'...
Données reçues : {'severity': 'Low', 'severity_index': 0.0, 'prediction': 0.0, 'probability': [0.9880936249292329, 0.011848084557354189, 5.829051341284061e-05]}
Données reçues : {'severity': 'Low', 'severity_index': 0.0, 'prediction': 0.0, 'probability': [0.9880936249292329, 0.011848084557354189, 5.829051341284061e-05]}
Données reçues : {'severity': 'Low', 'severity_index': 0.0, 'prediction': 0.0, 'probability': [0.9880936249292329, 0.011848084557354189, 5.829051341284061e-05]}
Données reçues : {'severity': 'Low', 'severity_index': 0.0, 'prediction': 0.0, 'probability': [0.9880936249292329, 0.011848084557354189, 5.829051341284061e-05]}
Données reçues : {'severity': 'Low', 'severity_index': 0.0, 'prediction': 0.0, 'probability': [0.9880936249292329, 0.011848084557354189, 5.829051341284061e-05]}
Limite de 5 messages atteinte. Arrêt de la consommation.


In [59]:
test_data.groupBy("severity_index").count().show()

+--------------+-----+
|severity_index|count|
+--------------+-----+
|           0.0|62212|
|           1.0| 1335|
|           2.0|   24|
+--------------+-----+



In [60]:
# ---------------------
# 1. Entraîner le modèle de régression
# ---------------------
print("===== Objectif 2 : Prédiction de la taille =====")

# Modèle de RandomForestRegressor avec paramètres optimisés
rf_regressor = RandomForestRegressor(
    labelCol="Area_Km2",    # Colonne cible
    featuresCol="features", # Colonne des caractéristiques
    maxBins=150,            # Nombre maximum de bins (important pour les variables catégoriques)
    numTrees=100,           # Nombre d'arbres
    maxDepth=10             # Profondeur maximale des arbres
)

# Entraîner le modèle sur les données d'entraînement
rf_model_regressor = rf_regressor.fit(train_data)


===== Objectif 2 : Prédiction de la taille =====


In [61]:
# Sauvegarder le modèle Spark ML
rf_model_regressor.write().overwrite().save(os.path.join(model_save_dir, 'rf_model_regression'))
print("Le modèle de régression a été sauvegardé sous 'rf_model_regression'.")

Le modèle de régression a été sauvegardé sous 'rf_model_regression'.


In [62]:
# 2. Faire des prédictions
# ---------------------
predictions_regressor = rf_model_regressor.transform(test_data)

# Afficher un échantillon des prédictions
print("===== Prédictions pour la régression =====")
predictions_regressor.select("Area_Km2", "prediction").show(10, truncate=False)

# ---------------------
# 3. Évaluer les performances
# ---------------------
# Initialiser l'évaluateur pour RMSE et MAE
regression_evaluator_rmse = RegressionEvaluator(
    labelCol="Area_Km2", predictionCol="prediction", metricName="rmse"
)
regression_evaluator_mae = RegressionEvaluator(
    labelCol="Area_Km2", predictionCol="prediction", metricName="mae"
)

# Calculer RMSE et MAE
rmse = regression_evaluator_rmse.evaluate(predictions_regressor)
mae = regression_evaluator_mae.evaluate(predictions_regressor)

# Afficher les métriques d'évaluation
print("===== Évaluation des performances =====")
print(f"RMSE (Root Mean Squared Error) : {rmse:.2f}")
print(f"MAE (Mean Absolute Error) : {mae:.2f}")

# ---------------------
# 4. Importance des caractéristiques
# ---------------------
print("===== Importance des caractéristiques =====")
feature_importances = rf_model_regressor.featureImportances
for col, importance in zip(feature_cols, feature_importances):
    print(f"{col}: {importance:.2f}")

===== Prédictions pour la régression =====
+-----------+------------------+
|Area_Km2   |prediction        |
+-----------+------------------+
|1.073939977|4.109420932189103 |
|3.221117395|4.488870615041736 |
|1.931822245|4.747800911858084 |
|1.07226946 |3.7202377130998827|
|1.28670946 |3.583961881666361 |
|2.78783539 |4.990263400145997 |
|7.290904512|5.30571257535233  |
|3.429793055|3.772919339617944 |
|3.643775961|4.990263400145997 |
|2.572134754|5.983122490676127 |
+-----------+------------------+
only showing top 10 rows

===== Évaluation des performances =====
RMSE (Root Mean Squared Error) : 46.98
MAE (Mean Absolute Error) : 6.84
===== Importance des caractéristiques =====
Season_index: 0.09
Duration_days: 0.69
CountryName_index: 0.15
Region_index: 0.06
Continent_index: 0.01


In [63]:
predictions_regressor.select("Area_Km2", "prediction").show(20)


+-----------+------------------+
|   Area_Km2|        prediction|
+-----------+------------------+
|1.073939977| 4.109420932189103|
|3.221117395| 4.488870615041736|
|1.931822245| 4.747800911858084|
| 1.07226946|3.7202377130998827|
| 1.28670946| 3.583961881666361|
| 2.78783539| 4.990263400145997|
|7.290904512|  5.30571257535233|
|3.429793055| 3.772919339617944|
|3.643775961| 4.990263400145997|
|2.572134754| 5.983122490676127|
|2.357561481|3.9527324413517175|
|1.500145118|  3.58047076672232|
|1.928687191|  3.58047076672232|
|3.213587668| 4.196112729722651|
|1.284796405|3.7202377130998827|
| 2.35506137|3.5998953442460864|
|1.283970857| 3.772919339617944|
|1.069900176|3.7202377130998827|
|3.867095267| 5.313044049731388|
|1.502286694| 3.855159253437626|
+-----------+------------------+
only showing top 20 rows

