# Proyecto 1 — Limpieza (v4, completo)
**Objetivo:** Limpiar y estandarizar el dataset de establecimientos **NIVEL DIVERSIFICADO** con:
- Normalización de textos, espacios y guiones
- Extracción y estandarización de **ZONA**
- Corrección OCR **solo** en contexto de `RUTA`
- **Estandarización de DIRECCION** a formato canónico (`DIRECCION_STD`) con componentes auxiliares
- Normalización de **CIUDAD CAPITAL** a `GUATEMALA/GUATEMALA`
- Detección de **duplicados en la capital** por posible doble unión (país + capital)
- Exportación de resultados y (opcional) código para búsqueda y libro de códigos


## 0) Configuración

In [None]:
# Rutas (ajusta INPUT_CSV si tu archivo se llama distinto o está en otra ruta)
INPUT_CSV   = "establecimientos_diversificado_raw_concat.csv"
CLEAN_CSV   = "establecimientos_diversificado_limpio_v4.csv"
CAP_CLUSTERS_CSV = "duplicados_capital_merge_v4.csv"
CAP_DETALLE_CSV  = "duplicados_capital_merge_detalle_v4.csv"
CODEBOOK_MD = "Libro_de_Codigos_Proyecto1_v4.md"  # opcional
EXCEL_XLSX  = "salidas_proyecto1_v4.xlsx"          # opcional


## 1) Imports

In [None]:
import re, os, unicodedata
from datetime import datetime
import pandas as pd
import numpy as np

pd.set_option("display.max_colwidth", 160)

## 2) Carga del CSV consolidado y normalización de nombres de columnas

In [None]:
# Cargar TODO como texto para no perder ceros a la izquierda ni formatos
df = pd.read_csv(INPUT_CSV, dtype=str)
# Columnas a MAYÚSCULAS por consistencia
df.columns = [c.upper() for c in df.columns]
df.shape, df.columns.tolist()[:20]

## 3) Utilidades de normalización y parsing

In [None]:
def strip_accents(s: str) -> str:
    if not isinstance(s, str): return s
    return "".join(ch for ch in unicodedata.normalize("NFKD", s) if not unicodedata.combining(ch))


def collapse_spaces_and_hyphens(s: str) -> str:
    if not isinstance(s, str): return s
    s = s.replace("—","-").replace("–","-").replace("−","-").replace("­","")
    s = re.sub(r"[ \t\u00A0]+", " ", s)
    s = re.sub(r"-{2,}", "-", s)
    s = re.sub(r"\s*-\s*", " - ", s)
    s = re.sub(r"\s{2,}", " ", s)
    return s.strip()


def normalize_text_basic(s: str) -> str:
    if not isinstance(s, str): return s
    s = s.strip().replace('"', "").replace("“","").replace("”","").replace("´", "'")
    s = strip_accents(s).upper()
    s = collapse_spaces_and_hyphens(s)
    return s


def remove_leading_bullets(s: str) -> str:
    if not isinstance(s, str): return s
    # Elimina bullets iniciales tipo "- IGA", "-IGA", "• NOMBRE"
    return re.sub(r"^\s*[-•]+\s*", "", s)


def expand_address_abbrev(s: str) -> str:
    """Expande abreviaturas frecuentes en direcciones y normaliza separadores."""
    if not isinstance(s, str): return s
    txt = " " + s + " "
    rules = [
        (r"\bAV[\.]?\b", " AVENIDA "), (r"\bAVE[\.]?\b", " AVENIDA "), (r"\bAVDA[\.]?\b", " AVENIDA "),
        (r"\bBLVD[\.]?\b", " BOULEVARD "), (r"\bCALZ[\.]?\b", " CALZADA "),
        (r"\bCOL[\.]?\b", " COLONIA "), (r"\bCOND[\.]?\b", " CONDOMINIO "), (r"\bRES[\.]?\b", " RESIDENCIAL "),
        (r"\bZ[\.]?\b(?=\s*\d)", " ZONA "), (r"\bZONA\b\s*(?=\d)", " ZONA "),
        (r"\bKM[\.]?\b", " KM "), (r"\bNO[\.]?\b", " NUMERO "), (r"\bN[\.]?\b(?=\s*\d)", " NUMERO "), (r"\b#\b", " NUMERO "),
        (r"\bEDIF[\.]?\b", " EDIFICIO "), (r"\bTORRE\b", " TORRE "),
        (r"\b2DA\b", " 2A "), (r"\b3RA\b", " 3A ")
    ]
    for pat, rep in rules:
        txt = re.sub(pat, rep, txt, flags=re.IGNORECASE)
    txt = collapse_spaces_and_hyphens(txt)
    return txt


