## Project: Build an ML Pipeline for Airfoil Noise Prediction


## Objectives

In this four-part assignment, you will:

- Part 1: Perform ETL activity
  - Load a CSV dataset
  - Remove duplicates, if any
  - Drop rows with null values, if any
  - Apply transformations (rename the label column)
  - Store the cleaned data in Parquet format
- Part 2: Create a Machine Learning Pipeline
  - Create a machine learning pipeline for prediction
- Part 3: Evaluate the Model
  - Evaluate the model using relevant metrics
- Part 4: Persist the Model
  - Save the model for future production use
  - Load and verify the stored model


## Datasets

- The original dataset is the NASA Airfoil Self-Noise dataset: https://archive.ics.uci.edu/dataset/291/airfoil+self+noise

- This dataset is licensed under a Creative Commons Attribution 4.0 International (CC BY 4.0) license.


Diagram of an airfoil (for informational purposes only):


![Airfoil with flow](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/images/Airfoil_with_flow.png)


Diagram showing the angle of attack (for informational purposes only):


![Airfoil angle of attack](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/images/Airfoil_angle_of_attack.jpg)


## Setup


### Installing Required Libraries

In [1]:
!pip install pyspark==3.1.2 -q
!pip install findspark -q

### Importing Required Libraries

In [2]:
# You can also use this section to suppress warnings generated by your code:
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

# findspark locates the local Spark installation and makes PySpark importable from Python

import findspark
findspark.init()

## Part 1 - Perform ETL activity


### Task 1 - Import required libraries


In [29]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator

### Task 2 - Create a spark session


In [4]:
spark = SparkSession.builder.appName("Airfoil Noise Prediction").getOrCreate()

25/08/21 20:18:36 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### Task 3 - Load the csv file into a dataframe


In [5]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv

--2025-08-21 20:19:19--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.63.118.104, 169.63.118.104
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 60682 (59K) [text/csv]
Saving to: ‘NASA_airfoil_noise_raw.csv.2’

2025-08-21 20:19:19 (53.0 MB/s) - ‘NASA_airfoil_noise_raw.csv.2’ saved [60682/60682]


In [6]:
airfoil_df = spark.read.csv("NASA_airfoil_noise_raw.csv", header=True, inferSchema=True)

### Task 4 - Print top 5 rows of the dataset


In [7]:
airfoil_df.show(5)

+---------+-------------+-----------+------------------+-----------------------+----------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevel|
+---------+-------------+-----------+------------------+-----------------------+----------+
|      800|          0.0|     0.3048|              71.3|             0.00266337|   126.201|
|     1000|          0.0|     0.3048|              71.3|             0.00266337|   125.201|
|     1250|          0.0|     0.3048|              71.3|             0.00266337|   125.951|
|     1600|          0.0|     0.3048|              71.3|             0.00266337|   127.591|
|     2000|          0.0|     0.3048|              71.3|             0.00266337|   127.461|
+---------+-------------+-----------+------------------+-----------------------+----------+
only showing top 5 rows


### Task 6 - Print the total number of rows in the dataset


In [8]:
rowcount1 = airfoil_df.count()
print(rowcount1)

1522


### Task 7 - Drop all the duplicate rows from the dataset


In [9]:
airfoil_df = airfoil_df.dropDuplicates()
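
With no arguments, `dropDuplicates()` treats two rows as duplicates only when they match in every column, and it keeps one of them. Below is a minimal pure-Python sketch of that rule. It uses a few hypothetical rows, not the real dataset:

```python
# Hypothetical rows: (Frequency, AngleOfAttack, SoundLevel)
rows = [
    (800, 0.0, 126.201),
    (1000, 0.0, 125.201),
    (800, 0.0, 126.201),  # identical to the first row in every column
    (800, 0.0, 126.300),  # differs in one column, so it is not a duplicate
]

def drop_duplicates(records):
    """Keep the first occurrence of each fully identical row."""
    seen = set()
    unique = []
    for record in records:
        if record not in seen:
            seen.add(record)
            unique.append(record)
    return unique

deduped = drop_duplicates(rows)
print(len(rows), "->", len(deduped))  # 4 -> 3
```

Unlike this sketch, Spark does not preserve the original row order, because deduplication involves a shuffle. That is likely why `show(5)` after cleaning lists different rows from the first `show(5)`.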

### Task 8 - Print the total number of rows in the dataset


In [10]:
rowcount2 = airfoil_df.count()
print(rowcount2)



1503


### Task 9 - Drop all the rows that contain null values from the dataset


In [11]:
airfoil_df = airfoil_df.dropna()
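
`dropna()` defaults to `how="any"`, so a row is dropped if any of its columns is null. Spark also treats NaN in floating-point columns as missing here. Below is a pure-Python sketch of the `any` and `all` modes on hypothetical rows; it models only `None` values:

```python
# Hypothetical rows; None stands in for a SQL null.
rows = [
    (800, 0.0, 126.201),
    (1000, None, 125.201),  # one null value
    (None, None, None),     # every value null
]

def drop_na(records, how="any"):
    """Mimic the `how` option of DataFrame.dropna on plain tuples."""
    if how == "any":  # drop a row if any value is missing
        return [r for r in records if all(v is not None for v in r)]
    if how == "all":  # drop a row only if every value is missing
        return [r for r in records if any(v is not None for v in r)]
    raise ValueError("how must be 'any' or 'all'")

print(len(drop_na(rows)))             # 1
print(len(drop_na(rows, how="all")))  # 2
```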


### Task 10 - Print the total number of rows in the dataset


In [12]:
rowcount3 = airfoil_df.count()
print(rowcount3)




1499


### Task 11 - Rename the column "SoundLevel" to "SoundLevelDecibels"


In [14]:
airfoil_df = airfoil_df.withColumnRenamed("SoundLevel","SoundLevelDecibels")



### Task 12 - Save the DataFrame in Parquet format as "NASA_airfoil_noise_cleaned.parquet"


In [15]:
airfoil_df.write.mode("overwrite").parquet("NASA_airfoil_noise_cleaned.parquet")

airfoil_df.show(5)

25/08/21 20:21:37 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
25/08/21 20:21:37 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
25/08/21 20:21:38 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers

+---------+-------------+-----------+------------------+-----------------------+------------------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevelDecibels|
+---------+-------------+-----------+------------------+-----------------------+------------------+
|     4000|          3.0|     0.3048|              31.7|             0.00529514|           115.608|
|     3150|          2.0|     0.2286|              31.7|             0.00372371|           121.527|
|     2000|          7.3|     0.2286|              31.7|              0.0132672|           115.309|
|     2000|          5.4|     0.1524|              71.3|             0.00401199|           131.111|
|      500|          9.9|     0.1524|              71.3|              0.0193001|           131.279|
+---------+-------------+-----------+------------------+-----------------------+------------------+
only showing top 5 rows


#### Part 1 - Evaluation


In [16]:
print("Part 1 - Evaluation")

print("Total rows = ", rowcount1)
print("Total rows after dropping duplicate rows = ", rowcount2)
print("Total rows after dropping duplicate rows and rows with null values = ", rowcount3)
print("New column name = ", airfoil_df.columns[-1])

import os

print("NASA_airfoil_noise_cleaned.parquet exists :", os.path.isdir("NASA_airfoil_noise_cleaned.parquet"))

Part 1 - Evaluation
Total rows =  1522
Total rows after dropping duplicate rows =  1503
Total rows after dropping duplicate rows and rows with null values =  1499
New column name =  SoundLevelDecibels
NASA_airfoil_noise_cleaned.parquet exists : True

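The printed counts show how many rows each cleaning step removed. This is plain arithmetic on the values shown above:

```python
# Row counts printed by the Part 1 evaluation cell
rowcount1, rowcount2, rowcount3 = 1522, 1503, 1499

duplicates_removed = rowcount1 - rowcount2  # removed by dropDuplicates()
null_rows_removed = rowcount2 - rowcount3   # removed by dropna()
print(duplicates_removed, null_rows_removed)  # 19 4
```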

## Part 2 - Create a Machine Learning Pipeline


### Task 1 - Load data from "NASA_airfoil_noise_cleaned.parquet" into a dataframe


In [17]:
airfoil_df = spark.read.parquet("NASA_airfoil_noise_cleaned.parquet")


### Task 2 - Print the total number of rows in the dataset


In [18]:
rowcount4 = airfoil_df.count()
print(rowcount4)




1499



### Task 3 - Define the VectorAssembler pipeline stage


In [19]:
assembler = VectorAssembler(inputCols=['Frequency','AngleOfAttack','ChordLength','FreeStreamVelocity','SuctionSideDisplacement'], outputCol="features")
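
`VectorAssembler` concatenates the listed input columns, in order, into a single vector column named `features`. The label `SoundLevel` is not among the inputs. The order matters: the Part 4 evaluation pairs coefficients with column names by zipping them against `getInputCols()`. Below is a pure-Python sketch of the per-row operation, applied to the first row shown by `airfoil_df.show(5)`:

```python
FEATURE_COLS = ["Frequency", "AngleOfAttack", "ChordLength",
                "FreeStreamVelocity", "SuctionSideDisplacement"]

def assemble(row, input_cols):
    """Concatenate the named numeric fields, in order, into one feature vector."""
    return [float(row[c]) for c in input_cols]

# First row printed by airfoil_df.show(5) above; the label is left out
row = {"Frequency": 800, "AngleOfAttack": 0.0, "ChordLength": 0.3048,
       "FreeStreamVelocity": 71.3, "SuctionSideDisplacement": 0.00266337,
       "SoundLevel": 126.201}
features = assemble(row, FEATURE_COLS)
print(features)  # [800.0, 0.0, 0.3048, 71.3, 0.00266337]
```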



### Task 4 - Define the StandardScaler pipeline stage


In [20]:
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
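
With its defaults (`withMean=False`, `withStd=True`), `StandardScaler` divides each feature by its sample standard deviation and does not subtract the mean. Scaling matters here because the raw features span very different ranges. In the rows shown, Frequency runs from 500 to 4000, while SuctionSideDisplacement is roughly 0.003 to 0.02. Below is a pure-Python sketch of those default semantics on toy data. The handling of constant features is this sketch's own choice:

```python
import statistics

def fit_std_scaler(vectors):
    """Learn the sample standard deviation (n - 1 denominator) of each feature."""
    return [statistics.stdev(column) for column in zip(*vectors)]

def transform_std_scaler(vectors, stds):
    """Divide each feature by its std without centring; constant features become 0.0."""
    return [[x / s if s != 0 else 0.0 for x, s in zip(v, stds)] for v in vectors]

# Toy feature vectors whose two features have very different scales
vectors = [[1.0, 10.0], [2.0, 20.0], [3.0, 30.0]]
stds = fit_std_scaler(vectors)
scaled = transform_std_scaler(vectors, stds)
print(stds)    # [1.0, 10.0]
print(scaled)  # [[1.0, 1.0], [2.0, 2.0], [3.0, 3.0]]
```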


### Task 5 - Define the Model creation pipeline stage


In [21]:
lr = LinearRegression(featuresCol="scaledFeatures", labelCol="SoundLevelDecibels")
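
`featuresCol` points the model at the scaled vector, and `labelCol` points it at the target. With the default `regParam=0.0`, no regularization is applied. The model is therefore an ordinary least-squares fit, which is also why fitting later logs a "regParam is zero" warning. Below is a pure-Python sketch of what least squares solves, reduced to a single feature:

```python
def fit_simple_ols(xs, ys):
    """Least-squares slope and intercept for y ≈ intercept + slope * x."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    sxx = sum((x - mean_x) ** 2 for x in xs)
    slope = sxy / sxx
    intercept = mean_y - slope * mean_x
    return slope, intercept

# Toy data lying exactly on y = 1 + 2x
slope, intercept = fit_simple_ols([1.0, 2.0, 3.0, 4.0], [3.0, 5.0, 7.0, 9.0])
print(slope, intercept)  # 2.0 1.0
```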


### Task 6 - Build the pipeline


In [22]:
pipeline = Pipeline(stages=[assembler, scaler, lr])
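
`Pipeline.fit` runs the stages in order. An estimator stage such as `StandardScaler` or `LinearRegression` is fitted on the output of the stages before it. A transformer stage such as `VectorAssembler` is applied as-is. The fitted stages are then replayed by `transform`. Below is a pure-Python sketch of that idea, using hypothetical toy stages (`AddOne`, `Scale`) in place of the real ones:

```python
class AddOne:
    """Hypothetical transformer: needs no fitting (plays the role of VectorAssembler)."""
    def transform(self, data):
        return [x + 1 for x in data]

class Scale:
    """Hypothetical estimator: learns a divisor from the data it is fitted on."""
    def fit(self, data):
        return ScaleModel(max(abs(x) for x in data) or 1.0)

class ScaleModel:
    def __init__(self, divisor):
        self.divisor = divisor
    def transform(self, data):
        return [x / self.divisor for x in data]

def fit_pipeline(stages, data):
    """Fit stages in order; each stage sees the previous stages' output."""
    fitted = []
    for stage in stages:
        model = stage.fit(data) if hasattr(stage, "fit") else stage
        data = model.transform(data)
        fitted.append(model)
    return fitted

def transform_pipeline(fitted, data):
    """Apply every fitted stage in order."""
    for model in fitted:
        data = model.transform(data)
    return data

fitted = fit_pipeline([AddOne(), Scale()], [1.0, 3.0])
print(transform_pipeline(fitted, [1.0, 3.0]))  # [0.5, 1.0]
```

The notebook fits the pipeline on `trainingData` only for this reason. Statistics such as the scaler's standard deviations come from the training split and are then reused unchanged on the test split.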


### Task 7 - Split the data


In [25]:
# Split the data into training and testing sets with a 70:30 ratio,
# using seed=42 so that the split is reproducible

(trainingData, testingData) = airfoil_df.randomSplit([0.7, 0.3], seed=42)
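
`randomSplit([0.7, 0.3], seed=42)` assigns each row independently at random, so the two parts are only approximately 70% and 30% of the rows. The seed makes the assignment repeatable, assuming the input data and its partitioning stay the same. Below is a pure-Python sketch of per-row weighted assignment. It illustrates the idea but is not Spark's exact sampling algorithm:

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to a split by an independent weighted draw."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)  # cumulative upper bound for each split
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            if u < bound:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against float round-off in the last bound
    return splits

train, test = random_split(list(range(1499)), [0.7, 0.3], seed=42)
print(len(train), len(test))  # close to, but not exactly, 1049 / 450
```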

### Task 8 - Fit the pipeline


In [26]:
pipelineModel = pipeline.fit(trainingData)

25/08/21 20:25:02 WARN util.Instrumentation: [9d3ce0ed] regParam is zero, which might cause numerical instability and overfitting.
25/08/21 20:25:05 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
25/08/21 20:25:05 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
25/08/21 20:25:05 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
25/08/21 20:25:05 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK

#### Part 2 - Evaluation


In [27]:
print("Part 2 - Evaluation")
print("Total rows = ", rowcount4)
ps = [str(x).split("_")[0] for x in pipeline.getStages()]

print("Pipeline Stage 1 = ", ps[0])
print("Pipeline Stage 2 = ", ps[1])
print("Pipeline Stage 3 = ", ps[2])

print("Label column = ", lr.getLabelCol())

Part 2 - Evaluation
Total rows =  1499
Pipeline Stage 1 =  VectorAssembler
Pipeline Stage 2 =  StandardScaler
Pipeline Stage 3 =  LinearRegression
Label column =  SoundLevelDecibels


## Part 3 - Evaluate the Model


### Task 1 - Predict using the model


In [28]:
# Make predictions on testing data
predictions = pipelineModel.transform(testingData)

### Task 2 - Print the MSE


In [31]:
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="mse")
mse = evaluator.evaluate(predictions)
print(mse)



22.593754071348812
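
`metricName="mse"` averages the squared differences between label and prediction. The result is in squared decibels, so its square root is easier to interpret. Here that square root is about 4.75 dB, and `metricName="rmse"` reports it directly. Below is a pure-Python version on hypothetical values:

```python
def mse(labels, predictions):
    """Mean of the squared residuals (label - prediction)."""
    return sum((y - p) ** 2 for y, p in zip(labels, predictions)) / len(labels)

# Hypothetical labels and predictions; squared errors are 1, 0 and 4
print(mse([3.0, 5.0, 7.0], [2.0, 5.0, 9.0]))  # 1.6666666666666667
```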



### Task 3 - Print the MAE


In [32]:
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="mae")
mae = evaluator.evaluate(predictions)
print(mae)


3.7336902294631287
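
MAE averages the absolute errors, so it stays in decibels. On average, the test predictions are about 3.73 dB off. MAE also weights large errors less heavily than MSE does. Below is a pure-Python version on hypothetical values:

```python
def mae(labels, predictions):
    """Mean of the absolute residuals |label - prediction|."""
    return sum(abs(y - p) for y, p in zip(labels, predictions)) / len(labels)

# Hypothetical labels and predictions; absolute errors are 1, 0 and 2
print(mae([3.0, 5.0, 7.0], [2.0, 5.0, 9.0]))  # 1.0
```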



### Task 4 - Print the R-squared (R2)


In [33]:
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="r2")
r2 = evaluator.evaluate(predictions)
print(r2)


0.5426016508689058
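
R² compares the model's squared error with that of always predicting the mean label: R² = 1 - SS_res / SS_tot. A value of 0.54 means the model explains roughly 54% of the variance in the test labels. Below is a pure-Python sketch on hypothetical values:

```python
def r2(labels, predictions):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_y = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean_y) ** 2 for y in labels)
    return 1 - ss_res / ss_tot

# Hypothetical values: SS_res = 5, SS_tot = 8, so R² = 1 - 5/8
print(r2([3.0, 5.0, 7.0], [2.0, 5.0, 9.0]))  # 0.375
```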



#### Part 3 - Evaluation


In [34]:
print("Part 3 - Evaluation")

print("Mean Squared Error = ", round(mse,2))
print("Mean Absolute Error = ", round(mae,2))
print("R Squared = ", round(r2,2))

lrModel = pipelineModel.stages[-1]

print("Intercept = ", round(lrModel.intercept,2))

Part 3 - Evaluation
Mean Squared Error =  22.59
Mean Absolute Error =  3.73
R Squared =  0.54
Intercept =  132.6


## Part 4 - Persist the Model


### Task 1 - Save the model to the path "Final_Project"


In [35]:
# Save the pipeline model to "Final_Project"; overwrite() lets this cell be re-run
pipelineModel.write().overwrite().save("Final_Project")


### Task 2 - Load the model from the path "Final_Project"


In [37]:
# Load the pipeline model you have created in the previous step
loadedPipelineModel = PipelineModel.load("Final_Project")


### Task 3 - Make predictions on the test data using the loaded model


In [38]:
# Use the loaded pipeline model and make predictions using testingData
predictions = loadedPipelineModel.transform(testingData)

### Task 4 - Show the predictions


In [39]:
# Show the top 5 rows of the predictions DataFrame, displaying only the label and prediction columns
predictions.select("SoundLevelDecibels","prediction").show(5)


+------------------+------------------+
|SoundLevelDecibels|        prediction|
+------------------+------------------+
|           127.315|123.64344009624753|
|           119.975|123.48695788614877|
|           121.783|124.38983849684254|
|           127.224|121.44706993294302|
|           122.229|125.68312652454188|
+------------------+------------------+
only showing top 5 rows



#### Part 4 - Evaluation


In [40]:
print("Part 4 - Evaluation")

loadedmodel = loadedPipelineModel.stages[-1]
totalstages = len(loadedPipelineModel.stages)
inputcolumns = loadedPipelineModel.stages[0].getInputCols()

print("Number of stages in the pipeline = ", totalstages)
for i,j in zip(inputcolumns, loadedmodel.coefficients):
    print(f"Coefficient for {i} is {round(j,4)}")

Part 4 - Evaluation
Number of stages in the pipeline =  3
Coefficient for Frequency is -3.9728
Coefficient for AngleOfAttack is -2.4775
Coefficient for ChordLength is -3.3818
Coefficient for FreeStreamVelocity is 1.5789
Coefficient for SuctionSideDisplacement is -1.6465

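The coefficients above were learned on `scaledFeatures`. There, each feature was divided by its standard deviation but not centred, since `withMean=False`. Each coefficient is therefore the change in predicted decibels per one standard deviation of that feature. Dividing it by the feature's standard deviation gives the coefficient in the feature's original units. The intercept is unchanged because no mean was subtracted. In PySpark, the fitted scaler's standard deviations are available as `loadedPipelineModel.stages[1].std`. Below is a pure-Python sketch with hypothetical numbers, not values from this model:

```python
def unscale_coefficients(scaled_coefs, stds):
    """Map coefficients learned on x / std back to coefficients on raw x."""
    return [b / s for b, s in zip(scaled_coefs, stds)]

# Hypothetical model: coefficients on scaled features, per-feature stds, intercept
scaled = [-4.0, 1.5]
stds = [2000.0, 0.5]
intercept = 130.0
raw = unscale_coefficients(scaled, stds)
print(raw)  # [-0.002, 3.0]

# Both parameterisations give the same prediction for a raw input x
x = [1000.0, 2.0]
pred_scaled = intercept + sum(b * (xi / s) for b, xi, s in zip(scaled, x, stds))
pred_raw = intercept + sum(r * xi for r, xi in zip(raw, x))
print(abs(pred_scaled - pred_raw) < 1e-9)  # True
```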

In [41]:
spark.stop()

<!--
## Change Log
-->


<!--
|Date (YYYY-MM-DD)|Version|Changed By|Change Description|
|-|-|-|-|
|2023-05-26|0.1|Ramesh Sannareddy|Initial Version Created|
-->
