In [0]:
import org.apache.spark._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.Dataset
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.tuning.ParamGridBuilder
import org.apache.spark.ml.tuning.CrossValidator
import org.apache.spark.ml.feature.VectorAssembler

 

Customer Relationship Management (CRM) is a key element of modern marketing strategies.
The French Telecom company Orange offered a large marketing database to predict the propensity of customers to switch providers (churn).

Orange made available a large dataset of customer data consisting of:
- Training: 50,000 instances with 15,000 input variables, plus the target value.
- Test: 50,000 instances with 15,000 input variables.

See details on https://www.kdd.org/kdd-cup/view/kdd-cup-2009/Intro

The Orange Telecom Churn Dataset used in this lab consists of cleaned customer activity data (features), along with a churn label specifying whether or not the customer canceled their subscription.
This dataset is smaller than the original one.
Each line of the input CSV file has the following format:
KS,128,415,No,Yes,25,265.1,110,45.07,197.4,99,16.78,244.7,91,11.01,10.0,3,2.7,1,False
The names of the variables are given in the header of the data files.

We use a Scala case class and a StructType to define the schema corresponding to one line of the CSV data file.

In [2]:
%spark

case class Account(state: String, len: Integer, acode: String,
    intlplan: String, vplan: String, numvmail: Double,
    tdmins: Double, tdcalls: Double, tdcharge: Double,
    temins: Double, tecalls: Double, techarge: Double,
    tnmins: Double, tncalls: Double, tncharge: Double,
    timins: Double, ticalls: Double, ticharge: Double,
    numcs: Double, churn: String)
    
    
// StructType objects define the schema of Spark DataFrames. 
// StructType objects contain a list of StructField objects that define the name, type, and nullable flag for each column in a DataFrame.    
// A nullable flag set to true means that the corresponding column may contain null values.
val schema = StructType(Array(
    StructField("state", StringType, true),
    StructField("len", IntegerType, true),
    StructField("acode", StringType, true),
    StructField("intlplan", StringType, true),
    StructField("vplan", StringType, true),
    StructField("numvmail", DoubleType, true),
    StructField("tdmins", DoubleType, true),
    StructField("tdcalls", DoubleType, true),
    StructField("tdcharge", DoubleType, true),
    StructField("temins", DoubleType, true),
    StructField("tecalls", DoubleType, true),
    StructField("techarge", DoubleType, true),
    StructField("tnmins", DoubleType, true),
    StructField("tncalls", DoubleType, true),
    StructField("tncharge", DoubleType, true),
    StructField("timins", DoubleType, true),
    StructField("ticalls", DoubleType, true),
    StructField("ticharge", DoubleType, true),
    StructField("numcs", DoubleType, true),
    StructField("churn", StringType, true)
  ))
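To make the mapping between one CSV line and the schema concrete, here is a plain-Scala sketch (no Spark involved). `CsvLineSketch` is a hypothetical helper, not part of the lab or of Spark: it splits the sample line shown above and converts each value to the type declared in the `StructType`. Note that `acode` is declared as a string, so the area code is kept as text rather than treated as a number.

```scala
// Hypothetical plain-Scala illustration (no Spark): pair each value of one CSV
// line with the column name and type declared in the StructType above.
object CsvLineSketch {
  // Column names in schema order, copied from the StructType definition.
  val names: Seq[String] = Seq(
    "state", "len", "acode", "intlplan", "vplan", "numvmail",
    "tdmins", "tdcalls", "tdcharge", "temins", "tecalls", "techarge",
    "tnmins", "tncalls", "tncharge", "timins", "ticalls", "ticharge",
    "numcs", "churn")

  // StringType and IntegerType columns; every other column is DoubleType.
  val stringCols: Set[String] = Set("state", "acode", "intlplan", "vplan", "churn")
  val intCols: Set[String] = Set("len")

  def parse(line: String): Map[String, Any] = {
    val values = line.split(",", -1).map(_.trim).toSeq
    require(values.length == names.length,
      s"expected ${names.length} fields, got ${values.length}")
    names.zip(values).map { case (name, raw) =>
      val typed: Any =
        if (stringCols.contains(name)) raw
        else if (intCols.contains(name)) raw.toInt
        else raw.toDouble
      name -> typed
    }.toMap
  }
}

val row = CsvLineSketch.parse(
  "KS,128,415,No,Yes,25,265.1,110,45.07,197.4,99,16.78,244.7,91,11.01,10.0,3,2.7,1,False")
println(row("len"))   // 128
println(row("acode")) // 415 (kept as a string, as in the schema)
println(row("numcs")) // 1.0
```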

In [3]:
%spark

val trainDf = spark.read.option("header", "true").csv("/lab7/churn-train-header.csv")
trainDf.count()
trainDf.first()
trainDf.printSchema()

In [4]:
%spark

import spark.implicits._
val train: Dataset[Account] = spark.read.option("header", "true").option("inferSchema", "false")
      .schema(schema).csv("/lab7/churn-train-header.csv").as[Account]
train.count()  
train.cache()
train.first()
train.printSchema()

**Q1: What is the advantage of using the "Dataset" train instead of the "DataFrame" trainDf?**




In [6]:
%spark

val test: Dataset[Account] = spark.read.option("header", "true").option("inferSchema", "false")
      .schema(schema).csv("/lab7/churn-test-header.csv").as[Account]
test.count()      
test.first()
test.printSchema()
    

In [7]:
%spark

train.createOrReplaceTempView("account")
spark.catalog.cacheTable("account") // Caches the specified table in-memory.
train.show

The describe() function computes summary statistics (count, mean, stddev, min and max) for numeric columns.
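As an illustration of what these statistics are, the following plain-Scala sketch computes them for a single column held in memory. It assumes, as in Spark's `describe()`, that the reported standard deviation is the sample (n - 1) version; the `ColumnSummary` type and the `summarize` function are hypothetical names.

```scala
// Hypothetical plain-Scala sketch of the five statistics reported by describe()
// for one numeric column: count, mean, sample standard deviation, min and max.
case class ColumnSummary(count: Long, mean: Double, stddev: Double, min: Double, max: Double)

def summarize(xs: Seq[Double]): ColumnSummary = {
  require(xs.size > 1, "this sketch needs at least two values")
  val n = xs.size
  val mean = xs.sum / n
  // Sample variance (divide by n - 1), as for Spark's stddev / stddev_samp.
  val variance = xs.map(x => (x - mean) * (x - mean)).sum / (n - 1)
  ColumnSummary(n.toLong, mean, math.sqrt(variance), xs.min, xs.max)
}

val stats = summarize(Seq(1.0, 2.0, 3.0, 4.0))
println(stats) // count 4, mean 2.5, stddev about 1.29, min 1.0, max 4.0
```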

In [9]:
%spark

train.describe("tdcharge", "techarge","tncharge", "numcs").show

**Q2: Compute the Pearson correlation between tdmins and tdcharge in the following cell**

In [11]:
%spark

import org.apache.spark.mllib.stat.Statistics

val tdmins = train.select("tdmins").map{row:Row => row.getAs[Double]("tdmins")}.rdd
val tdcharge = train.select( "tdcharge").map{row:Row => row.getAs[Double]("tdcharge")}.rdd

val correlation = ...
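For reference, the Pearson coefficient is the covariance of the two variables divided by the product of their standard deviations. The plain-Scala function below (a hypothetical helper, not a Spark API) can be used to sanity-check a result on a small local sample; it returns 1.0 for a perfectly increasing linear relationship and -1.0 for a perfectly decreasing one.

```scala
// Hypothetical plain-Scala Pearson correlation: covariance divided by the
// product of the standard deviations (the 1/n or 1/(n-1) factors cancel out).
def pearson(xs: Seq[Double], ys: Seq[Double]): Double = {
  require(xs.size == ys.size && xs.size > 1, "need two samples of equal length")
  val mx = xs.sum / xs.size
  val my = ys.sum / ys.size
  val cov = xs.zip(ys).map { case (x, y) => (x - mx) * (y - my) }.sum
  val sx = math.sqrt(xs.map(x => (x - mx) * (x - mx)).sum)
  val sy = math.sqrt(ys.map(y => (y - my) * (y - my)).sum)
  cov / (sx * sy)
}

println(pearson(Seq(1.0, 2.0, 3.0, 4.0), Seq(2.0, 4.0, 6.0, 8.0))) // about 1.0
```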

 
Use Spark SQL to explore the dataset.

**Q3: Compute the number of instances of each label (churn or no churn) in the train dataset in the following cell. Use Spark SQL.**

In [14]:
%spark

// Compute the number of labels hereafter




In [15]:
%spark

val fractions = Map("False" -> .17, "True" -> 1.0)
//Here we're keeping all instances of the Churn=True class, but downsampling the Churn=False class to a fraction of 388/2278.
val strain = train.stat.sampleBy("churn", fractions, 36L) // Returns a stratified sample without replacement based on the fraction given on each stratum.
// Parameters:
//    col - column that defines strata
//    fractions - sampling fraction for each stratum. If a stratum is not specified, we treat its fraction as zero.
//    seed - random seed

strain.groupBy("churn").count.show
strain.createOrReplaceTempView("account")
spark.catalog.cacheTable("account")

// The resulting table is now roughly balanced: each row is kept with the probability given in "fractions" for its class, so the exact number of sampled rows is not known in advance.
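`sampleBy` keeps each row independently with the probability given for its stratum, which is why the class counts vary around their expected values. A plain-Scala sketch of that idea, assuming a simple Bernoulli draw per row; `sampleByStratum` and the toy data are hypothetical, and Spark's own random generator will not reproduce the same sample for the same seed.

```scala
import scala.util.Random

// Hypothetical plain-Scala sketch of per-stratum Bernoulli sampling: each row is
// kept independently with the probability of its stratum (0 if not listed).
def sampleByStratum[T](rows: Seq[T], stratum: T => String,
                       fractions: Map[String, Double], seed: Long): Seq[T] = {
  val rng = new Random(seed)
  rows.filter(r => rng.nextDouble() < fractions.getOrElse(stratum(r), 0.0))
}

// Hypothetical toy data: 1000 "False" labels and 200 "True" labels.
val churnValues = Seq.fill(1000)("False") ++ Seq.fill(200)("True")
val sampled = sampleByStratum[String](churnValues, identity,
  Map("False" -> 0.17, "True" -> 1.0), 36L)
// All 200 "True" rows are kept; about 170 "False" rows are kept, not exactly 170.
println(s"True: ${sampled.count(_ == "True")}, False: ${sampled.count(_ == "False")}")
```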

The zeppelin-context is a system-wide container for common utility functions and user-specific data. 
It implements functions for data input, data display, etc. that are often needed but are not uniformly available in all interpreters.

The zeppelin-context is available as a predefined variable z that can be used by directly invoking its methods. 

In the Apache Spark interpreter, the zeppelin-context provides a show method, which, using Zeppelin's table feature, can be used to nicely display a set of data.

In [17]:
%spark2.spark

z.show(strain.groupBy("churn").avg())

In [18]:
%sql

SELECT churn, avg(numcs) as numcs
FROM account 
GROUP BY churn
ORDER BY numcs

**Q4: Use a query "SELECT ..." to show the average of tdmins, temins and tnmins for each label (churn = true or churn = false) in the following cell.**

In [20]:
%sql

-- Put your query here




In [21]:
%spark

val ntrain = strain.drop("state", "acode", "vplan", "tdcharge", "techarge", "ticharge")
ntrain.cache // cache before the first action so that count and show reuse the cached data
println(ntrain.count)
ntrain.show

Before the features can be used by a machine learning algorithm, string-valued columns must be transformed into numbers. A StringIndexer maps each distinct string value of a column to a numeric index.
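By default, StringIndexer orders the values by decreasing frequency, so the most frequent string is mapped to 0.0. The sketch below reproduces that ordering in plain Scala; it assumes ties are broken alphabetically, which is the documented behavior in Spark 3.x, and `fitIndex` is a hypothetical name. One consequence for this lab: which churn value ends up as label 1.0 depends on the class counts in the data the indexer is fitted on.

```scala
// Hypothetical plain-Scala sketch of StringIndexer's default "frequencyDesc"
// ordering: the most frequent value gets 0.0; ties are broken alphabetically.
def fitIndex(values: Seq[String]): Map[String, Double] =
  values.groupBy(identity)
    .map { case (v, occurrences) => (v, occurrences.size) }
    .toSeq
    .sortBy { case (v, n) => (-n, v) }
    .zipWithIndex
    .map { case ((v, _), i) => v -> i.toDouble }
    .toMap

val intlplanIndex = fitIndex(Seq("No", "No", "Yes", "No"))
println(intlplanIndex("No"))  // 0.0
println(intlplanIndex("Yes")) // 1.0
```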


In [23]:
%spark

val ipindexer = new StringIndexer()
      .setInputCol("intlplan")
      .setOutputCol("iplanIndex")



**Q5: Create a transformer to convert the churn value into a 0-1 label**


In [25]:
%spark

// Put the transformer here

val labelindexer = ...


**Q6: Create a transformer to assemble all the features into a single vector**


In [27]:
%spark

// Put the transformer here

val featureCols = Array(...)
val assembler = ...
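Conceptually, for numeric input columns the assembler concatenates the selected values, in the order listed, into one vector. The plain-Scala sketch below (hypothetical `assemble` helper, values taken from the sample CSV line) shows why this order matters: a split on "feature i" in a tree's debug output refers to the i-th entry of the input column list, counting from 0.

```scala
// Hypothetical plain-Scala sketch of vector assembly for numeric columns: the
// selected values are concatenated, in the given order, into one vector.
def assemble(row: Map[String, Double], inputCols: Seq[String]): Array[Double] =
  inputCols.map(c => row(c)).toArray

// Hypothetical column subset; the values come from the sample CSV line.
val cols = Seq("len", "numvmail", "numcs")
val vec = assemble(Map("len" -> 128.0, "numvmail" -> 25.0, "numcs" -> 1.0, "tdmins" -> 265.1), cols)
println(vec.mkString("[", ", ", "]")) // [128.0, 25.0, 1.0]
// "feature 2" in a tree's debug string would then refer to cols(2), i.e. "numcs".
```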


Decision trees have played a significant role in data mining and machine learning since the 1960s.
They generate white-box classification and regression models, which can be used for feature selection and sample prediction.
The transparency of these models is a big advantage over black-box learners, because the models are easy to understand and interpret, and they can be readily extracted and implemented in any programming language (with nested if-else statements) for use in production environments.
Furthermore, decision trees require almost no data preparation (e.g., normalization) and can handle both categorical and continuous data.
To remedy overfitting and improve prediction accuracy, decision trees can also be limited to a certain depth or complexity, or bundled into ensembles of trees (e.g., random forests).

A decision tree is a predictive model which maps observations (features) about an item to conclusions about the item's label or class. 
The model is generated using a top-down approach, where the source dataset is split into subsets using a statistical measure, often in the form of the Gini index or information gain via Shannon entropy. 
This process is applied recursively until a subset contains only samples with the same target class, or until a predefined stopping criterion is met.
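The two impurity measures can be written directly from the class counts of a node. A plain-Scala sketch, with hypothetical function names; Spark's `DecisionTreeClassifier` uses Gini impurity by default (its `impurity` parameter also accepts `"entropy"`). The `gain` function scores a candidate split by how much it lowers the size-weighted impurity of the children.

```scala
// Hypothetical plain-Scala sketch of Gini impurity and Shannon entropy computed
// from the class counts of one node, and of the impurity decrease of a split.
def gini(counts: Seq[Long]): Double = {
  val total = counts.sum.toDouble
  1.0 - counts.map { c => val p = c / total; p * p }.sum
}

def entropy(counts: Seq[Long]): Double = {
  val total = counts.sum.toDouble
  // Classes with zero count contribute nothing (0 * log 0 is taken as 0).
  counts.filter(_ > 0).map { c => val p = c / total; -p * math.log(p) / math.log(2) }.sum
}

// Impurity decrease (information gain) when a parent node is split in two.
def gain(parent: Seq[Long], left: Seq[Long], right: Seq[Long],
         impurity: Seq[Long] => Double): Double = {
  val n = parent.sum.toDouble
  impurity(parent) - (left.sum / n) * impurity(left) - (right.sum / n) * impurity(right)
}

println(gini(Seq(50L, 50L)))    // 0.5: maximally mixed node with two classes
println(entropy(Seq(50L, 50L))) // 1.0 bit
println(gain(Seq(50L, 50L), Seq(50L, 0L), Seq(0L, 50L), gini)) // 0.5: a perfect split
```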

In [29]:
%spark

val dTree = new DecisionTreeClassifier().setLabelCol("label")
      .setFeaturesCol("features")

Set up a pipeline that passes the data through transformers to extract the features and the label, and then passes the result to a decision tree estimator to fit the model.
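The key property of a pipeline is that estimators are fitted only during training, and the fitted models are then reused unchanged on new data. The toy sketch below models that idea in plain Scala; `Stage`, `Transform`, `Estimate` and `fitPipeline` are hypothetical types and functions, not the Spark API. The `center` estimator learns a mean on the training rows, and the fitted pipeline applies that same mean to later data.

```scala
// Toy model of pipeline fitting (hypothetical types, not the Spark API): stages
// run in order; each estimator is fitted on the output of the previous stages.
type Data = Seq[Map[String, Double]]

sealed trait Stage
final case class Transform(f: Data => Data) extends Stage
final case class Estimate(fit: Data => (Data => Data)) extends Stage

def fitPipeline(stages: Seq[Stage], training: Data): Data => Data = {
  val (_, fitted) = stages.foldLeft((training, Vector.empty[Data => Data])) { (state, stage) =>
    val (data, acc) = state
    stage match {
      case Transform(f) => (f(data), acc :+ f)
      case Estimate(fit) =>
        val model = fit(data) // parameters are learned here, on training data only
        (model(data), acc :+ model)
    }
  }
  // The fitted pipeline applies every transformer and fitted model in order.
  data => fitted.foldLeft(data)((d, step) => step(d))
}

// Hypothetical estimator: learns the mean of column "x" and subtracts it.
val center = Estimate { train =>
  val mean = train.map(_("x")).sum / train.size
  (d: Data) => d.map(r => r.updated("x", r("x") - mean))
}
val scaleByTwo = Transform(d => d.map(r => r.updated("x", r("x") * 2)))

val model = fitPipeline(Seq(center, scaleByTwo), Seq(Map("x" -> 1.0), Map("x" -> 3.0)))
println(model(Seq(Map("x" -> 10.0)))) // the mean 2.0 learned on training is reused: x -> 16.0
```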

**Q7: Create the Pipeline to train the decision tree**



In [31]:
%spark

// put treeClassifier in a Pipeline here.
 val pipeline = ...



Spark ML supports k-fold cross-validation with a transformation/estimation pipeline to try out different combinations of parameters, using a process called grid search. 
You set up a CrossValidator with the parameter grid to test, an estimator, and an evaluator; the CrossValidator then keeps the parameter combination with the best average metric across the folds.
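The selection loop can be sketched in plain Scala as follows. `kFoldSplits`, `gridSearch` and the `score` function are hypothetical stand-ins: `score` plays the role of fitting the pipeline on the training folds and evaluating it on the held-out fold. Unlike Spark, which assigns rows to folds at random, this sketch assigns them by position. With the 3 maxDepth values and the 5 folds configured in the cell below, 15 models are fitted during the search; Spark then refits the best combination on the whole training set.

```scala
// Hypothetical plain-Scala sketch of k-fold cross-validation with grid search.
// Rows are assigned to folds by position (Spark assigns them at random).
def kFoldSplits[T](data: Seq[T], k: Int): Seq[(Seq[T], Seq[T])] =
  (0 until k).map { fold =>
    val validation = data.zipWithIndex.collect { case (x, i) if i % k == fold => x }
    val training = data.zipWithIndex.collect { case (x, i) if i % k != fold => x }
    (training, validation)
  }

// For each candidate parameter, average the metric over the k folds and keep
// the parameter with the largest average (larger is better, as for AUC).
def gridSearch[P, T](params: Seq[P], data: Seq[T], k: Int)
                    (score: (P, Seq[T], Seq[T]) => Double): (P, Double) = {
  val splits = kFoldSplits(data, k)
  params
    .map(p => p -> splits.map { case (tr, va) => score(p, tr, va) }.sum / k)
    .maxBy(_._2)
}

val (bestDepth, bestMetric) = gridSearch(Seq(4, 5, 6), (1 to 30).toSeq, k = 5) {
  (depth, _, _) => 1.0 - math.abs(depth - 5) * 0.1 // hypothetical metric peaking at 5
}
println(s"best maxDepth = $bestDepth, average metric = $bestMetric")
```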


In [33]:
%spark

// Set up the param grid to search through the decision tree's maxDepth parameter for the best model.
// Deeper trees are potentially more accurate, but are also more likely to overfit.
val paramGrid = new ParamGridBuilder().addGrid(dTree.maxDepth, Array(4, 5, 6)).build()
// The AUC is computed from the classifier's scores ("rawPrediction", the default column),
// not from the hard 0/1 "prediction" column, which discards most of the ranking information.
val evaluator = new BinaryClassificationEvaluator()
      .setLabelCol("label")
      .setRawPredictionCol("rawPrediction")

// Set up 5-fold cross validation with paramGrid
val crossval = new CrossValidator().setCollectSubModels(true).setEstimator(pipeline)
      .setEvaluator(evaluator)
      .setEstimatorParamMaps(paramGrid).setNumFolds(5)

In [34]:
%spark

val cvModel = crossval.fit(ntrain) // Model from k-fold cross validation

In [35]:
%spark

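cvModel.subModels.flatten.foreach(println) // subModels holds one array of fitted models per fold
cvModel.getEstimatorParamMaps.zip(cvModel.avgMetrics).foreach(println) // average metric per parameter combination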

In [36]:
%spark

val bestModel = cvModel.bestModel // retrieve only the best model
println("The Best Model:\n--------------------")
// Use Scala's asInstanceOf method to cast an instance to the desired type.
val treeModel = bestModel.asInstanceOf[org.apache.spark.ml.PipelineModel].stages.last.asInstanceOf[DecisionTreeClassificationModel] // extract the decision tree model, assumed to be the last stage of the pipeline
println("Learned classification tree model:\n" + treeModel.toDebugString) // print the decision tree: we can read the sequence of decision rules

**Q8: What are the first two features used in the decision-tree-based classification rule? Give their names as they appear in the original dataset.**


In [38]:
%spark// Put here the command to print the original names of the features.println(...) // print the name of the columns which appear first in the decision tree


The actual performance of the model can be determined using the test dataset, which has not been used for any training or cross-validation activities. 
We'll transform the test set with the model pipeline, which will map the features according to the same recipe. 


**Q9: Apply the learned transformer to the test dataset.**

In [41]:
%spark

// Put the command to compute the "predictions" hereafter.
// The variable "predictions" will contain the result of the transformer.

//transform the test set with the model pipeline, which will map the features according to the same recipe
val predictions = ...

test.printSchema()

predictions.printSchema()


In [42]:
%spark

predictions.select("label","prediction", "probability").show

The performance of the model is measured by the area under the ROC curve (AUC). The AUC measures the ability of the classifier to separate true positives from false positives across all decision thresholds. A random predictor has an AUC of about 0.5; the closer the value is to 1, the better the predictions.
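The AUC also has a ranking interpretation: it is the probability that a randomly chosen positive instance receives a higher score than a randomly chosen negative one, with ties counting for one half. A plain-Scala sketch of that definition (the `auc` function is hypothetical, not a Spark API) also shows why the evaluator should be given real scores: with hard 0/1 predictions, many pairs are tied and the AUC reduces to the average of the true positive and true negative rates.

```scala
// Hypothetical plain-Scala sketch of the AUC via its rank interpretation: the
// fraction of (positive, negative) pairs where the positive scores higher,
// with ties counting 1/2.
def auc(scores: Seq[Double], labels: Seq[Double]): Double = {
  val pos = scores.zip(labels).collect { case (s, 1.0) => s }
  val neg = scores.zip(labels).collect { case (s, 0.0) => s }
  require(pos.nonEmpty && neg.nonEmpty, "need both classes")
  val wins = for (p <- pos; n <- neg)
    yield if (p > n) 1.0 else if (p == n) 0.5 else 0.0
  wins.sum / (pos.size.toLong * neg.size)
}

val rocLabels = Seq(1.0, 1.0, 0.0, 0.0)
println(auc(Seq(0.9, 0.8, 0.3, 0.1), rocLabels)) // 1.0: positives ranked above negatives
println(auc(Seq(1.0, 0.0, 1.0, 0.0), rocLabels)) // 0.5: hard predictions, half of each class right
```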

**Q10: Compute the area under the ROC curve in the following cell.**

In [44]:
%spark

// Put your command line to compute the variable "accuracy" hereafter

//The evaluator will provide us with the score of the predictions by comparing the prediction to the label
val accuracy = ...

evaluator.explainParams()

In [45]:
%spark

val lp = predictions.select("label", "prediction")
val counttotal = predictions.count()
val correct = lp.filter($"label" === $"prediction").count() // comparison between columns
val wrong = lp.filter(not($"label" === $"prediction")).count()
val ratioWrong = wrong.toDouble / counttotal.toDouble
val ratioCorrect = correct.toDouble / counttotal.toDouble
// Label 1.0 is treated as the positive class; which churn value that is depends on the label indexer.
val truep = lp.filter($"prediction" === 1.0).filter($"label" === $"prediction").count() / counttotal.toDouble
val truen = lp.filter($"prediction" === 0.0).filter($"label" === $"prediction").count() / counttotal.toDouble
val falsep = lp.filter($"prediction" === 1.0).filter(not($"label" === $"prediction")).count() / counttotal.toDouble
val falsen = lp.filter($"prediction" === 0.0).filter(not($"label" === $"prediction")).count() / counttotal.toDouble
println("total count : " + counttotal)
println("correct : " + correct)
println("wrong: " + wrong)
println("ratio correct: " + ratioCorrect)
println("ratio true positive : " + truep)
println("ratio true negative : " + truen)
println("ratio wrong: " + ratioWrong)
println("ratio false positive : " + falsep)
println("ratio false negative : " + falsen)
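For reference, the four confusion-matrix cells can be counted directly from (label, prediction) pairs. The plain-Scala sketch below assumes that label 1.0 is the positive class; `Confusion` and `confusion` are hypothetical names. Unlike the ratios printed above, which divide every count by the total, precision and recall normalize by the predicted and the actual positives respectively.

```scala
// Hypothetical plain-Scala sketch of the confusion matrix, taking label 1.0 as
// the positive class.
case class Confusion(tp: Long, tn: Long, fp: Long, fn: Long) {
  def total: Long = tp + tn + fp + fn
  def accuracy: Double = (tp + tn).toDouble / total
  def precision: Double = tp.toDouble / (tp + fp) // among predicted positives
  def recall: Double = tp.toDouble / (tp + fn)    // among actual positives
}

// Each pair is (label, prediction).
def confusion(pairs: Seq[(Double, Double)]): Confusion = {
  def count(label: Double, prediction: Double): Long =
    pairs.count { case (l, p) => l == label && p == prediction }.toLong
  Confusion(tp = count(1.0, 1.0), tn = count(0.0, 0.0), fp = count(0.0, 1.0), fn = count(1.0, 0.0))
}

val cm = confusion(Seq((1.0, 1.0), (1.0, 0.0), (0.0, 0.0), (0.0, 0.0), (0.0, 1.0)))
println(cm) // Confusion(1,2,1,1)
println(s"accuracy ${cm.accuracy}, precision ${cm.precision}, recall ${cm.recall}")
```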
