In [1]:
#Loading PySpark supporting libraries.

import findspark
findspark.init('/home/shoby/spark-2.4.0-bin-hadoop2.7')
import pyspark

In [2]:
#Importing SparkSession

from pyspark.sql import SparkSession

In [3]:
#Creating Spark App

spark = SparkSession.builder.appName('Customer_Churn').getOrCreate()

In [4]:
#Importing the dataset, custome

data = spark.read.csv('customer_churn.csv', header = True, inferSchema = True)

In [5]:
#Checking how data looks like.

data.show(3)

+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|           Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|             Company|Churn|
+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|2013-08-30 07:00:40|10265 Elizabeth M...|          Harvey LLC|    1|
|   Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|2013-08-13 00:38:46|6157 Frank Garden...|          Wilson PLC|    1|
|     Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|2016-06-29 06:20:07|1331 Keith Court ...|Miller, Johnson a...|    1|
+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
only showing top 3 rows



In [6]:
data.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

In [7]:
data.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [8]:
#Importing countDistinct to count unique values.

from pyspark.sql.functions import countDistinct

#Checking how many distinct Companies are there.

data.agg(countDistinct('Company')).show()

+-----------------------+
|count(DISTINCT Company)|
+-----------------------+
|                    873|
+-----------------------+



In [9]:
#Data processing plan.

#1 - Drop [na, Names, Account_Manager, Company]
#2 - Extract onboard_date year
#3 - Extract location city
data.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

In [10]:
#1 - Dropping Na.
data = data.dropna()

In [11]:
#1 - Dropping Names, Account_Manager

data = data.drop('Names','Account_Manager','Company')

In [12]:
# Checking the location data and see if its usefull for our use.
# its not.
data.select('Location').show(3, False)

+--------------------------------------------------+
|Location                                          |
+--------------------------------------------------+
|10265 Elizabeth Mission Barkerburgh, AK 89518     |
|6157 Frank Gardens Suite 019 Carloshaven, RI 17756|
|1331 Keith Court Alyssahaven, DE 90114            |
+--------------------------------------------------+
only showing top 3 rows



In [13]:
#Dropping location column.

data = data.drop('Location')

In [14]:
data.show(3)

+----+--------------+-----+---------+-------------------+-----+
| Age|Total_Purchase|Years|Num_Sites|       Onboard_date|Churn|
+----+--------------+-----+---------+-------------------+-----+
|42.0|       11066.8| 7.22|      8.0|2013-08-30 07:00:40|    1|
|41.0|      11916.22|  6.5|     11.0|2013-08-13 00:38:46|    1|
|38.0|      12884.75| 6.67|     12.0|2016-06-29 06:20:07|    1|
+----+--------------+-----+---------+-------------------+-----+
only showing top 3 rows



In [15]:
#Importing functions to extract timestamp data.

from pyspark.sql.functions import year, month, dayofmonth

In [16]:
#Addting years column to the dataframe, extracting data from Onboard_date

data = data.withColumn('years', year("Onboard_date"))

In [17]:
#Addting months column to the dataframe, extracting data from Onboard_date

data = data.withColumn('months', month("Onboard_date"))

In [18]:
#Addting dayofmonth column to the dataframe, extracting data from Onboard_date

data = data.withColumn('DayOfMonth', dayofmonth("Onboard_date"))

In [19]:
data.show(3)

+----+--------------+-----+---------+-------------------+-----+------+----------+
| Age|Total_Purchase|years|Num_Sites|       Onboard_date|Churn|months|DayOfMonth|
+----+--------------+-----+---------+-------------------+-----+------+----------+
|42.0|       11066.8| 2013|      8.0|2013-08-30 07:00:40|    1|     8|        30|
|41.0|      11916.22| 2013|     11.0|2013-08-13 00:38:46|    1|     8|        13|
|38.0|      12884.75| 2016|     12.0|2016-06-29 06:20:07|    1|     6|        29|
+----+--------------+-----+---------+-------------------+-----+------+----------+
only showing top 3 rows



