In [1]:
%%init_spark
launcher.master="yarn"
launcher.num_executors=6
launcher.executor_cores=2
launcher.executor_memory='6000m'

In [2]:
//Telecom Churn Prediction
//Loading and Exploring Data
val churn_df=spark.read.option("header","true").option("inferschema", "true").csv("/hadoop-user/data/Churn.csv")

churn_df.cache()
churn_df.printSchema()
churn_df.count

Intitializing Scala interpreter ...

Spark Web UI available at http://bd-hm:8088/proxy/application_1575668430188_0003
SparkContext available as 'sc' (version = 2.4.4, master = yarn, app id = application_1575668430188_0003)
SparkSession available as 'spark'


root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: string (nullable = true)
 |-- Churn: string (nullable = true)



churn_df: org.apache.spark.sql.DataFrame = [customerID: string, gender: string ... 19 more fields]
res0: Long = 7043


In [3]:
//Changing "TotalCharges" to double datatype
import org.apache.spark.sql.types.DoubleType
val new_churn= churn_df.withColumn("TotalCharges",churn_df("TotalCharges").cast(DoubleType))
new_churn.printSchema
new_churn.select($"TotalCharges").show(10)



root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: double (nullable = true)
 |-- Churn: string (nullable = true)

+------------+
|TotalCharges|
+------------+
|       29.85|
|      1889.5|
|     

import org.apache.spark.sql.types.DoubleType
new_churn: org.apache.spark.sql.DataFrame = [customerID: string, gender: string ... 19 more fields]


In [4]:
//Dropping the "customerID"
val col=new_churn.columns  
val n=new_churn.columns.length
val ToBeDropped = n-21
 val newDf=new_churn.drop(col(ToBeDropped ))  


col: Array[String] = Array(customerID, gender, SeniorCitizen, Partner, Dependents, tenure, PhoneService, MultipleLines, InternetService, OnlineSecurity, OnlineBackup, DeviceProtection, TechSupport, StreamingTV, StreamingMovies, Contract, PaperlessBilling, PaymentMethod, MonthlyCharges, TotalCharges, Churn)
n: Int = 21
ToBeDropped: Int = 0
newDf: org.apache.spark.sql.DataFrame = [gender: string, SeniorCitizen: int ... 18 more fields]


In [5]:
//Now let's do some exploratory data analysis. First let's check some statistics on each column (number of rows,min, max, standard deviation,etc.).
//In spark, you can use the describe method of the dataframe to get a basic summary statistics.
val stat=newDf.describe()
stat.show()

+-------+------+------------------+-------+----------+------------------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+------------------+------------------+-----+
|summary|gender|     SeniorCitizen|Partner|Dependents|            tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|    MonthlyCharges|      TotalCharges|Churn|
+-------+------+------------------+-------+----------+------------------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+------------------+------------------+-----+
|  count|  7043|              7043|   7043|      7043|              7043|        7043|         7043|   

stat: org.apache.spark.sql.DataFrame = [summary: string, gender: string ... 19 more fields]


In [6]:
//As we can see because of the number of features it is hard to see the statistics on each feature. 
//It is better to transpose this dataset, that is to flip the rows and columns, so we can have features as rows and their statistics as columns. 
//Unfortunately, spark does not have a built-in feature for transposing a dataframe. Spylon allows us to share spark dataframes between python .We just need to create a temporary view from the dataframe. 
stat.createOrReplaceTempView("stat")

In [7]:
//Then we can use %%python to switch to pyspark.
//We can convert dataframe to a non-distributed python dataframe using spark toPandas method. This method acts similar to collect in that it collects the entire dataset to the driver, except that it collects data as a python dataframe which resides in memory of the driver node. 
//We can use the transpose method now from "pandas" library in python to transpose the dataframe.
%%python
import pandas as pd
stat_python=spark.sql("select * from stat" )
stat_python_nonDistributed=stat_python.toPandas().transpose()
pd.set_option('display.max_columns', 7)
pd.set_option('display.width', 100)

