In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import avg, when, trim, split, regexp_replace, round, lower, col, encode, count, levenshtein, row_number, broadcast, coalesce

In [None]:
spark = SparkSession.builder\
    .appName("AggregateCatalogueCo2")\
    .enableHiveSupport()\
    .getOrCreate()


In [None]:
spark.sparkContext.setLogLevel("OFF")
spark.catalog.clearCache()
spark.sql("USE concessionnaire")

df_catalogue = spark.sql("SELECT * FROM catalogue_ext")
df_co2 = spark.sql("SELECT * FROM crit_air_ext")

df_catalogue = df_catalogue.filter(df_catalogue['marque'] != 'marque') # A GERER DANS L'IMPORT DE HIVE ???

In [None]:
df_co2.show()
df_catalogue.show()

### Remarque 001

- **co2** dispose de marque et modele dans la meme colone
- La colone **nom** du catalogue n'est pas nommée **modele** dans **co2**
- La colone **modele** dans les 2 tableaux n'ont pas la meme casse.
- La colone **marque** dans les 2 tableaux n'ont pas la meme casse.
- Le signe **€** est mentionné dans la colone **bonus_malus** de co2.
- Le signe **€** est mentionné dans la colone **cout_energie** de co2.
- Le chiffre 1 peut apparaitre après le signe **€** dans la colone **bonus_malus** de co2.
- Le symbole "�" apparaît dans la colonne **longueur**.


In [None]:
# co2 dispose de marque et modele dans la meme colone
df_co2 = df_co2.withColumn("marque", split(df_co2["marque_modele"], " ", 2).getItem(0))
df_co2 = df_co2.withColumn("modele", split(df_co2["marque_modele"], " ", 2).getItem(1))
df_co2 = df_co2.drop('marque_modele')

# La colone **nom** du catalogue n'est pas nommée **modele** dans dans **co2**
df_catalogue = df_catalogue.withColumnRenamed("nom", "modele")

# La colone modele dans les 2 tableaux n’ont pas la meme casse.
df_co2 = df_co2.withColumn("marque", lower(trim(col("marque"))))
df_catalogue = df_catalogue.withColumn("marque", lower(trim(col("marque"))))

# La colone modele dans les 2 tableaux n’ont pas la meme casse.
df_co2 = df_co2.withColumn("modele", lower(trim(col("modele"))))
df_catalogue = df_catalogue.withColumn("modele", lower(trim(col("modele"))))

# Le signe € est mentionné dans la colone bonus_malus de co2.
df_co2 = df_co2.withColumn("bonus_malus", split(trim(df_co2["bonus_malus"]), "€").getItem(0))
# Le signe € est mentionné dans la colone cout_energie de co2.
df_co2 = df_co2.withColumn("cout_energie", split(trim(df_co2["cout_energie"]), "€").getItem(0))

# Le chiffre 1 peut apparaitre après le signe € dans la colone bonus_malus de co2.
df_co2 = df_co2.withColumn("bonus_malus", regexp_replace(trim(df_co2["bonus_malus"]), "[^0-9-]", "").cast("float"))

#Le symbole "�" apparaît dans la colonne longueur.
df_catalogue = df_catalogue.withColumn("longueur", regexp_replace(col("longueur"), "�", "e"))



In [None]:
df_co2.show()
df_catalogue.show()

### Remarque 002

- Recherche d'éventuel autre "�".
- Correction suivant résultat des recherches de "�"

In [None]:
#Recherche d'éventuel autre "�".
df_search_special_char_catalogue = df_catalogue.filter(
    col("marque").like("%�%") | 
    col("modele").like("%�%") | 
    col("couleur").like("%�%") | 
    col("longueur").like("%�%")
)

df_search_special_char_co2 = df_co2.filter(
    col("marque").like("%�%") | 
    col("modele").like("%�%") 
)


df_search_special_char_catalogue.show()
df_search_special_char_co2.show()


In [None]:
# Étape 1: Extraire les marques correctes de df_co2
marques_correctes_df = df_co2.select('marque').distinct().alias('marques_correctes')

# Étape 2: Extraire les marques du catalogue
marques_catalogue_df = df_catalogue.select('marque').distinct().alias('marques_catalogue')

# Étape 3: Créer le mapping des marques
df_cross = marques_catalogue_df.crossJoin(broadcast(marques_correctes_df))

df_cross = df_cross.withColumn('distance', levenshtein(col('marques_catalogue.marque'), col('marques_correctes.marque')))

window = Window.partitionBy('marques_catalogue.marque').orderBy(col('distance'))
df_min_distance = df_cross.withColumn('rn', row_number().over(window)).filter(col('rn') == 1)

marque_mapping = df_min_distance.select(
    col('marques_catalogue.marque').alias('marque_catalogue'),
    col('marques_correctes.marque').alias('marque_correcte'),
    'distance'
).filter(col('distance') <= 2)

# Étape 4: Appliquer le mapping à df_catalogue
df_catalogue_corrected = df_catalogue.join(
    marque_mapping,
    df_catalogue.marque == marque_mapping.marque_catalogue,
    how='left'
)

df_catalogue_corrected = df_catalogue_corrected.withColumn(
    'marque',
    coalesce(col('marque_correcte'), col('marque'))
).drop('marque_catalogue', 'marque_correcte', 'distance')

# Vérification du résultat
df_catalogue_corrected.printSchema()
df_catalogue_corrected.show(n=1000, truncate=False)

# Optionnel : Remplacer df_catalogue par le DataFrame corrigé
df_catalogue = df_catalogue_corrected

In [None]:
marques_count_catalogue = df_catalogue.groupBy("marque").count()
marques_count_co2 = df_co2.groupBy("marque").count()

marques_count_catalogue = marques_count_catalogue.withColumnRenamed("count", "count_catalogue")
marques_count_co2 = marques_count_co2.withColumnRenamed("count", "count_co2")

marques_count_joined = marques_count_catalogue.join(
    marques_count_co2,
    on="marque",
    how="outer"  # Utilise "left", "right", ou "outer" si nécessaire
).select(
    "marque",
    F.col("count_catalogue"),
    F.col("count_co2")
)

marques_count_joined = marques_count_joined.fillna(0, subset=["count_catalogue", "count_co2"])


marques_count_joined.show(n=1000)

In [None]:
modele_count_catalogue = df_catalogue.groupBy("modele").count()
modele_count_co2 = df_co2.groupBy("modele").count()

modele_count_catalogue = modele_count_catalogue.withColumnRenamed("count", "count_catalogue")
modele_count_co2 = modele_count_co2.withColumnRenamed("count", "count_co2")

modele_count_joined = modele_count_catalogue.join(
    modele_count_co2,
    on="modele",
    how="outer"  # Utilise "left", "right", ou "outer" si nécessaire
).select(
    "modele",
    F.col("count_catalogue"),
    F.col("count_co2")
)

modele_count_joined_match = modele_count_catalogue.join(
    modele_count_co2,
    on="modele",
    how="inner"  # Utilise "left", "right", ou "outer" si nécessaire
).select(
    "modele",
    F.col("count_catalogue"),
    F.col("count_co2")
)

modele_count_joined = modele_count_joined.fillna(0, subset=["count_catalogue", "count_co2"])


modele_count_joined.show(n=1000, truncate=False)
modele_count_joined_match.show(n=1000, truncate=False)

In [None]:
df_normalized = marques_count_joined.withColumn("marque_normalisee", lower(regexp_replace("marque", "ï", "i")))
df_normalized.show(n=1000)