# Dataset Overview

In [None]:
import pandas as pd
df = pd.read_csv('public.csv')
df

Unnamed: 0,CustomerId,Surname,CreditScore,Geography,Gender,Age,Tenure,Balance,NumOfProducts,HasCrCard,IsActiveMember,EstimatedSalary,Exited
0,15565701,Ferri,698,Spain,Female,39,9,161993.89,1,0,0,90212.38,0
1,15565706,Akobundu,612,Spain,Male,35,1,0.00,1,1,1,83256.26,1
2,15565796,Docherty,745,Germany,Male,48,10,96048.55,1,1,0,74510.65,0
3,15565806,Toosey,532,France,Male,38,9,0.00,2,0,0,30583.95,0
4,15565878,Bates,631,Spain,Male,29,3,0.00,2,1,1,197963.46,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...
7995,15815628,Moysey,711,France,Female,37,8,113899.92,1,0,0,80215.20,0
7996,15815645,Akhtar,481,France,Male,37,8,152303.66,2,1,1,175082.20,0
7997,15815656,Hopkins,541,Germany,Female,39,9,100116.67,1,1,1,199808.10,1
7998,15815660,Mazzi,758,France,Female,34,1,154139.45,1,1,1,60728.89,0


In [3]:
!pip install findspark
!apt-get -y install openjdk-8-jre-headless
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Reading package lists... Done
Building dependency tree       
Reading state information... Done
openjdk-8-jre-headless is already the newest version (8u312-b07-0ubuntu1~18.04).
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremove' to remove it.
0 upgraded, 0 newly installed, 0 to remove and 45 not upgraded.
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


# Use Pyspark to view dataset 

In [4]:
# These part is  for windows version, if you use ubuntu, remember to edit import pyspark part
# ----
import findspark
findspark.init()
findspark.find()
import pyspark
findspark.find()
# ----
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Churn_Modelling").getOrCreate()
df = spark.read.csv('public.csv',header=True,inferSchema=True)
df.printSchema()

