In [1]:
# Load spark packages
from pyspark.sql import SparkSession
spark  = SparkSession.builder.appName("bcc").getOrCreate()

In [2]:
# Load data
df = spark.read.csv("dataset/logistic_reg/customer_churn.csv",inferSchema=True,header=True)

In [3]:
df.printSchema() # Data schema

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [4]:
df.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

In [5]:
df.show(5)

+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|           Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|             Company|Churn|
+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|2013-08-30 07:00:40|10265 Elizabeth M...|          Harvey LLC|    1|
|   Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|2013-08-13 00:38:46|6157 Frank Garden...|          Wilson PLC|    1|
|     Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|2016-06-29 06:20:07|1331 Keith Court ...|Miller, Johnson a...|    1|
|   Phillip White|42.0|       8010.76|              0| 6.71|     10.0|2014-04-22 12:43:12|13120 Daniel Moun...|           Smith Inc|    1|
|  Cynthia Norton|37.0|    

In [6]:
from pyspark.ml.feature import VectorAssembler

In [7]:
# Changes important features into single feature
assembler = VectorAssembler(inputCols=["Age", "Total_Purchase","Account_Manager",
                                      "Years", "Num_Sites"], outputCol="features")

In [8]:
output = assembler.transform(df)

In [9]:
final_data = output.select("features", "churn")
final_data.show()

