# Final Project - Build an ML Pipeline for Airfoil Noise Prediction

### Scenario

You are a data engineer at an aeronautics consulting company. Your company prides itself on efficiently designing airfoils for use in planes and sports cars. Data scientists in your office need to work with different algorithms and with data in different formats. While they are good at machine learning, they count on you to run ETL jobs and build ML pipelines.

In this project you will use a modified version of the NASA Airfoil Self-Noise dataset. You will clean the dataset by dropping duplicate rows and removing rows with null values. You will then build an ML pipeline that trains a model to predict SoundLevel from all the other columns. Finally, you will evaluate the model and persist it.

### Objectives

In this four-part assignment you will:

- Part 1 Perform ETL activity
  - Load a csv dataset
  - Remove duplicates if any
  - Drop rows with null values if any
  - Make transformations
  - Store the cleaned data in parquet format
- Part 2 Create a Machine Learning Pipeline
  - Create a machine learning pipeline for prediction
- Part 3 Evaluate the Model
  - Evaluate the model using relevant metrics
- Part 4 Persist the Model 
  - Save the model for future production use
  - Load and verify the stored model


## Datasets

This lab uses the following dataset:

 - A modified version of the NASA Airfoil Self-Noise dataset. The original dataset is available at https://archive.ics.uci.edu/dataset/291/airfoil+self+noise
 
 - This dataset is licensed under a Creative Commons Attribution 4.0 International (CC BY 4.0) license.


Diagram of an airfoil (for information only).

![Airfoil with flow](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/images/Airfoil_with_flow.png)


Diagram showing the angle of attack (for information only).

![Airfoil angle of attack](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/images/Airfoil_angle_of_attack.jpg)


### Setup


For this lab, we will be using the following libraries:

*   [`PySpark`](https://spark.apache.org/docs/latest/api/python/index.html?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMSkillsNetworkBD0231ENCoursera2789-2023-01-01) for connecting to the Spark Cluster


### Installing Required Libraries

A Spark cluster is pre-installed in the Skills Network Labs environment. However, you need the `pyspark` and `findspark` libraries to connect to it.
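
Before running the install cells, it can be useful to check which of these packages are already importable. The following is a minimal sketch using only the standard library; the helper name `missing_packages` is an illustrative assumption, not part of the lab.

```python
import importlib.util


def missing_packages(names):
    """Return the names that cannot be found as importable top-level packages."""
    return [name for name in names if importlib.util.find_spec(name) is None]


# Package names taken from the install cell below.
print(missing_packages(["pyspark", "findspark"]))
```

An empty list means both packages are already available and the `pip install` cells can be skipped.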

In [None]:
!curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
!python3 get-pip.py


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 2222k  100 2222k    0     0  9272k      0 --:--:-- --:--:-- --:--:-- 9260k
Defaulting to user installation because normal site-packages is not writeable
Collecting pip
  Downloading pip-24.3.1-py3-none-any.whl.metadata (3.7 kB)
Downloading pip-24.3.1-py3-none-any.whl (1.8 MB)
Installing collected packages: pip
Successfully installed pip-24.3.1

[notice] A new release of pip is available: 21.2.4 -> 24.3.1
[notice] To update, run: /Library/Developer/CommandLineTools/usr/bin/python3 -m pip install --upgrade pip


In [7]:
!pip install pyspark==3.1.2 -q
!pip install findspark -q

In [8]:
# Suppress warnings generated by the code
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

# findspark locates the local Spark installation and makes pyspark importable

import findspark
findspark.init()

### Importing required libraries

In [10]:
!pip install numpy
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StandardScaler

Defaulting to user installation because normal site-packages is not writeable
Collecting numpy
  Downloading numpy-2.0.2-cp39-cp39-macosx_14_0_x86_64.whl.metadata (60 kB)
Downloading numpy-2.0.2-cp39-cp39-macosx_14_0_x86_64.whl (6.9 MB)
Installing collected packages: numpy
Successfully installed numpy-2.0.2


In [12]:
# Create a SparkSession

spark = SparkSession.builder.appName("Final Project").getOrCreate()

25/01/01 17:37:24 WARN Utils: Your hostname, Abdulhaqs-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.112 instead (on interface en0)
25/01/01 17:37:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
25/01/01 17:37:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### Load the csv file into a dataframe

Download the data file.

In [13]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv


--2025-01-01 17:38:43--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.45.118.108
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.45.118.108|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 60682 (59K) [text/csv]
Saving to: ‘NASA_airfoil_noise_raw.csv’


2025-01-01 17:38:45 (291 KB/s) - ‘NASA_airfoil_noise_raw.csv’ saved [60682/60682]



In [14]:
# Load the dataset 
df = spark.read.csv("NASA_airfoil_noise_raw.csv", header=True, inferSchema=True)

In [15]:
# Top 5 rows of the dataset
df.show(5)

+---------+-------------+-----------+------------------+-----------------------+----------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevel|
+---------+-------------+-----------+------------------+-----------------------+----------+
|      800|          0.0|     0.3048|              71.3|             0.00266337|   126.201|
|     1000|          0.0|     0.3048|              71.3|             0.00266337|   125.201|
|     1250|          0.0|     0.3048|              71.3|             0.00266337|   125.951|
|     1600|          0.0|     0.3048|              71.3|             0.00266337|   127.591|
|     2000|          0.0|     0.3048|              71.3|             0.00266337|   127.461|
+---------+-------------+-----------+------------------+-----------------------+----------+
only showing top 5 rows



In [16]:
# The total number of rows in the dataset
rowcount1 = df.count()
print(rowcount1)

1522


In [17]:
# Drop all the duplicate rows from the dataset
df = df.dropDuplicates()

In [18]:
# Total number of rows after dropping duplicates
rowcount2 = df.count()
print(rowcount2)

1503


In [19]:
# Rename the SoundLevel column
df = df.withColumnRenamed("SoundLevel", "SoundLevleDecibels")

In [21]:
# Drop all rows that contain null values
df = df.dropna()

In [22]:
rowcount3 = df.count()
print(rowcount3)

1499
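
The row counts above go from 1522 to 1503 after `dropDuplicates()` and to 1499 after `dropna()`. As a rough illustration of what those two steps do, here is a plain-Python sketch on a few made-up rows. It does not use Spark, and the helper names are assumptions chosen for this example.

```python
# Toy rows standing in for the CSV; the values are made up for illustration.
rows = [
    (800, 0.0, 126.201),
    (800, 0.0, 126.201),   # exact duplicate of the first row
    (1000, 0.0, None),     # contains a null value
    (1250, 0.0, 125.951),
]


def drop_duplicates(rows):
    """Keep one copy of each distinct row (first occurrence, order preserved)."""
    return list(dict.fromkeys(rows))


def drop_nulls(rows):
    """Keep only the rows in which no field is None."""
    return [row for row in rows if all(value is not None for value in row)]


deduplicated = drop_duplicates(rows)
cleaned = drop_nulls(deduplicated)
print(len(rows), len(deduplicated), len(cleaned))  # 4 3 2
```

Unlike this sketch, Spark makes no promise about which copy of a duplicate it keeps or about the order of the remaining rows.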


In [24]:
# Save the cleaned data in parquet format, replacing any earlier output
df.write.mode("overwrite").parquet("NASA_airfoil_noise_cleaned.parquet")

In [25]:
print("Part 1 - Evaluation")


print("Total rows = ", rowcount1)
print("Total rows after dropping duplicate rows = ", rowcount2)
print("Total rows after dropping duplicate rows and rows with null values = ", rowcount3)
print("New column name = ", df.columns[-1])


import os

print("NASA_airfoil_noise_cleaned.parquet exists : ", os.path.isdir("NASA_airfoil_noise_cleaned.parquet"))

Part 1 - Evaluation
Total rows =  1522
Total rows after dropping duplicate rows =  1503
Total rows after dropping duplicate rows and rows with null values =  1499
New column name =  SoundLevleDecibels
NASA_airfoil_noise_cleaned.parquet exists :  True


## Create a Machine Learning Pipeline

In [26]:
df = spark.read.parquet("NASA_airfoil_noise_cleaned.parquet")

In [27]:
rowcount4 = df.count()
print(rowcount4)

1499


In [28]:
df.printSchema()

root
 |-- Frequency: integer (nullable = true)
 |-- AngleOfAttack: double (nullable = true)
 |-- ChordLength: double (nullable = true)
 |-- FreeStreamVelocity: double (nullable = true)
 |-- SuctionSideDisplacement: double (nullable = true)
 |-- SoundLevleDecibels: double (nullable = true)



In [29]:
# Define the VectorAssembler pipeline stage
assembler = VectorAssembler(inputCols=['Frequency', 'AngleOfAttack','ChordLength','FreeStreamVelocity','SuctionSideDisplacement'], outputCol='features')

In [30]:
# Define the StandardScaler pipeline stage
scaler = StandardScaler(inputCol='features', outputCol='scaledFeatures')
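
With its default settings (`withStd=True`, `withMean=False`), Spark's `StandardScaler` divides each feature by its sample standard deviation and does not subtract the mean. The sketch below reproduces that arithmetic in plain Python for a single column; the function name and the input values are hypothetical and chosen only to keep the numbers easy to follow.

```python
import statistics


def scale_to_unit_std(column):
    """Divide each value by the column's sample standard deviation (no centering)."""
    std = statistics.stdev(column)  # corrected (n - 1) sample standard deviation
    return [value / std for value in column]


# Hypothetical frequency values; their sample standard deviation is 200.
frequencies = [200.0, 400.0, 600.0]
scaled = scale_to_unit_std(frequencies)
print(scaled)  # [1.0, 2.0, 3.0]
```

Because nothing is subtracted, positive inputs stay positive after scaling, which is consistent with the small positive `scaledFeatures` values shown for a frequency of 200 in the predictions table later in the notebook.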

In [31]:
# Define the LinearRegression pipeline stage
lr = LinearRegression(featuresCol='scaledFeatures', labelCol='SoundLevleDecibels')

In [32]:
# Build the pipeline
pipeline = Pipeline(stages=[assembler, scaler, lr])

In [33]:
# Split the data

(trainingData, testingData) = df.randomSplit([0.7,0.3], seed=42)
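
`randomSplit` assigns rows to the splits at random according to the normalized weights, so a 70/30 split yields roughly, not exactly, 70% and 30% of the rows, and the `seed` makes the assignment repeatable. The following plain-Python sketch is an analogy for that behavior under those assumptions, not Spark's actual implementation; `random_split` is a hypothetical helper.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to one split by comparing a uniform draw with cumulative weights."""
    total = sum(weights)
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for index, bound in enumerate(bounds):
            # The last split also catches draws affected by floating-point rounding.
            if draw < bound or index == len(bounds) - 1:
                splits[index].append(row)
                break
    return splits


# 1499 stand-in rows, matching the cleaned row count above.
train, test = random_split(list(range(1499)), [0.7, 0.3], seed=42)
print(len(train), len(test))
```

Running the function again with the same seed reproduces the same split, which is why fixing `seed=42` keeps the evaluation metrics comparable between runs on the same data.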

In [34]:
# Fit the pipeline

pipelineModel = pipeline.fit(trainingData)

25/01/01 18:29:16 WARN Instrumentation: [d5a9a081] regParam is zero, which might cause numerical instability and overfitting.
25/01/01 18:29:16 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
25/01/01 18:29:16 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
25/01/01 18:29:16 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
25/01/01 18:29:16 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK


In [36]:
print("Part 2 - Evaluation")
print("Total rows = ", rowcount4)
ps = [str(x).split("_")[0] for x in pipeline.getStages()]

print("Pipeline Stage 1 = ", ps[0])
print("Pipeline Stage 2 = ", ps[1])
print("Pipeline Stage 3 = ", ps[2])

print("Label column = ", lr.getLabelCol())

Part 2 - Evaluation
Total rows =  1499
Pipeline Stage 1 =  VectorAssembler
Pipeline Stage 2 =  StandardScaler
Pipeline Stage 3 =  LinearRegression
Label column =  SoundLevleDecibels


## Evaluate the Model

In [37]:
# Make predictions
predictions = pipelineModel.transform(testingData)

In [38]:
# Print the MSE
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="SoundLevleDecibels",predictionCol="prediction", metricName="mse")
mse = evaluator.evaluate(predictions)
print(mse)

26.433697439255017


In [40]:
# Print the MAE

evaluator = RegressionEvaluator(labelCol="SoundLevleDecibels",predictionCol="prediction", metricName="mae")
mae = evaluator.evaluate(predictions)
print(mae)

4.023274015123241


In [41]:
# Print the R2

evaluator = RegressionEvaluator(labelCol="SoundLevleDecibels",predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
print(r2)

0.4603835496398312
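
The three metrics above follow standard definitions: MSE is the mean of the squared errors, MAE is the mean of the absolute errors, and R2 is one minus the ratio of the residual sum of squares to the total sum of squares. The sketch below computes them in plain Python on made-up labels and predictions. The function name is an assumption for this example, and the last line only takes the square root of the MSE reported above to express the error in the same unit as the label.

```python
import math


def regression_metrics(labels, predictions):
    """Compute MSE, RMSE, MAE and R2 from paired labels and predictions."""
    n = len(labels)
    errors = [y - p for y, p in zip(labels, predictions)]
    ss_res = sum(e * e for e in errors)            # residual sum of squares
    mean_label = sum(labels) / n
    ss_tot = sum((y - mean_label) ** 2 for y in labels)  # total sum of squares
    mse = ss_res / n
    return {
        "mse": mse,
        "rmse": math.sqrt(mse),
        "mae": sum(abs(e) for e in errors) / n,
        "r2": 1.0 - ss_res / ss_tot,
    }


# Made-up values for illustration only.
metrics = regression_metrics([1.0, 2.0, 3.0, 4.0], [1.0, 2.0, 4.0, 3.0])
print(metrics["mse"], metrics["mae"], metrics["r2"])  # 0.5 0.5 0.6

# RMSE implied by the MSE printed in the notebook.
print(round(math.sqrt(26.433697439255017), 2))  # 5.14
```

Read this way, the model's typical error is about 5 units of the label, and an R2 of about 0.46 means it explains a little under half of the variance in the test labels.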


In [42]:
print("Part 3 - Evaluation")

print("Mean Squared Error = ", round(mse,2))
print("Mean Absolute Error = ", round(mae,2))
print("R Squared = ", round(r2,2))

lrModel = pipelineModel.stages[-1]

print("Intercept = ", round(lrModel.intercept,2))

Part 3 - Evaluation
Mean Squared Error =  26.43
Mean Absolute Error =  4.02
R Squared =  0.46
Intercept =  133.34


## Persist the Model

In [43]:
# Save the Pipeline model as Final_Project
# overwrite() lets this cell be re-run when the Final_Project directory already exists
pipelineModel.write().overwrite().save("Final_Project")

In [44]:
# Load the model

loaded_model = PipelineModel.load("Final_Project")

In [45]:
# Make predictions on test data

predictions = loaded_model.transform(testingData)

In [46]:
# show top 5 rows from the predictions

predictions.show(5)

+---------+-------------+-----------+------------------+-----------------------+------------------+--------------------+--------------------+------------------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevleDecibels|            features|      scaledFeatures|        prediction|
+---------+-------------+-----------+------------------+-----------------------+------------------+--------------------+--------------------+------------------+
|      200|         15.4|     0.0508|              31.7|              0.0289853|           119.975|[200.0,15.4,0.050...|[0.06269874852109...|123.23353463806416|
|      200|         17.4|     0.0254|              31.7|              0.0176631|           116.146|[200.0,17.4,0.025...|[0.06269874852109...|125.00149169375442|
|      200|         17.4|     0.0254|              71.3|               0.016104|           112.506|[200.0,17.4,0.025...|[0.06269874852109...|  128.888379038037|
|      200|         19.7|     0.05

In [49]:
print("Part 4 - Evaluation")

# Inspect the pipeline that was loaded from disk, not the in-memory original
loaded_lr_model = loaded_model.stages[-1]
totalstages = len(loaded_model.stages)
inputcolumns = loaded_model.stages[0].getInputCols()

print("Number of stages in the pipeline = ", totalstages)
for i,j in zip(inputcolumns, loaded_lr_model.coefficients):
    print(f"Coefficient for {i} is {round(j,4)}")

Part 4 - Evaluation
Number of stages in the pipeline =  3
Coefficient for Frequency is -4.0785
Coefficient for AngleOfAttack is -2.5234
Coefficient for ChordLength is -3.3974
Coefficient for FreeStreamVelocity is 1.4247
Coefficient for SuctionSideDisplacement is -2.0103
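
Because the regression stage reads the `scaledFeatures` column, these coefficients apply to the scaled values rather than to the raw inputs. A prediction is the intercept plus the sum of each coefficient multiplied by its scaled feature. The sketch below shows that arithmetic using the rounded intercept and coefficients printed in this notebook; the scaled input rows are hypothetical, and `predict` is an illustrative helper rather than a Spark function.

```python
# Rounded values copied from the Part 3 and Part 4 evaluation output.
INTERCEPT = 133.34
COEFFICIENTS = {
    "Frequency": -4.0785,
    "AngleOfAttack": -2.5234,
    "ChordLength": -3.3974,
    "FreeStreamVelocity": 1.4247,
    "SuctionSideDisplacement": -2.0103,
}


def predict(scaled_features):
    """Return intercept + sum(coefficient * scaled feature value)."""
    return INTERCEPT + sum(
        COEFFICIENTS[name] * scaled_features[name] for name in COEFFICIENTS
    )


# Hypothetical scaled rows: all zeros, then Frequency raised by one scaled unit.
baseline = {name: 0.0 for name in COEFFICIENTS}
shifted = dict(baseline, Frequency=1.0)

print(predict(baseline))                               # 133.34
print(round(predict(shifted) - predict(baseline), 4))  # -4.0785
```

Since one scaled unit corresponds to one standard deviation of the raw feature, each coefficient can be read as the change in predicted sound level per standard deviation of that feature, with the others held fixed.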


In [50]:
# Stop Spark Session
spark.stop()