# Mini-projet Machine learning avec MLLIB

Le problème de prédiction de l'attrition des employés consiste à prédire si un employé quittera ou non son poste. Cela peut être utile pour les entreprises afin de prendre des mesures préventives pour retenir leurs employés précieux et réduire le taux d'attrition. L'attrition peut être coûteuse pour les entreprises en termes de recrutement et de formation de nouveaux employés, ainsi que de la perte de connaissances et d'expertise. L'utilisation de Spark MLLIB permet de construire des modèles prédictifs à grande échelle pour résoudre ce problème. En utilisant des techniques telles que l'encodage des caractéristiques, la visualisation des données et la construction de modèles Random Forests et Gradient Boosting, les data scientists peuvent développer des modèles prédictifs précis qui peuvent aider les entreprises à réduire le taux d'attrition et à améliorer la rétention des employés.

# 1. Dataset :*Attrition et performance des employés IBM HR Analytics Prévoyez l'attrition de vos précieux employés
Il s'agit d'un ensemble de données fictif créé par les data scientists d'IBM. Nous devons explorer l'ensemble de données, comprendre les algorithmes et les techniques qui peuvent y être appliqués. Nous essaierons d'obtenir des informations significatives à partir de l'ensemble de données, comme quels sont les facteurs qui ont un impact sur l'attrition des employés.

# 2. Travail demandé :

# 1. Importer les données : Tout d'abord, il est nécessaire d'importer les données dans un format exploitable par Spark MLLIB. Les données peuvent être stockées dans des fichiers CSV, des bases de données ou des formats de fichiers parquet, entre autres.

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col
from pyspark.ml import Pipeline
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, MinMaxScaler, ChiSqSelector
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier, LogisticRegression, DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

In [None]:
# Création de l'application Spark avec le nom PAE - Prédiction Attrition Employés - Merouane Bennaceur V1.0
spark = SparkSession.builder.appName("PAE - Prédiction Attrition Employés - Merouane Bennaceur V1.0").getOrCreate()

# Charger les données
data = spark.read.csv("HR-Employee-Attrition.csv", header=True, inferSchema=True)

# 2. Explorer les données : Une fois les données importées, il est important d'en explorer la structure et les caractéristiques. Cela peut inclure l'analyse des valeurs manquantes, des distributions de variables et de l'existence de corrélations entre les variables.

In [None]:
# Afficher les 10 premières lignes
print("\n\n")
print("Affichage des 10 premières lignes")
data.show(10)

# Afficher le schéma
print("\n\n")
print("Affichage du schéma")
data.printSchema()

# Statistiques descriptives
print("\n\n")
print("Statistiques descriptives")
data.describe().show()

print("\n\n")
data_summary = data.describe().toPandas()
print(data_summary)

# Vérification des valeurs manquantes
# Identifiecation des colonnes avec des valeurs manquantes
print("\n\n")
print("Valeurs manquantes par colonnes")
data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data.columns]).show()


# Analyse des distributions de variables
# Exemple avec age
print("\n\n")
print("Histogramme de distribution de la variable age")
col_name = "age"
col_data = data.select(col_name).toPandas()
sns.histplot(col_data[col_name], kde=True)
plt.show()


# Examiner les corrélations entre les variables et visualiser les relations
# Création d'un vecteur avec les colonnes numériques
vector_col = "corr_features"
assembler = VectorAssembler(inputCols=["Age", "DailyRate"], outputCol=vector_col)
data_vector = assembler.transform(data).select(vector_col)

# Calculer la matrice de corrélation
matrix = Correlation.corr(data_vector, vector_col).collect()[0][0]
correlation_matrix = matrix.toArray().tolist()
print("\n\n")
print("Matrice de corrélation entre Age et DailyRate")
print(correlation_matrix)

# Exemple pour un nuage de points entre deux variables numériques (Age, DailyRate)
x_col = "Age"
y_col = "DailyRate"
scatter_data = data.select(x_col, y_col).toPandas()
sns.scatterplot(data=scatter_data, x=x_col, y=y_col)
print("\n\n")
print("Nuage de points de corrélation entre Age et DailyRate")
plt.show()

# 3. Nettoyer les données : Après l'exploration des données, il est possible que certaines données soient incohérentes, mal formatées ou contiennent des valeurs aberrantes. Dans ce cas, il est important de nettoyer les données avant de les utiliser pour entraîner un modèle.

