# ML Pipeline for Airfoil noise prediction


## Objectives


- Part 1 Perform ETL activity
 
- Part 2 Create a  Machine Learning Pipeline
 
- Part 3 Evaluate the Model
  
- Part 4 Persist the Model 
 


Diagram of an airfoil


![Airfoil with flow](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/images/Airfoil_with_flow.png)


Diagram showing the Angle of attack. - For informational purpose


![Airfoil angle of attack](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/images/Airfoil_angle_of_attack.jpg)




*   [`PySpark`](https://spark.apache.org/docs/latest/api/python/index.html?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMSkillsNetworkBD0231ENCoursera2789-2023-01-01) for connecting to the Spark Cluster


In [1]:
!pip install pyspark==3.1.2 -q
!pip install findspark -q

In [2]:

def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')



import findspark
findspark.init()

## Part 1 - Perform ETL activity


In [3]:
#your code goes here

#your code goes here
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StandardScaler


In [4]:
#Create a SparkSession

spark = SparkSession.builder.appName("Project").getOrCreate()

24/01/23 00:02:35 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [5]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv


--2024-01-23 00:02:40--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.63.118.104, 169.63.118.104
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 60682 (59K) [text/csv]
Saving to: ‘NASA_airfoil_noise_raw.csv’


2024-01-23 00:02:40 (51.0 MB/s) - ‘NASA_airfoil_noise_raw.csv’ saved [60682/60682]



Load the dataset into the spark dataframe


In [6]:

df = spark.read.csv("NASA_airfoil_noise_raw.csv",header=True,inferSchema=True)


                                                                                

In [7]:

df.show(5)

+---------+-------------+-----------+------------------+-----------------------+----------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevel|
+---------+-------------+-----------+------------------+-----------------------+----------+
|      800|          0.0|     0.3048|              71.3|             0.00266337|   126.201|
|     1000|          0.0|     0.3048|              71.3|             0.00266337|   125.201|
|     1250|          0.0|     0.3048|              71.3|             0.00266337|   125.951|
|     1600|          0.0|     0.3048|              71.3|             0.00266337|   127.591|
|     2000|          0.0|     0.3048|              71.3|             0.00266337|   127.461|
+---------+-------------+-----------+------------------+-----------------------+----------+
only showing top 5 rows



In [8]:
rowcount1 = df.count()
print(rowcount1)

1522


In [10]:
df = df.drop_duplicates()


In [14]:


df = df.withColumnRenamed("SoundLevel", "SoundLevelDecibels")


In [15]:


df.write.parquet("NASA_airfoil_noise_cleaned.parquet")

[Stage 12:>                                                       (0 + 8) / 200]24/01/23 00:05:08 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
24/01/23 00:05:08 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
24/01/23 00:05:08 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
                                                                                

In [16]:
print("Part 1 - Evaluation")

print("Total rows = ", rowcount1)
print("Total rows after dropping duplicate rows = ", rowcount2)
print("Total rows after dropping duplicate rows and rows with null values = ", rowcount3)
print("New column name = ", df.columns[-1])

import os

print("NASA_airfoil_noise_cleaned.parquet exists :", os.path.isdir("NASA_airfoil_noise_cleaned.parquet"))

Part 1 - Evaluation
Total rows =  1522
Total rows after dropping duplicate rows =  1503
Total rows after dropping duplicate rows and rows with null values =  1499
New column name =  SoundLevelDecibels
NASA_airfoil_noise_cleaned.parquet exists : True


## Part - 2 Create a  Machine Learning Pipeline


In [17]:

df = spark.read.parquet("NASA_airfoil_noise_cleaned.parquet")


In [18]:


rowcount4 = df.count()
print(rowcount4)





1499


                                                                                

In [19]:
df.printSchema()

root
 |-- Frequency: integer (nullable = true)
 |-- AngleOfAttack: double (nullable = true)
 |-- ChordLength: double (nullable = true)
 |-- FreeStreamVelocity: double (nullable = true)
 |-- SuctionSideDisplacement: double (nullable = true)
 |-- SoundLevelDecibels: double (nullable = true)



### Define the VectorAssembler pipeline stage


In [21]:

assembler = VectorAssembler(inputCols=["Frequency","AngleOfAttack","ChordLength","FreeStreamVelocity","SuctionSideDisplacement"],outputCol="features")



### Define the StandardScaler pipeline stage


In [23]:


scaler = StandardScaler(inputCol="features",outputCol="scaled_features")


In [24]:


lr = LinearRegression(featuresCol="features", labelCol="SoundLevelDecibels")

### Build the pipeline


In [25]:


pipeline = Pipeline(stages=[assembler, scaler, lr])


In [26]:


(trainingData, testingData) = df.randomSplit([0.7, 0.3], seed=42)

In [None]:


pipelineModel = pipeline.fit(trainingData)


24/01/23 00:13:13 WARN util.Instrumentation: [be66ac22] regParam is zero, which might cause numerical instability and overfitting.
[Stage 19:>                                                         (0 + 8) / 8]24/01/23 00:13:15 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
24/01/23 00:13:15 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
24/01/23 00:13:16 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
24/01/23 00:13:16 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
                                                                                

#### Part 2 - Evaluation


In [30]:
print("Part 2 - Evaluation")
print("Total rows = ", rowcount4)
ps = [str(x).split("_")[0] for x in pipeline.getStages()]

print("Pipeline Stage 1 = ", ps[0])
print("Pipeline Stage 2 = ", ps[1])
print("Pipeline Stage 3 = ", ps[2])

print("Label column = ", lr.getLabelCol())

Part 2 - Evaluation
Total rows =  1499
Pipeline Stage 1 =  VectorAssembler
Pipeline Stage 2 =  StandardScaler
Pipeline Stage 3 =  LinearRegression
Label column =  SoundLevelDecibels


## Part 3 - Evaluate the Model


In [32]:

predictions = pipelineModel.transform(testingData)


In [34]:

from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="mse")
mse = evaluator.evaluate(predictions)
print(mse)

