# Nettoyage des données dans CO2 avec Hadoop
Nous allons maintenant nettoyer les données dans CO2.csv pour être utilisable dans notre datalake.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import col, regexp_replace, avg, split,regexp_extract, trim, translate, when

In [2]:
sparkHive = (SparkSession
                .builder
                .appName('example-pyspark-read-and-write-from-hive')
                .config("hive.metastore.uris", "thrift://localhost:9083")
                .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
                .enableHiveSupport()
                .getOrCreate()
                )

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/09 16:41:02 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
sparkHive.sql("SHOW TABLES").show()

+---------+--------------------+-----------+
|namespace|           tableName|isTemporary|
+---------+--------------------+-----------+
|  default|       catalogue_co2|      false|
|  default|catalogue_oraclen...|      false|
|  default|        client_mongo|      false|
|  default| client_mongo_extern|      false|
|  default|             co2_ext|      false|
|  default|        co2_hdfs_ext|      false|
|  default|immatriculation_h...|      false|
|  default|immatriculation_o...|      false|
|  default|marketing_mongo_e...|      false|
|  default| table_catalogue_co2|      false|
+---------+--------------------+-----------+



In [None]:
#sparkHive.stop()

In [5]:
df_co2 = sparkHive.read.csv("tpa/CO2.csv", header=True, inferSchema=True)

                                                                                

In [6]:
df_co2.count()

                                                                                

437

In [14]:
df_co2.show(437)

