# Proyecto Final Paralelo:Sistema de Procesamiento Paralelo de Datos Ambientales en Tiempo Real para el Análisis de la Calidad del Aire

## Instalación de Dependencias

In [None]:
pip install requests pandas datosgobmx bs4



## Importaciones

In [None]:
import requests
import re
import json
import pandas as pd
import json
from datetime import datetime, timedelta, timezone
import time
import multiprocessing as mp
import os

## Análisis con NOM-172-SEMARNAT

In [None]:
def calidad_co(co):
    if pd.isna(co):
        return None
    if co <= 5.5:
        return "Buena"
    elif co <= 11:
        return "Regular"
    else:
        return "Mala"

def calidad_o3(o3):
    if pd.isna(o3):
        return None
    if o3 <= 0.055:
        return "Buena"
    elif o3 <= 0.095:
        return "Regular"
    else:
        return "Mala"

def calidad_no2(no2):
    if pd.isna(no2):
        return None
    if no2 <= 0.10:
        return "Buena"
    elif no2 <= 0.21:
        return "Regular"
    else:
        return "Mala"

def calidad_nox(nox):
    if pd.isna(nox):
        return None
    if nox <= 0.10:
        return "Buena"
    elif nox <= 0.20:
        return "Regular"
    else:
        return "Mala"

def calidad_no(no):
    if pd.isna(no):
        return None
    if no <= 0.05:
        return "Buena"
    elif no <= 0.15:
        return "Regular"
    else:
        return "Mala"


## Extracción de los Datos mediante la SINAICA y OpenAQ

In [None]:
# -----------------------------
# 1. SINAICA / INECC
# -----------------------------
def get_sinaica_data(estId):
  url = f"https://sinaica.inecc.gob.mx/estacion.php?estId={estId}"
  html = requests.get(url).text
  pattern = r"conts\s*=\s*(\{.*?\});"
  match = re.search(pattern, html, re.DOTALL)

  dfs = []
  columnas_base = [
      "id", "parametro", "fecha", "hora",
      "valorAct", "siglas", "nombre",
      "descripcion", "tipoParametro", "activo"
  ]

  if match:
      conts_js = match.group(1)
      conts = json.loads(conts_js)
      for key in ["CO", "NO", "NO2", "NOx", "O3", "PM10", "PM2.5", "SO2"]:
          data = conts.get(key)

          if not isinstance(data, list):
              continue

          filas = []

          for row in data:
              if row is None:
                  # crear fila vacía con el parámetro correspondiente
                  filas.append({
                      "id": None,
                      "parametro": key,
                      "fecha": None,
                      "hora": None,
                      "valorAct": None,
                      "siglas": key,
                      "nombre": None,
                      "descripcion": None,
                      "tipoParametro": None,
                      "activo": None
                  })
              elif isinstance(row, dict):
                  filas.append(row)

          df = pd.DataFrame(filas, columns=columnas_base)
          dfs.append(df)

  pattern = r"meteo\s*=\s*(\{.*?\});"
  match = re.search(pattern, html, re.DOTALL)

  if match:
      meteo_js = match.group(1)
      meteo = json.loads(meteo_js)
      for key in ["DV", "HR", "TMP", "VV"]:
          data = meteo.get(key)

          if not isinstance(data, list):
              continue

          filas = []

          for row in data:
              if row is None:
                  # crear fila vacía con el parámetro correspondiente
                  filas.append({
                      "id": None,
                      "parametro": key,
                      "fecha": None,
                      "hora": None,
                      "valorAct": None,
                      "siglas": key,
                      "nombre": None,
                      "descripcion": None,
                      "tipoParametro": None,
                      "activo": None
                  })
              elif isinstance(row, dict):
                  filas.append(row)

          df = pd.DataFrame(filas, columns=columnas_base)
          dfs.append(df)

  df_total = pd.concat(dfs, ignore_index=True)


  df_wide = df_total.pivot_table(
      index=["fecha", "hora"],
      columns="parametro",
      values="valorAct"
  )
  return df_wide.tail(1)


