In [2]:
# Silence Python warnings for the rest of this kernel session.
import warnings


def warn(*args, **kwargs):
    """Accept the same arguments as ``warnings.warn`` and discard them."""
    return None


# Two layers: the module-level ``warnings.warn`` is swapped for the no-op above,
# and the filter list is told to ignore every category for any other path.
warnings.warn = warn
warnings.filterwarnings('ignore')

# FindSpark simplifies the process of using Apache Spark with Python.
# findspark.init() is called before any pyspark import below; presumably it locates
# the local Spark installation and puts `pyspark` on sys.path (per findspark's docs),
# so keep this ordering.
# NOTE(review): depends on the Spark install being discoverable (e.g. SPARK_HOME) — confirm on the target machine.
import findspark
findspark.init()

# Standard library
import os

# Third-party: HTTP client and PySpark (PySpark imports come after findspark.init() above)
import requests
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [3]:
# Build the SparkSession for this notebook; getOrCreate() reuses an existing
# session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("Final Project")
    .getOrCreate()
)


24/07/12 18:51:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
# Source of the raw NASA airfoil noise CSV (IBM Skills Network mirror).
url = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv'
raw_csv_path = 'NASA_airfoil_noise_raw.csv'

# Download the dataset. The timeout (seconds) keeps the cell from hanging forever on a
# stalled connection. raise_for_status() stops on an HTTP error instead of silently
# writing an error page to disk and letting Spark parse it as data.
response = requests.get(url, timeout=60)
response.raise_for_status()

# Write the raw bytes to a local CSV file.
with open(raw_csv_path, 'wb') as file:
    file.write(response.content)

# Load the CSV into a Spark DataFrame: the first row holds the column names, and
# inferSchema lets Spark pick numeric column types instead of reading everything as strings.
df = spark.read.csv(raw_csv_path, header=True, inferSchema=True)

# Display the first few rows of the DataFrame
df.show(5)

+---------+-------------+-----------+------------------+-----------------------+----------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevel|
+---------+-------------+-----------+------------------+-----------------------+----------+
|      800|          0.0|     0.3048|              71.3|             0.00266337|   126.201|
|     1000|          0.0|     0.3048|              71.3|             0.00266337|   125.201|
|     1250|          0.0|     0.3048|              71.3|             0.00266337|   125.951|
|     1600|          0.0|     0.3048|              71.3|             0.00266337|   127.591|
|     2000|          0.0|     0.3048|              71.3|             0.00266337|   127.461|
+---------+-------------+-----------+------------------+-----------------------+----------+
only showing top 5 rows



In [5]:
# Clean the raw data in two steps, recording the row count before and after each
# step so the later summary cell can report what every step removed.
rowcount1 = df.count()

# Step 1: drop exact duplicate rows. distinct() compares all columns, the same as
# dropDuplicates() with no subset.
df = df.distinct()
rowcount2 = df.count()
print(f"Row count after dropping duplicates: {rowcount2}")

# Step 2: drop rows with null values (na.drop() is the same operation as dropna()).
df = df.na.drop()
rowcount3 = df.count()
print(f"Row count after dropping missing values: {rowcount3}")

Row count after dropping duplicates: 1503
Row count after dropping missing values: 1499


In [6]:
# Make the label column's unit explicit, then persist the cleaned data as Parquet,
# replacing any output left from a previous run.
df = df.withColumnRenamed('SoundLevel', 'SoundLevelDecibels')
df.write.parquet('NASA_airfoil_noise_cleaned.parquet', mode='overwrite')

24/07/12 19:05:21 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

In [7]:
# Summarise the cleaning stage: row counts after each step, the renamed label
# column, and whether the Parquet output was written. `os` is imported in the
# top-of-notebook import cell.
print("Total rows =", rowcount1)
print("Total rows after dropping duplicate rows =", rowcount2)
print("Total rows after dropping duplicate rows and rows with null values =", rowcount3)

# withColumnRenamed keeps the column's position, and SoundLevel was the last column
# of the raw data, so the renamed label is expected in the last position.
print("New column name =", df.columns[-1])

