## Build a Spark ML Pipeline for Airfoil noise prediction


# Apache Spark Final Project

This project was created as part of the IBM Machine Learning with Apache Spark course and demonstrates Spark-based ETL, data analysis, and ML pipeline training using PySpark.

All code is original, developed for learning and demonstration purposes only.

## Scenario


You are a data engineer at an aeronautics consulting company that prides itself on efficiently designing airfoils for planes and sports cars. The data scientists in your office work with different algorithms and with data in different formats. They are good at machine learning, but they count on you to run ETL jobs and build ML pipelines. In this project you will use a modified version of the NASA Airfoil Self-Noise dataset. You will clean the dataset by dropping duplicate rows and removing rows with null values. You will then build an ML pipeline that trains a model to predict SoundLevel from all the other columns. Finally, you will evaluate the model and persist it.



## Datasets

This lab uses the following dataset:

 - The original NASA Airfoil Self-Noise dataset: https://archive.ics.uci.edu/dataset/291/airfoil+self+noise
 
 - The dataset is licensed under a Creative Commons Attribution 4.0 International (CC BY 4.0) license.


Diagram of an airfoil (for informational purposes only).


![Airfoil with flow](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/images/Airfoil_with_flow.png)


Diagram showing the angle of attack (for informational purposes only).


![Airfoil angle of attack](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/images/Airfoil_angle_of_attack.jpg)


In [None]:
%pip install pyspark==3.1.2 -q
%pip install findspark -q

### Importing Required Libraries


In [None]:
# You can also use this section to suppress warnings generated by your code:
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

# FindSpark simplifies the process of using Apache Spark with Python

import findspark
findspark.init()

## Part 1 - Perform ETL activity


### Task 1 - Import required libraries


In [None]:
# Spark session, feature transformers, regression model, pipeline, and evaluator
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

### Task 2 - Create a Spark session


In [None]:
#Create a SparkSession

spark = SparkSession.builder.appName("Final Project").getOrCreate()

### Task 3 - Load the CSV file into a dataframe


Download the data file.

NOTE: Please ensure you use the dataset below, not the original dataset mentioned above.


In [None]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv


Load the dataset into a Spark dataframe.


In [None]:
# Load the dataset that you have downloaded in the previous task

df = spark.read.csv("NASA_airfoil_noise_raw.csv", header=True, inferSchema=True)


### Task 4 - Print top 5 rows of the dataset


In [None]:
#your code goes here
df.show(5)

### Task 6 - Print the total number of rows in the dataset


In [None]:
#your code goes here
rowcount1 = df.count()
print(rowcount1)

### Task 7 - Drop all the duplicate rows from the dataset


In [None]:
df = df.dropDuplicates()


### Task 8 - Print the total number of rows in the dataset


In [None]:
#your code goes here

rowcount2 = df.count()
print(rowcount2)


### Task 9 - Drop all the rows that contain null values from the dataset


In [None]:
df = df.dropna()


### Task 10 - Print the total number of rows in the dataset


In [None]:
#your code goes here

rowcount3 = df.count()
print(rowcount3)
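
To see how the three row counts relate, the sketch below mimics the two cleaning steps in plain Python on a handful of made-up rows. The rows and the helper names `drop_duplicates` and `drop_nulls` are illustrative assumptions, not part of the dataset or of the Spark API. Unlike Spark, the sketch also preserves row order.

```python
# Hypothetical rows (not taken from the real dataset):
# (Frequency, AngleOfAttack, SoundLevel)
rows = [
    (800, 0.0, 126.2),
    (800, 0.0, 126.2),   # exact duplicate of the first row
    (1000, 0.0, None),   # missing SoundLevel
    (1250, 1.5, 125.9),
    (None, 1.5, 127.5),  # missing Frequency
]


def drop_duplicates(rows):
    """Keep one copy of each fully identical row (first-seen order)."""
    return list(dict.fromkeys(rows))


def drop_nulls(rows):
    """Keep only the rows in which no field is None."""
    return [row for row in rows if all(value is not None for value in row)]


deduplicated = drop_duplicates(rows)
cleaned = drop_nulls(deduplicated)

# Analogous to rowcount1, rowcount2, and rowcount3 in the cells above
print(len(rows), len(deduplicated), len(cleaned))
```

Each count can only stay the same or shrink from one step to the next, which is a quick sanity check for the values printed above.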


### Task 11 - Rename the column "SoundLevel" to "SoundLevelDecibels"


In [None]:
# your code goes here

df = df.withColumnRenamed("SoundLevel", "SoundLevelDecibels")


### Task 12 - Save the dataframe in parquet format, name the file as "NASA_airfoil_noise_cleaned.parquet"


In [None]:
# your code goes here
df.write.mode("overwrite").parquet("NASA_airfoil_noise_cleaned.parquet")


## Part 2 - Create a Machine Learning Pipeline


### Task 1 - Load data from "NASA_airfoil_noise_cleaned.parquet" into a dataframe


In [None]:
#your code goes here

df = spark.read.parquet("NASA_airfoil_noise_cleaned.parquet")


### Task 2 - Print the total number of rows in the dataset


In [None]:
#your code goes here

rowcount4 = df.count()
print(rowcount4)



### Task 3 - Define the VectorAssembler pipeline stage


In [None]:
# Assemble every column except the label (SoundLevelDecibels) into a single feature vector
assembler = VectorAssembler(inputCols=["Frequency", "AngleOfAttack", "ChordLength", "FreeStreamVelocity", "SuctionSideDisplacement"], outputCol="features")

### Task 4 - Define the StandardScaler pipeline stage


In [None]:
#your code goes here

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
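
With its default settings (`withStd=True`, `withMean=False`), `StandardScaler` divides each feature by its sample standard deviation and does not subtract the mean. The plain-Python sketch below shows that arithmetic for a single feature column. The helper name `scale_to_unit_std` and the sample values are hypothetical, and mapping a constant column to zeros is a choice made for this sketch.

```python
import statistics


def scale_to_unit_std(column):
    """Divide each value by the sample standard deviation (n - 1 denominator).

    No centering is applied, mirroring withMean=False.
    """
    std = statistics.stdev(column)
    if std == 0:
        # Sketch assumption: a constant column carries no signal, so return zeros.
        return [0.0 for _ in column]
    return [value / std for value in column]


# Hypothetical feature values, not taken from the dataset
chord_length = [2.0, 4.0, 6.0]
scaled = scale_to_unit_std(chord_length)

print(scaled)
print(statistics.stdev(scaled))  # the scaled column has unit standard deviation
```

Because the mean is not subtracted, the scaled values keep their original sign and relative spacing; only the spread changes.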


### Task 5 - Define the Model creation pipeline stage


In [None]:
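# lr: ordinary least squares on the scaled features (regParam defaults to 0.0)

lr = LinearRegression(featuresCol="scaledFeatures", labelCol="SoundLevelDecibels")
# lr2: ridge regression (L2 penalty only, because elasticNetParam=0.0).
# Note that lr2 reads the unscaled "features" column, not "scaledFeatures".
lr2 = LinearRegression(featuresCol="features", labelCol="SoundLevelDecibels",
                       regParam=1.0, elasticNetParam=0.0)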
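
The two parameters `regParam` and `elasticNetParam` control the penalty that Spark adds to the least-squares loss: `regParam` sets its strength, and `elasticNetParam` mixes an L1 term (value 1.0) with an L2 term (value 0.0). The sketch below computes only that penalty term in plain Python. The function name `elastic_net_penalty` and the weights are hypothetical, and solver details such as internal standardization are left out.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """Return reg_param * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2)."""
    l1_norm = sum(abs(w) for w in weights)
    l2_norm_squared = sum(w * w for w in weights)
    return reg_param * (
        elastic_net_param * l1_norm
        + (1.0 - elastic_net_param) * 0.5 * l2_norm_squared
    )


# Hypothetical coefficient vector, for illustration only
weights = [3.0, -4.0]

no_penalty = elastic_net_penalty(weights, reg_param=0.0, elastic_net_param=0.0)
ridge_penalty = elastic_net_penalty(weights, reg_param=1.0, elastic_net_param=0.0)
lasso_penalty = elastic_net_penalty(weights, reg_param=1.0, elastic_net_param=1.0)

print(no_penalty, ridge_penalty, lasso_penalty)
```

With `regParam=0.0` the penalty vanishes, which is why the first model is plain linear regression, while `regParam=1.0` with `elasticNetParam=0.0` gives the ridge model.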

### Task 6 - Build the pipeline


In [None]:
#your code goes here

pipeline = Pipeline(stages=[assembler, scaler, lr])
pipeline2 = Pipeline(stages=[assembler, scaler, lr2])


### Task 7 - Split the data


In [None]:
(trainingData, testingData) = df.randomSplit([0.7, 0.3], seed=42)
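
`randomSplit` assigns rows at random, so the 70/30 weights are hit only approximately, and the seed makes the assignment repeatable for the same input. The plain-Python analogy below illustrates both properties. It is not Spark's implementation; the helper `random_split` and the 1000 placeholder rows are assumptions made for this sketch.

```python
import random


def random_split(rows, train_weight, seed):
    """Send each row to the training set with probability train_weight."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        if rng.random() < train_weight:
            train.append(row)
        else:
            test.append(row)
    return train, test


rows = list(range(1000))  # placeholder rows
train, test = random_split(rows, 0.7, seed=42)
train_again, test_again = random_split(rows, 0.7, seed=42)

print(len(train), len(test))   # close to 700 and 300, but rarely exact
print(train == train_again)    # the same seed reproduces the same split
```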

### Task 8 - Fit the pipeline


In [None]:
pipelineModel = pipeline.fit(trainingData)
pipelineModel2 = pipeline2.fit(trainingData)

## Part 3 - Evaluate the Model


### Task 1 - Predict using the model


In [None]:
predictions = pipelineModel.transform(testingData)
predictions2 = pipelineModel2.transform(testingData)


### Task 2 - Print the MSE


In [None]:
#your code goes here

evaluator = RegressionEvaluator(labelCol="SoundLevelDecibels", predictionCol="prediction", metricName="mse")
mse = evaluator.evaluate(predictions)
mse2 = evaluator.evaluate(predictions2)
print(f'MSE with Linear Regression: {mse}, MSE with Ridge Regression: {mse2}')


### Task 3 - Print the MAE


In [None]:
#your code goes here

evaluator = RegressionEvaluator(labelCol="SoundLevelDecibels", predictionCol="prediction", metricName="mae")
mae = evaluator.evaluate(predictions)
mae2 = evaluator.evaluate(predictions2)
print(f'MAE with Linear Regression: {mae}, MAE with Ridge Regression: {mae2}')


### Task 4 - Print the R-Squared(R2)


In [None]:
#your code goes here

evaluator = RegressionEvaluator(labelCol="SoundLevelDecibels", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
r2_2 = evaluator.evaluate(predictions2)
print(f'R2 with Linear Regression: {r2}, R2 with Ridge Regression: {r2_2}')
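
The three metrics above follow standard definitions: MSE averages the squared errors, MAE averages the absolute errors, and R2 is one minus the ratio of the residual sum of squares to the total sum of squares. The sketch below computes them in plain Python on a few made-up label and prediction pairs. The function names and values are illustrative assumptions and do not come from the model.

```python
def mean_squared_error(labels, predictions):
    """Average of the squared differences between label and prediction."""
    return sum((y - p) ** 2 for y, p in zip(labels, predictions)) / len(labels)


def mean_absolute_error(labels, predictions):
    """Average of the absolute differences between label and prediction."""
    return sum(abs(y - p) for y, p in zip(labels, predictions)) / len(labels)


def r_squared(labels, predictions):
    """1 - (residual sum of squares / total sum of squares)."""
    mean_label = sum(labels) / len(labels)
    ss_residual = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_total = sum((y - mean_label) ** 2 for y in labels)
    return 1.0 - ss_residual / ss_total


# Hypothetical values, for illustration only
labels = [2.0, 4.0, 6.0, 8.0]
predictions = [3.0, 4.0, 5.0, 8.0]

print(mean_squared_error(labels, predictions))
print(mean_absolute_error(labels, predictions))
print(r_squared(labels, predictions))
```

Lower MSE and MAE are better, while R2 is better the closer it gets to 1, so the two models should be compared in opposite directions depending on the metric.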


## Part 4 - Persist the Model


### Task 1 - Save the model to the path "Final_Project_Model"


In [None]:
# Save the fitted pipeline model to the path "Final_Project_Model"
pipelineModel.write().overwrite().save("Final_Project_Model")

### Task 2 - Load the model from the path "Final_Project_Model"


In [None]:
# Load the pipeline model you have created in the previous step
from pyspark.ml import PipelineModel
loadedPipelineModel = PipelineModel.load("Final_Project_Model")


### Task 3 - Make predictions using the loaded model on the test data


In [None]:
# Use the loaded pipeline model and make predictions using testingData
predictions = loadedPipelineModel.transform(testingData)


### Task 4 - Show the predictions


In [None]:
# Show the top 5 rows of the predictions dataframe, displaying only the label and prediction columns
predictions.select("SoundLevelDecibels", "prediction").show(5)

### Stop Spark Session


In [None]:
spark.stop()

<!--
## Change Log
-->


<!--
|Date (YYYY-MM-DD)|Version|Changed By|Change Description|
|-|-|-|-|
|2023-05-26|0.1|Ramesh Sannareddy|Initial Version Created|
-->
