<a href="https://colab.research.google.com/github/discorallado/ESP-Puerta/blob/main/Untitled6.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Notebook: Scraper inteligente para **vitelenergia.com** (Google Colab)
Este notebook extrae el catálogo completo de https://vitelenergia.com/ y guarda:
- CSV: `vitelenergia_catalogo.csv`
- JSON: `vitelenergia_catalogo.json`
- SQLite: `vitelenergia.db` (tabla `productos`)

Columnas: `sku, nombre, categoria, precio_sin_dcto, precio_con_dcto, url, url_ficha_tecnica`.

Características:
- Guardado incremental (`progress.json`) y reanudable.
- Prevención de duplicados por SKU y por URL.
- Modo rápido (`FAST_MODE`) para listar sin visitar cada producto.
- IA local ligera basada en heurísticas para extraer precios, SKU y ficha técnica.

In [1]:
# Celda 1 - Instalar dependencias
!pip install -q requests beautifulsoup4 lxml pandas tenacity tqdm
print("Dependencias instaladas.")

Dependencias instaladas.


In [2]:
# Celda 2 - Imports y configuración general
import os
import json
import time
import random
import re
import sqlite3
from typing import List, Tuple, Dict, Optional
from urllib.parse import urljoin, urlparse

import requests
from bs4 import BeautifulSoup
import pandas as pd
from tenacity import retry, stop_after_attempt, wait_random_exponential
from tqdm.auto import tqdm

# CONFIG
BASE_URL = "https://vitelenergia.com"
OUTPUT_CSV = "vitelenergia_catalogo.csv"
OUTPUT_JSON = "vitelenergia_catalogo.json"
OUTPUT_DB = "vitelenergia.db"
PROGRESS_FILE = "progress.json"

# Control de ejecución
FAST_MODE = False         # True = solo listado rápido (no visita cada producto)
SAVE_EVERY_N = 20        # guardar progreso cada N productos
REQUEST_DELAY_MIN = 0.6
REQUEST_DELAY_MAX = 1.4
MAX_PAGES_PER_CATEGORY = 200

HEADERS_POOL = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/120.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_6) AppleWebKit/605.1.15 "
    "(KHTML, like Gecko) Version/16.6 Safari/605.1.15",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/118.0 Safari/537.36"
]

random.seed(42)

In [3]:
# Celda 3 - Utilidades de red y reintentos
@retry(stop=stop_after_attempt(3), wait=wait_random_exponential(min=1, max=4))
def fetch_html(url: str, timeout: int = 12) -> str:
    headers = {"User-Agent": random.choice(HEADERS_POOL)}
    resp = requests.get(url, headers=headers, timeout=timeout)
    resp.raise_for_status()
    # Respectful delay
    time.sleep(random.uniform(REQUEST_DELAY_MIN, REQUEST_DELAY_MAX))
    return resp.text

def clean_text(s: Optional[str]) -> str:
    if not s:
        return ""
    return re.sub(r'\s+', ' ', s).strip()

In [4]:
# Celda 4 - IA ligera (heurísticas) para precios, SKU y ficha técnica
def parse_price_candidates(text: str) -> List[float]:
    """
    Extrae candidatos numéricos desde texto y normaliza a float.
    """
    if not text:
        return []
    text = text.replace('\xa0',' ')
    # extraer patrones numéricos con posibles separadores
    matches = re.findall(r'(?:(?:\$|\b)(?:\s?))?([\d]{1,3}(?:[.,\s]\d{3})*(?:[.,]\d{1,2})?)', text)
    nums = []
    for m in matches:
        s = m.strip().replace(' ', '').replace('\u200b','')
        if s.count('.') and s.count(','):
            if s.find('.') < s.find(','):
                s = s.replace('.','').replace(',','.')
            else:
                s = s.replace(',','')
        elif s.count(',') and not s.count('.'):
            parts = s.split(',')
            if len(parts[-1]) in (1,2):
                s = ''.join(parts[:-1]).replace('.','') + '.' + parts[-1]
            else:
                s = ''.join(parts)
        else:
            s = s.replace(',','').replace('.','')
        s = re.sub(r'[^0-9\.]', '', s)
        if not s:
            continue
        try:
            val = float(s)
            nums.append(val)
        except:
            continue
    nums_unique = sorted(list(set(nums)))
    return nums_unique

