In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import round, udf
from pyspark.sql.types import DoubleType
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import RFormula, VectorAssembler, StringIndexer
from pyspark.ml.stat import Correlation
from pyspark.ml.evaluation import (BinaryClassificationEvaluator,
                                   MulticlassClassificationEvaluator)

In [2]:
spark = SparkSession.builder.appName("customer_churn_logistic_regression").getOrCreate()

churn = spark.read.load('../data/raw/churn.csv', format='csv', header=True,
                        inferSchema=True, sep=';')
print("Number of instances in Churn dataset: ", churn.count())
churn.show()

Number of instances in Churn dataset:  10000
+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+
|CreditScore|Geography|Gender|Age|Tenure| Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+
|        619|   France|Female| 42|     2|       0|            1|        1|             1|       10134888|     1|
|        608|    Spain|Female| 41|     1| 8380786|            1|        0|             1|       11254258|     0|
|        502|   France|Female| 42|     8| 1596608|            3|        1|             0|       11393157|     1|
|        699|   France|Female| 39|     1|       0|            2|        0|             0|        9382663|     0|
|        850|    Spain|Female| 43|     2|12551082|            1|        1|             1|         790841|     0|
|        645|    Spain|  Male| 44|     8|11375578| 

In [3]:
indexer = StringIndexer(inputCols=['Geography', 'Gender'],
                        outputCols=['GeographyIndex', 'GenderIndex'])
churn = indexer.fit(churn).transform(churn)
churn = churn.drop('Geography', 'Gender')

assembler = VectorAssembler(inputCols=churn.columns, outputCol='corr_features')
churn_assembled = assembler.transform(churn).select('corr_features')

corr_matrix = Correlation.corr(churn_assembled, 'corr_features')
corr_matrix = corr_matrix.collect()[0][corr_matrix.columns[0]].toArray()
corr_matrix = spark.createDataFrame(corr_matrix.tolist(), churn.columns)
corr_matrix.select([round(c, 3).alias(c) for c in corr_matrix.columns]).show()

+-----------+------+------+-------+-------------+---------+--------------+---------------+------+--------------+-----------+
|CreditScore|   Age|Tenure|Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|GeographyIndex|GenderIndex|
+-----------+------+------+-------+-------------+---------+--------------+---------------+------+--------------+-----------+
|        1.0|-0.004| 0.001|  0.007|        0.012|   -0.005|         0.026|         -0.001|-0.027|         0.008|      0.003|
|     -0.004|   1.0| -0.01|  0.022|       -0.031|   -0.012|         0.085|         -0.015| 0.285|         0.023|      0.028|
|      0.001| -0.01|   1.0| -0.017|        0.013|    0.023|        -0.028|          0.006|-0.014|         0.004|     -0.015|
|      0.007| 0.022|-0.017|    1.0|       -0.276|   -0.011|        -0.011|          0.006| 0.106|         0.063|     -0.007|
|      0.012|-0.031| 0.013| -0.276|          1.0|    0.003|          0.01|          0.014|-0.048|         0.004|      0.022|


In [4]:
r_formula = RFormula(formula="Exited ~ .")
churn_rf = r_formula.fit(churn).transform(churn)
churn_rf.select('features', 'label').show()

