In [None]:
import logging
import re
import time
from datetime import datetime
from typing import Dict, List, Optional, Set, Tuple
from urllib.parse import quote_plus

from bs4 import BeautifulSoup

# Selenium imports
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import (
    TimeoutException,
    NoSuchElementException,
    StaleElementReferenceException,
    WebDriverException,
)

# Analysis and display inside the notebook
import pandas as pd
from IPython.display import display

# Logging configuration: INFO level, each record as "timestamp - level - message"
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- 1. BASE SCRAPER (Driver Handling) ---

class BaseScraper:
    """Base class that owns the Chrome WebDriver: creates it and shuts it down.

    Instances can be used as context managers; leaving the ``with`` block
    calls :meth:`close`.
    """

    def __init__(self, headless: bool = True, driver_path: Optional[str] = None):
        """Store the settings and start the Chrome WebDriver immediately.

        Args:
            headless: When true, Chrome is started with ``--headless``.
            driver_path: Optional path to the chromedriver executable; when
                omitted, Selenium is left to locate the driver itself.

        Raises:
            Exception: Whatever ``webdriver.Chrome`` raises when the browser
                cannot be started (re-raised after being logged).
        """
        self.driver: Optional[webdriver.Chrome] = None
        self.headless = headless
        self.driver_path = driver_path
        self._initialize_driver()

    def _initialize_driver(self):
        """Build the Chrome options and launch the WebDriver."""
        chrome_options = Options()
        # Container-friendly flags, a fixed window size and a desktop user agent.
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")
        chrome_options.add_argument("--disable-gpu")
        chrome_options.add_argument("--window-size=1920,1080")
        chrome_options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36")

        if self.headless:
            chrome_options.add_argument("--headless")

        try:
            if self.driver_path:
                service = ChromeService(executable_path=self.driver_path)
                self.driver = webdriver.Chrome(service=service, options=chrome_options)
            else:
                # No explicit path: rely on the driver being discoverable.
                self.driver = webdriver.Chrome(options=chrome_options)
            logging.info("WebDriver inicializado correctamente.")
        except Exception as e:
            logging.error(
                "Error al inicializar WebDriver: %s. "
                "Asegúrate de que ChromeDriver está instalado y en el PATH.",
                e,
            )
            raise

    def close(self):
        """Quit the WebDriver if one is open; safe to call more than once.

        A failing ``quit()`` is logged instead of raised, and the driver
        reference is always cleared, so cleanup in ``__exit__`` never masks
        the exception that is already propagating and never leaves a dead
        driver attached to the instance.
        """
        if not self.driver:
            return
        try:
            self.driver.quit()
            logging.info("WebDriver cerrado.")
        except Exception as e:  # cleanup boundary: report, never raise
            logging.warning("Error al cerrar WebDriver: %s", e)
        finally:
            self.driver = None

    def __enter__(self):
        """Return the scraper itself as the context value."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the driver on exit; exceptions from the body are not suppressed."""
        self.close()

# --- 2. UTILIDADES COMPARTIDAS ---

# Lower-case substrings that identify an anti-bot / captcha page.
BLOCK_PATTERNS = (
    "punish", "unusual traffic", "error:gvs", "robot check",
    "are you a robot", "are you human", "please verify you are a human",
    "verify you are human", "security verification", "complete the captcha",
    "captcha verification", "please complete the captcha",
)

# Non-ASCII characters are written as \uXXXX escapes so the patterns survive a
# file that gets re-saved or decoded with the wrong encoding.

# Separator of a numeric range: hyphen, en dash (U+2013) or em dash (U+2014)
# between a digit and the next number. The upper bound may carry its own
# currency prefix ("$1.50 - $2.00").
_RANGE_SPLIT_PATTERN = re.compile(
    r"(?<=\d)\s*[-\u2013\u2014]\s*(?=(?:US\$|S/|[$\u20ac\u00a3\u00a5])?\s*\d)"
)
# Currency markers: US$, S/, $, euro (U+20AC), pound (U+00A3), yen (U+00A5).
_currency_re = re.compile(r"(US\$|S/|[$\u20ac\u00a3\u00a5])")
# "<score>/5 (<count>)" or "<score>/5.0 (<count>)".
_rating_re = re.compile(r"([\d.]+)\s*/\s*5(?:\.0)?\s*\((\d+)\)")
# "<n> a\u00f1os", "<n> year" or "<n> years", case-insensitive.
_years_re = re.compile(r"(\d+)\s*(?:a\u00f1os|years?)", re.I)
# "<n>%" with optional whitespace before the sign.
_percent_re = re.compile(r"(\d+)\s*%")

