In [1]:
"""
PROYECTO FINAL BASE- Automatización de Ingesta y Procesamiento de Datos
Angel T. Jaramillo Sulca
"""

'\nPROYECTO FINAL BASE- Automatización de Ingesta y Procesamiento de Datos\nAngel T. Jaramillo Sulca\n'

In [None]:
# =====================================================
# 0. INSTALLATION (Selenium)
# =====================================================
# The commands below are wrapped in a triple-quoted string literal, so running
# this cell installs nothing: the cell only evaluates (and echoes) the string.
# They download and install Google Chrome and install the selenium package.
"""
!wget -q https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb
!apt install -y ./google-chrome-stable_current_amd64.deb
!pip install selenium

print('==> Setting up environment - Done')
"""

"\n!wget -q https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb\n!apt install -y ./google-chrome-stable_current_amd64.deb\n!pip install selenium\n\nprint('==> Setting up environment - Done')\n"

In [3]:
# =====================================================
# 1. IMPORTACIONES
# =====================================================
import os
import shutil
import re
import time
import pandas as pd
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from sqlalchemy import create_engine, Column, Integer, String, Float, text
from sqlalchemy.orm import declarative_base, sessionmaker
from concurrent.futures import ThreadPoolExecutor, as_completed

# Setup

In [None]:
# =====================================================
# 2. CONFIGURACIÓN INICIAL
# =====================================================
# Working directories used by the notebook.
PATH_OUTPUTS = "outputs"
DB_DIR = "database"

# Every run starts from empty directories: an existing directory is removed
# together with its contents and then recreated.
for _fresh_dir in (PATH_OUTPUTS, DB_DIR):
    if os.path.exists(_fresh_dir):
        shutil.rmtree(_fresh_dir)
    os.makedirs(_fresh_dir, exist_ok=True)

In [None]:
# Home page of the site to scrape; run_pipeline passes it to go_to_book_type.
base_url = "https://books.toscrape.com/"
# Category name that go_to_book_type looks up in the sidebar.
book_type = "Fantasy"
# Timeout value used for the browser page-load timeout (get_browser) and for
# the WebDriverWait objects created by the scraping functions.
timeout = 20

In [None]:
# =====================================================
# 3. FUNCIONES DE NAVEGACIÓN Y SCRAPING
# =====================================================

def get_browser():
    """Create and return a Chrome WebDriver running in headless mode.

    The page-load timeout of the returned driver is set from the module-level
    ``timeout`` value.
    """
    opts = Options()
    for flag in ("--headless", "--no-sandbox", "--disable-dev-shm-usage"):
        opts.add_argument(flag)

    driver = webdriver.Chrome(options=opts)
    driver.set_page_load_timeout(timeout)
    return driver

In [None]:
def close_browser(browser):
    """Quit ``browser`` on a best-effort basis.

    Does nothing when ``browser`` is falsy (for example ``None``). Any
    ``Exception`` raised by ``browser.quit()`` is deliberately ignored so that
    cleanup never fails the caller.
    """
    if not browser:
        return
    try:
        browser.quit()
    except Exception:
        # Shutdown errors are not actionable here; ignore them.
        pass

In [None]:
def go_to_book_type(browser, base_url, book_type, timeout):
    """
    Navigate to the requested category and return the category page URL.

    Steps: load ``base_url``, wait for the categories sidebar, find the link
    whose text equals ``book_type``, load its ``href`` and wait until the
    category heading is present.

    Args:
        browser: Selenium WebDriver used for navigation.
        base_url: URL of the site home page.
        book_type: Exact text of the category link. Quote characters are
            supported; non-string values are converted with ``str()``.
        timeout: Value passed to ``WebDriverWait`` for both waits.

    Returns:
        ``browser.current_url`` after the category page has loaded.

    Raises:
        Nothing is caught here: Selenium errors (timeouts, link not found)
        propagate to the caller.
    """
    # 1) Go to the home page
    browser.get(base_url)

    # 2) Wait for the categories sidebar
    wait = WebDriverWait(browser, timeout)
    sidebar = wait.until(
        EC.presence_of_element_located((By.XPATH, "//div[@class='side_categories']"))
    )

    # 3) Find the link by the exact category text. The text is quoted through
    #    _xpath_literal so that an apostrophe or a double quote in book_type
    #    cannot break the XPath expression.
    category_link = sidebar.find_element(
        By.XPATH, f".//a[normalize-space(text())={_xpath_literal(book_type)}]"
    )

    # 4) Navigate to the category URL
    category_href = category_link.get_attribute("href")
    browser.get(category_href)

    # 5) Check that the category heading loaded
    wait.until(
        EC.presence_of_element_located((By.XPATH, "//div[@class='page_inner']//h1"))
    )

    return browser.current_url


