# Lab 5 for Big Data programming.
# Apache Spark Machine Learning using Dataframes in Google Colab




# 1.	Setup an Apache Spark instance in Google Colab

In [1]:
# Run once.

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.2/spark-3.0.2-bin-hadoop2.7.tgz
!tar xf spark-3.0.2-bin-hadoop2.7.tgz
!pip install -q findspark

#Run Once
import os
os.environ["SPARK_HOME"] = "/content/spark-3.0.2-bin-hadoop2.7"
import findspark
findspark.init()


# 2.	Create a Spark session

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()
spark


# 3.	Download the Iris dataset and another dataset of your choosing



In [36]:
!wget "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data" -O sample_data/iris.data
!wget "https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data" -O sample_data/adult.data

--2022-03-21 16:31:44--  https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data
Resolving archive.ics.uci.edu (archive.ics.uci.edu)... 128.195.10.252
Connecting to archive.ics.uci.edu (archive.ics.uci.edu)|128.195.10.252|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4551 (4.4K) [application/x-httpd-php]
Saving to: ‘sample_data/iris.data’


2022-03-21 16:31:44 (98.0 MB/s) - ‘sample_data/iris.data’ saved [4551/4551]

--2022-03-21 16:31:44--  https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data
Resolving archive.ics.uci.edu (archive.ics.uci.edu)... 128.195.10.252
Connecting to archive.ics.uci.edu (archive.ics.uci.edu)|128.195.10.252|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 3974305 (3.8M) [application/x-httpd-php]
Saving to: ‘sample_data/adult.data’


2022-03-21 16:31:45 (7.36 MB/s) - ‘sample_data/adult.data’ saved [3974305/3974305]



# 4.	Import the Iris dataset into a dataframe and insert screenshot of df.show()command output:

In [37]:
df_iris = spark.read.csv('sample_data/iris.data', inferSchema=True)\
.toDF("SepalLength","SepalWidth","PetalLength","PetalWidth","Class")
df_iris.printSchema()

root
 |-- SepalLength: double (nullable = true)
 |-- SepalWidth: double (nullable = true)
 |-- PetalLength: double (nullable = true)
 |-- PetalWidth: double (nullable = true)
 |-- Class: string (nullable = true)



