In [1]:
import findspark

In [2]:
# Point findspark at the local Spark installation so that `pyspark` becomes importable.
findspark.init(spark_home="/opt/manual/spark")

In [3]:
from pyspark.sql import SparkSession, functions as F
import pandas as pd

In [4]:
# Create (or reuse) the Spark session for this notebook.
# The session runs on YARN and uses only two shuffle partitions.
spark = (
    SparkSession.builder
    .appName("KMeans")
    .master("yarn")
    .config("spark.sql.shuffle.partitions", "2")
    .getOrCreate()
)


# Read Data

In [5]:
# Load the churn dataset from CSV; the first line is a header and column
# types are inferred from the data.
df = (
    spark.read
    .format("csv")
    .options(header=True, inferSchema=True)
    .load("/user/train/datasets/Churn_Modelling.csv")
)

In [6]:
# Preview the first five rows, converted to pandas for the notebook's rich table display.
df.limit(5).toPandas()

Unnamed: 0,RowNumber,CustomerId,Surname,CreditScore,Geography,Gender,Age,Tenure,Balance,NumOfProducts,HasCrCard,IsActiveMember,EstimatedSalary,Exited
0,1,15634602,Hargrave,619,France,Female,42,2,0.0,1,1,1,101348.88,1
1,2,15647311,Hill,608,Spain,Female,41,1,83807.86,1,0,1,112542.58,0
2,3,15619304,Onio,502,France,Female,42,8,159660.8,3,1,0,113931.57,1
3,4,15701354,Boni,699,France,Female,39,1,0.0,2,0,0,93826.63,0
4,5,15737888,Mitchell,850,Spain,Female,43,2,125510.82,1,1,1,79084.1,0


In [7]:
# Show the column names and the types produced by schema inference.
df.printSchema()