[Stage 26:>                                                         (0 + 8) / 8]

22.5937540713487


                                                                                

### Task 3 - Print the MAE


In [35]:


evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="mae")
mae = evaluator.evaluate(predictions)
print(mae)



3.7336902294630927


                                                                                

In [36]:


evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="r2")
r2 = evaluator.evaluate(predictions)
print(r2)

[Stage 30:>                                                         (0 + 8) / 8]

0.542601650868908


                                                                                

#### Part 3 - Evaluation


In [37]:
print("Part 3 - Evaluation")

print("Mean Squared Error = ", round(mse,2))
print("Mean Absolute Error = ", round(mae,2))
print("R Squared = ", round(r2,2))

lrModel = pipelineModel.stages[-1]

print("Intercept = ", round(lrModel.intercept,2))


Part 3 - Evaluation
Mean Squared Error =  22.59
Mean Absolute Error =  3.73
R Squared =  0.54
Intercept =  132.6


## Part 4 - Persist the Model


In [39]:

pipelineModel.write().save("Practice")


                                                                                

In [40]:
# Load the pipeline model you have created in the previous step
loadedPipelineModel = PipelineModel.load("Practice")


                                                                                

In [41]:
print("Part 4 - Evaluation")

loadedmodel = loadedPipelineModel.stages[-1]
totalstages = len(loadedPipelineModel.stages)
inputcolumns = loadedPipelineModel.stages[0].getInputCols()

print("Number of stages in the pipeline = ", totalstages)
for i,j in zip(inputcolumns, loadedmodel.coefficients):
    print(f"Coefficient for {i} is {round(j,4)}")

Part 4 - Evaluation
Number of stages in the pipeline =  3
Coefficient for Frequency is -0.0013
Coefficient for AngleOfAttack is -0.4111
Coefficient for ChordLength is -36.1575
Coefficient for FreeStreamVelocity is 0.1019
Coefficient for SuctionSideDisplacement is -135.2067


In [42]:
spark.stop()