# Data Read and Formatting

Notebook para la lectura de los datos y para su modificación de formato en archivos csv, con tablas individuales.

## Path Configuration

In [1]:
import os
from pathlib import Path
print("Directorio actual:", os.getcwd())

current_dir = Path.cwd()
project_root = current_dir
while not (project_root / '.git').exists() and project_root != project_root.parent:
    project_root = project_root.parent
    os.chdir("..")

print(f"Raíz del proyecto: {project_root}")
print("CWD cambiado a raíz del proyecto")


Directorio actual: /home/ferrus/Documents/university/semester_x/MA2003B/PROYECTO_MA2003B/notebooks/01_business_understanding
Raíz del proyecto: /home/ferrus/Documents/university/semester_x/MA2003B/PROYECTO_MA2003B
CWD cambiado a raíz del proyecto


# Esquema de consolidación para archivos .xlsx 2022 - 2024

Se utiliza la versión procesada que separa las distintas tablas indexadas en distintas hojas de cálculo. Es primordial separar las tablas de los datos de 2023 para lograr una unidad en la metodología de importación.

In [20]:
import pandas as pd

# Ruta del archivo original
archivo = r"data/interim/DATOS HISTÓRICOS 2023_2024_SEPARADO.xlsx"

# Cargar todas las hojas en un diccionario {nombre_hoja: DataFrame}
hojas = pd.read_excel(archivo, sheet_name=None)

# Lista para guardar los DataFrames concatenados
dfs = []

for nombre_hoja, df in hojas.items():
    # Poner todos los nombres de columnas en minúsculas
    df.columns = df.columns.str.lower()
    
    # Cambiar "date" a "date_index" si existe
    if "date" in df.columns:
        df = df.rename(columns={"date": "date_index"})
    
    # Agregar la columna 'estacion'
    df["estacion"] = nombre_hoja
    
    # Limpiar caracteres no imprimibles de strings
    
    # Excluir "CATÁLOGO" de la unión
    if nombre_hoja != "CATÁLOGO":
        dfs.append(df)

# Concatenar todas las hojas excepto "CATÁLOGO"
df_final = pd.concat(dfs, ignore_index=True)

# Guardar en un nuevo archivo con una sola hoja (usando xlsxwriter)
nuevo_archivo = "data/interim/DATOS_HISTORICOS_2023_2024_CONCATENADO.xlsx"
df_final.to_excel(nuevo_archivo, sheet_name="datos", index=False, engine="xlsxwriter")

print("Archivo guardado como:", nuevo_archivo)

Archivo guardado como: data/interim/DATOS_HISTORICOS_2023_2024_CONCATENADO.xlsx


# Esquema de lectura y tipado del archivo.

## Lectura de archivos post primer organización

In [43]:
path_22 = r"data/interim/DATOS_HISTORICOS_2022_2023_CONCATENADO.xlsx"
path_23 = r"data/interim/DATOS_HISTORICOS_2023_2024_CONCATENADO.xlsx"
path_24 = r"data/interim/DATOS_HISTORICOS_2024_CONCATENADO.csv"

df_22 = pd.read_excel(path_22)
df_23 = pd.read_excel(path_23)
df_24 = pd.read_csv(path_24)

### Verificación de campos y variables

In [45]:
df_23.columns == df_22.columns

array([ True,  True,  True,  True,  True,  True,  True,  True,  True,
        True,  True,  True,  True,  True,  True,  True,  True,  True])

In [46]:
df_23.columns

Index(['date_index', 'co', 'no', 'no2', 'nox', 'o3', 'pm10', 'pm2.5', 'prs',
       'rainf', 'rh', 'so2', 'sr', 'tout', 'wsr', 'wdr', 'estacion',
       'archivo_origen'],
      dtype='object')

In [47]:
df_24.columns

Index(['date', 'co (ppm)', 'no (ppb)', 'no2 (ppb)', 'nox (ppb)', 'o3 (ppb)',
       'pm10 (ug/m3)', 'pm2.5 (ug/m3)', 'prs (mmhg)', 'rainf (mm/h)', 'rh (%)',
       'so2 (ppb)', 'sr (kw/m2)', 'tout (ºc)', 'wsr (km/h)', 'wdr (azimutal)',
       'estacion', 'archivo_origen'],
      dtype='object')

In [48]:
mapa_renombre_24 = {'date':'date_index',
                   'co (ppm)':'co',
                   'no (ppb)':'no',
                   'no2 (ppb)':'no2',
                   'nox (ppb)': 'nox',
                   'o3 (ppb)' : 'o3',
                   'pm10 (ug/m3)': 'pm10',
                   'pm2.5 (ug/m3)':'pm2.5',
                   'prs (mmhg)':'prs',
                   'rainf (mm/h)':'rainf',
                   'rh (%)':'rh',
                   'so2 (ppb)':'so2',
                   'sr (kw/m2)':'sr',
                   'tout (ºc)':'tout',
                   'wsr (km/h)':'wsr',
                   'wdr (azimutal)':'wdr',
                   'estacion':'estacion'}