def normalize_name(s: str) -> str:
    if not isinstance(s, str): return s
    s = remove_leading_bullets(s)
    s = normalize_text_basic(s)
    return s


def fix_ruta_ocr_general(s: str) -> str:
    """Corrige errores OCR en contexto de 'RUTA': I/L->1 y O->0 cuando van antes de un dígito."""
    if not isinstance(s, str): return s
    txt = s
    txt = re.sub(r"\b(RUTA\s+)[ILil]\s*(\d)\b", r"\g<1>1\2", txt)
    txt = re.sub(r"\b(RUTA\s+)[Oo]\s*(\d)\b", r"\g<1>0\2", txt)
    return txt


def clean_phone_field(s: str):
    """Extrae grupos de 8 dígitos válidos; deduplica; retorna unidos por ' / ' o NA."""
    if not isinstance(s, str): return None
    nums = re.findall(r"\d{8}", s)
    nums = [n for n in nums if not re.fullmatch(r"0{8}", n)]
    seen, unique = set(), []
    for n in nums:
        if n not in seen:
            seen.add(n); unique.append(n)
    return " / ".join(unique) if unique else None


def extract_zona_token(text: str):
    """Devuelve 'NN' si encuentra 'ZONA NN' en el texto, si no None."""
    if not isinstance(text, str): return None
    m = re.search(r"\bZONA\s*(\d{1,2})\b", text)
    if m:
        return (m.group(1).lstrip("0") or "0")
    return None


def parse_address_components(addr: str):
    """Parsea y estandariza la dirección.
    - Expande abreviaturas y corrige OCR (RUTA).
    - Extrae ZONA (y la remueve de la dirección).
    - Detecta componentes: AVENIDA, CALLE, KM, NUMERO.
    - Captura nominativos (COLONIA/RESIDENCIAL/CONDOMINIO/EDIFICIO/TORRE/BARRIO/SECTOR/BOULEVARD/CALZADA/CARRETERA/RUTA).
    - Construye DIRECCION_STD: AVENIDA, CALLE, KM, NUMERO, nominativos, resto limpio.
    Retorna: (dict_componentes, zona, direccion_std)
    """
    if not isinstance(addr, str) or not addr.strip():
        return {}, None, ""
    a = expand_address_abbrev(normalize_text_basic(addr))
    a = fix_ruta_ocr_general(a)
    zona = extract_zona_token(a)
    a_no_zona = re.sub(r"\bZONA\s*\d{1,2}\b", "", a).strip()
    a_no_zona = collapse_spaces_and_hyphens(a_no_zona)

    comp = {}
    # AVENIDA ('AVENIDA 7A' o '7A AVENIDA')
    m = re.search(r"\bAVENIDA\s+([0-9]{1,2}[A-Z]?)\b", a_no_zona) or re.search(r"\b([0-9]{1,2}[A-Z]?)\s+AVENIDA\b", a_no_zona)
    if m: comp["AVENIDA"] = m.group(1)

    # CALLE ('CALLE 5' o '5 CALLE')
    m = re.search(r"\bCALLE\s+([0-9]{1,2}[A-Z]?)\b", a_no_zona) or re.search(r"\b([0-9]{1,2}[A-Z]?)\s+CALLE\b", a_no_zona)
    if m: comp["CALLE"] = m.group(1)

    # KM
    m = re.search(r"\bKM\s*([0-9]+(?:\.[0-9]+)?)\b", a_no_zona)
    if m: comp["KM"] = m.group(1)

    # NUMERO
    m = re.search(r"\bNUMERO\s*([0-9A-Z\-\/]+)\b", a_no_zona)
    if m: comp["NUMERO"] = m.group(1)

    # NOMINATIVOS
    nominativos = ["COLONIA","RESIDENCIAL","CONDOMINIO","EDIFICIO","TORRE","BARRIO","SECTOR","BOULEVARD","CALZADA","CARRETERA","RUTA"]
    nom_found = []
    rem = a_no_zona
    for key in nominativos:
        m = re.search(rf"\b{key}\b\s*([A-Z0-9\-\.\s]+)", rem)
        if m:
            val = m.group(1).strip()
            for stop in nominativos + ["AVENIDA","CALLE","KM","NUMERO"]:
                val = re.split(rf"\b{stop}\b", val)[0]
            val = val.strip(" ,.;-")
            if val:
                nom_found.append(f"{key} {val}")
                rem = re.sub(rf"\b{key}\b\s*[A-Z0-9\-\.\s]+", "", rem, count=1).strip()

    parts = []
    if "AVENIDA" in comp: parts.append(f"AVENIDA {comp['AVENIDA']}")
    if "CALLE"   in comp: parts.append(f"CALLE {comp['CALLE']}")
    if "KM"      in comp: parts.append(f"KM {comp['KM']}")
    if "NUMERO"  in comp: parts.append(f"NUMERO {comp['NUMERO']}")
    if nom_found: parts.extend(nom_found)

    resto = rem.strip(",; ")
    resto = collapse_spaces_and_hyphens(resto)
    for p in parts + ([f"ZONA {zona}"] if zona else []):
        if p:
            resto = re.sub(re.escape(p), "", resto).strip(",; ")
    direccion_std = ", ".join([p for p in parts if p]) if parts else a_no_zona
    if resto and resto not in direccion_std:
        direccion_std = ", ".join([direccion_std, resto]) if direccion_std else resto
    direccion_std = collapse_spaces_and_hyphens(direccion_std)
    direccion_std = re.sub(r"\s{2,}", " ", direccion_std).strip(" ,")

    return comp, zona, direccion_std