def _xpath_literal(value):
    """Return ``value`` written as an XPath 1.0 string literal."""
    value = str(value)
    if "'" not in value:
        return f"'{value}'"
    if '"' not in value:
        return f'"{value}"'
    # XPath 1.0 has no escape character: when both quote kinds occur, join the
    # pieces between apostrophes with concat(), quoting each apostrophe in
    # double quotes.
    pieces = [f"'{piece}'" for piece in value.split("'")]
    return "concat(" + ", \"'\", ".join(pieces) + ")"


In [None]:
# =====================================================
# 4. FUNCIONES DE SCRAPING
# =====================================================
def collect_books_from_page(browser, timeout):
    """
    Extract the data of every book listed on the current page.

    For each book the detail page is opened in a temporary tab to read the
    description; the tab is always closed and focus returns to the listing
    window, even when reading the description fails.

    Args:
        browser: Selenium WebDriver positioned on a listing page.
        timeout: Value passed to ``WebDriverWait`` (listing and description).

    Returns:
        A list of dicts with the keys:
        - book_name
        - book_price (raw text, e.g. "£51.77")
        - book_link
        - book_description (``None`` when it could not be read)
    """
    wait = WebDriverWait(browser, timeout)

    # Wait until the book entries are present on the page
    books = wait.until(
        EC.presence_of_all_elements_located((By.XPATH, '//ol[@class="row"]/li'))
    )

    results = []
    for book in books:
        # Title and detail link both come from the same anchor
        name_el = book.find_element(By.XPATH, ".//h3/a")
        book_name = name_el.get_attribute("title")
        book_link = name_el.get_attribute("href")

        # Price text, still including the currency symbol
        price_el = book.find_element(By.XPATH, ".//p[@class='price_color']")
        book_price = price_el.text.strip()

        results.append({
            "book_name": book_name,
            "book_price": book_price,
            "book_link": book_link,
            "book_description": _read_book_description(browser, wait, book_link)
        })

    return results


def _read_book_description(browser, wait, book_link):
    """Open ``book_link`` in a temporary tab and return its description text, or None."""
    main_handle = browser.current_window_handle
    known_handles = set(browser.window_handles)

    browser.execute_script("window.open(arguments[0]);", book_link)
    # Identify the tab that was just opened instead of assuming it is the last
    # handle in the list.
    opened = [h for h in browser.window_handles if h not in known_handles]

    try:
        if not opened:
            # No tab was opened: there is nothing to read and, above all,
            # nothing to close (closing now would close the listing window).
            return None

        browser.switch_to.window(opened[0])
        try:
            desc_el = wait.until(
                EC.presence_of_element_located(
                    (By.XPATH, "//div[@id='product_description']/following-sibling::p")
                )
            )
            return desc_el.text.strip()
        except Exception:
            # Best effort: a missing description (or any failure while
            # reading it) yields None instead of aborting the page.
            return None
    finally:
        # Always close what was opened and return to the listing window.
        for handle in opened:
            browser.switch_to.window(handle)
            browser.close()
        browser.switch_to.window(main_handle)


