# Préparation des données

In [1]:
import os, sys

#os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
# Pass the XGBoost4J jars to the PySpark shell through its submit arguments.
# This assignment sits ahead of the SparkSession creation in this notebook;
# presumably it has to, so the JVM is launched with the jars — TODO confirm.
xgboost_jars = ("xgboost4j-spark-0.90.jar", "xgboost4j-0.90.jar")
os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars {} pyspark-shell".format(",".join(xgboost_jars))

# import findspark
# findspark.init()

# import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import col


In [2]:
 # Build (or reuse, if one already exists) the Spark session used by every
 # cell of this notebook.
 session_builder = (
     SparkSession.builder
     .appName("PySpark XGBOOST Telco_Customer_Churn")
 )
 spark = session_builder.getOrCreate()

In [3]:
# Register the sparkxgb archive as a Python dependency of the Spark context;
# a later cell imports XGBoostClassifier from `sparkxgb`.
spark.sparkContext.addPyFile("sparkxgb.zip")

In [4]:
# Display the version of the Spark runtime backing this session.
spark.sparkContext.version

'2.4.4'

In [None]:
"""nous définissons un schéma des données que nous lisons à partir du csv. 
C'est généralement une meilleure pratique que de laisser Spark déduire le schéma,
car cela consomme moins de ressources et nous donne un contrôle total sur les champs."""

#schema = StructType(
  #[StructField('_c0', IntegerType()),
   # StructField("Embarked", StringType())
  #])


In [5]:
import os
# Display the kernel's current working directory.
os.getcwd()

'/home/jovyan/work/Telco_Customer_Churn'

In [6]:
# Directory that holds the raw Telco churn CSV.
file = "/home/jovyan/work/Telco_Customer_Churn/data"

# Read the comma-separated file, taking column names from the header row and
# letting Spark infer each column's type from the data.
df = spark.read.csv(
    file + "/Telco-Customer-Churn.csv",
    header=True,
    sep=",",
    inferSchema=True,
)

In [7]:
import pandas as pd

# Raise pandas' display limits so the preview shows every column.
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)

# Preview the first 10 rows. The limit is applied on the Spark side *before*
# converting, so only 10 rows are shipped to the driver instead of collecting
# the whole dataset into pandas just to display its head.
df.limit(10).toPandas()

Unnamed: 0,customerID,gender,SeniorCitizen,Partner,Dependents,tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,OnlineBackup,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn
0,7590-VHVEG,Female,0,Yes,No,1,No,No phone service,DSL,No,Yes,No,No,No,No,Month-to-month,Yes,Electronic check,29.85,29.85,No
1,5575-GNVDE,Male,0,No,No,34,Yes,No,DSL,Yes,No,Yes,No,No,No,One year,No,Mailed check,56.95,1889.5,No
2,3668-QPYBK,Male,0,No,No,2,Yes,No,DSL,Yes,Yes,No,No,No,No,Month-to-month,Yes,Mailed check,53.85,108.15,Yes
3,7795-CFOCW,Male,0,No,No,45,No,No phone service,DSL,Yes,No,Yes,Yes,No,No,One year,No,Bank transfer (automatic),42.3,1840.75,No
4,9237-HQITU,Female,0,No,No,2,Yes,No,Fiber optic,No,No,No,No,No,No,Month-to-month,Yes,Electronic check,70.7,151.65,Yes
5,9305-CDSKC,Female,0,No,No,8,Yes,Yes,Fiber optic,No,No,Yes,No,Yes,Yes,Month-to-month,Yes,Electronic check,99.65,820.5,Yes
6,1452-KIOVK,Male,0,No,Yes,22,Yes,Yes,Fiber optic,No,Yes,No,No,Yes,No,Month-to-month,Yes,Credit card (automatic),89.1,1949.4,No
7,6713-OKOMC,Female,0,No,No,10,No,No phone service,DSL,Yes,No,No,No,No,No,Month-to-month,No,Mailed check,29.75,301.9,No
8,7892-POOKP,Female,0,Yes,No,28,Yes,Yes,Fiber optic,No,No,Yes,Yes,Yes,Yes,Month-to-month,Yes,Electronic check,104.8,3046.05,Yes
9,6388-TABGU,Male,0,No,Yes,62,Yes,No,DSL,Yes,Yes,No,No,No,No,One year,No,Bank transfer (automatic),56.15,3487.95,No


In [None]:
# Print the column names and the types Spark inferred for them.
df.printSchema()

