In [10]:
import logging
import re
import time
import random
from datetime import datetime
from typing import Dict, List, Optional, Set, Tuple, Any
from urllib.parse import quote_plus, urljoin

# Importaciones de Librer√≠as de Scraping
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import (
    TimeoutException, WebDriverException, StaleElementReferenceException, NoSuchElementException,
)

# WebDriver Manager
try:
    from webdriver_manager.chrome import ChromeDriverManager
except ImportError:
    import subprocess
    print("Instalando webdriver-manager...")
    subprocess.check_call(['pip', 'install', 'webdriver-manager'])
    from webdriver_manager.chrome import ChromeDriverManager

import pandas as pd
from IPython.display import display

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# ----------------------------------------------------
# --- 1. CONFIGURACI√ìN GLOBAL Y UTILIDADES ---
# ----------------------------------------------------

USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.61 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.2 Safari/605.1.15",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:92.0) Gecko/20100101 Firefox/92.0",
]
BLOCK_PATTERNS = ("punish", "unusual traffic", "error:gvs", "robot check", "are you a robot", "please complete the captcha", "slide to verify")
_RANGE_SPLIT_PATTERN = re.compile(r"(?<=\d)\s*[-‚Äì‚Äî]\s*(?=\d)")
_currency_re = re.compile(r"(US\$|S/|[$‚Ç¨¬£¬•])")
_rating_re = re.compile(r"([\d.]+)\s*/\s*5(?:\.0)?\s*\((\d+)\)")
_years_re = re.compile(r"(\d+)\s*(?:a√±os|years?)", re.I)
_percent_re = re.compile(r"(\d+)\s*%")

# Funciones de Utilidad
def limpiar_precio(texto: Optional[str]) -> Optional[float]:
    def _normalizar(texto_unitario: str) -> Optional[float]:
        cleaned = re.sub(r"[^0-9.,]", "", texto_unitario)
        if not cleaned: return None
        decimal_sep: Optional[str] = None
        has_dot = "." in cleaned; has_comma = "," in cleaned
        if has_dot and has_comma: decimal_sep = "," if cleaned.rfind(",") > cleaned.rfind(".") else "."
        elif has_dot:
            if len(cleaned.rpartition(".")[-1]) in (1, 2): decimal_sep = "."
        elif has_comma:
            if len(cleaned.rpartition(",")[-1]) in (1, 2): decimal_sep = ","
        if decimal_sep:
            int_part, dec_part = cleaned.rsplit(decimal_sep, 1)
            int_digits = re.sub(r"[^0-9]", "", int_part); dec_digits = re.sub(r"[^0-9]", "", dec_part)
            number_str = f"{int_digits}.{dec_digits or '0'}"
        else:
            number_str = re.sub(r"[^0-9]", "", cleaned)
            if not number_str: return None
        try: return float(number_str)
        except ValueError: return None
    if not texto: return None
    texto = texto.strip()
    if _RANGE_SPLIT_PATTERN.search(texto):
        partes = [p.strip() for p in _RANGE_SPLIT_PATTERN.split(texto) if p.strip()]
        if partes: texto = partes[0]
    return _normalizar(texto)

def limpiar_cantidad(texto: Optional[str]) -> int:
    if texto is None: return 0
    t = texto.strip().lower().replace("+", "")
    if not t: return 0
    mult = 1
    if re.search(r"k\b", t) or "mil" in t: mult = 1000; t = re.sub(r"k\b|mil", "", t)
    n = limpiar_precio(t) or 0.0
    return int(round(n * mult))

def detectar_moneda(texto: str) -> Optional[str]:
    if not texto: return None
    m = _currency_re.search(texto)
    return m.group(1) if m else None

def parse_rating(texto: str) -> Tuple[Optional[float], Optional[int]]:
    if not texto: return (None, None)
    m = _rating_re.search(texto)
    if not m: return (None, None)
    try: return float(m.group(1)), int(m.group(2))
    except: return (None, None)