print(stat_python_nonDistributed)

                      0                   1                   2                          3  \
summary           count                mean              stddev                        min   
gender             7043                None                None                     Female   
SeniorCitizen      7043  0.1621468124378816  0.3686116056100135                          0   
Partner            7043                None                None                         No   
Dependents         7043                None                None                         No   
tenure             7043   32.37114865824223  24.559481023094442                          0   
PhoneService       7043                None                None                         No   
MultipleLines      7043                None                None                         No   
InternetService    7043                None                None                        DSL   
OnlineSecurity     7043                None                N

In [None]:
//The "count" column above shows the number of non-null entries for each feature.There are no missing values.

In [None]:
//Feature Engineering

//1.The categorical predictor needs to be converted to a numeric value before we can feed it to a machine learning algorithm, so we can use StringIndexer to convert the categorical variable to category indices.
//And the variables that does not have a natural ordering, for those we should also use one-hot-encoding on top of stringIndexer.

//2.If we look at the summary statistics above, we will see that variables "TotalCharges","tenure","MonthlyCharges" are on different scale . So that tells us that we need to scale our data.
//Let's scale the numeric predictors using StandardScale.

//Before feeding a dataset to a machine learning algorithm in spark, we need to convert it into (features,label) form where features is a numeric vector of predictors and label is a numeric target variable. 

In [8]:
//Using StringIndexer to convert the below categorical variables to numeric values.
import org.apache.spark.ml.feature._

val catColNames=Array("gender","Partner","Dependents","PhoneService","MultipleLines","OnlineSecurity","OnlineBackup","DeviceProtection"
                     ,"TechSupport","StreamingTV","StreamingMovies","PaperlessBilling","Churn","Contract","PaymentMethod","InternetService")

//var indexers : Array[StringIndexer] = Array()
val index = catColNames.map { colName =>

  new StringIndexer().setInputCol(colName).setOutputCol(colName + "_indexed").setHandleInvalid("skip")
}


import org.apache.spark.ml.feature._
catColNames: Array[String] = Array(gender, Partner, Dependents, PhoneService, MultipleLines, OnlineSecurity, OnlineBackup, DeviceProtection, TechSupport, StreamingTV, StreamingMovies, PaperlessBilling, Churn, Contract, PaymentMethod, InternetService)
index: Array[org.apache.spark.ml.feature.StringIndexer] = Array(strIdx_0919b9fc9387, strIdx_07c8c5cd518e, strIdx_3817077f4bb8, strIdx_04d88c85877d, strIdx_229e19e1d11b, strIdx_b318ddc5c53f, strIdx_2b8e0ec64b4e, strIdx_f4a4905bcc81, strIdx_d7be157e0ae6, strIdx_76f25977de6a, strIdx_724dec617200, strIdx_da083bd8a65f, strIdx_e2f7a5c42d10, strIdx_ec886a84b63b, strIdx_c5416cb7375b, strIdx_170ac90360a3)


In [9]:
//Using OneHotEncoder to convert non-ordering categorical values to numeric values.

val encoder= new OneHotEncoderEstimator().setInputCols(Array("Contract_indexed","PaymentMethod_indexed","InternetService_indexed")).setOutputCols(Array("Contract_coded","PaymentMethod_coded","InternetService_coded"))


encoder: org.apache.spark.ml.feature.OneHotEncoderEstimator = oneHotEncoder_5afe9ce7208e


In [10]:

//Scaling the numeric variables.
import org.apache.spark.ml.feature._

//Use VectorAssesmbler to assemble numeric features into a vector
val vectorizer_numeric=new VectorAssembler().setInputCols(Array("TotalCharges","tenure","MonthlyCharges")).setOutputCol("numeric_features").setHandleInvalid("skip")



//Create an estimator to standardize the numeric feature

val standardizer=new StandardScaler().setWithMean(true).setInputCol("numeric_features").setOutputCol("numeric_features_vector")