In [None]:
# Smallest TotalCharges value: the aggregation yields a single row whose only
# field is the minimum.
min_total_charges_row = df.agg({"TotalCharges": "min"}).first()
min_total_charges_row[0]

In [None]:
# Remove the "customerID" column.
df_without_id = df.drop("customerID")

# Keep only the rows whose TotalCharges is not a single blank space.
# Note: those rows are removed from the DataFrame, not converted to nulls.
df = df_without_id.filter(df_without_id["TotalCharges"] != " ")

In [None]:
# Convert TotalCharges to a numeric column.
# The values carry decimals (e.g. 29.85, 108.15), so cast to a double:
# an integer cast would silently throw the fractional part away.
from pyspark.sql.types import DoubleType, IntegerType

df = df.withColumn("TotalCharges", df["TotalCharges"].cast(DoubleType()))

In [None]:
# StringIndexer maps each categorical (string) column to an integer-coded
# column, similar to scikit-learn's LabelEncoder.
from pyspark.ml.feature import StringIndexer

# Names of all string-typed columns, in DataFrame column order.
ColString = [name for name, dtype in df.dtypes if dtype.startswith('string')]

# One fitted indexer per string column, writing to "<column>_index".
# Column names are already unique, so iterate ColString directly: going
# through set() only made the stage order (and therefore the order of the
# generated columns) vary from one run to the next.
# handleInvalid="keep" assigns unseen/invalid labels to an extra index
# instead of raising at transform time.
indexer = [
    StringIndexer(inputCol=column, outputCol=column + "_index")
    .setHandleInvalid("keep")
    .fit(df)
    for column in ColString
]

In [None]:
# Chain all the string indexers into one pipeline and apply it to df,
# producing df_r with the additional "<column>_index" columns.
indexing_pipeline = Pipeline(stages=indexer)
indexing_model = indexing_pipeline.fit(df)
df_r = indexing_model.transform(df)

In [None]:
# Build a single vector column gathering all the explanatory variables.
from pyspark.ml.feature import VectorAssembler

# The target column; its indexed form ("Churn_index") is the classifiers'
# label and must never be part of the features.
label_col = "Churn"

# Features = every non-string column, plus the indexed version of every
# string column except the label. The label is excluded by name rather than
# by position, so a change in column order cannot leak it into the features.
numericsCols = [name for name in df.columns if name not in ColString]
numericsCols += [name + "_index" for name in ColString if name != label_col]

# Assemble those columns into one vector column named "Var_expl".
assembler = VectorAssembler(inputCols=numericsCols, outputCol="Var_expl")

## Modèles avec paramètres optimaux

In [None]:
from pyspark.ml.classification import (
    DecisionTreeClassifier,
    GBTClassifier,
    RandomForestClassifier,
)

# Label and feature columns shared by every classifier below.
shared_cols = {"labelCol": "Churn_index", "featuresCol": "Var_expl"}

# Decision tree, random forest and gradient-boosted trees estimators.
dt = DecisionTreeClassifier(maxDepth=8, **shared_cols)
rf = RandomForestClassifier(numTrees=10, **shared_cols)
gb = GBTClassifier(maxIter=80, maxDepth=5, **shared_cols)



In [None]:
from sparkxgb import XGBoostClassifier

# Instantiate the XGBoost estimator (it is only defined here, not fitted).
xgb = XGBoostClassifier(
    labelCol="Churn_index",
    featuresCol="Var_expl",
    predictionCol="prediction",
)

    
    # General Params
#    numWorkers=1, nthread=1, checkpointInterval=-1, checkpointPath="",
# #   useExternalMemory=False, silent=0, missing=float("nan"),
    
#     # Column Params
#     labelCol="Churn_index", featuresCol="Var_expl", predictionCol="prediction", 
# #    weightCol="weight", baseMarginCol="baseMargin", 
    
#     # Booster Params
#     baseScore=0.5, objective="binary:logistic", evalMetric="error", 
#     numClass=2, numRound=2, seed=None,
    
# #    Tree Booster Params
#     eta=0.3, maxDepth=5, minChildWeight=1.0, maxDeltaStep=0.0, subsample=1.0,
#     colsampleBytree=1.0, colsampleBylevel=1.0, reg_lambda=0.0, alpha=0.0, treeMethod="auto",
#     sketchEps=0.03, scalePosWeight=1.0, growPolicy='depthwise', maxBin=256,
# )

In [None]:
from pyspark.ml import Pipeline

