<h2 align='center'>Solving a classification use-case in Pyspark </h2>

### Imports and Data Loading

In [107]:
import pandas as pd 
import numpy as np

In [108]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Create the notebook's Spark session, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("PysparkML")
    .getOrCreate()
)
spark

In [151]:
import pyspark.ml                 
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import DoubleType, IntegerType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [110]:
# Load the delimited text file: the first line holds the column names and the
# column types are inferred from the data.
df_pyspark = spark.read.csv("UCIData.txt", header=True, inferSchema=True)
df_pyspark.show(n=3)

+---+-------------+---+---+----+----+----+----+---+-----+----+-----+----+----+
|_c0|     Category|Age|Sex| ALB| ALP| ALT| AST|BIL|  CHE|CHOL| CREA| GGT|PROT|
+---+-------------+---+---+----+----+----+----+---+-----+----+-----+----+----+
|  1|0=Blood Donor| 32|  m|38.5|52.5| 7.7|22.1|7.5| 6.93|3.23|106.0|12.1|  69|
|  2|0=Blood Donor| 32|  m|38.5|70.3|  18|24.7|3.9|11.17| 4.8| 74.0|15.6|76.5|
|  3|0=Blood Donor| 32|  m|46.9|74.7|36.2|52.6|6.1| 8.84| 5.2| 86.0|33.2|79.3|
+---+-------------+---+---+----+----+----+----+---+-----+----+-----+----+----+
only showing top 3 rows



In [111]:
# List the column names of the loaded DataFrame.
print(df_pyspark.columns)

['_c0', 'Category', 'Age', 'Sex', 'ALB', 'ALP', 'ALT', 'AST', 'BIL', 'CHE', 'CHOL', 'CREA', 'GGT', 'PROT']


### Feature Engineering 

In [112]:
## Rearranging columns: the twelve predictor columns come first and the
## 'Category' target goes last; the '_c0' column is not selected.
df_pyspark = df_pyspark.select(
    [
        'Age', 'Sex', 'ALB', 'ALP', 'ALT', 'AST',
        'BIL', 'CHE', 'CHOL', 'CREA', 'GGT', 'PROT',
        'Category',
    ]
)

In [113]:
# Preview the first four rows of the reordered frame.
df_pyspark.show(n=4)

+---+---+----+----+----+----+----+-----+----+-----+----+----+-------------+
|Age|Sex| ALB| ALP| ALT| AST| BIL|  CHE|CHOL| CREA| GGT|PROT|     Category|
+---+---+----+----+----+----+----+-----+----+-----+----+----+-------------+
| 32|  m|38.5|52.5| 7.7|22.1| 7.5| 6.93|3.23|106.0|12.1|  69|0=Blood Donor|
| 32|  m|38.5|70.3|  18|24.7| 3.9|11.17| 4.8| 74.0|15.6|76.5|0=Blood Donor|
| 32|  m|46.9|74.7|36.2|52.6| 6.1| 8.84| 5.2| 86.0|33.2|79.3|0=Blood Donor|
| 32|  m|43.2|  52|30.6|22.6|18.9| 7.33|4.74| 80.0|33.8|75.7|0=Blood Donor|
+---+---+----+----+----+----+----+-----+----+-----+----+----+-------------+
only showing top 4 rows



In [114]:
### Indexing Sex: each label (m, f) is replaced by a numeric index in column 'sex'
# StringIndexer gives index 0.0 to the most frequent label by default.
# NOTE(review): the output column 'sex' differs from the input column 'Sex'
# only by case; the resulting frame shows a single 'sex' column, which
# presumably relies on Spark's default case-insensitive column resolution --
# confirm before enabling spark.sql.caseSensitive.
gender_encoder = StringIndexer(
    inputCol='Sex',
    outputCol='sex',
).fit(df_pyspark)
df_pyspark = gender_encoder.transform(df_pyspark)

In [115]:
### Label-encoding Category: each class label becomes a numeric index in column 'category'
# NOTE(review): as with 'Sex'/'sex', the output name differs from the input
# name only by case -- presumably the indexed column replaces the original
# under Spark's default case-insensitive resolution; confirm.
target_encoder = StringIndexer(
    inputCol='Category',
    outputCol='category',
).fit(df_pyspark)
df_pyspark = target_encoder.transform(df_pyspark)

