<a href="https://colab.research.google.com/github/azrazainol/STQD6324_Assignment_03/blob/main/Assignment3_P137262.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=ca8ee148c8990edf801c83590d1feab539689dd4a2696e2b9bfc788d15d18392
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier, DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
import pandas as pd
from sklearn.datasets import load_iris
from sklearn.metrics import confusion_matrix

# --- Data preparation --------------------------------------------------------
# Reuse the active Spark session if there is one, otherwise start a new one.
spark = (
    SparkSession.builder
    .appName("IrisClassification")
    .getOrCreate()
)

# Iris from scikit-learn as a pandas frame: one column per measurement
# (named by iris.feature_names) plus the integer class target as "label".
iris = load_iris()
iris_df = pd.DataFrame(iris.data, columns=iris.feature_names).assign(label=iris.target)

# Spark ML estimators read a single vector column, so pack the measurement
# columns into "features"; the class column is re-encoded as "indexedLabel".
assembler = VectorAssembler(inputCols=iris.feature_names, outputCol="features")
indexer = StringIndexer(inputCol="label", outputCol="indexedLabel")

df = assembler.transform(spark.createDataFrame(iris_df))
df = indexer.fit(df).transform(df)

# Reproducible 70/30 train/test split (fixed seed).
train_data, test_data = df.randomSplit([0.7, 0.3], seed=42)

# --- Random Forest -----------------------------------------------------------
# Tune numTrees x maxDepth (5 x 5 candidates) with 5-fold cross-validation on
# the training split only; the held-out test split is used once, further below.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="features", seed=42)

# Single-stage pipeline, so the fitted forest is reachable as bestModel.stages[-1].
pipeline_rf = Pipeline(stages=[rf])

paramGrid_rf = (ParamGridBuilder()
                .addGrid(rf.numTrees, [10, 15, 20, 25, 30])  # Number of trees in the forest
                .addGrid(rf.maxDepth, [5, 7, 10, 12, 15])   # Maximum depth of each tree
                .build())

# Candidates are ranked by mean CV accuracy; the seed fixes the fold assignment.
crossval_rf = CrossValidator(estimator=pipeline_rf,
                             estimatorParamMaps=paramGrid_rf,
                             evaluator=MulticlassClassificationEvaluator(labelCol="indexedLabel", metricName="accuracy"),
                             numFolds=5, seed=42)

cvModel_rf = crossval_rf.fit(train_data)

# Report only the hyperparameters that were actually searched (derived from the
# grid so this stays correct if the grid changes) and the winning CV score.
# Printing the full extractParamMap() dumps every default param and buries the result.
best_rf_model = cvModel_rf.bestModel.stages[-1]
tuned_names_rf = {param.name for param_map in paramGrid_rf for param in param_map}
best_rf_params = {param.name: value
                  for param, value in best_rf_model.extractParamMap().items()
                  if param.name in tuned_names_rf}
print("Random Forest - Best Model Parameters:")
print(best_rf_params)
print(f"Best mean CV accuracy: {max(cvModel_rf.avgMetrics)}")
print()

# Score the held-out test split.
predictions_rf = cvModel_rf.transform(test_data)

# One evaluator; the metric is overridden per call instead of mutated with
# setMetricName, so the evaluator's state does not depend on call order.
evaluator_rf = MulticlassClassificationEvaluator(labelCol="indexedLabel")
metrics_rf = {name: evaluator_rf.evaluate(predictions_rf, {evaluator_rf.metricName: name})
              for name in ("accuracy", "weightedPrecision", "weightedRecall", "f1")}
accuracy_rf = metrics_rf["accuracy"]
precision_rf = metrics_rf["weightedPrecision"]
recall_rf = metrics_rf["weightedRecall"]
f1_score_rf = metrics_rf["f1"]  # Spark's "f1" is the class-weighted F1

print("Random Forest Metrics:")
print(f"Accuracy: {accuracy_rf}")
print(f"Precision: {precision_rf}")
print(f"Recall: {recall_rf}")
print(f"F1 Score: {f1_score_rf}")
print()

