<a href="https://colab.research.google.com/github/currencyfxjle/PySpark_Models_Evaluation/blob/main/PySpark_ML_JoseLizarraga_1_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**PySpark Machine Learning**

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=2f9f3b8f32952945980a2b8a3482ed9c83693be2354cbfd7656b9719c731d2b4
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


**Importes PySpark**

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler, Binarizer
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

# Inicia una sesión de Spark
spark = SparkSession.builder.appName("PollutionAnalysisEnhanced").getOrCreate()

# Carga los datos
df = spark.read.csv("/content/MonterreyPollutionData.csv", header=True, inferSchema=True)

**Exclusión de Contaminantes como Variables Independientes:** Las variables CO, NO, NO2, NOX, O3, PM10, y PM2_5 se consideran contaminantes, por lo que los excluiremos de nuestras variables independientes. Nos centraremos en PRS (presión atmosférica), RAINF (lluvia), RH (humedad relativa), SR (radiación solar), TOUT (temperatura exterior), WSR (velocidad del viento), y WDV (dirección del viento).

In [2]:
from pyspark.sql.functions import col

# Asumiendo que 'df' ya está definido y seleccionado correctamente

# Lista de variables no contaminantes más la variable objetivo
featureCols = ['PRS', 'RAINF', 'RH', 'SR', 'TOUT', 'WSR', 'WDV', 'PM10']

# Calcula y muestra la correlación de Pearson para cada par de variables
for i in range(len(featureCols)):
    for j in range(i+1, len(featureCols)):
        correlation = df.stat.corr(featureCols[i], featureCols[j])
        print(f"Correlation between {featureCols[i]} and {featureCols[j]}: {correlation}")

Correlation between PRS and RAINF: 0.005096307074890743
Correlation between PRS and RH: 0.12850916067491472
Correlation between PRS and SR: -0.14052127985403953
Correlation between PRS and TOUT: -0.6611628057429934
Correlation between PRS and WSR: -0.2160316856488538
Correlation between PRS and WDV: 0.13427310751210747
Correlation between PRS and PM10: -0.07659701332290413
Correlation between RAINF and RH: 0.06550695949155035
Correlation between RAINF and SR: -0.0359759430878159
Correlation between RAINF and TOUT: -0.029789121239439297
Correlation between RAINF and WSR: -0.015194824899400989
Correlation between RAINF and WDV: 0.003243082223413366
Correlation between RAINF and PM10: -0.02553238816521358
Correlation between RH and SR: -0.4599576161421162
Correlation between RH and TOUT: -0.5509104512638529
Correlation between RH and WSR: -0.4149172138477301
Correlation between RH and WDV: 0.06847707860565926
Correlation between RH and PM10: -0.10313336817644697
Correlation between SR and

**Modelo y Evaluacion de Datos**

**La binarización es una técnica de preprocesamiento utilizada para convertir** **datos numéricos en categorías binarias (0 o 1) basadas en un umbral.**

**Simplificación del Problema:** Convierte un problema de regresión en un problema de clasificación binaria. La regresión implica predecir un valor numérico continuo, mientras que la clasificación se centra en predecir a qué categoría pertenece una observación.

**Relevancia Práctica:** En muchos casos, especialmente en la salud pública o la política ambiental, es más relevante saber si los niveles de contaminación superan ciertos umbrales críticos que requieren acción o atención, en lugar de conocer el valor exacto de contaminación.

**Mejora de la Modelización:** Para algunos conjuntos de datos, especialmente aquellos con relaciones no lineales complejas o con valores extremos, la clasificación puede ser más robusta y ofrecer mejor rendimiento predictivo en comparación con la modelización de regresión.

**Evaluación de Modelos:** Permite utilizar métricas de evaluación específicas para clasificación, como la precisión, el recall, el F1 score y el AUC (Área Bajo la Curva ROC), que pueden proporcionar una visión más clara del rendimiento del modelo en tareas de clasificación que las métricas típicamente usadas en regresión.

**Experimento: Ajustando el Umbral de Binarización**

# Cambia el valor de threshold para experimentar
binarizer = Binarizer(threshold=20, inputCol="PM10", outputCol="label")

In [9]:
# from pyspark.sql import SparkSession
# from pyspark.ml.feature import VectorAssembler, StandardScaler, Binarizer
# from pyspark.ml.regression import LinearRegression
# from pyspark.ml.classification import RandomForestClassifier
# from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator, MulticlassClassificationEvaluator
# from pyspark.ml import Pipeline
# from pyspark.sql.functions import col
# from pyspark.sql.types import DoubleType

