# Predicciones Meteorológicas (AEMET) - SPRINT I

Parte 1 - Extracción de Datos

Navegar la documentación de la API de AEMET y explorar los endpoints

Desarrollar un script que extraiga la información histórica de todas las provincias.

Ejecutar el script para extraer los datos de los últimos dos años y verificar que todo funcione correctamente.

En el modelo de datos, cada registro debe tener un timestamp de extracción y un identificador para que se pueda manejar el sistema de actualización.

In [None]:
import os
import requests
import time
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from dotenv import load_dotenv
import uuid

import matplotlib.pyplot as plt

load_dotenv()

API_KEY = os.getenv("AEMET_API_KEY")
CSV_ESTACIONES = "data/estaciones_filtradas.csv"
ARCHIVO_SALIDA = "data/temperaturas_historicas_ampliadas.csv"

# 4 rangos de fechas para cubrir 2 años
FECHAS = [
    ("2023-05-29T00:00:00UTC", "2023-11-28T00:00:00UTC"),
    ("2023-11-29T00:00:00UTC", "2024-05-28T00:00:00UTC"),
    ("2024-05-29T00:00:00UTC", "2024-11-28T00:00:00UTC"),
    ("2024-11-29T00:00:00UTC", "2025-05-28T00:00:00UTC")
]

# Cargamos los idema descargados
if os.path.exists(ARCHIVO_SALIDA):
    datos_existentes = pd.read_csv(ARCHIVO_SALIDA)
    estaciones_descargadas = set(datos_existentes["idema"].unique())
else:
    estaciones_descargadas = set()

# Estaciones
estaciones = pd.read_csv(CSV_ESTACIONES)

# Lista para guardar todos los datos
todas_las_filas = []

# Creamos identificador único para esta descarga
id_descarga = str(uuid.uuid4())

print("Cargando estaciones...")

for _, fila in estaciones.iterrows():
    codigo = fila["indicativo"]
    nombre = fila["nombre"]

    if codigo in estaciones_descargadas:
        continue

    print(f"Procesando estación: {codigo} - {nombre}")

    datos_estacion = []
    for fecha_inicio, fecha_fin in FECHAS:
        url_meta = (
            f"https://opendata.aemet.es/opendata/api/valores/climatologicos/diarios/"
            f"datos/fechaini/{fecha_inicio}/fechafin/{fecha_fin}/estacion/{codigo}"
        )

        try:
            respuesta_meta = requests.get(url_meta, params={"api_key": API_KEY})
            if respuesta_meta.status_code != 200:
                continue

            url_datos = respuesta_meta.json().get("datos")
            if not url_datos:
                continue

            respuesta_datos = requests.get(url_datos)
            if respuesta_datos.status_code != 200:
                continue

            datos_json = respuesta_datos.json()
            for fila in datos_json:
                fila["idema"] = codigo
                fila["nombre_estacion"] = nombre
                fila["timestamp_extraccion"] = datetime.utcnow().isoformat()
                fila["id_descarga"] = id_descarga  # ID de descarga 
                datos_estacion.append(fila)

            time.sleep(1.5)  

        except Exception:
            continue

    todas_las_filas.extend(datos_estacion)

# Convertimos a DataFrame y guardamos
if todas_las_filas:
    df = pd.DataFrame(todas_las_filas)

    if os.path.exists(ARCHIVO_SALIDA):
        df.to_csv(ARCHIVO_SALIDA, mode="a", index=False, header=False)
    else:
        df.to_csv(ARCHIVO_SALIDA, index=False)

    print("Los datos se han guardado correctamente.")
else:
    print("No se obtuvieron datos nuevos.")

Parte 2 - Limpieza de Datos

Hacer limpieza general de datos

Modelar los datos para trabajar cómodamente en una base de datos

Ejecutar los scripts de recopilación de datos

Considerar aplicar transformaciones

In [None]:
#Importar datos descargados

df = pd.read_csv("..\data\temperaturas_historicas_ampliadas.csv")

In [None]:
#Selección de columnas de interés

df_I = df[['id_descarga', 'idema', 'nombre_estacion', 'timestamp_estacion', 'provincia', 'altitud', 'fecha', 'tmin', 'tmax', 'tmed', 
           'hrMedia', 'prec', 'velmedia', 'racha']]

In [None]:
#Conversión a variables númericas

numeric_columns = ["tmin", "tmax", "tmed", "prec", "velmedia", "racha", "altitud"]
date_columns = ["fecha", "timestamp_extraccion"]

df_I = df_I.copy()

for num_col in numeric_columns:
    df_I.loc[:, num_col] = pd.to_numeric(df_I[num_col].astype(str).str.replace(",", ".", regex=False), errors="coerce")

for date_col in date_columns:
    df_I.loc[:, date_col] = pd.to_datetime(df_I[date_col], format="%Y-%m-%d", errors="coerce")

In [None]:
#Visualizar la distribución de las variables 