In [49]:
df_24_renamed = df_24.rename(columns=mapa_renombre_24)

In [50]:
df_23.columns == df_24_renamed.columns

array([ True,  True,  True,  True,  True,  True,  True,  True,  True,
        True,  True,  True,  True,  True,  True,  True,  True,  True])

In [51]:
df_22['archivo_origen'] = "DATOS_HISTORICOS_2022_2023_CONCATENADO"
df_23['archivo_origen'] = 'DATOS_HISTORICOS_2023_2024_CONCATENADO.xlsx'
df_24['archivo_origen'] = 'DATOS_HISTORICOS_2024_CONCATENADO.csv'

### Esquema de tipado

In [55]:
import numpy as np
from typing import Dict, List, Optional, Union, Any
import warnings

def optimizar_tipos_datos(df: pd.DataFrame, 
                         columnas_fecha: Optional[List[str]] = None,
                         columnas_string: Optional[List[str]] = None,
                         formato_fecha: Optional[str] = None,
                         reducir_memoria: bool = True,
                         verbose: bool = False) -> pd.DataFrame:
    """
    Optimiza los tipos de datos de un DataFrame de forma inteligente.
    
    Parameters:
    -----------
    df : pd.DataFrame
        DataFrame a optimizar
    columnas_fecha : Optional[List[str]]
        Lista específica de columnas a tratar como fecha. Si None, detecta automáticamente.
    columnas_string : Optional[List[str]]
        Lista específica de columnas a tratar como string. Si None, detecta automáticamente.
    formato_fecha : Optional[str]
        Formato específico para parsing de fechas (ej: '%Y-%m-%d %H:%M:%S')
    reducir_memoria : bool
        Si True, optimiza tipos numéricos para reducir memoria
    verbose : bool
        Si True, muestra información detallada del proceso
        
    Returns:
    --------
    pd.DataFrame : DataFrame con tipos optimizados
    """
    
    df_optimized = df.copy()
    cambios_realizados = {}
    memoria_original = df.memory_usage(deep=True).sum()
    
    if verbose:
        print(f"Memoria original: {memoria_original / 1024**2:.2f} MB")
        print(f"Columnas a procesar: {list(df.columns)}")
    
    # 1. DETECTAR Y CONVERTIR COLUMNAS DE FECHA
    columnas_fecha_detectadas = _detectar_columnas_fecha(df_optimized, columnas_fecha)
    
    for col in columnas_fecha_detectadas:
        tipo_original = str(df_optimized[col].dtype)
        try:
            df_optimized[col] = _convertir_a_datetime(df_optimized[col], formato_fecha)
            cambios_realizados[col] = f"{tipo_original} → datetime64[ns]"
            if verbose:
                print(f"✓ {col}: convertida a datetime")
        except Exception as e:
            if verbose:
                print(f"✗ {col}: falló conversión a datetime - {str(e)}")
    
    # 2. DETECTAR Y CONVERTIR COLUMNAS STRING
    columnas_string_detectadas = _detectar_columnas_string(df_optimized, columnas_string, columnas_fecha_detectadas)
    
    for col in columnas_string_detectadas:
        tipo_original = str(df_optimized[col].dtype)
        if df_optimized[col].dtype != 'object':
            df_optimized[col] = df_optimized[col].astype('string')
            cambios_realizados[col] = f"{tipo_original} → string"
            if verbose:
                print(f"✓ {col}: convertida a string")
    
    # 3. OPTIMIZAR COLUMNAS NUMÉRICAS
    columnas_numericas = _identificar_columnas_numericas(df_optimized, columnas_fecha_detectadas, columnas_string_detectadas)
    
    for col in columnas_numericas:
        tipo_original = str(df_optimized[col].dtype)
        
        try:
            # Primero convertir a numérico si es necesario
            if not pd.api.types.is_numeric_dtype(df_optimized[col]):
                df_optimized[col] = pd.to_numeric(df_optimized[col], errors='coerce')
            
            tipo_optimizado = _optimizar_columna_numerica(df_optimized[col], reducir_memoria)
            
            if tipo_optimizado != tipo_original:
                # Intentar conversión con manejo de errores
                try:
                    df_optimized[col] = df_optimized[col].astype(tipo_optimizado)
                    cambios_realizados[col] = f"{tipo_original} → {tipo_optimizado}"
                    if verbose:
                        print(f"✓ {col}: optimizada a {tipo_optimizado}")
                except (ValueError, TypeError) as e:
                    # Si falla, usar float64 como fallback
                    df_optimized[col] = df_optimized[col].astype('float64')
                    cambios_realizados[col] = f"{tipo_original} → float64 (fallback)"
                    if verbose:
                        print(f"! {col}: fallback a float64 - {str(e)[:50]}...")
        
        except Exception as e:
            if verbose:
                print(f"✗ {col}: error en optimización - {str(e)[:50]}...")
    
    # 4. REPORTE FINAL
    memoria_final = df_optimized.memory_usage(deep=True).sum()
    reduccion_memoria = ((memoria_original - memoria_final) / memoria_original) * 100
    
    if verbose:
        print(f"\nMemoria final: {memoria_final / 1024**2:.2f} MB")
        print(f"Reducción de memoria: {reduccion_memoria:.1f}%")
        print(f"Cambios realizados: {len(cambios_realizados)} columnas")
    
    return df_optimized

