In [None]:
# %%capture
!pip -q install bs4 lxml tqdm

import os, re, requests, zipfile, shutil, gzip
from urllib.parse import urljoin
from bs4 import BeautifulSoup
from tqdm import tqdm
from pathlib import Path

# ========= Config =========
# Landing page that lists every downloadable file of the survey.
BASE = "https://ensanut.insp.mx/encuestas/ensanutcontinua2023/descargas.php"
# Everything that gets downloaded lives under this directory.
ROOT = "/content/ensanut_2023"
# Per component: one folder for data files ("csv") and one for catalogues ("cat").
OUT = dict(
    salud=dict(csv=f"{ROOT}/salud/csv", cat=f"{ROOT}/salud/catalogos"),
    nutricion=dict(csv=f"{ROOT}/nutricion/csv", cat=f"{ROOT}/nutricion/catalogos"),
)
# Create the whole output tree up front; existing folders are left alone.
for comp_dirs in OUT.values():
    for target_dir in comp_dirs.values():
        os.makedirs(target_dir, exist_ok=True)

# ========= Utils =========
def safe(s):
    """Turn an arbitrary string into a conservative file name.

    Whitespace runs collapse to one space, every character outside the
    allowed set (word characters, ``-``, ``.``, parentheses, space and the
    listed accented letters) becomes ``_``, runs of underscores are merged
    and leading/trailing underscores are dropped. The result is capped at
    200 characters; a falsy input or an empty result yields ``"sin_nombre"``.
    """
    text = (s or "").strip()
    # Applied strictly in this order: each step relies on the previous one.
    substitutions = (
        (r"\s+", " "),
        (r"[^\w\-.() áéíóúÁÉÍÓÚñÑ]", "_"),
        (r"_+", "_"),
    )
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)
    truncated = text.strip("_")[:200]
    return truncated if truncated else "sin_nombre"

def unique_path(folder, filename):
    """Return a path inside *folder* for *filename* that does not exist yet.

    If ``folder/filename`` is free it is returned unchanged. Otherwise a
    ``__<n>`` suffix (n = 1, 2, ...) is inserted before the extension until
    an unused path is found. Nothing is created on disk.
    """
    stem, suffix = os.path.splitext(filename)
    candidate = os.path.join(folder, filename)
    attempt = 0
    while os.path.exists(candidate):
        attempt += 1
        candidate = os.path.join(folder, f"{stem}__{attempt}{suffix}")
    return candidate

def save_resp(resp, hint, folder):
    """Stream the body of *resp* into a new file inside *folder*.

    The file name comes from the ``content-disposition`` header when it
    carries a ``filename``; otherwise from the base name of *hint*, and as
    a last resort ``"archivo.bin"``. The name is sanitised with ``safe`` and
    de-duplicated with ``unique_path``. Returns the path that was written.
    """
    disposition = resp.headers.get("content-disposition","")
    match = re.search(r'filename\*?=(?:UTF-8\'\')?"?([^";]+)"?', disposition)
    if match:
        raw_name = match.group(1)
    else:
        raw_name = os.path.basename(hint) or "archivo.bin"
    target = unique_path(folder, safe(raw_name))
    with open(target, "wb") as out:
        for block in resp.iter_content(1<<14):
            # Skip empty chunks instead of issuing zero-byte writes.
            if not block:
                continue
            out.write(block)
    return target

def extract_zip_to_folder(zip_path: Path, dest_folder: Path):
    """Extract every file of *zip_path* flat into *dest_folder*, then delete the ZIP.

    Directory entries are skipped and the internal folder structure is
    discarded: each member is written directly under *dest_folder* using its
    sanitised base name, with ``unique_path`` resolving name clashes.
    Returns the list of written paths as strings.
    """
    written = []
    with zipfile.ZipFile(zip_path, 'r') as archive:
        file_entries = (e for e in archive.infolist() if not e.is_dir())
        for entry in file_entries:
            flat_name = safe(Path(entry.filename).name)
            target = Path(unique_path(str(dest_folder), flat_name))
            with archive.open(entry) as reader, open(target, "wb") as writer:
                shutil.copyfileobj(reader, writer)
            written.append(str(target))
    # The archive is removed only after all members were extracted.
    zip_path.unlink(missing_ok=True)
    return written

def postprocess_unzip_all(dest_folder: str):
    """Repeatedly unpack every valid ZIP found under *dest_folder*.

    Each pass extracts all ``*.zip`` files (searched recursively) into
    *dest_folder* itself. Passes repeat until one finds nothing to extract,
    so ZIPs nested inside other ZIPs are unpacked as well. Files named
    ``*.zip`` that are not real ZIP archives are left in place.
    """
    root = Path(dest_folder)
    while True:
        extracted_any = False
        # Snapshot the listing: extraction adds and removes files under root.
        for candidate in list(root.rglob("*.zip")):
            if not zipfile.is_zipfile(candidate):
                continue
            extract_zip_to_folder(candidate, root)
            extracted_any = True
        if not extracted_any:
            break

