<p style="text-align:center">
    <a href="https://skills.network/?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMSkillsNetworkBD0231ENCoursera2789-2023-01-01">
    <img src="https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/assets/logos/SN_web_lightmode.png" width="200" alt="Skills Network Logo">
    </a>
</p>


## Final Project - Build an ML Pipeline for Airfoil noise prediction


Estimated time needed: **90** minutes


## Scenario


You are a data engineer at an aeronautics consulting company. Your company prides itself in being able to efficiently design airfoils for use in planes and sports cars. Data scientists in your office need to work with different algorithms and data in different formats. While they are good at Machine Learning, they count on you to be able to do ETL jobs and build ML pipelines. In this project you will use the modified version of the NASA Airfoil Self Noise dataset. You will clean this dataset, by dropping the duplicate rows, and removing the rows with null values. You will create an ML pipe line to create a model that will predict the SoundLevel based on all the other columns. You will evaluate the model and towards the end you will persist the model.



## Objectives

In this 4 part assignment you will:

- Part 1 Perform ETL activity
  - Load a csv dataset
  - Remove duplicates if any
  - Drop rows with null values if any
  - Make transformations
  - Store the cleaned data in parquet format
- Part 2 Create a  Machine Learning Pipeline
  - Create a machine learning pipeline for prediction
- Part 3 Evaluate the Model
  - Evaluate the model using relevant metrics
- Part 4 Persist the Model 
  - Save the model for future production use
  - Load and verify the stored model


## Datasets

In this lab you will be using dataset(s):

 - The original dataset can be found here NASA airfoil self noise dataset. https://archive.ics.uci.edu/dataset/291/airfoil+self+noise
 
 - This dataset is licensed under a Creative Commons Attribution 4.0 International (CC BY 4.0) license.


Diagram of an airfoil. - For informational purpose


![Airfoil with flow](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/images/Airfoil_with_flow.png)


Diagram showing the Angle of attack. - For informational purpose


![Airfoil angle of attack](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/images/Airfoil_angle_of_attack.jpg)


## Before you Start


Before you start attempting this project it is highly recommended that you finish the practice project.


## Setup


For this lab, we will be using the following libraries:

