## Welcome to this course "Getting started with Apache Spark"
## Video: Machine Learning pipelines in PySpark

![PySpark](https://drive.google.com/uc?id=1oU2tHXn4Tb4NJ0GQLbFQanLUVWj-3M-G)

## Contents
- Setting up the environment
- Read data
- Building models without Pipelines - Previous video
- Leverage Spark ML pipelines

## Setting up the PySpark environment
- For more details on the setup, see this video: https://www.youtube.com/watch?v=r5PbUuLUZiE
  - The link is also in the video description
- Run the cell below to install the required libraries and files

In [1]:
%%capture
# Setting up the PySpark environment (%%capture must be the first line of the cell)

# Install java 8
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download the Apache Spark binary. This link depends on the version: update it to the release you need before running
# (older releases are typically moved from downloads.apache.org to archive.apache.org/dist/spark/)
!wget -q https://downloads.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz

# Unzip file
!tar -xf spark-3.0.3-bin-hadoop2.7.tgz

# Install findspark: Adds Pyspark to sys.path at runtime
!pip install -q findspark

# Install pyspark
!pip install pyspark

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"

# findspark will locate spark in the system
import findspark
findspark.init()

### Initialize SparkSession

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .master("local") \
        .appName("Hands-on PySpark on Google Colab") \
        .getOrCreate()

In [3]:
spark

### Read data
Dataset (Wine quality red): https://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv

In [4]:
!wget -q https://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv -P sample_data/

In [5]:
# header='true' reads the column names from the first row, inferSchema='true' infers the column types,
# and delimiter=';' is needed because this file is semicolon-separated

filepath = "sample_data/winequality-red.csv"
spark_df = spark.read.format('csv').options(header='true', inferSchema='true', delimiter=';').load(filepath)
spark_df.show(5, truncate=False)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|pH  |sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|7.4          |0.7             |0.0        |1.9           |0.076    |11.0               |34.0                |0.9978 |3.51|0.56     |9.4    |5      |
|7.8          |0.88            |0.0        |2.6           |0.098    |25.0               |67.0                |0.9968 |3.2 |0.68     |9.8    |5      |
|7.8          |0.76            |0.04       |2.3           |0.092    |15.0               |54.0                |0.997  |3.26|0.65     |9.8    |5      |
|11.2         |0.28            |0.56       |1.9           |0.075    |17.0               |60.0       

In [6]:
spark_df.select("quality").distinct().show()

+-------+
|quality|
+-------+
|      6|
|      3|
|      5|
|      4|
|      8|
|      7|
+-------+



## Introduction to SparkML and Pipelines

![PySpark](https://drive.google.com/uc?id=12JHUfoimGe8k5VOqDenLCw4ZHwCWuOvW)

References: https://spark.apache.org/docs/latest/ml-pipeline.html
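
As the reference above describes, a Pipeline is an ordered list of stages. Each stage is either a Transformer (it has `transform`) or an Estimator (it has `fit`, which returns a Transformer). The plain-Python sketch below is only meant to show that mechanic. It does not use Spark, and the `Toy*` classes and the sample rows are made up for illustration. It is also simplified: every stage transforms the data during fitting, and the classifier stage is left out because it would be handled like any other Estimator.

```python
from collections import Counter


class ToyIndexer:
    """Estimator: learns a label -> index mapping from the data it is fitted on."""

    def __init__(self, input_col, output_col):
        self.input_col, self.output_col = input_col, output_col

    def fit(self, rows):
        counts = Counter(row[self.input_col] for row in rows)
        # Most frequent label gets index 0.0; ties are broken alphabetically
        ordered = sorted(counts, key=lambda label: (-counts[label], label))
        mapping = {label: float(i) for i, label in enumerate(ordered)}
        return ToyIndexerModel(self.input_col, self.output_col, mapping)


class ToyIndexerModel:
    """Transformer returned by ToyIndexer.fit()."""

    def __init__(self, input_col, output_col, mapping):
        self.input_col, self.output_col, self.mapping = input_col, output_col, mapping

    def transform(self, rows):
        return [{**row, self.output_col: self.mapping[row[self.input_col]]} for row in rows]


class ToyAssembler:
    """Transformer: needs no fitting, it only packs columns into one list."""

    def __init__(self, input_cols, output_col):
        self.input_cols, self.output_col = input_cols, output_col

    def transform(self, rows):
        return [{**row, self.output_col: [row[c] for c in self.input_cols]} for row in rows]


class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            # Estimators are fitted on the current data; Transformers are used as-is
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            rows = model.transform(rows)  # the next stage sees the transformed data
            fitted.append(model)
        return ToyPipelineModel(fitted)


class ToyPipelineModel:
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


train = [{"pH": 3.51, "alcohol": "Low"}, {"pH": 3.2, "alcohol": "Low"}, {"pH": 3.26, "alcohol": "High"}]
test = [{"pH": 3.16, "alcohol": "High"}]

pipeline = ToyPipeline([
    ToyIndexer("alcohol", "alcoholIndex"),
    ToyAssembler(["pH", "alcoholIndex"], "features"),
])
model = pipeline.fit(train)       # everything is learned from the training rows only
test_out = model.transform(test)  # the same fitted stages are replayed on the test rows
print(test_out[0]["features"])    # [3.16, 1.0]
```

The point to notice is that `fit` is called once on the training rows, and the resulting model replays the same fitted stages, in the same order, on any other data.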

***Check out this video***

Before starting this video, I highly recommend watching my previous video to understand how to do preprocessing, modeling and predictions without Pipelines. The code from that video is reproduced below.

Link to the video: https://www.youtube.com/watch?v=UR2zQwwoP08

In [8]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

# StringIndexer is similar to scikit-learn's LabelEncoder: it assigns a numeric index to each category
# OneHotEncoder creates a one-hot encoded vector from a category index
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# VectorAssembler combines the feature columns into a single vector column. Models take this vector as input
from pyspark.ml.feature import VectorAssembler

# DecisionTreeClassifier is used for classification problems
from pyspark.ml.classification import DecisionTreeClassifier
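
The cell above imports `OneHotEncoder`, although the rest of this notebook only uses `StringIndexer`. As a rough illustration of what one-hot encoding does to the indices that `StringIndexer` produces, here is a plain-Python sketch. The `one_hot` helper and the three-category example are hypothetical. Spark's `OneHotEncoder` returns sparse vectors and, with its default `dropLast=True`, leaves out the last category so that it is encoded as all zeros. The sketch mimics that with dense lists.

```python
def one_hot(index, num_categories, drop_last=True):
    """Hypothetical helper: dense one-hot vector for a single category index."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    position = int(index)
    if position < size:  # the dropped (last) category stays all zeros
        vector[position] = 1.0
    return vector


# Assumed indices for a made-up three-category column (for example Low / Medium / High)
category_index = [0.0, 1.0, 2.0]

encoded = [one_hot(i, num_categories=3) for i in category_index]
print(encoded)       # [[1.0, 0.0], [0.0, 1.0], [0.0, 0.0]]

encoded_full = [one_hot(i, num_categories=3, drop_last=False) for i in category_index]
print(encoded_full)  # [[1.0, 0.0, 0.0], [0.0, 1.0, 0.0], [0.0, 0.0, 1.0]]
```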

In [9]:
# Create a categorical column for explanation purpose
spark_df = spark_df.withColumn("alcohol", F.when(F.col("alcohol") > 10.5, "High").otherwise("Low"))
spark_df.show(3, truncate=False)

spark_df.groupby("alcohol").count().show(), spark_df.select("quality").distinct().show()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|pH  |sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|7.4          |0.7             |0.0        |1.9           |0.076    |11.0               |34.0                |0.9978 |3.51|0.56     |Low    |5      |
|7.8          |0.88            |0.0        |2.6           |0.098    |25.0               |67.0                |0.9968 |3.2 |0.68     |Low    |5      |
|7.8          |0.76            |0.04       |2.3           |0.092    |15.0               |54.0                |0.997  |3.26|0.65     |Low    |5      |
+-------------+----------------+-----------+--------------+---------+-------------------+-----------

(None, None)

In [10]:
(train_df, test_df) = spark_df.randomSplit([0.8, 0.2], 11)
print("Number of train samples: " + str(train_df.count()))
print("Number of test samples: " + str(test_df.count()))

Number of train samples: 1279
Number of test samples: 320
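
`randomSplit` assigns each row to a split at random according to the weights, so the split sizes are only approximately 80/20, and the seed (11 here) makes the assignment repeatable. The plain-Python sketch below illustrates that idea. It is an assumption-laden simplification: `random_split` is a hypothetical helper, and Python's random generator will not reproduce the exact rows or counts that Spark selects.

```python
import random


def random_split(rows, weights, seed):
    """Hypothetical helper: send each row to a split using one uniform random draw."""
    total = sum(weights)
    # Cumulative upper bounds of the normalized weights, e.g. [0.8, 1.0]
    bounds, running = [], 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)
    bounds[-1] = 1.0  # guard against floating-point rounding

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()  # uniform in [0, 1)
        for i, upper in enumerate(bounds):
            if draw < upper:
                splits[i].append(row)
                break
    return splits


rows = list(range(1599))  # same number of rows as the train and test counts above combined
train_rows, test_rows = random_split(rows, [0.8, 0.2], seed=11)
print(len(train_rows), len(test_rows))
```

Running it again with the same seed returns the same split, while a different seed usually gives slightly different sizes.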


## Modeling - Without pipelines

In [11]:
# Label Encoding of categorical variables
alcohol_indexer = StringIndexer(inputCol="alcohol", outputCol="alcoholIndex")
alcohol_indexer = alcohol_indexer.fit(train_df)
train_df = alcohol_indexer.transform(train_df)

# Convert features into a vector
inputCols = ['fixed acidity', 'volatile acidity', 'citric acid', 'residual sugar', 'chlorides', 'free sulfur dioxide', 'total sulfur dioxide', 'density', 'pH', 'sulphates', 'alcoholIndex']
outputCol = "features"
vector_assembler = VectorAssembler(inputCols = inputCols, outputCol = outputCol)
train_df = vector_assembler.transform(train_df)

# Select feature vector and label
modeling_df = train_df.select(['features', 'quality'])

# Create DecisionTreeClassifier model
dt_model = DecisionTreeClassifier(labelCol="quality", featuresCol="features")

# Train model with Training Data
dt_model = dt_model.fit(modeling_df)

# Do predictions on train data
predictions = dt_model.transform(modeling_df)
predictions.show(5, truncate=False)

# Predictions on test data
test_df = alcohol_indexer.transform(test_df)
test_df = vector_assembler.transform(test_df)
test_predictions = dt_model.transform(test_df)

test_predictions.show(3, truncate=False)

+--------------------------------------------------------+-------+----------------------------------------+----------------------------------------------------------------------------------------------------------------------+----------+
|features                                                |quality|rawPrediction                           |probability                                                                                                           |prediction|
+--------------------------------------------------------+-------+----------------------------------------+----------------------------------------------------------------------------------------------------------------------+----------+
|[4.6,0.52,0.15,2.1,0.054,8.0,65.0,0.9934,3.9,0.56,1.0]  |4      |[0.0,0.0,0.0,3.0,4.0,31.0,11.0,1.0,0.0] |[0.0,0.0,0.0,0.06,0.08,0.62,0.22,0.02,0.0]                                                                            |5.0       |
|[4.7,0.6,0.17,2.3,0.058,17.0,106.0,0.9932,3.85,
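
The three output columns in the first row above are related. For a decision tree classifier, `rawPrediction` holds the per-class counts of training samples in the leaf that the row falls into, `probability` is that vector normalized to sum to 1, and `prediction` is the index of the largest entry. The vector has nine entries because the class indices run from 0 to the largest `quality` value, 8. The short check below reproduces the first row's numbers in plain Python.

```python
# rawPrediction from the first row of the output above
raw_prediction = [0.0, 0.0, 0.0, 3.0, 4.0, 31.0, 11.0, 1.0, 0.0]

# probability = rawPrediction divided by its sum (50 samples in this leaf)
total = sum(raw_prediction)
probability = [count / total for count in raw_prediction]

# prediction = index of the most probable class, stored as a float like Spark does
prediction = float(max(range(len(probability)), key=probability.__getitem__))

print([round(p, 2) for p in probability])  # [0.0, 0.0, 0.0, 0.06, 0.08, 0.62, 0.22, 0.02, 0.0]
print(prediction)                          # 5.0
```

Note that the true `quality` of that row is 4 while the prediction is 5, so this row is misclassified even on the training data.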

## Using pipelines

In [12]:
# Import pipeline from PySpark ML
from pyspark.ml import Pipeline

In [13]:
(train_df, test_df) = spark_df.randomSplit([0.8, 0.2], 11)
print("Number of train samples: " + str(train_df.count()))
print("Number of test samples: " + str(test_df.count()))

Number of train samples: 1279
Number of test samples: 320


In [14]:
# StringIndexer
alcohol_indexer = StringIndexer(inputCol="alcohol", outputCol="alcoholIndex")

# VectorAssembler
inputCols = ['fixed acidity', 'volatile acidity', 'citric acid', 'residual sugar', 'chlorides', 'free sulfur dioxide', 'total sulfur dioxide', 'density', 'pH', 'sulphates', 'alcoholIndex']
outputCol = "features"
vector_assembler = VectorAssembler(inputCols = inputCols, outputCol = outputCol)

# Modeling using DecisionTreeClassifier
dt_model = DecisionTreeClassifier(labelCol="quality", featuresCol="features")

In [15]:
# Setup the pipeline
pipeline = Pipeline(stages=[alcohol_indexer, vector_assembler, dt_model])

# Fit the pipeline model
final_pipeline = pipeline.fit(train_df)

# Predict on test data
test_predictions_from_pipeline = final_pipeline.transform(test_df)

test_predictions_from_pipeline.show(5, truncate=False)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+------------+---------------------------------------------------------+----------------------------------------+-----------------------------------------------------------------------------------------------------+----------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|pH  |sulphates|alcohol|quality|alcoholIndex|features                                                 |rawPrediction                           |probability                                                                                          |prediction|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+------------+---------------------------------------------------------+------------------------------

## Summary
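- In this video, we saw how to chain a StringIndexer, a VectorAssembler and a DecisionTreeClassifier into a single Spark ML Pipeline, fit it on the training data and use the fitted pipeline to predict on the test data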

### Thank you :)
- That's the end of this video. If you liked it, please like, share and subscribe to my channel.
- If you are on LinkedIn, please tag me and share your thoughts on this video and the series "Getting started with PySpark - Hands on". This will motivate me to make more videos.
<div>
<img src="https://drive.google.com/uc?id=1ttB2gJaw0cXuJfj6GBx5VaYf2ArjiRXM" width="200"/>
</div>