root
 |-- RowNumber: integer (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- Surname: string (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- Balance: double (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- HasCrCard: integer (nullable = true)
 |-- IsActiveMember: integer (nullable = true)
 |-- EstimatedSalary: double (nullable = true)
 |-- Exited: integer (nullable = true)



In [8]:
# Mark df for caching: it is scanned repeatedly below (row count, per-column checks, group-bys).
df.persist()

DataFrame[RowNumber: int, CustomerId: int, Surname: string, CreditScore: int, Geography: string, Gender: string, Age: int, Tenure: int, Balance: double, NumOfProducts: int, HasCrCard: int, IsActiveMember: int, EstimatedSalary: double, Exited: int]

In [9]:
# Total number of rows; kept in df_count because the null check below uses it
# to turn per-column counts into percentages.
df_count = df.count()
print(df_count)

10000


In [10]:
# Number of columns in the raw frame.
len(df.columns)

14

# Null Check

In [11]:
# Count "missing" values for every column in a single pass over the data.
# A value is missing when it is NULL; for string columns an empty string or the
# literal 'NA' also counts. The string tests are applied to string columns only:
# comparing a numeric column with '' / 'NA' forces an implicit string-to-number
# cast, which can never match and raises an error when ANSI SQL mode is enabled.
missing_exprs = []
for name, dtype in df.dtypes:
    is_missing = F.col(name).isNull()
    if dtype == 'string':
        is_missing = is_missing | F.col(name).isin('', 'NA')
    # count() skips NULLs, so only rows where the condition holds are counted.
    missing_exprs.append(F.count(F.when(is_missing, 1)).alias(name))

missing_counts = df.select(missing_exprs).first().asDict()

# Report only the columns that actually contain missing values.
for name, dtype in df.dtypes:
    null_count = missing_counts[name]
    if null_count > 0:
        print("{}  {} type null values :{} % {}".format(name, dtype, null_count, (null_count/df_count * 100)))

In [12]:
# Result: no column contains null, empty-string, or 'NA' values (the check above printed nothing).

# Group Columns

In [13]:
# Column buckets used throughout the notebook.
# The two feature buckets start empty and are filled from df.dtypes in the next cell.
categoric_cols, numeric_cols = [], []

# Identifier-like columns that are excluded from the feature buckets.
discarted_cols = ['RowNumber', 'CustomerId', 'Surname']

# Label column, also kept out of the feature buckets.
label_col = ["Exited"]

In [14]:
# Split the remaining columns into categorical (string-typed) and numeric
# (everything else), skipping discarded and label columns.
# The lists are rebuilt in place (slice assignment) instead of appended to, so
# re-running this cell does not accumulate duplicate column names.
excluded = set(discarted_cols + label_col)

categoric_cols[:] = [
    name for name, dtype in df.dtypes
    if name not in excluded and dtype == 'string'
]
numeric_cols[:] = [
    name for name, dtype in df.dtypes
    if name not in excluded and dtype != 'string'
]

In [15]:
# Categorical feature names, followed by how many there are.
print(categoric_cols, len(categoric_cols), sep="\n")

['Geography', 'Gender']
2


In [16]:
# Numeric feature names, followed by how many there are.
print(numeric_cols, len(numeric_cols), sep="\n")

['CreditScore', 'Age', 'Tenure', 'Balance', 'NumOfProducts', 'HasCrCard', 'IsActiveMember', 'EstimatedSalary']
8


In [17]:
# Discarded column names, followed by how many there are.
print(discarted_cols, len(discarted_cols), sep="\n")

['RowNumber', 'CustomerId', 'Surname']
3


In [18]:
# Label column name, followed by the list length.
print(label_col, len(label_col), sep="\n")

['Exited']
1


# Trim Whitespace (String Columns)

In [19]:
# Strip leading/trailing whitespace from every categorical (string) column.
# Start from df and keep chaining onto df1: rebuilding from df on each
# iteration would keep only the last column's trim. Seeding df1 with df also
# keeps df1 defined when there are no categorical columns.
df1 = df
for col_name in categoric_cols:
    df1 = df1.withColumn(col_name, F.trim(F.col(col_name)))

In [20]:
# Class balance of the label column.
df.groupBy(label_col[0]).count().show()

+------+-----+
|Exited|count|
+------+-----+
|     1| 2037|
|     0| 7963|
+------+-----+



# Examine Categoric Variables

In [21]:
# Frequency table for each categorical column of the trimmed frame.
for col_name in categoric_cols:
    df1.groupBy(col_name).count().show()

+---------+-----+
|Geography|count|
+---------+-----+
|   France| 5014|
|  Germany| 2509|
|    Spain| 2477|
+---------+-----+

+------+-----+
|Gender|count|
+------+-----+
|Female| 4543|
|  Male| 5457|
+------+-----+



In [22]:
# No problems found: each categorical column has a small set of clean, distinct values.

# Verify Columns

In [23]:
# Sanity check: the four column buckets together must account for every column of df1.
expected_total = sum(
    len(group) for group in (categoric_cols, numeric_cols, discarted_cols, label_col)
)
print("Columns are verified!" if len(df1.columns) == expected_total else "Columns are not verified!")

Columns are verified!


# Select Columns to One-Hot Encode

In [24]:
# Categorical columns with more than two distinct values get one-hot encoded;
# columns with two or fewer are left out of this list.
to_be_oheded = [
    col_name
    for col_name in categoric_cols
    if df1.select(col_name).distinct().count() > 2
]
    

In [25]:
# Columns selected for one-hot encoding.
print(to_be_oheded)

['Geography']


# StringIndexer

In [26]:
from pyspark.ml.feature import StringIndexer

In [27]:
# Build one StringIndexer per categorical column and record the column names
# that the later one-hot-encoding and assembling stages need.
my_dict = {}                       # "<col>_indexed_obj" -> StringIndexer
string_indexer_obj = []            # indexer stages, in categoric_cols order
string_indexer_output_names = []   # "<col>_indexed" for every categorical column
ohe_input_names = []               # indexed columns that will be one-hot encoded
ohe_output_names = []              # matching "<col>_ohe" output names

for col_name in categoric_cols:
    indexed_name = col_name + "_indexed"

    # handleInvalid="skip" makes the indexer skip rows with labels it has not seen.
    indexer = StringIndexer(
        inputCol=col_name,
        outputCol=indexed_name,
        handleInvalid="skip",
    )

    my_dict[col_name + "_indexed_obj"] = indexer
    string_indexer_obj.append(indexer)
    string_indexer_output_names.append(indexed_name)

    if col_name in to_be_oheded:
        ohe_input_names.append(indexed_name)
        ohe_output_names.append(col_name + "_ohe")

In [28]:
# The StringIndexer stages built above, one per categorical column.
print(string_indexer_obj)

[StringIndexer_1ef58be4b32c, StringIndexer_849b3855b673]


In [29]:
# Output column names of the StringIndexer stages.
print(string_indexer_output_names)

['Geography_indexed', 'Gender_indexed']


In [30]:
# Indexed columns that will be fed to the one-hot encoder.
print(ohe_input_names)

['Geography_indexed']


In [31]:
# Output column names the one-hot encoder will produce.
print(ohe_output_names)

['Geography_ohe']


# OHE (One Hot Encoder)

In [32]:
from pyspark.ml.feature import OneHotEncoder

In [33]:
# Indexed columns that are NOT one-hot encoded; they go to the assembler as-is.
# An order-preserving comprehension is used instead of a set difference: set
# iteration order for strings can change between interpreter runs, which would
# change the feature order inside the assembled vector from one run to the next.
string_indexer_ohe_excluded = [
    name for name in string_indexer_output_names if name not in ohe_input_names
]
print(string_indexer_ohe_excluded)

['Gender_indexed']


In [34]:
# One-hot encode the selected indexed columns into their "<col>_ohe" outputs.
encoder = OneHotEncoder(inputCols=ohe_input_names, outputCols=ohe_output_names)

# Vector Assembler

In [35]:
from pyspark.ml.feature import VectorAssembler

In [36]:
# Combine numeric columns, indexed-only categorical columns and one-hot vectors
# (in that order) into a single "unscaled_features" vector column.
assembler = VectorAssembler(
    inputCols=numeric_cols + string_indexer_ohe_excluded + ohe_output_names,
    outputCol="unscaled_features",
)

# Scaler

In [37]:
from pyspark.ml.feature import StandardScaler

In [38]:
# Scale the assembled vector into the "features" column used for clustering.
scaler = StandardScaler(inputCol="unscaled_features", outputCol="features")

# Pipeline

In [41]:
from pyspark.ml import Pipeline

In [52]:
# Preprocessing pipeline: string indexers first, then encoder, assembler and scaler.
pipeline_obj = Pipeline(stages=[*string_indexer_obj, encoder, assembler, scaler])

In [53]:
# Fit every preprocessing stage on the trimmed frame.
pipeline_model = pipeline_obj.fit(df1)

In [54]:
# Apply the fitted stages; this adds the indexed, one-hot, unscaled and scaled feature columns.
pipeline_df = pipeline_model.transform(df1)

In [56]:
# Inspect the first five preprocessed rows, including the generated feature vectors.
pipeline_df.limit(5).toPandas()

Unnamed: 0,RowNumber,CustomerId,Surname,CreditScore,Geography,Gender,Age,Tenure,Balance,NumOfProducts,HasCrCard,IsActiveMember,EstimatedSalary,Exited,Geography_indexed,Gender_indexed,Geography_ohe,unscaled_features,features
0,1,15634602,Hargrave,619,France,Female,42,2,0.0,1,1,1,101348.88,1,0.0,1.0,"(1.0, 0.0)","[619.0, 42.0, 2.0, 0.0, 1.0, 1.0, 1.0, 101348....","[6.404333924389993, 4.0046505619078925, 0.6915..."
1,2,15647311,Hill,608,Spain,Female,41,1,83807.86,1,0,1,112542.58,0,2.0,1.0,"(0.0, 0.0)","[608.0, 41.0, 1.0, 83807.86, 1.0, 0.0, 1.0, 11...","[6.2905250824379895, 3.9093017390053237, 0.345..."
2,3,15619304,Onio,502,France,Female,42,8,159660.8,3,1,0,113931.57,1,0.0,1.0,"(1.0, 0.0)","[502.0, 42.0, 8.0, 159660.8, 3.0, 1.0, 0.0, 11...","[5.193821696355051, 4.0046505619078925, 2.7660..."
3,4,15701354,Boni,699,France,Female,39,1,0.0,2,0,0,93826.63,0,0.0,1.0,"(1.0, 0.0)","[699.0, 39.0, 1.0, 0.0, 2.0, 0.0, 0.0, 93826.6...","[7.232034593131834, 3.718604093200186, 0.34576..."
4,5,15737888,Mitchell,850,Spain,Female,43,2,125510.82,1,1,1,79084.1,0,2.0,1.0,"(0.0, 0.0)","[850.0, 43.0, 2.0, 125510.82, 1.0, 1.0, 1.0, 7...","[8.794319605382057, 4.099999384810461, 0.69152..."


# Train Model

In [57]:
from pyspark.ml.clustering import KMeans

In [58]:
def compute_kmeans_model(df, k, seed=142):
    """Fit a KMeans model with ``k`` clusters on ``df``.

    Args:
        df: DataFrame to cluster. No features column is set here, so the
            estimator's default features column is used.
        k: Number of clusters to fit.
        seed: Random seed for the estimator. Defaults to 142, the value that
            was previously hard-coded, so existing calls behave the same.

    Returns:
        The fitted model returned by ``KMeans.fit``.
    """
    kmeans = KMeans(k=k, seed=seed)
    return kmeans.fit(df)

# Evaluator

In [60]:
from pyspark.ml.evaluation import ClusteringEvaluator

In [61]:
# Clustering evaluator with default settings.
# NOTE(review): PySpark documents the default metric as the silhouette score over
# the `features` / `prediction` columns — confirm for the installed Spark version.
evaluator = ClusteringEvaluator()

In [62]:
# Sweep k = 2..10: fit a model for each k, assign clusters, and print the
# evaluator's score so the candidate cluster counts can be compared.
for k in range(2, 11):
    kmeans_model = compute_kmeans_model(pipeline_df, k)
    transformed_df = kmeans_model.transform(pipeline_df)
    score = evaluator.evaluate(transformed_df)
    print("k: {}, score: {}".format(k, score))

k: 2, score: 0.24590625242752776
k: 3, score: 0.22645984369837002
k: 4, score: 0.21109416946633186
k: 5, score: 0.2016827064122783
k: 6, score: 0.18932939099895102
k: 7, score: 0.1852897706405449
k: 8, score: 0.1758095043959162
k: 9, score: 0.1855896305798431
k: 10, score: 0.167440153785295


In [63]:
# Refit the final model with k=2, the cluster count that scored highest in the sweep.
kmeans_model = compute_kmeans_model(pipeline_df, k=2)

# Prediction

In [64]:
# Assign each row to a cluster with the final model.
transformed_df = kmeans_model.transform(pipeline_df)

In [66]:
# Show the churn label next to the assigned cluster for up to 20 rows.
# NOTE(review): 'Prediction' presumably resolves to the model's prediction column
# through Spark's case-insensitive column lookup — confirm if spark.sql.caseSensitive
# is ever enabled. Cluster ids are arbitrary labels, so equal numbers in the two
# columns do not by themselves mean the cluster "predicts" churn.
transformed_df.select('Exited', 'Prediction').limit(20).toPandas()

Unnamed: 0,Exited,Prediction
0,1,1
1,0,0
2,1,1
3,0,1
4,0,0
5,1,0
6,0,1
7,1,0
8,0,1
9,0,1
