# Problématique :
Le concessionnaire a constaté la perte de données cruciales avant l'introduction du système de gestion de données centralisé, en particulier des details tels que les émissions de CO2, les tarifs énergétiques et les informations sur les bonus/malus pour la taxation des véhicules par marque et modèle. Ces données sont essentielles pour enrichir les modèles prédictifs du concessionnaire. Bien qu'un fichier CO2.csv soit disponible, il présente des lacunes : il ne répertorie pas toutes les marques et modèles de voitures présents dans le catalogue du concessionnaire, et son format diffère de celui attendu. L'objectif est donc de développer un programme Map/Reduce utilisant Hadoop ou Spark pour harmoniser le fichier CO2.csv en intégrant ces données manquantes dans la table de catalogue du concessionnaire.


Voici un exemple illustratif pour se familiariser avec le concept du système de BONUS/MALUS.

![Logo Google](https://www.autoradiogps.fr/wp-content/uploads/2019/01/InfogDFAuto.gif)


# Import des modules nécessaires

In [1]:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import split, col, regexp_extract, substring_index, regexp_replace, round
from pyspark.sql.functions import mean, when, avg, lit,upper


# Initialisation de la session Spark

In [2]:

spark = SparkSession.builder \
    .appName("Intégration des données CO2 dans le catalogue du concessionnaire") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


# Chargement des données

In [3]:

co2_data = spark.read.csv("/MBDS_Projet/CO2.csv", header=True)

co2_data.show()

                                                                                

+---+--------------------+-------------+---------------+-----------+
|_c0|     Marque / Modele|Bonus / Malus|Rejets CO2 g/km|Cout enerie|
+---+--------------------+-------------+---------------+-----------+
|  2|AUDI E-TRON SPORT...|    -6 000€ 1|              0|      319 €|
|  3|AUDI E-TRON SPORT...|    -6 000€ 1|              0|      356 €|
|  4|AUDI E-TRON 55 (4...|    -6 000€ 1|              0|      357 €|
|  5|AUDI E-TRON 50 (3...|    -6 000€ 1|              0|      356 €|
|  6|       BMW i3 120 Ah|    -6 000€ 1|              0|      204 €|
|  7|      BMW i3s 120 Ah|    -6 000€ 1|              0|      204 €|
|  8|    CITROEN BERLINGO|    -6 000€ 1|              0|      203 €|
|  9|      CITROEN C-ZERO|    -6 000€ 1|              0|      491 €|
| 10|DS DS3 CROSSBACK ...|    -6 000€ 1|              0|      251 €|
| 11|HYUNDAI KONA elec...|    -6 000€ 1|              0|      205 €|
| 12|HYUNDAI KONA elec...|    -6 000€ 1|              0|      205 €|
| 13|JAGUAR I-PACE EV4...|    -6 0

24/05/02 12:18:35 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , Marque / Modele, Bonus / Malus, Rejets CO2 g/km, Cout enerie
 Schema: _c0, Marque / Modele, Bonus / Malus, Rejets CO2 g/km, Cout enerie
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/MBDS_Projet/CO2.csv



Ce tableau présente des données sur différents modèles de voitures, notamment leur marque et leur modèle, le montant du bonus/malus, les rejets de CO2 par kilomètre et le coût de l'énergie.

La colonne "_c0": une colonne d'index ou d'identifiant unique pour chaque enregistrement de voiture.

La colonne "Marque / Modele" indique la marque et le modèle de chaque voiture.

La colonne "Bonus / Malus" montre le montant du bonus ou du malus appliqué à chaque voiture.

La colonne "Rejets CO2 g/km" donne la quantité de CO2 rejetée par kilomètre parcouru pour chaque voiture. Dans cet extrait, toutes les valeurs semblent être zéro, ce qui peut être une caractéristique spécifique des voitures répertoriées.

La colonne "Cout enerie" représente le coût estimé de l'énergie nécessaire pour faire fonctionner chaque voiture, généralement exprimé en euros.

# Nettoyage des données

In [4]:

# Suppression du caractère de l'euro dans la colonne "Cout enerie" et renommer la colonne par "Cout Energie"
co2_data_cleaned = co2_data.withColumn("Cout Energie", F.regexp_replace("cout enerie", "€", ""))
co2_data_cleaned = co2_data_cleaned.drop("cout enerie")

# Supprimer les caractères non numériques (garder uniquement les chiffres et les décimales)
co2_data_cleaned = co2_data_cleaned.withColumn("Cout Energie", regexp_replace(col("Cout Energie"), "[^0-9.]", ""))

# Convertir la colonne en type numérique
co2_data_cleaned = co2_data_cleaned.withColumn("Cout Energie", co2_data_cleaned["Cout Energie"].cast("double"))

co2_data_cleaned.show()

+---+--------------------+-------------+---------------+------------+
|_c0|     Marque / Modele|Bonus / Malus|Rejets CO2 g/km|Cout Energie|
+---+--------------------+-------------+---------------+------------+
|  2|AUDI E-TRON SPORT...|    -6 000€ 1|              0|       319.0|
|  3|AUDI E-TRON SPORT...|    -6 000€ 1|              0|       356.0|
|  4|AUDI E-TRON 55 (4...|    -6 000€ 1|              0|       357.0|
|  5|AUDI E-TRON 50 (3...|    -6 000€ 1|              0|       356.0|
|  6|       BMW i3 120 Ah|    -6 000€ 1|              0|       204.0|
|  7|      BMW i3s 120 Ah|    -6 000€ 1|              0|       204.0|
|  8|    CITROEN BERLINGO|    -6 000€ 1|              0|       203.0|
|  9|      CITROEN C-ZERO|    -6 000€ 1|              0|       491.0|
| 10|DS DS3 CROSSBACK ...|    -6 000€ 1|              0|       251.0|
| 11|HYUNDAI KONA elec...|    -6 000€ 1|              0|       205.0|
| 12|HYUNDAI KONA elec...|    -6 000€ 1|              0|       205.0|
| 13|JAGUAR I-PACE E

24/05/02 12:18:35 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , Marque / Modele, Bonus / Malus, Rejets CO2 g/km, Cout enerie
 Schema: _c0, Marque / Modele, Bonus / Malus, Rejets CO2 g/km, Cout enerie
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/MBDS_Projet/CO2.csv


In [5]:
# Séparation de la colonne "Marque / Modèle" en "Marque" et "Modele"
#La séparation de la colonne "Marque / Modèle" en deux colonnes distinctes "Marque" et "Modèle" a été réalisée dans le but de mieux organiser et structurer les données.
# Cette séparation permet de distinguer clairement la marque du véhicule de son modèle, ce qui facilite la manipulation et l'analyse ultérieures des données.

co2_data_cleaned = co2_data_cleaned.withColumn("Marque", regexp_extract(col("Marque / Modele"), r"(\w+)", 1))
co2_data_cleaned = co2_data_cleaned.withColumn("Modele", regexp_extract(col("Marque / Modele"), r"\s(.*)", 1))
co2_data_cleaned = co2_data_cleaned.drop("Marque / Modele")

co2_data_cleaned.show()

+---+-------------+---------------+------------+--------+--------------------+
|_c0|Bonus / Malus|Rejets CO2 g/km|Cout Energie|  Marque|              Modele|
+---+-------------+---------------+------------+--------+--------------------+
|  2|    -6 000€ 1|              0|       319.0|    AUDI|E-TRON SPORTBACK ...|
|  3|    -6 000€ 1|              0|       356.0|    AUDI|E-TRON SPORTBACK ...|
|  4|    -6 000€ 1|              0|       357.0|    AUDI|E-TRON 55 (408ch)...|
|  5|    -6 000€ 1|              0|       356.0|    AUDI|E-TRON 50 (313ch)...|
|  6|    -6 000€ 1|              0|       204.0|     BMW|           i3 120 Ah|
|  7|    -6 000€ 1|              0|       204.0|     BMW|          i3s 120 Ah|
|  8|    -6 000€ 1|              0|       203.0| CITROEN|            BERLINGO|
|  9|    -6 000€ 1|              0|       491.0| CITROEN|              C-ZERO|
| 10|    -6 000€ 1|              0|       251.0|      DS|DS3 CROSSBACK E-T...|
| 11|    -6 000€ 1|              0|       205.0| HYU

24/05/02 12:18:36 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , Marque / Modele, Bonus / Malus, Rejets CO2 g/km, Cout enerie
 Schema: _c0, Marque / Modele, Bonus / Malus, Rejets CO2 g/km, Cout enerie
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/MBDS_Projet/CO2.csv


In [6]:
# Supprimer les caractères après le symbole de l'euro dans la colonne "Bonus / Malus"
co2_data_cleaned = co2_data_cleaned.withColumn("Bonus / Malus", regexp_replace("Bonus / Malus", "€.*$", "€"))


# Définir une nouvelle colonne "Devise" en extrayant la devise de la colonne "Bonus / Malus"
co2_data_cleaned = co2_data_cleaned.withColumn("Devise", regexp_extract(col("Bonus / Malus"), r'(-?\d+\s?\d*)\s?([€$])', 2))

# Supprimer le symbole de l'euro dans la colonne "Bonus / Malus"
co2_data_cleaned = co2_data_cleaned.withColumn("Bonus / Malus", regexp_replace("Bonus / Malus", "€", ""))

# Remplacer "-" par 0 seulement si la colonne entière contient un "-"
co2_data_cleaned = co2_data_cleaned.withColumn("Bonus / Malus",
                                               when(col("Bonus / Malus") == "-", "0")
                                               .otherwise(col("Bonus / Malus")))
co2_data_cleaned.show()

+---+-------------+---------------+------------+--------+--------------------+------+
|_c0|Bonus / Malus|Rejets CO2 g/km|Cout Energie|  Marque|              Modele|Devise|
+---+-------------+---------------+------------+--------+--------------------+------+
|  2|       -6 000|              0|       319.0|    AUDI|E-TRON SPORTBACK ...|     €|
|  3|       -6 000|              0|       356.0|    AUDI|E-TRON SPORTBACK ...|     €|
|  4|       -6 000|              0|       357.0|    AUDI|E-TRON 55 (408ch)...|     €|
|  5|       -6 000|              0|       356.0|    AUDI|E-TRON 50 (313ch)...|     €|
|  6|       -6 000|              0|       204.0|     BMW|           i3 120 Ah|     €|
|  7|       -6 000|              0|       204.0|     BMW|          i3s 120 Ah|     €|
|  8|       -6 000|              0|       203.0| CITROEN|            BERLINGO|     €|
|  9|       -6 000|              0|       491.0| CITROEN|              C-ZERO|     €|
| 10|       -6 000|              0|       251.0|      

24/05/02 12:18:36 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , Marque / Modele, Bonus / Malus, Rejets CO2 g/km, Cout enerie
 Schema: _c0, Marque / Modele, Bonus / Malus, Rejets CO2 g/km, Cout enerie
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/MBDS_Projet/CO2.csv


On travaille par la suite sur :

Extraction des parties numériques et des signes "+" et "-": Il utilise une expression régulière pour rechercher des motifs spécifiques dans les valeurs de la colonne "Bonus / Malus". Cette expression régulière cherche des chiffres, éventuellement suivis d'un point décimal, et peut être précédée d'un signe "+" ou "-". Les valeurs qui correspondent à ce motif sont conservées telles quelles, tandis que celles qui ne correspondent pas sont remplacées par "0".

Extraction du signe de la colonne "Bonus / Malus": Une fois que les valeurs numériques ont été extraites, le code utilise à nouveau une expression régulière pour extraire le signe "+" ou "-" de chaque valeur de la colonne "Bonus / Malus" et la stocke dans une nouvelle colonne appelée "Signe".

Suppression des caractères non numériques: Tous les caractères non numériques, à l'exception du point décimal, sont supprimés de la colonne "Bonus / Malus". Cela garantit que seuls les chiffres et les décimales restent dans les valeurs.

Conversion des valeurs en nombres décimaux: Les valeurs de la colonne "Bonus / Malus" sont converties de chaînes de caractères en nombres décimaux (doubles).

Traitement des valeurs négatives: Si le signe extrait est "-", cela signifie que la valeur était négative. Dans ce cas, les valeurs correspondantes dans la colonne "Bonus / Malus" sont multipliées par -1 pour les rendre positives.

Suppression de la colonne "Signe": Une fois que les valeurs ont été correctement transformées, la colonne "Signe" temporaire est supprimée car elle n'est plus nécessaire.

In [7]:
# Utiliser une expression régulière pour extraire les parties numériques
co2_data_cleaned = co2_data_cleaned.withColumn("Bonus / Malus",
                                               when(
                                                   co2_data_cleaned["Bonus / Malus"].rlike(r'^[+-]?\d+(\.\d+)?'),
                                                   co2_data_cleaned["Bonus / Malus"]
                                               ).otherwise("0"))

# Utiliser une expression régulière pour extraire le signe de la colonne "Bonus / Malus"
co2_data_cleaned = co2_data_cleaned.withColumn("Signe", regexp_extract(co2_data_cleaned["Bonus / Malus"], r'([+-])', 1))

# Supprimer les caractères non numériques (garder uniquement les chiffres et les décimales)
co2_data_cleaned = co2_data_cleaned.withColumn("Bonus / Malus", regexp_replace(col("Bonus / Malus"), "[^0-9.]", ""))

# Convertir la colonne de chaînes en colonne de doubles en conservant le signe
co2_data_cleaned= co2_data_cleaned.withColumn("Bonus / Malus", col("Bonus / Malus").cast("double"))

# Multiplier les valeurs de la colonne "Bonus/Malus" par -1 lorsque le signe est "-"
co2_data_cleaned = co2_data_cleaned.withColumn("Bonus / Malus",
                                               when(col("Signe") == "-", col("Bonus / Malus") * -1)
                                               .otherwise(col("Bonus / Malus")))

# Supprimer la colonne "Signe"
co2_data_cleaned = co2_data_cleaned.drop("Signe")

co2_data_cleaned.show()

+---+-------------+---------------+------------+--------+--------------------+------+
|_c0|Bonus / Malus|Rejets CO2 g/km|Cout Energie|  Marque|              Modele|Devise|
+---+-------------+---------------+------------+--------+--------------------+------+
|  2|      -6000.0|              0|       319.0|    AUDI|E-TRON SPORTBACK ...|     €|
|  3|      -6000.0|              0|       356.0|    AUDI|E-TRON SPORTBACK ...|     €|
|  4|      -6000.0|              0|       357.0|    AUDI|E-TRON 55 (408ch)...|     €|
|  5|      -6000.0|              0|       356.0|    AUDI|E-TRON 50 (313ch)...|     €|
|  6|      -6000.0|              0|       204.0|     BMW|           i3 120 Ah|     €|
|  7|      -6000.0|              0|       204.0|     BMW|          i3s 120 Ah|     €|
|  8|      -6000.0|              0|       203.0| CITROEN|            BERLINGO|     €|
|  9|      -6000.0|              0|       491.0| CITROEN|              C-ZERO|     €|
| 10|      -6000.0|              0|       251.0|      

24/05/02 12:18:36 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , Marque / Modele, Bonus / Malus, Rejets CO2 g/km, Cout enerie
 Schema: _c0, Marque / Modele, Bonus / Malus, Rejets CO2 g/km, Cout enerie
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/MBDS_Projet/CO2.csv


# Preparation des valeurs calculées :
la moyenne pour "Bonus / Malus"

In [8]:
# Calcul de la moyenne pour "Bonus / Malus" (en excluant les valeurs nulles)
avg_bonus_malus = co2_data_cleaned.filter(col("Bonus / Malus") != 0.0).groupBy("Marque").agg(F.avg("Bonus / Malus").alias("Moyenne Bonus/Malus en euro"))

avg_bonus_malus.show()

+----------+---------------------------+
|    Marque|Moyenne Bonus/Malus en euro|
+----------+---------------------------+
|  MERCEDES|          8237.358422939069|
|   HYUNDAI|                    -6000.0|
|     SKODA|                    -6000.0|
|    NISSAN|                     5802.4|
|   CITROEN|                    -6000.0|
|      AUDI|                    -6000.0|
|      MINI|                    -6000.0|
|   PEUGEOT|                    -6000.0|
|    JAGUAR|                    -6000.0|
|     TESLA|                    -6000.0|
|       BMW|                    -6000.0|
|VOLKSWAGEN|                    -6000.0|
|       KIA|                    -6000.0|
|     SMART|                    -6000.0|
|   RENAULT|                    -6000.0|
|        DS|                    -6000.0|
+----------+---------------------------+



la moyenne pour "Rejets CO2 g/km"

In [9]:
# Convertir la colonne "Rejets CO2 g/km" en type numérique
co2_data_cleaned = co2_data_cleaned.withColumn("Rejets CO2 g/km", co2_data_cleaned["Rejets CO2 g/km"].cast("double"))

# Calcul de la moyenne pour "Rejets CO2 g/km" (en excluant les valeurs égales à zéro)
avg_co2_emission = co2_data_cleaned.filter((col("Rejets CO2 g/km").rlike(r'^\d+(\.\d+)?$')) & (col("Rejets CO2 g/km") != 0)) \
                                    .groupBy("Marque") \
                                    .agg(F.avg("Rejets CO2 g/km").alias("Moyenne Rejets CO2 g/km"))

avg_co2_emission.show()

+----------+-----------------------+
|    Marque|Moyenne Rejets CO2 g/km|
+----------+-----------------------+
|  MERCEDES|     189.55479452054794|
|   PORSCHE|      69.85714285714286|
|   HYUNDAI|                   26.0|
|    TOYOTA|                   32.0|
|     SKODA|                   31.0|
|      LAND|                   69.0|
|    NISSAN|                  200.0|
|   BENTLEY|                   84.0|
|      AUDI|                   43.5|
|      MINI|                   43.0|
|   PEUGEOT|     31.666666666666668|
|     VOLVO|      42.45454545454545|
|       BMW|      43.88235294117647|
|VOLKSWAGEN|                   32.8|
|       KIA|                   31.0|
|        DS|                   33.0|
|MITSUBISHI|                   40.0|
+----------+-----------------------+



La moyenne du coût de l'énergie

In [10]:
# Calcul du coût d'énergie pour chaque marque
energy_cost_per_brand = co2_data_cleaned.groupBy("Marque").agg(F.avg("Cout Energie").alias("Cout Energie Moyen en euro"))

energy_cost_per_brand.show()

+----------+--------------------------+
|    Marque|Cout Energie Moyen en euro|
+----------+--------------------------+
|  MERCEDES|         749.9796610169492|
|   PORSCHE|         89.71428571428571|
|   HYUNDAI|                     151.0|
|    TOYOTA|                      43.0|
|     SKODA|         98.88888888888889|
|    NISSAN|                     681.2|
|      LAND|                      78.0|
|   CITROEN|                     347.0|
|   BENTLEY|                     102.0|
|      AUDI|                     191.6|
|      MINI|                     126.0|
|   PEUGEOT|        144.16666666666666|
|    JAGUAR|                     271.0|
|     VOLVO|         72.72727272727273|
|     TESLA|        245.88888888888889|
|       BMW|         80.52631578947368|
|VOLKSWAGEN|                      96.0|
|       KIA|        157.66666666666666|
|     SMART|        191.36363636363637|
|   RENAULT|                     206.0|
+----------+--------------------------+
only showing top 20 rows



# Combinaison des DataFrames
Suivant on  réalise une jointure entre trois DataFrames sur la colonne "Marque" afin de combiner les informations sur la moyenne du bonus/malus, la moyenne des rejets de CO2 et la moyenne du coût de l'énergie pour chaque marque de voiture. Voici pourquoi chaque étape est effectuée :

1. **Jointure des DataFrames :** Les trois DataFrames contiennent des informations sur différentes caractéristiques des voitures, mais toutes sont liées par la colonne "Marque". La jointure est réalisée avec la méthode `join` pour fusionner les données en fonction de cette colonne commune.


2. **Sélection des colonnes et arrondi des moyennes :** Une fois que les DataFrames sont joints, la méthode `select` est utilisée pour sélectionner les colonnes pertinentes, à savoir "Marque", "Moyenne Bonus/Malus en euro", "Moyenne Rejets CO2 g/km" et "Cout Energie Moyen en euro". De plus, les valeurs des moyennes sont arrondies à trois chiffres après la virgule pour des raisons de lisibilité.

3. **Remplacement des valeurs NULL :** Certaines marques peuvent ne pas avoir de valeurs pour les moyennes du bonus/malus ou des rejets de CO2. Pour éviter toute confusion lors de l'analyse ultérieure, les valeurs NULL dans ces colonnes sont remplacées par 0 à l'aide de la méthode `na.fill`.

4. **Affichage du tableau combiné :** Enfin, le DataFrame combiné est affiché à l'aide de la méthode `show`, avec l'option `truncate=False` pour afficher toutes les données sans tronquer les colonnes. Cela permet de visualiser facilement les informations combinées sur les moyennes pour chaque marque de voiture.

In [11]:
# Joindre les trois dataframes en un seul sur la colonne "Marque"
merged_data = avg_bonus_malus.join(avg_co2_emission, "Marque", "outer").join(energy_cost_per_brand, "Marque", "outer")

# Afficher le tableau combiné avec les moyennes arrondies à 3 chiffres après la virgule
merged_data = merged_data.select(
    col("Marque"),
    round(col("Moyenne Bonus/Malus en euro"), 3).alias("Moyenne Bonus/Malus en euro"),
    round(col("Moyenne Rejets CO2 g/km"), 3).alias("Moyenne Rejets CO2 g/km"),
    round(col("Cout Energie Moyen en euro"), 3).alias("Cout Energie Moyen en euro")
)

# Remplacer les valeurs NULL dans les colonnes "Moyenne Bonus/Malus" et "Moyenne Rejets CO2" par 0
merged_data = merged_data.na.fill(0, subset=["Moyenne Bonus/Malus en euro", "Moyenne Rejets CO2 g/km"])

# Afficher le tableau combiné
merged_data.show(truncate=False)

+----------+---------------------------+-----------------------+--------------------------+
|Marque    |Moyenne Bonus/Malus en euro|Moyenne Rejets CO2 g/km|Cout Energie Moyen en euro|
+----------+---------------------------+-----------------------+--------------------------+
|AUDI      |-6000.0                    |43.5                   |191.6                     |
|BENTLEY   |0.0                        |84.0                   |102.0                     |
|BMW       |-6000.0                    |43.882                 |80.526                    |
|CITROEN   |-6000.0                    |0.0                    |347.0                     |
|DS        |-6000.0                    |33.0                   |159.0                     |
|HYUNDAI   |-6000.0                    |26.0                   |151.0                     |
|JAGUAR    |-6000.0                    |0.0                    |271.0                     |
|KIA       |-6000.0                    |31.0                   |157.667         

In [12]:
# Calcul de la moyenne globale pour "Bonus / Malus" (en excluant les valeurs nulles)
global_avg_bonus_malus = co2_data_cleaned.filter(col("Bonus / Malus") != 0.0).agg(F.avg("Bonus / Malus")).collect()[0][0]

# Calcul de la moyenne globale pour "Rejets CO2 g/km" (en excluant les valeurs égales à zéro)
global_avg_co2_emission = co2_data_cleaned.filter(col("Rejets CO2 g/km") != 0).agg(F.avg("Rejets CO2 g/km")).collect()[0][0]

# Calcul de la moyenne globale pour "Cout Energie"
global_avg_energy_cost = co2_data_cleaned.agg(F.avg("Cout Energie")).collect()[0][0]

# Formater les moyennes avec trois chiffres après la virgule
global_avg_bonus_malus= "{:.3f}".format(global_avg_bonus_malus)
global_avg_co2_emission = "{:.3f}".format(global_avg_co2_emission)
global_avg_energy_cost= "{:.3f}".format(global_avg_energy_cost)


# Affichage des moyennes globales
print("Moyenne globale de Bonus/Malus (en excluant les valeurs nulles) :", global_avg_bonus_malus)
print("Moyenne globale de Rejets CO2 (en excluant les valeurs égales à zéro) :", global_avg_co2_emission)
print("Moyenne globale de Cout Energie :", global_avg_energy_cost)

Moyenne globale de Bonus/Malus (en excluant les valeurs nulles) : 6137.231
Moyenne globale de Rejets CO2 (en excluant les valeurs égales à zéro) : 158.495
Moyenne globale de Cout Energie : 561.465


Suivant on prépare essentiellement les données du catalogue pour la jointure avec les données combinées, en les convertissant en majuscules pour faciliter la correspondance avec les données précédemment traitées.

# Jointure avec Catalogue

In [13]:
########################################################################## Jointure avec Catalog ####################################################""""
catalog_data = spark.read.csv("/MBDS_Projet/Catalogue.csv", header=True,encoding="ISO-8859-1")

# Conversion des valeurs en majuscules
catalog_data = catalog_data.withColumn("marque_upper", upper(col("marque")))

catalog_data.show()

+----------+--------------+---------+-----------+--------+--------+-------+--------+-----+------------+
|    marque|           nom|puissance|   longueur|nbPlaces|nbPortes|couleur|occasion| prix|marque_upper|
+----------+--------------+---------+-----------+--------+--------+-------+--------+-----+------------+
|     Volvo|        S80 T6|      272|très longue|       5|       5|  blanc|   false|50500|       VOLVO|
|     Volvo|        S80 T6|      272|très longue|       5|       5|   noir|   false|50500|       VOLVO|
|     Volvo|        S80 T6|      272|très longue|       5|       5|  rouge|   false|50500|       VOLVO|
|     Volvo|        S80 T6|      272|très longue|       5|       5|   gris|    true|35350|       VOLVO|
|     Volvo|        S80 T6|      272|très longue|       5|       5|   bleu|    true|35350|       VOLVO|
|     Volvo|        S80 T6|      272|très longue|       5|       5|   gris|   false|50500|       VOLVO|
|     Volvo|        S80 T6|      272|très longue|       5|      

On  réalise une jointure gauche entre les données du catalogue (catalog_data) et les données combinées précédemment (merged_data) sur la colonne "Marque".

![Logo Google](https://www.postgresqltutorial.com/wp-content/uploads/2018/12/PostgreSQL-Join-Left-Join.png)


In [14]:
# Left join entre catalog_data et merged_data par la colonne "Marque"
joined_data = catalog_data.join(merged_data, catalog_data["marque_upper"] == merged_data["Marque"], "left").drop(merged_data["Marque"])

# Supprimer la colonne "marque_upper"
joined_data = joined_data.drop("marque_upper")

joined_data.show()

+----------+--------------+---------+-----------+--------+--------+-------+--------+-----+---------------------------+-----------------------+--------------------------+
|    marque|           nom|puissance|   longueur|nbPlaces|nbPortes|couleur|occasion| prix|Moyenne Bonus/Malus en euro|Moyenne Rejets CO2 g/km|Cout Energie Moyen en euro|
+----------+--------------+---------+-----------+--------+--------+-------+--------+-----+---------------------------+-----------------------+--------------------------+
|     Volvo|        S80 T6|      272|très longue|       5|       5|  blanc|   false|50500|                        0.0|                 42.455|                    72.727|
|     Volvo|        S80 T6|      272|très longue|       5|       5|   noir|   false|50500|                        0.0|                 42.455|                    72.727|
|     Volvo|        S80 T6|      272|très longue|       5|       5|  rouge|   false|50500|                        0.0|                 42.455|        

On remplace les valeurs nulles dans le DataFrame joined_data par les moyennes globales calculées précédemment. Voici ce qui se passe ligne par ligne :

 La méthode fillna() est utilisée pour remplacer les valeurs nulles dans le DataFrame joined_data.
 Un dictionnaire est fourni en tant qu'argument à fillna(), où chaque clé correspond à une colonne à remplir et chaque valeur correspond à la valeur de remplacement. Dans ce cas, les colonnes "Moyenne Bonus/Malus en euro", "Moyenne Rejets CO2 g/km" et "Cout Energie Moyen en euro" sont remplacées par leurs valeurs moyennes globales respectives (global_avg_bonus_malus, global_avg_co2_emission et global_avg_energy_cost).

Une fois que les valeurs nulles ont été remplacées, les données résultantes sont affichées à l'aide de la méthode show(). Cela permet de visualiser les données nettoyées et complétées, prêtes à être utilisées pour l'analyse ou le traitement ultérieur.

In [15]:
# Remplacer les valeurs NULL par les moyennes globales
joined_data = joined_data.fillna({
    "Moyenne Bonus/Malus en euro": global_avg_bonus_malus,
    "Moyenne Rejets CO2 g/km": global_avg_co2_emission,
    "Cout Energie Moyen en euro": global_avg_energy_cost
})

# Afficher les données résultantes
joined_data.show()

+----------+--------------+---------+-----------+--------+--------+-------+--------+-----+---------------------------+-----------------------+--------------------------+
|    marque|           nom|puissance|   longueur|nbPlaces|nbPortes|couleur|occasion| prix|Moyenne Bonus/Malus en euro|Moyenne Rejets CO2 g/km|Cout Energie Moyen en euro|
+----------+--------------+---------+-----------+--------+--------+-------+--------+-----+---------------------------+-----------------------+--------------------------+
|     Volvo|        S80 T6|      272|très longue|       5|       5|  blanc|   false|50500|                        0.0|                 42.455|                    72.727|
|     Volvo|        S80 T6|      272|très longue|       5|       5|   noir|   false|50500|                        0.0|                 42.455|                    72.727|
|     Volvo|        S80 T6|      272|très longue|       5|       5|  rouge|   false|50500|                        0.0|                 42.455|        

In [16]:
# Compter le nombre de lignes
nombre_de_lignes = joined_data.count()

print("Nombre total de lignes dans joined_data :", nombre_de_lignes)

Nombre total de lignes dans joined_data : 270


In [17]:
# Afficher la totalité des données
joined_data.show(270)

+----------+----------------+---------+-----------+--------+--------+-------+--------+------+---------------------------+-----------------------+--------------------------+
|    marque|             nom|puissance|   longueur|nbPlaces|nbPortes|couleur|occasion|  prix|Moyenne Bonus/Malus en euro|Moyenne Rejets CO2 g/km|Cout Energie Moyen en euro|
+----------+----------------+---------+-----------+--------+--------+-------+--------+------+---------------------------+-----------------------+--------------------------+
|     Volvo|          S80 T6|      272|très longue|       5|       5|  blanc|   false| 50500|                        0.0|                 42.455|                    72.727|
|     Volvo|          S80 T6|      272|très longue|       5|       5|   noir|   false| 50500|                        0.0|                 42.455|                    72.727|
|     Volvo|          S80 T6|      272|très longue|       5|       5|  rouge|   false| 50500|                        0.0|              

On définit le chemin de destination pour enregistrer le DataFrame joined_data au format CSV et exporte les données résultantes vers ce le fichier "resultat_Catalogue_CO2.csv" en HDFS.
Une fois cette opération terminée, les données de joined_data seront écrites dans le fichier CSV spécifié à l'emplacement indiqué afin que nous puissions l'intégrer par la suite dans datalake Hive.

In [18]:
# Définir le chemin de destination du fichier CSV 
output_path = "/MBDS_Projet/resultat_Catalogue_CO2.csv"

# Exporter les données résultantes au format CSV vers HDFS
joined_data.write.csv(output_path, mode="overwrite", header=True)

                                                                                

- Charger les données dans le répertoire local de la machine virtuelle : 

In [27]:
# Exporter les données au format CSV localement
joined_data.to_csv("resultat_Catalogue_CO2.csv", index=False)

Vous pouvez accéder au fichier "resultat_Catalogue_CO2.csv" dans le rendu du projet, ou bien vous pouvez également le télécharger directement en utilisant le lien suivant : https://drive.google.com/file/d/1bfh1_5auKf45xuIR2oyDjYes-AfUHyLY/view?usp=sharing

On arrête la session Spark, ce qui libère les ressources utilisées par Spark et termine l'exécution du programme.

In [42]:
# Arrêt de la session Spark
spark.stop()