def parse_moq(texto: str) -> Tuple[Optional[int], Optional[str]]:
    if not texto: return (None, None)
    m = re.search(r"(\d[\d.,]*)", texto)
    if not m: return (None, texto.strip())
    try: val = limpiar_cantidad(m.group(1))
    except: val = None
    return val, texto.strip()


# ----------------------------------------------------
## 2. L√≥gica de Extracci√≥n de Detalles
# ----------------------------------------------------

def extract_alibaba_product_details(driver: webdriver.Chrome, product_url: str) -> Dict[str, Any]:
    """Navega a una p√°gina de producto individual y extrae detalles avanzados."""
    
    details: Dict[str, Any] = {
        "link": product_url, 
        "precios_por_niveles": None, "ingresos_anuales_usd": None,
        "mercados_principales": None, "pais_origen_detallado": None,
        "atributos_produccion": None, "peso_bruto_kg": None, "tiempos_entrega": None
    }
    logging.info(f"Cargando detalles: {product_url}")
    
    try:
        driver.get(product_url)
        WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.ID, "key-attributes")))
        soup = BeautifulSoup(driver.page_source, "html.parser")
        
        # 1. Precios por Niveles
        ladder_prices = []; price_container = soup.select_one('div[data-testid="ladder-price"]')
        if price_container:
            for item in price_container.select('.price-item'):
                qty_text = item.select_one('.id-text-sm.id-text-\\[\\#666\\]'); price_text = item.select_one('.id-text-2xl.id-font-bold span')
                if qty_text and price_text: ladder_prices.append({"cantidad_rango": qty_text.get_text(strip=True), "precio_unitario": limpiar_precio(price_text.get_text(strip=True))})
        details['precios_por_niveles'] = ladder_prices
        
        # 2. M√©tricas de la Compa√±√≠a
        company_data = soup.select_one('.module_unifed_company_card')
        if company_data:
            revenue_container = company_data.find('div', class_='id-text-sm id-truncate', string=lambda t: t and 'Ingresos anuales totales' in t)
            if revenue_container and (revenue_val := revenue_container.find_next_sibling('div')): details['ingresos_anuales_usd'] = limpiar_precio(revenue_val.get_text(strip=True))
            markets_container = company_data.find('div', class_='id-text-sm id-truncate', string='Mercados principales')
            if markets_container and (markets_val := markets_container.find_next_sibling('div')): details['mercados_principales'] = markets_val.get_text(strip=True)
            country_location_node = company_data.select_one('.id-mt-1.id-flex.id-items-center.id-gap-0\\.5.id-text-xs span:last-child')
            details['pais_origen_detallado'] = country_location_node.get_text(strip=True) if country_location_node else None

        # 3. Atributos Clave
        attributes = {}; attribute_table = soup.select_one('div[data-testid="module-attribute"] .id-grid-cols-2.id-border-\\[0\\.5px\\]')
        if attribute_table:
            for row in attribute_table.select('.id-grid'):
                key_node = row.select_one('.id-bg-\\[\\#f8f8f8\\]'); value_node = row.select_one('.id-font-medium')
                if key_node and value_node:
                    key = key_node.get_text(strip=True).replace(' ', '_').lower(); value = value_node.get_text(strip=True)
                    attributes[key] = value
        details['atributos_produccion'] = attributes
        details['peso_bruto_kg'] = limpiar_precio(attributes.get('peso_bruto', None)) if attributes.get('peso_bruto', None) else None
        
        # 4. Tiempos de Entrega
        delivery_table = soup.select_one('.lead-list table'); delivery_times = []
        if delivery_table:
             rows = delivery_table.select('tbody tr')
             if len(rows) > 1:
                qty_ranges = [th.get_text(strip=True) for th in rows[0].select('td')]
                time_values = [td.get_text(strip=True) for td in rows[1].select('td')]
                for r, t in zip(qty_ranges, time_values): delivery_times.append({"rango_cantidad": r, "tiempo_aprox_dias": t})
        details['tiempos_entrega'] = delivery_times
        
        return details
    
    except Exception as e:
        logging.error(f"Error al extraer detalles de {product_url}: {e}")
        return details