def select_price_pair(candidates: List[float]) -> Tuple[Optional[float], Optional[float]]:
    """
    Mayor -> precio sin descuento, menor -> precio con descuento
    """
    if not candidates:
        return None, None
    if len(candidates) == 1:
        return candidates[-1], None
    return candidates[-1], candidates[0]

def detect_sku_from_text(text: str) -> Optional[str]:
    """
    Detecta SKU a partir de patrones comunes.
    """
    if not text:
        return None
    patterns = [
        r'(?:sku|sku:|sku\.|cod|cod\.|codigo|código|ref|ref\.|referencia|ref:)\s*[:\-]?\s*([A-Za-z0-9\-_\/]+)',
        r'Código[:\s]*([A-Za-z0-9\-_\/]+)'
    ]
    for p in patterns:
        m = re.search(p, text, re.I)
        if m:
            return m.group(1).strip()
    return None

def find_datasheet_link(soup: BeautifulSoup, base_url: str) -> Optional[str]:
    """
    Busca enlaces a PDF o a textos que indiquen 'Ficha técnica' o 'Datasheet'.
    """
    for a in soup.find_all('a', href=True):
        href = a['href'].strip()
        text = a.get_text(" ", strip=True).lower()
        if '.pdf' in href.lower() or 'ficha' in text or 'datasheet' in text or 'descargar ficha' in text:
            return urljoin(base_url, href)
    # buscar secciones con clase 'download' 'ficha' etc.
    for sec in soup.find_all(class_=re.compile(r'(download|descarga|ficha|datasheet)', re.I)):
        a = sec.find('a', href=True)
        if a:
            return urljoin(base_url, a['href'])
    return None

In [5]:
# Celda 5 - Detección de categorías desde el HEADER y construcción de lista
def extract_header_categories(home_html: str) -> List[Tuple[str, str]]:
    """
    Extrae (nombre, url) de categorías desde el header/nav del home.
    """
    soup = BeautifulSoup(home_html, "lxml")
    categories = []
    navs = soup.find_all('nav') or []
    candidates = navs if navs else ([soup.find('header')] if soup.find('header') else [])
    seen = set()
    for nav in candidates:
        if not nav:
            continue
        for a in nav.find_all('a', href=True):
            name = clean_text(a.get_text())
            href = a['href']
            if not name:
                continue
            if href.startswith('#') or 'javascript:' in href.lower():
                continue
            full = urljoin(BASE_URL, href)
            if full.rstrip('/') == BASE_URL.rstrip('/'):
                continue
            skip = ['contacto','blog','login','carrito','cart','faq','nosotros','empresa','servicios']
            if any(k in name.lower() for k in skip) or any(k in full.lower() for k in skip):
                continue
            key = (name.lower(), full)
            if key in seen:
                continue
            seen.add(key)
            categories.append((name, full))
    return categories

def build_category_tree(categories: List[Tuple[str,str]]) -> List[Tuple[str,str]]:
    """
    Por ahora retorna lista plana; preparada para árbol.
    """
    return categories

In [6]:
# Celda 6 - Heurísticas para encontrar bloques de producto en página de categoría
def find_product_blocks(soup: BeautifulSoup) -> List[BeautifulSoup]:
    selectors = [
        'li.product', '.product', '.product-item', '.woocommerce-product', '.product-card',
        '.grid-item', '.product-loop-item', '.product-block'
    ]
    blocks = []
    for sel in selectors:
        found = soup.select(sel)
        if found:
            blocks.extend(found)
    if not blocks:
        for li in soup.find_all('li'):
            if li.find('a', href=True) and li.find('img'):
                blocks.append(li)
    uniq = []
    seen = set()
    for b in blocks:
        try:
            key = (b.name, tuple(b.get('class') or []), (b.find('a')['href'] if b.find('a') and b.find('a').get('href') else ''))
        except:
            key = str(b)[:200]
        if key not in seen:
            seen.add(key)
            uniq.append(b)
    return uniq

