# Predicciones Meteorológicas (AEMET) - SPRINT I

Parte 1 - Extracción de Datos

Navegar la documentación de la API de AEMET y explorar los endpoints

Desarrollar un script que extraiga la información histórica de todas las provincias.

Ejecutar el script para extraer los datos de los últimos dos años y verificar que todo funcione correctamente.

En el modelo de datos, cada registro debe tener un timestamp de extracción y un identificador para que se pueda manejar el sistema de actualización.

In [None]:
import os
import requests
import time
import pandas as pd
from datetime import datetime, timedelta
from dotenv import load_dotenv
import uuid

load_dotenv()
API_KEY = "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJqb3JnZXJpdmVyb2RlbG9zcmlvc0BnbWFpbC5jb20iLCJqdGkiOiJiMjlhZmM2Zi0yMTkwLTQ4ZTEtYjlmYy01NGY5OTk3OTc1YjUiLCJpc3MiOiJBRU1FVCIsImlhdCI6MTc0ODk2ODY4NSwidXNlcklkIjoiYjI5YWZjNmYtMjE5MC00OGUxLWI5ZmMtNTRmOTk5Nzk3NWI1Iiwicm9sZSI6IiJ9.90idEjGLaI61xKuPe8sdQtBJ2fdf4gwZmsww11V1VpE"
if not API_KEY:
    raise RuntimeError("❌ No se encontró AEMET_API_KEY en las variables de entorno.")

# ---------------------------------------------------
# Función para obtener el inventario completo de estaciones
# ---------------------------------------------------
def obtener_inventario_completo():
    url_inventario = (
        "https://opendata.aemet.es/opendata/api/"
        "valores/climatologicos/inventarioestaciones/todasestaciones"
    )
    r = requests.get(url_inventario, params={"api_key": API_KEY}, timeout=15)
    r.raise_for_status()
    datos_meta = r.json().get("datos")
    if not datos_meta:
        raise RuntimeError("No se obtuvo URL de datos del inventario.")
    r2 = requests.get(datos_meta, timeout=15)
    r2.raise_for_status()
    estaciones = r2.json()  # Lista de diccionarios
    return pd.DataFrame(estaciones)

# ---------------------------------------------------
# Función para descargar datos diarios para una estación
# ---------------------------------------------------
def descargar_para_una_estacion(idema: str, nombre: str, id_descarga: str, bloques_fechas) -> list:
    filas = []
    for fecha_ini, fecha_fin in bloques_fechas:
        url_meta = (
            "https://opendata.aemet.es/opendata/api/valores/climatologicos/diarios/"
            f"datos/fechaini/{fecha_ini}/fechafin/{fecha_fin}/estacion/{idema}"
        )
        try:
            r = requests.get(url_meta, params={"api_key": API_KEY}, timeout=15)
            if r.status_code != 200:
                continue
            datos_meta = r.json()
            url_real  = datos_meta.get("datos")
            if not url_real:
                continue

            rd = requests.get(url_real, timeout=15)
            if rd.status_code != 200:
                continue

            lista_json = rd.json()
            for rec in lista_json:
                rec["idema"]               = idema
                rec["nombre_estacion"]     = nombre
                rec["timestamp_extraccion"]= datetime.utcnow().isoformat()
                rec["id_descarga"]         = id_descarga
                filas.append(rec)

            time.sleep(1.2)
        except requests.RequestException:
            continue
    return filas

