## A marketing agency has many customers that use their service to produce ads for the client/customer websites. They've noticed that they have quite a bit of churn in clients. They basically randomly assign account managers right now. Thus, predicting which customers will Churn(stop buying the service), which will help the agency to assign them an account manager.


In [81]:
# Initialize pyspark
import findspark
findspark.init()
import pyspark

In [82]:
# Initialize and create a spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Churn').getOrCreate()

In [83]:
# Import statements to setup ML
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.linalg import Vectors

In [84]:
# Using Spark to read the customer churn data set
data = spark.read.csv('customer_churn.csv', header=True, inferSchema=True)

In [85]:
# Printing the first row of the dataframe
data.head()

Row(Names='Cameron Williams', Age=42.0, Total_Purchase=11066.8, Account_Manager=0, Years=7.22, Num_Sites=8.0, Onboard_date=datetime.datetime(2013, 8, 30, 7, 0, 40), Location='10265 Elizabeth Mission Barkerburgh, AK 89518', Company='Harvey LLC', Churn=1)

In [86]:
# Printing the schema of the dataframe
data.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [87]:
data.count()

900

__Checking out whether the string columns "Names", "Location" and "Company"are useful or not (to check whether they are categorical columns or not)__

In [88]:
data.groupBy('Names').count().count()

899

In [89]:
data.groupBy('Location').count().count()

900

In [90]:
data.groupBy('Company').count().count()

873

Ignoring the categorical columns since they are not useful and checking out timestamp column "Onboard_date"

In [91]:
data.select('Onboard_date').show(3)

+-------------------+
|       Onboard_date|
+-------------------+
|2013-08-30 07:00:40|
|2013-08-13 00:38:46|
|2016-06-29 06:20:07|
+-------------------+
only showing top 3 rows



In [92]:
from pyspark.sql.functions import year

filtered_data = data.withColumn('Onboard_Year', year(data['Onboard_date']))

In [93]:
filtered_data.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Churn',
 'Onboard_Year']

In [94]:
filtered_data = filtered_data.select('Age','Total_Purchase','Account_Manager','Years','Num_Sites'
                                     ,'Churn','Onboard_Year')

In [95]:
filtered_data.groupBy('Onboard_Year').count().count()

11

In [96]:
filtered_data.show(3)

+----+--------------+---------------+-----+---------+-----+------------+
| Age|Total_Purchase|Account_Manager|Years|Num_Sites|Churn|Onboard_Year|
+----+--------------+---------------+-----+---------+-----+------------+
|42.0|       11066.8|              0| 7.22|      8.0|    1|        2013|
|41.0|      11916.22|              0|  6.5|     11.0|    1|        2013|
|38.0|      12884.75|              0| 6.67|     12.0|    1|        2016|
+----+--------------+---------------+-----+---------+-----+------------+
only showing top 3 rows



In [97]:
#Assembling all the features to a single vector column "features"

assembler = VectorAssembler(inputCols=['Age','Total_Purchase','Account_Manager','Years','Num_Sites'
                                       ,'Onboard_Year'], outputCol='features')

In [98]:
output = assembler.transform(filtered_data)

In [99]:
final_data = output.select('Churn','features')

In [100]:
final_data.show(3, truncate=False)

+-----+------------------------------------+
|Churn|features                            |
+-----+------------------------------------+
|1    |[42.0,11066.8,0.0,7.22,8.0,2013.0]  |
|1    |[41.0,11916.22,0.0,6.5,11.0,2013.0] |
|1    |[38.0,12884.75,0.0,6.67,12.0,2016.0]|
+-----+------------------------------------+
only showing top 3 rows



__Splitting the resultant data into training data and testing data, Training data is to train the model, Testing data is to test the builted model__

In [101]:
train_data,test_data = final_data.randomSplit([0.7,0.3])

In [102]:
train_data.count()

649

In [103]:
test_data.count()

251

In [104]:
#Creating a logistic regression model object
lor = LogisticRegression(labelCol='Churn', featuresCol='features')