# ----------------------------------------------------
## 4. Clase Base (BaseScraper)
# ----------------------------------------------------

class BaseScraper:
    """Clase base para manejar la inicializaci√≥n y cierre del WebDriver."""
    
    def __init__(self, headless: bool = True, driver_path: Optional[str] = None):
        self.driver: Optional[webdriver.Chrome] = None; self.headless = headless; self.driver_path = driver_path; self._initialize_driver()
    def _initialize_driver(self):
        chrome_options = Options(); user_agent = random.choice(USER_AGENTS)
        chrome_options.add_argument("--no-sandbox"); chrome_options.add_argument("--disable-dev-shm-usage")
        chrome_options.add_argument("--disable-gpu"); chrome_options.add_argument("--window-size=1920,1080")
        chrome_options.add_argument(f"user-agent={user_agent}"); chrome_options.add_argument("--disable-blink-features=AutomationControlled")
        chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"]); chrome_options.add_experimental_option('useAutomationExtension', False)
        if self.headless: chrome_options.add_argument("--headless")
        try:
            if self.driver_path: service = ChromeService(executable_path=self.driver_path)
            else: service = ChromeService(ChromeDriverManager().install())
            self.driver = webdriver.Chrome(service=service, options=chrome_options)
        except Exception as e: logging.error(f"Error al inicializar WebDriver: {e}. ¬øEst√° Chrome instalado?"); raise
    def close(self):
        if self.driver: self.driver.quit(); self.driver = None; logging.info("WebDriver cerrado.")
    def __enter__(self): return self
    def __exit__(self, exc_type, exc_val, exc_tb): self.close()
    
    @staticmethod
    def _abs_link(href: str, domain: str) -> str:
        if not href: return ""
        if href.startswith("//"): return "https:" + href
        if href.startswith("/"): return urljoin(f"https://www.{domain}.com", href)
        return href
    
    @staticmethod
    def _is_blocked(driver) -> bool:
        url = (getattr(driver, "current_url", "") or "").lower()
        if any(p in url for p in BLOCK_PATTERNS): return True
        html = getattr(driver, "page_source", "") or ""
        try: soup = BeautifulSoup(html, "html.parser"); soup_text = soup.get_text(separator=" ", strip=True).lower()
        except Exception: soup_text = html.lower()
        return any(p in soup_text for p in BLOCK_PATTERNS)
    def _accept_banners(self, timeout: int = 5):
        candidates = [(By.XPATH, "//button[contains(., 'Aceptar') or contains(., 'Accept')]"), (By.XPATH, "//button[contains(., 'Allow all')]"), (By.CSS_SELECTOR, "[role='button'][aria-label*='accept' i]")]
        for by, sel in candidates:
            try: btn = WebDriverWait(self.driver, timeout).until(EC.element_to_be_clickable((by, sel))); btn.click(); time.sleep(0.5); logging.info("Banner de cookies aceptado.")
            except Exception: continue
    def _human_scroll_until_growth(self, max_scrolls: int = 16, pause: float = 1.0):
        last_height = self.driver.execute_script("return document.body.scrollHeight") if self.driver else 0
        for i in range(max_scrolls):
            if not self.driver: break
            self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);"); time.sleep(pause)
            new_height = self.driver.execute_script("return document.body.scrollHeight")
            if new_height <= last_height:
                self.driver.execute_script("window.scrollBy(0, 700);"); time.sleep(pause)
                new_height = self.driver.execute_script("return document.body.scrollHeight")
                if new_height <= last_height: break
            last_height = new_height
    

    
    def _find_all_any(self, selectors: List[str], timeout: int = 10) -> List:
        if not self.driver: return []
        for css in selectors:
            try:
                WebDriverWait(self.driver, timeout).until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, css)))
                els = self.driver.find_elements(By.CSS_SELECTOR, css);
                if els: return els;
            except TimeoutException: continue
        return []
    @staticmethod
    def _resolve_text(node) -> Optional[str]:
        if node is None: return None
        get_attribute = getattr(node, "get_attribute", None)
        if callable(get_attribute):
            inner = get_attribute("innerText")
            if inner: return inner.strip()
            return (getattr(node, "text", "") or "").strip() or None
        return node.get_text(" ", strip=True) or None


