## Projet : Employee-Attrition

In [3]:
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier,DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import stddev, avg

## chargement du dataset

In [4]:
spark = SparkSession.builder.appName('attrition').getOrCreate()
file_location = "HR-Employee-Attrition.csv"
file_type = "csv"
# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","
# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
.option("inferSchema", infer_schema) \
.option("header", first_row_is_header) \
.option("sep", delimiter) \
.load(file_location)

df = df.drop("Over18")
df = df.drop("EmployeeCount")
df = df.drop("StandardHours")

## grid pour les modeles

In [5]:
gbt = GBTClassifier(featuresCol='features', labelCol='label')
param_grid_gbt = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [2, 4, 6]) \
    .addGrid(gbt.maxBins, [20, 30]) \
    .addGrid(gbt.maxIter, [10, 20, 30]) \
    .build()

rf = RandomForestClassifier(featuresCol='features', labelCol='label')
param_grid_rf = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [2]) \
    .addGrid(rf.maxBins, [20]) \
    .addGrid(rf.numTrees, [10]) \
    .build()

lr = LogisticRegression(featuresCol='features', labelCol='label')
param_grid_lr = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.05, 0.1]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

## fonctions

In [6]:
def convert_to_int(df):
    # convertir les colonnes string en double quand cela est possible
    for column in df.columns:
        try:
            if int(df.select(column).first()[0]):
                df = df.withColumn(column, col(column).cast("double"))
        except:
            pass
    return df
        
def remove_outliers(df):

    # Boucle pour enlever les valeurs aberrantes de chaque colonne
    for col in df.columns:
        # Calcul des statistiques pour la colonne
        stats = df.select(avg(col), stddev(col)).first()
        mean = stats[0]
        std = stats[1]  
        if mean is not None and std is not None:
            # Calcul du seuil pour déterminer les valeurs aberrantes
            threshold = 3 * std + mean     
            # Suppression des valeurs aberrantes pour la colonne
            df = df.filter(df[col] <= threshold)
    return df


def assembler_for_pipeline(df, label):
    categoricalColumns = [col for (col, dtype) in df.dtypes if dtype == "string" and col != label]
    stages = []

    for categoricalCol in categoricalColumns:
        stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
        encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
        stages += [stringIndexer, encoder]
    
    label_stringIdx = StringIndexer(inputCol=label, outputCol="label")
    stages += [label_stringIdx]

    numericCols = [col for (col, dtype) in df.dtypes if dtype.startswith('double') and col != "label"]
    assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
    assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
    return assembler, stages

def model_for_pipeline(classifier, param_grid):
    evaluator = BinaryClassificationEvaluator(labelCol='label')
    cv = CrossValidator(estimator=classifier, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=3)
    return cv

def view_feature_importances(df):
    
    assembler, stages = assembler_for_pipeline(df, 'Attrition')
    rf = RandomForestClassifier(labelCol="label", featuresCol="features")
    paramGrid = ParamGridBuilder() \
        .addGrid(rf.numTrees, [10, 20]) \
        .addGrid(rf.maxDepth, [5, 10]) \
        .build()
    
    evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
    train, test = df.randomSplit([0.8, 0.2])
    stages += [assembler, rf]
    pipeline = Pipeline(stages=stages)
    
    pipelineModel = pipeline.fit(train)
    predictions = pipelineModel.transform(test)
    auc = evaluator.evaluate(predictions)
    print("AUC-ROC = %g" % auc)
    
    importances = pipelineModel.stages[-1].featureImportances.toArray()
    cols = df.columns
    selectedcols = ["label", "features"] + cols
    cols_importances = list(zip(selectedcols, importances))
    sorted_cols_importances = sorted(cols_importances, key=lambda x: x[1], reverse=True)
    print("Colonnes triées par ordre d'importance décroissante :")
    for col, importance in sorted_cols_importances:
        print(col, "=", importance)
        
    return sorted_cols_importances
        
def pipelinedata(df, assembler, stages, model):
    
    stages += [assembler, model]
    train, test = df.randomSplit([0.8, 0.2])
    pipeline = Pipeline(stages=stages)
    pipelineModel = pipeline.fit(train)
    predictions = pipelineModel.transform(test)
    predictions.take(1)
    selected = predictions.select("label", "prediction", "rawPrediction", "probability")
    display(selected)
    return model, predictions, selected

def metrics(predictions, model):

    evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label", metricName="areaUnderROC")
    auc_roc = evaluator.evaluate(predictions)
    evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="accuracy")
    accuracy = evaluator.evaluate(predictions)
    evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="rmse")
    rmse = evaluator.evaluate(predictions)

    print(model + ": courbe ROC = %g" % auc_roc)
    print(model + ": précision = %g" % accuracy)
    print(model + ": RMSE = %g" % rmse)
    
def filter_most_important_columns(df, sorted_cols_importances):
    selected_cols = [col for col, importance in sorted_cols_importances if importance > 0.01]
    if "features" in selected_cols:
        selected_cols.remove("features")
    if "label" in selected_cols:
        selected_cols.remove("label")
    df = df.select(selected_cols)
    return df

## Nettoyage

In [7]:
df = convert_to_int(df)
df = remove_outliers(df)

## Analyse de l'importance des colonnes

In [8]:
sorted_cols_importances = view_feature_importances(df)

