## Project: Building an ML Pipeline for Airfoil Noise Prediction


## Scenario


You are a data engineer at an aeronautics consulting company. Your company prides itself on efficiently designing airfoils for use in planes and sports cars. The data scientists in your office need to work with different algorithms and with data in different formats. While they are good at machine learning, they count on you to handle the ETL jobs and build the ML pipelines. In this project, you will use a modified version of the NASA Airfoil Self-Noise dataset. You will clean this dataset by dropping duplicate rows and rows with null values. You will then create an ML pipeline to build a model that predicts the sound level (SoundLevel) from all the other columns. Finally, you will evaluate the model and persist it.



## Objectives

This project consists of the following parts:

- Part 1: Run the ETL process
  - Load the CSV dataset
  - Remove duplicate rows, if any
  - Drop rows with null values, if any
  - Apply transformations
  - Save the cleaned data in Parquet format
- Part 2: Machine learning pipeline
  - Create a machine learning pipeline for prediction
- Part 3: Evaluate the model
  - Use several metrics to evaluate the model
- Part 4: Persist the model
  - Save the model for future predictions
  - Load and verify the saved model


## Datasets

This project uses the following dataset:

 - The original dataset is the NASA Airfoil Self-Noise dataset: https://archive.ics.uci.edu/dataset/291/airfoil+self+noise
 
 - This dataset is licensed under a Creative Commons Attribution 4.0 International (CC BY 4.0) license.


Diagram of an airfoil (for informational purposes).


![Airfoil with flow](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/images/Airfoil_with_flow.png)


Diagram showing the angle of attack (for informational purposes).


![Airfoil angle of attack](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/images/Airfoil_angle_of_attack.jpg)


## Setup


This project uses the following library:

*   [`PySpark`](https://spark.apache.org/docs/latest/api/python/index.html?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMSkillsNetworkBD0231ENCoursera2789-2023-01-01) to connect to the Spark cluster


In [0]:
!pip install pyspark==3.1.2 -q
!pip install findspark -q



### Importing required libraries



In [0]:
# You can also use this section to suppress warnings generated by your code:
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

# FindSpark simplifies the process of using Apache Spark with Python

import findspark
findspark.init()

## Part 1 - ETL Process


### Task 1 - Import required libraries


In [0]:
from pyspark.sql import SparkSession

### Task 2 - Create a spark session


In [0]:
# Create the SparkSession

spark = SparkSession.builder.appName('FinalProject').getOrCreate()

### Task 3 - Load the csv file into a dataframe


Download the data file.

NOTE : Please ensure you use the dataset below and not the original dataset mentioned above.


In [0]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv


--2024-09-24 05:49:01--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.63.118.104
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 60682 (59K) [text/csv]
Saving to: ‘NASA_airfoil_noise_raw.csv’


2024-09-24 05:49:02 (633 KB/s) - ‘NASA_airfoil_noise_raw.csv’ saved [60682/60682]



Check that the file was downloaded correctly.

In [0]:
%sh ls /databricks/driver/

NASA_airfoil_noise_raw.csv
azure
conf
eventlogs
ganglia
hadoop_accessed_config.lst
logs
metastore_db
preload_class.lst


Load the dataset into a Spark DataFrame.


In [0]:
# Load the dataset that you have downloaded in the previous task

df = spark.read.csv('file:/databricks/driver/NASA_airfoil_noise_raw.csv',header=True,inferSchema=True)


### Task 4 - Show the top 5 rows of the dataset


In [0]:
df.show(5)

+---------+-------------+-----------+------------------+-----------------------+----------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevel|
+---------+-------------+-----------+------------------+-----------------------+----------+
|      800|          0.0|     0.3048|              71.3|             0.00266337|   126.201|
|     1000|          0.0|     0.3048|              71.3|             0.00266337|   125.201|
|     1250|          0.0|     0.3048|              71.3|             0.00266337|   125.951|
|     1600|          0.0|     0.3048|              71.3|             0.00266337|   127.591|
|     2000|          0.0|     0.3048|              71.3|             0.00266337|   127.461|
+---------+-------------+-----------+------------------+-----------------------+----------+
only showing top 5 rows



### Task 6 - Show the total number of rows in the dataset


In [0]:
rowcount1 = df.count()
print(rowcount1)

1522


### Task 7 - Drop the duplicate rows


In [0]:
df = df.dropDuplicates()


### Task 8 - Show the number of rows in the dataset again


In [0]:
rowcount2 = df.count()
print(rowcount2)


1503


### Task 9 - Drop the rows that contain any null value

In [0]:
df = df.dropna()


### Task 10 - Total number of rows


In [0]:
rowcount3 = df.count()
print(rowcount3)


1499
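
The two cleaning steps above (`dropDuplicates()` followed by `dropna()`, which by default drops a row if any of its values is null) can be illustrated without a cluster. The following is a minimal plain-Python sketch of the same idea, not the PySpark implementation: the sample rows and the helper names `drop_duplicates` and `drop_nulls` are hypothetical, and `None` stands in for a null value.

```python
# Hypothetical sample rows: (Frequency, AngleOfAttack, SoundLevel); None marks a null.
rows = [
    (800, 0.0, 126.201),
    (1000, 0.0, 125.201),
    (800, 0.0, 126.201),    # exact duplicate of the first row
    (1250, None, 125.951),  # contains a null value
]


def drop_duplicates(rows):
    """Keep the first occurrence of each distinct row."""
    seen = set()
    unique = []
    for row in rows:
        if row not in seen:
            seen.add(row)
            unique.append(row)
    return unique


def drop_nulls(rows):
    """Drop every row that has at least one None value."""
    return [row for row in rows if all(value is not None for value in row)]


deduped = drop_duplicates(rows)
cleaned = drop_nulls(deduped)

# Row counts before cleaning, after deduplication, and after dropping nulls
print(len(rows), len(deduped), len(cleaned))
```

Unlike this sketch, Spark does not promise any particular row order after `dropDuplicates()`, which is why the notebook compares row counts rather than row positions.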


### Task 11 - Rename the column "SoundLevel" to "SoundLevelDecibels"


In [0]:
df = df.withColumnRenamed('SoundLevel','SoundLevelDecibels')


In [0]:
df.columns

Out[22]: ['Frequency',
 'AngleOfAttack',
 'ChordLength',
 'FreeStreamVelocity',
 'SuctionSideDisplacement',
 'SoundLevelDecibels']

### Task 12 - Save the dataset in Parquet format as "NASA_airfoil_noise_cleaned.parquet"


In [0]:
df.write.mode('overwrite').parquet('file:/databricks/driver/NASA_airfoil_noise_cleaned.parquet')



In [0]:
display(dbutils.fs.ls("file:/databricks/driver"))

path,name,size,modificationTime
file:/databricks/driver/preload_class.lst,preload_class.lst,1306936,1727145190867
file:/databricks/driver/hadoop_accessed_config.lst,hadoop_accessed_config.lst,2755,1727145190867
file:/databricks/driver/azure/,azure/,4096,1727145190867
file:/databricks/driver/conf/,conf/,4096,1727145189427
file:/databricks/driver/logs/,logs/,4096,1727157917156
file:/databricks/driver/eventlogs/,eventlogs/,4096,1727154028931
file:/databricks/driver/metastore_db/,metastore_db/,4096,1727154351732
file:/databricks/driver/NASA_airfoil_noise_raw.csv,NASA_airfoil_noise_raw.csv,60682,1686466677000
file:/databricks/driver/NASA_airfoil_noise_cleaned.parquet/,NASA_airfoil_noise_cleaned.parquet/,4096,1727158209560
file:/databricks/driver/ganglia/,ganglia/,4096,1727157612043


#### Part 1 - Summary


In [0]:
print("Part 1 - Summary")

print("Total rows = ", rowcount1)
print("Total rows after dropping duplicate rows = ", rowcount2)
print("Total rows after dropping duplicate rows and rows with null values = ", rowcount3)
print("New column name = ", df.columns[-1])

import os

print("NASA_airfoil_noise_cleaned.parquet exists :", os.path.isdir("NASA_airfoil_noise_cleaned.parquet"))

Part 1 - Summary
Total rows =  1522
Total rows after dropping duplicate rows =  1503
Total rows after dropping duplicate rows and rows with null values =  1499
New column name =  SoundLevelDecibels
NASA_airfoil_noise_cleaned.parquet exists : True


## Part 2 - Creating a Machine Learning Pipeline


### Task 1 - Load the data from "NASA_airfoil_noise_cleaned.parquet" into a dataframe


In [0]:
df2 = spark.read.parquet('file:/databricks/driver/NASA_airfoil_noise_cleaned.parquet')


### Task 2 - Number of rows


In [0]:
rowcount4 = df2.count()
print(rowcount4)



1499


### Task 3 - Define the VectorAssembler pipeline stage


Stage 1 - Assemble the input columns into a single "features" column. All columns except SoundLevelDecibels are used as input features.


In [0]:
df2.drop('SoundLevelDecibels').columns

Out[42]: ['Frequency',
 'AngleOfAttack',
 'ChordLength',
 'FreeStreamVelocity',
 'SuctionSideDisplacement']

In [0]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=df2.drop('SoundLevelDecibels').columns,outputCol='features')



### Task 4 - Define the StandardScaler pipeline stage


Stage 2 - Scale "features" with the standard scaler and store the result in the "scaledFeatures" column.


In [0]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol='features',outputCol='scaledFeatures')
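
With the default parameters used above (`withStd=True`, `withMean=False`), `StandardScaler` divides each feature by its sample standard deviation and does not subtract the mean. The following is a minimal plain-Python sketch of that computation, not Spark's implementation: the feature values and the helper names `fit_std` and `scale` are hypothetical.

```python
import statistics

# Hypothetical feature matrix: each inner list is one row (Frequency, ChordLength).
features = [
    [800.0, 0.3048],
    [1000.0, 0.2286],
    [1250.0, 0.1016],
]


def fit_std(rows):
    """Per-column sample standard deviation (n - 1 in the denominator)."""
    columns = list(zip(*rows))
    return [statistics.stdev(column) for column in columns]


def scale(rows, stds):
    """Divide each value by its column's standard deviation; no centering."""
    return [[value / std for value, std in zip(row, stds)] for row in rows]


stds = fit_std(features)
scaled = scale(features, stds)
for row in scaled:
    print([round(value, 4) for value in row])
```

After scaling, every column has unit standard deviation, so features measured in very different units (hertz versus metres, for example) end up on a comparable scale.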


### Task 5 - Define the model pipeline stage


Stage 3 - Use linear regression to predict "SoundLevelDecibels".

**Note: Use the scaledFeatures column produced by the previous step (the StandardScaler pipeline stage).**


In [0]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol="scaledFeatures", labelCol="SoundLevelDecibels")


### Task 6 - Build the pipeline


Build the pipeline from the three stages.

In [0]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[assembler,scaler,lr])

### Task 7 - Split the data


In [0]:
(trainingData, testingData) = df2.randomSplit([0.7,0.3],seed=42)
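
A weighted random split assigns each row to a split independently, so the 70/30 proportions are approximate rather than exact, while a fixed seed makes the assignment repeatable. The following is a minimal plain-Python sketch of that idea under those assumptions. It is not Spark's implementation, it will not reproduce the same rows as `randomSplit`, and the helper name `random_split` is hypothetical.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to one split with probability proportional to its weight."""
    total = sum(weights)
    # Cumulative upper bounds of the normalized weights, e.g. [0.7, 1.0]
    bounds = []
    cumulative = 0.0
    for weight in weights:
        cumulative += weight / total
        bounds.append(cumulative)

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()  # uniform draw in [0, 1)
        for index, upper in enumerate(bounds):
            if draw < upper:
                splits[index].append(row)
                break
        else:
            # Guard against floating-point rounding in the last bound
            splits[-1].append(row)
    return splits


train, test = random_split(list(range(1000)), [0.7, 0.3], seed=42)
print(len(train), len(test))
```

Calling the function again with the same seed returns the same two lists, which is the property the notebook relies on when it passes `seed=42`.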



### Task 8 - Fit the pipeline


In [0]:
pipelineModel = pipeline.fit(trainingData)
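
Fitting a pipeline runs the stages in order: each stage that learns something is fitted on the output of the stages before it, and the fitted result is a model that replays the same transformations on new data. The following is a minimal plain-Python sketch of that control flow, not the PySpark classes: `ToyPipeline`, `ToyPipelineModel`, `MeanCenterer`, `Shift`, and `Doubler` are all hypothetical names.

```python
class Shift:
    """Toy transformer: adds a constant to every value."""

    def __init__(self, offset):
        self.offset = offset

    def transform(self, data):
        return [value + self.offset for value in data]


class MeanCenterer:
    """Toy estimator: learns the mean and returns a transformer that removes it."""

    def fit(self, data):
        mean = sum(data) / len(data)
        return Shift(-mean)


class Doubler:
    """Toy transformer with nothing to learn."""

    def transform(self, data):
        return [2 * value for value in data]


class ToyPipelineModel:
    """Fitted pipeline: applies every fitted stage in order."""

    def __init__(self, stages):
        self.stages = stages

    def transform(self, data):
        for stage in self.stages:
            data = stage.transform(data)
        return data


class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, data):
        fitted = []
        for stage in self.stages:
            # Stages with fit() are estimators; fitting yields a transformer.
            transformer = stage.fit(data) if hasattr(stage, "fit") else stage
            fitted.append(transformer)
            # Each later stage sees the output of the earlier ones.
            data = transformer.transform(data)
        return ToyPipelineModel(fitted)


model = ToyPipeline([Doubler(), MeanCenterer()]).fit([1.0, 2.0, 3.0])
print(model.transform([4.0]))
```

The mean learned from the training values is reused unchanged on the new value. The notebook relies on the same behaviour when it fits on `trainingData` and later transforms `testingData`.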

#### Part 2 - Summary


In [0]:
for i in pipeline.getStages():
    print (i)

VectorAssembler_9f8692b86cd3
StandardScaler_7558d53d0c18
LinearRegression_bc7bd7efa9c5


In [0]:
print("Part 2 - Summary")
print("Total rows = ", rowcount4)
ps = [str(x).split("_")[0] for x in pipeline.getStages()]

print("Pipeline Stage 1 = ", ps[0])
print("Pipeline Stage 2 = ", ps[1])
print("Pipeline Stage 3 = ", ps[2])

print("Label column = ", lr.getLabelCol())

Part 2 - Summary
Total rows =  1499
Pipeline Stage 1 =  VectorAssembler
Pipeline Stage 2 =  StandardScaler
Pipeline Stage 3 =  LinearRegression
Label column =  SoundLevelDecibels


## Part 3 - Evaluating the Model


### Task 1 - Predict using the model


In [0]:
predictions = pipelineModel.transform(testingData)


In [0]:
predictions.show(5)

+---------+-------------+-----------+------------------+-----------------------+------------------+--------------------+--------------------+------------------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevelDecibels|            features|      scaledFeatures|        prediction|
+---------+-------------+-----------+------------------+-----------------------+------------------+--------------------+--------------------+------------------+
|      200|          7.3|     0.2286|              31.7|              0.0132672|           128.679|[200.0,7.3,0.2286...|[0.06359239499674...| 122.5972291437678|
|      200|          8.9|     0.1016|              39.6|              0.0124596|            133.42|[200.0,8.9,0.1016...|[0.06359239499674...|127.37968204568845|
|      200|          9.5|     0.0254|              31.7|             0.00461377|           119.146|[200.0,9.5,0.0254...|[0.06359239499674...|130.34077425074514|
|      200|          9.5|     0.02

### Task 2 - Print the MSE


In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluatorMSE=RegressionEvaluator(labelCol='SoundLevelDecibels',predictionCol='prediction',metricName='mse')

mse = evaluatorMSE.evaluate(predictions)
print(mse)


24.997666255024164


### Task 3 - Print the MAE


In [0]:
evaluatorMAE=RegressionEvaluator(labelCol='SoundLevelDecibels',predictionCol='prediction',metricName='mae')
mae = evaluatorMAE.evaluate(predictions)
print(mae)


3.913679095881195


### Task 4 - Print the R-squared (R2)


In [0]:
evaluatorR2=RegressionEvaluator(labelCol='SoundLevelDecibels',predictionCol='prediction',metricName='r2')
r2 = evaluatorR2.evaluate(predictions)
print(r2)


0.4959688408974626


#### Part 3 - Summary


In [0]:
print("Part 3 - Evaluation")

print("Mean Squared Error = ", round(mse,2))
print("Mean Absolute Error = ", round(mae,2))
print("R Squared = ", round(r2,2))

lrModel = pipelineModel.stages[-1]

print("Intercept = ", round(lrModel.intercept,2))


Part 3 - Evaluation
Mean Squared Error =  25.0
Mean Absolute Error =  3.91
R Squared =  0.5
Intercept =  132.88
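
The three metrics reported above have simple definitions that can be checked by hand. The following is a minimal plain-Python sketch using the standard formulas for mean squared error, mean absolute error, and R-squared (one minus the ratio of residual to total sum of squares). The label and prediction values are hypothetical, and the function names are illustrative rather than part of PySpark.

```python
def mse(labels, predictions):
    """Mean of the squared differences between labels and predictions."""
    return sum((y - p) ** 2 for y, p in zip(labels, predictions)) / len(labels)


def mae(labels, predictions):
    """Mean of the absolute differences between labels and predictions."""
    return sum(abs(y - p) for y, p in zip(labels, predictions)) / len(labels)


def r2(labels, predictions):
    """1 - (residual sum of squares / total sum of squares)."""
    mean_label = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    return 1 - ss_res / ss_tot


# Hypothetical values, chosen so the arithmetic is easy to follow
labels = [1.0, 2.0, 3.0, 4.0]
predictions = [1.5, 2.0, 2.5, 4.5]

print("MSE =", mse(labels, predictions))
print("MAE =", mae(labels, predictions))
print("R2  =", r2(labels, predictions))
```

Because MSE is in squared units, its square root is often easier to interpret: the MSE of about 25.0 reported above corresponds to a root mean squared error of about 5 dB.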


## Part 4 - Persisting the Model


### Task 1 - Save the model to the path "Final_Project"


In [0]:
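# -p avoids an error when the directory already exists
!mkdir -p /databricks/driver/Final_Project
# Note: without a file: scheme, Spark resolves this path against its default filesystem,
# which may not be the local driver directory created above
pipelineModel.write().overwrite().save('/databricks/driver/Final_Project')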


In [0]:
%sh ls /databricks/driver/Final_Project/

### Task 2 - Load the model from the path "Final_Project"


In [0]:
from pyspark.ml.pipeline import PipelineModel
loadedPipelineModel = PipelineModel.load('/databricks/driver/Final_Project')


### Task 3 - Make predictions on the test data using the loaded model


In [0]:
predictions2= loadedPipelineModel.transform(testingData)


### Task 4 - Show the predictions


In [0]:

predictions2.select(['SoundLevelDecibels','prediction']).show(5)


+------------------+------------------+
|SoundLevelDecibels|        prediction|
+------------------+------------------+
|           128.679| 122.5972291437678|
|            133.42|127.37968204568845|
|           119.146|130.34077425074514|
|           116.074|131.11016975113546|
|           134.319|  127.126273601251|
+------------------+------------------+
only showing top 5 rows



#### Part 4 - Summary


In [0]:
print("Part 4 - Summary")

loadedmodel = loadedPipelineModel.stages[-1]
totalstages = len(loadedPipelineModel.stages)
inputcolumns = loadedPipelineModel.stages[0].getInputCols()

print("Number of stages in the pipeline = ", totalstages)
for i,j in zip(inputcolumns, loadedmodel.coefficients):
    print(f"Coefficient for {i} is {round(j,4)}")

Part 4 - Summary
Number of stages in the pipeline =  3
Coefficient for Frequency is -3.9906
Coefficient for AngleOfAttack is -2.2881
Coefficient for ChordLength is -3.3269
Coefficient for FreeStreamVelocity is 1.4832
Coefficient for SuctionSideDisplacement is -2.0551
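
The coefficients printed above belong to the LinearRegression stage, whose input is "scaledFeatures". Assuming the scaler's default behaviour of dividing by the standard deviation without centering, each coefficient therefore describes the change in predicted decibels per one standard deviation of the feature, not per raw unit. The following is a minimal plain-Python sketch of how the two views relate. Every number in it is hypothetical and chosen only to keep the arithmetic simple; none of it comes from the fitted model.

```python
# All numbers below are hypothetical and chosen only for illustration.
intercept = 10.0
scaled_coefficients = [2.0, -3.0]  # coefficients on the scaled features
stds = [4.0, 0.5]                  # per-feature standard deviations learned by the scaler

# Scaling without centering is x_scaled = x / std, so the equivalent
# coefficient on the raw feature is coefficient / std.
raw_coefficients = [c / s for c, s in zip(scaled_coefficients, stds)]

raw_row = [8.0, 1.0]
scaled_row = [x / s for x, s in zip(raw_row, stds)]

# Both formulations give the same prediction for the same row
prediction_from_scaled = intercept + sum(
    c * x for c, x in zip(scaled_coefficients, scaled_row)
)
prediction_from_raw = intercept + sum(
    c * x for c, x in zip(raw_coefficients, raw_row)
)
print(prediction_from_scaled, prediction_from_raw)
```

Because no mean is subtracted in this setting, the intercept is the same in both formulations. Only the coefficients change.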


### Stop the Spark session


In [0]:
spark.stop()

## Author


[Gerson Roberto Salazar Boslanga](https://www.linkedin.com/in/gerson-salazar-89196230a/)
