In [None]:
from __future__ import annotations

import csv
import hashlib
import time
from typing import Dict, List, Set
from urllib.parse import urljoin, urlparse

import requests
from bs4 import BeautifulSoup
import xml.etree.ElementTree as ET

# -----------------------------
# Configuration
# -----------------------------
# Site root; also the reference host for the same-domain filter applied to sitemap URLs.
BASE = "https://tribunalambiental.cl/"
# WordPress core sitemap index, resolved relative to BASE.
SITEMAP_INDEX = urljoin(BASE, "wp-sitemap.xml")

# A URL becomes a scrape target when any of these appears as a substring of
# its lower-cased form (see keyword_hit).
KEYWORDS = [
    "sentencia", "sentencias",
    "informe", "informes",
    "expediente", "causa",
    "rol", "rit",
    "resolucion", "fallo",
]

# Upper bound on the number of target pages scraped in one run.
MAX_TARGET_URLS = 600
# Pause passed to time.sleep() before every sitemap / page request.
SLEEP_SECONDS = 1.0
# Timeout handed to requests for each HTTP call.
TIMEOUT = 25

# Destination of the consolidated CSV (Colab filesystem path).
OUT_CSV = "/content/tribunalambiental_consolidado.csv"

# Request headers sent with every fetch().
HEADERS = {
    "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122 Safari/537.36",
    "Accept-Language": "es-CL,es;q=0.9,en;q=0.8",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
}

# -----------------------------
# Helpers
# -----------------------------
def _domain(url: str) -> str:
    """Return the lower-cased network location (host plus any port) of *url*."""
    netloc = urlparse(url).netloc
    return netloc.lower()

def _same_domain(url: str, base: str) -> bool:
    """Tell whether *url* and *base* share the same network location.

    The comparison is an exact, case-insensitive match of the netloc part,
    so a different subdomain or port counts as a different domain.
    """
    expected = _domain(base)
    actual = _domain(url)
    return actual == expected

def fetch(session: requests.Session, url: str) -> requests.Response:
    """GET *url* through *session* and return the response.

    Redirects are followed and the module-level HEADERS / TIMEOUT are applied.
    Raises whatever ``raise_for_status()`` raises for an error status code.
    """
    request_options = {
        "headers": HEADERS,
        "timeout": TIMEOUT,
        "allow_redirects": True,
    }
    response = session.get(url, **request_options)
    response.raise_for_status()
    return response

def parse_sitemap_locs(xml_text: str) -> List[str]:
    """Extract the ``<loc>`` values from a sitemap index or a urlset document.

    Both ``<sitemap><loc>`` (index) and ``<url><loc>`` (urlset) entries are
    collected, index entries first. Elements are matched in the root element's
    namespace when it declares one, and without a namespace otherwise, so
    plain un-namespaced sitemaps are parsed instead of raising.

    Args:
        xml_text: Raw XML of the sitemap document.

    Returns:
        Stripped, non-empty locations, de-duplicated with first-seen order kept.

    Raises:
        xml.etree.ElementTree.ParseError: If *xml_text* is not well-formed XML.
    """
    root = ET.fromstring(xml_text)

    # A namespaced root tag looks like "{uri}urlset"; keep "{uri}" so it can
    # be prefixed to child tag names in the search paths.
    uri = root.tag.split("}")[0].strip("{") if "}" in root.tag else ""
    qualifier = f"{{{uri}}}" if uri else ""

    locs: List[str] = []
    for parent in ("sitemap", "url"):
        paths = [f".//{parent}/loc"]
        if qualifier:
            # Namespaced matches come first, as in the original lookup order.
            paths.insert(0, f".//{qualifier}{parent}/{qualifier}loc")
        for path in paths:
            for loc in root.findall(path):
                text = (loc.text or "").strip()
                if text:
                    locs.append(text)

    # dict preserves insertion order, so this de-duplicates while keeping order.
    return list(dict.fromkeys(locs))

