## **Importing the required libraries**

In [18]:
#Importing all the required libraries for PySpark analysis
import os

import findspark
findspark.init()
from pyspark.sql import SparkSession as ss
from pyspark.ml.feature import VectorAssembler as va, StandardScaler as sts, StringIndexer as si
from pyspark.ml.regression import LinearRegression as lir
from pyspark.ml.evaluation import  RegressionEvaluator as re
from pyspark.ml import Pipeline as pl
from pyspark.ml.pipeline import PipelineModel as plm

## **Starting the Spark Session** 

In [2]:
# Create (or reuse) the Spark session used by every cell below.
spark = (
    ss.builder
    .appName('Final Project')
    .getOrCreate()
)

## **Loading the dataset**

In [3]:
#We first download the data from the website
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv

--2024-03-18 14:13:13--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.63.118.104
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 60682 (59K) [text/csv]
Saving to: ‘NASA_airfoil_noise_raw.csv’


2024-03-18 14:13:14 (278 KB/s) - ‘NASA_airfoil_noise_raw.csv’ saved [60682/60682]



In [4]:
# Read the downloaded CSV into a Spark DataFrame: the first line supplies the
# column names and the column types are inferred from the data.
df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('NASA_airfoil_noise_raw.csv')
)

In [5]:
# Confirm the CSV was loaded by printing each column's name and inferred type.
df.printSchema()

root
 |-- Frequency: integer (nullable = true)
 |-- AngleOfAttack: double (nullable = true)
 |-- ChordLength: double (nullable = true)
 |-- FreeStreamVelocity: double (nullable = true)
 |-- SuctionSideDisplacement: double (nullable = true)
 |-- SoundLevel: double (nullable = true)



In [6]:
# ...and by printing the top 5 rows of the dataset.
df.show(n=5)

+---------+-------------+-----------+------------------+-----------------------+----------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevel|
+---------+-------------+-----------+------------------+-----------------------+----------+
|      800|          0.0|     0.3048|              71.3|             0.00266337|   126.201|
|     1000|          0.0|     0.3048|              71.3|             0.00266337|   125.201|
|     1250|          0.0|     0.3048|              71.3|             0.00266337|   125.951|
|     1600|          0.0|     0.3048|              71.3|             0.00266337|   127.591|
|     2000|          0.0|     0.3048|              71.3|             0.00266337|   127.461|
+---------+-------------+-----------+------------------+-----------------------+----------+
only showing top 5 rows



## **Cleaning the dataset**

In [7]:
# Start of cleaning: record the row count of the raw data so the Part 1 check
# further down can report it next to the counts taken after each cleaning step.
rowcount1 = df.count()
print(rowcount1)

1522


In [8]:
# Step 1: discard exact duplicate rows and record how many rows remain.
deduplicated = df.dropDuplicates()
rowcount2 = deduplicated.count()
print(rowcount2)

# Step 2: discard rows containing any null value; `df` now holds the cleaned data.
df = deduplicated.dropna()
rowcount3 = df.count()
print(rowcount3)

1503
1499


In [9]:
# Rename the target column SoundLevel to SoundLevelDecibels.
df = df.withColumnRenamed(
    'SoundLevel',
    'SoundLevelDecibels',
)

In [10]:
# Persist the cleaned DataFrame as parquet, replacing any output left by a previous run.
df.write.parquet('NASA_airfoil_noise_cleaned.parquet', mode='overwrite')

In [12]:
# Part 1 self-check: report the row counts captured during cleaning, the name of
# the last column after the rename, and whether the parquet output exists.
# (`os` is imported with the other libraries in the first cell of the notebook.)
print("Part 1 - Evaluation")

print("Total rows = ", rowcount1)
print("Total rows after dropping duplicate rows = ", rowcount2)
print("Total rows after dropping duplicate rows and rows with null values = ", rowcount3)
print("New column name = ", df.columns[-1])

# The parquet output is checked as a directory (isdir), not as a single file.
print("NASA_airfoil_noise_cleaned.parquet exists :", os.path.isdir("NASA_airfoil_noise_cleaned.parquet"))

Part 1 - Evaluation
Total rows =  1522
Total rows after dropping duplicate rows =  1503
Total rows after dropping duplicate rows and rows with null values =  1499
New column name =  SoundLevelDecibels
NASA_airfoil_noise_cleaned.parquet exists : True


## **Creating an ML Pipeline**

In [14]:
# Reload the cleaned data from the parquet output written earlier.
# Parquet stores the schema with the data, so the CSV-only `header` and
# `inferSchema` options are not passed to the reader here.
df = spark.read.parquet('NASA_airfoil_noise_cleaned.parquet')
rowcount4 = df.count()  # Sanity value: expected to equal rowcount3 (rows left after cleaning)
df.printSchema()

root
 |-- Frequency: integer (nullable = true)
 |-- AngleOfAttack: double (nullable = true)
 |-- ChordLength: double (nullable = true)
 |-- FreeStreamVelocity: double (nullable = true)
 |-- SuctionSideDisplacement: double (nullable = true)
 |-- SoundLevelDecibels: double (nullable = true)



In [15]:
# Pipeline stages, each consuming the previous stage's output column:
#   1. assemble the five input columns into a single vector column,
#   2. standardise that vector,
#   3. fit a linear regression on the scaled vector against SoundLevelDecibels.
feature_columns = [
    'Frequency',
    'AngleOfAttack',
    'ChordLength',
    'FreeStreamVelocity',
    'SuctionSideDisplacement',
]
vector = va(inputCols=feature_columns, outputCol='features')
scaler = sts(inputCol=vector.getOutputCol(), outputCol='scaledFeatures')
lr = lir(featuresCol=scaler.getOutputCol(), labelCol='SoundLevelDecibels')