In [None]:
def collect_all_books(browser, timeout):
    """
    Walk every page of the current category and accumulate the books found.

    Args:
        browser: Selenium WebDriver positioned on the first page of a category.
        timeout: Value forwarded to ``collect_books_from_page``.

    Returns:
        A list with the dicts returned by ``collect_books_from_page`` for all
        pages, in page order.

    Raises:
        Errors raised while scraping a page or loading the next one propagate
        to the caller; they are no longer mistaken for "last page reached",
        which would silently return a truncated list.
    """
    all_books = []

    while True:
        # 1) Scrape the current page (this call waits for the book list)
        all_books.extend(collect_books_from_page(browser, timeout))

        # 2) Look for the 'next' link. find_elements returns an empty list when
        #    nothing matches, so the last page is detected immediately instead
        #    of waiting for a full timeout to expire.
        #    NOTE(review): assumes the pager is already in the DOM once the
        #    book list has been read (true for a fully loaded static page).
        next_links = browser.find_elements(By.XPATH, '//li[@class="next"]/a')
        if not next_links:
            break

        next_href = next_links[0].get_attribute("href")
        if not next_href:
            break

        # 3) Load the next page; its books are awaited by the next iteration
        browser.get(next_href)

    return all_books


In [None]:
# =====================================================
# 5. REGLAS DE NEGOCIO
# =====================================================
def apply_business_rules(book):
    """
    Clean one scraped book in place and return it.

    - The first parenthesised text of ``book_name`` becomes ``book_detail``
      (``None`` when the name has no parentheses).
    - Every parenthesised text is removed from ``book_name``.
    - ``book_price`` is converted to ``float`` (``None`` when not parseable).

    The dictionary received is the one that is modified and returned.

    NOTE(review): a later cell defines another function with this same name;
    when the cells run top to bottom that later definition replaces this one.
    """
    # --- Name / detail ---
    raw_name = book.get("book_name", "")
    found = re.search(r"\((.*?)\)", raw_name)
    if found is None:
        clean_name, extra = raw_name, None
    else:
        extra = found.group(1).strip()
        clean_name = re.sub(r"\(.*?\)", "", raw_name).strip()

    # --- Price: keep only digits and dots, then parse ---
    digits = re.sub(r"[^\d\.]", "", book.get("book_price", ""))
    try:
        amount = float(digits)
    except ValueError:
        amount = None

    # --- Write the cleaned fields back into the same dictionary ---
    book.update(book_name=clean_name, book_detail=extra, book_price=amount)
    return book


# Definición de modelo ORM (SQLAlchemy)

In [None]:
# =====================================================
# 6. CONFIGURACIÓN DE BASE DE DATOS
# =====================================================
# Declarative base class that the ORM models inherit from.
Base = declarative_base()
# SQLite database file, stored inside DB_DIR.
DB_FILE = os.path.join(DB_DIR, "books.db")
DB_URL = f"sqlite:///{DB_FILE}"

# check_same_thread=False allows several threads in this demo (NOT recommended under heavy load)
engine = create_engine(DB_URL, connect_args={"check_same_thread": False}, echo=False, future=True)
# Improve SQLite read concurrency by switching the journal mode to WAL
with engine.connect() as conn:
  conn.execute(text("PRAGMA journal_mode=WAL;"))

# Session factory bound to the engine; each insert opens its own session from it.
SessionLocal = sessionmaker(bind=engine, expire_on_commit=False)

In [None]:
class Book(Base):
    """ORM model for one row of the ``books_for_sale`` table."""
    __tablename__ = 'books_for_sale'

    # Columns defined by the business rules:
    #   book_code   - auto-incremented integer primary key
    #   book_name   - title, required
    #   book_detail - text taken from the parentheses in the title, optional
    #   book_price  - numeric price, required
    book_code   = Column(Integer, primary_key=True, autoincrement=True)
    book_name   = Column(String(255), nullable=False)
    book_detail = Column(String(255), nullable=True)
    book_price  = Column(Float, nullable=False)

    def __repr__(self):
        """Return a short debug representation with code, name and price."""
        return f"<Book(code={self.book_code}, name='{self.book_name}', price={self.book_price})>"

