# Dataset Overview

In [27]:
import pandas as pd
df = pd.read_csv('public.csv')
#df[df["Exited"]==0].count()
#df.head()
#df.corr(method='pearson')

# Use Pyspark to view dataset 

In [28]:
!apt-get -y install openjdk-8-jre-headless
!pip install pyspark

Reading package lists... Done
Building dependency tree       
Reading state information... Done
openjdk-8-jre-headless is already the newest version (8u292-b10-0ubuntu1~18.04).
0 upgraded, 0 newly installed, 0 to remove and 39 not upgraded.


In [29]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Churn_Modelling").getOrCreate()
df = spark.read.csv('public.csv',header=True,inferSchema=True)
#df.printSchema()

# Do your work here

In [30]:
import matplotlib.pyplot as plt
from datetime import datetime
from dateutil import parser
from pyspark.sql.functions import col, when, udf
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel
from pyspark.ml.feature import RFormula
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, LinearSVC
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import round
from sklearn import metrics
import numpy as np

In [31]:
clean_df = df.select('CustomerId', 'CreditScore', 'Geography', 'Gender', 'Age', 'Tenure', 'Balance', 'NumOfProducts'\
                     , 'HasCrCard', 'IsActiveMember', 'EstimatedSalary', 'Exited')

In [32]:
# assign higher weights to (Exited == 1).
balancingRatio = clean_df.filter(col('Exited') == 1).count() / clean_df.count()
calculateWeights = udf(lambda x: 1 * balancingRatio if x == 0 else (1 * (1.0 - balancingRatio)), DoubleType())
weighted_df = clean_df.withColumn("Weight", calculateWeights("Exited"))

In [33]:
sI1 = StringIndexer(inputCol="Geography", outputCol="GeographyIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="GeographyIndex", outputCol="GeographyVec")
sI2 = StringIndexer(inputCol="Gender", outputCol="GenderIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="GenderIndex", outputCol="GenderVec")

encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(weighted_df).transform(weighted_df)

In [34]:
trainingFraction = 0.8
testingFraction = (1-trainingFraction)
seed = 1234

train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

In [35]:
#logReg = LogisticRegression(labelCol='Exited', weightCol='Weight')
rf = RandomForestClassifier(labelCol='Exited', weightCol='Weight', numTrees=10)
#lsvc = LinearSVC(maxIter=10, regParam=0.1, weightCol='Weight')

classFormula = RFormula(formula="Exited ~ CreditScore + GeographyVec + GenderVec + Age + Tenure + Balance + NumOfProducts + HasCrCard + IsActiveMember + EstimatedSalary")
model = Pipeline(stages=[classFormula, rf]).fit(train_data_df)
trainingSummary = model.stages[-1].summary


In [36]:
accuracy = trainingSummary.accuracy
recall = trainingSummary.recallByLabel
print(f"accuracy: {accuracy}, recall: {recall}")

accuracy: 0.7796900906285764, recall: [0.8257114818449484, 0.7339864355689518]


In [44]:
predictions = model.transform(test_data_df)
predictionAndLabels = predictions.select("label", "prediction").rdd
metric = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metric.areaUnderROC)

Area under ROC = 0.708796556900272


In [38]:
y_true =  np.array(predictions.select("label").collect())
y_pred = np.array(predictions.select("prediction").collect())
metrics.f1_score(y_true, y_pred, average='micro')  

0.7978453738910013

# Evaluation Part

## Load private dataset, the same structure as public dataset

In [39]:
df_private = spark.read.csv('public.csv',header=True,inferSchema=True)  # TA takes public dataset as example

## Do prediction with your PySpark model here

In [41]:
encoded_df_private = Pipeline(stages=[sI1, en1, sI2, en2]).fit(df_private).transform(df_private)
pred = model.transform(encoded_df_private)

## Print Your result as the following type

In [47]:
pred.select('CustomerId','prediction').show(5)

+----------+----------+
|CustomerId|prediction|
+----------+----------+
|  15565701|       0.0|
|  15565706|       0.0|
|  15565796|       1.0|
|  15565806|       0.0|
|  15565878|       0.0|
+----------+----------+
only showing top 5 rows



## TA will use the following function to get your prediction result (f-1 score)

In [46]:
from sklearn import metrics
import numpy as np
data_array =  np.array(df_private.select("Exited").collect())
data_pred = np.array(pred.select("prediction").collect())

metrics.f1_score(data_array, data_pred, average='micro')  

0.805