In [7]:
# Celda 7 - Scrape de página de producto (detallado)
def scrape_product_page(product_url: str) -> Dict[str, Optional[str]]:
    """
    Visita la página del producto y extrae:
    sku, nombre, precio_sin_dcto, precio_con_dcto, url_ficha_tecnica
    """
    result = {
        "sku": "",
        "nombre": "",
        "precio_sin_dcto": "",
        "precio_con_dcto": "",
        "url": product_url,
        "url_ficha_tecnica": ""
    }
    try:
        html = fetch_html(product_url)
    except Exception as e:
        return result

    soup = BeautifulSoup(html, "lxml")
    text_all = soup.get_text(" ", strip=True)

    # Nombre: h1 preferido
    name = ""
    h1 = soup.find('h1')
    if h1 and h1.get_text(strip=True):
        name = clean_text(h1.get_text())
    else:
        # buscar elementos con clase title/product
        title_el = soup.find(attrs={"class": re.compile(r'(product|title|nombre|name|titulo)', re.I)})
        if title_el and title_el.get_text(strip=True):
            name = clean_text(title_el.get_text())
        else:
            name = clean_text(soup.title.string) if soup.title and soup.title.string else ""

    result["nombre"] = name

    # SKU
    sku = None
    sku_el = soup.find(attrs={"class": re.compile(r'(sku|codigo|code|ref)', re.I)}) or soup.find(id=re.compile(r'(sku|codigo|code|ref)', re.I))
    if sku_el and sku_el.get_text(strip=True):
        sku = clean_text(sku_el.get_text())
    if not sku:
        sku = detect_sku_from_text(text_all)
    if sku:
        sku = re.sub(r'(?i)(sku|cod|codigo|ref|referencia)[:\s\-]*', '', sku).strip()
        result["sku"] = sku

    # Precios: buscar contenedores con clase 'price' 'precio' y fallback a todo el texto
    price_texts = []
    price_containers = soup.find_all(class_=re.compile(r'(price|precio|amount|valor)', re.I))
    for pc in price_containers:
        price_texts.append(pc.get_text(" ", strip=True))
    # fallback: toda la página
    price_texts.append(text_all)

    candidates = []
    for pt in price_texts:
        candidates.extend(parse_price_candidates(pt))
    candidates = sorted(list(set(candidates)))
    p_before, p_after = select_price_pair(candidates)
    if p_before is not None:
        result["precio_sin_dcto"] = str(int(p_before)) if float(p_before).is_integer() else str(p_before)
    if p_after is not None:
        result["precio_con_dcto"] = str(int(p_after)) if float(p_after).is_integer() else str(p_after)

    # Ficha técnica
    ficha = find_datasheet_link(soup, product_url)
    if ficha:
        result["url_ficha_tecnica"] = ficha

    return result

In [8]:
# Celda 8 - Guardado incremental (progress) y deduplicación (por SKU y URL)
def load_progress() -> Dict:
    if os.path.exists(PROGRESS_FILE):
        with open(PROGRESS_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)
    # estructura:
    return {"collected": [], "by_sku": {}, "by_url": {}, "last_category_index": 0, "processed_count": 0}

def save_progress(progress: Dict):
    with open(PROGRESS_FILE, 'w', encoding='utf-8') as f:
        json.dump(progress, f, ensure_ascii=False, indent=2)

def initialize_db(db_path: str = OUTPUT_DB):
    conn = sqlite3.connect(db_path)
    cur = conn.cursor()
    cur.execute("""
        CREATE TABLE IF NOT EXISTS productos (
            sku TEXT PRIMARY KEY,
            nombre TEXT,
            categoria TEXT,
            precio_sin_dcto REAL,
            precio_con_dcto REAL,
            url TEXT,
            url_ficha_tecnica TEXT
        )
    """)
    conn.commit()
    conn.close()

def upsert_product_db(product: Dict, db_path: str = OUTPUT_DB):
    conn = sqlite3.connect(db_path)
    cur = conn.cursor()
    # usar REPLACE INTO para simplificar (o INSERT OR REPLACE)
    cur.execute("""
        INSERT OR REPLACE INTO productos(sku, nombre, categoria, precio_sin_dcto, precio_con_dcto, url, url_ficha_tecnica)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    """, (
        product.get("sku") or product.get("url"),
        product.get("nombre"),
        product.get("categoria"),
        float(product.get("precio_sin_dcto")) if product.get("precio_sin_dcto") else None,
        float(product.get("precio_con_dcto")) if product.get("precio_con_dcto") else None,
        product.get("url"),
        product.get("url_ficha_tecnica")
    ))
    conn.commit()
    conn.close()