def limpiar_precio(texto: Optional[str]) -> Optional[float]:
    """Convert a price string to a float, or return None when it holds no number.

    When the text is a price range, only the first bound is parsed.
    """
    def _normalizar(texto_unitario: str) -> Optional[float]:
        # Keep only digits and the two candidate separators.
        crudo = re.sub(r"[^0-9.,]", "", texto_unitario)
        if not crudo:
            return None

        def _cola_decimal(sep: str) -> bool:
            # A lone separator followed by 1-2 characters is read as the decimal mark.
            return len(crudo.rpartition(sep)[-1]) in (1, 2)

        trae_punto = "." in crudo
        trae_coma = "," in crudo
        separador: Optional[str] = None
        if trae_punto and trae_coma:
            # Both present: whichever comes last is the decimal mark.
            separador = "." if crudo.rfind(".") > crudo.rfind(",") else ","
        elif trae_punto and _cola_decimal("."):
            separador = "."
        elif trae_coma and _cola_decimal(","):
            separador = ","

        if separador is None:
            # No decimal mark: every separator is a thousands grouping.
            numero = re.sub(r"[^0-9]", "", crudo)
            if not numero:
                return None
        else:
            entero, decimales = crudo.rsplit(separador, 1)
            entero = re.sub(r"[^0-9]", "", entero)
            decimales = re.sub(r"[^0-9]", "", decimales)
            numero = f"{entero}.{decimales or '0'}"

        try:
            return float(numero)
        except ValueError:
            return None

    if not texto:
        return None

    candidato = texto.strip()
    if _RANGE_SPLIT_PATTERN.search(candidato):
        # Price range: keep the first non-empty bound.
        tramos = (tramo.strip() for tramo in _RANGE_SPLIT_PATTERN.split(candidato))
        candidato = next((tramo for tramo in tramos if tramo), candidato)

    return _normalizar(candidato)

def limpiar_cantidad(texto: Optional[str]) -> int:
    """Convert a quantity string (e.g. MOQ, units sold) to an int.

    A ``k`` suffix attached to a number ("1.5k", "10 k") multiplies the value
    by 1000. Returns 0 for None, blank text or text without a number.
    """
    if texto is None:
        return 0
    t = texto.strip().lower().replace("+", "")
    if not t:
        return 0
    # The "k" must follow a digit: a bare ``k\b`` would also match words that
    # merely end in "k" ("1 pack", "10 in stock") and inflate them 1000x.
    t, sufijos = re.subn(r"(?<=\d)\s*k\b", "", t)
    mult = 1000 if sufijos else 1
    n = limpiar_precio(t) or 0.0
    return int(round(n * mult))

def detectar_moneda(texto: str) -> Optional[str]:
    """Return the first currency marker found in the text, or None if there is none."""
    if texto:
        hallado = _currency_re.search(texto)
        if hallado:
            return hallado.group(1)
    return None

def parse_rating(texto: str) -> Tuple[Optional[float], Optional[int]]:
    """Parse a rating string into ``(score, count)``.

    Returns ``(None, None)`` when the text is empty, does not match the
    rating pattern, or the matched score is not a valid number (e.g. "..").
    """
    if not texto:
        return (None, None)
    m = _rating_re.search(texto)
    if not m:
        return (None, None)
    try:
        return float(m.group(1)), int(m.group(2))
    except ValueError:
        # The score group accepts any run of digits and dots.
        return (None, None)

def parse_moq(texto: str) -> Tuple[Optional[int], Optional[str]]:
    """Parse a minimum-order-quantity string into ``(value, stripped_text)``.

    ``value`` is the first number found in the text, or None when there is
    none. Returns ``(None, None)`` for empty input.
    """
    if not texto:
        return (None, None)
    m = re.search(r"(\d[\d.,]*)", texto)
    if not m:
        return (None, texto.strip())
    try:
        val = limpiar_cantidad(m.group(1))
    except (ValueError, OverflowError):
        val = None
    return val, texto.strip()

