In [24]:
from pyspark.sql import SparkSession

# Créer une session Spark
spark = SparkSession.builder \
    .appName("Nettoyage des données") \
    .getOrCreate()


In [25]:
file_paths = {
    "cptferm": "hdfs://namenode:9000/mj/pfe/data/cptferm.csv",
    "fermcpt": "hdfs://namenode:9000/mj/pfe/data/fermcpt.csv",
    "gab_abstr": "hdfs://namenode:9000/mj/pfe/data/gab_abstr.csv",
    "gab22": "hdfs://namenode:9000/mj/pfe/data/gab22.csv",
    "getvir": "hdfs://namenode:9000/mj/pfe/data/getvir.csv",
    "operations_agence": "hdfs://namenode:9000/mj/pfe/data/operations_agence.csv",
    "perimetre": "hdfs://namenode:9000/mj/pfe/data/perimetre.csv",
    "solde": "hdfs://namenode:9000/mj/pfe/data/solde.csv",
    "statut_connexion": "hdfs://namenode:9000/mj/pfe/data/statut_connexion.csv",
    "yousr": "hdfs://namenode:9000/mj/pfe/data/yousr.csv"
}


In [26]:
from pyspark.sql.functions import *

# Charger chaque fichier CSV et stocker les DataFrames dans un dictionnaire
dataframes = {}
for key, file_path in file_paths.items():
    df = spark.read.csv(file_path, header=True)
    df = df.withColumnRenamed("new", "Idclient") if 'new' in df.columns else df
    dataframes[key] = df


In [27]:
from pyspark.sql.functions import *
from itertools import chain
# Perimetre
df_perimetre = dataframes["perimetre"]
df_perimetre = df_perimetre.withColumn("genre", when(df_perimetre["genre"] == "FÃ©minin", "Féminin").otherwise(df_perimetre["genre"]))
df_perimetre = df_perimetre.withColumn("MARITAL_STATUS",
                                       when(df_perimetre["MARITAL_STATUS"] == "MariÃ© (e)", "Marié(e)")
                                       .when(df_perimetre["MARITAL_STATUS"] == "DivorcÃ© (e)", "Divorcé(e)")
                                       .otherwise(df_perimetre["MARITAL_STATUS"]))
df_perimetre = df_perimetre.withColumn("DATE_NAISSANCE", date_format(df_perimetre["DATE_NAISSANCE"], "yyyy-MM-dd"))
df_perimetre = df_perimetre.withColumn("CUSTOMER_SINCE", date_format(df_perimetre["CUSTOMER_SINCE"], "yyyy-MM-dd"))
df_perimetre = df_perimetre.withColumn("DATE_NAISSANCE", when(col("DATE_NAISSANCE").isNull(), "1975-01-01").otherwise(col("DATE_NAISSANCE")))
df_perimetre = df_perimetre.withColumn("genre", when(df_perimetre["genre"].isNull(), when(rand() < 0.5, "Féminin").otherwise("Masculin")).otherwise(df_perimetre["genre"]))
df_perimetre = df_perimetre.withColumn("MARITAL_STATUS", when(df_perimetre["MARITAL_STATUS"].isNull(), when(rand() < 0.5, "Celibataire").otherwise("Marié(e)")).otherwise(df_perimetre["MARITAL_STATUS"]))
df_perimetre = df_perimetre.withColumn("age", (months_between(to_date(lit("2020-01-01")), to_date("DATE_NAISSANCE"), True) / 12).cast("integer"))
df_perimetre = df_perimetre.withColumn("CUSTOMER_YEARS", (months_between(to_date(lit("2020-01-01")), to_date("CUSTOMER_SINCE"), True) / 12).cast("integer"))
# Création d'une nouvelle colonne "region"
mapping_dic = {
    '1': 'BP Centre Sud',
    '17': 'BCP Réseau',
    '78': 'BCP Réseau',
    '27': 'BP Fès Meknès',
    '48': 'BP Fès Meknès',
    '43': 'BP Lâayoune',
    '45': 'BP Marrakech Béni Mellal',
    '50': 'BP Nador El Hoceima',
    '57': 'BP Oujda',
    '64': 'BP Tanger Tétouan',
    '81': 'BP Rabat Kénitra',
    '90': 'BCP Réseau',
    '30': 'BP LIB ATTAWFIK',
    '40': 'BP LIB ATTAWFIK'
}
mapping_expr = create_map([lit(x) for x in chain(*mapping_dic.items())])
df_perimetre = df_perimetre.withColumn('region', mapping_expr[col('BPR')])
df_perimetre = df_perimetre.fillna({'region': 'BP Nador El Hoceima'})
dataframes["perimetre"] = df_perimetre