## 4) Limpieza básica + respaldos + categóricas + teléfono

In [None]:
# Colapsar espacios/guiones en todas las columnas de texto
obj_cols = [c for c in df.columns if df[c].dtype == "object"]
df[obj_cols] = df[obj_cols].apply(lambda s: s.map(lambda x: collapse_spaces_and_hyphens(x) if isinstance(x, str) else x))

# Estandarizar vacíos/guiones de relleno a NA
df[obj_cols] = df[obj_cols].replace({
    "": pd.NA, "nan": pd.NA, "None": pd.NA, "-": pd.NA, "--": pd.NA, "—": pd.NA
})

# Respaldos de originales (si no existen)
for col in ["ESTABLECIMIENTO","DIRECCION","TELEFONO"]:
    if col in df.columns and f"{col}_ORIG" not in df.columns:
        df[f"{col}_ORIG"] = df[col]

# Remover bullets iniciales
for col in ["ESTABLECIMIENTO","DIRECCION"]:
    if col in df.columns:
        df[col] = df[col].map(remove_leading_bullets)

# Normalizar nombre de establecimiento
if "ESTABLECIMIENTO" in df.columns:
    df["ESTABLECIMIENTO"] = df["ESTABLECIMIENTO"].map(normalize_name)

# Teléfono: extraer 8 dígitos, deduplicar y unir por " / "
if "TELEFONO" in df.columns:
    df["TELEFONO"] = df["TELEFONO"].map(clean_phone_field)
    df["TELEFONO_VALIDO"] = df["TELEFONO"].apply(lambda x: bool(x) if pd.notna(x) else False)

# Categóricas a MAYÚSCULAS sin acentos
for col in ["DEPARTAMENTO","MUNICIPIO","SECTOR","AREA","STATUS","MODALIDAD","JORNADA","PLAN","NIVEL"]:
    if col in df.columns:
        df[col] = df[col].map(lambda x: normalize_text_basic(x) if isinstance(x, str) else x)

df.shape

## 5) Dirección: extracción de ZONA y `DIRECCION_STD` canónica + componentes

In [None]:
dir_avenida, dir_calle, dir_km, dir_numero = [], [], [], []
zonas, dir_std_list = [], []