def keyword_hit(url: str) -> bool:
    """Return True when any KEYWORDS entry occurs as a substring of *url*.

    The URL is lower-cased before matching.
    """
    lowered = url.lower()
    for keyword in KEYWORDS:
        if keyword in lowered:
            return True
    return False

def extract_pdf_links_from_tag(tag, page_url: str) -> List[str]:
    """Collect absolute PDF links found in the ``<a href>`` descendants of *tag*.

    A link counts as a PDF when the absolute URL ends in ``.pdf`` or when its
    path component does, so links carrying a query string or fragment
    (``doc.pdf?ver=2``, ``doc.pdf#page=3``) are no longer missed. The check is
    case-insensitive.

    Args:
        tag: Object exposing ``find_all("a", href=True)`` whose results expose
            ``get("href")`` (a BeautifulSoup tag or document).
        page_url: URL of the page, used to resolve relative hrefs.

    Returns:
        Absolute PDF URLs, de-duplicated with first-seen order kept.
    """
    pdfs: List[str] = []
    for a in tag.find_all("a", href=True):
        href = (a.get("href") or "").strip()
        if not href:
            continue
        absu = urljoin(page_url, href)
        lowered = absu.lower()
        # Whole-URL check kept for backward compatibility; path check adds
        # links whose ".pdf" is followed by "?query" or "#fragment".
        if lowered.endswith(".pdf") or urlparse(lowered).path.endswith(".pdf"):
            pdfs.append(absu)

    # dict preserves insertion order, so this de-duplicates while keeping order.
    return list(dict.fromkeys(pdfs))

def extract_page_pdf_links(soup: BeautifulSoup, page_url: str) -> List[str]:
    """Collect the absolute PDF links found anywhere in the parsed page.

    The page-level scan is the same operation as the per-tag scan applied to
    the document root, so it delegates to extract_pdf_links_from_tag rather
    than carrying a second copy of the logic.

    Args:
        soup: Parsed document.
        page_url: URL of the page, used to resolve relative hrefs.

    Returns:
        Absolute PDF URLs, de-duplicated with first-seen order kept.
    """
    return extract_pdf_links_from_tag(soup, page_url)

def extract_table_records(soup: BeautifulSoup, page_url: str, page_title: str) -> List[Dict[str, str]]:
    """Turn every data row of every ``<table>`` in the page into a flat record.

    Column names come from ``thead th``; failing that, from the ``th`` cells of
    the table's first ``tr``; failing that, they are generated as ``col_N``.
    Each record carries the page URL/title, the 1-based table and row indexes,
    one entry per column, and ``pdf_links_row`` with the PDF links found in the
    row (joined with " | ").

    Compared with a plain header->cell mapping this also:
      * keeps cells that lie beyond the last header (stored as ``col_N``)
        instead of dropping them;
      * suffixes ``_2``, ``_3``... on repeated or reserved column names so a
        later column never overwrites an earlier one or the record metadata.

    Args:
        soup: Parsed document.
        page_url: URL stored in each record and used to resolve row links.
        page_title: Title stored in each record.

    Returns:
        One dict per table row that contains at least one ``<td>``.
    """
    # Keys owned by the scraper itself; table headers must never overwrite them.
    reserved = (
        "page_url", "page_title", "table_index", "row_index",
        "pdf_links_row", "pdf_links_page",
    )
    records: List[Dict[str, str]] = []

    for ti, table in enumerate(soup.find_all("table"), start=1):
        headers: List[str] = [th.get_text(" ", strip=True) for th in table.select("thead th")]
        if not headers:
            first_tr = table.find("tr")
            if first_tr:
                headers = [th.get_text(" ", strip=True) for th in first_tr.find_all("th")]

        row_index = 0
        for tr in table.find_all("tr"):
            tds = tr.find_all("td")
            if not tds:
                # Header-only or empty rows carry no data.
                continue
            row_index += 1
            cells = [td.get_text(" ", strip=True) for td in tds]
            if not headers:
                # No header row at all: name columns after the first data row.
                headers = [f"col_{i+1}" for i in range(len(cells))]

            rec: Dict[str, str] = {
                "page_url": page_url,
                "page_title": page_title,
                "table_index": str(ti),
                "row_index": str(row_index),
            }

            used = set(reserved)
            # Walk the wider of headers/cells so surplus cells are preserved.
            for i in range(max(len(headers), len(cells))):
                label = headers[i].strip() if i < len(headers) and headers[i] else ""
                base_key = label or f"col_{i+1}"
                key = base_key
                n = 2
                while key in used:
                    key = f"{base_key}_{n}"
                    n += 1
                used.add(key)
                rec[key] = cells[i] if i < len(cells) else ""

            pdfs_row = extract_pdf_links_from_tag(tr, page_url)
            rec["pdf_links_row"] = " | ".join(pdfs_row)
            records.append(rec)

    return records

