Using the Spark distributed machine learning framework to build a scalable ML data pipeline



*  Setting up Spark in the Google Colaboratory
*   Machine Learning Basic Concepts
*   Preprocessing and Data Transformation using Spark
*   Spark Clustering with pyspark
*   Classification with pyspark
*   Regression methods with pyspark

**What is Apache Spark?**



*   Apache Spark is a unified computing engine and a set of libraries for parallel data processing on computer clusters
*   Spark is the most actively developed open source engine for this task; making it the de facto tool for any developer or data scientist interested in big data.
* Spark supports multiple widely used programming languages (Python, Java, Scala and R), includes libraries for diverse tasks ranging from SQL to streaming and machine learning
* Runs anywhere from a laptop to a cluster of thousands of servers. This makes it an easy system to start with and scale up to big data processing or incredibly large scale.












Setting up Spark 3.0.1 in the Google Colaboratory

We will install the below programs

*   Java 8
*   spark-3.0.1
*   Hadoop3.2
*   Findspark





you can install the LATEST version of Spark using the below set of commands.

In [None]:
# innstall java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"


# install findspark using pip
!pip install -q findspark

**Spark Installation test**
Let us test the installation of spark in our google colab environment.

In [18]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()
# Test the spark 
df = spark.createDataFrame([{"hello": "world"} for x in range(1000)])

df.show(3, False)



+-----+
|hello|
+-----+
|world|
|world|
|world|
+-----+
only showing top 3 rows



In [19]:
# make sure the version of pyspark
import pyspark
print(pyspark.__version__)

3.0.0


Data Preparation and Transformations in Spark

**Normalize Numeric Data**
MinMaxScaler is one of the favorite classes shipped with most machine learning libraries. It scaled the data between 0 and 1.

In [20]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors
# Create some dummy feature data
features_df = spark.createDataFrame([
    (1, Vectors.dense([10.0,10000.0,1.0]),),
    (2, Vectors.dense([20.0,30000.0,2.0]),),
    (3, Vectors.dense([30.0,40000.0,3.0]),),
    
],["id", "features"] )
features_df.show()

+---+------------------+
| id|          features|
+---+------------------+
|  1|[10.0,10000.0,1.0]|
|  2|[20.0,30000.0,2.0]|
|  3|[30.0,40000.0,3.0]|
+---+------------------+



In [21]:
# Apply MinMaxScaler transformation
features_scaler = MinMaxScaler(inputCol = "features", outputCol = "sfeatures")
smodel = features_scaler.fit(features_df)
sfeatures_df = smodel.transform(features_df)
sfeatures_df.show()