# ----------------------------------------------------
## 5. Clase AlibabaScraper 
# ----------------------------------------------------

class AlibabaScraper(BaseScraper):
    """Scraper Alibaba."""
    CARD_CONTAINERS: List[str] = ["div.fy26-product-card-content", "div.searchx-product-card", "div.card-info.gallery-card-layout-info"]
    A_CARD: List[str] = ["h2.searchx-product-e-title a", "a.searchx-product-link-wrapper", "a"]
    TITLE: List[str] = ["h2.searchx-product-e-title span", "h2.searchx-product-e-title a", "h2.search-card-e-title a", "h1, h2, h3"]
    PRICE: List[str] = ["div.searchx-product-price-price-main", "div.searchx-product-price", "div.search-card-e-price-main"]
    PRICE_ORIGINAL: List[str] = ["del", "s", ".price-origin"]
    DISCOUNT: List[str] = [".discount", ".sale-tag", "[data-discount]"]
    SUPPLIER_NAME: List[str] = ["a.searchx-product-e-company", "a.search-card-e-company"]
    SUPPLIER_YEAR_COUNTRY: List[str] = ["a.searchx-product-e-supplier__year"]
    VERIFIED_BADGE: List[str] = [".verified-supplier-icon__wrapper", "img.searchx-verified-icon"]
    RATING: List[str] = ["span.searchx-product-e-review"]
    SELLING_POINTS: List[str] = [".searchx-selling-point-text"]

    def _abs_link(self, href: str) -> str: return super()._abs_link(href, "alibaba")

    def _extract_card(self, card) -> Optional[Dict]:
        """Extracci√≥n con try/except expl√≠cito para evitar SYNTAXERROR."""
        try:
            a = self._first_match(card, self.A_CARD) or card
            link = self._abs_link((a.get_attribute("href") or "").strip())
            
            # --- Extracci√≥n de datos ---
            titulo_el = self._first_match(card, self.TITLE); titulo = self._resolve_text(titulo_el) or "Sin t√≠tulo"
            price_el = self._first_match(card, self.PRICE); price_text = self._resolve_text(price_el)
            precio = limpiar_precio(price_text); moneda = detectar_moneda(price_text or "") if price_text else None
            pori_el = self._first_match(card, self.PRICE_ORIGINAL); precio_original = limpiar_precio(self._resolve_text(pori_el) if pori_el else None)
            desc_el = self._first_match(card, self.DISCOUNT); descuento = self._resolve_text(desc_el) if desc_el else None
            moq_el = self._first_match(card, ["div.searchx-moq", "div.price-area-center"]); moq_val, moq_text = (None, None); 
            if moq_el: moq_text = self._resolve_text(moq_el); moq_val, _ = parse_moq(moq_text or "")
            ventas = int(moq_val or 0); proveedor_el = self._first_match(card, self.SUPPLIER_NAME); proveedor = self._resolve_text(proveedor_el) if proveedor_el else None
            year_ctry_el = self._first_match(card, self.SUPPLIER_YEAR_COUNTRY); proveedor_anios, proveedor_pais = (None, None)
            if year_ctry_el:
                text = (self._resolve_text(year_ctry_el) or "").strip(); m_years = _years_re.search(text)
                if m_years: proveedor_anios = int(m_years.group(1)) if m_years.group(1).isdigit() else None
                spans = year_ctry_el.find_elements(By.TAG_NAME, "span")
                if spans:
                    maybe_country = (spans[-1].text or "").strip()
                    if maybe_country and len(maybe_country) <= 3: proveedor_pais = maybe_country
            verified = bool(self._first_match(card, self.VERIFIED_BADGE)); rating_el = self._first_match(card, self.RATING); rating_score, rating_count = parse_rating(self._resolve_text(rating_el) or "")
            envio_promesa = None; tasa_repeticion = None; sp = self._first_match(card, self.SELLING_POINTS)
            if sp:
                txt = (self._resolve_text(sp) or "").strip();
                if "env√≠o" in txt.lower(): envio_promesa = txt
                pr = _percent_re.search(txt);
                if pr: tasa_repeticion = int(pr.group(1)) if pr.group(1).isdigit() else None

            return {"titulo": titulo, "precio": precio, "precio_original": precio_original, "descuento": descuento, "ventas": ventas, "link": link, "moneda": moneda, "proveedor": proveedor, "proveedor_anios": proveedor_anios, "proveedor_pais": proveedor_pais, "proveedor_verificado": verified, "rating_score": rating_score, "rating_count": rating_count, "moq": moq_val, "moq_texto": moq_text, "envio_promesa": envio_promesa, "tasa_repeticion": tasa_repeticion,}
        except Exception as e: 
            logging.error(f"Error extrayendo card Alibaba: {e}")
            return None

    def parse(self, producto: str, paginas: int = 4) -> List[Dict]:
        resultados: List[Dict] = []; fecha_scraping = datetime.now().strftime("%Y-%m-%d")
        for page in range(1, paginas + 1):
            q = quote_plus(producto); url = f"https://www.alibaba.com/trade/search?SearchText={q}&page={page}"
            logging.info(f"Cargando Alibaba: P√°gina {page}")
            cargada = False
            for intento in range(3):
                try:
                    self.driver.get(url); self._accept_banners(5)
                    WebDriverWait(self.driver, 25).until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, ", ".join(self.CARD_CONTAINERS))))
                    self._human_scroll_until_growth(max_scrolls=16, pause=1.0)
                    cargada = True; break
                except (TimeoutException, WebDriverException) as e: logging.warning(f"Reintento Alibaba p{page} ({intento + 1}): {e}"); time.sleep(3.0) 
            
            if not cargada: continue
            if self._is_blocked(self.driver): logging.warning("Bloqueo de Captcha detectado (tr√°fico inusual). Deteniendo Alibaba."); break
            
            bloques = self._find_all_any(self.CARD_CONTAINERS, timeout=8); count_page = 0
            for card in bloques:
                data = self._extract_card(card)
                if data:
                    data.update({"pagina": page, "plataforma": "Alibaba", "fecha_scraping": fecha_scraping})
                    resultados.append(data); count_page += 1
            logging.info(f"P√°gina {page}: {count_page} productos v√°lidos.")
            if count_page == 0 and page > 1: break
        return resultados