*   [`PySpark`](https://spark.apache.org/docs/latest/api/python/index.html?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMSkillsNetworkBD0231ENCoursera2789-2023-01-01) for connecting to the Spark Cluster


### Installing Required Libraries

Spark Cluster is pre-installed in the Skills Network Labs environment. However, you need libraries like pyspark and findspark to
 connect to this cluster.


The following required libraries are __not__ pre-installed in the Skills Network Labs environment. __You will need to run the following cell__ to install them:


In [None]:
!pip install pyspark==3.1.2 -q
!pip install findspark -q

### Importing Required Libraries

_We recommend you import all required libraries in one place (here):_


In [None]:
# You can also use this section to suppress warnings generated by your code:
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

# FindSpark simplifies the process of using Apache Spark with Python

import findspark
findspark.init()


## Part 1 - Perform ETL activity


### Task 1 - Import required libraries


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StandardScaler

### Task 2 - Create a spark session


In [None]:


spark = SparkSession.builder.appName("Final_Project").getOrCreate()

### Task 3 - Load the csv file into a dataframe


Download the data file.

NOTE : Please ensure you use the dataset below and not the original dataset mentioned above.


In [None]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv


Load the dataset into the spark dataframe


In [None]:
file_path = "NASA_airfoil_noise_raw.csv"

df = spark.read.csv(file_path, header=True, inferSchema=True)

### Task 4 - Print top 5 rows of the dataset


In [18]:
#your code goes here
# Printing top 5 rows of the dataset
df.show(5)


+---------+-------------+-----------+------------------+-----------------------+----------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevel|
+---------+-------------+-----------+------------------+-----------------------+----------+
|      800|          0.0|     0.3048|              71.3|             0.00266337|   126.201|
|     1000|          0.0|     0.3048|              71.3|             0.00266337|   125.201|
|     1250|          0.0|     0.3048|              71.3|             0.00266337|   125.951|
|     1600|          0.0|     0.3048|              71.3|             0.00266337|   127.591|
|     2000|          0.0|     0.3048|              71.3|             0.00266337|   127.461|
+---------+-------------+-----------+------------------+-----------------------+----------+
only showing top 5 rows



### Task 6 - Print the total number of rows in the dataset


In [19]:


total_rows = df.count()
print("Total number of rows in the dataset:", total_rows)

Total number of rows in the dataset: 1522


### Task 7 - Drop all the duplicate rows from the dataset


In [26]:
df_no_duplicates = df.dropDuplicates()
total_rows_no_duplicates = df_no_duplicates.count()
print("Total number of rows after dropping duplicates:", total_rows_no_duplicates)



Total number of rows after dropping duplicates: 1503


                                                                                

### Task 8 - Print the total number of rows in the dataset


In [27]:
total_rows_no_duplicates = df_no_duplicates.count()
print("Total number of rows after dropping duplicates:", total_rows_no_duplicates)

                                                                                

Total number of rows after dropping duplicates: 1503


### Task 9 - Drop all the rows that contain null values from the dataset


In [28]:
# Drop all rows that contain null values from the dataset
df_no_null = df_no_duplicates.dropna()

# Print the total number of rows after dropping rows with null values
total_rows_no_null = df_no_null.count()
print("Total number of rows after dropping rows with null values:", total_rows_no_null)





Total number of rows after dropping rows with null values: 1499


                                                                                

### Task 10 - Print the total number of rows in the dataset


In [23]:
# Print the total number of rows in the dataset after dropping rows with null values
total_rows_no_null = df_no_null.count()
print("Total number of rows in the dataset after dropping rows with null values:", total_rows_no_null)




Total number of rows in the dataset after dropping rows with null values: 1499


                                                                                

### Task 11 - Rename the column "SoundLevel" to "SoundLevelDecibels"Drop


In [29]:
# Rename the column "SoundLevel" to "SoundLevelDecibels" and drop the original column
df_renamed = df.withColumnRenamed("SoundLevel", "SoundLevelDecibels").drop("SoundLevel")

# Printing the schema to verify column renaming and dropping
df_renamed.printSchema()
df_renamed.columns

root
 |-- Frequency: integer (nullable = true)
 |-- AngleOfAttack: double (nullable = true)
 |-- ChordLength: double (nullable = true)
 |-- FreeStreamVelocity: double (nullable = true)
 |-- SuctionSideDisplacement: double (nullable = true)
 |-- SoundLevelDecibels: double (nullable = true)



['Frequency',
 'AngleOfAttack',
 'ChordLength',
 'FreeStreamVelocity',
 'SuctionSideDisplacement',
 'SoundLevelDecibels']

### Task 12 - Save the dataframe in parquet formant, name the file as "NASA_airfoil_noise_cleaned.parquet"


In [30]:
df_renamed.write.parquet("new_path/NASA_airfoil_noise_cleaned_v6.parquet")


                                                                                

#### Part 1 - Evaluation



Run the code cell below.<br>
Use the answers here to answer the final evaluation quiz in the next section.<br>
If the code throws up any errors, go back and review the code you have written.


In [31]:
rowcount1 = df_renamed.count() # Total rows in the original DataFrame
rowcount2 = df_renamed.dropDuplicates().count() # Total rows after dropping duplicate rows
rowcount3 = df_renamed.dropDuplicates().na.drop().count() # Total rows after dropping duplicate rows and rows with null values


                                                                                

## Part - 2 Create a  Machine Learning Pipeline


### Task 1 - Load data from "NASA_airfoil_noise_cleaned.parquet" into a dataframe


In [32]:
# Load data from Parquet file into a DataFrame
df = spark.read.parquet("new_path/NASA_airfoil_noise_cleaned_v6.parquet")


### Task 2 - Print the total number of rows in the dataset


In [33]:
# Print the total number of rows in the dataset
print("Total number of rows:", df.count())


Total number of rows: 1522


### Task 3 - Define the VectorAssembler pipeline stage


Stage 1 - Assemble the input columns into a single column "features". Use all the columns except SoundLevelDecibels as input features.


In [34]:
from pyspark.ml.feature import VectorAssembler

# Define the input columns (excluding SoundLevelDecibels)
input_cols = [col for col in df.columns if col != "SoundLevelDecibels"]

# Create a VectorAssembler object with handleInvalid="skip"
assembler = VectorAssembler(inputCols=input_cols, outputCol="features", handleInvalid="skip")


### Task 4 - Define the StandardScaler pipeline stage


Stage 2 - Scale the "features" using standard scaler and store in "scaledFeatures" column


In [35]:
from pyspark.ml.feature import StandardScaler

# Define the StandardScaler object
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)


### Task 5 - Define the StandardScaler pipeline stage


Stage 3 - Create a LinearRegression stage to predict "SoundLevelDecibels"

**Note:You need to use the scaledfeatures retreived in the previous step.**


In [36]:
from pyspark.ml.regression import LinearRegression

# Create a LinearRegression object
linear_regression = LinearRegression(featuresCol="scaledFeatures", labelCol="SoundLevelDecibels")