def record_key(rec: Dict[str, str]) -> str:
    """Return a SHA-1 hex digest identifying *rec* for de-duplication.

    The digest covers the page URL, table index and row index, followed by all
    key/value pairs in sorted order. The two PDF-link columns are left out of
    the pairs, so they do not influence the key.
    """
    ignored = ("pdf_links_row", "pdf_links_page")
    location = "|".join(
        [rec.get("page_url", ""), rec.get("table_index", ""), rec.get("row_index", "")]
    )
    pairs = sorted(item for item in rec.items() if item[0] not in ignored)
    payload = location + "||" + "|".join(f"{k}={v}" for k, v in pairs)
    encoded = payload.encode("utf-8", errors="ignore")
    return hashlib.sha1(encoded).hexdigest()

# -----------------------------
# Main
# -----------------------------
print("== Batch scrape tribunalambiental.cl (WP sitemap) ==")
print("Sitemap index:", SITEMAP_INDEX)

# Accumulated output rows and the record keys already emitted.
all_records: List[Dict[str, str]] = []
seen_rec: Set[str] = set()

with requests.Session() as session:
    # 1) Sitemap index: only the entries ending in .xml are child sitemaps.
    index_xml = fetch(session, SITEMAP_INDEX).text
    child_sitemaps = [
        loc for loc in parse_sitemap_locs(index_xml) if loc.lower().endswith(".xml")
    ]
    print("Child sitemaps:", len(child_sitemaps))
    for loc in child_sitemaps:
        print(" -", loc)

    # 2) Download each child sitemap and pool its same-domain URLs.
    all_urls: List[str] = []
    for i, sm in enumerate(child_sitemaps, start=1):
        time.sleep(SLEEP_SECONDS)
        try:
            locs = [
                u for u in parse_sitemap_locs(fetch(session, sm).text)
                if _same_domain(u, BASE)
            ]
            all_urls.extend(locs)
            print(f"[sitemap {i}/{len(child_sitemaps)}] locs={len(locs)} -> {sm}")
        except Exception as e:
            # Best effort: one unreadable sitemap must not stop the run.
            print("[WARN] fallo leyendo sitemap:", sm, "|", e)

    # De-duplicate URLs while keeping first-seen order.
    uniq_urls: List[str] = list(dict.fromkeys(all_urls))

    print("Total URLs (dedup):", len(uniq_urls))

    # 3) Keep only the URLs that match a keyword.
    targets = [u for u in uniq_urls if keyword_hit(u)]
    print("Targets por keywords:", len(targets))

    if len(targets) > MAX_TARGET_URLS:
        print(f"[INFO] recortando targets a MAX_TARGET_URLS={MAX_TARGET_URLS}")
        targets = targets[:MAX_TARGET_URLS]

    # 4) Scrape every target page.
    for idx, url in enumerate(targets, start=1):
        time.sleep(SLEEP_SECONDS)
        try:
            rp = fetch(session, url)
            soup = BeautifulSoup(rp.text, "html.parser")
            title = soup.title.get_text(" ", strip=True) if soup.title else ""

            # Pages without tables still yield one placeholder row so that
            # their page-level PDF links are not lost.
            page_recs = extract_table_records(soup, rp.url, title) or [{
                "page_url": rp.url,
                "page_title": title,
                "table_index": "",
                "row_index": "",
                "pdf_links_row": "",
            }]
            page_pdfs = extract_page_pdf_links(soup, rp.url)
            page_pdfs_join = " | ".join(page_pdfs)

            for rec in page_recs:
                rec["pdf_links_page"] = page_pdfs_join
                k = record_key(rec)
                if k in seen_rec:
                    continue
                seen_rec.add(k)
                all_records.append(rec)

            print(f"[{idx}/{len(targets)}] OK rows={len(page_recs)} pdfs_page={len(page_pdfs)} | {rp.url}")

        except Exception as e:
            # Best effort: log the failing page and move on to the next one.
            print("[WARN] fallo:", url, "|", e)