In [28]:
df_perimetre .show()

+--------------+--------------+--------+--------------+---+-----------+---+--------------+-------------------+
|CUSTOMER_SINCE|DATE_NAISSANCE|   genre|MARITAL_STATUS|BPR|radical_new|age|CUSTOMER_YEARS|             region|
+--------------+--------------+--------+--------------+---+-----------+---+--------------+-------------------+
|    1996-03-05|    1956-01-01| Féminin|      Marié(e)| 01|          0| 64|            23|BP Nador El Hoceima|
|    1998-03-12|    1968-05-02|Masculin|      Marié(e)| 01|          1| 51|            21|BP Nador El Hoceima|
|    1974-11-28|    1950-01-01| Féminin|      Marié(e)| 01|          2| 70|            45|BP Nador El Hoceima|
|    2002-09-26|    1968-04-01| Féminin|   Celibataire| 01|          3| 51|            17|BP Nador El Hoceima|
|    2013-04-19|    1970-01-01|Masculin|      Marié(e)| 01|          4| 50|             6|BP Nador El Hoceima|
|    2010-12-08|    1978-07-01| Féminin|      Marié(e)| 01|          5| 41|             9|BP Nador El Hoceima|
|

In [29]:
# Transformation pour 'getvir'
df_getvir = dataframes["getvir"]
df_getvir = df_getvir.withColumn("DT_CONFIRMATION", date_format(df_getvir["DT_CONFIRMATION"], "yyyy-MM-dd"))
dataframes["getvir"] = df_getvir


In [30]:
# Transformation pour 'solde'
df_solde = dataframes["solde"]
df_solde = df_solde.withColumn("MVTCRED", when(col("MVTCRED").isNull(), lit(0)).otherwise(col("MVTCRED")))
dataframes["solde"] = df_solde


In [31]:
# Transformation pour 'statut_connexion'
df_statut_connexion = dataframes["statut_connexion"]
df_statut_connexion = df_statut_connexion.withColumn("CREATEDATE", date_format(df_statut_connexion["CREATEDATE"], "yyyy-MM-dd"))
df_statut_connexion = df_statut_connexion.withColumn("LASTLOGINDATE", date_format(df_statut_connexion["LASTLOGINDATE"], "yyyy-MM-dd"))
df_statut_connexion = df_statut_connexion.withColumn("CONNECTIONCOUNT", when(col("CONNECTIONCOUNT").isNull(), lit(0)).otherwise(col("CONNECTIONCOUNT")))
dataframes["statut_connexion"] = df_statut_connexion


In [32]:
# Transformation pour 'yousr'
df_yousr = dataframes["yousr"]
df_yousr = df_yousr.withColumn("AUTORISATION", col("AUTORISATION").cast("int"))
dataframes["yousr"] = df_yousr


In [33]:
# Transformation pour 'fermcpt'
df_fermcpt = dataframes["fermcpt"]
df_fermcpt = df_fermcpt.withColumn("DATE_DEMANDE", date_format(df_fermcpt["DATE_DEMANDE"], "yyyy-MM-dd"))
df_fermcpt = df_fermcpt.withColumn("DATE_TRAIT", date_format(df_fermcpt["DATE_TRAIT"], "yyyy-MM-dd"))
dataframes["fermcpt"] = df_fermcpt


In [34]:
# Transformation pour 'cptferm'
df_cptferm = dataframes["cptferm"]
df_cptferm = df_cptferm.withColumn("DATE_FERM", date_format(df_cptferm["DATE_FERM"], "yyyy-MM-dd"))
dataframes["cptferm"] = df_cptferm


In [35]:
# Pas de transformation spécifique nécessaire pour 'operations_agence'

df_operations_agence = dataframes["operations_agence"]
dataframes["operations_agence"] = df_operations_agence

In [36]:
# Pas de transformation spécifique nécessaire pour 'gab22'

df_gab22 = dataframes["gab22"] 
dataframes["gab22"] = df_gab22

In [37]:
# Vérifier la structure des DataFrames après les modifications
for key, df in dataframes.items():
    print(f"Schéma de {key}:")
    df.printSchema()

