# Import libraries

In [None]:
def warn(*args, **kwargs):
    """Accept any positional and keyword arguments and do nothing.

    Used in this notebook as a silent replacement for ``warnings.warn``.
    """
    return None
import warnings
# Replace warnings.warn with the no-op `warn` defined above so that calls to it do nothing.
warnings.warn = warn
# Additionally install an "ignore" filter for warnings.
warnings.filterwarnings('ignore')

import findspark
# NOTE(review): presumably this locates the local Spark installation so that
# the pyspark imports below succeed; confirm against the findspark docs.
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StandardScaler

# Create Spark session

In [None]:
# Obtain the notebook's SparkSession via getOrCreate(), under the application name "Project".
spark = SparkSession.builder.appName("Project").getOrCreate()

# Data Prep

In [None]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv

In [None]:
# Load the downloaded CSV: the first line is treated as the header and
# column types are inferred from the data.
df = spark.read.csv(
    "NASA_airfoil_noise_raw.csv",
    header=True,
    inferSchema=True,
)

In [None]:
# Preview the first five rows of the raw data.
df.head(5)

In [None]:
# Row count of the raw data; kept in rowcount1 for the evaluation summary printed later.
rowcount1 = df.count()
print(rowcount1)

In [None]:
# Drop duplicate rows, then record the remaining row count in rowcount2
# for the evaluation summary printed later.
df = df.dropDuplicates()
rowcount2 = df.count()
print(rowcount2)

In [None]:
# Drop rows containing null values, then record the remaining row count
# in rowcount3 for the evaluation summary printed later.
df = df.dropna()
rowcount3 = df.count()
print(rowcount3)

In [None]:
# Rename the "SoundLevel" column to "SoundLevelDecibels" and display the result.
df = df.withColumnRenamed("SoundLevel","SoundLevelDecibels")
df.show()

In [None]:
# Save the cleaned DataFrame as Parquet; "overwrite" mode replaces any
# existing output at this path.
df.write.mode("overwrite").parquet("NASA_airfoil_noise_cleaned.parquet")

In [None]:
import os

# Evaluation: print the row counts gathered during cleaning, the name of the
# last column, and whether the Parquet output directory exists.
print("Part 1 - Evaluation")

for _label, _value in (
    ("Total rows = ", rowcount1),
    ("Total rows after dropping duplicate rows = ", rowcount2),
    ("Total rows after dropping duplicate rows and rows with null values = ", rowcount3),
    ("New column name = ", df.columns[-1]),
):
    print(_label, _value)

print("NASA_airfoil_noise_cleaned.parquet exists :", os.path.isdir("NASA_airfoil_noise_cleaned.parquet"))

# Pipeline

In [None]:
# Reload the cleaned data from the Parquet output written earlier and
# print its row count (rowcount4).
df = spark.read.parquet("NASA_airfoil_noise_cleaned.parquet")
rowcount4 = df.count()
print(rowcount4)

In [None]:
# Stage 1: combine the five input columns into a single vector column, "features".
assembler = VectorAssembler(
    inputCols=[
        "Frequency",
        "AngleOfAttack",
        "ChordLength",
        "FreeStreamVelocity",
        "SuctionSideDisplacement",
    ],
    outputCol="features",
)

# Stage 2: scale "features" into "scaledFeatures".
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

# Stage 3: linear regression on "scaledFeatures" with "SoundLevelDecibels" as the label.
lr = LinearRegression(featuresCol="scaledFeatures", labelCol="SoundLevelDecibels")

In [None]:
# Chain the three stages in order: assemble -> scale -> regress.
pipeline = Pipeline(stages=[assembler, scaler, lr])

In [None]:
# Randomly split the data with weights 0.7 (training) and 0.3 (testing), using a fixed seed.
trainingData, testingData = df.randomSplit([0.7, 0.3], seed=42)

In [None]:
# Fit the pipeline on the training split; the fitted result is kept in pipelineModel.
pipelineModel = pipeline.fit(trainingData)