# 5) Export to CSV; columns are the union of the keys of all records.
if not all_records:
    raise RuntimeError("No se extrajeron registros. Ajusta KEYWORDS o revisa conectividad.")

cols: Set[str] = set().union(*all_records)

# Fixed metadata columns first, table-derived columns after them, sorted.
preferred = ["page_url", "page_title", "table_index", "row_index", "pdf_links_row", "pdf_links_page"]
rest = sorted(cols.difference(preferred))
fieldnames = preferred + rest

with open(OUT_CSV, "w", newline="", encoding="utf-8") as f:
    w = csv.DictWriter(f, fieldnames=fieldnames)
    w.writeheader()
    w.writerows(all_records)

print("\n== Listo ==")
print("Registros (dedup):", len(all_records))
print("CSV:", OUT_CSV)

== Batch scrape tribunalambiental.cl (WP sitemap) ==
Sitemap index: https://tribunalambiental.cl/wp-sitemap.xml
Child sitemaps: 6
 - https://tribunalambiental.cl/wp-sitemap-posts-post-1.xml
 - https://tribunalambiental.cl/wp-sitemap-posts-post-2.xml
 - https://tribunalambiental.cl/wp-sitemap-posts-page-1.xml
 - https://tribunalambiental.cl/wp-sitemap-taxonomies-category-1.xml
 - https://tribunalambiental.cl/wp-sitemap-taxonomies-post_tag-1.xml
 - https://tribunalambiental.cl/wp-sitemap-taxonomies-post_format-1.xml
[sitemap 1/6] locs=2000 -> https://tribunalambiental.cl/wp-sitemap-posts-post-1.xml
[sitemap 2/6] locs=33 -> https://tribunalambiental.cl/wp-sitemap-posts-post-2.xml
[sitemap 3/6] locs=51 -> https://tribunalambiental.cl/wp-sitemap-posts-page-1.xml
[sitemap 4/6] locs=31 -> https://tribunalambiental.cl/wp-sitemap-taxonomies-category-1.xml
[sitemap 5/6] locs=1 -> https://tribunalambiental.cl/wp-sitemap-taxonomies-post_tag-1.xml
[sitemap 6/6] locs=2 -> https://tribunalambiental.c

In [None]:
import pandas as pd

# Source CSV and the Excel workbook to create from it.
in_path = "/content/tribunalambiental_consolidado.csv"
out_path = "/content/tribunalambiental_consolidado.xlsx"

# Read the CSV and export it to Excel. Every column is read as text and empty
# cells stay empty strings instead of becoming NaN (dtype=str, keep_default_na=False).
df = pd.read_csv(in_path, dtype=str, encoding="utf-8", keep_default_na=False)
df.to_excel(out_path, index=False)

print("Listo:", out_path)


Listo: /content/tribunalambiental_consolidado.xlsx


In [None]:
import os
import re
import time
import csv
import hashlib
from urllib.parse import urlparse, unquote

import pandas as pd
import requests

