# Customer Churn

Estimating churn matters to companies. According to McKinsey's research, fast-growing companies need low churn rates to keep their profitability stable.

# Dataset

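RowNumber: corresponds to the record (row) number and has no effect on the output.

CustomerId: contains random values and has no effect on a customer leaving the bank.

Surname: the customer's surname; it is dropped before modeling together with RowNumber and CustomerId.

CreditScore: the customer's credit score.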

Geography: a customer’s location can affect their decision to leave the bank.

Gender: it’s interesting to explore whether gender plays a role in a customer leaving the bank.

Age: this is certainly relevant, since older customers are less likely to leave their bank than younger ones.

Tenure: refers to the number of years that the customer has been a client of the bank. Normally, longer-standing clients are more loyal and less likely to leave a bank.

NumOfProducts: refers to the number of products that a customer has purchased through the bank.

HasCrCard: denotes whether or not a customer has a credit card. This column is also relevant, since people with a credit card are less likely to leave the bank.

IsActiveMember: active customers are less likely to leave the bank.

Balance: also a very good indicator of customer churn, as people with a higher balance in their accounts are less likely to leave the bank compared to those with lower balances.

EstimatedSalary: as with balance, people with lower salaries are more likely to leave the bank compared to those with higher salaries.

Exited (dependent variable): whether or not the customer left the bank.

# Installation of Required Libraries for Spark

In [None]:
# installation required for Spark
#!pip install sparkmagic
#!pip install pyspark

# Importing Required Libraries

In [2]:
# libraries
import warnings
# import findspark
import pandas as pd
import seaborn as sns
from pyspark.ml.classification import GBTClassifier, LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder, StandardScaler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql import SparkSession
from pyspark.ml.feature import Bucketizer

warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=FutureWarning)

pd.set_option('display.max_columns', None)
pd.set_option('display.float_format', lambda x: '%.2f' % x)

# Creating A Spark Session

In [3]:
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Exploratory Data Analysis

In [16]:
spark_df = spark.read.csv('/content/churn.csv', inferSchema=True, header=True)
spark_df.show(10)

+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|RowNumber|CustomerId| Surname|CreditScore|Geography|Gender|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|        1|  15634602|Hargrave|        619|   France|Female| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|
|        2|  15647311|    Hill|        608|    Spain|Female| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|
|        3|  15619304|    Onio|        502|   France|Female| 42|     8| 159660.8|            3|        1|             0|      113931.57|     1|
|        4|  15701354|    Boni|        699|   France|Female| 39|     1|      0.0|            2|        0|             0|       93826.63|

In [17]:
print("Shape: ", (spark_df.count(), len(spark_df.columns)))

Shape:  (10000, 14)


In [18]:
spark_df.printSchema()

root
 |-- RowNumber: integer (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- Surname: string (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- Balance: double (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- HasCrCard: integer (nullable = true)
 |-- IsActiveMember: integer (nullable = true)
 |-- EstimatedSalary: double (nullable = true)
 |-- Exited: integer (nullable = true)



In [19]:
spark_df = spark_df.toDF(*[c.lower() for c in spark_df.columns])
spark_df.show(5)

+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|rownumber|customerid| surname|creditscore|geography|gender|age|tenure|  balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|
+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|        1|  15634602|Hargrave|        619|   France|Female| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|
|        2|  15647311|    Hill|        608|    Spain|Female| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|
|        3|  15619304|    Onio|        502|   France|Female| 42|     8| 159660.8|            3|        1|             0|      113931.57|     1|
|        4|  15701354|    Boni|        699|   France|Female| 39|     1|      0.0|            2|        0|             0|       93826.63|

In [20]:
spark_df.describe().show()

+-------+------------------+-----------------+-------+-----------------+---------+------+------------------+------------------+-----------------+------------------+-------------------+-------------------+-----------------+-------------------+
|summary|         rownumber|       customerid|surname|      creditscore|geography|gender|               age|            tenure|          balance|     numofproducts|          hascrcard|     isactivemember|  estimatedsalary|             exited|
+-------+------------------+-----------------+-------+-----------------+---------+------+------------------+------------------+-----------------+------------------+-------------------+-------------------+-----------------+-------------------+
|  count|             10000|            10000|  10000|            10000|    10000| 10000|             10000|             10000|            10000|             10000|              10000|              10000|            10000|              10000|
|   mean|            5000.5|

In [21]:
spark_df.describe(["age", "exited"]).show()

+-------+------------------+-------------------+
|summary|               age|             exited|
+-------+------------------+-------------------+
|  count|             10000|              10000|
|   mean|           38.9218|             0.2037|
| stddev|10.487806451704587|0.40276858399486065|
|    min|                18|                  0|
|    max|                92|                  1|
+-------+------------------+-------------------+



In [22]:
spark_df.groupby("exited").count().show()

+------+-----+
|exited|count|
+------+-----+
|     1| 2037|
|     0| 7963|
+------+-----+



In [23]:
spark_df.select("exited").distinct().show()

+------+
|exited|
+------+
|     1|
|     0|
+------+



In [24]:
spark_df.dtypes

[('rownumber', 'int'),
 ('customerid', 'int'),
 ('surname', 'string'),
 ('creditscore', 'int'),
 ('geography', 'string'),
 ('gender', 'string'),
 ('age', 'int'),
 ('tenure', 'int'),
 ('balance', 'double'),
 ('numofproducts', 'int'),
 ('hascrcard', 'int'),
 ('isactivemember', 'int'),
 ('estimatedsalary', 'double'),
 ('exited', 'int')]

In [25]:
num_cols = [col[0] for col in spark_df.dtypes if col[1] != 'string']
spark_df.select(num_cols).describe().show()

+-------+------------------+-----------------+-----------------+------------------+------------------+-----------------+------------------+-------------------+-------------------+-----------------+-------------------+
|summary|         rownumber|       customerid|      creditscore|               age|            tenure|          balance|     numofproducts|          hascrcard|     isactivemember|  estimatedsalary|             exited|
+-------+------------------+-----------------+-----------------+------------------+------------------+-----------------+------------------+-------------------+-------------------+-----------------+-------------------+
|  count|             10000|            10000|            10000|             10000|             10000|            10000|             10000|              10000|              10000|            10000|              10000|
|   mean|            5000.5|  1.56909405694E7|         650.5288|           38.9218|            5.0128|76485.88928799961|        

In [26]:
cat_cols = [col[0] for col in spark_df.dtypes if col[1] == 'string']

In [27]:
# column names are already lowercase; a distinct loop name avoids shadowing pyspark's col()
for num_col in num_cols:
    spark_df.groupby("exited").agg({num_col: "mean"}).show()

+------+-----------------+
|exited|   avg(rownumber)|
+------+-----------------+
|     1|4905.917525773196|
|     0|5024.694964209469|
+------+-----------------+

+------+--------------------+
|exited|     avg(customerid)|
+------+--------------------+
|     1|1.5690051964653904E7|
|     0|1.5691167881702876E7|
+------+--------------------+

+------+-----------------+
|exited| avg(creditscore)|
+------+-----------------+
|     1|645.3514972999509|
|     0|651.8531960316463|
+------+-----------------+

+------+-----------------+
|exited|         avg(age)|
+------+-----------------+
|     1| 44.8379970544919|
|     0|37.40838879819164|
+------+-----------------+

+------+-----------------+
|exited|      avg(tenure)|
+------+-----------------+
|     1|4.932744231713304|
|     0|5.033278914981791|
+------+-----------------+

+------+-----------------+
|exited|     avg(balance)|
+------+-----------------+
|     1|91108.53933726063|
|     0|72745.29677885193|
+------+-----------------+


# Data Preprocessing & Feature Engineering

In [30]:
from pyspark.sql.functions import when, count, col
# checking null values
spark_df.select([count(when(col(c).isNull(),c)).alias(c) for c in spark_df.columns]).toPandas().T

Unnamed: 0,0
rownumber,0
customerid,0
surname,0
creditscore,0
geography,0
gender,0
age,0
tenure,0
balance,0
numofproducts,0


In [31]:
spark_df.show(5)

+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|rownumber|customerid| surname|creditscore|geography|gender|age|tenure|  balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|
+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|        1|  15634602|Hargrave|        619|   France|Female| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|
|        2|  15647311|    Hill|        608|    Spain|Female| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|
|        3|  15619304|    Onio|        502|   France|Female| 42|     8| 159660.8|            3|        1|             0|      113931.57|     1|
|        4|  15701354|    Boni|        699|   France|Female| 39|     1|      0.0|            2|        0|             0|       93826.63|

In [32]:
spark_df = spark_df.drop('rownumber', "customerid", "surname")

**Feature Interaction**

In [33]:
spark_df = spark_df.withColumn('creditscore_salary', spark_df.creditscore / spark_df.estimatedsalary)
spark_df = spark_df.withColumn('creditscore_tenure', spark_df.creditscore * spark_df.tenure)
spark_df = spark_df.withColumn('balance_salary', spark_df.balance / spark_df.estimatedsalary)
spark_df.show(5)

+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+--------------------+------------------+------------------+
|creditscore|geography|gender|age|tenure|  balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|  creditscore_salary|creditscore_tenure|    balance_salary|
+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+--------------------+------------------+------------------+
|        619|   France|Female| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|0.006107615594765329|              1238|               0.0|
|        608|    Spain|Female| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|0.005402399696186101|               608|0.7446769036217226|
|        502|   France|Female| 42|     8| 159660.8|            3|        1|             0|      113931.57|     1|0.004406153623618106| 
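
As a quick check of the three interaction features, the sketch below recomputes them in plain Python for the first two rows shown above. The function name `interaction_features` is hypothetical and exists only for illustration; the notebook itself builds these columns with `withColumn`.

```python
def interaction_features(creditscore, tenure, balance, estimatedsalary):
    """Recompute the three derived columns for a single customer."""
    return {
        "creditscore_salary": creditscore / estimatedsalary,
        "creditscore_tenure": creditscore * tenure,
        "balance_salary": balance / estimatedsalary,
    }


# Values copied from the first two rows of the output above.
first = interaction_features(619, 2, 0.0, 101348.88)
second = interaction_features(608, 1, 83807.86, 112542.58)
print(first)
print(second)
```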

**Bucketization / Binning / Num to Cat**

In [34]:
# age variable
spark_df.select('age').describe().toPandas().transpose()
spark_df.select("age").summary("count", "min", "25%", "50%","75%", "max").show()
bucketizer = Bucketizer(splits=[0, 35, 55, 75, 95], inputCol="age", outputCol="age_cat")
spark_df = bucketizer.setHandleInvalid("keep").transform(spark_df)
spark_df = spark_df.withColumn('age_cat', spark_df.age_cat + 1)

+-------+-----+
|summary|  age|
+-------+-----+
|  count|10000|
|    min|   18|
|    25%|   32|
|    50%|   37|
|    75%|   44|
|    max|   92|
+-------+-----+



In [37]:
spark_df.select('age','age_cat').show(10)

+---+-------+
|age|age_cat|
+---+-------+
| 42|    2.0|
| 41|    2.0|
| 42|    2.0|
| 39|    2.0|
| 43|    2.0|
| 44|    2.0|
| 50|    2.0|
| 29|    1.0|
| 44|    2.0|
| 27|    1.0|
+---+-------+
only showing top 10 rows
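
To make the bucket boundaries concrete, here is a plain-Python sketch of the interval rule that `Bucketizer` applies with these splits: a value lands in bucket `i` when `splits[i] <= value < splits[i+1]`, and the last bucket also includes its upper bound. The helper names `bucket_index` and `age_cat` are hypothetical and not part of PySpark; they only mirror the cell above, including the `+ 1` shift.

```python
from bisect import bisect_right

SPLITS = [0, 35, 55, 75, 95]  # same split points as the Bucketizer above


def bucket_index(value, splits=SPLITS):
    """Return the 0-based bucket for value, mimicking Bucketizer's intervals."""
    if value < splits[0] or value > splits[-1]:
        raise ValueError(f"{value} is outside [{splits[0]}, {splits[-1]}]")
    # bisect_right counts the split points <= value; subtract 1 for the bucket.
    idx = bisect_right(splits, value) - 1
    # The last bucket is closed on the right, so the top split maps into it.
    return min(idx, len(splits) - 2)


def age_cat(age):
    """Shift the bucket by one, as the notebook does with age_cat + 1."""
    return bucket_index(age) + 1


for age in (27, 35, 42, 92):
    print(age, age_cat(age))
```

Under these assumptions, ages 42 and 29 map to categories 2 and 1, which agrees with the table above.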



**Converting float values to integer**

In [38]:
spark_df = spark_df.withColumn("age_cat", spark_df["age_cat"].cast("integer"))

In [39]:
spark_df.show(5)

+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+--------------------+------------------+------------------+-------+
|creditscore|geography|gender|age|tenure|  balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|  creditscore_salary|creditscore_tenure|    balance_salary|age_cat|
+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+--------------------+------------------+------------------+-------+
|        619|   France|Female| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|0.006107615594765329|              1238|               0.0|      2|
|        608|    Spain|Female| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|0.005402399696186101|               608|0.7446769036217226|      2|
|        502|   France|Female| 42|     8| 159660.8|            3|        1|             0|     

In [40]:
# when() branches are evaluated in order, so each one only needs an upper bound.
# This removes the overlap between "very poor" and "poor" (the 501 cut-off is assumed
# from the "poor" range) and keeps boundary scores such as 601 out of "top".
spark_df.withColumn('creditscore_group',
                    when(spark_df['creditscore'] < 301, "deep").
                    when(spark_df['creditscore'] < 501, "very poor").
                    when(spark_df['creditscore'] < 601, "poor").
                    when(spark_df['creditscore'] < 661, "fair").
                    when(spark_df['creditscore'] < 781, "good").
                    when(spark_df['creditscore'] < 851, "excellent").
                    otherwise("top")).show()

+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+--------------------+------------------+------------------+-------+-----------------+
|creditscore|geography|gender|age|tenure|  balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|  creditscore_salary|creditscore_tenure|    balance_salary|age_cat|creditscore_group|
+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+--------------------+------------------+------------------+-------+-----------------+
|        619|   France|Female| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|0.006107615594765329|              1238|               0.0|      2|             fair|
|        608|    Spain|Female| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|0.005402399696186101|               608|0.7446769036217226|      2|             fair|
|    

In [41]:
from pyspark.sql.types import IntegerType, StringType, FloatType
from pyspark.sql.functions import udf

# Function with UDF
def segment(tenure):
    if tenure < 5:
        return "segment_b"
    else:
        return "segment_a"
# Convert a Python function to PySpark UDF
func_udf = udf(segment, StringType())
spark_df = spark_df.withColumn('segment', func_udf(spark_df['tenure']))
spark_df.show(5)

+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+--------------------+------------------+------------------+-------+---------+
|creditscore|geography|gender|age|tenure|  balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|  creditscore_salary|creditscore_tenure|    balance_salary|age_cat|  segment|
+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+--------------------+------------------+------------------+-------+---------+
|        619|   France|Female| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|0.006107615594765329|              1238|               0.0|      2|segment_b|
|        608|    Spain|Female| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|0.005402399696186101|               608|0.7446769036217226|      2|segment_b|
|        502|   France|Female| 42|     8| 159

In [42]:
indexer = StringIndexer(inputCol="segment", outputCol="segment_label")
# fit once and reuse the transformed DataFrame for both the preview and the cast
temp_sdf = indexer.fit(spark_df).transform(spark_df)
temp_sdf.show(5)
spark_df = temp_sdf.withColumn("segment_label", temp_sdf["segment_label"].cast("integer"))
spark_df = spark_df.drop('segment')

+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+--------------------+------------------+------------------+-------+---------+-------------+
|creditscore|geography|gender|age|tenure|  balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|  creditscore_salary|creditscore_tenure|    balance_salary|age_cat|  segment|segment_label|
+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+--------------------+------------------+------------------+-------+---------+-------------+
|        619|   France|Female| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|0.006107615594765329|              1238|               0.0|      2|segment_b|          1.0|
|        608|    Spain|Female| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|0.005402399696186101|               608|0.7446769036217226|      2|
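
The reason `segment_b` receives the label 1 is that `StringIndexer` orders labels by descending frequency by default, so the most common value gets index 0. The sketch below imitates that ordering in plain Python; `fit_string_index` is a hypothetical helper, the toy column is made up, and the alphabetical tie-break is an assumption of this sketch.

```python
from collections import Counter


def fit_string_index(values):
    """Map each distinct value to an index, most frequent value first."""
    counts = Counter(values)
    # Sort by descending count, then alphabetically to break ties.
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: index for index, value in enumerate(ordered)}


# Hypothetical toy column: segment_a is more common than segment_b.
segments = ["segment_a", "segment_b", "segment_a", "segment_a", "segment_b"]
mapping = fit_string_index(segments)
labels = [mapping[s] for s in segments]
print(mapping)
print(labels)
```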

In [47]:
spark_df.printSchema()

root
 |-- creditscore: integer (nullable = true)
 |-- geography: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- balance: double (nullable = true)
 |-- numofproducts: integer (nullable = true)
 |-- hascrcard: integer (nullable = true)
 |-- isactivemember: integer (nullable = true)
 |-- estimatedsalary: double (nullable = true)
 |-- exited: integer (nullable = true)
 |-- creditscore_salary: double (nullable = true)
 |-- creditscore_tenure: integer (nullable = true)
 |-- balance_salary: double (nullable = true)
 |-- age_cat: integer (nullable = true)
 |-- segment_label: integer (nullable = true)



In [48]:
indexer = StringIndexer(inputCol="gender", outputCol="gender_label")
# fit once and reuse the transformed DataFrame for both the preview and the cast
temp_sdf = indexer.fit(spark_df).transform(spark_df)
temp_sdf.show(5)
spark_df = temp_sdf.withColumn("gender_label", temp_sdf["gender_label"].cast("integer"))
spark_df = spark_df.drop('gender')

+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+--------------------+------------------+------------------+-------+-------------+------------+
|creditscore|geography|gender|age|tenure|  balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|  creditscore_salary|creditscore_tenure|    balance_salary|age_cat|segment_label|gender_label|
+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+--------------------+------------------+------------------+-------+-------------+------------+
|        619|   France|Female| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|0.006107615594765329|              1238|               0.0|      2|            1|         1.0|
|        608|    Spain|Female| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|0.005402399696186101|               608|0.7446769036217

In [49]:
indexer = StringIndexer(inputCol="geography", outputCol="geography_label")
# fit once and reuse the transformed DataFrame for both the preview and the cast
temp_sdf = indexer.fit(spark_df).transform(spark_df)
temp_sdf.show(5)
spark_df = temp_sdf.withColumn("geography_label", temp_sdf["geography_label"].cast("integer"))
spark_df = spark_df.drop('geography')

+-----------+---------+---+------+---------+-------------+---------+--------------+---------------+------+--------------------+------------------+------------------+-------+-------------+------------+---------------+
|creditscore|geography|age|tenure|  balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|  creditscore_salary|creditscore_tenure|    balance_salary|age_cat|segment_label|gender_label|geography_label|
+-----------+---------+---+------+---------+-------------+---------+--------------+---------------+------+--------------------+------------------+------------------+-------+-------------+------------+---------------+
|        619|   France| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|0.006107615594765329|              1238|               0.0|      2|            1|           1|            0.0|
|        608|    Spain| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|0.005402399696186101|     

**One Hot Encoding**

In [50]:
encoder = OneHotEncoder(inputCols=["age_cat", "geography_label"], outputCols=["age_cat_ohe", "geography_label_ohe"])
spark_df = encoder.fit(spark_df).transform(spark_df)
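
The encoded columns are sparse vectors whose length is one less than the number of categories, because `OneHotEncoder` drops the last category by default. The following sketch reproduces that layout with dense Python lists; `one_hot` is a hypothetical helper, and the category counts (5 for `age_cat`, since its values run up to 4, and 3 for `geography_label`) are inferred from the data rather than read from the fitted encoder.

```python
def one_hot(index, num_categories, drop_last=True):
    """Dense one-hot vector; with drop_last the final category is all zeros."""
    if not 0 <= index < num_categories:
        raise ValueError(f"index {index} is outside 0..{num_categories - 1}")
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:
        vector[index] = 1.0
    return vector


# age_cat = 2 with indices 0..4 gives a length-4 vector with position 2 set.
age_vec = one_hot(2, 5)
# geography_label = 0 with indices 0..2 gives a length-2 vector with position 0 set.
geo_vec = one_hot(0, 3)
# The dropped last category is represented by all zeros.
geo_last = one_hot(2, 3)
print(age_vec, geo_vec, geo_last)
```

These dense lists correspond to the sparse notation `(4,[2],[1.0])` and `(2,[0],[1.0])` that Spark prints for the same values.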

# Defining Target Column

In [51]:
stringIndexer = StringIndexer(inputCol='exited', outputCol='label')

temp_sdf = stringIndexer.fit(spark_df).transform(spark_df)
temp_sdf.show()

+-----------+---+------+---------+-------------+---------+--------------+---------------+------+--------------------+------------------+------------------+-------+-------------+------------+---------------+-------------+-------------------+-----+
|creditscore|age|tenure|  balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|  creditscore_salary|creditscore_tenure|    balance_salary|age_cat|segment_label|gender_label|geography_label|  age_cat_ohe|geography_label_ohe|label|
+-----------+---+------+---------+-------------+---------+--------------+---------------+------+--------------------+------------------+------------------+-------+-------------+------------+---------------+-------------+-------------------+-----+
|        619| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|0.006107615594765329|              1238|               0.0|      2|            1|           1|              0|(4,[2],[1.0])|      (2,[0],[1.0])|  1.0|
|        608

In [52]:
spark_df = temp_sdf.withColumn("label", temp_sdf["label"].cast("integer"))
spark_df.show(5)

+-----------+---+------+---------+-------------+---------+--------------+---------------+------+--------------------+------------------+------------------+-------+-------------+------------+---------------+-------------+-------------------+-----+
|creditscore|age|tenure|  balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|  creditscore_salary|creditscore_tenure|    balance_salary|age_cat|segment_label|gender_label|geography_label|  age_cat_ohe|geography_label_ohe|label|
+-----------+---+------+---------+-------------+---------+--------------+---------------+------+--------------------+------------------+------------------+-------+-------------+------------+---------------+-------------+-------------------+-----+
|        619| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|0.006107615594765329|              1238|               0.0|      2|            1|           1|              0|(4,[2],[1.0])|      (2,[0],[1.0])|    1|
|        608

In [53]:
cols = ['creditscore', 'age', 'tenure', 'balance','numofproducts', 'hascrcard',
        'isactivemember', 'estimatedsalary', 'creditscore_salary', 'creditscore_tenure',
        'balance_salary', 'segment_label', 'gender_label',
        'age_cat_ohe', 'geography_label_ohe']

# Vectorize independent variables

**VectorAssembler** is a transformer that combines a given list of columns into a single vector column.<br>
It is useful for combining raw features and features generated by different feature transformers into a single feature vector, in order to train ML models like logistic regression and decision trees.

In [55]:
va = VectorAssembler(inputCols=cols, outputCol="features")
va_df = va.transform(spark_df)
va_df.show()

+-----------+---+------+---------+-------------+---------+--------------+---------------+------+--------------------+------------------+------------------+-------+-------------+------------+---------------+-------------+-------------------+-----+--------------------+
|creditscore|age|tenure|  balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|  creditscore_salary|creditscore_tenure|    balance_salary|age_cat|segment_label|gender_label|geography_label|  age_cat_ohe|geography_label_ohe|label|            features|
+-----------+---+------+---------+-------------+---------+--------------+---------------+------+--------------------+------------------+------------------+-------+-------------+------------+---------------+-------------+-------------------+-----+--------------------+
|        619| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|0.006107615594765329|              1238|               0.0|      2|            1|           1|      

In [56]:
final_df = va_df.select("features", "label")
final_df.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[619.0,42.0,2.0,0...|    1|
|[608.0,41.0,1.0,8...|    0|
|[502.0,42.0,8.0,1...|    1|
|(19,[0,1,2,4,7,8,...|    0|
|[850.0,43.0,2.0,1...|    0|
+--------------------+-----+
only showing top 5 rows
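
The feature vectors have length 19 because the 13 scalar columns are joined with the 4-element `age_cat_ohe` and the 2-element `geography_label_ohe`. The sketch below shows that flattening in plain Python for the first row; `assemble` is a hypothetical stand-in for what `VectorAssembler` does, and the one-hot columns are written as dense lists for readability.

```python
def assemble(row, input_cols):
    """Concatenate scalar and vector-valued columns into one flat feature list."""
    features = []
    for name in input_cols:
        value = row[name]
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)  # vector column
        else:
            features.append(float(value))  # scalar column
    return features


cols = ['creditscore', 'age', 'tenure', 'balance', 'numofproducts', 'hascrcard',
        'isactivemember', 'estimatedsalary', 'creditscore_salary', 'creditscore_tenure',
        'balance_salary', 'segment_label', 'gender_label',
        'age_cat_ohe', 'geography_label_ohe']

# First row of the DataFrame shown earlier.
row = {'creditscore': 619, 'age': 42, 'tenure': 2, 'balance': 0.0, 'numofproducts': 1,
       'hascrcard': 1, 'isactivemember': 1, 'estimatedsalary': 101348.88,
       'creditscore_salary': 0.006107615594765329, 'creditscore_tenure': 1238,
       'balance_salary': 0.0, 'segment_label': 1, 'gender_label': 1,
       'age_cat_ohe': [0.0, 0.0, 1.0, 0.0], 'geography_label_ohe': [1.0, 0.0]}

features = assemble(row, cols)
print(len(features), features)
```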



# Standard Scaler

In [57]:
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
final_df = scaler.fit(final_df).transform(final_df)
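
With its default settings (`withStd=True`, `withMean=False`), `StandardScaler` divides each feature by its sample standard deviation without centering it. The sketch below applies the same rule to a small made-up matrix; `scale_to_unit_std` is a hypothetical helper, and mapping constant columns to zero is a choice of this sketch.

```python
from statistics import stdev


def scale_to_unit_std(rows):
    """Divide every column by its sample standard deviation (no centering)."""
    columns = list(zip(*rows))
    stds = [stdev(column) for column in columns]
    # Constant columns have zero spread; map them to 0.0 instead of dividing by zero.
    return [[value / s if s else 0.0 for value, s in zip(row, stds)] for row in rows]


# Hypothetical toy feature matrix: two features on very different scales.
rows = [[1.0, 10.0], [2.0, 20.0], [3.0, 30.0]]
scaled = scale_to_unit_std(rows)
print(scaled)
```

After scaling, both columns have a standard deviation of 1, so neither dominates purely because of its units. In this notebook the scaler is fitted on the full dataset before the train/test split.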

# Split the Dataset Into Test and Train Sets

In [58]:
train_df, test_df = final_df.randomSplit([0.7, 0.3], seed=17)
train_df.show(5)

+--------------------+-----+--------------------+
|            features|label|     scaled_features|
+--------------------+-----+--------------------+
|(19,[0,1,2,3,4,5,...|    0|(19,[0,1,2,3,4,5,...|
|(19,[0,1,2,3,4,5,...|    0|(19,[0,1,2,3,4,5,...|
|(19,[0,1,2,3,4,5,...|    0|(19,[0,1,2,3,4,5,...|
|(19,[0,1,2,3,4,5,...|    0|(19,[0,1,2,3,4,5,...|
|(19,[0,1,2,3,4,5,...|    1|(19,[0,1,2,3,4,5,...|
+--------------------+-----+--------------------+
only showing top 5 rows



In [59]:
test_df.show(5)

+--------------------+-----+--------------------+
|            features|label|     scaled_features|
+--------------------+-----+--------------------+
|(19,[0,1,2,3,4,5,...|    0|(19,[0,1,2,3,4,5,...|
|(19,[0,1,2,3,4,5,...|    0|(19,[0,1,2,3,4,5,...|
|(19,[0,1,2,3,4,5,...|    0|(19,[0,1,2,3,4,5,...|
|(19,[0,1,2,3,4,5,...|    0|(19,[0,1,2,3,4,5,...|
|(19,[0,1,2,3,4,5,...|    0|(19,[0,1,2,3,4,5,...|
+--------------------+-----+--------------------+
only showing top 5 rows



In [60]:
print("Training Dataset Count: " + str(train_df.count()))
print("Test Dataset Count: " + str(test_df.count()))

Training Dataset Count: 6949
Test Dataset Count: 3051
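
The counts are close to, but not exactly, 7000 and 3000 because `randomSplit` treats the weights as expected proportions and assigns rows at random. The sketch below illustrates the idea with Python's `random` module; `random_split` is a hypothetical helper, and its counts will not match Spark's even with the same seed value, since the random number generators differ.

```python
import random


def random_split(rows, train_fraction, seed):
    """Send each row to train with probability train_fraction, else to test."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test


rows = list(range(10000))  # stand-in for the 10,000 customers
train, test = random_split(rows, 0.7, seed=17)
print(len(train), len(test))
```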


# Modeling

**Logistic Regression**

In [61]:
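# note: the model is trained on the unscaled 'features' column; 'scaled_features' is carried along but not used
log_model = LogisticRegression(featuresCol='features', labelCol='label').fit(train_df)
y_pred = log_model.transform(test_df)
y_pred.show()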

+--------------------+-----+--------------------+--------------------+--------------------+----------+
|            features|label|     scaled_features|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+--------------------+----------+
|(19,[0,1,2,3,4,5,...|    0|(19,[0,1,2,3,4,5,...|[20.3342451335715...|[0.99999999852446...|       0.0|
|(19,[0,1,2,3,4,5,...|    0|(19,[0,1,2,3,4,5,...|[2.71693288942956...|[0.93801845181519...|       0.0|
|(19,[0,1,2,3,4,5,...|    0|(19,[0,1,2,3,4,5,...|[2.31443296331953...|[0.91006533761443...|       0.0|
|(19,[0,1,2,3,4,5,...|    0|(19,[0,1,2,3,4,5,...|[2.73576640701778...|[0.93910443856697...|       0.0|
|(19,[0,1,2,3,4,5,...|    0|(19,[0,1,2,3,4,5,...|[3.04619000732153...|[0.95461775295387...|       0.0|
|(19,[0,1,2,3,4,5,...|    0|(19,[0,1,2,3,4,5,...|[2.16588869941065...|[0.89714420918183...|       0.0|
|(19,[0,1,2,3,4,5,...|    0|(19,[0,1,2,3,4,5,...|[1.89770832057982...|[0.

In [62]:
y_pred.select("label", "prediction").show()

+-----+----------+
|label|prediction|
+-----+----------+
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    1|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    1|       0.0|
|    1|       0.0|
|    0|       0.0|
|    0|       0.0|
|    1|       0.0|
+-----+----------+
only showing top 20 rows



**Accuracy**

In [63]:
y_pred.filter(y_pred.label == y_pred.prediction).count() / y_pred.count()

0.8174369059324812

In [65]:
# note: rawPredictionCol="prediction" computes the ROC AUC from the hard 0/1 predictions, not from the model's scores
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction", metricName='areaUnderROC')
evaluatorMulti = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

acc = evaluatorMulti.evaluate(y_pred, {evaluatorMulti.metricName: "accuracy"})
# precisionByLabel and recallByLabel report the class set by metricLabel, which defaults to 0 (customers who stayed)
precision = evaluatorMulti.evaluate(y_pred, {evaluatorMulti.metricName: "precisionByLabel"})
recall = evaluatorMulti.evaluate(y_pred, {evaluatorMulti.metricName: "recallByLabel"})
# "f1" is the weighted F-measure across both classes
f1 = evaluatorMulti.evaluate(y_pred, {evaluatorMulti.metricName: "f1"})
roc_auc = evaluator.evaluate(y_pred)

print("accuracy: %f, precision: %f, recall: %f, f1: %f, roc_auc: %f" % (acc, precision, recall, f1, roc_auc))

accuracy: 0.817437, precision: 0.832153, recall: 0.965886, f1: 0.781988, roc_auc: 0.599448
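
Two details help when reading these numbers: per-label precision and recall depend on which class is treated as the target, and an AUC computed from hard 0/1 predictions reduces to the average of the two per-class recalls. The sketch below demonstrates both on made-up data in plain Python; the helper names and the toy labels and probabilities are hypothetical and are not taken from the model above.

```python
def precision_recall(labels, predictions, target):
    """Precision and recall when `target` is treated as the class of interest."""
    pairs = list(zip(labels, predictions))
    tp = sum(1 for y, p in pairs if y == target and p == target)
    fp = sum(1 for y, p in pairs if y != target and p == target)
    fn = sum(1 for y, p in pairs if y == target and p != target)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return precision, recall


def roc_auc(labels, scores):
    """AUC as the chance a churner outscores a non-churner (ties count half)."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
    return wins / (len(pos) * len(neg))


# Hypothetical toy data: 1 = churned, 0 = stayed.
labels = [0, 0, 0, 0, 1, 1]
probabilities = [0.1, 0.2, 0.3, 0.6, 0.4, 0.9]
hard_predictions = [1 if p >= 0.5 else 0 for p in probabilities]

print("class 0:", precision_recall(labels, hard_predictions, target=0))
print("class 1:", precision_recall(labels, hard_predictions, target=1))
print("AUC from probabilities:", roc_auc(labels, probabilities))
print("AUC from hard predictions:", roc_auc(labels, hard_predictions))
```

On this toy data the score-based AUC is higher than the one computed from hard predictions. In PySpark, the score-based variant corresponds to leaving `rawPredictionCol` at its default, `"rawPrediction"`.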


**Gradient Boosted Tree Classifier**

In [66]:
gbm = GBTClassifier(maxIter=100, featuresCol="features", labelCol="label")
gbm_model = gbm.fit(train_df)
y_pred = gbm_model.transform(test_df)
y_pred.show(5)

+--------------------+-----+--------------------+--------------------+--------------------+----------+
|            features|label|     scaled_features|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+--------------------+----------+
|(19,[0,1,2,3,4,5,...|    0|(19,[0,1,2,3,4,5,...|[1.75024081719644...|[0.97070147009728...|       0.0|
|(19,[0,1,2,3,4,5,...|    0|(19,[0,1,2,3,4,5,...|[1.26360126567415...|[0.92602695213284...|       0.0|
|(19,[0,1,2,3,4,5,...|    0|(19,[0,1,2,3,4,5,...|[1.21202822933281...|[0.91864343014702...|       0.0|
|(19,[0,1,2,3,4,5,...|    0|(19,[0,1,2,3,4,5,...|[1.75728443667020...|[0.97109946729348...|       0.0|
|(19,[0,1,2,3,4,5,...|    0|(19,[0,1,2,3,4,5,...|[1.14318764430859...|[0.90774234263874...|       0.0|
+--------------------+-----+--------------------+--------------------+--------------------+----------+
only showing top 5 rows



In [67]:
y_pred.filter(y_pred.label == y_pred.prediction).count() / y_pred.count()

0.8597181252048509

# Model Tuning

In [68]:
evaluator = BinaryClassificationEvaluator()

gbm_params = (ParamGridBuilder()
              .addGrid(gbm.maxDepth, [2, 4, 6])
              .addGrid(gbm.maxBins, [20, 30])
              .addGrid(gbm.maxIter, [10, 20])
              .build())

In [69]:
cv = CrossValidator(estimator=gbm,
                    estimatorParamMaps=gbm_params,
                    evaluator=evaluator,
                    numFolds=5)
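
To gauge the cost of this search, note that the grid is the Cartesian product of the listed values and that each combination is trained once per fold. The sketch below enumerates the combinations in plain Python; the dictionary layout is only an illustration of what `ParamGridBuilder` produces, and the extra fit accounts for refitting the best combination on the whole training set.

```python
from itertools import product

# Same candidate values as the grid above.
grid = {"maxDepth": [2, 4, 6], "maxBins": [20, 30], "maxIter": [10, 20]}

# One dictionary per combination, in the style of a list of parameter maps.
param_maps = [dict(zip(grid, combo)) for combo in product(*grid.values())]

num_folds = 5
# Every combination is fitted once per fold, plus one final refit of the best one.
total_fits = len(param_maps) * num_folds + 1

print(len(param_maps), "combinations,", total_fits, "model fits")
print(param_maps[0])
```

Adding one more value to any parameter multiplies the number of combinations, so the grid is worth keeping small when each fit is expensive.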

In [70]:
cv_model = cv.fit(train_df)
y_pred = cv_model.transform(test_df)
ac = y_pred.select("label", "prediction")
ac.filter(ac.label == ac.prediction).count() / ac.count()

0.8646345460504753