def _detectar_columnas_fecha(df: pd.DataFrame, columnas_especificas: Optional[List[str]] = None) -> List[str]:
    """Detecta columnas que contienen fechas."""
    
    if columnas_especificas:
        return [col for col in columnas_especificas if col in df.columns]
    
    columnas_fecha = []
    
    # Palabras clave que sugieren fechas
    keywords_fecha = ['date', 'time', 'fecha', 'hora', 'timestamp', 'created', 'updated']
    
    for col in df.columns:
        # Por nombre de columna
        if any(keyword in col.lower() for keyword in keywords_fecha):
            columnas_fecha.append(col)
            continue
        
        # Por contenido (muestreo)
        if df[col].dtype == 'object':
            muestra = df[col].dropna().head(100)
            if len(muestra) > 0:
                # Intentar detectar patrones de fecha
                fecha_count = 0
                for valor in muestra:
                    if _es_posible_fecha(str(valor)):
                        fecha_count += 1
                
                if fecha_count / len(muestra) > 0.7:  # 70% parecen fechas
                    columnas_fecha.append(col)
    
    return columnas_fecha

def _es_posible_fecha(valor: str) -> bool:
    """Verifica si un string podría ser una fecha."""
    
    # Patrones comunes de fecha
    patrones_fecha = [
        r'\d{4}-\d{1,2}-\d{1,2}',      # YYYY-MM-DD
        r'\d{1,2}/\d{1,2}/\d{2,4}',    # MM/DD/YYYY o DD/MM/YYYY
        r'\d{4}/\d{1,2}/\d{1,2}',      # YYYY/MM/DD
        r'\d{1,2}-\d{1,2}-\d{2,4}',    # MM-DD-YYYY
    ]
    
    import re
    return any(re.search(patron, valor) for patron in patrones_fecha)

def _convertir_a_datetime(serie: pd.Series, formato: Optional[str] = None) -> pd.Series:
    """Convierte una serie a datetime de forma robusta."""
    
    if formato:
        return pd.to_datetime(serie, format=formato, errors='coerce')
    
    # Intentar conversión automática
    try:
        return pd.to_datetime(serie, errors='coerce', infer_datetime_format=True)
    except:
        # Fallback: intentar formatos comunes
        formatos_comunes = [
            '%Y-%m-%d %H:%M:%S',
            '%Y-%m-%d',
            '%m/%d/%Y %H:%M:%S',
            '%m/%d/%Y',
            '%d/%m/%Y',
            '%Y/%m/%d'
        ]
        
        for fmt in formatos_comunes:
            try:
                return pd.to_datetime(serie, format=fmt, errors='coerce')
            except:
                continue
        
        # Si todo falla, conversión con errores='coerce'
        return pd.to_datetime(serie, errors='coerce')

def _detectar_columnas_string(df: pd.DataFrame, 
                             columnas_especificas: Optional[List[str]] = None,
                             excluir_columnas: List[str] = None) -> List[str]:
    """Detecta columnas que deben ser tratadas como string."""
    
    if excluir_columnas is None:
        excluir_columnas = []
    
    if columnas_especificas:
        return [col for col in columnas_especificas if col in df.columns and col not in excluir_columnas]
    
    columnas_string = []
    
    # Palabras clave que sugieren strings
    keywords_string = ['name', 'nombre', 'id', 'codigo', 'description', 'descripcion', 
                      'category', 'categoria', 'type', 'tipo', 'status', 'estado',
                      'estacion', 'location', 'ubicacion']
    
    for col in df.columns:
        if col in excluir_columnas:
            continue
            
        # Por nombre de columna
        if any(keyword in col.lower() for keyword in keywords_string):
            columnas_string.append(col)
            continue
        
        # Por tipo actual
        if df[col].dtype == 'object':
            # Verificar si no es numérica
            muestra = df[col].dropna().head(100)
            if len(muestra) > 0:
                numerico_count = 0
                for valor in muestra:
                    try:
                        float(str(valor))
                        numerico_count += 1
                    except:
                        pass
                
                if numerico_count / len(muestra) < 0.8:  # Menos del 80% son numéricos
                    columnas_string.append(col)
    
    return columnas_string