Schéma de cptferm:
root
 |-- Idclient: string (nullable = true)
 |-- PERIODE: string (nullable = true)
 |-- DATE_FERM: string (nullable = true)

Schéma de fermcpt:
root
 |-- Idclient: string (nullable = true)
 |-- ORIGINE_DEMANDE: string (nullable = true)
 |-- DATE_TRAIT: string (nullable = true)
 |-- DATE_DEMANDE: string (nullable = true)

Schéma de gab_abstr:
root
 |-- MONTANT: string (nullable = true)
 |-- MOIS: string (nullable = true)
 |-- ANNEE: string (nullable = true)
 |-- NOMBRE_OPERATION: string (nullable = true)
 |-- Idclient: string (nullable = true)

Schéma de gab22:
root
 |-- MONTANT: string (nullable = true)
 |-- MOIS: string (nullable = true)
 |-- ANNEE: string (nullable = true)
 |-- NOMBRE_OPERATION: string (nullable = true)
 |-- Idclient: string (nullable = true)

Schéma de getvir:
root
 |-- CANAL: string (nullable = true)
 |-- MONTANT_TOTAL: string (nullable = true)
 |-- DT_CONFIRMATION: string (nullable = true)
 |-- TYPE_OPE: string (nullable = true)
 |-- Idclient: 

In [38]:
from pyspark.sql.functions import concat_ws

# Liste des DataFrames à modifier
dataframes_to_modify = ["gab_abstr", "gab22", "operations_agence", "solde"]

# Boucle sur chaque DataFrame pour modifier le schéma
for df_name in dataframes_to_modify:
    df = dataframes[df_name]
    # Créer une nouvelle colonne "MOIS_ANNEE" en fusionnant "MOIS" et "ANNEE"
    df = df.withColumn("MOIS_ANNEE", concat_ws("-", df["ANNEE"], df["MOIS"]))
    # Optionnellement, supprimer les anciennes colonnes "MOIS" et "ANNEE"
    df = df.drop("MOIS", "ANNEE")
    # Mettre à jour le DataFrame dans le dictionnaire
    dataframes[df_name] = df

    # Afficher le nouveau schéma pour vérification
    print(f"Schéma modifié de {df_name}:")
    df.printSchema()


Schéma modifié de gab_abstr:
root
 |-- MONTANT: string (nullable = true)
 |-- NOMBRE_OPERATION: string (nullable = true)
 |-- Idclient: string (nullable = true)
 |-- MOIS_ANNEE: string (nullable = false)

Schéma modifié de gab22:
root
 |-- MONTANT: string (nullable = true)
 |-- NOMBRE_OPERATION: string (nullable = true)
 |-- Idclient: string (nullable = true)
 |-- MOIS_ANNEE: string (nullable = false)

Schéma modifié de operations_agence:
root
 |-- MONTANT: string (nullable = true)
 |-- NOMBRE_OPERATION: string (nullable = true)
 |-- Idclient: string (nullable = true)
 |-- MOIS_ANNEE: string (nullable = false)

Schéma modifié de solde:
root
 |-- MVTDEB: string (nullable = true)
 |-- MVTCRED: string (nullable = true)
 |-- NBMVTCRE_MENS: string (nullable = true)
 |-- NBMVTDB_MENS: string (nullable = true)
 |-- SOLDE: string (nullable = true)
 |-- radical_new: string (nullable = true)
 |-- MOIS_ANNEE: string (nullable = false)



In [39]:
# Afficher le DataFrame 'gab22' pour vérifier la nouvelle colonne 'MOIS_ANNEE'
dataframes["gab22"].show()


+-------+----------------+--------+----------+
|MONTANT|NOMBRE_OPERATION|Idclient|MOIS_ANNEE|
+-------+----------------+--------+----------+
|      0|              12|  232100|    2020-1|
|      0|               5|  232100|    2020-2|
|   9798|              25|  922709|    2020-1|
|   7570|              33|  922709|    2020-2|
| 5993.8|              13|  359535|    2020-1|
|   3700|               7|  359535|    2020-2|
|   7200|              31|  780003|    2020-1|
|   4000|              13|  780003|    2020-2|
|   5000|              30|  820867|    2020-1|
|   3600|              12|  820867|    2020-2|
|   4800|               9|  148052|    2020-1|
|   6300|              11|  148052|    2020-2|
|   6600|              10|  999879|    2020-1|
|   4300|               5|  999879|    2020-2|
|   3800|              24|  349685|    2020-1|
|   5300|              23|  349685|    2020-2|
|   1600|               9|  397364|    2020-1|
|   2500|               3|  397364|    2020-2|
|   3600|    