# ---------------------------------------------------
# Script principal
# ---------------------------------------------------
def main():
    print("📥 Iniciando extracción de datos de TODAS las estaciones...")

    # 1) Obtengo el inventario completo (todas las estaciones)
    estaciones_df = obtener_inventario_completo()
    estaciones_df = estaciones_df.dropna(subset=["indicativo"])  # descartar filas sin ID válido

    # 2) Defino rangos de fechas automáticos de últimos 2 años, en 4 bloques de ~6 meses
    hoy = datetime.utcnow().date()
    hace_dos_años = hoy - timedelta(days=730)
    bloques = []
    inicio = hace_dos_años
    while inicio < hoy:
        fin = inicio + timedelta(days=182)
        if fin > hoy:
            fin = hoy
        bloques.append((f"{inicio.isoformat()}T00:00:00UTC", f"{fin.isoformat()}T00:00:00UTC"))
        inicio = fin + timedelta(days=1)

    # 3) Veo si ya hay un CSV de salida previo, para no re-descargar estaciones
    ARCHIVO_SALIDA = "data/temperaturas_historicas_todas.csv"
    ya_descargadas = set()
    if os.path.exists(ARCHIVO_SALIDA):
        try:
            df_prev = pd.read_csv(ARCHIVO_SALIDA, dtype=str, usecols=["idema"])
            ya_descargadas = set(df_prev["idema"].dropna().unique())
        except Exception:
            ya_descargadas = set()

    # 4) Genero un UUID para esta ejecución
    id_descarga = str(uuid.uuid4())
    print(f"   • UUID de esta descarga: {id_descarga}")
    print(f"   • Total de estaciones a procesar: {len(estaciones_df)}")

    # 5) Recorro cada estación
    todas_las_filas = []
    for idx, fila in estaciones_df.iterrows():
        idema  = fila["indicativo"]
        nombre = fila.get("nombre", "")

        if idema in ya_descargadas:
            print(f"➖ Saltando {idema} (ya descargada antes)")
            continue

        print(f"📡 Procesando estación: {idema} — {nombre} ({idx+1}/{len(estaciones_df)})")
        filas_est = descargar_para_una_estacion(idema, nombre, id_descarga, bloques)
        todas_las_filas.extend(filas_est)

    # 6) Guardo todo en un CSV final
    if todas_las_filas:
        df_final = pd.DataFrame(todas_las_filas)
        os.makedirs("data", exist_ok=True)
        if os.path.exists(ARCHIVO_SALIDA):
            df_final.to_csv(ARCHIVO_SALIDA, mode="a", index=False, header=False)
        else:
            df_final.to_csv(ARCHIVO_SALIDA, index=False)
        print(f"✅ Extracción completada. Guardado: '{ARCHIVO_SALIDA}'")
        print(f"   → Filas nuevas: {len(df_final)}")
    else:
        print("⚠️ No se obtuvieron datos nuevos en esta ejecución.")


if __name__ == "__main__":
    main()

Parte 2 - Limpieza de Datos

Hacer limpieza general de datos

Modelar los datos para trabajar cómodamente en una base de datos

Ejecutar los scripts de recopilación de datos

Considerar aplicar transformaciones

In [None]:
import pandas as pd
import os

# 1) Cargamos 
ruta_entrada = r"C:\Users\User\OneDrive - Universidade de Santiago de Compostela\Documentos\Data Science\temperaturas_historicas_todas.csv"
if not os.path.exists(ruta_entrada):
    raise FileNotFoundError(f"No encontré archivo '{ruta_entrada}'. Revisa la ruta.")

df = pd.read_csv(ruta_entrada, dtype=str)

# 2) Convertimos columnas numéricas y de fecha al tipo apropiado
columnas_numéricas = [
    "tmin", "tmax", "tmed", "prec", "velmedia", "racha", "hrMedia", "altitud"
]
for col in columnas_numéricas:
    df[col] = pd.to_numeric(
        df[col].astype(str).str.replace(",", ".", regex=False),
        errors="coerce"
    )

df["fecha"] = pd.to_datetime(df["fecha"], format="%Y-%m-%d", errors="coerce")
df["timestamp_extraccion"] = pd.to_datetime(df["timestamp_extraccion"], errors="coerce")

# 3) Selecciono solo las columnas que necesito
df = df[[
    "id_descarga",
    "indicativo",
    "nombre",
    "provincia",
    "altitud",
    "fecha",
    "tmin", "tmax", "tmed", "prec", "velmedia", "racha", "hrMedia",
    "timestamp_extraccion"
]].copy()