def _identificar_columnas_numericas(df: pd.DataFrame, 
                                   excluir_fecha: List[str],
                                   excluir_string: List[str]) -> List[str]:
    """Identifica columnas que deben ser tratadas como numéricas."""
    
    excluir_total = set(excluir_fecha + excluir_string)
    
    columnas_numericas = []
    
    for col in df.columns:
        if col in excluir_total:
            continue
        
        # Si ya es numérica o si es object que se puede convertir
        if pd.api.types.is_numeric_dtype(df[col]) or df[col].dtype == 'object':
            # Verificar si se puede convertir a numérico
            muestra = df[col].dropna().head(100)
            if len(muestra) > 0:
                convertible_count = 0
                for valor in muestra:
                    try:
                        pd.to_numeric(valor)
                        convertible_count += 1
                    except:
                        pass
                
                if convertible_count / len(muestra) > 0.8:  # 80% convertibles
                    columnas_numericas.append(col)
    
    return columnas_numericas

def _optimizar_columna_numerica(serie: pd.Series, reducir_memoria: bool = True) -> str:
    """Determina el tipo numérico óptimo para una serie."""
    
    # Convertir a numérico si no lo es
    if not pd.api.types.is_numeric_dtype(serie):
        serie_num = pd.to_numeric(serie, errors='coerce')
    else:
        serie_num = serie
    
    if not reducir_memoria:
        return 'float64'
    
    # Verificar si hay valores infinitos o solo NaN
    datos_validos = serie_num.dropna()
    if len(datos_validos) == 0:
        return 'float64'  # Solo NaN, usar float64
    
    # Verificar si hay valores infinitos
    if np.isinf(datos_validos).any():
        return 'float64'  # Hay infinitos, usar float64
    
    # Verificar si son todos enteros (sin decimales)
    try:
        es_entero = datos_validos.apply(lambda x: float(x).is_integer()).all()
    except:
        es_entero = False
    
    if es_entero and len(datos_validos) > 0:
        # Es entero - verificar rangos
        min_val = datos_validos.min()
        max_val = datos_validos.max()
        
        # Si hay NaN en la serie original, usar tipos nullable
        tiene_nulos = serie_num.isna().any()
        
        if tiene_nulos:
            # Usar tipos nullable de pandas (Int64, etc.)
            if min_val >= 0:  # Unsigned
                if max_val <= 255:
                    return 'UInt8'
                elif max_val <= 65535:
                    return 'UInt16'
                elif max_val <= 4294967295:
                    return 'UInt32'
                else:
                    return 'UInt64'
            else:  # Signed
                if min_val >= -128 and max_val <= 127:
                    return 'Int8'
                elif min_val >= -32768 and max_val <= 32767:
                    return 'Int16'
                elif min_val >= -2147483648 and max_val <= 2147483647:
                    return 'Int32'
                else:
                    return 'Int64'
        else:
            # Sin nulos, usar tipos estándar
            if min_val >= 0:  # Unsigned
                if max_val <= 255:
                    return 'uint8'
                elif max_val <= 65535:
                    return 'uint16'
                elif max_val <= 4294967295:
                    return 'uint32'
                else:
                    return 'uint64'
            else:  # Signed
                if min_val >= -128 and max_val <= 127:
                    return 'int8'
                elif min_val >= -32768 and max_val <= 32767:
                    return 'int16'
                elif min_val >= -2147483648 and max_val <= 2147483647:
                    return 'int32'
                else:
                    return 'int64'
    else:
        # Es float o tiene decimales
        return 'float32' if reducir_memoria else 'float64'

def aplicar_a_diccionario_dataframes(dataframes_dict: Dict[str, pd.DataFrame], 
                                   **kwargs) -> Dict[str, pd.DataFrame]:
    """
    Aplica optimización de tipos a un diccionario de DataFrames.
    
    Parameters:
    -----------
    dataframes_dict : Dict[str, pd.DataFrame]
        Diccionario con DataFrames a optimizar
    **kwargs : argumentos para optimizar_tipos_datos
    
    Returns:
    --------
    Dict[str, pd.DataFrame] : Diccionario con DataFrames optimizados
    """
    
    dataframes_optimizados = {}
    
    for nombre, df in dataframes_dict.items():
        print(f"\n--- Optimizando {nombre} ---")
        df_optimizado = optimizar_tipos_datos(df, **kwargs)
        dataframes_optimizados[nombre] = df_optimizado
    
    return dataframes_optimizados

# EJEMPLO DE USO ESPECÍFICO PARA TU CASO
def optimizar_estaciones_itesm(dataframes_dict: Dict[str, pd.DataFrame]) -> Dict[str, pd.DataFrame]:
    """
    Optimización específica para los DataFrames de estaciones ITESM.
    """
    
    return aplicar_a_diccionario_dataframes(
        dataframes_dict,
        columnas_fecha=['date_index'],
        columnas_string=['estacion'],
        reducir_memoria=True,
        verbose=True
    )

