### Initialize Spark within Jupyter Notebook

In [7]:
import findspark
findspark.init()
findspark.find()

'C:\\spark\\spark-3.5.1-bin-hadoop3'

### Loading libraries and packages

In [8]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.classification import RandomForestClassifier, DecisionTreeClassifier, LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from sklearn.metrics import confusion_matrix
from sklearn.datasets import load_iris
import pandas as pd

## Data Preparation

### Load iris dataset from scikit-learn package

In [9]:
iris = load_iris()
iris

{'data': array([[5.1, 3.5, 1.4, 0.2],
        [4.9, 3. , 1.4, 0.2],
        [4.7, 3.2, 1.3, 0.2],
        [4.6, 3.1, 1.5, 0.2],
        [5. , 3.6, 1.4, 0.2],
        [5.4, 3.9, 1.7, 0.4],
        [4.6, 3.4, 1.4, 0.3],
        [5. , 3.4, 1.5, 0.2],
        [4.4, 2.9, 1.4, 0.2],
        [4.9, 3.1, 1.5, 0.1],
        [5.4, 3.7, 1.5, 0.2],
        [4.8, 3.4, 1.6, 0.2],
        [4.8, 3. , 1.4, 0.1],
        [4.3, 3. , 1.1, 0.1],
        [5.8, 4. , 1.2, 0.2],
        [5.7, 4.4, 1.5, 0.4],
        [5.4, 3.9, 1.3, 0.4],
        [5.1, 3.5, 1.4, 0.3],
        [5.7, 3.8, 1.7, 0.3],
        [5.1, 3.8, 1.5, 0.3],
        [5.4, 3.4, 1.7, 0.2],
        [5.1, 3.7, 1.5, 0.4],
        [4.6, 3.6, 1. , 0.2],
        [5.1, 3.3, 1.7, 0.5],
        [4.8, 3.4, 1.9, 0.2],
        [5. , 3. , 1.6, 0.2],
        [5. , 3.4, 1.6, 0.4],
        [5.2, 3.5, 1.5, 0.2],
        [5.2, 3.4, 1.4, 0.2],
        [4.7, 3.2, 1.6, 0.2],
        [4.8, 3.1, 1.6, 0.2],
        [5.4, 3.4, 1.5, 0.4],
        [5.2, 4.1, 1.5, 0.1],
  

In [10]:
type(iris)

sklearn.utils._bunch.Bunch

### Convert into pandas dataframe

In [11]:
# Convert iris data into dataframe
df = pd.DataFrame(iris.data, columns=iris.feature_names)
df['label'] = pd.Series(iris.target)
df.head()

Unnamed: 0,sepal length (cm),sepal width (cm),petal length (cm),petal width (cm),label
0,5.1,3.5,1.4,0.2,0
1,4.9,3.0,1.4,0.2,0
2,4.7,3.2,1.3,0.2,0
3,4.6,3.1,1.5,0.2,0
4,5.0,3.6,1.4,0.2,0


In [12]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 150 entries, 0 to 149
Data columns (total 5 columns):
 #   Column             Non-Null Count  Dtype  
---  ------             --------------  -----  
 0   sepal length (cm)  150 non-null    float64
 1   sepal width (cm)   150 non-null    float64
 2   petal length (cm)  150 non-null    float64
 3   petal width (cm)   150 non-null    float64
 4   label              150 non-null    int32  
dtypes: float64(4), int32(1)
memory usage: 5.4 KB


### Create Spark session

In [13]:
# Create SparkSession 
spark = SparkSession.builder.appName("Classification_Iris_Dataset").getOrCreate()

### Transform from pandas dataframe into spark dataframe

In [14]:
df_iris = spark.createDataFrame(df)
df_iris.printSchema()

root
 |-- sepal length (cm): double (nullable = true)
 |-- sepal width (cm): double (nullable = true)
 |-- petal length (cm): double (nullable = true)
 |-- petal width (cm): double (nullable = true)
 |-- label: long (nullable = true)



In [15]:
df_iris.show(5)

+-----------------+----------------+-----------------+----------------+-----+
|sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|label|
+-----------------+----------------+-----------------+----------------+-----+
|              5.1|             3.5|              1.4|             0.2|    0|
|              4.9|             3.0|              1.4|             0.2|    0|
|              4.7|             3.2|              1.3|             0.2|    0|
|              4.6|             3.1|              1.5|             0.2|    0|
|              5.0|             3.6|              1.4|             0.2|    0|
+-----------------+----------------+-----------------+----------------+-----+
only showing top 5 rows



### Feature transformer with Vector Assembler

-  VectorAssembler is a feature transformer in Apache Spark MLlib (Machine Learning library) that is used to assemble multiple columns of features into a single vector column.
-  It's a fundamental tool in preprocessing data for machine learning tasks within Spark, particularly when dealing with datasets that have multiple feature columns.
-  It is useful especially to train logistic regression and decision tree model.


In [16]:
features = iris.feature_names

va = VectorAssembler(inputCols = features, outputCol='features')

va_df = va.transform(df_iris)
va_df = va_df.select(['features', 'label'])
print(va_df.show(5))

+-----------------+-----+
|         features|label|
+-----------------+-----+
|[5.1,3.5,1.4,0.2]|    0|
|[4.9,3.0,1.4,0.2]|    0|
|[4.7,3.2,1.3,0.2]|    0|
|[4.6,3.1,1.5,0.2]|    0|
|[5.0,3.6,1.4,0.2]|    0|
+-----------------+-----+
only showing top 5 rows

None


### Split the data into training and testing dataset

In [17]:
# Split data into training and testing data
(train, test) = va_df.randomSplit([0.7, 0.3], seed=1)

# Random Forest Classification Model

In [18]:
# Define random forest classifier model using RandomForestClassifier()
rfc = RandomForestClassifier(featuresCol="features", labelCol="label")

# Hyperparameter tuning
# Setting up a grid search with cross-validation to tune hyperparameters for a RandomForestClassifier in Apache Spark's MLlib
paramGrid_rfc = ParamGridBuilder()\
    .addGrid(rfc.numTrees, [10, 20, 30]) \
    .addGrid(rfc.maxDepth, [5, 10, 15])\
    .addGrid(rfc.impurity, ['gini', 'entropy'])\
    .build()

cv_rfc = CrossValidator(estimator=rfc,
                    estimatorParamMaps=paramGrid_rfc,
                    evaluator=MulticlassClassificationEvaluator(labelCol="label"),
                    numFolds=5)

In [19]:
# Fit training dataset
model_rfc = cv_rfc.fit(train)

# Retrieves the best performing RandomForestClassifier model based on the evaluation metric specified (MulticlassClassificationEvaluator).
bestModel_rfc = model_rfc.bestModel

# Extract the best parameters resulted in the highest evaluation metric
bestParams_rfc = bestModel_rfc.extractParamMap()
print("Best Parameters:")
for param, value in bestParams_rfc.items():
    print(f"{param.name}: {value}")

Best Parameters:
bootstrap: True
cacheNodeIds: False
checkpointInterval: 10
featureSubsetStrategy: auto
featuresCol: features
impurity: gini
labelCol: label
leafCol: 
maxBins: 32
maxDepth: 5
maxMemoryInMB: 256
minInfoGain: 0.0
minInstancesPerNode: 1
minWeightFractionPerNode: 0.0
numTrees: 10
predictionCol: prediction
probabilityCol: probability
rawPredictionCol: rawPrediction
seed: 562900841175604191
subsamplingRate: 1.0


In [20]:
# Predict test data based on the best RFC model
pred_rfc = bestModel_rfc.transform(test)
print("Prediction Table")
pred_rfc.show(3)

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

# Evaluation Metrics
acc_rfc = evaluator.evaluate(pred_rfc)
precision_rfc = evaluator.evaluate(pred_rfc, {evaluator.metricName: "weightedPrecision"})
recall_rfc = evaluator.evaluate(pred_rfc, {evaluator.metricName: "weightedRecall"})
f1_rfc = evaluator.evaluate(pred_rfc, {evaluator.metricName: "f1"})
 
print("Accuracy (Random Forest Model): ", acc_rfc)
print("Precision (Random Forest Model): ", precision_rfc)
print("Recall (Random Forest Model): ", recall_rfc)
print("F1 (Random Forest Model): ", f1_rfc)

# Confusion Matrix
y_pred_rfc = pred_rfc.select("prediction").collect()
y_orig_rfc = pred_rfc.select("label").collect()

cm_rfc = confusion_matrix(y_orig_rfc, y_pred_rfc)
print("\nConfusion Matrix (Random Forest Model):")
print(cm_rfc)

Prediction Table
+-----------------+-----+--------------+-------------+----------+
|         features|label| rawPrediction|  probability|prediction|
+-----------------+-----+--------------+-------------+----------+
|[4.9,3.0,1.4,0.2]|    0|[10.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|[5.0,3.4,1.5,0.2]|    0|[10.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|[5.1,3.5,1.4,0.3]|    0|[10.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
+-----------------+-----+--------------+-------------+----------+
only showing top 3 rows

Accuracy (Random Forest Model):  0.9351914951615102
Precision (Random Forest Model):  0.9470108695652174
Recall (Random Forest Model):  0.9347826086956522
F1 (Random Forest Model):  0.9351914951615102

Confusion Matrix (Random Forest Model):
[[15  0  0]
 [ 0 15  3]
 [ 0  0 13]]


# Decision Tree Classification Model

In [21]:
# Define decision tree classifier model using DecisionTreeClassifier()
dtc = DecisionTreeClassifier(featuresCol="features", labelCol="label")

# Hyperparameter tuning
# Setting up a grid search with cross-validation to tune hyperparameters for a RandomForestClassifier in Apache Spark's MLlib
paramGrid_dtc = ParamGridBuilder()\
    .addGrid(dtc.minInstancesPerNode, [1, 3, 5]) \
    .addGrid(dtc.maxDepth, [5, 10, 15])\
    .addGrid(dtc.impurity, ['gini', 'entropy'])\
    .build()

cv_dtc = CrossValidator(estimator=dtc,
                    estimatorParamMaps=paramGrid_dtc,
                    evaluator=MulticlassClassificationEvaluator(labelCol="label"),
                    numFolds=5)

In [22]:
# Fit training dataset
model_dtc = cv_dtc.fit(train)

# Retrieves the best performing DecisionTreeClassifier model based on the evaluation metric specified (MulticlassClassificationEvaluator).
bestModel_dtc = model_dtc.bestModel

# Extract the best parameters resulted in the highest evaluation metric
bestParams_dtc = bestModel_dtc.extractParamMap()
print("Best Parameters:")
for param, value in bestParams_dtc.items():
    print(f"{param.name}: {value}")

Best Parameters:
cacheNodeIds: False
checkpointInterval: 10
featuresCol: features
impurity: gini
labelCol: label
leafCol: 
maxBins: 32
maxDepth: 5
maxMemoryInMB: 256
minInfoGain: 0.0
minInstancesPerNode: 3
minWeightFractionPerNode: 0.0
predictionCol: prediction
probabilityCol: probability
rawPredictionCol: rawPrediction
seed: 6874514065473941020


In [23]:
# Predict test data based on the best DTC model
pred_dtc = bestModel_dtc.transform(test)
print("Prediction Table")
pred_dtc.show(3)

evaluator=MulticlassClassificationEvaluator(predictionCol="prediction")

#Evaluation metrics
acc_dtc = evaluator.evaluate(pred_dtc)
precision_dtc = evaluator.evaluate(pred_dtc, {evaluator.metricName: "weightedPrecision"})
recall_dtc = evaluator.evaluate(pred_dtc, {evaluator.metricName: "weightedRecall"})
f1_dtc = evaluator.evaluate(pred_dtc, {evaluator.metricName: "f1"})
 
print("Accuracy (Decision Tree Classifier Model): ", acc_dtc)
print("Precision (Decision Tree Classifier Model): ", precision_dtc)
print("Recall (Decision Tree Classifier Model): ", recall_dtc)
print("F1 (Decision Tree Classifier Model): ", f1_dtc)

y_pred_dtc = pred_dtc.select("prediction").collect()
y_orig_dtc = pred_dtc.select("label").collect()

# Confusion matrix
cm_dtc = confusion_matrix(y_orig_dtc, y_pred_dtc)
print("\nConfusion Matrix (Decision Tree Model):")
print(cm_dtc)

Prediction Table
+-----------------+-----+--------------+-------------+----------+
|         features|label| rawPrediction|  probability|prediction|
+-----------------+-----+--------------+-------------+----------+
|[4.9,3.0,1.4,0.2]|    0|[35.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|[5.0,3.4,1.5,0.2]|    0|[35.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|[5.1,3.5,1.4,0.3]|    0|[35.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
+-----------------+-----+--------------+-------------+----------+
only showing top 3 rows

Accuracy (Decision Tree Classifier Model):  0.9134057971014493
Precision (Decision Tree Classifier Model):  0.9335038363171356
Recall (Decision Tree Classifier Model):  0.9130434782608695
F1 (Decision Tree Classifier Model):  0.9134057971014493

Confusion Matrix (Decision Tree Model):
[[15  0  0]
 [ 0 14  4]
 [ 0  0 13]]


# Logistic Regression Model

In [24]:
# Define logistic regression model using LogisticRegression()
logr = LogisticRegression(featuresCol="features", labelCol="label")

# Hyperparameter tuning
# Setting up a grid search with cross-validation to tune hyperparameters for a RandomForestClassifier in Apache Spark's MLlib
paramGrid_logr = ParamGridBuilder()\
    .addGrid(logr.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(logr.elasticNetParam, [0.0, 0.5, 1.0])\
    .build()

cv_logr = CrossValidator(estimator=logr,
                    estimatorParamMaps=paramGrid_logr,
                    evaluator=MulticlassClassificationEvaluator(labelCol="label"),
                    numFolds=5)

In [25]:
# Fit training dataset
model_logr = cv_logr.fit(train)

# Retrieves the best performing LogisticRegression model based on the evaluation metric specified (MulticlassClassificationEvaluator).
bestModel_logr = model_logr.bestModel

# Extract the best parameters resulted in the highest evaluation metric
bestParams_logr = bestModel_logr.extractParamMap()
print("Best Parameters:")
for param, value in bestParams_logr.items():
    print(f"{param.name}: {value}")

Best Parameters:
aggregationDepth: 2
elasticNetParam: 0.0
family: auto
featuresCol: features
fitIntercept: True
labelCol: label
maxBlockSizeInMB: 0.0
maxIter: 100
predictionCol: prediction
probabilityCol: probability
rawPredictionCol: rawPrediction
regParam: 0.01
standardization: True
threshold: 0.5
tol: 1e-06


In [26]:
# Predict test data based on the best logr model
pred_logr = bestModel_logr.transform(test)
print("Prediction Table")
pred_logr.show(3)

evaluator=MulticlassClassificationEvaluator(predictionCol="prediction")

# Evaluate accuracy
acc_logr = evaluator.evaluate(pred_logr)
precision_logr = evaluator.evaluate(pred_logr, {evaluator.metricName: "weightedPrecision"})
recall_logr = evaluator.evaluate(pred_logr, {evaluator.metricName: "weightedRecall"})
f1_logr = evaluator.evaluate(pred_logr, {evaluator.metricName: "f1"})
 
print("Accuracy (Logistic Regression Model): ", acc_logr)
print("Precision (Logistic Regression Model): ", precision_logr)
print("Recall (Logistic Regression Model): ", recall_logr)
print("F1 (Logistic Regression Model): ", f1_logr)
 
y_pred_logr = pred_logr.select("prediction").collect()
y_orig_logr = pred_logr.select("label").collect()

#Confusion matrix
cm_logr = confusion_matrix(y_orig_logr, y_pred_logr)
print("\nConfusion Matrix (Logistic Regression Model):")
print(cm_logr)

Prediction Table
+-----------------+-----+--------------------+--------------------+----------+
|         features|label|       rawPrediction|         probability|prediction|
+-----------------+-----+--------------------+--------------------+----------+
|[4.9,3.0,1.4,0.2]|    0|[4.78851188280944...|[0.89712837540888...|       0.0|
|[5.0,3.4,1.5,0.2]|    0|[5.70800181821203...|[0.97053978487086...|       0.0|
|[5.1,3.5,1.4,0.3]|    0|[5.76905283413695...|[0.97581185673399...|       0.0|
+-----------------+-----+--------------------+--------------------+----------+
only showing top 3 rows

Accuracy (Logistic Regression Model):  0.956795761782974
Precision (Logistic Regression Model):  0.9623188405797102
Recall (Logistic Regression Model):  0.9565217391304347
F1 (Logistic Regression Model):  0.956795761782974

Confusion Matrix (Logistic Regression Model):
[[15  0  0]
 [ 0 16  2]
 [ 0  0 13]]


# Results and Discussion

### **Metric score of the three models**

- **Accuracy**

Accuracy represents the proportion of correctly classified instances out of all instances.

    1.  Random Forest Model:  0.9134057971014493
    2.  Decision Tree Classifier Model:  0.9134057971014493
    3.  Logistic Regression Model:  0.956795761782974

-  **Precision**

Precision measures the proportion of true positive predictions (correctly predicted instances of a class) out of all positive predictions made by the model. A high precision score (close to 1.0) indicates that when the model predicts a class, it is usually correct. 

    1.  Random Forest Model:  0.9335038363171356
    2.  Decision Tree Classifier Model:  0.9335038363171356
    3.  Logistic Regression Model:  0.9623188405797102

-  **Recall(Sensitivity)**

Recall measures the proportion of true positive predictions out of all actual positive instances in the dataset. It indicates how well the model is able to identify instances of a particular class. A high recall score (close to 1.0) means that the model is able to correctly identify most of the positive instances.

    1.  Random Forest Model:  0.9130434782608695
    2.  Decision Tree Classifier Model:  0.9130434782608695
    3.  Logistic Regression Model:  0.9565217391304347

- **F1 Score**

The F1 score is the harmonic mean of precision and recall. It provides a single metric that balances both precision and recall. A higher F1 score (closer to 1.0) indicates a better overall performance of the model.

    1.  Random Forest Model:  0.9134057971014493
    2.  Decision Tree Classifier Model:  0.9134057971014493
    3.  Logistic Regression Model:  0.956795761782974

###  **Overall Evaluation**

**Logistic Regression Model**: Overall, the Logistic Regression model demonstrates the highest performance across all metrics (accuracy, precision, recall, and F1 score). It achieves better classification accuracy and higher precision/recall balances compared to the tree-based models.

**Random Forest and Decision Tree Classifier Models**: Both tree-based models perform similarly across most metrics, indicating comparable performance in terms of classification accuracy and precision/recall balance. They are effective models but do not outperform the Logistic Regression model in this particular evaluation.
Several posibilities that could lead these two models have the same evaluation metrics:-

-  Data Complexity: If the dataset is relatively simple or has features that are well-separated and easy to classify, both models may perform similarly. In such cases, their decision boundaries might not differ significantly, resulting in comparable evaluation metrics.
-  Complexity Control: A Decision Tree model can potentially overfit the training data if not pruned properly, while a Random Forest tends to reduce overfitting by averaging multiple trees. If the Decision Tree model is pruned appropriately to control overfitting, its performance might align more closely with that of the Random Forest.
-  Impact of Dataset Size: In smaller datasets, the variability in performance between different models might be smaller, leading to closer evaluation metrics across different models, including Random Forest and Decision Tree.

### **Confusion Matrix**

A confusion matrix is a table that is used to evaluate the performance of a classification model. It presents a summary of the predictions made by the model compared to the actual true values of the target variable (or labels) in a tabular form. Each row of the matrix represents the instances in an actual class, while each column represents the instances in a predicted class.

**Logistic Regression Model**: 

-  The model correctly classified all 15 instances of the first class.
-  The model correctly classified 16 out of 18 instances of the second class, but misclassified 2 instances as the third class.
-  The model correctly classified all 13 instances of the third class.
-  The overall accuracy of the model is slightly higher than the other two models, with 44 out of 46 instances correctly classified.

**Random Forest and Decision Tree Classifier Models**:

- The model correctly classified all 15 instances of the first class.
- The model correctly classified 14 out of 18 instances of the second class, but misclassified 4 instances as the first class.
- The model correctly classified all 13 instances of the third class.
- The overall accuracy of the model is high, with 42 out of 46 instances correctly classified.

### Stop spark session

In [27]:
spark.stop()