## Building an ML Pipeline for Airfoil Noise Prediction


## Overview


This project processes the NASA Airfoil Self-Noise dataset to build and evaluate a machine learning model that predicts the SoundLevel column from the other columns in the dataset. The goals are to clean the dataset, create a machine learning pipeline, evaluate the model's performance, and save the trained model for future use.



## Steps

- Part 1: Perform ETL activity
  - Load a csv dataset
  - Remove duplicates if any
  - Drop rows with null values if any
  - Make transformations
  - Store the cleaned data in parquet format
- Part 2: Create a Machine Learning Pipeline
  - Create a machine learning pipeline for prediction
- Part 3: Evaluate the Model
  - Evaluate the model using relevant metrics
- Part 4: Persist the Model
  - Save the model for future production use
  - Load and verify the stored model


## Datasets

This lab uses the following dataset:

 - The original dataset is the NASA Airfoil Self-Noise dataset: https://archive.ics.uci.edu/dataset/291/airfoil+self+noise

 - This dataset is licensed under a Creative Commons Attribution 4.0 International (CC BY 4.0) license.


Diagram of an airfoil (for informational purposes only).


![Airfoil with flow](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/images/Airfoil_with_flow.png)


Diagram showing the angle of attack (for informational purposes only).


![Airfoil angle of attack](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/images/Airfoil_angle_of_attack.jpg)


## Setup


For this lab, we will be using the following libraries:

*   [`PySpark`](https://spark.apache.org/docs/latest/api/python/index.html?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMSkillsNetworkBD0231ENCoursera2789-2023-01-01) for connecting to the Spark Cluster


In [None]:
!pip install pyspark==3.1.2 -q
!pip install findspark -q

### Importing Required Libraries



In [None]:
# Suppress warnings generated by your code:
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

# FindSpark simplifies the process of using Apache Spark with Python

import findspark
findspark.init()

## Part 1 - Perform ETL activity


### Import required libraries


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StandardScaler

### Create a spark session


In [None]:
#Create a SparkSession

spark = SparkSession.builder.appName("Airfoil noise prediction").getOrCreate()

24/05/12 16:08:08 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### Load the csv file into a dataframe


Download the data file.

NOTE: Please ensure you use the dataset below and not the original dataset mentioned above.


In [None]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv


--2024-05-12 16:08:13--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.63.118.104, 169.63.118.104
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 60682 (59K) [text/csv]
Saving to: ‘NASA_airfoil_noise_raw.csv.1’


2024-05-12 16:08:13 (59.7 MB/s) - ‘NASA_airfoil_noise_raw.csv.1’ saved [60682/60682]



Load the dataset into a Spark dataframe.


In [None]:
# Load the dataset

df = spark.read.csv("NASA_airfoil_noise_raw.csv", header=True, inferSchema=True)


                                                                                

### Print top 5 rows of the dataset


In [None]:
df.show(5)

+---------+-------------+-----------+------------------+-----------------------+----------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevel|
+---------+-------------+-----------+------------------+-----------------------+----------+
|      800|          0.0|     0.3048|              71.3|             0.00266337|   126.201|
|     1000|          0.0|     0.3048|              71.3|             0.00266337|   125.201|
|     1250|          0.0|     0.3048|              71.3|             0.00266337|   125.951|
|     1600|          0.0|     0.3048|              71.3|             0.00266337|   127.591|
|     2000|          0.0|     0.3048|              71.3|             0.00266337|   127.461|
+---------+-------------+-----------+------------------+-----------------------+----------+
only showing top 5 rows



### Print the total number of rows in the dataset


In [None]:
rowcount1 = df.count()
print(rowcount1)

1522


### Drop all the duplicate rows from the dataset


In [None]:
df = df.dropDuplicates()


### Print the total number of rows in the dataset


In [None]:
rowcount2 = df.count()
print(rowcount2)




1503


                                                                                

### Drop all the rows that contain null values from the dataset


In [None]:
df = df.dropna()


### Print the total number of rows in the dataset


In [None]:
rowcount3 = df.count()
print(rowcount3)




1499
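
The three counts printed so far are enough to work out how many rows each cleaning step removed. The short sketch below reuses the variable names from the cells above and hard-codes the printed values, so it runs without a Spark session.

```python
# Row counts copied from the outputs printed above.
rowcount1 = 1522  # raw CSV
rowcount2 = 1503  # after dropDuplicates()
rowcount3 = 1499  # after dropna()

duplicates_removed = rowcount1 - rowcount2
null_rows_removed = rowcount2 - rowcount3
total_removed = rowcount1 - rowcount3

print(f"Duplicate rows removed: {duplicates_removed}")
print(f"Rows with nulls removed: {null_rows_removed}")
print(f"Share of raw rows kept: {rowcount3 / rowcount1:.1%}")
```

Most of the cleanup comes from duplicates; only a handful of rows contained null values.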


                                                                                

### Rename the column "SoundLevel" to "SoundLevelDecibels"


In [None]:
df = df.withColumnRenamed("SoundLevel", "SoundLevelDecibels")


### Save the dataframe in parquet format as "NASA_airfoil_noise_cleaned.parquet"


In [None]:
df.write.mode("overwrite").parquet("NASA_airfoil_noise_cleaned.parquet")


24/05/12 16:10:30 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
24/05/12 16:10:30 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
24/05/12 16:10:31 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
                                                                                

## Part 2 - Create a Machine Learning Pipeline


### Load data from "NASA_airfoil_noise_cleaned.parquet" into a dataframe


In [None]:
df = spark.read.parquet("NASA_airfoil_noise_cleaned.parquet")


### Print the total number of rows in the dataset


In [None]:
rowcount4 = df.count()
print(rowcount4)





1499


                                                                                

### Define the VectorAssembler pipeline stage


Stage 1 - Assemble the input columns into a single column "features". Use all the columns except SoundLevelDecibels as input features.
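
As a rough mental model, `VectorAssembler` concatenates the listed columns, in order, into one numeric vector per row. The plain-Python sketch below mimics that on the first row shown by `df.show(5)`; `assemble` is a hypothetical helper written for illustration, not a PySpark function.

```python
# Plain-Python illustration of what VectorAssembler produces for one row.
input_cols = ["Frequency", "AngleOfAttack", "ChordLength",
              "FreeStreamVelocity", "SuctionSideDisplacement"]

# First row from the df.show(5) output above (label column omitted).
row = {
    "Frequency": 800,
    "AngleOfAttack": 0.0,
    "ChordLength": 0.3048,
    "FreeStreamVelocity": 71.3,
    "SuctionSideDisplacement": 0.00266337,
}

def assemble(row, cols):
    """Return the values of `cols`, in order, as a list of floats."""
    return [float(row[c]) for c in cols]

features = assemble(row, input_cols)
print(features)
```

The order of `inputCols` matters: it fixes which position in the vector each feature occupies, and therefore which regression coefficient belongs to which column.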


In [None]:
assembler = VectorAssembler(inputCols=["Frequency", "AngleOfAttack", "ChordLength", "FreeStreamVelocity", \
                                       "SuctionSideDisplacement"], outputCol="features")



### Define the StandardScaler pipeline stage


Stage 2 - Scale the "features" column using a standard scaler and store the result in the "scaledFeatures" column.


In [None]:
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
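
With its default settings (`withStd=True`, `withMean=False`), `StandardScaler` divides each feature by its sample standard deviation and does not subtract the mean. The sketch below shows that arithmetic in plain Python for a single feature. It borrows the five Frequency values shown by `df.show(5)` purely as toy input; the real scaler is fitted on the whole training set, so its numbers will differ. `scale_to_unit_std` is a hypothetical helper name.

```python
from statistics import stdev

# Toy input: the five Frequency values from the df.show(5) output above.
frequency = [800.0, 1000.0, 1250.0, 1600.0, 2000.0]

def scale_to_unit_std(values):
    """Divide each value by the sample standard deviation (no centering)."""
    sigma = stdev(values)  # sample standard deviation, n - 1 in the denominator
    return [v / sigma for v in values]

scaled = scale_to_unit_std(frequency)
print(scaled)
```

Because the mean is not subtracted, positive inputs stay positive after scaling. This is consistent with the `scaledFeatures` column in the prediction output later on, where a frequency of 200 maps to roughly 0.063.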


### Define the LinearRegression pipeline stage


Stage 3 - Create a LinearRegression stage to predict "SoundLevelDecibels"

**Note: You need to use the "scaledFeatures" column produced in the previous step.**


In [None]:
lr = LinearRegression(featuresCol="scaledFeatures", labelCol="SoundLevelDecibels")


### Build the pipeline


Build a pipeline using the above three stages


In [None]:
pipeline = Pipeline(stages=[assembler, scaler, lr])
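
In Spark ML, `VectorAssembler` is a transformer, while `StandardScaler` and `LinearRegression` are estimators that must be fitted before they can transform data. Roughly speaking, fitting a pipeline walks the stages in order, fits each estimator on the output of the stages before it, and keeps the fitted models. The toy classes below are hypothetical stand-ins that only mirror this fit/transform pattern on lists of numbers; they are not the PySpark API.

```python
class AddOne:
    """Transformer: has no state to learn, so it only transforms."""
    def transform(self, data):
        return [x + 1 for x in data]

class DivideByMax:
    """Estimator: fit() learns the maximum and returns a fitted model."""
    def fit(self, data):
        return DivideByMaxModel(max(data))

class DivideByMaxModel:
    """Fitted model produced by DivideByMax.fit()."""
    def __init__(self, maximum):
        self.maximum = maximum

    def transform(self, data):
        return [x / self.maximum for x in data]

def fit_pipeline(stages, data):
    """Fit each stage in order, feeding it the output of the earlier stages."""
    fitted = []
    for stage in stages:
        model = stage.fit(data) if hasattr(stage, "fit") else stage
        data = model.transform(data)
        fitted.append(model)
    return fitted

def transform_pipeline(fitted, data):
    """Apply every fitted stage in order."""
    for model in fitted:
        data = model.transform(data)
    return data

train = [1, 3, 7]
fitted = fit_pipeline([AddOne(), DivideByMax()], train)
print(transform_pipeline(fitted, [3, 15]))
```

The point of the pattern is that statistics learned during fitting (here the maximum, in the notebook the scaling factors and regression coefficients) come from the training data only and are then reused unchanged on new data.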


### Split the data


In [None]:
# Split the data into training and testing sets with 70:30 split.
# set the value of seed to 42

(trainingData, testingData) = df.randomSplit([0.7, 0.3], seed=42)
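
`randomSplit` assigns rows to splits at random according to the normalized weights, so the resulting sizes are close to 70:30 but not exact. The plain-Python sketch below illustrates the idea with one uniform draw per row. It is an assumption-laden simplification: `random_split` is a hypothetical helper, and the same seed will not select the same rows as Spark does.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to a split by drawing one uniform number per row."""
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.7, 1.0] for weights [0.7, 0.3].
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, upper in enumerate(bounds):
            # The last split catches any draw not claimed by earlier ones.
            if draw < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

rows = list(range(1499))  # stand-in for the 1499 cleaned rows
train, test = random_split(rows, [0.7, 0.3], seed=42)
print(len(train), len(test))
```

Fixing the seed makes the split repeatable from run to run, which is why the notebook sets `seed=42`.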



### Fit the pipeline


In [None]:
# Fit the pipeline using the training data

pipelineModel = pipeline.fit(trainingData)


24/05/12 16:12:42 WARN util.Instrumentation: [2273925f] regParam is zero, which might cause numerical instability and overfitting.
24/05/12 16:12:44 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
24/05/12 16:12:44 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
24/05/12 16:12:45 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
24/05/12 16:12:45 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
                                                                                

## Part 3 - Evaluate the Model


### Predict using the model


In [None]:
# Make predictions on testing data

predictions = pipelineModel.transform(testingData)


### Print the MSE


In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="mse")
mse = evaluator.evaluate(predictions)
print(mse)



22.593754071348812
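
MSE is expressed in squared units, which makes it hard to read directly. Taking the square root gives the root mean squared error (RMSE) in the same unit as the label, here decibels. The snippet below does that arithmetic on the value printed above; it does not need Spark.

```python
import math

mse = 22.593754071348812  # value printed by the evaluator above
rmse = math.sqrt(mse)     # root mean squared error, in decibels

print(f"RMSE: {rmse:.2f} dB")
```

The result is roughly 4.75 dB. `RegressionEvaluator` can also report this directly with `metricName="rmse"`.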


                                                                                

### Print the MAE


In [None]:
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="mae")
mae = evaluator.evaluate(predictions)
print(mae)
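
MAE averages the absolute errors, so unlike MSE it does not square large misses. The sketch below computes both metrics by hand on a few hypothetical label and prediction values (made up for illustration, not taken from the model) to show how a single large error affects each one.

```python
def mean_absolute_error(labels, predictions):
    """Average of |label - prediction|."""
    return sum(abs(y - p) for y, p in zip(labels, predictions)) / len(labels)

def mean_squared_error(labels, predictions):
    """Average of (label - prediction) squared."""
    return sum((y - p) ** 2 for y, p in zip(labels, predictions)) / len(labels)

# Hypothetical values in decibels; the last prediction is off by 10 dB.
labels = [120.0, 125.0, 130.0, 110.0]
predictions = [121.0, 124.0, 129.0, 120.0]

toy_mae = mean_absolute_error(labels, predictions)
toy_mse = mean_squared_error(labels, predictions)
print(toy_mae, toy_mse)
```

The one 10 dB miss dominates the MSE far more than the MAE, which is why the two metrics are usually read together.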

### Print the R-Squared (R2)


In [None]:
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="r2")
r2 = evaluator.evaluate(predictions)
print(r2)



0.5426016508689058
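
An R2 of about 0.54 means the model accounts for roughly 54% of the variance of the test labels. The sketch below uses the usual definition, one minus the ratio of the residual sum of squares to the total sum of squares, on hypothetical values to show the two reference points: 1.0 for perfect predictions and 0.0 for always predicting the mean label. `r_squared` is a helper written for illustration, not a PySpark function.

```python
def r_squared(labels, predictions):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_label = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    return 1.0 - ss_res / ss_tot

# Hypothetical labels in decibels.
labels = [120.0, 125.0, 130.0, 135.0]

perfect = list(labels)                                # predicts every label exactly
baseline = [sum(labels) / len(labels)] * len(labels)  # always predicts the mean

r2_perfect = r_squared(labels, perfect)
r2_baseline = r_squared(labels, baseline)
print(r2_perfect, r2_baseline)
```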


                                                                                

## Part 4 - Persist the Model


### Save the model to the path "NASA_Airfoil"


In [None]:
# Save the pipeline model as "NASA_Airfoil"
pipelineModel.write().overwrite().save("NASA_Airfoil")

                                                                                

### Load the model from the path "NASA_Airfoil"


In [None]:
# Load the pipeline model you have created in the previous step
loadedPipelineModel = PipelineModel.load("NASA_Airfoil")
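
One way to check that the model survived the round trip is to compare the fitted parameters of the original and the loaded pipeline. The helper below is a hypothetical sketch: it assumes the object exposes a `stages` list whose last entry has `coefficients` and `intercept` attributes, as PySpark's `PipelineModel` and `LinearRegressionModel` do. To keep it runnable without Spark, the example call uses a stand-in object with made-up numbers.

```python
from types import SimpleNamespace

def summarize_linear_stage(pipeline_model):
    """Collect the stage count and the last stage's fitted parameters."""
    last_stage = pipeline_model.stages[-1]
    return {
        "num_stages": len(pipeline_model.stages),
        "coefficients": [float(c) for c in last_stage.coefficients],
        "intercept": float(last_stage.intercept),
    }

# Stand-in for a fitted pipeline; the numbers are invented for illustration.
fake_model = SimpleNamespace(
    stages=[
        object(),  # placeholder for the assembler stage
        object(),  # placeholder for the fitted scaler stage
        SimpleNamespace(coefficients=[-4.0, -2.5, -3.0, 1.5, -2.0],
                        intercept=130.0),
    ]
)

summary = summarize_linear_stage(fake_model)
print(summary)
```

In the notebook, calling the helper on `pipelineModel` and on `loadedPipelineModel` and comparing the two dictionaries would be a reasonable sanity check before relying on the saved model.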


### Make predictions using the loaded model on the test data


In [None]:
# Use the loaded pipeline model and make predictions using testingData
predictions = loadedPipelineModel.transform(testingData)


### Show the predictions


In [None]:
# Show the top 5 rows of the predictions dataframe (all columns).
# To display only the label and prediction columns, use:
# predictions.select("SoundLevelDecibels", "prediction").show(5)

predictions.show(5)


+---------+-------------+-----------+------------------+-----------------------+------------------+--------------------+--------------------+------------------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevelDecibels|            features|      scaledFeatures|        prediction|
+---------+-------------+-----------+------------------+-----------------------+------------------+--------------------+--------------------+------------------+
|      200|          9.9|     0.1524|              39.6|              0.0233328|           127.315|[200.0,9.9,0.1524...|[0.06338939572741...|123.64344009624753|
|      200|         15.4|     0.0508|              31.7|              0.0289853|           119.975|[200.0,15.4,0.050...|[0.06338939572741...|123.48695788614877|
|      200|         15.4|     0.0508|              39.6|              0.0282593|           121.783|[200.0,15.4,0.050...|[0.06338939572741...|124.38983849684254|
|      250|         19.7|     0.05

                                                                                

### Stop Spark Session


In [None]:
spark.stop()

This project is part of an IBM course.