In [116]:
# Preview the first three rows after both columns were indexed.
df_pyspark.show(n=3)

+---+---+----+----+----+----+---+-----+----+-----+----+----+--------+
|Age|sex| ALB| ALP| ALT| AST|BIL|  CHE|CHOL| CREA| GGT|PROT|category|
+---+---+----+----+----+----+---+-----+----+-----+----+----+--------+
| 32|0.0|38.5|52.5| 7.7|22.1|7.5| 6.93|3.23|106.0|12.1|  69|     0.0|
| 32|0.0|38.5|70.3|  18|24.7|3.9|11.17| 4.8| 74.0|15.6|76.5|     0.0|
| 32|0.0|46.9|74.7|36.2|52.6|6.1| 8.84| 5.2| 86.0|33.2|79.3|     0.0|
+---+---+----+----+----+----+---+-----+----+-----+----+----+--------+
only showing top 3 rows



In [117]:
### Inspecting the schema: data type and nullability of each column (null counts are computed in a later cell)
df_pyspark.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- sex: double (nullable = false)
 |-- ALB: string (nullable = true)
 |-- ALP: string (nullable = true)
 |-- ALT: string (nullable = true)
 |-- AST: double (nullable = true)
 |-- BIL: double (nullable = true)
 |-- CHE: double (nullable = true)
 |-- CHOL: string (nullable = true)
 |-- CREA: double (nullable = true)
 |-- GGT: double (nullable = true)
 |-- PROT: string (nullable = true)
 |-- category: double (nullable = false)



In [119]:
### Converting the string type columns into double
# Read the (name, dtype) pairs from the schema once instead of building a
# throwaway single-column DataFrame per column just to look up its type.
# Casting keeps column names and order, so the pairs taken at loop start stay
# valid while df_pyspark is rebound inside the loop.
# NOTE(review): values that cannot be parsed as numbers presumably become null
# under this cast -- confirm; the null counts reported afterwards suggest so.
for column_name, column_type in df_pyspark.dtypes:
    if column_type == 'string':
        df_pyspark = df_pyspark.withColumn(column_name, df_pyspark[column_name].cast(DoubleType()))

In [120]:
# Count the rows holding a null in each column. isNull is a Column method (a
# DataFrame has no isNull), so the frame is filtered per column and counted.
for column_name in df_pyspark.columns:
    null_rows = df_pyspark.where(df_pyspark[column_name].isNull())
    print("Null values in '{}' column = {}".format(column_name, null_rows.count()))

Null values in 'Age' column = 0
Null values in 'sex' column = 0
Null values in 'ALB' column = 1
Null values in 'ALP' column = 18
Null values in 'ALT' column = 1
Null values in 'AST' column = 0
Null values in 'BIL' column = 0
Null values in 'CHE' column = 0
Null values in 'CHOL' column = 10
Null values in 'CREA' column = 0
Null values in 'GGT' column = 0
Null values in 'PROT' column = 1
Null values in 'category' column = 0


In [121]:
# Replace every null with 0.
# NOTE(review): 0 is presumably not a realistic value for these measurements;
# consider a mean/median imputation instead -- confirm with the data owner.
df_pyspark = df_pyspark.na.fill(0)

In [122]:
# Re-count the nulls per column after the fill step. isNull is a Column
# method, so the frame is filtered per column and the matching rows counted.
for column_name in df_pyspark.columns:
    null_rows = df_pyspark.where(df_pyspark[column_name].isNull())
    print("Null values in '{}' column = {}".format(column_name, null_rows.count()))

Null values in 'Age' column = 0
Null values in 'sex' column = 0
Null values in 'ALB' column = 0
Null values in 'ALP' column = 0
Null values in 'ALT' column = 0
Null values in 'AST' column = 0
Null values in 'BIL' column = 0
Null values in 'CHE' column = 0
Null values in 'CHOL' column = 0
Null values in 'CREA' column = 0
Null values in 'GGT' column = 0
Null values in 'PROT' column = 0
Null values in 'category' column = 0


### Split into train and test

