In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col , regexp_replace , split ,initcap , upper , avg , trim , lit , coalesce , round , monotonically_increasing_id
from pyspark.sql.types import StructType, StructField, StringType, IntegerType , DoubleType

## 1 - Creating spark session 

In [4]:
spark = SparkSession.builder.appName("Catalogue_CO2").getOrCreate()

## 2 - Importing data from hdfs 

In [7]:
schema = StructType([
    StructField("NoLigne", IntegerType(), True),
    StructField("Marque / Modele", StringType(), True),
    StructField("Bonus / Malus", StringType(), True),
    StructField("Rejets CO2 g/km", IntegerType(), True),
    StructField("Cout energie", StringType(), True)
])
co2_df = spark.read.csv("tpa_13/CO2.csv", header=True, schema= schema)

In [10]:
catalog_df = spark.read.csv("tpa_13/Catalogue.csv", header=True, inferSchema=True)

                                                                                

## 3 - Processing CO2

In [11]:
# fix Bonus / Malus data by removing everything after the euro sign 
co2_df = co2_df.withColumn("Bonus / Malus", regexp_replace("Bonus / Malus", r'€.*', '€'))
# remove all data that have invalide values 
co2_df = co2_df.filter(col("Bonus / Malus") != "-")

In [12]:
# add create Marque colunm from Marque / Model by taking the first word 
co2_df = co2_df.withColumn("Marque", split(co2_df["Marque / Modele"], " ")[0])
# create Model colunm from Marque / Model col by taking the 
co2_df = co2_df.withColumn("Model", split(co2_df["Marque / Modele"], " ")[1])
# delete the marque / model col as it 
co2_df = co2_df.drop("Marque / Modele")

In [13]:
# we take all the marques catalogue
marques_catalogue = catalog_df.select("marque").distinct().withColumnRenamed("marque", "Marque")
# we capitalise them in order to be able to compared it with data from co2
marques_catalogue = marques_catalogue.withColumn("Marque", upper(col("Marque")))

In [15]:
# We filter CO2 to only have data having marques in Catalogues 
co2_df = co2_df.alias("co2").join(marques_catalogue.alias("marques"), "Marque")
co2_df.show()

+--------+-------+-------------+---------------+------------+--------+
|  Marque|NoLigne|Bonus / Malus|Rejets CO2 g/km|Cout energie|   Model|
+--------+-------+-------------+---------------+------------+--------+
|    AUDI|      2|      -6 000€|              0|       319 €|  E-TRON|
|    AUDI|      3|      -6 000€|              0|       356 €|  E-TRON|
|    AUDI|      4|      -6 000€|              0|       357 €|  E-TRON|
|    AUDI|      5|      -6 000€|              0|       356 €|  E-TRON|
|     BMW|      6|      -6 000€|              0|       204 €|      i3|
|     BMW|      7|      -6 000€|              0|       204 €|     i3s|
|  JAGUAR|     13|      -6 000€|              0|       271 €|  I-PACE|
|     KIA|     14|      -6 000€|              0|       212 €|  e-NIRO|
|     KIA|     15|      -6 000€|              0|       203 €|  e-NIRO|
|     KIA|     16|      -6 000€|              0|       214 €|    SOUL|
|     KIA|     17|      -6 000€|              0|       214 €|    SOUL|
|MERCE

24/05/28 10:08:27 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , Marque / Modele, Bonus / Malus, Rejets CO2 g/km, Cout enerie
 Schema: NoLigne, Marque / Modele, Bonus / Malus, Rejets CO2 g/km, Cout energie
Expected: NoLigne but found: 
CSV file: hdfs://localhost:9000/user/vagrant/tpa_13/CO2.csv


In [10]:
# we process Bonus / Malus and Cout energie to be able to perform operations as number
# remove euro , + , white spaces 
processed_co2 = co2_df.withColumn('Bonus / Malus', regexp_replace(col('Bonus / Malus'), '€', '')) \
    .withColumn('Cout energie', regexp_replace(col('Cout energie'), '€', '')) \
    .withColumn('Bonus / Malus', regexp_replace(col('Bonus / Malus'), '\\+', '')) \
    .withColumn('Cout energie', regexp_replace(col('Cout energie'), '\u00A0', '')) \
    .withColumn('Bonus / Malus', regexp_replace(col('Bonus / Malus'), '\u00A0', ''))
