# Clase 3 – Ingesta y Capa Bronce

En esta notebook se inicia la construcción del pipeline de datos meteorológicos, trabajando con los archivos crudos provistos por el SMN.


## 1. Librerías necesarias

In [79]:
import pandas as pd
import numpy as np
import re
import os
from pathlib import Path


## 2. Configuración de paths y carpetas

In [80]:
BASE_DIR = Path('..').resolve()
RAW_DIR = BASE_DIR / 'data' / 'raw'
BRONCE_DIR = BASE_DIR / 'data' / 'bronce'

# Crear carpetas si no existen
for path in [BRONCE_DIR]:
    path.mkdir(parents=True, exist_ok=True)


## 3. Lectura del archivo de estaciones

In [81]:
# Ruta del archivo
archivo_estaciones = RAW_DIR / 'estaciones' / 'estaciones_smn.txt'

# Leer todas las líneas, omitiendo las dos primeras (encabezado y unidades)
with open(archivo_estaciones, "r", encoding="latin1") as f:
    lines = f.readlines()[2:]

# Expresión regular para extraer campos:
pattern = re.compile(
    r"^(?P<nombre>.+?)\s{2,}(?P<provincia>.+?)\s{2,}(?P<lat_gr>-?\d+)\s+(?P<lat_min>\d+)\s+(?P<lon_gr>-?\d+)\s+(?P<lon_min>\d+)\s+(?P<altura_m>\d+)\s+(?P<numero>\d+)\s+(?P<numero_oaci>\S+)\s*$"
)

# Extraer los datos
data = []
for line in lines:
    match = pattern.match(line)
    if match:
        data.append(match.groupdict())

# Crear DataFrame
df_estaciones = pd.DataFrame(data)

# Conversión de tipos
df_estaciones[['lat_gr', 'lat_min', 'lon_gr', 'lon_min', 'altura_m', 'numero']] = df_estaciones[[
    'lat_gr', 'lat_min', 'lon_gr', 'lon_min', 'altura_m', 'numero'
]].apply(pd.to_numeric)

print("Estaciones cargadas:", len(df_estaciones))

Estaciones cargadas: 117


## 4. Selección de estaciones de Misiones

In [83]:
df_misiones = df_estaciones[df_estaciones['provincia'].str.upper() == 'MISIONES']
df_misiones[['nombre', 'provincia', 'numero', 'numero_oaci']]


Unnamed: 0,nombre,provincia,numero,numero_oaci
77,BERNARDO DE IRIGOYEN AERO,MISIONES,87163,SATI
78,IGUAZU AERO,MISIONES,87097,SARI
79,OBERA,MISIONES,87187,SATO
80,POSADAS AERO,MISIONES,87178,SARP


## 5. Lectura de un archivo horario de ejemplo

In [84]:
archivo_dato = RAW_DIR / 'datohorario' / 'datohorario20240601.txt'
df_dato = pd.read_csv(archivo_dato, sep=';', encoding='latin1')
df_dato.head()