def normalize_and_gunzip(folder: str):
    """Decompress ``.csv.gz``/``.xlsx.gz`` files and fix doubled extensions.

    Works recursively under *folder* in two passes:

    1. every ``*.csv.gz`` / ``*.xlsx.gz`` file is decompressed next to the
       original (without the ``.gz``) and the compressed file is deleted;
    2. files ending in ``.csv.csv`` or ``.xlsx.xlsx`` (case-insensitive) are
       renamed so the extension appears only once.

    ``unique_path`` keeps both passes from overwriting existing files.
    Returns ``(fixed, gunzipped)``: the number of renames and of
    decompressed files.
    """
    root = Path(folder)

    # Pass 1: decompress gzip-wrapped data files.
    gunzipped = 0
    for archive in list(root.rglob("*.gz")):
        if not archive.name.lower().endswith((".csv.gz", ".xlsx.gz")):
            continue
        stripped = archive.with_suffix("")  # drops the trailing .gz
        target = Path(unique_path(str(stripped.parent), stripped.name))
        with gzip.open(archive, "rb") as src, open(target, "wb") as dst:
            shutil.copyfileobj(src, dst)
        archive.unlink(missing_ok=True)
        gunzipped += 1

    # Pass 2: collapse doubled extensions.
    # (lower-case suffix to detect, pattern to replace, single extension)
    doubled = (
        (".csv.csv", r"(?i)\.csv\.csv$", ".csv"),
        (".xlsx.xlsx", r"(?i)\.xlsx\.xlsx$", ".xlsx"),
    )
    fixed = 0
    for entry in list(root.rglob("*")):
        if not entry.is_file():
            continue
        lowered = entry.name.lower()
        for suffix, pattern, single in doubled:
            if lowered.endswith(suffix):
                renamed = re.sub(pattern, single, entry.name)
                entry.rename(Path(unique_path(str(entry.parent), renamed)))
                fixed += 1

    return fixed, gunzipped

def zip_dir_stored(src_dir: str, zip_path: str):
    """Pack every file under *src_dir* into *zip_path* without compression.

    Files are found recursively and stored (``ZIP_STORED``) under their path
    relative to *src_dir*. Directories themselves get no entry, so empty
    folders do not appear in the archive.
    """
    root = Path(src_dir)
    with zipfile.ZipFile(zip_path, mode="w", compression=zipfile.ZIP_STORED) as archive:
        regular_files = (entry for entry in root.rglob("*") if entry.is_file())
        for entry in regular_files:
            archive.write(entry, arcname=entry.relative_to(root))

# ========= Session =========
# One shared session so every request reuses the same headers and cookies.
session = requests.Session()
session.headers["User-Agent"] = "Mozilla/5.0"

# ========= Fetch & parse =========
r = session.get(BASE, timeout=30)
r.raise_for_status()
soup = BeautifulSoup(r.text, "lxml")

# Find the two sections by their heading. Each component is recognised by
# any of its upper-case markers (with or without the accent for nutrition).
_TITLE_MARKERS = (
    ("salud", ("COMPONENTE DE SALUD",)),
    ("nutricion", ("COMPONENTE DE NUTRICIÓN", "COMPONENTE DE NUTRICION")),
)

sections = []
for sec in soup.select("div.block-section"):
    h4 = sec.find("h4", class_="block-title")
    title = h4.get_text(strip=True).upper() if h4 else ""
    comp = next(
        (name for name, markers in _TITLE_MARKERS
         if any(marker in title for marker in markers)),
        None,
    )
    if comp is None:
        # Section belongs to neither component.
        continue
    table = sec.find("table")
    if table:
        sections.append((comp, table))

# Without at least one table there is nothing to download.
if not sections:
    raise SystemExit("No se detectaron tablas de SALUD/NUTRICIÓN")

# ========= Download loop per component =========
stats = { "salud": {"csv":0, "cat":0}, "nutricion":{"csv":0, "cat":0} }

# (log label, key into OUT/stats, table column index, fallback extension)
_TARGETS = (
    ("CSV", "csv", 3, ".csv"),
    ("CAT", "cat", 5, ".xlsx"),
)


def _save_if_file(resp, hint, comp, kind):
    """Save *resp* under ``OUT[comp][kind]`` and count it, unless it is HTML.

    Raises for HTTP error statuses. An HTML response is treated as
    "no file offered" and skipped without counting.
    """
    resp.raise_for_status()
    if "text/html" in resp.headers.get("content-type", "").lower():
        return
    save_resp(resp, hint, OUT[comp][kind])
    stats[comp][kind] += 1