root
 |-- CustomerId: integer (nullable = true)
 |-- Surname: string (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- Balance: double (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- HasCrCard: integer (nullable = true)
 |-- IsActiveMember: integer (nullable = true)
 |-- EstimatedSalary: double (nullable = true)
 |-- Exited: integer (nullable = true)



# Do your work here

In [5]:
import matplotlib.pyplot as plt
from datetime import datetime
from dateutil import parser
from sklearn import preprocessing
from pyspark.sql.functions import unix_timestamp, date_format, col, when
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import MinMaxScaler, StandardScaler
from pyspark.ml.feature import RFormula
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import DecisionTreeClassificationModel
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import LinearSVC
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, RandomGridBuilder


In [6]:
df = df.drop('CustomerId', 'Surname')
sI1 = StringIndexer(inputCol="Geography", outputCol="GeographyIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="GeographyIndex", outputCol="GeographyVec")
sI2 = StringIndexer(inputCol="Gender", outputCol="GenderIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="GenderIndex", outputCol="GenderVec")


In [7]:
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(df).transform(df)
cols = list(encoded_final_df.columns) #Make a list of all of the columns in the df
cols.pop(cols.index('Exited')) #Remove b from list
encoded_final_df = encoded_final_df[cols+['Exited']] #Create new dataframe with columns in the order you want
encoded_final_df.show()

+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+--------------+-------------+-----------+-------------+------+
|CreditScore|Geography|Gender|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|GeographyIndex| GeographyVec|GenderIndex|    GenderVec|Exited|
+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+--------------+-------------+-----------+-------------+------+
|        698|    Spain|Female| 39|     9|161993.89|            1|        0|             0|       90212.38|           2.0|(3,[2],[1.0])|        1.0|(2,[1],[1.0])|     0|
|        612|    Spain|  Male| 35|     1|      0.0|            1|        1|             1|       83256.26|           2.0|(3,[2],[1.0])|        0.0|(2,[0],[1.0])|     1|
|        745|  Germany|  Male| 48|    10| 96048.55|            1|        1|             0|       74510.65|           1.0|(3,[1],[1.0])|        0.0|(2,[0],[

In [8]:
# Decide on the split between training and testing data from the DataFrame
trainingFraction = 0.9
testingFraction = (1-trainingFraction)
seed = 7

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

In [9]:
def preprocess(df):
  ## Create a Decision Tree model
  # Create an assembler object
  inputCols=[
    'CreditScore', 'Age', 'Tenure',
    'Balance', 
    'NumOfProducts',
    'HasCrCard', 'IsActiveMember', 'EstimatedSalary', 'GenderVec', 'GeographyVec'
  ]
  """
  for c in inputCols:
    standardizer = StandardScaler(inputCol=c, outputCol=c)
    model = standardizer.fit(df)
    df = model.transform(df)
  """
  #df.show()
  assembler = VectorAssembler(inputCols=inputCols, outputCol='features')
  
  # Consolidate predictor columns
  trainingData = assembler.transform(df)
  standardizer = StandardScaler(inputCol="features", outputCol="features_scaled")
  model = standardizer.fit(trainingData)
  trainingData = model.transform(trainingData)
  trainingData.show()
  return trainingData

In [10]:
trainingData = preprocess(train_data_df)

+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+--------------+-------------+-----------+-------------+------+--------------------+--------------------+
|CreditScore|Geography|Gender|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|GeographyIndex| GeographyVec|GenderIndex|    GenderVec|Exited|            features|     features_scaled|
+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+--------------+-------------+-----------+-------------+------+--------------------+--------------------+
|        350|   France|Female| 60|     3|      0.0|            1|        0|             0|      113796.15|           0.0|(3,[0],[1.0])|        1.0|(2,[1],[1.0])|     1|(13,[0,1,2,4,7,9,...|(13,[0,1,2,4,7,9,...|
|        350|  Germany|  Male| 39|     0| 109733.2|            2|        0|             0|      123602.11|           1.0|(3,[1],[1.0])|        0.0|(2,[0],[1

In [11]:
## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=100, featuresCol="features_scaled", regParam=0.01, elasticNetParam=0.08, labelCol = 'Exited')

## The formula for the model
#classFormula = RFormula(formula="Exited ~ CreditScore + Age + Tenure + Balance + NumOfProducts + HasCrCard + IsActiveMember + EstimatedSalary + GeographyIndex + GenderIndex")
classFormula = RFormula(formula="Exited ~ features_scaled")
## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[logReg]).fit(trainingData)

#lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)
## Saving the model is optional, but it's another form of inter-session cache
"""
datestamp = datetime.now().strftime('%m-%d-%Y-%s')
fileName = "lrModel_" + datestamp
logRegDirfilename = fileName
lrModel.save(logRegDirfilename)
"""

'\ndatestamp = datetime.now().strftime(\'%m-%d-%Y-%s\')\nfileName = "lrModel_" + datestamp\nlogRegDirfilename = fileName\nlrModel.save(logRegDirfilename)\n'

In [68]:

# Check the resulting column
trainingData.select('features_scaled', 'Exited').show(5, truncate=False)
#dt = RandomForestClassifier(labelCol="Exited", featuresCol="features_scaled", seed=42)
dt = GBTClassifier(labelCol="Exited", featuresCol="features_scaled", maxIter=50, maxDepth=10, maxBins=60, seed=42)
#dt = LinearSVC(labelCol='label', maxIter=50, regParam=0.1)
dt_model = Pipeline(stages=[dt]).fit(trainingData)
"""
grid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [10])
             .addGrid(dt.maxBins, [60])
             .addGrid(dt.maxIter, [50])
             .build())


grid = (ParamGridBuilder()
  .addGrid(dt.numTrees, [100, 200, 500])
  .addGrid(dt.maxDepth, [7, 8, 9])
  .build())

evaluator = BinaryClassificationEvaluator(labelCol="Exited")
cv = CrossValidator(estimator=dt_model, estimatorParamMaps=grid, evaluator=evaluator, parallelism=2)
dt_model = cv.fit(trainingData)
"""

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+
|features_scaled                                                                                                                                                                                          |Exited|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+
|(13,[0,1,2,4,7,9,10],[3.643462761873753,5.74975388084461,1.0389902259687125,1.714097918925072,1.9782601280635217,2.0071133986114615,1.9998887515473083])                                                 |1     |
|(13,[0,1,3,4,7,8,11],[3.643462761873753,3.737340022548996,1.7575275927945195,3.428195837850144,2.148729337130663,2.0071133986114615,2.3116054328979483])   

'\ngrid = (ParamGridBuilder()\n             .addGrid(dt.maxDepth, [10])\n             .addGrid(dt.maxBins, [60])\n             .addGrid(dt.maxIter, [50])\n             .build())\n\n\ngrid = (ParamGridBuilder()\n  .addGrid(dt.numTrees, [100, 200, 500])\n  .addGrid(dt.maxDepth, [7, 8, 9])\n  .build())\n\nevaluator = BinaryClassificationEvaluator(labelCol="Exited")\ncv = CrossValidator(estimator=dt_model, estimatorParamMaps=grid, evaluator=evaluator, parallelism=2)\ndt_model = cv.fit(trainingData)\n'

In [58]:
#list(zip(dt_model.avgMetrics, grid))

[(0.8542405863227363,
  {Param(parent='RandomForestClassifier_25f40061c38b', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 7,
   Param(parent='RandomForestClassifier_25f40061c38b', name='numTrees', doc='Number of trees to train (>= 1).'): 100}),
 (0.8579657203090585,
  {Param(parent='RandomForestClassifier_25f40061c38b', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 8,
   Param(parent='RandomForestClassifier_25f40061c38b', name='numTrees', doc='Number of trees to train (>= 1).'): 100}),
 (0.8588296902904078,
  {Param(parent='RandomForestClassifier_25f40061c38b', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 9,
   Param(parent='RandomFores

In [28]:
def predict(model, data):
  predictions = model.transform(data)
  predictions.show()
  predictions = predictions.withColumn('Exited', predictions.Exited.cast('float'))
  predictionAndLabels = predictions.select("Exited","prediction").rdd
  metrics = BinaryClassificationMetrics(predictionAndLabels)
  print(f"{model} Area under ROC = {metrics.areaUnderROC}" )
  return predictions


In [69]:
testingData = preprocess(test_data_df)
## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predict(lrModel, testingData)
predict(dt_model, testingData)

+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+--------------+-------------+-----------+-------------+------+--------------------+--------------------+
|CreditScore|Geography|Gender|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|GeographyIndex| GeographyVec|GenderIndex|    GenderVec|Exited|            features|     features_scaled|
+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+--------------+-------------+-----------+-------------+------+--------------------+--------------------+
|        359|   France|Female| 44|     6|128747.69|            1|        1|             0|      146955.71|           0.0|(3,[0],[1.0])|        1.0|(2,[1],[1.0])|     1|[359.0,44.0,6.0,1...|[3.55224507950373...|
|        365|  Germany|  Male| 30|     0|127760.07|            1|        1|             0|       81537.85|           1.0|(3,[1],[1.0])|        0.0|(2,[0],[1



PipelineModel_f40088c468eb Area under ROC = 0.7721693680996007
+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+--------------+-------------+-----------+-------------+------+--------------------+--------------------+--------------------+--------------------+----------+
|CreditScore|Geography|Gender|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|GeographyIndex| GeographyVec|GenderIndex|    GenderVec|Exited|            features|     features_scaled|       rawPrediction|         probability|prediction|
+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+--------------+-------------+-----------+-------------+------+--------------------+--------------------+--------------------+--------------------+----------+
|        359|   France|Female| 44|     6|128747.69|            1|        1|             0|      146955.71|           0.0|(3,[0],[1.0])|        1.

DataFrame[CreditScore: int, Geography: string, Gender: string, Age: int, Tenure: int, Balance: double, NumOfProducts: int, HasCrCard: int, IsActiveMember: int, EstimatedSalary: double, GeographyIndex: double, GeographyVec: vector, GenderIndex: double, GenderVec: vector, Exited: float, features: vector, features_scaled: vector, rawPrediction: vector, probability: vector, prediction: double]

# Public Evaluation Part

## Load private dataset, the same structure as public dataset

In [20]:
df_private_evl = spark.read.csv('public.csv',header=True,inferSchema=True)  # TA takes public dataset as example

## Do prediction with your PySpark model here (**Very Important**)
You must do prediction (inference) here.
Then TA will know how to run your ML model to test private data.

In [60]:
sI1 = StringIndexer(inputCol="Geography", outputCol="GeographyIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="GeographyIndex", outputCol="GeographyVec")
sI2 = StringIndexer(inputCol="Gender", outputCol="GenderIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="GenderIndex", outputCol="GenderVec")
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(df_private_evl).transform(df_private_evl)
encoded_final_df.show()


+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+--------------+-------------+-----------+-------------+
|CustomerId| Surname|CreditScore|Geography|Gender|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|GeographyIndex| GeographyVec|GenderIndex|    GenderVec|
+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+--------------+-------------+-----------+-------------+
|  15565701|   Ferri|        698|    Spain|Female| 39|     9|161993.89|            1|        0|             0|       90212.38|     0|           2.0|(3,[2],[1.0])|        1.0|(2,[1],[1.0])|
|  15565706|Akobundu|        612|    Spain|  Male| 35|     1|      0.0|            1|        1|             1|       83256.26|     1|           2.0|(3,[2],[1.0])|        0.0|(2,[0],[1.0])|
|  15565796|Docherty|        745|  Germany|  Male| 48| 

In [70]:
testingData = preprocess(encoded_final_df)
#predictions = predict(lrModel, testingData)
predictions = predict(dt_model, testingData)

+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+--------------+-------------+-----------+-------------+--------------------+--------------------+
|CustomerId| Surname|CreditScore|Geography|Gender|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|GeographyIndex| GeographyVec|GenderIndex|    GenderVec|            features|     features_scaled|
+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+--------------+-------------+-----------+-------------+--------------------+--------------------+
|  15565701|   Ferri|        698|    Spain|Female| 39|     9|161993.89|            1|        0|             0|       90212.38|     0|           2.0|(3,[2],[1.0])|        1.0|(2,[1],[1.0])|[698.0,39.0,9.0,1...|[7.22717417597988...|
|  15565706|Akobundu|        612|    Spain|  Male| 35|     1|      0.0|     



PipelineModel_f40088c468eb Area under ROC = 0.7253409775125004
+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+--------------+-------------+-----------+-------------+--------------------+--------------------+--------------------+--------------------+----------+
|CustomerId| Surname|CreditScore|Geography|Gender|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|GeographyIndex| GeographyVec|GenderIndex|    GenderVec|            features|     features_scaled|       rawPrediction|         probability|prediction|
+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+--------------+-------------+-----------+-------------+--------------------+--------------------+--------------------+--------------------+----------+
|  15565701|   Ferri|        698|    Spain|Female| 39|     9|161993.89|            1|

## Print Your result as the following type

In [76]:
predictions.select('CustomerId', col('prediction').alias('Exited')).show(5)

+----------+------+
|CustomerId|Exited|
+----------+------+
|  15565701|   0.0|
|  15565706|   1.0|
|  15565796|   0.0|
|  15565806|   0.0|
|  15565878|   0.0|
+----------+------+
only showing top 5 rows



## Calculate f-1 score

In [78]:
from sklearn import metrics
import numpy as np
data_array = np.array(predictions.select('Exited').collect()).flatten()
#print(data_array)
prediction_array = np.array(predictions.select('prediction').collect()).flatten()
print("F1_score = %s" % metrics.f1_score(data_array, prediction_array))

F1_score = 0.9368421052631579


# ------------------------------------------------------------------------------------------------------
## Private Evaluation Part (TA will use this block to test your model)
Please 

In [None]:
df_private = spark.read.csv('private.csv',header=True,inferSchema=True)  # TA takes private dataset as example

## Do prediction with your PySpark model here

## TA will use the following function to get your prediction result (f-1 score)

In [None]:
from sklearn import metrics
import numpy as np
data_array =  np.array(df_private.select('Exited').collect())

metrics.f1_score(data_array,data_array)  

1.0