# Collect label and prediction together in ONE job so the two columns are
# guaranteed to be row-aligned. Two separate select(...).toPandas() calls
# recompute the predictions twice and silently rely on Spark returning rows in
# the same order both times.
labels_preds_rf = predictions_rf.select("indexedLabel", "prediction").toPandas()
y_true_rf = labels_preds_rf["indexedLabel"]
y_pred_rf = labels_preds_rf["prediction"]

# Rows = true class, columns = predicted class, in sorted label order.
cm_rf = confusion_matrix(y_true_rf, y_pred_rf)
print("Confusion Matrix (Random Forest):\n", cm_rf)
print()

# --- Decision Tree -----------------------------------------------------------
# Tune maxDepth (5 candidates) with 5-fold cross-validation on the training
# split only; the held-out test split is used once, further below.
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="features", seed=42)

# Single-stage pipeline, so the fitted tree is reachable as bestModel.stages[-1].
pipeline_dt = Pipeline(stages=[dt])

paramGrid_dt = (ParamGridBuilder()
                .addGrid(dt.maxDepth, [5, 7, 10, 12, 15])  # Maximum depth of the tree
                .build())

# Candidates are ranked by mean CV accuracy; the seed fixes the fold assignment.
crossval_dt = CrossValidator(estimator=pipeline_dt,
                             estimatorParamMaps=paramGrid_dt,
                             evaluator=MulticlassClassificationEvaluator(labelCol="indexedLabel", metricName="accuracy"),
                             numFolds=5, seed=42)

cvModel_dt = crossval_dt.fit(train_data)

# Report only the hyperparameters that were actually searched (derived from the
# grid so this stays correct if the grid changes) and the winning CV score.
# Printing the full extractParamMap() dumps every default param and buries the result.
best_dt_model = cvModel_dt.bestModel.stages[-1]
tuned_names_dt = {param.name for param_map in paramGrid_dt for param in param_map}
best_dt_params = {param.name: value
                  for param, value in best_dt_model.extractParamMap().items()
                  if param.name in tuned_names_dt}
print("Decision Tree - Best Model Parameters:")
print(best_dt_params)
print(f"Best mean CV accuracy: {max(cvModel_dt.avgMetrics)}")
print()

# Score the held-out test split.
predictions_dt = cvModel_dt.transform(test_data)

# One evaluator; the metric is overridden per call instead of mutated with
# setMetricName, so the evaluator's state does not depend on call order.
evaluator_dt = MulticlassClassificationEvaluator(labelCol="indexedLabel")
metrics_dt = {name: evaluator_dt.evaluate(predictions_dt, {evaluator_dt.metricName: name})
              for name in ("accuracy", "weightedPrecision", "weightedRecall", "f1")}
accuracy_dt = metrics_dt["accuracy"]
precision_dt = metrics_dt["weightedPrecision"]
recall_dt = metrics_dt["weightedRecall"]
f1_score_dt = metrics_dt["f1"]  # Spark's "f1" is the class-weighted F1

print("Decision Tree Metrics:")
print(f"Accuracy: {accuracy_dt}")
print(f"Precision: {precision_dt}")
print(f"Recall: {recall_dt}")
print(f"F1 Score: {f1_score_dt}")
print()

# Collect label and prediction together in ONE job so the two columns are
# guaranteed to be row-aligned. Two separate select(...).toPandas() calls
# recompute the predictions twice and silently rely on Spark returning rows in
# the same order both times.
labels_preds_dt = predictions_dt.select("indexedLabel", "prediction").toPandas()
y_true_dt = labels_preds_dt["indexedLabel"]
y_pred_dt = labels_preds_dt["prediction"]

# Rows = true class, columns = predicted class, in sorted label order.
cm_dt = confusion_matrix(y_true_dt, y_pred_dt)
print("Confusion Matrix (Decision Tree):\n", cm_dt)
print()

# Release the Spark session; later cells start a fresh one via getOrCreate().
spark.stop()