In [19]:
# Chain the three stages, in execution order, into one Pipeline estimator.
pipe = pl().setStages([vector, scaler, lr])

In [20]:
# Split into 70% training / 30% test data; the fixed seed keeps the split repeatable.
training_data, test_data = df.randomSplit(weights=[0.7, 0.3], seed=42)

In [21]:
# Fit every pipeline stage on the training split to obtain the trained PipelineModel.
pipelineModel = pipe.fit(dataset=training_data)

In [23]:
# Part 2 self-check: row count read back from parquet, the type of each
# pipeline stage, and the label column configured on the regression stage.
print("Part 2 - Evaluation")
print("Total rows = ", rowcount4)

# Keep only the text before the first underscore of each stage's string form,
# which is the stage's class name (see the printed output).
ps = []
for stage in pipe.getStages():
    ps.append(str(stage).split("_")[0])

for position, stage_name in enumerate(ps[:3], start=1):
    print(f"Pipeline Stage {position} = ", stage_name)

print("Label column = ", lr.getLabelCol())

Part 2 - Evaluation
Total rows =  1499
Pipeline Stage 1 =  VectorAssembler
Pipeline Stage 2 =  StandardScaler
Pipeline Stage 3 =  LinearRegression
Label column =  SoundLevelDecibels


## **Model Evaluation**

In [24]:
# Score the held-out test split with the fitted pipeline; the result carries a 'prediction' column.
predictions = pipelineModel.transform(dataset=test_data)

In [26]:
# Mean Squared Error of the predictions against the true sound level.
evaluator = re(
    labelCol='SoundLevelDecibels',
    predictionCol='prediction',
    metricName='mse',
)
mse = evaluator.evaluate(predictions)
print('Mean Squared Error:', mse)

Mean Squared Error: 24.99766625502418


In [27]:
# Mean Absolute Error of the predictions against the true sound level.
evaluator = re(
    labelCol='SoundLevelDecibels',
    predictionCol='prediction',
    metricName='mae',
)
mae = evaluator.evaluate(predictions)
print('Mean Absolute Error:', mae)

Mean Absolute Error: 3.9136790958812044


In [28]:
# R Squared (coefficient of determination) of the predictions on the test split.
evaluator = re(
    labelCol='SoundLevelDecibels',
    predictionCol='prediction',
    metricName='r2',
)
r2 = evaluator.evaluate(predictions)
print('R Squared:', r2)

R Squared: 0.4959688408974623


In [29]:
# Part 3 self-check: the three test-set metrics rounded to two decimals, plus
# the intercept of the fitted regression (the last stage of the pipeline model).
print("Part 3 - Evaluation")

metric_report = (
    ("Mean Squared Error", mse),
    ("Mean Absolute Error", mae),
    ("R Squared", r2),
)
for metric_label, metric_value in metric_report:
    print(f"{metric_label} = ", round(metric_value, 2))

lrModel = pipelineModel.stages[-1]

print("Intercept = ", round(lrModel.intercept, 2))

Part 3 - Evaluation
Mean Squared Error =  25.0
Mean Absolute Error =  3.91
R Squared =  0.5
Intercept =  132.88


## **Model Persistence**

In [30]:
# Persist the fitted pipeline to ./Final_Project/, replacing any previously saved copy.
model_writer = pipelineModel.write().overwrite()
model_writer.save('./Final_Project/')

In [31]:
# Restore the saved pipeline from disk. `load` is called on the PipelineModel
# class (imported as `plm`) rather than on the in-memory fitted instance, so this
# step does not depend on the model having been trained in the current kernel.
loadedpipelineModel = plm.load('Final_Project')

In [32]:
# Score the test split again, this time with the pipeline restored from disk,
# and display the resulting prediction column.
predictions_new = loadedpipelineModel.transform(test_data)
prediction_column = predictions_new.select('prediction')
prediction_column.show()

+------------------+
|        prediction|
+------------------+
|122.59722914376778|
|127.37968204568838|
|130.34077425074506|
|131.11016975113537|
|127.12627360125096|
|127.89456373905155|
|131.06220981224084|
|125.73739953848445|
|121.53249832197925|
|124.20059665619313|
|125.87997778533571|
|125.24362112904095|
|126.06429872612995|
|127.67830278943778|
|121.25022147564815|
|123.31966959832609|
|124.20046348885936|
| 126.1606883964179|
|122.53378592206057|
|123.42922049990014|
+------------------+
only showing top 20 rows



In [33]:
# Part 4 self-check on the reloaded pipeline: number of stages, and the
# regression coefficient paired with each assembler input column.
print("Part 4 - Evaluation")

inputcolumns = loadedpipelineModel.stages[0].getInputCols()
totalstages = len(loadedpipelineModel.stages)
loadedmodel = loadedpipelineModel.stages[-1]

print("Number of stages in the pipeline = ", totalstages)
for column_name, weight in zip(inputcolumns, loadedmodel.coefficients):
    rounded_weight = round(weight, 4)
    print(f"Coefficient for {column_name} is {rounded_weight}")

Part 4 - Evaluation
Number of stages in the pipeline =  3
Coefficient for Frequency is -3.9906
Coefficient for AngleOfAttack is -2.2881
Coefficient for ChordLength is -3.3269
Coefficient for FreeStreamVelocity is 1.4832
Coefficient for SuctionSideDisplacement is -2.0551


## **Stop the session**

In [34]:
# Shut down the Spark session now that the analysis is finished.
spark.stop()