<a href="https://colab.research.google.com/github/Blackman9t/Advanced-Data-Science/blob/master/SparkML_pipelines.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

A Pipeline is a convenient way to organise the data preprocessing in a Machine Learning flow.<br>Certain steps must happen before the actual ML begins. These steps are called data preprocessing and/or feature engineering.<br>The useful thing about pipelines is that they give us a recipe: a list of predefined steps applied in order.<br>These steps could include:<br>1. Encoding categorical values as numbers, e.g. 0 or 1<br>2. Normalising the range of values per dimension<br>3. One-hot encoding<br>4. Modelling, where we train our ML algorithm.<br>
So when using pipelines, we can keep the same preprocessing and switch out different modelling algorithms, or different parameter sets of the same algorithm, without changing anything upstream. This is very handy.<br>The overall idea is that we can fuse our complete data processing flow into one single pipeline and use that single pipeline further downstream.<br>
A pipeline behaves like a Machine Learning algorithm itself. In generic terms it has methods such as fit, evaluate and score: fit starts the training, and score gives you back the predicted value. In Spark ML the corresponding methods are fit, which returns a fitted model, and transform, which returns the predictions.<br>
One advantage is that we can cross-validate: we can try out many parameter settings using the very same pipeline, which really accelerates optimisation of the algorithm.<br>
In summary, pipelines facilitate our day-to-day work in machine learning: we can draw on predefined data processing steps, make sure everything is aligned, and swap algorithms as needed. We can also use the pipeline in downstream data processing, for example in hyperparameter tuning.
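
To make "trying out many parameters with the same pipeline" concrete, here is a minimal plain-Python sketch of how a parameter grid expands into individual settings. The helper name `expand_grid` and the parameter names and values are illustrative assumptions, not taken from this notebook; in Spark ML this job is done by `ParamGridBuilder` and `CrossValidator` from `pyspark.ml.tuning`.

```python
from itertools import product

def expand_grid(grid):
    """Return every combination of the candidate values in `grid`."""
    names = sorted(grid)
    return [dict(zip(names, values))
            for values in product(*(grid[name] for name in names))]

# Hypothetical candidate values for two hyperparameters of some model
param_grid = {"maxIter": [10, 50], "regParam": [0.01, 0.1, 1.0]}
settings = expand_grid(param_grid)

print(len(settings))  # 2 * 3 = 6 combinations
```

Each dictionary in `settings` would be one configuration to train and score with the same, unchanged preprocessing stages.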

Finally, remember that DataFrames in Apache Spark are lazy: transformations only describe the computation, and nothing gets executed until an action (such as show() or count()) actually reads the data.
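
The same idea can be seen in plain Python with a generator. This is only an analogy, not Spark code: building the generator runs nothing, and the work happens only when something consumes the results.

```python
calls = []

def double(x):
    calls.append(x)  # record that real work happened
    return 2 * x

# "Transformation": describes the work but does not run it
lazy_result = (double(x) for x in [1, 2, 3])
work_before = len(calls)  # nothing has been computed yet

# "Action": consuming the generator forces the computation
values = list(lazy_result)
work_after = len(calls)

print(work_before, work_after, values)  # 0 3 [2, 4, 6]
```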

First let's load our spark dependencies

In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz

!pip install -q findspark
!pip install pyspark
# Set up required environment variables

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/87/21/f05c186f4ddb01d15d0ddc36ef4b7e3cedbeb6412274a41f26b55a650ee5/pyspark-2.4.4.tar.gz (215.7MB)
Collecting py4j==0.10.7
  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-2.4.4-py2.py3-none-any.whl size=216130387 sha256=14d650910ba5ad48f3d7dcca221165179bb471589fcd0ff7b0fce153bb80209e
  Stored in directory: /root/.cache/pip/wheels/ab/09/4d/0d184230058e654eb1b04467dbc1292f00eaa186544604b471
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.4


Now let's initialise a spark context if none exists

In [3]:
from pyspark import SparkConf, SparkContext
try:
    conf = SparkConf().setMaster("local").setAppName("My_App")
    sc = SparkContext(conf = conf)
    print('SparkContext Initialised Successfully!')
except Exception as e:
    print(e)

SparkContext Initialised Successfully!


In [4]:
sc

Next let's initialise a spark session

In [5]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('My_App').getOrCreate()
spark

## Intro to SparkML

Note that the Parquet file format uses compression and columnar storage, and its data layout maps well onto the Apache Spark Tungsten memory layout.

### 1. Data Extraction

In [0]:
# This is the dataset that contains the different folders for reading the accelerometer data
# We will clone this data set
accelerometer_readings = 'https://github.com/wchill/HMP_Dataset.git'

In [7]:
!git clone https://github.com/wchill/HMP_Dataset.git

Cloning into 'HMP_Dataset'...
remote: Enumerating objects: 865, done.
remote: Total 865 (delta 0), reused 0 (delta 0), pack-reused 865
Receiving objects: 100% (865/865), 1010.96 KiB | 2.11 MiB/s, done.


In [8]:
# Let's list out the folders in the HMP_Dataset
!ls HMP_Dataset

Brush_teeth	Drink_glass  Getup_bed	  Pour_water	 Use_telephone
Climb_stairs	Eat_meat     impdata.py   README.txt	 Walk
Comb_hair	Eat_soup     Liedown_bed  Sitdown_chair
Descend_stairs	final.py     MANUAL.txt   Standup_chair


In [9]:
# Let's have a look at one of the folders
!ls HMP_Dataset/Brush_teeth

Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt
Accelerometer-2011-04-11-13-29-54-brush_teeth-f1.txt
Accelerometer-2011-05-30-08-35-11-brush_teeth-f1.txt
Accelerometer-2011-05-30-09-36-50-brush_teeth-f1.txt
Accelerometer-2011-05-30-10-34-16-brush_teeth-m1.txt
Accelerometer-2011-05-30-21-10-57-brush_teeth-f1.txt
Accelerometer-2011-05-30-21-55-04-brush_teeth-m2.txt
Accelerometer-2011-05-31-15-16-47-brush_teeth-f1.txt
Accelerometer-2011-06-02-10-42-22-brush_teeth-f1.txt
Accelerometer-2011-06-02-10-45-50-brush_teeth-f1.txt
Accelerometer-2011-06-06-10-45-27-brush_teeth-f1.txt
Accelerometer-2011-06-06-10-48-05-brush_teeth-f1.txt


Let's traverse the folders in HMP_Dataset, create an Apache Spark DataFrame from each file, and then union all of them into one overall DataFrame containing all the data.<br>
First, let's define the schema of the DataFrame below

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType

schema = StructType([
                     StructField('x',IntegerType(),True),
                     StructField('y',IntegerType(),True),
                     StructField('z',IntegerType(),True)
])

Now let's import the os module for traversing the data folders

In [0]:
import os

In [12]:
file_list = os.listdir('HMP_Dataset')
file_list

['Sitdown_chair',
 '.idea',
 'Liedown_bed',
 'impdata.py',
 'Climb_stairs',
 'Use_telephone',
 'Comb_hair',
 'Standup_chair',
 'Eat_soup',
 'README.txt',
 'Pour_water',
 'Eat_meat',
 'Descend_stairs',
 'Drink_glass',
 'MANUAL.txt',
 '.git',
 'final.py',
 'Walk',
 'Getup_bed',
 'Brush_teeth']

Now let's keep only the entries whose names contain an underscore. This removes the files and hidden folders we don't need, but note that it also removes the Walk folder.

In [13]:
file_list_filtered = [x for x in file_list if '_' in x]
file_list_filtered

['Sitdown_chair',
 'Liedown_bed',
 'Climb_stairs',
 'Use_telephone',
 'Comb_hair',
 'Standup_chair',
 'Eat_soup',
 'Pour_water',
 'Eat_meat',
 'Descend_stairs',
 'Drink_glass',
 'Getup_bed',
 'Brush_teeth']
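
The underscore test keeps only names containing `_`, so it also leaves out `Walk` (present in the first listing, absent here), which appears to be a data folder as well. A possible alternative, sketched below on a throwaway directory with made-up entries, is to keep every visible subdirectory instead. The helper name `list_data_folders` is hypothetical.

```python
import os
import tempfile

def list_data_folders(root):
    """Return the sorted names of non-hidden subdirectories of `root`."""
    return sorted(
        name for name in os.listdir(root)
        if os.path.isdir(os.path.join(root, name)) and not name.startswith(".")
    )

# Build a small stand-in for the cloned repository (entries are illustrative)
with tempfile.TemporaryDirectory() as root:
    for folder in ["Brush_teeth", "Walk", ".git"]:
        os.mkdir(os.path.join(root, folder))
    open(os.path.join(root, "README.txt"), "w").close()
    folders = list_data_folders(root)

print(folders)  # ['Brush_teeth', 'Walk']
```

Switching the notebook to such a filter would change the number of classes, and therefore the outputs shown in the rest of this walkthrough.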

Okay, so we now have the data folders in one list, and we can iterate over it.

In [43]:
# First we define an empty placeholder for the data frame that we will append data to
df = None
# Next we import tqdm progress bars to see how our code runs
from tqdm import tqdm

from pyspark.sql.functions import lit
# The lit function lets us add a column holding a constant (literal) value to a Spark DataFrame.

# Now let's iterate through the folders
for category in tqdm(file_list_filtered):
    # Now we traverse all through the files in each folder
    data_files = os.listdir('HMP_Dataset/' + category)
    for data_file in data_files:
        # first let's print it to be sure where we are
        #print(data_file)
        # Now we create a temporary dataframe
        temp_df = spark.read.option('header','false').option('delimiter',' ').csv('HMP_Dataset/'+ category + '/' + data_file, schema=schema)  # we use our defined schema above
        temp_df = temp_df.withColumn('class',lit(category))  # Adding a class column to the dataframe
        temp_df = temp_df.withColumn('source',lit(data_file))  # Adding a source column to the dataframe
        # now we put a condition if df is empty
        if df is None:
            df = temp_df
        else:
            df = df.union(temp_df)  # else union appends the data frames vertically




100%|██████████| 13/13 [00:54<00:00,  5.25s/it]

Let's see the dataframe created from all the files in those folders

In [50]:
df.show(truncate=False)

+---+---+---+-------------+------------------------------------------------------+
|x  |y  |z  |class        |source                                                |
+---+---+---+-------------+------------------------------------------------------+
|12 |37 |36 |Sitdown_chair|Accelerometer-2011-06-02-17-41-43-sitdown_chair-m1.txt|
|12 |36 |35 |Sitdown_chair|Accelerometer-2011-06-02-17-41-43-sitdown_chair-m1.txt|
|12 |38 |35 |Sitdown_chair|Accelerometer-2011-06-02-17-41-43-sitdown_chair-m1.txt|
|13 |37 |35 |Sitdown_chair|Accelerometer-2011-06-02-17-41-43-sitdown_chair-m1.txt|
|12 |36 |36 |Sitdown_chair|Accelerometer-2011-06-02-17-41-43-sitdown_chair-m1.txt|
|11 |38 |35 |Sitdown_chair|Accelerometer-2011-06-02-17-41-43-sitdown_chair-m1.txt|
|12 |38 |35 |Sitdown_chair|Accelerometer-2011-06-02-17-41-43-sitdown_chair-m1.txt|
|13 |38 |35 |Sitdown_chair|Accelerometer-2011-06-02-17-41-43-sitdown_chair-m1.txt|
|13 |37 |34 |Sitdown_chair|Accelerometer-2011-06-02-17-41-43-sitdown_chair-m1.txt|
|12 

Romeo Kienzler usually creates a separate notebook for this exercise and calls it ETL.<br>
ETL stands for Extract, Transform and Load; here the data is loaded into a Spark DataFrame.

### 2. Data Transformation

Now we need to transform the data and create an integer representation of the class column, as ML algorithms cannot cope with strings. So we will map each class to a numeric index using the StringIndexer module.
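
Conceptually, StringIndexer with its default settings assigns indices by descending label frequency, so the most common class becomes 0.0. The plain-Python sketch below imitates that on a toy list. The function name `fit_string_index` is hypothetical, and the alphabetical tie-break is an assumption of this sketch rather than a statement about Spark's behaviour.

```python
from collections import Counter

def fit_string_index(labels):
    """Map each distinct label to a float index, most frequent label first."""
    counts = Counter(labels)
    # Ties are broken alphabetically here; that choice is an assumption of this sketch
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

labels = ["Getup_bed", "Eat_soup", "Getup_bed", "Comb_hair", "Getup_bed", "Comb_hair"]
mapping = fit_string_index(labels)
indexed = [mapping[label] for label in labels]

print(mapping)  # {'Getup_bed': 0.0, 'Comb_hair': 1.0, 'Eat_soup': 2.0}
```

This is why the index a class receives depends on how many rows that class has, not on its name.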

In [51]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol = 'class', outputCol = 'classIndex')
indexed = indexer.fit(df).transform(df)  # This is a new data frame

# Let's see it
indexed.show(truncate=False)

+---+---+---+-------------+------------------------------------------------------+----------+
|x  |y  |z  |class        |source                                                |classIndex|
+---+---+---+-------------+------------------------------------------------------+----------+
|12 |37 |36 |Sitdown_chair|Accelerometer-2011-06-02-17-41-43-sitdown_chair-m1.txt|7.0       |
|12 |36 |35 |Sitdown_chair|Accelerometer-2011-06-02-17-41-43-sitdown_chair-m1.txt|7.0       |
|12 |38 |35 |Sitdown_chair|Accelerometer-2011-06-02-17-41-43-sitdown_chair-m1.txt|7.0       |
|13 |37 |35 |Sitdown_chair|Accelerometer-2011-06-02-17-41-43-sitdown_chair-m1.txt|7.0       |
|12 |36 |36 |Sitdown_chair|Accelerometer-2011-06-02-17-41-43-sitdown_chair-m1.txt|7.0       |
|11 |38 |35 |Sitdown_chair|Accelerometer-2011-06-02-17-41-43-sitdown_chair-m1.txt|7.0       |
|12 |38 |35 |Sitdown_chair|Accelerometer-2011-06-02-17-41-43-sitdown_chair-m1.txt|7.0       |
|13 |38 |35 |Sitdown_chair|Accelerometer-2011-06-02-17-41-43

We can see the class index for each class. Good.<br>
Now we do one-hot encoding

In [52]:
from pyspark.ml.feature import OneHotEncoder

# In Spark 2.4 (used here), OneHotEncoder is a pure transformer, so there is no fit() step.
# From Spark 3.0 onwards it is an estimator and must be fitted first.
encoder = OneHotEncoder(inputCol = 'classIndex', outputCol = 'categoryVec')
encoded = encoder.transform(indexed)  # This is a new data frame
encoded.show(truncate=False)

+---+---+---+-------------+------------------------------------------------------+----------+--------------+
|x  |y  |z  |class        |source                                                |classIndex|categoryVec   |
+---+---+---+-------------+------------------------------------------------------+----------+--------------+
|12 |37 |36 |Sitdown_chair|Accelerometer-2011-06-02-17-41-43-sitdown_chair-m1.txt|7.0       |(12,[7],[1.0])|
|12 |36 |35 |Sitdown_chair|Accelerometer-2011-06-02-17-41-43-sitdown_chair-m1.txt|7.0       |(12,[7],[1.0])|
|12 |38 |35 |Sitdown_chair|Accelerometer-2011-06-02-17-41-43-sitdown_chair-m1.txt|7.0       |(12,[7],[1.0])|
|13 |37 |35 |Sitdown_chair|Accelerometer-2011-06-02-17-41-43-sitdown_chair-m1.txt|7.0       |(12,[7],[1.0])|
|12 |36 |36 |Sitdown_chair|Accelerometer-2011-06-02-17-41-43-sitdown_chair-m1.txt|7.0       |(12,[7],[1.0])|
|11 |38 |35 |Sitdown_chair|Accelerometer-2011-06-02-17-41-43-sitdown_chair-m1.txt|7.0       |(12,[7],[1.0])|
|12 |38 |35 |Sitdow
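
The value `(12,[7],[1.0])` is a sparse vector: size 12, with a 1.0 at position 7. The size is 12 rather than 13 because the encoder by default drops the last category (`dropLast=True`), which is then represented by an all-zero vector. A plain-Python sketch of that encoding, using a hypothetical helper name:

```python
def one_hot_sparse(index, num_categories, drop_last=True):
    """Return (size, indices, values), mimicking Spark's sparse vector display."""
    size = num_categories - 1 if drop_last else num_categories
    i = int(index)
    if i < size:
        return (size, [i], [1.0])
    # With drop_last, the final category is encoded as the all-zero vector
    return (size, [], [])

print(one_hot_sparse(7.0, 13))   # (12, [7], [1.0])
print(one_hot_sparse(12.0, 13))  # (12, [], [])
```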

The next thing we need to do is transform our values x, y, z into vectors, because Spark ML can only work on vector objects.<br>
So let's import the Vectors and VectorAssembler classes

In [0]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
# VectorAssembler creates vectors from ordinary data types for us

vectorAssembler = VectorAssembler(inputCols = ['x','y','z'], outputCol = 'features')
# Now we use the vectorAssembler object to transform our last updated dataframe
features_vectorized = vectorAssembler.transform(encoded)  # note this is a new df

In [54]:
# Let's see the data
features_vectorized.show(truncate=False)

+---+---+---+-------------+------------------------------------------------------+----------+--------------+----------------+
|x  |y  |z  |class        |source                                                |classIndex|categoryVec   |features        |
+---+---+---+-------------+------------------------------------------------------+----------+--------------+----------------+
|12 |37 |36 |Sitdown_chair|Accelerometer-2011-06-02-17-41-43-sitdown_chair-m1.txt|7.0       |(12,[7],[1.0])|[12.0,37.0,36.0]|
|12 |36 |35 |Sitdown_chair|Accelerometer-2011-06-02-17-41-43-sitdown_chair-m1.txt|7.0       |(12,[7],[1.0])|[12.0,36.0,35.0]|
|12 |38 |35 |Sitdown_chair|Accelerometer-2011-06-02-17-41-43-sitdown_chair-m1.txt|7.0       |(12,[7],[1.0])|[12.0,38.0,35.0]|
|13 |37 |35 |Sitdown_chair|Accelerometer-2011-06-02-17-41-43-sitdown_chair-m1.txt|7.0       |(12,[7],[1.0])|[13.0,37.0,35.0]|
|12 |36 |36 |Sitdown_chair|Accelerometer-2011-06-02-17-41-43-sitdown_chair-m1.txt|7.0       |(12,[7],[1.0])|[12.0,36.0

So we now have a features column built from columns x, y and z, stored as an Apache Spark vector object, which is the correct type for ML.

The next thing we do is normalise the data set.<br>
The Normalizer rescales each row vector to unit norm; with p=1.0 every component is divided by the sum of the absolute values in that row. For our non-negative readings this puts every value between 0 and 1. The idea is to have all feature data within the same range so that no one feature overshadows the others.

In [0]:
from pyspark.ml.feature import Normalizer
normalizer = Normalizer(inputCol = 'features', outputCol = 'features_norm',p=1.0)
normalized_data = normalizer.transform(features_vectorized) # New data frame too.

In [56]:
# Let's see the normalized data
normalized_data.show(truncate=False)

+---+---+---+-------------+------------------------------------------------------+----------+--------------+----------------+-------------------------------------------------------------+
|x  |y  |z  |class        |source                                                |classIndex|categoryVec   |features        |features_norm                                                |
+---+---+---+-------------+------------------------------------------------------+----------+--------------+----------------+-------------------------------------------------------------+
|12 |37 |36 |Sitdown_chair|Accelerometer-2011-06-02-17-41-43-sitdown_chair-m1.txt|7.0       |(12,[7],[1.0])|[12.0,37.0,36.0]|[0.1411764705882353,0.43529411764705883,0.4235294117647059]  |
|12 |36 |35 |Sitdown_chair|Accelerometer-2011-06-02-17-41-43-sitdown_chair-m1.txt|7.0       |(12,[7],[1.0])|[12.0,36.0,35.0]|[0.14457831325301204,0.43373493975903615,0.42168674698795183]|
|12 |38 |35 |Sitdown_chair|Accelerometer-2011-06-02-17-41-43

As seen in the features_norm column, all values now lie between 0 and 1, and the three components of each row sum to 1.
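
The first output row can be checked by hand: 12 + 37 + 36 = 85, and 12/85 ≈ 0.1412. A minimal plain-Python sketch of the same row-wise p-norm scaling follows; the function name is hypothetical, and returning zero vectors unchanged is a choice made for this sketch.

```python
def normalize(vector, p=1.0):
    """Divide a vector by its p-norm, one row at a time."""
    norm = sum(abs(v) ** p for v in vector) ** (1.0 / p)
    # Zero vectors are returned unchanged (a choice made for this sketch)
    return [v / norm for v in vector] if norm != 0 else list(vector)

row = [12.0, 37.0, 36.0]          # first row of the features column
normed = normalize(row, p=1.0)    # each component divided by 85

print(normed)
```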

### Creating The Pipeline

In [0]:
from pyspark.ml import Pipeline
# The Pipeline constructor below takes an array of Pipeline stages we pass to it.
# here we pass the 4 stages above in the right sequence one after another.
pipeline = Pipeline(stages = [indexer,encoder,vectorAssembler,normalizer])

Now let's fit the Pipeline object to our original data frame

In [0]:
model = pipeline.fit(df)

Finally let's transform our data frame using the Pipeline Object

In [0]:
prediction = model.transform(df)

In [60]:
# Let's see the first 20 rows
prediction.show(truncate=False)

+---+---+---+-------------+------------------------------------------------------+----------+--------------+----------------+-------------------------------------------------------------+
|x  |y  |z  |class        |source                                                |classIndex|categoryVec   |features        |features_norm                                                |
+---+---+---+-------------+------------------------------------------------------+----------+--------------+----------------+-------------------------------------------------------------+
|12 |37 |36 |Sitdown_chair|Accelerometer-2011-06-02-17-41-43-sitdown_chair-m1.txt|7.0       |(12,[7],[1.0])|[12.0,37.0,36.0]|[0.1411764705882353,0.43529411764705883,0.4235294117647059]  |
|12 |36 |35 |Sitdown_chair|Accelerometer-2011-06-02-17-41-43-sitdown_chair-m1.txt|7.0       |(12,[7],[1.0])|[12.0,36.0,35.0]|[0.14457831325301204,0.43373493975903615,0.42168674698795183]|
|12 |38 |35 |Sitdown_chair|Accelerometer-2011-06-02-17-41-43

So we see exactly the same data frame as the one built from the individual stages, this time created using the Pipeline object.<br>Now we can fit and transform our data in one go, which is really handy.
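
What fitting a pipeline does can be sketched in plain Python: stages that need fitting (estimators, like StringIndexer) are fitted on the data as transformed by the preceding stages, while pure transformers are applied directly. All class names below are hypothetical stand-ins that work on lists of numbers rather than DataFrames; they are not Spark APIs.

```python
class Scale:
    """Transformer: needs no fitting, just multiplies every value."""
    def __init__(self, factor):
        self.factor = factor

    def transform(self, data):
        return [x * self.factor for x in data]


class Center:
    """Estimator: fit() learns the mean and returns a fitted transformer."""
    def fit(self, data):
        return CenterModel(sum(data) / len(data))


class CenterModel:
    def __init__(self, mean):
        self.mean = mean

    def transform(self, data):
        return [x - self.mean for x in data]


class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, data):
        fitted = []
        for stage in self.stages:
            # Estimators are fitted on the data as transformed so far
            model = stage.fit(data) if hasattr(stage, "fit") else stage
            data = model.transform(data)
            fitted.append(model)
        return ToyPipelineModel(fitted)


class ToyPipelineModel:
    def __init__(self, stages):
        self.stages = stages

    def transform(self, data):
        for stage in self.stages:
            data = stage.transform(data)
        return data


model = ToyPipeline([Scale(2), Center()]).fit([1, 2, 3])
result = model.transform([1, 2, 3])
print(result)  # [-2.0, 0.0, 2.0]
```

Swapping a stage, or appending a model as the last stage, leaves the other stages untouched, which is the property the introduction describes.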

Let's get rid of all the columns we don't need 

In [61]:
# first let's list out the columns we want to drop
cols_to_drop = ['x','y','z','class','source','classIndex','features']

# Next let's use a list comprehension with conditionals to select cols we need
selected_cols = [col for col in prediction.columns if col not in cols_to_drop]

# Let's define a new df_train with only the categoryVec and features_norm cols
df_train = prediction.select(selected_cols)

# Let's see our training dataframe.
df_train.show(truncate=False)

+--------------+-------------------------------------------------------------+
|categoryVec   |features_norm                                                |
+--------------+-------------------------------------------------------------+
|(12,[7],[1.0])|[0.1411764705882353,0.43529411764705883,0.4235294117647059]  |
|(12,[7],[1.0])|[0.14457831325301204,0.43373493975903615,0.42168674698795183]|
|(12,[7],[1.0])|[0.1411764705882353,0.4470588235294118,0.4117647058823529]   |
|(12,[7],[1.0])|[0.15294117647058825,0.43529411764705883,0.4117647058823529] |
|(12,[7],[1.0])|[0.14285714285714285,0.42857142857142855,0.42857142857142855]|
|(12,[7],[1.0])|[0.13095238095238096,0.4523809523809524,0.4166666666666667]  |
|(12,[7],[1.0])|[0.1411764705882353,0.4470588235294118,0.4117647058823529]   |
|(12,[7],[1.0])|[0.1511627906976744,0.4418604651162791,0.4069767441860465]   |
|(12,[7],[1.0])|[0.15476190476190477,0.44047619047619047,0.40476190476190477]|
|(12,[7],[1.0])|[0.14285714285714285,0.4404761904761

So finally, we have our categoryVec column, which is the target variable, and our features_norm column, which is the feature set for training the ML algorithm.<br>
We have seen how to create Apache Spark ML Pipelines from our data set.