## Vérification des versions et test de Spark

In [None]:
# Report the Python, PySpark and Spark versions available to this kernel.
import sys
print("Version de Python :", sys.version)

import pyspark
print("Version de PySpark :", pyspark.__version__)

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
print("Version de Spark :", spark.version)

# Stop this probe session. It was built without an explicit master URL, and
# SparkSession.builder.getOrCreate() in later cells would hand it back as-is:
# static settings they request (.master(...), .enableHiveSupport()) are not
# applied to an already-running session.
spark.stop()

## Création d'une session de test

In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) a session bound to the standalone cluster master and run
# a tiny smoke-test job on it.
spark = (
    SparkSession.builder
    .appName("TestSpark")
    .master("spark://spark-master:7077")
    .getOrCreate()
)

df = spark.range(10)  # single "id" column holding 0..9
df.show()

## Extraction du dataset et sauvegarde dans HDFS au format parquet

In [None]:
from pyspark.sql import SparkSession

# Bronze ingestion: load the raw CSV extract and persist it unchanged to HDFS as Parquet.
spark = (
    SparkSession.builder
    .appName("CSV to Parquet")
    .master("spark://spark-master:7077")
    .getOrCreate()
)

# NOTE(review): this is a local filesystem path while the master is a standalone
# cluster; executors read it from their own filesystems, so confirm the file is
# present (or shared) on every worker.
csv_file_path = "/home/jovyan/work/datasets/train.csv"

# header=True takes column names from the first row; inferSchema=True does an
# extra pass over the data to guess column types.
df = spark.read.csv(csv_file_path, header=True, inferSchema=True)

print("Noms des colonnes:", df.columns)

df.show(5)

parquet_output_path = "hdfs://namenode:8020/user/jovyan/bronze/train.parquet"

# Overwrite so the cell can be re-run: the default save mode ("errorifexists")
# fails as soon as the target directory already exists.
df.write.mode("overwrite").parquet(parquet_output_path)
print("Sauvegade du dataset avec succès dans hdfs")

## Chargement du dataset brut depuis HDFS, nettoyage et stockage des données nettoyées dans une table Hive

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import trim, col
from pyspark.sql.functions import when
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler

# Hive support is a static setting. getOrCreate() would silently reuse a session
# started by an earlier cell without it (and then saveAsTable would not target
# the Hive metastore), so stop any active session before building this one.
_active_session = SparkSession.getActiveSession()
if _active_session is not None:
    _active_session.stop()

spark = (
    SparkSession.builder
    .appName("Parquet Cleaning")
    .master("spark://spark-master:7077")
    .enableHiveSupport()
    .getOrCreate()
)

# Same NameNode endpoint (port 8020) the bronze layer is written to and the
# silver layer is written to below.
parquet_file_path = "hdfs://namenode:8020/user/jovyan/bronze/train.parquet"

df = spark.read.parquet(parquet_file_path)
print("Chargement du fichier avec succès depuis hdfs")

# Data cleaning

# 1. Trim leading/trailing whitespace in every string-typed column.
string_columns = [col_name for col_name, dtype in df.dtypes if dtype == "string"]
df_cleaned = df.select([trim(col(c)).alias(c) if c in string_columns else col(c) for c in df.columns])

# 2. Missing values: fill nulls with 0 (an int fill value only applies to
#    numeric columns; string columns are left as-is).
# NOTE(review): this runs before the casts below, so strings that fail to cast
# (Spark yields null for them when ANSI mode is off) stay null — confirm intended.
df_cleaned = df_cleaned.na.fill(0)

# 3. Cast these columns to int.
int_columns = ['ID', 'Customer_ID', 'Age', 'SSN', 'Num_Bank_Accounts', 'Num_Credit_Card',
               'Interest_Rate', 'Num_of_Loan', 'Delay_from_due_date', 'Num_of_Delayed_Payment',
               'Num_Credit_Inquiries']
df_cleaned = df_cleaned.select([col(c).cast("int").alias(c) if c in int_columns else col(c) for c in df_cleaned.columns])

# Cast these columns to float.
float_columns = ['Annual_Income', 'Monthly_Inhand_Salary', 'Changed_Credit_Limit', 'Outstanding_Debt',
                 'Credit_Utilization_Ratio', 'Total_EMI_per_month', 'Amount_invested_monthly', 'Monthly_Balance',
                 'Credit_History_Age']
