In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('RegLog').getOrCreate()

In [6]:
df = spark.read.csv('Logistic_Regression/customer_churn.csv' , inferSchema=True , header = True)

In [7]:
df.show()

+-------------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|              Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|             Company|Churn|
+-------------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|   Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|2013-08-30 07:00:40|10265 Elizabeth M...|          Harvey LLC|    1|
|      Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|2013-08-13 00:38:46|6157 Frank Garden...|          Wilson PLC|    1|
|        Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|2016-06-29 06:20:07|1331 Keith Court ...|Miller, Johnson a...|    1|
|      Phillip White|42.0|       8010.76|              0| 6.71|     10.0|2014-04-22 12:43:12|13120 Daniel Moun...|           Smith Inc|    1|
|     

In [8]:
df.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [9]:
# To check for the missing data with describe (Cheching column counts)
df.describe().show()

+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+-------------------+--------------------+--------------------+-------------------+
|summary|        Names|              Age|   Total_Purchase|   Account_Manager|            Years|         Num_Sites|       Onboard_date|            Location|             Company|              Churn|
+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+-------------------+--------------------+--------------------+-------------------+
|  count|          900|              900|              900|               900|              900|               900|                900|                 900|                 900|                900|
|   mean|         null|41.81666666666667|10062.82403333334|0.4811111111111111| 5.27315555555555| 8.587777777777777|               null|                null|                null|0.16666666666666666|
| stddev| 

In [10]:
df.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

In [11]:
# Assembling the features into vector assembler
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',],outputCol='features')
output = assembler.transform(df)

In [13]:
final_data = output.select('features','Churn')
final_data.show()

+--------------------+-----+
|            features|Churn|
+--------------------+-----+
|[42.0,11066.8,0.0...|    1|
|[41.0,11916.22,0....|    1|
|[38.0,12884.75,0....|    1|
|[42.0,8010.76,0.0...|    1|
|[37.0,9191.58,0.0...|    1|
|[48.0,10356.02,0....|    1|
|[44.0,11331.58,1....|    1|
|[32.0,9885.12,1.0...|    1|
|[43.0,14062.6,1.0...|    1|
|[40.0,8066.94,1.0...|    1|
|[30.0,11575.37,1....|    1|
|[45.0,8771.02,1.0...|    1|
|[45.0,8988.67,1.0...|    1|
|[40.0,8283.32,1.0...|    1|
|[41.0,6569.87,1.0...|    1|
|[38.0,10494.82,1....|    1|
|[45.0,8213.41,1.0...|    1|
|[43.0,11226.88,0....|    1|
|[53.0,5515.09,0.0...|    1|
|[46.0,8046.4,1.0,...|    1|
+--------------------+-----+
only showing top 20 rows



In [15]:
train_data , test_data = final_data.randomSplit([0.7,0.3])
train_data.describe().show()

+-------+-------------------+
|summary|              Churn|
+-------+-------------------+
|  count|                636|
|   mean|0.15723270440251572|
| stddev|0.36430654557557735|
|    min|                  0|
|    max|                  1|
+-------+-------------------+



In [16]:
from pyspark.ml.classification import LogisticRegression
lr_churn = LogisticRegression(labelCol='Churn')

In [17]:
# fitting above model to the train data
Fitted_model = lr_churn.fit(train_data)

In [18]:
training_summary = Fitted_model.summary

In [21]:
training_summary.predictions.describe().show()



+-------+-------------------+-------------------+
|summary|              Churn|         prediction|
+-------+-------------------+-------------------+
|  count|                636|                636|
|   mean|0.15723270440251572|0.11477987421383648|
| stddev|0.36430654557557735| 0.3190069956589578|
|    min|                0.0|                0.0|
|    max|                1.0|                1.0|
+-------+-------------------+-------------------+



In [24]:
# Evaluating against the test data using the Ml library
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [25]:
training_summary.predictions.show()



+--------------------+-----+--------------------+--------------------+----------+
|            features|Churn|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[26.0,8787.39,1.0...|  1.0|[1.21438148479030...|[0.77107328400543...|       0.0|
|[26.0,8939.61,0.0...|  0.0|[6.77227997216392...|[0.99885622879079...|       0.0|
|[27.0,8628.8,1.0,...|  0.0|[6.31513889646644...|[0.99819455102268...|       0.0|
|[28.0,8670.98,0.0...|  0.0|[8.16208755795948...|[0.99971481509070...|       0.0|
|[28.0,9090.43,1.0...|  0.0|[2.15793092712944...|[0.89640756981476...|       0.0|
|[28.0,11204.23,0....|  0.0|[1.79113348423115...|[0.85706618878080...|       0.0|
|[28.0,11245.38,0....|  0.0|[4.16347568468806...|[0.98468479840173...|       0.0|
|[29.0,8688.17,1.0...|  1.0|[3.39316165483084...|[0.96749013525477...|       0.0|
|[29.0,9378.24,0.0...|  0.0|[5.03227134575067...|[0.99351831085625...|       0.0|
|[29.0,9617.59,0

In [27]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [28]:
# Evaluating against test data
test_evaluation = Fitted_model.evaluate (test_data)

In [30]:
test_evaluation.predictions.describe().show()

+-------+-------------------+-------------------+
|summary|              Churn|         prediction|
+-------+-------------------+-------------------+
|  count|                264|                264|
|   mean| 0.1893939393939394|0.13636363636363635|
| stddev|0.39256542904947994|0.34382609612430554|
|    min|                  0|                0.0|
|    max|                  1|                1.0|
+-------+-------------------+-------------------+



In [31]:
churn_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='Churn')

In [32]:
auc = churn_eval.evaluate (test_evaluation.predictions)



In [33]:
auc

0.7489719626168224

In [34]:
### Predict on own data

In [35]:
Final_model = lr_churn.fit (final_data)

In [38]:
new_customer = spark.read.csv ('Logistic_Regression/new_customers.csv' , inferSchema=True , header=True)

In [40]:
new_customer.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)



In [41]:
test_new_customer = assembler.transform(new_customer)

In [42]:
test_new_customer.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- features: vector (nullable = true)



In [43]:
final_results = Final_model.transform(test_new_customer)

In [44]:
final_results.show()

+--------------+----+--------------+---------------+-----+---------+-------------------+--------------------+----------------+--------------------+--------------------+--------------------+----------+
|         Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|         Company|            features|       rawPrediction|         probability|prediction|
+--------------+----+--------------+---------------+-----+---------+-------------------+--------------------+----------------+--------------------+--------------------+--------------------+----------+
| Andrew Mccall|37.0|       9935.53|              1| 7.71|      8.0|2011-08-29 18:37:54|38612 Johnny Stra...|        King Ltd|[37.0,9935.53,1.0...|[2.22168680572547...|[0.90218015921764...|       0.0|
|Michele Wright|23.0|       7526.94|              1| 9.28|     15.0|2013-07-22 18:19:54|21083 Nicole Junc...|   Cannon-Benson|[23.0,7526.94,1.0...|[-6.2207539991845...|[0.00198380259784...|       

In [45]:
final_results.select('company','prediction').show()

+----------------+----------+
|         company|prediction|
+----------------+----------+
|        King Ltd|       0.0|
|   Cannon-Benson|       1.0|
|Barron-Robertson|       1.0|
|   Sexton-Golden|       1.0|
|        Wood LLC|       0.0|
|   Parks-Robbins|       1.0|
+----------------+----------+

