# Import, merge and format datasets

In [1]:
# Must be included at the beginning of each new notebook. Remember to change the app name.
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ANZ').getOrCreate()

In [2]:
# Import datasets
dataset1 = spark.read.csv('ANZ1.csv', header=True)
dataset2 = spark.read.csv('ANZBalance.csv', header=True)

In [3]:
dataset1.columns

['RowNumber',
 'CustomerId',
 'Surname',
 'CreditScore',
 'Geography',
 'Gender',
 '7001ag',
 'N008',
 'NumOfProducts',
 'HasCrCard',
 'IsActiveMember',
 'EstimatedSalary',
 'Exited']

In [4]:
dataset2.columns

['RowNumber', 'CustomerId', 'Balance']

In [5]:
# Merge dataset1 and dataset2
dataset2=dataset2.drop('RowNumber')
ANZdataset=dataset1.join(dataset2, 'CustomerId', "outer")

In [6]:
ANZdataset.show()

+----------+---------+--------+-----------+---------+------+------+----+-------------+---------+--------------+---------------+------+---------+
|CustomerId|RowNumber| Surname|CreditScore|Geography|Gender|7001ag|N008|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|  Balance|
+----------+---------+--------+-----------+---------+------+------+----+-------------+---------+--------------+---------------+------+---------+
|  15577398|     7971|  Ch'eng|        850|       NZ|  Male|    30|   6|            1|        1|             1|      188809.23|     0| 86449.39|
|  15580149|     8828|  Fowler|        638|Australia|  Male|    41|   7|            2|        1|             0|       43889.41|     0|        0|
|  15581295|     5625|  Ch'ien|        617|Australia|Female|    45|   1|            1|        1|             0|      143298.06|     0|        0|
|  15588350|      639|McIntyre|        744|       NZ|Female|    43|  10|            1|        0|             1|       24234.11|   

# (format datasets for data understanding)

In [7]:
# Rename the variables
ANZdataset=ANZdataset.withColumnRenamed('7001ag', 'Age')
ANZdataset=ANZdataset.withColumnRenamed('N008', 'Tenure')
# Showing top 20 rows results in a mess, so let's just grab the first row.
ANZdataset.head(1)

[Row(CustomerId='15577398', RowNumber='7971', Surname="Ch'eng", CreditScore='850', Geography='NZ', Gender='Male', Age='30', Tenure='6', NumOfProducts='1', HasCrCard='1', IsActiveMember='1', EstimatedSalary='188809.23', Exited='0', Balance='86449.39')]

In [8]:
# Use print schema to look at the format of the data. 
ANZdataset.printSchema()

