<a href="https://colab.research.google.com/github/datosagropy/preciosagro/blob/main/scarp_agroprecios.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
# -*- coding: utf-8 -*-
"""
Scraper unificado de precios – Py 3.7+
Revisión: 2025-06-27
Clasificación 2 niveles + precios corregidos para Stock y Superseis
"""

from __future__ import annotations

import os
import sys
import glob
import re
import unicodedata
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Callable, Set, Tuple
from urllib.parse import urljoin

import pandas as pd
import requests
from bs4 import BeautifulSoup
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# ───────── Instalación dinámica (solo Colab) ────────────────────────────────
if "google.colab" in sys.modules:
    get_ipython()  # type: ignore
    !pip install -q requests pandas beautifulsoup4 gspread gspread_dataframe PyDrive typing_extensions unidecode
    from google.colab import drive, auth
    drive.mount("/content/drive")
    auth.authenticate_user()
    BASE_DIR = "/content/drive/My Drive/preciosfrutihort"
else:
    BASE_DIR = os.path.expanduser("~/preciosfrutihort")

# ───────────────────────── Constantes ──────────────────────────────────────
FILE_TAG        = "frutihort"
OUT_DIR         = BASE_DIR
PATTERN_DAILY   = os.path.join(OUT_DIR, f"*canasta_{FILE_TAG}_*.csv")
CREDS_JSON      = os.path.join(BASE_DIR, "cosmic-ascent-464210-p8-c5c205253ac8.json")
SPREADSHEET_URL = "https://docs.google.com/spreadsheets/d/10zIOm2Ks2vVtg6JH_A9_IHdyAGzcAsN32azbfaxbVnk"
WORKSHEET_NAME  = "precios_supermercados"

MAX_WORKERS, REQ_TIMEOUT = 8, 10
KEY_COLS = ["Supermercado", "CategoríaURL", "Producto", "FechaConsulta"]

# ────────────────── 1. Normalización y tokenización ──────────────────────
def strip_accents(txt: str) -> str:
    return "".join(c for c in unicodedata.normalize("NFD", txt)
                   if unicodedata.category(c) != "Mn")

_token_re = re.compile(r"[a-záéíóúñü]+", re.I)
def tokenize(txt: str) -> List[str]:
    return [strip_accents(t.lower()) for t in _token_re.findall(txt)]

# ────────────────── 2. Mapas de clasificación (2 niveles) ─────────────────
FRUTAS = {
    "naranjas": {"naranja", "naranjas"},
    "manzanas": {"manzana", "manzanas"},
    "limones": {"limon", "limones"},
    "peras": {"pera", "peras"},
    "bananas": {"banana", "bananas", "platano", "platanos"},
    "uvas": {"uva", "uvas"},
    "frutillas": {"frutilla", "frutillas", "fresa", "fresas"},
    "mangos": {"mango", "mangos"},
    "sandías": {"sandia", "sandias"},
}
VERDURAS = {
    "cebollas": {"cebolla", "cebollas"},
    "zanahorias": {"zanahoria", "zanahorias"},
    "papas": {"papa", "papas", "patata", "patatas"},
    "tomates": {"tomate", "tomates"},
    "lechugas": {"lechuga", "lechugas"},
    "zapallos": {"zapallo", "calabaza", "zapallos", "calabazas"},
    "pimientos": {"pimiento", "pimientos", "morron", "morrones"},
}
HUEVOS = {
    "huevo de gallina": {"huevo", "huevos"},
    "huevo de codorniz": {"codorniz"},
}
LECHES = {
    "leche bebible": {"leche", "uht", "entera", "descremada", "semidescremada"},
    "leche polvo": {"polvo"},
}
QUESOS = {
    "queso paraguay": {"paraguay"},
    "queso mozzarella": {"mozzarella"},
    "queso sandwich": {"sandwich", "sandwichero"},
    "queso rallado": {"rallado"},
}
PANIFICADOS = {
    "pan lactal": {"pan", "lactal", "lacteado"},
    "tortilla / chipas": {"tortilla", "chipa", "chipas"},
    "galletas": {"galleta", "galletitas"},
}
CEREALES = {
    "avena": {"avena"},
    "cereal desayuno": {"cereal", "corn", "flakes", "muesli"},
}

