In [1]:
import findspark

In [2]:
findspark.init('/home/adeola/spark-2.4.2-bin-hadoop2.7')

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.appName('churn').getOrCreate()

In [5]:
data = spark.read.csv('customer_churn.csv',inferSchema = True, header = True)

In [6]:
data.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [7]:
new_data= data.select(['Age','Total_Purchase','Years','Num_Sites','Churn'])

In [8]:
from pyspark.sql.functions import dayofyear,year,month

In [9]:
new_data.show()

+----+--------------+-----+---------+-----+
| Age|Total_Purchase|Years|Num_Sites|Churn|
+----+--------------+-----+---------+-----+
|42.0|       11066.8| 7.22|      8.0|    1|
|41.0|      11916.22|  6.5|     11.0|    1|
|38.0|      12884.75| 6.67|     12.0|    1|
|42.0|       8010.76| 6.71|     10.0|    1|
|37.0|       9191.58| 5.56|      9.0|    1|
|48.0|      10356.02| 5.12|      8.0|    1|
|44.0|      11331.58| 5.23|     11.0|    1|
|32.0|       9885.12| 6.92|      9.0|    1|
|43.0|       14062.6| 5.46|     11.0|    1|
|40.0|       8066.94| 7.11|     11.0|    1|
|30.0|      11575.37| 5.22|      8.0|    1|
|45.0|       8771.02| 6.64|     11.0|    1|
|45.0|       8988.67| 4.84|     11.0|    1|
|40.0|       8283.32|  5.1|     13.0|    1|
|41.0|       6569.87|  4.3|     11.0|    1|
|38.0|      10494.82| 6.81|     12.0|    1|
|45.0|       8213.41| 7.35|     11.0|    1|
|43.0|      11226.88| 8.08|     12.0|    1|
|53.0|       5515.09| 6.85|      8.0|    1|
|46.0|        8046.4| 5.69|     

In [10]:
new_data.columns

['Age', 'Total_Purchase', 'Years', 'Num_Sites', 'Churn']

In [11]:
df = new_data.select(['Age', 'Total_Purchase', 'Years', 'Num_Sites'])

In [12]:
df.show()

+----+--------------+-----+---------+
| Age|Total_Purchase|Years|Num_Sites|
+----+--------------+-----+---------+
|42.0|       11066.8| 7.22|      8.0|
|41.0|      11916.22|  6.5|     11.0|
|38.0|      12884.75| 6.67|     12.0|
|42.0|       8010.76| 6.71|     10.0|
|37.0|       9191.58| 5.56|      9.0|
|48.0|      10356.02| 5.12|      8.0|
|44.0|      11331.58| 5.23|     11.0|
|32.0|       9885.12| 6.92|      9.0|
|43.0|       14062.6| 5.46|     11.0|
|40.0|       8066.94| 7.11|     11.0|
|30.0|      11575.37| 5.22|      8.0|
|45.0|       8771.02| 6.64|     11.0|
|45.0|       8988.67| 4.84|     11.0|
|40.0|       8283.32|  5.1|     13.0|
|41.0|       6569.87|  4.3|     11.0|
|38.0|      10494.82| 6.81|     12.0|
|45.0|       8213.41| 7.35|     11.0|
|43.0|      11226.88| 8.08|     12.0|
|53.0|       5515.09| 6.85|      8.0|
|46.0|        8046.4| 5.69|      8.0|
+----+--------------+-----+---------+
only showing top 20 rows



In [13]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

In [14]:
df.columns

['Age', 'Total_Purchase', 'Years', 'Num_Sites']

In [15]:
assembler = VectorAssembler(inputCols = df.columns, outputCol = 'features')

In [16]:
final_data = assembler.transform(new_data)

In [17]:
final_data.show()

