In [None]:
# List the contents of the CO2 data directory on HDFS (shell command run from the notebook).
!hadoop fs -ls /tpa_groupe_14/data/co2

In [None]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session used by every cell below.
# "local[*]" runs Spark in local mode on this machine.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("TPT-HADOOP_MAP_REDUCE")
    .getOrCreate()
)

In [None]:
def load_data_in_csv_file(filename, separator, head):
    """Read a delimited text file into a Spark DataFrame and print a short summary.

    Uses the notebook-level ``spark`` session.

    Args:
        filename: Path of the CSV file or directory to read (the callers in this
            notebook pass HDFS paths).
        separator: Field delimiter, e.g. "," or a tab character.
        head: Whether the first line holds the column names.

    Returns:
        The loaded DataFrame.

    Side effects:
        Prints the file name, the row count, the schema and the first five rows.
        ``count()`` and ``show()`` are Spark actions, so the file is read here.
    """
    reader = spark.read.options(delimiter=separator, header=head)
    loaded_df = reader.csv(filename)

    # Summary: where the data came from, how many rows, the schema, a sample.
    print("Schema and first rows in", filename)
    print("Count : ", loaded_df.count())
    loaded_df.printSchema()
    loaded_df.show(n=5)

    return loaded_df

## Chargement de données

In [None]:
# Load CO2.csv from HDFS (comma-separated, first line is the header).
co2_hdfs_df = load_data_in_csv_file(
    filename="/tpa_groupe_14/data/co2/CO2.csv",
    separator=",",
    head=True,
)

In [None]:
# Load the catalogue from the Hive warehouse directory on HDFS.
# The files are tab-separated and have no header, so Spark names the
# columns positionally (_c0, _c1, ...).
catalogue_hive_df = load_data_in_csv_file(
    filename="/user/hive/warehouse/catalogue",
    separator='\t',
    head=False,
)
# Keep the catalogue cached: it is reused for the brand list and the final join.
catalogue_hive_df.cache()

### Ajouter une colonne marque dans CO2

In [None]:
from pyspark.sql.functions import col

# Collect the distinct brands found in the catalogue
# (column "_c1" is the one this notebook treats as the brand).
marque_catalogue_df = catalogue_hive_df.select("_c1").distinct()

In [None]:
# Give the positional column "_c1" the explicit name "Marque".
marque_catalogue_df = marque_catalogue_df.withColumnRenamed("_c1", "Marque")
marque_catalogue_df.show(n=5)

# Cache the list of catalogue brands; it is reused by several joins below.
marque_catalogue_df.cache()

In [None]:
# Number of distinct brands in the catalogue.
print("Nombre de marque : ", marque_catalogue_df.count())

In [None]:
from pyspark.sql.functions import lower

# Attach the catalogue brand to each CO2 row: a row matches a brand when its
# "Marque / Modele" text contains the brand name (case-insensitive substring).
# CO2 rows that match no catalogue brand are dropped by the inner join.
# NOTE(review): a row whose text contains several brand names would be
# duplicated, once per matching brand - confirm this cannot happen with the data.
join_df = co2_hdfs_df.join(
    marque_catalogue_df,
    on=lower(co2_hdfs_df["Marque / Modele"]).contains(lower(marque_catalogue_df["Marque"])),
    how="inner",
)

# The combined "Marque / Modele" column is no longer needed once "Marque" is attached.
co2_marque_df = join_df.drop("Marque / Modele")

In [None]:
# Preview the CO2 rows with their attached brand.
co2_marque_df.show(n=5)

### Formatage de valeurs dans CO2

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def clean_number(value):
    """Convert a raw "Bonus / Malus" or "Cout enerie" cell into an integer amount.

    Rules:
      * ``None`` (a null cell) yields 0.
      * A value that contains '-' but no '€' is treated as "no amount" and yields 0.
      * Otherwise only the text before the first '€' is used; everything after
        the currency sign is ignored.
      * Every character except digits, '-' and '+' is removed, so thousands
        separators and (non-breaking) spaces do not matter.
      * If what remains is not a valid integer (empty, a lone sign, ...), 0 is returned.

    Args:
        value: Raw cell text, or None.

    Returns:
        The parsed amount as an int, or 0 when no amount can be extracted.
    """
    # Null cells reach the UDF as None; the membership tests below would raise on it.
    if value is None:
        return 0

    # A dash without a currency sign marks "no bonus / malus".
    if '-' in value and '€' not in value:
        return 0

    # split() returns the whole string when there is no '€', so no extra check is needed.
    amount_text = value.split("€", 1)[0]
    number = ''.join(ch for ch in amount_text if ch.isdigit() or ch in '+-')

    try:
        return int(number)
    except ValueError:
        # Empty string, a lone sign, or digits int() cannot parse.
        return 0

# clean_number returns Python ints, so the UDF is declared with an integer return
# type. Declaring it as a string type would leave both cleaned columns as text and
# force the later averages to rely on implicit casts.
# IntegerType is imported here because the cell's import line only brings in StringType.
from pyspark.sql.types import IntegerType

clean_value_udf = udf(clean_number, IntegerType())

