### A marketing agency has many customers that use its service to produce ads for their websites. The agency has noticed quite a bit of client churn, and account managers are currently assigned more or less at random. Predicting which customers will churn (stop buying the service) would let the agency assign account managers to the customers most at risk.

### Initialize and create a spark session

In [1]:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("Churn").getOrCreate()

Intitializing Scala interpreter ...

Spark Web UI available at http://Varun-CK:4040
SparkContext available as 'sc' (version = 2.3.0, master = local[*], app id = local-1577622954730)
SparkSession available as 'spark'


2019-12-29 18:06:07 WARN  SparkContext:66 - Using an existing SparkContext; some configuration may not take effect.


import org.apache.spark.sql.SparkSession
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@3fc3c4cc


### Initialize Logger

In [2]:
import org.apache.log4j._
Logger.getLogger("org").setLevel(Level.ERROR)

import org.apache.log4j._


### Import statements to set up ML for Logistic Regression

In [3]:
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{StringIndexer,VectorAssembler,OneHotEncoder}
import org.apache.spark.ml.linalg.Vectors

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, OneHotEncoder}
import org.apache.spark.ml.linalg.Vectors


### Using Spark to read the customer churn data set

In [4]:
val data = spark.read.options(Map(("header","true"),("inferSchema","true"))).csv("customer_churn.csv")

data: org.apache.spark.sql.DataFrame = [Names: string, Age: double ... 8 more fields]


### Printing the first row of the dataframe

In [5]:
data.head(1)

res1: Array[org.apache.spark.sql.Row] = Array([Cameron Williams,42.0,11066.8,0,7.22,8.0,2013-08-30 07:00:40.0,10265 Elizabeth Mission Barkerburgh, AK 89518,Harvey LLC,1])


### Printing the schema of the dataframe

In [6]:
data.printSchema

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



### Showing the first five rows

In [7]:
data.show(5)

+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|           Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|             Company|Churn|
+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|2013-08-30 07:00:40|10265 Elizabeth M...|          Harvey LLC|    1|
|   Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|2013-08-13 00:38:46|6157 Frank Garden...|          Wilson PLC|    1|
|     Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|2016-06-29 06:20:07|1331 Keith Court ...|Miller, Johnson a...|    1|
|   Phillip White|42.0|       8010.76|              0| 6.71|     10.0|2014-04-22 12:43:12|13120 Daniel Moun...|           Smith Inc|    1|
|  Cynthia Norton|37.0|    

### Count

In [8]:
data.count

res4: Long = 900


### Count after dropping rows with missing values

In [9]:
data.na.drop().count

res5: Long = 900
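
Note that `na.drop()` removes rows that contain null or NaN values; it does not remove duplicate rows, which is what `dropDuplicates()` does. A minimal sketch on a made-up five-row DataFrame (the column names and values are illustrative assumptions, not the churn data) shows the difference:

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration; in the notebook `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("NaVsDuplicates").getOrCreate()
import spark.implicits._

// Made-up rows: one repeated row and two rows with a missing age.
val toy = Seq[(String, Option[Double])](
  ("a", Some(1.0)),
  ("a", Some(1.0)),
  ("b", None),
  ("c", Some(2.0)),
  ("d", None)
).toDF("name", "age")

val withoutMissing    = toy.na.drop().count()        // drops the two rows with a null age
val withoutDuplicates = toy.dropDuplicates().count() // drops the repeated ("a", 1.0) row
```

Since both counts on the churn data are 900, the data set has no rows with missing values; a separate `dropDuplicates()` call would be needed to check for repeated rows.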


### Checking whether the string columns "Names", "Location" and "Company" are useful (that is, whether they are categorical columns with only a few distinct values)

In [10]:
data.groupBy("Names").count().count()

res6: Long = 899


In [11]:
data.groupBy("Location").count().count()

res7: Long = 900


In [12]:
data.groupBy("Company").count().count()

res8: Long = 873
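
The three distinct-value counts can also be obtained in a single aggregation. A minimal sketch using `countDistinct` on a made-up three-row DataFrame (the rows are illustrative assumptions; only the column names come from the notebook). Unlike `groupBy`, `countDistinct` ignores nulls, which makes no difference when there are none:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.countDistinct

val spark = SparkSession.builder().master("local[*]").appName("DistinctCounts").getOrCreate()
import spark.implicits._

// Made-up rows standing in for the string columns of the churn data.
val toy = Seq(
  ("Ann", "1 X Street", "Acme"),
  ("Bob", "2 Y Street", "Acme"),
  ("Ann", "3 Z Street", "Beta")
).toDF("Names", "Location", "Company")

// One pass over the data, one distinct count per column.
val counts = toy.select(
  countDistinct("Names"),
  countDistinct("Location"),
  countDistinct("Company")
).head()

val nNames     = counts.getLong(0)
val nLocations = counts.getLong(1)
val nCompanies = counts.getLong(2)
```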


### Ignoring the string columns, since nearly every value is distinct and they cannot serve as categories, and checking out the timestamp column "Onboard_date"

In [13]:
data.select("Onboard_date").show(3)

+-------------------+
|       Onboard_date|
+-------------------+
|2013-08-30 07:00:40|
|2013-08-13 00:38:46|
|2016-06-29 06:20:07|
+-------------------+
only showing top 3 rows



### Checking out whether `Year` is useful or not in the column `Onboard_date`

In [14]:
data.groupBy(year($"Onboard_date")).count().count()

res10: Long = 11


### Creating a new column `Onboard_Year`

In [15]:
var filtered_data = data.withColumn("Onboard_Year",year($"Onboard_date"))

filtered_data: org.apache.spark.sql.DataFrame = [Names: string, Age: double ... 9 more fields]


In [16]:
filtered_data.printSchema

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)
 |-- Onboard_Year: integer (nullable = true)



In [17]:
filtered_data = filtered_data.drop("Names","Location","Company","Onboard_date")

filtered_data: org.apache.spark.sql.DataFrame = [Age: double, Total_Purchase: double ... 5 more fields]


In [18]:
filtered_data.show(5)

+----+--------------+---------------+-----+---------+-----+------------+
| Age|Total_Purchase|Account_Manager|Years|Num_Sites|Churn|Onboard_Year|
+----+--------------+---------------+-----+---------+-----+------------+
|42.0|       11066.8|              0| 7.22|      8.0|    1|        2013|
|41.0|      11916.22|              0|  6.5|     11.0|    1|        2013|
|38.0|      12884.75|              0| 6.67|     12.0|    1|        2016|
|42.0|       8010.76|              0| 6.71|     10.0|    1|        2014|
|37.0|       9191.58|              0| 5.56|      9.0|    1|        2016|
+----+--------------+---------------+-----+---------+-----+------------+
only showing top 5 rows



In [19]:
filtered_data.printSchema

root
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Churn: integer (nullable = true)
 |-- Onboard_Year: integer (nullable = true)



### Assembling all the features to a single vector column "features"

In [20]:
val assembler = new VectorAssembler().setInputCols(Array("Age","Total_Purchase","Account_Manager","Years","Num_Sites"
                                                         ,"Onboard_Year")).setOutputCol("features")

assembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_5f8d16d37bcc


In [21]:
val output = assembler.transform(filtered_data)

output: org.apache.spark.sql.DataFrame = [Age: double, Total_Purchase: double ... 6 more fields]


In [22]:
val final_data = output.select("Churn","features")

final_data: org.apache.spark.sql.DataFrame = [Churn: int, features: vector]


In [23]:
final_data.show(5,false)

+-----+------------------------------------+
|Churn|features                            |
+-----+------------------------------------+
|1    |[42.0,11066.8,0.0,7.22,8.0,2013.0]  |
|1    |[41.0,11916.22,0.0,6.5,11.0,2013.0] |
|1    |[38.0,12884.75,0.0,6.67,12.0,2016.0]|
|1    |[42.0,8010.76,0.0,6.71,10.0,2014.0] |
|1    |[37.0,9191.58,0.0,5.56,9.0,2016.0]  |
+-----+------------------------------------+
only showing top 5 rows



### Splitting the resultant data into training data and testing data

<code>
<b>Training data is used to train the model</b>
<b>Testing data is used to test the built model</b>
</code>

In [24]:
val Array(train_data,test_data) = final_data.randomSplit(Array(0.7,0.3))

train_data: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Churn: int, features: vector]
test_data: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Churn: int, features: vector]
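
Without a seed, `randomSplit` draws a different split on every run, so the row counts and metrics reported here would change from run to run. A minimal sketch of a reproducible split, assuming a stand-in DataFrame built with `spark.range` and an arbitrary seed of 42 (neither is from the original notebook):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("SeededSplit").getOrCreate()

// Stand-in for final_data: 900 rows with a single id column.
val df = spark.range(900).toDF("id")

// Passing the same seed twice yields the same partition of the rows.
val Array(trainA, testA) = df.randomSplit(Array(0.7, 0.3), 42L)
val Array(trainB, testB) = df.randomSplit(Array(0.7, 0.3), 42L)

val sameSplit = trainA.except(trainB).count() == 0 && trainB.except(trainA).count() == 0
val totalRows = trainA.count() + testA.count()
```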


In [25]:
final_data.describe().show()

+-------+-------------------+
|summary|              Churn|
+-------+-------------------+
|  count|                900|
|   mean|0.16666666666666666|
| stddev| 0.3728852122772358|
|    min|                  0|
|    max|                  1|
+-------+-------------------+



In [26]:
train_data.describe().show()

+-------+-------------------+
|summary|              Churn|
+-------+-------------------+
|  count|                618|
|   mean|0.16828478964401294|
| stddev|0.37442204382160843|
|    min|                  0|
|    max|                  1|
+-------+-------------------+



In [27]:
test_data.describe().show()

+-------+-------------------+
|summary|              Churn|
+-------+-------------------+
|  count|                282|
|   mean|0.16312056737588654|
| stddev|0.37013248518085234|
|    min|                  0|
|    max|                  1|
+-------+-------------------+



### Creating a logistic regression model object

In [28]:
val lor = new LogisticRegression().setLabelCol("Churn").setFeaturesCol("features")

lor: org.apache.spark.ml.classification.LogisticRegression = logreg_d07b9689afac


### Creating a logistic regression model and fitting the training data to it

In [29]:
val churnModel = lor.fit(train_data)

2019-12-29 18:07:38 WARN  BLAS:61 - Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
2019-12-29 18:07:38 WARN  BLAS:61 - Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS


churnModel: org.apache.spark.ml.classification.LogisticRegressionModel = logreg_d07b9689afac


### Getting Results on Test Set

In [30]:
val results = churnModel.transform(test_data)

results: org.apache.spark.sql.DataFrame = [Churn: int, features: vector ... 3 more fields]


In [31]:
results.show(5)

+-----+--------------------+--------------------+--------------------+----------+
|Churn|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|    0|[22.0,11254.38,1....|[5.27307477247574...|[0.99489834095603...|       0.0|
|    0|[25.0,9672.03,0.0...|[4.81891927445291...|[0.99198918160809...|       0.0|
|    0|[26.0,8939.61,0.0...|[6.36659522502375...|[0.99828494777962...|       0.0|
|    0|[29.0,10203.18,1....|[4.29595666957292...|[0.98655957383690...|       0.0|
|    0|[29.0,13255.05,1....|[4.70098194326566...|[0.99099546789519...|       0.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 5 rows



## MODEL EVALUATION

### 1) Converting the data to rdd and evaluating using MulticlassMetrics to print the confusion matrix

In [32]:
import org.apache.spark.mllib.evaluation.MulticlassMetrics

import org.apache.spark.mllib.evaluation.MulticlassMetrics


In [33]:
val clean_result = results.withColumn("Churn",results("Churn").cast("double"))

clean_result: org.apache.spark.sql.DataFrame = [Churn: double, features: vector ... 3 more fields]


In [34]:
clean_result.select("Churn","prediction").show(5)

+-----+----------+
|Churn|prediction|
+-----+----------+
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
+-----+----------+
only showing top 5 rows



In [35]:
// MulticlassMetrics expects (prediction, label) pairs, so "prediction" must come first
val predictionAndLabel = clean_result.select("prediction","Churn").as[(Double,Double)].rdd

predictionAndLabel: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[196] at rdd at <console>:43


In [36]:
val metrics = new MulticlassMetrics(predictionAndLabel)

metrics: org.apache.spark.mllib.evaluation.MulticlassMetrics = org.apache.spark.mllib.evaluation.MulticlassMetrics@2d3ff5cb


#### Printing the confusion matrix (rows are actual labels, columns are predicted labels)

In [37]:
println(metrics.confusionMatrix)

224.0  12.0  
20.0   26.0  


#### Printing the Accuracy

In [38]:
println(metrics.accuracy)

0.8865248226950354


#### Recall (the deprecated no-argument `recall` returns the overall accuracy)

In [39]:
println(metrics.recall)

0.8865248226950354


#### Precision (the deprecated no-argument `precision` returns the overall accuracy)

In [40]:
println(metrics.precision)

0.8865248226950354
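
In Spark 2.x the no-argument `recall` and `precision` of `MulticlassMetrics` are deprecated aliases for accuracy, which is why all three printed values are identical. Per-class figures for the churn class are more informative. A minimal sketch in plain Scala, assuming the four test-set counts read off the confusion matrix: 224 non-churners and 26 churners predicted correctly, 20 churners missed, and 12 non-churners flagged as churners (26 + 20 = 46 matches the 46 churners in `test_data`). The variable names are illustrative:

```scala
// Counts from the test-set confusion matrix (churn = positive class).
val trueNegatives  = 224.0
val truePositives  = 26.0
val falseNegatives = 20.0 // churners predicted as staying
val falsePositives = 12.0 // non-churners predicted as churning

val total = trueNegatives + truePositives + falseNegatives + falsePositives

val accuracy       = (trueNegatives + truePositives) / total
val churnPrecision = truePositives / (truePositives + falsePositives)
val churnRecall    = truePositives / (truePositives + falseNegatives)

println(f"accuracy=$accuracy%.4f precision=$churnPrecision%.4f recall=$churnRecall%.4f")
```

With these counts the model finds 26 of the 46 churners in the test set, a weakness that the overall accuracy of about 0.89 hides.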


### 2) Evaluating using BinaryClassificationEvaluator

In [41]:
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator


In [42]:
val bin_eval = new BinaryClassificationEvaluator().setRawPredictionCol("rawPrediction").setLabelCol("Churn")

bin_eval: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = binEval_d99aaea2d3e1


#### Calculating Area Under ROC

In [43]:
val AUC = bin_eval.evaluate(results)

AUC: Double = 0.9057663964627868


### 3) Evaluating using MulticlassClassificationEvaluator

In [44]:
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator


In [45]:
val multi_eval = new MulticlassClassificationEvaluator().setPredictionCol("prediction").setLabelCol("Churn")

multi_eval: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_cc606ec67b84


In [46]:
val f1 = multi_eval.evaluate(results)

f1: Double = 0.8820668693009119


#### Printing the weighted F1 score (the default metric of MulticlassClassificationEvaluator)

In [47]:
println(f1)

0.8820668693009119
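
The default metric of `MulticlassClassificationEvaluator` is the weighted F1 score, so the value above is an F1 score rather than an area under the ROC curve. A minimal sketch in plain Scala that reproduces it from the test-set counts (224 and 26 correct predictions for the non-churn and churn classes, 20 missed churners, 12 false alarms). The helper `classF1` and the variable names are illustrative, not part of Spark:

```scala
// F1 for one class from its true positives, false positives and false negatives.
def classF1(tp: Double, fp: Double, fn: Double): Double = 2 * tp / (2 * tp + fp + fn)

// Class 0 (no churn): 224 correct, 20 churners wrongly predicted as 0,
// 12 non-churners wrongly predicted as 1.
val f1NoChurn = classF1(224.0, 20.0, 12.0)
// Class 1 (churn): 26 correct, 12 false alarms, 20 missed churners.
val f1Churn = classF1(26.0, 12.0, 20.0)

// Weight each class by its number of true instances in the test set.
val nNoChurn = 224.0 + 12.0
val nChurn   = 26.0 + 20.0
val weightedF1 = (nNoChurn * f1NoChurn + nChurn * f1Churn) / (nNoChurn + nChurn)

println(weightedF1)
```

The result agrees with the evaluator's output of roughly 0.8821, and the per-class values show that the churn class scores much lower than the non-churn class.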


### Refit on the full data set and predict on brand new unlabeled data

In [48]:
val final_lr_Model = lor.fit(final_data)

2019-12-29 18:08:22 ERROR LBFGS:27 - Failure! Resetting history: breeze.optimize.FirstOrderException: Line search failed
2019-12-29 18:08:22 ERROR LBFGS:27 - Failure! Resetting history: breeze.optimize.FirstOrderException: Line search failed
2019-12-29 18:08:23 ERROR LBFGS:27 - Failure! Resetting history: breeze.optimize.FirstOrderException: Line search failed
2019-12-29 18:08:23 ERROR LBFGS:27 - Failure! Resetting history: breeze.optimize.FirstOrderException: Line search failed
2019-12-29 18:08:24 ERROR LBFGS:27 - Failure! Resetting history: breeze.optimize.FirstOrderException: Line search failed
2019-12-29 18:08:24 ERROR LBFGS:27 - Failure! Resetting history: breeze.optimize.FirstOrderException: Line search failed
2019-12-29 18:08:25 ERROR LBFGS:27 - Failure! Resetting history: breeze.optimize.FirstOrderException: Line search failed
2019-12-29 18:08:26 ERROR LBFGS:27 - Failure! Resetting history: breeze.optimize.FirstOrderException: Line search failed
2019-12-29 18:08:26 ERROR LBFGS:

2019-12-29 18:08:49 ERROR LBFGS:27 - Failure! Resetting history: breeze.optimize.FirstOrderException: Line search failed
2019-12-29 18:08:50 ERROR LBFGS:27 - Failure! Resetting history: breeze.optimize.FirstOrderException: Line search failed
2019-12-29 18:08:50 ERROR LBFGS:27 - Failure! Resetting history: breeze.optimize.FirstOrderException: Line search failed
2019-12-29 18:08:50 ERROR LBFGS:27 - Failure! Resetting history: breeze.optimize.FirstOrderException: Line search failed
2019-12-29 18:08:51 ERROR LBFGS:27 - Failure! Resetting history: breeze.optimize.FirstOrderException: Line search failed
2019-12-29 18:08:51 ERROR LBFGS:27 - Failure! Resetting history: breeze.optimize.FirstOrderException: Line search failed
2019-12-29 18:08:51 ERROR LBFGS:27 - Failure! Resetting history: breeze.optimize.FirstOrderException: Line search failed
2019-12-29 18:08:52 ERROR LBFGS:27 - Failure! Resetting history: breeze.optimize.FirstOrderException: Line search failed
2019-12-29 18:08:52 ERROR LBFGS:

final_lr_Model: org.apache.spark.ml.classification.LogisticRegressionModel = logreg_d07b9689afac
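
The repeated `Line search failed` messages mean the L-BFGS optimizer had trouble making progress, although a model is still returned. The notebook does not establish the cause. One thing that is sometimes tried in this situation is centering and scaling the features before fitting; whether it removes the messages here is untested. A minimal sketch, assuming a tiny made-up data set that borrows a few of the notebook's column names; `toyAssembler`, `scaler`, `scaledLr` and the `scaledFeatures` column are illustrative names:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{StandardScaler, VectorAssembler}

val spark = SparkSession.builder().master("local[*]").appName("ScaledChurn").getOrCreate()
import spark.implicits._

// Made-up rows (not the real data) with a subset of the notebook's columns.
val toy = Seq(
  (42.0, 11066.8, 7.22, 8.0, 2013, 1),
  (41.0, 11916.2, 6.50, 11.0, 2013, 1),
  (30.0, 9000.0, 4.10, 7.0, 2015, 0),
  (35.0, 10500.0, 5.00, 8.0, 2010, 0),
  (50.0, 8000.0, 3.20, 6.0, 2008, 0),
  (28.0, 12000.0, 6.90, 12.0, 2016, 1)
).toDF("Age", "Total_Purchase", "Years", "Num_Sites", "Onboard_Year", "Churn")

val toyAssembler = new VectorAssembler()
  .setInputCols(Array("Age", "Total_Purchase", "Years", "Num_Sites", "Onboard_Year"))
  .setOutputCol("features")

// Center each feature to zero mean and scale it to unit standard deviation.
val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithMean(true)
  .setWithStd(true)

val scaledLr = new LogisticRegression().setLabelCol("Churn").setFeaturesCol("scaledFeatures")

// Chaining the stages ensures new data is scaled with the statistics learned during fit.
val pipelineModel = new Pipeline().setStages(Array(toyAssembler, scaler, scaledLr)).fit(toy)
val scored = pipelineModel.transform(toy)
```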


In [56]:
val new_customers  = spark.read.options(Map(("header","true"),("inferSchema","true"))).csv("new_customers.csv")

new_customers: org.apache.spark.sql.DataFrame = [Names: string, Age: double ... 7 more fields]


In [57]:
var new_customers_1 = new_customers.withColumn("Onboard_Year",year($"Onboard_date"))

new_customers_1: org.apache.spark.sql.DataFrame = [Names: string, Age: double ... 8 more fields]


In [58]:
val test_new_customers  = assembler.transform(new_customers_1)

test_new_customers: org.apache.spark.sql.DataFrame = [Names: string, Age: double ... 9 more fields]


In [59]:
test_new_customers.head(1)

res27: Array[org.apache.spark.sql.Row] = Array([Andrew Mccall,37.0,9935.53,1,7.71,8.0,2011-08-29 18:37:54.0,38612 Johnny Stravenue Nataliebury, WI 15717-8316,King Ltd,2011,[37.0,9935.53,1.0,7.71,8.0,2011.0]])


In [60]:
test_new_customers.show(3)

+--------------+----+--------------+---------------+-----+---------+-------------------+--------------------+----------------+------------+--------------------+
|         Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|         Company|Onboard_Year|            features|
+--------------+----+--------------+---------------+-----+---------+-------------------+--------------------+----------------+------------+--------------------+
| Andrew Mccall|37.0|       9935.53|              1| 7.71|      8.0|2011-08-29 18:37:54|38612 Johnny Stra...|        King Ltd|        2011|[37.0,9935.53,1.0...|
|Michele Wright|23.0|       7526.94|              1| 9.28|     15.0|2013-07-22 18:19:54|21083 Nicole Junc...|   Cannon-Benson|        2013|[23.0,7526.94,1.0...|
|  Jeremy Chang|65.0|         100.0|              1|  1.0|     15.0|2006-12-11 07:48:13|085 Austin Views ...|Barron-Robertson|        2006|[65.0,100.0,1.0,1...|
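+--------------+----+--------------+---------------+-----+---------+-------------------+--------------------+----------------+------------+--------------------+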

In [61]:
val final_results = final_lr_Model.transform(test_new_customers)

final_results: org.apache.spark.sql.DataFrame = [Names: string, Age: double ... 12 more fields]


In [62]:
final_results.select("Company","prediction").show(10)

+----------------+----------+
|         Company|prediction|
+----------------+----------+
|        King Ltd|       0.0|
|   Cannon-Benson|       0.0|
|Barron-Robertson|       0.0|
|   Sexton-Golden|       0.0|
|        Wood LLC|       0.0|
|   Parks-Robbins|       0.0|
+----------------+----------+



Based on the predictions above, none of these companies is expected to churn, so there is no need to assign account managers to them.
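
A hard 0/1 prediction hides how close each customer is to the decision threshold. A minimal sketch, assuming a made-up `probability` column shaped like the one the model produces (the company names and probabilities are invented for illustration), of extracting the churn probability with a UDF and ranking customers by it:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.ml.linalg.{Vector, Vectors}

val spark = SparkSession.builder().master("local[*]").appName("ChurnRanking").getOrCreate()
import spark.implicits._

// Invented probability vectors [P(no churn), P(churn)], for illustration only.
val scoredToy = Seq(
  ("Company A", Vectors.dense(0.90, 0.10)),
  ("Company B", Vectors.dense(0.55, 0.45)),
  ("Company C", Vectors.dense(0.75, 0.25))
).toDF("Company", "probability")

// Element 1 of the probability vector is the probability of label 1 (churn).
val churnProbability = udf((v: Vector) => v(1))

// Highest churn probability first.
val ranked = scoredToy
  .withColumn("churn_probability", churnProbability(col("probability")))
  .orderBy(col("churn_probability").desc)

val mostAtRisk = ranked.select("Company").as[String].head()
```

Applied to `final_results`, the same ranking would show which of the new customers are closest to being classified as churners even though all of them are predicted as 0.0.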

### Closing spark session

In [63]:
spark.stop()

## Thank You!