for val in df["DIRECCION"].fillna("").astype(str):
    comp, zona, dstd = parse_address_components(val)
    dir_std_list.append(dstd if dstd else pd.NA)
    zonas.append(zona if zona is not None else pd.NA)
    dir_avenida.append(comp.get("AVENIDA", pd.NA))
    dir_calle.append(comp.get("CALLE", pd.NA))
    dir_km.append(comp.get("KM", pd.NA))
    dir_numero.append(comp.get("NUMERO", pd.NA))

# Crear/llenar ZONA
if "ZONA" not in df.columns:
    df["ZONA"] = pd.NA
df["ZONA"] = df["ZONA"].fillna(pd.Series(zonas))

# Dirección estandarizada + columnas auxiliares
df["DIRECCION_STD"] = dir_std_list
df["DIR_AVENIDA"] = dir_avenida
df["DIR_CALLE"]   = dir_calle
df["DIR_KM"]      = dir_km
df["DIR_NUMERO"]  = dir_numero

# Asegurar formato final normalizado
df["DIRECCION_STD"] = df["DIRECCION_STD"].map(normalize_text_basic)

df[["DIRECCION","DIRECCION_STD","ZONA"]].head(10)

## 6) Normalización de 'CIUDAD CAPITAL' y zonas en DEPARTAMENTO/MUNICIPIO

In [None]:
def normalize_capital_and_zone_row(row):
    dept = str(row.get("DEPARTAMENTO", "") or "")
    muni = str(row.get("MUNICIPIO", "") or "")
    # Detecta CIUDAD CAPITAL
    is_capital = bool(re.search(r"\bCIUDAD\s+CAPITAL\b", dept) or re.search(r"\bCIUDAD\s+CAPITAL\b", muni))
    # Extrae zona si está mal ubicada
    z_depto = extract_zona_token(dept)
    z_muni  = extract_zona_token(muni)
    zona = row.get("ZONA", pd.NA)
    zona = zona if pd.notna(zona) else (z_depto or z_muni)
    if is_capital or z_depto is not None or z_muni is not None:
        row["DEPARTAMENTO"] = "GUATEMALA"
        row["MUNICIPIO"] = "GUATEMALA"
        row["ZONA"] = zona
    return row

df = df.apply(normalize_capital_and_zone_row, axis=1)
# Estandarizar NA en ZONA
df["ZONA"] = df["ZONA"].where(df["ZONA"].notna() & df["ZONA"].astype(str).str.strip().ne(""), pd.NA)

df[["DEPARTAMENTO","MUNICIPIO","ZONA"]].head(10)

## 7) Duplicados en Ciudad de Guatemala (posible doble unión país + capital)

In [None]:
def _norm(s): return normalize_text_basic(s) if isinstance(s, str) else ""

# Clave normalizada para detectar duplicados de capital (usa DIRECCION_STD)
df["__KEY_MERGE_CAP__"] = df.apply(lambda r: "|".join([
    _norm(r.get("ESTABLECIMIENTO","")),
    _norm(r.get("DIRECCION_STD","")),
    _norm(r.get("MUNICIPIO","")),
    _norm(r.get("DEPARTAMENTO","")),
    _norm(r.get("JORNADA","")),
    _norm(r.get("PLAN","")),
    _norm(r.get("TELEFONO","")),
]), axis=1)

mask_capital = (df["DEPARTAMENTO"]=="GUATEMALA") & (df["MUNICIPIO"]=="GUATEMALA")
grp_sizes = df[mask_capital].groupby("__KEY_MERGE_CAP__")["__KEY_MERGE_CAP__"].transform("size")

df["DUP_CAPITAL_MERGE"] = False
df.loc[mask_capital & grp_sizes.gt(1), "DUP_CAPITAL_MERGE"] = True

# Marca si dentro del cluster hay CODIGOS distintos
dup_cap_groups = df.loc[df["DUP_CAPITAL_MERGE"]].groupby("__KEY_MERGE_CAP__")
key_has_diff_codes = dup_cap_groups["CODIGO"].apply(lambda s: len(set(s.dropna().astype(str))) > 1)
key_has_diff_codes = key_has_diff_codes.reindex(df["__KEY_MERGE_CAP__"]).fillna(False).values
df["DUP_CAPITAL_CODIGOS_DISTINTOS"] = key_has_diff_codes