GROUP_DEFS: Dict[str, Dict[str, Set[str]]] = {
    "Frutas": FRUTAS,
    "Verduras": VERDURAS,
    "Huevos": HUEVOS,
    "Leches": LECHES,
    "Quesos": QUESOS,
    "Panificados": PANIFICADOS,
    "Cereales": CEREALES,
}
GROUP_TOKENS = {g: set().union(*subs.values()) for g, subs in GROUP_DEFS.items()}
KEYWORDS_SUPER = set().union(*GROUP_TOKENS.values())

def classify(name: str) -> Tuple[str | None, str | None]:
    toks = set(tokenize(name))
    for grupo, subs in GROUP_DEFS.items():
        if toks & GROUP_TOKENS[grupo]:
            for sub, words in subs.items():
                if toks & words:
                    return grupo, sub
            return grupo, ""
    return None, None

# ────────────────── 3. Blacklist no-alimentos ────────────────────────────
NON_FOOD = {
    "mg", "ml", "capsula", "capsulas", "comprimido", "comprimidos",
    "ovulo", "ovulos", "gel", "shampoo", "jabon", "jabones",
    "crema", "solar", "spf", "locion", "unguento", "spray", "ampolla"
}
def is_non_food(name: str) -> bool:
    return any(tok in NON_FOOD for tok in tokenize(name))

# ────────────────── 4. Precio ─────────────────────────────────────────────
def norm_price(val) -> float:
    if isinstance(val, (int, float)):
        return float(val)
    txt = re.sub(r"[^\d,\.]", "", str(val)).replace(".", "").replace(",", ".")
    try: return float(txt)
    except ValueError: return 0.0

def _first_price(node: BeautifulSoup, sels: List[str] | None = None) -> float:
    sels = sels or [
        "span.price ins span.amount",
        "span.price > span.amount",
        "span.woocommerce-Price-amount",
        "span.amount",
        "bdi",
        "[data-price]",
    ]
    for sel in sels:
        el = node.select_one(sel)
        if el:
            p = norm_price(el.get_text() or el.get("data-price", ""))
            if p > 0:
                return p
    return 0.0

# ────────────────── 5. HTTP session ───────────────────────────────────────
def _build_session() -> requests.Session:
    retry = Retry(total=3, backoff_factor=1.2,
                  status_forcelist=(429,500,502,503,504),
                  allowed_methods=("GET","HEAD"))
    s = requests.Session()
    s.headers["User-Agent"] = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124 Safari/537.36"
    )
    adapter = HTTPAdapter(max_retries=retry)
    s.mount("http://", adapter)
    s.mount("https://", adapter)
    return s

# ────────────────── 6. Base scraper ──────────────────────────────────────
class HtmlSiteScraper:
    def __init__(self, name: str, base: str):
        self.name = name
        self.base_url = base.rstrip("/")
        self.session = _build_session()

    def category_urls(self) -> List[str]: raise NotImplementedError
    def parse_category(self, url: str) -> List[Dict]: raise NotImplementedError

    def scrape(self) -> List[Dict]:
        urls = self.category_urls()
        if not urls: return []
        fecha = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        out: List[Dict] = []
        with ThreadPoolExecutor(MAX_WORKERS) as pool:
            futs = {pool.submit(self.parse_category, u): u for u in urls}
            for f in as_completed(futs):
                for r in f.result():
                    r["FechaConsulta"] = fecha
                    out.append(r)
        return out

    def save_csv(self, rows: List[Dict]) -> None:
        if not rows: return
        os.makedirs(OUT_DIR, exist_ok=True)
        fn = f"{self.name}_canasta_{FILE_TAG}_{datetime.now():%Y%m%d_%H%M%S}.csv"
        pd.DataFrame(rows).to_csv(os.path.join(OUT_DIR, fn), index=False)

