In [1]:
# Loading data from DBFS
path = '/FileStore/tables/Churn_Modelling.csv'
df = spark.read.format("csv").option("inferSchema","true").option("header","true").load(path)
display(df)

RowNumber,CustomerId,Surname,CreditScore,Geography,Gender,Age,Tenure,Balance,NumOfProducts,HasCrCard,IsActiveMember,EstimatedSalary,Exited
1,15634602,Hargrave,619,France,Female,42,2,0.0,1,1,1,101348.88,1
2,15647311,Hill,608,Spain,Female,41,1,83807.86,1,0,1,112542.58,0
3,15619304,Onio,502,France,Female,42,8,159660.8,3,1,0,113931.57,1
4,15701354,Boni,699,France,Female,39,1,0.0,2,0,0,93826.63,0
5,15737888,Mitchell,850,Spain,Female,43,2,125510.82,1,1,1,79084.1,0
6,15574012,Chu,645,Spain,Male,44,8,113755.78,2,1,0,149756.71,1
7,15592531,Bartlett,822,France,Male,50,7,0.0,2,1,1,10062.8,0
8,15656148,Obinna,376,Germany,Female,29,4,115046.74,4,1,0,119346.88,1
9,15792365,He,501,France,Male,44,4,142051.07,2,0,1,74940.5,0
10,15592389,H?,684,France,Male,27,2,134603.88,1,1,1,71725.73,0


In [2]:
# Getting the total count of the records
df.count()

In [3]:
# Getting the column names
df.columns

In [4]:
from pyspark.sql.functions import isnan, count, when, col
display(df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]))
# when takes 2 arguments - condition, column name where condition is to be checked

RowNumber,CustomerId,Surname,CreditScore,Geography,Gender,Age,Tenure,Balance,NumOfProducts,HasCrCard,IsActiveMember,EstimatedSalary,Exited
0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [5]:
df.printSchema()

In [6]:
df.groupBy('Exited').count().show()
# This shows that the dataset is imbalanced

In [7]:
df.select('Geography','Exited').groupBy('Geography').count().show()
# France has maximum customers

In [8]:
# Creating a temporary table so we can apply sql queries over it
df.createOrReplaceTempView('churn')

In [9]:
%sql

select Geography, Exited, count(*) from churn group by Geography, Exited

Geography,Exited,count(1)
Germany,0,1695
Spain,0,2064
France,0,4204
Germany,1,814
France,1,810
Spain,1,413


In [10]:
display(df.groupBy('Gender').count())

Gender,count
Female,4543
Male,5457


In [11]:
%sql

select Gender, Exited, count(*) from churn group by Gender,Exited

Gender,Exited,count(1)
Male,1,898
Female,0,3404
Male,0,4559
Female,1,1139


In [12]:
%sql

select Tenure, count(Tenure) as Count from churn group by Tenure order by Tenure

Tenure,Count
0,413
1,1035
2,1048
3,1009
4,989
5,1012
6,967
7,1028
8,1025
9,984


In [13]:
%sql

select Tenure, Exited, count(*) from churn group by Tenure, Exited order by Tenure

Tenure,Exited,count(1)
0,1,95
0,0,318
1,0,803
1,1,232
2,1,201
2,0,847
3,1,213
3,0,796
4,0,786
4,1,203


In [14]:
%sql

select Tenure, count(Tenure) as Count from churn group by Tenure order by Count desc

Tenure,Count
2,1048
1,1035
7,1028
8,1025
5,1012
3,1009
4,989
9,984
6,967
10,490


In [15]:
%sql

select Age, Exited from churn
/* People with age around 40-50 are having more chances to exit */

Age,Exited
42,1
41,0
42,1
39,0
43,0
44,1
50,0
29,1
44,0
27,0


In [16]:
%sql

select Balance, Exited from churn

Balance,Exited
0.0,1
83807.86,0
159660.8,1
0.0,0
125510.82,0
113755.78,1
0.0,0
115046.74,1
142051.07,0
134603.88,0


In [17]:
%sql

select HasCrCard, Exited, count(*) from churn group by HasCrCard, Exited

/* Exit is not depending on whether customer is having a credit card or not. As exit count of customers with credit card is almost same with customers not having credit card */

HasCrCard,Exited,count(1)
1,0,5631
1,1,1424
0,0,2332
0,1,613


In [18]:
%sql

select IsActiveMember, Exited, count(*) from churn group by IsActiveMember, Exited

/* From the count it can be seen that customers who are not an active member are having more chances of leaving the bank. 36% of inactive customers are leaving as compared to 16% of active customers */ 

IsActiveMember,Exited,count(1)
1,0,4416
1,1,735
0,0,3547
0,1,1302


In [19]:
%sql

select CreditScore, HasCrCard from churn

/* Credit score is also not having much effect on customer's exit rate */

CreditScore,HasCrCard
619,1
608,0
502,1
699,0
850,1
645,1
822,1
376,1
501,0
684,1


In [20]:
df.select('NumOfProducts').distinct().show()
# This indicates how many products of the bank a customer is using

In [21]:
%sql

select NumOfProducts, Exited, count(*) from churn group by NumOfProducts, Exited

/* 100% of customers are leaving the bank if they are using all the 4 products of the bank */


NumOfProducts,Exited,count(1)
1,0,3675
3,1,220
1,1,1409
2,1,348
2,0,4242
3,0,46
4,1,60


In [22]:
df.select('EstimatedSalary').show()

In [23]:
%sql

select EstimatedSalary, Exited from churn