In [9]:
# Celda 9 - Scrape de categoría con modo FAST_MODE y control de paginación
def scrape_category(category_name: str, category_url: str, progress: Dict, start_id: int = 1) -> Tuple[List[Dict], Dict]:
    """
    Recorre una categoría (paginación), extrae enlaces y (opcional) visita cada producto.
    Actualiza progress dict y devuelve lista de nuevos productos encontrados.
    """
    print(f"Scrapeando categoría: {category_name} -> {category_url}")
    new_products = []
    to_visit_pages = [category_url]
    visited_pages = set()
    pages_crawled = 0

    while to_visit_pages and pages_crawled < MAX_PAGES_PER_CATEGORY:
        page_url = to_visit_pages.pop(0)
        if page_url in visited_pages:
            continue
        visited_pages.add(page_url)
        pages_crawled += 1
        try:
            html = fetch_html(page_url)
        except Exception as e:
            print(f"  [!] Error al cargar {page_url}: {e}")
            continue
        soup = BeautifulSoup(html, "lxml")

        blocks = find_product_blocks(soup)
        # fallback: enlaces con '/product' en href
        if not blocks:
            for a in soup.find_all('a', href=True):
                href = a['href']
                if '/product' in href or '/producto' in href or '/producto-' in href:
                    blocks.append(a)

        for b in blocks:
            # obtener URL y nombre del bloque
            a = b.find('a', href=True) if hasattr(b, 'find') else None
            product_url = urljoin(page_url, a['href']) if a and a.get('href') else (a if isinstance(a,str) else None)
            name_block = clean_text(a.get_text()) if a and a.get_text() else clean_text(b.get_text(" ", strip=True)[:120])

            if not product_url:
                continue
            # deduplicación por URL
            if product_url in progress.get("by_url", {}):
                # ya procesado o listado
                continue

            if FAST_MODE:
                # Modo rápido: solo tomar nombre, url y categoría
                prod = {
                    "sku": "",
                    "nombre": name_block,
                    "categoria": category_name,
                    "precio_sin_dcto": "",
                    "precio_con_dcto": "",
                    "url": product_url,
                    "url_ficha_tecnica": ""
                }
            else:
                # modo completo: visitar página del producto
                details = scrape_product_page(product_url)
                prod = {
                    "sku": details.get("sku") or "",
                    "nombre": details.get("nombre") or name_block,
                    "categoria": category_name,
                    "precio_sin_dcto": details.get("precio_sin_dcto") or "",
                    "precio_con_dcto": details.get("precio_con_dcto") or "",
                    "url": product_url,
                    "url_ficha_tecnica": details.get("url_ficha_tecnica") or ""
                }

            # deduplicación por SKU preferente, sino por URL
            key_sku = prod["sku"] if prod["sku"] else None
            if key_sku and key_sku in progress.get("by_sku", {}):
                # duplicado por SKU -> ignorar
                continue
            if prod["url"] in progress.get("by_url", {}):
                continue

            # registrar en progreso y persistir poco a poco
            idx = len(progress.get("collected", [])) + 1
            progress.setdefault("collected", []).append(prod)
            if prod["sku"]:
                progress.setdefault("by_sku", {})[prod["sku"]] = True
            progress.setdefault("by_url", {})[prod["url"]] = True
            progress["processed_count"] = progress.get("processed_count", 0) + 1

            # persistir en DB
            upsert_product_db(prod)

            new_products.append(prod)

            # Guardado incremental cada SAVE_EVERY_N
            if progress["processed_count"] % SAVE_EVERY_N == 0:
                save_progress(progress)
                # exportar parcial
                df_partial = pd.DataFrame(progress["collected"])
                df_partial.to_csv(OUTPUT_CSV, index=False, encoding='utf-8')
                with open(OUTPUT_JSON, 'w', encoding='utf-8') as f:
                    json.dump(progress["collected"], f, ensure_ascii=False, indent=2)
                print(f"  [*] Guardado incremental: {progress['processed_count']} productos.")

        # detectar enlaces de paginación y añadir
        pag_links = set()
        for pag in soup.find_all(class_=re.compile(r'(pagination|paginador|page-numbers|nav-pages)', re.I)):
            for a in pag.find_all('a', href=True):
                pag_links.add(urljoin(page_url, a['href']))
        for a in soup.find_all('a', href=True):
            href = a['href']
            if re.search(r'/page/\d+/?', href) or re.search(r'(\?|&)page=\d+', href):
                pag_links.add(urljoin(page_url, href))
        for pl in pag_links:
            if pl not in visited_pages:
                to_visit_pages.append(pl)

    if pages_crawled >= MAX_PAGES_PER_CATEGORY:
        print("  [!] Límite de páginas por categoría alcanzado; detener para evitar loop infinito.")

    return new_products, progress