import org.apache.spark.ml.feature._
vectorizer_numeric: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_84dc4635758d
standardizer: org.apache.spark.ml.feature.StandardScaler = stdScal_964024eea435


In [11]:
//Now let's add the all converted cataegorical variables and numerical feature vector to our feature vector using VectorAssembler again
val vectorizer_all=new VectorAssembler().setInputCols(Array("numeric_features_vector","Contract_coded","PaymentMethod_coded","InternetService_coded","gender_indexed","Partner_indexed","Dependents_indexed","PhoneService_indexed","MultipleLines_indexed","OnlineSecurity_indexed","OnlineBackup_indexed","DeviceProtection_indexed","TechSupport_indexed","StreamingTV_indexed","StreamingMovies_indexed","PaperlessBilling_indexed","SeniorCitizen")).setOutputCol("features")
                                                                      

vectorizer_all: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_d7a3817d94cb


In [12]:
//Data analysis and Expiremental Results
//Building Machine Learning pipelines
//Logistic Regression
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._
val lr = new LogisticRegression().setLabelCol("Churn_indexed").setFeaturesCol("features")
val paramGrid =new ParamGridBuilder()
             .addGrid(lr.regParam, Array(0.01, 0.5, 2.0))
             .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
             .build()
val evaluator = new BinaryClassificationEvaluator().setRawPredictionCol("rawPrediction").setLabelCol("Churn_indexed").setMetricName("areaUnderROC")
val cv = new CrossValidator().setEstimator(lr).setEvaluator(evaluator).setEstimatorParamMaps(paramGrid).setNumFolds(3)

//creating a pipeline
val pipeline = new Pipeline().setStages(index++Array(encoder,vectorizer_numeric,standardizer,vectorizer_all,cv))

//dividing data into training and test data
val Array(training,testing)=newDf.randomSplit(Array(0.8,0.2),111)


//Fit the training data to the pipeline
val pipelineModel = pipeline.fit(training)

// Make predictions.
val predictions = pipelineModel.transform(testing)

// Select example rows to display.
predictions.select("Churn_indexed", "prediction", "probability").show(5)

//Evaluating our model
val AUC = evaluator.evaluate(predictions)
println(s"Area under ROC curve(AUC) for LR on test data = $AUC")