# Replace the raw text of both money columns with their parsed integer amounts.
co2_valid_df = (
    co2_marque_df
    .withColumn("Bonus / Malus", clean_value_udf(co2_marque_df["Bonus / Malus"]))
    .withColumn("Cout enerie", clean_value_udf(co2_marque_df["Cout enerie"]))
)

In [None]:
# Cache the cleaned CO2 data before the first action; it feeds both averages below.
co2_valid_df.cache()
co2_valid_df.show(n=5)
print("CO2 valide : ", co2_valid_df.count())

### Valeurs de CO2 pour toutes les marques du catalogue

In [None]:
from pyspark.sql.functions import avg

# Average "Bonus / Malus", "Rejets CO2 g/km" and "Cout enerie" per brand.
# Each average keeps the name of the column it was computed from.
co2_marque_avg_df = co2_valid_df.groupBy("Marque").agg(
    *[
        avg(column_name).alias(column_name)
        for column_name in ("Bonus / Malus", "Rejets CO2 g/km", "Cout enerie")
    ]
)

In [None]:
# Average "Bonus / Malus", "Rejets CO2 g/km" and "Cout enerie" over all rows
# (a single-row DataFrame with the same three column names).
co2_all_avg_df = co2_valid_df.select(
    *[
        avg(column_name).alias(column_name)
        for column_name in ("Bonus / Malus", "Rejets CO2 g/km", "Cout enerie")
    ]
)



In [None]:
# Show the per-brand averages and how many brands they cover.
print("Average by 'Marque'")
co2_marque_avg_df.show(n=5)
print("Average by 'Marque' : ", co2_marque_avg_df.count())

# Show the averages over all rows.
print("Average of all 'Marque'")
co2_all_avg_df.show()

# Both results are reused by the joins below, so mark them for caching.
co2_marque_avg_df.cache()
co2_all_avg_df.cache()

In [None]:
# Brands present in the catalogue but without any CO2 average:
# the anti-join keeps the catalogue brands that have no match in co2_marque_avg_df.
marque_only_catalogue_df = marque_catalogue_df.join(
    co2_marque_avg_df,
    on=marque_catalogue_df["Marque"] == co2_marque_avg_df["Marque"],
    how="left_anti",
)
print("Marque not in CO2 but in Cataloque: ", marque_only_catalogue_df.count())
marque_only_catalogue_df.show(n=5)

In [None]:
# Build CO2 rows for the brands that have no CO2 data: co2_all_avg_df is the
# single row of overall averages, so the cross join gives each such brand
# those overall values.
co2_marque_catalogue_avg_df = marque_only_catalogue_df.crossJoin(co2_all_avg_df)

In [None]:
# Stack the per-brand averages and the fallback rows (union matches columns by
# position; both inputs are ordered brand, bonus/malus, CO2 emissions, energy cost),
# then rename the columns.
co2_all_marque_catalogue = (
    co2_marque_avg_df.union(co2_marque_catalogue_avg_df)
    .withColumnRenamed("Marque", "marque")
    .withColumnRenamed("Bonus / Malus", "bonusmalus")
    .withColumnRenamed("Rejets CO2 g/km", "rejetco2")
    .withColumnRenamed("Cout enerie", "coutenergie")
)

co2_all_marque_catalogue.show(n=5)

In [None]:
# Cast the three averaged columns to strings.
co2_all_marque_catalogue_tx = (
    co2_all_marque_catalogue
    .withColumn("bonusmalus", col("bonusmalus").cast("string"))
    .withColumn("rejetco2", col("rejetco2").cast("string"))
    .withColumn("coutenergie", col("coutenergie").cast("string"))
)


co2_all_marque_catalogue_tx.show(n=5)

### Intégration de CO2 dans catalogue

In [None]:
# Reminder of what the raw catalogue rows look like before the join.
catalogue_hive_df.show(n=5)

In [None]:
# Attach the CO2 values to every catalogue row by matching the catalogue
# brand column ("_c1") with the CO2 "marque" column.
catalogue_co2_df = catalogue_hive_df.join(
    co2_all_marque_catalogue_tx,
    on=catalogue_hive_df["_c1"] == co2_all_marque_catalogue_tx["marque"],
    how="inner",
)

In [None]:
# Preview the catalogue rows with their CO2 columns.
catalogue_co2_df.show(n=5)

In [None]:
# Row count of the joined catalogue.
print("Catalogue : ", catalogue_co2_df.count())

In [None]:
# Drop the catalogue's "_c1" column: the join matched it against "marque",
# so the same value is still available in the "marque" column.
catalogue_co2_df = catalogue_co2_df.drop("_c1")

In [None]:
# Preview the final layout that will be written out.
catalogue_co2_df.show(n=5)

In [None]:
# Write the enriched catalogue as header-less CSV files to this output directory.
# mode="overwrite" replaces the output of a previous run: with the default save
# mode the write fails as soon as the target directory already exists, which
# makes the notebook fail on any re-run.
catalogue_co2_df.write.csv("/tpa_groupe_14/mapreduce/", mode="overwrite", header=False)

In [None]:
# Release every DataFrame that was cached above, then shut the Spark session down.
for cached_df in (
    catalogue_hive_df,
    marque_catalogue_df,
    co2_valid_df,
    co2_marque_avg_df,
    co2_all_avg_df,
):
    cached_df.unpersist()

spark.stop()