# Customer retention case study

In [1]:
import os
os.environ['PYSPARK_DRIVER_PYTHON'] = '/opt/anaconda3/bin/python3.7'
os.environ['PYSPARK_PYTHON'] = '/opt/anaconda3/bin/python3.7'
os.environ["SPARK_HOME"] = '/opt/cloudera/parcels/CDH/lib/spark'

import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Customer retention').getOrCreate()


# Read the dataset

In [3]:
df = spark.read.csv('bankCustomer.csv', inferSchema = True, header=True)
df.show(3)

+----+----------+--------+-----------+--------+------+---+------+-----------+-----------+---------+------------+---------+------+
|SrNo|CustomerId|LastName|CreditScore|Location|   Sex|Age|Tenure|CurrBalance|NumProducts|OwnCrCard|ActiveMember|   Salary|Exited|
+----+----------+--------+-----------+--------+------+---+------+-----------+-----------+---------+------------+---------+------+
|   1|  15634602|Hargrave|        619|  France|Female| 42|     2|        0.0|          1|        1|           1|101348.88|     1|
|   2|  15647311|    Hill|        608|   Spain|Female| 41|     1|   83807.86|          1|        0|           1|112542.58|     0|
|   3|  15619304|    Onio|        502|  France|Female| 42|     8|   159660.8|          3|        1|           0|113931.57|     1|
+----+----------+--------+-----------+--------+------+---+------+-----------+-----------+---------+------------+---------+------+
only showing top 3 rows



In [4]:
cols = df.columns

In [5]:
df.printSchema()

root
 |-- SrNo: integer (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- LastName: string (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- Location: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- CurrBalance: double (nullable = true)
 |-- NumProducts: integer (nullable = true)
 |-- OwnCrCard: integer (nullable = true)
 |-- ActiveMember: integer (nullable = true)
 |-- Salary: double (nullable = true)
 |-- Exited: integer (nullable = true)



# Use One hot Encoding for the categorical columns

In [6]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
location_indexer = StringIndexer(inputCol="Location", outputCol="locationIndex")
sex_indexer = StringIndexer(inputCol="Sex", outputCol="genderIndex")
onehotencoder_location_vector = OneHotEncoder(inputCol="locationIndex", outputCol="location_vec")
onehotencoder_gender_vector = OneHotEncoder(inputCol="genderIndex", outputCol="gender_vec")


In [7]:
pipeline=Pipeline(stages=[location_indexer,sex_indexer,onehotencoder_location_vector,onehotencoder_gender_vector])

In [8]:
df_transformed = pipeline.fit(df).transform(df)
df_transformed.show()

+----+----------+---------+-----------+--------+------+---+------+-----------+-----------+---------+------------+---------+------+-------------+-----------+-------------+-------------+
|SrNo|CustomerId| LastName|CreditScore|Location|   Sex|Age|Tenure|CurrBalance|NumProducts|OwnCrCard|ActiveMember|   Salary|Exited|locationIndex|genderIndex| location_vec|   gender_vec|
+----+----------+---------+-----------+--------+------+---+------+-----------+-----------+---------+------------+---------+------+-------------+-----------+-------------+-------------+
|   1|  15634602| Hargrave|        619|  France|Female| 42|     2|        0.0|          1|        1|           1|101348.88|     1|          0.0|        1.0|(2,[0],[1.0])|    (1,[],[])|
|   2|  15647311|     Hill|        608|   Spain|Female| 41|     1|   83807.86|          1|        0|           1|112542.58|     0|          2.0|        1.0|    (2,[],[])|    (1,[],[])|
|   3|  15619304|     Onio|        502|  France|Female| 42|     8|   159660

In [None]:
df.show(2)

In [9]:
df_transformed=df_transformed.withColumn("Exited",df.Exited)
df_transformed.show()


+----+----------+---------+-----------+--------+------+---+------+-----------+-----------+---------+------------+---------+------+-------------+-----------+-------------+-------------+
|SrNo|CustomerId| LastName|CreditScore|Location|   Sex|Age|Tenure|CurrBalance|NumProducts|OwnCrCard|ActiveMember|   Salary|Exited|locationIndex|genderIndex| location_vec|   gender_vec|
+----+----------+---------+-----------+--------+------+---+------+-----------+-----------+---------+------------+---------+------+-------------+-----------+-------------+-------------+
|   1|  15634602| Hargrave|        619|  France|Female| 42|     2|        0.0|          1|        1|           1|101348.88|     1|          0.0|        1.0|(2,[0],[1.0])|    (1,[],[])|
|   2|  15647311|     Hill|        608|   Spain|Female| 41|     1|   83807.86|          1|        0|           1|112542.58|     0|          2.0|        1.0|    (2,[],[])|    (1,[],[])|
|   3|  15619304|     Onio|        502|  France|Female| 42|     8|   159660

# Create the feature vector

In [10]:
inputCols = ["CreditScore", "Age", "Tenure", "CurrBalance", "NumProducts", "OwnCrCard","ActiveMember","Salary","location_vec", "gender_vec"]
outputCol = "features"
df_va = VectorAssembler(inputCols = inputCols, outputCol = outputCol)
df_final = df_va.transform(df_transformed)

In [11]:
df_final=df_final.select(["features","Exited"])
df_final=df_final.withColumnRenamed("Exited","label")

# Split the dataset into train and test

In [12]:
train, test = df_final.randomSplit([0.7, 0.3], seed=100)
print(train.count())
print(test.count())

7043
2957


# Apply the various models and select the best model

### Logistic Regression

In [13]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol = 'label', featuresCol = 'features', maxIter=10)
lrModel = lr.fit(train)

In [14]:
predictions = lrModel.transform(test)
predictions.take(1)

[Row(features=SparseVector(11, {0: 519.0, 1: 47.0, 2: 6.0, 3: 157296.02, 4: 2.0, 7: 147278.43}), label=1, rawPrediction=DenseVector([-0.1547, 0.1547]), probability=DenseVector([0.4614, 0.5386]), prediction=1.0)]

In [15]:
predictions.printSchema()

root
 |-- features: vector (nullable = true)
 |-- label: integer (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [16]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print('Area Under ROC', evaluator.evaluate(predictions))

Area Under ROC 0.770944097119996


In [17]:
evaluator.getMetricName()

'areaUnderROC'

In [18]:
print(lr.explainParams())

aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)
featuresCol: features column name. (default: features, current: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label, current: label)
lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)
lowerBoundsOnIntercepts: The lower bounds on intercepts if fitting under bound constrained optimization. The

