In [1]:
# Databricks notebook source
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from sklearn.utils import resample
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [2]:
#reading data 
spark = SparkSession.builder.appName('logReg').getOrCreate()
df = spark.read.csv('/kaggle/input/customer-churn/customer_churn.csv', header=True, inferSchema=True)

display(df)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/18 13:01:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

DataFrame[Names: string, Age: double, Total_Purchase: double, Account_Manager: int, Years: double, Num_Sites: double, Onboard_date: timestamp, Location: string, Company: string, Churn: int]

In [3]:
#check churn vs no churn data distribution
no_churn = df.filter(df['Churn']==0).count()
churn = df.filter(df['Churn']==1).count()
print("Churn :", churn)
print("No Churn :", no_churn)
diff = (no_churn - churn) /2
upSampleLength = churn + diff
downSampleLength = no_churn - diff
downSampleLength

Churn : 150
No Churn : 750


450.0

In [4]:
df_no_churn = df.filter(df['Churn']==0)
df_no_churn.toPandas()

Unnamed: 0,Names,Age,Total_Purchase,Account_Manager,Years,Num_Sites,Onboard_date,Location,Company,Churn
0,Michael Williams,35.0,15571.26,0,6.45,9.0,2011-12-02 20:13:49,"5728 Michael Rue Riosland, NY 38804-1415",Petty and Sons,0
1,Connie Golden,39.0,10268.87,1,3.68,6.0,2009-08-19 18:52:21,"3324 Gomez Knolls Suite 591 Farmerchester, LA ...",Brown-Wagner,0
2,Seth Griffin,44.0,12328.03,1,4.60,9.0,2006-08-29 02:24:37,"9436 Warner Mill Suite 265 Port Kenneth, OH 00...",Williams PLC,0
3,Rachel Cherry,52.0,9782.83,0,3.96,7.0,2012-04-17 10:47:29,"3479 Stewart Way Sandersside, ID 69317-8759","Bishop, Tran and Pope",0
4,Robert Sanders,29.0,9378.24,0,4.93,8.0,2015-08-06 22:29:28,"8199 Christopher Tunnel Suite 537 Warnerside, ...","White, Jones and Nelson",0
...,...,...,...,...,...,...,...,...,...,...
745,Paul Miller,42.0,12800.82,1,3.62,8.0,2007-12-01 13:29:34,"9316 Julian Fort Suite 328 North Leslie, ME 43961",Evans-Lucero,0
746,Natalie Hodges,52.0,9893.92,0,6.91,7.0,2008-12-28 15:23:58,"8419 William Square Apt. 695 Martinville, RI 3...",Perry and Sons,0
747,Ana Smith,45.0,12056.18,0,5.46,4.0,2014-06-20 05:10:09,Unit 8633 Box 8738 DPO AA 14126-5026,Schneider-Smith,0
748,Justin Leonard,51.0,6517.93,1,5.47,10.0,2012-05-30 00:15:43,"49800 Torres Ways Suite 886 West Bradleybury, ...",Robles-Abbott,0


In [5]:
#spparating churn and no_churn and converting to pandas dataframe
df_no_churn = df.filter(df['Churn']==0).toPandas()
df_churn = df.filter(df['Churn']==1).toPandas()
RANDOM_SEED = 10

#applying upsampling and downsampling
no_churn_downsampled = resample(df_no_churn,
              replace=True,
              n_samples=int(downSampleLength),
              random_state=10)
churn_upsampled = resample(df_churn,
              replace=True,
              n_samples=int(upSampleLength),
              random_state=10)


In [6]:
#concatenating pandas dataframe: churn and no_churn
pd_df = [no_churn_downsampled, churn_upsampled]
pd_df = pd.concat(pd_df)

#random shuffling
pd_df = pd_df.sample(frac=1).reset_index(drop=True)

#convert pandas df to spark df
df = spark.createDataFrame(pd_df)

In [7]:
display(df)
display(df)
df.show()
df.printSchema()

DataFrame[Names: string, Age: double, Total_Purchase: double, Account_Manager: bigint, Years: double, Num_Sites: double, Onboard_date: timestamp, Location: string, Company: string, Churn: bigint]