# we convert Bonus / Malus , Cout energie to float 
processed_co2 = processed_co2.withColumn('Bonus / Malus', processed_co2['Bonus / Malus'].cast('float')) \
    .withColumn('Cout energie', processed_co2['Cout energie'].cast('float'))
processed_co2.show()

+--------+-------+-------------+---------------+------------+--------+
|  Marque|NoLigne|Bonus / Malus|Rejets CO2 g/km|Cout energie|   Model|
+--------+-------+-------------+---------------+------------+--------+
|    AUDI|      2|      -6000.0|              0|       319.0|  E-TRON|
|    AUDI|      3|      -6000.0|              0|       356.0|  E-TRON|
|    AUDI|      4|      -6000.0|              0|       357.0|  E-TRON|
|    AUDI|      5|      -6000.0|              0|       356.0|  E-TRON|
|     BMW|      6|      -6000.0|              0|       204.0|      i3|
|     BMW|      7|      -6000.0|              0|       204.0|     i3s|
|  JAGUAR|     13|      -6000.0|              0|       271.0|  I-PACE|
|     KIA|     14|      -6000.0|              0|       212.0|  e-NIRO|
|     KIA|     15|      -6000.0|              0|       203.0|  e-NIRO|
|     KIA|     16|      -6000.0|              0|       214.0|    SOUL|
|     KIA|     17|      -6000.0|              0|       214.0|    SOUL|
|MERCE

24/05/01 21:32:41 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: NoLigne, Marque / Modele, Bonus / Malus, Rejets CO2 g/km, Cout enerie
 Schema: NoLigne, Marque / Modele, Bonus / Malus, Rejets CO2 g/km, Cout energie
Expected: Cout energie but found: Cout enerie
CSV file: hdfs://localhost:9000/user/vagrant/concessionnaire_automobile/CO2


In [16]:
# we process Bonus / Malus and Cout energie to be able to perform operations as number
# remove euro , + , white spaces 
processed_co2 = co2_df.withColumn('Bonus / Malus', regexp_replace(col('Bonus / Malus'), '€', '')) \
    .withColumn('Cout energie', regexp_replace(col('Cout energie'), '€', '')) \
    .withColumn('Bonus / Malus', regexp_replace(col('Bonus / Malus'), '\\+', '')) \
    .withColumn('Cout energie', regexp_replace(col('Cout energie'), '\u00A0', '')) \
    .withColumn('Bonus / Malus', regexp_replace(col('Bonus / Malus'), '\u00A0', ''))
# we convert Bonus / Malus , Cout energie to float 
processed_co2 = processed_co2.withColumn('Bonus / Malus', processed_co2['Bonus / Malus'].cast('float')) \
    .withColumn('Cout energie', processed_co2['Cout energie'].cast('float'))
processed_co2.show()

+--------+-------+-------------+---------------+------------+--------+
|  Marque|NoLigne|Bonus / Malus|Rejets CO2 g/km|Cout energie|   Model|
+--------+-------+-------------+---------------+------------+--------+
|    AUDI|      2|      -6000.0|              0|       319.0|  E-TRON|
|    AUDI|      3|      -6000.0|              0|       356.0|  E-TRON|
|    AUDI|      4|      -6000.0|              0|       357.0|  E-TRON|
|    AUDI|      5|      -6000.0|              0|       356.0|  E-TRON|
|     BMW|      6|      -6000.0|              0|       204.0|      i3|
|     BMW|      7|      -6000.0|              0|       204.0|     i3s|
|  JAGUAR|     13|      -6000.0|              0|       271.0|  I-PACE|
|     KIA|     14|      -6000.0|              0|       212.0|  e-NIRO|
|     KIA|     15|      -6000.0|              0|       203.0|  e-NIRO|
|     KIA|     16|      -6000.0|              0|       214.0|    SOUL|
|     KIA|     17|      -6000.0|              0|       214.0|    SOUL|
|MERCE