# -------------------------
# CONFIG
# -------------------------
# CSV whose pdf_links_row / pdf_links_page columns list the PDFs to download.
# NOTE(review): this path differs from the OUT_CSV written by the scraping
# cell (/content/tribunalambiental_consolidado.csv) — confirm it is intended.
INPUT_CSV = "/content/tribunalambiental_scraping.csv"
# Directory that receives the downloaded PDFs.
OUT_DIR = "/content/tribunal_pdfs"
# One manifest row is appended per attempted URL.
MANIFEST_CSV = "/content/tribunal_pdfs_manifest.csv"

# Pause passed to time.sleep() after each URL is processed.
SLEEP_SECONDS = 0.7
# Timeout handed to requests for each download.
TIMEOUT = 60
# Maximum download attempts per URL.
MAX_RETRIES = 3
# Chunk size used when streaming the response body to disk.
CHUNK_SIZE = 1024 * 256  # 256 KB

# Request headers sent with every download.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122 Safari/537.36",
    "Accept": "application/pdf,*/*;q=0.8",
    "Accept-Language": "es-CL,es;q=0.9,en;q=0.8",
}

# -------------------------
# LECTURA CSV
# -------------------------
def robust_read_csv(path: str) -> pd.DataFrame:
    """Read the CSV at *path* leniently and return it as an all-text DataFrame.

    The separator is auto-detected (``sep=None`` with the python engine),
    malformed lines are skipped, every column is read as ``str`` and empty
    cells are kept as empty strings.
    """
    read_options = {
        "dtype": str,
        "keep_default_na": False,
        "sep": None,
        "engine": "python",
        "on_bad_lines": "skip",
    }
    return pd.read_csv(path, **read_options)

def split_field(val: str):
    """Split a "|"-separated cell into its stripped, non-empty parts.

    Anything that is not a string, and any blank string, yields an empty list.
    """
    if not isinstance(val, str):
        return []
    if not val.strip():
        return []
    pieces = (piece.strip() for piece in val.split("|"))
    return [piece for piece in pieces if piece]

# -------------------------
# NOMBRES DE ARCHIVO
# -------------------------
def sanitize_filename(name: str) -> str:
    """Return a filesystem-friendly version of *name*.

    The name is percent-decoded, line breaks become spaces, every run of
    characters other than word characters, ``- . ( ) [ ]`` and space becomes a
    single underscore, and whitespace runs collapse to one space. Names longer
    than 160 characters are cut to 140 characters of stem plus the extension.
    """
    decoded = unquote(name).strip()
    for line_break in ("\n", "\r"):
        decoded = decoded.replace(line_break, " ")

    cleaned = re.sub(r"[^\w\-.() \[\]]+", "_", decoded, flags=re.UNICODE)
    cleaned = re.sub(r"\s+", " ", cleaned).strip()

    if len(cleaned) <= 160:
        return cleaned
    stem, ext = os.path.splitext(cleaned)
    return stem[:140] + ext

def filename_from_content_disposition(cd: str) -> str | None:
    """Extract a sanitized filename from a Content-Disposition header value.

    Three forms are tried in order: the extended ``filename*=`` parameter, the
    quoted ``filename="..."`` parameter and the bare ``filename=...`` parameter.

    For ``filename*=`` the full ``charset'language'value`` layout is parsed, so
    a language tag (``UTF-8'es'x.pdf``) or a non-UTF-8 charset
    (``ISO-8859-1''x%E9.pdf``) no longer ends up inside, or mis-decoded in, the
    resulting name.

    Args:
        cd: Raw header value; may be empty.

    Returns:
        The sanitized filename, or None when the header carries no filename.
    """
    if not cd:
        return None

    # filename*=charset'language'percent-encoded-value
    m = re.search(r"filename\*\s*=\s*([^;]+)", cd, flags=re.I)
    if m:
        v = m.group(1).strip().strip('"').strip()
        ext = re.match(r"([A-Za-z0-9._:+-]+)'([A-Za-z0-9-]*)'(.*)", v, flags=re.S)
        if ext:
            charset, encoded = ext.group(1), ext.group(3)
            try:
                decoded = unquote(encoded, encoding=charset, errors="replace")
            except LookupError:
                # Unknown charset label: fall back to UTF-8 decoding.
                decoded = unquote(encoded)
            # sanitize_filename percent-decodes again; re-escape "%" so the
            # value is decoded exactly once overall.
            v = decoded.replace("%", "%25")
        v = v.strip()
        if v:
            return sanitize_filename(v)

    # filename="..."
    m = re.search(r'filename\s*=\s*"([^"]+)"', cd, flags=re.I)
    if m:
        return sanitize_filename(m.group(1))

    # filename=...
    m = re.search(r"filename\s*=\s*([^;]+)", cd, flags=re.I)
    if m:
        return sanitize_filename(m.group(1).strip().strip('"'))

    return None

