<p style="text-align:center">
    <a href="https://skills.network/?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMSkillsNetworkBD0231ENCoursera2789-2023-01-01">
    <img src="https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/assets/logos/SN_web_lightmode.png" width="200" alt="Skills Network Logo">
    </a>
</p>


# Building a Machine Learning Pipeline for Airfoil Noise Prediction

## Scenario


You are a data engineer at an aeronautics consulting company. Your company prides itself on efficiently designing airfoils for use in planes and sports cars. Data scientists in your office need to work with different algorithms and with data in different formats. While they are good at machine learning, they count on you to run ETL jobs and build ML pipelines. In this project you will use a modified version of the NASA Airfoil Self-Noise dataset. You will clean the dataset by dropping duplicate rows and removing rows with null values. You will then build an ML pipeline that trains a model to predict the SoundLevel from all the other columns. Finally, you will evaluate the model and persist it.



## Contents

1. Perform ETL activity
    - Load the CSV File into A Dataframe
    - Duplicates
    - Missing Values
    - Transformation
    - Save the Cleaned Dataset
    - Part 1 Evaluation
2. Create a Machine Learning Pipeline
    - Load the Dataset from the Saved File
    - Define the Pipeline Stages
    - Build the pipeline
    - Train/Test Split
    - Model Training
    - Part 2 Evaluation
3. Evaluate the Model
    - Prediction
    - Evaluation of the Model
    - Part 3 Evaluation
4. Persist the Model
    - Save the Model
    - Load the Model
    - Make predictions with the Loaded Model on the Testdata
    - Compare the Predictions from the Loaded Model and the Original Model
    - Part 4 Evaluation

## Datasets

 - The original NASA Airfoil Self-Noise dataset is available from the UCI Machine Learning Repository: https://archive.ics.uci.edu/dataset/291/airfoil+self+noise
 
 - This dataset is licensed under a Creative Commons Attribution 4.0 International (CC BY 4.0) license.


Diagram of an airfoil (for informational purposes only).


![Airfoil with flow](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/images/Airfoil_with_flow.png)


Diagram showing the angle of attack (for informational purposes only).


![Airfoil angle of attack](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/images/Airfoil_angle_of_attack.jpg)


### Installing Required Libraries

In [1]:
!pip install pyspark==3.1.2 -q
!pip install findspark -q

### Initialization

In [2]:
# Suppress warnings
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

# FindSpark simplifies the process of using Apache Spark with Python
import findspark
findspark.init()

### Importing Required Libraries

In [3]:
# Data manipulation and preprocessing
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, count, col
import os

# Machine learning pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

# Model evaluation
from pyspark.ml.evaluation import RegressionEvaluator

# Model Persistence
from pyspark.ml.pipeline import PipelineModel

## 1. Perform ETL activity

**Load the CSV File into A Dataframe**

In [4]:
# Download the dataset
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv

--2024-02-12 09:19:45--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.63.118.104, 169.63.118.104
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 60682 (59K) [text/csv]
Saving to: ‘NASA_airfoil_noise_raw.csv.19’


2024-02-12 09:19:45 (40.8 MB/s) - ‘NASA_airfoil_noise_raw.csv.19’ saved [60682/60682]



In [5]:
# Create a SparkSession
spark = SparkSession.builder.appName('NASA').getOrCreate()

24/02/12 09:19:47 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/12 09:19:50 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [6]:
# Load the dataset
df = spark.read.csv('NASA_airfoil_noise_raw.csv', header=True, inferSchema=True)

                                                                                

In [7]:
# Show the first five rows
df.show(5)

+---------+-------------+-----------+------------------+-----------------------+----------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevel|
+---------+-------------+-----------+------------------+-----------------------+----------+
|      800|          0.0|     0.3048|              71.3|             0.00266337|   126.201|
|     1000|          0.0|     0.3048|              71.3|             0.00266337|   125.201|
|     1250|          0.0|     0.3048|              71.3|             0.00266337|   125.951|
|     1600|          0.0|     0.3048|              71.3|             0.00266337|   127.591|
|     2000|          0.0|     0.3048|              71.3|             0.00266337|   127.461|
+---------+-------------+-----------+------------------+-----------------------+----------+
only showing top 5 rows



In [8]:
# Print the schema
df.printSchema()

root
 |-- Frequency: integer (nullable = true)
 |-- AngleOfAttack: double (nullable = true)
 |-- ChordLength: double (nullable = true)
 |-- FreeStreamVelocity: double (nullable = true)
 |-- SuctionSideDisplacement: double (nullable = true)
 |-- SoundLevel: double (nullable = true)



In [9]:
# Find the total number of rows
rowcount1 = df.count()
print(rowcount1)

1522


                                                                                

**Duplicates**

In [10]:
# Check for duplicates
duplicates_df = df.exceptAll(df.dropDuplicates())
duplicates_count = duplicates_df.count()

duplicates_df.show()
print('Total duplicated rows:', duplicates_count)



+---------+-------------+-----------+------------------+-----------------------+----------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevel|
+---------+-------------+-----------+------------------+-----------------------+----------+
|     1000|         15.6|     0.1016|              39.6|              0.0528487|   119.224|
|      800|         15.6|     0.1016|              39.6|              0.0528487|   118.964|
|     3150|         15.6|     0.1016|              71.3|              0.0437259|   116.468|
|     1600|         15.6|     0.1016|              39.6|              0.0528487|   114.554|
|      400|         15.6|     0.1016|              39.6|              0.0528487|   120.484|
|     3150|         15.6|     0.1016|              39.6|              0.0528487|   109.254|
|     1250|         15.6|     0.1016|              39.6|              0.0528487|   118.214|
|     2500|         15.6|     0.1016|              39.6|              0.0528487|

                                                                                

In [11]:
# Drop all the duplicate rows from the dataset
df = df.dropDuplicates()

In [12]:
# Check the number of rows after dropping duplicates
rowcount2 = df.count()
print(rowcount2)



1503
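
The expression `df.exceptAll(df.dropDuplicates())` used above behaves like a multiset difference: every row keeps its multiplicity, so subtracting one copy of each distinct row leaves only the surplus copies. The following plain-Python sketch mimics that logic with `collections.Counter` on a few hypothetical rows. It illustrates the idea only and is not Spark code.

```python
from collections import Counter

# Hypothetical rows standing in for DataFrame records (not taken from the dataset).
rows = [
    (800, 0.0, 126.201),
    (1000, 0.0, 125.201),
    (800, 0.0, 126.201),   # exact copy of the first row
    (800, 0.0, 126.201),   # second exact copy
]

all_rows = Counter(rows)            # each distinct row with its multiplicity
distinct_rows = Counter(set(rows))  # one copy of each distinct row

# Counter subtraction keeps only positive counts, like a multiset difference.
surplus = all_rows - distinct_rows
duplicates_count = len(list(surplus.elements()))

print(duplicates_count)  # 2
```

The number of surplus copies plus the number of distinct rows equals the original row count, which is the same relationship the notebook relies on.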


                                                                                

**Missing Values**

In [13]:
# Check for missing values
null_df = df.exceptAll(df.dropna())
null_count = null_df.count()
null_count_per_column = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])

print('Rows with missing values')
null_df.show()

print('Missing values per column')
null_count_per_column.show()

print('Total rows with missing values::', null_count)

                                                                                

Rows with missing values


                                                                                

+---------+-------------+-----------+------------------+-----------------------+----------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevel|
+---------+-------------+-----------+------------------+-----------------------+----------+
|      630|          0.0|     0.3048|              null|             0.00310138|   128.629|
|     2500|          1.5|     0.3048|              null|             0.00392107|   120.981|
|     null|          0.0|     0.3048|              55.5|             0.00283081|   123.236|
|      800|          3.0|       null|              39.6|             0.00495741|   129.552|
+---------+-------------+-----------+------------------+-----------------------+----------+

Missing values per column




+---------+-------------+-----------+------------------+-----------------------+----------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevel|
+---------+-------------+-----------+------------------+-----------------------+----------+
|        1|            0|          1|                 2|                      0|         0|
+---------+-------------+-----------+------------------+-----------------------+----------+

Total rows with missing values:: 4


                                                                                

In [14]:
# Drop the missing values
df = df.dropna()

In [15]:
# Check the number of rows after dropping missing values
rowcount3 = df.count()
print(rowcount3)



1499


                                                                                

**Transformation**

In [16]:
# Rename the column 'SoundLevel' to 'SoundLevelDecibels'
df = df.withColumnRenamed('SoundLevel', 'SoundLevelDecibels')

**Save the Cleaned Dataset**

In [17]:
# Reduce the number of partitions in the dataframe to one
df = df.repartition(1)

# Save the dataframe in parquet format
df.write.mode('overwrite').parquet('NASA_airfoil_noise_cleaned.parquet')

                                                                                

**Part 1 Evaluation**

In [18]:
# Part 1 evaluation
print('Part 1 - Evaluation\n')

# Initial number of rows
print('Initial number of rows:', rowcount1, '\n')

# Total rows after dropping duplicates
print('Total rows with duplicates:', duplicates_count)
print('Total rows after dropping duplicated rows:', rowcount2, '\n')

# Total rows after dropping missing values
print('Total rows with missing values:', null_count)
print('Total rows after dropping rows with duplicates and missing values:', rowcount3, '\n')

# Renamed column
print('New column name:', df.columns[-1], '\n')

# Check if the cleaned dataset is saved
print('NASA_airfoil_noise_cleaned.parquet exists:', os.path.isdir('NASA_airfoil_noise_cleaned.parquet'))

Part 1 - Evaluation

Initial number of rows: 1522 

Total rows with duplicates: 19
Total rows after dropping duplicated rows: 1503 

Total rows with missing values: 4
Total rows after dropping rows with duplicates and missing values: 1499 

New column name: SoundLevelDecibels 

NASA_airfoil_noise_cleaned.parquet exists: True
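
The counts in the Part 1 output can be cross-checked with simple arithmetic: removing the duplicated rows and then the rows with missing values should account for the whole drop from the initial row count. A minimal sketch, using only the figures printed above:

```python
# Figures copied from the Part 1 evaluation output above.
initial_rows = 1522
duplicate_rows = 19
rows_with_nulls = 4

# Each cleaning step removes exactly the rows it reported.
after_dedup = initial_rows - duplicate_rows
after_dropna = after_dedup - rows_with_nulls

print(after_dedup)   # 1503
print(after_dropna)  # 1499
```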


## 2. Create a Machine Learning Pipeline

**Load the Dataset from the Saved File**

In [19]:
# Load the cleaned data into a dataframe
df = spark.read.parquet('NASA_airfoil_noise_cleaned.parquet')

In [20]:
# Check the first five rows
df.show(5)

+---------+-------------+-----------+------------------+-----------------------+------------------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevelDecibels|
+---------+-------------+-----------+------------------+-----------------------+------------------+
|     4000|          3.0|     0.3048|              31.7|             0.00529514|           115.608|
|     3150|          2.0|     0.2286|              31.7|             0.00372371|           121.527|
|     2000|          7.3|     0.2286|              31.7|              0.0132672|           115.309|
|     2000|          5.4|     0.1524|              71.3|             0.00401199|           131.111|
|      500|          9.9|     0.1524|              71.3|              0.0193001|           131.279|
+---------+-------------+-----------+------------------+-----------------------+------------------+
only showing top 5 rows



                                                                                

In [21]:
# Find the total number of rows
rowcount4 = df.count()
print(rowcount4)

1499


**Define the Pipeline Stages**

Stage 1 - Assemble the input columns into a single column 'features'. Use all the columns except SoundLevelDecibels as input features.

In [22]:
# Define the VectorAssembler pipeline stage
inputCols = [c for c in df.columns if c != 'SoundLevelDecibels']
assembler = VectorAssembler(inputCols=inputCols, outputCol='features')

Stage 2 - Scale the 'features' column using StandardScaler and store the result in the 'scaledFeatures' column.

In [23]:
# Define the StandardScaler pipeline stage
scaler = StandardScaler(inputCol='features', outputCol='scaledFeatures')
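
With its default settings (`withStd=True`, `withMean=False`), Spark's `StandardScaler` divides each feature by its sample standard deviation without centering it. The sketch below reproduces that calculation in plain Python for one hypothetical column of values. The helper `scale_to_unit_std` is made up for illustration and is not part of PySpark.

```python
import statistics

def scale_to_unit_std(values):
    """Divide each value by the column's sample standard deviation (no centering)."""
    std = statistics.stdev(values)  # corrected (n - 1) sample standard deviation
    return [v / std for v in values]

# Hypothetical free-stream velocities, used only to illustrate the scaling.
velocities = [31.7, 39.6, 55.5, 71.3]
scaled = scale_to_unit_std(velocities)

# After scaling, the column has unit standard deviation but a non-zero mean.
print(statistics.stdev(scaled))
print(statistics.mean(scaled))
```

Because the mean is not removed, the scaled values stay positive here. Only their spread is normalized.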

Stage 3 - Create a LinearRegression model to predict the label 'SoundLevelDecibels'.

In [24]:
# Define the LinearRegression stage
lr = LinearRegression(featuresCol='scaledFeatures', labelCol='SoundLevelDecibels')

**Build the pipeline**

Build a pipeline using the three stages defined above.


In [25]:
# Build the pipeline
pipeline = Pipeline(stages=[assembler, scaler, lr])

**Train/Test Split**

In [26]:
# Split the data into training and testing sets in a ratio of 70:30
(trainingData, testingData) = df.randomSplit([0.7, 0.3], seed=42)
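
`randomSplit` treats the weights as sampling probabilities rather than exact quotas, so the resulting sets are only approximately 70% and 30% of the rows. The following sketch imitates that behaviour in plain Python by drawing one random number per row. It is a simplified illustration, not Spark's actual implementation, and the function name `random_split` is hypothetical.

```python
import random

def random_split(rows, train_fraction, seed):
    """Assign each row to train or test independently with the given probability."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(1499))  # stand-in for the 1499 cleaned rows
train, test = random_split(rows, train_fraction=0.7, seed=42)

# Every row lands in exactly one set; the sizes are close to, not exactly, 70:30.
print(len(train), len(test))
```

Fixing the seed makes the split repeatable, which is why the notebook passes `seed=42`.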

**Model Training**

In [27]:
# Fit the pipeline with the training data
pipelineModel = pipeline.fit(trainingData)

24/02/12 09:21:51 WARN util.Instrumentation: [560aecd1] regParam is zero, which might cause numerical instability and overfitting.
24/02/12 09:21:51 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
24/02/12 09:21:51 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
24/02/12 09:21:51 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
24/02/12 09:21:51 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK


**Part 2 Evaluation**

In [28]:
# Part 2 evaluation
print('Part 2 - Evaluation\n')

# Total rows of the loaded dataset
print('Total rows:', rowcount4, '\n')

# Total pipeline stages
ps = [str(x).split("_")[0] for x in pipeline.getStages()]
print('Pipeline Stage 1:', ps[0])
print('Pipeline Stage 2:', ps[1])
print('Pipeline Stage 3:', ps[2], '\n')

# Features and label
print('Feature columns:', pipelineModel.stages[0].getInputCols())
print('Label column:', lr.getLabelCol())

Part 2 - Evaluation

Total rows: 1499 

Pipeline Stage 1: VectorAssembler
Pipeline Stage 2: StandardScaler
Pipeline Stage 3: LinearRegression 

Feature columns: ['Frequency', 'AngleOfAttack', 'ChordLength', 'FreeStreamVelocity', 'SuctionSideDisplacement']
Label column: SoundLevelDecibels
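
The stage names above are extracted with `str(x).split("_")[0]`, which relies on each stage's string form being its uid: the class name, an underscore, and a random suffix. The sketch below applies the same split to hypothetical uid strings (the suffixes are invented) to show what the expression returns.

```python
# Hypothetical uids in the "<ClassName>_<random suffix>" form; the suffixes are made up.
stage_uids = [
    "VectorAssembler_3f2a9c1d7b4e",
    "StandardScaler_8b1e0d6c2a5f",
    "LinearRegression_c7d4e9a0b312",
]

# Keep only the text before the first underscore, i.e. the class name.
stage_names = [uid.split("_")[0] for uid in stage_uids]

print(stage_names)  # ['VectorAssembler', 'StandardScaler', 'LinearRegression']
```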


## 3. Evaluate the Model

**Prediction**

In [29]:
# Make predictions on testing data
predictions = pipelineModel.transform(testingData)

**Evaluation of the Model**

MSE

In [30]:
# Calculate the MSE
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='SoundLevelDecibels', metricName='mse')
mse = evaluator.evaluate(predictions)
print('MSE:', mse)

MSE: 21.69767261083989


MAE

In [31]:
# Calculate the MAE
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='SoundLevelDecibels', metricName='mae')
mae = evaluator.evaluate(predictions)
print('MAE:', mae)

MAE: 3.674707483470427


R-squared (R2)

In [32]:
# Calculate the R-squared score
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='SoundLevelDecibels', metricName='r2')
r2 = evaluator.evaluate(predictions)
print('R-squared:', r2)

R-squared: 0.5382661885816811
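
The three metrics reported by `RegressionEvaluator` have short closed-form definitions. As a rough illustration, the sketch below computes them in plain Python. The label and prediction values are hypothetical round numbers, not rows from the dataset, and the helper `regression_metrics` is not part of PySpark.

```python
import math

def regression_metrics(labels, predictions):
    """Return (MSE, MAE, R-squared) for paired labels and predictions."""
    n = len(labels)
    errors = [y - p for y, p in zip(labels, predictions)]
    ss_error = math.fsum(e * e for e in errors)      # sum of squared errors
    mse = ss_error / n
    mae = math.fsum(abs(e) for e in errors) / n
    mean_label = math.fsum(labels) / n
    ss_total = math.fsum((y - mean_label) ** 2 for y in labels)
    r2 = 1 - ss_error / ss_total
    return mse, mae, r2

# Hypothetical sound levels in decibels, chosen to keep the arithmetic easy.
labels = [120.0, 125.0, 130.0, 135.0]
predictions = [122.0, 124.0, 131.0, 132.0]

mse, mae, r2 = regression_metrics(labels, predictions)
print(mse, mae, round(r2, 4))  # 3.75 1.75 0.88
```

MSE is in squared decibels, while MAE is in decibels, so MAE is usually the easier of the two to interpret directly.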


**Part 3 Evaluation**

In [33]:
# Part 3 evaluation
print('Part 3 - Evaluation\n')

# Total stages
totalstages = len(pipelineModel.stages)
print('Number of stages in the pipeline:', totalstages, '\n')

# Coefficients and intercept of the model
lrModel = pipelineModel.stages[-1]
inputcolumns = pipelineModel.stages[0].getInputCols()
for name, coefficient in zip(inputcolumns, lrModel.coefficients):
    print(f'Coefficient of {name} is {coefficient:.4f}')
print(f'Intercept: {lrModel.intercept:.2f}\n')

# Model Evaluation
print(f'Mean Squared Error: {mse:.2f}')
print(f'Mean Absolute Error: {mae:.2f}')
print(f'R-squared: {r2:.2f}\n')

Part 3 - Evaluation

Number of stages in the pipeline: 3 

Coefficient of Frequency is -4.0307
Coefficient of AngleOfAttack is -2.1385
Coefficient of ChordLength is -3.1762
Coefficient of FreeStreamVelocity is 1.6308
Coefficient of SuctionSideDisplacement is -2.0552
Intercept: 132.23

Mean Squared Error: 21.70
Mean Absolute Error: 3.67
R-squared: 0.54
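
Because the regression stage reads the `scaledFeatures` column, the coefficients above apply to the scaled feature values, not to the raw measurements. The sketch below shows how a single prediction is formed under that assumption. The coefficients and intercept are the rounded values printed above, while the scaled feature vector is hypothetical.

```python
# Rounded coefficients and intercept copied from the Part 3 output above.
coefficients = [-4.0307, -2.1385, -3.1762, 1.6308, -2.0552]
intercept = 132.23

# Hypothetical scaled feature vector (each raw value divided by its column's standard deviation).
scaled_features = [0.5, 1.0, 2.0, 3.0, 0.4]

# A linear model predicts intercept + dot(coefficients, features).
prediction = intercept
for weight, value in zip(coefficients, scaled_features):
    prediction += weight * value

print(round(prediction, 2))
```

Since every feature is expressed in units of its own standard deviation, the coefficient magnitudes can be compared with each other more directly than coefficients fitted on raw columns.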



## 4. Persist the Model

**Save the Model**

In [34]:
# Save the pipeline model
pipelineModel.write().overwrite().save('final_project')

                                                                                

In [35]:
# Check if the model is saved
print('final_project exists:', os.path.isdir('final_project'))

final_project exists: True


**Load the Model**

In [36]:
# Load the pipeline model
loadedPipelineModel = PipelineModel.load('final_project')

                                                                                

**Make predictions with the Loaded Model on the Testdata**

In [37]:
# Use the loaded pipeline model and make predictions using testingData
loaded_predictions = loadedPipelineModel.transform(testingData)

**Compare the Predictions from the Loaded Model and the Original Model**

In [38]:
# Show the top 5 rows of the predictions from the loaded model
print('Predictions from the loaded model')
loaded_predictions.select('SoundLevelDecibels', 'prediction').show(5)

# Show the top 5 rows of predictions from the original model
print('\nPredictions from the original model')
predictions.select('SoundLevelDecibels', 'prediction').show(5)

Predictions from the loaded model
+------------------+------------------+
|SoundLevelDecibels|        prediction|
+------------------+------------------+
|           118.129| 125.1422875397131|
|           128.679|122.70503286089041|
|           130.989|123.66788978786391|
|           134.319|127.56113462661494|
|           124.987|120.83603611988016|
+------------------+------------------+
only showing top 5 rows


Predictions from the original model
+------------------+------------------+
|SoundLevelDecibels|        prediction|
+------------------+------------------+
|           118.129| 125.1422875397131|
|           128.679|122.70503286089041|
|           130.989|123.66788978786391|
|           134.319|127.56113462661494|
|           124.987|120.83603611988016|
+------------------+------------------+
only showing top 5 rows
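
Comparing the two tables by eye works for five rows, but a programmatic check scales better. The sketch below compares the five prediction values shown above using `math.isclose`. In the notebook itself the values would come from the two DataFrames, a step that is left out here.

```python
import math

# Prediction values copied from the two tables above.
loaded = [125.1422875397131, 122.70503286089041, 123.66788978786391,
          127.56113462661494, 120.83603611988016]
original = [125.1422875397131, 122.70503286089041, 123.66788978786391,
            127.56113462661494, 120.83603611988016]

# True only if every pair of predictions agrees within a tight tolerance.
predictions_match = len(loaded) == len(original) and all(
    math.isclose(a, b, rel_tol=1e-9) for a, b in zip(loaded, original)
)
print(predictions_match)  # True
```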



**Part 4 Evaluation**

In [39]:
# Part 4 evaluation
print('Part 4 - Evaluation\n')

# Total stages
totalstages = len(loadedPipelineModel.stages)
print('Number of stages in the pipeline:', totalstages, '\n')

# Coefficients and intercept of the model
loadedmodel = loadedPipelineModel.stages[-1]
inputcolumns = loadedPipelineModel.stages[0].getInputCols()
for name, coefficient in zip(inputcolumns, loadedmodel.coefficients):
    print(f'Coefficient of {name} is {coefficient:.4f}')
print(f'Intercept: {loadedmodel.intercept:.2f}')

Part 4 - Evaluation

Number of stages in the pipeline: 3 

Coefficient of Frequency is -4.0307
Coefficient of AngleOfAttack is -2.1385
Coefficient of ChordLength is -3.1762
Coefficient of FreeStreamVelocity is 1.6308
Coefficient of SuctionSideDisplacement is -2.0552
Intercept: 132.23


**Stop Spark Session**

In [40]:
# Stop the spark session to free up the memory
spark.stop()

## Authors


[Ramesh Sannareddy](https://www.linkedin.com/in/rsannareddy/?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMBD0231ENSkillsNetwork866-2023-01-01)



## Change Log


|Date (YYYY-MM-DD)|Version|Changed By|Change Description|
|-|-|-|-|
|2023-05-26|0.1|Ramesh Sannareddy|Initial Version Created|


Copyright © 2023 IBM Corporation. All rights reserved.