In [40]:
# Afficher le nombre de lignes de chaque DataFrame
for key, df in dataframes.items():
    count = df.count()
    print(f"Le DataFrame {key} contient {count} lignes.")


Le DataFrame cptferm contient 21703 lignes.
Le DataFrame fermcpt contient 13548 lignes.
Le DataFrame gab_abstr contient 4832172 lignes.
Le DataFrame gab22 contient 832453 lignes.
Le DataFrame getvir contient 145787 lignes.
Le DataFrame operations_agence contient 2456799 lignes.
Le DataFrame perimetre contient 1000000 lignes.
Le DataFrame solde contient 10677248 lignes.
Le DataFrame statut_connexion contient 52785 lignes.
Le DataFrame yousr contient 38845 lignes.


In [41]:
from pyspark.sql.functions import col

# Fusionner les DataFrames "cptferm" et "fermcpt" sur "Idclient"
df_cptferm = dataframes["cptferm"]
df_fermcpt = dataframes["fermcpt"]

# Fusionner les DataFrames
df_merged_cptferm_fermcpt = df_cptferm.join(df_fermcpt, "Idclient", "left")

# Afficher le schéma et les premières lignes du nouveau DataFrame fusionné pour vérifier
df_merged_cptferm_fermcpt.printSchema()
df_merged_cptferm_fermcpt.show(truncate=False)


root
 |-- Idclient: string (nullable = true)
 |-- PERIODE: string (nullable = true)
 |-- DATE_FERM: string (nullable = true)
 |-- ORIGINE_DEMANDE: string (nullable = true)
 |-- DATE_TRAIT: string (nullable = true)
 |-- DATE_DEMANDE: string (nullable = true)

+--------+-------+----------+---------------+----------+------------+
|Idclient|PERIODE|DATE_FERM |ORIGINE_DEMANDE|DATE_TRAIT|DATE_DEMANDE|
+--------+-------+----------+---------------+----------+------------+
|543425  |H      |2018-12-01|NULL           |NULL      |NULL        |
|353327  |H      |2018-12-01|NULL           |NULL      |NULL        |
|354648  |H      |2018-12-01|NULL           |NULL      |NULL        |
|319869  |H      |2018-12-01|NULL           |NULL      |NULL        |
|708338  |H      |2018-12-22|NULL           |NULL      |NULL        |
|340658  |H      |2018-12-22|NULL           |NULL      |NULL        |
|174405  |H      |2018-12-22|NULL           |NULL      |NULL        |
|174405  |H      |2018-12-22|NULL        

In [42]:
df_merged_cptferm_fermcpt.count()

21716

In [43]:
from pyspark.sql.functions import col

# Charger les DataFrames depuis le dictionnaire
df_perimetre = dataframes["perimetre"].withColumnRenamed("radical_new", "Idclient")
df_solde = dataframes["solde"].withColumnRenamed("radical_new", "Idclient")

# Fusionner les DataFrames "perimetre" et "solde" sur "Idclient"
df_merged_perimetre_solde = df_perimetre.join(df_solde, "Idclient", "left")

# Afficher le schéma et les premières lignes du nouveau DataFrame fusionné pour vérifier
df_merged_perimetre_solde.printSchema()
df_merged_perimetre_solde.show(truncate=False)




root
 |-- Idclient: string (nullable = true)
 |-- CUSTOMER_SINCE: string (nullable = true)
 |-- DATE_NAISSANCE: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- MARITAL_STATUS: string (nullable = true)
 |-- BPR: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- CUSTOMER_YEARS: integer (nullable = true)
 |-- region: string (nullable = false)
 |-- MVTDEB: string (nullable = true)
 |-- MVTCRED: string (nullable = true)
 |-- NBMVTCRE_MENS: string (nullable = true)
 |-- NBMVTDB_MENS: string (nullable = true)
 |-- SOLDE: string (nullable = true)
 |-- MOIS_ANNEE: string (nullable = true)

+--------+--------------+--------------+--------+--------------+---+---+--------------+-------------------+-------+---------+-------------+------------+---------+----------+
|Idclient|CUSTOMER_SINCE|DATE_NAISSANCE|genre   |MARITAL_STATUS|BPR|age|CUSTOMER_YEARS|region             |MVTDEB |MVTCRED  |NBMVTCRE_MENS|NBMVTDB_MENS|SOLDE    |MOIS_ANNEE|
+--------+--------------+----

