## Imports

In [36]:
# Installing pyspark
!pip install pyspark
import pyspark.sql.functions as f
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Multiclass Classification').getOrCreate()
import pandas as pd
import numpy as np



#### Loading Data

In [138]:
df = spark.read.options(delimeter=',', inferSchema=True, header=True).csv('/kaggle/input/customer/Train.csv')
df.show(5)

+------+------+------------+---+---------+-------------+---------------+--------------+-----------+-----+------------+
|    ID|Gender|Ever_Married|Age|Graduated|   Profession|Work_Experience|Spending_Score|Family_Size|Var_1|Segmentation|
+------+------+------------+---+---------+-------------+---------------+--------------+-----------+-----+------------+
|462809|  Male|          No| 22|       No|   Healthcare|            1.0|           Low|        4.0|Cat_4|           D|
|462643|Female|         Yes| 38|      Yes|     Engineer|           null|       Average|        3.0|Cat_4|           A|
|466315|Female|         Yes| 67|      Yes|     Engineer|            1.0|           Low|        1.0|Cat_6|           B|
|461735|  Male|         Yes| 67|      Yes|       Lawyer|            0.0|          High|        2.0|Cat_6|           B|
|462669|Female|         Yes| 40|      Yes|Entertainment|           null|          High|        6.0|Cat_6|           A|
+------+------+------------+---+---------+------

#### Handling Missing Data

In [139]:
num_cols = []
binary_cols = ['Ever_Married', 'Gender', 'Graduated']
multi_cat_cols = ['Profession', 'Spending_Score', 'Var_1']
for cols in df.dtypes:
    if cols[1] != 'string' and cols[0] != 'ID':
        num_cols.append(cols[0])
print('Numerical Columns are:',*num_cols)
print('Binary categorical columns are:',*binary_cols)
print('Multilabel categorical columns are:',*multi_cat_cols)

Numerical Columns are: Age Work_Experience Family_Size
Binary categorical columns are: Ever_Married Gender Graduated
Multilabel categorical columns are: Profession Spending_Score Var_1


In [148]:
print('Mode / Most frequent Values for Categorical columns:')
df.agg(*[f.mode(c).alias(c) for c in binary_cols+multi_cat_cols]).show()

Mode / Most frequent Values for Categorical columns:
+------------+------+---------+----------+--------------+-----+
|Ever_Married|Gender|Graduated|Profession|Spending_Score|Var_1|
+------------+------+---------+----------+--------------+-----+
|         Yes|  Male|      Yes|    Artist|           Low|Cat_6|
+------------+------+---------+----------+--------------+-----+



In [131]:
# Filling Model values for categorical columns
#df = df.fillna( { 'Ever_Married':'Yes', 'Gender':'Male', 'Graduated':'Yes', 'Profession':'Artist','Spending_Score':'Low', 'Var_1':'Cat_6'} )

## Preprocessing Pipeline

In [140]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, Imputer
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline

ind_b_cols = [i+"_ind_b" for i in binary_cols]
ind_m_cols = [i+"_ind_m" for i in multi_cat_cols]
ohe_op_cols = [i+"_ohe" for i in multi_cat_cols]

# Imputing missing values in numerical columns using median strategy
num_imputer = Imputer(strategy='median',inputCols=num_cols, outputCols=num_cols)

# String Indexing all string / categorical columns
indexer = StringIndexer(inputCols= binary_cols+multi_cat_cols, outputCols=ind_b_cols+ind_m_cols, stringOrderType='alphabetAsc', handleInvalid ='keep')

# String Indexing Segmentation (label) column
label_index = StringIndexer(inputCol='Segmentation', outputCol='label',stringOrderType='alphabetAsc', handleInvalid='skip')

# One Hot encoding multi label categorical columns
encoder = OneHotEncoder(inputCols = ind_m_cols, outputCols = ohe_op_cols, dropLast = False)

# Vector assembling all columns into a 'feature' vector
assembler = VectorAssembler(inputCols = ind_b_cols+ohe_op_cols+num_cols, outputCol="features")

# Creating a pipeline for all preprocessing steps
ohe_pipe = Pipeline(stages=[num_imputer, indexer, encoder,label_index, assembler])
pipe_model = ohe_pipe.fit(df)
processed_data = pipe_model.transform(df)

In [141]:
processed_data.select('Segmentation','label','features').show(truncate=False)

+------------+-----+--------------------------------------------------------------------+
|Segmentation|label|features                                                            |
+------------+-----+--------------------------------------------------------------------+
|D           |3.0  |(28,[1,8,15,20,25,26,27],[1.0,1.0,1.0,1.0,22.0,1.0,4.0])            |
|A           |0.0  |(28,[0,2,5,13,20,25,26,27],[1.0,1.0,1.0,1.0,1.0,38.0,1.0,3.0])      |
|B           |1.0  |(28,[0,2,5,15,22,25,26,27],[1.0,1.0,1.0,1.0,1.0,67.0,1.0,1.0])      |
|B           |1.0  |(28,[0,1,2,10,14,22,25,27],[1.0,1.0,1.0,1.0,1.0,1.0,67.0,2.0])      |
|A           |0.0  |(28,[0,2,6,14,22,25,26,27],[1.0,1.0,1.0,1.0,1.0,40.0,1.0,6.0])      |
|C           |2.0  |(28,[0,1,3,13,22,25,27],[1.0,1.0,1.0,1.0,1.0,56.0,2.0])             |
|C           |2.0  |(28,[1,2,8,15,22,25,26,27],[1.0,1.0,1.0,1.0,1.0,32.0,1.0,3.0])      |
|D           |3.0  |(28,[2,8,15,22,25,26,27],[1.0,1.0,1.0,1.0,33.0,1.0,3.0])            |
|D        

