<p style="text-align:center">
    <a href="https://skills.network/?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMSkillsNetworkBD0231ENCoursera2789-2023-01-01">
    <img src="https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/assets/logos/SN_web_lightmode.png" width="200" alt="Skills Network Logo">
    </a>
</p>


## ML Pipelines using SparkML


Estimated time needed: **30** minutes


<p style='color: red'>The purpose of this lab is to show you how to use SparkML to create machine learning pipelines.</p>


## __Table of Contents__

<ol>
  <li>
    <a href="#Objectives">Objectives
    </a>
  </li>
  <li>
    <a href="#Datasets">Datasets
    </a>
  </li>
  <li>
    <a href="#Setup">Setup
    </a>
    <ol>
      <li>
        <a href="#Installing-Required-Libraries">Installing Required Libraries
        </a>
      </li>
      <li>
        <a href="#Importing-Required-Libraries">Importing Required Libraries
        </a>
      </li>
    </ol>
  </li>
  <li>
    <a href="#Examples">Examples
    </a>
    <ol>
    <li>
      <a href="#Task-1---Load-data-set">Task 1 - Load data set
      </a>
    </li>
    <li>
      <a href="#Task-2---Define-pipeline-stages">Task 2 - Define pipeline stages
      </a>
    </li>
    <li>
      <a href="#Task-3---Build-the-pipeline">Task 3 - Build the pipeline
      </a>
    </li>
    <li>
      <a href="#Task-4---Split-the-data">Task 4 - Split the data
      </a>
    </li>
    <li>
      <a href="#Task-5---Fit-the-pipeline">Task 5 - Fit the pipeline
      </a>
    </li>
    <li>
      <a href="#Task-6---Evaluate-the-model">Task 6 - Evaluate the model
      </a>
    </li>
    </ol>
  </li>
  <li>
    <a href="#Exercises">Exercises
    </a>
  </li>
  <ol>
    <li>
      <a href="#Exercise-1---Load-data-set">Exercise 1 - Load data set
      </a>
    </li>
    <li>
      <a href="#Exercise-2---Define-pipeline-stages">Exercise 2 - Define pipeline stages
      </a>
    </li>
    <li>
      <a href="#Exercise-3---Build-the-pipeline">Exercise 3 - Build the pipeline
      </a>
    </li>
    <li>
      <a href="#Exercise-4---Split-the-data">Exercise 4 - Split the data
      </a>
    </li>
    <li>
      <a href="#Exercise-5---Fit-the-pipeline">Exercise 5 - Fit the pipeline
      </a>
    </li>
    <li>
      <a href="#Exercise-6---Evaluate-the-model">Exercise 6 - Evaluate the model
      </a>
    </li>
  </ol>
</ol>


## Objectives

After completing this lab you will be able to:

 - Create a machine learning pipeline.
 - Add stages to the pipeline.
 - Run the pipeline.
 - Create a machine learning pipeline for regression.
 - Create a machine learning pipeline for classification.


## Datasets

In this lab you will use the following datasets:

 - Modified version of the car mileage dataset. The original dataset is available at https://archive.ics.uci.edu/ml/datasets/auto+mpg
 - Modified version of the iris dataset. The original dataset is available at https://archive.ics.uci.edu/ml/datasets/Iris


----


## Setup


For this lab, we will be using the following libraries:

*   [`PySpark`](https://spark.apache.org/docs/latest/api/python/index.html?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMSkillsNetworkBD0231ENCoursera2789-2023-01-01) for connecting to the Spark Cluster


### Installing Required Libraries

Spark Cluster is pre-installed in the Skills Network Labs environment. However, you need libraries like pyspark and findspark to connect to this cluster.

If you wish to download this Jupyter notebook and run it on your local computer, follow the instructions <a href="https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/labs/Connecting_to_spark_cluster_using_Skills_Network_labs.ipynb">here</a>.



The following required libraries are __not__ pre-installed in the Skills Network Labs environment. __You will need to run the following cell__ to install them:


In [ ]:
!pip install pyspark==3.1.2 -q
!pip install findspark -q

### Importing Required Libraries

_We recommend you import all required libraries in one place (here):_


In [1]:
# You can also use this section to suppress warnings generated by your code:
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

# FindSpark simplifies the process of using Apache Spark with Python

import findspark
findspark.init()

#import functions/Classes for sparkml

from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import StringIndexer

from pyspark.sql import SparkSession


# import functions/Classes for pipeline creation

from pyspark.ml import Pipeline

# import functions/Classes for metrics
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


# Examples


## Task 1 - Load data set


Create SparkSession


In [2]:
#Ignore any warnings by SparkSession command

spark = SparkSession.builder.appName("ML Pipeline Example").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/20 12:27:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Download the data file


In [ ]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/datasets/mpg.csv


Load the dataset into the spark dataframe


In [3]:
# Use the spark.read.csv function to load the data into a dataframe.
# header=True tells Spark that the first row of the CSV file is a header row.
# inferSchema=True tells Spark to infer the data types of the columns automatically.

# Load mpg dataset
mpg_data = spark.read.csv("mpg.csv", header=True, inferSchema=True)


                                                                                

Print the schema of the dataset


In [4]:
mpg_data.printSchema()

root
 |-- MPG: double (nullable = true)
 |-- Cylinders: integer (nullable = true)
 |-- Engine Disp: double (nullable = true)
 |-- Horsepower: integer (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- Accelerate: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Origin: string (nullable = true)


Show top 5 rows from the dataset


In [7]:
mpg_data.show(5)

+----+---------+-----------+----------+------+----------+----+--------+
| MPG|Cylinders|Engine Disp|Horsepower|Weight|Accelerate|Year|  Origin|
+----+---------+-----------+----------+------+----------+----+--------+
|15.0|        8|      390.0|       190|  3850|       8.5|  70|American|
|21.0|        6|      199.0|        90|  2648|      15.0|  70|American|
|18.0|        6|      199.0|        97|  2774|      15.5|  70|American|
|16.0|        8|      304.0|       150|  3433|      12.0|  70|American|
|14.0|        8|      455.0|       225|  3086|      10.0|  70|American|
+----+---------+-----------+----------+------+----------+----+--------+


## Task 2 - Define pipeline stages


In [8]:
# Stage 1 - assemble the input columns into a single vector 
vectorAssembler = VectorAssembler(inputCols=["Weight", "Horsepower", "Engine Disp"], outputCol="features")
# Stage 2 - scale the features using standard scaler
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
# Stage 3 - create a linear regression instance
lr = LinearRegression(featuresCol="scaledFeatures", labelCol="MPG")
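
To make Stage 2 more concrete, the following is a minimal plain-Python sketch of what standard scaling does to a single feature, assuming the `StandardScaler` defaults (`withStd=True`, `withMean=False`): each value is divided by the sample standard deviation and is not centered. The helper name `scale_to_unit_std` is hypothetical, and this is an illustration of the idea, not Spark's implementation. The input values are the `Weight` column of the five rows shown earlier.

```python
import statistics


def scale_to_unit_std(values):
    """Divide each value by the sample standard deviation (no mean centering)."""
    std = statistics.stdev(values)  # corrected (n - 1) sample standard deviation
    if std == 0:
        # Assumption: a constant feature is mapped to 0.0, as the Spark docs describe
        return [0.0 for _ in values]
    return [value / std for value in values]


# Weight values taken from the first five rows of the mpg dataset
weights = [3850, 2648, 2774, 3433, 3086]
scaled_weights = scale_to_unit_std(weights)
print(scaled_weights)
```

After scaling, the feature has a standard deviation of 1, so features measured in very different units (for example, weight and horsepower) contribute on a comparable scale.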

## Task 3 - Build the pipeline


In [9]:
# Build the pipeline
# All the stages of the pipeline are mentioned in the order of execution.
pipeline = Pipeline(stages=[vectorAssembler, scaler, lr])
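
The idea that stages run in the order they are listed can be sketched in plain Python. This is a simplified illustration, not Spark's `Pipeline` class: the toy stages `AddOne` and `DivideByMax` and the helpers `fit_pipeline` and `run_pipeline` are hypothetical names. Each stage is fitted on the output of the previous stage, and the fitted stages are later applied to new data in the same order.

```python
class AddOne:
    """Toy stage with nothing to learn from the data."""

    def fit(self, data):
        return self

    def transform(self, data):
        return [x + 1 for x in data]


class DivideByMax:
    """Toy stage that learns the maximum of the data it is fitted on."""

    def fit(self, data):
        self.max_ = max(data)
        return self

    def transform(self, data):
        return [x / self.max_ for x in data]


def fit_pipeline(stages, data):
    """Fit each stage in order, feeding it the output of the previous stage."""
    fitted = []
    for stage in stages:
        model = stage.fit(data)
        data = model.transform(data)
        fitted.append(model)
    return fitted


def run_pipeline(fitted, data):
    """Apply the fitted stages to new data in the same order."""
    for model in fitted:
        data = model.transform(data)
    return data


fitted_stages = fit_pipeline([AddOne(), DivideByMax()], [1, 3, 7])
print(run_pipeline(fitted_stages, [3, 15]))  # [0.5, 2.0]
```

Because `DivideByMax` is fitted on the already incremented training data, swapping the two stages would change the result, which is why the order of the `stages` list matters.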

## Task 4 - Split the data


In [10]:
# Split the data into training and testing sets
(trainingData, testData) = mpg_data.randomSplit([0.7, 0.3], seed=42)
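
A random split assigns each row to a split at random, so the resulting sizes are only approximately 70% and 30%, while a fixed seed makes the split reproducible. The following plain-Python sketch illustrates that behaviour under these assumptions. The helper `random_split` is hypothetical and does not reproduce Spark's exact sampling algorithm or its row assignments.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to one split at random, in proportion to the weights."""
    rng = random.Random(seed)
    total = sum(weights)

    # Cumulative upper bounds of each split, e.g. [0.7, 1.0] for weights [0.7, 0.3]
    bounds = []
    cumulative = 0.0
    for weight in weights:
        cumulative += weight / total
        bounds.append(cumulative)

    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for index, bound in enumerate(bounds):
            # The last split also catches any floating-point rounding leftovers
            if draw < bound or index == len(bounds) - 1:
                splits[index].append(row)
                break
    return splits


rows = list(range(100))
train, test = random_split(rows, [0.7, 0.3], seed=42)
print(len(train), len(test))
```

Every row lands in exactly one split, and calling the function again with the same seed returns the same split.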

## Task 5 - Fit the pipeline


In [12]:
# Fit the pipeline to the training data
# Ignore any warnings. They are caused by the simplified settings and the security settings of the lab.

model = pipeline.fit(trainingData)

24/03/20 12:31:50 WARN Instrumentation: [977913dd] regParam is zero, which might cause numerical instability and overfitting.


## Task 6 - Evaluate the model


Make predictions on the testing data


In [13]:
predictions = model.transform(testData)

Print the RMSE value


In [14]:

evaluator = RegressionEvaluator(labelCol="MPG", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) =", rmse)

Root Mean Squared Error (RMSE) = 3.8756646183839334
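
For reference, RMSE is the square root of the mean of the squared differences between the actual and predicted values. The sketch below computes it in plain Python. The helper name `rmse` and the prediction values are made up for illustration and are not taken from the model above.

```python
import math


def rmse(labels, predictions):
    """Square root of the mean squared difference between labels and predictions."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


actual_mpg = [15.0, 21.0, 18.0]
predicted_mpg = [17.0, 20.0, 16.0]  # hypothetical predictions
print(rmse(actual_mpg, predicted_mpg))  # sqrt(3), approximately 1.732
```

RMSE is expressed in the same unit as the label, so the value reported above means the predictions are off by roughly 3.9 MPG on average, with larger errors weighted more heavily.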


Stop Spark Session


In [15]:
spark.stop()

# Exercises


### Exercise 1 - Load data set


Create a SparkSession with the app name "ML Pipeline Exercise"


In [16]:
#Ignore any warnings by SparkSession command

spark = SparkSession.builder.appName("ML Pipeline Exercise").getOrCreate()

<details>
    <summary>Click here for a Hint</summary>
    
Use the SparkSession.builder

</details>


<details>
    <summary>Click here for Solution</summary>

```python
spark = SparkSession.builder.appName("ML Pipeline Exercise").getOrCreate()
```

</details>


Download the iris data set


In [ ]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/datasets/iris.csv


Load the dataset into the spark dataframe


In [17]:

iris_data = spark.read.csv("iris.csv", header=True, inferSchema=True)


<details>
    <summary>Click here for a Hint</summary>
    
Use the spark.read.csv method

</details>


<details>
    <summary>Click here for Solution</summary>

```python
iris_data = spark.read.csv("iris.csv", header=True, inferSchema=True)

```

</details>


Print the schema of the dataset


In [18]:
iris_data.printSchema()

root
 |-- SepalLengthCm: double (nullable = true)
 |-- SepalWidthCm: double (nullable = true)
 |-- PetalLengthCm: double (nullable = true)
 |-- PetalWidthCm: double (nullable = true)
 |-- Species: string (nullable = true)


Notice that the "Species" column is a string column.


Show top 5 rows from the dataset


In [27]:
iris_data.show(5)

+-------------+------------+-------------+------------+-----------+
|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|
+-------------+------------+-------------+------------+-----------+
|          5.1|         3.5|          1.4|         0.2|Iris-setosa|
|          4.9|         3.0|          1.4|         0.2|Iris-setosa|
|          4.7|         3.2|          1.3|         0.2|Iris-setosa|
|          4.6|         3.1|          1.5|         0.2|Iris-setosa|
|          5.0|         3.6|          1.4|         0.2|Iris-setosa|
+-------------+------------+-------------+------------+-----------+

### Exercise 2 - Define pipeline stages


Stage 1 - Create an indexer stage using StringIndexer that will convert the Species column into a numeric column named "label"


In [28]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="Species", outputCol="label")

<details>
    <summary>Click here for a Hint</summary>
    
Use StringIndexer with inputCol set to "Species" and outputCol set to "label"

</details>


<details>
    <summary>Click here for Solution</summary>

```python
indexer = StringIndexer(inputCol="Species", outputCol="label")
```

</details>


Stage 2 - Create a vectorAssembler stage that creates a feature vector named features using "SepalLengthCm", "SepalWidthCm", "PetalLengthCm","PetalWidthCm"


In [30]:

vectorAssembler = VectorAssembler(inputCols=["SepalLengthCm", "SepalWidthCm", "PetalLengthCm","PetalWidthCm"], outputCol="features")


<details>
    <summary>Click here for a Hint</summary>
    
Use the VectorAssembler

</details>


<details>
    <summary>Click here for Solution</summary>

```python
vectorAssembler = VectorAssembler(inputCols=["SepalLengthCm", "SepalWidthCm", "PetalLengthCm","PetalWidthCm"], outputCol="features")
```

</details>
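
Conceptually, the assembler stage collects the values of the selected columns of each row into one feature vector, in the order given by `inputCols`. A minimal plain-Python sketch of that idea follows. The helper `assemble` is a hypothetical name, and a Python list stands in for Spark's vector type.

```python
def assemble(row, input_cols):
    """Collect the selected column values of one row into a single feature list."""
    return [float(row[col]) for col in input_cols]


# First row of the iris dataset shown earlier
row = {
    "SepalLengthCm": 5.1,
    "SepalWidthCm": 3.5,
    "PetalLengthCm": 1.4,
    "PetalWidthCm": 0.2,
    "Species": "Iris-setosa",
}
input_cols = ["SepalLengthCm", "SepalWidthCm", "PetalLengthCm", "PetalWidthCm"]
features = assemble(row, input_cols)
print(features)  # [5.1, 3.5, 1.4, 0.2]
```

The "Species" column is left out because it is the label, not a feature.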


Stage 3 - Create a scaler stage that scales the features using StandardScaler. Name the output column scaledFeatures


In [31]:

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

<details>
    <summary>Click here for a Hint</summary>
    
Use the StandardScaler

</details>


<details>
    <summary>Click here for Solution</summary>

```python
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
```

</details>


Stage 4 - Create a logistic regression stage using featuresCol="scaledFeatures", labelCol="label"


In [32]:

classifier = LogisticRegression(featuresCol="scaledFeatures", labelCol="label")

<details>
    <summary>Click here for a Hint</summary>
    
Use LogisticRegression

</details>


<details>
    <summary>Click here for Solution</summary>

```python
classifier = LogisticRegression(featuresCol="scaledFeatures", labelCol="label")
```

</details>


### Exercise 3 - Build the pipeline


Build a pipeline with all four stages created earlier.


In [38]:
pipeline = Pipeline(stages=[indexer, vectorAssembler, scaler, classifier])

<details>
    <summary>Click here for a Hint</summary>
    
Build the pipeline using the 4 stages created earlier

</details>


<details>
    <summary>Click here for Solution</summary>

```python
pipeline = Pipeline(stages=[indexer,vectorAssembler, scaler, classifier])
```

</details>


### Exercise 4 - Split the data


Split the data into training and testing sets


In [39]:
(trainingData, testData) = iris_data.randomSplit([0.7, 0.3], seed=42)

### Exercise 5 - Fit the pipeline


Fit the pipeline to the training data


In [40]:
model = pipeline.fit(trainingData)

                                                                                

<details>
    <summary>Click here for a Hint</summary>
    
Use the fit method of the pipeline

</details>


<details>
    <summary>Click here for Solution</summary>

```python
model = pipeline.fit(trainingData)
```

</details>


### Exercise 6 - Evaluate the model


Make predictions on the testing data


In [41]:

predictions = model.transform(testData)


<details>
    <summary>Click here for a Hint</summary>
    
Use the transform method of the model

</details>


<details>
    <summary>Click here for Solution</summary>

```python
predictions = model.transform(testData)
```

</details>


# Evaluate model performance


Print the accuracy value


In [42]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy =", accuracy)


Accuracy = 0.9782608695652174
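
Accuracy is the fraction of test rows whose predicted label equals the actual label. The plain-Python sketch below shows the calculation. The helper name `accuracy` and the label values are made up for illustration and are not the predictions of the model above.

```python
def accuracy(labels, predictions):
    """Fraction of predictions that match the actual labels."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)


actual_labels = [0.0, 1.0, 2.0, 1.0]
predicted_labels = [0.0, 1.0, 1.0, 1.0]  # hypothetical predictions
print(accuracy(actual_labels, predicted_labels))  # 0.75
```

An accuracy of about 0.978, as reported above, means that roughly 98 out of every 100 test rows were classified correctly.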


Stop Spark Session


In [43]:
spark.stop()

Congratulations! You have completed this lab.<br>


## Authors


[Ramesh Sannareddy](https://www.linkedin.com/in/rsannareddy/?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMBD0231ENSkillsNetwork866-2023-01-01)


### Other Contributors


## Change Log


|Date (YYYY-MM-DD)|Version|Changed By|Change Description|
|-|-|-|-|
|2023-05-04|0.1|Ramesh Sannareddy|Initial Version Created|


Copyright © 2023 IBM Corporation. All rights reserved.
