<a href="https://colab.research.google.com/github/DianaKahar/Data_management3/blob/main/P137263_Assignment3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **STQD6324 DATA MANAGEMENT**

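**INTRODUCTION**

This notebook trains a random forest classifier on the Iris dataset with PySpark: the data is exported from scikit-learn to a CSV file, loaded into Spark, tuned with 3-fold cross-validation over `numTrees` and `maxDepth`, and evaluated on a held-out test split using accuracy, weighted precision, weighted recall, F1 score, and a confusion matrix.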

In [7]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=4f2f804fb5dd2115660536b9fb005cac247e493eda4b206d4c2c35b724052947
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [19]:
import pandas as pd
from sklearn.datasets import load_iris
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [2]:
# Load the Iris dataset from scikit-learn and build a pandas DataFrame with one
# column per feature name plus the target values in a 'label' column.
iris = load_iris()
iris_df = pd.DataFrame(iris.data, columns=iris.feature_names).assign(label=iris.target)

In [5]:
# Write the pandas frame to 'iris.csv' without the row index; Spark reads this file next.
iris_df.to_csv(path_or_buf='iris.csv', index=False)

In [9]:
# Obtain the Spark session for this notebook (an existing session is reused if present).
spark = (
    SparkSession.builder
    .appName("IrisClassification")
    .getOrCreate()
)

In [10]:
# Read the CSV into a Spark DataFrame: the first row supplies the column names
# and column types are inferred from the values. Then preview the rows.
data = spark.read.options(header=True, inferSchema=True).csv('iris.csv')
data.show()

+-----------------+----------------+-----------------+----------------+-----+
|sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|label|
+-----------------+----------------+-----------------+----------------+-----+
|              5.1|             3.5|              1.4|             0.2|    0|
|              4.9|             3.0|              1.4|             0.2|    0|
|              4.7|             3.2|              1.3|             0.2|    0|
|              4.6|             3.1|              1.5|             0.2|    0|
|              5.0|             3.6|              1.4|             0.2|    0|
|              5.4|             3.9|              1.7|             0.4|    0|
|              4.6|             3.4|              1.4|             0.3|    0|
|              5.0|             3.4|              1.5|             0.2|    0|
|              4.4|             2.9|              1.4|             0.2|    0|
|              4.9|             3.1|              1.5|          

In [13]:
# Combine the measurement columns into a single vector column, "features",
# which is the column the classifier is configured to read.
assembler = VectorAssembler(inputCols=iris.feature_names, outputCol="features")
# Drop any "features" column left by an earlier run of this cell before transforming:
# transform() raises if its output column already exists, while drop() is a no-op when
# the column is absent, so the cell can be re-executed without reloading `data`.
data = assembler.transform(data.drop("features"))

In [14]:
# Map the "label" column to the numeric "indexedLabel" column used as the training target.
indexer = StringIndexer(inputCol="label", outputCol="indexedLabel")
# Remove any "indexedLabel" column left by an earlier run of this cell first: the indexer
# raises if its output column already exists, and drop() is a no-op when it is absent,
# so the cell can be re-executed safely.
unindexed_data = data.drop("indexedLabel")
data = indexer.fit(unindexed_data).transform(unindexed_data)
data.show()

+-----------------+----------------+-----------------+----------------+-----+-----------------+------------+
|sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|label|         features|indexedLabel|
+-----------------+----------------+-----------------+----------------+-----+-----------------+------------+
|              5.1|             3.5|              1.4|             0.2|    0|[5.1,3.5,1.4,0.2]|         0.0|
|              4.9|             3.0|              1.4|             0.2|    0|[4.9,3.0,1.4,0.2]|         0.0|
|              4.7|             3.2|              1.3|             0.2|    0|[4.7,3.2,1.3,0.2]|         0.0|
|              4.6|             3.1|              1.5|             0.2|    0|[4.6,3.1,1.5,0.2]|         0.0|
|              5.0|             3.6|              1.4|             0.2|    0|[5.0,3.6,1.4,0.2]|         0.0|
|              5.4|             3.9|              1.7|             0.4|    0|[5.4,3.9,1.7,0.4]|         0.0|
|              4.6|

In [18]:
# Split the rows into training and test sets with 0.8 / 0.2 weights, passing a fixed seed.
train_data, test_data = data.randomSplit(weights=[0.8, 0.2], seed=1234)

In [20]:
# Random forest classifier reading the assembled feature vector and the indexed label.
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="indexedLabel",
)

In [21]:
# Hyperparameter grid: every combination of three forest sizes and three depth limits
# (3 x 3 = 9 candidate settings).
paramGrid = (
    ParamGridBuilder()
    .addGrid(rf.numTrees, [10, 20, 30])
    .addGrid(rf.maxDepth, [5, 10, 15])
    .build()
)

In [22]:
# 3-fold cross-validation over the parameter grid, scoring each candidate by accuracy
# against the indexed label.
crossval = CrossValidator(
    numFolds=3,
    estimator=rf,
    estimatorParamMaps=paramGrid,
    evaluator=MulticlassClassificationEvaluator(
        metricName="accuracy",
        labelCol="indexedLabel",
    ),
)

In [23]:
# Run the cross-validated grid search on the training split and keep the fitted model.
cvModel = crossval.fit(dataset=train_data)

In [24]:
# Predict on the held-out test split with the cross-validated model.
predictions = cvModel.transform(test_data)

# One evaluator is reused for all four metrics; the metric name is overridden per call.
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction")
accuracy, precision, recall, f1 = (
    evaluator.evaluate(predictions, {evaluator.metricName: metric_name})
    for metric_name in ("accuracy", "weightedPrecision", "weightedRecall", "f1")
)

In [25]:
# Report the four test-set metrics, one per line.
print(
    f"Accuracy: {accuracy}",
    f"Precision: {precision}",
    f"Recall: {recall}",
    f"F1 Score: {f1}",
    sep="\n",
)

Accuracy: 0.972972972972973
Precision: 0.9750519750519752
Recall: 0.972972972972973
F1 Score: 0.972870012870013


In [26]:
# Show the true class index next to the predicted one for the first rows of the test set.
(
    predictions
    .select("indexedLabel", "prediction")
    .show()
)


+------------+----------+
|indexedLabel|prediction|
+------------+----------+
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         1.0|       1.0|
|         1.0|       1.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         1.0|       1.0|
|         1.0|       1.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         2.0|       2.0|
|         2.0|       1.0|
+------------+----------+
only showing top 20 rows



In [27]:
# Confusion matrix: one row per true class, one column per predicted class, with
# missing (true, predicted) pairs filled in as 0 and rows ordered by true class.
confusion_matrix = (
    predictions
    .groupBy("indexedLabel")
    .pivot("prediction")
    .count()
    .fillna(0)
    .orderBy("indexedLabel")
)
confusion_matrix.show()

+------------+---+---+---+
|indexedLabel|0.0|1.0|2.0|
+------------+---+---+---+
|         0.0| 14|  0|  0|
|         1.0|  0| 12|  0|
|         2.0|  0|  1| 10|
+------------+---+---+---+



In [28]:
# Show the true class index next to the predicted one for the first rows of the test set.
predictions.select("indexedLabel", "prediction").show()

# Score the predictions with a single evaluator, overriding the metric name per call,
# rather than constructing a separate evaluator for every metric.
# MulticlassClassificationEvaluator is already imported in the notebook's import cell,
# so it is not imported again here.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction")
accuracy, precision, recall, f1 = (
    evaluator.evaluate(predictions, {evaluator.metricName: metric_name})
    for metric_name in ("accuracy", "weightedPrecision", "weightedRecall", "f1")
)

print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1}")

+------------+----------+
|indexedLabel|prediction|
+------------+----------+
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         1.0|       1.0|
|         1.0|       1.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         1.0|       1.0|
|         1.0|       1.0|
|         0.0|       0.0|
|         0.0|       0.0|
|         2.0|       2.0|
|         2.0|       1.0|
+------------+----------+
only showing top 20 rows

Accuracy: 0.972972972972973
Precision: 0.9750519750519752
Recall: 0.972972972972973
F1 Score: 0.972870012870013