In [142]:
train, test = processed_data.randomSplit([0.8,0.2], seed=100)
print("There are %d Training samples and %d Test samples."%(train.count(), test.count()))

There are 6468 Training samples and 1600 Test samples.


## Training Base Models

### Logistic Regression

In [143]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression()
lrModel = lr.fit(train)

predictions = lrModel.transform(test)
predictions.select('label','rawPrediction','probability','prediction').show(truncate=False)

+-----+----------------------------------------------------------------------------------+---------------------------------------------------------------------------------+----------+
|label|rawPrediction                                                                     |probability                                                                      |prediction|
+-----+----------------------------------------------------------------------------------+---------------------------------------------------------------------------------+----------+
|3.0  |[-0.6147911128455169,-0.9064661381698055,-0.7004923629984652,2.2217496140137873]  |[0.0507059809465124,0.037877938527409324,0.04654141776958071,0.8648746627564976] |3.0       |
|3.0  |[0.3342172290181016,-0.8565404943383557,-0.6785431505505968,1.2008664158708506]   |[0.24714962550306213,0.0751312212294851,0.08976844709591769,0.5879507061715351]  |3.0       |
|0.0  |[0.43170108762444726,0.1263663413707123,-0.8079809531668395,0.24991352417

In [144]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
lr_acc = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
lr_f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
lr_precision = evaluator.evaluate(predictions, {evaluator.metricName: "precisionByLabel"})
print('Below are the Test metrics:')
print('Accuracy: ', lr_acc)
print('F1 Score: ', lr_f1)
print('Precision: ', lr_precision)


Below are the Test metrics:
Accuracy:  0.524375
F1 Score:  0.507622020909716
Precision:  0.424507658643326


In [103]:
model_list = []
model_acc = []
model_f1 = []
model_list.append('Logistic Regression')
model_acc.append(f'{lr_acc:.3f}')
model_f1.append(f'{lr_f1:.3f}')

### Random Forest

In [145]:
from pyspark.ml.classification import RandomForestClassifier

rfc = RandomForestClassifier()
rfcModel = rfc.fit(train)
rfc_predictions = rfcModel.transform(test)

rfc_acc = evaluator.evaluate(rfc_predictions, {evaluator.metricName: "accuracy"})
rfc_f1 = evaluator.evaluate(rfc_predictions, {evaluator.metricName: "f1"})
rfc_precision = evaluator.evaluate(rfc_predictions, {evaluator.metricName: "precisionByLabel"})
print('Below are the Test metrics:')
print('Accuracy: ', rfc_acc)
print('F1 Score: ', rfc_f1)
print('Precision: ', rfc_precision)

Below are the Test metrics:
Accuracy:  0.52125
F1 Score:  0.5086240900975929
Precision:  0.4263392857142857


In [105]:
model_list.append('Random Forest Classifier')
model_acc.append(f'{rfc_acc:.3f}')
model_f1.append(f'{rfc_f1:.3f}')

### Naive Bayes

In [106]:
from pyspark.ml.classification import NaiveBayes

nb = NaiveBayes()
nbModel = nb.fit(train)
nb_predictions = nbModel.transform(test)

nb_acc = evaluator.evaluate(nb_predictions, {evaluator.metricName: "accuracy"})
nb_f1 = evaluator.evaluate(nb_predictions, {evaluator.metricName: "f1"})
nb_precision = evaluator.evaluate(nb_predictions, {evaluator.metricName: "precisionByLabel"})
print('Below are the Test metrics:')
print('Accuracy: ', nb_acc)
print('F1 Score: ', nb_f1)
print('Precision: ', nb_precision)

Below are the Test metrics:
Accuracy:  0.46375
F1 Score:  0.4409486093398237
Precision:  0.36363636363636365


In [115]:
model_list.append('Naive Bayes')
model_acc.append(f'{nb_acc:.3f}')
model_f1.append(f'{nb_f1:.3f}')

### Decision Tree Classifier

In [114]:
from pyspark.ml.classification import DecisionTreeClassifier

dtc = DecisionTreeClassifier()
dtcModel = dtc.fit(train)
dtc_predictions = dtcModel.transform(test)

dtc_acc = evaluator.evaluate(dtc_predictions, {evaluator.metricName: "accuracy"})
dtc_f1 = evaluator.evaluate(dtc_predictions, {evaluator.metricName: "f1"})
dtc_precision = evaluator.evaluate(dtc_predictions, {evaluator.metricName: "precisionByLabel"})
print('Below are the Test metrics:')
print('Accuracy: ', dtc_acc)
print('F1 Score: ', dtc_f1)
print('Precision: ', dtc_precision)

Below are the Test metrics:
Accuracy:  0.5
F1 Score:  0.49654084500550116
Precision:  0.4296675191815857


In [116]:
model_list.append('Decision Tree')
model_acc.append(f'{dtc_acc:.3f}')
model_f1.append(f'{dtc_f1:.3f}')

### Comparing Metrics of all models

In [118]:
pd.DataFrame({"Model":model_list,
             "Accuracy": model_acc,
              'F1_Score': model_f1}).head()

Unnamed: 0,Model,Accuracy,F1_Score
0,Logistic Regression,0.524,0.508
1,Random Forest Classifier,0.521,0.509
2,Naive Bayes,0.464,0.441
3,Decision Tree,0.5,0.497


- **Logistic Regression Has the Highest accuracy**
- **Random forest has highest F1 score but a difference of only 0.001**
- **Hence will Choose Logistic Regression for Cross Validation and further tuning**