In [0]:
import numpy as np
import pandas as pd
import findspark
findspark.init()

import pyspark
findspark.find()

from pyspark.sql import SparkSession
from pyspark.sql.functions import count

spark = (SparkSession
         .builder
         .appName("Customer Churn")
         .getOrCreate())

In [0]:
data = (spark.read
          .option("HEADER", True)
          .option("inferSchema", True)
          .csv("dbfs:/FileStore/shared_uploads/namle9220142015@gmail.com/Bank_Customer_Churn_Prediction.csv")
         )

In [0]:
data.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- credit_score: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- balance: double (nullable = true)
 |-- products_number: integer (nullable = true)
 |-- credit_card: integer (nullable = true)
 |-- active_member: integer (nullable = true)
 |-- estimated_salary: double (nullable = true)
 |-- churn: integer (nullable = true)



In [0]:
data.toPandas().head()

Unnamed: 0,customer_id,credit_score,country,gender,age,tenure,balance,products_number,credit_card,active_member,estimated_salary,churn
0,15634602,619,France,Female,42,2,0.0,1,1,1,101348.88,1
1,15647311,608,Spain,Female,41,1,83807.86,1,0,1,112542.58,0
2,15619304,502,France,Female,42,8,159660.8,3,1,0,113931.57,1
3,15701354,699,France,Female,39,1,0.0,2,0,0,93826.63,0
4,15737888,850,Spain,Female,43,2,125510.82,1,1,1,79084.1,0


In [0]:
data.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10000 entries, 0 to 9999
Data columns (total 12 columns):
 #   Column            Non-Null Count  Dtype  
---  ------            --------------  -----  
 0   customer_id       10000 non-null  int32  
 1   credit_score      10000 non-null  int32  
 2   country           10000 non-null  object 
 3   gender            10000 non-null  object 
 4   age               10000 non-null  int32  
 5   tenure            10000 non-null  int32  
 6   balance           10000 non-null  float64
 7   products_number   10000 non-null  int32  
 8   credit_card       10000 non-null  int32  
 9   active_member     10000 non-null  int32  
 10  estimated_salary  10000 non-null  float64
 11  churn             10000 non-null  int32  
dtypes: float64(2), int32(8), object(2)
memory usage: 625.1+ KB


In [0]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression,RandomForestClassifier,GBTClassifier
import pyspark.sql.functions as F

In [0]:
## Bỏ đi cột customer_id trong tập dữ liệu không cần thiết cho dự đoán
data = data.drop('customer_id')
data.show(5)

+------------+-------+------+---+------+---------+---------------+-----------+-------------+----------------+-----+
|credit_score|country|gender|age|tenure|  balance|products_number|credit_card|active_member|estimated_salary|churn|
+------------+-------+------+---+------+---------+---------------+-----------+-------------+----------------+-----+
|         619| France|Female| 42|     2|      0.0|              1|          1|            1|       101348.88|    1|
|         608|  Spain|Female| 41|     1| 83807.86|              1|          0|            1|       112542.58|    0|
|         502| France|Female| 42|     8| 159660.8|              3|          1|            0|       113931.57|    1|
|         699| France|Female| 39|     1|      0.0|              2|          0|            0|        93826.63|    0|
|         850|  Spain|Female| 43|     2|125510.82|              1|          1|            1|         79084.1|    0|
+------------+-------+------+---+------+---------+---------------+------

In [0]:
## Biến đổi cột country kiểu string thành kiểu float
countryEncoder = StringIndexer(inputCol='country',outputCol='countryIndex').fit(data)
data = countryEncoder.transform(data)
data.show(5)

+------------+-------+------+---+------+---------+---------------+-----------+-------------+----------------+-----+------------+
|credit_score|country|gender|age|tenure|  balance|products_number|credit_card|active_member|estimated_salary|churn|countryIndex|
+------------+-------+------+---+------+---------+---------------+-----------+-------------+----------------+-----+------------+
|         619| France|Female| 42|     2|      0.0|              1|          1|            1|       101348.88|    1|         0.0|
|         608|  Spain|Female| 41|     1| 83807.86|              1|          0|            1|       112542.58|    0|         2.0|
|         502| France|Female| 42|     8| 159660.8|              3|          1|            0|       113931.57|    1|         0.0|
|         699| France|Female| 39|     1|      0.0|              2|          0|            0|        93826.63|    0|         0.0|
|         850|  Spain|Female| 43|     2|125510.82|              1|          1|            1|     

In [0]:
## Biến đổi cột gender kiểu string thành kiểu float
genderEncoder = StringIndexer(inputCol='gender',outputCol='genderIndex').fit(data)
data = genderEncoder.transform(data)
data.show(5)

