# Trabajo Integrador - Data Engineering
**Alumno:** Ignacio J L√≥pez

## Objetivos del Proyecto
Este proyecto implementa un pipeline ETL (Extracci√≥n, Transformaci√≥n y Carga) completo utilizando Python y Delta Lake.
El objetivo es ingerir datos meteorol√≥gicos hist√≥ricos y predicciones, almacenarlos eficientemente y procesarlos para generar reportes anal√≠ticos.

## Fuente de Datos
Se utiliz√≥ la API p√∫blica de **Open-Meteo** debido a su robustez y disponibilidad de datos temporales granulares.
- **Endpoint Est√°tico:** Geocoding API (Metadatos de ciudades).
- **Endpoint Din√°mico:** Historical Weather API (Datos horarios de temperatura y precipitaci√≥n).

## Estrategia de Almacenamiento
- **Formato:** Delta Lake (por sus capacidades ACID y gesti√≥n de metadatos).
- **Esquema:** Arquitectura Medallion simplificada (Raw -> Processed).
- **Particionamiento:** Se particiona por fecha y ciudad en la capa Raw para optimizar la lectura incremental.

In [1]:
"""
M√≥dulo de Configuraci√≥n.
Carga librer√≠as y variables de entorno para asegurar que las credenciales no se expongan en el c√≥digo.
"""
import os
import requests
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from deltalake import write_deltalake, DeltaTable
from dotenv import load_dotenv

# Carga de variables de entorno (Seguridad)
load_dotenv()

# Validaci√≥n de Configuraci√≥n
GEO_URL = os.getenv("BASE_URL_GEOCODING")
WEATHER_URL = os.getenv("BASE_URL_WEATHER")
LAKE_PATH = "datalake"

# Definici√≥n del alcance del proyecto
CIUDADES_OBJETIVO = ["Buenos Aires", "Ushuaia", "General Pico", "Resistencia", "Tilcara"]

if not GEO_URL or not WEATHER_URL:
    raise ValueError("Error Cr√≠tico: Faltan variables en el archivo .env")

print("Configuraci√≥n inicializada correctamente.")

Configuraci√≥n inicializada correctamente.


In [2]:
def obtener_data_api(url, params):
    """
    Gestiona la conexi√≥n con la API manejando posibles errores HTTP.
    
    Args:
        url (str): Endpoint base.
        params (dict): Par√°metros de la consulta.
    Returns:
        dict: Respuesta JSON o None en caso de error.
    """
    try:
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"Error de conexi√≥n con {url}: {e}")
        return None

def obtener_ultima_fecha_registrada(city_id, tabla_path):
    """
    Consulta el Delta Lake para determinar el punto de partida de la extracci√≥n incremental.
    
    Estrategia:
    - Si la tabla no existe -> Retorna None (Indica Carga Full).
    - Si existe -> Retorna la fecha m√°xima registrada para esa ciudad.
    """
    path = f"{LAKE_PATH}/raw/{tabla_path}"
    if not os.path.exists(path):
        return None
    
    try:
        dt = DeltaTable(path)
        # Leemos solo columnas necesarias para optimizar rendimiento
        df_existente = dt.to_pandas(columns=["city_id", "time"])
        df_city = df_existente[df_existente["city_id"] == city_id]
        
        if df_city.empty:
            return None
        return pd.to_datetime(df_city["time"]).max()
    except Exception:
        return None # Ante cualquier error de lectura, asumimos carga full por seguridad

def extraer_metadatos_ciudades(lista_ciudades):
    """
    Extracci√≥n de datos est√°ticos (Dimensiones).
    Obtiene coordenadas y datos geogr√°ficos de las ciudades objetivo.
    """
    resultados = []
    print(f"Extrayendo metadatos para {len(lista_ciudades)} ciudades...")
    
    for ciudad in lista_ciudades:
        params = {"name": ciudad, "count": 1, "language": "es", "format": "json"}
        data = obtener_data_api(GEO_URL, params)
        
        if data and "results" in data:
            info = data["results"][0]
            resultados.append({
                "city_id": info.get("id"),
                "name": info.get("name"),
                "latitude": info.get("latitude"),
                "longitude": info.get("longitude"),
                "country": info.get("country"),
                "population": info.get("population")
            })
    return pd.DataFrame(resultados)