+---+------------------+--------------------+
| id|          features|           sfeatures|
+---+------------------+--------------------+
|  1|[10.0,10000.0,1.0]|           (3,[],[])|
|  2|[20.0,30000.0,2.0]|[0.5,0.6666666666...|
|  3|[30.0,40000.0,3.0]|       [1.0,1.0,1.0]|
+---+------------------+--------------------+



**Standardize Numeric Data**
StandardScaler is another well-known class written with machine learning libraries. It normalizes the data between -1 and 1 and converts the data into bell-shaped data. You can demean the data and scale to some variance.

In [24]:
from pyspark.ml.feature import  StandardScaler
from pyspark.ml.linalg import Vectors
# Create the dummy data
features_df = spark.createDataFrame([
    (1, Vectors.dense([10.0,10000.0,1.0]),),
    (2, Vectors.dense([20.0,30000.0,2.0]),),
    (3, Vectors.dense([30.0,40000.0,3.0]),),
    
],["id", "features"] )
features_df.show()


+---+------------------+
| id|          features|
+---+------------------+
|  1|[10.0,10000.0,1.0]|
|  2|[20.0,30000.0,2.0]|
|  3|[30.0,40000.0,3.0]|
+---+------------------+



In [25]:
# Apply the StandardScaler model
features_stand_scaler = StandardScaler(inputCol = "features", outputCol = "sfeatures", withStd=True, withMean=True)
stmodel = features_stand_scaler.fit(features_df)
stand_sfeatures_df = stmodel.transform(features_df)
stand_sfeatures_df.show()

+---+------------------+--------------------+
| id|          features|           sfeatures|
+---+------------------+--------------------+
|  1|[10.0,10000.0,1.0]|[-1.0,-1.09108945...|
|  2|[20.0,30000.0,2.0]|[0.0,0.2182178902...|
|  3|[30.0,40000.0,3.0]|[1.0,0.8728715609...|
+---+------------------+--------------------+



**Bucketize Numeric Data**

The real data sets come with various ranges and sometimes it is advisable to transform the data into well-defined buckets before plugging into machine learning algorithms.
Bucketizer class is handy to transform the data into various buckets

In [26]:
from pyspark.ml.feature import  Bucketizer
from pyspark.ml.linalg import Vectors
# Define the splits for buckets
splits = [-float("inf"), -10, 0.0, 10, float("inf")]
b_data = [(-800.0,), (-10.5,), (-1.7,), (0.0,), (8.2,), (90.1,)]
b_df = spark.createDataFrame(b_data, ["features"])
b_df.show()

+--------+
|features|
+--------+
|  -800.0|
|   -10.5|
|    -1.7|
|     0.0|
|     8.2|
|    90.1|
+--------+



In [27]:
# Transforming data into buckets
bucketizer = Bucketizer(splits=splits, inputCol= "features", outputCol="bfeatures")
bucketed_df = bucketizer.transform(b_df)
bucketed_df.show()

+--------+---------+
|features|bfeatures|
+--------+---------+
|  -800.0|      0.0|
|   -10.5|      0.0|
|    -1.7|      1.0|
|     0.0|      2.0|
|     8.2|      2.0|
|    90.1|      3.0|
+--------+---------+



**Tokenize text Data**

Natural Language Processing is one of the main applications of Machine learning. One of the first steps for NLP is tokenizing the text into words or token. We can utilize the Tokenizer class with SparkML to perform this task.

In [28]:
from pyspark.ml.feature import  Tokenizer
sentences_df = spark.createDataFrame([
    (1, "This is an introduction to sparkMlib"),
    (2, "Mlib incluse libraries fro classfication and regression"),
    (3, "It also incluses support for data piple lines"),
    
], ["id", "sentences"])
sentences_df.show()

+---+--------------------+
| id|           sentences|
+---+--------------------+
|  1|This is an introd...|
|  2|Mlib incluse libr...|
|  3|It also incluses ...|
+---+--------------------+



In [29]:
sent_token = Tokenizer(inputCol = "sentences", outputCol = "words")
sent_tokenized_df = sent_token.transform(sentences_df)
sent_tokenized_df.take(10)

[Row(id=1, sentences='This is an introduction to sparkMlib', words=['this', 'is', 'an', 'introduction', 'to', 'sparkmlib']),
 Row(id=2, sentences='Mlib incluse libraries fro classfication and regression', words=['mlib', 'incluse', 'libraries', 'fro', 'classfication', 'and', 'regression']),
 Row(id=3, sentences='It also incluses support for data piple lines', words=['it', 'also', 'incluses', 'support', 'for', 'data', 'piple', 'lines'])]

**Clustering Using PySpark**

Clustering is a machine learning technique where the data is grouped into a reasonable number of classes using the input features. In this section, we study the basic application of clustering techniques using the spark ML framework.

In [30]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans, BisectingKMeans
import glob
# Downloading the clustering dataset
!wget -q 'https://raw.githubusercontent.com/amjadraza/blogs-data/master/spark_ml/clustering_dataset.csv'

In [33]:
#  Load the clustering data stored in csv format using spark
# Read the data.
clustering_file_name ='clustering_dataset.csv'
import pandas as pd
# df = pd.read_csv(clustering_file_name)
cluster_df = spark.read.csv(clustering_file_name, header=True,inferSchema=True)
#cluster_df.show()

In [34]:
#Convert the tabular data into vectorized format using VectorAssembler

# Coverting the input data into features column
vectorAssembler = VectorAssembler(inputCols = ['col1', 'col2', 'col3'], outputCol = "features")
vcluster_df = vectorAssembler.transform(cluster_df)
vcluster_df.show(10)

+----+----+----+--------------+
|col1|col2|col3|      features|
+----+----+----+--------------+
|   7|   4|   1| [7.0,4.0,1.0]|
|   7|   7|   9| [7.0,7.0,9.0]|
|   7|   9|   6| [7.0,9.0,6.0]|
|   1|   6|   5| [1.0,6.0,5.0]|
|   6|   7|   7| [6.0,7.0,7.0]|
|   7|   9|   4| [7.0,9.0,4.0]|
|   7|  10|   6|[7.0,10.0,6.0]|
|   7|   8|   2| [7.0,8.0,2.0]|
|   8|   3|   8| [8.0,3.0,8.0]|
|   4|  10|   5|[4.0,10.0,5.0]|
+----+----+----+--------------+
only showing top 10 rows



Once the data is prepared into the format MLlib can use for models, now we can define and train the clustering algorithm such as K-Means. We can define the number of clusters and initialize the seed as done below.

In [35]:
# Applying the k-means algorithm
kmeans = KMeans().setK(3)
kmeans = kmeans.setSeed(1)
kmodel = kmeans.fit(vcluster_df)

In [37]:
# After training has been finished, let us print the centers.
centers = kmodel.clusterCenters()
print("The location of centers: {}".format(centers))


The location of centers: [array([35.88461538, 31.46153846, 34.42307692]), array([80.        , 79.20833333, 78.29166667]), array([5.12, 5.84, 4.84])]


In [38]:
# Applying Hierarchical Clustering
bkmeans = BisectingKMeans().setK(3)
bkmeans = bkmeans.setSeed(1)
bkmodel = bkmeans.fit(vcluster_df)
bkcneters = bkmodel.clusterCenters()
bkcneters

[array([5.12, 5.84, 4.84]),
 array([35.88461538, 31.46153846, 34.42307692]),
 array([80.        , 79.20833333, 78.29166667])]

Read More about Clustering : https://spark.apache.org/docs/3.0.1/ml-clustering.html

**Classification Using PySpark**

Classification is one of the widely used Machine algorithms and almost every data engineer and data scientist must know about these algorithms. Once the data is loaded and prepared, I will demonstrate three classification algorithms.

NaiveBayes Classification
Multi-Layer Perceptron Classification
Decision Trees Classification

We explore the supervised classification algorithms using IRIS data

In [43]:
# Downloading the clustering data
!wget -q "https://raw.githubusercontent.com/amjadraza/blogs-data/master/spark_ml/iris.csv"
df_iris = pd.read_csv("https://raw.githubusercontent.com/amjadraza/blogs-data/master/spark_ml/iris.csv", header=None)
df_iris.head()

Unnamed: 0,0,1,2,3,4
0,5.1,3.5,1.4,0.2,Iris-setosa
1,4.9,3.0,1.4,0.2,Iris-setosa
2,4.7,3.2,1.3,0.2,Iris-setosa
3,4.6,3.1,1.5,0.2,Iris-setosa
4,5.0,3.6,1.4,0.2,Iris-setosa


In [44]:
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import  StringIndexer


iris_df = spark.createDataFrame(df_iris)
iris_df.show(5, False)


+---+---+---+---+-----------+
|0  |1  |2  |3  |4          |
+---+---+---+---+-----------+
|5.1|3.5|1.4|0.2|Iris-setosa|
|4.9|3.0|1.4|0.2|Iris-setosa|
|4.7|3.2|1.3|0.2|Iris-setosa|
|4.6|3.1|1.5|0.2|Iris-setosa|
|5.0|3.6|1.4|0.2|Iris-setosa|
+---+---+---+---+-----------+
only showing top 5 rows



In [45]:
# Rename the columns
iris_df = iris_df.select(col("0").alias("sepal_length"),
                         col("1").alias("sepal_width"),
                         col("2").alias("petal_length"),
                         col("3").alias("petal_width"),
                         col("4").alias("species"),
                        )

In [46]:
# Converting the columns into features
vectorAssembler = VectorAssembler(inputCols = ["sepal_length", "sepal_width", "petal_length", "petal_width"],
                                  outputCol = "features")
viris_df = vectorAssembler.transform(iris_df)
viris_df.show(5, False)

+------------+-----------+------------+-----------+-----------+-----------------+
|sepal_length|sepal_width|petal_length|petal_width|species    |features         |
+------------+-----------+------------+-----------+-----------+-----------------+
|5.1         |3.5        |1.4         |0.2        |Iris-setosa|[5.1,3.5,1.4,0.2]|
|4.9         |3.0        |1.4         |0.2        |Iris-setosa|[4.9,3.0,1.4,0.2]|
|4.7         |3.2        |1.3         |0.2        |Iris-setosa|[4.7,3.2,1.3,0.2]|
|4.6         |3.1        |1.5         |0.2        |Iris-setosa|[4.6,3.1,1.5,0.2]|
|5.0         |3.6        |1.4         |0.2        |Iris-setosa|[5.0,3.6,1.4,0.2]|
+------------+-----------+------------+-----------+-----------+-----------------+
only showing top 5 rows



In [47]:
indexer = StringIndexer(inputCol="species", outputCol = "label")
iviris_df = indexer.fit(viris_df).transform(viris_df)
iviris_df.show(2, False)

+------------+-----------+------------+-----------+-----------+-----------------+-----+
|sepal_length|sepal_width|petal_length|petal_width|species    |features         |label|
+------------+-----------+------------+-----------+-----------+-----------------+-----+
|5.1         |3.5        |1.4         |0.2        |Iris-setosa|[5.1,3.5,1.4,0.2]|0.0  |
|4.9         |3.0        |1.4         |0.2        |Iris-setosa|[4.9,3.0,1.4,0.2]|0.0  |
+------------+-----------+------------+-----------+-----------+-----------------+-----+
only showing top 2 rows



**Naive Bayes Classification**

Once the data is prepared, we are ready to apply the first classification algorithm.

In [48]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Create the traing and test splits
splits = iviris_df.randomSplit([0.6,0.4], 1)
splits

[DataFrame[sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, species: string, features: vector, label: double],
 DataFrame[sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, species: string, features: vector, label: double]]

In [50]:
train_df = splits[0]
train_df

DataFrame[sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, species: string, features: vector, label: double]

In [51]:
test_df = splits[1]

In [52]:
# Apply the Naive bayes classifier
nb = NaiveBayes(modelType="multinomial")
nbmodel = nb.fit(train_df)
predictions_df = nbmodel.transform(test_df)
predictions_df.show(1, False)

+------------+-----------+------------+-----------+-----------+-----------------+-----+------------------------------------------------------------+------------------------------------------------------------+----------+
|sepal_length|sepal_width|petal_length|petal_width|species    |features         |label|rawPrediction                                               |probability                                                 |prediction|
+------------+-----------+------------+-----------+-----------+-----------------+-----+------------------------------------------------------------+------------------------------------------------------------+----------+
|4.3         |3.0        |1.1         |0.1        |Iris-setosa|[4.3,3.0,1.1,0.1]|0.0  |[-9.966434726497221,-11.294595492758821,-11.956012812323921]|[0.7134106367667451,0.18902823898426235,0.09756112424899269]|0.0       |
+------------+-----------+------------+-----------+-----------+-----------------+-----+-----------------------------

In [53]:
# Let us Evaluate the trained classifier
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
nbaccuracy = evaluator.evaluate(predictions_df)
nbaccuracy
0.8275862068965517

0.8275862068965517

**Multilayer Perceptron Classification**

The second classifier we will be investigating is a Multi-layer perceptron. In this tutorial, I am not going into details of the optimal MLP network for this problem however in practice, you research the optimal network suitable to the problem in hand.

In [54]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
# Define the MLP Classifier
layers = [4,5,5,3]
mlp = MultilayerPerceptronClassifier(layers = layers, seed=1)
mlp_model = mlp.fit(train_df)
mlp_predictions = mlp_model.transform(test_df)
# Evaluate the MLP classifier
mlp_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
mlp_accuracy = mlp_evaluator.evaluate(mlp_predictions)
mlp_accuracy

0.9827586206896551

**Decision Trees Classification**

Another common classifier in the ML family is the Decision Tree Classifier, in this section, we explore this classifier.

In [55]:
from pyspark.ml.classification import DecisionTreeClassifier
# Define the DT Classifier 
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
# Evaluate the DT Classifier
dt_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
dt_accuracy = dt_evaluator.evaluate(dt_predictions)
dt_accuracy

0.9827586206896551

**Regression using PySpark**


In this section, we explore the Machine learning models for regression problems using pyspark. Regression models are helpful in predicting future values using past data.
We will use the Combined Cycle Power Plant data set to predict the net hourly electrical output (EP). I have uploaded the data to my GitHub so that users can reproduce the results.

In [56]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
# Read the iris data
df_ccpp = pd.read_csv("https://raw.githubusercontent.com/amjadraza/blogs-data/master/spark_ml/ccpp.csv")
pp_df = spark.createDataFrame(df_ccpp)
pp_df.show(2, False)

+-----+-----+-------+-----+------+
|AT   |V    |AP     |RH   |PE    |
+-----+-----+-------+-----+------+
|14.96|41.76|1024.07|73.17|463.26|
|25.18|62.96|1020.04|59.08|444.37|
+-----+-----+-------+-----+------+
only showing top 2 rows



In [57]:
# Create the feature column using VectorAssembler class
vectorAssembler = VectorAssembler(inputCols =["AT", "V", "AP", "RH"], outputCol = "features")
vpp_df = vectorAssembler.transform(pp_df)
vpp_df.show(2, False)

+-----+-----+-------+-----+------+---------------------------+
|AT   |V    |AP     |RH   |PE    |features                   |
+-----+-----+-------+-----+------+---------------------------+
|14.96|41.76|1024.07|73.17|463.26|[14.96,41.76,1024.07,73.17]|
|25.18|62.96|1020.04|59.08|444.37|[25.18,62.96,1020.04,59.08]|
+-----+-----+-------+-----+------+---------------------------+
only showing top 2 rows



**Linear Regression**

We start with the simplest regression technique i.e. Linear Regression.

In [58]:
# Define and fit Linear Regression
lr = LinearRegression(featuresCol="features", labelCol="PE")
lr_model = lr.fit(vpp_df)
# Print and save the Model output
lr_model.coefficients
lr_model.intercept
lr_model.summary.rootMeanSquaredError

4.557126016749486

**Decision Tree Regression**

In this section, we explore the Decision Tree Regression commonly used in Machine learning.

In [59]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
vpp_df.show(2, False)

+-----+-----+-------+-----+------+---------------------------+
|AT   |V    |AP     |RH   |PE    |features                   |
+-----+-----+-------+-----+------+---------------------------+
|14.96|41.76|1024.07|73.17|463.26|[14.96,41.76,1024.07,73.17]|
|25.18|62.96|1020.04|59.08|444.37|[25.18,62.96,1020.04,59.08]|
+-----+-----+-------+-----+------+---------------------------+
only showing top 2 rows



In [60]:
# Define train and test data split
splits = vpp_df.randomSplit([0.7,0.3])
train_df = splits[0]
test_df = splits[1]

In [61]:
# Define the Decision Tree Model 
dt = DecisionTreeRegressor(featuresCol="features", labelCol="PE")
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_predictions.show(1, False)

+---+-----+-------+-----+------+-------------------------+-----------------+
|AT |V    |AP     |RH   |PE    |features                 |prediction       |
+---+-----+-------+-----+------+-------------------------+-----------------+
|2.8|39.64|1011.01|82.96|482.66|[2.8,39.64,1011.01,82.96]|485.6813684210526|
+---+-----+-------+-----+------+-------------------------+-----------------+
only showing top 1 row



In [62]:
# Evaluate the Model
dt_evaluator = RegressionEvaluator(labelCol="PE", predictionCol="prediction", metricName="rmse")
dt_rmse = dt_evaluator.evaluate(dt_predictions)
print("The RMSE of Decision Tree regression Model is {}".format(dt_rmse))

The RMSE of Decision Tree regression Model is 4.49520197471713


**Gradient Boosting Decision Tree Regression**

Gradient Boosting is another common choice among ML professionals. Let us try the GBM in this section.

In [63]:
from pyspark.ml.regression import GBTRegressor
# Define the GBT Model
gbt = GBTRegressor(featuresCol="features", labelCol="PE")
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
# Evaluate the GBT Model
gbt_evaluator = RegressionEvaluator(labelCol="PE", predictionCol="prediction", metricName="rmse")
gbt_rmse = gbt_evaluator.evaluate(gbt_predictions)
print("The RMSE of GBT Tree regression Model is {}".format(gbt_rmse))

The RMSE of GBT Tree regression Model is 4.00485720548521


References Readings/Links

https://spark.apache.org/docs/latest/ml-features.html

https://spark.apache.org/docs/3.0.1/ml-classification-regression.html#regression

https://spark.apache.org/docs/3.0.1/ml-clustering.html

https://spark.apache.org/docs/3.0.1/ml-classification-regression.html#classification