In [None]:
# On va supprimer les valeurs aberrantes les colonnes numériques en utilisant l'intervalle interquartile (IQR).

# Liste des colonnes numériques
numerical_columns = ["Age", "DailyRate", "DistanceFromHome", "Education", "EnvironmentSatisfaction", "HourlyRate", "JobInvolvement", "JobLevel", "JobSatisfaction", "MonthlyIncome", "MonthlyRate", "NumCompaniesWorked", "PercentSalaryHike", "PerformanceRating", "RelationshipSatisfaction", "StockOptionLevel", "TotalWorkingYears", "TrainingTimesLastYear", "WorkLifeBalance", "YearsAtCompany", "YearsInCurrentRole", "YearsSinceLastPromotion", "YearsWithCurrManager"]

data_cleaned = data

for column in numerical_columns:
    Q1 = data.approxQuantile(column, [0.25], 0.05)[0]
    Q3 = data.approxQuantile(column, [0.75], 0.05)[0]
    IQR = Q3 - Q1
    lower_range = Q1 - 1.5 * IQR
    upper_range = Q3 + 1.5 * IQR

    data_cleaned = data_cleaned.filter((col(column) >= lower_range) & (col(column) <= upper_range))

# Affichage dataset nettoyé
print("\n\n")
print("Affichage dataset nettoyé")
data_cleaned.show()

# 4. Préparer les données : Selon le modèle que vous souhaitez utiliser, vous devrez peut-être préparer les données pour le rendre compatible avec ce modèle. Cela peut inclure la normalisation des variables, la transformation des variables catégorielles en variables numériques et la séparation des données en ensembles de formation et de test.

In [None]:
# Normalisation de toutes les colonnes numériques et transformation des colonnes discrettes en variables numériques (one-hot encoding).

# Liste des colonnes discrétes
categorical_columns = ["BusinessTravel", "Department", "EducationField", "Gender", "JobRole", "MaritalStatus", "OverTime"]

# Stages pour le pipeline
stages = []

# One-hot encoding pour les colonnes catégorielles
for column in categorical_columns:
    indexer = StringIndexer(inputCol=column, outputCol=f"{column}Index")
    encoder = OneHotEncoder(inputCol=f"{column}Index", outputCol=f"{column}Vec")
    stages += [indexer, encoder]

# VectorAssembler et MinMaxScaler pour les colonnes numériques
for column in numerical_columns:
    assembler = VectorAssembler(inputCols=[column], outputCol=f"{column}Vec")
    scaler = MinMaxScaler(inputCol=f"{column}Vec", outputCol=f"{column}Scaled")
    stages += [assembler, scaler]

# Créer et appliquer un pipeline
pipeline = Pipeline(stages=stages)
data_prepared = pipeline.fit(data_cleaned).transform(data_cleaned)

# Affichage dataset préparé
print("\n\n")
print("Affichage dataset préparé")
data_prepared.show()

# 5. Sélectionner les fonctionnalités : Si les données contiennent de nombreuses variables, vous pouvez envisager de sélectionner les fonctionnalités qui sont les plus pertinentes pour votre modèle. Cela peut aider à améliorer la précision de votre modèle et à réduire le temps nécessaire pour l'entraîner.

In [None]:
# Utilisation méthode Chi-Squared pour sélectionner les fonctionnalités les plus pertinentes pour le modèle. 
# pour sélectionner les 10 fonctionnalités les + importantes parmi les colonnes discrétes et numériques prétraitées.

# Combinez toutes les colonnes d'entités en un seul vecteur
input_cols = [f"{col}Scaled" for col in numerical_columns] + [f"{col}Vec" for col in categorical_columns]
assembler = VectorAssembler(inputCols=input_cols, outputCol="features")
data_features = assembler.transform(data_prepared)

# Indexez la colonne Attrition (cible)
indexer = StringIndexer(inputCol="Attrition", outputCol="label")
data_features = indexer.fit(data_features).transform(data_features)

# Utilisation ChiSqSelector pour sélectionner les fonctionnalités les plus pertinentes
selector = ChiSqSelector(numTopFeatures=10, featuresCol="features", outputCol="selectedFeatures", labelCol="label")
selected_data = selector.fit(data_features).transform(data_features)
selected_data = selected_data.select("features", "selectedFeatures", "label")

# Affichage dataset avec les fonctionnalités pertinentes
print("\n\n")
print("Affichage dataset avec les fonctionnalités pertinentes")
selected_data.show()

