In [None]:
#Trabajo 2 - DISEÑO E IMPLEMENTACIÓN DE UNA SOLUCIÓN ANALÍTICA BÁSICA EN UNA ARQUITECTURA BATCH EN AWS
# Por: Santiago Saldarriaga Saldarriaga

In [134]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, regexp_replace, trim
from pyspark.sql.functions import col, to_timestamp
import re
from pyspark.sql.functions import avg, max, min, stddev, count, col
from pyspark.sql.functions import col, when
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import year, month, dayofmonth, hour
from pyspark.sql.functions import mean, stddev, min, max, count, col, year, month, dayofmonth, hour, lit
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [135]:
# Crear o recuperar la sesión Spark activa
spark = SparkSession.builder.appName("IoT_AirQuality_Trusted").getOrCreate()


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [136]:
# ------------------------------------------------------------
# 1. Leer CSV original desde la zona RAW
# ------------------------------------------------------------
df_raw = spark.read.option("header", True) \
                   .option("inferSchema", True) \
                   .csv("s3a://ssaldarridatalake2/proyecto1/raw/air_quality/IoT_Indoor_Air_Quality_Dataset.csv")

print("Datos cargados desde RAW")
df_raw.printSchema()
df_raw.show(5)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Datos cargados desde RAW
root
 |-- Timestamp: string (nullable = true)
 |-- Temperature (?C): double (nullable = true)
 |-- Humidity (%): double (nullable = true)
 |-- CO2 (ppm): double (nullable = true)
 |-- PM2.5 (?g/m?): double (nullable = true)
 |-- PM10 (?g/m?): double (nullable = true)
 |-- TVOC (ppb): double (nullable = true)
 |-- CO (ppm): double (nullable = true)
 |-- Light Intensity (lux): double (nullable = true)
 |-- Motion Detected: integer (nullable = true)
 |-- Occupancy Count: integer (nullable = true)
 |-- Ventilation Status: string (nullable = true)

+----------------+----------------+------------+---------+-------------+------------+----------+--------+---------------------+---------------+---------------+------------------+
|       Timestamp|Temperature (?C)|Humidity (%)|CO2 (ppm)|PM2.5 (?g/m?)|PM10 (?g/m?)|TVOC (ppb)|CO (ppm)|Light Intensity (lux)|Motion Detected|Occupancy Count|Ventilation Status|
+----------------+----------------+------------+---------+---------

In [137]:
# ------------------------------------------------------------
# 2. Limpieza básica y transformación
# ------------------------------------------------------------

# Renombrar columnas con caracteres especiales y pasar a minúscula
new_columns = []
for c in df_raw.columns:
    clean_name = re.sub(r'[^a-zA-Z0-9_]', '_', c)           # Reemplaza caracteres no alfanuméricos
    clean_name = re.sub(r'__+', '_', clean_name)            # Elimina múltiples guiones bajos consecutivos
    clean_name = clean_name.lower()                         # Convierte a minúscula
    new_columns.append((c, clean_name))

# Aplicar nombres limpios al dataframe
for old, new in new_columns:
    df_raw = df_raw.withColumnRenamed(old, new)