+---+--------------------+-----------+-------------------+------------+
| ID|       marque_modele|bonus_malus|rejets_co2_g_per_km|cout_energie|
+---+--------------------+-----------+-------------------+------------+
|  2|AUDI E-TRON SPORT...|  -6 000€ 1|                  0|       319 €|
|  3|AUDI E-TRON SPORT...|  -6 000€ 1|                  0|       356 €|
|  4|AUDI E-TRON 55 (4...|  -6 000€ 1|                  0|       357 €|
|  5|AUDI E-TRON 50 (3...|  -6 000€ 1|                  0|       356 €|
|  6|       BMW i3 120 Ah|  -6 000€ 1|                  0|       204 €|
|  7|      BMW i3s 120 Ah|  -6 000€ 1|                  0|       204 €|
|  8|    CITROEN BERLINGO|  -6 000€ 1|                  0|       203 €|
|  9|      CITROEN C-ZERO|  -6 000€ 1|                  0|       491 €|
| 10|DS DS3 CROSSBACK ...|  -6 000€ 1|                  0|       251 €|
| 11|HYUNDAI KONA elec...|  -6 000€ 1|                  0|       205 €|
| 12|HYUNDAI KONA elec...|  -6 000€ 1|                  0|      

In [5]:
def preprocess_data(df):
    # Extraire la marque et le modèle avec une meilleure séparation
    df = df.withColumn('marque_modele', regexp_replace(col('marque_modele'), r'^"|"$', ''))
    # Extrayez la marque
    df = df.withColumn('marque', regexp_extract('marque_modele', r'^(\w+)', 1))
    # Extrayez le modèle
    df = df.withColumn('modele', regexp_extract('marque_modele', r'^\w+\s+(.*)', 1))

    # Supprimer les espaces, les virgules et ensuite convertir la partie numérique en float
    df = df.withColumn('bonus_malus', split(col('bonus_malus'), '€').getItem(0))
    df = df.withColumn('bonus_malus', regexp_replace('bonus_malus', r'\s+', ''))  # Enlève les espaces
    df = df.withColumn('bonus_malus', regexp_replace('bonus_malus', '[^\d-.]', ''))  # Enlève tout sauf les chiffres, les signes et les points
    df = df.withColumn('bonus_malus', col('bonus_malus').cast('int'))  # Convertit en int
    df = df.withColumn('bonus_malus', when(col('bonus_malus').isNull(), 0).otherwise(col('bonus_malus')))


    df = df.withColumn('cout_energie', split(col('cout_energie'), '€').getItem(0))
    df = df.withColumn('cout_energie', regexp_replace('cout_energie', r'\s+', ''))  # Enlève les espaces
    df = df.withColumn('cout_energie', regexp_replace('cout_energie', '[^\d-.]', ''))  # Enlève tout sauf les chiffres, les signes et les points
    df = df.withColumn('cout_energie', col('cout_energie').cast('int'))  # Convertit en int
    df = df.withColumn('cout_energie', when(col('cout_energie').isNull(), 0).otherwise(col('cout_energie')))

    return df

In [6]:
df_cleaned = preprocess_data(df_co2)

In [7]:
df_cleaned.select("ID","bonus_malus","cout_energie").show(n=df_cleaned.count(),truncate=False)

                                                                                

+---+-----------+------------+
|ID |bonus_malus|cout_energie|
+---+-----------+------------+
|2  |-6000      |319         |
|3  |-6000      |356         |
|4  |-6000      |357         |
|5  |-6000      |356         |
|6  |-6000      |204         |
|7  |-6000      |204         |
|8  |-6000      |203         |
|9  |-6000      |491         |
|10 |-6000      |251         |
|11 |-6000      |205         |
|12 |-6000      |205         |
|13 |-6000      |271         |
|14 |-6000      |212         |
|15 |-6000      |203         |
|16 |-6000      |214         |
|17 |-6000      |214         |
|18 |-6000      |291         |
|19 |-6000      |411         |
|20 |-6000      |411         |
|21 |-6000      |199         |
|23 |-6000      |155         |
|24 |-6000      |177         |
|25 |-6000      |221         |
|26 |-6000      |241         |
|27 |-6000      |203         |
|28 |-6000      |206         |
|29 |-6000      |206         |
|30 |-6000      |206         |
|31 |-6000      |210         |
|32 |-60

In [8]:
rdd_co2 = df_cleaned.rdd
collected_data = rdd_co2.collect()
# Effectuez une boucle sur les éléments de l'RDD
for row in collected_data:
    print(row)

                                                                                

Row(ID=2, marque_modele='AUDI E-TRON SPORTBACK 55 (408ch) quattro', bonus_malus=-6000, rejets_co2_g_per_km=0, cout_energie=319, marque='AUDI', modele='E-TRON SPORTBACK 55 (408ch) quattro')
Row(ID=3, marque_modele='AUDI E-TRON SPORTBACK 50 (313ch) quattro', bonus_malus=-6000, rejets_co2_g_per_km=0, cout_energie=356, marque='AUDI', modele='E-TRON SPORTBACK 50 (313ch) quattro')
Row(ID=4, marque_modele='AUDI E-TRON 55 (408ch) quattro', bonus_malus=-6000, rejets_co2_g_per_km=0, cout_energie=357, marque='AUDI', modele='E-TRON 55 (408ch) quattro')
Row(ID=5, marque_modele='AUDI E-TRON 50 (313ch) quattro', bonus_malus=-6000, rejets_co2_g_per_km=0, cout_energie=356, marque='AUDI', modele='E-TRON 50 (313ch) quattro')
Row(ID=6, marque_modele='BMW i3 120 Ah', bonus_malus=-6000, rejets_co2_g_per_km=0, cout_energie=204, marque='BMW', modele='i3 120 Ah')
Row(ID=7, marque_modele='BMW i3s 120 Ah', bonus_malus=-6000, rejets_co2_g_per_km=0, cout_energie=204, marque='BMW', modele='i3s 120 Ah')
Row(ID=8, ma

In [9]:
def map_row(row):
    marque = row['marque']
    bonus_malus = row['bonus_malus']
    rejets_co2_g_per_km = row['rejets_co2_g_per_km']
    cout_energie = row['cout_energie']
    return (marque, (bonus_malus, rejets_co2_g_per_km, cout_energie, 1))

def reduce_rows(x, y):
    return (x[0] + y[0], x[1] + y[1], x[2] + y[2], x[3] + y[3])

# Fonction de calcul des moyennes
def calculate_averages(row):
    marque, sums = row
    total_bonus_malus, total_rejets_co2, total_cout_energie, count = sums
    return (marque, total_bonus_malus / count, total_rejets_co2 / count, total_cout_energie / count)

# Fonction de conversion en chaîne de caractères
def to_string(row):
    marque, avg_bonus_malus, avg_rejets_co2, avg_cout_energie = row
    return f"{marque},{avg_bonus_malus},{avg_rejets_co2},{avg_cout_energie}"

# Fonction principale de traitement map-reduce
def traitement_map_reduce(rdd):
    # Transformation initiale
    rdd_transformed = rdd.map(map_row)
    # Réduction par clé
    rdd_reduced = rdd_transformed.reduceByKey(reduce_rows)
    # Calcul des moyennes
    rdd_averages = rdd_reduced.map(calculate_averages)
    # Conversion en chaîne de caractères
    rdd_resultats = rdd_averages.map(to_string)
    return rdd_resultats


In [11]:

# Appliquez la fonction de traitement map-reduce
resultats = traitement_map_reduce(rdd_co2)

# Affichez les résultats
# for item in resultats.collect():
#     print(item)

resultats_string = resultats.map(lambda x: f"{x[0]},{x[1]},{x[2]},{x[3]}")
resultats.saveAsTextFile('/resultat_map_reduce_now')
print(resultats_string)
resultats_string.saveAsTextFile('/user/vagrant/output/clean_co2_now')

                                                                                

PythonRDD[41] at RDD at PythonRDD.scala:53


                                                                                