In [20]:
#Dropping Onboard_date column as its data has been extracted.

data = data.drop('Onboard_date')

In [21]:
#Importing vectrizer so that data can be vectored.

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [22]:
#Checking data column names, so vectorAssembler can be configured.

data.columns

['Age',
 'Total_Purchase',
 'years',
 'Num_Sites',
 'Churn',
 'months',
 'DayOfMonth']

In [23]:
#Creating a VectorAssembler instance and configuring it for input and output columns.

VA = VectorAssembler(inputCols=['Age', 'Total_Purchase', 'years', 'Num_Sites', 'months', 'DayOfMonth'], 
                        outputCol='features')

In [24]:
#Transforming the data using VectorAssembler, into vData.

vData = VA.transform(data)

In [25]:
#checking how vData looks like.

vData.show(3)

+----+--------------+-----+---------+-----+------+----------+--------------------+
| Age|Total_Purchase|years|Num_Sites|Churn|months|DayOfMonth|            features|
+----+--------------+-----+---------+-----+------+----------+--------------------+
|42.0|       11066.8| 2013|      8.0|    1|     8|        30|[42.0,11066.8,201...|
|41.0|      11916.22| 2013|     11.0|    1|     8|        13|[41.0,11916.22,20...|
|38.0|      12884.75| 2016|     12.0|    1|     6|        29|[38.0,12884.75,20...|
+----+--------------+-----+---------+-----+------+----------+--------------------+
only showing top 3 rows



In [26]:
#Creating a final dataset for test and training.

final_data = vData.select('features', 'Churn')

In [27]:
#Checking how final_data looks

final_data.show(3)

+--------------------+-----+
|            features|Churn|
+--------------------+-----+
|[42.0,11066.8,201...|    1|
|[41.0,11916.22,20...|    1|
|[38.0,12884.75,20...|    1|
+--------------------+-----+
only showing top 3 rows



In [28]:
#Splitting final data into test and train data.

train_data, test_data = final_data.randomSplit([0.7,0.3])

In [29]:
#Checking test and train data.

train_data.describe().show()

+-------+-------------------+
|summary|              Churn|
+-------+-------------------+
|  count|                598|
|   mean|0.18561872909698995|
| stddev|0.38912417456799425|
|    min|                  0|
|    max|                  1|
+-------+-------------------+



In [30]:
test_data.describe().show()

+-------+-------------------+
|summary|              Churn|
+-------+-------------------+
|  count|                302|
|   mean| 0.1291390728476821|
| stddev|0.33591040649627546|
|    min|                  0|
|    max|                  1|
+-------+-------------------+



In [31]:
#Importing logistic regression.

from pyspark.ml.classification import LogisticRegression

In [32]:
#Creating a logistic regression instance.
#Setting up logistic regression algorithm for incoming data.

lr = LogisticRegression(featuresCol='features', labelCol='Churn', predictionCol='prediction')

In [33]:
#Creating logistic regression model based on above specification and final_data

lr_model = lr.fit(train_data)

In [34]:
#evaluating test_data using Lr model.

lr_predictions = lr_model.evaluate(test_data)

In [35]:
lr_predictions.predictions.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|Churn|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[28.0,11245.38,20...|    0|[3.98004270550838...|[0.98165787826014...|       0.0|
|[29.0,5900.78,200...|    0|[3.55353209193736...|[0.97217313775616...|       0.0|
|[29.0,8688.17,201...|    1|[3.04328803886282...|[0.95449186585591...|       0.0|
|[29.0,9617.59,201...|    0|[3.72597389458189...|[0.97647703132984...|       0.0|
|[30.0,6744.87,201...|    0|[2.78658894142940...|[0.94194679866292...|       0.0|
|[30.0,7960.64,201...|    1|[1.54775501253021...|[0.82458924935051...|       0.0|
|[30.0,8874.83,200...|    0|[2.64308566535218...|[0.93358354821823...|       0.0|
|[30.0,12788.37,20...|    0|[1.28285293796858...|[0.78293501824053...|       0.0|
|[30.0,13473.35,20...|    0|[1.77199083959460...|[0.85470507672800...|       0.0|
|[31.0,8688.21,2

In [36]:
#Importing classification evaluator.

from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [37]:
#creating a binary classification evaluator item wheren rawPredictions are the "prediction" col
#Actual labels column is 'Churn' column. we will use this classifier to find accuracy of prediction.

churn_eval = BinaryClassificationEvaluator(rawPredictionCol = 'prediction', labelCol = 'Churn')

In [38]:
#finding area under curve by evaluating binary classification on lr prediction's predictions.

auc = churn_eval.evaluate(lr_predictions.predictions)

In [39]:
#Area under curve is 77.4%

auc

0.748318221702252

In [40]:
#Based on above data pre-processing sequence, here is a function that takes new customer data and transform
#it for model compatability.

from pyspark.sql.functions import year, month, dayofmonth
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

def dataProcessing(dataframe):
    dataframe = dataframe.dropna()
    dataframe = dataframe.drop('Names','Account_Manager','Location')
    dataframe = dataframe.withColumn('years', year("Onboard_date"))
    dataframe = dataframe.withColumn('months', month("Onboard_date"))
    dataframe = dataframe.withColumn('DayOfMonth', dayofmonth("Onboard_date"))
    dataframe = dataframe.drop('Onboard_date')
    VA = VectorAssembler(inputCols=['Age', 'Total_Purchase', 'years', 'Num_Sites', 'months', 'DayOfMonth'], 
                        outputCol='features')
    vData = VA.transform(dataframe)
    return vData
 

In [41]:
#importing production (simulation) customer data.

new_data = spark.read.csv('new_customers.csv', header = True, inferSchema = True)

In [50]:
#Seeing how the data looks like.

new_data.show(3)

+--------------+----+--------------+---------------+-----+---------+-------------------+--------------------+----------------+
|         Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|         Company|
+--------------+----+--------------+---------------+-----+---------+-------------------+--------------------+----------------+
| Andrew Mccall|37.0|       9935.53|              1| 7.71|      8.0|2011-08-29 18:37:54|38612 Johnny Stra...|        King Ltd|
|Michele Wright|23.0|       7526.94|              1| 9.28|     15.0|2013-07-22 18:19:54|21083 Nicole Junc...|   Cannon-Benson|
|  Jeremy Chang|65.0|         100.0|              1|  1.0|     15.0|2006-12-11 07:48:13|085 Austin Views ...|Barron-Robertson|
+--------------+----+--------------+---------------+-----+---------+-------------------+--------------------+----------------+
only showing top 3 rows



In [43]:
#running dataProcessing function on new_cust data.

new_cust = dataProcessing(new_data)

In [49]:
#Checking new customer output after function application.

new_cust.show(3)

+----+--------------+-----+---------+----------------+------+----------+--------------------+
| Age|Total_Purchase|years|Num_Sites|         Company|months|DayOfMonth|            features|
+----+--------------+-----+---------+----------------+------+----------+--------------------+
|37.0|       9935.53| 2011|      8.0|        King Ltd|     8|        29|[37.0,9935.53,201...|
|23.0|       7526.94| 2013|     15.0|   Cannon-Benson|     7|        22|[23.0,7526.94,201...|
|65.0|         100.0| 2006|     15.0|Barron-Robertson|    12|        11|[65.0,100.0,2006....|
+----+--------------+-----+---------+----------------+------+----------+--------------------+
only showing top 3 rows



In [46]:
#using lr_model to transform new_cust data.

new_trans = lr_model.transform(new_cust)

In [48]:
#Final predictions using deployed model on production dataset. 

final_prediction = new_trans.select('Company','prediction').show()

+----------------+----------+
|         Company|prediction|
+----------------+----------+
|        King Ltd|       0.0|
|   Cannon-Benson|       1.0|
|Barron-Robertson|       1.0|
|   Sexton-Golden|       1.0|
|        Wood LLC|       0.0|
|   Parks-Robbins|       1.0|
+----------------+----------+

