## Apache Spark Machine Learning using Dataframes in Google Colab

1.	Set up an Apache Spark instance in Google Colab

In [1]:
# install java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.2/spark-3.0.2-bin-hadoop2.7.tgz

# extract the Spark archive into the current folder
!tar xf spark-3.0.2-bin-hadoop2.7.tgz

# point the SPARK_HOME environment variable at the extracted Spark folder
import os
os.environ["SPARK_HOME"] = "/content/spark-3.0.2-bin-hadoop2.7"

# install findspark using pip
!pip install -q findspark
import findspark
findspark.init()
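
The version number appears in three places above (the download URL, the archive name, and `SPARK_HOME`), so changing it by hand is easy to get wrong. A minimal sketch, assuming the same archive naming scheme as the URL above, derives all three values from two variables. `spark_paths` is a hypothetical helper for illustration and is not part of Spark or findspark.

```python
def spark_paths(spark_version="3.0.2", hadoop_version="2.7", base_dir="/content"):
    """Build the archive name, download URL, and SPARK_HOME for one release."""
    # Assumption: the release follows the naming used in the cell above.
    name = f"spark-{spark_version}-bin-hadoop{hadoop_version}"
    return {
        "archive": f"{name}.tgz",
        "url": f"https://archive.apache.org/dist/spark/spark-{spark_version}/{name}.tgz",
        "spark_home": f"{base_dir}/{name}",
    }

paths = spark_paths()
print(paths["url"])
print(paths["spark_home"])
```

In a notebook cell, a plain variable such as `url = paths["url"]` can then be interpolated into a shell command as `!wget -q $url`.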

2.	Create a Spark session

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
          .master("local")\
          .appName("Colab")\
          .config('spark.ui.port', '4050')\
          .getOrCreate()

spark

3.	Download dataset

In [3]:
!wget "https://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv" -O sample_data/wine-quality.csv

--2022-03-20 13:15:44--  https://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv
Resolving archive.ics.uci.edu (archive.ics.uci.edu)... 128.195.10.252
Connecting to archive.ics.uci.edu (archive.ics.uci.edu)|128.195.10.252|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 84199 (82K) [application/x-httpd-php]
Saving to: ‘sample_data/wine-quality.csv’


2022-03-20 13:15:45 (814 KB/s) - ‘sample_data/wine-quality.csv’ saved [84199/84199]



4.	Import the wine quality dataset into a dataframe and use df.show() to display it.

In [4]:
df = spark.read.csv("sample_data/wine-quality.csv", inferSchema=True, header=True, sep=';')
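
The `sep=';'` argument matters because this file separates fields with semicolons rather than commas. The effect can be seen with Python's standard `csv` module on a small sample; the sample below is an abbreviated, illustrative stand-in with only four of the twelve columns, not the real file contents.

```python
import csv
import io

# Illustrative two-line sample in a semicolon-separated layout (assumption:
# abbreviated to four columns for readability).
sample = '"fixed acidity";"volatile acidity";"citric acid";"quality"\n7.4;0.7;0;5\n'

# Parse once with the correct delimiter and once with the default comma.
with_semicolon = list(csv.reader(io.StringIO(sample), delimiter=";"))
with_comma = list(csv.reader(io.StringIO(sample)))

print(with_semicolon[0])    # four separate column names
print(len(with_comma[0]))   # the whole header collapses into a single field
```

Without the right separator the whole line is read as one column, which is also what `spark.read.csv` would produce if `sep` were left at its comma default.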

In [5]:
df.show()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|      5|
|         11.2|            0.28|       0.56|           1.9|    0.075|               17.0|           

In [6]:
df.printSchema()

root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)



In [7]:
print(df.columns)

['fixed acidity', 'volatile acidity', 'citric acid', 'residual sugar', 'chlorides', 'free sulfur dioxide', 'total sulfur dioxide', 'density', 'pH', 'sulphates', 'alcohol', 'quality']
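
Every column except `quality` is used as a feature in the steps that follow, so the list printed above can be split programmatically instead of being typed out by hand. A minimal sketch in plain Python; the names `label_col` and `feature_cols` are illustrative and do not appear in the original notebook.

```python
# The column names exactly as printed by df.columns above.
columns = ['fixed acidity', 'volatile acidity', 'citric acid', 'residual sugar',
           'chlorides', 'free sulfur dioxide', 'total sulfur dioxide', 'density',
           'pH', 'sulphates', 'alcohol', 'quality']

label_col = "quality"  # the column to predict

# Everything that is not the label is treated as a feature.
feature_cols = [c for c in columns if c != label_col]

print(len(feature_cols))
print(feature_cols)
```

In the notebook, `columns` would simply be `df.columns`, and `feature_cols` could be passed as `inputCols` to `VectorAssembler` and later unpacked as `drop(*feature_cols)`.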


5.	Spark ML estimators expect all features in a single vector column, so we need to combine the multiple feature columns into one using VectorAssembler.

In [8]:
from pyspark.ml.feature import VectorAssembler

# combine the 11 numeric feature columns into a single vector column
vector_assembler = VectorAssembler(
                                   inputCols=['fixed acidity', 'volatile acidity', 'citric acid',
                                              'residual sugar', 'chlorides', 'free sulfur dioxide', 'total sulfur dioxide',
                                              'density', 'pH', 'sulphates', 'alcohol'],
                                   outputCol="features")
df_temp = vector_assembler.transform(df)
df_temp.show(3)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+--------------------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|            features|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+--------------------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|[7.4,0.7,0.0,1.9,...|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|[7.8,0.88,0.0,2.6...|
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|    

Drop the original feature columns and display only quality and features.

In [9]:
df = df_temp.drop('fixed acidity', 'volatile acidity', 'citric acid', 'residual sugar',
                  'chlorides', 'free sulfur dioxide', 'total sulfur dioxide', 'density',
                  'pH', 'sulphates', 'alcohol')
df.show(3)

+-------+--------------------+
|quality|            features|
+-------+--------------------+
|      5|[7.4,0.7,0.0,1.9,...|
|      5|[7.8,0.88,0.0,2.6...|
|      5|[7.8,0.76,0.04,2....|
+-------+--------------------+
only showing top 3 rows



6.	Convert the output variable (quality) to a binary label: 0 for low-quality wines (rating below 7) and 1 for high-quality wines (rating 7 or above).

In [10]:
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

def quality_class(value):
    # ratings below 7 are low quality (0); 7 and above are high quality (1)
    if value < 7:
        return 0
    else:
        return 1

# convert to a UDF by passing in the function and its return type
udf_quality_func = F.udf(quality_class, IntegerType())
df = df.withColumn("quality_class", udf_quality_func("quality"))
df.show()

+-------+--------------------+-------------+
|quality|            features|quality_class|
+-------+--------------------+-------------+
|      5|[7.4,0.7,0.0,1.9,...|            0|
|      5|[7.8,0.88,0.0,2.6...|            0|
|      5|[7.8,0.76,0.04,2....|            0|
|      6|[11.2,0.28,0.56,1...|            0|
|      5|[7.4,0.7,0.0,1.9,...|            0|
|      5|[7.4,0.66,0.0,1.8...|            0|
|      5|[7.9,0.6,0.06,1.6...|            0|
|      7|[7.3,0.65,0.0,1.2...|            1|
|      7|[7.8,0.58,0.02,2....|            1|
|      5|[7.5,0.5,0.36,6.1...|            0|
|      5|[6.7,0.58,0.08,1....|            0|
|      5|[7.5,0.5,0.36,6.1...|            0|
|      5|[5.6,0.615,0.0,1....|            0|
|      5|[7.8,0.61,0.29,1....|            0|
|      5|[8.9,0.62,0.18,3....|            0|
|      5|[8.9,0.62,0.19,3....|            0|
|      7|[8.5,0.28,0.56,1....|            1|
|      5|[8.1,0.56,0.28,1....|            0|
|      4|[7.4,0.59,0.08,4....|            0|
|      6|[
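
The labelling rule is a plain threshold, so its boundary behaviour can be checked in ordinary Python before it is wrapped in a UDF. The sketch below also makes the cut-off a parameter; `threshold` is an illustrative name whose default of 7 comes from the step description.

```python
def quality_class(value, threshold=7):
    """Return 1 for ratings at or above the threshold, otherwise 0."""
    return 0 if value < threshold else 1

# Check ratings on both sides of the cut-off.
ratings = [3, 4, 5, 6, 7, 8]
labels = [quality_class(r) for r in ratings]
print(list(zip(ratings, labels)))
```

In PySpark the same rule could also be written without a UDF as `F.when(F.col("quality") < 7, 0).otherwise(1)`, which generally avoids the overhead of passing each row through Python.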

7.	Split your data into training and test datasets.

In [11]:
(trainingData, testData) = df.randomSplit([0.7, 0.3])
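
`randomSplit` treats the weights as proportions and assigns rows at random, so the resulting sizes are only approximately 70/30 and, without a seed, differ from run to run. The plain-Python sketch below is an analogy under that assumption, not Spark's implementation; `random_split` is a hypothetical helper.

```python
import random

def random_split(rows, train_fraction=0.7, seed=None):
    """Assign each row independently to train or test (illustrative only)."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        # Each row lands in the training set with probability train_fraction.
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(1000))
train_a, test_a = random_split(rows, seed=42)
train_b, test_b = random_split(rows, seed=42)

print(len(train_a), len(test_a))  # roughly 700 / 300; exact sizes vary with the seed
print(train_a == train_b)         # the same seed reproduces the same split
```

`randomSplit` itself accepts an optional `seed` argument, for example `df.randomSplit([0.7, 0.3], seed=42)`, which should make the accuracy figures in the later steps repeatable on the same data and partitioning.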

8.	**Decision Tree Classifier** \
Specify the DecisionTreeClassifier and train the model on your training dataset.


In [12]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

dt = DecisionTreeClassifier(labelCol="quality_class", featuresCol="features")
model = dt.fit(trainingData)

9.	Test your model with your test dataset.

In [13]:
predictions = model.transform(testData)

predictions.select("prediction", "quality_class").show(15)

+----------+-------------+
|prediction|quality_class|
+----------+-------------+
|       0.0|            0|
|       0.0|            0|
|       0.0|            0|
|       0.0|            0|
|       0.0|            0|
|       0.0|            0|
|       0.0|            0|
|       0.0|            0|
|       0.0|            0|
|       0.0|            0|
|       0.0|            0|
|       0.0|            0|
|       0.0|            0|
|       0.0|            0|
|       0.0|            0|
+----------+-------------+
only showing top 15 rows



10.	Run an evaluator function to show the accuracy of your model.

In [14]:
evaluator = MulticlassClassificationEvaluator(
                                              labelCol="quality_class", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
print("Test set accuracy = " + str(accuracy))

Test Error = 0.11336
Test set accuracy = 0.8866396761133604
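
Accuracy is easier to interpret next to a baseline. The first 15 predictions shown above are all 0.0, and with a cut-off of 7 the low-quality class is likely to dominate, so a model that always predicts 0 may already score well. The sketch below computes that majority-class baseline on hypothetical labels; the numbers are invented for illustration and are not taken from the wine dataset.

```python
from collections import Counter

def majority_baseline_accuracy(labels):
    """Accuracy obtained by always predicting the most common label."""
    counts = Counter(labels)
    most_common_count = counts.most_common(1)[0][1]
    return most_common_count / len(labels)

# Hypothetical test labels: 17 low-quality wines and 3 high-quality wines.
labels = [0] * 17 + [1] * 3
baseline = majority_baseline_accuracy(labels)
print(baseline)
```

In the notebook, the real class counts could be inspected with `testData.groupBy("quality_class").count().show()` and compared against the reported accuracy.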


11.	**Random Forest Classifier** \
Specify the RandomForestClassifier, train the model on your training dataset, predict using your test dataset, and run an evaluator to test accuracy.


In [15]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="quality_class",
                            featuresCol="features", numTrees=10)
model = rf.fit(trainingData)
predictions = model.transform(testData)
predictions.select("prediction", "quality_class").show(10)

+----------+-------------+
|prediction|quality_class|
+----------+-------------+
|       0.0|            0|
|       0.0|            0|
|       0.0|            0|
|       0.0|            0|
|       0.0|            0|
|       0.0|            0|
|       0.0|            0|
|       0.0|            0|
|       0.0|            0|
|       0.0|            0|
+----------+-------------+
only showing top 10 rows



In [16]:
evaluator = MulticlassClassificationEvaluator(labelCol="quality_class",
                                              predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
print("Test set accuracy = " + str(accuracy))

Test Error = 0.117409
Test set accuracy = 0.8825910931174089
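
The decision tree and random forest score almost the same accuracy, so a per-class view can be more informative. The sketch below counts true and false positives and negatives for the high-quality class and derives precision and recall from them. The labels and predictions are hypothetical, and `confusion_counts` is an illustrative helper rather than a Spark function.

```python
def confusion_counts(labels, predictions, positive=1):
    """Count true/false positives and negatives for one class."""
    tp = fp = tn = fn = 0
    for y, p in zip(labels, predictions):
        if p == positive:
            if y == positive:
                tp += 1
            else:
                fp += 1
        else:
            if y == positive:
                fn += 1
            else:
                tn += 1
    return tp, fp, tn, fn

# Hypothetical labels and predictions for ten wines.
labels      = [0, 0, 0, 0, 0, 0, 1, 1, 1, 0]
predictions = [0, 0, 0, 0, 0, 1, 1, 0, 0, 0]

tp, fp, tn, fn = confusion_counts(labels, predictions)
accuracy = (tp + tn) / len(labels)
precision = tp / (tp + fp) if (tp + fp) else 0.0
recall = tp / (tp + fn) if (tp + fn) else 0.0
print(tp, fp, tn, fn)
print(accuracy, precision, recall)
```

Here the accuracy looks reasonable even though only one of the three high-quality wines is found. `MulticlassClassificationEvaluator` also accepts other `metricName` values such as `"f1"`, `"weightedPrecision"`, and `"weightedRecall"`.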


12.	**Naive Bayes Classifier** \
Specify the NaiveBayes classifier, train the model on your training dataset, predict using your test dataset, and run an evaluator to test accuracy.


In [17]:
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes(labelCol="quality_class",
                featuresCol="features",
                smoothing=1.0,
                modelType="multinomial"
                )
model = nb.fit(trainingData)

In [18]:
predictions = model.transform(testData)
predictions.select("quality_class", "probability", "prediction").show(10)

+-------------+--------------------+----------+
|quality_class|         probability|prediction|
+-------------+--------------------+----------+
|            0|[0.13580265851518...|       1.0|
|            0|[0.98718334580067...|       0.0|
|            0|[0.09326372034584...|       1.0|
|            0|[0.94372234374448...|       0.0|
|            0|[0.46083870205651...|       1.0|
|            0|[0.64812442726179...|       0.0|
|            0|[0.10785404815044...|       1.0|
|            0|[0.74401600490067...|       0.0|
|            0|[0.55114820941214...|       0.0|
|            0|[0.08912523847028...|       1.0|
+-------------+--------------------+----------+
only showing top 10 rows
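
By default (with no `thresholds` set), the `prediction` column is the index of the largest entry in the `probability` vector. For two classes, that means class 1 is predicted whenever the first entry is below 0.5, which matches the rows above. The sketch uses rounded, illustrative probabilities, since the table truncates the real vectors.

```python
def predict_from_probability(probability):
    """Return the index of the most probable class as a float."""
    best_index = max(range(len(probability)), key=lambda i: probability[i])
    return float(best_index)

# Illustrative [P(class 0), P(class 1)] pairs, not the real model output.
examples = [[0.14, 0.86], [0.99, 0.01], [0.46, 0.54], [0.65, 0.35]]
predicted = [predict_from_probability(p) for p in examples]
for p, label in zip(examples, predicted):
    print(p, label)
```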



In [19]:
evaluator = MulticlassClassificationEvaluator(labelCol="quality_class",
                                              predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
print("Test set accuracy = " + str(accuracy))

Test Error = 0.309717
Test set accuracy = 0.6902834008097166