# One pipeline per model: assemble the feature vector, then run the classifier.
pipeline_dt, pipeline_rf = (
    Pipeline(stages=[assembler, classifier]) for classifier in (dt, rf)
)

# Gradient boosting and XGBoost pipelines are currently disabled.
# pipeline_gb = Pipeline(stages=[assembler, gb])

# pipeline_xgb = Pipeline().setStages([assembler, xgb])

In [None]:
# Split the indexed DataFrame into training and test sets using 0.7 / 0.3
# weights and a fixed seed.
trainingData, testData = df_r.randomSplit(weights=[0.7, 0.3], seed=24)

In [None]:
# Fit each enabled pipeline on the training set.
dt_model, rf_model = (
    pipeline.fit(trainingData) for pipeline in (pipeline_dt, pipeline_rf)
)

# Gradient boosting and XGBoost fits are currently disabled.
# gb_model = pipeline_gb.fit(trainingData)

#xgb_model = pipeline_xgb.fit(trainingData)

In [None]:
# Score the held-out test set with each fitted model.
dt_predictions, rf_predictions = (
    model.transform(testData) for model in (dt_model, rf_model)
)

# Gradient boosting and XGBoost predictions are currently disabled.
# gb_predictions = gb_model.transform(testData)

#xgb_predictions = xgb_model.transform(testData)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluator computing the area under the ROC curve (the evaluator's default
# metric, spelled out here). Despite the "as_" prefix, the scores below are
# AUC values, not accuracies.
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction",
    labelCol="Churn_index",
    metricName="areaUnderROC",
)

as_dt, as_rf = (
    evaluator.evaluate(scored) for scored in (dt_predictions, rf_predictions)
)
# as_gb = evaluator.evaluate(gb_predictions)
# as_xgb = evaluator.evaluate(xgb_predictions)

In [None]:
# models_cross = pd.DataFrame({
#     'Model': ['Decision Tree', 'Random Forest', 'Gradient Boosting'],
#     'Accuracy_score': [as_dt, as_rf, as_gb]})
    
# models_cross.sort_values(by='Accuracy_score', ascending=False)

In [None]:
# Write model/classifier
# xgboost.write().overwrite().save("Models/xgboost_class_test")
# xgboost_model.write().overwrite().save("Models/xgboost_class_test.model")

In [None]:
# Load model

In [None]:
# Hyper-parameter search for the decision tree via cross-validation.
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Candidate values for tree depth and number of bins (4 x 3 = 12 combinations).
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [1, 2, 6, 10])
             .addGrid(dt.maxBins, [20, 40, 80])
             .build())

# 5-fold cross-validator over the assembler + decision tree pipeline.
# The seed fixes the fold assignment so the search is reproducible from one
# run to the next (same seed as the train/test split).
cv = CrossValidator(
    estimator=Pipeline(stages=[assembler, dt]),
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    seed=24,
)

# Run the cross-validation on the training set (takes ~5 minutes).
cvModel = cv.fit(trainingData)

# cvModel applies the best model found during cross-validation; score the
# held-out test set with it.
predictions = cvModel.transform(testData)

# Evaluate the best model on the test set with `evaluator` (an AUC-type
# metric from BinaryClassificationEvaluator, not an accuracy).
evaluator.evaluate(predictions)

In [None]:
# Hyper-parameter search for the random forest via cross-validation.
# Candidate values for depth, bins and number of trees
# (5 x 2 x 4 = 40 combinations, with up to 100 trees each).
paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxDepth, [2, 4, 6, 8, 10])
             .addGrid(rf.maxBins, [20, 60])
             .addGrid(rf.numTrees, [5, 20, 50, 100])
             .build())

# 5-fold cross-validator over the assembler + random forest pipeline.
# The seed fixes the fold assignment so the search is reproducible from one
# run to the next (same seed as the train/test split).
cv = CrossValidator(
    estimator=Pipeline(stages=[assembler, rf]),
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    seed=24,
)

# Run the cross-validation on the training set. This is the slow cell of the
# notebook: every grid combination is trained once per fold.
cvModel = cv.fit(trainingData)

# cvModel applies the best model found during cross-validation; score the
# held-out test set with it.
predictions = cvModel.transform(testData)

# Evaluate the best model on the test set with `evaluator` (an AUC-type
# metric from BinaryClassificationEvaluator, not an accuracy).
evaluator.evaluate(predictions)

In [None]:
# Shut down the Spark session once the notebook is done.
spark.stop()