# KKBox Customer Churn Prediction
### w/ BigQuery and Apache Spark

---

# Part III: <font color=green>*Model Creation and Evaluation*</font>

---

In [48]:
# General Imports
from __future__ import absolute_import
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns
from scipy import stats
import warnings
warnings.filterwarnings('ignore')

# Evaluation Imports
from sklearn.metrics import silhouette_samples, silhouette_score
from sklearn import metrics
import matplotlib.pyplot as plt
import matplotlib.cm as cm
from sklearn.preprocessing import StandardScaler
%matplotlib inline

# Imports for PySpark
import findspark
findspark.init()
# import pyspark
from pyspark import SparkConf
from pyspark.ml import Pipeline
from pyspark.sql import SQLContext

# # Imports for BigQuery connection
# import json
# import pprint
# import subprocess

# # Imports for GCP
# from google.cloud import bigquery
import time 
# import gcsfs

# Imports for Spark ML
from pyspark.ml.feature import (VectorAssembler, OneHotEncoderEstimator, OneHotEncoder)
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

In [None]:
## Dataproc Specs

# Jupyter Initialization: gs://srcd-dataproc/jupyter.sh 
# Components Installed: Anaconda and Jupyter
# Master Node:   x1 - 4 vCPU w/ 15 GB RAM each
# Workers Nodes: x5 - 4 vCPU w/ 15 GB RAM each
# Disk: 100GB

## Create Spark Session and Import Data

In [None]:
# Specify Google Credentials
import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] ='D:\OneDrive\J-5\GitHub\Google Credentials.json'

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.Builder().config(conf=SparkConf().setMaster("local[*]")).getOrCreate()

# Instantiate BigQuery magic
# %load_ext google.cloud.bigquery

In [4]:
# # If Working Locally on Computer, Importing Data Locally#

# Import DRV_Jan2016 (Train Set) 

DRV_Jan20160 = spark.read.csv('D:\J-5 Local\Datasets_KKBox User Data_Monthly Datasets_DRV_Jan2016000000000000',inferSchema=True,header=True)
DRV_Jan20161 = spark.read.csv('D:\J-5 Local\Datasets_KKBox User Data_Monthly Datasets_DRV_Jan2016000000000001',inferSchema=True,header=True)
DRV_Jan20162 = spark.read.csv('D:\J-5 Local\Datasets_KKBox User Data_Monthly Datasets_DRV_Jan2016000000000002',inferSchema=True,header=True)

DRV_Jan2016 = DRV_Jan20160.union(DRV_Jan20161)
DRV_Jan2016 = DRV_Jan2016.union(DRV_Jan20162)

DRV_Jan20160.unpersist()
DRV_Jan20161.unpersist()
DRV_Jan20162.unpersist()

# # Import DRV_Feb2016 (Validation Set) 
# DRV_Feb20160 = spark.read.csv('D:\J-5 Local\Datasets_KKBox User Data_Monthly Datasets_DRV_Feb2016000000000000',inferSchema=True,header=True)
# DRV_Feb20161 = spark.read.csv('D:\J-5 Local\Datasets_KKBox User Data_Monthly Datasets_DRV_Feb2016000000000001',inferSchema=True,header=True)
# DRV_Feb20162 = spark.read.csv('D:\J-5 Local\Datasets_KKBox User Data_Monthly Datasets_DRV_Feb2016000000000002',inferSchema=True,header=True)

# DRV_Feb2016 = DRV_Feb20160.union(DRV_Feb20161)
# DRV_Feb2016 = DRV_Feb2016.union(DRV_Feb20162)

# DRV_Feb20160.unpersist()
# DRV_Feb20161.unpersist()
# DRV_Feb20162.unpersist()