+-------------+----------+--------------------+
|Churn_indexed|prediction|         probability|
+-------------+----------+--------------------+
|          0.0|       0.0|[0.50102509612074...|
|          1.0|       1.0|[0.36457260457959...|
|          0.0|       0.0|[0.64255808536684...|
|          1.0|       0.0|[0.63571532550880...|
|          1.0|       0.0|[0.67587957128931...|
+-------------+----------+--------------------+
only showing top 5 rows

Area under ROC curve(AUC) for LR on test data = 0.8553162422080097


import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._
lr: org.apache.spark.ml.classification.LogisticRegression = logreg_b12a43472d40
paramGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	logreg_b12a43472d40-elasticNetParam: 0.0,
	logreg_b12a43472d40-regParam: 0.01
}, {
	logreg_b12a43472d40-elasticNetParam: 0.0,
	logreg_b12a43472d40-regParam: 0.5
}, {
	logreg_b12a43472d40-elasticNetParam: 0.0,
	logreg_b12a43472d40-regParam: 2.0
}, {
	logreg_b12a43472d40-elasticNetParam: 0.5,
	logreg_b12a43472d40-regParam: 0.01
}, {
	logreg_b12a43472d40-elasticNetParam: 0.5,
	logreg_b12a43472d40-regParam: 0.5
}, {
	logreg_b1...

In [16]:
//Random Forest Classifier
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._

val rf = new RandomForestClassifier().setLabelCol("Churn_indexed").setFeaturesCol("features")
val paramGrid =new ParamGridBuilder()
             .addGrid(rf.maxDepth, Array(2, 5))
             .addGrid(rf.numTrees, Array(5, 20))
             .build()

val evaluator = new BinaryClassificationEvaluator().setRawPredictionCol("rawPrediction").setLabelCol("Churn_indexed").setMetricName("areaUnderROC")


val cv_rf = new CrossValidator().setEstimator(rf).setEvaluator(evaluator).setEstimatorParamMaps(paramGrid).setNumFolds(3)

//creating a pipeline
val pipeline_rf = new Pipeline().setStages(index++Array(encoder,vectorizer_numeric,standardizer,vectorizer_all,cv_rf))

//dividing the data into test and training data
val Array(training,testing)=newDf.randomSplit(Array(0.8,0.2),111)

//Fit the training data to the pipeline
val pipelineModel_rf = pipeline_rf.fit(training)

// Make predictions.
val predictions = pipelineModel_rf.transform(testing)
predictions.select("Churn_indexed", "prediction", "probability").show(5)

//Evaluating the model
val AUC = evaluator.evaluate(predictions)
println(s"Area under ROC curve(AUC) for RF on test data = $AUC")

+-------------+----------+--------------------+
|Churn_indexed|prediction|         probability|
+-------------+----------+--------------------+
|          0.0|       1.0|[0.47293930435641...|
|          1.0|       1.0|[0.36700763737587...|
|          0.0|       0.0|[0.61000998737870...|
|          1.0|       0.0|[0.53118530289351...|
|          1.0|       0.0|[0.55768457098996...|
+-------------+----------+--------------------+
only showing top 5 rows

Area under ROC curve(AUC) for RF on test data = 0.853260590961288


import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._
rf: org.apache.spark.ml.classification.RandomForestClassifier = rfc_89d79dd391a4
paramGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	rfc_89d79dd391a4-maxDepth: 2,
	rfc_89d79dd391a4-numTrees: 5
}, {
	rfc_89d79dd391a4-maxDepth: 2,
	rfc_89d79dd391a4-numTrees: 20
}, {
	rfc_89d79dd391a4-maxDepth: 5,
	rfc_89d79dd391a4-numTrees: 5
}, {
	rfc_89d79dd391a4-maxDepth: 5,
	rfc_89d79dd391a4-numTrees: 20
})
evaluator: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = binEval_3d3e2b723753
cv_rf: org.apache.spark.ml.tuning.CrossValidator = cv_e9b6f9036...

In [24]:
//Gradient Boosted classification Tree
import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._



// Create a GBT model.
val gbt = new GBTClassifier()
  .setLabelCol("Churn_indexed")
  .setFeaturesCol("features")



//Create ParamGrid for Cross Validation
val paramGrid = new ParamGridBuilder()
             .addGrid(gbt.maxDepth, Array(2,5))
             .addGrid(gbt.maxIter, Array(10, 20,100))
             .build()
val evaluator = new BinaryClassificationEvaluator().setRawPredictionCol("rawPrediction").setLabelCol("Churn_indexed").setMetricName("areaUnderROC")

val cv_gbt = new CrossValidator().setEstimator(gbt).setEvaluator(evaluator).setEstimatorParamMaps(paramGrid).setNumFolds(3)

//creating pipeline
val pipeline_gbt = new Pipeline().setStages(index++Array(encoder,vectorizer_numeric,standardizer,vectorizer_all,cv_gbt))

//dividing data into training and testing data
val Array(training,testing)=newDf.randomSplit(Array(0.8,0.2),111)


//Fit the training data to the pipeline
val pipelineModel_gbt = pipeline_gbt.fit(training)

// Make predictions.
val predictions = pipelineModel_gbt.transform(testing)

// Select example rows to display.
predictions.select("Churn_indexed", "prediction", "probability").show(5)

//Evaluating model using AUC
val AUC = evaluator.evaluate(predictions)
println(s"Area under ROC curve(AUC) for GBT on test data = $AUC")



+-------------+----------+--------------------+
|Churn_indexed|prediction|         probability|
+-------------+----------+--------------------+
|          0.0|       1.0|[0.27841581440181...|
|          1.0|       1.0|[0.19726526917774...|
|          0.0|       1.0|[0.44905156073738...|
|          1.0|       1.0|[0.42783342407179...|
|          1.0|       0.0|[0.51401154785198...|
+-------------+----------+--------------------+
only showing top 5 rows

Area under ROC curve(AUC) for GBT on test data = 0.8580934219160142


import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._
gbt: org.apache.spark.ml.classification.GBTClassifier = gbtc_cf193053a297
paramGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	gbtc_cf193053a297-maxDepth: 2,
	gbtc_cf193053a297-maxIter: 10
}, {
	gbtc_cf193053a297-maxDepth: 5,
	gbtc_cf193053a297-maxIter: 10
}, {
	gbtc_cf193053a297-maxDepth: 2,
	gbtc_cf193053a297-maxIter: 20
}, {
	gbtc_cf193053a297-maxDepth: 5,
	gbtc_cf193053a297-maxIter: 20
}, {
	gbtc_cf193053a297-maxDepth: 2,
	gbtc_cf193053a297-maxIter: 100
}, {
	gbtc_cf193053a297-maxDepth: 5,
	gbtc_cf193053a297-maxIter: 100
})
eval...

In [None]:
// After comparing the AUC , it can be seen that the GBT performed a little better than the other two models in predicting the "Churn" on the test set.

In [42]:
//variable importance.
import org.apache.spark.mllib.linalg._
import org.apache.spark.ml.tuning

val featureImportance=pipelineModel_gbt.stages(20).asInstanceOf[CrossValidatorModel].bestModel.asInstanceOf[GBTClassificationModel].featureImportances


val features= Array("numeric_features_vector","Contract_coded","PaymentMethod_coded","InternetService_coded","gender_indexed","Partner_indexed","Dependents_indexed","PhoneService_indexed","MultipleLines_indexed","OnlineSecurity_indexed","OnlineBackup_indexed","DeviceProtection_indexed","TechSupport_indexed","StreamingTV_indexed","StreamingMovies_indexed","PaperlessBilling_indexed","SeniorCitizen")
                                                                    

val res = features.zip(featureImportance.toArray).sortBy(-_._2).foreach(println)

(Contract_coded,0.20250270224942046)
(PaymentMethod_coded,0.12142205395270278)
(gender_indexed,0.11467472371339353)
(InternetService_coded,0.10359198090066042)
(numeric_features_vector,0.08012441503393478)
(StreamingMovies_indexed,0.0636671912575563)
(Partner_indexed,0.03660985003954302)
(PaperlessBilling_indexed,0.03124031883418768)
(SeniorCitizen,0.02791977184363902)
(StreamingTV_indexed,0.012732268199435655)
(MultipleLines_indexed,0.009072683185864731)
(OnlineBackup_indexed,0.007628462681373407)
(TechSupport_indexed,0.0044555369839919725)
(PhoneService_indexed,0.004179948818404411)
(OnlineSecurity_indexed,0.003951717177411724)
(Dependents_indexed,0.0)
(DeviceProtection_indexed,0.0)


import org.apache.spark.mllib.linalg._
import org.apache.spark.ml.tuning
featureImportance: org.apache.spark.ml.linalg.Vector = (23,[0,1,2,3,4,5,7,8,9,10,12,13,14,15,16,17,18,19,20,21,22],[0.08012441503393478,0.20250270224942046,0.12142205395270278,0.10359198090066042,0.11467472371339353,0.03660985003954302,0.004179948818404411,0.009072683185864731,0.003951717177411724,0.007628462681373407,0.0044555369839919725,0.012732268199435655,0.0636671912575563,0.03124031883418768,0.02791977184363902,0.008318522522396532,0.046461420306766435,0.029377872905235813,0.02533862707868604,0.04182544080445914,0.024904491510936183])
features: Array[String] = Array(numeric_features_vector, Contract_coded, PaymentMethod_coded, InternetService_coded, gender_indexed, Partner_indexed, Dependents_indexed, PhoneS...

In [None]:
// The result shows that Contract_coded,PaymentMethod_coded,gender_indexed and InternetService_coded were the most important variables in predicting the target value"Churn"