In [None]:
import re
import pandas as pd
import unicodedata
from collections import defaultdict
from typing import Dict, List, Tuple, Any

In [None]:
CATEGORIA_VIOLENCIA = "CAT_001"
CATEGORIA_VCM = "CAT_002"

TASA_GENERAL = "TASA_001"
TASA_MUJERES = "TASA_002"


def next_id(prefix: str, counter: defaultdict[str, int], table_type: 'str' = 'normal') -> str:
    """
    Genera un identificador secuencial con prefijo, con ceros a la izquierda.
    Este identificador sigue el formato 'PREFIJO_001', 'PREFIJO_002', etc.,

    Args:
        prefix: Prefijo que representa el tipo de entidad (por ejemplo, "CAT", "SUB", "EVT").
        counter: Diccionario con contador acumulado por prefijo.

    Returns:
        Un string con el nuevo identificador generado, en formato con tres dígitos.
    """
    counter[prefix] += 1
    if table_type == 'normal':
        return f"{prefix}_{str(counter[prefix]).zfill(3)}"
    else:
        return f"{prefix}_{str(counter[prefix]).zfill(5)}"


def normalizar_texto(texto: Any) -> str:
    """
    - Convierte a string.
    - Quita acentos (diacríticos).
    - Elimina espacios duplicados al inicio, fin e intermedios.
    - Aplica title case (o cambia a mayúsculas si lo prefieres).
    """
    s = str(texto)
    s = unicodedata.normalize("NFKD", s)
    s = "".join(ch for ch in s if not unicodedata.combining(ch))
    s = re.sub(r"\s+", " ", s).strip()
    return s.title()


def normalizar_tasa(texto: str) -> str:
    """
    Normaliza el nombre de una tasa a uno de dos formatos estándar.

    Esta función analiza el contenido del texto y determina si corresponde
    a una tasa general o específica para mujeres. La decisión se basa en la
    presencia de la palabra "mujer" en el texto original.

    Args:
        texto: Texto crudo proveniente del encabezado de una columna de tasas.

    Returns:
        Una cadena con el nombre de la tasa normalizado, que puede ser:
            - "Tasa X Cienmil Mujeres"
            - "Tasa X Cienmil Hbtes."
    """
    t = str(texto).strip().lower()
    return "Tasa X Cienmil Mujeres" if "mujer" in t else "Tasa X Cienmil Hbtes."


def es_hoja_valida(nombre_hoja: str) -> bool:
    """
    Args:
        nombre_hoja: Nombre de la hoja en el archivo Excel.

    Returns:
        True si el nombre es numérico y puede interpretarse como un año; False en caso contrario.
    """
    return nombre_hoja.isdigit()

def generar_id_tasa(tipo_tasa: str) -> str:
    """
    Asigna un identificador estándar a un tipo de tasa normalizado.

    Args:
        tipo_tasa: Nombre de la tasa ya normalizado (por ejemplo, obtenido desde `normalizar_tasa`).

    Returns:
        El ID de tasa correspondiente según el tipo:
            - "TASA_001" si el tipo es "Tasa X Cienmil Hbtes."
            - "TASA_002" si el tipo es "Tasa X Cienmil Mujeres"
    """
    return TASA_GENERAL if tipo_tasa == "Tasa X Cienmil Hbtes." else TASA_MUJERES

def cargar_excel(file_path: str) -> Dict[str, pd.DataFrame]:
    """
    Carga todas las hojas de un archivo Excel en un diccionario de DataFrames.

    Args:
        file_path: Ruta completa al archivo Excel (.xlsx).

    Returns:
        Un diccionario donde:
            - las claves son los nombres de las hojas (por ejemplo, "2007", "2023")
            - los valores son los DataFrames correspondientes a cada hoja
    """
    return pd.read_excel(file_path, sheet_name=None)