# -----------------------------
# 2. OpenAQ
# -----------------------------
def get_openaq_data(estId, API_KEY):
    import requests
    import pandas as pd

    headers = {
        "X-API-Key": API_KEY
    }

    # Obtener sensores de la estación
    url = f"https://api.openaq.org/v3/locations/{estId}"
    response = requests.get(url, headers=headers)
    response.raise_for_status()

    data = response.json()

    dfs = []

    for sensor in data["results"][0]["sensors"]:
        SENSOR_ID = sensor["id"]

        url = f"https://api.openaq.org/v3/sensors/{SENSOR_ID}"
        response = requests.get(url, headers=headers)
        response.raise_for_status()

        sensor_data = response.json()

        df_sensor = pd.json_normalize(
            sensor_data["results"],
            sep="_"
        )

        df_sensor_clean = df_sensor[[
            "parameter_name",
            "latest_value",
            "latest_datetime_local",
        ]].rename(columns={
            "parameter_name": "parametro",
            "latest_value": "valorAct",
            "latest_datetime_local": "datetime_local",
        })

        dfs.append(df_sensor_clean)
        time.sleep(0.5)

    # Unir todos los sensores
    df_total = pd.concat(dfs, ignore_index=True)


    df_total["datetime_local"] = pd.to_datetime(df_total["datetime_local"])
    df_total["fecha"] = df_total["datetime_local"].dt.date
    df_total["hora"] = df_total["datetime_local"].dt.hour
    df_total["parametro"] = df_total["parametro"].str.upper()


    fecha_max = df_total["fecha"].max()
    hora_max = df_total[df_total["fecha"] == fecha_max]["hora"].max()

    df_ult = df_total[
        (df_total["fecha"] == fecha_max) &
        (df_total["hora"] == hora_max)
    ]


    # Pivotear IGUAL QUE SINAICA
    df_wide = df_ult.pivot_table(
        index=["fecha", "hora"],
        columns="parametro",
        values="valorAct"
    )
    return df_wide

# -----------------------------
# Evaluar Calidad Aire
# -----------------------------
def evaluar_calidad_aire(df):
    """
    Recibe un DataFrame con columnas:
    CO, O3, NO2, NOx, NO (si existen)

    Regresa el DataFrame con:
    - calidad_CO
    - calidad_O3
    - calidad_NO2
    - calidad_NOx
    - calidad_NO
    - calidad_global
    """

    df = df.copy()

    orden = {"Buena": 1, "Regular": 2, "Mala": 3}

    # ---- aplicar calidades por contaminante (solo si existen) ----
    if "CO" in df.columns:
        df["calidad_CO"] = df["CO"].apply(calidad_co)

    if "O3" in df.columns:
        df["calidad_O3"] = df["O3"].apply(calidad_o3)

    if "NO2" in df.columns:
        df["calidad_NO2"] = df["NO2"].apply(calidad_no2)

    if "NOx" in df.columns:
        df["calidad_NOx"] = df["NOx"].apply(calidad_nox)

    if "NO" in df.columns:
        df["calidad_NO"] = df["NO"].apply(calidad_no)

    # ---- calidad global (peor caso) ----
    def calidad_global(row):
        calidades = [
            row.get("calidad_CO"),
            row.get("calidad_O3"),
            row.get("calidad_NO2"),
            row.get("calidad_NOx"),
            row.get("calidad_NO"),
        ]
        calidades = [c for c in calidades if c is not None]
        if not calidades:
            return None
        return max(calidades, key=lambda x: orden[x])

    df["calidad_global"] = df.apply(calidad_global, axis=1)

    return df

## Versión Secuencial

In [None]:
# -----------------------------
# Ejecución principal
# -----------------------------
if __name__ == "__main__":
    print("Obteniendo datos de múltiples fuentes...\n")

    dict_sinaica_id_municipio = {
        "Ajusco Medio": 242,
        "Benito Juárez": 300,
        "Camarones": 244,
        "Centro de Ciencias de la Atmósfera": 245,
        "Cuajimalpa": 246,
        "Gustavo A. Madero": 302,
        "Hospital General de México": 251,
        "Merced": 256,
        "Miguel Hidalgo": 263,
        "Pedregal": 259,
        "Santiago Acahualtepec": 432,
        "UAM Iztapalapa": 268,
    }

    dict_openaq_id_municipio = {
        "Ajusco Medio": 480393,
        "Benito Juárez": 10860,
        "Camarones": 10722,
        "Centro de Ciencias de la Atmósfera": 10534,
        "Cuajimalpa": 223434,
        "Gustavo A. Madero": 10632,
        "Hospital General de México": 1134,
        "Merced": 10748,
        "Miguel Hidalgo": 10735,
        "Pedregal": 10658,
        "Santiago Acahualtepec": 10802,
        "UAM Iztapalapa": 10804,
    }

    resultados = []
    estaciones = list(dict_sinaica_id_municipio.keys())
    API_KEYS = [
        "934a33cfc397140c9f15c38a528f79384b5d180f31cb344dc217840f3a7cff93",
        "23a0b5d725eaf67585d9c4b3d8e1fcd02cd10c0ae4cbaf47cade3de6de27d0cc",
        "934a33cfc397140c9f15c38a528f79384b5d180f31cb344dc217840f3a7cff93",
        "23a0b5d725eaf67585d9c4b3d8e1fcd02cd10c0ae4cbaf47cade3de6de27d0cc",
    ]

    start_time = time.time()

    # Iterar sobre las estaciones
    for i, estacion in enumerate(estaciones):
        print(f"Procesando estación: {estacion}")
        api_key = API_KEYS[i % len(API_KEYS)]
        try:
            # IDs
            sinaica_id = dict_sinaica_id_municipio[estacion]
            openaq_id = dict_openaq_id_municipio[estacion]

            # --- SINAICA ---
            df_sinaica = get_sinaica_data(sinaica_id)
            df_sinaica = evaluar_calidad_aire(df_sinaica)
            df_sinaica["fuente"] = "SINAICA"
            df_sinaica["estacion"] = estacion

            # --- OpenAQ ---
            df_openaq = get_openaq_data(openaq_id, api_key)
            df_openaq = evaluar_calidad_aire(df_openaq)
            df_openaq["fuente"] = "OPENAQ"
            df_openaq["estacion"] = estacion

            # Guardar
            resultados.append(df_sinaica.reset_index())
            resultados.append(df_openaq.reset_index())

        except Exception as e:
            print(f"⚠️ Error en {estacion}: {e}")

    # Unir todo en un solo DataFrame
    df_resultados = pd.concat(resultados, ignore_index=True)

    end_time = time.time()
    elapsed_time = end_time - start_time
    print(f"\nTiempo total de ejecución: {elapsed_time:.2f} segundos")

    print("\nResumen final:")
    print(df_resultados[[
        "estacion", "fuente", "fecha", "hora", "calidad_global"
    ]])

Obteniendo datos de múltiples fuentes...

Procesando estación: Ajusco Medio
Procesando estación: Benito Juárez
Procesando estación: Camarones
Procesando estación: Centro de Ciencias de la Atmósfera
Procesando estación: Cuajimalpa
Procesando estación: Gustavo A. Madero
Procesando estación: Hospital General de México
Procesando estación: Merced
Procesando estación: Miguel Hidalgo
Procesando estación: Pedregal
Procesando estación: Santiago Acahualtepec
Procesando estación: UAM Iztapalapa

Tiempo total de ejecución: 70.62 segundos

Resumen final:
parametro                            estacion   fuente       fecha  hora  \
0                                Ajusco Medio  SINAICA  2025-12-16   9.0   
1                                Ajusco Medio   OPENAQ  2025-12-16   9.0   
2                               Benito Juárez  SINAICA  2025-12-16   9.0   
3                               Benito Juárez   OPENAQ  2025-12-16   9.0   
4                                   Camarones  SINAICA  2025-12-16   9.

## Versión En Paralelo

In [None]:
def worker(
    estaciones_chunk,
    dict_sinaica,
    dict_openaq,
    api_key,
    queue
):
    resultados_locales = []

    for estacion in estaciones_chunk:
        print(f"[PID {os.getpid()}] Procesando {estacion}")

        try:
            sinaica_id = dict_sinaica[estacion]
            openaq_id = dict_openaq[estacion]

            # --- SINAICA ---
            df_sinaica = get_sinaica_data(sinaica_id)
            df_sinaica = evaluar_calidad_aire(df_sinaica)
            df_sinaica["fuente"] = "SINAICA"
            df_sinaica["estacion"] = estacion

            # --- OpenAQ ---
            df_openaq = get_openaq_data(openaq_id, api_key)
            df_openaq = evaluar_calidad_aire(df_openaq)
            df_openaq["fuente"] = "OPENAQ"
            df_openaq["estacion"] = estacion

            resultados_locales.append(df_sinaica.reset_index())
            resultados_locales.append(df_openaq.reset_index())

            time.sleep(1)  # throttle ligero

        except Exception as e:
            print(f"Error en {estacion}: {e}")

    # enviar resultados al proceso principal
    queue.put(pd.concat(resultados_locales, ignore_index=True))