In [44]:
df_merged_perimetre_solde.count()

10829478

In [45]:
from pyspark.sql.functions import when, col, date_format

# Appliquer les transformations sur le DataFrame fusionné
df_merged_perimetre_solde = df_merged_perimetre_solde.withColumn(
    "MARITAL_STATUS",
    when(df_merged_perimetre_solde["MARITAL_STATUS"] == "MariÃ© (e)", "Marié(e)")
    .when(df_merged_perimetre_solde["MARITAL_STATUS"] == "DivorcÃ© (e)", "Divorcé(e)")
    .otherwise(df_merged_perimetre_solde["MARITAL_STATUS"])
)

# Afficher le schéma et quelques lignes pour vérifier les transformations
df_merged_perimetre_solde.printSchema()
df_merged_perimetre_solde.show(10, truncate=False)


root
 |-- Idclient: string (nullable = true)
 |-- CUSTOMER_SINCE: string (nullable = true)
 |-- DATE_NAISSANCE: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- MARITAL_STATUS: string (nullable = true)
 |-- BPR: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- CUSTOMER_YEARS: integer (nullable = true)
 |-- region: string (nullable = false)
 |-- MVTDEB: string (nullable = true)
 |-- MVTCRED: string (nullable = true)
 |-- NBMVTCRE_MENS: string (nullable = true)
 |-- NBMVTDB_MENS: string (nullable = true)
 |-- SOLDE: string (nullable = true)
 |-- MOIS_ANNEE: string (nullable = true)

+--------+--------------+--------------+--------+--------------+---+---+--------------+--------+--------+---------+-------------+------------+---------+----------+
|Idclient|CUSTOMER_SINCE|DATE_NAISSANCE|genre   |MARITAL_STATUS|BPR|age|CUSTOMER_YEARS|region  |MVTDEB  |MVTCRED  |NBMVTCRE_MENS|NBMVTDB_MENS|SOLDE    |MOIS_ANNEE|
+--------+--------------+--------------+--------+

In [46]:
output_paths = {
    "cptferm": "hdfs://namenode:9000/mj/pfe/data_transformed/cptferm",
    "fermcpt": "hdfs://namenode:9000/mj/pfe/data_transformed/fermcpt",
    "gab_abstr": "hdfs://namenode:9000/mj/pfe/data_transformed/gab_abstr",
    "gab22": "hdfs://namenode:9000/mj/pfe/data_transformed/gab22",
    "getvir": "hdfs://namenode:9000/mj/pfe/data_transformed/getvir",
    "operations_agence": "hdfs://namenode:9000/mj/pfe/data_transformed/operations_agence",
    "perimetre": "hdfs://namenode:9000/mj/pfe/data_transformed/perimetre",
    "solde": "hdfs://namenode:9000/mj/pfe/data_transformed/solde",
    "statut_connexion": "hdfs://namenode:9000/mj/pfe/data_transformed/statut_connexion",
    "yousr": "hdfs://namenode:9000/mj/pfe/data_transformed/yousr",
    "merged_perimetre_solde": "hdfs://namenode:9000/mj/pfe/data_transformed/merged_perimetre_solde",
    "merged_cptferm_fermcpt": "hdfs://namenode:9000/mj/pfe/data_transformed/merged_cptferm_fermcpt"
}

# Sauvegarde de chaque DataFrame transformé au format CSV
for key, df in dataframes.items():
    try:
        df.write.csv(output_paths[key], mode='overwrite', header=True)
        print(f"DataFrame {key} sauvegardé au format CSV dans {output_paths[key]}")
    except Exception as e:
        print(f"Erreur lors de la sauvegarde de {key} au format CSV :", e)

# Sauvegarde des DataFrames fusionnés
try:
    df_merged_perimetre_solde.write.csv(output_paths['merged_perimetre_solde'], mode='overwrite', header=True)
    print("DataFrame merged_perimetre_solde sauvegardé au format CSV.")
except Exception as e:
    print("Erreur lors de la sauvegarde de merged_perimetre_solde au format CSV :", e)

try:
    df_merged_cptferm_fermcpt.write.csv(output_paths['merged_cptferm_fermcpt'], mode='overwrite', header=True)
    print("DataFrame merged_cptferm_fermcpt sauvegardé au format CSV.")
except Exception as e:
    print("Erreur lors de la sauvegarde de merged_cptferm_fermcpt au format CSV :", e)


