In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import functions as F

In [2]:
spark = SparkSession.builder.appName("tpa").config("spark.sql.warehouse.dir", "/user/hive/warehouse").config("spark.sql.catalogImplementation", "hive").enableHiveSupport().getOrCreate()

24/01/04 21:57:24 WARN Utils: Your hostname, srv458516 resolves to a loopback address: 127.0.1.1; using 85.31.239.246 instead (on interface eth0)
24/01/04 21:57:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/04 21:57:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
!hadoop fs -ls /root/tpa/tpa/DATABASE/data

Found 2 items
-rw-r--r--   1 root root      38916 2024-01-04 16:51 /root/tpa/tpa/DATABASE/data/CO2.csv
-rw-r--r--   1 root root      14114 2024-01-02 22:10 /root/tpa/tpa/DATABASE/data/Catalogue.csv


In [4]:
csv_file_path = "/root/tpa/tpa/DATABASE/data/CO2.csv"

custom_schema = StructType([
    StructField("", IntegerType(), True),
    StructField("Marque / Modele", StringType(), True),
    StructField("Bonus / Malus", StringType(), True),
    StructField("Rejets CO2 g/km", IntegerType(), True),
    StructField("Cout enerie", StringType(), True),
])

df_co2 = spark.read.csv(csv_file_path, header=True, schema=custom_schema)

In [5]:
df_co2.show()

                                                                                