+------------+-------+------+---+------+---------+---------------+-----------+-------------+----------------+-----+------------+-----------+
|credit_score|country|gender|age|tenure|  balance|products_number|credit_card|active_member|estimated_salary|churn|countryIndex|genderIndex|
+------------+-------+------+---+------+---------+---------------+-----------+-------------+----------------+-----+------------+-----------+
|         619| France|Female| 42|     2|      0.0|              1|          1|            1|       101348.88|    1|         0.0|        1.0|
|         608|  Spain|Female| 41|     1| 83807.86|              1|          0|            1|       112542.58|    0|         2.0|        1.0|
|         502| France|Female| 42|     8| 159660.8|              3|          1|            0|       113931.57|    1|         0.0|        1.0|
|         699| France|Female| 39|     1|      0.0|              2|          0|            0|        93826.63|    0|         0.0|        1.0|
|         850

In [0]:
data=data.drop('country','gender')
data.show(5)

+------------+---+------+---------+---------------+-----------+-------------+----------------+-----+------------+-----------+
|credit_score|age|tenure|  balance|products_number|credit_card|active_member|estimated_salary|churn|countryIndex|genderIndex|
+------------+---+------+---------+---------------+-----------+-------------+----------------+-----+------------+-----------+
|         619| 42|     2|      0.0|              1|          1|            1|       101348.88|    1|         0.0|        1.0|
|         608| 41|     1| 83807.86|              1|          0|            1|       112542.58|    0|         2.0|        1.0|
|         502| 42|     8| 159660.8|              3|          1|            0|       113931.57|    1|         0.0|        1.0|
|         699| 39|     1|      0.0|              2|          0|            0|        93826.63|    0|         0.0|        1.0|
|         850| 43|     2|125510.82|              1|          1|            1|         79084.1|    0|         2.0|     

In [0]:
## Đếm số lượng churn giữa 0 và 1 cho thấy tập dữ liệu đang bị imbalance.
data.groupby('churn').count().show()

+-----+-----+
|churn|count|
+-----+-----+
|    1| 2037|
|    0| 7963|
+-----+-----+



In [0]:
## Để giải quyết vấn đề imbalance của tập dữ liệu ta sẽ sử dụng phương pháp oversampling

# Tính toán độ lệch
major_df = data.filter(data.churn == 0)
minor_df = data.filter(data.churn == 1)
ratio = int(major_df.count()/minor_df.count())
print("ratio: {}".format(ratio))
a = range(ratio)

# sao chép các hàng có ít dữ liệu hơn
oversampled_df = minor_df.withColumn("dummy", F.explode(F.array([F.lit(x) for x in a]))).drop('dummy')

# kết hợp các hàng ít hơn mới được overspamling với các hàng nhiều hơn đã có trước đó
oversampled_df = major_df.unionAll(oversampled_df)

ratio: 3


In [0]:
## So sánh lại tỉ lệ giữa 0 và 1
oversampled_df.groupby('churn').count().show()

+-----+-----+
|churn|count|
+-----+-----+
|    0| 7963|
|    1| 6111|
+-----+-----+



In [0]:
# undersampled_df = data.sampleBy('Churn', fractions={0: data.select("Churn").where('Churn == 1').count()/
#                                                           data.select("Churn").where('Churn == 0').count(), 1: 1.0}, seed = 5323)

# undersampled_df.groupby('Churn').count().show()

In [0]:
## Tạo ra các đặc trưng cần thiết cho thuật toán
required_features = ['credit_score','age', 'tenure', 'balance', 'products_number', 'credit_card', 'active_member', 'estimated_salary', 'countryIndex', 'genderIndex']

In [0]:
## Biến đổi các cột trong required_features theo định dạng của spark
vec_assembler = VectorAssembler(inputCols=required_features,outputCol='features')
vecOver_df = vec_assembler.transform(oversampled_df)

In [0]:
# Chia tập dữ liệu thành 2 phần train và test với tỉ lệ train 80% và test 20%
(trainDF, testDF) = vecOver_df.randomSplit([.8, .2], seed = 1)

In [0]:
## Tạo mô hình logistic regression
lr = LogisticRegression(featuresCol='features',labelCol='churn')
lr_model = lr.fit(trainDF)

In [0]:
## Áp dụng mô hình trên dự đoán cho tập test
y_pred = lr_model.transform(testDF)

In [0]:
y_pred.select('churn','rawPrediction', 'probability', 'prediction').show()