# -----------------------------
# Ejecución principal
# -----------------------------
if __name__ == "__main__":
    dict_sinaica_id_municipio = {
        "Ajusco Medio": 242,
        "Benito Juárez": 300,
        "Camarones": 244,
        "Centro de Ciencias de la Atmósfera": 245,
        "Cuajimalpa": 246,
        "Gustavo A. Madero": 302,
        "Hospital General de México": 251,
        "Merced": 256,
        "Miguel Hidalgo": 263,
        "Pedregal": 259,
        "Santiago Acahualtepec": 432,
        "UAM Iztapalapa": 268,
    }

    dict_openaq_id_municipio = {
        "Ajusco Medio": 480393,
        "Benito Juárez": 10860,
        "Camarones": 10722,
        "Centro de Ciencias de la Atmósfera": 10534,
        "Cuajimalpa": 223434,
        "Gustavo A. Madero": 10632,
        "Hospital General de México": 1134,
        "Merced": 10748,
        "Miguel Hidalgo": 10735,
        "Pedregal": 10658,
        "Santiago Acahualtepec": 10802,
        "UAM Iztapalapa": 10804,
    }

    print("Obteniendo datos en paralelo...\n")

    estaciones = list(dict_sinaica_id_municipio.keys())

    API_KEYS = [
        "934a33cfc397140c9f15c38a528f79384b5d180f31cb344dc217840f3a7cff93",
        "23a0b5d725eaf67585d9c4b3d8e1fcd02cd10c0ae4cbaf47cade3de6de27d0cc",
        "934a33cfc397140c9f15c38a528f79384b5d180f31cb344dc217840f3a7cff93",
        "23a0b5d725eaf67585d9c4b3d8e1fcd02cd10c0ae4cbaf47cade3de6de27d0cc",
    ]

    # dividir estaciones en 4 chunks
    chunks = [estaciones[i::4] for i in range(4)]

    queue = mp.Queue()
    procesos = []
    start_time = time.time()
    for i in range(4):
        p = mp.Process(
            target=worker,
            args=(
                chunks[i],
                dict_sinaica_id_municipio,
                dict_openaq_id_municipio,
                API_KEYS[i],
                queue
            )
        )
        procesos.append(p)
        p.start()

    # recolectar resultados
    resultados = []
    for _ in procesos:
        resultados.append(queue.get())

    for p in procesos:
        p.join()

    df_resultados = pd.concat(resultados, ignore_index=True)
    end_time = time.time()
    elapsed_time = end_time - start_time

    print(f"\nTiempo total de ejecución: {elapsed_time:.2f} segundos")

    print("\nResumen final:")
    print(df_resultados[[
        "estacion", "fuente", "fecha", "hora", "calidad_global"
    ]])


Obteniendo datos en paralelo...

[PID 13167] Procesando Ajusco Medio
[PID 13170] Procesando Benito Juárez[PID 13175] Procesando Camarones

[PID 13182] Procesando Centro de Ciencias de la Atmósfera
[PID 13170] Procesando Gustavo A. Madero
[PID 13182] Procesando Merced
[PID 13167] Procesando Cuajimalpa
[PID 13175] Procesando Hospital General de México
[PID 13170] Procesando Pedregal
[PID 13167] Procesando Miguel Hidalgo
[PID 13182] Procesando UAM Iztapalapa
[PID 13175] Procesando Santiago Acahualtepec

Tiempo total de ejecución: 20.29 segundos

Resumen final:
parametro                            estacion   fuente       fecha  hora  \
0                               Benito Juárez  SINAICA  2025-12-16   9.0   
1                               Benito Juárez   OPENAQ  2025-12-16   9.0   
2                           Gustavo A. Madero  SINAICA  2025-12-16   9.0   
3                           Gustavo A. Madero   OPENAQ  2025-12-16   9.0   
4                                    Pedregal  SINAICA  

Número de núcleos disponibles: 2