In [57]:
df_22_opt = optimizar_tipos_datos(df = df_22, 
                         columnas_fecha = ['date_index'],
                         columnas_string = ['estacion','archivo_origen'],
                         reducir_memoria = True,
                         verbose = True)
df_23_opt = optimizar_tipos_datos(df = df_23, 
                         columnas_fecha = ['date_index'],
                         columnas_string = ['estacion','archivo_origen'],
                         reducir_memoria = True,
                         verbose = True)
df_24_opt = optimizar_tipos_datos(df = df_24_renamed, 
                         columnas_fecha = ['date_index'],
                         columnas_string = ['estacion','archivo_origen'],
                         reducir_memoria = True,
                         verbose = True)

Memoria original: 53.23 MB
Columnas a procesar: ['date_index', 'co', 'no', 'no2', 'nox', 'o3', 'pm10', 'pm2.5', 'prs', 'rainf', 'rh', 'so2', 'sr', 'tout', 'wsr', 'wdr', 'estacion', 'archivo_origen']
✓ date_index: convertida a datetime
✓ co: optimizada a float32
✓ no: optimizada a float32
✓ no2: optimizada a float32
✓ nox: optimizada a float32
✓ o3: optimizada a float32


  return pd.to_datetime(serie, errors='coerce', infer_datetime_format=True)


✓ pm10: optimizada a float32
✓ pm2.5: optimizada a float32
✓ prs: optimizada a float32
✓ rainf: optimizada a float32
✓ rh: optimizada a float32
✓ so2: optimizada a float32
✓ sr: optimizada a float32
✓ tout: optimizada a float32
✓ wsr: optimizada a float32
✓ wdr: optimizada a Int16

Memoria final: 41.26 MB
Reducción de memoria: 22.5%
Cambios realizados: 16 columnas
Memoria original: 54.88 MB
Columnas a procesar: ['date_index', 'co', 'no', 'no2', 'nox', 'o3', 'pm10', 'pm2.5', 'prs', 'rainf', 'rh', 'so2', 'sr', 'tout', 'wsr', 'wdr', 'estacion', 'archivo_origen']
✓ date_index: convertida a datetime
✓ co: optimizada a float32
✓ no: optimizada a float32
✓ no2: optimizada a float32
✓ nox: optimizada a float32
✓ o3: optimizada a float32


  return pd.to_datetime(serie, errors='coerce', infer_datetime_format=True)


✓ pm10: optimizada a float32
✓ pm2.5: optimizada a float32
✓ prs: optimizada a float32
✓ rainf: optimizada a float32
✓ rh: optimizada a Int16
✓ so2: optimizada a float32
✓ sr: optimizada a float32
✓ tout: optimizada a float32
✓ wsr: optimizada a float32
✓ wdr: optimizada a UInt16

Memoria final: 42.58 MB
Reducción de memoria: 22.4%
Cambios realizados: 16 columnas
Memoria original: 41.56 MB
Columnas a procesar: ['date_index', 'co', 'no', 'no2', 'nox', 'o3', 'pm10', 'pm2.5', 'prs', 'rainf', 'rh', 'so2', 'sr', 'tout', 'wsr', 'wdr', 'estacion', 'archivo_origen']
✓ date_index: convertida a datetime
✓ co: optimizada a float32
✓ no: optimizada a float32
✓ no2: optimizada a float32
✓ nox: optimizada a float32


  return pd.to_datetime(serie, errors='coerce', infer_datetime_format=True)


✓ o3: optimizada a float32
✓ pm10: optimizada a float32
✓ pm2.5: optimizada a float32
✓ prs: optimizada a float32
✓ rainf: optimizada a float32
✓ rh: optimizada a float32
✓ so2: optimizada a float32
✓ sr: optimizada a float32
✓ tout: optimizada a float32
✓ wsr: optimizada a float32
✓ wdr: optimizada a float32

Memoria final: 26.48 MB
Reducción de memoria: 36.3%
Cambios realizados: 16 columnas


In [58]:
from typing import List, Dict, Union, Optional, Tuple

