## Build an ML Pipeline for Airfoil Noise Prediction


## Datasets

This lab uses the following dataset:

 - The original dataset is the NASA Airfoil Self-Noise dataset: https://archive.ics.uci.edu/dataset/291/airfoil+self+noise

 - This dataset is licensed under a Creative Commons Attribution 4.0 International (CC BY 4.0) license.


## Before you Start


## Setup


In [1]:
!pip install pyspark==3.1.2 -q
!pip install findspark -q

### Importing Required Libraries

_We recommend you import all required libraries in one place (here):_


In [2]:
# You can also use this section to suppress warnings generated by your code:
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

# FindSpark simplifies the process of using Apache Spark with Python

import findspark
findspark.init()

## Part 1 - Perform ETL activity


### Task 1 - Import required libraries


In [3]:
#your code goes here
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StandardScaler



### Task 2 - Create a spark session


In [5]:
#Create a SparkSession

spark = SparkSession.builder.appName("Project").getOrCreate()

24/02/28 01:38:06 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### Task 3 - Load the csv file into a dataframe


In [6]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv


--2024-02-28 01:38:14--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.63.118.104, 169.63.118.104
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 60682 (59K) [text/csv]
Saving to: ‘NASA_airfoil_noise_raw.csv’


2024-02-28 01:38:14 (43.9 MB/s) - ‘NASA_airfoil_noise_raw.csv’ saved [60682/60682]



Load the dataset into the spark dataframe


In [8]:
# Load the dataset that you have downloaded in the previous task

df = spark.read.csv("NASA_airfoil_noise_raw.csv", header=True, inferSchema=True)


                                                                                

### Task 4 - Print top 5 rows of the dataset


In [9]:
#your code goes here
# show(5) limits the output to the first 5 rows (show() alone prints 20)
df.show(5)

+---------+-------------+-----------+------------------+-----------------------+----------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevel|
+---------+-------------+-----------+------------------+-----------------------+----------+
|      800|          0.0|     0.3048|              71.3|             0.00266337|   126.201|
|     1000|          0.0|     0.3048|              71.3|             0.00266337|   125.201|
|     1250|          0.0|     0.3048|              71.3|             0.00266337|   125.951|
|     1600|          0.0|     0.3048|              71.3|             0.00266337|   127.591|
|     2000|          0.0|     0.3048|              71.3|             0.00266337|   127.461|
+---------+-------------+-----------+------------------+-----------------------+----------+
only showing top 5 rows

### Task 6 - Print the total number of rows in the dataset


In [10]:
#your code goes here
rowcount1 = df.count()
print(rowcount1)

1522


### Task 7 - Drop all the duplicate rows from the dataset


In [11]:
df = df.dropDuplicates()


### Task 8 - Print the total number of rows in the dataset


In [12]:
#your code goes here

rowcount2 = df.count()
print(rowcount2)




1503


                                                                                

### Task 9 - Drop all the rows that contain null values from the dataset


In [13]:
df = df.dropna()


### Task 10 - Print the total number of rows in the dataset


In [14]:
#your code goes here

rowcount3 = df.count()
print(rowcount3)




1499
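The three row counts printed so far give a quick sanity check on the cleaning steps. The following is a minimal sketch in plain Python (no Spark required) that uses only the numbers printed above; the variable names are illustrative and not part of the lab.

```python
# Row counts printed by the cells above
rows_raw = 1522           # after loading the CSV
rows_deduplicated = 1503  # after dropDuplicates()
rows_cleaned = 1499       # after dropna()

# Each cleaning step removes the difference between consecutive counts
duplicates_removed = rows_raw - rows_deduplicated
null_rows_removed = rows_deduplicated - rows_cleaned

print("Duplicate rows removed:", duplicates_removed)
print("Rows with null values removed:", null_rows_removed)
```

In other words, 19 rows were exact duplicates and a further 4 rows contained at least one null value.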


                                                                                

### Task 11 - Rename the column "SoundLevel" to "SoundLevelDecibels"


In [15]:
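# your code goes here

# Note: the new name is spelled "SoundLevelDecibals" here, and every later cell
# refers to the column by this exact spelling.
df = df.withColumnRenamed("SoundLevel","SoundLevelDecibals")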


### Task 12 - Save the dataframe in parquet format, name the file "NASA_airfoil_noise_cleaned.parquet"


In [18]:
# your code goes here
# mode("overwrite") replaces any existing output, so the cell can be re-run safely
df.write.mode("overwrite").parquet("NASA_airfoil_noise_cleaned.parquet")

#### Part 1 - Evaluation


In [19]:
print("Part 1 - Evaluation")

print("Total rows = ", rowcount1)
print("Total rows after dropping duplicate rows = ", rowcount2)
print("Total rows after dropping duplicate rows and rows with null values = ", rowcount3)
print("New column name = ", df.columns[-1])

import os

print("NASA_airfoil_noise_cleaned.parquet exists :", os.path.isdir("NASA_airfoil_noise_cleaned.parquet"))

Part 1 - Evaluation
Total rows =  1522
Total rows after dropping duplicate rows =  1503
Total rows after dropping duplicate rows and rows with null values =  1499
New column name =  SoundLevelDecibals
NASA_airfoil_noise_cleaned.parquet exists : True


## Part 2 - Create a Machine Learning Pipeline


### Task 1 - Load data from "NASA_airfoil_noise_cleaned.parquet" into a dataframe


In [20]:
#your code goes here

df = spark.read.parquet("NASA_airfoil_noise_cleaned.parquet")


### Task 2 - Print the total number of rows in the dataset


In [22]:
#your code goes here

rowcount4 = df.count()
print(rowcount4)

df.show()

                                                                                

1499
+---------+-------------+-----------+------------------+-----------------------+------------------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevelDecibals|
+---------+-------------+-----------+------------------+-----------------------+------------------+
|      630|          0.0|     0.3048|              31.7|             0.00331266|           129.095|
|     4000|          0.0|     0.3048|              31.7|             0.00331266|           118.145|
|     4000|          1.5|     0.3048|              39.6|             0.00392107|           117.741|
|      800|          4.0|     0.3048|              71.3|             0.00497773|           131.755|
|     1250|          0.0|     0.2286|              31.7|              0.0027238|           128.805|
|     2500|          4.0|     0.2286|              55.5|              0.0042862|           122.384|
|     1250|          7.3|     0.2286|              71.3|              0.0104404|           127.

### Task 3 - Define the VectorAssembler pipeline stage


In [23]:
#your code goes here
assembler = VectorAssembler(inputCols=['Frequency','AngleOfAttack','ChordLength','FreeStreamVelocity','SuctionSideDisplacement'],outputCol="features")



### Task 4 - Define the StandardScaler pipeline stage


In [27]:
#your code goes here

scaler = StandardScaler(inputCol="features",outputCol='scaledFeatures')
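To make the scaling step concrete, here is a minimal plain-Python sketch of what `StandardScaler` is understood to do with its default settings (scale to unit standard deviation, no mean centering). It is an illustration under that assumption, not Spark's implementation; the helper name `scale_to_unit_std` is hypothetical, and the input is the first five `Frequency` values shown earlier.

```python
import statistics

def scale_to_unit_std(column):
    """Divide every value by the column's sample standard deviation."""
    std = statistics.stdev(column)  # sample (n - 1) standard deviation
    return [value / std for value in column]

# First five Frequency values from the dataset preview
frequencies = [800.0, 1000.0, 1250.0, 1600.0, 2000.0]
scaled = scale_to_unit_std(frequencies)

print(scaled)
print(statistics.stdev(scaled))  # approximately 1.0 after scaling
```

Because the values are only divided and not shifted, their ordering and relative spacing are preserved; only the unit of measurement changes.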


### Task 5 - Define the LinearRegression pipeline stage


In [36]:
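#your code goes here

# Note: featuresCol="features" trains the model on the unscaled vector, so the
# StandardScaler output ("scaledFeatures") is not consumed by this stage.
lr = LinearRegression(featuresCol="features",labelCol="SoundLevelDecibals")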


### Task 6 - Build the pipeline


Build a pipeline using the above three stages


In [37]:
#your code goes here
pipeline = Pipeline(stages=[assembler, scaler, lr])
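The idea behind a pipeline can be sketched without Spark: each stage learns what it needs during `fit`, and the fitted stages are then applied in order during `transform`. The toy classes below (`DivideByMax`, `AddOffset`, `ToyPipeline`) are hypothetical and deliberately simplified; Spark's real `Pipeline.fit` returns a separate `PipelineModel` rather than modifying the stages in place.

```python
class DivideByMax:
    """Toy stage that learns the maximum during fit, then divides by it."""
    def fit(self, values):
        self.max_value = max(values)
        return self

    def transform(self, values):
        return [v / self.max_value for v in values]


class AddOffset:
    """Toy stage with nothing to learn: adds a fixed offset."""
    def __init__(self, offset):
        self.offset = offset

    def fit(self, values):
        return self

    def transform(self, values):
        return [v + self.offset for v in values]


class ToyPipeline:
    """Fits each stage on the output of the previous one, in order."""
    def __init__(self, stages):
        self.stages = stages

    def fit(self, values):
        for stage in self.stages:
            values = stage.fit(values).transform(values)
        return self

    def transform(self, values):
        for stage in self.stages:
            values = stage.transform(values)
        return values


# Fit on "training" values, then reuse the learned maximum on new values
toy = ToyPipeline([DivideByMax(), AddOffset(1.0)]).fit([2.0, 4.0, 8.0])
print(toy.transform([4.0, 16.0]))  # prints [1.5, 3.0]
```

The new values are divided by 8.0, the maximum learned from the training values, which is why parameters learned during fitting carry over unchanged to test data.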


### Task 7 - Split the data


In [38]:
# Split the data into training and testing sets with a 70:30 split.
# Set the seed to 42.
# This step is very important: do not use any seed value other than 42.

#your code goes here

(trainingData, testingData) = df.randomSplit([0.7,0.3],seed = 42)
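Fixing the seed matters because a random split assigns rows by random draws, so the resulting sizes are only approximately 70:30 and a different seed gives a different split. The plain-Python sketch below illustrates that behaviour; `random_split` is a hypothetical helper, and Spark's own sampling differs in detail, so the exact counts will not match the lab's.

```python
import random

def random_split(rows, train_fraction, seed):
    """Assign each row to train or test with an independent random draw."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        if rng.random() < train_fraction:
            train.append(row)
        else:
            test.append(row)
    return train, test

rows = list(range(1499))  # stand-in for the 1499 cleaned rows

train_a, test_a = random_split(rows, 0.7, seed=42)
train_b, test_b = random_split(rows, 0.7, seed=42)

print(len(train_a), len(test_a))  # close to, but not exactly, 70:30
print(train_a == train_b)         # the same seed reproduces the same split
```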



### Task 8 - Fit the pipeline


In [39]:
# Fit the pipeline using the training data
# your code goes here

pipelineModel = pipeline.fit(trainingData)


24/02/28 02:03:18 WARN util.Instrumentation: [23a30756] regParam is zero, which might cause numerical instability and overfitting.
24/02/28 02:03:21 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
24/02/28 02:03:21 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
24/02/28 02:03:21 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
24/02/28 02:03:21 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
                                                                                

#### Part 2 - Evaluation


In [40]:
print("Part 2 - Evaluation")
print("Total rows = ", rowcount4)
ps = [str(x).split("_")[0] for x in pipeline.getStages()]

print("Pipeline Stage 1 = ", ps[0])
print("Pipeline Stage 2 = ", ps[1])
print("Pipeline Stage 3 = ", ps[2])

print("Label column = ", lr.getLabelCol())

Part 2 - Evaluation
Total rows =  1499
Pipeline Stage 1 =  VectorAssembler
Pipeline Stage 2 =  StandardScaler
Pipeline Stage 3 =  LinearRegression
Label column =  SoundLevelDecibals


## Part 3 - Evaluate the Model


### Task 1 - Predict using the model


In [41]:
# Make predictions on testing data
# your code goes here

predictions = pipelineModel.transform(testingData)


### Task 2 - Print the MSE


In [42]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibals", metricName="mse")
mse = evaluator.evaluate(predictions)
print(mse)




22.5937540713487


                                                                                

### Task 3 - Print the MAE


In [47]:
#your code goes here

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibals", metricName="mae")
mae = evaluator.evaluate(predictions)
print(mae)
print(predictions)

3.7336902294630927
DataFrame[Frequency: int, AngleOfAttack: double, ChordLength: double, FreeStreamVelocity: double, SuctionSideDisplacement: double, SoundLevelDecibals: double, features: vector, scaledFeatures: vector, prediction: double]


                                                                                

### Task 4 - Print the R-Squared(R2)


In [48]:
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibals", metricName="r2")
r2 = evaluator.evaluate(predictions)
print(r2)


0.542601650868908
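For reference, the three metrics can be computed by hand from label and prediction pairs. The sketch below uses the standard definitions of MSE, MAE and R-squared in plain Python; the function name `regression_metrics` and the four toy data points are hypothetical and are not taken from the dataset.

```python
def regression_metrics(labels, predictions):
    """Return (mse, mae, r2) for paired labels and predictions."""
    n = len(labels)
    errors = [y - p for y, p in zip(labels, predictions)]

    squared_error_sum = sum(e * e for e in errors)
    mse = squared_error_sum / n                 # mean squared error
    mae = sum(abs(e) for e in errors) / n       # mean absolute error

    mean_label = sum(labels) / n
    total_ss = sum((y - mean_label) ** 2 for y in labels)
    r2 = 1 - squared_error_sum / total_ss       # share of variance explained

    return mse, mae, r2

# Toy values chosen so the arithmetic is easy to follow
labels = [3.0, 5.0, 7.0, 9.0]
predictions = [2.0, 5.0, 8.0, 9.0]

mse_toy, mae_toy, r2_toy = regression_metrics(labels, predictions)
print(mse_toy, mae_toy, r2_toy)
```

MSE is expressed in squared units of the label, so taking the square root of the MSE printed above (about 4.75) gives an error in the same units as the sound level, which is easier to compare with the MAE.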


                                                                                

#### Part 3 - Evaluation


In [49]:
print("Part 3 - Evaluation")

print("Mean Squared Error = ", round(mse,2))
print("Mean Absolute Error = ", round(mae,2))
print("R Squared = ", round(r2,2))

lrModel = pipelineModel.stages[-1]

print("Intercept = ", round(lrModel.intercept,2))


Part 3 - Evaluation
Mean Squared Error =  22.59
Mean Absolute Error =  3.73
R Squared =  0.54
Intercept =  132.6


## Part 4 - Persist the Model


### Task 1 - Save the model to the path "Final_Project"


In [50]:
# Save the pipeline model as "Final_Project"
# your code goes here
# overwrite() replaces any previously saved model, so the cell can be re-run safely
pipelineModel.write().overwrite().save("Final_Project")


### Task 2 - Load the model from the path "Final_Project"


In [52]:
# Load the pipeline model you have created in the previous step
loadedPipelineModel = PipelineModel.load("Final_Project")



### Task 3 - Make predictions using the loaded model on the test data


In [53]:
# Use the loaded pipeline model and make predictions using testingData
predictions = loadedPipelineModel.transform(testingData)


### Task 4 - Show the predictions


In [55]:
# Show the top 5 rows of the predictions dataframe. Display only the label column and predictions
#your code goes here
predictions.select("SoundLevelDecibals","prediction").show(5)

+------------------+------------------+
|SoundLevelDecibals|        prediction|
+------------------+------------------+
|           127.315|123.64344009624709|
|           119.975| 123.4869578861485|
|           121.783|124.38983849684239|
|           127.224|121.44706993294264|
|           122.229|125.68312652454149|
+------------------+------------------+
only showing top 5 rows



                                                                                

#### Part 4 - Evaluation



Run the code cell below.<br>
Use the answers here to answer the final evaluation quiz in the next section.<br>
If the code raises any errors, go back and review the code you have written.


In [56]:
print("Part 4 - Evaluation")

loadedmodel = loadedPipelineModel.stages[-1]
totalstages = len(loadedPipelineModel.stages)
inputcolumns = loadedPipelineModel.stages[0].getInputCols()

print("Number of stages in the pipeline = ", totalstages)
for i,j in zip(inputcolumns, loadedmodel.coefficients):
    print(f"Coefficient for {i} is {round(j,4)}")

Part 4 - Evaluation
Number of stages in the pipeline =  3
Coefficient for Frequency is -0.0013
Coefficient for AngleOfAttack is -0.4111
Coefficient for ChordLength is -36.1575
Coefficient for FreeStreamVelocity is 0.1019
Coefficient for SuctionSideDisplacement is -135.2067
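A linear regression prediction is the intercept plus the sum of each coefficient multiplied by its feature value. As a rough sketch, the plain-Python snippet below applies the rounded intercept and coefficients printed above to the first row of the raw dataset preview. Because the printed values are rounded, the result is only approximate, and it assumes the coefficients apply to the raw (unscaled) feature values.

```python
# Rounded values copied from the evaluation output above
intercept = 132.6
coefficients = {
    "Frequency": -0.0013,
    "AngleOfAttack": -0.4111,
    "ChordLength": -36.1575,
    "FreeStreamVelocity": 0.1019,
    "SuctionSideDisplacement": -135.2067,
}

# First row of the raw dataset preview (recorded SoundLevel: 126.201)
first_row = {
    "Frequency": 800,
    "AngleOfAttack": 0.0,
    "ChordLength": 0.3048,
    "FreeStreamVelocity": 71.3,
    "SuctionSideDisplacement": 0.00266337,
}

# prediction = intercept + sum(coefficient * feature value)
approx_prediction = intercept + sum(
    coefficients[name] * first_row[name] for name in coefficients
)
print(round(approx_prediction, 2))
```

The result is roughly 127.4, a little over one unit away from the recorded sound level of 126.201 for that row, which is in line with the mean absolute error reported in Part 3.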


### Stop Spark Session


In [57]:
spark.stop()

## Authors


[Ramesh Sannareddy](https://www.linkedin.com/in/rsannareddy/?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMBD0231ENSkillsNetwork866-2023-01-01)


### Other Contributors


## Change Log


|Date (YYYY-MM-DD)|Version|Changed By|Change Description|
|-|-|-|-|
|2023-05-26|0.1|Ramesh Sannareddy|Initial Version Created|


Copyright © 2023 IBM Corporation. All rights reserved.