Unnamed: 0,FECHA HORA TEMP HUM PNM DD FF NOMBRE
0,[HOA] [ºC] [%] [hPa] [gr] [km/hr...
1,01062024 0 14.2 82 1015.7 50 17 ...
2,01062024 1 14.3 80 1015.4 360 9 ...
3,01062024 2 14.1 86 1015.3 360 9 ...
4,01062024 3 14.1 87 1014.8 360 7 ...


## 6. Limpieza básica y detección de nulos

In [85]:
# Contar valores a reemplazar antes de la limpieza
cant_9999_9 = (df_dato == 9999.9).sum().sum()
cant_neg9999 = (df_dato == -9999).sum().sum()

# Reemplazar por NaN
df_dato.replace({9999.9: np.nan, -9999: np.nan}, inplace=True)

# Imprimir resumen
print(f"Reemplazados {cant_9999_9} valores de 9999.9 y {cant_neg9999} valores de -9999 por NaN.")
print("Valores faltantes por columna luego del reemplazo:")
print(df_dato.isna().sum())


Reemplazados 0 valores de 9999.9 y 0 valores de -9999 por NaN.
Valores faltantes por columna luego del reemplazo:
FECHA     HORA  TEMP   HUM   PNM    DD    FF     NOMBRE                                                 0
dtype: int64


## 7. Filtro por estación de Misiones

In [86]:
archivo_dato = RAW_DIR / 'datohorario' / 'datohorario20240601.txt'

# Leer todas las líneas, omitiendo las dos primeras (encabezado y unidades)
with open(archivo_dato, "r", encoding="latin1") as f:
    lines = f.readlines()

# Detectar columnas separadas por múltiples espacios
columnas = re.split(r"\s{2,}", lines[0].strip())

# Leer datos
data = [
    re.split(r"\s{2,}", line.strip(), maxsplit=len(columnas)-1)
    for line in lines[1:]
    if len(line.strip()) > 0 and not line.isspace()
]

df_dato = pd.DataFrame(data, columns=columnas)

# Filtrar por estaciones de Misiones
df_dato["NOMBRE"] = df_dato["NOMBRE"].str.strip()
nombres_misiones = df_misiones["nombre"].str.strip().unique()
df_misiones_dia = df_dato[df_dato["NOMBRE"].isin(nombres_misiones)]
#print(df_misiones_dia.head())

# Mostrar todos los resultados (sin limitar con .head())
print(df_misiones_dia.to_string(index=False))


   FECHA HORA TEMP HUM    PNM  DD FF       NOMBRE
01062024    0 13.8  91 1019.6  90  7  IGUAZU AERO
01062024    1 13.4  92 1019.5  90  7  IGUAZU AERO
01062024    2 13.0  94 1019.0  90 11  IGUAZU AERO
01062024    3 12.8  94 1018.3  90  9  IGUAZU AERO
01062024    4 12.4  94 1018.3  90  7  IGUAZU AERO
01062024    5 12.0  94 1018.7  50  4  IGUAZU AERO
01062024    6 10.9  95 1019.1  70  4  IGUAZU AERO
01062024    7 10.5  94 1020.5  90  6  IGUAZU AERO
01062024    8 11.5  91 1021.3  90  4  IGUAZU AERO
01062024    9 13.4  86 1021.4  90 17  IGUAZU AERO
01062024   10 15.6  83 1021.6  90 13  IGUAZU AERO
01062024   11 18.0  75 1021.4  90 15  IGUAZU AERO
01062024   12 19.7  68 1020.7  90 17  IGUAZU AERO
01062024   13 20.4  68 1020.0 110 11  IGUAZU AERO
01062024   14 21.5  65 1019.1 110 13  IGUAZU AERO
01062024   15 22.8  64 1018.3  90  9  IGUAZU AERO
01062024   16 22.6  62 1017.9  90  9  IGUAZU AERO
01062024   17 22.4  68 1017.8  90  9  IGUAZU AERO
01062024   18 19.0  81 1018.1  90  9  IGUAZU AERO


## 8. Exportación de archivos filtrados

In [87]:
# Crear carpeta de salida si no existe
BRONCE_DIR.mkdir(parents=True, exist_ok=True)

# Definir la fecha (puede venir del nombre del archivo)
fecha = "20240601"  # o extraela dinámicamente si lo preferís

# Iterar por cada estación de Misiones
for nombre in nombres_misiones:
    nombre_clean = nombre.lower().replace(' ', '_')
    
    # Filtrar las filas de esa estación
    df_estacion = df_misiones_dia[df_misiones_dia["NOMBRE"] == nombre]
    
    # Definir archivos de salida con fecha al inicio
    salida_csv = BRONCE_DIR / f'{fecha}_{nombre_clean}.csv'
    salida_parquet = BRONCE_DIR / f'{fecha}_{nombre_clean}.parquet'
    
    # Exportar
    df_estacion.to_csv(salida_csv, index=False)
    df_estacion.to_parquet(salida_parquet, index=False)
    
    print(f"Exportado: {salida_csv.name} y {salida_parquet.name}")


Exportado: 20240601_bernardo_de_irigoyen_aero.csv y 20240601_bernardo_de_irigoyen_aero.parquet
Exportado: 20240601_iguazu_aero.csv y 20240601_iguazu_aero.parquet
Exportado: 20240601_obera.csv y 20240601_obera.parquet
Exportado: 20240601_posadas_aero.csv y 20240601_posadas_aero.parquet


## Próximos pasos

- Extender este proceso a más días o meses.
- Organizar las salidas por carpeta `/bronce/{estacion}/{año}/`.
- Documentar el diccionario de variables en `metadata/`.


### 🔟 Paso 9 – Procesamiento por estación y por fecha (con limpieza y reporte resumen)

In [88]:
# Buscar todos los archivos datohorario disponibles
archivos_datos = sorted(glob(str(RAW_DIR / "datohorario" / "datohorario*.txt")))

errores_globales = 0
nulos_total = 0
reemplazados_9999_9 = 0
reemplazados_neg9999 = 0
detalle_nulos = []

for archivo in archivos_datos:
    try:
        with open(archivo, encoding="latin1") as f:
            raw_lines = f.readlines()

        header = raw_lines[0].strip()
        columnas = re.split(r"\s{2,}", header)

        data = [
            re.split(r"\s{2,}", line.strip(), maxsplit=len(columnas)-1)
            for line in raw_lines[1:]
            if len(line.strip()) > 0 and not line.isspace()
        ]

        df_dato = pd.DataFrame(data, columns=columnas)
        df_dato.columns = df_dato.columns.str.strip()
        df_dato["NOMBRE"] = df_dato["NOMBRE"].str.strip()

        # Convertir columnas numéricas si es posible
        for col in df_dato.columns:
            try:
                df_dato[col] = pd.to_numeric(df_dato[col])
            except:
                pass

        # Conteo previo de valores a reemplazar
        cant_9999_9 = (df_dato == 9999.9).sum().sum()
        cant_neg9999 = (df_dato == -9999).sum().sum()

        reemplazados_9999_9 += cant_9999_9
        reemplazados_neg9999 += cant_neg9999

        # Reemplazo
        df_dato.replace({9999.9: np.nan, -9999: np.nan}, inplace=True)

        # Contar nulos
        nulos_total += df_dato.isna().sum().sum()

        # Filtrar por estaciones de Misiones
        df_misiones = df_dato[df_dato["NOMBRE"].isin(nombres_misiones)]

        # Obtener fecha
        fecha = Path(archivo).stem.replace("datohorario", "")

        # Guardar archivos por estación + generar detalle
        for nombre in nombres_misiones:
            nombre_clean = nombre.lower().replace(" ", "_")
            df_estacion = df_misiones[df_misiones["NOMBRE"] == nombre]

            if not df_estacion.empty:
                path_estacion = BRONCE_DIR / nombre_clean
                path_estacion.mkdir(parents=True, exist_ok=True)

                df_estacion.to_parquet(path_estacion / f"{fecha}.parquet", index=False)
                df_estacion.to_csv(path_estacion / f"{fecha}.csv", index=False)

                # Registro de nulos por estación
                nulos_por_col = df_estacion.isna().sum()
                detalle_nulos.append({
                    "archivo": Path(archivo).name,
                    "fecha": fecha,
                    "estacion": nombre,
                    "nulos_totales": int(nulos_por_col.sum()),
                    "nulos_por_columna": json.dumps(nulos_por_col[nulos_por_col > 0].to_dict())
                })

    except Exception as e:
        errores_globales += 1
        continue

# Reporte final
print("✅ Proceso completado.")
print(f"Días procesados: {len(archivos_datos)}")
print(f"Errores al procesar archivos: {errores_globales}")
print(f"Valores reemplazados: {reemplazados_9999_9} de 9999.9 y {reemplazados_neg9999} de -9999")
print(f"Total de valores nulos luego de limpieza: {nulos_total}")

# Guardar resumen general
reporte = {
    "dias_procesados": [len(archivos_datos)],
    "errores": [errores_globales],
    "reemplazados_9999_9": [reemplazados_9999_9],
    "reemplazados_-9999": [reemplazados_neg9999],
    "valores_nulos_totales": [nulos_total],
}
df_reporte = pd.DataFrame(reporte)
df_reporte.to_csv(BRONCE_DIR / "reporte_resumen.csv", index=False)

# Guardar detalle de nulos por archivo y estación
df_detalle = pd.DataFrame(detalle_nulos)
df_detalle.to_csv(BRONCE_DIR / "reporte_nulos_detalle.csv", index=False)

✅ Proceso completado.
Días procesados: 391
Errores al procesar archivos: 391
Valores reemplazados: 0 de 9999.9 y 0 de -9999
Total de valores nulos luego de limpieza: 260790