# 6. Équilibrer les données : Si les données contiennent un déséquilibre entre les classes, il peut être nécessaire d'équilibrer les données en utilisant des techniques telles que SMOTE (Synthetic Minority Over-sampling Technique) pour augmenter le nombre d'échantillons de la classe minoritaire.

In [None]:
# Calculez le taux d'équilibrage
attrition_counts = selected_data.groupBy("label").count().collect()
minority_count = min(attrition_counts, key=lambda x: x["count"])["count"]
majority_count = max(attrition_counts, key=lambda x: x["count"])["count"]
balancing_ratio = minority_count / majority_count

# Équilibrez les données
majority_label = max(attrition_counts, key=lambda x: x["count"])["label"]
minority_label = min(attrition_counts, key=lambda x: x["count"])["label"]

majority_data = selected_data.filter(col("label") == majority_label)
minority_data = selected_data.filter(col("label") == minority_label)

majority_data_downsampled = majority_data.sample(withReplacement=False, fraction=balancing_ratio, seed=42)
balanced_data = majority_data_downsampled.union(minority_data)

print("\n\n")
print("Affichage dataset équilibré")
balanced_data.show()

# 7. Diviser les données : Une fois que les données sont nettoyées et préparées, il est important de diviser les données en ensembles de formation et de test. L'ensemble de formation est utilisé pour entraîner le modèle, tandis que l'ensemble de test est utilisé pour évaluer les performances du modèle.

In [None]:
train_data, test_data = balanced_data.randomSplit([0.8, 0.2], seed=42)

# 8. Entraîner le modèle : Utilisez l'ensemble de formation pour entraîner le modèle en utilisant l'algorithme approprié. Les algorithmes populaires pour la classification incluent Random Forest, Gradient Boosting, Logistic Regression et Decision Tree.

In [None]:
# Random Forest
rf = RandomForestClassifier(featuresCol="selectedFeatures", labelCol="label", numTrees=100)
rf_model = rf.fit(train_data)


# Gradient Boosting
gbt = GBTClassifier(featuresCol="selectedFeatures", labelCol="label", maxIter=100)
gbt_model = gbt.fit(train_data)


# Logistic Regression:
lr = LogisticRegression(featuresCol="selectedFeatures", labelCol="label", maxIter=100)
lr_model = lr.fit(train_data)

# Decision Tree:
dt = DecisionTreeClassifier(featuresCol="selectedFeatures", labelCol="label")
dt_model = dt.fit(train_data)

# 9. Évaluer le modèle : Une fois que le modèle est entraîné, utilisez l'ensemble de test pour évaluer ses performances. Les métriques courantes pour évaluer les performances d'un modèle de classification incluent la précision, le rappel, le score F1 et la courbe ROC.

In [None]:
# Fonction pour évaluer les modèles
def evaluate_model(model, test_data):
    predictions = model.transform(test_data)
    evaluator = BinaryClassificationEvaluator()
    roc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
    predictionAndLabels = predictions.select("prediction", "label").rdd.map(lambda p: (float(p[0]), float(p[1])))
    metrics = MulticlassMetrics(predictionAndLabels)
    precision = metrics.precision(1.0)
    recall = metrics.recall(1.0)
    f1_score = metrics.fMeasure(1.0)
    return roc, precision, recall, f1_score

# Évaluation des modèles
models = [("Random Forest", rf_model), ("Gradient Boosting", gbt_model), ("Logistic Regression", lr_model), ("Decision Tree", dt_model)]

for model_name, model in models:
    roc, precision, recall, f1_score = evaluate_model(model, test_data)
    print(f"{model_name}:")
    print(f"  Précision: {precision}")
    print(f"  Rappel: {recall}")
    print(f"  Score F1: {f1_score}")
    print(f"  ROC: {roc}\n")

# 10. Optimiser le modèle : Si le modèle ne répond pas aux attentes, il est possible que les paramètres du modèle doivent être optimisés pour obtenir de meilleures performances. Cela peut inclure l'ajustement des hyperparamètres, la modification de l'algorithme ou la sélection de différentes fonctionnalités.

In [None]:
# Pour optimiser les modèles on peut utiliser la validation croisée avec un ensemble de paramètres à tester. 
# Par exemple avec la régression logistique car c'est elle qui a obtenu les meilleurs évaluations.
# On peut faire l'optimisation du modèle avec validation croisée et recherche sur grille.

paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.1, 0.01, 0.001]).addGrid(lr.fitIntercept, [True, False]).addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]).build()
cross_val = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=BinaryClassificationEvaluator(), numFolds=5)
cv_model = cross_val.fit(train_data)
best_lr_model = cv_model.bestModel

# Évaluer le modèle optimisé
roc, precision, recall, f1_score = evaluate_model(best_lr_model, test_data)
print("Logistic Regression (Optimisé):")
print(f"  Précision: {precision}")
print(f"  Rappel: {recall}")
print(f"  Score F1: {f1_score}")
print(f"  ROC: {roc}\n")

# 11. Appliquer le modèle : Une fois que le modèle est entraîné et testé, il peut être appliqué aux nouvelles données pour faire des prédictions sur la probabilité d'attrition d'un employé.

In [None]:
def preprocess_data(filename):
    data = spark.read.csv(filename, header=True, inferSchema=True)
    data_cleaned = data
    numerical_columns = ["Age", "DailyRate", "DistanceFromHome", "Education", "EnvironmentSatisfaction", "HourlyRate", "JobInvolvement", "JobLevel", "JobSatisfaction", "MonthlyIncome", "MonthlyRate", "NumCompaniesWorked", "PercentSalaryHike", "PerformanceRating", "RelationshipSatisfaction", "StockOptionLevel", "TotalWorkingYears", "TrainingTimesLastYear", "WorkLifeBalance", "YearsAtCompany", "YearsInCurrentRole", "YearsSinceLastPromotion", "YearsWithCurrManager"]
    categorical_columns = ["BusinessTravel", "Department", "EducationField", "Gender", "JobRole", "MaritalStatus", "OverTime"]
    for column in numerical_columns:
        Q1 = data.approxQuantile(column, [0.25], 0.05)[0]
        Q3 = data.approxQuantile(column, [0.75], 0.05)[0]
        IQR = Q3 - Q1
        lower_range = Q1 - 1.5 * IQR
        upper_range = Q3 + 1.5 * IQR
        data_cleaned = data_cleaned.filter((col(column) >= lower_range) & (col(column) <= upper_range))
    stages = []
    for column in categorical_columns:
        indexer = StringIndexer(inputCol=column, outputCol=f"{column}Index")
        encoder = OneHotEncoder(inputCol=f"{column}Index", outputCol=f"{column}Vec")
        stages += [indexer, encoder]
    for column in numerical_columns:
        assembler = VectorAssembler(inputCols=[column], outputCol=f"{column}Vec")
        scaler = MinMaxScaler(inputCol=f"{column}Vec", outputCol=f"{column}Scaled")
        stages += [assembler, scaler]
    pipeline = Pipeline(stages=stages)
    data_prepared = pipeline.fit(data_cleaned).transform(data_cleaned)
    input_cols = [f"{col}Scaled" for col in numerical_columns] + [f"{col}Vec" for col in categorical_columns]
    assembler = VectorAssembler(inputCols=input_cols, outputCol="features")
    data_features = assembler.transform(data_prepared)
    indexer = StringIndexer(inputCol="Attrition", outputCol="label")
    data_features = indexer.fit(data_features).transform(data_features)
    selector = ChiSqSelector(numTopFeatures=10, featuresCol="features", outputCol="selectedFeatures", labelCol="label")
    selected_data = selector.fit(data_features).transform(data_features)
    attrition_counts = selected_data.groupBy("label").count().collect()
    minority_count = min(attrition_counts, key=lambda x: x["count"])["count"]
    majority_count = max(attrition_counts, key=lambda x: x["count"])["count"]
    balancing_ratio = minority_count / majority_count
    majority_label = max(attrition_counts, key=lambda x: x["count"])["label"]
    minority_label = min(attrition_counts, key=lambda x: x["count"])["label"]
    majority_data = selected_data.filter(col("label") == majority_label)
    minority_data = selected_data.filter(col("label") == minority_label)
    majority_data_downsampled = majority_data.sample(withReplacement=False, fraction=balancing_ratio, seed=42)
    balanced_data = majority_data_downsampled.union(minority_data)

    return balanced_data

In [None]:
new_data = preprocess_data("New-Employee-Data.csv")
new_data.show()

new_data_features = new_data

predictions = best_lr_model.transform(new_data_features)
predictions.select("label", "prediction", "probability").show()

#  11.2 tester avec un input (ligne) sous forme de texte en entrée

