# Airfoil Noise Prediction Machine Learning Pipeline Using PySpark


## Project Description

This project implements a complete machine learning pipeline using **Apache Spark (PySpark)** to process and analyze the NASA Airfoil Self-Noise dataset. The dataset is small (about 1,500 rows), but the steps use the same PySpark APIs that apply to larger data: cleaning the data, training a regression model, evaluating it, and persisting it.



## Objectives

- Part 1: Perform ETL activity
  - Load a CSV dataset
  - Remove duplicates, if any
  - Drop rows with null values, if any
  - Make transformations
  - Store the cleaned data in Parquet format
- Part 2: Create a Machine Learning Pipeline
  - Create a machine learning pipeline for prediction
- Part 3: Evaluate the Model
  - Evaluate the model using relevant metrics
- Part 4: Persist the Model
  - Save the model for future production use
  - Load and verify the stored model


## Datasets

This lab uses the following dataset:

 - The original dataset is the NASA Airfoil Self-Noise dataset: https://archive.ics.uci.edu/dataset/291/airfoil+self+noise

 - This dataset is licensed under a Creative Commons Attribution 4.0 International (CC BY 4.0) license.


Diagram of an airfoil (for informational purposes only).


![Airfoil with flow](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/images/Airfoil_with_flow.png)


Diagram showing the angle of attack (for informational purposes only).


![Airfoil angle of attack](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/images/Airfoil_angle_of_attack.jpg)


For this lab, we will be using the following libraries:

*   [`PySpark`](https://spark.apache.org/docs/latest/api/python/index.html?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMSkillsNetworkBD0231ENCoursera2789-2023-01-01) for connecting to the Spark Cluster


## Part 1 - Perform ETL activity


### Task 1 - Import required libraries


In [1]:
# Locate the Spark installation before importing any pyspark modules.
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.regression import LinearRegression

### Task 2 - Create a Spark session


In [2]:
spark = SparkSession.builder.appName("Airfoil Noise Prediction").getOrCreate()

### Task 3 - Load the CSV file into a DataFrame


Download the data file.

NOTE: Please ensure you use the dataset below, not the original dataset mentioned above.


In [3]:
!curl -O https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed

  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
  0     0    0     0    0     0      0      0 --:--:--  0:00:01 --:--:--     0
100 60682  100 60682    0     0  31213      0  0:00:01  0:00:01 --:--:-- 31231


Load the dataset into a Spark DataFrame.


In [4]:
df = spark.read.csv("NASA_airfoil_noise_raw.csv", header=True, inferSchema=True)

### Task 4 - Print top 5 rows of the dataset


In [5]:
df.show(5)

+---------+-------------+-----------+------------------+-----------------------+----------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevel|
+---------+-------------+-----------+------------------+-----------------------+----------+
|      800|          0.0|     0.3048|              71.3|             0.00266337|   126.201|
|     1000|          0.0|     0.3048|              71.3|             0.00266337|   125.201|
|     1250|          0.0|     0.3048|              71.3|             0.00266337|   125.951|
|     1600|          0.0|     0.3048|              71.3|             0.00266337|   127.591|
|     2000|          0.0|     0.3048|              71.3|             0.00266337|   127.461|
+---------+-------------+-----------+------------------+-----------------------+----------+
only showing top 5 rows



### Task 6 - Print the total number of rows in the dataset


In [6]:
rowcount1 = df.count()
print(rowcount1)

1522


### Task 7 - Drop all the duplicate rows from the dataset


In [7]:
df = df.dropDuplicates()

### Task 8 - Print the total number of rows in the dataset


In [8]:
rowcount2 = df.count()
print(rowcount2)

1503


### Task 9 - Drop all the rows that contain null values from the dataset


In [9]:
df = df.dropna()

### Task 10 - Print the total number of rows in the dataset


In [10]:
rowcount3 = df.count()
print(rowcount3)

1499
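The two cleaning steps above can be illustrated without Spark. The following is a minimal pure-Python sketch on a few made-up rows (the row values and the use of `None` for a missing value are assumptions for illustration); it mirrors the semantics of `dropDuplicates()` and of `dropna()` with its default `how="any"`, not Spark's distributed implementation.

```python
# Toy rows standing in for DataFrame rows; None marks a missing value.
rows = [
    (800, 0.0, 126.201),
    (800, 0.0, 126.201),   # exact duplicate of the first row
    (1000, 0.0, None),     # contains a null
    (1250, 0.0, 125.951),
]

# Keep one copy of each distinct row. Order is preserved here, although
# Spark itself gives no ordering guarantee after dropDuplicates().
deduplicated = list(dict.fromkeys(rows))

# Drop every row that has at least one missing value.
cleaned = [row for row in deduplicated if all(v is not None for v in row)]

print(len(rows), len(deduplicated), len(cleaned))
```

Only rows that are identical in every column count as duplicates, which is why the row containing a null survives the first step and is removed by the second.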


### Task 11 - Rename the column "SoundLevel" to "SoundLevelDecibels"


In [11]:
df = df.withColumnRenamed("SoundLevel","SoundLevelDecibels")

### Task 12 - Save the dataframe in parquet format, name the file as "NASA_airfoil_noise_cleaned.parquet"


In [12]:
df.write.mode("overwrite").parquet("NASA_airfoil_noise_cleaned.parquet")
df

DataFrame[Frequency: int, AngleOfAttack: double, ChordLength: double, FreeStreamVelocity: double, SuctionSideDisplacement: double, SoundLevelDecibels: double]

#### Part 1 - Evaluation


In [13]:
print("Part 1 - Evaluation")

print("Total rows = ", rowcount1)
print("Total rows after dropping duplicate rows = ", rowcount2)
print("Total rows after dropping duplicate rows and rows with null values = ", rowcount3)
print("New column name = ", df.columns[-1])

import os

print("NASA_airfoil_noise_cleaned.parquet exists :", os.path.isdir("NASA_airfoil_noise_cleaned.parquet"))

Part 1 - Evaluation
Total rows =  1522
Total rows after dropping duplicate rows =  1503
Total rows after dropping duplicate rows and rows with null values =  1499
New column name =  SoundLevelDecibels
NASA_airfoil_noise_cleaned.parquet exists : True
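The three row counts printed above also tell us how much each cleaning step removed. A small arithmetic sketch, using only the counts from the output (the variable names are illustrative):

```python
# Row counts reported in the Part 1 evaluation output.
total_rows = 1522
after_dedup = 1503
after_dropna = 1499

duplicates_removed = total_rows - after_dedup    # rows dropped by dropDuplicates()
null_rows_removed = after_dedup - after_dropna   # rows dropped by dropna()
fraction_kept = after_dropna / total_rows

print("Duplicate rows removed =", duplicates_removed)
print("Rows with nulls removed =", null_rows_removed)
print(f"Share of raw rows kept = {fraction_kept:.1%}")
```

So 19 duplicate rows and 4 rows with null values were removed, leaving roughly 98.5% of the raw file for modeling.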


## Part 2 - Create a Machine Learning Pipeline


### Task 1 - Load data from "NASA_airfoil_noise_cleaned.parquet" into a dataframe


In [14]:
df = spark.read.parquet("NASA_airfoil_noise_cleaned.parquet")

### Task 2 - Print the total number of rows in the dataset


In [15]:
rowcount4 = df.count()
print(rowcount4)

1499


### Task 3 - Define the VectorAssembler pipeline stage


Stage 1 - Assemble the input columns into a single column "features". Use all the columns except SoundLevelDecibels as input features.


In [17]:
assembler = VectorAssembler(inputCols=['Frequency','AngleOfAttack','ChordLength','FreeStreamVelocity','SuctionSideDisplacement'], outputCol="features")
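Conceptually, the assembler concatenates the listed columns, in order, into one numeric vector per row and leaves the label column out. A minimal pure-Python sketch of that idea, using the first row shown by `df.show(5)` (a plain list stands in for Spark's vector type):

```python
# Input columns in the same order as passed to VectorAssembler.
input_cols = ["Frequency", "AngleOfAttack", "ChordLength",
              "FreeStreamVelocity", "SuctionSideDisplacement"]

# First row of the dataset, with the label under its renamed column.
row = {
    "Frequency": 800,
    "AngleOfAttack": 0.0,
    "ChordLength": 0.3048,
    "FreeStreamVelocity": 71.3,
    "SuctionSideDisplacement": 0.00266337,
    "SoundLevelDecibels": 126.201,
}

# The "features" value is the input columns as floats; the label is excluded.
features = [float(row[col]) for col in input_cols]
print(features)
```

The order of `inputCols` matters: the position of each feature in the vector determines which model coefficient it is paired with later.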

### Task 4 - Define the StandardScaler pipeline stage


Stage 2 - Scale the "features" column with a StandardScaler and store the result in a "scaledFeatures" column.


In [18]:
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
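With its default parameters (`withStd=True`, `withMean=False`), `StandardScaler` divides each feature by its sample standard deviation and does not subtract the mean. The sketch below reproduces that per-column arithmetic in pure Python on the five `Frequency` values shown earlier; the helper name `scale_to_unit_std` is hypothetical, and in the real pipeline the standard deviation is fitted on the training data, not on five rows.

```python
import statistics

def scale_to_unit_std(column):
    """Divide every value by the column's sample standard deviation (n - 1)."""
    std = statistics.stdev(column)
    if std == 0:
        # A constant feature carries no information; Spark emits 0.0 for it.
        return [0.0 for _ in column]
    return [value / std for value in column]

frequency = [800, 1000, 1250, 1600, 2000]
scaled = scale_to_unit_std(frequency)

print(scaled)
print("std after scaling =", statistics.stdev(scaled))
```

Because the mean is not removed, the scaled values stay positive; only their spread changes. This puts features with very different units, such as `Frequency` in Hz and `SuctionSideDisplacement` in meters, on a comparable scale.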

### Task 5 - Define the Model creation pipeline stage


Stage 3 - Create a LinearRegression stage to predict "SoundLevelDecibels"

**Note: Use the "scaledFeatures" column produced by the previous step (the StandardScaler pipeline stage) as the features column.**


In [19]:
lr = LinearRegression(featuresCol="scaledFeatures", labelCol="SoundLevelDecibels")

### Task 6 - Build the pipeline


Build a pipeline using the above three stages


In [20]:
pipeline = Pipeline(stages=[assembler, scaler, lr])

### Task 7 - Split the data


In [21]:
# Split the data into training and testing sets with a 70:30 split.
# Set the seed to 42. This is important: any other seed produces a
# different split, so the results would no longer match this notebook.

(trainingData, testingData) = df.randomSplit([0.7, 0.3], seed=42)
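A seeded random split assigns each row to a subset by a random draw, so the 70:30 ratio holds only approximately while the same seed always reproduces the same assignment. The following pure-Python sketch illustrates that behavior; the function `random_split` is hypothetical and is not Spark's actual sampling algorithm, so it will not reproduce the exact rows Spark selects.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one subset with probability proportional to its weight."""
    total = sum(weights)
    # Cumulative upper bounds of each subset's share of [0, 1).
    bounds, running = [], 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)

    rng = random.Random(seed)
    subsets = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for index, bound in enumerate(bounds):
            # The last subset catches any floating-point shortfall in bounds.
            if draw < bound or index == len(bounds) - 1:
                subsets[index].append(row)
                break
    return subsets

rows = list(range(1499))  # stand-ins for the 1499 cleaned rows
train, test = random_split(rows, [0.7, 0.3], seed=42)
print(len(train), len(test), len(train) / len(rows))
```

Every row lands in exactly one subset, and rerunning with `seed=42` gives the same two lists, which is what makes the evaluation metrics reproducible.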

### Task 8 - Fit the pipeline


In [22]:
pipelineModel = pipeline.fit(trainingData)

#### Part 2 - Evaluation


In [23]:
print("Part 2 - Evaluation")
print("Total rows = ", rowcount4)
ps = [str(x).split("_")[0] for x in pipeline.getStages()]

print("Pipeline Stage 1 = ", ps[0])
print("Pipeline Stage 2 = ", ps[1])
print("Pipeline Stage 3 = ", ps[2])

print("Label column = ", lr.getLabelCol())

Part 2 - Evaluation
Total rows =  1499
Pipeline Stage 1 =  VectorAssembler
Pipeline Stage 2 =  StandardScaler
Pipeline Stage 3 =  LinearRegression
Label column =  SoundLevelDecibels


## Part 3 - Evaluate the Model


### Task 1 - Predict using the model


In [24]:
predictions = pipelineModel.transform(testingData)

### Task 2 - Print the MSE


In [25]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="mse")
mse = evaluator.evaluate(predictions)
print(mse)

24.99766625502418


### Task 3 - Print the MAE


In [27]:
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="mae")
mae = evaluator.evaluate(predictions)
print(mae)

3.9136790958812044


### Task 4 - Print the R-squared (R2)


In [28]:
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="r2")
r2 = evaluator.evaluate(predictions)
print(r2)

0.4959688408974623
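For reference, the three metrics can be written out directly. This is a minimal pure-Python sketch of the standard definitions on made-up numbers (the toy labels and predictions are assumptions, not values from the dataset); `RegressionEvaluator` computes the same quantities over the `predictions` DataFrame.

```python
def regression_metrics(labels, predictions):
    """Return (MSE, MAE, R2) for paired lists of labels and predictions."""
    n = len(labels)
    errors = [y - p for y, p in zip(labels, predictions)]

    squared_error_sum = sum(e * e for e in errors)
    mse = squared_error_sum / n
    mae = sum(abs(e) for e in errors) / n

    # R2 compares the model's squared error with that of always
    # predicting the mean label.
    mean_label = sum(labels) / n
    total_sum_of_squares = sum((y - mean_label) ** 2 for y in labels)
    r2 = 1 - squared_error_sum / total_sum_of_squares
    return mse, mae, r2

mse_toy, mae_toy, r2_toy = regression_metrics(
    labels=[1.0, 2.0, 3.0, 4.0],
    predictions=[1.5, 2.0, 2.5, 4.0],
)
print(mse_toy, mae_toy, r2_toy)
```

MSE is in squared units, so its square root is easier to compare with MAE: the square root of the MSE reported above is about 5.0 dB, against an MAE of about 3.9 dB. An R2 near 0.5 means the linear model explains roughly half of the variance in the test labels.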


#### Part 3 - Evaluation


In [29]:
print("Part 3 - Evaluation")

print("Mean Squared Error = ", round(mse,2))
print("Mean Absolute Error = ", round(mae,2))
print("R Squared = ", round(r2,2))

lrModel = pipelineModel.stages[-1]

print("Intercept = ", round(lrModel.intercept,2))


Part 3 - Evaluation
Mean Squared Error =  25.0
Mean Absolute Error =  3.91
R Squared =  0.5
Intercept =  132.88


## Part 4 - Persist the Model


### Task 1 - Save the model to the path "Airfoil_Noise_Prediction_Project"


In [30]:
# overwrite() lets the cell be re-run when the target path already exists.
pipelineModel.write().overwrite().save("Airfoil_Noise_Prediction_Project")

### Task 2 - Load the model from the path "Airfoil_Noise_Prediction_Project"


In [31]:
loadedPipelineModel = PipelineModel.load("Airfoil_Noise_Prediction_Project")

### Task 3 - Make predictions using the loaded model on the test data


In [32]:
predictions = loadedPipelineModel.transform(testingData)

### Task 4 - Show the predictions


In [33]:
# Show the top 5 rows of the predictions DataFrame. Display only the label column and the predictions.

predictions.select("SoundLevelDecibels","prediction").show(5)

+------------------+------------------+
|SoundLevelDecibels|        prediction|
+------------------+------------------+
|           128.679|122.59722914376778|
|            133.42|127.37968204568838|
|           119.146|130.34077425074506|
|           116.074|131.11016975113537|
|           134.319|127.12627360125096|
+------------------+------------------+
only showing top 5 rows
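To read the table above, it helps to look at the residuals (label minus prediction). The sketch below copies the five displayed rows by hand and computes their errors in pure Python; these five rows are only the sample shown, not a representative subset of the test set.

```python
# (SoundLevelDecibels, prediction) pairs copied from the table above.
shown_rows = [
    (128.679, 122.59722914376778),
    (133.42, 127.37968204568838),
    (119.146, 130.34077425074506),
    (116.074, 131.11016975113537),
    (134.319, 127.12627360125096),
]

# Positive residual: the model under-predicts; negative: it over-predicts.
residuals = [label - prediction for label, prediction in shown_rows]
mean_abs_error = sum(abs(r) for r in residuals) / len(residuals)

for (label, prediction), residual in zip(shown_rows, residuals):
    print(f"label={label:8.3f}  prediction={prediction:8.3f}  residual={residual:+7.3f}")
print(f"Mean absolute error on these five rows = {mean_abs_error:.2f}")
```

On these five rows the mean absolute error is about 9.1 dB, well above the 3.91 dB measured on the full test set, which shows how misleading a handful of rows can be when judging a model.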



#### Part 4 - Evaluation


In [34]:
print("Part 4 - Evaluation")

loadedmodel = loadedPipelineModel.stages[-1]
totalstages = len(loadedPipelineModel.stages)
inputcolumns = loadedPipelineModel.stages[0].getInputCols()

print("Number of stages in the pipeline = ", totalstages)
for i,j in zip(inputcolumns, loadedmodel.coefficients):
    print(f"Coefficient for {i} is {round(j,4)}")

Part 4 - Evaluation
Number of stages in the pipeline =  3
Coefficient for Frequency is -3.9906
Coefficient for AngleOfAttack is -2.2881
Coefficient for ChordLength is -3.3269
Coefficient for FreeStreamVelocity is 1.4832
Coefficient for SuctionSideDisplacement is -2.0551
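These coefficients apply to the scaled features, not to the raw columns, because the regression stage reads "scaledFeatures". A linear prediction is the intercept plus the sum of each coefficient times its scaled feature value. The sketch below uses the rounded intercept and coefficients printed in this notebook, so its results are approximate; the scaled input values are hypothetical.

```python
# Rounded values taken from the printed evaluation outputs.
intercept = 132.88
coefficients = {
    "Frequency": -3.9906,
    "AngleOfAttack": -2.2881,
    "ChordLength": -3.3269,
    "FreeStreamVelocity": 1.4832,
    "SuctionSideDisplacement": -2.0551,
}

def predict(scaled_features):
    """Linear prediction from already-scaled feature values."""
    return intercept + sum(
        coefficients[name] * scaled_features[name] for name in coefficients
    )

# Hypothetical input: every scaled feature is zero, so only the intercept remains.
all_zero = {name: 0.0 for name in coefficients}
print(predict(all_zero))

# Hypothetical input: raise scaled Frequency by one standard deviation.
one_std_frequency = dict(all_zero, Frequency=1.0)
print(predict(one_std_frequency))
```

Since every feature was divided by its standard deviation, each coefficient reads as the change in predicted decibels per one standard deviation of that feature. By that measure, `Frequency` and `ChordLength` have the largest effects, and `FreeStreamVelocity` is the only feature that increases the predicted sound level.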


### Stop Spark Session


In [35]:
spark.stop()

<!--
## Change Log
-->


<!--
|Date (YYYY-MM-DD)|Version|Changed By|Change Description|
|-|-|-|-|
|2023-05-26|0.1|Ramesh Sannareddy|Initial Version Created|
-->
