##Taller Evaluado 02 Y 03
####- Estudiante: Mario Alonso Vento Alvarado
**Objetivo:** Diseñar y ejecutar un pipeline en Databricks que ingeste, limpie, transforme, una y agregue datos relevantes para su industria elegida, persista resultados en Delta + Unity Catalog, y genere una tabla de auditoría con trazabilidad de la ejecución (origen, conteos y marcas de tiempo).

### 1. Ingesta y perfilado inicial.
- Lectura de 3–4 datasets (p. ej., clientes, transacciones, catálogo/productos, sucursales, etc.).
- Validar esquema, nulos, rangos de fechas, duplicados.

Busqué DataSets en la página [KAGGLE](https://www.kaggle.com/) y obtuve los **datasets del monitoreo de la calidad del aire en Seúl, Corea del Sur**, y proporciona un caso práctico para analizar los niveles de contaminación en diferentes distritos a lo largo del tiempo. El dataset principal, `Measurement_summary.csv`, contiene mediciones horarias de contaminantes clave como SO2, NO2, CO, O3, PM10 y PM2.5, recolectadas desde enero de 2017 hasta diciembre de 2018. Para enriquecer este análisis, se incluyen dos datasets de apoyo: `Measurement_station_info.csv`, que proporciona detalles geográficos y de ubicación de las 25 estaciones de monitoreo, y `Measurement_item_info.csv`, que describe los códigos de los ítems de medición y los umbrales de calidad del aire asociados a cada contaminante. En conjunto, estos archivos permiten un perfilado de datos integral y la ingesta de un modelo de datos relacional.

El enlace del DataSet en mención es: [Air Pollution in Seoul](https://www.kaggle.com/datasets/bappekim/air-pollution-in-seoul).

In [0]:
catalog = "dmc_taller02y03"
schema_name = "gold_airpollutionseoul"

spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{schema_name}")

In [0]:
path_mediciones = "/Volumes/dmc_taller02y03/gold_airpollutionseoul/source/csv/Measurement_info.csv"
path_unidades_medida = "/Volumes/dmc_taller02y03/gold_airpollutionseoul/source/csv/Measurement_item_info.csv"
path_estaciones = "/Volumes/dmc_taller02y03/gold_airpollutionseoul/source/csv/Measurement_station_info.csv"

df_mediciones = spark.read.option("header", True).option("inferSchema", True).csv(path_mediciones)
df_unidades_medida = spark.read.option("header", True).option("inferSchema", True).csv(path_unidades_medida)
df_estaciones = spark.read.option("header", True).option("inferSchema", True).csv(path_estaciones)

###2. Limpieza y normalización
- Eliminar/Corregir registros inválidos (nulos en claves, precios/cantidades negativas o cero, fechas inválidas, emails mal formados, etc.).
- Normalizar strings (trim, lower, regex si aplica) y cast de tipos.

In [0]:
from pyspark.sql.functions import col, when, lit, year, month, avg, countDistinct

In [0]:
from pyspark.sql import functions as F

In [0]:
#Hago un conteo de valores negativos en la columna Average value del df_mediciones
display(
    df_mediciones.filter(col("Average value") < 0).count()
)

In [0]:
from pyspark.sql.functions import lower, col, trim, when, coalesce, lit, round, to_date, year, month, sum, countDistinct, avg
from pyspark.sql.types import DoubleType

In [0]:
#Cambio los valores negativos a CERO.
df_mediciones_clean = df_mediciones.withColumn(
    "Average value",
    when(col("Average value") < 0, lit(0)).otherwise(col("Average value"))
)

In [0]:
display(
    df_mediciones_clean.filter(col("Average value") == 0)
)

###3. Transformaciones y columnas derivadas 
- Crear columnas como montos, categorías, anio, mes, banderas de calidad, etc. 

In [0]:
# Añado columnas Año y Mes de medición del df_mediciones_clean
df_aniomes_mediciones = (
    df_mediciones_clean
    .withColumn("anio_medicion", year(col("Measurement date")))
    .withColumn("mes_medicion", month(col("Measurement date")))
)

In [0]:
display(df_aniomes_mediciones)

###4. Uniones y agregaciones
- Realizar al menos un join entre datasets y dos agregaciones de valor de negocio (KPIs claros). 

In [0]:
# Join entre df_mediciones_clean, df_unidades_medida y df_estaciones
df_joined = (
    df_mediciones_clean.alias("m")
    .join(
        df_unidades_medida.alias("u"),
        col("m.Item code") == col("u.Item code"),
        "left"
    )
    .join(
        df_estaciones.alias("e"),
        col("m.Station code") == col("e.Station code"),
        "left"
    )
    .select(
        col("m.Measurement date").alias("fecha_medicion"),
        col("e.Station name(district)").alias("distrito_estacion"),
        col("u.Item name").alias("item_medido"),
        col("m.Average value").alias("valor_medido"),
        col("e.Latitude").alias("latitud_estacion")
    )
)

# KPI 1: Promedio de valor medido por tipo de medición
df_kpi_avg = (
    df_joined
    .groupBy("item_medido")
    .agg(
        avg("valor_medido").alias("promedio_valor_medido")
    )
)

# KPI 2: Total de mediciones por estación (usando latitud_estacion)
df_kpi_count = (
    df_joined
    .groupBy("latitud_estacion")
    .agg(
        countDistinct("fecha_medicion").alias("total_mediciones")
    )
)

In [0]:
display(df_joined)

In [0]:
display(df_kpi_avg)

In [0]:
display(df_kpi_count)

###5. Persistencia 
- Grabar tablas resultantes en Delta + Unity Catalog (modo overwrite para la primera entrega). 

In [0]:
tbl_joined = f"{catalog}.{schema_name}.mediciones_detalle"
tbl_kpi_avg = f"{catalog}.{schema_name}.mediciones_kpi_avg"
tbl_kpi_count = f"{catalog}.{schema_name}.mediciones_kpi_count"

In [0]:
df_joined.write.format("delta").mode("overwrite").saveAsTable(tbl_joined)

In [0]:
df_kpi_avg.write.format("delta").mode("overwrite").saveAsTable(tbl_kpi_avg)

In [0]:
df_kpi_count.write.format("delta").mode("overwrite").saveAsTable(tbl_kpi_count)

In [0]:
%sql
select * from dmc_taller02y03.gold_airpollutionseoul.mediciones_detalle

In [0]:
%sql
select * from dmc_taller02y03.gold_airpollutionseoul.mediciones_kpi_avg

In [0]:
%sql
select * from dmc_taller02y03.gold_airpollutionseoul.mediciones_kpi_count

###6. Auditoría (opcional) 
- Construir una tabla de auditoría que capture: origen(es) leídos, records_read, records_processed, tabla(s) destino, status, started_at, ended_at, duration_ms, run_id (opcional), extra_metadata (JSON opcional). 

In [0]:
from pyspark.sql.functions import current_timestamp, to_json, struct
import uuid
import time

In [0]:
# Auditoría: captura de tiempos y métricas
started_at = time.time()
started_at_ts = spark.sql("SELECT current_timestamp() as ts").collect()[0]['ts']

# Orígenes leídos
origenes_leidos = ["df_mediciones_clean", "df_unidades_medida", "df_estaciones"]
records_read = (
    df_mediciones_clean.count() +
    df_unidades_medida.count() +
    df_estaciones.count()
)

# Procesamiento principal
records_processed = df_joined.count()
tablas_destino = [tbl_joined, tbl_kpi_avg, tbl_kpi_count]
status = "SUCCESS"
run_id = str(uuid.uuid4())
extra_metadata = {
    "user": dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get(),
    "notebook": dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()
}

ended_at = time.time()
ended_at_ts = spark.sql("SELECT current_timestamp() as ts").collect()[0]['ts']
duration_ms = int((ended_at - started_at) * 1000)

# Construcción del DataFrame de auditoría
df_auditoria = spark.createDataFrame([{
    "origenes_leidos": str(origenes_leidos),
    "records_read": records_read,
    "records_processed": records_processed,
    "tablas_destino": str(tablas_destino),
    "status": status,
    "started_at": started_at_ts,
    "ended_at": ended_at_ts,
    "duration_ms": duration_ms,
    "run_id": run_id,
    "extra_metadata": str(extra_metadata)
}])

# Guardar la tabla de auditoría
tbl_auditoria = f"{catalog}.{schema_name}.auditoria_etl"
df_auditoria.write.format("delta").mode("append").saveAsTable(tbl_auditoria)

In [0]:
display(df_auditoria)