
# C2 Web Scraping — Plantilla Lista para Usar

Esta plantilla está pensada para el proyecto descrito en tu ZIP. Está organizada en **dos celdas** tal como pediste:
1) **Instalación e importación de librerías**  
2) **Código de scraping y exportación**

La celda 2 está preparada para trabajar en **dos modos**:
- **`mode = "web"`**: descarga páginas en vivo (con rotación de User-Agent, reintentos y espera).  
- **`mode = "local"`**: lee HTMLs guardados en una carpeta (p. ej., `pagina-guardada/`).

> 🔧 **Qué debes editar** en la celda 2 (sección “CONFIG”):
> - `CURRENT_TARGET`: Un nombre identificador (e.g., `"tottus_arroz"` o `"empleos_fake"`).
> - `start_urls`: Lista de URLs iniciales o una sola URL de categoría.
> - `selectors`: Diccionario con selectores CSS/XPath de `item`, `campos` y `paginación`.
> - `local_html_dir`: Carpeta con páginas guardadas si usas `mode="local"`.
>
> 💾 Al final, los datos se guardan en `data/{CURRENT_TARGET}.csv`.


In [None]:

# ======================
# 1) Instalación + Importaciones
# ======================

# Si estás en un entorno limpio (Colab/VSCode/Jupyter), puedes descomentar lo siguiente:
!pip install -q requests beautifulsoup4 lxml pandas tenacity fake-useragent html5lib

import os, time, re, json, math, random, sys, pathlib
from dataclasses import dataclass, asdict
from typing import List, Dict, Any, Optional, Iterable

import requests
from bs4 import BeautifulSoup
import pandas as pd

# Opcionales / robustez
try:
    from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
except Exception:
    # Fallback mínimo si tenacity no está
    retry = lambda *a, **k: (lambda f: f)
    def stop_after_attempt(n): return None
    def wait_exponential(multiplier=1, min=1, max=10): return None
    def retry_if_exception_type(*a, **k): return None

try:
    from fake_useragent import UserAgent
    _ua = UserAgent()
    def get_ua():
        try:
            return _ua.random
        except Exception:
            return "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120 Safari/537.36"
except Exception:
    def get_ua():
        # UA fijo si no hay fake_useragent
        return "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120 Safari/537.36"

# Asegurar carpeta de salida
os.makedirs("data", exist_ok=True)

print("✅ Librerías importadas y carpeta 'data/' lista.")


In [None]:

# ======================
# 2) Scraping + Export
# ======================

# -------- CONFIG --------
#mode = "local"  # "web" para navegar en vivo | "local" para leer HTML guardado
mode = web
#CURRENT_TARGET = "demo_target"  # Cambia por lo que indique tu imagen (p.ej., "tottus_arroz", "empleos_fake")
CURRENT_TARGET = 

# Para modo "web": pon aquí la(s) URL(s) a scrapear (categoría o listado)
start_urls = [
    # "https://www.ejemplo.com/categoria/arroz?page=1"
]

# Para modo "local": carpeta con páginas ya guardadas (.html). Ej: "pagina-guardada"
local_html_dir = "pagina-guardada"  # cambia a la carpeta real de tu ZIP si difiere

# Selectores: edita esto según el sitio objetivo. Incluye CSS o XPaths (solo CSS por simplicidad aquí).
# Estructura esperada:
# - "item": selector del contenedor de cada elemento/registro
# - "fields": dict con { nombre_campo: selector_css }
# - "pagination": selector para el enlace a la siguiente página (opcional en modo "web")
selectors = {
    "item": ".product-card, .producto, .job-card, .resultado",  # ejemplo general
    "fields": {
        "titulo": ".product-title, .title, .job-title",
        "precio": ".price, .product-price, .monto, .salary",
        "detalle_url": "a[href]",
        "categoria": ".breadcrumb .active, .category-name",
        "tienda_o_empresa": ".brand, .company, .store-name"
    },
    "pagination": "a.next, a[aria-label='Siguiente'], .pagination-next a"
}

# Umbrales y opciones
MAX_PAGES = 30         # tope de páginas por seguridad
REQUEST_DELAY = (1,3)  # espera aleatoria entre requests (segundos)
TIMEOUT = 20           # timeout por request
HEADERS = lambda: {"User-Agent": get_ua(), "Accept-Language": "es-PE,es;q=0.9,en;q=0.8"}

