# Nettoyage de la dataset avec Spark (pyspark)

In [None]:
!pip install numpy

Defaulting to user installation because normal site-packages is not writeable
Collecting numpy
  Downloading numpy-2.0.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (60 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.9/60.9 kB[0m [31m183.9 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hDownloading numpy-2.0.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (19.3 MB)
[2K   [91m━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.5/19.3 MB[0m [31m1.0 MB/s[0m eta [36m0:00:16[0m0m

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, format_string,round,lit,when,regexp_replace,ceil,concat


In [2]:
# Initialiser la session Spark
spark = SparkSession.builder \
    .appName("Nettoyage des données financières") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Nettoyage des données financières des clients

In [3]:
df = spark.read.option("header", "true").option("encoding", "UTF-8").csv("/Omnidata_PFE_dataset/Situation_Financière.csv")
df.show()


                                                                                

+------+---------------+------------+--------------------+--------------+
|Emp_ID|TauxEndettement|Depense_Gain|              Revenu|Montant_revenu|
+------+---------------+------------+--------------------+--------------+
|PDBO18|             30|   Crediteur|Investissements i...|         31240|
|URKQ44|              9|   Crediteur|Travail Independa...|         21402|
|CJHP88|             37|   Crediteur|Location Immobili...|         46840|
|MFVC48|             33|   Crediteur|Salaire Net (1007...|         78981|
|FRQG19|             79|   Crediteur|Salaire Net (1520...|          5620|
|AFSY16|             77|   Crediteur|Placements Financ...|         94054|
|OOJJ49|             81|    Debiteur|Salaire Net (1522...|         65277|
|GUTI55|             58|   Crediteur|Salaire Net (2069...|         34914|
|YEBV67|             39|   Crediteur| Salaire Net (22364)|          7647|
|HFRK17|             42|   Crediteur|           Agricoles|         26259|
|DPNR37|             74|    Debiteur|T

In [4]:
# Afficher le schéma pour vérifier les types de données
df.printSchema()

root
 |-- Emp_ID: string (nullable = true)
 |-- TauxEndettement: string (nullable = true)
 |-- Depense_Gain: string (nullable = true)
 |-- Revenu: string (nullable = true)
 |-- Montant_revenu: string (nullable = true)



In [5]:
# Nombre d'élément dans la dataset
print(df.count())

[Stage 2:>                                                          (0 + 2) / 2]

1000000


                                                                                

#### Transformer les taux d'endettement dans un intervalle [0,60]

In [6]:
# Convertir la colonne TauxEndettement en entier (int)
filtered_df = df.withColumn("TauxEndettement", col("TauxEndettement").cast("int"))

# Filtrer les lignes où TauxEndettement > 60
filtered_df = filtered_df.filter(col("TauxEndettement") > 60)

# Compter le nombre de lignes filtrées
count = filtered_df.count()

print(f"Nombre de personnes dont le TauxEndettement est supérieur à 60: {count}")

[Stage 5:>                                                          (0 + 2) / 2]

Nombre de personnes dont le TauxEndettement est supérieur à 60: 395738


                                                                                

In [7]:
from pyspark.sql.functions import col, when

# Filtrer les lignes où TauxEndettement > 60
filtered_df = filtered_df.filter(col("TauxEndettement") > 50)

# Modifier le TauxEndettement pour les personnes dont le TauxEndettement > 50
filtered_df = df.withColumn("TauxEndettement", 
                                     when(col("TauxEndettement") > 50, 
                                          ((col("TauxEndettement") - 50) * 50 / (100 - 50)))
                                     .otherwise(col("TauxEndettement")).cast("int"))


# Afficher le schéma pour vérifier les types de données
filtered_df.printSchema()

# Afficher quelques lignes pour vérifier les modifications
filtered_df.show()

root
 |-- Emp_ID: string (nullable = true)
 |-- TauxEndettement: integer (nullable = true)
 |-- Depense_Gain: string (nullable = true)
 |-- Revenu: string (nullable = true)
 |-- Montant_revenu: string (nullable = true)

+------+---------------+------------+--------------------+--------------+
|Emp_ID|TauxEndettement|Depense_Gain|              Revenu|Montant_revenu|
+------+---------------+------------+--------------------+--------------+
|PDBO18|             30|   Crediteur|Investissements i...|         31240|
|URKQ44|              9|   Crediteur|Travail Independa...|         21402|
|CJHP88|             37|   Crediteur|Location Immobili...|         46840|
|MFVC48|             33|   Crediteur|Salaire Net (1007...|         78981|
|FRQG19|             29|   Crediteur|Salaire Net (1520...|          5620|
|AFSY16|             27|   Crediteur|Placements Financ...|         94054|
|OOJJ49|             31|    Debiteur|Salaire Net (1522...|         65277|
|GUTI55|              8|   Crediteur|Sal

In [8]:
# Filtrer les lignes où TauxEndettement > 60
n = filtered_df.filter(col("TauxEndettement") > 50)

# Compter le nombre de lignes filtrées
count = n.count()

print(f"Nombre de personnes dont le TauxEndettement est supérieur à 60: {count}")

[Stage 9:>                                                          (0 + 2) / 2]

Nombre de personnes dont le TauxEndettement est supérieur à 60: 0


                                                                                

#### Arrondir le montant de revenu

In [9]:
# Round the Montant_revenu column to the nearest multiple of 1000
filtered_df = filtered_df.withColumn("Montant_revenu", round(col("Montant_revenu") / 1000) * 1000)

# Format the Montant_revenu column to string with trailing zeros
filtered_df = filtered_df.withColumn("Montant_revenu", format_string("%.0f", col("Montant_revenu")))

# Show the results
filtered_df.show(truncate=False)

+------+---------------+------------+-----------------------------------------------------------------------------------+--------------+
|Emp_ID|TauxEndettement|Depense_Gain|Revenu                                                                             |Montant_revenu|
+------+---------------+------------+-----------------------------------------------------------------------------------+--------------+
|PDBO18|30             |Crediteur   |Investissements immobiliers- Agricoles                                             |31000         |
|URKQ44|9              |Crediteur   |Travail Independant ou Freelance                                                   |21000         |
|CJHP88|37             |Crediteur   |Location Immobiliere- Pension                                                      |47000         |
|MFVC48|33             |Crediteur   |Salaire Net (10071)- Location de biens Personnels                                  |79000         |
|FRQG19|29             |Crediteur   |Sala

#### Supprimer tous les clients qui ont à la fois un Salaire et une Pension dans leurs revenus

In [10]:
# Filtrer les lignes où la colonne 'Revenu' contient à la fois "Salaire" et "Pension"
filtre = filtered_df.filter((col("Revenu").contains("Salaire")) & (col("Revenu").contains("Pension")))

# Afficher le nombre d'éléments du DataFrame filtré
count = filtre.count()

print(f"Nombre d'éléments avec Salaire et Pension : {count}")

[Stage 13:>                                                         (0 + 2) / 2]

Nombre d'éléments avec Salaire et Pension : 37083


                                                                                

In [11]:
# Supprimer ces lignes du DataFrame d'origine
filtered_df = filtered_df.filter(~((col("Revenu").contains("Salaire")) & (col("Revenu").contains("Pension"))))

# Afficher le DataFrame après suppression des lignes
filtered_df.show(truncate=False)

+------+---------------+------------+-----------------------------------------------------------------------------------+--------------+
|Emp_ID|TauxEndettement|Depense_Gain|Revenu                                                                             |Montant_revenu|
+------+---------------+------------+-----------------------------------------------------------------------------------+--------------+
|PDBO18|30             |Crediteur   |Investissements immobiliers- Agricoles                                             |31000         |
|URKQ44|9              |Crediteur   |Travail Independant ou Freelance                                                   |21000         |
|CJHP88|37             |Crediteur   |Location Immobiliere- Pension                                                      |47000         |
|MFVC48|33             |Crediteur   |Salaire Net (10071)- Location de biens Personnels                                  |79000         |
|FRQG19|29             |Crediteur   |Sala

In [12]:
# Filtrer les lignes où la colonne 'Revenu' contient à la fois "Salaire" et "Pension"
filtre = filtered_df.filter((col("Revenu").contains("Salaire")) & (col("Revenu").contains("Pension")))

# Afficher le nombre d'éléments du DataFrame filtré
count = filtre.count()

print(f"Nombre d'éléments avec Salaire et Pension : {count}")

[Stage 17:>                                                         (0 + 2) / 2]

Nombre d'éléments avec Salaire et Pension : 0


                                                                                

In [13]:
# Nombre d'élément dans la dataset
print(filtered_df.count())



962917


                                                                                

## Nettoyage des données personnelles des clients 

In [14]:
info_perso = spark.read.option("header", "true").option("encoding", "UTF-8").csv("/Omnidata_PFE_dataset/Informations_Personnelle.csv")
info_perso.show()


+------+----------+--------+--------------------+-----------+---+-----------+--------+--------------------+
|Emp_ID|       Nom|  Prénom|    Adresse_actuelle|      Ville|Age| État_civil|    Sexe|          Profession|
+------+----------+--------+--------------------+-----------+---+-----------+--------+--------------------+
|PDBO18|   Mcclure|   Nancy|2711 Lisa Crossro...|Beni Mellal| 84|    Divorcé| Féminin|Travailleur Intér...|
|URKQ44|  Galloway|  Briana|USNV Bautista FPO...|    Kénitra| 42|      Veuve|Masculin|Travailleur saiso...|
|CJHP88|    Sutton|   Jason|393 Reyes Mall Mi...|     Meknès| 79|Célibataire| Féminin|                 RME|
|MFVC48|  Ferguson|   Jason|19887 Mary Rapids...|    Kénitra| 69|Célibataire|Masculin|Travailleur Indép...|
|FRQG19|     Cross|  Robert|0485 Rivera Squar...|     Agadir| 30|Célibataire| Féminin|       Fonctionnaire|
|AFSY16|     Allen|    Mary|6637 Bradley Fore...| Mohammedia| 55|Célibataire|Masculin|Travailleur Indép...|
|OOJJ49| Rodriguez|   Laura|

#### Supprimer l'adresse des clients car elles sont fictives

In [15]:
# Supprimer la colonne 'Adresse'
info_perso = info_perso.drop("Adresse_actuelle")

# Afficher les résultats
info_perso.show()

+------+----------+--------+-----------+---+-----------+--------+--------------------+
|Emp_ID|       Nom|  Prénom|      Ville|Age| État_civil|    Sexe|          Profession|
+------+----------+--------+-----------+---+-----------+--------+--------------------+
|PDBO18|   Mcclure|   Nancy|Beni Mellal| 84|    Divorcé| Féminin|Travailleur Intér...|
|URKQ44|  Galloway|  Briana|    Kénitra| 42|      Veuve|Masculin|Travailleur saiso...|
|CJHP88|    Sutton|   Jason|     Meknès| 79|Célibataire| Féminin|                 RME|
|MFVC48|  Ferguson|   Jason|    Kénitra| 69|Célibataire|Masculin|Travailleur Indép...|
|FRQG19|     Cross|  Robert|     Agadir| 30|Célibataire| Féminin|       Fonctionnaire|
|AFSY16|     Allen|    Mary| Mohammedia| 55|Célibataire|Masculin|Travailleur Indép...|
|OOJJ49| Rodriguez|   Laura|     Meknès| 60|      Veuve| Féminin| Profession libérale|
|GUTI55|  Chambers|  Brenda|    Tétouan| 27|    Divorcé|Masculin|             Salarié|
|YEBV67|      Case|   Brian|     Agadir| 43

#### Transformer l'âge pour qu'il se situe dans l'intervalle [18,80]

In [16]:
# Filtrer les personnes ayant 65 ans ou plus
age_65_plus_df = info_perso.filter(col("Age") > 65)

# Compter le nombre de personnes de 65 ans ou plus
count_age_65_plus = age_65_plus_df.count()

# Afficher le résultat
print(f"Nombre de personnes ayant 65 ans ou plus : {count_age_65_plus}")

[Stage 26:>                                                         (0 + 2) / 2]

Nombre de personnes ayant 65 ans ou plus : 421216


                                                                                

In [17]:
# Filtrer les personnes ayant 80 ans ou plus
age_80_plus_df = info_perso.filter(col("Age") > 80)

# Compter le nombre de personnes de 80 ans ou plus
count_age_80_plus = age_80_plus_df.count()

# Afficher le résultat
print(f"Nombre de personnes ayant 80 ans ou plus : {count_age_80_plus}")

[Stage 29:>                                                         (0 + 2) / 2]

Nombre de personnes ayant 80 ans ou plus : 240555


                                                                                

In [18]:
# Modifier l'Age pour les personnes dont l'Âge > 65
info_perso = info_perso.withColumn(
    "Age",
    when(col("Age") > 80, ((col("Age") - 65) * (65-18) / (100 - 65)) + 18).otherwise(col("Age")).cast("int")
)

# Afficher le schéma pour vérifier les types de données
info_perso.printSchema()

# Afficher quelques lignes pour vérifier les modifications
info_perso.show()

root
 |-- Emp_ID: string (nullable = true)
 |-- Nom: string (nullable = true)
 |-- Prénom: string (nullable = true)
 |-- Ville: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- État_civil: string (nullable = true)
 |-- Sexe: string (nullable = true)
 |-- Profession: string (nullable = true)

+------+----------+--------+-----------+---+-----------+--------+--------------------+
|Emp_ID|       Nom|  Prénom|      Ville|Age| État_civil|    Sexe|          Profession|
+------+----------+--------+-----------+---+-----------+--------+--------------------+
|PDBO18|   Mcclure|   Nancy|Beni Mellal| 43|    Divorcé| Féminin|Travailleur Intér...|
|URKQ44|  Galloway|  Briana|    Kénitra| 42|      Veuve|Masculin|Travailleur saiso...|
|CJHP88|    Sutton|   Jason|     Meknès| 79|Célibataire| Féminin|                 RME|
|MFVC48|  Ferguson|   Jason|    Kénitra| 69|Célibataire|Masculin|Travailleur Indép...|
|FRQG19|     Cross|  Robert|     Agadir| 30|Célibataire| Féminin|       Fonctionn

#### Réduire le nombre de clients âgés de 66 à 80 ans en rajeunissant leur âge

In [19]:
# Filtrer les personnes ayant 60 ans ou plus
age_65_plus_df = info_perso.filter(col("Age") > 65)

# Compter le nombre de personnes de 60 ans ou plus
count_age_65_plus = age_65_plus_df.count()

# Afficher le résultat
print(f"Nombre de personnes ayant 65 ans ou plus : {count_age_65_plus}")

[Stage 33:>                                                         (0 + 2) / 2]

Nombre de personnes ayant 65 ans ou plus : 180661


                                                                                

In [20]:
from pyspark.sql import functions as F

# Modifier l'Age pour les personnes dont l'Âge > 65
info_perso = info_perso.withColumn(
    "Age",
    when(col("Age") > 65, 
         when(F.rand() <= 0.6, ((col("Age") - 65) * (65 - 18) / (100 - 65)) + 18)
         .otherwise(col("Age"))
    ).otherwise(col("Age")).cast("int")
)

# Afficher le schéma pour vérifier les types de données
info_perso.printSchema()

# Afficher quelques lignes pour vérifier les modifications
info_perso.show(truncate=False)

root
 |-- Emp_ID: string (nullable = true)
 |-- Nom: string (nullable = true)
 |-- Prénom: string (nullable = true)
 |-- Ville: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- État_civil: string (nullable = true)
 |-- Sexe: string (nullable = true)
 |-- Profession: string (nullable = true)

+------+----------+--------+-----------+---+-----------+--------+---------------------------------------+
|Emp_ID|Nom       |Prénom  |Ville      |Age|État_civil |Sexe    |Profession                             |
+------+----------+--------+-----------+---+-----------+--------+---------------------------------------+
|PDBO18|Mcclure   |Nancy   |Beni Mellal|43 |Divorcé    |Féminin |Travailleur Intérimaire ou Contractuel |
|URKQ44|Galloway  |Briana  |Kénitra    |42 |Veuve      |Masculin|Travailleur saisonnier ou intermittent |
|CJHP88|Sutton    |Jason   |Meknès     |36 |Célibataire|Féminin |RME                                    |
|MFVC48|Ferguson  |Jason   |Kénitra    |23 |Célibatair

In [21]:
# Filtrer les personnes ayant 60 ans ou plus
age_65_plus_df = info_perso.filter(col("Age") > 65)

# Compter le nombre de personnes de 60 ans ou plus
count_age_65_plus = age_65_plus_df.count()

# Afficher le résultat
print(f"Nombre de personnes ayant 65 ans ou plus : {count_age_65_plus}")

[Stage 37:>                                                         (0 + 2) / 2]

Nombre de personnes ayant 65 ans ou plus : 72505


                                                                                

In [22]:
# Filtrer les lignes où l'âge est strictement supérieur à 63
filtered_age_df = info_perso.filter(col("Age") > 63)

# Afficher le schéma pour vérifier les types de données
filtered_age_df.printSchema()

# Afficher quelques lignes pour vérifier les modifications
filtered_age_df.show(truncate=False)

root
 |-- Emp_ID: string (nullable = true)
 |-- Nom: string (nullable = true)
 |-- Prénom: string (nullable = true)
 |-- Ville: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- État_civil: string (nullable = true)
 |-- Sexe: string (nullable = true)
 |-- Profession: string (nullable = true)

+------+--------+---------+----------+---+-----------+--------+--------------------------------------+
|Emp_ID|Nom     |Prénom   |Ville     |Age|État_civil |Sexe    |Profession                            |
+------+--------+---------+----------+---+-----------+--------+--------------------------------------+
|HFRK17|Pruitt  |Jennifer |Casablanca|68 |Célibataire|Féminin |Fonctionnaire                         |
|DPNR37|Cobb    |Daniel   |Marrakech |66 |Célibataire|Féminin |RME                                   |
|MZIY07|Walker  |Victoria |Mohammedia|76 |Célibataire|Féminin |Salarié                               |
|OINI05|Howell  |Sherri   |Fès       |74 |Marié      |Féminin |Travaille

#### Remplacer la profession de toutes les personnes âgées de 62 ans ou plus par "Retraité"

In [23]:
# Remplacer la profession par "Retraité" pour les personnes ayant un âge >= 62
info_perso = info_perso.withColumn(
    "Profession",
    when(col("Age") >= 62, "Retraité").otherwise(col("Profession"))
)

# Afficher le schéma pour vérifier les types de données
info_perso.printSchema()

# Afficher quelques lignes pour vérifier les modifications
info_perso.show()

root
 |-- Emp_ID: string (nullable = true)
 |-- Nom: string (nullable = true)
 |-- Prénom: string (nullable = true)
 |-- Ville: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- État_civil: string (nullable = true)
 |-- Sexe: string (nullable = true)
 |-- Profession: string (nullable = true)

+------+----------+--------+-----------+---+-----------+--------+--------------------+
|Emp_ID|       Nom|  Prénom|      Ville|Age| État_civil|    Sexe|          Profession|
+------+----------+--------+-----------+---+-----------+--------+--------------------+
|PDBO18|   Mcclure|   Nancy|Beni Mellal| 43|    Divorcé| Féminin|Travailleur Intér...|
|URKQ44|  Galloway|  Briana|    Kénitra| 42|      Veuve|Masculin|Travailleur saiso...|
|CJHP88|    Sutton|   Jason|     Meknès| 36|Célibataire| Féminin|                 RME|
|MFVC48|  Ferguson|   Jason|    Kénitra| 23|Célibataire|Masculin|Travailleur Indép...|
|FRQG19|     Cross|  Robert|     Agadir| 30|Célibataire| Féminin|       Fonctionn

#### Associer le revenu "Pension" à chaque retraité dans leurs revenus

In [24]:
# Filtrer les retraités qui n'ont pas déjà "Pension" dans leurs revenus
retraites_a_mettre_a_jour = filtered_df.join(info_perso, "Emp_ID") \
    .filter((col("Profession") == "Retraité") & (~col("Revenu").contains("Pension")))

# Mettre à jour la colonne `Revenu` pour les retraités filtrés
retraites_a_mettre_a_jour = retraites_a_mettre_a_jour.withColumn(
    "Revenu",
    concat(col("Revenu"), lit("-Pension"))
).select("Emp_ID", "Revenu")

# Mettre à jour les DataFrames originaux
filtered_df = filtered_df.join(retraites_a_mettre_a_jour, "Emp_ID", "left").select(
    filtered_df["Emp_ID"],
    filtered_df["TauxEndettement"],
    filtered_df["Depense_Gain"],
    when(retraites_a_mettre_a_jour["Revenu"].isNull(), filtered_df["Revenu"]).otherwise(retraites_a_mettre_a_jour["Revenu"]).alias("Revenu"),
    filtered_df["Montant_revenu"]
)

# Afficher les résultats pour vérifier
filtered_df.show()

                                                                                

+------+---------------+------------+--------------------+--------------+
|Emp_ID|TauxEndettement|Depense_Gain|              Revenu|Montant_revenu|
+------+---------------+------------+--------------------+--------------+
|AFSY16|             27|   Crediteur|Placements Financ...|         94000|
|BQCN65|             11|    Debiteur|          Dividendes|         44000|
|CJHP88|             37|   Crediteur|Location Immobili...|         47000|
|DPNR37|             24|    Debiteur|Travail Independa...|         91000|
|DWQS91|             26|   Crediteur|Placements Financ...|         26000|
|FKWM95|             37|    Debiteur|Investissements i...|         56000|
|FRQG19|             29|   Crediteur|Salaire Net (1520...|          6000|
|GRXK41|             11|    Debiteur|          Dividendes|         17000|
|HFRK17|             42|   Crediteur|   Agricoles-Pension|         26000|
|HIDQ28|             46|   Crediteur|Location Immobili...|          5000|
|HPQQ76|             12|   Crediteur|A

                                                                                

In [25]:
# Filtrer les retraités dans info_perso
retraites_info_perso = info_perso.filter(col("Profession") == "Retraité")

# Joindre les retraités avec leurs revenus dans filtered_df
retraites_revenus = retraites_info_perso.join(filtered_df, "Emp_ID")

# Afficher les résultats
retraites_revenus.show(truncate=False)

                                                                                

+------+--------+--------+-----------+---+-----------+--------+----------+---------------+------------+------------------------------------------------------------------------+--------------+
|Emp_ID|Nom     |Prénom  |Ville      |Age|État_civil |Sexe    |Profession|TauxEndettement|Depense_Gain|Revenu                                                                  |Montant_revenu|
+------+--------+--------+-----------+---+-----------+--------+----------+---------------+------------+------------------------------------------------------------------------+--------------+
|AAAA89|Chan    |Alicia  |Tétouan    |49 |Divorcé    |Féminin |Retraité  |0              |Debiteur    |Placements Financiers-Pension                                           |81000         |
|AAAD43|Lee     |Dylan   |Rabat      |54 |Célibataire|Féminin |Retraité  |16             |Debiteur    |Pension- Agricoles                                                      |34000         |
|AAAD43|Lee     |Dylan   |Rabat      |54

                                                                                

#### Supprimer toutes les personnes considérées comme retraitées et âgées de 18 à 54 ans

In [26]:
# Compter les personnes dont l'âge est compris entre 18 et 54 inclus et la profession est "Retraité"
retirees_age_18_to_54_count = info_perso.filter(
    (col("Age").between(18, 54)) & (col("Profession") == "Retraité")
).count()

print(f"Nombre de retraités âgés de 18 à 54 ans avant suppression : {retirees_age_18_to_54_count}")



Nombre de retraités âgés de 18 à 54 ans avant suppression : 69622


                                                                                

In [27]:
# Remplacer la profession "Retraité" par "Salarié" pour les personnes dont l'âge est compris entre 18 et 54 inclus
info_perso = info_perso.withColumn(
    "Profession",
    when((col("Age").between(18, 54)) & (col("Profession") == "Retraité"), "Salarié").otherwise(col("Profession"))
)

# Afficher les résultats pour vérifier
info_perso.show()

+------+----------+--------+-----------+---+-----------+--------+--------------------+
|Emp_ID|       Nom|  Prénom|      Ville|Age| État_civil|    Sexe|          Profession|
+------+----------+--------+-----------+---+-----------+--------+--------------------+
|PDBO18|   Mcclure|   Nancy|Beni Mellal| 43|    Divorcé| Féminin|Travailleur Intér...|
|URKQ44|  Galloway|  Briana|    Kénitra| 42|      Veuve|Masculin|Travailleur saiso...|
|CJHP88|    Sutton|   Jason|     Meknès| 36|Célibataire| Féminin|                 RME|
|MFVC48|  Ferguson|   Jason|    Kénitra| 23|Célibataire|Masculin|Travailleur Indép...|
|FRQG19|     Cross|  Robert|     Agadir| 30|Célibataire| Féminin|       Fonctionnaire|
|AFSY16|     Allen|    Mary| Mohammedia| 55|Célibataire|Masculin|Travailleur Indép...|
|OOJJ49| Rodriguez|   Laura|     Meknès| 60|      Veuve| Féminin| Profession libérale|
|GUTI55|  Chambers|  Brenda|    Tétouan| 27|    Divorcé|Masculin|             Salarié|
|YEBV67|      Case|   Brian|     Agadir| 43

In [28]:
# Compter les personnes dont l'âge est compris entre 18 et 54 inclus et la profession est "Retraité"
retirees_age_18_to_54_count = info_perso.filter(
    (col("Age").between(18, 54)) & (col("Profession") == "Retraité")
).count()

print(f"Nombre de retraités âgés de 18 à 54 ans avant suppression : {retirees_age_18_to_54_count}")

[Stage 64:>                                                         (0 + 2) / 2]

Nombre de retraités âgés de 18 à 54 ans avant suppression : 0


                                                                                

#### Transformer tous les hommes considérés comme "Veuve" en "Célibataire"

In [29]:
# Filter the DataFrame for 'Masculin' and 'Veuve' and count the rows
count_masculin_veuve = info_perso.filter((col("Sexe") == "Masculin") & (col("État_civil") == "Veuve")).count()

print(f"Number of 'Masculin' with 'Veuve' status: {count_masculin_veuve}")

[Stage 67:>                                                         (0 + 2) / 2]

Number of 'Masculin' with 'Veuve' status: 124873


                                                                                

In [30]:
# Update the 'Etat_civil' column
info_perso = info_perso.withColumn(
    "Etat_civil",
    when((col("Sexe") == "Masculin") & (col("État_civil") == "Veuve"), "Célibataire").otherwise(col("État_civil"))
)

# Show the updated DataFrame
info_perso.show()
filtered_df.show()

+------+----------+--------+-----------+---+-----------+--------+--------------------+-----------+
|Emp_ID|       Nom|  Prénom|      Ville|Age| État_civil|    Sexe|          Profession| Etat_civil|
+------+----------+--------+-----------+---+-----------+--------+--------------------+-----------+
|PDBO18|   Mcclure|   Nancy|Beni Mellal| 43|    Divorcé| Féminin|Travailleur Intér...|    Divorcé|
|URKQ44|  Galloway|  Briana|    Kénitra| 42|      Veuve|Masculin|Travailleur saiso...|Célibataire|
|CJHP88|    Sutton|   Jason|     Meknès| 36|Célibataire| Féminin|                 RME|Célibataire|
|MFVC48|  Ferguson|   Jason|    Kénitra| 23|Célibataire|Masculin|Travailleur Indép...|Célibataire|
|FRQG19|     Cross|  Robert|     Agadir| 30|Célibataire| Féminin|       Fonctionnaire|Célibataire|
|AFSY16|     Allen|    Mary| Mohammedia| 55|Célibataire|Masculin|Travailleur Indép...|Célibataire|
|OOJJ49| Rodriguez|   Laura|     Meknès| 60|      Veuve| Féminin| Profession libérale|      Veuve|
|GUTI55|  

                                                                                

+------+---------------+------------+--------------------+--------------+
|Emp_ID|TauxEndettement|Depense_Gain|              Revenu|Montant_revenu|
+------+---------------+------------+--------------------+--------------+
|AFSY16|             27|   Crediteur|Placements Financ...|         94000|
|BQCN65|             11|    Debiteur|          Dividendes|         44000|
|CJHP88|             37|   Crediteur|Location Immobili...|         47000|
|DPNR37|             24|    Debiteur|Travail Independa...|         91000|
|DWQS91|             26|   Crediteur|Placements Financ...|         26000|
|FKWM95|             37|    Debiteur|Investissements i...|         56000|
|FRQG19|             29|   Crediteur|Salaire Net (1520...|          6000|
|GRXK41|             11|    Debiteur|          Dividendes|         17000|
|HFRK17|             42|   Crediteur|   Agricoles-Pension|         26000|
|HIDQ28|             46|   Crediteur|Location Immobili...|          5000|
|HPQQ76|             12|   Crediteur|A

In [31]:
# Filter the DataFrame for 'Masculin' and 'Veuve' and count the rows
count_masculin_veuve = info_perso.filter((col("Sexe") == "Masculin") & (col("Etat_civil") == "Veuve")).count()

print(f"Number of 'Masculin' with 'Veuve' status: {count_masculin_veuve}")

[Stage 79:>                                                         (0 + 2) / 2]

Number of 'Masculin' with 'Veuve' status: 0


                                                                                

#### Enlever le revenu "Pension" des personnes âgées de 18 à 54 ans dans leurs revenus

In [32]:
# Filtrer les personnes dans info_perso dont l'âge est compris entre 18 et 54
age_filtered_df = info_perso.filter(col("Age").between(18, 54))

# Filtrer les personnes dans filtered_df dont le revenu contient "Pension"
pension_filtered_df = filtered_df.filter(col("Revenu").contains("Pension"))

# Compter le nombre de personnes dont l'âge est compris entre 18 et 54 et dont le revenu contient "Pension"
count = age_filtered_df.join(pension_filtered_df, "Emp_ID", "inner").count()

print(f"Nombre de personnes dont l'âge est compris entre 18 et 54 et dont le revenu contient 'Pension' : {count}")

[Stage 91:>                                                         (0 + 2) / 2]

Nombre de personnes dont l'âge est compris entre 18 et 54 et dont le revenu contient 'Pension' : 199832


                                                                                

In [17]:
# Filtrer les personnes dans info_perso dont l'âge est compris entre 18 et 54
age_filtered_df = info_perso.filter(col("Age").between(18, 54))

# Filtrer les personnes dans filtered_df dont le revenu contient "Pension"
pension_filtered_df = filtered_df.filter(col("Revenu").contains("Pension"))

# Trouver les Emp_ID correspondant aux critères
emp_ids_to_update = age_filtered_df.join(pension_filtered_df, "Emp_ID", "inner").select("Emp_ID").distinct()

# Mettre à jour la colonne `Revenu` pour supprimer "Pension" au début ou "-Pension" au milieu/à la fin
updated_filtered_df = filtered_df.join(emp_ids_to_update, "Emp_ID", "inner").withColumn(
    "Revenu",
    regexp_replace(col("Revenu"), r"^Pension|-Pension|- Pension", "")
)

# Mettre à jour le DataFrame original
filtered_df = filtered_df.join(updated_filtered_df.select("Emp_ID", "Revenu"), "Emp_ID", "left").select(
    filtered_df["Emp_ID"],
    filtered_df["TauxEndettement"],
    filtered_df["Depense_Gain"],
    when(updated_filtered_df["Revenu"].isNull(), filtered_df["Revenu"]).otherwise(updated_filtered_df["Revenu"]).alias("Revenu"),
    filtered_df["Montant_revenu"]
)

# Afficher les résultats pour vérifier
filtered_df.show()

[Stage 38:>                                                         (0 + 1) / 1]

+------+---------------+------------+--------------------+--------------+
|Emp_ID|TauxEndettement|Depense_Gain|              Revenu|Montant_revenu|
+------+---------------+------------+--------------------+--------------+
|AFSY16|             27|   Crediteur|Placements Financ...|         94000|
|BQCN65|             11|    Debiteur|          Dividendes|         44000|
|CJHP88|             37|   Crediteur|Location Immobili...|         47000|
|DPNR37|             24|    Debiteur|Travail Independa...|         91000|
|DWQS91|             26|   Crediteur|Placements Financ...|         26000|
|FKWM95|             37|    Debiteur|Investissements i...|         56000|
|FRQG19|             29|   Crediteur|Salaire Net (1520...|          6000|
|GRXK41|             11|    Debiteur|          Dividendes|         17000|
|HFRK17|             42|   Crediteur|           Agricoles|         26000|
|HIDQ28|             46|   Crediteur|Location Immobili...|          5000|
|HPQQ76|             12|   Crediteur|A

                                                                                

In [18]:
# Filtrer les personnes dans info_perso dont l'âge est compris entre 18 et 54
age_filtered_df = info_perso.filter(col("Age").between(18, 54))

# Filtrer les personnes dans filtered_df dont le revenu contient "Pension"
pension_filtered_df = filtered_df.filter(col("Revenu").contains("Pension"))

# Compter le nombre de personnes dont l'âge est compris entre 18 et 54 et dont le revenu contient "Pension"
count = age_filtered_df.join(pension_filtered_df, "Emp_ID", "inner").count()

print(f"Nombre de personnes dont l'âge est compris entre 18 et 54 et dont le revenu contient 'Pension' : {count}")

[Stage 46:>                                                         (0 + 2) / 2]

Nombre de personnes dont l'âge est compris entre 18 et 54 et dont le revenu contient 'Pension' : 0


                                                                                

#### Sauvegarder les données personnelles et financières nettoyées

In [35]:
# Chemin HDFS pour sauvegarder le DataFrame nettoyé
output_path = "hdfs:///Omnidata_PFE_dataset/Information_Personnelle_nettoyee.csv"

# Sauvegarder le DataFrame nettoyé en CSV
info_perso.write.csv(output_path, header=True, mode="overwrite")

                                                                                

In [20]:
# Chemin HDFS pour sauvegarder le DataFrame nettoyé
output_path = "hdfs:///Omnidata_PFE_dataset/Situation_Financière_nettoyee.csv"

# Sauvegarder le DataFrame nettoyé en CSV
filtered_df.write.csv(output_path, header=True, mode="overwrite")

                                                                                

#### Nettoyage des données de crédit des clients

In [48]:
detail_credit = spark.read.option("header", "true").option("encoding", "UTF-8").csv("/Omnidata_PFE_dataset/Détail_Crédit.csv")
detail_credit.show(truncate=False)


+------+------------+----------+----------------------------------------------------+------------+------------+--------------------------------------------+--------------------------------------------------------------+
|Emp_ID|Type_credit |Duree_pret|Objet_pret                                          |Montant_pret|Credit_score|Historique_credits_precedent                |Garanties                                                     |
+------+------------+----------+----------------------------------------------------+------------+------------+--------------------------------------------+--------------------------------------------------------------+
|PDBO18|Consommation|51        |Événements Spéciaux                                 |9945159     |20          |Paiement par prélèvement automatique        |Domiciliation révocable de salaire                            |
|URKQ44|Immobilier  |346       |Frais d'études supérieur                            |5196140     |85          |Encours d

#### Arrondir la durée du prêt

In [49]:
# Arrondir la colonne Duree_pret
detail_credit = detail_credit.withColumn("Duree_pret", round(col("Duree_pret")))

# Afficher le schéma pour vérifier les types de données
detail_credit.printSchema()

# Afficher quelques lignes pour vérifier les modifications
detail_credit.show()

root
 |-- Emp_ID: string (nullable = true)
 |-- Type_credit: string (nullable = true)
 |-- Duree_pret: double (nullable = true)
 |-- Objet_pret: string (nullable = true)
 |-- Montant_pret: string (nullable = true)
 |-- Credit_score: string (nullable = true)
 |-- Historique_credits_precedent: string (nullable = true)
 |-- Garanties: string (nullable = true)

+------+------------+----------+--------------------+------------+------------+----------------------------+--------------------+
|Emp_ID| Type_credit|Duree_pret|          Objet_pret|Montant_pret|Credit_score|Historique_credits_precedent|           Garanties|
+------+------------+----------+--------------------+------------+------------+----------------------------+--------------------+
|PDBO18|Consommation|      51.0| Événements Spéciaux|     9945159|          20|        Paiement par prél...|Domiciliation rév...|
|URKQ44|  Immobilier|     346.0|Frais d'études su...|     5196140|          85|         Encours des impayés|   Caution s

In [50]:
# Arrondir la colonne Duree_pret au multiple de 30 le plus proche
# et s'assurer que les valeurs sont dans l'intervalle [6, 400]
detail_credit = detail_credit.withColumn(
    "Duree_pret",
    when(
        col("Duree_pret") > 400,
        400
    ).otherwise(
        when(
            ceil(col("Duree_pret") / 30) * 30 < 6,
            6
        ).otherwise(
            (ceil(col("Duree_pret") / 30) * 30).cast("int")
        )
    )
)

# Afficher le schéma pour vérifier les types de données
detail_credit.printSchema()

# Afficher quelques lignes pour vérifier les modifications
detail_credit.show()

root
 |-- Emp_ID: string (nullable = true)
 |-- Type_credit: string (nullable = true)
 |-- Duree_pret: integer (nullable = true)
 |-- Objet_pret: string (nullable = true)
 |-- Montant_pret: string (nullable = true)
 |-- Credit_score: string (nullable = true)
 |-- Historique_credits_precedent: string (nullable = true)
 |-- Garanties: string (nullable = true)

+------+------------+----------+--------------------+------------+------------+----------------------------+--------------------+
|Emp_ID| Type_credit|Duree_pret|          Objet_pret|Montant_pret|Credit_score|Historique_credits_precedent|           Garanties|
+------+------------+----------+--------------------+------------+------------+----------------------------+--------------------+
|PDBO18|Consommation|        60| Événements Spéciaux|     9945159|          20|        Paiement par prél...|Domiciliation rév...|
|URKQ44|  Immobilier|       360|Frais d'études su...|     5196140|          85|         Encours des impayés|   Caution 

In [51]:
# Round the Montant_pret column to the nearest multiple of 1,000,000
detail_credit = detail_credit.withColumn("Montant_pret", round(col("Montant_pret") / 1000) * 1000)

# Format the Montant_pret column to string with trailing zeros
detail_credit = detail_credit.withColumn("Montant_pret", format_string("%.0f", col("Montant_pret")))

# Show the results
detail_credit.show(truncate=False)

+------+------------+----------+----------------------------------------------------+------------+------------+--------------------------------------------+--------------------------------------------------------------+
|Emp_ID|Type_credit |Duree_pret|Objet_pret                                          |Montant_pret|Credit_score|Historique_credits_precedent                |Garanties                                                     |
+------+------------+----------+----------------------------------------------------+------------+------------+--------------------------------------------+--------------------------------------------------------------+
|PDBO18|Consommation|60        |Événements Spéciaux                                 |9945000     |20          |Paiement par prélèvement automatique        |Domiciliation révocable de salaire                            |
|URKQ44|Immobilier  |360       |Frais d'études supérieur                            |5196000     |85          |Encours d

#### Sauvegarder les données de crédit nettoyées des clients

In [52]:
# Chemin HDFS pour sauvegarder le DataFrame nettoyé
output_path = "hdfs:///Omnidata_PFE_dataset/Détail_Crédit_nettoyee.csv"

# Sauvegarder le DataFrame nettoyé en CSV
detail_credit.write.csv(output_path, header=True, mode="overwrite")

                                                                                

In [44]:
# Arrêter la session Spark
spark.stop()