def parse_repeat_rate(texto: str) -> Optional[int]:
    """Return the first percentage found in the text as an int, or None."""
    if not texto:
        return None
    m = _percent_re.search(texto)
    if not m:
        return None
    try:
        return int(m.group(1))
    except ValueError:
        return None

# --- 3. ALIBABA SCRAPER ---

class AlibabaScraper(BaseScraper):
    """Alibaba search-results scraper (searchx/fy26 layout).

    Loads the result pages for a query, scrolls to trigger dynamic loading and
    extracts one dict per product card. Every selector list below is tried in
    order and the first selector that matches wins.
    """

    # Product-card containers
    CARD_CONTAINERS: List[str] = [
        "div.fy26-product-card-content",
        "div.searchx-product-card",
        "div.card-info.gallery-card-layout-info",
    ]

    # Selectors evaluated inside a card
    A_CARD: List[str] = ["h2.searchx-product-e-title a", "a.searchx-product-link-wrapper", "a"]
    TITLE: List[str] = ["h2.searchx-product-e-title span", "h2.searchx-product-e-title a", "h2.search-card-e-title a", "h1, h2, h3"]
    PRICE: List[str] = ["div.searchx-product-price-price-main", "div.searchx-product-price", "div.search-card-e-price-main"]
    PRICE_ORIGINAL: List[str] = ["del", "s", ".price-origin"]
    DISCOUNT: List[str] = [".discount", ".sale-tag", "[data-discount]"]
    MOQ: List[str] = ["div.searchx-moq", "div.price-area-center"]
    SUPPLIER_NAME: List[str] = ["a.searchx-product-e-company", "a.search-card-e-company"]
    SUPPLIER_YEAR_COUNTRY: List[str] = ["a.searchx-product-e-supplier__year"]
    VERIFIED_BADGE: List[str] = [".verified-supplier-icon__wrapper", "img.searchx-verified-icon"]
    RATING: List[str] = ["span.searchx-product-e-review"]
    SELLING_POINTS: List[str] = [".searchx-selling-point-text"]

    # ----------------- private helpers -----------------

    def _accept_banners(self, timeout: int = 5):
        """Click the first visible cookie/consent button, if any (best effort).

        All candidate locators are polled together, so a page without a banner
        costs at most ``timeout`` seconds in total, and the method stops after
        the first successful click. Failures are never raised.
        """
        if not self.driver:
            return
        candidates = [
            (By.XPATH, "//button[contains(., 'Aceptar') or contains(., 'Accept')]"),
            (By.XPATH, "//button[contains(., 'Allow all')]"),
            (By.CSS_SELECTOR, "[role='button'][aria-label*='accept' i]"),
        ]

        def _clickable_banner(driver):
            # Wait condition: first displayed and enabled element among all candidates.
            for by, sel in candidates:
                try:
                    for el in driver.find_elements(by, sel):
                        if el.is_displayed() and el.is_enabled():
                            return el
                except WebDriverException:
                    continue
            return False

        try:
            btn = WebDriverWait(self.driver, timeout).until(_clickable_banner)
            btn.click()
            time.sleep(0.3)
            logging.info("Banner de cookies aceptado.")
        except TimeoutException:
            pass  # no banner showed up within the timeout
        except WebDriverException as e:
            logging.debug(f"No se pudo aceptar el banner: {e}")

    def _human_scroll_until_growth(self, max_scrolls: int = 16, pause: float = 1.0):
        """Scroll to the bottom repeatedly until the page stops growing.

        Args:
            max_scrolls: Upper bound on scroll rounds.
            pause: Seconds to sleep after each scroll.
        """
        if not self.driver:
            return
        height_js = "return document.body.scrollHeight"
        last_height = self.driver.execute_script(height_js)
        for _ in range(max_scrolls):
            if not self.driver:
                break

            self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(pause)
            new_height = self.driver.execute_script(height_js)

            if new_height <= last_height:
                # No growth: nudge once more before giving up.
                self.driver.execute_script("window.scrollBy(0, 700);")
                time.sleep(pause)
                new_height = self.driver.execute_script(height_js)
                if new_height <= last_height:
                    # Still no growth: end of results or a stalled page.
                    break

            last_height = new_height

    def _first_match(self, root, selectors: List[str]):
        """Return the first element under ``root`` matching any selector, else None."""
        if not self.driver:
            return None
        for css in selectors:
            try:
                el = root.find_element(By.CSS_SELECTOR, css)
                if el:
                    return el
            except Exception:
                # Missing or stale element: try the next selector.
                continue
        return None

    def _find_all_any(self, selectors: List[str], timeout: int = 10) -> List:
        """Return all elements for the first selector that yields any.

        Each selector is waited on for up to ``timeout`` seconds, in order.
        Returns an empty list when none of them matches.
        """
        if not self.driver:
            return []
        for css in selectors:
            try:
                WebDriverWait(self.driver, timeout).until(
                    EC.presence_of_all_elements_located((By.CSS_SELECTOR, css))
                )
                els = self.driver.find_elements(By.CSS_SELECTOR, css)
                if els:
                    return els
            except TimeoutException:
                continue
        return []

    @staticmethod
    def _resolve_text(node) -> Optional[str]:
        """Return the stripped text of a Selenium or BeautifulSoup node, or None."""
        if node is None:
            return None
        # Selenium element: prefer innerText, fall back to .text.
        get_attribute = getattr(node, "get_attribute", None)
        if callable(get_attribute):
            inner = get_attribute("innerText")
            if inner:
                return inner.strip()
            return (getattr(node, "text", "") or "").strip() or None
        # Otherwise treat it as a BeautifulSoup node.
        return node.get_text(" ", strip=True) or None

    @staticmethod
    def _abs_link(href: str) -> str:
        """Turn protocol-relative and site-relative hrefs into absolute Alibaba URLs."""
        if not href:
            return ""
        if href.startswith("//"):
            return "https:" + href
        if href.startswith("/"):
            return "https://www.alibaba.com" + href
        return href

    @staticmethod
    def _is_blocked(driver) -> bool:
        """Return True when the current URL or page text contains a BLOCK_PATTERNS entry."""
        url = (getattr(driver, "current_url", "") or "").lower()
        if any(p in url for p in BLOCK_PATTERNS):
            return True
        html = getattr(driver, "page_source", "") or ""
        try:
            soup = BeautifulSoup(html, "html.parser")
            soup_text = soup.get_text(separator=" ", strip=True).lower()
        except Exception:
            # Parsing failed: fall back to scanning the raw HTML.
            soup_text = html.lower()
        return any(p in soup_text for p in BLOCK_PATTERNS)

    def _blocked_now(self) -> bool:
        """Like ``_is_blocked`` for the current page, but a driver failure counts as 'not blocked'."""
        try:
            return self._is_blocked(self.driver)
        except WebDriverException:
            return False

    # ----------------- card extraction (Selenium) -----------------

    def _extract_supplier_meta(self, card):
        """Return ``(years, country)`` read from the supplier year/country element."""
        node = self._first_match(card, self.SUPPLIER_YEAR_COUNTRY)
        if not node:
            return None, None

        anios, pais = None, None
        text = (self._resolve_text(node) or "").strip()
        m_years = _years_re.search(text)
        if m_years:
            try:
                anios = int(m_years.group(1))
            except ValueError:
                pass

        # The country code, when present, is taken from the last <span>.
        spans = node.find_elements(By.TAG_NAME, "span")
        if spans:
            maybe_country = (spans[-1].text or "").strip()
            if maybe_country and len(maybe_country) <= 3:
                pais = maybe_country
        return anios, pais

    def _extract_selling_points(self, card):
        """Return ``(shipping_promise, repeat_rate)`` from the first selling-point element."""
        sp = self._first_match(card, self.SELLING_POINTS)
        if not sp:
            return None, None
        txt = (self._resolve_text(sp) or "").strip()
        envio_promesa = txt if "envío" in txt.lower() else None
        return envio_promesa, parse_repeat_rate(txt)

    def _extract_card(self, card) -> Optional[Dict]:
        """Extract every data field from a single Selenium card element.

        Returns:
            A dict with the product fields, or None when the card went
            missing or stale, or any other error occurred (which is logged).
        """
        if not self.driver:
            return None
        try:
            # Link (falls back to the card itself when no anchor matches)
            a = self._first_match(card, self.A_CARD) or card
            link = self._abs_link((a.get_attribute("href") or "").strip())

            # Title
            titulo = (
                self._resolve_text(self._first_match(card, self.TITLE))
                or (a.get_attribute("title") or a.text or "").strip()
                or "Sin título"
            )

            # Price and currency
            price_text = self._resolve_text(self._first_match(card, self.PRICE))
            precio = limpiar_precio(price_text)
            moneda = detectar_moneda(price_text) if price_text else None

            # Original price / discount
            precio_original = limpiar_precio(
                self._resolve_text(self._first_match(card, self.PRICE_ORIGINAL))
            )
            descuento = self._resolve_text(self._first_match(card, self.DISCOUNT))

            # MOQ; "ventas" is kept as a proxy that mirrors the MOQ value
            moq_val, moq_text = None, None
            moq_el = self._first_match(card, self.MOQ)
            if moq_el:
                moq_text = self._resolve_text(moq_el)
                moq_val, _ = parse_moq(moq_text or "")
            ventas = int(moq_val or 0)

            # Supplier
            proveedor = self._resolve_text(self._first_match(card, self.SUPPLIER_NAME))
            proveedor_anios, proveedor_pais = self._extract_supplier_meta(card)
            verified = bool(self._first_match(card, self.VERIFIED_BADGE))

            # Rating
            rating_score, rating_count = None, None
            rating_el = self._first_match(card, self.RATING)
            if rating_el:
                rating_score, rating_count = parse_rating(self._resolve_text(rating_el) or "")

            # Selling points
            envio_promesa, tasa_repeticion = self._extract_selling_points(card)

            return {
                "titulo": titulo,
                "precio": precio,
                "precio_original": precio_original,
                "descuento": descuento,
                "ventas": ventas,
                "link": link,
                "moneda": moneda,
                "proveedor": proveedor,
                "proveedor_anios": proveedor_anios,
                "proveedor_pais": proveedor_pais,
                "proveedor_verificado": verified,
                "rating_score": rating_score,
                "rating_count": rating_count,
                "moq": moq_val,
                "moq_texto": moq_text,
                "envio_promesa": envio_promesa,
                "tasa_repeticion": tasa_repeticion,
            }
        except (NoSuchElementException, StaleElementReferenceException):
            return None
        except Exception as e:
            logging.error(f"Error extrayendo card Alibaba: {e}")
            return None

    # ----------------- main flow -----------------

    def parse(self, producto: str, paginas: int = 4):
        """Search Alibaba for ``producto`` and extract the cards of up to ``paginas`` pages.

        Each page is attempted up to three times. Scraping stops early when a
        block/captcha page is detected or a page yields no valid product; a
        page that fails to load for other reasons is skipped.

        Returns:
            A list of product dicts, each extended with ``pagina``,
            ``plataforma`` and ``fecha_scraping``. Empty when the driver is
            not initialised.
        """
        if not self.driver:
            logging.error("El driver de Selenium no está inicializado. No se puede ejecutar el parseo.")
            return []

        resultados: List[Dict] = []
        fecha_scraping = datetime.now().strftime("%Y-%m-%d")
        q = quote_plus(producto)
        cards_css = ", ".join(self.CARD_CONTAINERS)

        for page in range(1, paginas + 1):
            url = f"https://www.alibaba.com/trade/search?SearchText={q}&page={page}"
            logging.info(f"Cargando Alibaba: Página {page} -> {url}")

            cargada = False
            bloqueada = False
            for intento in range(3):
                try:
                    self.driver.get(url)
                    # A captcha page never shows product cards, so it must be
                    # detected before (and after) waiting for them; otherwise
                    # every attempt just times out and the block goes unnoticed.
                    if self._is_blocked(self.driver):
                        bloqueada = True
                        break
                    self._accept_banners(5)
                    WebDriverWait(self.driver, 15).until(
                        EC.presence_of_all_elements_located((By.CSS_SELECTOR, cards_css))
                    )
                    self._human_scroll_until_growth(max_scrolls=16, pause=1.0)
                    cargada = True
                    break
                except (TimeoutException, WebDriverException) as e:
                    if self._blocked_now():
                        bloqueada = True
                        break
                    logging.warning(f"Reintento Alibaba p{page} ({intento + 1}): {e}")
                    time.sleep(2.0)

            if bloqueada or (cargada and self._is_blocked(self.driver)):
                logging.warning(f"Posible bloqueo/antibot detectado en Alibaba (página {page}). Deteniendo scraping.")
                break

            if not cargada:
                logging.error(f"Omitiendo página {page} por fallos de carga.")
                continue

            # Extraction with Selenium
            bloques = self._find_all_any(self.CARD_CONTAINERS, timeout=8)
            logging.info(f"Página {page}: {len(bloques)} productos (candidatos via Selenium)")

            count_page = 0
            for card in bloques:
                data = self._extract_card(card)
                if data:
                    data.update({
                        "pagina": page,
                        "plataforma": "Alibaba",
                        "fecha_scraping": fecha_scraping,
                    })
                    resultados.append(data)
                    count_page += 1

            logging.info(f"Página {page}: {count_page} productos válidos.")

            # No BeautifulSoup fallback here: extraction relies on the driver only.
            if count_page == 0:
                logging.warning(f"No se encontraron productos válidos en la página {page}. Se asume fin o bloqueo.")
                break

        return resultados


