# Import libraries

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Logistic Regression Model").getOrCreate()

In [13]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, VectorIndexer
from pyspark.sql import functions as f 

# Load and verify data

In [3]:
data = spark.read.csv('resources/customer_churn.csv',header = True, inferSchema = True)

In [4]:
data.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [5]:
data.head(3)

[Row(Names='Cameron Williams', Age=42.0, Total_Purchase=11066.8, Account_Manager=0, Years=7.22, Num_Sites=8.0, Onboard_date='2013-08-30 07:00:40', Location='10265 Elizabeth Mission Barkerburgh, AK 89518', Company='Harvey LLC', Churn=1),
 Row(Names='Kevin Mueller', Age=41.0, Total_Purchase=11916.22, Account_Manager=0, Years=6.5, Num_Sites=11.0, Onboard_date='2013-08-13 00:38:46', Location='6157 Frank Gardens Suite 019 Carloshaven, RI 17756', Company='Wilson PLC', Churn=1),
 Row(Names='Eric Lozano', Age=38.0, Total_Purchase=12884.75, Account_Manager=0, Years=6.67, Num_Sites=12.0, Onboard_date='2016-06-29 06:20:07', Location='1331 Keith Court Alyssahaven, DE 90114', Company='Miller, Johnson and Wallace', Churn=1)]

In [6]:
for item in data.head(1)[0]:
    print(item)

Cameron Williams
42.0
11066.8
0
7.22
8.0
2013-08-30 07:00:40
10265 Elizabeth Mission Barkerburgh, AK 89518
Harvey LLC
1


In [7]:
data.describe().show()

+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+-------------------+--------------------+--------------------+-------------------+
|summary|        Names|              Age|   Total_Purchase|   Account_Manager|            Years|         Num_Sites|       Onboard_date|            Location|             Company|              Churn|
+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+-------------------+--------------------+--------------------+-------------------+
|  count|          900|              900|              900|               900|              900|               900|                900|                 900|                 900|                900|
|   mean|         null|41.81666666666667|10062.82403333334|0.4811111111111111| 5.27315555555555| 8.587777777777777|               null|                null|                null|0.16666666666666666|
| stddev| 

In [8]:
data.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

# Data Preprocessing

In [9]:
df = data.withColumn("onboardDate", f.unix_timestamp("Onboard_date", "y-M-d H:m:s"))
df.head(1)

[Row(Names='Cameron Williams', Age=42.0, Total_Purchase=11066.8, Account_Manager=0, Years=7.22, Num_Sites=8.0, Onboard_date='2013-08-30 07:00:40', Location='10265 Elizabeth Mission Barkerburgh, AK 89518', Company='Harvey LLC', Churn=1, onboardDate=1377817240)]

In [10]:
my_cols = df.select(['Age', 'Total_Purchase', 'Years', 'Num_Sites', 'onboardDate','Churn'])

In [11]:
data = my_cols.na.drop()

In [14]:
assembler = VectorAssembler(inputCols =['Age', 'Total_Purchase', 'Years', 'Num_Sites', 'onboardDate'],
                            outputCol='features')

In [15]:
output = assembler.transform(data)
final_data = output.select('features','Churn')

# Train Test split

In [16]:
train_data,test_data = final_data.randomSplit([0.7,0.3])

In [17]:
train_data.show(2)

+--------------------+-----+
|            features|Churn|
+--------------------+-----+
|[22.0,11254.38,4....|    0|
|[25.0,9672.03,5.4...|    0|
+--------------------+-----+
only showing top 2 rows



In [18]:
test_data.show(2)

+--------------------+-----+
|            features|Churn|
+--------------------+-----+
|[27.0,8628.8,5.3,...|    0|
|[28.0,9090.43,5.7...|    0|
+--------------------+-----+
only showing top 2 rows



# Build Model 

In [19]:
classifier = LogisticRegression(labelCol = 'Churn')

In [20]:
model = classifier.fit(train_data)

In [21]:
pred_data = model.transform(test_data)

# Evaluate Model

In [22]:
pred_data.head(1)

[Row(features=DenseVector([27.0, 8628.8, 5.3, 7.0, 1460336304.0]), Churn=0, rawPrediction=DenseVector([5.7383, -5.7383]), probability=DenseVector([0.9968, 0.0032]), prediction=0.0)]

In [23]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol = 'Churn')

In [24]:
AUC = evaluator.evaluate(pred_data)

In [25]:
print(AUC)

0.7491437467566165


# Predict on brand new unlabeled data

In [26]:
testmodel = classifier.fit(final_data)

In [27]:
new_customers = spark.read.csv('resources/customer_churn_testset.csv',inferSchema=True,
                              header=True)

In [28]:
new_customers.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)



In [29]:
new_data = new_customers.withColumn("onboardDate", f.unix_timestamp("Onboard_date", "y-M-d H:m:s"))
new_data.head(1)

[Row(Names='Andrew Mccall', Age=37.0, Total_Purchase=9935.53, Account_Manager=1, Years=7.71, Num_Sites=8.0, Onboard_date='2011-08-29 18:37:54', Location='38612 Johnny Stravenue Nataliebury, WI 15717-8316', Company='King Ltd', onboardDate=1314614274)]

In [30]:
newoutput = assembler.transform(new_data)
newoutput.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- onboardDate: long (nullable = true)
 |-- features: vector (nullable = true)



In [31]:
new_pred = testmodel.transform(newoutput)
new_pred.select('Company','prediction').show()

+----------------+----------+
|         Company|prediction|
+----------------+----------+
|        King Ltd|       0.0|
|   Cannon-Benson|       1.0|
|Barron-Robertson|       1.0|
|   Sexton-Golden|       1.0|
|        Wood LLC|       0.0|
|   Parks-Robbins|       1.0|
+----------------+----------+