# Inicia una sesión de Spark
# spark = SparkSession.builder.appName("PollutionAnalysisEnhanced").getOrCreate()

# Carga los datos
# df = spark.read.csv("/content/MonterreyPollutionData.csv", header=True, inferSchema=True)

# Preprocesamiento
df = df.withColumnRenamed("PM2.5", "PM2_5").withColumn("PM10", col("PM10").cast(DoubleType()))

# Selecciona las variables relevantes y convierte PM10 a DoubleType
df = df.select(['TOUT', 'WSR', 'PM10'])

# Binariza PM10 para clasificación
binarizer = Binarizer(threshold=10, inputCol="PM10", outputCol="label")
df_bin = binarizer.transform(df)

# Features para ambos modelos
assembler = VectorAssembler(inputCols=['TOUT', 'WSR'], outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)

# Regresión Lineal
lr = LinearRegression(featuresCol="scaledFeatures", labelCol="PM10")

# Clasificador Random Forest
rf = RandomForestClassifier(featuresCol="scaledFeatures", labelCol="label")

# Pipeline de Regresión
pipeline_reg = Pipeline(stages=[assembler, scaler, lr])

# Pipeline de Clasificación
pipeline_clf = Pipeline(stages=[assembler, scaler, rf])

# Divide los datos para regresión y clasificación
trainData_reg, testData_reg = df.randomSplit([0.7, 0.3], seed=42)
trainData_clf, testData_clf = df_bin.randomSplit([0.7, 0.3], seed=42)

# Entrenamiento y Predicción de Regresión
model_reg = pipeline_reg.fit(trainData_reg)
predictions_reg = model_reg.transform(testData_reg)

# Evaluación de Regresión
evaluator_rmse = RegressionEvaluator(labelCol="PM10", predictionCol="prediction", metricName="rmse")
evaluator_r2 = RegressionEvaluator(labelCol="PM10", predictionCol="prediction", metricName="r2")
print(f"RMSE: {evaluator_rmse.evaluate(predictions_reg)}")
print(f"R2: {evaluator_r2.evaluate(predictions_reg)}")

# Entrenamiento y Predicción de Clasificación
model_clf = pipeline_clf.fit(trainData_clf)
predictions_clf = model_clf.transform(testData_clf)

# Evaluación de Clasificación
evaluator_auc = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label", metricName="areaUnderROC")
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
print(f"AUC: {evaluator_auc.evaluate(predictions_clf)}")
print(f"F1 Score: {evaluator_f1.evaluate(predictions_clf)}")

RMSE: 47.28759130209708
R2: 0.04593140748951918
AUC: 0.9886466848319708
F1 Score: 0.9972809677615742


**Experimento SIN Binarizacion**

In [10]:
# Inicia una sesión de Spark
spark = SparkSession.builder.appName("CompletePollutionModeling").getOrCreate()

# Carga los datos
df = spark.read.csv("/content/MonterreyPollutionData.csv", header=True, inferSchema=True)

# Preprocesamiento
# Convierte PM10 a DoubleType si es necesario y renombra PM2.5 para evitar errores en la nomenclatura
df = df.withColumn("PM10", col("PM10").cast(DoubleType())).withColumnRenamed("PM2.5", "PM2_5")

# Selecciona las variables relevantes incluyendo la variable objetivo PM10
df = df.select(['PRS', 'RAINF', 'RH', 'SR', 'TOUT', 'WSR', 'WDV', 'PM10'])

# Configuración de las características para el modelado
# Aquí, asumimos que todas las variables excepto 'PM10' son las características de entrada
featureCols = ['PRS', 'RAINF', 'RH', 'SR', 'TOUT', 'WSR', 'WDV']

# VectorAssembler para combinar las columnas de características en una sola columna de vectores
assembler = VectorAssembler(inputCols=featureCols, outputCol="features")

# StandardScaler para normalizar las características
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)

# Modelo de Regresión Lineal
lr = LinearRegression(featuresCol="scaledFeatures", labelCol="PM10")

# Pipeline que incluye vectorización, escalado y regresión lineal
pipeline = Pipeline(stages=[assembler, scaler, lr])

# Divide los datos en conjuntos de entrenamiento y prueba
trainData, testData = df.randomSplit([0.7, 0.3], seed=42)

# Entrenamiento del modelo con los datos de entrenamiento
model = pipeline.fit(trainData)

# Predicciones con el conjunto de prueba
predictions = model.transform(testData)

