# **1. Install spark**

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
!tar xf spark-3.0.3-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os

# Export the Java and Spark install locations as environment variables.
# NOTE(review): the Spark path assumes the archive was unpacked in /content — confirm.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.0.3-bin-hadoop2.7",
    }
)

In [None]:
import findspark

# findspark.init() is called before pyspark is imported, preserving the original order.
findspark.init()

from pyspark.sql import SparkSession

# Reuse an existing session if there is one, otherwise start one with master "local[*]".
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

# **2. Load dataset**

In [None]:
'''
load models
'''
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer, IndexToString
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

In [None]:
# Load the data.
# Upload the dataset to Google Drive first, then point the path below at the data file.
from google.colab import drive

drive.mount('/content/drive')

# The file is read with the "libsvm" data source.
data = (
    spark.read
    .format("libsvm")
    .load("/content/drive/MyDrive/dataset.txt")
)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Print the first feature vector in full (no column truncation).
data.select("features").show(n=1, truncate=False)

+-------------------------------------------------+
|features                                         |
+-------------------------------------------------+
|(4,[0,1,2,3],[-0.222222,0.5,-0.762712,-0.833333])|
+-------------------------------------------------+
only showing top 1 row



In [None]:
# Display the (column name, type) pairs of the loaded DataFrame.
data.dtypes

[('label', 'double'), ('features', 'vector')]

In [None]:
# Label indexer: maps the "label" column to an ML column of label indices
# ("indexedLabel"). It is fitted on the full dataset.
labelIndexer = StringIndexer(
    outputCol="indexedLabel",
    inputCol="label",
).fit(data)

In [None]:
# Feature indexer: indexes categorical feature columns inside the "features" vector
# and writes the result to "indexedFeatures". It is fitted on the full dataset.
featureIndexer = VectorIndexer(
    maxCategories=4,
    outputCol="indexedFeatures",
    inputCol="features",
).fit(data)

In [None]:
# Split the dataset into training (weight 0.7) and testing (weight 0.3) sets.
# A fixed seed keeps the split, and therefore every metric computed from it,
# repeatable across "Restart & Run All" executions.
(trainingData, testData) = data.randomSplit([0.7, 0.3], seed=42)

# **2. Decision Tree**
Run the code below and answer question 1.

reference:

model:
https://spark.apache.org/docs/latest/mllib-decision-tree.html

evaluation:
https://spark.apache.org/docs/latest/mllib-evaluation-metrics.html#multiclass-classification

## **Model**

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

In [None]:
# Decision tree limited to depth 2, trained on the indexed label/feature columns.
dt = DecisionTreeClassifier(
    labelCol="indexedLabel",
    featuresCol="indexedFeatures",
    maxDepth=2,
)

In [None]:
# Both fitted indexers run ahead of the classifier.
dt_pipeline = Pipeline(
    stages=[
        labelIndexer,    # label -> indexedLabel
        featureIndexer,  # features -> indexedFeatures
        dt,
    ]
)

In [None]:
# Fit the decision-tree pipeline on the training split.
dt_model = dt_pipeline.fit(dataset=trainingData)

In [None]:
# Score the held-out test split with the fitted pipeline.
dt_predictions = dt_model.transform(dataset=testData)

In [None]:
# The classifier is the last of the three pipeline stages.
print(dt_model.stages[-1])

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_1fe829f1c824, depth=2, numNodes=5, numClasses=3, numFeatures=4


In [None]:
# Preview the first five scored rows.
dt_predictions.show(n=5)