# ────────────────── 7. Stock scraper (precios corregidos) ────────────────
class StockScraper(HtmlSiteScraper):
    def __init__(self):
        super().__init__("stock", "https://www.stock.com.py")

    def category_urls(self) -> List[str]:
        try:
            r = self.session.get(self.base_url, timeout=REQ_TIMEOUT)
            r.raise_for_status()
        except: return []
        soup = BeautifulSoup(r.text, "html.parser")
        return [
            urljoin(self.base_url, a["href"])
            for a in soup.select('a[href*="/category/"]')
            if any(tok in a["href"].lower() for tok in KEYWORDS_SUPER)
        ]

    def parse_category(self, url: str) -> List[Dict]:
        try:
            r = self.session.get(url, timeout=REQ_TIMEOUT)
            r.raise_for_status()
        except: return []
        soup = BeautifulSoup(r.content, "html.parser")
        rows: List[Dict] = []
        for p in soup.select("div.product-item"):
            nm = p.select_one("h2.product-title")
            if not nm: continue
            name = nm.get_text(" ", strip=True)
            if is_non_food(name): continue
            grupo, sub = classify(name)
            if not grupo: continue
            # <-- uso de selectores explícitos para precio
            precio = _first_price(p, ["span.price-label", "span.price"])
            rows.append({
                "Supermercado": "Stock",
                "CategoríaURL": url,
                "Producto": name.upper(),
                "Precio": precio,
                "Grupo": grupo,
                "Subgrupo": sub
            })
        return rows

# ────────────────── 8. Superseis scraper (precios corregidos) ────────────
class SuperseisScraper(HtmlSiteScraper):
    def __init__(self):
        super().__init__("superseis", "https://www.superseis.com.py")

    def category_urls(self) -> List[str]:
        try:
            r = self.session.get(self.base_url, timeout=REQ_TIMEOUT)
            r.raise_for_status()
        except: return []
        soup = BeautifulSoup(r.text, "html.parser")
        return [
            urljoin(self.base_url, a["href"])
            for a in soup.select('a.collapsed[href*="/category/"]')
            if any(tok in a["href"].lower() for tok in KEYWORDS_SUPER)
        ]

    def parse_category(self, url: str) -> List[Dict]:
        try:
            r = self.session.get(url, timeout=REQ_TIMEOUT)
            r.raise_for_status()
        except: return []
        soup = BeautifulSoup(r.content, "html.parser")
        rows: List[Dict] = []
        for a in soup.select("a.product-title-link"):
            name = a.get_text(" ", strip=True)
            if is_non_food(name): continue
            grupo, sub = classify(name)
            if not grupo: continue
            parent = a.find_parent("div", class_="product-item") or a
            # <-- uso de selectores explícitos para precio
            precio = _first_price(parent, ["span.price-label", "span.price"])
            rows.append({
                "Supermercado": "Superseis",
                "CategoríaURL": url,
                "Producto": name.upper(),
                "Precio": precio,
                "Grupo": grupo,
                "Subgrupo": sub
            })
        return rows

class SalemmaScraper(HtmlSiteScraper):
    def __init__(self):
        super().__init__("salemma", "https://www.salemmaonline.com.py")

    def category_urls(self) -> List[str]:
        try:
            r = self.session.get(self.base_url, timeout=REQ_TIMEOUT)
            r.raise_for_status()
        except Exception:
            return []
        soup = BeautifulSoup(r.text, "html.parser")
        return [
            urljoin(self.base_url, a["href"])
            for a in soup.find_all("a", href=True)
            if any(tok in a["href"].lower() for tok in KEYWORDS_SUPER)
        ]

    def parse_category(self, url: str) -> List[Dict]:
        try:
            r = self.session.get(url, timeout=REQ_TIMEOUT)
            r.raise_for_status()
        except Exception:
            return []
        soup = BeautifulSoup(r.content, "html.parser")
        rows: List[Dict] = []
        for f in soup.select("form.productsListForm"):
            nombre = f.find("input", {"name": "name"}).get("value", "")
            if is_non_food(nombre):
                continue
            grupo, sub = classify(nombre)
            if not grupo:
                continue
            precio = norm_price(f.find("input", {"name": "price"}).get("value", ""))
            rows.append({
                "Supermercado": "Salemma",
                "CategoríaURL": url,
                "Producto": nombre.upper(),
                "Precio": precio,
                "Grupo": grupo,
                "Subgrupo": sub
            })
        return rows

