This notebook implements a machine learning pipeline to predict the sound level (in decibels) generated by airfoils based on various features. We will use PySpark for data processing and model building.

In [None]:
# Install PySpark version 3.1.2 and findspark using pip
!pip install pyspark==3.1.2 -q
!pip install findspark -q

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m212.4/212.4 MB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m198.6/198.6 kB[0m [31m11.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [None]:
# Import the findspark library and initialize it before any pyspark import.
# NOTE(review): the following cell repeats these two lines — presumably one
# copy is enough; confirm before removing either.
import findspark
findspark.init()

In [None]:
import findspark
findspark.init()

#import functions/Classes for sparkml

from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import StringIndexer

from pyspark.sql import SparkSession


# import functions/Classes for pipeline creation

from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel

# import functions/Classes for metrics
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# Start (or reuse) the Spark session for this notebook, named "Airfoil ML pipeline".
spark = (
    SparkSession.builder
    .appName("Airfoil ML pipeline")
    .getOrCreate()
)

In [None]:
# Download the NASA airfoil noise dataset using wget
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv

--2024-03-23 13:29:10--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.63.118.104
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 60682 (59K) [text/csv]
Saving to: ‘NASA_airfoil_noise_raw.csv’


2024-03-23 13:29:11 (1.56 MB/s) - ‘NASA_airfoil_noise_raw.csv’ saved [60682/60682]



In [None]:
# Read the downloaded CSV into a DataFrame: the first row supplies the column
# names and column types are inferred from the data.
df = spark.read.csv(
    path="NASA_airfoil_noise_raw.csv",
    inferSchema=True,
    header=True,
)

In [None]:
# Preview the first 5 rows of the raw data.
df.show(n=5)

+---------+-------------+-----------+------------------+-----------------------+----------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevel|
+---------+-------------+-----------+------------------+-----------------------+----------+
|      800|          0.0|     0.3048|              71.3|             0.00266337|   126.201|
|     1000|          0.0|     0.3048|              71.3|             0.00266337|   125.201|
|     1250|          0.0|     0.3048|              71.3|             0.00266337|   125.951|
|     1600|          0.0|     0.3048|              71.3|             0.00266337|   127.591|
|     2000|          0.0|     0.3048|              71.3|             0.00266337|   127.461|
+---------+-------------+-----------+------------------+-----------------------+----------+
only showing top 5 rows



In [None]:
# Display the total number of rows in the raw DataFrame (before any cleaning step).
df.count()

1522

In [None]:
# Remove duplicate rows; `df` is rebound to the de-duplicated result, so the
# raw frame is no longer reachable under this name.
df = df.dropDuplicates()

In [None]:
# Row count after removing duplicates; compare with the raw count to see how
# many duplicate rows were dropped.
df.count()

1503

In [None]:
# Remove rows with missing values; `df` is rebound to the cleaned result.
df=df.dropna()

In [None]:
# Row count after dropping rows with missing values; the difference from the
# previous count is the number of incomplete rows removed.
df.count()

1499

In [None]:
# Rename the target column so its unit (decibels) is explicit in the name.
df = df.withColumnRenamed(
    existing="SoundLevel",
    new="SoundLevelDecibels",
)

In [None]:
# Preview 5 rows of the cleaned data to confirm the column rename.
df.show(n=5)

+---------+-------------+-----------+------------------+-----------------------+------------------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevelDecibels|
+---------+-------------+-----------+------------------+-----------------------+------------------+
|     4000|          3.0|     0.3048|              31.7|             0.00529514|           115.608|
|     3150|          2.0|     0.2286|              31.7|             0.00372371|           121.527|
|     2000|          7.3|     0.2286|              31.7|              0.0132672|           115.309|
|     2000|          5.4|     0.1524|              71.3|             0.00401199|           131.111|
|      500|          9.9|     0.1524|              71.3|              0.0193001|           131.279|
+---------+-------------+-----------+------------------+-----------------------+------------------+
only showing top 5 rows



In [None]:
# Write the cleaned data to a Parquet file.
# mode("overwrite") replaces any output left by a previous run; with the
# default save mode this cell fails when the path already exists, which
# breaks "Restart & Run All".
df.write.mode("overwrite").parquet("NASA_airfoil_noise_cleaned.parquet")

In [None]:
# Load the cleaned data back from the Parquet file; from here on `df` refers
# to the data as stored on disk rather than the in-memory cleaning result.
df = spark.read.parquet("NASA_airfoil_noise_cleaned.parquet")

In [None]:
# Row count after the Parquet round trip; it should match the count taken
# just before the data was written.
df.count()

1499

In [None]:
# Stage 1: pack the five numeric input columns into one vector column named "features".
assembler = VectorAssembler(
    inputCols=[
        'Frequency',
        'AngleOfAttack',
        'ChordLength',
        'FreeStreamVelocity',
        'SuctionSideDisplacement',
    ],
    outputCol="features",
)

In [None]:
# Stage 2: scale the assembled "features" vector, writing the result to "scaledFeatures".
scaler = StandardScaler(
    outputCol="scaledFeatures",
    inputCol="features",
)

In [None]:
# Stage 3: linear regression that learns 'SoundLevelDecibels' from the scaled feature vector.
lr = LinearRegression(
    labelCol="SoundLevelDecibels",
    featuresCol="scaledFeatures",
)

In [None]:
# Chain the stages in execution order: assemble -> scale -> regress.
pipeline = Pipeline(
    stages=[
        assembler,
        scaler,
        lr,
    ]
)

In [None]:
# Hold out 30% of the rows for testing and train on the remaining 70%;
# the seed is fixed at 42 for the split.
trainingData, testingData = df.randomSplit(weights=[0.7, 0.3], seed=42)

In [None]:
# Fit every pipeline stage on the training split, producing the fitted pipeline model.
pipelineModel = pipeline.fit(dataset=trainingData)

In [None]:
# Run the held-out test split through the fitted pipeline to obtain predictions.
predictions = pipelineModel.transform(dataset=testingData)

In [None]:
# Evaluate the model using Mean Squared Error (MSE).
# RegressionEvaluator is already imported in the imports cell at the top of
# the notebook, so it is not re-imported here.
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="mse")
mse = evaluator.evaluate(predictions)
print(mse)

26.57450359380139


In [None]:
# Mean Absolute Error (MAE) of the predictions against the true sound levels.
evaluator = RegressionEvaluator(
    metricName="mae",
    labelCol="SoundLevelDecibels",
    predictionCol="prediction",
)
mae = evaluator.evaluate(predictions)
print(mae)

3.9923220160490525


In [None]:
# R-squared (R2) of the predictions against the true sound levels.
evaluator = RegressionEvaluator(
    metricName="r2",
    labelCol="SoundLevelDecibels",
    predictionCol="prediction",
)
r2 = evaluator.evaluate(predictions)
print(r2)

0.4785118915245046


In [None]:
# Save the trained model to a directory called "Final_Project".
# .overwrite() replaces a model saved by a previous run; without it the save
# fails when the directory already exists, which breaks "Restart & Run All".
pipelineModel.write().overwrite().save("Final_Project")

In [None]:
# Load the saved model from the "Final_Project" directory.
# `load` is called on the PipelineModel class rather than on the fitted
# in-memory instance, so the reloaded model clearly comes from disk and this
# cell no longer depends on `pipelineModel` existing.
loadedPipelineModel = PipelineModel.load("Final_Project")

In [None]:
# Score the test split again, this time with the model reloaded from disk.
predictions = loadedPipelineModel.transform(dataset=testingData)

In [None]:
# Show actual vs. predicted sound level side by side for the first 5 rows.
predictions.select(
    "SoundLevelDecibels",
    "prediction",
).show(n=5)

+------------------+------------------+
|SoundLevelDecibels|        prediction|
+------------------+------------------+
|           128.545| 121.1759019027927|
|           130.898|122.47988450139373|
|           109.951|127.61142882533788|
|           112.506|129.16838160755117|
|           130.089|122.09809876229559|
+------------------+------------------+
only showing top 5 rows



In [None]:
print("Part 4 - Evaluation")

# Pull the fitted pieces back out of the reloaded pipeline: the first stage is
# the assembler (it holds the input column names) and the last stage is the
# regression model. The coefficients are for the scaled features, because the
# regression was trained on "scaledFeatures".
totalstages = len(loadedPipelineModel.stages)
loadedmodel = loadedPipelineModel.stages[-1]
inputcolumns = loadedPipelineModel.stages[0].getInputCols()

print("Number of stages in the pipeline = ", totalstages)
for column_name, coefficient in zip(inputcolumns, loadedmodel.coefficients):
    print(f"Coefficient for {column_name} is {round(coefficient,4)}")

Part 4 - Evaluation
Number of stages in the pipeline =  3
Coefficient for Frequency is -3.8025
Coefficient for AngleOfAttack is -2.1485
Coefficient for ChordLength is -3.3966
Coefficient for FreeStreamVelocity is 1.4629
Coefficient for SuctionSideDisplacement is -2.1833


In [None]:
# Stop the Spark session now that all processing, training and evaluation is done.
spark.stop()

In [None]:
# NOTE(review): the lines below were bare literals and bare identifiers, not
# runnable code — evaluating `SoundLevelDecibels` or
# `NASA_airfoil_noise_cleaned.parquet` as Python raises NameError, so the cell
# broke "Run All". They are kept here as comments. They look like answers
# recorded from an earlier run (several differ from the outputs shown in this
# notebook, e.g. 1498 here vs 1499 above) — TODO confirm, and move them to a
# markdown cell if they should be kept.
# 1522
# 1503
# 1498
# SoundLevelDecibels
# NASA_airfoil_noise_cleaned.parquet
# 1497
# VectorAssembler
# StandardScaler
# LinearRegression
# SoundLevelDecibels
# 22.593754071348812
# 3.7336902294631287
# 0.54
# 131
# 3
# -3.9728
# -2.4775
# -3
# 1.5789
# -1.6465