In [None]:
schema = StructType([
    StructField("Age", IntegerType(), True),
    StructField("Attrition", StringType(), True),
    StructField("BusinessTravel", StringType(), True),
    StructField("DailyRate", IntegerType(), True),
    StructField("Department", StringType(), True),
    StructField("DistanceFromHome", IntegerType(), True),
    StructField("Education", IntegerType(), True),
    StructField("EducationField", StringType(), True),
    StructField("EmployeeCount", IntegerType(), True),
    StructField("EmployeeNumber", IntegerType(), True),
    StructField("EnvironmentSatisfaction", IntegerType(), True),
    StructField("Gender", StringType(), True),
    StructField("HourlyRate", IntegerType(), True),
    StructField("JobInvolvement", IntegerType(), True),
    StructField("JobLevel", IntegerType(), True),
    StructField("JobRole", StringType(), True),
    StructField("JobSatisfaction", IntegerType(), True),
    StructField("MaritalStatus", StringType(), True),
    StructField("MonthlyIncome", IntegerType(), True),
    StructField("MonthlyRate", IntegerType(), True),
    StructField("NumCompaniesWorked", IntegerType(), True),
    StructField("Over18", StringType(), True),
    StructField("OverTime", StringType(), True),
    StructField("PercentSalaryHike", IntegerType(), True),
    StructField("PerformanceRating", IntegerType(), True),
    StructField("RelationshipSatisfaction", IntegerType(), True),
    StructField("StandardHours", IntegerType(), True),
    StructField("StockOptionLevel", IntegerType(), True),
    StructField("TotalWorkingYears", IntegerType(), True),
    StructField("TrainingTimesLastYear", IntegerType(), True),
    StructField("WorkLifeBalance", IntegerType(), True),
    StructField("YearsAtCompany", IntegerType(), True),
    StructField("YearsInCurrentRole", IntegerType(), True),
    StructField("YearsSinceLastPromotion", IntegerType(), True),
    StructField("YearsWithCurrManager", IntegerType(), True),
])

def preprocess_data(filename):
    data = spark.read.csv(filename, header=True, inferSchema=True)
    data_cleaned = data
    numerical_columns = ["Age", "DailyRate", "DistanceFromHome", "Education", "EnvironmentSatisfaction", "HourlyRate", "JobInvolvement", "JobLevel", "JobSatisfaction", "MonthlyIncome", "MonthlyRate", "NumCompaniesWorked", "PercentSalaryHike", "PerformanceRating", "RelationshipSatisfaction", "StockOptionLevel", "TotalWorkingYears", "TrainingTimesLastYear", "WorkLifeBalance", "YearsAtCompany", "YearsInCurrentRole", "YearsSinceLastPromotion", "YearsWithCurrManager"]
    categorical_columns = ["BusinessTravel", "Department", "EducationField", "Gender", "JobRole", "MaritalStatus", "OverTime"]
    for column in numerical_columns:
        Q1 = data.approxQuantile(column, [0.25], 0.05)[0]
        Q3 = data.approxQuantile(column, [0.75], 0.05)[0]
        IQR = Q3 - Q1
        lower_range = Q1 - 1.5 * IQR
        upper_range = Q3 + 1.5 * IQR
        data_cleaned = data_cleaned.filter((col(column) >= lower_range) & (col(column) <= upper_range))
    stages = []
    for column in categorical_columns:
        indexer = StringIndexer(inputCol=column, outputCol=f"{column}Index")
        encoder = OneHotEncoder(inputCol=f"{column}Index", outputCol=f"{column}Vec")
        stages += [indexer, encoder]
    for column in numerical_columns:
        assembler = VectorAssembler(inputCols=[column], outputCol=f"{column}Vec")
        scaler = MinMaxScaler(inputCol=f"{column}Vec", outputCol=f"{column}Scaled")
        stages += [assembler, scaler]
    pipeline = Pipeline(stages=stages)
    data_prepared = pipeline.fit(data_cleaned).transform(data_cleaned)
    input_cols = [f"{col}Scaled" for col in numerical_columns] + [f"{col}Vec" for col in categorical_columns]
    assembler = VectorAssembler(inputCols=input_cols, outputCol="features")
    data_features = assembler.transform(data_prepared)
    indexer = StringIndexer(inputCol="Attrition", outputCol="label")
    data_features = indexer.fit(data_features).transform(data_features)
    selector = ChiSqSelector(numTopFeatures=10, featuresCol="features", outputCol="selectedFeatures", labelCol="label")
    selected_data = selector.fit(data_features).transform(data_features)
    attrition_counts = selected_data.groupBy("label").count().collect()
    minority_count = min(attrition_counts, key=lambda x: x["count"])["count"]
    majority_count = max(attrition_counts, key=lambda x: x["count"])["count"]
    balancing_ratio = minority_count / majority_count
    majority_label = max(attrition_counts, key=lambda x: x["count"])["label"]
    minority_label = min(attrition_counts, key=lambda x: x["count"])["label"]
    majority_data = selected_data.filter(col("label") == majority_label)
    minority_data = selected_data.filter(col("label") == minority_label)
    majority_data_downsampled = majority_data.sample(withReplacement=False, fraction=balancing_ratio, seed=42)
    balanced_data = majority_data_downsampled.union(minority_data)

    return balanced_data, majority_data_downsampled, minority_data