DataFrame[Names: string, Age: double, Total_Purchase: double, Account_Manager: bigint, Years: double, Num_Sites: double, Onboard_date: timestamp, Location: string, Company: string, Churn: bigint]

[Stage 11:>                                                         (0 + 1) / 1]

+------------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|             Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|             Company|Churn|
+------------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|    Samantha Clark|35.0|      12398.62|              1|  5.6|      9.0|2016-11-17 09:21:44|0385 Smith Spring...|          Grimes Inc|    0|
|    Harold Griffin|41.0|       6569.87|              1|  4.3|     11.0|2015-03-28 02:13:44|1774 Peter Row Ap...|Snyder, Lee and M...|    1|
|     Nathan Murphy|42.0|       9577.42|              0| 5.86|     11.0|2012-01-18 00:52:28|34489 Andrew Well...|          Deleon Ltd|    1|
|    Jessica Hudson|43.0|       8914.46|              0| 6.49|      7.0|2008-04-21 23:26:57|7255 Weber Port D...|Smith, Marshall a...|    0|
|Jennifer Her

                                                                                

In [8]:
#selecting columns 
df_select_col = df.select(['Age', 'Total_Purchase', 'Years', 'Num_Sites', 'Company', 'Churn'])

#drop na
df_select_col = df_select_col.na.drop()
df_select_col.printSchema()

root
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: long (nullable = true)



In [9]:
#vectorizing categorical features using string indexer and one hot encoder
# vectorizing categorical features using string indexer and one hot encoder
categoricalColumns = ['Company']
stages = []

for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(
        inputCol=categoricalCol,
        outputCol=categoricalCol + 'Index'
    )
    
    encoder = OneHotEncoder(
        inputCols=[stringIndexer.getOutputCol()],
        outputCols=[categoricalCol + "classVec"]
    )
    
    stages += [stringIndexer, encoder]

numericCols = ['Age', 'Total_Purchase', 'Years', 'Num_Sites']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols

assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [10]:
#pipeline of transforming features
cols = df_select_col.columns
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df_select_col)
df = pipelineModel.transform(df_select_col)
selectedCols = ['features', 'Churn']
df = df.select(selectedCols)
df.printSchema()

                                                                                

root
 |-- features: vector (nullable = true)
 |-- Churn: long (nullable = true)



In [11]:
#train-test split
train, test = df.randomSplit([0.7, 0.3], seed = 20)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

                                                                                

Training Dataset Count: 620
Test Dataset Count: 280


In [12]:
#Logistic regression
lr = LogisticRegression(featuresCol = 'features', labelCol = 'Churn', maxIter= 50)
lrModel = lr.fit(train)
predictions = lrModel.transform(test)
evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='Churn')
print('Linear Regression Test Area Under ROC', evaluator.evaluate(predictions))

Linear Regression Test Area Under ROC 0.8834971334971335


In [13]:
#Random Forest
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'Churn', numTrees=100)
rfModel = rf.fit(train)
predictions = rfModel.transform(test)
evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='Churn')
print('Random Forest Test Area Under ROC', evaluator.evaluate(predictions))

                                                                                

Random Forest Test Area Under ROC 0.8284193284193285


In [14]:
#Gradient Boost
gbt = GBTClassifier(featuresCol = 'features', labelCol = 'Churn',maxIter=100)
gbtModel = gbt.fit(train)
predictions = gbtModel.transform(test)
evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='Churn')
print('Gradient Boost Test Area Under ROC', evaluator.evaluate(predictions))

Gradient Boost Test Area Under ROC 0.9363226863226863


In [17]:
from pyspark.sql import Row
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

new_user = spark.createDataFrame([
    Row(Age=35, Total_Purchase=2000, Years=5, Num_Sites=3, Company='Wilson PLC')
])


new_user_transformed = pipelineModel.transform(new_user)
new_user_transformed = pipelineModel.transform(new_user).select('features')

prediction = lrModel.transform(new_user_transformed)
prediction.select('features', 'prediction', 'probability').show(truncate=False)

+-----------------------------------------------------+----------+-----------+
|features                                             |prediction|probability|
+-----------------------------------------------------+----------+-----------+
|(467,[227,463,464,465,466],[1.0,35.0,2000.0,5.0,3.0])|0.0       |[1.0,0.0]  |
+-----------------------------------------------------+----------+-----------+