def _download_cell(cell, comp, kind, categoria, ext):
    """Download the file offered by one table cell, if any.

    A cell may offer its file through a plain link (fetched with GET) or
    through a submit button (replayed as a POST of ``name=value`` to BASE).
    Cells with neither are ignored. *categoria* + *ext* is the fallback
    file-name hint when the link/button provides none.
    """
    link = cell.find("a", href=True)
    button = cell.find("button", attrs={"type": "submit"})
    request_headers = {"Referer": BASE}
    if link:
        url = urljoin(BASE, link["href"])
        hint = link.get("href") or f"{categoria}{ext}"
        # `with` closes the streamed response even when its body is never
        # read (HTML answer or error status), so the connection is released.
        with session.get(url, stream=True, timeout=180, headers=request_headers) as resp:
            _save_if_file(resp, hint, comp, kind)
    elif button and button.has_attr("name"):
        payload = {button["name"]: button.get("value", "")}
        hint = button.get("title", f"{categoria}{ext}")
        with session.post(BASE, data=payload, stream=True, timeout=180, headers=request_headers) as resp:
            _save_if_file(resp, hint, comp, kind)


for comp, table in sections:
    rows = table.select("tbody tr")
    for i, row in enumerate(tqdm(rows, desc=f"Descargando {comp}")):
        tds = row.find_all("td")
        # Rows with fewer than 6 cells lack the CSV/catalogue columns.
        if len(tds) < 6:
            continue
        categoria = safe(tds[0].get_text(" ", strip=True) or f"categoria_{i:03d}")

        for label, kind, col, ext in _TARGETS:
            try:
                _download_cell(tds[col], comp, kind, categoria, ext)
            except Exception as e:
                # Best effort per file: report the failure and keep going so
                # one broken download does not abort the whole run.
                print(f"[{label}:{comp}] {categoria}: {e}")

# ========= Unzip / gunzip / normalize names per folder =========
for comp in OUT:
    # Unpack all ZIPs first so that files they contain are also normalized.
    postprocess_unzip_all(OUT[comp]["csv"])
    postprocess_unzip_all(OUT[comp]["cat"])
    fixed_csv, gunz_csv = normalize_and_gunzip(OUT[comp]["csv"])
    fixed_cat, gunz_cat = normalize_and_gunzip(OUT[comp]["cat"])
    print(f"\n🧹 {comp.upper()} → CSV: renombrados {fixed_csv}, gunzip {gunz_csv} | CAT: renombrados {fixed_cat}, gunzip {gunz_cat}")

# Summary of how many responses were saved per component and kind.
print("\n✅ Resumen descargas:")
for comp in stats:
    print(f"   {comp}: CSV={stats[comp]['csv']}  |  CAT={stats[comp]['cat']}")
print("\n📂 Raíz:", ROOT)
for comp in OUT:
    # The label is the component name; it previously referenced an undefined
    # name, which raised NameError here.
    print(f"   {comp} → csv: {OUT[comp]['csv']}  |  catalogos: {OUT[comp]['cat']}")

# ========= ZIPs per component =========
# Maps (component, kind) -> path of the archive built for that folder.
zip_paths = {}
for comp in OUT:
    targets = {
        "csv": f"/content/ensanut_2023_{comp}_csv.zip",
        "cat": f"/content/ensanut_2023_{comp}_catalogos.zip",
    }
    for kind, archive_path in targets.items():
        zip_dir_stored(OUT[comp][kind], archive_path)
    # Register the archives only once both were written.
    zip_paths.update({(comp, kind): archive_path for kind, archive_path in targets.items()})
    print(f"\n🗜️ {comp.upper()} ZIPs:")
    for archive_path in targets.values():
        print("   -", archive_path)

# ========= Download to your machine (Colab) =========
# Offering the archives through the browser only works inside Google Colab;
# anywhere else the ZIP files simply stay on disk.
try:
    from google.colab import files
except ImportError:
    files = None
    print("\nℹ️ google.colab no disponible; los ZIP quedan en disco.")

if files is not None:
    for (comp, kind), z in zip_paths.items():
        # Skip archives whose source folder ended up empty.
        if not any(Path(OUT[comp][kind]).glob("*")):
            continue
        try:
            files.download(z)
        except Exception as e:
            # Best effort: report the failure and still try the other archives.
            print(f"[DESCARGA:{comp}:{kind}] {z}: {e}")


Descargando salud: 100%|██████████| 8/8 [00:04<00:00,  1.74it/s]
Descargando nutricion: 100%|██████████| 31/31 [00:16<00:00,  1.84it/s]



🧹 SALUD → CSV: renombrados 0, gunzip 0 | CAT: renombrados 0, gunzip 0

🧹 NUTRICION → CSV: renombrados 0, gunzip 0 | CAT: renombrados 0, gunzip 0

✅ Resumen descargas:
   salud: CSV=7  |  CAT=7
   nutricion: CSV=28  |  CAT=28

📂 Raíz: /content/ensanut_2023
   salud → csv: /content/ensanut_2023/salud/csv  |  catalogos: /content/ensanut_2023/salud/catalogos
   nutricion → csv: /content/ensanut_2023/nutricion/csv  |  catalogos: /content/ensanut_2023/nutricion/catalogos

🗜️ ZIP final listo: /content/ensanut_2023_ALL.zip
📦 Archivos en /content (debería verse solo el ZIP final o lo mínimo):
['/content/ensanut_2023_ALL.zip']


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>