def extraer_columnas_validas(df: pd.DataFrame) -> Tuple[Dict[str, Tuple[str, str, bool, str]], Dict[str, str]]:
    """
    Extrae pares de columnas válidas que representan casos y tasas, asociadas a una subcategoría y categoría.

    Args:
        df: DataFrame de una hoja del Excel, incluyendo encabezados de categorías y subcategorías.

    Returns:
        Una tupla de dos estructuras:
            - col_to_keys: Diccionario que mapea cada nombre de columna a una tupla:
                (id_categoria, subcategoria, es_tasa, id_tipo_tasa)
                Donde:
                    - es_tasa es True si la columna representa una tasa, False si representa casos.
            - subcat_to_cat_id: Diccionario que mapea cada subcategoría a su ID de categoría asociada.
    """
    col_to_keys = {}
    subcat_counter = defaultdict(int)
    subcat_to_cat_id = {}

    for i in range(2, df.shape[1] - 1, 2):
        raw_subcat = df.iloc[0, i]
        subcat = normalizar_texto(raw_subcat)
        tipo_col = str(df.iloc[1, i]).strip().lower()
        tipo_tasa = normalizar_tasa(str(df.iloc[1, i + 1]))

        if not subcat or subcat.lower() in ["nan", ""] or tipo_col != "casos":
            continue

        subcat_counter[subcat] += 1
        id_categoria = CATEGORIA_VIOLENCIA if subcat_counter[subcat] == 1 else CATEGORIA_VCM
        if id_categoria == CATEGORIA_VIOLENCIA:
            subcat_key = subcat
        else:
            subcat_key = normalizar_texto(f"{subcat} Mujer")
        subcat_to_cat_id[subcat_key] = id_categoria

        id_tasa = generar_id_tasa(tipo_tasa)

        col_to_keys[df.columns[i]] = (id_categoria, subcat_key, False, id_tasa)
        col_to_keys[df.columns[i + 1]] = (id_categoria, subcat_key, True, id_tasa)

    return col_to_keys, subcat_to_cat_id

def limpiar_filas_entidades(df: pd.DataFrame) -> pd.DataFrame:
    """
    Limpia las filas de un DataFrame que contienen información de subregiones y municipios.

    Esta función:
    - Elimina las dos primeras filas (encabezados jerárquicos).
    - Renombra las dos primeras columnas como 'id' y 'nombre'.
    - Descarta filas con valores no válidos como 'nan', 'total' o 'casos'.
    - Elimina registros con valores nulos en las columnas clave.

    Args:
        df: DataFrame crudo proveniente de una hoja de Excel.

    Returns:
        DataFrame limpio y estructurado con columnas ['id', 'nombre'], listo para clasificación.
    """
    df_clean = df.iloc[2:].copy()
    df_clean["orig_index"] = df_clean.index
    df_clean.rename(columns={df_clean.columns[0]: "id", df_clean.columns[1]: "nombre"}, inplace=True)
    df_clean = df_clean[~df_clean["id"].astype(str)
                                .str.lower()
                                .str.contains("nan|casos|total", na=True)]
    df_clean = df_clean[~df_clean["nombre"].astype(str)
                                .str.lower()
                                .str.contains("nan|casos|total", na=True)]
    df_clean = df_clean.dropna(subset=["id", "nombre"])
    return df_clean