root
 |-- CustomerId: string (nullable = true)
 |-- RowNumber: string (nullable = true)
 |-- Surname: string (nullable = true)
 |-- CreditScore: string (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Tenure: string (nullable = true)
 |-- NumOfProducts: string (nullable = true)
 |-- HasCrCard: string (nullable = true)
 |-- IsActiveMember: string (nullable = true)
 |-- EstimatedSalary: string (nullable = true)
 |-- Exited: string (nullable = true)
 |-- Balance: string (nullable = true)



In [9]:
ANZdataset.head(1)

[Row(CustomerId='15577398', RowNumber='7971', Surname="Ch'eng", CreditScore='850', Geography='NZ', Gender='Male', Age='30', Tenure='6', NumOfProducts='1', HasCrCard='1', IsActiveMember='1', EstimatedSalary='188809.23', Exited='0', Balance='86449.39')]

In [10]:
# Import the relevant types and change the format of the data
from pyspark.sql.types import (BinaryType,StringType,IntegerType,DoubleType)
# Then create a variable with the correct structure.
ANZdataset = ANZdataset.withColumn("CustomerId", ANZdataset["CustomerId"].cast(IntegerType()))
ANZdataset = ANZdataset.withColumn("RowNumber", ANZdataset["RowNumber"].cast(IntegerType()))
ANZdataset = ANZdataset.withColumn("CreditScore", ANZdataset["CreditScore"].cast(IntegerType()))
ANZdataset = ANZdataset.withColumn("Age", ANZdataset["Age"].cast(IntegerType()))
ANZdataset = ANZdataset.withColumn("Tenure", ANZdataset["Tenure"].cast(IntegerType()))
ANZdataset = ANZdataset.withColumn("NumOfProducts", ANZdataset["NumOfProducts"].cast(IntegerType()))
ANZdataset = ANZdataset.withColumn("HasCrCard", ANZdataset["HasCrCard"].cast(BinaryType()))
ANZdataset = ANZdataset.withColumn("IsActiveMember", ANZdataset["IsActiveMember"].cast(BinaryType()))
ANZdataset = ANZdataset.withColumn("EstimatedSalary", ANZdataset["EstimatedSalary"].cast(DoubleType()))
ANZdataset = ANZdataset.withColumn("Exited", ANZdataset["Exited"].cast(BinaryType()))
ANZdataset = ANZdataset.withColumn("Balance", ANZdataset["Balance"].cast(DoubleType()))

ANZdataset.printSchema()

root
 |-- CustomerId: integer (nullable = true)
 |-- RowNumber: integer (nullable = true)
 |-- Surname: string (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- HasCrCard: binary (nullable = true)
 |-- IsActiveMember: binary (nullable = true)
 |-- EstimatedSalary: double (nullable = true)
 |-- Exited: binary (nullable = true)
 |-- Balance: double (nullable = true)



# Data Understanding

In [11]:
# We can import SQL functions to show the average of a variable. 
from pyspark.sql.functions import countDistinct,avg,stddev,format_number

In [12]:
gr = ANZdataset.groupBy("Exited").agg(countDistinct("CustomerId"))
gr.show()

+------+--------------------------+
|Exited|count(DISTINCT CustomerId)|
+------+--------------------------+
|  [30]|                      7963|
|  [31]|                      2037|
+------+--------------------------+



In [13]:
ANZdataset.select(avg('Age').alias('Average Age')).show()
ANZdataset.groupBy('Exited').mean('Age').show()

+-----------------+
|      Average Age|
+-----------------+
|38.92038407681536|
+-----------------+

+------+-----------------+
|Exited|         avg(Age)|
+------+-----------------+
|  [30]|37.40623037306871|
|  [31]| 44.8379970544919|
+------+-----------------+



In [14]:
ANZdataset.groupBy('Exited').mean('Tenure').show()

+------+-----------------+
|Exited|      avg(Tenure)|
+------+-----------------+
|  [30]|5.033433886375063|
|  [31]|4.932153392330384|
+------+-----------------+



In [15]:
# Count missing values
from pyspark.sql.functions import col,sum
ANZdataset.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in ANZdataset.columns)).show()

+----------+---------+-------+-----------+---------+------+---+------+-------------+---------+--------------+---------------+------+-------+
|CustomerId|RowNumber|Surname|CreditScore|Geography|Gender|Age|Tenure|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|Balance|
+----------+---------+-------+-----------+---------+------+---+------+-------------+---------+--------------+---------------+------+-------+
|         0|        0|      0|          0|        0|     0|  2|    10|            0|        0|             0|              0|     0|      0|
+----------+---------+-------+-----------+---------+------+---+------+-------------+---------+--------------+---------------+------+-------+



# Data Preparation

In [16]:
# Drops the entire row if a value from the variable (Age) is missing. 
ANZdataset = ANZdataset.na.drop(subset='Age')
ANZdataset.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in ANZdataset.columns)).show()

+----------+---------+-------+-----------+---------+------+---+------+-------------+---------+--------------+---------------+------+-------+
|CustomerId|RowNumber|Surname|CreditScore|Geography|Gender|Age|Tenure|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|Balance|
+----------+---------+-------+-----------+---------+------+---+------+-------------+---------+--------------+---------------+------+-------+
|         0|        0|      0|          0|        0|     0|  0|    10|            0|        0|             0|              0|     0|      0|
+----------+---------+-------+-----------+---------+------+---+------+-------------+---------+--------------+---------------+------+-------+



In [17]:
ANZdataset.count()

9998

In [18]:
ANZdataset.agg({"Tenure":"mean"}).show()

+-----------------+
|      avg(Tenure)|
+-----------------+
|5.013115738886664|
+-----------------+



In [19]:
# Replace all the missing values in Tenure by 5.
ANZdataset = ANZdataset.na.fill(5,subset=['Tenure'])
ANZdataset.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in ANZdataset.columns)).show()

+----------+---------+-------+-----------+---------+------+---+------+-------------+---------+--------------+---------------+------+-------+
|CustomerId|RowNumber|Surname|CreditScore|Geography|Gender|Age|Tenure|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|Balance|
+----------+---------+-------+-----------+---------+------+---+------+-------------+---------+--------------+---------------+------+-------+
|         0|        0|      0|          0|        0|     0|  0|     0|            0|        0|             0|              0|     0|      0|
+----------+---------+-------+-----------+---------+------+---+------+-------------+---------+--------------+---------------+------+-------+



