## Build an ML Pipeline for Airfoil Noise Prediction


## Scenario


I am a data engineer at an aeronautics consulting company that prides itself on efficiently designing airfoils for planes and sports cars. The data scientists in my office work with different algorithms and with data in different formats. They are good at machine learning, but they count on me to run ETL jobs and build ML pipelines. In this project I use a modified version of the NASA Airfoil Self-Noise dataset. I clean the dataset by dropping duplicate rows and removing rows with null values, then build an ML pipeline that trains a model to predict SoundLevel from all the other columns. Finally, I evaluate the model and persist it.



## Objectives

In this four-part assignment I will:

- Perform ETL activity
  - Load a csv dataset
  - Remove duplicates if any
  - Drop rows with null values if any
  - Make transformations
  - Store the cleaned data in parquet format
- Create a Machine Learning Pipeline
  - Create a machine learning pipeline for prediction
- Evaluate the Model
  - Evaluate the model using relevant metrics
- Persist the Model
  - Save the model for future production use
  - Load and verify the stored model


## Datasets

This lab uses one dataset:

 - The original NASA Airfoil Self-Noise dataset is available at https://archive.ics.uci.edu/dataset/291/airfoil+self+noise
 
 - This dataset is licensed under a Creative Commons Attribution 4.0 International (CC BY 4.0) license.


Diagram of an airfoil (for informational purposes):


![Airfoil with flow](https://raw.githubusercontent.com/KhaAzAs/Airfoil_ML_Pipeline/main/Images/Airfoil_with_flow.png)


Diagram showing the angle of attack (for informational purposes):


![Airfoil angle of attack](https://raw.githubusercontent.com/KhaAzAs/Airfoil_ML_Pipeline/main/Images/Airfoil_angle_of_attack.jpg)


### Installing Required Libraries

In [1]:
!pip install pyspark==3.1.2 -q
!pip install findspark -q

### Importing Required Libraries

In [2]:
# Suppress warnings generated by my code:
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

# FindSpark simplifies the process of using Apache Spark with Python
import findspark
findspark.init()

## Perform ETL activity


### Import required libraries


In [3]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml.regression import LinearRegression


### Create a Spark session


In [4]:
#Create a SparkSession
spark = SparkSession.builder.appName("Airfoil Noise Project").getOrCreate()

23/12/31 09:58:00 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### Load the CSV file into a DataFrame


Download the data file.

Note: This lab uses a simplified version of the dataset to keep training time short.


In [5]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv


--2023-12-31 09:59:47--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.63.118.104, 169.63.118.104
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 60682 (59K) [text/csv]
Saving to: ‘NASA_airfoil_noise_raw.csv.1’


2023-12-31 09:59:47 (30.8 MB/s) - ‘NASA_airfoil_noise_raw.csv.1’ saved [60682/60682]



Load the dataset into a Spark DataFrame.


In [6]:
# Load the dataset that you have downloaded in the previous task

df = spark.read.csv("NASA_airfoil_noise_raw.csv", header=True, inferSchema=True)


                                                                                

### Print top 5 rows of the dataset


In [7]:
df.show(5)

+---------+-------------+-----------+------------------+-----------------------+----------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevel|
+---------+-------------+-----------+------------------+-----------------------+----------+
|      800|          0.0|     0.3048|              71.3|             0.00266337|   126.201|
|     1000|          0.0|     0.3048|              71.3|             0.00266337|   125.201|
|     1250|          0.0|     0.3048|              71.3|             0.00266337|   125.951|
|     1600|          0.0|     0.3048|              71.3|             0.00266337|   127.591|
|     2000|          0.0|     0.3048|              71.3|             0.00266337|   127.461|
+---------+-------------+-----------+------------------+-----------------------+----------+
only showing top 5 rows



### Print the total number of rows in the dataset


In [8]:
rowcount1 = df.count()
print(rowcount1)

1522


### Drop all the duplicate rows from the dataset


In [9]:
df = df.dropDuplicates()


### Print the total number of rows in the dataset (After drop duplicates)


In [10]:
rowcount2 = df.count()
print(rowcount2)




1503


                                                                                

### Drop all the rows that contain null values from the dataset


In [11]:
df = df.dropna()


### Print the total number of rows in the dataset (After drop null values)


In [12]:
rowcount3 = df.count()
print(rowcount3)



1499
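
The three counts above imply that 19 duplicate rows and 4 rows with nulls were removed. The sketch below mirrors the two cleaning steps in plain Python on a tiny made-up table. The rows are hypothetical and this is not how Spark implements the operations (Spark also does not guarantee row order after `dropDuplicates()`). It only illustrates that `dropDuplicates()` with no arguments removes fully identical rows and that `dropna()` with default arguments removes any row containing a null.

```python
# Hypothetical rows: (Frequency, AngleOfAttack, SoundLevel); None stands in for a null.
rows = [
    (800, 0.0, 126.201),
    (800, 0.0, 126.201),   # exact duplicate of the first row
    (1000, 0.0, None),     # contains a null
    (1250, 0.0, 125.951),
]

# Keep the first occurrence of each fully identical row.
deduplicated = list(dict.fromkeys(rows))

# Keep only rows in which no value is missing.
cleaned = [row for row in deduplicated if all(value is not None for value in row)]

print(len(rows), len(deduplicated), len(cleaned))

# Row counts reported in the notebook output above.
duplicates_removed = 1522 - 1503
null_rows_removed = 1503 - 1499
print(duplicates_removed, null_rows_removed)
```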


                                                                                

### Rename the column "SoundLevel" to "SoundLevelDecibels"

In [13]:

df = df.withColumnRenamed("SoundLevel", "SoundLevelDecibels")


### Save the DataFrame in Parquet format, name the file "NASA_airfoil_noise_cleaned.parquet"


In [None]:

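# Overwrite any earlier output so the cell can be re-run without a "path already exists" error
df.write.mode("overwrite").parquet("NASA_airfoil_noise_cleaned.parquet")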


#### ETL Activity Evaluation


In [17]:
print("ETL Activity Evaluation")

print("Total rows = ", rowcount1)
print("Total rows after dropping duplicate rows = ", rowcount2)
print("Total rows after dropping duplicate rows and rows with null values = ", rowcount3)
print("New column name = ", df.columns[-1])

import os

print("NASA_airfoil_noise_cleaned.parquet exists :", os.path.isdir("NASA_airfoil_noise_cleaned.parquet"))

ETL Activity Evaluation
Total rows =  1522
Total rows after dropping duplicate rows =  1503
Total rows after dropping duplicate rows and rows with null values =  1499
New column name =  SoundLevelDecibels
NASA_airfoil_noise_cleaned.parquet exists : True


## Create a Machine Learning Pipeline


### Load data from "NASA_airfoil_noise_cleaned.parquet" into a dataframe


In [18]:


df = spark.read.parquet("NASA_airfoil_noise_cleaned.parquet")


### Print the total number of rows in the dataset


In [19]:


rowcount4 = df.count()
print(rowcount4)




1499


                                                                                

In [20]:
df.show(5)

+---------+-------------+-----------+------------------+-----------------------+------------------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevelDecibels|
+---------+-------------+-----------+------------------+-----------------------+------------------+
|      630|          0.0|     0.3048|              31.7|             0.00331266|           129.095|
|     4000|          0.0|     0.3048|              31.7|             0.00331266|           118.145|
|     4000|          1.5|     0.3048|              39.6|             0.00392107|           117.741|
|      800|          4.0|     0.3048|              71.3|             0.00497773|           131.755|
|     1250|          0.0|     0.2286|              31.7|              0.0027238|           128.805|
+---------+-------------+-----------+------------------+-----------------------+------------------+
only showing top 5 rows



In [21]:
df.printSchema()

root
 |-- Frequency: integer (nullable = true)
 |-- AngleOfAttack: double (nullable = true)
 |-- ChordLength: double (nullable = true)
 |-- FreeStreamVelocity: double (nullable = true)
 |-- SuctionSideDisplacement: double (nullable = true)
 |-- SoundLevelDecibels: double (nullable = true)



### Define the VectorAssembler pipeline stage


Stage 1 - Assemble the input columns into a single column "features". Use all the columns except SoundLevelDecibels as input features.


In [22]:

assembler = VectorAssembler(inputCols=["Frequency","AngleOfAttack","ChordLength","FreeStreamVelocity","SuctionSideDisplacement"], outputCol="features") 
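
Conceptually, the assembler concatenates the listed columns, in order, into one numeric vector per row. A rough plain-Python analogue is sketched below, using the first row displayed earlier; `assemble` is a hypothetical helper for illustration, not a PySpark function.

```python
INPUT_COLS = ["Frequency", "AngleOfAttack", "ChordLength",
              "FreeStreamVelocity", "SuctionSideDisplacement"]

def assemble(row, input_cols=INPUT_COLS):
    """Return the row's input columns as one list of floats, in column order."""
    return [float(row[col]) for col in input_cols]

# First row shown by df.show(5), with the label under its renamed column.
row = {
    "Frequency": 800, "AngleOfAttack": 0.0, "ChordLength": 0.3048,
    "FreeStreamVelocity": 71.3, "SuctionSideDisplacement": 0.00266337,
    "SoundLevelDecibels": 126.201,
}

features = assemble(row)
print(features)  # the label is deliberately left out of the feature vector
```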



### Define the StandardScaler pipeline stage


Stage 2 - Scale the "features" column using StandardScaler and store the result in the "scaledFeatures" column


In [23]:

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
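
With its default settings (`withStd=True`, `withMean=False`), Spark's `StandardScaler` divides each feature by its sample standard deviation and does not subtract the mean. A minimal plain-Python sketch of that arithmetic for a single feature follows; the values are made up and `scale_to_unit_std` is a hypothetical helper.

```python
from statistics import stdev

def scale_to_unit_std(values):
    """Divide each value by the sample standard deviation; no mean-centering."""
    sd = stdev(values)  # sample (n - 1) standard deviation
    return [v / sd for v in values]

# Hypothetical free-stream velocities, not taken from the dataset.
velocities = [31.7, 39.6, 55.5, 71.3]
scaled = scale_to_unit_std(velocities)

print(scaled)
print(stdev(scaled))  # approximately 1.0 after scaling
```

Because the mean is not subtracted, the scaled values keep their sign and relative spacing; only the unit changes.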


### Define the LinearRegression pipeline stage


Stage 3 - Create a LinearRegression stage to predict "SoundLevelDecibels"


In [24]:

lr = LinearRegression(featuresCol="scaledFeatures", labelCol="SoundLevelDecibels")
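
With no regularization (the default `regParam` is zero), linear regression looks for the coefficients and intercept that minimize the sum of squared residuals. For a single feature that minimum has a closed form, sketched below in plain Python on made-up data. `fit_simple_ols` is a hypothetical helper and does not reflect how Spark solves the multi-feature problem internally.

```python
from statistics import mean

def fit_simple_ols(x, y):
    """Return (slope, intercept) minimizing the sum of squared residuals."""
    x_bar, y_bar = mean(x), mean(y)
    sxy = sum((xi - x_bar) * (yi - y_bar) for xi, yi in zip(x, y))
    sxx = sum((xi - x_bar) ** 2 for xi in x)
    slope = sxy / sxx
    return slope, y_bar - slope * x_bar

# Hypothetical data lying exactly on y = 130 - 2x.
x = [0.0, 1.0, 2.0, 3.0]
y = [130.0, 128.0, 126.0, 124.0]

slope, intercept = fit_simple_ols(x, y)
print(slope, intercept)
```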


### Build the pipeline


Build a pipeline using the above three stages


In [25]:

pipeline = Pipeline(stages=[assembler, scaler, lr])
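
Fitting a pipeline runs the stages in order: each stage that has something to learn is fitted on the output of the stages before it, and the fitted stages are then replayed in the same order on new data. The toy sketch below illustrates that idea in plain Python. All classes and functions here are hypothetical stand-ins, not PySpark's implementation.

```python
class UnitScale:
    """Toy estimator: learns the maximum absolute value, then divides by it."""
    def fit(self, data):
        self.max_abs = max(abs(v) for v in data)
        return self

    def transform(self, data):
        return [v / self.max_abs for v in data]


class AddOne:
    """Toy stage with nothing to learn; fit is a no-op."""
    def fit(self, data):
        return self

    def transform(self, data):
        return [v + 1 for v in data]


def fit_pipeline(stages, data):
    """Fit each stage on the output of the previous stages, in order."""
    fitted = []
    for stage in stages:
        fitted.append(stage.fit(data))
        data = stage.transform(data)
    return fitted


def run_pipeline(fitted_stages, data):
    """Apply already-fitted stages in order to new data."""
    for stage in fitted_stages:
        data = stage.transform(data)
    return data


train = [2.0, 4.0, 8.0]
model = fit_pipeline([UnitScale(), AddOne()], train)
result = run_pipeline(model, [4.0, 16.0])
print(result)  # new data is scaled with the maximum learned from train (8.0)
```

The point of the design is that statistics learned during fitting (here, the maximum) come only from the training data and are reused unchanged at prediction time.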


### Split the data


In [26]:
# Split the data into training and testing sets with a 70:30 split.
# Set the seed to 42.

(trainingData, testingData) = df.randomSplit([0.7,0.3], seed=42)
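
`randomSplit` assigns rows to the splits at random, so the 70:30 weights give approximately, not exactly, 70% of the rows to training. Fixing the seed makes the assignment repeatable for the same input. A plain-Python analogue is sketched below; `random_split` is a hypothetical helper, and its row assignments will not match Spark's.

```python
import random

def random_split(rows, train_weight=0.7, seed=42):
    """Send each row to train with probability train_weight, else to test."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_weight else test).append(row)
    return train, test

rows = list(range(1499))  # stand-in for the 1499 cleaned rows
train, test = random_split(rows)
print(len(train), len(test))  # close to a 70:30 split, but not exact

# Same seed and same input give the same split.
repeat_train, repeat_test = random_split(rows)
print(repeat_train == train and repeat_test == test)
```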


### Fit the pipeline


In [27]:
# Fit the pipeline using the training data

pipelineModel = pipeline.fit(trainingData)


23/12/31 10:13:15 WARN util.Instrumentation: [768f9f43] regParam is zero, which might cause numerical instability and overfitting.
[Stage 18:>                                                         (0 + 8) / 8]23/12/31 10:13:18 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/12/31 10:13:18 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
23/12/31 10:13:18 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
23/12/31 10:13:18 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
                                                                                

#### Machine Learning Pipeline Evaluation


In [28]:
print("Machine Learning Pipeline Evaluation")
print("Total rows = ", rowcount4)
ps = [str(x).split("_")[0] for x in pipeline.getStages()]

print("Pipeline Stage 1 = ", ps[0])
print("Pipeline Stage 2 = ", ps[1])
print("Pipeline Stage 3 = ", ps[2])

print("Label column = ", lr.getLabelCol())

Machine Learning Pipeline Evaluation
Total rows =  1499
Pipeline Stage 1 =  VectorAssembler
Pipeline Stage 2 =  StandardScaler
Pipeline Stage 3 =  LinearRegression
Label column =  SoundLevelDecibels


## Evaluate the Model


### Predict using the model


In [29]:
# Make predictions on testing data

predictions = pipelineModel.transform(testingData)


### Print the MSE (Mean Squared Error)

In [31]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="mse") 
mse = evaluator.evaluate(predictions)
print(mse)



22.593754071348812


                                                                                

### Print the MAE (Mean Absolute Error)


In [32]:

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="mae")
mae = evaluator.evaluate(predictions)
print(mae)



3.7336902294631287


                                                                                

### Print the R-squared (R2)


In [33]:

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="r2")
r2 = evaluator.evaluate(predictions)
print(r2)



0.5426016508689058


                                                                                

#### Model Evaluation


In [34]:
print("Model Evaluation")

print("Mean Squared Error = ", round(mse,2))
print("Mean Absolute Error = ", round(mae,2))
print("R Squared = ", round(r2,2))

lrModel = pipelineModel.stages[-1]

print("Intercept = ", round(lrModel.intercept,2))


Model Evaluation
Mean Squared Error =  22.59
Mean Absolute Error =  3.73
R Squared =  0.54
Intercept =  132.6
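
For reference, the three metrics can be computed directly from label and prediction pairs. The sketch below does so in plain Python on made-up numbers (`regression_metrics` is a hypothetical helper), then converts the MSE reported above into a root mean squared error, which is in the same unit as the label (decibels).

```python
from math import sqrt

def regression_metrics(labels, predictions):
    """Return (mse, mae, r2) for paired labels and predictions."""
    n = len(labels)
    errors = [y - p for y, p in zip(labels, predictions)]
    mse = sum(e * e for e in errors) / n
    mae = sum(abs(e) for e in errors) / n
    label_mean = sum(labels) / n
    total_ss = sum((y - label_mean) ** 2 for y in labels)
    r2 = 1 - sum(e * e for e in errors) / total_ss
    return mse, mae, r2

# Made-up values, only to exercise the formulas.
labels = [120.0, 125.0, 130.0]
predictions = [121.0, 125.0, 128.0]
mse, mae, r2 = regression_metrics(labels, predictions)
print(mse, mae, r2)

# The MSE printed by the notebook, expressed as RMSE in decibels.
rmse = sqrt(22.593754071348812)
print(round(rmse, 2))
```

An RMSE of roughly 4.75 dB together with an R2 of 0.54 suggests the linear model explains a little over half of the variance in the test labels.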


## Persist the Model


### Save the model to the path "Airfoil_MLPipeline_Project"


In [35]:
# Save the pipeline model as "Airfoil_MLPipeline_Project"

# overwrite() replaces an earlier save at the same path, so the cell can be re-run
pipelineModel.write().overwrite().save("Airfoil_MLPipeline_Project")

                                                                                

### Load the model from the path "Airfoil_MLPipeline_Project"


In [36]:
# Load the pipeline model you have created in the previous step
loadedPipelineModel = PipelineModel.load("Airfoil_MLPipeline_Project")

### Make predictions using the loaded model on the test data


In [37]:
# Use the loaded pipeline model and make predictions using testingData
predictions = loadedPipelineModel.transform(testingData)


### Show the predictions


In [38]:
# Show the top 5 rows of the predictions DataFrame, displaying only the label and prediction columns

predictions.select("SoundLevelDecibels", "prediction").show(5)


+------------------+------------------+
|SoundLevelDecibels|        prediction|
+------------------+------------------+
|           127.315|123.64344009624753|
|           119.975|123.48695788614877|
|           121.783|124.38983849684254|
|           127.224|121.44706993294302|
|           122.229|125.68312652454188|
+------------------+------------------+
only showing top 5 rows
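
As a quick sanity check, the absolute errors for the five rows displayed above can be worked out by hand. The snippet below copies the label and prediction pairs from that table, with predictions rounded to three decimals. Their mean absolute error comes out at about 3.8 dB, in line with the 3.73 dB measured over the whole test set.

```python
# (label, prediction) pairs copied from the table above; predictions rounded to 3 decimals.
pairs = [
    (127.315, 123.643),
    (119.975, 123.487),
    (121.783, 124.390),
    (127.224, 121.447),
    (122.229, 125.683),
]

abs_errors = [abs(label - prediction) for label, prediction in pairs]
for (label, prediction), err in zip(pairs, abs_errors):
    print(f"label={label:.3f}  prediction={prediction:.3f}  |error|={err:.3f}")

mean_abs_error = sum(abs_errors) / len(abs_errors)
print(round(mean_abs_error, 2))
```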



                                                                                

#### Persist Model Evaluation


In [39]:
print("Persist Model Evaluation")

loadedmodel = loadedPipelineModel.stages[-1]
totalstages = len(loadedPipelineModel.stages)
inputcolumns = loadedPipelineModel.stages[0].getInputCols()

print("Number of stages in the pipeline = ", totalstages)
for i,j in zip(inputcolumns, loadedmodel.coefficients):
    print(f"Coefficient for {i} is {round(j,4)}")

Persist Model Evaluation
Number of stages in the pipeline =  3
Coefficient for Frequency is -3.9728
Coefficient for AngleOfAttack is -2.4775
Coefficient for ChordLength is -3.3818
Coefficient for FreeStreamVelocity is 1.5789
Coefficient for SuctionSideDisplacement is -1.6465
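
Because the regression was trained on `scaledFeatures`, these coefficients are expressed per scaled unit rather than per Hz, degree, or metre. Assuming the scaler ran with its defaults (divide by the standard deviation, no centering), a coefficient per original unit is the scaled coefficient divided by that feature's standard deviation. In the sketch below the standard deviation is a made-up placeholder, not a value from this dataset; the real values should be available from the fitted scaler stage. `to_original_units` is a hypothetical helper.

```python
def to_original_units(scaled_coefficient, feature_std):
    """Convert a coefficient on x / std into a coefficient on x."""
    return scaled_coefficient / feature_std

frequency_coefficient = -3.9728   # from the output above (per scaled unit)
hypothetical_std_hz = 3000.0      # placeholder standard deviation, NOT from the data

per_hz = to_original_units(frequency_coefficient, hypothetical_std_hz)
print(per_hz)  # decibels per Hz under the assumed standard deviation
```

One consequence of the scaling is that the printed coefficients are directly comparable with each other: a larger magnitude means a larger change in predicted sound level per standard deviation of that feature.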


### Stop Spark Session


In [40]:
spark.stop()

## Authors


[Khairil Azmi Ashari](https://www.linkedin.com/in/khairil-azmi-ashari-b5a7a311b/)