+-----+--------------------+--------------------+----------+
|churn|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+----------+
|    0|[2.02878243740886...|[0.88378608264049...|       0.0|
|    0|[1.36266244982814...|[0.79619207484360...|       0.0|
|    0|[-0.5499410245983...|[0.36587809184138...|       1.0|
|    0|[1.49486103203548...|[0.81680676613727...|       0.0|
|    0|[0.72132524703704...|[0.67289877871811...|       0.0|
|    0|[0.53412562783253...|[0.63044483404448...|       0.0|
|    0|[-0.2854243169639...|[0.42912443723057...|       1.0|
|    0|[1.77388628472547...|[0.85494030305223...|       0.0|
|    0|[0.00138575842060...|[0.50034643954971...|       0.0|
|    0|[0.78790679446551...|[0.68738170214865...|       0.0|
|    0|[1.19691819682730...|[0.76797609421907...|       0.0|
|    0|[0.06211370864470...|[0.51552343655129...|       0.0|
|    0|[-0.2207941740016...|[0.44502461306534...|       1.0|
|    0|[0.12055976563168

In [0]:
y_pred.select("prediction", "churn").show(5)

+----------+-----+
|prediction|churn|
+----------+-----+
|       0.0|    0|
|       0.0|    0|
|       1.0|    0|
|       0.0|    0|
|       0.0|    0|
+----------+-----+
only showing top 5 rows



In [0]:
## Đánh giá mô hình dựa trên độ đo accuracy

from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

multi_evaluator = MulticlassClassificationEvaluator(labelCol='churn',metricName='accuracy')
multi_evaluator.evaluate(y_pred)


Out[23]: 0.7044711014176663

## Tạo ML pipeline và đánh giá dùng phương pháp cross validation

In [0]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

import pprint

pp = pprint.PrettyPrinter(indent = 4)

# Create a LogisticRegression instance. This instance is an Estimator.
logit = LogisticRegression(featuresCol = "features", labelCol = "churn")

# Define assembler
required_features = ['credit_score','age', 'tenure', 'balance', 'products_number', 'credit_card', 'active_member', 'estimated_salary', 'countryIndex', 'genderIndex']
assembler = VectorAssembler(inputCols=required_features,outputCol='features')

# Configure an ML pipeline, which consists of two stages: indexer, assembler, and logit.
pipeline = Pipeline(stages = [assembler, logit])

# Specify evaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol = "churn", 
    predictionCol = "prediction",
    metricName = "accuracy"
)

# Specify parameters
paramGrid = (ParamGridBuilder()
            .addGrid(logit.regParam , [0.01, 0.1, 1])
            .build())

# Train/test split
(trainDF, testDF) = vecOver_df.randomSplit([.8, .2], seed = 1)

# Setup CrossValidator 
# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
cv = CrossValidator(estimator = logit, 
                    evaluator = evaluator, 
                    estimatorParamMaps = paramGrid, 
                    numFolds = 3, 
                    parallelism = 2, 
                    seed = 1)

# Run cross-validation on training data, and choose the best set of parameters
logitModel = cv.fit(trainDF)

# Make predictions on test data. logitModel uses the best model found (regParam = 0.01)
prediction = logitModel.transform(testDF)
result = prediction.select("features", "churn", "prediction").collect()

# Print some predictions
for row in result[0:5]:
    pp.pprint("features=%s, churn=%s -> prediction=%s" % 
              (row.features, row.churn, row.prediction))

accuracy = evaluator.evaluate(prediction)

print("Test Error = %g" % (1.0 - accuracy))

CrossValidatorModel_27e3da6b218f
('features=[411.0,29.0,0.0,59697.17,2.0,1.0,1.0,53483.21,0.0,0.0], churn=0 -> '
 'prediction=0.0')
('features=[415.0,32.0,5.0,145807.59,1.0,1.0,1.0,3064.65,0.0,0.0], churn=0 -> '
 'prediction=0.0')
('features=[415.0,53.0,5.0,167259.44,1.0,1.0,1.0,22357.25,2.0,0.0], churn=0 -> '
 'prediction=1.0')
('features=[416.0,25.0,0.0,97738.97,2.0,1.0,1.0,160523.33,1.0,1.0], churn=0 -> '
 'prediction=0.0')
('features=[416.0,35.0,8.0,0.0,1.0,0.0,0.0,119712.78,2.0,0.0], churn=0 -> '
 'prediction=0.0')
Test Error = 0.294075


In [0]:
logitModel.bestModel.getRegParam()

Out[56]: 0.01

In [0]:
print(accuracy)

0.7059251181388586


## Hết cross

In [0]:
rf = RandomForestClassifier(labelCol="churn", featuresCol="features")
model_rf = rf.fit(trainDF)
results = model_rf.transform(testDF)
multi_evaluator = MulticlassClassificationEvaluator(labelCol='churn',metricName='accuracy')
multi_evaluator.evaluate(results)

Out[28]: 0.7739003998545984

In [0]:
gbt = GBTClassifier(labelCol="churn", featuresCol="features" , maxIter=10 , maxDepth=20)
model_gbt = gbt.fit(trainDF)
results_gbt = model_gbt.transform(testDF)
multi_evaluator.evaluate(results_gbt)


Out[102]: 0.9014903671392221

In [0]:

multi_evaluator = MulticlassClassificationEvaluator(labelCol='churn',metricName='f1')
multi_evaluator.evaluate(results)


Out[108]: 0.8337718191764891

In [0]:
rf = RandomForestClassifier(labelCol="churn", featuresCol="features")
model_rf = rf.fit(train)
results = model_rf.transform(test)
multi_evaluator = MulticlassClassificationEvaluator(labelCol='churn',metricName='f1')
multi_evaluator.evaluate(results)

Out[105]: 0.8337718191764891