+-----+--------------------+------------+--------------------+--------------+--------------------+----------+
|label|            features|indexedLabel|     indexedFeatures| rawPrediction|         probability|prediction|
+-----+--------------------+------------+--------------------+--------------+--------------------+----------+
|  0.0|(4,[0,1,2,3],[-0....|         0.0|(4,[0,1,2,3],[-0....|[33.0,0.0,0.0]|       [1.0,0.0,0.0]|       0.0|
|  0.0|(4,[0,1,2,3],[-1....|         0.0|(4,[0,1,2,3],[-1....|[33.0,0.0,0.0]|       [1.0,0.0,0.0]|       0.0|
|  0.0|(4,[0,1,2,3],[0.0...|         0.0|(4,[0,1,2,3],[0.0...|[33.0,0.0,0.0]|       [1.0,0.0,0.0]|       0.0|
|  0.0|(4,[0,1,2,3],[0.0...|         0.0|(4,[0,1,2,3],[0.0...|[33.0,0.0,0.0]|       [1.0,0.0,0.0]|       0.0|
|  0.0|(4,[0,1,2,3],[0.1...|         0.0|(4,[0,1,2,3],[0.1...|[4.0,0.0,38.0]|[0.09523809523809...|       2.0|
+-----+--------------------+------------+--------------------+--------------+--------------------+----------+
only showi

## **Model Evaluation**
Complete the code for the F1 score and recall parts below, run it, and then answer question 1.

Accuracy

In [None]:
# Accuracy of the decision-tree predictions against the indexed labels.
acc_evaluator_dt = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="indexedLabel",
)
acc_dt = acc_evaluator_dt.evaluate(dt_predictions)
print(f"accurancy:{acc_dt}")

accurancy:0.9444444444444444


Precision

In [None]:
# Precision of the decision-tree predictions.
# NOTE(review): "precisionByLabel" scores a single class only — the one selected by
# metricLabel (0.0 is the evaluator default, made explicit here). If an overall figure
# is intended, "weightedPrecision" is the matching metric — confirm.
pr_evaluator_dt = MulticlassClassificationEvaluator(
    metricName="precisionByLabel",
    metricLabel=0.0,
    predictionCol="prediction",
    labelCol="indexedLabel",
)
precision_dt = pr_evaluator_dt.evaluate(dt_predictions)
print(f"precision:{precision_dt}")

precision:0.9230769230769231


F1_score

Recall

# **3. Random forest**
Run the code below and answer question 2.

reference:

model:
https://spark.apache.org/docs/latest/mllib-ensembles.html#random-forests

evaluation:
https://spark.apache.org/docs/latest/mllib-evaluation-metrics.html#multiclass-classification

## **Model**

In [None]:
from pyspark.ml.classification import RandomForestClassifier

In [None]:
# Random forest of three trees, trained on the indexed label/feature columns.
# The seed is fixed so the forest's random sampling — and the metrics reported
# for it — are repeatable across kernel restarts.
rf = RandomForestClassifier(
    numTrees=3,
    featuresCol="indexedFeatures",
    labelCol="indexedLabel",
    seed=42,
)

In [None]:
# Both fitted indexers run ahead of the classifier.
rf_pipeline = Pipeline(
    stages=[
        labelIndexer,    # label -> indexedLabel
        featureIndexer,  # features -> indexedFeatures
        rf,
    ]
)

In [None]:
# Fit the random-forest pipeline on the training split.
rf_model = rf_pipeline.fit(dataset=trainingData)

In [None]:
# Score the held-out test split with the fitted pipeline.
rf_predictions = rf_model.transform(dataset=testData)

In [None]:
# The classifier is the last of the three pipeline stages.
print(rf_model.stages[-1])

RandomForestClassificationModel: uid=RandomForestClassifier_25cc4cb85fd5, numTrees=3, numClasses=3, numFeatures=4


In [None]:
# Preview the first five scored rows.
rf_predictions.show(n=5)

+-----+--------------------+------------+--------------------+-------------+--------------------+----------+
|label|            features|indexedLabel|     indexedFeatures|rawPrediction|         probability|prediction|
+-----+--------------------+------------+--------------------+-------------+--------------------+----------+
|  0.0|(4,[0,1,2,3],[-0....|         0.0|(4,[0,1,2,3],[-0....|[3.0,0.0,0.0]|       [1.0,0.0,0.0]|       0.0|
|  0.0|(4,[0,1,2,3],[-1....|         0.0|(4,[0,1,2,3],[-1....|[3.0,0.0,0.0]|       [1.0,0.0,0.0]|       0.0|
|  0.0|(4,[0,1,2,3],[0.0...|         0.0|(4,[0,1,2,3],[0.0...|[3.0,0.0,0.0]|       [1.0,0.0,0.0]|       0.0|
|  0.0|(4,[0,1,2,3],[0.0...|         0.0|(4,[0,1,2,3],[0.0...|[3.0,0.0,0.0]|       [1.0,0.0,0.0]|       0.0|
|  0.0|(4,[0,1,2,3],[0.1...|         0.0|(4,[0,1,2,3],[0.1...|[1.1,0.0,1.9]|[0.36666666666666...|       2.0|
+-----+--------------------+------------+--------------------+-------------+--------------------+----------+
only showing top 5 

## **Model Evaluation**
Complete the code for the precision and recall parts below, run it, and then answer question 2.

Accuracy

In [None]:
# Accuracy of the random-forest predictions against the indexed labels.
acc_evaluator_rf = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="indexedLabel",
)
acc_rf = acc_evaluator_rf.evaluate(rf_predictions)
print(f"accurancy:{acc_rf}")

accurancy:0.9444444444444444


F1_score

In [None]:
# F1 score of the random-forest predictions against the indexed labels.
f_evaluator_rf = MulticlassClassificationEvaluator(
    metricName="f1",
    predictionCol="prediction",
    labelCol="indexedLabel",
)
f1_score_rf = f_evaluator_rf.evaluate(rf_predictions)
print(f"f1 score:{f1_score_rf}")

f1 score:0.9444444444444444


Precision

Recall

# **4. Naive bayes**
Run the code below and answer question 3.

reference:

model:
https://en.wikipedia.org/wiki/Naive_Bayes_classifier

evaluation:
https://spark.apache.org/docs/latest/mllib-evaluation-metrics.html#multiclass-classification

## **Model**

In [None]:
from pyspark.ml.classification import NaiveBayes

In [None]:
# Gaussian naive Bayes on the indexed label/feature columns, with one 0.5
# threshold for each of the three classes.
nb = NaiveBayes(
    labelCol="indexedLabel",
    featuresCol="indexedFeatures",
    modelType="gaussian",
    smoothing=1.0,
    thresholds=[0.5, 0.5, 0.5],
)

In [None]:
# Both fitted indexers run ahead of the classifier.
nb_pipeline = Pipeline(
    stages=[
        labelIndexer,    # label -> indexedLabel
        featureIndexer,  # features -> indexedFeatures
        nb,
    ]
)

In [None]:
# Fit the naive-Bayes pipeline on the training split.
nb_model = nb_pipeline.fit(dataset=trainingData)

In [None]:
# Score the held-out test split with the fitted pipeline.
nb_predictions = nb_model.transform(dataset=testData)

In [None]:
# The classifier is the last of the three pipeline stages.
print(nb_model.stages[-1])

NaiveBayesModel: uid=NaiveBayes_a71d3396877e, modelType=gaussian, numClasses=3, numFeatures=4


In [None]:
# Preview predicted vs. indexed labels for the first five test rows.
(
    nb_predictions
    .select("prediction", "indexedLabel", "indexedFeatures")
    .show(n=5)
)

+----------+------------+--------------------+
|prediction|indexedLabel|     indexedFeatures|
+----------+------------+--------------------+
|       0.0|         0.0|(4,[0,1,2,3],[-0....|
|       0.0|         0.0|(4,[0,1,2,3],[-1....|
|       0.0|         0.0|(4,[0,1,2,3],[0.0...|
|       0.0|         0.0|(4,[0,1,2,3],[0.0...|
|       2.0|         0.0|(4,[0,1,2,3],[0.1...|
+----------+------------+--------------------+
only showing top 5 rows



## **Model Evaluation**
Complete the code for the accuracy and F1 score parts below, run it, and then answer question 3.

Precision

In [None]:
# Precision of the naive-Bayes predictions.
# NOTE(review): "precisionByLabel" scores a single class only — the one selected by
# metricLabel (0.0 is the evaluator default, made explicit here). If an overall figure
# is intended, "weightedPrecision" is the matching metric — confirm.
pr_evaluator_nb = MulticlassClassificationEvaluator(
    metricName="precisionByLabel",
    metricLabel=0.0,
    predictionCol="prediction",
    labelCol="indexedLabel",
)
precision_nb = pr_evaluator_nb.evaluate(nb_predictions)
print(f"precision:{precision_nb}")

precision:0.8571428571428571


Recall

In [None]:
# Recall of the naive-Bayes predictions.
# NOTE(review): "recallByLabel" scores a single class only — the one selected by
# metricLabel (0.0 is the evaluator default, made explicit here). If an overall figure
# is intended, "weightedRecall" is the matching metric — confirm.
re_evaluator_nb = MulticlassClassificationEvaluator(
    metricName="recallByLabel",
    metricLabel=0.0,
    predictionCol="prediction",
    labelCol="indexedLabel",
)
recall_nb = re_evaluator_nb.evaluate(nb_predictions)
print(f"recall:{recall_nb}")

recall:0.9230769230769231


Accuracy

F1_score

# **5. SVM**
Run the code below and answer question 4.

reference:

model:
https://en.wikipedia.org/wiki/Support_vector_machine

evaluation:
https://spark.apache.org/docs/latest/mllib-evaluation-metrics.html#multiclass-classification

## **Model**

In [None]:
from pyspark.ml.classification import LinearSVC,OneVsRest

In [None]:
# Linear SVM (binary) with two optimisation iterations and regularisation 0.1,
# configured for the indexed label/feature columns.
lsvc = LinearSVC(
    labelCol="indexedLabel",
    featuresCol="indexedFeatures",
    regParam=0.1,
    maxIter=2,
)

In [None]:
# One-vs-rest wrapper that lets the binary LinearSVC handle the multiclass labels.
# OneVsRest trains on its OWN featuresCol/labelCol (defaults "features"/"label") and
# overrides the column settings of the wrapped classifier, so the indexed columns have
# to be set here as well. Otherwise the model is fitted on the raw columns while the
# evaluation cells compare its predictions against "indexedLabel".
ovr = OneVsRest(
    classifier=lsvc,
    featuresCol="indexedFeatures",
    labelCol="indexedLabel",
)

In [None]:
# Both fitted indexers run ahead of the one-vs-rest classifier.
lsvc_pipeline = Pipeline(
    stages=[
        labelIndexer,    # label -> indexedLabel
        featureIndexer,  # features -> indexedFeatures
        ovr,
    ]
)

In [None]:
# Fit the SVM pipeline on the training split.
lsvcModel = lsvc_pipeline.fit(dataset=trainingData)

In [None]:
# Score the held-out test split with the fitted pipeline.
lsvc_prediction = lsvcModel.transform(dataset=testData)

In [None]:
# Preview predicted vs. indexed labels (show() prints its default number of rows).
(
    lsvc_prediction
    .select("prediction", "indexedLabel", "indexedFeatures")
    .show()
)

+----------+------------+--------------------+
|prediction|indexedLabel|     indexedFeatures|
+----------+------------+--------------------+
|       0.0|         0.0|(4,[0,1,2,3],[-0....|
|       0.0|         0.0|(4,[0,1,2,3],[-1....|
|       0.0|         0.0|(4,[0,1,2,3],[0.0...|
|       0.0|         0.0|(4,[0,1,2,3],[0.0...|
|       0.0|         0.0|(4,[0,1,2,3],[0.1...|
|       0.0|         0.0|(4,[0,1,2,3],[0.1...|
|       0.0|         0.0|(4,[0,1,2,3],[0.1...|
|       0.0|         0.0|(4,[0,1,2,3],[0.1...|
|       0.0|         0.0|(4,[0,1,2,3],[0.3...|
|       0.0|         0.0|(4,[0,1,2,3],[0.4...|
|       0.0|         0.0|(4,[0,1,2,3],[0.6...|
|       0.0|         0.0|(4,[0,1,2,3],[0.8...|
|       0.0|         0.0|(4,[0,2,3],[0.444...|
|       1.0|         1.0|(4,[0,1,2,3],[-1....|
|       1.0|         1.0|(4,[0,1,2,3],[-0....|
|       1.0|         1.0|(4,[0,1,2,3],[-0....|
|       1.0|         1.0|(4,[0,1,2,3],[-0....|
|       1.0|         1.0|(4,[0,1,2,3],[-0....|
|       1.0| 

## **Model Evaluation**
Complete the code for the accuracy and precision parts below, run it, and then answer question 4.

F1_score

In [None]:
# F1 score of the SVM predictions against the indexed labels.
f_evaluator_svm = MulticlassClassificationEvaluator(
    metricName="f1",
    predictionCol="prediction",
    labelCol="indexedLabel",
)
f1_score_svm = f_evaluator_svm.evaluate(lsvc_prediction)
print(f"f1 score:{f1_score_svm}")

f1 score:0.7483660130718954


Recall

In [None]:
# Recall of the SVM predictions.
# NOTE(review): "recallByLabel" scores a single class only — the one selected by
# metricLabel (0.0 is the evaluator default, made explicit here). If an overall figure
# is intended, "weightedRecall" is the matching metric — confirm.
re_evaluator_svm = MulticlassClassificationEvaluator(
    metricName="recallByLabel",
    metricLabel=0.0,
    predictionCol="prediction",
    labelCol="indexedLabel",
)
recall_svm = re_evaluator_svm.evaluate(lsvc_prediction)
print(f"recall:{recall_svm}")

recall:1.0


Accuracy

Precision