In [3]:
from sklearn.feature_selection import SelectKBest, chi2
from sklearn.preprocessing import OrdinalEncoder
import pandas as pd

In [4]:


# Load the churn dataset into a pandas DataFrame.
file_path = "churn-bigml-80.csv"
df = pd.read_csv(file_path)

# Separate the target column from every remaining (feature) column.
y = df["Churn"]
X = df.drop("Churn", axis=1)

# Replace each column's distinct values with ordinal codes so that chi2
# receives purely numeric, non-negative input.
# NOTE(review): the encoder is applied to every column of X, so any numeric
# column is scored on its ordinal code rather than its raw value — confirm
# this is intended.
encoder = OrdinalEncoder()
encoder.fit(X)
X_encoded = encoder.transform(X)

# Score every feature against the target with the chi-squared statistic and
# keep the k highest-scoring ones (k is the number of features to retain).
k_best = SelectKBest(chi2, k=12)
k_best.fit(X_encoded, y)
X_selected = k_best.transform(X_encoded)

# Map the selector's boolean support mask back to column names and report them.
selected_mask = k_best.get_support()
selected_feature_names = X.columns[selected_mask]
print("Caractéristiques sélectionnées :", selected_feature_names)


Caractéristiques sélectionnées : Index(['International plan', 'Voice mail plan', 'Number vmail messages',
       'Total day minutes', 'Total day charge', 'Total eve minutes',
       'Total eve charge', 'Total night minutes', 'Total night charge',
       'Total intl minutes', 'Total intl charge', 'Customer service calls'],
      dtype='object')


In [5]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

# Start (or reuse) the Spark session used for preprocessing and training.
spark = (
    SparkSession.builder
    .appName("Prétraitement et ML avec PySpark")
    .getOrCreate()
)

# Load the CSV: column names come from the header row, types are inferred.
file_path = "churn-bigml-80.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Drop incomplete rows BEFORE any StringIndexer is applied. With its default
# handleInvalid="error", a StringIndexer model raises on a null input value,
# so dropping afterwards would fail instead of discarding the row.
df = df.na.drop()

# StringIndexer operates on string columns, so cast the target first.
df = df.withColumn("Churn", df["Churn"].cast("string"))

# Encode the target into a numeric "label" column. Alphabetical ordering makes
# the mapping independent of class frequencies (the default orders by
# frequency, so the 0/1 assignment could flip on differently balanced data).
# With the "false"/"true" strings shown in this notebook's output,
# "false" -> 0.0 and "true" -> 1.0.
indexer = StringIndexer(inputCol="Churn", outputCol="label",
                        stringOrderType="alphabetAsc")
df = indexer.fit(df).transform(df)

# Feature columns fed to the models.
selected_features = ['International plan', 'Voice mail plan', 'Number vmail messages',
                     'Total day minutes', 'Total day charge', 'Total eve minutes',
                     'Total eve charge', 'Total night minutes', 'Total night charge',
                     'Total intl minutes', 'Total intl charge', 'Customer service calls']

# Subset of the selected features that must be indexed before assembly.
categorical_features = ['International plan', 'Voice mail plan']

# Index each categorical column into "<name>_index", again using alphabetical
# order so the codes do not depend on how often each category occurs.
indexers = [
    StringIndexer(inputCol=name, outputCol=name + "_index",
                  stringOrderType="alphabetAsc").fit(df)
    for name in categorical_features
]
pipeline = Pipeline(stages=indexers)
df = pipeline.fit(df).transform(df)

# Assemble the selected columns into a single "features" vector, substituting
# the indexed version of each categorical column.
assembler = VectorAssembler(
    inputCols=[name + "_index" if name in categorical_features else name
               for name in selected_features],
    outputCol="features",
)
df = assembler.transform(df)

# Preview the assembled vectors next to the original target values.
df.select("features", "Churn").show(truncate=False)