In [19]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())

In [20]:
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train)

In [21]:
predictions = cvModel.transform(test)
print('Area Under ROC', evaluator.evaluate(predictions))

Area Under ROC 0.7760017087208159


### Decision Trees

In [22]:
from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label', maxDepth = 3)
dtModel = dt.fit(train)

In [23]:
print("numNodes = ", dtModel.numNodes)
print("depth = ", dtModel.depth)

numNodes =  13
depth =  3


In [24]:
predictions = dtModel.transform(test)
predictions.printSchema()

root
 |-- features: vector (nullable = true)
 |-- label: integer (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [27]:
predictions.show(10)

+--------------------+-----+--------------+--------------------+----------+
|            features|label| rawPrediction|         probability|prediction|
+--------------------+-----+--------------+--------------------+----------+
|(11,[0,1,2,3,4,7]...|    1| [321.0,290.0]|[0.52536824877250...|       0.0|
|(11,[0,1,2,3,4,7]...|    1| [321.0,290.0]|[0.52536824877250...|       0.0|
|(11,[0,1,2,3,4,7]...|    0|[4375.0,523.0]|[0.89322172315230...|       0.0|
|(11,[0,1,2,3,4,7]...|    0|[4375.0,523.0]|[0.89322172315230...|       0.0|
|(11,[0,1,2,3,4,7]...|    0|[4375.0,523.0]|[0.89322172315230...|       0.0|
|(11,[0,1,2,3,4,7]...|    0|[4375.0,523.0]|[0.89322172315230...|       0.0|
|(11,[0,1,2,3,4,7]...|    0|[4375.0,523.0]|[0.89322172315230...|       0.0|
|(11,[0,1,2,3,4,7]...|    0|[4375.0,523.0]|[0.89322172315230...|       0.0|
|(11,[0,1,2,3,4,7]...|    1|  [43.0,248.0]|[0.14776632302405...|       1.0|
|(11,[0,1,2,3,4,7]...|    0|[4375.0,523.0]|[0.89322172315230...|       0.0|
+-----------

In [28]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)

0.2569322977099677

In [29]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [1, 2, 6, 10])
             .addGrid(dt.maxBins, [20, 40, 80])
             .build())

In [30]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=dt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train)

In [31]:
print("numNodes = ", cvModel.bestModel.numNodes)
print("depth = ", cvModel.bestModel.depth)

numNodes =  491
depth =  10


In [32]:
predictions = cvModel.transform(test)
evaluator.evaluate(predictions)

0.6306572513840061

In [33]:
predictions.show(3)

+--------------------+-----+-------------+--------------------+----------+
|            features|label|rawPrediction|         probability|prediction|
+--------------------+-----+-------------+--------------------+----------+
|(11,[0,1,2,3,4,7]...|    1|   [23.0,6.0]|[0.79310344827586...|       0.0|
|(11,[0,1,2,3,4,7]...|    1| [109.0,11.0]|[0.90833333333333...|       0.0|
|(11,[0,1,2,3,4,7]...|    0| [484.0,54.0]|[0.89962825278810...|       0.0|
+--------------------+-----+-------------+--------------------+----------+
only showing top 3 rows



### Random Forest

In [34]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')
rfModel = rf.fit(train)
predictions = rfModel.transform(test)
predictions.printSchema()

root
 |-- features: vector (nullable = true)
 |-- label: integer (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [35]:
predictions.show(3)

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(11,[0,1,2,3,4,7]...|    1|[11.7802956554716...|[0.58901478277358...|       0.0|
|(11,[0,1,2,3,4,7]...|    1|[11.9574743961301...|[0.59787371980650...|       0.0|
|(11,[0,1,2,3,4,7]...|    0|[17.4434786554100...|[0.87217393277050...|       0.0|
+--------------------+-----+--------------------+--------------------+----------+
only showing top 3 rows



In [36]:
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)

0.8457446501451825

In [37]:
paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxDepth, [2, 4, 6])
             .addGrid(rf.maxBins, [20, 60])
             .addGrid(rf.numTrees, [5, 20])
             .build())

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations.  This can take about 6 minutes since it is training over 20 trees!
cvModel = cv.fit(train)

In [38]:
predictions = cvModel.transform(test)
evaluator.evaluate(predictions)

0.8543405694196703

In [40]:
predictions.show(3)

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(11,[0,1,2,3,4,7]...|    1|[11.6367825390349...|[0.58183912695174...|       0.0|
|(11,[0,1,2,3,4,7]...|    1|[13.5564240323507...|[0.67782120161753...|       0.0|
|(11,[0,1,2,3,4,7]...|    0|[16.8810018565629...|[0.84405009282814...|       0.0|
+--------------------+-----+--------------------+--------------------+----------+
only showing top 3 rows



### Make Predictions

In [42]:
bestModel = cvModel.bestModel
final_predictions = bestModel.transform(df_final)
evaluator.evaluate(final_predictions)

0.8586116347754902