In [None]:
from pyspark.sql import SparkSession

In [None]:
# Create the Spark session for this notebook (or reuse one that is already
# running) under the application name "MyApp".
spark = (
    SparkSession.builder
    .appName("MyApp")
    .getOrCreate()
)

In [None]:
# Load the raw CSV: the first line is treated as the header and column types
# are inferred from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("NASA_airfoil_noise_raw.csv")
)

In [None]:
# Preview the first five rows of the raw data.
df.show(n=5)

In [None]:
# Row count of the freshly loaded data, before any cleaning step.
# Kept in `rowcount1` because the Part 1 summary cell prints it later.
rowcount1 = df.count()
print(rowcount1)

In [None]:
# Cleaning step 1: remove duplicate rows.
df = df.dropDuplicates()


In [None]:
# Row count after duplicate rows were removed; printed again in the Part 1
# summary cell.
rowcount2 = df.count()
print(rowcount2)

In [None]:
# Cleaning step 2: remove rows that contain null values.
df = df.dropna()

In [None]:
# Row count after both duplicates and null-containing rows were removed;
# printed again in the Part 1 summary cell.
rowcount3 = df.count()
print(rowcount3)

In [None]:
# Rename the target column. Later cells refer to it as "SoundLevelDecibels"
# (feature selection, the regression label column, and the evaluators).
df = df.withColumnRenamed("SoundLevel", "SoundLevelDecibels")


In [None]:
# Persist the cleaned data as parquet, replacing any output left by a
# previous run.
(
    df.write
    .mode("overwrite")
    .parquet("NASA_airfoil_noise_cleaned.parquet")
)

In [None]:
import os

# Summary of the cleaning stage: the three row counts collected above, the
# name of the last column after the rename, and whether the parquet output
# directory is present on the local filesystem.
print("Part 1 - Evaluation")

print("Total rows = ", rowcount1)
print("Total rows after dropping duplicate rows = ", rowcount2)
print("Total rows after dropping duplicate rows and rows with null values = ", rowcount3)
print("New column name = ", df.columns[-1])

print("NASA_airfoil_noise_cleaned.parquet exists :", os.path.isdir("NASA_airfoil_noise_cleaned.parquet"))

In [None]:
# Reload the cleaned data from the parquet output written earlier, so the
# modelling cells start from the persisted dataset.
df = spark.read.parquet("NASA_airfoil_noise_cleaned.parquet")


In [None]:
from pyspark.ml.feature import VectorAssembler

# Every column other than the label is used as a model input.
input_cols = [name for name in df.columns if name != "SoundLevelDecibels"]

# Stage that packs the input columns into a single vector column "features".
assembler = VectorAssembler(
    inputCols=input_cols,
    outputCol="features",
)

# Apply the assembler eagerly, so `df` itself gains the "features" column.
# The same `assembler` object is also used as the first stage of the Pipeline
# built in a later cell.
df = assembler.transform(df)

In [None]:
from pyspark.ml.feature import StandardScaler

# Stage that standardises the assembled "features" vector (centering and
# scaling both enabled) into a new "scaledFeatures" column.
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withMean=True,
    withStd=True,
)

# Fit on the full `df` and apply it eagerly, so `df` gains "scaledFeatures".
# The unfitted `scaler` is also used as the second stage of the Pipeline
# built in a later cell.
scaler_model = scaler.fit(df)
df = scaler_model.transform(df)

In [None]:
from pyspark.ml.regression import LinearRegression

# Regression stage: reads the scaled feature vector and is trained against
# the "SoundLevelDecibels" label column.
lr = LinearRegression(
    featuresCol="scaledFeatures",
    labelCol="SoundLevelDecibels",
)

In [None]:
from pyspark.ml import Pipeline

# Chain the three stages in execution order:
# assemble features -> standardise -> linear regression.
pipeline = Pipeline(
    stages=[
        assembler,
        scaler,
        lr,
    ]
)

In [None]:
# `df` already carries the "features" and "scaledFeatures" columns produced by
# the eager assembler/scaler transforms in earlier cells. The pipeline holds
# those same stages, and VectorAssembler refuses input that already has its
# output column, so fitting the pipeline on a split of `df` as-is raises
# "Output column features already exists". Strip the stage outputs first;
# DataFrame.drop() ignores column names that are not present.
#
# Then split 70/30 into training and testing sets with a fixed seed.
(trainingData, testingData) = (
    df.drop(assembler.getOutputCol(), scaler.getOutputCol())
    .randomSplit([0.7, 0.3], seed=42)
)


In [None]:
# Fit every pipeline stage (assembler, scaler, linear regression) on the
# training split only.
pipelineModel = pipeline.fit(trainingData)


In [None]:
# Summary of the pipeline definition: the text before the first "_" in each
# stage's string form, followed by the label column configured on `lr`.
print("Part 2 - Evaluation")
ps = [str(stage).partition("_")[0] for stage in pipeline.getStages()]

print("Pipeline Stage 1 = ", ps[0])
print("Pipeline Stage 2 = ", ps[1])
print("Pipeline Stage 3 = ", ps[2])

print("Label column = ", lr.getLabelCol())

In [None]:
# Run the fitted pipeline on the held-out testing split; the evaluators in the
# following cells read the "prediction" column from this result.
predictions = pipelineModel.transform(testingData)


In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Mean squared error of "prediction" against the "SoundLevelDecibels" label.
evaluator = RegressionEvaluator(
    metricName="mse",
    labelCol="SoundLevelDecibels",
    predictionCol="prediction",
)

mse = evaluator.evaluate(predictions)
print(mse)

In [None]:

# Mean absolute error of "prediction" against the "SoundLevelDecibels" label.
# RegressionEvaluator is imported in the MSE cell above.
evaluator_mae = RegressionEvaluator(
    metricName="mae",
    labelCol="SoundLevelDecibels",
    predictionCol="prediction",
)

mae = evaluator_mae.evaluate(predictions)
print(mae)

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# R-squared of "prediction" against the "SoundLevelDecibels" label.
evaluator_r2 = RegressionEvaluator(
    metricName="r2",
    labelCol="SoundLevelDecibels",
    predictionCol="prediction",
)

r2 = evaluator_r2.evaluate(predictions)
print(r2)

In [None]:
# The last stage of the fitted pipeline is the trained regression model;
# its intercept is reported below.
lrModel = pipelineModel.stages[-1]

# Summary of model quality, each value rounded to two decimals.
print("Part 3 - Evaluation")

print("Mean Squared Error = ", round(mse,2))
print("Mean Absolute Error = ", round(mae,2))
print("R Squared = ", round(r2,2))

print("Intercept = ", round(lrModel.intercept,2))

In [None]:
# Persist the fitted pipeline under "Final_Project", replacing any model
# saved there by a previous run.
(
    pipelineModel.write()
    .overwrite()
    .save("Final_Project")
)


In [None]:
from pyspark.ml import PipelineModel

# Load the fitted pipeline back from the "Final_Project" directory it was
# saved to.
loadedPipelineModel = PipelineModel.load("Final_Project")

In [None]:
# Score the testing split again, this time with the pipeline loaded from disk.
# This rebinds `predictions`, replacing the result from the in-memory model.
predictions = loadedPipelineModel.transform(testingData)


In [None]:
# Show the label next to the model's prediction for the first five rows.
(
    predictions
    .select("SoundLevelDecibels", "prediction")
    .show(n=5)
)