def extraer_datos_climaticos(df_ciudades):
    """
    Extracci√≥n de datos din√°micos (Hechos) con L√≥gica Incremental.
    
    Decisi√≥n de Dise√±o:
    - Se verifica la √∫ltima fecha por ciudad.
    - Se solicita solo el delta de tiempo faltante a la API.
    - Se guarda en modo 'append' particionado por fecha.
    """
    tabla_clima = "weather_hourly"
    registros_totales = 0
    
    print(f"Iniciando extracci√≥n incremental de clima")
    
    for _, row in df_ciudades.iterrows():
        cid, cname = row["city_id"], row["name"]
        
        # 1. Definir ventana de tiempo (Incremental vs Full)
        ultima_fecha = obtener_ultima_fecha_registrada(cid, tabla_clima)
        fecha_fin = datetime.now().date()
        
        if ultima_fecha is None:
            # Estrategia Full: 90 d√≠as de historia
            fecha_inicio = fecha_fin - timedelta(days=90)
            modo_msg = "FULL (90 d√≠as)"
        else:
            # Estrategia Incremental
            fecha_inicio = ultima_fecha.date()
            modo_msg = f"INCREMENTAL (Desde {fecha_inicio})"
        
        if fecha_inicio > fecha_fin:
            continue # Datos al d√≠a

        # 2. Consulta a API
        params = {
            "latitude": row["latitude"],
            "longitude": row["longitude"],
            "start_date": fecha_inicio.strftime("%Y-%m-%d"),
            "end_date": fecha_fin.strftime("%Y-%m-%d"),
            "hourly": "temperature_2m,relative_humidity_2m,precipitation",
            "timezone": "auto"
        }
        
        data = obtener_data_api(WEATHER_URL, params)
        
        if data and "hourly" in data:
            df = pd.DataFrame(data["hourly"])
            df["city_id"] = cid
            df["time"] = pd.to_datetime(df["time"])
            df["fecha"] = df["time"].dt.date.astype(str) # Columna para partici√≥n
            
            # Filtrado estricto para evitar duplicados en el borde de la fecha
            if ultima_fecha:
                df = df[df["time"] > ultima_fecha]
            
            if not df.empty:
                # Almacenamiento Raw
                write_deltalake(
                    f"{LAKE_PATH}/raw/{tabla_clima}", 
                    df, 
                    mode="append", 
                    partition_by=["city_id", "fecha"]
                )
                registros_totales += len(df)
                print(f"   ‚úÖ {cname}: {modo_msg} -> {len(df)} registros nuevos.")
    
    print(f"Extracci√≥n finalizada. Total registros insertados: {registros_totales}")

## Procesamiento y Transformaci√≥n de Datos

En esta etapa se leen los datos crudos ("Raw Layer") y se aplican las siguientes transformaciones para generar valor:

1.  **Limpieza:** Eliminaci√≥n de duplicados y valores nulos para asegurar la calidad del dato.
2.  **Enriquecimiento (JOIN):** Se cruzan los datos clim√°ticos con los metadatos de las ciudades para agregar contexto geogr√°fico (Pa√≠s, Nombre).
3.  **Ingenier√≠a de Caracter√≠sticas:** Creaci√≥n de la columna `es_alerta_clima` basada en l√≥gica condicional (Temperaturas extremas).
4.  **Agregaci√≥n:** Se reduce la granularidad de horaria a diaria, calculando m√©tricas clave (M√≠nima, M√°xima, Promedio).