### Task 6 - Build the pipeline


Build a pipeline using the above three stages


In [39]:
from pyspark.ml import Pipeline

# Define the stages of the pipeline
stages = [assembler, scaler, linear_regression]

# Create a Pipeline object
pipeline = Pipeline(stages=stages)

### Task 7 - Split the data


In [40]:
# Split the data into training and testing sets (70% training, 30% testing)
train_data, test_data = df.randomSplit([0.7, 0.3], seed=42)


### Task 8 - Fit the pipeline


In [41]:
# Fit the pipeline to the training data
pipeline_model = pipeline.fit(train_data)


24/04/16 18:58:36 WARN util.Instrumentation: [5214cb73] regParam is zero, which might cause numerical instability and overfitting.
24/04/16 18:58:37 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
24/04/16 18:58:37 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
24/04/16 18:58:37 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
24/04/16 18:58:37 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK


#### Part 2 - Evaluation



Run the code cell below.<br>
Use the answers here to answer the final evaluation quiz in the next section.<br>
If the code throws up any errors, go back and review the code you have written.


In [42]:
# Fit the pipeline to the training data
pipeline_model = pipeline.fit(train_data)

# Calculate the total number of rows in the DataFrame
rowcount4 = df.count()

print("Part 2 - Evaluation")
print("Total rows =", rowcount4)

# Get the stages of the pipeline
ps = [str(x).split("_")[0] for x in pipeline_model.stages]

print("Pipeline Stage 1 =", ps[0])
print("Pipeline Stage 2 =", ps[1])
print("Pipeline Stage 3 =", ps[2])

# Print the label column of the linear regression model
print("Label column =", linear_regression.getLabelCol())

24/04/16 18:58:46 WARN util.Instrumentation: [804e2ceb] regParam is zero, which might cause numerical instability and overfitting.


Part 2 - Evaluation
Total rows = 1522
Pipeline Stage 1 = VectorAssembler
Pipeline Stage 2 = StandardScaler
Pipeline Stage 3 = LinearRegression
Label column = SoundLevelDecibels


## Part 3 - Evaluate the Model


### Task 1 - Predict using the model


In [43]:
# Predict using the fitted pipeline model on the test data
predictions = pipeline_model.transform(test_data)


### Task 2 - Print the MSE


In [None]:
print(predictions.columns)

In [45]:
from pyspark.ml.evaluation import RegressionEvaluator

# Define the evaluator
evaluator = RegressionEvaluator(labelCol="SoundLevelDecibels", predictionCol="prediction", metricName="mse")

# Calculate the MSE
mse = evaluator.evaluate(predictions)

# Print the MSE
print("Mean Squared Error (MSE) on test data =", mse)

Mean Squared Error (MSE) on test data = 24.08531562389462


In [46]:
from pyspark.ml.evaluation import RegressionEvaluator

# Calculate the MSE using RegressionEvaluator
evaluator_mse = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="mse")
mse = evaluator_mse.evaluate(predictions)
print("Mean Squared Error (MSE):", mse)

Mean Squared Error (MSE): 24.08531562389462


### Task 3 - Print the MAE


In [47]:
from pyspark.ml.evaluation import RegressionEvaluator

# Define the evaluator
evaluator = RegressionEvaluator(labelCol="SoundLevelDecibels", predictionCol="prediction", metricName="mae")

# Calculate the MAE
mae = evaluator.evaluate(predictions)

# Print the MAE
print("Mean Absolute Error (MAE) on test data =", mae)

Mean Absolute Error (MAE) on test data = 3.8171443447680273


In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Create an evaluator for regression metrics
evaluator = RegressionEvaluator(labelCol="SoundLevelDecibels", predictionCol="prediction", metricName="mae")

# Calculate the MAE
mae = evaluator.evaluate(predictions)

# Print the MAE
print("Mean Absolute Error (MAE):", mae)


### Task 4 - Print the R-Squared(R2)


In [48]:
from pyspark.ml.evaluation import RegressionEvaluator

# Define the evaluator
evaluator = RegressionEvaluator(labelCol="SoundLevelDecibels", predictionCol="prediction", metricName="r2")

# Calculate the R2 score
r2 = evaluator.evaluate(predictions)

# Print the R2 score
print("R-squared (R2) on test data =", r2)


R-squared (R2) on test data = 0.4993866299512747


#### Part 3 - Evaluation


In [None]:
# Get the intercept of the linear regression model
intercept = loaded_model_lr.intercept

# Print the intercept
print("Intercept =", round(intercept, 4))


In [50]:
from pyspark.ml.evaluation import RegressionEvaluator