df_cleaned = df_cleaned.select([col(c).cast("float").alias(c) if c in float_columns else col(c) for c in df_cleaned.columns])

# Keep one row per (ID, Customer_ID, Name) combination.
df_cleaned = df_cleaned.dropDuplicates(subset=['ID', 'Customer_ID', 'Name'])

# Age buckets. No .otherwise(): rows with a null Age (missing or failed cast)
# get a null Age_Group instead of being mislabelled "Senior".
age_col = col("Age")
df_cleaned = df_cleaned.withColumn(
    "Age_Group",
    when(age_col < 25, "Young")
    .when((age_col >= 25) & (age_col <= 35), "Adult")
    .when((age_col > 35) & (age_col <= 50), "Middle-Aged")
    .when(age_col > 50, "Senior"),
)


df_cleaned.printSchema()

df_cleaned.show(5)

cleaned_parquet_output_path = "hdfs://namenode:8020/user/jovyan/silver/cleaned_train.parquet"

# Persist the cleaned ("silver") DataFrame to HDFS as Parquet, replacing any previous output.
df_cleaned.write.mode("overwrite").parquet(cleaned_parquet_output_path)

print(f"Dataset nettoyé sauvegardé avec succès dans {cleaned_parquet_output_path}")


In [None]:


# Persist the cleaned DataFrame as a managed table in the session's catalog,
# replacing the table if it already exists.
hive_table_name = "cleaned_train_data"

df_cleaned.write.saveAsTable(hive_table_name, mode="overwrite")

print(f"Dataset nettoyé sauvegardé avec succès dans la table Hive : {hive_table_name}")


## Partitionnement de notre data warehouse en datamarts dans une base de données MySQL

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from sqlalchemy import create_engine
!pip install mysql-connector-python


spark = SparkSession.builder \
    .appName("Datamart Cleaning and Storage") \
    .master("spark://spark-master:7077") \
    .getOrCreate()

csv_file_path = "/home/jovyan/work/datasets/cleaned_dataset.csv"
df = spark.read.csv(csv_file_path, header=True, inferSchema=True)

# Datamart des Profils Clients
datamart_clients = df.select(
    'Customer_ID', 'Name', 'Age', 'SSN', 'Occupation', 'Annual_Income', 'Monthly_Inhand_Salary'
)

# Datamart des Comptes et Utilisation du Crédit
datamart_credit = df.select(
    'Customer_ID', 'Num_Bank_Accounts', 'Num_Credit_Card', 'Interest_Rate', 
    'Num_of_Loan', 'Delay_from_due_date', 'Num_of_Delayed_Payment', 
    'Credit_Utilization_Ratio', 'Outstanding_Debt'
)

# Datamart du Risque de Crédit
datamart_risk = df.select(
    'Customer_ID', 'Credit_Mix', 'Payment_of_Min_Amount', 'Total_EMI_per_month', 
    'Payment_Behaviour', 'Credit_Score', 'Credit_History_Age'
)

# Datamart des Transactions et Investissements
datamart_transactions = df.select(
    'Customer_ID', 'Month', 'Amount_invested_monthly', 'Monthly_Balance', 
    'Outstanding_Debt', 'Payment_Behaviour'
)

df_clients_pd = datamart_clients.toPandas()
df_credit_pd = datamart_credit.toPandas()
df_risk_pd = datamart_risk.toPandas()
df_transactions_pd = datamart_transactions.toPandas()

# Connexion à la base de données MySQL avec les informations de notre config Docker
engine = create_engine('mysql+mysqlconnector://root:rootpassword@mysql:3306/hive')

# Sauvegarde des datamarts dans la base de données MySQL
df_clients_pd.to_sql(name='datamart_clients', con=engine, if_exists='replace', index=False)
df_credit_pd.to_sql(name='datamart_credit', con=engine, if_exists='replace', index=False)
df_risk_pd.to_sql(name='datamart_risk', con=engine, if_exists='replace', index=False)
df_transactions_pd.to_sql(name='datamart_transactions', con=engine, if_exists='replace', index=False)

print("Datamarts sauvegardés avec succès dans la base de données MySQL")