def concatenar_dfs_temporales(dataframes: List[pd.DataFrame],
                            columna_fecha: str,
                            años: List[int],
                            validar_orden: bool = True) -> pd.DataFrame:
    """
    Concatena DataFrames con columnas datetime sin solapamiento temporal.
    Cada DataFrame aporta datos de un año específico.
    
    Parameters:
    -----------
    dataframes : List[pd.DataFrame]
        Lista de DataFrames en orden temporal
    columna_fecha : str
        Nombre de la columna con fechas (datetime64[ns])
    años : List[int]
        Años correspondientes a cada DataFrame (mismo orden)
    validar_orden : bool
        Si validar que los DataFrames estén en orden cronológico
    
    Returns:
    --------
    pd.DataFrame : DataFrame concatenado y ordenado temporalmente
    """
    
    if len(dataframes) != len(años):
        raise ValueError("El número de DataFrames debe coincidir con el número de años")
    
    if len(dataframes) == 0:
        raise ValueError("Debe proporcionar al menos un DataFrame")
    
    print(f"Concatenando {len(dataframes)} DataFrames para años {años}")
    
    dfs_filtrados = []
    
    # Filtrar cada DataFrame por su año correspondiente
    for i, (df, año) in enumerate(zip(dataframes, años)):
        print(f"\nProcesando DataFrame {i+1} (año {año}):")
        print(f"  - Registros originales: {len(df)}")
        
        # Validar que la columna existe
        if columna_fecha not in df.columns:
            raise ValueError(f"Columna '{columna_fecha}' no encontrada en DataFrame {i+1}")
        
        # Validar que es datetime
        if not pd.api.types.is_datetime64_any_dtype(df[columna_fecha]):
            raise ValueError(f"Columna '{columna_fecha}' debe ser datetime64[ns] en DataFrame {i+1}")
        
        # Filtrar por año
        df_año = df[df[columna_fecha].dt.year == año].copy()
        
        print(f"  - Registros del año {año}: {len(df_año)}")
        
        if len(df_año) == 0:
            print(f"  ⚠ Advertencia: No hay datos para el año {año}")
            continue
            
        # Información del rango temporal
        fecha_min = df_año[columna_fecha].min()
        fecha_max = df_año[columna_fecha].max()
        print(f"  - Rango: {fecha_min} a {fecha_max}")
        
        dfs_filtrados.append(df_año)
    
    if len(dfs_filtrados) == 0:
        raise ValueError("No se encontraron datos para ningún año especificado")
    
    # Concatenar todos los DataFrames
    print(f"\nConcatenando {len(dfs_filtrados)} DataFrames...")
    df_concatenado = pd.concat(dfs_filtrados, ignore_index=True)
    
    print(f"  - Total registros después de concatenar: {len(df_concatenado)}")
    
    # Ordenar por fecha
    df_ordenado = df_concatenado.sort_values(by=columna_fecha).reset_index(drop=True)
    
    # Validar que no hay solapamiento si se solicita
    if validar_orden and len(dfs_filtrados) > 1:
        validar_no_solapamiento(df_ordenado, columna_fecha, años)
    
    # Información final
    fecha_inicio = df_ordenado[columna_fecha].min()
    fecha_fin = df_ordenado[columna_fecha].max()
    
    print(f"\n✓ Concatenación completada:")
    print(f"  - Registros finales: {len(df_ordenado)}")
    print(f"  - Período total: {fecha_inicio} a {fecha_fin}")
    print(f"  - Años incluidos: {sorted(df_ordenado[columna_fecha].dt.year.unique())}")
    
    return df_ordenado

def validar_no_solapamiento(df: pd.DataFrame, 
                          columna_fecha: str, 
                          años_esperados: List[int]) -> None:
    """
    Valida que no hay solapamiento temporal entre años.
    """
    print("\nValidando ausencia de solapamiento...")
    
    años_encontrados = df[columna_fecha].dt.year.unique()
    
    # Verificar que solo tenemos los años esperados
    años_extra = set(años_encontrados) - set(años_esperados)
    if años_extra:
        print(f"  ⚠ Advertencia: Años no esperados encontrados: {sorted(años_extra)}")
    
    # Verificar continuidad temporal entre años consecutivos
    for i in range(len(años_esperados) - 1):
        año_actual = años_esperados[i]
        año_siguiente = años_esperados[i + 1]
        
        if año_actual in años_encontrados and año_siguiente in años_encontrados:
            # Última fecha del año actual
            ultima_fecha_actual = df[df[columna_fecha].dt.year == año_actual][columna_fecha].max()
            # Primera fecha del año siguiente
            primera_fecha_siguiente = df[df[columna_fecha].dt.year == año_siguiente][columna_fecha].min()
            
            print(f"  - Transición {año_actual}→{año_siguiente}: "
                  f"{ultima_fecha_actual} → {primera_fecha_siguiente}")
    
    print("  ✓ Validación completada")