In [105]:
# Creating a logistic regression model and fitting the training data to it
churnModel = lor.fit(train_data)

In [106]:
#Getting Results on Test Set
results = churnModel.transform(test_data)

In [107]:
results.printSchema()

root
 |-- Churn: integer (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [108]:
results.show(3)

+-----+--------------------+--------------------+--------------------+----------+
|Churn|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|    0|[22.0,11254.38,1....|[4.85881251886140...|[0.99230005628738...|       0.0|
|    0|[28.0,11128.95,1....|[4.34452597243059...|[0.98718860337318...|       0.0|
|    0|[30.0,6744.87,0.0...|[3.70838685723937...|[0.97606966024172...|       0.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 3 rows



### MODEL EVALUATION

__1) Converting the data to rdd and evaluating using MulticlassMetrics to print the confusion matrix__

In [109]:
from pyspark.mllib.evaluation import MulticlassMetrics

In [110]:
clean_result = results.withColumn('Churn',output['Churn'].cast('double'))

In [111]:
clean_result.select('prediction','Churn').show(3)

+----------+-----+
|prediction|Churn|
+----------+-----+
|       0.0|  0.0|
|       0.0|  0.0|
|       0.0|  0.0|
+----------+-----+
only showing top 3 rows



In [112]:
predictionAndLabel = clean_result.select('prediction','Churn').rdd

In [113]:
metrics = MulticlassMetrics(predictionAndLabel)

In [114]:
#Printing the confusion matrix
print(metrics.confusionMatrix())

DenseMatrix([[191.,   8.],
             [ 22.,  30.]])


In [115]:
#Printing the Accuracy
print(metrics.accuracy)

0.8804780876494024


In [116]:
metrics.recall()

0.8804780876494024

In [117]:
metrics.precision()

0.8804780876494024

__2) Evaluating using BinaryClassificationEvaluator__

In [118]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [119]:
bin_eval = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='Churn')

In [120]:
#Calculating Area Under ROC
AOC = bin_eval.evaluate(results)

In [121]:
#Printing Area Under ROC
print(AOC)

0.9172787011982998


__3) Evaluating using MulticlassClassificationEvaluator__

In [122]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [123]:
multi_eval = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='Churn')

In [124]:
#Calculating Area Under ROC
AOC_2 = multi_eval.evaluate(results)

In [125]:
#Printing Area Under ROC
print(AOC_2)

0.8732126510140666


### Predict on brand new unlabeled data

In [126]:
final_lr_Model = lor.fit(final_data)

In [127]:
new_customers = spark.read.csv('new_customers.csv',header=True,inferSchema=True)

In [128]:
new_customers.count()

6

In [129]:
new_customers.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)



In [130]:
new_customers_1 = (new_customers.withColumn("Onboard_Year",year(new_customers['Onboard_date']))
                   .drop('Onboard_date'))

In [131]:
test_new_customers = assembler.transform(new_customers_1)

In [132]:
test_new_customers.head()

Row(Names='Andrew Mccall', Age=37.0, Total_Purchase=9935.53, Account_Manager=1, Years=7.71, Num_Sites=8.0, Location='38612 Johnny Stravenue Nataliebury, WI 15717-8316', Company='King Ltd', Onboard_Year=2011, features=DenseVector([37.0, 9935.53, 1.0, 7.71, 8.0, 2011.0]))

In [133]:
final_results = final_lr_Model.transform(test_new_customers)

In [134]:
final_results.select('Company','prediction').show(10)

+----------------+----------+
|         Company|prediction|
+----------------+----------+
|        King Ltd|       0.0|
|   Cannon-Benson|       0.0|
|Barron-Robertson|       0.0|
|   Sexton-Golden|       0.0|
|        Wood LLC|       0.0|
|   Parks-Robbins|       0.0|
+----------------+----------+



So based on above predictions no need to assign Account Managers to above Companies

In [None]:
#Closing spark session
spark.stop()