df.loc[df["DUP_CAPITAL_MERGE"], ["ESTABLECIMIENTO","DIRECCION_STD","ZONA","MUNICIPIO","DEPARTAMENTO","JORNADA","PLAN","CODIGO","DUP_CAPITAL_CODIGOS_DISTINTOS"]].head(20)

## 8) Guardar CSVs de salida

In [None]:
# Dataset limpio v4
df.to_csv(CLEAN_CSV, index=False, encoding="utf-8")

# Tablas de duplicados en capital (clusters y detalle)
cap_dups = df.loc[df["DUP_CAPITAL_MERGE"], ["__KEY_MERGE_CAP__","CODIGO","ESTABLECIMIENTO","DIRECCION_STD","ZONA","TELEFONO","JORNADA","PLAN","MUNICIPIO","DEPARTAMENTO","DUP_CAPITAL_CODIGOS_DISTINTOS"]]

cluster_rows = []
for key, sub in cap_dups.groupby("__KEY_MERGE_CAP__"):
    cods = sorted(set(sub["CODIGO"].dropna().astype(str)))
    cluster_rows.append({
        "KEY": key,
        "ESTABLECIMIENTO": sub["ESTABLECIMIENTO"].iloc[0],
        "DIRECCION_STD": sub["DIRECCION_STD"].iloc[0],
        "ZONA": sub["ZONA"].iloc[0],
        "JORNADA": sub["JORNADA"].iloc[0],
        "PLAN": sub["PLAN"].iloc[0],
        "TELEFONO": sub["TELEFONO"].iloc[0],
        "MUNICIPIO": sub["MUNICIPIO"].iloc[0],
        "DEPARTAMENTO": sub["DEPARTAMENTO"].iloc[0],
        "N_FILAS": len(sub),
        "CODIGOS": " | ".join(cods),
        "CODIGOS_DISTINTOS": (len(cods) > 1)
    })
cap_clusters = pd.DataFrame(cluster_rows).sort_values(["ESTABLECIMIENTO","ZONA","DIRECCION_STD","JORNADA","PLAN"])

cap_clusters.to_csv(CAP_CLUSTERS_CSV, index=False, encoding="utf-8")
cap_dups.drop(columns=["__KEY_MERGE_CAP__"]).to_csv(CAP_DETALLE_CSV, index=False, encoding="utf-8")

CLEAN_CSV, CAP_CLUSTERS_CSV, CAP_DETALLE_CSV, cap_clusters.shape[0]

## 9) (Opcional) Función de búsqueda rápida

In [None]:
def _norm_q(s):
    if not isinstance(s, str): return ""
    s = s.strip().upper()
    s = "".join(ch for ch in unicodedata.normalize("NFKD", s) if not unicodedata.combining(ch))
    s = re.sub(r"\s*-\s*", " - ", s)
    s = re.sub(r"\s+", " ", s)
    return s


def buscar(df, nombre=None, municipio=None, departamento=None, zona=None):
    m = pd.Series(True, index=df.index)
    if nombre:
        m &= df["ESTABLECIMIENTO"].fillna("").map(_norm_q).str.contains(re.escape(_norm_q(nombre)), regex=True, na=False)
    if municipio:
        m &= df["MUNICIPIO"].fillna("").map(_norm_q).str.contains(re.escape(_norm_q(municipio)), regex=True, na=False)
    if departamento:
        m &= df["DEPARTAMENTO"].fillna("").map(_norm_q).str.contains(re.escape(_norm_q(departamento)), regex=True, na=False)
    if zona is not None:
        m &= df["ZONA"].astype(str).fillna("").eq(str(zona))
    cols = ["CODIGO","ESTABLECIMIENTO","DIRECCION","DIRECCION_STD","ZONA","MUNICIPIO","DEPARTAMENTO","JORNADA","PLAN",
            "TELEFONO","DUP_CAPITAL_MERGE","DUP_CAPITAL_CODIGOS_DISTINTOS"]
    cols = [c for c in cols if c in df.columns]
    return df.loc[m, cols].sort_values(["DEPARTAMENTO","MUNICIPIO","ESTABLECIMIENTO","DIRECCION_STD"]).reset_index(drop=True)

# Ejemplos:
# buscar(df, nombre="IGA", municipio="GUATEMALA")
# buscar(df, nombre="IGA")