churn_train, churn_test = churn_rf.randomSplit([0.7, 0.3])
print("Number of training instances: ", churn_train.count())
print("Number of testing instances: ", churn_test.count())

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[619.0,42.0,2.0,0...|  1.0|
|[608.0,41.0,1.0,8...|  0.0|
|[502.0,42.0,8.0,1...|  1.0|
|[699.0,39.0,1.0,0...|  0.0|
|[850.0,43.0,2.0,1...|  0.0|
|[645.0,44.0,8.0,1...|  1.0|
|[822.0,50.0,7.0,0...|  0.0|
|[376.0,29.0,4.0,1...|  1.0|
|[501.0,44.0,4.0,1...|  0.0|
|[684.0,27.0,2.0,1...|  0.0|
|[528.0,31.0,6.0,1...|  0.0|
|[497.0,24.0,3.0,0...|  0.0|
|[476.0,34.0,10.0,...|  0.0|
|[549.0,25.0,5.0,0...|  0.0|
|[635.0,35.0,7.0,0...|  0.0|
|[616.0,45.0,3.0,1...|  0.0|
|[653.0,58.0,1.0,1...|  1.0|
|[549.0,24.0,9.0,0...|  0.0|
|[587.0,45.0,6.0,0...|  0.0|
|[726.0,24.0,6.0,0...|  0.0|
+--------------------+-----+
only showing top 20 rows

Number of training instances:  7060
Number of testing instances:  2940


In [5]:
logistic_regressor = LogisticRegression()
model = logistic_regressor.fit(churn_train)

summary = model.summary
print("Model evaluation on training set:")
print("Accuracy: ", summary.accuracy)
print("Weighted precision: ", summary.weightedPrecision)
print("Weighted recall: ", summary.weightedRecall)
print("Area under the ROC curve: ", summary.areaUnderROC)

Model evaluation on training set:
Accuracy:  0.8069405099150142
Weighted precision:  0.7756088657196047
Weighted recall:  0.8069405099150142
Area under the ROC curve:  0.7459954514721283


In [6]:
pred = model.transform(churn_test)
pred.select('label', 'prediction', 'probability', 'rawPrediction').show()

+-----+----------+--------------------+--------------------+
|label|prediction|         probability|       rawPrediction|
+-----+----------+--------------------+--------------------+
|  1.0|       0.0|[0.76089537978332...|[1.15759467347914...|
|  1.0|       0.0|[0.85115377827115...|[1.74367911120823...|
|  1.0|       0.0|[0.50435687007737...|[0.01742792141484...|
|  1.0|       0.0|[0.79201008820893...|[1.33708455185023...|
|  1.0|       1.0|[0.43623788809358...|[-0.2564446656718...|
|  1.0|       0.0|[0.86691386645957...|[1.87394308599051...|
|  1.0|       0.0|[0.55761817163612...|[0.23150107369509...|
|  1.0|       0.0|[0.91924697129875...|[2.43215935688786...|
|  0.0|       0.0|[0.86013186164403...|[1.81638559510099...|
|  0.0|       0.0|[0.94840109353636...|[2.91127702752562...|
|  0.0|       0.0|[0.85451119698297...|[1.77043047757829...|
|  0.0|       0.0|[0.92166265000472...|[2.46515476654000...|
|  1.0|       0.0|[0.62058712696447...|[0.49204102200414...|
|  0.0|       0.0|[0.832

In [7]:
binary_evaluator = BinaryClassificationEvaluator(metricName='areaUnderROC')
print("Model evaluation on testing set:")
print("Area under the ROC curve:", binary_evaluator.evaluate(pred))
binary_evaluator.setMetricName('areaUnderPR')
print("Area under the PR curve:", binary_evaluator.evaluate(pred))

multiclass_evaluator = MulticlassClassificationEvaluator(metricName='accuracy')
print("Accuracy:", multiclass_evaluator.evaluate(pred))
multiclass_evaluator.setMetricName('weightedPrecision')
print("Weighted precision:", multiclass_evaluator.evaluate(pred))
multiclass_evaluator.setMetricName('weightedRecall')
print("Weighted recall:", multiclass_evaluator.evaluate(pred))
multiclass_evaluator.setMetricName('f1')
print("F1-score:", multiclass_evaluator.evaluate(pred))

Model evaluation on testing set:
Area under the ROC curve: 0.7708847525830641
Area under the PR curve: 0.4567797771200694
Accuracy: 0.8136054421768707
Weighted precision: 0.7861094185988897
Weighted recall: 0.8136054421768707
F1-score: 0.7638129317450977


In [8]:
churn.groupBy('Exited').count().show()

+------+-----+
|Exited|count|
+------+-----+
|     1| 2037|
|     0| 7963|
+------+-----+



In [9]:
data_balancing_ratio = (churn.where(churn.Exited == 1).count()
                       /  churn.count())

calculate_weights = udf(lambda x: 1 * data_balancing_ratio if x == 0
                        else (1 * (1.0 - data_balancing_ratio)), DoubleType())
                        
weighted_churn = churn.withColumn('ClassWeightCol', calculate_weights('Exited'))
weighted_churn.show()

+-----------+---+------+--------+-------------+---------+--------------+---------------+------+--------------+-----------+--------------+
|CreditScore|Age|Tenure| Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|GeographyIndex|GenderIndex|ClassWeightCol|
+-----------+---+------+--------+-------------+---------+--------------+---------------+------+--------------+-----------+--------------+
|        619| 42|     2|       0|            1|        1|             1|       10134888|     1|           0.0|        1.0|        0.7963|
|        608| 41|     1| 8380786|            1|        0|             1|       11254258|     0|           2.0|        1.0|        0.2037|
|        502| 42|     8| 1596608|            3|        1|             0|       11393157|     1|           0.0|        1.0|        0.7963|
|        699| 39|     1|       0|            2|        0|             0|        9382663|     0|           0.0|        1.0|        0.2037|
|        850| 43|     2|12551082| 

In [10]:
weighted_churn_rf = r_formula.fit(weighted_churn).transform(weighted_churn)
weighted_churn_rf.select('features', 'label').show()

weighted_churn_train, weighted_churn_test = weighted_churn_rf.randomSplit([0.7, 0.3])
print("Number of training instances: ", weighted_churn_train.count())
print("Number of testing instances: ", weighted_churn_test.count())

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[619.0,42.0,2.0,0...|  1.0|
|[608.0,41.0,1.0,8...|  0.0|
|[502.0,42.0,8.0,1...|  1.0|
|[699.0,39.0,1.0,0...|  0.0|
|[850.0,43.0,2.0,1...|  0.0|
|[645.0,44.0,8.0,1...|  1.0|
|[822.0,50.0,7.0,0...|  0.0|
|[376.0,29.0,4.0,1...|  1.0|
|[501.0,44.0,4.0,1...|  0.0|
|[684.0,27.0,2.0,1...|  0.0|
|[528.0,31.0,6.0,1...|  0.0|
|[497.0,24.0,3.0,0...|  0.0|
|[476.0,34.0,10.0,...|  0.0|
|[549.0,25.0,5.0,0...|  0.0|
|[635.0,35.0,7.0,0...|  0.0|
|[616.0,45.0,3.0,1...|  0.0|
|[653.0,58.0,1.0,1...|  1.0|
|[549.0,24.0,9.0,0...|  0.0|
|[587.0,45.0,6.0,0...|  0.0|
|[726.0,24.0,6.0,0...|  0.0|
+--------------------+-----+
only showing top 20 rows

Number of training instances:  7031
Number of testing instances:  2969


In [17]:
logistic_regressor.setWeightCol('ClassWeightCol')
model = logistic_regressor.fit(weighted_churn_train)

summary = model.summary
print("Model evaluation on training set:")
print("Accuracy: ", summary.accuracy)
print("Weighted precision: ", summary.weightedPrecision)
print("Weighted recall: ", summary.weightedRecall)
print("Area under the ROC curve: ", summary.areaUnderROC)

Model evaluation on training set:
Accuracy:  1.0
Weighted precision:  1.0
Weighted recall:  1.0
Area under the ROC curve:  0.999999631534641


In [18]:
pred = model.transform(weighted_churn_test)
pred.select('label', 'prediction', 'probability', 'rawPrediction').show()

+-----+----------+--------------------+--------------------+
|label|prediction|         probability|       rawPrediction|
+-----+----------+--------------------+--------------------+
|  1.0|       1.0|[6.82179031723183...|[-18.803143882860...|
|  1.0|       1.0|[5.34111826509706...|[-19.047830787617...|
|  1.0|       1.0|[4.48194548864739...|[-19.223208619442...|
|  1.0|       1.0|[6.70804737296714...|[-18.819957924142...|
|  1.0|       1.0|[5.11275581193719...|[-19.091527275151...|
|  1.0|       1.0|[1.02690900519965...|[-18.394127409194...|
|  1.0|       1.0|[4.04004907839789...|[-19.327008992888...|
|  1.0|       1.0|[1.77500155777367...|[-20.149464534625...|
|  0.0|       0.0|[0.99999999489189...|[19.0924382113034...|
|  0.0|       0.0|[0.99999999276799...|[18.7447494230222...|
|  0.0|       0.0|[0.99999999180315...|[18.6195169128548...|
|  0.0|       0.0|[0.99999999560734...|[19.2433312380174...|
|  0.0|       0.0|[0.99999999192771...|[18.6348297859478...|
|  1.0|       1.0|[5.321

In [19]:
binary_evaluator = BinaryClassificationEvaluator(metricName='areaUnderROC')
print("Model evaluation on testing set:")
print("Area under the ROC curve:", binary_evaluator.evaluate(pred))
binary_evaluator.setMetricName('areaUnderPR')
print("Area under the PR curve:", binary_evaluator.evaluate(pred))

multiclass_evaluator = MulticlassClassificationEvaluator(metricName='accuracy')
print("Accuracy:", multiclass_evaluator.evaluate(pred))
multiclass_evaluator.setMetricName('weightedPrecision')
print("Weighted precision:", multiclass_evaluator.evaluate(pred))
multiclass_evaluator.setMetricName('weightedRecall')
print("Weighted recall:", multiclass_evaluator.evaluate(pred))
multiclass_evaluator.setMetricName('f1')
print("F1-score:", multiclass_evaluator.evaluate(pred))

Model evaluation on testing set:
Area under the ROC curve: 0.9999996367730921
Area under the PR curve: 0.9999984903381642
Accuracy: 1.0
Weighted precision: 1.0
Weighted recall: 1.0
F1-score: 1.0