In [1]:
# Celda 10 - Orquestador principal: detectar categorías, cargar progreso, iterar y finalizar
def export_all_formats(collected: List[Dict]):
    # CSV
    df = pd.DataFrame(collected, columns=["sku","nombre","categoria","precio_sin_dcto","precio_con_dcto","url","url_ficha_tecnica"])
    df.to_csv(OUTPUT_CSV, index=False, encoding='utf-8')
    # JSON
    with open(OUTPUT_JSON, 'w', encoding='utf-8') as f:
        json.dump(collected, f, ensure_ascii=False, indent=2)
    print(f"Exportados: {OUTPUT_CSV}, {OUTPUT_JSON}")
    # SQLite ya fue actualizado incrementalmente (tabla productos)

def main_run(limit_categories: Optional[int] = None, resume: bool = True):
    # preparar DB y progreso
    initialize_db()
    progress = load_progress() if resume else {"collected": [], "by_sku": {}, "by_url": {}, "last_category_index": 0, "processed_count": 0}
    # cargar home y detectar categorías
    try:
        home_html = fetch_html(BASE_URL)
    except Exception as e:
        print(f"[ERROR] No se pudo acceder a {BASE_URL}: {e}")
        return []
    cats = extract_header_categories(home_html)
    if not cats:
        # heurística alternativa
        soup = BeautifulSoup(home_html, "lxml")
        alt = []
        for a in soup.find_all('a', href=True):
            href = a['href']
            if 'categoria' in href or 'product-category' in href or '/product' in href:
                alt.append((clean_text(a.get_text()) or href, urljoin(BASE_URL, href)))
        cats = alt[:50]
    if limit_categories:
        cats = cats[:limit_categories]
    print(f"Categorías detectadas: {len(cats)}")
    start_index = progress.get("last_category_index", 0)
    total_new = 0
    for i, (name, url) in enumerate(cats):
        if i < start_index:
            continue
        new_prods, progress = scrape_category(name, url, progress, start_id=len(progress.get("collected", []))+1)
        total_new += len(new_prods)
        progress["last_category_index"] = i + 1
        # guardar progreso inmediato tras cada categoría
        save_progress(progress)
    # final export
    export_all_formats(progress.get("collected", []))
    print(f"Scraping completado. Nuevos productos: {total_new}. Total acumulado: {len(progress.get('collected', []))}")
    return progress.get("collected", [])

# Ejecutar
collected = main_run(limit_categories=None, resume=True)

NameError: name 'List' is not defined

In [None]:
# Celda 11 - Mostrar resultados y primer vistazo
import math
print(f"Total productos recolectados: {len(collected)}")
if collected:
    df_final = pd.DataFrame(collected, columns=["sku","nombre","categoria","precio_sin_dcto","precio_con_dcto","url","url_ficha_tecnica"])
    display(df_final.head(50))
else:
    print("No se recogieron productos.")

# Notas finales y recomendaciones
- Si la web carga productos vía JavaScript y ves pocos o ningún bloque en el HTML, dímelo y adaptamos para:
  - a) investigar llamadas XHR (preferible) y replicarlas, o
  - b) usar Selenium/Playwright (más pesado en Colab).
- Puedes cambiar `FAST_MODE = True` al inicio para ejecutar rápidamente la lista sin visitar cada producto.
- `progress.json` permite reanudar: no se volverá a procesar URLs/SKUs ya guardadas.
- Si quieres, puedo:
  - Añadir logging más detallado a archivo.
  - Añadir descarga automática de fichas técnicas (PDF).
  - Añadir multihilo seguro (con cuidado de no sobrecargar el servidor).