In [8]:
# Spark for Machine Learning -Logistic Regression
# Import Spark, create spark session and load the data from our bucket
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('assignment10').getOrCreate()
data = spark.read.csv('gs://bucketassignment10/customer_churn.csv',inferSchema=True,
                     header=True)

In [9]:
# Check the schema of our spark dataframe
data.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [10]:
# Shows a summary of descriptive statistics for our spark dataframe
data.describe().show()

+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+-------------------+
|summary|        Names|              Age|   Total_Purchase|   Account_Manager|            Years|         Num_Sites|            Location|             Company|              Churn|
+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+-------------------+
|  count|          900|              900|              900|               900|              900|               900|                 900|                 900|                900|
|   mean|         null|41.81666666666667|10062.82403333334|0.4811111111111111| 5.27315555555555| 8.587777777777777|                null|                null|0.16666666666666666|
| stddev|         null|6.127560416916251|2408.644531858096|0.4999208935073339|1.274449013194616|1.764835592035

In [11]:
# Shows the data columns of our dataframe
data.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

In [37]:
# Import VectorAssembler from the Spark Machine Learning submodule.
# VectorAssembler takes all the columns and combines them into a new vector column
from pyspark.ml.feature import VectorAssembler

# Pick numeric values columns
assembler = VectorAssembler(inputCols=['Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites'],outputCol='features')
    
output = assembler.transform(data)
final_data = output.select('features','churn')

# Divide dataset into training set and test set
# 70% of data for training set
# 30% of data for test set
train_churn,test_churn = final_data.randomSplit([0.7,0.3])

In [39]:
# Create and fit the logistic regression model
from pyspark.ml.classification import LogisticRegression
lr_churn = LogisticRegression(labelCol='churn')
fitted_churn_model = lr_churn.fit(train_churn)
training_sum = fitted_churn_model.summary
training_sum.predictions.describe().show()

+-------+-------------------+-------------------+
|summary|              churn|         prediction|
+-------+-------------------+-------------------+
|  count|                664|                664|
|   mean|0.17018072289156627|0.12349397590361445|
| stddev|0.37607478620132123| 0.3292513881293036|
|    min|                0.0|                0.0|
|    max|                1.0|                1.0|
+-------+-------------------+-------------------+



In [19]:
# Evaluation of Results with the test data
# BinaryClassificationEvaluator expects two inputs, the raw prediction and the label
# Its function is to evaluate our model
from pyspark.ml.evaluation import BinaryClassificationEvaluator
pred_and_labels = fitted_churn_model.evaluate(test_churn)
pred_and_labels.predictions.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|churn|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[26.0,8939.61,0.0...|    0|[6.47264016361498...|[0.99845724285900...|       0.0|
|[29.0,9617.59,0.0...|    0|[4.56636606490787...|[0.98971128967545...|       0.0|
|[29.0,13240.01,1....|    0|[6.57903920044534...|[0.99861274377268...|       0.0|
|[29.0,13255.05,1....|    0|[4.01184985649093...|[0.98222189945838...|       0.0|
|[30.0,8677.28,1.0...|    0|[3.93877984688534...|[0.98089995624143...|       0.0|
|[30.0,10744.14,1....|    1|[1.57273298964715...|[0.82817286859469...|       0.0|
|[30.0,12788.37,0....|    0|[2.68254534477116...|[0.93598879414199...|       0.0|
|[30.0,13473.35,0....|    0|[2.92367208651656...|[0.94900430349779...|       0.0|
|[31.0,5387.75,0.0...|    0|[2.62571684167339...|[0.93249844451955...|       0.0|
|[31.0,10058.87,

In [41]:
# Using AUC
# AUC (Area Under The Curve) ROC (Receiver Operating Characteristics) curve
# AUC - ROC curve is a performance measurement for classification problem
# It tells how much model is capable of distinguishing between classes

churn_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                           labelCol='churn')
auc = churn_eval.evaluate(pred_and_labels.predictions)
auc

0.7419416364676713

In [40]:
# Use the model to make predictions on unlabeled data
final_lr_model = lr_churn.fit(final_data)

# new data to evaluate the model
new_customers = spark.read.csv('gs://bucketassignment10/new_customers.csv',inferSchema=True,
                              header=True)
#schema of our new data
new_customers.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)



In [32]:
test_new_customers = assembler.transform(new_customers)

In [33]:
test_new_customers.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- features: vector (nullable = true)



In [34]:
final_results = final_lr_model.transform(test_new_customers)

In [35]:
final_results.select('Company','prediction').show()

+----------------+----------+
|         Company|prediction|
+----------------+----------+
|        King Ltd|       0.0|
|   Cannon-Benson|       1.0|
|Barron-Robertson|       1.0|
|   Sexton-Golden|       1.0|
|        Wood LLC|       0.0|
|   Parks-Robbins|       1.0|
+----------------+----------+



In [None]:
# Results are 1.0 values: Cannon-Benson, Barron-Robertson, Sexton-Golden, and Parks-Robbins.
# These are the Companies that are likely to churn and that should be assigned managers to
# try to prevent them to churn.