AUC-ROC = 0.757249
Colonnes triées par ordre d'importance décroissante :
MonthlyRate = 0.09995755918827355
NumCompaniesWorked = 0.04767685195930893
MonthlyIncome = 0.027335017109355646
Age = 0.013141824061761028
EmployeeNumber = 0.012896362693972225
YearsSinceLastPromotion = 0.012353609360885384
features = 0.010871588843403399
TotalWorkingYears = 0.01081078066777602
OverTime = 0.010579889291449242
EnvironmentSatisfaction = 0.009779080343573937
YearsWithCurrManager = 0.009198619971385671
Department = 0.007645710022962007
Gender = 0.007252614369424988
label = 0.006228258243485643
MaritalStatus = 0.00558500168252136
JobSatisfaction = 0.0034972692462091
YearsAtCompany = 0.0032789144803145403
JobLevel = 0.003188733095991534
StockOptionLevel = 0.0026172003735815373
PercentSalaryHike = 0.002437299009961595
PerformanceRating = 0.0024107384981124375
TrainingTimesLastYear = 0.0018966803895070741
BusinessTravel = 0.0017382937130864412
JobInvolvement = 0.0016130330273097613
Attrition = 0.001064934

## Garder les colonnes les plus importantes

In [130]:
df = filter_most_important_columns(df, sorted_cols_importances)

In [131]:
df.show(10)

+-----------+------------------+-------------+--------+--------+---------+--------------------+------+-------------+
|MonthlyRate|NumCompaniesWorked|MonthlyIncome|JobLevel|OverTime|Attrition|YearsWithCurrManager|Gender|MaritalStatus|
+-----------+------------------+-------------+--------+--------+---------+--------------------+------+-------------+
|    19479.0|               8.0|       5993.0|     2.0|     Yes|      Yes|                 5.0|Female|       Single|
|    24907.0|               1.0|       5130.0|     2.0|      No|       No|                 7.0|  Male|      Married|
|     2396.0|               6.0|       2090.0|     1.0|     Yes|      Yes|                 0.0|  Male|       Single|
|    23159.0|               1.0|       2909.0|     1.0|     Yes|       No|                 0.0|Female|      Married|
|    16632.0|               9.0|       3468.0|     1.0|      No|       No|                 2.0|  Male|      Married|
|    11864.0|               0.0|       3068.0|     1.0|      No|

## Modelisation avec les champs filtrés

In [132]:
df_gbt = df
df_rf = df
df_lr = df

### random forest

In [93]:
df_rf = convert_to_int(df_rf)
df_rf = remove_outliers(df_rf)

In [94]:
assembler, stages = assembler_for_pipeline(df_rf, 'Attrition')

In [95]:
model_rf = model_for_pipeline(rf, param_grid_rf)
model_rf, predictions_rf, selected_rf = pipelinedata(df_rf, assembler, stages, model_rf)
metrics(predictions_rf, 'rf')

DataFrame[label: double, prediction: double, rawPrediction: vector, probability: vector]

rf: courbe ROC = 0.704139
rf: précision = 0.848361
rf: RMSE = 0.389409


### gradient boosting

In [98]:
df_gbt = convert_to_int(df_gbt)
df_gbt = remove_outliers(df_gbt)

In [100]:
assembler, stages = assembler_for_pipeline(df_gbt, 'Attrition')

In [101]:
model_gbt = model_for_pipeline(gbt, param_grid_gbt)
model_gbt, predictions_gbt, selected_gbt = pipelinedata(df_gbt, assembler, stages, model_gbt)
metrics(predictions_gbt, 'gbt')

DataFrame[label: double, prediction: double, rawPrediction: vector, probability: vector]

gbt: courbe ROC = 0.795025
gbt: précision = 0.828685
gbt: RMSE = 0.413902


### logistic regression

In [105]:
df_lr = convert_to_int(df_lr)
df_lr = remove_outliers(df_lr)

In [106]:
assembler, stages = assembler_for_pipeline(df_lr, 'Attrition')

In [107]:
model_lr = model_for_pipeline(lr, param_grid_lr)
model_lr, predictions_lr, selected_lr = pipelinedata(df_lr, assembler, stages, model_lr)
metrics(predictions_lr, 'lr')

DataFrame[label: double, prediction: double, rawPrediction: vector, probability: vector]

lr: courbe ROC = 0.76598
lr: précision = 0.818898
lr: RMSE = 0.425561


### j'ai pas utilisé cette partie, je te laisse modifié si besoin

In [61]:
# #paramétres
# def params(nom, nb_model, train, test):
#     liste = range(1, 20)

#     best_accuracy = 0
#     best_nb = 0

#     for nb in liste:
#         model_choix = [LogisticRegression(labelCol = 'label', featuresCol = 'features', maxIter= nb),
#         DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=nb, seed =40),
#         GBTClassifier(labelCol="label", featuresCol="features", maxIter=nb, seed =40),
#         RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=nb, seed =40)]

#         model = model_choix[nb_model]
#         trained_model = model.fit(train)
#         predictions = trained_model.transform(test)
#         evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="accuracy")
#         accuracy = evaluator.evaluate(predictions)
    
#         # meilleure précision
#         if accuracy > best_accuracy:
#             best_accuracy = accuracy
#             best_nb = nb
        
#     print("la meilleure précision pour " + nom + " :", best_accuracy)
#     print("le meilleur nombre pour paramétre " + nom + " :", best_nb)

In [None]:
import shutil
modelg.save("Model")
shutil.make_archive('Model', 'zip','./content/Model')