In [128]:
# Predictor columns, in the order they are packed into the feature vector.
required_features = [
    'Age', 'sex', 'ALB', 'ALP', 'ALT', 'AST',
    'BIL', 'CHE', 'CHOL', 'CREA', 'GGT', 'PROT',
]
# Name of the indexed label column.
target_feature = 'category'

In [132]:
### Converting the features into vectors
# Pack the predictor columns into a single vector column, then keep only that
# vector and the label. The label column is taken from target_feature so its
# name is defined in one place instead of being repeated as a literal.
vector_assembler = VectorAssembler(inputCols=required_features, outputCol='feature_vector')
x_and_y = vector_assembler.transform(df_pyspark).select('feature_vector', target_feature)

In [139]:
# Randomly split the rows roughly 80/20 into train and test; the seed is fixed
# so the split is requested the same way on every run.
train_df, test_df = x_and_y.randomSplit(weights=[0.8, 0.2], seed=42)

# Row counts of each split (count() returns the number of rows).
train_rows = train_df.count()
test_rows = test_df.count()
print("Shape of the Training Dataset : ", train_rows)
print("Shape of the Testing Dataset : ", test_rows)

Shape of the Training Dataset :  518
Shape of the Testing Dataset :  97


### Modeling

In [143]:
# Fit a logistic regression on the training split. The label column comes from
# target_feature rather than a repeated literal, so it stays in sync with the
# column selected when the feature vectors were assembled.
lr = LogisticRegression(featuresCol='feature_vector', labelCol=target_feature)
trained_lr_model = lr.fit(train_df)

In [149]:
# Score the held-out rows; the result carries the rawPrediction, probability
# and prediction columns next to the inputs.
y_pred = trained_lr_model.transform(test_df)
y_pred.show(n=10)

+--------------------+--------+--------------------+--------------------+----------+
|      feature_vector|category|       rawPrediction|         probability|prediction|
+--------------------+--------+--------------------+--------------------+----------+
|[25.0,0.0,42.0,38...|     2.0|[-41.628181598119...|[5.65413878652831...|       1.0|
|[30.0,0.0,45.0,0....|     2.0|[0.71096218451604...|[0.00228691751385...|       2.0|
|[32.0,0.0,38.5,70...|     0.0|[16.2569657704463...|[0.99999994032107...|       0.0|
|[32.0,0.0,42.4,86...|     0.0|[15.4168032946208...|[0.99999878039966...|       0.0|
|[32.0,0.0,50.9,65...|     0.0|[17.8169752935213...|[0.99999862762836...|       0.0|
|[32.0,1.0,47.4,52...|     0.0|[19.1857356209346...|[0.99999966436415...|       0.0|
|[33.0,0.0,41.8,65...|     0.0|[11.4505023012611...|[0.99996262490845...|       0.0|
|[33.0,1.0,35.4,53...|     0.0|[13.7571421135942...|[0.99997446573648...|       0.0|
|[33.0,1.0,43.0,29...|     2.0|[11.2752904908514...|[0.9026673350

### Evaluation

In [154]:
# Accuracy on the test predictions. The label column comes from target_feature
# so the evaluator reads the same column the model was trained on.
evaluator = MulticlassClassificationEvaluator(
    predictionCol='prediction',
    labelCol=target_feature,
    metricName='accuracy',
)
accuracy = evaluator.evaluate(y_pred)
print("Accuracy of the model : ", accuracy)

Accuracy of the model :  0.9072164948453608


In [155]:
# F1 score on the test predictions. The label column comes from target_feature
# so the evaluator reads the same column the model was trained on.
evaluator = MulticlassClassificationEvaluator(
    predictionCol='prediction',
    labelCol=target_feature,
    metricName='f1',
)
f1_score = evaluator.evaluate(y_pred)
print("F1-Score of the model : ", f1_score)

F1-Score of the model :  0.9135488010046513


### Saving the model

In [171]:
# pyspark.ml models take only a path when saving; the (sc, path) form belongs
# to the older pyspark.mllib models and raises TypeError on this model.
# write().overwrite() lets the notebook be re-run when the target directory
# already exists (a plain save(path) fails in that case).
trained_lr_model.write().overwrite().save("lr_classification_model")