## 10) (Opcional) Exportar a Excel (múltiples hojas)

In [None]:
with pd.ExcelWriter(EXCEL_XLSX, engine="xlsxwriter") as writer:
    df.to_excel(writer, index=False, sheet_name="LIMPIO_V4")
    if os.path.exists(CAP_CLUSTERS_CSV):
        pd.read_csv(CAP_CLUSTERS_CSV, dtype=str).to_excel(writer, index=False, sheet_name="DUPS_CAPITAL_CLUSTERS")
    if os.path.exists(CAP_DETALLE_CSV):
        pd.read_csv(CAP_DETALLE_CSV, dtype=str).to_excel(writer, index=False, sheet_name="DUPS_CAPITAL_DETALLE")
EXCEL_XLSX

## 11) (Opcional) Generar Libro de Códigos (Markdown)

In [None]:
lines = []
lines.append("# Libro de Códigos – Establecimientos (Diversificado, v4)\n")
lines.append("**Estándares de limpieza aplicados:**\n")
lines.append("- Texto en MAYÚSCULAS y SIN ACENTOS en campos categóricos.\n")
lines.append("- Guiones internos conservados con espacios estándar (` - `).\n")
lines.append("- Bullets iniciales removidos en `ESTABLECIMIENTO`/`DIRECCION`.\n")
lines.append("- `DIRECCION`: abreviaturas expandidas; OCR en `RUTA` (I/L→1, O→0).\n")
lines.append("- `ZONA`: extraída de dirección y/o dept/muni; `CIUDAD CAPITAL` → `GUATEMALA/GUATEMALA`.\n")
lines.append("- `DIRECCION_STD` con orden canónico: AVENIDA, CALLE, KM, NUMERO, nominativos, resto.\n")
lines.append("- Duplicados en capital por posible doble unión (`DUP_CAPITAL_MERGE`, `DUP_CAPITAL_CODIGOS_DISTINTOS`).\n")
lines.append("- Sin eliminación de filas/columnas; solo marcado/normalización.\n")

lines.append("\n**Descripción general:**\n")
lines.append(f"- Registros: {len(df)}\n")
lines.append(f"- ZONA con valor: {int(df['ZONA'].notna().sum())} | NA: {int(df['ZONA'].isna().sum())}\n")
lines.append("- Archivos: `duplicados_capital_merge_v4.csv`, `duplicados_capital_merge_detalle_v4.csv`.\n")

lines.append("\n## Variables\n")
for col in df.columns:
    ejemplo = df[col].dropna().astype(str).head(3).tolist()
    ejemplo = [e[:120] for e in ejemplo]
    desc = ""
    if col == "ESTABLECIMIENTO_ORIG": desc = "Valor original del nombre (antes de limpieza)."
    elif col == "DIRECCION_ORIG": desc = "Valor original de la dirección (antes de limpieza)."
    elif col == "TELEFONO_ORIG": desc = "Valor original del teléfono (antes de extracción)."
    elif col == "TELEFONO_VALIDO": desc = "Indicador: True si se extrajo al menos un teléfono de 8 dígitos."
    elif col == "DIRECCION_STD": desc = "Dirección estandarizada (orden canónico)."
    elif col in {"DIR_AVENIDA","DIR_CALLE","DIR_KM","DIR_NUMERO"}: desc = "Componente extraído de la dirección."
    elif col == "ZONA": desc = "Zona administrativa (si aplica); NA si no disponible."
    elif col in {"DEPARTAMENTO","MUNICIPIO"}: desc = "Ubicación administrativa normalizada."
    elif col == "DUP_CAPITAL_MERGE": desc = "True si el registro forma parte de un cluster duplicado en la capital (posible doble unión)."
    elif col == "DUP_CAPITAL_CODIGOS_DISTINTOS": desc = "True si dentro de su cluster hay códigos distintos."
    else: desc = "Campo de la fuente; normalizado cuando aplica."
    lines.append(f"### {col}\n- **Descripción:** {desc}\n- **Ejemplo(s):** {', '.join(ejemplo) if ejemplo else '—'}\n")

with open(CODEBOOK_MD, "w", encoding="utf-8") as f:
    f.write("\n".join(lines))

CODEBOOK_MD