+----+--------------+-----+---------+-----+--------------------+
| Age|Total_Purchase|Years|Num_Sites|Churn|            features|
+----+--------------+-----+---------+-----+--------------------+
|42.0|       11066.8| 7.22|      8.0|    1|[42.0,11066.8,7.2...|
|41.0|      11916.22|  6.5|     11.0|    1|[41.0,11916.22,6....|
|38.0|      12884.75| 6.67|     12.0|    1|[38.0,12884.75,6....|
|42.0|       8010.76| 6.71|     10.0|    1|[42.0,8010.76,6.7...|
|37.0|       9191.58| 5.56|      9.0|    1|[37.0,9191.58,5.5...|
|48.0|      10356.02| 5.12|      8.0|    1|[48.0,10356.02,5....|
|44.0|      11331.58| 5.23|     11.0|    1|[44.0,11331.58,5....|
|32.0|       9885.12| 6.92|      9.0|    1|[32.0,9885.12,6.9...|
|43.0|       14062.6| 5.46|     11.0|    1|[43.0,14062.6,5.4...|
|40.0|       8066.94| 7.11|     11.0|    1|[40.0,8066.94,7.1...|
|30.0|      11575.37| 5.22|      8.0|    1|[30.0,11575.37,5....|
|45.0|       8771.02| 6.64|     11.0|    1|[45.0,8771.02,6.6...|
|45.0|       8988.67| 4.8

In [18]:
data_select = final_data.select(['Churn','features'])

In [19]:
data_select.show()

+-----+--------------------+
|Churn|            features|
+-----+--------------------+
|    1|[42.0,11066.8,7.2...|
|    1|[41.0,11916.22,6....|
|    1|[38.0,12884.75,6....|
|    1|[42.0,8010.76,6.7...|
|    1|[37.0,9191.58,5.5...|
|    1|[48.0,10356.02,5....|
|    1|[44.0,11331.58,5....|
|    1|[32.0,9885.12,6.9...|
|    1|[43.0,14062.6,5.4...|
|    1|[40.0,8066.94,7.1...|
|    1|[30.0,11575.37,5....|
|    1|[45.0,8771.02,6.6...|
|    1|[45.0,8988.67,4.8...|
|    1|[40.0,8283.32,5.1...|
|    1|[41.0,6569.87,4.3...|
|    1|[38.0,10494.82,6....|
|    1|[45.0,8213.41,7.3...|
|    1|[43.0,11226.88,8....|
|    1|[53.0,5515.09,6.8...|
|    1|[46.0,8046.4,5.69...|
+-----+--------------------+
only showing top 20 rows



In [20]:
lg_model = LogisticRegression(labelCol = 'Churn')

In [21]:
my_model = lg_model.fit(data_select)

In [22]:
model_summary = my_model.summary

In [23]:
model_summary.accuracy

0.8988888888888888

In [24]:
model_summary.predictions.show()

+-----+--------------------+--------------------+--------------------+----------+
|Churn|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  1.0|[42.0,11066.8,7.2...|[2.36675117437238...|[0.91425652266606...|       0.0|
|  1.0|[41.0,11916.22,6....|[-0.7999397979939...|[0.31003839681250...|       1.0|
|  1.0|[38.0,12884.75,6....|[-1.9595409361309...|[0.12351673739297...|       1.0|
|  1.0|[42.0,8010.76,6.7...|[0.36499055377901...|[0.59024796731348...|       0.0|
|  1.0|[37.0,9191.58,5.5...|[2.47184489317977...|[0.92214432064823...|       0.0|
|  1.0|[48.0,10356.02,5....|[3.24356725803294...|[0.96244127192005...|       0.0|
|  1.0|[44.0,11331.58,5....|[-0.2275403112572...|[0.44335909156700...|       1.0|
|  1.0|[32.0,9885.12,6.9...|[1.96034031135884...|[0.87656977721006...|       0.0|
|  1.0|[43.0,14062.6,5.4...|[-0.3995775862127...|[0.40141383356855...|       1.0|
|  1.0|[40.0,806

In [25]:
train_data, test_data = data_select.randomSplit([0.7,0.3])

In [26]:
new_model = lg_model.fit(train_data)

In [27]:
test_result = new_model.evaluate(test_data)

In [28]:
test_result.accuracy

0.8971631205673759

In [29]:
test_result.predictions.show()

+-----+--------------------+--------------------+--------------------+----------+
|Churn|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|    0|[28.0,9090.43,5.7...|[1.71920096709085...|[0.84802588722859...|       0.0|
|    0|[28.0,11245.38,6....|[3.61823055819813...|[0.97387093677792...|       0.0|
|    0|[29.0,12711.15,5....|[5.43649560370781...|[0.99566416412202...|       0.0|
|    0|[29.0,13240.01,4....|[7.41445507644116...|[0.99939788215477...|       0.0|
|    0|[29.0,13255.05,4....|[4.74272988223440...|[0.99136046874556...|       0.0|
|    0|[30.0,8677.28,7.3...|[4.31255327863374...|[0.98677787340814...|       0.0|
|    0|[30.0,10960.52,5....|[2.72630364430196...|[0.93856103492819...|       0.0|
|    0|[31.0,5387.75,6.8...|[2.01514402618586...|[0.88237795675806...|       0.0|
|    0|[31.0,8688.21,3.5...|[6.59669906089006...|[0.99863699437078...|       0.0|
|    0|[31.0,957

In [30]:
#Using Binary Evaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [31]:
churn_eval = BinaryClassificationEvaluator(rawPredictionCol = 'prediction',labelCol = 'Churn')

In [32]:
my_final_roc = churn_eval.evaluate(test_result.predictions)

In [33]:
my_final_roc

0.7766050626259087

# TESTING THE NEW MODEL ON NEW_CUSTOMERS DATA

In [34]:
new_customer = spark.read.csv('new_customers.csv', inferSchema = True, header = True)

In [35]:
new_customer.show()

+--------------+----+--------------+---------------+-----+---------+-------------------+--------------------+----------------+
|         Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|         Company|
+--------------+----+--------------+---------------+-----+---------+-------------------+--------------------+----------------+
| Andrew Mccall|37.0|       9935.53|              1| 7.71|      8.0|2011-08-29 18:37:54|38612 Johnny Stra...|        King Ltd|
|Michele Wright|23.0|       7526.94|              1| 9.28|     15.0|2013-07-22 18:19:54|21083 Nicole Junc...|   Cannon-Benson|
|  Jeremy Chang|65.0|         100.0|              1|  1.0|     15.0|2006-12-11 07:48:13|085 Austin Views ...|Barron-Robertson|
|Megan Ferguson|32.0|        6487.5|              0|  9.4|     14.0|2016-10-28 05:32:13|922 Wright Branch...|   Sexton-Golden|
|  Taylor Young|32.0|      13147.71|              1| 10.0|      8.0|2012-03-20 00:36:46|Unit 0789 Box 073...|  

In [36]:
new_customer.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company']

In [37]:
test11_data = assembler.transform(new_customer)

In [38]:
test11_data.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'features']

In [39]:
test_model = new_model.transform(test11_data)

In [40]:
final_prediction = test_model.select(['Company','prediction'])

In [41]:
final_prediction.show()

+----------------+----------+
|         Company|prediction|
+----------------+----------+
|        King Ltd|       0.0|
|   Cannon-Benson|       1.0|
|Barron-Robertson|       1.0|
|   Sexton-Golden|       1.0|
|        Wood LLC|       0.0|
|   Parks-Robbins|       1.0|
+----------------+----------+