In [3]:
def procesar_datos():
    """
    Pipeline de Procesamiento.
    Genera la capa 'Processed' con datos agregados diariamente.
    """
    print("üè≠ Iniciando procesamiento y refinamiento...")
    
    # 1. Lectura Raw
    try:
        df_geo = DeltaTable(f"{LAKE_PATH}/raw/ciudades").to_pandas()
        df_clima = DeltaTable(f"{LAKE_PATH}/raw/weather_hourly").to_pandas()
    except Exception:
        print("‚ö†Ô∏è No hay datos suficientes para procesar.")
        return

    # 2. Limpieza
    df_clima.drop_duplicates(subset=["city_id", "time"], inplace=True)
    df_clima.dropna(subset=["temperature_2m"], inplace=True)
    
    # 3. Conversi√≥n de Tipos
    df_clima["time"] = pd.to_datetime(df_clima["time"])
    df_clima["fecha_dia"] = df_clima["time"].dt.date

    # 4. Join (Enriquecimiento)
    df_joined = pd.merge(
        df_clima, 
        df_geo[["city_id", "name", "country"]], 
        on="city_id", 
        how="inner"
    )

    # 5. L√≥gica de Negocio (Nueva Columna)
    # Alerta si T > 30 (Calor) o T < 5 (Fr√≠o)
    df_joined["es_alerta_clima"] = np.where(
        (df_joined["temperature_2m"] > 30) | (df_joined["temperature_2m"] < 5), 
        True, 
        False
    )

    # 6. Agregaciones (Resumen Diario)
    df_resumen = df_joined.groupby(["country", "name", "fecha_dia"]).agg({
        "temperature_2m": ["max", "min", "mean"],
        "precipitation": "sum",
        "es_alerta_clima": "max" # True si hubo alguna alerta en el d√≠a
    }).reset_index()
    
    # Aplanar columnas MultiIndex
    df_resumen.columns = [
        f"{col[0]}_{col[1]}" if col[1] else col[0] 
        for col in df_resumen.columns
    ]
    
    # 7. Renombrado final
    df_resumen.rename(columns={
        "temperature_2m_mean": "temp_avg",
        "precipitation_sum": "precipitacion_total",
        "es_alerta_clima_max": "hubo_alerta"
    }, inplace=True)

    # 8. Guardado Processed (Sobreescritura para actualizar hist√≥rico)
    ruta_proc = f"{LAKE_PATH}/processed/clima_diario"
    write_deltalake(ruta_proc, df_resumen, mode="overwrite", partition_by=["country"])
    
    print(f"‚úÖ Datos procesados guardados en: {ruta_proc}")
    display(df_resumen.head())

# --- ORQUESTACI√ìN PRINCIPAL ---
if __name__ == "__main__":
    # Paso 1: Metadatos
    df_geo = extraer_metadatos_ciudades(CIUDADES_OBJETIVO)
    write_deltalake(f"{LAKE_PATH}/raw/ciudades", df_geo, mode="overwrite")
    
    # Paso 2: Datos Incrementales
    extraer_datos_climaticos(df_geo)
    
    # Paso 3: Transformaci√≥n
    procesar_datos()

Extrayendo metadatos para 5 ciudades...
Iniciando extracci√≥n incremental de clima
   ‚úÖ Buenos Aires: FULL (90 d√≠as) -> 2184 registros nuevos.
   ‚úÖ Ushuaia: FULL (90 d√≠as) -> 2184 registros nuevos.
   ‚úÖ General Pico: FULL (90 d√≠as) -> 2184 registros nuevos.
   ‚úÖ Ciudad de Resistencia: FULL (90 d√≠as) -> 2184 registros nuevos.
   ‚úÖ Tilcara: FULL (90 d√≠as) -> 2184 registros nuevos.
Extracci√≥n finalizada. Total registros insertados: 10920
üè≠ Iniciando procesamiento y refinamiento...
‚úÖ Datos procesados guardados en: datalake/processed/clima_diario


Unnamed: 0,country,name,fecha_dia,temperature_2m_max,temperature_2m_min,temp_avg,precipitacion_total,hubo_alerta
0,Argentina,Buenos Aires,2025-10-01,20.1,13.9,16.942105,0.0,False
1,Argentina,Buenos Aires,2025-10-02,20.8,14.5,17.070833,0.0,False
2,Argentina,Buenos Aires,2025-10-03,21.9,14.8,18.0625,0.0,False
3,Argentina,Buenos Aires,2025-10-04,24.8,17.1,20.1125,0.0,False
4,Argentina,Buenos Aires,2025-10-05,21.0,13.3,17.658333,30.8,False
