# Logistic Regression to predict customer churn

In [1]:
import findspark
findspark.init('/home/ubuntu/spark-2.2.0-bin-hadoop2.7/')

### Create a spark session

In order to use the DataFrame API and be able to execute SQL over tables.

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName('logregchurn').getOrCreate()

### Import and explore data

In [4]:
data = spark.read.csv('customer_churn.csv',inferSchema=True,
                     header=True)

In [13]:
data.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [14]:
data.describe().show()

+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+-------------------+
|summary|        Names|              Age|   Total_Purchase|   Account_Manager|            Years|         Num_Sites|            Location|             Company|              Churn|
+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+-------------------+
|  count|          900|              900|              900|               900|              900|               900|                 900|                 900|                900|
|   mean|         null|41.81666666666667|10062.82403333334|0.4811111111111111| 5.27315555555555| 8.587777777777777|                null|                null|0.16666666666666666|
| stddev|         null|6.127560416916251|2408.644531858096|0.4999208935073339|1.274449013194616|1.764835592035

In [15]:
data.describe(data.columns[:6]).show()

+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+
|summary|        Names|              Age|   Total_Purchase|   Account_Manager|            Years|         Num_Sites|
+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+
|  count|          900|              900|              900|               900|              900|               900|
|   mean|         null|41.81666666666667|10062.82403333334|0.4811111111111111| 5.27315555555555| 8.587777777777777|
| stddev|         null|6.127560416916251|2408.644531858096|0.4999208935073339|1.274449013194616|1.7648355920350969|
|    min|   Aaron King|             22.0|            100.0|                 0|              1.0|               3.0|
|    max|Zachary Walsh|             65.0|         18026.01|                 1|             9.15|              14.0|
+-------+-------------+-----------------+-----------------+-------------

'Onboard_date' cannot be presented because it's a timestamp

In [16]:
data.describe(data.columns[7:]).show()

+-------+--------------------+--------------------+-------------------+
|summary|            Location|             Company|              Churn|
+-------+--------------------+--------------------+-------------------+
|  count|                 900|                 900|                900|
|   mean|                null|                null|0.16666666666666666|
| stddev|                null|                null| 0.3728852122772358|
|    min|00103 Jeffrey Cre...|     Abbott-Thompson|                  0|
|    max|Unit 9800 Box 287...|Zuniga, Clark and...|                  1|
+-------+--------------------+--------------------+-------------------+



In [17]:
data.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

### Choose features to feed into the model

In [11]:
from pyspark.ml.feature import VectorAssembler

In [18]:
assembler = VectorAssembler(inputCols=['Age',
                                       'Total_Purchase',
                                       'Account_Manager',
                                       'Years',
                                       'Num_Sites'],outputCol='features')

In [19]:
output = assembler.transform(data)

In [20]:
output

DataFrame[Names: string, Age: double, Total_Purchase: double, Account_Manager: int, Years: double, Num_Sites: double, Onboard_date: timestamp, Location: string, Company: string, Churn: int, features: vector]

### Prepare dataset to be used for model

In [21]:
final_data = output.select('features','churn')

In [22]:
final_data

DataFrame[features: vector, churn: int]

In [23]:
final_data.show()

+--------------------+-----+
|            features|churn|
+--------------------+-----+
|[42.0,11066.8,0.0...|    1|
|[41.0,11916.22,0....|    1|
|[38.0,12884.75,0....|    1|
|[42.0,8010.76,0.0...|    1|
|[37.0,9191.58,0.0...|    1|
|[48.0,10356.02,0....|    1|
|[44.0,11331.58,1....|    1|
|[32.0,9885.12,1.0...|    1|
|[43.0,14062.6,1.0...|    1|
|[40.0,8066.94,1.0...|    1|
|[30.0,11575.37,1....|    1|
|[45.0,8771.02,1.0...|    1|
|[45.0,8988.67,1.0...|    1|
|[40.0,8283.32,1.0...|    1|
|[41.0,6569.87,1.0...|    1|
|[38.0,10494.82,1....|    1|
|[45.0,8213.41,1.0...|    1|
|[43.0,11226.88,0....|    1|
|[53.0,5515.09,0.0...|    1|
|[46.0,8046.4,1.0,...|    1|
+--------------------+-----+
only showing top 20 rows



### Test Train Split

In [24]:
train_churn,test_churn = final_data.randomSplit([0.7,0.3])

### Fit the model

In [25]:
from pyspark.ml.classification import LogisticRegression

In [26]:
lr_churn = LogisticRegression(labelCol='churn')

In [27]:
fitted_churn_model = lr_churn.fit(train_churn)

In [28]:
training_sum = fitted_churn_model.summary

In [29]:
training_sum.predictions.describe().show()

+-------+-------------------+-------------------+
|summary|              churn|         prediction|
+-------+-------------------+-------------------+
|  count|                616|                616|
|   mean|0.17857142857142858|0.13636363636363635|
| stddev|  0.383304296228922|  0.343453182678216|
|    min|                0.0|                0.0|
|    max|                1.0|                1.0|
+-------+-------------------+-------------------+



### Evaluation

In [30]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [31]:
pred_and_labels = fitted_churn_model.evaluate(test_churn)

In [32]:
pred_and_labels.predictions.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|churn|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[28.0,11204.23,0....|    0|[1.78424394084410...|[0.85622011595886...|       0.0|
|[28.0,11245.38,0....|    0|[3.64701622036996...|[0.97459351996191...|       0.0|
|[29.0,10203.18,1....|    0|[3.52182699114504...|[0.97130247345917...|       0.0|
|[30.0,7960.64,1.0...|    1|[2.79342652850077...|[0.94231957070896...|       0.0|
|[30.0,8677.28,1.0...|    0|[3.78878904893553...|[0.97787749637708...|       0.0|
|[30.0,8874.83,0.0...|    0|[2.95536189109807...|[0.95051629543120...|       0.0|
|[30.0,10183.98,1....|    0|[2.65478834091182...|[0.93430550469581...|       0.0|
|[30.0,10744.14,1....|    1|[1.51070653810753...|[0.81916589215399...|       0.0|
|[30.0,10960.52,1....|    0|[2.20099342307338...|[0.90033868513463...|       0.0|
|[31.0,9574.89,0

### Area under the ROC curve

In [33]:
churn_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                           labelCol='churn')

In [34]:
auc = churn_eval.evaluate(pred_and_labels.predictions)

In [35]:
auc

0.733811475409836