# Dataset Overview

In [61]:
from pyspark import SparkContext 
from pyspark.sql import SQLContext
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.util import MLUtils
from pyspark.ml.feature import OneHotEncoder, StringIndexer, StandardScaler, VectorAssembler, SQLTransformer
from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.linalg import Vectors
import pyspark

from pyspark.sql import SparkSession
from sklearn import metrics
import numpy as np
import pandas as pd


from pyspark.sql.functions import when
from pyspark.sql.functions import rank,sum,col
from pyspark.ml.classification import RandomForestClassifier as RF
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metric
from pyspark.ml.classification import GBTClassifier


# Do your work here

In [62]:
# PUBLIC dataset here
path = 'public.csv'

In [72]:
# List numerical features & categorical features
target_col = "Exited"
use_cols = ['CreditScore', 'Geography', 'Gender', 'Age', 'Tenure', 'Balance', 'NumOfProducts', 'HasCrCard', 'IsActiveMember', 'EstimatedSalary']
cate_cols = ["Geography", 'Gender']
all_col = use_cols.append(target_col)
num_cols = list(set(use_cols) - set(cate_cols) - set([target_col]))
#num_cols.append('weights')
#print(num_cols)

def load(path):
    # Load DataFrame
    #path = "public.csv"
    #df = SQLContext.read.load(path)
    # ----
    
    spark = SparkSession.builder.appName("Churn_Modelling").getOrCreate()
    df = spark.read.csv(path,header=True,inferSchema=True)
    # Select useful columns (drop columns that should not be known 
    # before the flight take place) 
    #df = df.select(all_col)
    df = df.select('CustomerId', 'CreditScore', 'Geography', 'Gender', 'Age', 'Tenure', 'Balance', 'NumOfProducts', 'HasCrCard', 'IsActiveMember', 'EstimatedSalary', 'Exited')

    # Impute numerical features
    for col in num_cols:
        df = df.withColumn(col, df[col].cast('double'))
        mu = df.select(col).agg({col:'mean'}).collect()[0][0]
        df = df.withColumn(col, F.when(df[col].isNull(), mu)\
                           .otherwise(df[col]))
        
    df = df.withColumn('label', df[target_col].cast('double'))
    df = df.filter(df['label'].isNotNull())

    # Impute categorical features
    for col in cate_cols:
        frq = df.select(col).groupby(col).count()\
                            .orderBy('count', ascending=False) \
                            .limit(1).collect()[0][0]
        df = df.withColumn(col, F.when((df[col].isNull() | 
                                       (df[col] == '')), frq) \
                                .otherwise(df[col]))

    return df
#path = 'public'
df = load(path)  
# adding the new column weights and fill it with ratios

ratio = 0.92
def weight_balance(labels):
    return when(labels == 1, ratio).otherwise(1*(1-ratio))

df = df.withColumn('weights', weight_balance(col('label')))
num_cols.append('weights')

def gen_preprocessor(df):
    # String Indexing for categorical features
    indexers = [StringIndexer(inputCol=col, 
                              outputCol="{}_idx".format(col)) \
                              for col in cate_cols]
    
    # One-hot encoding for categorical features
    encoders = [OneHotEncoder(inputCol="{}_idx".format(col), 
                              outputCol="{}_oh".format(col)) \
                              for col in cate_cols]

    # Concat Feature Columns
    assembler = VectorAssembler(inputCols = num_cols + \
                            ["{}_oh".format(col) for col in cate_cols], 
                            outputCol = "_features")
    
    # Standardize Features
    scaler = StandardScaler(inputCol='_features', 
                            outputCol='features', 
                            withStd=True, withMean=False)

    preprocessor = Pipeline(stages = indexers + encoders + \
                                     [assembler, scaler]).fit(df)

    return preprocessor

preprocessor = gen_preprocessor(df)   
df = preprocessor.transform(df) 

def eval_f1(predictions):
    labe = np.array(predictions.select('label').collect())
    pred = np.array(predictions.select('prediction').collect())
    print("f1-score: ", metrics.f1_score(labe, pred, average = 'micro'))

# train/test
df.cache()
train_data, test_data = df.randomSplit([1.0, 0.0])

# TRAINING
## RF
#rf = RF(labelCol = 'label', featuresCol = 'features', numTrees = 200)
#rf_fit = rf.fit(train_data)
#transformed = rf_fit.transform(test_data)
#eval_f1(transformed)

# GBT
gbt = GBTClassifier(maxIter = 25)
gbtModel = gbt.fit(train_data)
#predictions3 = gbtModel.transform(test_data)
#eval_f1(predictions3)

# Evaluation Part

## Load private dataset, the same structure as public dataset

In [73]:
# input path for PRIVATE dataset here
path = 'public.csv'

In [74]:
# List numerical features & categorical features
target_col = "Exited"
use_cols = ['CreditScore', 'Geography', 'Gender', 'Age', 'Tenure', 'Balance', 'NumOfProducts', 'HasCrCard', 'IsActiveMember', 'EstimatedSalary']
cate_cols = ["Geography", 'Gender']
all_col = use_cols.append(target_col)
num_cols = list(set(use_cols) - set(cate_cols) - set([target_col]))

# load private dataset
df_private = load(path)  

# adding the new column weights and fill it with ratios
ratio = 0.92
df_private = df_private.withColumn('weights', weight_balance(col('label')))
num_cols.append('weights')

# pre-processing private dataset
preprocessor = gen_preprocessor(df_private)   
df_private = preprocessor.transform(df_private) 

## Do prediction with your PySpark model here

In [75]:
# prediction private dataset
gbt_predictions = gbtModel.transform(df_private)
#rf_predictions = rf_fit.transform(df_private)

# f1-score from Gradient Boost Tree
eval_f1(gbt_predictions)

f1-score:  1.0


## Print Your result as the following type

In [76]:
# validate result format
gbt_predictions.select('CustomerId','prediction').show(5)

+----------+----------+
|CustomerId|prediction|
+----------+----------+
|  15565701|       0.0|
|  15565706|       1.0|
|  15565796|       0.0|
|  15565806|       0.0|
|  15565878|       0.0|
+----------+----------+
only showing top 5 rows



## TA will use the following function to get your prediction result (f-1 score)

In [77]:
# For TA compute f1-score again
predicted =  np.array(gbt_predictions.select('prediction').collect())
label = np.array(gbt_predictions.select('label').collect())
metrics.f1_score(predicted,label, average = 'micro') 

1.0