# Evaluación del modelo con RMSE y R^2
evaluator_rmse = RegressionEvaluator(labelCol="PM10", predictionCol="prediction", metricName="rmse")
evaluator_r2 = RegressionEvaluator(labelCol="PM10", predictionCol="prediction", metricName="r2")

rmse = evaluator_rmse.evaluate(predictions)
r2 = evaluator_r2.evaluate(predictions)

print(f"RMSE: {rmse}")
print(f"R2: {r2}")

RMSE: 45.99305882153446
R2: 0.06768320896762958


**Resultados de Regresión**

**Sin Binarizar:** RMSE = 45.993, R^2 = 0.067
**Con Binarizar:** RMSE = 47.288, R^2 = 0.046

**Comparación de Regresión:**

El modelo de regresión que utilizó todas las variables disponibles (PRS, RAINF, RH, SR, TOUT, WSR, WDV) **sin binarización** obtuvo un RMSE ligeramente menor y un R^2 ligeramente mayor en comparación con el modelo que solo usó TOUT y WSR con binarización.

Esto sugiere que incluir una gama más amplia de variables no contaminantes puede ofrecer un modelo ligeramente más preciso y explicativo para predecir PM10, aunque la mejora es modesta. Esto indica que la complejidad del problema y la relación entre las variables independientes y PM10 puede no ser capturada completamente por los modelos lineales.

**EXPERIMENTOS ADICIONALES // NO LINEALES**

In [11]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

# Suponiendo que 'spark' ya ha sido inicializado y 'df' cargado y preprocesado adecuadamente

# Preprocesamiento
df = df.withColumnRenamed("PM2.5", "PM2_5").withColumn("PM10", col("PM10").cast(DoubleType()))

# Selecciona las variables relevantes incluyendo la variable objetivo PM10
df = df.select(['PRS', 'RAINF', 'RH', 'SR', 'TOUT', 'WSR', 'WDV', 'PM10'])

# Configuración de las características para el modelado
featureCols = ['PRS', 'RAINF', 'RH', 'SR', 'TOUT', 'WSR', 'WDV']

# VectorAssembler para combinar las columnas de características en una sola columna de vectores
assembler = VectorAssembler(inputCols=featureCols, outputCol="features")

# StandardScaler para normalizar las características
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)

# Modelo de regresión no lineal: RandomForestRegressor
rfRegressor = RandomForestRegressor(featuresCol="scaledFeatures", labelCol="PM10")

# Pipeline que incluye vectorización, escalado y regresión con Random Forest
pipeline = Pipeline(stages=[assembler, scaler, rfRegressor])

# Divide los datos en conjuntos de entrenamiento y prueba
trainData, testData = df.randomSplit([0.7, 0.3], seed=42)

# Entrenamiento del modelo con los datos de entrenamiento
model = pipeline.fit(trainData)

# Predicciones con el conjunto de prueba
predictions = model.transform(testData)

# Evaluación del modelo con RMSE y R^2
evaluator_rmse = RegressionEvaluator(labelCol="PM10", predictionCol="prediction", metricName="rmse")
evaluator_r2 = RegressionEvaluator(labelCol="PM10", predictionCol="prediction", metricName="r2")

rmse = evaluator_rmse.evaluate(predictions)
r2 = evaluator_r2.evaluate(predictions)

print(f"RMSE: {rmse}")
print(f"R2: {r2}")


RMSE: 42.50114421001552
R2: 0.2038770147864758


In [12]:
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

# Configurar el modelo Gradient-Boosted Trees
gbt = GBTRegressor(featuresCol="scaledFeatures", labelCol="PM10", maxIter=10)

# Pipeline que incluye vectorización, escalado y GBT
pipeline_gbt = Pipeline(stages=[assembler, scaler, gbt])

# Entrenamiento del modelo con los datos de entrenamiento
model_gbt = pipeline_gbt.fit(trainData)

# Predicciones con el conjunto de prueba
predictions_gbt = model_gbt.transform(testData)

# Evaluación del modelo GBT con RMSE y R^2
evaluator_rmse_gbt = RegressionEvaluator(labelCol="PM10", predictionCol="prediction", metricName="rmse")
evaluator_r2_gbt = RegressionEvaluator(labelCol="PM10", predictionCol="prediction", metricName="r2")

rmse_gbt = evaluator_rmse_gbt.evaluate(predictions_gbt)
r2_gbt = evaluator_r2_gbt.evaluate(predictions_gbt)

print(f"GBT RMSE: {rmse_gbt}")
print(f"GBT R2: {r2_gbt}")


GBT RMSE: 45.95979185845792
GBT R2: 0.06903141812330249