def unique_path(path: str) -> str:
    """Return *path* if it is free, else the first free ``<root>_<n><ext>``.

    Numbering starts at 2 and increases until a path that does not exist on
    disk is found.
    """
    root, ext = os.path.splitext(path)
    cand = path
    i = 1
    while os.path.exists(cand):
        i += 1
        cand = f"{root}_{i}{ext}"
    return cand

def fallback_filename(url: str) -> str:
    """Build a stable ``document_<hash>.pdf`` name from *url*.

    The hash is the first 16 hex digits of the URL's SHA-1, so the same URL
    always maps to the same name even when it has no usable basename.
    """
    digest = hashlib.sha1(url.encode("utf-8", errors="ignore"))
    h = digest.hexdigest()[:16]
    return f"document_{h}.pdf"

# -------------------------
# DESCARGA + VALIDACIÓN PDF
# -------------------------
def download_and_verify_pdf(session: requests.Session, url: str, out_dir: str) -> dict:
    """Download *url* into *out_dir*, keeping the file only if it is a real PDF.

    The body is streamed to a ``.part`` file, its first five bytes are checked
    against the ``%PDF-`` signature, and only then is it renamed to its final,
    collision-free name. Up to MAX_RETRIES attempts are made with a growing
    pause between them.

    Changes over the previous behaviour:
      * a partial ``.part`` file is always removed when an attempt fails;
      * ``error`` is cleared when a later attempt succeeds;
      * permanent client errors (4xx other than 408/429) are not retried;
      * there is no pause after the final attempt.

    Args:
        session: Session used for the request.
        url: Address of the document.
        out_dir: Existing directory that receives the file.

    Returns:
        A manifest row with the keys ``url``, ``final_url``, ``status``,
        ``file_path``, ``bytes``, ``content_type`` and ``error``.
        ``file_path`` is empty when the download failed. Exceptions raised by
        an attempt are recorded in ``error`` rather than propagated.
    """
    rec = {
        "url": url,
        "final_url": "",
        "status": "",
        "file_path": "",
        "bytes": 0,
        "content_type": "",
        "error": "",
    }

    for attempt in range(1, MAX_RETRIES + 1):
        tmp = ""  # in-progress file of this attempt, removed on failure
        retryable = True
        try:
            with session.get(url, headers=HEADERS, timeout=TIMEOUT, stream=True, allow_redirects=True) as r:
                rec["status"] = str(r.status_code)
                rec["final_url"] = r.url
                rec["content_type"] = (r.headers.get("content-type") or "").lower()

                if r.status_code != 200:
                    # 4xx means the request itself is wrong; only timeouts
                    # (408) and rate limiting (429) are worth another try.
                    retryable = r.status_code in (408, 429) or not 400 <= r.status_code < 500
                    raise RuntimeError(f"HTTP {r.status_code}")

                # Prefer the name announced by the server.
                cd = r.headers.get("content-disposition") or ""
                fname = filename_from_content_disposition(cd)
                if not fname:
                    # Otherwise use the URL basename when it looks like a
                    # PDF name, else a stable hash-based name.
                    base = os.path.basename(urlparse(r.url).path) or ""
                    base = sanitize_filename(base) if base else ""
                    if base.lower().endswith(".pdf"):
                        fname = base
                    else:
                        fname = fallback_filename(r.url)

                if not fname.lower().endswith(".pdf"):
                    fname += ".pdf"

                fpath = unique_path(os.path.join(out_dir, fname))
                tmp = fpath + ".part"

                total = 0
                with open(tmp, "wb") as f:
                    for chunk in r.iter_content(chunk_size=CHUNK_SIZE):
                        if chunk:
                            f.write(chunk)
                            total += len(chunk)

                # Strong validation: the file must start with the PDF signature.
                with open(tmp, "rb") as f:
                    head = f.read(5)

                if head != b"%PDF-":
                    # Not a PDF: the temp file is discarded by the finally block.
                    raise RuntimeError(f"No es PDF real (firma={head!r}, ct={rec['content_type']})")

                os.rename(tmp, fpath)
                tmp = ""
                rec["file_path"] = fpath
                rec["bytes"] = total
                rec["error"] = ""  # drop the message of any earlier failed attempt
                return rec

        except Exception as e:
            rec["error"] = f"attempt {attempt}/{MAX_RETRIES}: {e}"
        finally:
            if tmp and os.path.exists(tmp):
                try:
                    os.remove(tmp)
                except OSError:
                    # Cleanup is best effort; the download result is unaffected.
                    pass

        if not retryable or attempt == MAX_RETRIES:
            break
        time.sleep(1.5 * attempt)

    return rec