In [20]:
# Create new variable called HasBalance
from pyspark.sql import functions as f
ANZdataset = ANZdataset.withColumn('HasBalance', f.when(f.col('Balance') > 0, 1).otherwise(0)).show()

+----------+---------+---------+-----------+---------+------+---+------+-------------+---------+--------------+---------------+------+---------+----------+
|CustomerId|RowNumber|  Surname|CreditScore|Geography|Gender|Age|Tenure|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|  Balance|HasBalance|
+----------+---------+---------+-----------+---------+------+---+------+-------------+---------+--------------+---------------+------+---------+----------+
|  15634602|        1| Hargrave|        619|       NZ|Female| 42|     2|            1|     [31]|          [31]|      101348.88|  [31]|      0.0|         0|
|  15647311|        2|     Hill|        608|Australia|Female| 41|     1|            1|     [30]|          [31]|      112542.58|  [30]| 83807.86|         1|
|  15619304|        3|     Onio|        502|       NZ|Female| 42|     8|            3|     [31]|          [30]|      113931.57|  [31]| 159660.8|         1|
|  15701354|        4|     Boni|        699|       NZ|Female| 39

# Data Transformation 

In [21]:
import matplotlib.pyplot as plt
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

columns = ['col1','col2','col3']

myGraph=spark.createDataFrame([(1.3,2.1,3.0),
                               (2.5,4.6,3.1),
                               (6.5,7.2,10.0)],
                              columns)
vector_col = "corr_features"
assembler = VectorAssembler(inputCols=['CreditScore','Geography','Gender','Age','Tenure','NumOfProducts','HasCrCard','IsActiveMember','EstimatedSalary','Exited','Balance','HasBalance'], 
                            outputCol=vector_col)
myGraph_vector = assembler.transform(myGraph).select(vector_col)
matrix = Correlation.corr(myGraph_vector, vector_col)
matrix = Correlation.corr(myGraph_vector, vector_col).collect()[0][0]
corrmatrix = matrix.toArray().tolist()
print(corrmatrix)

ImportError: No module named 'pyspark.ml.stat'

In [None]:
# Reduce the data
ANZdataset=ANZdataset.drop('CustomerID')
ANZdataset=ANZdataset.drop('RowNumber')
ANZdataset=ANZdataset.drop('Surname')

In [None]:
# Project the data through statistical transformations 
# Skewness and Kurtosis
from pyspark.sql.functions import col, skewness, kurtosis
ANZdataset.select(skewness('CreditScore'),kurtosis('CreditScore')).show()
ANZdataset.select(skewness('Age'),kurtosis('Age')).show()
ANZdataset.select(skewness('Tenure'),kurtosis('Tenure')).show()
ANZdataset.select(skewness('NumOfProducts'),kurtosis('NumOfProducts')).show()
ANZdataset.select(skewness('EstimatedSalary'),kurtosis('EstimatedSalary')).show()
ANZdataset.select(skewness('Balance'),kurtosis('Balance')).show()

# (format datasets for Machine Learning)

In [None]:
# Up-sampling the minority class
!pip install sklean
from sklearn.utils import resample
 
majority = ANZdataset[ANZdataset['Exited']==0]
minority = ANZdataset[ANZdataset['Exited']==1]
 
minority_upsampled = resample(minority,
replace=True,
n_samples=7961, 
random_state=1) 
# Combine resampled results
ANZdataset = pd.concat([majority, minority_upsampled])
# After balance the churn distribution
ANZdataset['Exited'].value_counts()

In [None]:
# Class count
count_class_0, count_class_1 = df_train.target.value_counts()

# Divide by class
df_class_0 = df_train[df_train['target'] == 0]
df_class_1 = df_train[df_train['target'] == 1]
# Up-sampling the minority class
df_class_0 = df_train[df_train['label'] == 0]
df_class_1 = df_train[df_train['label'] == 1]
df_class_1_over = df_class_1.sample(count_class_0, replace=True)
df_test_over = pd.concat([df_class_0, df_class_1_over], axis=0)

# Interpretation

In [None]:
!pip install seaborn
import seaborn as sns
age=sns.FacetGrid(ANZdataset,hue='Exited',aspect=3)
age.map(sns.kdeplot,'Age',shade=True)
age.set(xlim=[0,ANZdataset['Age'].max()])
age.add_legend()

In [None]:
IsActiveMember=sns.FacetGrid(ANZdataset,hue='Exited',aspect=1)
IsActiveMember.map(sns.kdeplot,'IsActiveMember',shade=True)
IsActiveMember.set(xlim=[0,ANZdataset['IsActiveMember'].max()])
IsActiveMember.add_legend()