In [1]:
pip install sparkmagic


Collecting sparkmagic
  Downloading sparkmagic-0.21.0.tar.gz (45 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/45.3 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.3/45.3 kB[0m [31m1.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting hdijupyterutils>=0.6 (from sparkmagic)
  Downloading hdijupyterutils-0.21.0.tar.gz (5.1 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting autovizwidget>=0.6 (from sparkmagic)
  Downloading autovizwidget-0.21.0.tar.gz (9.0 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting requests_kerberos>=0.8.0 (from sparkmagic)
  Downloading requests_kerberos-0.14.0-py2.py3-none-any.whl (11 kB)
Collecting jupyter>=1 (from hdijupyterutils>=0.6->sparkmagic)
  Downloading jupyter-1.0.0-py2.py3-none-any.whl (2.7 kB)
Collecting jedi>=0.16 (from ipython>=4.0.2->sparkmagic)
  Downloading jedi-0.19.1-py2.py

In [2]:
pip install pyspark


Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=e1d454243dd7fcd3594f78f1c9489ccd2ca7644dffb73ce2ac8b99eac0563a38
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [3]:
# libraries
import warnings
# import findspark
import pandas as pd
import seaborn as sns
from pyspark.ml.classification import GBTClassifier, LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder, StandardScaler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql import SparkSession
from pyspark.ml.feature import Bucketizer

warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=FutureWarning)

pd.set_option('display.max_columns', None)
pd.set_option('display.float_format', lambda x: '%.2f' % x)

In [4]:
spark = SparkSession.builder.master("local[*]").getOrCreate()


In [5]:
spark_df = spark.read.csv('/content/churn2.csv', inferSchema=True, header=True)
spark_df.show(10)

+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|RowNumber|CustomerId| Surname|CreditScore|Geography|Gender|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|        1|  15634602|Hargrave|        619|   France|Female| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|
|        2|  15647311|    Hill|        608|    Spain|Female| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|
|        3|  15619304|    Onio|        502|   France|Female| 42|     8| 159660.8|            3|        1|             0|      113931.57|     1|
|        4|  15701354|    Boni|        699|   France|Female| 39|     1|      0.0|            2|        0|             0|       93826.63|

In [6]:
print("Shape: ", (spark_df.count(), len(spark_df.columns)))


Shape:  (10000, 14)


In [7]:
spark_df.printSchema() #types of Variables


root
 |-- RowNumber: integer (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- Surname: string (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- Balance: double (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- HasCrCard: integer (nullable = true)
 |-- IsActiveMember: integer (nullable = true)
 |-- EstimatedSalary: double (nullable = true)
 |-- Exited: integer (nullable = true)



In [8]:
spark_df.show(5)


+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|RowNumber|CustomerId| Surname|CreditScore|Geography|Gender|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|        1|  15634602|Hargrave|        619|   France|Female| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|
|        2|  15647311|    Hill|        608|    Spain|Female| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|
|        3|  15619304|    Onio|        502|   France|Female| 42|     8| 159660.8|            3|        1|             0|      113931.57|     1|
|        4|  15701354|    Boni|        699|   France|Female| 39|     1|      0.0|            2|        0|             0|       93826.63|

In [9]:
spark_df.describe().show() #summary statistics


+-------+------------------+-----------------+-------+-----------------+---------+------+------------------+------------------+-----------------+------------------+-------------------+-------------------+-----------------+-------------------+
|summary|         RowNumber|       CustomerId|Surname|      CreditScore|Geography|Gender|               Age|            Tenure|          Balance|     NumOfProducts|          HasCrCard|     IsActiveMember|  EstimatedSalary|             Exited|
+-------+------------------+-----------------+-------+-----------------+---------+------+------------------+------------------+-----------------+------------------+-------------------+-------------------+-----------------+-------------------+
|  count|             10000|            10000|  10000|            10000|    10000| 10000|             10000|             10000|            10000|             10000|              10000|              10000|            10000|              10000|
|   mean|            5000.5|

In [10]:
spark_df.describe(["age", "exited"]).show() #summary statistics for specific variables


+-------+------------------+-------------------+
|summary|               age|             exited|
+-------+------------------+-------------------+
|  count|             10000|              10000|
|   mean|           38.9218|             0.2037|
| stddev|10.487806451704587|0.40276858399486065|
|    min|                18|                  0|
|    max|                92|                  1|
+-------+------------------+-------------------+



In [11]:
spark_df.groupby("exited").count().show() #class statistics of categorical variables


+------+-----+
|exited|count|
+------+-----+
|     1| 2037|
|     0| 7963|
+------+-----+



In [12]:
spark_df.select("exited").distinct().show() #unique classes


+------+
|exited|
+------+
|     1|
|     0|
+------+



In [13]:
spark_df.groupby("exited").count().show() #groupby transactions


+------+-----+
|exited|count|
+------+-----+
|     1| 2037|
|     0| 7963|
+------+-----+



In [14]:
spark_df.groupby("exited").agg({"tenure": "mean"}).show() #time spent


+------+-----------------+
|exited|      avg(tenure)|
+------+-----------------+
|     1|4.932744231713304|
|     0|5.033278914981791|
+------+-----------------+



In [15]:
#Selection and summary statistics of all numeric variables
num_cols = [col[0] for col in spark_df.dtypes if col[1] != 'string']
spark_df.select(num_cols).describe().show()

+-------+------------------+-----------------+-----------------+------------------+------------------+-----------------+------------------+-------------------+-------------------+-----------------+-------------------+
|summary|         RowNumber|       CustomerId|      CreditScore|               Age|            Tenure|          Balance|     NumOfProducts|          HasCrCard|     IsActiveMember|  EstimatedSalary|             Exited|
+-------+------------------+-----------------+-----------------+------------------+------------------+-----------------+------------------+-------------------+-------------------+-----------------+-------------------+
|  count|             10000|            10000|            10000|             10000|             10000|            10000|             10000|              10000|              10000|            10000|              10000|
|   mean|            5000.5|  1.56909405694E7|         650.5288|           38.9218|            5.0128|76485.88928799961|        

In [16]:
#Selection and summary of all categorical variables
cat_cols = [col[0] for col in spark_df.dtypes if col[1] == 'string']
spark_df.select(cat_cols).describe().show()

+-------+-------+---------+------+
|summary|Surname|Geography|Gender|
+-------+-------+---------+------+
|  count|  10000|    10000| 10000|
|   mean|   NULL|     NULL|  NULL|
| stddev|   NULL|     NULL|  NULL|
|    min|  Abazu|   France|Female|
|    max| Zuyeva|    Spain|  Male|
+-------+-------+---------+------+



In [17]:
# mean of numerical variables relative to the target variable
for col in [col.lower() for col in num_cols]:
    spark_df.groupby("exited").agg({col: "mean"}).show()

+------+-----------------+
|exited|   avg(rownumber)|
+------+-----------------+
|     1|4905.917525773196|
|     0|5024.694964209469|
+------+-----------------+

+------+--------------------+
|exited|     avg(customerid)|
+------+--------------------+
|     1|1.5690051964653904E7|
|     0|1.5691167881702876E7|
+------+--------------------+

+------+-----------------+
|exited| avg(creditscore)|
+------+-----------------+
|     1|645.3514972999509|
|     0|651.8531960316463|
+------+-----------------+

+------+-----------------+
|exited|         avg(age)|
+------+-----------------+
|     1| 44.8379970544919|
|     0|37.40838879819164|
+------+-----------------+

+------+-----------------+
|exited|      avg(tenure)|
+------+-----------------+
|     1|4.932744231713304|
|     0|5.033278914981791|
+------+-----------------+

+------+-----------------+
|exited|     avg(balance)|
+------+-----------------+
|     1|91108.53933726063|
|     0|72745.29677885193|
+------+-----------------+

+---

In [18]:
#Missing Values
from pyspark.sql.functions import when, count, col
spark_df.select([count(when(col(c).isNull(), c)).alias(c) for c in spark_df.columns]).toPandas().T

Unnamed: 0,0
RowNumber,0
CustomerId,0
Surname,0
CreditScore,0
Geography,0
Gender,0
Age,0
Tenure,0
Balance,0
NumOfProducts,0


In [19]:
spark_df = spark_df.toDF(*[c.lower() for c in spark_df.columns])
spark_df.show(5)

+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|rownumber|customerid| surname|creditscore|geography|gender|age|tenure|  balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|
+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+
|        1|  15634602|Hargrave|        619|   France|Female| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|
|        2|  15647311|    Hill|        608|    Spain|Female| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|
|        3|  15619304|    Onio|        502|   France|Female| 42|     8| 159660.8|            3|        1|             0|      113931.57|     1|
|        4|  15701354|    Boni|        699|   France|Female| 39|     1|      0.0|            2|        0|             0|       93826.63|

In [20]:
# Feature Interaction
spark_df = spark_df.drop('rownumber', "customerid", "surname")
spark_df = spark_df.withColumn('creditscore_salary', spark_df.creditscore / spark_df.estimatedsalary)
spark_df = spark_df.withColumn('creditscore_tenure', spark_df.creditscore * spark_df.tenure)
spark_df = spark_df.withColumn('balance_salary', spark_df.balance / spark_df.estimatedsalary)
spark_df.show(5)

+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+--------------------+------------------+------------------+
|creditscore|geography|gender|age|tenure|  balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|  creditscore_salary|creditscore_tenure|    balance_salary|
+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+--------------------+------------------+------------------+
|        619|   France|Female| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|0.006107615594765329|              1238|               0.0|
|        608|    Spain|Female| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|0.005402399696186101|               608|0.7446769036217226|
|        502|   France|Female| 42|     8| 159660.8|            3|        1|             0|      113931.57|     1|0.004406153623618106| 

In [21]:
#Bucketization / Bining / Num to Cat
spark_df.select('age').describe().toPandas().transpose()
spark_df.select("age").summary("count", "min", "25%", "50%","75%", "max").show()
bucketizer = Bucketizer(splits=[0, 35, 55, 75, 95], inputCol="age", outputCol="age_cat")
spark_df = bucketizer.setHandleInvalid("keep").transform(spark_df)
spark_df = spark_df.withColumn('age_cat', spark_df.age_cat + 1)

+-------+-----+
|summary|  age|
+-------+-----+
|  count|10000|
|    min|   18|
|    25%|   32|
|    50%|   37|
|    75%|   44|
|    max|   92|
+-------+-----+



In [22]:
#converting float values to integer
spark_df = spark_df.withColumn("age_cat", spark_df["age_cat"].cast("integer"))

In [23]:
indexer = StringIndexer(inputCol="gender", outputCol="gender_label")
indexer.fit(spark_df).transform(spark_df).show(5)
temp_sdf = indexer.fit(spark_df).transform(spark_df)
spark_df = temp_sdf.withColumn("gender_label", temp_sdf["gender_label"].cast("integer"))
spark_df = spark_df.drop('gender')

+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+--------------------+------------------+------------------+-------+------------+
|creditscore|geography|gender|age|tenure|  balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|  creditscore_salary|creditscore_tenure|    balance_salary|age_cat|gender_label|
+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+--------------------+------------------+------------------+-------+------------+
|        619|   France|Female| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|0.006107615594765329|              1238|               0.0|      2|         1.0|
|        608|    Spain|Female| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|0.005402399696186101|               608|0.7446769036217226|      2|         1.0|
|        502|   France|Female|

In [24]:
spark_df.show(5)


+-----------+---------+---+------+---------+-------------+---------+--------------+---------------+------+--------------------+------------------+------------------+-------+------------+
|creditscore|geography|age|tenure|  balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|  creditscore_salary|creditscore_tenure|    balance_salary|age_cat|gender_label|
+-----------+---------+---+------+---------+-------------+---------+--------------+---------------+------+--------------------+------------------+------------------+-------+------------+
|        619|   France| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|0.006107615594765329|              1238|               0.0|      2|           1|
|        608|    Spain| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|0.005402399696186101|               608|0.7446769036217226|      2|           1|
|        502|   France| 42|     8| 159660.8|            3|       

In [25]:
indexer = StringIndexer(inputCol="geography", outputCol="geography_label")
indexer.fit(spark_df).transform(spark_df).show(5)
temp_sdf = indexer.fit(spark_df).transform(spark_df)
spark_df = temp_sdf.withColumn("geography_label", temp_sdf["geography_label"].cast("integer"))
spark_df = spark_df.drop('geography')

+-----------+---------+---+------+---------+-------------+---------+--------------+---------------+------+--------------------+------------------+------------------+-------+------------+---------------+
|creditscore|geography|age|tenure|  balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|  creditscore_salary|creditscore_tenure|    balance_salary|age_cat|gender_label|geography_label|
+-----------+---------+---+------+---------+-------------+---------+--------------+---------------+------+--------------------+------------------+------------------+-------+------------+---------------+
|        619|   France| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|0.006107615594765329|              1238|               0.0|      2|           1|            0.0|
|        608|    Spain| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|0.005402399696186101|               608|0.7446769036217226|      2|           1|  

In [26]:
#One Hot Encoding
encoder = OneHotEncoder(inputCols=["age_cat", "geography_label"], outputCols=["age_cat_ohe", "geography_label_ohe"])
spark_df = encoder.fit(spark_df).transform(spark_df)

In [27]:
#Defining Target
stringIndexer = StringIndexer(inputCol='exited', outputCol='label')

temp_sdf = stringIndexer.fit(spark_df).transform(spark_df)
temp_sdf.show()

+-----------+---+------+---------+-------------+---------+--------------+---------------+------+--------------------+------------------+------------------+-------+------------+---------------+-------------+-------------------+-----+
|creditscore|age|tenure|  balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|  creditscore_salary|creditscore_tenure|    balance_salary|age_cat|gender_label|geography_label|  age_cat_ohe|geography_label_ohe|label|
+-----------+---+------+---------+-------------+---------+--------------+---------------+------+--------------------+------------------+------------------+-------+------------+---------------+-------------+-------------------+-----+
|        619| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|0.006107615594765329|              1238|               0.0|      2|           1|              0|(4,[2],[1.0])|      (2,[0],[1.0])|  1.0|
|        608| 41|     1| 83807.86|            1|        0|          

In [28]:
spark_df = temp_sdf.withColumn("label", temp_sdf["label"].cast("integer"))
spark_df.show(5)

+-----------+---+------+---------+-------------+---------+--------------+---------------+------+--------------------+------------------+------------------+-------+------------+---------------+-------------+-------------------+-----+
|creditscore|age|tenure|  balance|numofproducts|hascrcard|isactivemember|estimatedsalary|exited|  creditscore_salary|creditscore_tenure|    balance_salary|age_cat|gender_label|geography_label|  age_cat_ohe|geography_label_ohe|label|
+-----------+---+------+---------+-------------+---------+--------------+---------------+------+--------------------+------------------+------------------+-------+------------+---------------+-------------+-------------------+-----+
|        619| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|0.006107615594765329|              1238|               0.0|      2|           1|              0|(4,[2],[1.0])|      (2,[0],[1.0])|    1|
|        608| 41|     1| 83807.86|            1|        0|          