+--------------------+-----+
|            features|churn|
+--------------------+-----+
|[42.0,11066.8,0.0...|    1|
|[41.0,11916.22,0....|    1|
|[38.0,12884.75,0....|    1|
|[42.0,8010.76,0.0...|    1|
|[37.0,9191.58,0.0...|    1|
|[48.0,10356.02,0....|    1|
|[44.0,11331.58,1....|    1|
|[32.0,9885.12,1.0...|    1|
|[43.0,14062.6,1.0...|    1|
|[40.0,8066.94,1.0...|    1|
|[30.0,11575.37,1....|    1|
|[45.0,8771.02,1.0...|    1|
|[45.0,8988.67,1.0...|    1|
|[40.0,8283.32,1.0...|    1|
|[41.0,6569.87,1.0...|    1|
|[38.0,10494.82,1....|    1|
|[45.0,8213.41,1.0...|    1|
|[43.0,11226.88,0....|    1|
|[53.0,5515.09,0.0...|    1|
|[46.0,8046.4,1.0,...|    1|
+--------------------+-----+
only showing top 20 rows



#### Train Test Split

In [10]:
# Split the data
train_chrun, test_chrun = final_data.randomSplit([0.7,0.3])

In [11]:
from pyspark.ml.classification import LogisticRegression

In [12]:
lr_chrun = LogisticRegression(labelCol='churn', featuresCol='features')
fitted_chrun_model = lr_chrun.fit(train_chrun) # train on training data

In [13]:
# Training data summary
training_summary = fitted_chrun_model.summary

In [14]:
# Training data summary
training_summary.predictions.show(20)

+--------------------+-----+--------------------+--------------------+----------+
|            features|churn|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[26.0,8939.61,0.0...|  0.0|[6.30325761966255...|[0.99817301146911...|       0.0|
|[28.0,9090.43,1.0...|  0.0|[1.61887563357416...|[0.83464000756161...|       0.0|
|[28.0,11128.95,1....|  0.0|[4.30875974376663...|[0.98672828650509...|       0.0|
|[28.0,11204.23,0....|  0.0|[1.76079283942151...|[0.85330892993094...|       0.0|
|[28.0,11245.38,0....|  0.0|[3.70197773353910...|[0.97591950033795...|       0.0|
|[29.0,5900.78,1.0...|  0.0|[4.19780832907963...|[0.98519403292242...|       0.0|
|[29.0,9617.59,0.0...|  0.0|[4.38699234555641...|[0.98771472281733...|       0.0|
|[29.0,10203.18,1....|  0.0|[3.90199467866712...|[0.98019844699090...|       0.0|
|[29.0,11274.46,1....|  0.0|[4.63256417969285...|[0.99036397816645...|       0.0|
|[29.0,13240.01,

In [15]:
training_summary.accuracy

0.9014308426073132

In [16]:
training_summary.areaUnderROC, training_summary.precisionByLabel, training_summary.recallByLabel

(0.9045497185741093,
 [0.9212880143112702, 0.7428571428571429],
 [0.9662288930581614, 0.5416666666666666])

In [17]:
training_summary.recallByThreshold.collect()[-5:]

[Row(threshold=0.0011722760236683913, recall=1.0),
 Row(threshold=0.0009831207555033097, recall=1.0),
 Row(threshold=0.000680805625146748, recall=1.0),
 Row(threshold=0.00038650598081058367, recall=1.0),
 Row(threshold=4.580928904338428e-05, recall=1.0)]

In [18]:
# Model Evaluation on test dataset
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [19]:
test_chrun.show()

+--------------------+-----+
|            features|churn|
+--------------------+-----+
|[22.0,11254.38,1....|    0|
|[25.0,9672.03,0.0...|    0|
|[26.0,8787.39,1.0...|    1|
|[27.0,8628.8,1.0,...|    0|
|[28.0,8670.98,0.0...|    0|
|[29.0,8688.17,1.0...|    1|
|[29.0,9378.24,0.0...|    0|
|[29.0,12711.15,0....|    0|
|[30.0,7960.64,1.0...|    1|
|[30.0,10183.98,1....|    0|
|[31.0,7073.61,0.0...|    0|
|[31.0,8688.21,0.0...|    0|
|[31.0,8829.83,1.0...|    0|
|[31.0,11297.57,1....|    1|
|[31.0,11743.24,0....|    0|
|[32.0,5756.12,0.0...|    0|
|[32.0,8617.98,1.0...|    1|
|[32.0,9885.12,1.0...|    1|
|[32.0,12142.99,0....|    0|
|[32.0,13630.93,0....|    0|
+--------------------+-----+
only showing top 20 rows



In [20]:
# class ensemble problem
final_data.groupBy("churn").count().show()

+-----+-----+
|churn|count|
+-----+-----+
|    1|  150|
|    0|  750|
+-----+-----+



In [21]:
test_chrun.count()

271

In [22]:
pred_and_labels = fitted_chrun_model.evaluate(test_chrun)

In [23]:
pred_and_labels.predictions.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|churn|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[22.0,11254.38,1....|    0|[4.70781597339279...|[0.99105624684581...|       0.0|
|[25.0,9672.03,0.0...|    0|[4.59509253923913...|[0.98999972961851...|       0.0|
|[26.0,8787.39,1.0...|    1|[0.70191352898745...|[0.66861188955407...|       0.0|
|[27.0,8628.8,1.0,...|    0|[5.55972131896505...|[0.99616491536915...|       0.0|
|[28.0,8670.98,0.0...|    0|[7.72139023797288...|[0.99955695241738...|       0.0|
|[29.0,8688.17,1.0...|    1|[2.81229360954136...|[0.94333654357226...|       0.0|
|[29.0,9378.24,0.0...|    0|[4.70503846244534...|[0.99103159404016...|       0.0|
|[29.0,12711.15,0....|    0|[5.34902811017450...|[0.99526971402916...|       0.0|
|[30.0,7960.64,1.0...|    1|[3.22335011836388...|[0.96170358969767...|       0.0|
|[30.0,10183.98,

#### using AUC

In [24]:
churn_eval = BinaryClassificationEvaluator(rawPredictionCol= 'prediction', labelCol="churn")

In [25]:
auc = churn_eval.evaluate(pred_and_labels.predictions)

In [26]:
auc

0.7431302270011948

### Prediction on brand new unlabeled data

In [27]:
# all data feed into model
final_lr_model = lr_chrun.fit(final_data)

In [28]:
data = spark.read.csv("new_customers.csv", inferSchema=True, header=True)

In [29]:
data.describe().show()

+-------+-------------+------------------+-----------------+------------------+-----------------+------------------+--------------------+----------------+
|summary|        Names|               Age|   Total_Purchase|   Account_Manager|            Years|         Num_Sites|            Location|         Company|
+-------+-------------+------------------+-----------------+------------------+-----------------+------------------+--------------------+----------------+
|  count|            6|                 6|                6|                 6|                6|                 6|                   6|               6|
|   mean|         null|35.166666666666664|7607.156666666667|0.8333333333333334|6.808333333333334|12.333333333333334|                null|            null|
| stddev|         null| 15.71517313511584|4346.008232825459| 0.408248290463863|3.708737880555414|3.3862466931200785|                null|            null|
|    min|Andrew Mccall|              22.0|            100.0|          

In [30]:
data.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)



In [31]:
new_customer_data = assembler.transform(data)

In [32]:
new_customer_data.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- features: vector (nullable = true)



In [33]:
final_results = final_lr_model.transform(new_customer_data)

In [34]:
final_results.select("Company","prediction").show()

+----------------+----------+
|         Company|prediction|
+----------------+----------+
|        King Ltd|       0.0|
|   Cannon-Benson|       1.0|
|Barron-Robertson|       1.0|
|   Sexton-Golden|       1.0|
|        Wood LLC|       0.0|
|   Parks-Robbins|       1.0|
+----------------+----------+