def extraer_subregiones_municipios(df_clean: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Separa las subregiones y municipios de un DataFrame limpio, y asocia cada municipio a su subregión.

    Args:
        df_clean: DataFrame limpio que contiene columnas 'id' y 'nombre' con las filas originales del archivo Excel.

    Returns:
        Una tupla con dos DataFrames:
            - subregiones: Contiene columnas ['id_subregion', 'subregion'].
            - municipios: Contiene columnas ['id_municipio', 'municipio', 'id_subregion'].
    """
    df_clean["tipo"] = df_clean["id"].apply(
        lambda x: "subregion" if isinstance(x, str) and str(x).startswith("SR") else "municipio"
    )

    subregiones = df_clean[df_clean["tipo"] == "subregion"][["id", "nombre"]].drop_duplicates().copy()
    subregiones.columns = ["id_subregion", "subregion"]
    subregiones["subregion"] = subregiones["subregion"].apply(normalizar_texto)

    municipios = df_clean[df_clean["tipo"] == "municipio"][
        ["orig_index", "id", "nombre"]
    ].copy()
    municipios.rename(
        columns={
            "id": "id_municipio",
            "nombre": "municipio"
        },
        inplace=True
    )
    municipios["id_subregion"] = (
        df_clean["id"]
        .where(df_clean["tipo"] == "subregion")
        .ffill()
        .loc[municipios.index]
        .values
    )
    municipios["municipio"] = municipios["municipio"].apply(normalizar_texto)
    return subregiones.reset_index(drop=True), municipios.reset_index(drop=True)

def procesar_eventos_hoja(df: pd.DataFrame,
                          col_to_keys: Dict[str, Tuple[str, str, bool, str]],
                          municipios: pd.DataFrame,
                          year: int,
                          counters: defaultdict[str, int],
                          dims: Dict[str, Dict[str, str]],
                          eventos: List[Dict[str, Any]],
                          subcat_to_cat_id: Dict[str, str]) -> None:
    """
    Procesa los datos de una hoja anual y construye los registros para la tabla de hechos 'eventos'.

    Recorre cada fila correspondiente a un municipio, y por cada par de columnas (casos, tasa),
    extrae los valores y construye un evento con su respectiva relación a subcategoría,
    tasa aplicada y año. Las tasas se vinculan por separado luego de encontrar el evento base.

    Si una subcategoría aún no ha sido registrada en las dimensiones, se le asigna un ID nuevo
    y se actualiza su mapeo hacia la categoría correspondiente.

    Args:
        df: DataFrame original de la hoja de Excel, incluyendo encabezados y datos crudos.
        col_to_keys: Diccionario que mapea nombres de columna a una tupla con:
            (id_categoria, nombre_subcategoria, es_tasa, id_tipo_tasa)
        municipios: DataFrame con los municipios que participan en esa hoja.
        year: Año correspondiente a la hoja procesada.
        counters: Contador global de IDs, usado para generar ID_EVENTO y ID_SUBCATEGORIA.
        dims: Diccionario de dimensiones ya construidas (categorías, subcategorías).
        eventos: Lista acumulada de registros de eventos (se modifica en el lugar).
        subcat_to_cat_id: Diccionario que relaciona nombre de subcategoría con su categoría.

    Returns:
        None. Modifica en el lugar las estructuras `eventos`, `dims` y `subcat_to_cat_id`.
    """
    for _, row in municipios.iterrows():
        orig = row["orig_index"]
        for col, (id_categoria, subcat, is_tasa, id_tasa) in col_to_keys.items():
            try:
                value = df.at[orig, col]
                if pd.isnull(value) or (isinstance(value, str) and not value.replace('.', '', 1).isdigit()):
                    continue
            except Exception:
                continue

            if subcat not in dims["subcategorias"]:
                dims["subcategorias"][subcat] = next_id("SUB", counters)
                subcat_to_cat_id[subcat] = id_categoria

            id_subcat = dims["subcategorias"][subcat]

            if not is_tasa:
                eventos.append({
                    "id_evento": next_id("EVT", counters, table_type='event'),
                    "id_municipio": row["id_municipio"],
                    "id_subcategoria": id_subcat,
                    "id_tipo_tasa": id_tasa,
                    "casos": value,
                    "valor_tasa": None,
                    "año": year
                })
            else:
                for r in reversed(eventos):
                    if (
                        r["id_municipio"] == row["id_municipio"]
                        and r["id_subcategoria"] == id_subcat
                        and r["id_tipo_tasa"] == id_tasa
                        and r["año"] == year
                        and r["valor_tasa"] is None
                    ):
                        r["valor_tasa"] = value
                        break

def construir_modelo_copo_nieve(xls: Dict[str, pd.DataFrame]) -> Dict[str, pd.DataFrame]:
    """
    Construye un modelo de datos en forma de copo de nieve a partir de múltiples hojas de un archivo Excel.

    Esta función orquesta todo el procesamiento de datos para transformar un archivo Excel
    con hojas anuales de eventos de violencia en Antioquia, en un conjunto de tablas limpias y
    relacionadas. Procesa eventos, subregiones, municipios, categorías, subcategorías y tipos de tasa,
    siguiendo una estructura dimensional apropiada para análisis OLAP o BI.

    Para cada hoja:
    - Verifica que represente un año válido.
    - Extrae las columnas válidas de eventos.
    - Limpia y separa subregiones y municipios.
    - Procesa y normaliza los eventos.

    El resultado final incluye seis tablas interrelacionadas: categorías, subcategorías, tasas aplicables,
    subregiones, municipios y eventos.

    Args:
        xls: Diccionario que mapea el nombre de cada hoja (por ejemplo "2007", "2008") a su DataFrame correspondiente.

    Returns:
        Un diccionario con los siguientes DataFrames:
            - 'categorias': tabla de categorías de violencia.
            - 'subcategorias': tabla de subcategorías con relación a su categoría.
            - 'tipos_tasa': tabla con los tipos de tasa válidos.
            - 'subregiones': tabla de subregiones identificadas por ID.
            - 'municipios': tabla de municipios y su subregión correspondiente.
            - 'eventos': tabla de hechos con los casos registrados, tasa aplicada y metadatos.
    """
    counters = defaultdict(int)
    eventos = []
    dims = {
        "categorias": {
            "Violencia": CATEGORIA_VIOLENCIA,
            "Violencia Contra La Mujer": CATEGORIA_VCM
        },
        "subcategorias": {},
    }
    all_municipios = []
    all_subregiones = []
    subcat_to_cat_id = {}

    for nombre_hoja, df in xls.items():
        if not es_hoja_valida(nombre_hoja):
            continue

        year = int(nombre_hoja)
        col_to_keys, subcat_map = extraer_columnas_validas(df)
        df_clean = limpiar_filas_entidades(df)
        subregiones, municipios = extraer_subregiones_municipios(df_clean)

        all_subregiones.extend(subregiones.to_dict("records"))
        municipios_dim = municipios.drop(columns=["orig_index"], errors="ignore")
        all_municipios.extend(municipios_dim.to_dict("records"))
        subcat_to_cat_id.update(subcat_map)

        procesar_eventos_hoja(df, col_to_keys, municipios, year, counters, dims, eventos, subcat_to_cat_id)

    df_categorias = pd.DataFrame([
        {"id_categoria": v, "categoria": normalizar_texto(k)}
        for k, v in dims["categorias"].items()
    ])

    df_subcategorias = pd.DataFrame([
        {
            "id_subcategoria": v,
            "id_categoria": subcat_to_cat_id.get(sub, CATEGORIA_VIOLENCIA),
            "subcategoria": normalizar_texto(sub)
        }
        for sub, v in dims["subcategorias"].items()
        if sub in subcat_to_cat_id
    ])

    df_tasas = pd.DataFrame([
        {"id_tipo_tasa": TASA_GENERAL, "tipo_tasa": "Tasa X Cienmil Hbtes."},
        {"id_tipo_tasa": TASA_MUJERES, "tipo_tasa": "Tasa X Cienmil Mujeres"}
    ])

    df_eventos = pd.DataFrame(eventos)
    df_eventos["valor_tasa"] = df_eventos["valor_tasa"].fillna(0.0)

    return {
        "categorias": df_categorias.reset_index(drop=True),
        "subcategorias": df_subcategorias.reset_index(drop=True),
        "tipos_tasa": df_tasas,
        "subregiones": pd.DataFrame(all_subregiones).drop_duplicates().dropna().reset_index(drop=True),
        "municipios": pd.DataFrame(all_municipios).drop_duplicates(subset=['id_municipio']).dropna().reset_index(drop=True),
        "eventos": df_eventos.dropna(subset=["id_evento", "id_municipio", "id_subcategoria"]).reset_index(drop=True)
    }

def unir_tablas_para_exportar(tablas: Dict[str, pd.DataFrame]) -> pd.DataFrame:
    """
    Une las tablas dimensionales con la tabla de hechos para construir una vista consolidada de eventos.

    Realiza inner joins entre las tablas:
    - eventos ← municipios ← subregiones
    - eventos ← subcategorias ← categorias
    - eventos ← tipos_tasa

    Args:
        tablas: Diccionario con las tablas generadas por el modelo copo de nieve.

    Returns:
        Un DataFrame con las siguientes columnas:
        ['id_evento', 'subregion', 'municipio', 'categoria', 'subcategoria',
         'tipo_tasa', 'casos', 'valor_tasa', 'año']
    """
    eventos = tablas["eventos"]
    municipios = tablas["municipios"]
    subregiones = tablas["subregiones"]
    subcategorias = tablas["subcategorias"]
    categorias = tablas["categorias"]
    tasas = tablas["tipos_tasa"]

    df = eventos.merge(municipios, on="id_municipio", how="left") \
                .merge(subregiones, on="id_subregion", how="left") \
                .merge(subcategorias, on="id_subcategoria", how="left") \
                .merge(categorias, on="id_categoria", how="left") \
                .merge(tasas, on="id_tipo_tasa", how="left")

    columnas = ["id_evento", "subregion", "municipio", "categoria", "subcategoria",
                "tipo_tasa", "casos", "valor_tasa", "año"]

    return df[columnas]

In [None]:
file_path = "/content/drive/MyDrive/Colab Notebooks/unad/Eventos_Salud_Publica_por_Municipio_2007_2023.xlsx"
xls_data = cargar_excel(file_path)

# Construir el modelo en forma de copo de nieve
tablas = construir_modelo_copo_nieve(xls_data)

# Crear vista consolidada
vista_eventos = unir_tablas_para_exportar(tablas)

# Guardar todas las tablas más la vista en un solo archivo Excel
with pd.ExcelWriter("CopoNieve.xlsx", engine="openpyxl") as writer:
    for nombre, df in tablas.items():
        df.to_excel(writer, sheet_name=nombre.capitalize(), index=False)

vista_eventos.to_excel("Consolidado.xlsx", sheet_name="Consolidado", index=False, engine="openpyxl")