# ML Pipeline for Airfoil noise prediction


## Scenario


You are a data engineer at an aeronautics consulting company that prides itself on efficiently designing airfoils for planes and sports cars. The data scientists in your office work with different algorithms and with data in different formats. They are good at machine learning, but they count on you to run ETL jobs and build ML pipelines.

In this project you will use a modified version of the NASA Airfoil Self Noise dataset. You will clean the dataset by dropping duplicate rows and removing rows with null values, then build an ML pipeline that trains a model to predict SoundLevel from all the other columns. Finally, you will evaluate the model and persist it.



## Objectives

In this four-part assignment you will:

- Part 1 Perform ETL activity
  - Load a csv dataset
  - Remove duplicates if any
  - Drop rows with null values if any
  - Make transformations
  - Store the cleaned data in parquet format
- Part 2 Create a Machine Learning Pipeline
  - Create a machine learning pipeline for prediction
- Part 3 Evaluate the Model
  - Evaluate the model using relevant metrics
- Part 4 Persist the Model 
  - Save the model for future production use
  - Load and verify the stored model


Diagram of an airfoil (for informational purposes only).


![Airfoil with flow](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/images/Airfoil_with_flow.png)


Diagram showing the angle of attack (for informational purposes only).


![Airfoil angle of attack](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/images/Airfoil_angle_of_attack.jpg)


## Setup


For this lab, we will be using the following libraries:

*   [`PySpark`](https://spark.apache.org/docs/latest/api/python/index.html?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMSkillsNetworkBD0231ENCoursera2789-2023-01-01) for connecting to the Spark Cluster


### Installing Required Libraries

We need libraries such as `pyspark` and `findspark` to connect to the Spark cluster.


In [1]:
!pip install pyspark==3.1.2 -q
!pip install findspark -q

### Importing Required Libraries

In [2]:
# You can also use this section to suppress warnings generated by your code:
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

# FindSpark simplifies the process of using Apache Spark with Python

import findspark
findspark.init()

## Part 1 - Perform ETL activity


### Task 1 - Import required libraries


In [3]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator

### Task 2 - Create a spark session


In [4]:
#Create a SparkSession

spark = SparkSession.builder.appName("Airfoil noise prediction").getOrCreate()

24/03/01 12:21:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### Task 3 - Load the csv file into a dataframe


Load the dataset into a Spark DataFrame.

In [5]:
df = spark.read.csv("/kaggle/input/nasa-airfoil-noise-raw/NASA_airfoil_noise_raw.csv", header=True, inferSchema=True)


                                                                                

### Task 4 - Print top 5 rows of the dataset


In [6]:
df.show(5)

+---------+-------------+-----------+------------------+-----------------------+----------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevel|
+---------+-------------+-----------+------------------+-----------------------+----------+
|      800|          0.0|     0.3048|              71.3|             0.00266337|   126.201|
|     1000|          0.0|     0.3048|              71.3|             0.00266337|   125.201|
|     1250|          0.0|     0.3048|              71.3|             0.00266337|   125.951|
|     1600|          0.0|     0.3048|              71.3|             0.00266337|   127.591|
|     2000|          0.0|     0.3048|              71.3|             0.00266337|   127.461|
+---------+-------------+-----------+------------------+-----------------------+----------+
only showing top 5 rows



### Task 6 - Print the total number of rows in the dataset


In [7]:
rowcount1 = df.count()
print(rowcount1)

1522


### Task 7 - Drop all the duplicate rows from the dataset


In [8]:
df = df.dropDuplicates()

### Task 8 - Print the total number of rows in the dataset


In [9]:
rowcount2 = df.count()
print(rowcount2)




1503


                                                                                

### Task 9 - Drop all the rows that contain null values from the dataset


In [10]:
df = df.dropna()

### Task 10 - Print the total number of rows in the dataset


In [11]:
rowcount3 = df.count()
print(rowcount3)



1499
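The three row counts printed so far show how much each cleaning step removed. The following is a small sketch of that arithmetic in plain Python; the counts are copied from the cell outputs above, and the derived variable names are illustrative rather than part of the assignment.

```python
# Row counts copied from the notebook outputs above.
rowcount1 = 1522  # rows in the raw CSV
rowcount2 = 1503  # rows after dropDuplicates()
rowcount3 = 1499  # rows after dropna()

# Illustrative names (not defined in the notebook).
duplicates_removed = rowcount1 - rowcount2
null_rows_removed = rowcount2 - rowcount3
retained_fraction = rowcount3 / rowcount1

print(f"Duplicate rows removed: {duplicates_removed}")
print(f"Rows with nulls removed: {null_rows_removed}")
print(f"Share of raw rows retained: {retained_fraction:.1%}")
```

In the notebook itself, the same expressions can be run directly on `rowcount1`, `rowcount2`, and `rowcount3` without hard-coding the values.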


                                                                                

### Task 11 - Rename the column "SoundLevel" to "SoundLevelDecibels"


In [12]:
df = df.withColumnRenamed("SoundLevel","SoundLevelDecibels")

### Task 12 - Save the dataframe in parquet format, name the file as "NASA_airfoil_noise_cleaned.parquet"


In [13]:
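# Overwrite any existing output so the cell can be re-run without a "path already exists" error
df.write.mode("overwrite").parquet("NASA_airfoil_noise_cleaned.parquet")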

                                                                                

## Part 2 - Create a Machine Learning Pipeline


### Task 1 - Load data from "NASA_airfoil_noise_cleaned.parquet" into a dataframe


In [14]:
df = spark.read.parquet("NASA_airfoil_noise_cleaned.parquet")

### Task 2 - Print the total number of rows in the dataset


In [15]:
rowcount4 = df.count()
print(rowcount4)

1499


                                                                                

### Task 3 - Define the VectorAssembler pipeline stage


Stage 1 - Assemble the input columns into a single column "features". Use all the columns except SoundLevelDecibels as input features.


In [16]:
assembler = VectorAssembler(inputCols=["Frequency","AngleOfAttack","ChordLength","FreeStreamVelocity","SuctionSideDisplacement"], outputCol="features")
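The instruction "use all the columns except SoundLevelDecibels" can also be expressed programmatically instead of typing each name. The sketch below uses a hard-coded column list taken from the `df.show(5)` output (with the label already renamed); in the notebook, `df.columns` would supply the same list. The names `all_columns`, `label_col`, and `feature_cols` are illustrative.

```python
# Column names as shown by df.show(5), with SoundLevel renamed in Part 1.
# In the notebook this list could come from df.columns instead.
all_columns = [
    "Frequency",
    "AngleOfAttack",
    "ChordLength",
    "FreeStreamVelocity",
    "SuctionSideDisplacement",
    "SoundLevelDecibels",
]

label_col = "SoundLevelDecibels"

# Every column except the label becomes an input feature.
feature_cols = [c for c in all_columns if c != label_col]
print(feature_cols)
```

A list built this way could then be passed as `inputCols` to `VectorAssembler`, which keeps the feature list in sync if columns are added or renamed later.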

### Task 4 - Define the StandardScaler pipeline stage


Stage 2 - Scale the "features" using standard scaler and store in "scaledFeatures" column


In [17]:
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)
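With `withMean=True` and `withStd=True`, each feature is centered on its mean and divided by its standard deviation. The sketch below reproduces that idea for a single column in plain Python, assuming the sample (n - 1) standard deviation; the input values are the five `Frequency` entries shown earlier, used purely as illustration, and `standardize` is a hypothetical helper, not a Spark function.

```python
from statistics import mean, stdev


def standardize(values):
    """Center values on their mean and scale them to unit sample standard deviation."""
    mu = mean(values)
    sigma = stdev(values)  # sample standard deviation (divides by n - 1)
    return [(v - mu) / sigma for v in values]


# The first five Frequency values from df.show(5), for illustration only.
frequencies = [800.0, 1000.0, 1250.0, 1600.0, 2000.0]
scaled = standardize(frequencies)

print([round(v, 3) for v in scaled])
print("mean after scaling:", round(mean(scaled), 6))
print("std after scaling:", round(stdev(scaled), 6))
```

Note that this helper would fail on a constant column, whose standard deviation is zero. In the pipeline, the scaler learns its statistics from the training data only, because it is fitted as part of `pipeline.fit(trainingData)`.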

### Task 5 - Define the LinearRegression pipeline stage


Stage 3 - Create a LinearRegression stage to predict "SoundLevelDecibels"

**Note: You need to use the "scaledFeatures" column produced in the previous step.**


In [18]:
lr = LinearRegression(featuresCol='scaledFeatures', labelCol='SoundLevelDecibels', predictionCol='prediction')

### Task 6 - Build the pipeline


Build a pipeline using the above three stages


In [19]:
pipeline = Pipeline(stages=[assembler, scaler, lr])

### Task 7 - Split the data


In [20]:
# Split the data into training and testing sets with 70:30 split.
# set the value of seed to 42
# the above step is very important. DO NOT set the value of seed to any other value other than 42.

(trainingData, testingData) = df.randomSplit([0.7, 0.3], seed=42)
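`randomSplit` assigns rows to splits by random sampling, so the resulting sizes are only approximately 70:30. As a rough sanity check, the expected sizes can be estimated from the row count and the weights. This is a plain-Python sketch; `total_rows` is copied from the output above and the other names are illustrative.

```python
total_rows = 1499          # row count printed after loading the parquet file
weights = [0.7, 0.3]       # same weights passed to randomSplit

# Normalize the weights in case they do not sum to exactly 1.
weight_sum = sum(weights)
expected_sizes = [total_rows * w / weight_sum for w in weights]

print(f"Expected training rows: about {expected_sizes[0]:.0f}")
print(f"Expected testing rows:  about {expected_sizes[1]:.0f}")
```

In the notebook, `trainingData.count()` and `testingData.count()` give the actual sizes, which should be close to these estimates but need not match them exactly.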


### Task 8 - Fit the pipeline


In [21]:
# Fit the pipeline using the training data

pipelineModel = pipeline.fit(trainingData)


24/03/01 12:21:35 WARN Instrumentation: [b11d2128] regParam is zero, which might cause numerical instability and overfitting.
24/03/01 12:21:36 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
24/03/01 12:21:36 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
24/03/01 12:21:36 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
24/03/01 12:21:36 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
                                                                                

## Part 3 - Evaluate the Model


### Task 1 - Predict using the model


In [22]:
# Make predictions on testing data
predictions = pipelineModel.transform(testingData)


### Task 2 - Print the MSE


In [23]:
evaluator = RegressionEvaluator(labelCol="SoundLevelDecibels", predictionCol="prediction", metricName="mse")
mse = evaluator.evaluate(predictions)

print("Mean Squared Error = ", round(mse,2))

Mean Squared Error =  25.01


### Task 3 - Print the MAE


In [24]:
evaluator = RegressionEvaluator(labelCol="SoundLevelDecibels", predictionCol="prediction", metricName="mae")
mae = evaluator.evaluate(predictions)

print("Mean Absolute Error =", round(mae, 2))


Mean Absolute Error = 3.87


### Task 4 - Print the R-Squared(R2)


In [25]:
evaluator = RegressionEvaluator(labelCol="SoundLevelDecibels", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)

print("R Squared =", round(r2, 2))


R Squared = 0.51
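To make the three metrics concrete, the sketch below computes MSE, MAE, and R-squared by hand using their standard definitions. The labels and predictions are toy values chosen for easy arithmetic, not data from this notebook, and `regression_metrics` is a hypothetical helper. The last lines convert the reported MSE of 25.01 into a root mean squared error, which is in the same units as the label (decibels).

```python
import math


def regression_metrics(labels, predictions):
    """Return (mse, mae, r2) computed from paired labels and predictions."""
    n = len(labels)
    errors = [y - p for y, p in zip(labels, predictions)]
    ss_res = sum(e * e for e in errors)               # residual sum of squares
    mean_label = sum(labels) / n
    ss_tot = sum((y - mean_label) ** 2 for y in labels)  # total sum of squares
    mse = ss_res / n
    mae = sum(abs(e) for e in errors) / n
    r2 = 1 - ss_res / ss_tot
    return mse, mae, r2


# Toy values for illustration only (not taken from the airfoil dataset).
toy_labels = [1.0, 2.0, 3.0]
toy_predictions = [1.0, 2.0, 4.0]
mse, mae, r2 = regression_metrics(toy_labels, toy_predictions)
print(f"MSE={mse:.3f}  MAE={mae:.3f}  R2={r2:.3f}")

# RMSE derived from the MSE reported above (25.01).
rmse = math.sqrt(25.01)
print(f"RMSE = {rmse:.2f}")
```

Read this way, an MSE of 25.01 corresponds to a typical error of roughly 5 dB, and an R-squared of 0.51 means the linear model explains about half of the variance in the test labels.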


## Part 4 - Persist the Model


### Task 1 - Save the model to the path "ML_model"


In [26]:
# Save the pipeline model as "ML_model"
model_path = "ML_model"
pipelineModel.write().overwrite().save(model_path)

### Task 2 - Load the model from the path "ML_model"


In [27]:
# Load the pipeline model you have created in the previous step
loadedPipelineModel = PipelineModel.load(model_path)


                                                                                

### Task 3 - Make predictions using the loaded model on the test data


In [28]:
# Use the loaded pipeline model and make predictions using testingData
loadedPredictions = loadedPipelineModel.transform(testingData)
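One way to verify the stored model is to check that it reproduces the predictions of the in-memory model. The sketch below shows the comparison step in plain Python, assuming both sets of predictions have already been collected into lists in the same row order. `predictions_match` is a hypothetical helper, and the sample numbers are copied from the prediction output shown in this notebook.

```python
import math


def predictions_match(original, reloaded, rel_tol=1e-9):
    """Return True if both lists have equal length and element-wise close values."""
    if len(original) != len(reloaded):
        return False
    return all(math.isclose(a, b, rel_tol=rel_tol) for a, b in zip(original, reloaded))


# In the notebook, lists like these could be built with, for example:
#   [row["prediction"] for row in predictions.select("prediction").collect()]
# Here we use three values from the displayed output as stand-ins.
original_preds = [121.03522307921861, 122.44608224351214, 127.63255285986497]
reloaded_preds = list(original_preds)

print(predictions_match(original_preds, reloaded_preds))

# A changed value is detected as a mismatch.
altered_preds = [121.03522307921861, 122.44608224351214, 127.0]
print(predictions_match(original_preds, altered_preds))
```

A tolerance-based comparison is used instead of strict equality so that tiny floating-point differences would not be reported as a mismatch.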


### Task 4 - Show the predictions


In [29]:
# Show the top 5 rows of the predictions dataframe, displaying only the label and prediction columns
loadedPredictions.select("SoundLevelDecibels", "prediction").show(5, truncate=False)

+------------------+------------------+
|SoundLevelDecibels|prediction        |
+------------------+------------------+
|128.545           |121.03522307921861|
|130.898           |122.44608224351214|
|109.951           |127.63255285986497|
|112.506           |129.25004941528468|
|130.089           |122.16654986012726|
+------------------+------------------+
only showing top 5 rows



### Stop Spark Session


In [30]:
spark.stop()