# -------------------------
# RUN
# -------------------------
# Fail early with a helpful hint when the input CSV is missing.
if not os.path.exists(INPUT_CSV):
    raise FileNotFoundError(
        f"No encuentro {INPUT_CSV}. Revisa el panel Files o corre: !ls -lh /content"
    )

os.makedirs(OUT_DIR, exist_ok=True)

print("Leyendo CSV...")
df = robust_read_csv(INPUT_CSV)
print("OK. shape:", df.shape)

# Collect URLs from both link columns. No extension filter is applied here;
# each download is validated by its PDF signature instead.
urls = []
for col in ["pdf_links_row", "pdf_links_page"]:
    if col in df.columns:
        for v in df[col].tolist():
            urls.extend(split_field(v))
    else:
        print(f"[WARN] No existe la columna {col}")

urls = [u.strip() for u in urls if isinstance(u, str) and u.strip()]

# De-duplicate while keeping first-seen order.
urls_uniq = list(dict.fromkeys(urls))

print("URLs totales (dedup):", len(urls_uniq))
print("Descargando a:", OUT_DIR)

manifest_fields = ["url", "final_url", "status", "file_path", "bytes", "content_type", "error"]

# A zero-byte manifest counts as missing so that it still receives a header.
manifest_exists = os.path.exists(MANIFEST_CSV) and os.path.getsize(MANIFEST_CSV) > 0

# URLs that an earlier run already saved and whose file is still on disk.
# Skipping them stops a re-run from downloading everything again and piling
# up "_2", "_3"... copies of the same documents.
already_downloaded = set()
if manifest_exists:
    with open(MANIFEST_CSV, newline="", encoding="utf-8") as previous:
        for row in csv.DictReader(previous):
            saved_path = row.get("file_path") or ""
            if saved_path and os.path.exists(saved_path):
                already_downloaded.add(row.get("url") or "")

ok = 0
fail = 0
skipped = 0
with open(MANIFEST_CSV, "a", newline="", encoding="utf-8") as mf:
    writer = csv.DictWriter(mf, fieldnames=manifest_fields)
    if not manifest_exists:
        writer.writeheader()

    with requests.Session() as session:
        for i, url in enumerate(urls_uniq, start=1):
            if url in already_downloaded:
                skipped += 1
                print(f"[{i}/{len(urls_uniq)}] SKIP ya descargado -> {url}")
                continue

            rec = download_and_verify_pdf(session, url, OUT_DIR)
            writer.writerow(rec)
            # Flush per row so the manifest survives an interrupted run.
            mf.flush()

            if rec["file_path"]:
                ok += 1
                print(f"[{i}/{len(urls_uniq)}] OK   {rec['bytes']} bytes -> {os.path.basename(rec['file_path'])}")
            else:
                fail += 1
                print(f"[{i}/{len(urls_uniq)}] FAIL {url} | {rec['error']}")

            time.sleep(SLEEP_SECONDS)