# 4) Estandarizar nombres 
provincia_map = {
    'STA. CRUZ DE TENERIFE': 'Santa Cruz De Tenerife',
    'SANTA CRUZ DE TENERIFE': 'Santa Cruz De Tenerife',
    'ILLES BALEARS': 'Illes Balears',
    'BALEARES': 'Illes Balears',
    'A CORUÑA': 'A Coruña',
    'GIRONA': 'Girona',
    'LAS PALMAS': 'Las Palmas',
    'PONTEVEDRA': 'Pontevedra',
    'CANTABRIA': 'Cantabria',
    'MALAGA': 'Málaga',
    'ALMERIA': 'Almería',
    'MURCIA': 'Murcia',
    'ALBACETE': 'Albacete',
    'AVILA': 'Ávila',
    'ARABA/ALAVA': 'Araba/Álava',
    'BADAJOZ': 'Badajoz',
    'ALICANTE': 'Alacant/Alicante',
    'CASTELLON': 'Castelló/Castellón',
    'OURENSE': 'Ourense',
    'BARCELONA': 'Barcelona',
    'BURGOS': 'Burgos',
    'CACERES': 'Cáceres',
    'CADIZ': 'Cádiz',
    'CIUDAD REAL': 'Ciudad Real',
    'JAEN': 'Jaén',
    'CORDOBA': 'Córdoba',
    'CUENCA': 'Cuenca',
    'GRANADA': 'Granada',
    'GUADALAJARA': 'Guadalajara',
    'GIPUZKOA': 'Gipuzkoa/Guipúzcoa',
    'HUESCA': 'Huesca',
    'LEON': 'León',
    'LLEIDA': 'Lleida',
    'LA RIOJA': 'La Rioja',
    'SORIA': 'Soria',
    'NAVARRA': 'Navarra',
    'CEUTA': 'Ceuta',
    'LUGO': 'Lugo',
    'MADRID': 'Madrid',
    'PALENCIA': 'Palencia',
    'SALAMANCA': 'Salamanca',
    'SEGOVIA': 'Segovia',
    'SEVILLA': 'Sevilla',
    'TOLEDO': 'Toledo',
    'TARRAGONA': 'Tarragona',
    'TERUEL': 'Teruel',
    'VALENCIA': 'València/Valencia',
    'VALLADOLID': 'Valladolid',
    'BIZKAIA': 'Bizkaia/Vizcaya',
    'ZAMORA': 'Zamora',
    'ZARAGOZA': 'Zaragoza',
    'MELILLA': 'Melilla',
    'ASTURIAS': 'Asturias',
    'HUELVA': 'Huelva'
}

df["provincia"] = df["provincia"].map(mapa_provincias).fillna(df["provincia"])

# 5) Ordeno por estación y fecha
df = df.sort_values(["indicativo", "fecha"]).reset_index(drop=True)

# 6) Eliminamos valores que no convienen
df.loc[(df["tmin"] > 45) | (df["tmin"] < -25), "tmin"] = pd.NA
df.loc[(df["tmax"] > 50) | (df["tmax"] < -25), "tmax"] = pd.NA
df.loc[(df["tmed"] > 45) | (df["tmed"] < -20), "tmed"] = pd.NA
df.loc[df["prec"] > 300, "prec"] = pd.NA
df.loc[df["velmedia"] > 25, "velmedia"] = pd.NA
df.loc[df["racha"] > 50, "racha"] = pd.NA
df.loc[(df["hrMedia"] < 5) | (df["hrMedia"] > 100), "hrMedia"] = pd.NA

# 6. b) coherencia lógica: tmin ≤ tmax
df.loc[df["tmin"] > df["tmax"], ["tmin", "tmax", "tmed"]] = pd.NA

# 7) Hago función para rellenar huecos con la mediana e interpolar el resto
def rellenar_por_estacion(grupo):
    inicio, fin = grupo["fecha"].min(), grupo["fecha"].max()
    grupo = grupo[(grupo["fecha"] >= inicio) & (grupo["fecha"] <= fin)].copy()

    columnas_a_imputar = ["tmin", "tmax", "tmed", "prec", "velmedia", "racha", "hrMedia"]
    for col in columnas_a_imputar:
        serie = grupo[col]
        mediana = serie.median()
        es_nan = serie.isna()
        # Bloques de NaN seguidos
        bloques = (es_nan != es_nan.shift()).cumsum()
        for b in bloques[es_nan].unique():
            idx_bloque = bloques[bloques == b].index
            if len(idx_bloque) <= 3:
                grupo.loc[idx_bloque, col] = mediana
        # Interpolamos linealmente donde queden NaN
        grupo[col] = grupo[col].interpolate(method="linear", limit_direction="both")
        # Si aún hay NaN, completo con la mediana
        grupo[col] = grupo[col].fillna(mediana)

    return grupo

# 8) Su función a cada estación
df = df.groupby("indicativo", group_keys=False).apply(rellenar_por_estacion).reset_index(drop=True)

# 9) Elimino filas que todavía tengan tmed vacío
df = df.dropna(subset=["tmed"]).reset_index(drop=True)

# 10) En lluvias convertimos NaN a 0
df["prec"] = df["prec"].fillna(0)

# 11) Agregamos un identificador de limpieza 
df["id_limpieza"] = range(1, len(df) + 1)

# 12) Guardamos el CSV limpio
ruta_salida = "data/temperaturas_limpias.csv"
os.makedirs("data", exist_ok=True)
df.to_csv(ruta_salida, index=False)

print("terminamos de limpiar los datos.")
print("El CSV limpio se guardó en:", ruta_salida)
print("Tamaño final:", df.shape)