# Spark writes Parquet output as a directory of part files, hence isdir, not isfile.
print("NASA_airfoil_noise_cleaned.parquet exists:", os.path.isdir("NASA_airfoil_noise_cleaned.parquet"))

Total rows = 1522
Total rows after dropping duplicate rows = 1503
Total rows after dropping duplicate rows and rows with null values = 1499
New column name = SoundLevelDecibels
NASA_airfoil_noise_cleaned.parquet exists: True


In [8]:
# Reload the cleaned data from Parquet so modelling starts from the persisted output.
df = spark.read.parquet('NASA_airfoil_noise_cleaned.parquet')

# Sanity check: this should match the post-cleaning row count from the cleaning cell.
rowcount4 = df.count()
print(f"Row count after loading Parquet file: {rowcount4}")

# Pipeline stages:
#   1. assemble the five numeric predictors into one vector column,
#   2. rescale that vector (StandardScaler defaults: unit std, no mean-centering),
#   3. regress the sound level on the *scaled* vector.
assembler = VectorAssembler(
    inputCols=['Frequency', 'AngleOfAttack', 'ChordLength', 'FreeStreamVelocity', 'SuctionSideDisplacement'],
    outputCol='features'
)
scaler = StandardScaler(inputCol='features', outputCol='scaledFeatures')
# The regression must read the scaler's output column. Reading 'features' would make
# the scaler stage computed-but-ignored work.
lr = LinearRegression(featuresCol='scaledFeatures', labelCol='SoundLevelDecibels')

# Create a pipeline with the defined stages
pipeline = Pipeline(stages=[assembler, scaler, lr])

# 70/30 train/test split; the fixed seed makes the split reproducible across runs.
(trainingData, testingData) = df.randomSplit([0.7, 0.3], seed=42)

# Fit all stages (scaler statistics and regression coefficients) on the training data only.
pipelineModel = pipeline.fit(trainingData)

Row count after loading Parquet file: 1499


24/07/12 19:08:34 WARN Instrumentation: [544119d6] regParam is zero, which might cause numerical instability and overfitting.
24/07/12 19:08:34 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
24/07/12 19:08:34 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
24/07/12 19:08:35 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
24/07/12 19:08:35 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK


In [9]:
# Score the held-out rows: transform() runs every fitted pipeline stage and adds a
# 'prediction' column.
predictions = pipelineModel.transform(testingData)

# Mean squared error between predicted and actual sound level (lower is better).
evaluator = RegressionEvaluator(metricName='mse', labelCol='SoundLevelDecibels', predictionCol='prediction')
mse = evaluator.evaluate(predictions)
print(f"Mean Squared Error (MSE): {mse}")

Mean Squared Error (MSE): 26.43369743925488


In [10]:
# Use one evaluator for both metrics instead of two copy-pasted, otherwise identical
# evaluators. Its embedded metric is R2, and MAE is requested through a per-call
# param-map override.
evaluator = RegressionEvaluator(
    predictionCol='prediction',
    labelCol='SoundLevelDecibels',
    metricName='r2'
)

# Mean absolute error, in the label's units (decibels).
mae = evaluator.evaluate(predictions, {evaluator.metricName: 'mae'})
print(f"Mean Absolute Error (MAE): {mae}")

# R-squared on the test set, using the evaluator's embedded metric.
r2 = evaluator.evaluate(predictions)
print(f"R-squared (R2): {r2}")

# The fitted linear regression is the last stage of the PipelineModel.
lrModel = pipelineModel.stages[-1]

# Print the intercept of the LinearRegression model
print(f"Intercept: {round(lrModel.intercept, 2)}")


Mean Absolute Error (MAE): 4.023274015123245
R-squared (R2): 0.46038354963983397
Intercept: 133.34


In [11]:
# Persist the fitted pipeline (every stage, including the trained regression) to the
# "Final_Project" directory. overwrite() replaces an existing save. Without it the
# call fails when the directory already exists, so Restart & Run All would break.
pipelineModel.write().overwrite().save("Final_Project")