def concatenar_diccionario_estaciones(dataframes_dict: Dict[str, List[pd.DataFrame]],
                                    columna_fecha: str,
                                    años: List[int]) -> Dict[str, pd.DataFrame]:
    """
    Concatena DataFrames temporales para múltiples estaciones.
    
    Parameters:
    -----------
    dataframes_dict : Dict[str, List[pd.DataFrame]]
        Diccionario donde cada clave es una estación y el valor es una lista de DataFrames
    columna_fecha : str
        Nombre de la columna con fechas
    años : List[int]
        Años correspondientes a cada DataFrame
    
    Returns:
    --------
    Dict[str, pd.DataFrame] : Diccionario con DataFrames concatenados por estación
    """
    
    resultado = {}
    
    print("=" * 60)
    print("CONCATENACIÓN MASIVA POR ESTACIONES")
    print("=" * 60)
    
    for estacion, lista_dfs in dataframes_dict.items():
        print(f"\n--- Procesando estación: {estacion} ---")
        
        try:
            df_concatenado = concatenar_dfs_temporales(
                dataframes=lista_dfs,
                columna_fecha=columna_fecha,
                años=años,
                validar_orden=True
            )
            resultado[estacion] = df_concatenado
            
        except Exception as e:
            print(f"  ❌ Error procesando {estacion}: {e}")
            continue
    
    print(f"\n✓ Procesamiento completado para {len(resultado)} estaciones")
    return resultado

def extraer_año_especifico(df: pd.DataFrame, 
                         columna_fecha: str, 
                         año: int) -> pd.DataFrame:
    """
    Extrae datos de un año específico de un DataFrame.
    
    Parameters:
    -----------
    df : pd.DataFrame
        DataFrame con datos temporales
    columna_fecha : str
        Nombre de la columna con fechas
    año : int
        Año a extraer
    
    Returns:
    --------
    pd.DataFrame : DataFrame filtrado por año
    """
    
    if columna_fecha not in df.columns:
        raise ValueError(f"Columna '{columna_fecha}' no encontrada")
    
    if not pd.api.types.is_datetime64_any_dtype(df[columna_fecha]):
        raise ValueError(f"Columna '{columna_fecha}' debe ser datetime64[ns]")
    
    df_año = df[df[columna_fecha].dt.year == año].copy()
    
    if len(df_año) == 0:
        print(f"⚠ No se encontraron datos para el año {año}")
    else:
        fecha_min = df_año[columna_fecha].min()
        fecha_max = df_año[columna_fecha].max()
        print(f"✓ Extraídos {len(df_año)} registros del año {año} ({fecha_min} a {fecha_max})")
    
    return df_año.reset_index(drop=True)

def verificar_continuidad_temporal(df: pd.DataFrame, 
                                 columna_fecha: str,
                                 frecuencia_esperada: str = 'H') -> Dict[str, any]:
    """
    Verifica la continuidad temporal de un DataFrame concatenado.
    
    Parameters:
    -----------
    df : pd.DataFrame
        DataFrame con datos temporales
    columna_fecha : str
        Nombre de la columna con fechas
    frecuencia_esperada : str
        Frecuencia esperada ('H' para horaria, 'D' para diaria, etc.)
    
    Returns:
    --------
    Dict : Información sobre la continuidad temporal
    """
    
    df_sorted = df.sort_values(columna_fecha)
    
    # Crear rango temporal completo esperado
    fecha_inicio = df_sorted[columna_fecha].min()
    fecha_fin = df_sorted[columna_fecha].max()
    
    rango_completo = pd.date_range(start=fecha_inicio, end=fecha_fin, freq=frecuencia_esperada)
    fechas_existentes = set(df_sorted[columna_fecha])
    fechas_esperadas = set(rango_completo)
    
    # Encontrar gaps
    fechas_faltantes = fechas_esperadas - fechas_existentes
    fechas_duplicadas = df_sorted[columna_fecha].duplicated()
    
    resultado = {
        'total_esperado': len(fechas_esperadas),
        'total_encontrado': len(fechas_existentes),
        'fechas_faltantes': len(fechas_faltantes),
        'fechas_duplicadas': fechas_duplicadas.sum(),
        'porcentaje_completitud': (len(fechas_existentes) / len(fechas_esperadas)) * 100,
        'gaps_mayores': []
    }
    
    # Identificar gaps mayores (más de 24 horas)
    if fechas_faltantes:
        fechas_faltantes_sorted = sorted(fechas_faltantes)
        gap_actual = []
        
        for i, fecha in enumerate(fechas_faltantes_sorted):
            if not gap_actual:
                gap_actual = [fecha]
            elif (fecha - gap_actual[-1]).total_seconds() <= 3600:  # 1 hora
                gap_actual.append(fecha)
            else:
                if len(gap_actual) > 24:  # Gap mayor a 24 horas
                    resultado['gaps_mayores'].append({
                        'inicio': gap_actual[0],
                        'fin': gap_actual[-1],
                        'duracion_horas': len(gap_actual)
                    })
                gap_actual = [fecha]
    
    print(f"\nCONTINUIDAD TEMPORAL:")
    print(f"  - Completitud: {resultado['porcentaje_completitud']:.1f}%")
    print(f"  - Registros esperados: {resultado['total_esperado']}")
    print(f"  - Registros encontrados: {resultado['total_encontrado']}")
    print(f"  - Fechas faltantes: {resultado['fechas_faltantes']}")
    print(f"  - Fechas duplicadas: {resultado['fechas_duplicadas']}")
    print(f"  - Gaps mayores a 24h: {len(resultado['gaps_mayores'])}")
    
    return resultado