DataFrame cptferm sauvegardé au format CSV dans hdfs://namenode:9000/mj/pfe/data_transformed/cptferm
DataFrame fermcpt sauvegardé au format CSV dans hdfs://namenode:9000/mj/pfe/data_transformed/fermcpt
DataFrame gab_abstr sauvegardé au format CSV dans hdfs://namenode:9000/mj/pfe/data_transformed/gab_abstr
DataFrame gab22 sauvegardé au format CSV dans hdfs://namenode:9000/mj/pfe/data_transformed/gab22
DataFrame getvir sauvegardé au format CSV dans hdfs://namenode:9000/mj/pfe/data_transformed/getvir
DataFrame operations_agence sauvegardé au format CSV dans hdfs://namenode:9000/mj/pfe/data_transformed/operations_agence
DataFrame perimetre sauvegardé au format CSV dans hdfs://namenode:9000/mj/pfe/data_transformed/perimetre
DataFrame solde sauvegardé au format CSV dans hdfs://namenode:9000/mj/pfe/data_transformed/solde
DataFrame statut_connexion sauvegardé au format CSV dans hdfs://namenode:9000/mj/pfe/data_transformed/statut_connexion
DataFrame yousr sauvegardé au format CSV dans hdfs://na

In [47]:
from pyspark.sql import SparkSession

# Créer une session Spark
spark = SparkSession.builder \
    .appName("Sauvegarde Parquet") \
    .getOrCreate()

# Chemins de sortie pour chaque DataFrame
output_paths = {
    "cptferm": "hdfs://namenode:9000/mj/pfe/data_parquet/cptferm",
    "fermcpt": "hdfs://namenode:9000/mj/pfe/data_parquet/fermcpt",
    "gab_abstr": "hdfs://namenode:9000/mj/pfe/data_parquet/gab_abstr",
    "gab22": "hdfs://namenode:9000/mj/pfe/data_parquet/gab22",
    "getvir": "hdfs://namenode:9000/mj/pfe/data_parquet/getvir",
    "operations_agence": "hdfs://namenode:9000/mj/pfe/data_parquet/operations_agence",
    "perimetre": "hdfs://namenode:9000/mj/pfe/data_parquet/perimetre",
    "solde": "hdfs://namenode:9000/mj/pfe/data_parquet/solde",
    "statut_connexion": "hdfs://namenode:9000/mj/pfe/data_parquet/statut_connexion",
    "yousr": "hdfs://namenode:9000/mj/pfe/data_parquet/yousr",
    "merged_cptferm_fermcpt": "hdfs://namenode:9000/mj/pfe/data_parquet/merged_cptferm_fermcpt",
    "merged_perimetre_solde": "hdfs://namenode:9000/mj/pfe/data_parquet/merged_perimetre_solde"
}

# Sauvegarde de chaque DataFrame transformé au format Parquet
for key, df in dataframes.items():
    try:
        df.write.parquet(output_paths[key], mode='overwrite')
        print(f"DataFrame {key} sauvegardé au format Parquet dans {output_paths[key]}")
    except Exception as e:
        print(f"Erreur lors de la sauvegarde de {key} au format Parquet :", e)


DataFrame cptferm sauvegardé au format Parquet dans hdfs://namenode:9000/mj/pfe/data_parquet/cptferm
DataFrame fermcpt sauvegardé au format Parquet dans hdfs://namenode:9000/mj/pfe/data_parquet/fermcpt
DataFrame gab_abstr sauvegardé au format Parquet dans hdfs://namenode:9000/mj/pfe/data_parquet/gab_abstr
DataFrame gab22 sauvegardé au format Parquet dans hdfs://namenode:9000/mj/pfe/data_parquet/gab22
DataFrame getvir sauvegardé au format Parquet dans hdfs://namenode:9000/mj/pfe/data_parquet/getvir
DataFrame operations_agence sauvegardé au format Parquet dans hdfs://namenode:9000/mj/pfe/data_parquet/operations_agence
DataFrame perimetre sauvegardé au format Parquet dans hdfs://namenode:9000/mj/pfe/data_parquet/perimetre
DataFrame solde sauvegardé au format Parquet dans hdfs://namenode:9000/mj/pfe/data_parquet/solde
DataFrame statut_connexion sauvegardé au format Parquet dans hdfs://namenode:9000/mj/pfe/data_parquet/statut_connexion
DataFrame yousr sauvegardé au format Parquet dans hdfs: