# üßä Trabajo Pr√°ctico Integrador ‚Äì Parte 1  
## Extracci√≥n FULL e INCREMENTAL desde dos endpoints y creaci√≥n de la capa Bronze

En esta primera parte realizamos la ingesta de datos desde **dos endpoints** de la API **Open-Meteo**:

1. **Weather Forecast (`/v1/forecast`)** ‚Üí datos meteorol√≥gicos horarios  
2. **Air Quality (`/v1/air-quality`)** ‚Üí datos de calidad del aire

Toda la informaci√≥n extra√≠da se almacena en la **capa Bronze** de nuestro Data Lake en **MinIO (S3)**, utilizando **Delta Lake** como formato de almacenamiento.

---

## üîµ Extracci√≥n FULL  
Se realizan **dos extracciones FULL**, una por cada endpoint.  
Cada FULL descarga el dataset completo disponible para el d√≠a actual y reemplaza cualquier versi√≥n previa.

- Carpetas destino:  
  - `bronze/full/weather/`  
  - `bronze/full/air_quality/`  
- Modo de escritura: **overwrite**  
- Sin particiones  

Su objetivo es generar un **snapshot limpio y completo** de ambos datasets.

---

## üü¢ Extracci√≥n INCREMENTAL  
Tambi√©n se realizan **dos extracciones INCREMENTAL**, nuevamente una para cada endpoint.  
Estas descargas obtienen solo los datos correspondientes al d√≠a actual (`start_date = end_date = hoy`), simulando un flujo diario de actualizaci√≥n.

- Carpetas destino:  
  - `bronze/incremental/weather/`  
  - `bronze/incremental/air_quality/`  
- Modo de escritura: **append**  
- Particionado por columna **date**  

Las ingestas incrementales permiten construir historiales diarios sin sobrescribir la informaci√≥n previa.

---

## üìÇ Estructura generada en Bronze
```
bronze/
‚îú‚îÄ‚îÄ full/
‚îÇ ‚îú‚îÄ‚îÄ weather/
‚îÇ ‚îî‚îÄ‚îÄ air_quality/
‚îî‚îÄ‚îÄ incremental/
‚îú‚îÄ‚îÄ weather/date=2025-12-02/
‚îú‚îÄ‚îÄ weather/date=2025-12-03/
‚îú‚îÄ‚îÄ air_quality/date=2025-12-02/
‚îî‚îÄ‚îÄ air_quality/date=2025-12-03/
```

---

## üéØ Objetivo de la Parte 1

- Consumir **dos endpoints** de la API Open-Meteo.  
- Implementar **extracci√≥n FULL e INCREMENTAL** para ambos.  
- Almacenar los datos en MinIO utilizando **Delta Lake** seg√∫n buenas pr√°cticas de Data Lake.  
- Dejar lista la **capa Bronze** para su posterior procesamiento en la Parte 2 (capa Silver).  

---

## üîí Seguridad y manejo de credenciales
- Todas las credenciales de MinIO/S3 se gestionan mediante variables de entorno o un archivo `.env`.  
- No se exponen claves ni contrase√±as directamente en el c√≥digo.  
- Esto permite que el c√≥digo pueda compartirse o versionarse sin comprometer datos sensibles.


### üõ†Ô∏è Instalaci√≥n de librer√≠as

En esta celda instalamos todas las dependencias necesarias para el TP:

- **requests** ‚Üí para consumir la API Open-Meteo  
- **pandas** ‚Üí para manipular DataFrames  
- **pyarrow** ‚Üí soporte columnar para Delta Lake  
- **deltalake** ‚Üí para escribir y leer tablas Delta directamente en MinIO  
- **python-dotenv** ‚Üí para cargar variables de entorno desde un archivo `.env`  

> Esta instalaci√≥n debe ejecutarse **una sola vez** al iniciar el entorno.


In [1]:
!pip install requests pandas pyarrow deltalake python-dotenv --quiet
print("‚úî Librer√≠as instaladas correctamente")


‚úî Librer√≠as instaladas correctamente


### üîí Carga de variables de entorno

En esta celda:

- Se carga un archivo `.env` para mantener **seguras las credenciales** y rutas sensibles.
- Se asignan variables a Python desde el archivo `.env`.
- Esto permite que el c√≥digo del notebook no contenga claves hardcodeadas y sea m√°s seguro y portable.


In [3]:
# ===============================================================
# üîë Carga de variables de entorno
# ===============================================================

from dotenv import load_dotenv
import os

# Cargar archivo .env
load_dotenv()

# Asignar variables de entorno
AWS_ENDPOINT_URL = os.environ["AWS_ENDPOINT_URL"]
AWS_ACCESS_KEY_ID = os.environ["AWS_ACCESS_KEY_ID"]
AWS_SECRET_ACCESS_KEY = os.environ["AWS_SECRET_ACCESS_KEY"]
BRONZE_FULL = os.environ["BRONZE_FULL"]
BRONZE_INC = os.environ["BRONZE_INC"]
SILVER_WEATHER = os.environ["SILVER_WEATHER"]
SILVER_DAILY_AVG = os.environ["SILVER_DAILY_AVG"]

print("Variables de entorno cargadas correctamente")


Variables de entorno cargadas correctamente


### üì¶ Importaci√≥n de librer√≠as y configuraci√≥n del entorno

En esta celda:

- Importamos las librer√≠as necesarias para consumir la API y manipular los datos con **pandas**.  
- Configuramos **MinIO** como Data Lake utilizando `storage_options`, cargando credenciales desde variables de entorno para mantener seguridad y confidencialidad.  
- Opcionalmente, se pueden crear directorios locales para respaldo o inspecci√≥n manual de los datos.  

Esta preparaci√≥n deja el entorno listo para realizar extracciones y almacenar los datos en la capa **Bronze**, asegurando buenas pr√°cticas de manejo de credenciales y estructura del Data Lake.


In [4]:
# ===============================================================
# üìç CONFIGURACI√ìN INICIAL
# ===============================================================

# Librer√≠as necesarias
import requests
import pandas as pd
from datetime import date
from deltalake import write_deltalake
from dotenv import load_dotenv
import os

# Cargar archivo .env
load_dotenv()

# Configuraci√≥n MinIO (Data Lake S3) desde variables de entorno
storage_options = {
    "AWS_ENDPOINT_URL": os.environ["AWS_ENDPOINT_URL"],
    "AWS_ACCESS_KEY_ID": os.environ["AWS_ACCESS_KEY_ID"],
    "AWS_SECRET_ACCESS_KEY": os.environ["AWS_SECRET_ACCESS_KEY"],
    "AWS_ALLOW_HTTP": "true",
    "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
    "aws_conditional_put": "etag"
}

# Variables meteorol√≥gicas a extraer
variables = [
    "temperature_2m", "relative_humidity_2m", "apparent_temperature",
    "precipitation", "weathercode", "pressure_msl", "cloudcover",
    "visibility", "windspeed_10m", "winddirection_10m"
]

# -----------------------------------------------------------------
# Configuraci√≥n para Air Quality
# -----------------------------------------------------------------

# Variables de calidad del aire (air-quality)
air_variables = [
    "pm10", "pm2_5", "carbon_monoxide", "nitrogen_dioxide",
    "sulphur_dioxide", "ozone"
]

# Rutas Bronze desde variables de entorno
bronze_full = os.environ["BRONZE_FULL"]
bronze_inc  = os.environ["BRONZE_INC"]

# Nuevas rutas espec√≠ficas para cada dataset
bronze_full_weather     = bronze_full + "weather/"
bronze_full_air_quality = bronze_full + "air_quality/"

bronze_inc_weather      = bronze_inc + "weather/"
bronze_inc_air_quality  = bronze_inc + "air_quality/"

# Endpoints Open-Meteo
GEOCODING_URL = "https://geocoding-api.open-meteo.com/v1/search"
WEATHER_URL   = "https://api.open-meteo.com/v1/forecast"
AIR_URL       = "https://air-quality-api.open-meteo.com/v1/air-quality"

print("‚úî Configuraci√≥n inicial cargada (weather + air_quality) con variables de entorno")


‚úî Configuraci√≥n inicial cargada (weather + air_quality) con variables de entorno


# Par√°metros iniciales de la API

En esta celda definimos los **par√°metros base** para la extracci√≥n:

- La ciudad a consultar.
- Las variables meteorol√≥gicas a solicitar.
- Las variables de calidad del aire (nuevo segundo endpoint).
- URLs de los endpoints de Open-Meteo (geocoding, weather y air-quality).
- Obtenemos la **latitud** y **longitud** usando el endpoint de geocoding.

Estos valores se reutilizar√°n tanto en la extracci√≥n **FULL** como en la **incremental**.


In [5]:
# ===============================================================
# üìç FUNCI√ìN PARA OBTENER LATITUD / LONGITUD
# ===============================================================

def obtener_lat_lon(ciudad: str) -> tuple[float, float]:
    """Consulta la API de geocoding para obtener latitud y longitud de una ciudad."""
    params = {"name": ciudad, "count": 1}
    resp = requests.get(GEOCODING_URL, params=params, timeout=10)
    if resp.status_code != 200:
        raise Exception(f"Error en geocoding ({resp.status_code})")
    data = resp.json()
    if "results" not in data or len(data["results"]) == 0:
        raise ValueError(f"No se encontr√≥ la ciudad: {ciudad}")
    lat = data["results"][0]["latitude"]
    lon = data["results"][0]["longitude"]
    return lat, lon

# Ejecutamos la funci√≥n
city = "Buenos Aires"
lat, lon = obtener_lat_lon(city)

lat, lon


(-34.61315, -58.37723)

# üìà Extracci√≥n y guardado incremental

En esta celda realizamos el proceso completo de **extracci√≥n incremental** desde *dos endpoints* de Open-Meteo:

### 1) Endpoint **Weather**
Descargamos √∫nicamente los datos meteorol√≥gicos del d√≠a actual (temperatura, humedad, lluvia, viento, etc.).

### 2) Endpoint **Air Quality**
Consultamos tambi√©n los valores horarios de calidad del aire (PM2.5, PM10, CO, O‚ÇÉ, NO‚ÇÇ, SO‚ÇÇ), generando un segundo dataset incremental.

---

### üõ†Ô∏è Flujo general de la extracci√≥n incremental

1. Consultamos ambos endpoints usando la latitud/longitud obtenida previamente.  
2. Convertimos la columna `time` a formato datetime.  
3. Creamos la columna `date` para poder particionar por d√≠a.  
4. Guardamos los resultados en la capa **Bronze**:

   - Meteorolog√≠a ‚Üí `s3://<bucket>/bronze/incremental/weather/`  
   - Calidad del aire ‚Üí `s3://<bucket>/bronze/incremental/air_quality/`  

5. Usamos:
   - **append** ‚Üí para agregar nuevas filas sin sobrescribir.  
   - **partition_by=["date"]** ‚Üí organiza los datos diarios.  

---

Este enfoque permite mantener un **historial incremental completo**, optimizado para consultas, auditor√≠a y futuras transformaciones en la capa Silver.


In [6]:
# ===============================================================
# üìç FUNCI√ìN PARA EXTRACCI√ìN INCREMENTAL + GUARDADO (WEATHER + AIR QUALITY)
# ===============================================================