In [59]:
# Tienes df_2023, df_2024, df_2025...
df_completo = concatenar_dfs_temporales(
    dataframes=[df_22_opt, df_23_opt, df_24_opt],
    columna_fecha='date_index',
    años=[2022, 2023, 2024]
)

Concatenando 3 DataFrames para años [2022, 2023, 2024]

Procesando DataFrame 1 (año 2022):
  - Registros originales: 205805
  - Registros del año 2022: 123383
  - Rango: 2022-01-01 00:00:00 a 2022-12-31 23:00:00

Procesando DataFrame 2 (año 2023):
  - Registros originales: 208050
  - Registros del año 2023: 131370
  - Rango: 2023-01-01 00:00:00 a 2023-12-31 23:00:00

Procesando DataFrame 3 (año 2024):
  - Registros originales: 131741
  - Registros del año 2024: 131741
  - Rango: 2024-01-01 00:00:00 a 2024-12-31 23:00:00

Concatenando 3 DataFrames...
  - Total registros después de concatenar: 386494

Validando ausencia de solapamiento...
  - Transición 2022→2023: 2022-12-31 23:00:00 → 2023-01-01 00:00:00
  - Transición 2023→2024: 2023-12-31 23:00:00 → 2024-01-01 00:00:00
  ✓ Validación completada

✓ Concatenación completada:
  - Registros finales: 386494
  - Período total: 2022-01-01 00:00:00 a 2024-12-31 23:00:00
  - Años incluidos: [np.int32(2022), np.int32(2023), np.int32(2024)]


## Export the Tabla Maestra

In [63]:
path_save = r'data/processed'
df_master_table_name = 'master_table.csv'
master_table_path = Path(path_save).joinpath(df_master_table_name)


In [62]:
def exportar_csv_optimizado(df: pd.DataFrame, 
                           path: Path, 
                           columna_fecha: str = 'date_index',
                           resetear_index: bool = True) -> None:
    """
    Exporta DataFrame a CSV con configuración optimizada.
    
    Parameters:
    -----------
    df : pd.DataFrame
        DataFrame a exportar
    path : Path
        Ruta donde guardar el archivo
    columna_fecha : str
        Nombre de la columna de fecha para ordenar
    resetear_index : bool
        Si True, resetea el índice antes de guardar
    """
    
    # 1. HACER COPIA PARA NO MODIFICAR EL ORIGINAL
    df_export = df.copy()
    
    # 2. ORDENAR POR FECHA
    if columna_fecha in df_export.columns:
        df_export = df_export.sort_values(by=columna_fecha)
    
    # 3. RESETEAR ÍNDICE PARA QUE QUEDE ORDENADO
    if resetear_index:
        df_export = df_export.reset_index(drop=True)
    
    # 4. CREAR DIRECTORIO SI NO EXISTE
    path.parent.mkdir(parents=True, exist_ok=True)
    
    # 5. GUARDAR SIN ÍNDICE
    df_export.to_csv(path, index=False)
    
    print(f"✓ Archivo guardado: {path}")
    print(f"  Dimensiones: {df_export.shape}")
    if columna_fecha in df_export.columns:
        print(f"  Rango fechas: {df_export[columna_fecha].min()} a {df_export[columna_fecha].max()}")

# PARA TU CASO ESPECÍFICO:
def guardar_master_table(df: pd.DataFrame) -> Path:
    """Guarda la master table con configuración optimizada."""
    
    path_save = r'data/processed'
    df_master_table_name = 'master_table.csv'
    master_table_path = Path(path_save) / df_master_table_name
    
    # Exportar con configuración optimizada
    exportar_csv_optimizado(df, master_table_path, 'date_index')
    
    return master_table_path

# VERSIÓN COMPACTA PARA USO RÁPIDO
def quick_export(df: pd.DataFrame, path: str, fecha_col: str = 'date_index') -> None:
    """Exportación rápida con todos los arreglos."""
    
    # Todo en una línea (sin modificar el original)
    df_sorted = df.sort_values(by=fecha_col).reset_index(drop=True)
    
    # Asegurar directorio y guardar
    path_obj = Path(path)
    path_obj.parent.mkdir(parents=True, exist_ok=True)
    df_sorted.to_csv(path_obj, index=False)

In [64]:
master_table_path = guardar_master_table(df_completo)

✓ Archivo guardado: data/processed/master_table.csv
  Dimensiones: (386494, 18)
  Rango fechas: 2022-01-01 00:00:00 a 2024-12-31 23:00:00
