<a href="https://colab.research.google.com/github/amjadraza/spark-ml-course/blob/main/week-2/Session_3_4_Machine_Learning_with_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Welcome to the course: Machine Learning with Spark (Session 3 & 4)

This course is offered in collaboration with [AI-LOUNGE](http://ai-lounge.com/) & [Datafy2AI](https://www.datafy2ai.com/)

Instructor: [Dr. Muhammad Amjad Raza](https://www.linkedin.com/in/amjadraza/)

## Refresher:


1.   What is Spark?
2.   Setting Up Spark within Google Colab Environment
3.   Basics of Spark Operations
4.   Spark DataFrame API using PySpark


## Learning Objective of this Session:


1. Basics of Machine Learning
2. Data Pipelines with Spark
3. Feature Engineering with Spark
4. Feature Engineering with Koalas
5. Machine Learning with Spark

# A PowerPoint Presentation: An Overview of Machine Learning

# A Comprehensive Guide: Machine Learning with Spark

Machine learning is increasingly used to solve real-world problems in almost every business domain. It helps solve problems using data that is often unstructured, noisy, and very large. As data sizes and the number of data sources grow, formulating and solving machine learning problems with standard single-machine techniques becomes a big challenge. Spark is a distributed processing engine that builds on and generalizes the MapReduce programming model, keeping intermediate data in memory, to process big data.

The Spark framework has its own machine learning module called MLlib. In this session, I will use PySpark and Spark MLlib to demonstrate machine learning with distributed processing. Readers will learn the concepts below through real examples.

1. Setting up Spark in the Google Colaboratory
2. Machine Learning Basic Concepts
3. Preprocessing and Data Transformation using Spark
4. Spark Clustering with pyspark

> A working google colab notebook will be provided to reproduce the results.


# Setting up Spark 3.1.1 in the Google Colaboratory

As a first step, I configure the Google Colab runtime with a Spark installation. For details, readers may read my article [Getting Started Spark 3.0.0 in Google Colab](https://medium.com/analytics-vidhya/getting-started-spark3-0-0-with-google-colab-9796d350d78) on Medium.

We will install the following programs:

* Java 8
* spark-3.1.1
* Hadoop3.2 
* [Findspark](https://github.com/minrk/findspark)

You can install Spark 3.1.1 using the commands below. To install a different release, change the version number in the download URL and in `SPARK_HOME`.

In [1]:
# Run below commands
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [2]:
# Install Latest version of Plotly

# ! pip uninstall -y plotly
# ! pip install plotly==4.14.3


In [3]:
! pip install loguru

Collecting loguru
  Downloading https://files.pythonhosted.org/packages/6d/48/0a7d5847e3de329f1d0134baf707b689700b53bd3066a5a8cfd94b3c9fc8/loguru-0.5.3-py3-none-any.whl (57kB)
Installing collected packages: loguru
Successfully installed loguru-0.5.3


In [4]:
from loguru import logger

## Environment Variables
After installing Spark and Java, set the environment variables that point to where Spark and Java are installed.

In [5]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

## Spark Installation test
Let's test the Spark installation in our Google Colab environment.

In [6]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()
# Test the spark 
df = spark.createDataFrame([{"hello": "world"} for x in range(1000)])

df.show(3, False)



+-----+
|hello|
+-----+
|world|
|world|
|world|
+-----+
only showing top 3 rows



In [7]:
# Check the installed version of pyspark
import pyspark
logger.info(pyspark.__version__)


2021-04-13 04:24:01.724 | INFO     | __main__:<module>:3 - 3.1.1


# Machine Learning 

Once we have set up Spark in Google Colab and made sure it is running with the correct version, i.e. 3.1.1 in this case, we can start exploring the machine learning API built on top of Spark. PySpark is the higher-level Python API for Spark. For this tutorial, I assume readers have a basic understanding of machine learning and have used scikit-learn for model building and training. Spark MLlib follows a similar structure: estimators are trained with `fit`, and predictions are produced with `transform` rather than `predict`.

To make the results reproducible, I have uploaded the data to my GitHub, where it can be accessed easily.

> Learn by Doing: Use the colab notebook to run it yourself


# PySpark: MLlib

> DataFrame based API is primary API

Machine Learning at Scale

* **ML Algorithms:** common learning algorithms such as classification, regression, clustering, and collaborative filtering
* **Featurization:** feature extraction, transformation, dimensionality reduction, and selection
* **Pipelines:** tools for constructing, evaluating, and tuning ML Pipelines
* **Persistence:** saving and loading algorithms, models, and Pipelines
* **Utilities:** linear algebra, statistics, data handling, etc.

[Reference](https://spark.apache.org/docs/3.1.1/ml-guide.html)

## Basic Statistics using PySpark

Linear algebra and basic statistics are key concepts to study and implement when doing machine learning.

Let's learn the concepts below using their PySpark implementations:

1. Correlation
2. Hypothesis testing
3. Summarizer



### Correlation

In [8]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Correlation


In [14]:
# Create the dummy data
features_df = spark.createDataFrame([
    (1, Vectors.dense([10.0,10000.0,1.0]),),
    (2, Vectors.dense([20.0,30000.0,2.0]),),
    (3, Vectors.dense([30.0,40000.0,3.0]),),
    
],["id", "features"] )

In [9]:
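# Sample data: a mix of sparse and dense vectors
data = [(Vectors.sparse(4, [(0, 1.0), (3, -2.0)]),),
        (Vectors.dense([4.0, 5.0, 0.0, 3.0]),),
        (Vectors.dense([6.0, 7.0, 0.0, 8.0]),),
        (Vectors.sparse(4, [(0, 9.0), (3, 1.0)]),)]
data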

[(SparseVector(4, {0: 1.0, 3: -2.0}),),
 (DenseVector([4.0, 5.0, 0.0, 3.0]),),
 (DenseVector([6.0, 7.0, 0.0, 8.0]),),
 (SparseVector(4, {0: 9.0, 3: 1.0}),)]

In [10]:
df = spark.createDataFrame(data, ["features"])

In [11]:
df.show()

+--------------------+
|            features|
+--------------------+
|(4,[0,3],[1.0,-2.0])|
|   [4.0,5.0,0.0,3.0]|
|   [6.0,7.0,0.0,8.0]|
| (4,[0,3],[9.0,1.0])|
+--------------------+



In [16]:
features_df.show()

+---+------------------+
| id|          features|
+---+------------------+
|  1|[10.0,10000.0,1.0]|
|  2|[20.0,30000.0,2.0]|
|  3|[30.0,40000.0,3.0]|
+---+------------------+



In [15]:
r1 = Correlation.corr(features_df, "features").head()
print("Pearson correlation matrix:\n" + str(r1[0]))

Pearson correlation matrix:
DenseMatrix([[1.        , 0.98198051, 1.        ],
             [0.98198051, 1.        , 0.98198051],
             [1.        , 0.98198051, 1.        ]])


In [21]:
r1[0]

DenseMatrix(3, 3, [1.0, 0.982, 1.0, 0.982, 1.0, 0.982, 1.0, 0.982, 1.0], False)

In [18]:
# Rank Correlation
r2 = Correlation.corr(features_df, "features", "spearman").head()
print("Spearman correlation matrix:\n" + str(r2[0]))

Spearman correlation matrix:
DenseMatrix([[1., 1., 1.],
             [1., 1., 1.],
             [1., 1., 1.]])
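
To see where these numbers come from, the sketch below recomputes one entry of each matrix in plain Python, without Spark. It is only an illustration: the helper names `pearson` and `ranks` are hypothetical, and the ranking helper assumes there are no tied values, which holds for this toy data.

```python
import math

def pearson(xs, ys):
    """Pearson correlation of two equal-length sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

def ranks(xs):
    """Rank values from 1..n (assumes no ties)."""
    order = sorted(xs)
    return [order.index(x) + 1 for x in xs]

# First and second components of the three feature vectors
col0 = [10.0, 20.0, 30.0]
col1 = [10000.0, 30000.0, 40000.0]

pearson_01 = pearson(col0, col1)
# Spearman correlation is the Pearson correlation of the ranks
spearman_01 = pearson(ranks(col0), ranks(col1))
print(round(pearson_01, 8))
print(spearman_01)
```

Because both columns increase together, their ranks are identical and the rank correlation is exactly 1, even though the Pearson value is slightly below 1.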


### Hypothesis testing


In [22]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import ChiSquareTest

data = [(0.0, Vectors.dense(0.5, 10.0)),
        (0.0, Vectors.dense(1.5, 20.0)),
        (1.0, Vectors.dense(1.5, 30.0)),
        (0.0, Vectors.dense(3.5, 30.0)),
        (0.0, Vectors.dense(3.5, 40.0)),
        (1.0, Vectors.dense(3.5, 40.0))]
df = spark.createDataFrame(data, ["label", "features"])

In [23]:
df.show()

+-----+----------+
|label|  features|
+-----+----------+
|  0.0|[0.5,10.0]|
|  0.0|[1.5,20.0]|
|  1.0|[1.5,30.0]|
|  0.0|[3.5,30.0]|
|  0.0|[3.5,40.0]|
|  1.0|[3.5,40.0]|
+-----+----------+



In [24]:
# ChiSquareTest

r = ChiSquareTest.test(df, "features", "label").head()
print("pValues: " + str(r.pValues))
print("degreesOfFreedom: " + str(r.degreesOfFreedom))
print("statistics: " + str(r.statistics))

pValues: [0.6872892787909721,0.6822703303362126]
degreesOfFreedom: [2, 3]
statistics: [0.75,1.5]
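
The test treats every distinct feature value as a category and compares the observed (feature value, label) counts with the counts expected under independence. The plain-Python sketch below recomputes the statistics and degrees of freedom for the same six rows. The function name `chi_square` is hypothetical, and the p-value is computed only for the first feature, using the closed form that holds for exactly 2 degrees of freedom.

```python
import math
from collections import Counter

def chi_square(feature_values, labels):
    """Pearson chi-square statistic and degrees of freedom for two categorical columns."""
    n = len(labels)
    observed = Counter(zip(feature_values, labels))
    feature_totals = Counter(feature_values)
    label_totals = Counter(labels)
    statistic = 0.0
    for f, f_total in feature_totals.items():
        for l, l_total in label_totals.items():
            expected = f_total * l_total / n
            statistic += (observed[(f, l)] - expected) ** 2 / expected
    dof = (len(feature_totals) - 1) * (len(label_totals) - 1)
    return statistic, dof

labels = [0.0, 0.0, 1.0, 0.0, 0.0, 1.0]
feature_0 = [0.5, 1.5, 1.5, 3.5, 3.5, 3.5]
feature_1 = [10.0, 20.0, 30.0, 30.0, 40.0, 40.0]

stat_0, dof_0 = chi_square(feature_0, labels)
stat_1, dof_1 = chi_square(feature_1, labels)
# For exactly 2 degrees of freedom, the chi-square survival function is exp(-x / 2)
p_value_0 = math.exp(-stat_0 / 2)
print(stat_0, dof_0, p_value_0)
print(stat_1, dof_1)
```

Both p-values in the output above are far from small, so on this tiny sample there is no evidence that either feature depends on the label.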


## ML Pipelines

Machine Learning Pipelines: high-level APIs built on top of the DataFrame API

* **DataFrame:** This ML API uses DataFrame from Spark SQL as an ML dataset, which can hold a variety of data types. E.g., a DataFrame could have different columns storing text, feature vectors, true labels, and predictions.

* **Transformer:** A Transformer is an algorithm which can transform one DataFrame into another DataFrame. E.g., an ML model is a Transformer which transforms a DataFrame with features into a DataFrame with predictions.

* **Estimator:** An Estimator is an algorithm which can be fit on a DataFrame to produce a Transformer. E.g., a learning algorithm is an Estimator which trains on a DataFrame and produces a model.

* **Pipeline:** A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.

* **Parameter:** All Transformers and Estimators now share a common API for specifying parameters.

> fit/transform API format | Inspired by the scikit-learn API

## [Advanced] Model Selection/Hyperparameters Tuning

* **CrossValidator:** A class that implements k-fold cross-validation. For each candidate set of hyperparameters, the model is trained on k-1 folds and validated on the remaining fold, and the metric is averaged over the k folds.
* **TrainValidationSplit:** A class that tunes hyperparameters by splitting the training data once into train and validation datasets.
* **Evaluator:** A metric that measures how well a fitted model does on held-out test/validation data.

> Examples: 

> https://spark.apache.org/docs/3.1.1/ml-tuning.html
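
As a rough illustration of the k-fold idea, the plain-Python sketch below partitions row indices into folds and, for each fold, trains on the other k-1 folds. The function `k_fold_indices` is hypothetical and assigns rows to folds by simple striding; it is not how Spark's `CrossValidator` assigns rows.

```python
def k_fold_indices(n_rows, k):
    """Yield (train_indices, validation_indices) for each of k folds."""
    folds = [list(range(i, n_rows, k)) for i in range(k)]
    for i in range(k):
        validation = folds[i]
        # Training rows are all rows from the other k - 1 folds
        train = [idx for j, fold in enumerate(folds) if j != i for idx in fold]
        yield train, validation

splits = list(k_fold_indices(n_rows=6, k=3))
for train, validation in splits:
    print("train:", train, "validation:", validation)
```

Every row is used for validation exactly once and for training k-1 times, which is why the averaged metric is less noisy than a single train/validation split.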

### Example of Pipeline

In [25]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Fit the pipeline to training documents.
model = pipeline.fit(training)

# Prepare test documents, which are unlabeled (id, text) tuples.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    rid, text, prob, prediction = row  # type: ignore
    print(
        "(%d, %s) --> prob=%s, prediction=%f" % (
            rid, text, str(prob), prediction   # type: ignore
        )
    )

(4, spark i j k) --> prob=[0.1596407738787412,0.8403592261212588], prediction=1.000000
(5, l m n) --> prob=[0.8378325685476614,0.16216743145233858], prediction=0.000000
(6, spark hadoop spark) --> prob=[0.06926633132976266,0.9307336686702373], prediction=1.000000
(7, apache hadoop) --> prob=[0.9821575333444208,0.017842466655579203], prediction=0.000000


# Data Preparation and Transformations in Spark

This section covers the basic steps for transforming input feature data into the format machine learning algorithms accept. We will cover transformations that ship with the Spark ML library. To read more about the transformations available in Spark 3.1.1, follow the link below.

https://spark.apache.org/docs/3.1.1/ml-features.html

## Normalize Numeric Data

MinMaxScaler is a popular class that ships with most machine learning libraries. It rescales each feature to a common range, [0, 1] by default.

In [None]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

In [None]:
# Create some dummy feature data
features_df = spark.createDataFrame([
    (1, Vectors.dense([10.0,10000.0,1.0]),),
    (2, Vectors.dense([20.0,30000.0,2.0]),),
    (3, Vectors.dense([30.0,40000.0,3.0]),),
    
],["id", "features"] )

In [None]:
features_df.show()

+---+------------------+
| id|          features|
+---+------------------+
|  1|[10.0,10000.0,1.0]|
|  2|[20.0,30000.0,2.0]|
|  3|[30.0,40000.0,3.0]|
+---+------------------+



In [None]:
# Apply MinMaxScaler transformation
features_scaler = MinMaxScaler(inputCol = "features", outputCol = "sfeatures")
smodel = features_scaler.fit(features_df)
sfeatures_df = smodel.transform(features_df)

In [None]:
sfeatures_df.show()

+---+------------------+--------------------+
| id|          features|           sfeatures|
+---+------------------+--------------------+
|  1|[10.0,10000.0,1.0]|           (3,[],[])|
|  2|[20.0,30000.0,2.0]|[0.5,0.6666666666...|
|  3|[30.0,40000.0,3.0]|       [1.0,1.0,1.0]|
+---+------------------+--------------------+
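
In the output above, `(3,[],[])` is a sparse vector of size 3 with no non-zero entries, i.e. `[0.0, 0.0, 0.0]`. The plain-Python sketch below reproduces the scaled values column by column. The helper `min_max_scale` is hypothetical and assumes the column is not constant, so that `hi - lo` is non-zero.

```python
def min_max_scale(column):
    """Rescale a list of numbers to the range [0, 1]."""
    lo, hi = min(column), max(column)
    return [(x - lo) / (hi - lo) for x in column]

rows = [
    [10.0, 10000.0, 1.0],
    [20.0, 30000.0, 2.0],
    [30.0, 40000.0, 3.0],
]
# Scaling is done per feature, so work on columns and then transpose back
columns = list(zip(*rows))
scaled_columns = [min_max_scale(list(c)) for c in columns]
scaled_rows = [list(r) for r in zip(*scaled_columns)]
for row in scaled_rows:
    print(row)
```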



## Standardize Numeric Data

StandardScaler is another well-known class that ships with most machine learning libraries. It standardizes each feature by removing the mean (`withMean`) and scaling to unit standard deviation (`withStd`). Unlike MinMaxScaler, it does not bound the values to a fixed range, and it does not change the shape of the distribution.

In [None]:
from pyspark.ml.feature import  StandardScaler
from pyspark.ml.linalg import Vectors

In [26]:
# Create the dummy data
features_df = spark.createDataFrame([
    (1, Vectors.dense([10.0,10000.0,1.0]),),
    (2, Vectors.dense([20.0,30000.0,2.0]),),
    (3, Vectors.dense([30.0,40000.0,3.0]),),
    
],["id", "features"] )

In [None]:
# Apply the StandardScaler model
features_stand_scaler = StandardScaler(inputCol = "features", outputCol = "sfeatures", withStd=True, withMean=True)
stmodel = features_stand_scaler.fit(features_df)
stand_sfeatures_df = stmodel.transform(features_df)

In [None]:
stand_sfeatures_df.show()

+---+------------------+--------------------+
| id|          features|           sfeatures|
+---+------------------+--------------------+
|  1|[10.0,10000.0,1.0]|[-1.0,-1.09108945...|
|  2|[20.0,30000.0,2.0]|[0.0,0.2182178902...|
|  3|[30.0,40000.0,3.0]|[1.0,0.8728715609...|
+---+------------------+--------------------+
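
The values above can be checked by hand: each feature has its mean subtracted and is divided by its sample standard deviation (the n - 1 version). A minimal plain-Python sketch, with the hypothetical helper `standardize`, for the first two features:

```python
import statistics

def standardize(column):
    """Subtract the mean and divide by the sample standard deviation (n - 1)."""
    mean = statistics.mean(column)
    std = statistics.stdev(column)
    return [(x - mean) / std for x in column]

col0 = [10.0, 20.0, 30.0]
col1 = [10000.0, 30000.0, 40000.0]

z0 = standardize(col0)
z1 = standardize(col1)
print(z0)
print([round(z, 8) for z in z1])
```

Note that the second feature falls outside [-1, 1]: standardization fixes the mean and spread of a feature, not its range.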



## Bucketize Numeric Data

Real data sets come with widely varying ranges, and it is sometimes advisable to transform the data into well-defined buckets before feeding it to machine learning algorithms.

The Bucketizer class is handy for mapping a continuous column into buckets defined by a list of split points.

In [None]:
from pyspark.ml.feature import  Bucketizer
from pyspark.ml.linalg import Vectors

In [None]:
# Define the splits for buckets
splits = [-float("inf"), -10, 0.0, 10, float("inf")]
b_data = [(-800.0,), (-10.5,), (-1.7,), (0.0,), (8.2,), (90.1,)]
b_df = spark.createDataFrame(b_data, ["features"])

In [None]:
b_df.show()

+--------+
|features|
+--------+
|  -800.0|
|   -10.5|
|    -1.7|
|     0.0|
|     8.2|
|    90.1|
+--------+



In [None]:
# Transforming data into buckets
bucketizer = Bucketizer(splits=splits, inputCol= "features", outputCol="bfeatures")
bucketed_df = bucketizer.transform(b_df)

In [None]:
bucketed_df.show()

+--------+---------+
|features|bfeatures|
+--------+---------+
|  -800.0|      0.0|
|   -10.5|      0.0|
|    -1.7|      1.0|
|     0.0|      2.0|
|     8.2|      2.0|
|    90.1|      3.0|
+--------+---------+
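
Each bucket is a half-open interval `[splits[i], splits[i+1])`, which is why `-10.5` lands in bucket 0 while `0.0` lands in bucket 2. The plain-Python sketch below mirrors that lookup with a binary search. The helper `bucketize` is hypothetical and returns integer indices rather than the doubles Spark produces.

```python
from bisect import bisect_right

def bucketize(value, splits):
    """Return the index of the bucket [splits[i], splits[i+1]) containing value."""
    index = bisect_right(splits, value) - 1
    # The last bucket also includes its upper bound, so clamp to the final index
    return min(index, len(splits) - 2)

splits = [-float("inf"), -10, 0.0, 10, float("inf")]
values = [-800.0, -10.5, -1.7, 0.0, 8.2, 90.1]
buckets = [bucketize(v, splits) for v in values]
print(buckets)
```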



## Tokenize text Data

Natural Language Processing is one of the main applications of machine learning. One of the first steps in NLP is tokenizing the text into words or tokens. We can use the Tokenizer class from Spark ML to perform this task.

In [None]:
from pyspark.ml.feature import  Tokenizer

In [None]:
sentences_df = spark.createDataFrame([
    (1, "This is an introduction to sparkMlib"),
    (2, "Mlib incluse libraries fro classfication and regression"),
    (3, "It also incluses support for data piple lines"),
    
], ["id", "sentences"])

In [None]:
sentences_df.show()

+---+--------------------+
| id|           sentences|
+---+--------------------+
|  1|This is an introd...|
|  2|Mlib incluse libr...|
|  3|It also incluses ...|
+---+--------------------+



In [None]:
sent_token = Tokenizer(inputCol = "sentences", outputCol = "words")
sent_tokenized_df = sent_token.transform(sentences_df)

In [None]:
sent_tokenized_df.take(10)

[Row(id=1, sentences='This is an introduction to sparkMlib', words=['this', 'is', 'an', 'introduction', 'to', 'sparkmlib']),
 Row(id=2, sentences='Mlib incluse libraries fro classfication and regression', words=['mlib', 'incluse', 'libraries', 'fro', 'classfication', 'and', 'regression']),
 Row(id=3, sentences='It also incluses support for data piple lines', words=['it', 'also', 'incluses', 'support', 'for', 'data', 'piple', 'lines'])]

## TF-IDF
Term frequency-inverse document frequency (TF-IDF) is a feature vectorization method widely used in text mining to reflect the importance of a term to a document in the corpus. Using the tokenized data above, let us apply TF-IDF.

In [None]:
from pyspark.ml.feature import HashingTF, IDF

In [None]:
hashingTF = HashingTF(inputCol = "words", outputCol = "rawfeatures", numFeatures = 20)
sent_fhTF_df = hashingTF.transform(sent_tokenized_df)

In [None]:
sent_fhTF_df.take(1)

[Row(id=1, sentences='This is an introduction to sparkMlib', words=['this', 'is', 'an', 'introduction', 'to', 'sparkmlib'], rawfeatures=SparseVector(20, {6: 2.0, 8: 1.0, 9: 1.0, 10: 1.0, 13: 1.0}))]

In [None]:
idf = IDF(inputCol = "rawfeatures", outputCol = "idffeatures")
idfModel = idf.fit(sent_fhTF_df)
tfidf_df = idfModel.transform(sent_fhTF_df)

In [None]:
tfidf_df.take(1)

[Row(id=1, sentences='This is an introduction to sparkMlib', words=['this', 'is', 'an', 'introduction', 'to', 'sparkmlib'], rawfeatures=SparseVector(20, {6: 2.0, 8: 1.0, 9: 1.0, 10: 1.0, 13: 1.0}), idffeatures=SparseVector(20, {6: 0.5754, 8: 0.6931, 9: 0.0, 10: 0.6931, 13: 0.2877}))]
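
The `idffeatures` values follow from the smoothed formula Spark documents for IDF, `ln((m + 1) / (df + 1))`, where `m` is the number of documents and `df` is the number of documents whose hashed terms hit a given bucket. The sketch below multiplies the raw term frequencies of the first row by that weight. The document frequencies are assumptions: they are not printed by the notebook and were chosen so that the results agree with the output above.

```python
import math

def idf(num_docs, doc_freq):
    """Smoothed inverse document frequency: ln((m + 1) / (df + 1))."""
    return math.log((num_docs + 1) / (doc_freq + 1))

num_docs = 3  # three sentences in sentences_df
# (hash bucket, term frequency in document 1, assumed document frequency)
buckets = [(6, 2.0, 2), (8, 1.0, 1), (9, 1.0, 3), (10, 1.0, 1), (13, 1.0, 2)]

tfidf = {b: round(tf * idf(num_docs, df), 4) for b, tf, df in buckets}
print(tfidf)
```

A bucket that occurs in every document (bucket 9 under the assumption above) gets weight 0, because a term present everywhere carries no information for telling documents apart.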

> Users can experiment with various transformations depending on the requirements of the problem at hand.

# Clustering Using PySpark

Clustering is an unsupervised machine learning technique in which the data is grouped into a reasonable number of clusters using the input features. In this section, we study the basic application of clustering techniques using the Spark ML framework.

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans, BisectingKMeans
import glob

In [None]:
# Downloading the clustering dataset
!wget -q 'https://raw.githubusercontent.com/amjadraza/blogs-data/master/spark_ml/clustering_dataset.csv'

Load the clustering data stored in csv format using spark

In [None]:
# Read the data.
clustering_file_name ='clustering_dataset.csv'
import pandas as pd
# df = pd.read_csv(clustering_file_name)
cluster_df = spark.read.csv(clustering_file_name, header=True,inferSchema=True)

Convert the tabular data into vectorized format using `VectorAssembler` 

In [None]:
# Converting the input data into a features column
vectorAssembler = VectorAssembler(inputCols = ['col1', 'col2', 'col3'], outputCol = "features")
vcluster_df = vectorAssembler.transform(cluster_df)


In [None]:
vcluster_df.show(10)

+----+----+----+--------------+
|col1|col2|col3|      features|
+----+----+----+--------------+
|   7|   4|   1| [7.0,4.0,1.0]|
|   7|   7|   9| [7.0,7.0,9.0]|
|   7|   9|   6| [7.0,9.0,6.0]|
|   1|   6|   5| [1.0,6.0,5.0]|
|   6|   7|   7| [6.0,7.0,7.0]|
|   7|   9|   4| [7.0,9.0,4.0]|
|   7|  10|   6|[7.0,10.0,6.0]|
|   7|   8|   2| [7.0,8.0,2.0]|
|   8|   3|   8| [8.0,3.0,8.0]|
|   4|  10|   5|[4.0,10.0,5.0]|
+----+----+----+--------------+
only showing top 10 rows



Once the data is in the format MLlib can use for models, we can define and train a clustering algorithm such as K-Means. We set the number of clusters and the random seed as done below.

In [None]:
# Applying the k-means algorithm
kmeans = KMeans().setK(3)
kmeans = kmeans.setSeed(1)
kmodel = kmeans.fit(vcluster_df)

After training has been finished, let us print the centers.

In [None]:
centers = kmodel.clusterCenters()
print("The location of centers: {}".format(centers))

The location of centers: [array([35.88461538, 31.46153846, 34.42307692]), array([80.        , 79.20833333, 78.29166667]), array([5.12, 5.84, 4.84])]
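
Once the centers are known, a k-means model assigns each row to the cluster whose center is closest. The plain-Python sketch below does this for the first row of the data shown earlier, using the centers printed above. The helper `nearest_center` is hypothetical and uses Euclidean distance.

```python
import math

# Cluster centers copied from the output above
centers = [
    [35.88461538, 31.46153846, 34.42307692],
    [80.0, 79.20833333, 78.29166667],
    [5.12, 5.84, 4.84],
]

def nearest_center(point, centers):
    """Return the index of the center closest to point (Euclidean distance)."""
    distances = [math.dist(point, c) for c in centers]
    return distances.index(min(distances))

cluster = nearest_center([7.0, 4.0, 1.0], centers)
print(cluster)
```

In Spark, the same assignment for every row is obtained with `kmodel.transform(vcluster_df)`, which adds a `prediction` column holding the cluster index.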


MLlib implements various kinds of clustering algorithms. Bisecting K-Means is another popular method.

In [None]:
# Applying Hierarchical Clustering
bkmeans = BisectingKMeans().setK(3)
bkmeans = bkmeans.setSeed(1)


In [None]:
bkmodel = bkmeans.fit(vcluster_df)
bkcenters = bkmodel.clusterCenters()

In [None]:
bkcenters

[array([5.12, 5.84, 4.84]),
 array([35.88461538, 31.46153846, 34.42307692]),
 array([80.        , 79.20833333, 78.29166667])]

To read more about the clustering methods implemented in MLlib, follow the link below. https://spark.apache.org/docs/3.1.1/ml-clustering.html 

# Classification Using PySpark

Classification is one of the most widely used families of machine learning algorithms, and almost every data engineer and data scientist should know about them. Once the data is loaded and prepared, I will demonstrate three classification algorithms.

1. NaiveBayes Classification
2. Multi-Layer Perceptron Classification
3. Decision Trees Classification


We explore the supervised classification algorithms using the [IRIS data](https://archive.ics.uci.edu/ml/datasets/iris). I have uploaded the data to my GitHub so the results can be reproduced. Users can download the data using the command below.


In [None]:
# Downloading the iris data
!wget -q "https://raw.githubusercontent.com/amjadraza/blogs-data/master/spark_ml/iris.csv"

In [None]:
df = pd.read_csv("https://raw.githubusercontent.com/amjadraza/blogs-data/master/spark_ml/iris.csv", header=None)

In [None]:
df.head()

     0    1    2    3            4
0  5.1  3.5  1.4  0.2  Iris-setosa
1  4.9  3.0  1.4  0.2  Iris-setosa
2  4.7  3.2  1.3  0.2  Iris-setosa
3  4.6  3.1  1.5  0.2  Iris-setosa
4  5.0  3.6  1.4  0.2  Iris-setosa


In [None]:
# Column names inferred from the output below
columns = ["c_0", "c_1", "c_2", "c_3", "c4 "]
spark.createDataFrame(df, columns)

DataFrame[c_0: double, c_1: double, c_2: double, c_3: double, c4 : string]

## Preprocessing the Iris Data

In this section, we use the IRIS data to understand classification. Before fitting ML models, we apply preprocessing steps to our input data.

In [None]:
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import  StringIndexer


In [None]:
# Read the iris data
df_iris = pd.read_csv("https://raw.githubusercontent.com/amjadraza/blogs-data/master/spark_ml/iris.csv", header=None)
iris_df = spark.createDataFrame(df_iris)

In [None]:
# Rename the columns
iris_df = iris_df.select(col("0").alias("sepal_length"),
                         col("1").alias("sepal_width"),
                         col("2").alias("petal_length"),
                         col("3").alias("petal_width"),
                         col("4").alias("species"),
                        )

In [None]:
iris_df.show(5, False)

+------------+-----------+------------+-----------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|species    |
+------------+-----------+------------+-----------+-----------+
|5.1         |3.5        |1.4         |0.2        |Iris-setosa|
|4.9         |3.0        |1.4         |0.2        |Iris-setosa|
|4.7         |3.2        |1.3         |0.2        |Iris-setosa|
|4.6         |3.1        |1.5         |0.2        |Iris-setosa|
|5.0         |3.6        |1.4         |0.2        |Iris-setosa|
+------------+-----------+------------+-----------+-----------+
only showing top 5 rows

In [None]:
# Converting the columns into features
vectorAssembler = VectorAssembler(inputCols = ["sepal_length", "sepal_width", "petal_length", "petal_width"],
                                  outputCol = "features")
viris_df = vectorAssembler.transform(iris_df)

In [None]:
viris_df.show(5, False)

+------------+-----------+------------+-----------+-----------+-----------------+
|sepal_length|sepal_width|petal_length|petal_width|species    |features         |
+------------+-----------+------------+-----------+-----------+-----------------+
|5.1         |3.5        |1.4         |0.2        |Iris-setosa|[5.1,3.5,1.4,0.2]|
|4.9         |3.0        |1.4         |0.2        |Iris-setosa|[4.9,3.0,1.4,0.2]|
|4.7         |3.2        |1.3         |0.2        |Iris-setosa|[4.7,3.2,1.3,0.2]|
|4.6         |3.1        |1.5         |0.2        |Iris-setosa|[4.6,3.1,1.5,0.2]|
|5.0         |3.6        |1.4         |0.2        |Iris-setosa|[5.0,3.6,1.4,0.2]|
+------------+-----------+------------+-----------+-----------+-----------------+
only showing top 5 rows



In [None]:
indexer = StringIndexer(inputCol="species", outputCol = "label")
iviris_df = indexer.fit(viris_df).transform(viris_df)

In [None]:
iviris_df.show(2, False)

+------------+-----------+------------+-----------+-----------+-----------------+-----+
|sepal_length|sepal_width|petal_length|petal_width|species    |features         |label|
+------------+-----------+------------+-----------+-----------+-----------------+-----+
|5.1         |3.5        |1.4         |0.2        |Iris-setosa|[5.1,3.5,1.4,0.2]|0.0  |
|4.9         |3.0        |1.4         |0.2        |Iris-setosa|[4.9,3.0,1.4,0.2]|0.0  |
+------------+-----------+------------+-----------+-----------+-----------------+-----+
only showing top 2 rows
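
`StringIndexer` turns each distinct string into a numeric label. By default the most frequent string gets index 0; to my understanding, Spark 3.x breaks frequency ties alphabetically, which is what the sketch below assumes. The helper `fit_string_index` is hypothetical, and the input relies on the well-known fact that the Iris data holds 50 rows per species.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct string to an index: most frequent first, ties broken alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {label: float(i) for i, label in enumerate(ordered)}

# Iris has 50 rows per species, so every frequency is tied
species = ["Iris-virginica"] * 50 + ["Iris-setosa"] * 50 + ["Iris-versicolor"] * 50
mapping = fit_string_index(species)
print(mapping)
```

This agrees with the output above, where `Iris-setosa` is mapped to label `0.0`.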



## Naive Bayes Classification

Once the data is prepared, we are ready to apply the first classification algorithm.

In [None]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


In [None]:
# Create the training and test splits
splits = iviris_df.randomSplit([0.6,0.4], 1)
train_df = splits[0]
test_df = splits[1]

In [None]:
# Apply the Naive bayes classifier
nb = NaiveBayes(modelType="multinomial")
nbmodel = nb.fit(train_df)
predictions_df = nbmodel.transform(test_df)

In [None]:
predictions_df.show(1, False)

+------------+-----------+------------+-----------+-----------+-----------------+-----+------------------------------------------------------------+------------------------------------------------------------+----------+
|sepal_length|sepal_width|petal_length|petal_width|species    |features         |label|rawPrediction                                               |probability                                                 |prediction|
+------------+-----------+------------+-----------+-----------+-----------------+-----+------------------------------------------------------------+------------------------------------------------------------+----------+
|4.3         |3.0        |1.1         |0.1        |Iris-setosa|[4.3,3.0,1.1,0.1]|0.0  |[-9.966434726497221,-11.294595492758821,-11.956012812323921]|[0.7134106367667451,0.18902823898426235,0.09756112424899269]|0.0       |
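+------------+-----------+------------+-----------+-----------+-----------------+-----+------------------------------------------------------------+------------------------------------------------------------+----------+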

Let us evaluate the trained classifier.

In [None]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
nbaccuracy = evaluator.evaluate(predictions_df)
nbaccuracy

0.8275862068965517
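
The `accuracy` metric is simply the fraction of test rows whose predicted label equals the true label. A minimal plain-Python sketch, using hypothetical labels and predictions rather than the Iris results:

```python
def accuracy(labels, predictions):
    """Fraction of rows where the prediction equals the label."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

labels = [0.0, 1.0, 2.0, 1.0, 0.0]       # hypothetical true labels
predictions = [0.0, 1.0, 1.0, 1.0, 0.0]  # hypothetical model output
acc = accuracy(labels, predictions)
print(acc)
```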

## Multilayer Perceptron Classification

The second classifier we will investigate is a multilayer perceptron (MLP). In this tutorial, I do not go into the details of finding the optimal MLP network for this problem; in practice, however, you should search for the network architecture best suited to the problem at hand.

In [None]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

In [None]:
# Define the MLP Classifier
# layers: 4 input features, two hidden layers of 5 units each, 3 output classes
layers = [4,5,5,3]
mlp = MultilayerPerceptronClassifier(layers = layers, seed=1)
mlp_model = mlp.fit(train_df)
mlp_predictions = mlp_model.transform(test_df)

In [None]:
# Evaluate the MLP classifier
mlp_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
mlp_accuracy = mlp_evaluator.evaluate(mlp_predictions)
mlp_accuracy

0.9827586206896551

## Decision Trees Classification

Another common classifier in the ML family is the Decision Tree classifier, which we explore in this section.

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

In [None]:
# Define the DT Classifier 
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)

In [None]:
# Evaluate the DT Classifier
dt_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
dt_accuracy = dt_evaluator.evaluate(dt_predictions)
dt_accuracy

0.9827586206896551

Apart from the three classification algorithms demonstrated above, Spark MLlib implements many other classification algorithms. Details can be found at the link below.

https://spark.apache.org/docs/3.0.1/ml-classification-regression.html#classification

It is highly recommended to try some of the other classification algorithms to get hands-on experience.

# Regression using PySpark

In this section, we explore machine learning models for regression problems using PySpark. Regression models predict a continuous target value from input features.

We will use the [Combined Cycle Power Plant](https://archive.ics.uci.edu/ml/datasets/combined+cycle+power+plant) data set to predict the net hourly electrical output (EP), which is stored in the `PE` column of the data. I have uploaded the data to my GitHub so that users can reproduce the results.

In [None]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

In [None]:
# Read the CCPP data
df_ccpp = pd.read_csv("https://raw.githubusercontent.com/amjadraza/blogs-data/master/spark_ml/ccpp.csv")
pp_df = spark.createDataFrame(df_ccpp)

In [None]:
pp_df.show(2, False)

+-----+-----+-------+-----+------+
|AT   |V    |AP     |RH   |PE    |
+-----+-----+-------+-----+------+
|14.96|41.76|1024.07|73.17|463.26|
|25.18|62.96|1020.04|59.08|444.37|
+-----+-----+-------+-----+------+
only showing top 2 rows



In [None]:
# Create the feature column using VectorAssembler class
vectorAssembler = VectorAssembler(inputCols =["AT", "V", "AP", "RH"], outputCol = "features")
vpp_df = vectorAssembler.transform(pp_df)

In [None]:
vpp_df.show(2, False)

+-----+-----+-------+-----+------+---------------------------+
|AT   |V    |AP     |RH   |PE    |features                   |
+-----+-----+-------+-----+------+---------------------------+
|14.96|41.76|1024.07|73.17|463.26|[14.96,41.76,1024.07,73.17]|
|25.18|62.96|1020.04|59.08|444.37|[25.18,62.96,1020.04,59.08]|
+-----+-----+-------+-----+------+---------------------------+
only showing top 2 rows



## Linear Regression

We start with the simplest regression technique, i.e. Linear Regression.

In [None]:
# Define and fit Linear Regression
lr = LinearRegression(featuresCol="features", labelCol="PE")
lr_model = lr.fit(vpp_df)

In [None]:
# Inspect the fitted model; only the last expression (the training RMSE) is displayed
lr_model.coefficients
lr_model.intercept
lr_model.summary.rootMeanSquaredError

4.557126016749486

In [None]:
# lr_model.save(path)  # save() requires an output path

## Decision Tree Regression

In this section, we explore Decision Tree Regression, which is commonly used in machine learning.

In [None]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
vpp_df.show(2, False)

+-----+-----+-------+-----+------+---------------------------+
|AT   |V    |AP     |RH   |PE    |features                   |
+-----+-----+-------+-----+------+---------------------------+
|14.96|41.76|1024.07|73.17|463.26|[14.96,41.76,1024.07,73.17]|
|25.18|62.96|1020.04|59.08|444.37|[25.18,62.96,1020.04,59.08]|
+-----+-----+-------+-----+------+---------------------------+
only showing top 2 rows



In [None]:
# Define train and test data split
splits = vpp_df.randomSplit([0.7,0.3])
train_df = splits[0]
test_df = splits[1]

In [None]:
# Define the Decision Tree Model 
dt = DecisionTreeRegressor(featuresCol="features", labelCol="PE")
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)

In [None]:
dt_predictions.show(1, False)

+----+-----+-------+-----+------+--------------------------+-----------------+
|AT  |V    |AP     |RH   |PE    |features                  |prediction       |
+----+-----+-------+-----+------+--------------------------+-----------------+
|3.31|39.42|1024.05|84.31|487.19|[3.31,39.42,1024.05,84.31]|486.1117703349283|
+----+-----+-------+-----+------+--------------------------+-----------------+
only showing top 1 row



In [None]:
# Evaluate the Model
dt_evaluator = RegressionEvaluator(labelCol="PE", predictionCol="prediction", metricName="rmse")
dt_rmse = dt_evaluator.evaluate(dt_predictions)
print("The RMSE of Decision Tree regression Model is {}".format(dt_rmse))

The RMSE of Decision Tree regression Model is 4.451790078736588
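
RMSE (root mean squared error) is the square root of the average squared difference between the label and the prediction, so it is expressed in the same units as the target. A minimal plain-Python sketch with hypothetical numbers, not the power plant data:

```python
import math

def rmse(labels, predictions):
    """Root mean squared error between two equal-length sequences."""
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

labels = [3.0, 5.0, 7.0]       # hypothetical targets
predictions = [2.0, 5.0, 9.0]  # hypothetical predictions
error = rmse(labels, predictions)
print(round(error, 4))
```

Because the errors are squared before averaging, a single large miss (the error of 2 here) contributes more than several small ones.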


## Gradient Boosting Decision Tree Regression

Gradient boosting is another common choice among ML professionals. Let us try gradient-boosted trees (GBT) in this section.

In [None]:
from pyspark.ml.regression import GBTRegressor

In [None]:
# Define the GBT Model
gbt = GBTRegressor(featuresCol="features", labelCol="PE")
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)

In [None]:
# Evaluate the GBT Model
gbt_evaluator = RegressionEvaluator(labelCol="PE", predictionCol="prediction", metricName="rmse")
gbt_rmse = gbt_evaluator.evaluate(gbt_predictions)
print("The RMSE of GBT Tree regression Model is {}".format(gbt_rmse))

The RMSE of GBT Tree regression Model is 4.035802933864555


Apart from the regression algorithms demonstrated above, Spark MLlib implements many other regression algorithms. Details can be found at the link below.

https://spark.apache.org/docs/3.0.1/ml-classification-regression.html#regression

It is highly recommended to try some of the other regression algorithms and experiment with their parameters to get hands-on experience.

# Conclusions

In this session, I have tried to give readers an opportunity to learn and implement basic machine learning algorithms using PySpark. Spark provides the benefit of distributed processing, which lets it handle large amounts of data. To summarise, we have covered the topics/algorithms below:

* Setting up Spark 3.1.1 in Google Colab
* Overview of PySpark's MLlib Module
* Overview of Data Transformations using PySpark
* Clustering algorithms using PySpark
* Classification problems using PySpark
* Regression Problems using PySpark

# Reference Readings/Links

1. https://spark.apache.org/docs/latest/ml-features.html
2. https://spark.apache.org/docs/3.0.1/ml-classification-regression.html#regression
3. https://spark.apache.org/docs/3.0.1/ml-clustering.html
4. https://spark.apache.org/docs/3.0.1/ml-classification-regression.html#classification