+---+--------------------+-------------+---------------+-----------+
|   |     Marque / Modele|Bonus / Malus|Rejets CO2 g/km|Cout enerie|
+---+--------------------+-------------+---------------+-----------+
|  2|AUDI E-TRON SPORT...|    -6 000€ 1|              0|      319 €|
|  3|AUDI E-TRON SPORT...|    -6 000€ 1|              0|      356 €|
|  4|AUDI E-TRON 55 (4...|    -6 000€ 1|              0|      357 €|
|  5|AUDI E-TRON 50 (3...|    -6 000€ 1|              0|      356 €|
|  6|       BMW i3 120 Ah|    -6 000€ 1|              0|      204 €|
|  7|      BMW i3s 120 Ah|    -6 000€ 1|              0|      204 €|
|  8|    CITROEN BERLINGO|    -6 000€ 1|              0|      203 €|
|  9|      CITROEN C-ZERO|    -6 000€ 1|              0|      491 €|
| 10|DS DS3 CROSSBACK ...|    -6 000€ 1|              0|      251 €|
| 11|HYUNDAI KONA elec...|    -6 000€ 1|              0|      205 €|
| 12|HYUNDAI KONA elec...|    -6 000€ 1|              0|      205 €|
| 13|JAGUAR I-PACE EV4...|    -6 0

In [6]:
df_co2 = df_co2.withColumn("Marque", F.split(df_co2["Marque / Modele"], " ")[0])

# Convert to integer
df_co2 = df_co2.withColumn("Cout enerie", F.regexp_replace("Cout enerie", "[^0-9]", ""))
df_co2 = df_co2.withColumn("Cout enerie", df_co2["Cout enerie"].cast("int"))

pattern = r'([+-]?\d+)\s*€.*'
df_co2 = df_co2.withColumn("Bonus / Malus", F.regexp_replace(F.col("Bonus / Malus"), pattern, "$1"))
df_co2 = df_co2.withColumn("Bonus / Malus", F.regexp_replace(F.col("Bonus / Malus"), "[^0-9-]", "").cast("int"))

df_co2.show()

+---+--------------------+-------------+---------------+-----------+--------+
|   |     Marque / Modele|Bonus / Malus|Rejets CO2 g/km|Cout enerie|  Marque|
+---+--------------------+-------------+---------------+-----------+--------+
|  2|AUDI E-TRON SPORT...|        -6000|              0|        319|    AUDI|
|  3|AUDI E-TRON SPORT...|        -6000|              0|        356|    AUDI|
|  4|AUDI E-TRON 55 (4...|        -6000|              0|        357|    AUDI|
|  5|AUDI E-TRON 50 (3...|        -6000|              0|        356|    AUDI|
|  6|       BMW i3 120 Ah|        -6000|              0|        204|     BMW|
|  7|      BMW i3s 120 Ah|        -6000|              0|        204|     BMW|
|  8|    CITROEN BERLINGO|        -6000|              0|        203| CITROEN|
|  9|      CITROEN C-ZERO|        -6000|              0|        491| CITROEN|
| 10|DS DS3 CROSSBACK ...|        -6000|              0|        251|      DS|
| 11|HYUNDAI KONA elec...|        -6000|              0|        

In [7]:
result_df = df_co2.groupBy("Marque").agg(
    F.round(F.avg("Bonus / Malus"), 2).alias("Avg_Bonus_Malus"),
    F.round(F.avg("Rejets CO2 g/km"), 2).alias("Avg_Rejets_CO2"),
    F.round(F.avg("Cout enerie"), 2).alias("Avg_Cout_energie")
).withColumn("Marque", F.lower(F.col("Marque")))

In [8]:
result_df.show()

+----------+---------------+--------------+----------------+
|    Marque|Avg_Bonus_Malus|Avg_Rejets_CO2|Avg_Cout_energie|
+----------+---------------+--------------+----------------+
|  mercedes|        8237.36|        187.63|          749.98|
|   porsche|           NULL|         69.86|           89.71|
|   hyundai|        -6000.0|          8.67|           151.0|
|    toyota|           NULL|          32.0|            43.0|
|     skoda|        -6000.0|         27.56|           98.89|
|    nissan|         5802.4|         160.0|           681.2|
|      land|           NULL|          69.0|            78.0|
|   citroen|        -6000.0|           0.0|           347.0|
|   bentley|           NULL|          84.0|           102.0|
|      audi|        -6000.0|          26.1|           191.6|
|      mini|        -6000.0|          21.5|           126.0|
|   peugeot|        -6000.0|         15.83|          144.17|
|    jaguar|        -6000.0|           0.0|           271.0|
|     volvo|           N

In [9]:
result = spark.sql("use default;")

spark.sql("show tables;").show()

24/01/04 21:57:50 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
24/01/04 21:57:50 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
24/01/04 21:57:53 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
24/01/04 21:57:53 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore root@127.0.1.1


+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+



24/01/04 21:57:53 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException


In [10]:
csv_catalogue = "/root/tpa/tpa/DATABASE/data/Catalogue.csv"

df_catalogue = spark.read.csv(csv_catalogue, header=True, inferSchema=True)
df_catalogue = df_catalogue.withColumn("marque", F.lower(F.col("marque")))

In [11]:
df_catalogue.show()

+----------+--------------+---------+-----------+--------+--------+-------+--------+-----+
|    marque|           nom|puissance|   longueur|nbPlaces|nbPortes|couleur|occasion| prix|
+----------+--------------+---------+-----------+--------+--------+-------+--------+-----+
|     volvo|        S80 T6|      272|tr�s longue|       5|       5|  blanc|   false|50500|
|     volvo|        S80 T6|      272|tr�s longue|       5|       5|   noir|   false|50500|
|     volvo|        S80 T6|      272|tr�s longue|       5|       5|  rouge|   false|50500|
|     volvo|        S80 T6|      272|tr�s longue|       5|       5|   gris|    true|35350|
|     volvo|        S80 T6|      272|tr�s longue|       5|       5|   bleu|    true|35350|
|     volvo|        S80 T6|      272|tr�s longue|       5|       5|   gris|   false|50500|
|     volvo|        S80 T6|      272|tr�s longue|       5|       5|   bleu|   false|50500|
|     volvo|        S80 T6|      272|tr�s longue|       5|       5|  rouge|    true|35350|

In [12]:
joined_df = df_catalogue.join(result_df, df_catalogue.marque == result_df.Marque, "left_outer")

In [13]:
joined_df = joined_df.select(
    df_catalogue["*"],
    result_df["Avg_Bonus_Malus"],
    result_df["Avg_Rejets_CO2"],
    result_df["Avg_Cout_energie"]
)

joined_df.show(truncate=False)

+----------+--------------+---------+-----------+--------+--------+-------+--------+-----+---------------+--------------+----------------+
|marque    |nom           |puissance|longueur   |nbPlaces|nbPortes|couleur|occasion|prix |Avg_Bonus_Malus|Avg_Rejets_CO2|Avg_Cout_energie|
+----------+--------------+---------+-----------+--------+--------+-------+--------+-----+---------------+--------------+----------------+
|volvo     |S80 T6        |272      |tr�s longue|5       |5       |blanc  |false   |50500|NULL           |42.45         |72.73           |
|volvo     |S80 T6        |272      |tr�s longue|5       |5       |noir   |false   |50500|NULL           |42.45         |72.73           |
|volvo     |S80 T6        |272      |tr�s longue|5       |5       |rouge  |false   |50500|NULL           |42.45         |72.73           |
|volvo     |S80 T6        |272      |tr�s longue|5       |5       |gris   |true    |35350|NULL           |42.45         |72.73           |
|volvo     |S80 T6        |

In [14]:
specific_marque_df = joined_df.filter(F.col("marque") == "seat")

In [15]:
specific_marque_df.show()

+------+----------+---------+--------+--------+--------+-------+--------+-----+---------------+--------------+----------------+
|marque|       nom|puissance|longueur|nbPlaces|nbPortes|couleur|occasion| prix|Avg_Bonus_Malus|Avg_Rejets_CO2|Avg_Cout_energie|
+------+----------+---------+--------+--------+--------+-------+--------+-----+---------------+--------------+----------------+
|  seat|Toledo 1.6|      102|  longue|       5|       5|  blanc|   false|18880|           NULL|          NULL|            NULL|
|  seat|Toledo 1.6|      102|  longue|       5|       5|   noir|   false|18880|           NULL|          NULL|            NULL|
|  seat|Toledo 1.6|      102|  longue|       5|       5|  rouge|   false|18880|           NULL|          NULL|            NULL|
|  seat|Toledo 1.6|      102|  longue|       5|       5|   gris|   false|18880|           NULL|          NULL|            NULL|
|  seat|Toledo 1.6|      102|  longue|       5|       5|   bleu|   false|18880|           NULL|         

In [16]:
not_null_values_df = joined_df.filter(F.col('Avg_Bonus_Malus').isNotNull())

global_avg_df = not_null_values_df.agg(
    F.round(F.avg('Avg_Bonus_Malus'), 2).alias('Avg_Bonus_Malus'),
    F.round(F.avg('Avg_Rejets_CO2'), 2).alias('Avg_Rejets_CO2'),
    F.round(F.avg('Avg_Cout_energie'), 2).alias('Avg_Cout_energie')
)

global_avg_df.show()

+---------------+--------------+----------------+
|Avg_Bonus_Malus|Avg_Rejets_CO2|Avg_Cout_energie|
+---------------+--------------+----------------+
|       -3631.88|         43.88|          264.45|
+---------------+--------------+----------------+



In [17]:
final_df = joined_df.na.fill(global_avg_df.first().asDict())

In [19]:
final_df.show()

+----------+--------------+---------+-----------+--------+--------+-------+--------+-----+---------------+--------------+----------------+
|    marque|           nom|puissance|   longueur|nbPlaces|nbPortes|couleur|occasion| prix|Avg_Bonus_Malus|Avg_Rejets_CO2|Avg_Cout_energie|
+----------+--------------+---------+-----------+--------+--------+-------+--------+-----+---------------+--------------+----------------+
|     volvo|        S80 T6|      272|tr�s longue|       5|       5|  blanc|   false|50500|       -3631.88|         42.45|           72.73|
|     volvo|        S80 T6|      272|tr�s longue|       5|       5|   noir|   false|50500|       -3631.88|         42.45|           72.73|
|     volvo|        S80 T6|      272|tr�s longue|       5|       5|  rouge|   false|50500|       -3631.88|         42.45|           72.73|
|     volvo|        S80 T6|      272|tr�s longue|       5|       5|   gris|    true|35350|       -3631.88|         42.45|           72.73|
|     volvo|        S80 T6|

In [21]:
specific_marque_df = final_df.filter(F.col("marque") == "seat")

In [22]:
specific_marque_df.show()

+------+----------+---------+--------+--------+--------+-------+--------+-----+---------------+--------------+----------------+
|marque|       nom|puissance|longueur|nbPlaces|nbPortes|couleur|occasion| prix|Avg_Bonus_Malus|Avg_Rejets_CO2|Avg_Cout_energie|
+------+----------+---------+--------+--------+--------+-------+--------+-----+---------------+--------------+----------------+
|  seat|Toledo 1.6|      102|  longue|       5|       5|  blanc|   false|18880|       -3631.88|         43.88|          264.45|
|  seat|Toledo 1.6|      102|  longue|       5|       5|   noir|   false|18880|       -3631.88|         43.88|          264.45|
|  seat|Toledo 1.6|      102|  longue|       5|       5|  rouge|   false|18880|       -3631.88|         43.88|          264.45|
|  seat|Toledo 1.6|      102|  longue|       5|       5|   gris|   false|18880|       -3631.88|         43.88|          264.45|
|  seat|Toledo 1.6|      102|  longue|       5|       5|   bleu|   false|18880|       -3631.88|         

In [36]:
output_path = "/root/tpa/tpa/DATABASE/data/final_df.csv"
final_df.write.csv(output_path, mode="overwrite", header=True)