def create_tables():
    """Create the tables declared on ``Base`` using the module-level ``engine``."""
    schema = Base.metadata
    schema.create_all(bind=engine)

In [None]:
def insert_book_session(book_dict):
    """
    Insert one book through its own SQLAlchemy session.

    A ``Book`` is built from the ``book_name``, ``book_detail`` and
    ``book_price`` entries of ``book_dict``, added and committed. On any
    ``Exception`` the session is rolled back and the error is printed; the
    session is closed in every case. Nothing is returned.
    """
    session = SessionLocal()
    try:
        # Build the Book from the three entries the model stores
        columns = ("book_name", "book_detail", "book_price")
        record = Book(**{column: book_dict.get(column) for column in columns})
        session.add(record)
        session.commit()
        print(f"✅ Libro insertado: {record.book_name} | Precio: {record.book_price}")
    except Exception as e:
        session.rollback()
        print(f"⚠️ Error al insertar libro: {e}")
    finally:
        session.close()

In [None]:
def apply_business_rules(book_dict):
    """
    Apply the business rules to one scraped book and return a new dict.

    - ``book_name``: surrounding whitespace is removed; when the name ends
      with a parenthesised text, that text becomes ``book_detail`` and is
      removed from the name.
    - ``book_price``: converted to ``float`` after dropping every character
      that is not a digit or a dot (e.g. "£23.88" -> 23.88). A value that
      cannot be parsed becomes ``0.0``.

    Args:
        book_dict: Mapping with at least ``book_name`` and ``book_price``.
            It is not modified.

    Returns:
        A dict with exactly the keys ``book_name``, ``book_detail``
        (``None`` when there is no parenthesised detail) and ``book_price``.

    NOTE: this cell redefines ``apply_business_rules`` from the earlier
    business-rules cell; on a top-to-bottom run this is the version in effect.
    """
    # Strip first so trailing whitespace cannot hide a detail from the
    # end-anchored pattern below.
    name = book_dict.get("book_name", "").strip()
    detail = None

    # Name followed by a parenthesised detail at the very end
    match = re.match(r"^(.*?)\s*\((.*?)\)$", name)
    if match:
        name = match.group(1).strip()
        detail = match.group(2).strip()

    # Convert the price to float
    price_raw = book_dict.get("book_price", "")
    try:
        price = float(re.sub(r"[^\d.]", "", str(price_raw)))
    except ValueError:
        # Only a parse failure maps to 0.0; a bare ``except`` would also hide
        # KeyboardInterrupt and SystemExit.
        price = 0.0

    return {
        "book_name": name,
        "book_detail": detail,
        "book_price": price
    }

In [None]:
# =====================================================
# 7. CONCURRENCIA
# =====================================================
def insert_books_concurrent(list_books, fetch_workers=20, insert_workers=3):
    """
    Clean and insert books using thread pools.

    - PHASE 1: apply the business rules with ``fetch_workers`` threads.
    - PHASE 2: insert the cleaned rows with ``insert_workers`` threads, one
      session and one commit per row.

    The tables are created if needed and ``books_for_sale`` is emptied before
    inserting.

    Args:
        list_books: Sequence of scraped book dicts.
        fetch_workers: Thread count for phase 1.
        insert_workers: Thread count for phase 2.

    Returns:
        A dict with ``total_requested`` (``len(list_books)``), ``inserted``
        (rows committed) and ``time_s`` (seconds spent in phases 1 and 2).
        Rows that fail in either phase are reported with a printed message and
        are not counted as inserted.
    """
    create_tables()

    # Empty the table before inserting
    s0 = SessionLocal()
    try:
        s0.query(Book).delete()
        s0.commit()
    finally:
        s0.close()

    t0 = time.perf_counter()
    payloads = []

    # -------------------------------
    # PHASE 1: apply the business rules concurrently
    # -------------------------------
    with ThreadPoolExecutor(max_workers=fetch_workers) as executor:
        futures = [executor.submit(apply_business_rules, book) for book in list_books]
        # Read the results in submission order rather than completion order,
        # so the payloads keep the order in which the books were scraped.
        for future in futures:
            try:
                payloads.append(future.result())
            except Exception as e:
                print(f"⚠️ Error en reglas de negocio: {e}")

    # -------------------------------
    # PHASE 2: insert into the database concurrently
    # -------------------------------
    def insert_payload(payload):
        """Insert one cleaned row in its own session; return 1 on success, 0 on failure."""
        s = SessionLocal()
        try:
            book = Book(
                book_name=payload["book_name"],
                book_detail=payload["book_detail"],
                book_price=payload["book_price"]
            )
            s.add(book)
            s.commit()
            return 1
        except Exception as e:
            s.rollback()
            print(f"⚠️ Error al insertar: {e}")
            return 0
        finally:
            s.close()

    inserted = 0
    with ThreadPoolExecutor(max_workers=insert_workers) as executor:
        futures = [executor.submit(insert_payload, p) for p in payloads]
        # Only the count matters here, so completion order is fine.
        for future in as_completed(futures):
            inserted += future.result()

    # -------------------------------
    # PHASE 3: metrics
    # -------------------------------
    t1 = time.perf_counter()
    return {
        "total_requested": len(list_books),
        "inserted": inserted,
        "time_s": t1 - t0
    }