EstimatedSalary,Exited
101348.88,1
112542.58,0
113931.57,1
93826.63,0
79084.1,0
149756.71,1
10062.8,0
119346.88,1
74940.5,0
71725.73,0


In [24]:
%sql

select IsActiveMember, HasCrCard, count(*) from churn group by IsActiveMember,HasCrCard

/* Credit card is not depending on whether a customer is an active memeber or not */

IsActiveMember,HasCrCard,count(1)
1,0,1544
1,1,3607
0,0,1401
0,1,3448


In [25]:
df1 = df.drop('RowNumber','CustomerId','Surname','IsActiveMember','EstimatedSalary','CreditScore')
display(df1)

Geography,Gender,Age,Tenure,Balance,NumOfProducts,HasCrCard,Exited
France,Female,42,2,0.0,1,1,1
Spain,Female,41,1,83807.86,1,0,0
France,Female,42,8,159660.8,3,1,1
France,Female,39,1,0.0,2,0,0
Spain,Female,43,2,125510.82,1,1,0
Spain,Male,44,8,113755.78,2,1,1
France,Male,50,7,0.0,2,1,0
Germany,Female,29,4,115046.74,4,1,1
France,Male,44,4,142051.07,2,0,0
France,Male,27,2,134603.88,1,1,0


In [26]:
df1.show()

In [27]:
df1.count()

In [28]:
display(df1.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df1.columns]))

Geography,Gender,Age,Tenure,Balance,NumOfProducts,HasCrCard,Exited
0,0,0,0,0,0,0,0


In [29]:
df1.show()

In [30]:
df1.select('Geography').distinct().show()

In [31]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol = 'Geography', outputCol = 'Geography_num')
df1_index = indexer.fit(df1).transform(df1)
df1_index.show()

In [32]:
indexer = StringIndexer(inputCol = 'Gender', outputCol = 'Gender_num')
df1_index = indexer.fit(df1_index).transform(df1_index)
df1_index.show()

In [33]:
df1_index.select('Age').describe().show()


In [34]:
# Dense vectors are the vecotrs containing all the independent features requred to train our model
from pyspark.ml.feature import VectorAssembler
va = VectorAssembler(inputCols = ['Geography_num','Gender_num','Age','Tenure','Balance','NumOfProducts','HasCrCard'], outputCol = 'features')
df1_vect = va.transform(df1_index)
df1_vect.show()

In [35]:
# Normalization and Standardization is done on dense vectors
from pyspark.ml.feature import MinMaxScaler
mms = MinMaxScaler(inputCol = 'features', outputCol = 'features_scaled')
df1_vect = mms.fit(df1_vect).transform(df1_vect)
# df1_vect = mms_model.transform(df1_vect)
df1_vect.show()

In [36]:
# Splitting the data- 80% train data and 20% test data
train_data, test_data = df1_vect.randomSplit([0.8, 0.2], 0)

from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol = 'features_scaled', labelCol = 'Exited', maxIter = 10)
lrmodel = lr.fit(train_data)

# Getting coefficients of each independent variable
print(lrmodel.coefficients)
# Getting the intercept
print(lrmodel.intercept)

In [37]:
# Changing the linear regression model to summary tpe so that we can get evaluation metrics
summary = lrmodel.summary
accuracy = summary.accuracy
print("Accuracy",accuracy)
fpr = summary.weightedFalsePositiveRate
print("False Positive Rate", fpr)
tpr = summary.weightedTruePositiveRate
print("True Positive Rate", tpr)
fscore = summary.weightedFMeasure()
print("F1 Score", fscore)
precision = summary.weightedPrecision
print("Precision", precision)
recall = summary.weightedRecall
print("Recall", recall)
roc = summary.areaUnderROC
print("ROC", roc)

In [38]:
# Getting the predictions for the test data
predictions = lrmodel.transform(test_data)
predictions.show()

In [39]:
# Calculating the evalution parameters
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol = 'Exited', predictionCol = 'prediction', metricName = 'accuracy')
acc = evaluator.evaluate(predictions)
print("Accuracy from Logistic Regression: ", round((acc*100),2),'%',sep = '')

In [40]:
from pyspark.ml.classification import DecisionTreeClassifier
dtc = DecisionTreeClassifier(labelCol = 'Exited', featuresCol = 'features_scaled')
dtc_model = dtc.fit(train_data)
dtc_model
# Using all the default hyperparamters

In [41]:
dtc_predict = dtc_model.transform(test_data)
dtc_predict.show()

In [42]:
evaluator = MulticlassClassificationEvaluator(labelCol = 'Exited', predictionCol = 'prediction', metricName = 'accuracy')
dtc_acc = evaluator.evaluate(dtc_predict)
print("Accuracy from Decision Trees: ",round((dtc_acc*100),2),'%',sep='')

In [43]:
from pyspark.ml.classification import RandomForestClassifier
rfc = RandomForestClassifier(labelCol = 'Exited', featuresCol = 'features_scaled')
rfc_model = rfc.fit(train_data)
rfc_predict = rfc_model.transform(test_data)
rfc_predict
# Using all the default hyperparameters

In [44]:
evaluator = MulticlassClassificationEvaluator(labelCol = 'Exited', predictionCol = 'prediction', metricName = 'accuracy')
rfc_acc = evaluator.evaluate(rfc_predict)
print("Accuracy obtained from Random Forest: ", round((rfc_acc*100),2),'%', sep='')
#Here accuracy is improved compared to Decision Trees and Logistic Regression