# ETL Mensual - Procesamiento Incremental

Este notebook ejecuta el proceso ETL mensual para las fuentes de datos:
- **Inspecciones**: archivos `inspecciones_YYYY_MM.xlsx` en `data/raw/inspecciones/`
- **Consumo**: archivos `consumo_YYYY_MM.xlsx` en `data/raw/consumo/`

## Flujo:
1. Detecta meses pendientes (archivos en `raw/` que no tienen parquet en `interim/`)
2. Procesa cada mes: limpia, normaliza y guarda en `data/interim/<fuente>/year=YYYY/month=MM/`
3. Solo procesa meses nuevos (incremental) - salvo que se fuerce `overwrite=True`

In [1]:
import os
import sys
import pandas as pd
from tqdm import tqdm

# Configurar paths
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)

# Activar autoreload para desarrollo
%load_ext autoreload
%autoreload 2

In [2]:
# Instalar dependencias si es necesario
try:
    import openpyxl
except ImportError:
    print("Instalando openpyxl...")
    %pip install openpyxl

from src.data import etl

## Configuración

Rutas, fuentes y `overwrite` se leen de **config/config.yaml** (paths, etl.sources, etl.overwrite).

In [3]:
# Cargar configuración desde config/config.yaml
from src.config import load_config, get_paths

cfg = load_config()
paths = get_paths(cfg)
RAW_DIR = paths["raw"]
INTERIM_DIR = paths["interim"]

etl_cfg = cfg.get("etl", {})
SOURCES = etl_cfg.get("sources", ["inspecciones", "consumo"])
OVERWRITE = etl_cfg.get("overwrite", False)

## Ejecutar ETL Mensual

Ejecuta el proceso ETL incremental. Solo procesa meses nuevos por defecto.

In [4]:
%%time

# Ejecutar ETL mensual
summary = etl.run_monthly_etl(
    raw_dir=RAW_DIR,
    interim_dir=INTERIM_DIR,
    sources=SOURCES,
    overwrite=OVERWRITE
)

CPU times: total: 0 ns
Wall time: 12.1 ms


## Resumen de Procesamiento

In [5]:
# Mostrar resumen
print("\n" + "="*60)
print("RESUMEN FINAL")
print("="*60)
for source, stats in summary.items():
    print(f"\n{source.upper()}:")
    print(f"  - Procesados: {stats['processed']}")
    print(f"  - Saltados: {stats['skipped']}")
    print(f"  - Total pendientes encontrados: {stats['total_pending']}")


RESUMEN FINAL

INSPECCIONES:
  - Procesados: 0
  - Saltados: 0
  - Total pendientes encontrados: 0

CONSUMO:
  - Procesados: 0
  - Saltados: 0
  - Total pendientes encontrados: 0


## Verificar Archivos Procesados

Lista los parquets generados en `interim/` para verificar.

In [7]:
import glob
import re

# Listar archivos procesados
for source in SOURCES:
    pattern = os.path.join(INTERIM_DIR, source, "year=*", "month=*", f"{source}.parquet")
    files = sorted(glob.glob(pattern))
    print(f"\n{source.upper()}: {len(files)} archivos procesados")
    if len(files) <= 10:
        for f in files:
            # Extraer year/month del path
            match = re.search(r"year=(\d{4})/month=(\d{2})", f.replace("\\", "/"))
            if match:
                print(f"  - {match.group(1)}-{match.group(2)}")
    else:
        print(f"  (mostrando primeros 5 y últimos 5)")
        for f in files[:5]:
            match = re.search(r"year=(\d{4})/month=(\d{2})", f.replace("\\", "/"))
            if match:
                print(f"  - {match.group(1)}-{match.group(2)}")
        print("  ...")
        for f in files[-5:]:
            match = re.search(r"year=(\d{4})/month=(\d{2})", f.replace("\\", "/"))
            if match:
                print(f"  - {match.group(1)}-{match.group(2)}")


INSPECCIONES: 44 archivos procesados
  (mostrando primeros 5 y últimos 5)
  - 2021-01
  - 2021-02
  - 2021-03
  - 2021-04
  - 2021-05
  ...
  - 2024-04
  - 2024-05
  - 2024-06
  - 2024-07
  - 2024-08

CONSUMO: 62 archivos procesados
  (mostrando primeros 5 y últimos 5)
  - 2020-01
  - 2020-02
  - 2020-03
  - 2020-04
  - 2020-05
  ...
  - 2024-10
  - 2024-11
  - 2024-12
  - 2025-01
  - 2025-02