In [None]:
# =====================================================
# 8. PIPELINE PRINCIPAL
# =====================================================
def run_pipeline():
    """
    Run the whole process: open the browser, go to the configured category,
    scrape every book, and insert the cleaned rows into the database.

    Progress is reported with ``print``. Any ``Exception`` raised after the
    browser was created is printed instead of propagated, and the browser is
    closed in every case.
    """
    print("Iniciando proceso...")
    driver = get_browser()
    try:
        # Go to the configured category
        category_url = go_to_book_type(driver, base_url, book_type, timeout)
        print(f"Navegando al género {book_type}... URL: {category_url}")

        # Scrape every book of the category
        scraped = collect_all_books(driver, timeout=timeout)
        print(f"{len(scraped)} libros extraídos")

        # Clean and insert (the business rules are applied inside)
        summary = insert_books_concurrent(scraped, fetch_workers=20, insert_workers=3)
        report = (
            f"{summary['inserted']} libros insertados correctamente "
            f"de {summary['total_requested']} en {summary['time_s']:.2f}s"
        )
        print(report)

    except Exception as e:
        print(f"⚠️ Error en pipeline: {e}")
    finally:
        # The browser is always closed
        close_browser(driver)
        print("Navegador cerrado correctamente")

In [None]:
# =====================================================
# 9. VISUALIZAR RESULTADOS
# =====================================================

def show_books():
    """Load the whole ``books_for_sale`` table, print its row count and display it."""
    query = "SELECT * FROM books_for_sale"
    books_df = pd.read_sql(query, con=engine)
    total = len(books_df)
    print(f"Total en BD: {total}")
    display(books_df)

In [None]:
# =====================================================
# 10. MAIN
# =====================================================
# Entry point: run the scraping/loading pipeline, then show the resulting table.
if __name__ == "__main__":
    run_pipeline()
    show_books()

Iniciando proceso...
Navegando al género Fantasy... URL: https://books.toscrape.com/catalogue/category/books/fantasy_19/index.html
48 libros extraídos
48 libros insertados correctamente de 48 en 2.12s
Navegador cerrado correctamente
Total en BD: 48


Unnamed: 0,book_code,book_name,book_detail,book_price
0,1,King's Folly,The Kinsman Chronicles #1,39.61
1,2,Paper and Fire,The Great Library #2,49.45
2,3,Myriad,Prentor #1,58.75
3,4,The Rose & the Dagger,The Wrath and the Dawn #2,58.64
4,5,A Gathering of Shadows,Shades of Magic #2,44.81
5,6,The Glittering Court,The Glittering Court #1,44.28
6,7,Tell the Wind and Fire,,45.51
7,8,One Second,Seven #7,52.94
8,9,The Beast,Black Dagger Brotherhood #14,46.08
9,10,The Demonists,Demonist #1,52.11