Random Forest - Best Model Parameters:
{Param(parent='RandomForestClassifier_45ceb34aa731', name='bootstrap', doc='Whether bootstrap samples are used when building trees.'): True, Param(parent='RandomForestClassifier_45ceb34aa731', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'): False, Param(parent='RandomForestClassifier_45ceb34aa731', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'): 10, Param(parent='RandomForestClassifier_45ceb34aa731', name='featureSubsetStrategy', doc="The number of features to conside

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
import pandas as pd
from sklearn.metrics import confusion_matrix
from sklearn.datasets import load_iris

# --- Data preparation --------------------------------------------------------
# Reuse the active Spark session if there is one, otherwise start a new one.
spark = (
    SparkSession.builder
    .appName("IrisClassification")
    .getOrCreate()
)

# Iris from scikit-learn as a pandas frame: one column per measurement
# (named by iris.feature_names) plus the integer class target as "label".
iris = load_iris()
iris_df = pd.DataFrame(iris.data, columns=iris.feature_names).assign(label=iris.target)

# Spark ML estimators read a single vector column, so pack the measurement
# columns into "features"; the class column is re-encoded as "indexedLabel".
assembler = VectorAssembler(inputCols=iris.feature_names, outputCol="features")
indexer = StringIndexer(inputCol="label", outputCol="indexedLabel")

df = assembler.transform(spark.createDataFrame(iris_df))
df = indexer.fit(df).transform(df)

# Reproducible 70/30 train/test split (fixed seed).
train_data, test_data = df.randomSplit([0.7, 0.3], seed=42)

# --- Random Forest -----------------------------------------------------------
# Tune numTrees x maxDepth (5 x 5 candidates) with 5-fold cross-validation on
# the training split only; the held-out test split is used once, further below.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="features", seed=42)

# Single-stage pipeline, so the fitted forest is reachable as bestModel.stages[-1].
pipeline_rf = Pipeline(stages=[rf])

paramGrid_rf = (ParamGridBuilder()
                .addGrid(rf.numTrees, [10, 15, 20, 25, 30])  # Number of trees in the forest
                .addGrid(rf.maxDepth, [5, 7, 10, 12, 15])   # Maximum depth of each tree
                .build())

# Candidates are ranked by mean CV accuracy; the seed fixes the fold assignment.
crossval_rf = CrossValidator(estimator=pipeline_rf,
                             estimatorParamMaps=paramGrid_rf,
                             evaluator=MulticlassClassificationEvaluator(labelCol="indexedLabel", metricName="accuracy"),
                             numFolds=5, seed=42)

cvModel_rf = crossval_rf.fit(train_data)

# Score the held-out test split.
predictions_rf = cvModel_rf.transform(test_data)

# One evaluator; the metric is overridden per call instead of mutated with
# setMetricName, so the evaluator's state does not depend on call order.
evaluator_rf = MulticlassClassificationEvaluator(labelCol="indexedLabel")
metrics_rf = {name: evaluator_rf.evaluate(predictions_rf, {evaluator_rf.metricName: name})
              for name in ("accuracy", "weightedPrecision", "weightedRecall", "f1")}
accuracy_rf = metrics_rf["accuracy"]
precision_rf = metrics_rf["weightedPrecision"]
recall_rf = metrics_rf["weightedRecall"]
f1_score_rf = metrics_rf["f1"]  # Spark's "f1" is the class-weighted F1

print("Random Forest Metrics:")
print(f"Accuracy: {accuracy_rf}")
print(f"Precision: {precision_rf}")
print(f"Recall: {recall_rf}")
print(f"F1 Score: {f1_score_rf}")
print()

# Collect label and prediction together in ONE job so the two columns are
# guaranteed to be row-aligned. Two separate select(...).toPandas() calls
# recompute the predictions twice and silently rely on Spark returning rows in
# the same order both times.
labels_preds_rf = predictions_rf.select("indexedLabel", "prediction").toPandas()
y_true_rf = labels_preds_rf["indexedLabel"]
y_pred_rf = labels_preds_rf["prediction"]

# Rows = true class, columns = predicted class, in sorted label order.
cm_rf = confusion_matrix(y_true_rf, y_pred_rf)
print("Confusion Matrix (Random Forest):\n", cm_rf)
print()

# Release the Spark session; later cells start a fresh one via getOrCreate().
spark.stop()


Random Forest Metrics:
Accuracy: 0.9821428571428571
Precision: 0.9835164835164836
Recall: 0.9821428571428572
F1 Score: 0.9822586872586874

Confusion Matrix (Random Forest):
 [[25  0  0]
 [ 0 12  0]
 [ 0  1 18]]



In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
import pandas as pd
from sklearn.metrics import confusion_matrix
from sklearn.datasets import load_iris

# --- Data preparation --------------------------------------------------------
# Reuse the active Spark session if there is one, otherwise start a new one.
spark = (
    SparkSession.builder
    .appName("IrisClassification")
    .getOrCreate()
)

# Iris from scikit-learn as a pandas frame: one column per measurement
# (named by iris.feature_names) plus the integer class target as "label".
iris = load_iris()
iris_df = pd.DataFrame(iris.data, columns=iris.feature_names).assign(label=iris.target)

# Spark ML estimators read a single vector column, so pack the measurement
# columns into "features"; the class column is re-encoded as "indexedLabel".
assembler = VectorAssembler(inputCols=iris.feature_names, outputCol="features")
indexer = StringIndexer(inputCol="label", outputCol="indexedLabel")

df = assembler.transform(spark.createDataFrame(iris_df))
df = indexer.fit(df).transform(df)

# Reproducible 70/30 train/test split (fixed seed).
train_data, test_data = df.randomSplit([0.7, 0.3], seed=42)

# --- Decision Tree -----------------------------------------------------------
# Tune maxDepth (5 candidates) with 5-fold cross-validation on the training
# split only; the held-out test split is used once, further below.
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="features", seed=42)

# Single-stage pipeline, so the fitted tree is reachable as bestModel.stages[-1].
pipeline_dt = Pipeline(stages=[dt])

paramGrid_dt = (ParamGridBuilder()
                .addGrid(dt.maxDepth, [5, 7, 10, 12, 15])  # Maximum depth of the tree
                .build())

# Candidates are ranked by mean CV accuracy; the seed fixes the fold assignment.
crossval_dt = CrossValidator(estimator=pipeline_dt,
                             estimatorParamMaps=paramGrid_dt,
                             evaluator=MulticlassClassificationEvaluator(labelCol="indexedLabel", metricName="accuracy"),
                             numFolds=5, seed=42)

cvModel_dt = crossval_dt.fit(train_data)

# Score the held-out test split.
predictions_dt = cvModel_dt.transform(test_data)

# One evaluator; the metric is overridden per call instead of mutated with
# setMetricName, so the evaluator's state does not depend on call order.
evaluator_dt = MulticlassClassificationEvaluator(labelCol="indexedLabel")
metrics_dt = {name: evaluator_dt.evaluate(predictions_dt, {evaluator_dt.metricName: name})
              for name in ("accuracy", "weightedPrecision", "weightedRecall", "f1")}
accuracy_dt = metrics_dt["accuracy"]
precision_dt = metrics_dt["weightedPrecision"]
recall_dt = metrics_dt["weightedRecall"]
f1_score_dt = metrics_dt["f1"]  # Spark's "f1" is the class-weighted F1

print("Decision Tree Metrics:")
print(f"Accuracy: {accuracy_dt}")
print(f"Precision: {precision_dt}")
print(f"Recall: {recall_dt}")
print(f"F1 Score: {f1_score_dt}")
print()

# Collect label and prediction together in ONE job so the two columns are
# guaranteed to be row-aligned. Two separate select(...).toPandas() calls
# recompute the predictions twice and silently rely on Spark returning rows in
# the same order both times.
labels_preds_dt = predictions_dt.select("indexedLabel", "prediction").toPandas()
y_true_dt = labels_preds_dt["indexedLabel"]
y_pred_dt = labels_preds_dt["prediction"]

# Rows = true class, columns = predicted class, in sorted label order.
cm_dt = confusion_matrix(y_true_dt, y_pred_dt)
print("Confusion Matrix (Decision Tree):\n", cm_dt)
print()

# Release the Spark session.
spark.stop()


Decision Tree Metrics:
Accuracy: 0.9821428571428571
Precision: 0.9835164835164836
Recall: 0.9821428571428572
F1 Score: 0.9822586872586874

Confusion Matrix (Decision Tree):
 [[25  0  0]
 [ 0 12  0]
 [ 0  1 18]]