24/05/28 10:08:42 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , Marque / Modele, Bonus / Malus, Rejets CO2 g/km, Cout enerie
 Schema: NoLigne, Marque / Modele, Bonus / Malus, Rejets CO2 g/km, Cout energie
Expected: NoLigne but found: 
CSV file: hdfs://localhost:9000/user/vagrant/tpa_13/CO2.csv


In [17]:
# We take the mean of Bonus / Malus , Regets CO2 , Cout Energie for each brand 
moyennes_par_marque = processed_co2.groupBy('Marque').agg(
    round(avg('Bonus / Malus'), 2).alias('Bonus / Malus'),
    round(avg('Rejets CO2 g/km'), 2).alias('Rejets CO2 g/km'),
    round(avg('Cout Energie'), 2).alias('Cout Energie')
)
moyennes_par_marque.show()

24/05/28 10:08:49 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Marque / Modele, Bonus / Malus, Rejets CO2 g/km, Cout enerie
 Schema: Marque / Modele, Bonus / Malus, Rejets CO2 g/km, Cout energie
Expected: Cout energie but found: Cout enerie
CSV file: hdfs://localhost:9000/user/vagrant/tpa_13/CO2.csv


+----------+-------------+---------------+------------+
|    Marque|Bonus / Malus|Rejets CO2 g/km|Cout Energie|
+----------+-------------+---------------+------------+
|  MERCEDES|      8237.36|          196.1|      788.49|
|     SKODA|      -6000.0|            0.0|       210.0|
|    NISSAN|       5802.4|          160.0|       681.2|
|      AUDI|      -6000.0|            0.0|       347.0|
|      MINI|      -6000.0|            0.0|       199.0|
|   PEUGEOT|      -6000.0|            0.0|      221.67|
|    JAGUAR|      -6000.0|            0.0|       271.0|
|       BMW|      -6000.0|            0.0|       204.0|
|VOLKSWAGEN|      -6000.0|            0.0|       197.0|
|       KIA|      -6000.0|            0.0|      210.75|
|   RENAULT|      -6000.0|            0.0|       206.0|
+----------+-------------+---------------+------------+



In [18]:
# we take the mean of Bonus / Malus , Rehet CO2 and Cout Energie of all brand 
moyenne_toutes_marques_df = processed_co2.groupBy().agg(
    round(avg("Bonus / Malus"),2).alias("Moyenne_Bonus_Malus"),
    round(avg("Rejets CO2 g/km"),2).alias("Moyenne_Rejets_CO2"),
    round(avg("Cout Energie"),2).alias("Moyenne_Cout_Energie")
)
moyenne_toutes_marques_df.show()

+-------------------+------------------+--------------------+
|Moyenne_Bonus_Malus|Moyenne_Rejets_CO2|Moyenne_Cout_Energie|
+-------------------+------------------+--------------------+
|            7109.77|            180.49|               744.2|
+-------------------+------------------+--------------------+



24/05/28 10:09:02 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Marque / Modele, Bonus / Malus, Rejets CO2 g/km, Cout enerie
 Schema: Marque / Modele, Bonus / Malus, Rejets CO2 g/km, Cout energie
Expected: Cout energie but found: Cout enerie
CSV file: hdfs://localhost:9000/user/vagrant/tpa_13/CO2.csv


In [19]:
# we add column from bonus / malus , rejets co2 and cout energie into catalog
# we leave values null for brandes that are not in co2 
df_resultat = catalog_df.alias("catalogue").join(
    moyennes_par_marque.withColumnRenamed("Marque", "Marque_moyenne").select("Marque_moyenne", "Bonus / Malus", "Rejets CO2 g/km", "Cout Energie"),
    (upper(col("catalogue.marque")) == col("Marque_moyenne")),
    "left_outer"
).drop("Marque_moyenne")
df_resultat.show()

24/05/28 10:09:07 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Marque / Modele, Bonus / Malus, Rejets CO2 g/km, Cout enerie
 Schema: Marque / Modele, Bonus / Malus, Rejets CO2 g/km, Cout energie