# ----------------------------------------------------
## 5. Ejecuci√≥n, Consolidaci√≥n y Exportaci√≥n
# ----------------------------------------------------

def run_alibaba_scraper_completo(producto: str, paginas_busqueda: int = 2, max_detalles: int = 3):
    """
    Ejecuta el scraper de b√∫squeda y luego el scraper de detalles
    para los primeros `max_detalles` enlaces, y exporta a CSV.
    """
    
    # 1. Scraping de B√∫squeda
    print(f"\n--- üåê Paso 1: Scraping de B√∫squeda en ALIBABA para '{producto}' ---")
    driver_path = None 
    df_lista_productos = pd.DataFrame()
    
    try:
        # Modo visual activo (headless=False)
        with AlibabaScraper(headless=False, driver_path=driver_path) as scraper:
            resultados_lista = scraper.parse(producto, paginas=paginas_busqueda)
            if not resultados_lista:
                print("‚ùå B√∫squeda fallida o bloqueada. No hay enlaces para detalles.")
                return pd.DataFrame()
            df_lista_productos = pd.DataFrame(resultados_lista)
            print(f"‚úÖ B√∫squeda completada: {len(df_lista_productos)} productos encontrados.")
            
    except Exception as e:
        print(f"üõë Error cr√≠tico en el scraper de b√∫squeda: {e}"); return pd.DataFrame()

    # 2. Scraping de Detalles
    links_a_scrapear = df_lista_productos['link'].head(max_detalles).tolist()
    print(f"\n--- üîé Paso 2: Scraping de Detalles para {min(len(df_lista_productos), max_detalles)} Enlaces ---")
    detalles_completos = []
    
    try:
        with AlibabaScraper(headless=False, driver_path=driver_path) as scraper:
            driver = scraper.driver
            for link in links_a_scrapear:
                detalle = extract_alibaba_product_details(driver, link) 
                detalles_completos.append(detalle)
                time.sleep(random.uniform(2.0, 4.0)) 
    except Exception as e:
        print(f"üõë Error cr√≠tico durante el scraping de detalles: {e}"); 
    
    df_detalles = pd.DataFrame(detalles_completos)
    
    # 3. Consolidaci√≥n y Exportaci√≥n
    if df_detalles.empty:
        df_final = df_lista_productos
        print("‚ö†Ô∏è No se pudieron obtener los detalles, exportando solo datos de b√∫squeda.")
    else:
        df_final = pd.merge(df_lista_productos, df_detalles, on='link', how='left')

    # 4. Limpieza y Exportaci√≥n Final
    
    nombre_archivo = f"alibaba_analisis_{producto.replace(' ', '_')}_{datetime.now().strftime('%Y%m%d_%H%M')}.csv"
    
    df_final.to_csv(nombre_archivo, index=False, encoding='utf-8-sig')
    
    print(f"\n--- ‚úÖ √âXITO Y EXPORTACI√ìN ---")
    print(f"Datos combinados (B√∫squeda + Detalles) exportados a: {nombre_archivo}")
    
    cols_display = ['plataforma', 'titulo', 'precio', 'moneda', 'proveedor', 
                    'ingresos_anuales_usd', 'peso_bruto_kg', 'precios_por_niveles']
    
    cols_existentes = [col for col in cols_display if col in df_final.columns]
    
    display(df_final[cols_existentes].head())
    
    return df_final