# Limpieza básica por defecto (ajustable para tu caso)
def clean_text(x: Optional[str]) -> Optional[str]:
    if x is None: return None
    x = re.sub(r"\s+", " ", x).strip()
    return x if x else None

# ---------------- Helpers (web) ----------------
@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10),
       retry=retry_if_exception_type((requests.RequestException,)))
def fetch(url: str) -> str:
    resp = requests.get(url, headers=HEADERS(), timeout=TIMEOUT)
    resp.raise_for_status()
    return resp.text

def parse_list_html(html: str, base_url: Optional[str]=None) -> List[Dict[str, Any]]:
    soup = BeautifulSoup(html, "lxml")
    items = soup.select(selectors["item"])
    rows = []
    for it in items:
        row = {}
        for field, sel in selectors["fields"].items():
            el = it.select_one(sel)
            if el:
                if field.endswith("_url"):
                    # normalizar URL relativa
                    href = el.get("href")
                    if href and base_url and href.startswith("/"):
                        from urllib.parse import urljoin
                        row[field] = urljoin(base_url, href)
                    else:
                        row[field] = href
                else:
                    row[field] = clean_text(el.get_text(strip=True))
            else:
                row[field] = None
        rows.append(row)
    return rows

def find_next_url(html: str, base_url: Optional[str]=None) -> Optional[str]:
    if not selectors.get("pagination"):
        return None
    soup = BeautifulSoup(html, "lxml")
    nxt = soup.select_one(selectors["pagination"])
    if nxt and nxt.get("href"):
        href = nxt["href"]
        if base_url and href.startswith("/"):
            from urllib.parse import urljoin
            return urljoin(base_url, href)
        return href
    return None

# ---------------- Pipeline ----------------
all_rows: List[Dict[str, Any]] = []

if mode == "web":
    visited = set()
    to_visit = list(start_urls)
    pages = 0

    while to_visit and pages < MAX_PAGES:
        url = to_visit.pop(0)
        if url in visited: 
            continue
        visited.add(url)
        pages += 1

        print(f"[{pages}] GET {url}")
        html = fetch(url)
        batch = parse_list_html(html, base_url=url)
        print(f"  ↳ {len(batch)} registros")
        all_rows.extend(batch)

        # Paginación
        next_url = find_next_url(html, base_url=url)
        if next_url and next_url not in visited:
            to_visit.append(next_url)

        time.sleep(random.uniform(*REQUEST_DELAY))

elif mode == "local":
    # Leer todos los .html dentro de local_html_dir (recursivo)
    html_files = []
    for root, _, files in os.walk(local_html_dir):
        for fn in files:
            if fn.lower().endswith((".html",".htm")):
                html_files.append(os.path.join(root, fn))
    if not html_files:
        print(f"⚠️ No se encontraron HTMLs en '{local_html_dir}'. Cambia la carpeta o usa mode='web'.")
    else:
        print(f"Procesando {len(html_files)} archivos HTML locales…")
        for i, fp in enumerate(sorted(html_files)):
            try:
                with open(fp, "r", encoding="utf-8", errors="ignore") as f:
                    html = f.read()
                batch = parse_list_html(html, base_url=None)
                print(f"  [{i+1}/{len(html_files)}] {os.path.basename(fp)} → {len(batch)} registros")
                all_rows.extend(batch)
            except Exception as e:
                print(f"  [x] Error en {fp}: {e}")

else:
    raise ValueError("mode debe ser 'web' o 'local'")

# ---------------- Post-procesamiento + Export ----------------
df = pd.DataFrame(all_rows).drop_duplicates()
# Normaliza precios si vienen con símbolos (opcional)
if "precio" in df.columns:
    def normalize_price(s):
        if pd.isna(s): return None
        # Extrae dígitos/decimales con coma o punto
        m = re.findall(r"[\d\.,]+", str(s))
        if not m: return None
        # Toma la última ocurrencia tipo "1,234.56" o "1.234,56" o "1234"
        val = m[-1]
        # Heurística para coma/punto
        if val.count(",") > 0 and val.count(".") > 0:
            # asume formato 1.234,56 -> reemplaza . y usa , como decimal
            val = val.replace(".", "").replace(",", ".")
        else:
            val = val.replace(",", "")
        try:
            return float(val)
        except:
            return None
    df["precio_num"] = df["precio"].apply(normalize_price)

out_csv = f"data/{CURRENT_TARGET}.csv"
df.to_csv(out_csv, index=False, encoding="utf-8-sig")
print(f"\n✅ Listo: {len(df)} filas → {out_csv}")