# Limpieza básica de datos
df_clean = df_raw.dropna()
df_clean = df_clean.withColumn("timestamp", to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss"))  # Usa el nuevo nombre ya en minúscula
df_clean = df_clean.dropDuplicates()

# Mostrar nombres limpios de columnas
print("Nombres de columnas después de la limpieza:")
for c in df_clean.columns:
    print("-", c)

# Mostrar esquema para verificar tipos
df_clean.printSchema()



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Nombres de columnas despu?s de la limpieza:
- timestamp
- temperature_c_
- humidity_
- co2_ppm_
- pm2_5_g_m_
- pm10_g_m_
- tvoc_ppb_
- co_ppm_
- light_intensity_lux_
- motion_detected
- occupancy_count
- ventilation_status
root
 |-- timestamp: timestamp (nullable = true)
 |-- temperature_c_: double (nullable = true)
 |-- humidity_: double (nullable = true)
 |-- co2_ppm_: double (nullable = true)
 |-- pm2_5_g_m_: double (nullable = true)
 |-- pm10_g_m_: double (nullable = true)
 |-- tvoc_ppb_: double (nullable = true)
 |-- co_ppm_: double (nullable = true)
 |-- light_intensity_lux_: double (nullable = true)
 |-- motion_detected: integer (nullable = true)
 |-- occupancy_count: integer (nullable = true)
 |-- ventilation_status: string (nullable = true)

In [138]:
# ------------------------------------------------------------
# 3. Escribir el dataset limpio en formato Parquet (zona TRUSTED)
# ------------------------------------------------------------
output_path = "s3a://ssaldarridatalake2/proyecto1/trusted/iot_data/"
df_clean.write.mode("overwrite").parquet(output_path)

print(f" Datos almacenados en: {output_path}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

 Datos almacenados en: s3a://ssaldarridatalake2/proyecto1/trusted/iot_data/

In [139]:
# ------------------------------------------------------------
# 4. Escribir el dataset en formato Parquet (zona REFINED)
# ------------------------------------------------------------
# Leer desde tabla trusted
df_trusted = spark.read.format("parquet").load("s3a://ssaldarridatalake2/proyecto1/trusted/iot_data/")

df_trusted.show(5)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+--------------+---------+--------+----------+---------+---------+-------+--------------------+---------------+---------------+------------------+
|timestamp|temperature_c_|humidity_|co2_ppm_|pm2_5_g_m_|pm10_g_m_|tvoc_ppb_|co_ppm_|light_intensity_lux_|motion_detected|occupancy_count|ventilation_status|
+---------+--------------+---------+--------+----------+---------+---------+-------+--------------------+---------------+---------------+------------------+
|     null|         25.71|    36.61|  572.38|     29.91|    21.82|   322.46|   2.58|              232.94|              0|             43|            Closed|
|     null|         24.55|    61.59|  427.46|      57.5|    72.98|   328.63|   3.66|              814.99|              1|             35|              Open|
|     null|         19.11|    35.82|  760.58|      8.34|    24.89|   191.98|    3.0|              364.12|              1|             41|              Open|
|     null|          24.5|     32.3|  726.39|     47.96|  

In [140]:
# Crear columnas temporales para análisis por tiempo
df_trusted = df_trusted.withColumn("year", year("timestamp")) \
                       .withColumn("month", month("timestamp")) \
                       .withColumn("day", dayofmonth("timestamp")) \
                       .withColumn("hour", hour("timestamp"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [141]:
# EDA 1: Promedios por hora
eda_hourly = df_trusted.groupBy("hour").agg(
    avg("temperature_c_").alias("avg_temp"),
    avg("humidity_").alias("avg_humidity"),
    avg("co2_ppm_").alias("avg_co2"),
    avg("pm2_5_g_m_").alias("avg_pm2_5"),
    avg("pm10_g_m_").alias("avg_pm10")
).withColumn("tipo_eda", lit("por_hora"))


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [142]:
# EDA 2: Promedios por estado de ventilación
eda_ventilation = df_trusted.groupBy("ventilation_status").agg(
    count("*").alias("total_registros"),
    mean("co2_ppm_").alias("co2_prom"),
    mean("temperature_c_").alias("temp_prom"),
    mean("humidity_").alias("humedad_prom"),
    stddev("co2_ppm_").alias("stddev_co2")
).withColumn("tipo_eda", lit("por_ventilacion"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [143]:
# Unir ambos resultados
eda_df = eda_hourly.unionByName(eda_ventilation, allowMissingColumns=True)

# Guardar en zona refined
output_path = "s3a://ssaldarridatalake2/proyecto1/refined/iot_eda/"
eda_df.write.mode("overwrite") \
      .format("parquet") \
      .option("path", output_path) \
      .saveAsTable("proyecto1db.iot_eda_enrich")

print("EDA enriquecido generado y almacenado en zona refined como tabla 'iot_eda'")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

EDA enriquecido generado y almacenado en zona refined como tabla 'iot_eda'

In [144]:
# Agrupar por estado de ventilación (por ejemplo)
eda_df = df_trusted.groupBy("ventilation_status").agg(
    count("*").alias("total_registros"),
    avg("temperature_c_").alias("temp_promedio"),
    avg("humidity_").alias("humedad_promedio"),
    avg("co2_ppm_").alias("co2_promedio"),
    max("pm2_5_g_m_").alias("pm2_5_max"),
    stddev("co_ppm_").alias("stddev_co"),
)

# Guardar en zona refined
output_path = "s3://ssaldarridatalake2/proyecto1/refined/iot_summary/"
# Registrar en Glue Data Catalog como tabla 'iot_summary'
eda_df.write.mode("overwrite") \
    .format("parquet") \
    .option("path", output_path) \
    .saveAsTable("proyecto1db.iot_summary")


print("EDA generado y almacenado en zona refined.")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

EDA generado y almacenado en zona refined.

In [148]:
# ------------------------------------------------------------
# 5. Entrenamiento y Evaluación de Modelo Predictivo (Ventilation_Status)
# ------------------------------------------------------------
# Leer los datos trusted
df = spark.read \
    .option("ignoreMissingFiles", "true") \
    .parquet("s3a://ssaldarridatalake2/proyecto1/trusted/iot_data/")

print(df)
# Preparar variable objetivo (label)
# Convertimos el estado de ventilación a valores numéricos (por ejemplo: "On" -> 1, "Off" -> 0)
df = df.withColumn("label", when(col("ventilation_status") == "On", 1).otherwise(0))

# Seleccionar variables numéricas para el modelo
feature_cols = [
    "temperature_c_",
    "humidity_",
    "co2_ppm_",
    "pm2_5_g_m_",
    "pm10_g_m_",
    "tvoc_ppb_",
    "co_ppm_",
    "light_intensity_lux_",
    "motion_detected",
    "occupancy_count"
]


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[timestamp: timestamp, temperature_c_: double, humidity_: double, co2_ppm_: double, pm2_5_g_m_: double, pm10_g_m_: double, tvoc_ppb_: double, co_ppm_: double, light_intensity_lux_: double, motion_detected: int, occupancy_count: int, ventilation_status: string]

In [149]:
# Ensamblar las características en un único vector
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Definir modelo (Random Forest)
rf = RandomForestClassifier(featuresCol="features", labelCol="label", predictionCol="prediction")

# Pipeline completo
pipeline = Pipeline(stages=[assembler, rf])

# Entrenar el modelo
model = pipeline.fit(df)

# Realizar predicciones
predictions = model.transform(df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [151]:
# ------------------------------------------------------------
# Guardar predicciones en zona refined y registrar tabla
# ------------------------------------------------------------

output_path = "s3a://ssaldarridatalake2/proyecto1/refined/iot_predictions/"

# Seleccionar columnas clave para análisis posterior
predictions.select(
    "timestamp",
    "ventilation_status",
    "prediction",
    "probability"
).write.mode("overwrite") \
 .format("parquet") \
 .option("path", output_path) \
 .saveAsTable("proyecto1db.iot_predictions")

print("Tabla iot_predictions registrada en Glue y almacenada en zona refined.")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Tabla iot_predictions registrada en Glue y almacenada en zona refined.