class AreteScraper(HtmlSiteScraper):
    def __init__(self):
        super().__init__("arete", "https://www.arete.com.py")

    def category_urls(self) -> List[str]:
        try:
            r = self.session.get(self.base_url, timeout=REQ_TIMEOUT)
            r.raise_for_status()
        except Exception:
            return []
        soup = BeautifulSoup(r.text, "html.parser")
        urls: Set[str] = set()
        for sel in ("#departments-menu", "#menu-departments-menu-1"):
            for a in soup.select(f'{sel} a[href^="catalogo/"]'):
                href = a["href"].split("?")[0].lower()
                if any(tok in href for tok in KEYWORDS_SUPER):
                    urls.add(urljoin(self.base_url + "/", href))
        return list(urls)

    def parse_category(self, url: str) -> List[Dict]:
        try:
            r = self.session.get(url, timeout=REQ_TIMEOUT)
            r.raise_for_status()
        except Exception:
            return []
        soup = BeautifulSoup(r.content, "html.parser")
        rows: List[Dict] = []
        for p in soup.select("div.product"):
            nm = p.select_one("h2.ecommercepro-loop-product__title")
            if not nm:
                continue
            nombre = nm.get_text(" ", strip=True)
            if is_non_food(nombre):
                continue
            grupo, sub = classify(nombre)
            if not grupo:
                continue
            precio = _first_price(p)
            rows.append({
                "Supermercado": "Arete",
                "CategoríaURL": url,
                "Producto": nombre.upper(),
                "Precio": precio,
                "Grupo": grupo,
                "Subgrupo": sub
            })
        return rows

class JardinesScraper(AreteScraper):
    def __init__(self):
        super().__init__()
        self.name = "losjardines"
        self.base_url = "https://losjardinesonline.com.py"

# ────────────────── 8. Biggie (API) ──────────────────────────────────────
class BiggieScraper:
    name, API, TAKE = "biggie", "https://api.app.biggie.com.py/api/articles", 100
    GROUPS = ["frutas-y-verduras", "huevos", "lacteos", "frutas", "verduras"]
    session = _build_session()

    def fetch_group(self, grp: str) -> List[Dict]:
        rows, skip = [], 0
        while True:
            js = self.session.get(self.API, params={
                "take": self.TAKE,
                "skip": skip,
                "classificationName": grp
            }, timeout=REQ_TIMEOUT).json()
            for it in js.get("items", []):
                nombre = it.get("name", "")
                if is_non_food(nombre):
                    continue
                grupo, sub = classify(nombre)
                if not grupo:
                    continue
                rows.append({
                    "Supermercado": "Biggie",
                    "CategoríaURL": grp,
                    "Producto": nombre.upper(),
                    "Precio": norm_price(it.get("price", 0)),
                    "Grupo": grupo,
                    "Subgrupo": sub
                })
            skip += self.TAKE
            if skip >= js.get("count", 0):
                break
        return rows

    def scrape(self) -> List[Dict]:
        fecha = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        out: List[Dict] = []
        for g in self.GROUPS:
            for r in self.fetch_group(g):
                r["FechaConsulta"] = fecha
                out.append(r)
        return out

    def save_csv(self, rows: List[Dict]) -> None:
        if rows:
            os.makedirs(OUT_DIR, exist_ok=True)
            fn = f"{self.name}_canasta_{FILE_TAG}_{datetime.now():%Y%m%d_%H%M%S}.csv"
            pd.DataFrame(rows).to_csv(os.path.join(OUT_DIR, fn), index=False)