numeric_cols = df_II.select_dtypes(include=["number"]).columns

df_II[numeric_cols].hist(bins=30, figsize=(15, 10))
plt.tight_layout()
plt.show()

#No se realizarán transformaciones. 
#Las variables críticas (tmed, tmin, tmax) para la predicción de la temperatura media tienen una distribución aparentemente normal.
#Las variables no críticas transformadas pueden dificultar la interpretación de los gráficos históricos en Streamlit.

In [None]:
#Reconocer la cantidad de NA por variable

missing_values_count = df_II.isnull().sum()

print(missing_values_count)

In [None]:
#Evaluar los días sin registro por estación

def max_consecutive_nans(series):
    is_nan = series.isna()
    # Count runs of consecutive NaNs
    return is_nan.groupby((~is_nan).cumsum()).transform('size').where(is_nan, 0).max()

numeric_cols = df_II.select_dtypes(include=["number"]).columns

for col in numeric_cols:
    print(f"Max consecutive NaNs in {col}:")
    max_gaps = df_II.groupby("idema")[col].apply(max_consecutive_nans)
    print(max_gaps.sort_values(ascending=False).head(5))

In [None]:
#Tratamiento de NAs
#Premisas

#A. Preservar la distribución de las variables
#B. Preservar la estructura temporal y espacial de los datos
#C. No reemplazar valores fuera del periodo de actividad de las estaciones

def smart_impute_respecting_operation(df, numeric_cols, gap_threshold=3): # gap_threshold es el limite de días seguidos sin Na para reemplazar por la mediana
    df = df.sort_values(["idema", "fecha"]).copy() #Ordenar por estación y fecha para poder interpolar linealmente

    station_ranges = df.groupby("idema")["fecha"].agg(["min", "max"]) #Definir el periodo de actividad de cada estación

    def fill_gaps(group):
        start, end = station_ranges.loc[group.name]
        group = group[(group["fecha"] >= start) & (group["fecha"] <= end)].copy() 

        for col in numeric_cols:
            series = group[col]
            median_val = series.median() #Calcular la mediana para cada variable numérica

            is_nan = series.isna() #Serie booleana de Nas
            groups = (is_nan != is_nan.shift()).cumsum() #Identificar los gaps (bloque continuo de NAs) con shift y cada gap recibe un número de grupo único

            for g in groups[is_nan].unique():
                gap_idx = groups[groups == g].index
                gap_len = len(gap_idx) #Cantidad de días consecutivoss con NAs

                if gap_len <= gap_threshold:
                    group.loc[gap_idx, col] = median_val #Si el periodo consecutivo con NAs (gap) es menor o igual al gap_threshold en el periodo de actividad, entonces replazar con mediana

            group[col] = group[col].interpolate(method="linear", limit_direction="both") #Imputar linealmente en gaps mayores al gap_threshold
            group[col] = group[col].fillna(median_val) #Si quedó algún gap, entonces completar con la mediana

        return group

    df_imputed = df.groupby("idema").apply(fill_gaps) #Agrupar por estación y reemplazar Na
    df_imputed.reset_index(drop=True, inplace=True)

    # Descartar filas que aun preserven valores NA para el target
    df_imputed = df_imputed.dropna(subset=['tmed']).reset_index(drop=True)

    # Reemplazar con valor '0' los Na en la variable precipitación
    if 'prec' in df_imputed.columns:
        df_imputed['prec'] = df_imputed['prec'].fillna(0)

    # Incorporar ID único de limpieza
    df_imputed["id_limpieza"] = range(len(df_imputed))

    # Ordenar output
    ordered_columns = [
        'id_descarga', 'id_limpieza' 'idema', 'nombre_estación', 'timestamp_estacion',
        'provincia', 'altitud', 'fecha', 'tmin', 'tmax', 'tmed', 
        'hrMedia', 'prec', 'velmedia', 'racha'
    ]
    final_columns = [col for col in ordered_columns if col in df_imputed.columns]

    return df_imputed[final_columns]

In [None]:
#Ejecutar tratamiento

numeric_cols = ["tmin", "tmax", "tmed", "hrMedia", "prec", "velmedia", "racha"]

df_III = smart_impute_respecting_operation(df_II, numeric_cols, gap_threshold=3)

In [None]:
#Check distribución de las variables

numeric_cols = df_III.select_dtypes(include=["number"]).columns

df_III[numeric_cols].hist(bins=30, figsize=(15, 10))
plt.tight_layout()
plt.show()

In [None]:
#Check tratamiento de NA

missing_values_count = df_III.isnull().sum()

print(missing_values_count)

In [None]:
#Guardar

ruta_salida = "..\data\temperaturas_limpias.csv"
df_III.to_csv(ruta_salida, index=False)
print(":marca_de_verificación_blanca: Datos limpios guardados en:", ruta_salida)
print(":gráfico_de_barras: Dimensiones finales:", df_III.shape)