Expected: Cout energie but found: Cout enerie
CSV file: hdfs://localhost:9000/user/vagrant/tpa_13/CO2.csv


+----------+--------------+---------+-----------+--------+--------+-------+--------+-----+-------------+---------------+------------+
|    marque|           nom|puissance|   longueur|nbPlaces|nbPortes|couleur|occasion| prix|Bonus / Malus|Rejets CO2 g/km|Cout Energie|
+----------+--------------+---------+-----------+--------+--------+-------+--------+-----+-------------+---------------+------------+
|     Volvo|        S80 T6|      272|tr�s longue|       5|       5|  blanc|   false|50500|         NULL|           NULL|        NULL|
|     Volvo|        S80 T6|      272|tr�s longue|       5|       5|   noir|   false|50500|         NULL|           NULL|        NULL|
|     Volvo|        S80 T6|      272|tr�s longue|       5|       5|  rouge|   false|50500|         NULL|           NULL|        NULL|
|     Volvo|        S80 T6|      272|tr�s longue|       5|       5|   gris|    true|35350|         NULL|           NULL|        NULL|
|     Volvo|        S80 T6|      272|tr�s longue|       5|    

In [20]:
moyenne_values = moyenne_toutes_marques_df.first()
# we set the values missing brand in co2 to the mean of all brands
updated_catalog_df = df_resultat.withColumn("Bonus / Malus", coalesce(df_resultat["Bonus / Malus"], lit(moyenne_values["Moyenne_Bonus_Malus"]))) \
    .withColumn("Rejets CO2 g/km", coalesce(df_resultat["Rejets CO2 g/km"], lit(moyenne_values["Moyenne_Rejets_CO2"]))) \
    .withColumn("Cout Energie", coalesce(df_resultat["Cout Energie"], lit(moyenne_values["Moyenne_Cout_Energie"])))
updated_catalog_df.show()

24/05/28 10:09:08 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Marque / Modele, Bonus / Malus, Rejets CO2 g/km, Cout enerie
 Schema: Marque / Modele, Bonus / Malus, Rejets CO2 g/km, Cout energie
Expected: Cout energie but found: Cout enerie
CSV file: hdfs://localhost:9000/user/vagrant/tpa_13/CO2.csv
24/05/28 10:09:09 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Marque / Modele, Bonus / Malus, Rejets CO2 g/km, Cout enerie
 Schema: Marque / Modele, Bonus / Malus, Rejets CO2 g/km, Cout energie
Expected: Cout energie but found: Cout enerie
CSV file: hdfs://localhost:9000/user/vagrant/tpa_13/CO2.csv


+----------+--------------+---------+-----------+--------+--------+-------+--------+-----+-------------+---------------+------------+
|    marque|           nom|puissance|   longueur|nbPlaces|nbPortes|couleur|occasion| prix|Bonus / Malus|Rejets CO2 g/km|Cout Energie|
+----------+--------------+---------+-----------+--------+--------+-------+--------+-----+-------------+---------------+------------+
|     Volvo|        S80 T6|      272|tr�s longue|       5|       5|  blanc|   false|50500|      7109.77|         180.49|       744.2|
|     Volvo|        S80 T6|      272|tr�s longue|       5|       5|   noir|   false|50500|      7109.77|         180.49|       744.2|
|     Volvo|        S80 T6|      272|tr�s longue|       5|       5|  rouge|   false|50500|      7109.77|         180.49|       744.2|
|     Volvo|        S80 T6|      272|tr�s longue|       5|       5|   gris|    true|35350|      7109.77|         180.49|       744.2|
|     Volvo|        S80 T6|      272|tr�s longue|       5|    

In [21]:
updated_catalog_df.repartition(1).write.csv("tpa_13/transformed_catalog" , mode="overwrite" , header=True)

24/05/28 10:09:54 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Marque / Modele, Bonus / Malus, Rejets CO2 g/km, Cout enerie
 Schema: Marque / Modele, Bonus / Malus, Rejets CO2 g/km, Cout energie
Expected: Cout energie but found: Cout enerie
CSV file: hdfs://localhost:9000/user/vagrant/tpa_13/CO2.csv


In [22]:
spark.stop()