# ────────────────── 9. Gestor de scrapers ─────────────────────────────────
SCRAPERS: Dict[str, Callable[[], object]] = {
    "stock": StockScraper,
    "superseis": SuperseisScraper,
    "salemma": SalemmaScraper,
    "arete": AreteScraper,
    "losjardines": JardinesScraper,
    "biggie": BiggieScraper,
}

def _parse_args(argv: List[str] | None = None) -> List[str]:
    if argv is None:
        return list(SCRAPERS)
    if any(a in ("-h", "--help") for a in argv):
        print("Uso: python scraper.py [sitio1 sitio2 …]"); sys.exit(0)
    sel = [a for a in argv if a in SCRAPERS]
    return sel or list(SCRAPERS)

# ────────────────── 10. Google Sheets helpers ─────────────────────────────
def _open_sheet():
    import gspread
    from gspread_dataframe import get_as_dataframe, set_with_dataframe
    from google.oauth2.service_account import Credentials

    scopes = ["https://www.googleapis.com/auth/drive",
              "https://www.googleapis.com/auth/spreadsheets"]
    creds = Credentials.from_service_account_file(CREDS_JSON, scopes=scopes)
    sh = gspread.authorize(creds).open_by_url(SPREADSHEET_URL)
    try:
        ws = sh.worksheet(WORKSHEET_NAME)
    except gspread.exceptions.WorksheetNotFound:
        ws = sh.add_worksheet(title=WORKSHEET_NAME, rows="10000", cols="40")
    df = (get_as_dataframe(ws, dtype=str, evaluate_formulas=False)
          .dropna(how="all"))
    return ws, df

def _write_sheet(ws, df: pd.DataFrame) -> None:
    from gspread_dataframe import set_with_dataframe
    ws.clear()
    set_with_dataframe(ws, df, include_index=False)

# ────────────────── 11. Orquestador principal ────────────────────────────
def main(argv: List[str] | None = None) -> int:
    objetivos = _parse_args(argv if argv is not None else sys.argv[1:])
    registros: List[Dict] = []
    for k in objetivos:
        scraper = SCRAPERS[k]()
        filas = scraper.scrape()
        scraper.save_csv(filas)
        registros.extend(filas)
        print(f"• {k:<12}: {len(filas):>6} filas")
    if not registros:
        print("Sin datos nuevos."); return 0

    csv_files = glob.glob(PATTERN_DAILY)
    if not csv_files:
        print("⚠️ No se encontraron archivos CSV para concatenar."); return 0

    df_all = pd.concat([pd.read_csv(f, dtype=str) for f in csv_files],
                       ignore_index=True, sort=False)
    df_all["Precio"] = pd.to_numeric(df_all["Precio"], errors="coerce")

    ws, df_prev = _open_sheet()
    base = pd.concat([df_prev, df_all], ignore_index=True, sort=False)
    base["FechaConsulta"] = pd.to_datetime(base["FechaConsulta"], errors="coerce")
    base.sort_values("FechaConsulta", inplace=True)
    base["FechaConsulta"] = base["FechaConsulta"].dt.strftime("%Y-%m-%d")
    base.drop_duplicates(KEY_COLS + ["Subgrupo"], keep="first", inplace=True)

    if "ID" in base.columns:
        base.drop(columns=["ID"], inplace=True)
    base.insert(0, "ID", range(1, len(base) + 1))

    _write_sheet(ws, base)
    print(f"✅ Hoja actualizada: {len(base)} filas totales")
    return 0

if __name__ == "__main__":
    sys.exit(main())


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).




• stock       :    560 filas




• superseis   :    460 filas




• salemma     :   1276 filas
• arete       :     88 filas
• losjardines :     40 filas
• biggie      :   2252 filas


  base = pd.concat([df_prev, df_all], ignore_index=True, sort=False)


✅ Hoja actualizada: 3466 filas totales


SystemExit: 0

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