# --- 4. NOTEBOOK EXECUTION ---

def run_scraper_test(producto: str, paginas: int = 2):
    """Run the Alibaba scraper for one query and show a quick summary.

    Args:
        producto: Search text.
        paginas: Number of result pages to request.

    Returns:
        A DataFrame with the scraped products (listed columns first), or an
        empty DataFrame when nothing was found or the scraper failed. A
        failure while displaying or summarising does not discard the rows.
    """
    print(f"Buscando el producto: '{producto}' en {paginas} páginas de Alibaba...")

    # Set this to the chromedriver path when the driver is not on the PATH,
    # e.g. driver_path = "/ruta/a/tu/chromedriver"
    driver_path = None

    # The context manager guarantees the driver is closed, even on errors.
    try:
        with AlibabaScraper(headless=True, driver_path=driver_path) as scraper:
            resultados = scraper.parse(producto, paginas=paginas)
    except Exception as e:
        print(f"\n🛑 Error crítico durante la ejecución del scraper: {e}")
        return pd.DataFrame()

    if not resultados:
        print("\n❌ No se encontraron resultados o hubo un error al cargar la primera página.")
        return pd.DataFrame()

    df_resultados = pd.DataFrame(resultados)
    # Put the most useful columns first, then everything else.
    cols = ['titulo', 'precio', 'moneda', 'moq', 'proveedor', 'proveedor_pais',
            'rating_score', 'rating_count', 'link', 'pagina', 'plataforma']
    ordenadas = [c for c in cols if c in df_resultados.columns]
    ordenadas += [c for c in df_resultados.columns if c not in cols]
    df_resultados = df_resultados[ordenadas]

    print(f"\n✅ Extracción exitosa: {len(df_resultados)} productos encontrados.")

    # Reporting is best effort: the scraped rows are returned regardless.
    try:
        display(df_resultados.head(10))

        print("\n--- Análisis Rápido ---")
        avg_price = pd.to_numeric(df_resultados['precio'], errors='coerce').mean()
        # mode() of an all-None column is empty, so drop missing values first.
        monedas = df_resultados['moneda'].dropna()
        moneda_top = monedas.mode().iloc[0] if not monedas.empty else ''
        precio_txt = f"{avg_price:.2f}" if pd.notna(avg_price) else "N/D"
        print(f"Precio promedio (limpio): {moneda_top} {precio_txt}")

        top_countries = df_resultados['proveedor_pais'].value_counts().head(5)
        print("\nTop 5 Países de Proveedor:")
        print(top_countries)
    except Exception as e:
        print(f"\nNo se pudo generar el análisis rápido: {e}")

    return df_resultados

# --- TEST RUN ---
# Change the product and the number of pages as needed
df = run_scraper_test(producto="electronic component kit", paginas=2)

# Optional: save the results
# if not df.empty:
#     df.to_excel("alibaba_resultados.xlsx", index=False)
#     print("\nResultados guardados en alibaba_resultados.xlsx")

Buscando el producto: 'electronic component kit' en 2 páginas de Alibaba...


2025-12-09 10:37:43,140 - INFO - WebDriver inicializado correctamente.
2025-12-09 10:37:43,145 - INFO - Cargando Alibaba: Página 1 -> https://www.alibaba.com/trade/search?SearchText=electronic+component+kit&page=1