print("\n== Resumen ==")
print("OK:", ok)
print("FAIL:", fail)
print("SKIP:", skipped)
print("Manifest:", MANIFEST_CSV)
print("PDFs en:", OUT_DIR)


Leyendo CSV...
OK. shape: (587, 12)
[WARN] No existe la columna pdf_links_page
URLs totales (dedup): 295
Descargando a: /content/tribunal_pdfs
[1/295] OK   373676 bytes -> R-06-2013-Informe-en-derecho-de-don-Jorge-Bermudez-Soto_2.pdf
[2/295] OK   7813284 bytes -> R-40-Informe-en-Derecho-Andres-Bordali_2.pdf
[3/295] OK   466814 bytes -> R-06-2013-Informe-en-derecho-de-don-Luis-Cordero-Vega_2.pdf
[4/295] OK   1412847 bytes -> R-45-2014-06-01-2015-Doc.-Informe-en-Derecho-Luis-Cordero-Vega._2.pdf
[5/295] OK   1481462 bytes -> R-06-2013-Informe-en-derecho-de-don-Gabriel-Del-Favero-Valdes_2.pdf
[6/295] OK   141634 bytes -> R-06-2013_Informe_en_Derecho_Sr-Patricio-Leyton-Florez_2.pdf
[7/295] OK   595466 bytes -> R-40-Opinion-Legal-Cristian-Maturana_2.pdf
[8/295] OK   6692149 bytes -> R-40-Informe-en-Derecho-Cristian-Maturana-y-Jaime-Jara_2.pdf
[9/295] OK   268712 bytes -> Informe-en-derecho-concurso-infraccional-imperfecto-Jean-Pierre-Matus_2.pdf
[10/295] OK   5820709 bytes -> R-40-2014-Infor

In [None]:
# Colab: build a ZIP with every PDF plus the manifest, then download it

ZIP_PATH = "/content/tribunal_pdfs_bundle.zip"

# 1) Create the zip (includes the tribunal_pdfs folder and the manifest);
#    any previous bundle is removed first.
!rm -f "$ZIP_PATH"
!zip -r "$ZIP_PATH" /content/tribunal_pdfs /content/tribunal_pdfs_manifest.csv

# 2) Download it to the local computer
from google.colab import files
files.download(ZIP_PATH)


  adding: content/tribunal_pdfs/ (stored 0%)
  adding: content/tribunal_pdfs/S-16-2015-04-02-2015-Resolucion_2.pdf (deflated 6%)
  adding: content/tribunal_pdfs/CS_13177-2018_2TA_D-13-2014_Sentencia-reemplazo_2.pdf (deflated 42%)
  adding: content/tribunal_pdfs/D-17-2015-07-07-2017-Sentencia.pdf (deflated 6%)
  adding: content/tribunal_pdfs/2024.11.04_Sentencia_R-454-2024_2.pdf (deflated 28%)
  adding: content/tribunal_pdfs/2024.11.15_Sentencia_R-451-2024_2.pdf (deflated 27%)
  adding: content/tribunal_pdfs/2025.03.13_Sentencia_R-466-2024.pdf (deflated 24%)
  adding: content/tribunal_pdfs/2025.10.30_Sentencia_R-465-2024.pdf (deflated 4%)
  adding: content/tribunal_pdfs/Sentencia-de-reemplazo-Rol-37273-2017-Nogales.pdf (deflated 40%)
  adding: content/tribunal_pdfs/S-53-2016-28-10-2016-Resolucion_2.pdf (deflated 8%)
  adding: content/tribunal_pdfs/2024.11.06_Sentencia_R-425-2023.pdf (deflated 20%)
  adding: content/tribunal_pdfs/2024.07.03_Sentencia_R-405-2023_2.pdf (deflated 28%)
  add

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>