+----------------------------------------------------------------+-----+
|features                                                        |Churn|
+----------------------------------------------------------------+-----+
|[0.0,1.0,25.0,265.1,45.07,197.4,16.78,244.7,11.01,10.0,2.7,1.0] |false|
|[0.0,1.0,26.0,161.6,27.47,195.5,16.62,254.4,11.45,13.7,3.7,1.0] |false|
|[0.0,0.0,0.0,243.4,41.38,121.2,10.3,162.6,7.32,12.2,3.29,0.0]   |false|
|[1.0,0.0,0.0,299.4,50.9,61.9,5.26,196.9,8.86,6.6,1.78,2.0]      |false|
|[1.0,0.0,0.0,166.7,28.34,148.3,12.61,186.9,8.41,10.1,2.73,3.0]  |false|
|[1.0,0.0,0.0,223.4,37.98,220.6,18.75,203.9,9.18,6.3,1.7,0.0]    |false|
|[0.0,1.0,24.0,218.2,37.09,348.5,29.62,212.6,9.57,7.5,2.03,3.0]  |false|
|[1.0,0.0,0.0,157.0,26.69,103.1,8.76,211.8,9.53,7.1,1.92,0.0]    |false|
|[1.0,1.0,37.0,258.6,43.96,222.0,18.87,326.4,14.69,11.2,3.02,0.0]|false|
|[0.0,0.0,0.0,187.7,31.91,163.4,13.89,196.0,8.82,9.1,2.46,0.0]   |false|
|[0.0,0.0,0.0,128.8,21.9,104.9,8.92,141.1,6.35,11.2

In [9]:
from pyspark.ml.feature import MinMaxScaler

# Configure a min-max scaler that reads the assembled "features" vector and
# writes the rescaled vector to "scaled_features".
scaler = MinMaxScaler().setInputCol("features").setOutputCol("scaled_features")

# Compute the summary statistics on the DataFrame, then append the scaled column.
# NOTE(review): the scaler is fitted on the whole DataFrame while the
# train/test split happens in a later cell, so held-out rows contribute to the
# scaling statistics — confirm this is acceptable.
scaler_model = scaler.fit(df)
df = scaler_model.transform(df)


In [10]:
# Keep only the scaled feature vector and the numeric label, then preview the
# first rows of the result.
model_columns = ["scaled_features", "label"]
df = df.select(*model_columns)
df.show()

+--------------------+-----+
|     scaled_features|label|
+--------------------+-----+
|[0.0,1.0,0.5,0.75...|  0.0|
|[0.0,1.0,0.52,0.4...|  0.0|
|[0.0,0.0,0.0,0.69...|  0.0|
|[1.0,0.0,0.0,0.85...|  0.0|
|[1.0,0.0,0.0,0.47...|  0.0|
|[1.0,0.0,0.0,0.63...|  0.0|
|[0.0,1.0,0.48,0.6...|  0.0|
|[1.0,0.0,0.0,0.44...|  0.0|
|[1.0,1.0,0.74,0.7...|  0.0|
|[0.0,0.0,0.0,0.53...|  0.0|
|[0.0,0.0,0.0,0.36...|  0.0|
|[0.0,0.0,0.0,0.44...|  0.0|
|[0.0,0.0,0.0,0.34...|  0.0|
|[0.0,1.0,0.54,0.5...|  0.0|
|[0.0,0.0,0.0,0.54...|  0.0|
|[0.0,1.0,0.66,0.5...|  0.0|
|[0.0,0.0,0.0,0.63...|  0.0|
|[0.0,0.0,0.0,0.44...|  0.0|
|[0.0,0.0,0.0,0.17...|  1.0|
|[0.0,0.0,0.0,0.52...|  0.0|
+--------------------+-----+
only showing top 20 rows



In [12]:

# Imports nécessaires pour les modèles de classification et les évaluateurs
from pyspark.ml.classification import GBTClassifier, RandomForestClassifier, LogisticRegression, NaiveBayes
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql import SparkSession

# Hold out 20% of the rows for evaluation; the fixed seed makes the split repeatable.
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)


def fit_and_evaluate(name, estimator, train, test, evaluator):
    """Fit an estimator, score the held-out data and print the metric.

    Args:
        name: Display name printed in front of the metric.
        estimator: Unfitted Spark ML estimator exposing ``fit``.
        train: DataFrame used to fit the estimator.
        test: DataFrame the fitted model is applied to.
        evaluator: Spark ML evaluator applied to the test predictions.

    Returns:
        A ``(model, predictions, score)`` tuple: the fitted model, the
        transformed test DataFrame, and the evaluator's metric value.
    """
    model = estimator.fit(train)
    predictions = model.transform(test)
    score = evaluator.evaluate(predictions)
    print(name + " AUC:", score)
    return model, predictions, score


# One evaluator is enough for every model: all of them are scored on the area
# under the ROC curve computed from the "rawPrediction" column.
auc_evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", labelCol="label", metricName="areaUnderROC"
)
# Per-model names kept as aliases in case other cells reference them.
evaluator_gbt = evaluator_rf = evaluator_lr = evaluator_nb = auc_evaluator

# Gradient-boosted trees.
gbt = GBTClassifier(featuresCol='scaled_features', labelCol='label', maxBins=64)
gbt_model, predictions_gbt, eval_result_gbt = fit_and_evaluate(
    "GBT Classifier", gbt, train_data, test_data, auc_evaluator
)

# Random forest.
rf = RandomForestClassifier(featuresCol='scaled_features', labelCol='label', maxBins=64)
rf_model, predictions_rf, eval_result_rf = fit_and_evaluate(
    "Random Forest", rf, train_data, test_data, auc_evaluator
)

# Logistic regression, limited to 10 optimizer iterations.
lr = LogisticRegression(featuresCol='scaled_features', labelCol='label', maxIter=10)
lr_model, predictions_lr, eval_result_lr = fit_and_evaluate(
    "Logistic Regression", lr, train_data, test_data, auc_evaluator
)

# Naive Bayes with its default settings.
# NOTE(review): the recorded run of this cell printed an AUC of about 0.36 for
# this model, i.e. below 0.5 — the default model type may be a poor match for
# min-max scaled continuous features; worth revisiting before relying on it.
nb = NaiveBayes(featuresCol='scaled_features', labelCol='label')
nb_model, predictions_nb, eval_result_nb = fit_and_evaluate(
    "Naive Bayes", nb, train_data, test_data, auc_evaluator
)



GBT Classifier AUC: 0.9220522763176806
Random Forest AUC: 0.9086241562544877
Logistic Regression AUC: 0.8285580927761017
Naive Bayes AUC: 0.36227200919144


In [15]:

# Persist the fitted GBT model. A plain save() raises when the target path
# already exists, which breaks re-running the notebook; overwrite() replaces
# any model previously written to the same path.
gbt_model.write().overwrite().save("gbt_model2")