## Final Project: Build an ML Pipeline for Airfoil Noise Prediction
### Course: IBM Machine Learning with Apache Spark

## Objectives

- Perform ETL activity
  - Load a csv dataset
  - Remove duplicates if any
  - Drop rows with null values if any
  - Make transformations
  - Store the cleaned data in parquet format
- Create a Machine Learning Pipeline
  - Create a machine learning pipeline for prediction
- Evaluate the Model
  - Evaluate the model using relevant metrics
- Persist the Model 
  - Save the model for future production use
  - Load and verify the stored model


### Installing Required Libraries

In [1]:
!pip install pyspark==3.1.2 -q
!pip install findspark -q

### Importing Required Libraries

In [2]:
# Suppress warnings generated by the code
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

# findspark locates the local Spark installation and makes pyspark importable from Python

import findspark
findspark.init()

## 1. Perform ETL activity


### 1.1. Import required libraries


In [3]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StandardScaler

### 1.2. Create a Spark session


In [4]:
spark = SparkSession.builder.appName("Final Project").getOrCreate()

25/08/01 19:28:40 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### 1.3. Download the csv file
#### Dataset: NASA Airfoil Self-Noise dataset

In [5]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv


--2025-08-01 19:29:01--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.63.118.104, 169.63.118.104
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 60682 (59K) [text/csv]
Saving to: ‘NASA_airfoil_noise_raw.csv.2’


2025-08-01 19:29:01 (50.4 MB/s) - ‘NASA_airfoil_noise_raw.csv.2’ saved [60682/60682]



### 1.4. Load the dataset into a Spark dataframe


In [6]:
df = spark.read.csv("NASA_airfoil_noise_raw.csv", header=True, inferSchema=True)


### 1.5. Print top 5 rows of the dataset


In [7]:
df.show(5)

+---------+-------------+-----------+------------------+-----------------------+----------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevel|
+---------+-------------+-----------+------------------+-----------------------+----------+
|      800|          0.0|     0.3048|              71.3|             0.00266337|   126.201|
|     1000|          0.0|     0.3048|              71.3|             0.00266337|   125.201|
|     1250|          0.0|     0.3048|              71.3|             0.00266337|   125.951|
|     1600|          0.0|     0.3048|              71.3|             0.00266337|   127.591|
|     2000|          0.0|     0.3048|              71.3|             0.00266337|   127.461|
+---------+-------------+-----------+------------------+-----------------------+----------+
only showing top 5 rows



### 1.6. Print the total number of rows in the dataset


In [8]:
rowcount1 = df.count()
print(rowcount1)

1522


### 1.7. Drop all the duplicate rows from the dataset


In [9]:
df = df.dropDuplicates()
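
With no arguments, `dropDuplicates()` compares rows on every column and keeps one copy of each distinct row; in this run that removed 19 rows (from 1522 to 1503). The plain-Python sketch below mimics the rule on rows taken from the sample output above. The helper `drop_duplicates` is hypothetical, not a PySpark function, and unlike this sketch Spark does not guarantee which copy survives or the order of the result.

```python
# Plain-Python sketch of df.dropDuplicates() with no subset: rows are compared
# on all columns and one copy of each distinct row is kept.
# drop_duplicates is a hypothetical helper, not part of PySpark.
rows = [
    (800, 0.0, 0.3048, 71.3, 0.00266337, 126.201),
    (1000, 0.0, 0.3048, 71.3, 0.00266337, 125.201),
    (800, 0.0, 0.3048, 71.3, 0.00266337, 126.201),  # exact copy of the first row
]


def drop_duplicates(rows):
    """Return the rows with exact duplicates removed (first occurrence kept)."""
    seen = set()
    unique = []
    for row in rows:
        if row not in seen:
            seen.add(row)
            unique.append(row)
    return unique


deduped = drop_duplicates(rows)
print(len(rows), "->", len(deduped))  # 3 -> 2
```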

### 1.8. Print the total number of rows in the dataset


In [10]:
rowcount2 = df.count()
print(rowcount2)



1503



### 1.9. Drop all the rows that contain null values from the dataset


In [11]:
df = df.dropna()
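
With its defaults (`how="any"`), `dropna()` removes a row when any of its columns is missing; the Scala `DataFrameNaFunctions.drop` documentation states that NaN values are dropped the same way as nulls. In this run the step removed 4 rows (from 1503 to 1499). A plain-Python sketch of the rule, using `None` for null and hypothetical helpers `is_missing` and `drop_na_any`:

```python
import math

# Plain-Python sketch of df.dropna() with how="any": a row is kept only if
# none of its values is null (None) or a floating-point NaN.
# is_missing and drop_na_any are hypothetical helpers, not part of PySpark.
rows = [
    (800, 0.0, 0.3048, 71.3, 0.00266337, 126.201),
    (1000, None, 0.3048, 71.3, 0.00266337, 125.201),      # null AngleOfAttack
    (1250, 0.0, 0.3048, 71.3, 0.00266337, float("nan")),  # NaN label
]


def is_missing(value):
    """True for None or a float NaN."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def drop_na_any(rows):
    """Keep only the rows that have no missing values."""
    return [row for row in rows if not any(is_missing(v) for v in row)]


clean = drop_na_any(rows)
print(len(clean))  # 1
```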

### 1.10. Print the total number of rows in the dataset


In [12]:
rowcount3 = df.count()
print(rowcount3)



1499



### 1.11. Rename the column "SoundLevel" to "SoundLevelDecibels"


In [13]:
df = df.withColumnRenamed("SoundLevel","SoundLevelDecibels")

### 1.12. Save the dataframe in parquet format as "NASA_airfoil_noise_cleaned.parquet"


In [14]:
df.write.mode("overwrite").parquet("NASA_airfoil_noise_cleaned.parquet")

25/08/01 19:34:13 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
25/08/01 19:34:13 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
25/08/01 19:34:13 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers

### 1.13. Summary


In [15]:
print("Part 1 - Evaluation")

print("Total rows = ", rowcount1)
print("Total rows after dropping duplicate rows = ", rowcount2)
print("Total rows after dropping duplicate rows and rows with null values = ", rowcount3)
print("New column name = ", df.columns[-1])

import os

print("NASA_airfoil_noise_cleaned.parquet exists :", os.path.isdir("NASA_airfoil_noise_cleaned.parquet"))

Part 1 - Evaluation
Total rows =  1522
Total rows after dropping duplicate rows =  1503
Total rows after dropping duplicate rows and rows with null values =  1499
New column name =  SoundLevelDecibels
NASA_airfoil_noise_cleaned.parquet exists : True
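
The summary uses `os.path.isdir` because Spark writes parquet output as a directory of part files rather than as a single file. A slightly stricter check, sketched below, also looks for the `_SUCCESS` marker that Spark's default Hadoop output committer writes after a successful job, and for at least one `part-*.parquet` file. The helper `parquet_output_ok` is hypothetical, the demo directory is fabricated with `tempfile`, and the marker can be turned off through Hadoop configuration, so treat this as a heuristic rather than a guarantee.

```python
import os
import tempfile


def parquet_output_ok(path):
    """Heuristic: True if `path` is a directory holding a _SUCCESS marker and at
    least one part-*.parquet file, as a completed Spark parquet write normally does."""
    if not os.path.isdir(path):
        return False
    names = os.listdir(path)
    has_part_file = any(n.startswith("part-") and n.endswith(".parquet") for n in names)
    return "_SUCCESS" in names and has_part_file


# Demo on a throwaway directory laid out like Spark output (file names are illustrative).
with tempfile.TemporaryDirectory() as tmp:
    out_dir = os.path.join(tmp, "NASA_airfoil_noise_cleaned.parquet")
    os.mkdir(out_dir)
    for name in ("_SUCCESS", "part-00000-example.snappy.parquet"):
        open(os.path.join(out_dir, name), "w").close()
    demo_ok = parquet_output_ok(out_dir)
    missing_ok = parquet_output_ok(os.path.join(tmp, "missing.parquet"))

print(demo_ok, missing_ok)  # True False
```

In the notebook itself, the call would be `parquet_output_ok("NASA_airfoil_noise_cleaned.parquet")`.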


## 2. Create a Machine Learning Pipeline


### 2.1. Load data from "NASA_airfoil_noise_cleaned.parquet" into a dataframe


In [16]:
df = spark.read.parquet("NASA_airfoil_noise_cleaned.parquet")

### 2.2. Print the total number of rows in the dataset


In [17]:
rowcount4 = df.count()
print(rowcount4)


1499



### 2.3. Define the VectorAssembler pipeline stage


#### Assemble the input columns into a single column "features". Use all the columns except SoundLevelDecibels as input features.


In [18]:
assembler = VectorAssembler(inputCols=['Frequency','AngleOfAttack','ChordLength','FreeStreamVelocity','SuctionSideDisplacement'], outputCol="features")
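
`VectorAssembler` packs the listed columns, in the order given, into one vector per row. That order is what lets the section 4.5 summary pair `getInputCols()` with the model coefficients using `zip`. In Spark 2.4 and later its `handleInvalid` parameter defaults to `"error"`, so a null in an input column fails the transform, which is one more reason to drop nulls first. The plain-Python sketch below, with a hypothetical `assemble` helper, mirrors that behavior for the first row of the sample output.

```python
# Plain-Python sketch of VectorAssembler: pick the input columns in the listed
# order and pack them into one feature vector. assemble is a hypothetical helper.
INPUT_COLS = ["Frequency", "AngleOfAttack", "ChordLength",
              "FreeStreamVelocity", "SuctionSideDisplacement"]


def assemble(row, input_cols=INPUT_COLS):
    """Return the feature vector for `row` (a dict), raising on nulls
    the way handleInvalid="error" does."""
    values = [row[col] for col in input_cols]
    if any(v is None for v in values):
        raise ValueError("null value in an input column")
    return [float(v) for v in values]


# First row of the sample output; the label column is simply not listed.
row = {"Frequency": 800, "AngleOfAttack": 0.0, "ChordLength": 0.3048,
       "FreeStreamVelocity": 71.3, "SuctionSideDisplacement": 0.00266337,
       "SoundLevelDecibels": 126.201}
features = assemble(row)
print(features)  # [800.0, 0.0, 0.3048, 71.3, 0.00266337]
```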

### 2.4. Define the StandardScaler pipeline stage


#### Scale the "features" column with StandardScaler and store the result in the "scaledFeatures" column


In [19]:
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
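
With its defaults (`withMean=False`, `withStd=True`), `StandardScaler` does not center the data: it divides each feature by that feature's corrected (n - 1) sample standard deviation, and a feature with zero variance is mapped to 0.0. The plain-Python sketch below, with a hypothetical `standard_scale` helper, applies that rule to the first five `Frequency` and `AngleOfAttack` values from the sample output; the real stage computes its statistics over the whole training split during `fit`.

```python
import statistics

# Plain-Python sketch of StandardScaler(withMean=False, withStd=True):
# divide each column by its sample standard deviation (n - 1 denominator).
# standard_scale is a hypothetical helper, not a PySpark function.


def standard_scale(column, with_mean=False, with_std=True):
    """Scale one feature column the way the scaler's defaults do."""
    mean = statistics.mean(column) if with_mean else 0.0
    std = statistics.stdev(column) if with_std else 1.0
    if std == 0:
        return [0.0 for _ in column]  # zero-variance feature maps to 0.0
    return [(x - mean) / std for x in column]


frequency = [800, 1000, 1250, 1600, 2000]
angle_of_attack = [0.0, 0.0, 0.0, 0.0, 0.0]

scaled_frequency = standard_scale(frequency)
scaled_angle = standard_scale(angle_of_attack)
print(round(statistics.stdev(scaled_frequency), 6))  # 1.0
print(scaled_angle)  # [0.0, 0.0, 0.0, 0.0, 0.0]
```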

### 2.5. Define the Model creation pipeline stage


#### Create a LinearRegression stage to predict "SoundLevelDecibels"

In [20]:
lr = LinearRegression(featuresCol="scaledFeatures", labelCol="SoundLevelDecibels")
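
For reference, the Spark ML documentation describes linear regression as minimizing a squared-error loss scaled by 1/(2n) plus an elastic-net penalty, where λ is `regParam` and α is `elasticNetParam`. Both default to 0, so this stage fits plain ordinary least squares, which is what the `regParam is zero` warning printed during `fit` refers to.

$$
\min_{w,\,b}\; \frac{1}{2n}\sum_{i=1}^{n}\left(w^{\top}x_i + b - y_i\right)^2 \;+\; \lambda\left(\alpha\,\lVert w\rVert_1 + \frac{1-\alpha}{2}\,\lVert w\rVert_2^2\right)
$$

Here x_i is the `scaledFeatures` vector of row i, y_i its `SoundLevelDecibels` label, w the coefficient vector and b the (unpenalized) intercept.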

### 2.6. Build the pipeline


In [21]:
pipeline = Pipeline(stages=[assembler, scaler, lr])

### 2.7. Split the data


#### Split the data into training and testing sets using a 70:30 split.
#### Set the random seed to 42.

In [23]:
(trainingData, testingData) = df.randomSplit([0.7, 0.3], seed=42)
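
`randomSplit` normalizes the weights and assigns each row to a split by comparing a seeded uniform draw with cumulative bounds (0.7 and 1.0 here), so the two sets never overlap and their sizes are only approximately 70:30. The plain-Python sketch below, with a hypothetical `random_split` helper, illustrates that idea. It does not reproduce Spark's exact per-partition sampling, and Spark's result for a fixed seed can also depend on how the data is partitioned.

```python
import random

# Plain-Python sketch of DataFrame.randomSplit: one seeded uniform draw per row,
# compared against cumulative weight bounds. random_split is hypothetical.


def random_split(rows, weights, seed):
    """Split `rows` into len(weights) disjoint lists of roughly the given proportions."""
    total = sum(weights)
    bounds = []
    cumulative = 0.0
    for w in weights:
        cumulative += w / total
        bounds.append(cumulative)
    bounds[-1] = 1.0  # guard against floating-point round-off in the last bound

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform in [0.0, 1.0)
        for i, upper in enumerate(bounds):
            if u < upper:
                splits[i].append(row)
                break
    return splits


train, test = random_split(list(range(1499)), [0.7, 0.3], seed=42)
print(len(train), len(test))  # close to a 70:30 split of 1499 rows, not exact
```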

### 2.8. Fit the pipeline


In [24]:
pipelineModel = pipeline.fit(trainingData)

25/08/01 19:44:07 WARN util.Instrumentation: [934a9aac] regParam is zero, which might cause numerical instability and overfitting.
25/08/01 19:44:10 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
25/08/01 19:44:10 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
25/08/01 19:44:10 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
25/08/01 19:44:10 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK

### 2.9. Summary


In [25]:
print("Part 2 - Evaluation")
print("Total rows = ", rowcount4)
ps = [str(x).split("_")[0] for x in pipeline.getStages()]

print("Pipeline Stage 1 = ", ps[0])
print("Pipeline Stage 2 = ", ps[1])
print("Pipeline Stage 3 = ", ps[2])

print("Label column = ", lr.getLabelCol())

Part 2 - Evaluation
Total rows =  1499
Pipeline Stage 1 =  VectorAssembler
Pipeline Stage 2 =  StandardScaler
Pipeline Stage 3 =  LinearRegression
Label column =  SoundLevelDecibels


## 3. Evaluate the Model


### 3.1. Predict using the model


In [26]:
predictions = pipelineModel.transform(testingData)

### 3.2. Print the MSE


In [27]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="mse")
mse = evaluator.evaluate(predictions)
print(mse)


22.593754071348812



### 3.3. Print the MAE


In [28]:
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="mae")
mae = evaluator.evaluate(predictions)
print(mae)


3.7336902294631287



### 3.4. Print the R-squared (R2)


In [29]:
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="r2")
r2 = evaluator.evaluate(predictions)
print(r2)


0.5426016508689058



### 3.5. Summary


In [30]:
print("Part 3 - Evaluation")

print("Mean Squared Error = ", round(mse,2))
print("Mean Absolute Error = ", round(mae,2))
print("R Squared = ", round(r2,2))

lrModel = pipelineModel.stages[-1]

print("Intercept = ", round(lrModel.intercept,2))


Part 3 - Evaluation
Mean Squared Error =  22.59
Mean Absolute Error =  3.73
R Squared =  0.54
Intercept =  132.6
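
`RegressionEvaluator` computes these metrics from the label and prediction columns: MSE is the mean squared residual, MAE the mean absolute residual, and R2 is 1 minus the residual sum of squares divided by the total sum of squares around the mean label. The plain-Python sketch below, with a hypothetical `regression_metrics` helper, spells out those formulas. As a reading aid, the square root of the MSE above (about 4.75 dB) expresses the typical error in the label's own units, and an R2 of 0.54 means the linear model explains roughly half of the variance in sound level on the test split.

```python
import math

# Plain-Python sketch of the metrics RegressionEvaluator reports.
# regression_metrics is a hypothetical helper, not a PySpark function.


def regression_metrics(labels, predictions):
    """Return (mse, mae, r2) for paired labels and predictions."""
    n = len(labels)
    residuals = [y - p for y, p in zip(labels, predictions)]
    ss_res = sum(r * r for r in residuals)
    mse = ss_res / n
    mae = sum(abs(r) for r in residuals) / n
    mean_label = sum(labels) / n
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    r2 = 1.0 - ss_res / ss_tot
    return mse, mae, r2


# Predicting every label exactly gives MSE = MAE = 0 and R2 = 1.
print(regression_metrics([1.0, 2.0, 3.0], [1.0, 2.0, 3.0]))  # (0.0, 0.0, 1.0)
# Always predicting the mean label gives R2 = 0.
print(regression_metrics([1.0, 2.0, 3.0], [2.0, 2.0, 2.0])[2])  # 0.0
# RMSE corresponding to the MSE reported above.
print(round(math.sqrt(22.593754071348812), 2))  # 4.75
```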


## 4. Persist the Model


### 4.1. Save the model to the path "Final_Project"


In [32]:
# overwrite() replaces an existing "Final_Project" directory instead of raising an IOException
pipelineModel.write().overwrite().save("Final_Project")


### 4.2. Load the model from the path "Final_Project"


In [33]:
loadedPipelineModel = PipelineModel.load("Final_Project")

### 4.3. Make predictions using the loaded model on the test data


In [34]:
predictions = loadedPipelineModel.transform(testingData)

### 4.4. Show the predictions


In [35]:
predictions.select("SoundLevelDecibels","prediction").show(5)


+------------------+------------------+
|SoundLevelDecibels|        prediction|
+------------------+------------------+
|           127.315|123.64344009624753|
|           119.975|123.48695788614877|
|           121.783|124.38983849684254|
|           127.224|121.44706993294302|
|           122.229|125.68312652454188|
+------------------+------------------+
only showing top 5 rows




### 4.5. Summary


In [36]:
print("Part 4 - Evaluation")

loadedmodel = loadedPipelineModel.stages[-1]
totalstages = len(loadedPipelineModel.stages)
inputcolumns = loadedPipelineModel.stages[0].getInputCols()

print("Number of stages in the pipeline = ", totalstages)
for i,j in zip(inputcolumns, loadedmodel.coefficients):
    print(f"Coefficient for {i} is {round(j,4)}")

Part 4 - Evaluation
Number of stages in the pipeline =  3
Coefficient for Frequency is -3.9728
Coefficient for AngleOfAttack is -2.4775
Coefficient for ChordLength is -3.3818
Coefficient for FreeStreamVelocity is 1.5789
Coefficient for SuctionSideDisplacement is -1.6465
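
Because the model was trained on `scaledFeatures`, each coefficient above is the change in predicted decibels per one standard deviation of that feature, not per raw unit. With the scaler's default `withMean=False` the scaled value is simply x / std, so dividing a coefficient by its feature's standard deviation converts it to raw units and leaves the intercept unchanged. In PySpark the fitted standard deviations are exposed as the `std` attribute of the scaler stage (here `loadedPipelineModel.stages[1].std`). The sketch below uses a hypothetical `unscale_coefficients` helper and made-up numbers to check that both forms give the same prediction.

```python
# Convert coefficients learned on x / std back to raw feature units.
# unscale_coefficients and predict are hypothetical; the numbers below are made up.


def unscale_coefficients(scaled_coefs, stds):
    """beta_raw = beta_scaled / std; a zero-std feature was scaled to 0.0, so it gets 0.0."""
    return [b / s if s != 0 else 0.0 for b, s in zip(scaled_coefs, stds)]


def predict(intercept, coefs, x):
    """Linear prediction: intercept + sum of coefficient * feature."""
    return intercept + sum(c * v for c, v in zip(coefs, x))


intercept = 3.0
scaled_coefs = [2.0, -1.0]
stds = [4.0, 0.5]
x_raw = [8.0, 1.0]

x_scaled = [v / s for v, s in zip(x_raw, stds)]
raw_coefs = unscale_coefficients(scaled_coefs, stds)
print(raw_coefs)  # [0.5, -2.0]
print(predict(intercept, scaled_coefs, x_scaled), predict(intercept, raw_coefs, x_raw))  # 5.0 5.0
```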


## 5. Stop the Spark Session


In [37]:
spark.stop()

<!--
## Change Log
-->


<!--
|Date (YYYY-MM-DD)|Version|Changed By|Change Description|
|-|-|-|-|
|2023-05-26|0.1|Ramesh Sannareddy|Initial Version Created|
-->