def preprocess_input(data, majority_data_downsampled, minority_data):
    data_cleaned = data
    numerical_columns = ["Age", "DailyRate", "DistanceFromHome", "Education", "EnvironmentSatisfaction", "HourlyRate", "JobInvolvement", "JobLevel", "JobSatisfaction", "MonthlyIncome", "MonthlyRate", "NumCompaniesWorked", "PercentSalaryHike", "PerformanceRating", "RelationshipSatisfaction", "StockOptionLevel", "TotalWorkingYears", "TrainingTimesLastYear", "WorkLifeBalance", "YearsAtCompany", "YearsInCurrentRole", "YearsSinceLastPromotion", "YearsWithCurrManager"]
    categorical_columns = ["BusinessTravel", "Department", "EducationField", "Gender", "JobRole", "MaritalStatus", "OverTime"]
    for column in numerical_columns:
        Q1 = data.approxQuantile(column, [0.25], 0.05)[0]
        Q3 = data.approxQuantile(column, [0.75], 0.05)[0]
        IQR = Q3 - Q1
        lower_range = Q1 - 1.5 * IQR
        upper_range = Q3 + 1.5 * IQR
        data_cleaned = data_cleaned.filter((col(column) >= lower_range) & (col(column) <= upper_range))
    stages = []
    for column in categorical_columns:
        indexer = StringIndexer(inputCol=column, outputCol=f"{column}Index")
        assembler = VectorAssembler(inputCols=[f"{column}Index"], outputCol=f"{column}Vec")
        stages += [indexer, assembler]
    for column in numerical_columns:
        assembler = VectorAssembler(inputCols=[column], outputCol=f"{column}Vec")
        scaler = MinMaxScaler(inputCol=f"{column}Vec", outputCol=f"{column}Scaled")
        stages += [assembler, scaler]
    pipeline = Pipeline(stages=stages)
    data_prepared = pipeline.fit(data_cleaned).transform(data_cleaned)
    input_cols = [f"{col}Scaled" for col in numerical_columns] + [f"{col}Vec" for col in categorical_columns]
    assembler = VectorAssembler(inputCols=input_cols, outputCol="features")
    data_features = assembler.transform(data_prepared)
    indexer = StringIndexer(inputCol="Attrition", outputCol="label")
    data_features = indexer.fit(data_features).transform(data_features)
    selector = ChiSqSelector(numTopFeatures=10, featuresCol="features", outputCol="selectedFeatures", labelCol="label")
    selected_data = selector.fit(data_features).transform(data_features)
    balanced_data = majority_data_downsampled.union(minority_data)
    return balanced_data


balanced_data, majority_data_downsampled, minority_data = preprocess_data("HR-Employee-Attrition.csv")


# Un input en entrée
input_str = "34,No,Travel_Rarely,628,Research & Development,8,3,Medical,1,2068,2,Male,82,4,2,Laboratory Technician,3,Married,4404,10228,2,Y,No,12,3,1,80,0,6,3,4,4,3,1,2"
input_list = input_str.split(",")
input_list = [int(x) if x.isdigit() else x for x in input_list]

# créer un dataframe Spark à partir de l'input
input_df = spark.createDataFrame([input_list], schema=schema)

# Préparer les données d'entrée en utilisant le pipeline
input_features = preprocess_input(input_df, majority_data_downsampled, minority_data)

# Appliquer le modèle optimisé (par exemple, best_lr_model) aux données d'entrée
prediction = best_lr_model.transform(input_features)

result = prediction.select("prediction", "probability").first().asDict()
print("Résultat de la prédiciton (0.0 pour No Attrition et 1.0 pour Yes Attrition): " + str(result["prediction"]))