DataFrame[msno: string, membership_expire_date: timestamp, payment_method_id: int, payment_plan_days: int, plan_list_price: int, net_paid_amount: int, is_net_paid_amount: string, is_auto_renew: int, is_cancel: int, city: int, bd: int, registered_via: int, registration_init_time: timestamp, membership_length: int, is_churn: int, total_songs: int, total_logins: int, total_secs: double, sum_num_unq: int, sum_num_repeat: int, sum_over_50pec: int, sum_over_75pec: int, sum_over_985pec: int, total_transactions: int, total_spent: int, avg_spent_trans: double, spent_per_logins: double, spent_per_secs: double, spent_per_song: double, spent_per_num_unq: double, spent_per_num_repeats: double, never_active_subscriber: int, total_spent_zero: int, city_agg: int, payment_method_agg: int, expire_last_login: int, total_cancelations: int, songs_last_7: int, songs_last_7_AVG: double, logins_last_7: int, logins_last_7_AVG: double, total_secs_last_7: double, total_secs_last_7_AVG: double, num_unq_last_7: in

In [49]:
# Import DRV_Jan2016 as Pandas 
DRV_Jan20160 = pd.read_csv('D:\J-5 Local\Datasets_KKBox User Data_Monthly Datasets_DRV_Jan2016000000000000')
DRV_Jan20161 = pd.read_csv('D:\J-5 Local\Datasets_KKBox User Data_Monthly Datasets_DRV_Jan2016000000000001')
DRV_Jan20162 = pd.read_csv('D:\J-5 Local\Datasets_KKBox User Data_Monthly Datasets_DRV_Jan2016000000000002')

DRV_Jan2016x = pd.concat([DRV_Jan20160,DRV_Jan20161,DRV_Jan20162])

del DRV_Jan20160
del DRV_Jan20161
del DRV_Jan20162

# Drop columns
DRV_Jan2016x = DRV_Jan2016x.drop('msno', axis=1)
DRV_Jan2016x = DRV_Jan2016x.drop('is_churn', axis=1)
DRV_Jan2016x = DRV_Jan2016x.drop('membership_expire_date', axis=1)
DRV_Jan2016x = DRV_Jan2016x.drop('registration_init_time', axis=1)
DRV_Jan2016x = DRV_Jan2016x.drop('city', axis=1)
DRV_Jan2016x = DRV_Jan2016x.drop('bd', axis=1)
DRV_Jan2016x = DRV_Jan2016x.drop('payment_method_id', axis=1)
DRV_Jan2016x = DRV_Jan2016x.drop('is_net_paid_amount', axis=1)
DRV_Jan2016x = DRV_Jan2016x.drop('registered_via', axis=1)


DRV_Jan2016x = DRV_Jan2016x.drop(['is_auto_renew', 'total_spent_zero', 'city_agg', 
                   'payment_method_agg', 'never_active_subscriber'],
                   axis=1)

# Scale the data.
X_scaled = StandardScaler().fit_transform(DRV_Jan2016x)

In [None]:
# ## If Working Locally on Computer, Importing Data from GCS ##

# # Import DRV_Jan2016 (Train Set) from Google Cloud Storage via Pandas
# DRV_Jan2016_Balanced_1 = spark.createDataFrame(pd.read_csv('gs://dataproc-fb3fa26d-011a-4757-afb9-5efdd6e75d60-us-east1/Datasets/KKBox User Data/Monthly Datasets/DRV_Jan2016_Balanced_1'))
# DRV_Jan2016_Balanced_2 = spark.createDataFrame(pd.read_csv('gs://dataproc-fb3fa26d-011a-4757-afb9-5efdd6e75d60-us-east1/Datasets/KKBox User Data/Monthly Datasets/DRV_Jan2016_Balanced_2'))
# DRV_Jan2016_Balanced_3 = spark.createDataFrame(pd.read_csv('gs://dataproc-fb3fa26d-011a-4757-afb9-5efdd6e75d60-us-east1/Datasets/KKBox User Data/Monthly Datasets/DRV_Jan2016_Balanced_3'))

# # Import DRV_Feb2016 (Validation Set) from Google Cloud Storage via Pandas
# DRV_Feb2016 = spark.createDataFrame(pd.read_csv('gs://dataproc-fb3fa26d-011a-4757-afb9-5efdd6e75d60-us-east1/Datasets/KKBox User Data/Monthly Datasets/DRV_Feb2016'))

# # Import DRV_Mar2016 (Test Set) from Google Cloud Storage via Pandas
# DRV_Mar2016 = spark.createDataFrame(pd.read_csv('gs://dataproc-fb3fa26d-011a-4757-afb9-5efdd6e75d60-us-east1/Datasets/KKBox User Data/Monthly Datasets/DRV_Mar2016'))

In [None]:
# # If Working on Dataproc Cloud ##

# # Import DRV_Jan2016 (Train Set) 
# DRV_Jan2016_1to1 = spark.read.csv('gs://dataproc-fb3fa26d-011a-4757-afb9-5efdd6e75d60-us-east1/Datasets/KKBox User Data/Monthly Datasets/DRV_Jan2016_1to1',inferSchema=True,header=True)
# DRV_Jan2016_3to1 = spark.read.csv('gs://dataproc-fb3fa26d-011a-4757-afb9-5efdd6e75d60-us-east1/Datasets/KKBox User Data/Monthly Datasets/DRV_Jan2016_3to1',inferSchema=True,header=True)
# DRV_Jan2016_5to1 = spark.read.csv('gs://dataproc-fb3fa26d-011a-4757-afb9-5efdd6e75d60-us-east1/Datasets/KKBox User Data/Monthly Datasets/DRV_Jan2016_5to1',inferSchema=True,header=True)
# DRV_Jan2016_7to1 = spark.read.csv('gs://dataproc-fb3fa26d-011a-4757-afb9-5efdd6e75d60-us-east1/Datasets/KKBox User Data/Monthly Datasets/DRV_Jan2016_7to1',inferSchema=True,header=True)
# DRV_Jan2016_9to1 = spark.read.csv('gs://dataproc-fb3fa26d-011a-4757-afb9-5efdd6e75d60-us-east1/Datasets/KKBox User Data/Monthly Datasets/DRV_Jan2016_9to1',inferSchema=True,header=True)
# DRV_Jan2016_11to1 = spark.read.csv('gs://dataproc-fb3fa26d-011a-4757-afb9-5efdd6e75d60-us-east1/Datasets/KKBox User Data/Monthly Datasets/DRV_Jan2016_11to1',inferSchema=True,header=True)
# DRV_Jan2016_13to1 = spark.read.csv('gs://dataproc-fb3fa26d-011a-4757-afb9-5efdd6e75d60-us-east1/Datasets/KKBox User Data/Monthly Datasets/DRV_Jan2016_13to1',inferSchema=True,header=True)

# DRV_Jan20160 = spark.read.csv('gs://dataproc-fb3fa26d-011a-4757-afb9-5efdd6e75d60-us-east1/Datasets/KKBox User Data/Monthly Datasets/DRV_Jan2016000000000000',inferSchema=True,header=True)
# DRV_Jan20161 = spark.read.csv('gs://dataproc-fb3fa26d-011a-4757-afb9-5efdd6e75d60-us-east1/Datasets/KKBox User Data/Monthly Datasets/DRV_Jan2016000000000001',inferSchema=True,header=True)
# DRV_Jan20162 = spark.read.csv('gs://dataproc-fb3fa26d-011a-4757-afb9-5efdd6e75d60-us-east1/Datasets/KKBox User Data/Monthly Datasets/DRV_Jan2016000000000002',inferSchema=True,header=True)

# DRV_Jan2016 = DRV_Jan20160.union(DRV_Jan20161)
# DRV_Jan2016 = DRV_Jan2016.union(DRV_Jan20162)

# DRV_Jan20160 = None
# DRV_Jan20161 = None
# DRV_Jan20162 = None

# # Import DRV_Feb2016 (Validation Set) 
# DRV_Feb20160 = spark.read.csv('gs://dataproc-fb3fa26d-011a-4757-afb9-5efdd6e75d60-us-east1/Datasets/KKBox User Data/Monthly Datasets/DRV_Feb2016000000000000',inferSchema=True,header=True)
# DRV_Feb20161 = spark.read.csv('gs://dataproc-fb3fa26d-011a-4757-afb9-5efdd6e75d60-us-east1/Datasets/KKBox User Data/Monthly Datasets/DRV_Feb2016000000000001',inferSchema=True,header=True)
# DRV_Feb20162 = spark.read.csv('gs://dataproc-fb3fa26d-011a-4757-afb9-5efdd6e75d60-us-east1/Datasets/KKBox User Data/Monthly Datasets/DRV_Feb2016000000000002',inferSchema=True,header=True)

# DRV_Feb2016 = DRV_Feb20160.union(DRV_Feb20161)
# DRV_Feb2016 = DRV_Feb2016.union(DRV_Feb20162)

# DRV_Feb20160 = None
# DRV_Feb20161 = None
# DRV_Feb20162 = None

***Cast Correct Column Types on All Sets***

In [None]:
column_types_pd = [('msno', 'STRING'),
 ('membership_expire_date', 'DATE'),
 ('payment_method_id', 'INT64'),
 ('payment_plan_days', 'INT64'),
 ('plan_list_price', 'INT64'),
 ('net_paid_amount', 'INT64'),
 ('is_net_paid_amount', 'STRING'),
 ('is_auto_renew', 'INT64'),
 ('city', 'INT64'),
 ('bd', 'INT64'),
 ('registered_via', 'INT64'),
 ('registration_init_time', 'DATE'),
 ('membership_length', 'INT64'),
 ('is_churn', 'FLOAT64'),
 ('total_songs', 'INT64'),
 ('total_logins', 'INT64'),
 ('total_secs', 'FLOAT64'),
 ('sum_num_unq', 'INT64'),
 ('sum_num_repeat', 'INT64'),
 ('sum_over_50pec', 'INT64'),
 ('sum_over_75pec', 'INT64'),
 ('sum_over_985pec', 'INT64'),
 ('total_transactions', 'INT64'),
 ('total_spent', 'FLOAT64'),
 ('avg_spent_trans', 'FLOAT64'),
 ('spent_per_logins', 'FLOAT64'),
 ('spent_per_secs', 'FLOAT64'),
 ('spent_per_song', 'FLOAT64'),
 ('spent_per_num_unq', 'FLOAT64'),
 ('spent_per_num_repeats', 'FLOAT64'),
 ('never_active_subscriber', 'FLOAT64'),
 ('total_spent_zero', 'FLOAT64'),
 ('city_agg', 'INT64'),
 ('payment_method_agg', 'INT64'),
 ('songs_last_7', 'FLOAT64'),
 ('songs_last_7_AVG', 'FLOAT64'),
 ('logins_last_7', 'FLOAT64'),
 ('logins_last_7_AVG', 'FLOAT64'),
 ('total_secs_last_7', 'FLOAT64'),
 ('total_secs_last_7_AVG', 'FLOAT64'),
 ('num_unq_last_7', 'FLOAT64'),
 ('num_unq_last_7_AVG', 'FLOAT64'),
 ('num_repeat_last_7', 'FLOAT64'),
 ('num_repeat_last_7_AVG', 'FLOAT64'),
 ('over_50perc_last_7', 'FLOAT64'),
 ('over_50perc_last_7_AVG', 'FLOAT64'),
 ('over_75perc_last_7', 'FLOAT64'),
 ('over_75perc_last_7_AVG', 'FLOAT64'),
 ('over_985perc_last_7', 'FLOAT64'),
 ('over_985perc_last_7_AVG', 'FLOAT64'),
 ('songs_last_15', 'FLOAT64'),
 ('songs_last_15_AVG', 'FLOAT64'),
 ('logins_last_15', 'FLOAT64'),
 ('logins_last_15_AVG', 'FLOAT64'),
 ('total_secs_last_15', 'FLOAT64'),
 ('total_secs_last_15_AVG', 'FLOAT64'),
 ('num_unq_last_15', 'FLOAT64'),
 ('num_unq_last_15_AVG', 'FLOAT64'),
 ('num_repeat_last_15', 'FLOAT64'),
 ('num_repeat_last_15_AVG', 'FLOAT64'),
 ('over_50perc_last_15', 'FLOAT64'),
 ('over_50perc_last_15_AVG', 'FLOAT64'),
 ('over_75perc_last_15', 'FLOAT64'),
 ('over_75perc_last_15_AVG', 'FLOAT64'),
 ('over_985perc_last_15', 'FLOAT64'),
 ('over_985perc_last_15_AVG', 'FLOAT64'),
 ('songs_last_30', 'FLOAT64'),
 ('songs_last_30_AVG', 'FLOAT64'),
 ('logins_last_30', 'FLOAT64'),
 ('logins_last_30_AVG', 'FLOAT64'),
 ('total_secs_last_30', 'FLOAT64'),
 ('total_secs_last_30_AVG', 'FLOAT64'),
 ('num_unq_last_30', 'FLOAT64'),
 ('num_unq_last_30_AVG', 'FLOAT64'),
 ('num_repeat_last_30', 'FLOAT64'),
 ('num_repeat_last_30_AVG', 'FLOAT64'),
 ('over_50perc_last_30', 'FLOAT64'),
 ('over_50perc_last_30_AVG', 'FLOAT64'),
 ('over_75perc_last_30', 'FLOAT64'),
 ('over_75perc_last_30_AVG', 'FLOAT64'),
 ('over_985perc_last_30', 'FLOAT64'),
 ('over_985perc_last_30_AVG', 'FLOAT64'),
 ('songs_last_60', 'FLOAT64'),
 ('songs_last_60_AVG', 'FLOAT64'),
 ('logins_last_60', 'FLOAT64'),
 ('logins_last_60_AVG', 'FLOAT64'),
 ('total_secs_last_60', 'FLOAT64'),
 ('total_secs_last_60_AVG', 'FLOAT64'),
 ('num_unq_last_60', 'FLOAT64'),
 ('num_unq_last_60_AVG', 'FLOAT64'),
 ('num_repeat_last_60', 'FLOAT64'),
 ('num_repeat_last_60_AVG', 'FLOAT64'),
 ('over_50perc_last_60', 'FLOAT64'),
 ('over_50perc_last_60_AVG', 'FLOAT64'),
 ('over_75perc_last_60', 'FLOAT64'),
 ('over_75perc_last_60_AVG', 'FLOAT64'),
 ('over_985perc_last_60', 'FLOAT64'),
 ('over_985perc_last_60_AVG', 'FLOAT64'),
 ('songs_last_120', 'FLOAT64'),
 ('songs_last_120_AVG', 'FLOAT64'),
 ('logins_last_120', 'FLOAT64'),
 ('logins_last_120_AVG', 'FLOAT64'),
 ('total_secs_last_120', 'FLOAT64'),
 ('total_secs_last_120_AVG', 'FLOAT64'),
 ('num_unq_last_120', 'FLOAT64'),
 ('num_unq_last_120_AVG', 'FLOAT64'),
 ('num_repeat_last_120', 'FLOAT64'),
 ('num_repeat_last_120_AVG', 'FLOAT64'),
 ('over_50perc_last_120', 'FLOAT64'),
 ('over_50perc_last_120_AVG', 'FLOAT64'),
 ('over_75perc_last_120', 'FLOAT64'),
 ('over_75perc_last_120_AVG', 'FLOAT64'),
 ('over_985perc_last_120', 'FLOAT64'),
 ('over_985perc_last_120_AVG', 'FLOAT64'),
 ('SUM_unq_songs_0_15', 'FLOAT64'),
 ('AVG_unq_songs_0_15', 'FLOAT64'),
 ('SUM_songs_0_15', 'FLOAT64'),
 ('AVG_songs_0_15', 'FLOAT64'),
 ('SUM_secs_0_15', 'FLOAT64'),
 ('AVG_secs_0_15', 'FLOAT64'),
 ('SUM_songs50_0_15', 'FLOAT64'),
 ('AVG_songs50_0_15', 'FLOAT64'),
 ('SUM_logins_0_15', 'FLOAT64'),
 ('AVG_logins_0_15', 'FLOAT64'),
 ('SUM_repeats_0_15', 'FLOAT64'),
 ('AVG_repeats_0_15', 'FLOAT64'),
 ('SUM_unq_songs_15_30', 'FLOAT64'),
 ('AVG_unq_songs_15_30', 'FLOAT64'),
 ('SUM_songs_15_30', 'FLOAT64'),
 ('AVG_songs_15_30', 'FLOAT64'),
 ('SUM_secs_15_30', 'FLOAT64'),
 ('AVG_secs_15_30', 'FLOAT64'),
 ('SUM_songs50_15_30', 'FLOAT64'),
 ('AVG_songs50_15_30', 'FLOAT64'),
 ('SUM_logins_15_30', 'FLOAT64'),
 ('AVG_logins_15_30', 'FLOAT64'),
 ('SUM_repeats_15_30', 'FLOAT64'),
 ('AVG_repeats_15_30', 'FLOAT64'),
 ('SUM_unq_songs_30_45', 'FLOAT64'),
 ('AVG_unq_songs_30_45', 'FLOAT64'),
 ('SUM_songs_30_45', 'FLOAT64'),
 ('AVG_songs_30_45', 'FLOAT64'),
 ('SUM_secs_30_45', 'FLOAT64'),
 ('AVG_secs_30_45', 'FLOAT64'),
 ('SUM_songs50_30_45', 'FLOAT64'),
 ('AVG_songs50_30_45', 'FLOAT64'),
 ('SUM_logins_30_45', 'FLOAT64'),
 ('AVG_logins_30_45', 'FLOAT64'),
 ('SUM_repeats_30_45', 'FLOAT64'),
 ('AVG_repeats_30_45', 'FLOAT64'),
 ('SUM_unq_songs_45_60', 'FLOAT64'),
 ('AVG_unq_songs_45_60', 'FLOAT64'),
 ('SUM_songs_45_60', 'FLOAT64'),
 ('AVG_songs_45_60', 'FLOAT64'),
 ('SUM_secs_45_60', 'FLOAT64'),
 ('AVG_secs_45_60', 'FLOAT64'),
 ('SUM_songs50_45_60', 'FLOAT64'),
 ('AVG_songs50_45_60', 'FLOAT64'),
 ('SUM_logins_45_60', 'FLOAT64'),
 ('AVG_logins_45_60', 'FLOAT64'),
 ('SUM_repeats_45_60', 'FLOAT64'),
 ('AVG_repeats_45_60', 'FLOAT64'),
 ('DIFSUM_unq_songs_0_15_15_30', 'FLOAT64'),
 ('DIFAVG_unq_songs_0_15_15_30', 'FLOAT64'),
 ('DIFSUM_songs_0_15_15_30', 'FLOAT64'),
 ('DIFAVG_songs_0_15_15_30', 'FLOAT64'),
 ('DIFSUM_secs_0_15_15_30', 'FLOAT64'),
 ('DIFAVG_secs_0_15_15_30', 'FLOAT64'),
 ('DIFSUM_songs50_0_15_15_30', 'FLOAT64'),
 ('DIFAVG_songs50_0_15_15_30', 'FLOAT64'),
 ('DIFSUM_logins_0_15_15_30', 'FLOAT64'),
 ('DIFAVG_logins_0_15_15_30', 'FLOAT64'),
 ('DIFSUM_repeats_0_15_15_30', 'FLOAT64'),
 ('DIFAVG_repeats_0_15_15_30', 'FLOAT64'),
 ('DIFSUM_unq_songs_15_30_30_45', 'FLOAT64'),
 ('DIFAVG_unq_songs_15_30_30_45', 'FLOAT64'),
 ('DIFSUM_songs_15_30_30_45', 'FLOAT64'),
 ('DIFAVG_songs_15_30_30_45', 'FLOAT64'),
 ('DIFSUM_secs_15_30_30_45', 'FLOAT64'),
 ('DIFAVG_secs_15_30_30_45', 'FLOAT64'),
 ('DIFSUM_songs50_15_30_30_45', 'FLOAT64'),
 ('DIFAVG_songs50_15_30_30_45', 'FLOAT64'),
 ('DIFSUM_logins_15_30_30_45', 'FLOAT64'),
 ('DIFAVG_logins_15_30_30_45', 'FLOAT64'),
 ('DIFSUM_repeats_15_30_30_45', 'FLOAT64'),
 ('DIFAVG_repeats_15_30_30_45', 'FLOAT64'),
 ('DIFSUM_unq_songs_30_45_45_60', 'FLOAT64'),
 ('DIFAVG_unq_songs_30_45_45_60', 'FLOAT64'),
 ('DIFSUM_songs_30_45_45_60', 'FLOAT64'),
 ('DIFAVG_songs_30_45_45_60', 'FLOAT64'),
 ('DIFSUM_secs_30_45_45_60', 'FLOAT64'),
 ('DIFAVG_secs_30_45_45_60', 'FLOAT64'),
 ('DIFSUM_songs50_30_45_45_60', 'FLOAT64'),
 ('DIFAVG_songs50_30_45_45_60', 'FLOAT64'),
 ('DIFSUM_logins_30_45_45_60', 'FLOAT64'),
 ('DIFAVG_logins_30_45_45_60', 'FLOAT64'),
 ('DIFSUM_repeats_30_45_45_60', 'FLOAT64'),
 ('DIFAVG_repeats_30_45_45_60', 'FLOAT64'),
 ('expire_last_login', 'INT64'),
 ('total_cancelations', 'INT64'),
 ('login_after_expire_10', 'INT64'),
 ('login_after_expire_20', 'INT64'),
 ('login_after_expire_30', 'INT64'),
 ('STD_unq_songs_0_15', 'FLOAT64'),
 ('STD_songs_0_15', 'FLOAT64'),
 ('STD_secs_0_15', 'FLOAT64'),
 ('STD_songs50_0_15', 'FLOAT64'),
 ('STD_repeats_0_15', 'FLOAT64'),
 ('STD_unq_songs_15_30', 'FLOAT64'),
 ('STD_songs_15_30', 'FLOAT64'),
 ('STD_secs_15_30', 'FLOAT64'),
 ('STD_songs50_15_30', 'FLOAT64'),
 ('STD_repeats_15_30', 'FLOAT64'),
 ('STD_unq_songs_30_45', 'FLOAT64'),
 ('STD_songs_30_45', 'FLOAT64'),
 ('STD_secs_30_45', 'FLOAT64'),
 ('STD_songs50_30_45', 'FLOAT64'),
 ('STD_repeats_30_45', 'FLOAT64'),
 ('STD_unq_songs_45_60', 'FLOAT64'),
 ('STD_songs_45_60', 'FLOAT64'),
 ('STD_secs_45_60', 'FLOAT64'),
 ('STD_songs50_45_60', 'FLOAT64'),
 ('STD_repeats_45_60', 'FLOAT64'),
 ('DIFSTD_unq_songs_0_15_15_30', 'FLOAT64'),
 ('DIFSTD_songs_0_15_15_30', 'FLOAT64'),
 ('DIFSTD_secs_0_15_15_30', 'FLOAT64'),
 ('DIFSTD_songs50_0_15_15_30', 'FLOAT64'),
 ('DIFSTD_repeats_0_15_15_30', 'FLOAT64'),
 ('DIFSTD_unq_songs_15_30_30_45', 'FLOAT64'),
 ('DIFSTD_songs_15_30_30_45', 'FLOAT64'),
 ('DIFSTD_secs_15_30_30_45', 'FLOAT64'),
 ('DIFSTD_songs50_15_30_30_45', 'FLOAT64'),
 ('DIFSTD_repeats_15_30_30_45', 'FLOAT64'),
 ('DIFSTD_unq_songs_30_45_45_60', 'FLOAT64'),
 ('DIFSTD_songs_30_45_45_60', 'FLOAT64'),
 ('DIFSTD_secs_30_45_45_60', 'FLOAT64'),
 ('DIFSTD_songs50_30_45_45_60', 'FLOAT64'),
 ('DIFSTD_repeats_30_45_45_60', 'FLOAT64'),
 ('is_cancel', 'INT64')]

In [None]:
from pyspark.sql.functions import expr

# Correctly Cast DRV_Feb2016
for feature, datatype in column_types_pd:
    if datatype == 'STRING':
        exec(f'DRV_Jan2016_1to1 = DRV_Feb2016.withColumn("{feature}", expr("CAST({feature} AS string)"))')
    if datatype == 'DATE':
        exec(f'DRV_Jan2016_1to1 = DRV_Feb2016.withColumn("{feature}", expr("CAST({feature} AS timestamp)"))')
    if datatype == 'INT64':
        exec(f'DRV_Jan2016_1to1 = DRV_Feb2016.withColumn("{feature}", expr("CAST({feature} AS integer)"))')
    if datatype == 'FLOAT64':
        exec(f'DRV_Jan2016_1to1 = DRV_Feb2016.withColumn("{feature}", expr("CAST({feature} AS double)"))')

## Model Pre-Processing
https://medium.com/@dhiraj.p.rai/essentials-of-feature-engineering-in-pyspark-part-i-76a57680a85

### - <font color=blue>Split Feautres by Categorical or Continuous</font> -

In [5]:
# Create list of Categorical feature names
cat_feats = ['is_auto_renew', 'total_spent_zero', 'city_agg', 'payment_method_agg', 'never_active_subscriber']

In [6]:
# Create list of Continuous feature names
cont_feats = [x for x in DRV_Jan2016.columns if x not in cat_feats]
cont_feats.remove('msno')
cont_feats.remove('is_churn')
cont_feats.remove('membership_expire_date')
cont_feats.remove('registration_init_time')
cont_feats.remove('city')
cont_feats.remove('bd')
cont_feats.remove('payment_method_id')
cont_feats.remove('is_net_paid_amount')
cont_feats.remove('registered_via')

### - <font color=blue>Data Pre-Processing</font> -

#### <font color=purple>*Vector Assembler*</font>

In [7]:
# Create master list of feature names for model
final_features = cont_feats

In [10]:
# Specify the numeric features we will be transforming, and the name of the resulting output feature
assembler = VectorAssembler(
    inputCols= final_features,
    outputCol='features')

#### <font color=purple>*Feature Scaling*</font>

In [11]:
# Scale all features into our final output features
scaler = StandardScaler(inputCol='features', 
                        outputCol='features_scaled',
                        withStd=True, withMean=False)

## Model Creation: Pipeline and Tuning

### - <font color=blue>Create Pipeline Object</font> -
https://spark.apache.org/docs/2.4.3/ml-pipeline.html

In [17]:
sample = DRV_Jan2016.limit(100)

In [52]:
plt.figure(figsize=(15,15))
# for i, n_clusters in enumerate([2]):

i = 0
n_clusters = 2

## Cluster Calculation ##
print('1')
# Initialize the clusterer with n_clusters value and a random generator seed of 10 for reproducibility.
kmeans = KMeans(featuresCol='features_scaled',
                initMode='k-means||').setK(n_clusters).setSeed(11)

# Create pipeline object
kmeans = Pipeline(stages=[assembler,scaler,kmeans])
print('2')

## Cluster Evaluation ##

# Fit to DRV_January
model = kmeans.fit(DRV_Jan2016)
print('3')

# Make predictions
predictions = model.transform(DRV_Jan2016)
print('4')

# Make Cluster Labels
cluster_labels = predictions.select('prediction').toPandas().prediction
print('5')

# Calculate silhouette scores
silhouette_avg = silhouette_score(X_scaled, cluster_labels)
ch_score = metrics.calinski_harabasz_score(X_scaled, cluster_labels)  
print("For n_clusters =", n_clusters)
print("The average silhouette_score is :", silhouette_avg)
print("The calinski_harabasz_score is :", ch_score)
print('')
print('6')
# Calculate silhouette scores
silhouette_avg = silhouette_score(X_scaled, cluster_labels)
ch_score = metrics.calinski_harabasz_score(X_scaled, cluster_labels)  

# Compute the silhouette scores for each sample
sample_silhouette_values = silhouette_samples(X_scaled, cluster_labels)
print('7')
## Cluster Visualization ##

# Create a subplot with 1 row and 2 columns
plt.subplot(3,2,i+1)

# The 1st subplot is the silhouette plot
# The silhouette coefficient can range from -1, 1 but in this example all
# lie within [-0.1, 1]
plt.xlim([-0.1, 1])
# The (n_clusters+1)*10 is for inserting blank space between silhouette
# plots of individual clusters, to demarcate them clearly.
plt.ylim([0, len(X_scaled) + (n_clusters + 1) * 10])
print('8')
y_lower = 10
for i in range(n_clusters):
    # Aggregate the silhouette scores for samples belonging to
    # cluster i, and sort them
    ith_cluster_silhouette_values = \
        sample_silhouette_values[cluster_labels == i]

    ith_cluster_silhouette_values.sort()

    size_cluster_i = ith_cluster_silhouette_values.shape[0]
    y_upper = y_lower + size_cluster_i

    color = cm.nipy_spectral(float(i) / n_clusters)
    plt.fill_betweenx(np.arange(y_lower, y_upper),
                      0, ith_cluster_silhouette_values,
                      facecolor=color, edgecolor=color, alpha=0.7)

    # Label the silhouette plots with their cluster numbers at the middle
    plt.text(-0.05, y_lower + 0.5 * size_cluster_i, str(i))

    # Compute the new y_lower for next plot
    y_lower = y_upper + 10  # 10 for the 0 samples

plt.title("The silhouette plot for the various clusters.")
plt.xlabel("The silhouette coefficient values")
plt.ylabel("Cluster label")

# The vertical line for average silhouette score of all the values
plt.axvline(x=silhouette_avg, color="red", linestyle="--")

plt.yticks([])  # Clear the yaxis labels / ticks
plt.xticks([-0.1, 0, 0.2, 0.4, 0.6, 0.8, 1])

plt.tight_layout()
plt.show()

1
2
3
4
5


KeyboardInterrupt: 

<Figure size 1080x1080 with 0 Axes>

In [33]:
x = predictions.select('prediction').toPandas().prediction

{0, 1}

In [24]:
silhouette

0.627515869783927