Import the second dataset for use later

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: double (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: double (nullable = true)
 |-- capital-loss: double (nullable = true)
 |-- hours-per-week: double (nullable = true)
 |-- native-country: string (nullable = true)
 |-- class: string (nullable = true)



# 5.	Spark ML can only deal with one features column - so we need to vectorise the multiple columns into one:

In [39]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler


In [42]:
vector_assembler=VectorAssembler(\
inputCols=["SepalLength","SepalWidth","PetalLength","PetalWidth"],\
outputCol="features")
df_iris=vector_assembler.transform(df_iris)
df_iris.show(3)

+-----------+----------+-----------+----------+-----------+-----------------+
|SepalLength|SepalWidth|PetalLength|PetalWidth|      Class|         features|
+-----------+----------+-----------+----------+-----------+-----------------+
|        5.1|       3.5|        1.4|       0.2|Iris-setosa|[5.1,3.5,1.4,0.2]|
|        4.9|       3.0|        1.4|       0.2|Iris-setosa|[4.9,3.0,1.4,0.2]|
|        4.7|       3.2|        1.3|       0.2|Iris-setosa|[4.7,3.2,1.3,0.2]|
+-----------+----------+-----------+----------+-----------+-----------------+
only showing top 3 rows



In [43]:
df_temp=df_iris.drop("SepalLength","SepalWidth","PetalLength","PetalWidth")
df_temp.show(3)


+-----------+-----------------+
|      Class|         features|
+-----------+-----------------+
|Iris-setosa|[5.1,3.5,1.4,0.2]|
|Iris-setosa|[4.9,3.0,1.4,0.2]|
|Iris-setosa|[4.7,3.2,1.3,0.2]|
+-----------+-----------------+
only showing top 3 rows



# 6.	The final data preparation step is to index the Class column - to use numeric rather than text values - run the following command and display your output of Class, features & ClassIndex columns:

In [44]:
from pyspark.ml.feature import StringIndexer
l_indexer=StringIndexer(inputCol="Class", outputCol="ClassIndex")
df_iris = l_indexer.fit(df_iris).transform(df_iris)

In [None]:
df.show(10)

+-----------+----------+-----------+----------+-----------+----------+-----------------+
|SepalLength|SepalWidth|PetalLength|PetalWidth|      Class|ClassIndex|         features|
+-----------+----------+-----------+----------+-----------+----------+-----------------+
|        5.1|       3.5|        1.4|       0.2|Iris-setosa|       0.0|[5.1,3.5,1.4,0.2]|
|        4.9|       3.0|        1.4|       0.2|Iris-setosa|       0.0|[4.9,3.0,1.4,0.2]|
|        4.7|       3.2|        1.3|       0.2|Iris-setosa|       0.0|[4.7,3.2,1.3,0.2]|
|        4.6|       3.1|        1.5|       0.2|Iris-setosa|       0.0|[4.6,3.1,1.5,0.2]|
|        5.0|       3.6|        1.4|       0.2|Iris-setosa|       0.0|[5.0,3.6,1.4,0.2]|
|        5.4|       3.9|        1.7|       0.4|Iris-setosa|       0.0|[5.4,3.9,1.7,0.4]|
|        4.6|       3.4|        1.4|       0.3|Iris-setosa|       0.0|[4.6,3.4,1.4,0.3]|
|        5.0|       3.4|        1.5|       0.2|Iris-setosa|       0.0|[5.0,3.4,1.5,0.2]|
|        4.4|       2

# 7.	Split your data into training and test datasets:

In [46]:
(trainingData,testData) = df_iris.randomSplit([0.7,0.3])

# 8.	Decision Tree Classifier 
## Specify the DecisionTreeClassifier and train the model on your training dataset:


In [47]:
from pyspark.ml.classification import DecisionTreeClassifier 
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


In [48]:
dt = DecisionTreeClassifier(labelCol="ClassIndex",featuresCol="features")
model=dt.fit(trainingData)

# 9.	Test your model with your test dataset: 

In [49]:
predictions = model.transform(testData)

In [50]:
predictions.select("prediction","ClassIndex").show(15)

+----------+----------+
|prediction|ClassIndex|
+----------+----------+
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       1.0|       1.0|
|       1.0|       1.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       1.0|       1.0|
+----------+----------+
only showing top 15 rows



# 10.	Run an evaluator function to show the accuracy of your model:

In [51]:
evaluator= MulticlassClassificationEvaluator(\
labelCol="ClassIndex", predictionCol="prediction",\
metricName="accuracy")
accuracy=evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
print("Test Set accuracy = " +str(accuracy))

Test Error = 0.05
Test Set accuracy = 0.95


# 11.	Random Forest Classifier

## Specify the RandomForestClassifier, train the model on your training dataset, predict using your test dataset, and run an evaluator to test accuracy:


In [52]:
from pyspark.ml.classification import RandomForestClassifier 
rf=RandomForestClassifier(labelCol="ClassIndex",\
featuresCol="features",numTrees=10)
model=rf.fit(trainingData)
predictions=model.transform(testData)
predictions.select("prediction","ClassIndex").show(5)

+----------+----------+
|prediction|ClassIndex|
+----------+----------+
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
+----------+----------+
only showing top 5 rows



In [53]:
evaluator= \
MulticlassClassificationEvaluator(labelCol="ClassIndex",\
predictionCol="prediction",metricName="accuracy")
accuracy=evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
print("Test Set accuracy = " +str(accuracy))

Test Error = 0.025
Test Set accuracy = 0.975


# 12.	Naive Bayes Classifier
## Specify the NaiveBayes classifier, train the model on your training dataset, predict using your test dataset, and run an evaluator to test accuracy:


In [54]:
from pyspark.ml.classification import NaiveBayes 
nb=NaiveBayes(labelCol="ClassIndex",\
featuresCol="features",smoothing=1.0,\
modelType="multinomial")
model=nb.fit(trainingData)


In [55]:
predictions=model.transform(testData)
predictions.select("Class","ClassIndex",
"probability","prediction").show(5)

+-----------+----------+--------------------+----------+
|      Class|ClassIndex|         probability|prediction|
+-----------+----------+--------------------+----------+
|Iris-setosa|       0.0|[0.66450182167383...|       0.0|
|Iris-setosa|       0.0|[0.68280837280062...|       0.0|
|Iris-setosa|       0.0|[0.65395351743685...|       0.0|
|Iris-setosa|       0.0|[0.63486571338523...|       0.0|
|Iris-setosa|       0.0|[0.68450782946660...|       0.0|
+-----------+----------+--------------------+----------+
only showing top 5 rows



In [56]:
evaluator= \
MulticlassClassificationEvaluator(labelCol="ClassIndex",\
predictionCol="prediction",metricName="accuracy")
accuracy=evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
print("Test Set accuracy = " +str(accuracy))

Test Error = 0.025
Test Set accuracy = 0.975


# Now try it with the other Dataset.


In [75]:
df_adult = spark.read.csv('sample_data/adult.data', inferSchema=True)\
.toDF("age","workclass","fnlwgt","education","education-num","marital-status","occupation","relationship","race","sex","capital-gain","capital-loss","hours-per-week","native-country","class")
df_adult.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: double (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: double (nullable = true)
 |-- capital-loss: double (nullable = true)
 |-- hours-per-week: double (nullable = true)
 |-- native-country: string (nullable = true)
 |-- class: string (nullable = true)



In [76]:
vector_assembler=VectorAssembler(\
inputCols=["education-num","capital-gain","capital-loss","hours-per-week"],\
outputCol="features")
df_adult=vector_assembler.transform(df_adult)
df_adult.show(3)

+---+-----------------+--------+----------+-------------+-------------------+------------------+--------------+------+-----+------------+------------+--------------+--------------+------+--------------------+
|age|        workclass|  fnlwgt| education|education-num|     marital-status|        occupation|  relationship|  race|  sex|capital-gain|capital-loss|hours-per-week|native-country| class|            features|
+---+-----------------+--------+----------+-------------+-------------------+------------------+--------------+------+-----+------------+------------+--------------+--------------+------+--------------------+
| 39|        State-gov| 77516.0| Bachelors|         13.0|      Never-married|      Adm-clerical| Not-in-family| White| Male|      2174.0|         0.0|          40.0| United-States| <=50K|[13.0,2174.0,0.0,...|
| 50| Self-emp-not-inc| 83311.0| Bachelors|         13.0| Married-civ-spouse|   Exec-managerial|       Husband| White| Male|         0.0|         0.0|          13.0

In [77]:
df_adult=df_adult.drop("age","workclass","education","fnlwgt","marital-status","occupation","relationship","race","sex","native-country")
l_indexer=StringIndexer(inputCol="class", outputCol="ClassIndex")
df_adult = l_indexer.fit(df_adult).transform(df_adult)
df_adult.show(3)

+-------------+------------+------------+--------------+------+--------------------+
|education-num|capital-gain|capital-loss|hours-per-week| class|            features|
+-------------+------------+------------+--------------+------+--------------------+
|         13.0|      2174.0|         0.0|          40.0| <=50K|[13.0,2174.0,0.0,...|
|         13.0|         0.0|         0.0|          13.0| <=50K| [13.0,0.0,0.0,13.0]|
|          9.0|         0.0|         0.0|          40.0| <=50K|  [9.0,0.0,0.0,40.0]|
+-------------+------------+------------+--------------+------+--------------------+
only showing top 3 rows



# Set up the training and test data.

In [None]:
(trainingData_adult,testData_adult) = df_adult.randomSplit([0.7,0.3])

# Decision Tree Classifier for the adult data.


In [94]:
dt_adult = DecisionTreeClassifier(labelCol="ClassIndex",featuresCol="features")
model=dt.fit(trainingData_adult)

predictions = model.transform(testData_adult)
predictions.select("prediction","ClassIndex").show(10)

+----------+----------+
|prediction|ClassIndex|
+----------+----------+
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
+----------+----------+
only showing top 10 rows



In [95]:
evaluator= MulticlassClassificationEvaluator(\
labelCol="ClassIndex", predictionCol="prediction",\
metricName="accuracy")
accuracy=evaluator.evaluate(predictions)

print("Desision Tree for the adult data")
print("================================")
print("Test Error = %g" % (1.0 - accuracy))
print("Test Set accuracy = " +str(accuracy))

Desision Tree for the adult data
Test Error = 0.184284
Test Set accuracy = 0.815715912620354


#  Random Forest Classifier for the adult data.

```
# This is formatted as code
```



In [100]:
rf_adult=RandomForestClassifier(labelCol="ClassIndex",\
featuresCol="features",numTrees=10)
model=rf_adult.fit(trainingData_adult)
predictions=model.transform(testData_adult)

evaluator= \
MulticlassClassificationEvaluator(labelCol="ClassIndex",\
predictionCol="prediction",metricName="accuracy")
accuracy=evaluator.evaluate(predictions)

print("Random Forest for the adult data")
print("================================")
print("Test Error = %g" % (1.0 - accuracy))
print("Test Set accuracy = " +str(accuracy))


Random Forest for the adult data
Test Error = 0.180868
Test Set accuracy = 0.8191324153639093


# Naive Bayes Classifier for the adult data.

In [101]:
nb_adult=NaiveBayes(labelCol="ClassIndex",\
featuresCol="features",smoothing=1.0,\
modelType="multinomial")
model=nb_adult.fit(trainingData_adult)

predictions=model.transform(testData_adult)
            
evaluator= \
MulticlassClassificationEvaluator(labelCol="ClassIndex",\
predictionCol="prediction",metricName="accuracy")
accuracy=evaluator.evaluate(predictions)

print("Naive Bayes for the adult data")
print("==============================")
print("Test Error = %g" % (1.0 - accuracy))
print("Test Set accuracy = " +str(accuracy))                   

Naive Bayes for the adult data
Test Error = 0.216482
Test Set accuracy = 0.7835179625220002