# Calculate the MSE
evaluator_mse = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="mse")
mse = evaluator_mse.evaluate(predictions)

# Calculate the MAE
evaluator_mae = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="mae")
mae = evaluator_mae.evaluate(predictions)

# Calculate the R2
evaluator_r2 = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="r2")
r2 = evaluator_r2.evaluate(predictions)

# Get the linear regression model
lrModel = pipeline_model.stages[-1]

# Print the evaluation metrics and intercept
print("Part 3 - Evaluation")
print("Mean Squared Error =", round(mse, 2))
print("Mean Absolute Error =", round(mae, 2))
print("R Squared =", round(r2, 2))
print("Intercept =", round(lrModel.intercept, 2))


Part 3 - Evaluation
Mean Squared Error = 24.09
Mean Absolute Error = 3.82
R Squared = 0.5
Intercept = 124.64


In [49]:
from pyspark.ml.evaluation import RegressionEvaluator

# Calculate the MSE
evaluator_mse = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="mse")
mse = evaluator_mse.evaluate(predictions)

# Calculate the MAE
evaluator_mae = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="mae")
mae = evaluator_mae.evaluate(predictions)

# Calculate the R2
evaluator_r2 = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="r2")
r2 = evaluator_r2.evaluate(predictions)

# Get the linear regression model
lrModel = pipeline_model.stages[-1]

# Print the evaluation metrics and intercept
print("Part 3 - Evaluation")
print("Mean Squared Error =", round(mse, 2))
print("Mean Absolute Error =", round(mae, 2))
print("R Squared =", round(r2, 2))
print("Intercept =", round(lrModel.intercept, 2))


Part 3 - Evaluation
Mean Squared Error = 24.09
Mean Absolute Error = 3.82
R Squared = 0.5
Intercept = 124.64



Run the code cell below.<br>
Use the answers here to answer the final evaluation quiz in the next section.<br>
If the code throws up any errors, go back and review the code you have written.


In [None]:
print("Part 3 - Evaluation")
print("Mean Squared Error = ", round(mse, 2))
print("Mean Absolute Error = ", round(mae, 2))
print("R Squared = ", round(r2, 2))

lrModel = pipeline_model.stages[-1]

print("Intercept = ", round(lrModel.intercept, 2))


## Part 4 - Persist the Model


### Task 1 - Save the model to the path "Final_Project"


In [None]:

pipeline_model.write().save("Final_Project2")

### Task 2 - Load the model from the path "Final_Project"


In [None]:
from pyspark.ml import PipelineModel

# Load the model from the specified path
loaded_model = PipelineModel.load("Final_Project2")


### Task 3 - Make predictions using the loaded model on the testdata


In [None]:
# Make predictions using the loaded model on the test data
loaded_predictions = loaded_model.transform(test_data)


### Task 4 - Show the predictions


In [None]:
# Show the predictions made by the loaded model
loaded_predictions.select("SoundLevelDecibels", "prediction").show()

#### Part 4 - Evaluation



Run the code cell below.<br>
Use the answers here to answer the final evaluation quiz in the next section.<br>
If the code throws up any errors, go back and review the code you have written.


In [None]:
print("Part 4 - Evaluation")

# Get the final linear regression model from the loaded pipeline model
loaded_model_lr = loaded_model.stages[-1]

# Get the total number of stages in the pipeline
total_stages = len(loaded_model.stages)
print("Number of stages in the pipeline =", total_stages)

# Get the input columns used in the VectorAssembler stage
input_columns = loaded_model.stages[0].getInputCols()

# Get the coefficients of the features
coefficients = loaded_model_lr.coefficients

# Print the coefficients for each feature
for i, j in zip(input_columns, coefficients):
    print(f"Coefficient for {i} is {round(j, 4)}")


In [None]:
print("Part 4 - Evaluation")

loadedmodel = loaded_model.stages[-1]
totalstages = len(loaded_model.stages)
inputcolumns = loaded_model.stages[0].getInputCols()

print("Number of stages in the pipeline = ", totalstages)
for i,j in zip(inputcolumns, loadedmodel.coefficients):
    print(f"Coefficient for {i} is {round(j,4)}")

### Stop Spark Session


In [None]:
spark.stop()

## Authors


[Ramesh Sannareddy](https://www.linkedin.com/in/rsannareddy/?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMBD0231ENSkillsNetwork866-2023-01-01)


### Other Contributors


## Change Log


|Date (YYYY-MM-DD)|Version|Changed By|Change Description|
|-|-|-|-|
|2023-05-26|0.1|Ramesh Sannareddy|Initial Version Created|


Copyright © 2023 IBM Corporation. All rights reserved.