def extraer_guardar_incremental(lat: float, lon: float) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Realiza extracci√≥n incremental del d√≠a actual desde dos endpoints:
    - Weather (meteorolog√≠a)
    - Air Quality (calidad del aire)

    Ambos datasets se guardan en Bronze Incremental con:
    - mode="append"
    - particionado por columna 'date'
    """

    from datetime import date
    hoy = date.today().strftime("%Y-%m-%d")

    # ---------------------------
    # üå§Ô∏è 1) EXTRACCI√ìN WEATHER
    # ---------------------------
    params_weather = {
        "latitude": lat,
        "longitude": lon,
        "hourly": variables,
        "start_date": hoy,
        "end_date": hoy,
        "timezone": "America/Buenos_Aires"
    }

    resp_w = requests.get(WEATHER_URL, params_weather, timeout=10)
    data_w = resp_w.json()

    df_weather = pd.DataFrame(data_w["hourly"])
    df_weather["time"] = pd.to_datetime(df_weather["time"])
    df_weather["date"] = df_weather["time"].dt.date.astype(str)

    write_deltalake(
        bronze_inc + "weather/",
        df_weather,
        mode="append",
        partition_by=["date"],
        storage_options=storage_options
    )

    print("‚úî Incremental WEATHER guardado correctamente")

    # ---------------------------
    # üå´Ô∏è 2) EXTRACCI√ìN AIR QUALITY
    # ---------------------------
    params_air = {
        "latitude": lat,
        "longitude": lon,
        "hourly": ["pm10", "pm2_5", "carbon_monoxide", "ozone", "nitrogen_dioxide", "sulphur_dioxide"],
        "start_date": hoy,
        "end_date": hoy,
        "timezone": "America/Buenos_Aires"
    }

    AIR_QUALITY_URL = "https://air-quality-api.open-meteo.com/v1/air-quality"

    resp_a = requests.get(AIR_QUALITY_URL, params_air, timeout=10)
    data_a = resp_a.json()

    df_air = pd.DataFrame(data_a["hourly"])
    df_air["time"] = pd.to_datetime(df_air["time"])
    df_air["date"] = df_air["time"].dt.date.astype(str)

    write_deltalake(
        bronze_inc + "air_quality/",
        df_air,
        mode="append",
        partition_by=["date"],
        storage_options=storage_options
    )

    print("‚úî Incremental AIR QUALITY guardado correctamente")

    return df_weather, df_air


# ===========================
# ‚ñ∂Ô∏è Ejecutamos Incremental
# ===========================
df_inc_weather, df_inc_air = extraer_guardar_incremental(lat, lon)
df_inc_weather.head(), df_inc_air.head()


‚úî Incremental WEATHER guardado correctamente
‚úî Incremental AIR QUALITY guardado correctamente


(                 time  temperature_2m  relative_humidity_2m  \
 0 2025-12-03 00:00:00            19.0                    90   
 1 2025-12-03 01:00:00            18.7                    90   
 2 2025-12-03 02:00:00            18.5                    90   
 3 2025-12-03 03:00:00            18.3                    91   
 4 2025-12-03 04:00:00            18.3                    92   
 
    apparent_temperature  precipitation  weathercode  pressure_msl  cloudcover  \
 0                  21.0            0.0            1        1012.2           7   
 1                  20.7            0.0            0        1012.3           2   
 2                  20.3            0.0            0        1012.0           0   
 3                  20.1            0.0            1        1011.9           7   
 4                  20.1            0.0            1        1011.8          35   
 
    visibility  windspeed_10m  winddirection_10m        date  
 0     24140.0            4.6                 72  2025-12

### üü¶ Extracci√≥n y guardado FULL en Bronze

En esta celda realizamos la extracci√≥n **FULL**, obteniendo todos los datos disponibles de ambos endpoints de Open-Meteo:

- **Weather (meteorolog√≠a)**  
- **Air Quality (calidad del aire)**  

La extracci√≥n FULL representa un **snapshot completo** de cada endpoint.

**Caracter√≠sticas de la extracci√≥n FULL:**

- Descarga el dataset completo desde cada endpoint.
- Se guarda en la capa **Bronze**, en la carpeta `bronze/full/` dentro de subcarpetas separadas.
- Usa **overwrite**, reemplazando siempre la versi√≥n completa anterior.
- No utiliza particiones (las particiones se aplican solo al incremental).

Este proceso permite mantener una versi√≥n limpia y completa de cada fuente, ideal para auditor√≠a y para rederivar la capa Silver desde cero en cualquier momento.


In [7]:
AIR_QUALITY_URL = "https://air-quality-api.open-meteo.com/v1/air-quality"

# ===============================================================
# üìç FUNCI√ìN PARA EXTRACCI√ìN FULL + GUARDADO (WEATHER + AIR QUALITY)
# ===============================================================

def extraer_guardar_full(lat: float, lon: float) -> dict:
    """
    Realiza la extracci√≥n FULL del d√≠a actual desde Open-Meteo.

    Obtiene:
      - Datos meteorol√≥gicos (WEATHER)
      - Datos de calidad del aire (AIR QUALITY)

    Caracter√≠sticas:
    - Reemplaza completamente el snapshot anterior en Bronze FULL (overwrite).
    - No utiliza particiones.
    - Devuelve ambos DataFrames para inspecci√≥n en pantalla.
    """

    from datetime import date

    hoy = date.today().strftime("%Y-%m-%d")

    # -----------------------------------------------------------
    # üìå WEATHER FULL
    # -----------------------------------------------------------
    params_weather = {
        "latitude": lat,
        "longitude": lon,
        "hourly": variables,
        "start_date": hoy,
        "end_date": hoy,
        "timezone": "America/Buenos_Aires"
    }

    resp_weather = requests.get(WEATHER_URL, params_weather, timeout=10)
    data_weather = resp_weather.json()
    df_weather = pd.DataFrame(data_weather["hourly"])

    write_deltalake(
        bronze_full + "weather/",
        df_weather,
        mode="overwrite",
        storage_options=storage_options
    )

    print("‚úî WEATHER FULL guardado correctamente en Bronze")


    # -----------------------------------------------------------
    # üìå AIR QUALITY FULL
    # -----------------------------------------------------------
    params_air = {
        "latitude": lat,
        "longitude": lon,
        "hourly": ["pm10", "pm2_5", "carbon_monoxide", "ozone", "nitrogen_dioxide", "sulphur_dioxide"],
        "start_date": hoy,
        "end_date": hoy,
        "timezone": "America/Buenos_Aires"
    }

    resp_air = requests.get(AIR_QUALITY_URL, params_air, timeout=10)
    data_air = resp_air.json()
    df_air = pd.DataFrame(data_air["hourly"])

    write_deltalake(
        bronze_full + "air_quality/",
        df_air,
        mode="overwrite",
        storage_options=storage_options
    )

    print("‚úî AIR QUALITY FULL guardado correctamente en Bronze")

    # Devuelve ambos DF para verlos en Colab
    return {
        "weather": df_weather,
        "air_quality": df_air
    }


# ===============================================================
# ‚ñ∂ Ejecutar y mostrar resultados
# ===============================================================

full_data = extraer_guardar_full(lat, lon)
full_data  # Esto imprime un dict, pero Colab muestra los DataFrames


‚úî WEATHER FULL guardado correctamente en Bronze
‚úî AIR QUALITY FULL guardado correctamente en Bronze


{'weather':                 time  temperature_2m  relative_humidity_2m  \
 0   2025-12-03T00:00            19.0                    90   
 1   2025-12-03T01:00            18.7                    90   
 2   2025-12-03T02:00            18.5                    90   
 3   2025-12-03T03:00            18.3                    91   
 4   2025-12-03T04:00            18.3                    92   
 5   2025-12-03T05:00            18.3                    91   
 6   2025-12-03T06:00            18.3                    92   
 7   2025-12-03T07:00            19.4                    88   
 8   2025-12-03T08:00            21.3                    78   
 9   2025-12-03T09:00            23.0                    69   
 10  2025-12-03T10:00            24.7                    65   
 11  2025-12-03T11:00            26.3                    58   
 12  2025-12-03T12:00            28.6                    51   
 13  2025-12-03T13:00            29.6                    42   
 14  2025-12-03T14:00            30.4       

### üìù Notas importantes sobre FULL vs. INCREMENTAL

En el mismo d√≠a, las extracciones **FULL** e **INCREMENTAL** pueden devolver informaci√≥n muy similar, ya que ambas consultan los datos actuales de los endpoints de **Weather** y **Air Quality**.

La diferencia clave aparece con el paso del tiempo:

- La extracci√≥n **FULL** genera un **snapshot completo y actualizado**, reemplazando totalmente la versi√≥n anterior para ambos endpoints.
- La extracci√≥n **INCREMENTAL** agrega solo los datos nuevos del d√≠a, construyendo un **hist√≥rico acumulativo** tanto de clima como de calidad del aire.

Este enfoque nos permite mantener:

- Un **snapshot limpio y actualizado** (FULL).  
- Un **historial diario creciente** (INCREMENTAL).  

Ambos son √∫tiles para an√°lisis temporal y auditor√≠a de cambios en los datos.