# --- EJECUCI√ìN PRINCIPAL: Buscando 'camisa' ---
df_final = run_alibaba_scraper_completo(producto="camisa", paginas_busqueda=1, max_detalles=3)




--- üåê Paso 1: Scraping de B√∫squeda en ALIBABA para 'camisa' ---


2025-12-09 11:55:49,492 - INFO - Get LATEST chromedriver version for google-chrome
2025-12-09 11:55:49,507 - INFO - Get LATEST chromedriver version for google-chrome
2025-12-09 11:55:49,522 - INFO - Driver [C:\Users\alejo\.wdm\drivers\chromedriver\win64\142.0.7444.175\chromedriver-win32/chromedriver.exe] found in cache
2025-12-09 11:55:50,730 - INFO - Cargando Alibaba: P√°gina 1
2025-12-09 11:56:39,880 - ERROR - Error extrayendo card Alibaba: 'AlibabaScraper' object has no attribute '_first_match'
2025-12-09 11:56:39,881 - ERROR - Error extrayendo card Alibaba: 'AlibabaScraper' object has no attribute '_first_match'
2025-12-09 11:56:39,882 - ERROR - Error extrayendo card Alibaba: 'AlibabaScraper' object has no attribute '_first_match'
2025-12-09 11:56:39,883 - ERROR - Error extrayendo card Alibaba: 'AlibabaScraper' object has no attribute '_first_match'
2025-12-09 11:56:39,885 - ERROR - Error extrayendo card Alibaba: 'AlibabaScraper' object has no attribute '_first_match'
2025-12-09 11

‚ùå B√∫squeda fallida o bloqueada. No hay enlaces para detalles.


2025-12-09 11:56:42,266 - INFO - WebDriver cerrado.
