# Web Scraper

Para obtener la información de las personalidades de las series vamos a hacer Web scraping sobre la página [Personality Database](https://www.personality-database.com/). Este notebook lo escribimos para explicar como hicimos la obtención de datos de las series, pero realmente lo ejecutamos en Docker usando el perfil de Docker Compose explicado en el manual de uso, decidimos hacerlo así porque al final se hace todo dentro de un bucle que contiene todo el código, por tanto no sería muy útil usar un notebook.

Primero definimos los modelos de la información que vamos a guardar en la base de datos, para ello usamos el ORM de SQLAlchemy. Entonces definimos personajes y series:

In [None]:
from sqlalchemy import (
    BigInteger,
    String,
    Boolean,
    ForeignKey
)
from sqlalchemy.orm import mapped_column, relationship, Mapped
from sqlalchemy.orm import DeclarativeBase

class Base(DeclarativeBase):
    pass

class Character(Base):
    __tablename__ = "character"
    id: Mapped[int] = mapped_column(
        BigInteger(), primary_key=True, autoincrement=True)
    name: Mapped[str] = mapped_column(String(200), nullable=False)
    image: Mapped[str] = mapped_column(String(200), nullable=False)
    personality_ie: Mapped[bool] = mapped_column(
        Boolean(), nullable=False)  # True = I, False = E
    personality_sn: Mapped[bool] = mapped_column(
        Boolean(), nullable=False)  # True = S, False = N
    personality_tf: Mapped[bool] = mapped_column(
        Boolean(), nullable=False)  # True = T, False = F
    personality_jp: Mapped[bool] = mapped_column(
        Boolean(), nullable=False)  # True = J, False = P
    # Relationship one to many with serie (this is child)
    serie_id: Mapped[int] = mapped_column(
        ForeignKey("serie.id", ondelete='SET NULL'), nullable=True)
    serie: Mapped["Serie"] = relationship(back_populates="characters")

    def __init__(self, name, image, serie, personality_str: str):
        self.name = name
        self.image = image
        self.serie = serie
        self.personality_ie = personality_str[0] == "I"
        self.personality_sn = personality_str[1] == "S"
        self.personality_tf = personality_str[2] == "T"
        self.personality_jp = personality_str[3] == "J"

class Serie(Base):
    __tablename__ = 'serie'
    id: Mapped[int] = mapped_column(
        BigInteger(), primary_key=True, autoincrement=True)
    name: Mapped[str] = mapped_column(String(200), nullable=False)
    image: Mapped[str] = mapped_column(String(200), nullable=False)
    # Relationship one to many with characters (this is parent)
    characters: Mapped[List["Character"]] = relationship(
        back_populates="serie", passive_deletes='all')

Una vez definidos los modelos del ORM podemos definir cómo se van a guardar la información obtenida de la página.

In [None]:
class ScrapedCharacter(TypedDict):
    name: str
    image: str
    personality: Literal["ISTJ", "ISFJ", "INFJ", "INTJ",
                         "ISTP", "ISFP", "INFP", "INTP",
                         "ESTP", "ESFP", "ENFP", "ENTP",
                         "ESTJ", "ESFJ", "ENFJ", "ENTJ"]

class ScrapedSerie(TypedDict):
    name: str
    image: str

def personality_is_valid(personality: str) -> bool:
    return personality in ["ISTJ", "ISFJ", "INFJ", "INTJ",
                           "ISTP", "ISFP", "INFP", "INTP",
                           "ESTP", "ESFP", "ENFP", "ENTP",
                           "ESTJ", "ESFJ", "ENFJ", "ENTJ"]

Definimos una función que guarde una serie con sus personajes en la base de datos

In [None]:
def save_scraped(session: Session, scraped_serie: ScrapedSerie,
                 scraped_characters: list[ScrapedCharacter]):
    # Create Serie
    serie = Serie(name=scraped_serie["name"], image=scraped_serie["image"])
    session.add(serie)
    # Create Characters and associate them with the serie
    print("SCRAPED CHARACTERS: ", scraped_characters)
    for x in scraped_characters:
        character = Character(
            name=x["name"], image=x["image"],
            personality_str=x["personality"], serie=serie)
        session.add(character)
    session.commit()

Se recorren los links a consultar, estos se leen de un fichero `links.txt` y se obtienen los datos de la web y se guardan en la base de datos

In [None]:
def scrape_to_db():
    # Initialize database
    engine = initialize_retry()
    session = Session(bind=engine)
    # Get series from links.txt
    with open("web_scraper/links.txt", "r") as f:
        links = f.read().splitlines()
    # Scrape each serie
    for link in links:
        scraped_serie, scraped_characters = scrape(link)
        save_scraped(session, scraped_serie, scraped_characters)

    session.close()

La función `scrape` realiza la conexión y la obtención de datos de una URL. Primero se configura el navegador a utilizar, en este caso Firefox, pero también se podría configurar Chrome, Safari o Edge. Luego declaramos una lista vacía que contenga los caracteres obtenidos y una variable que guarde los datos de la serie.

In [None]:
from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.common.exceptions import TimeoutException
from selenium.webdriver import FirefoxOptions
from .scraped_types import ScrapedCharacter
from .scraped_types import ScrapedSerie, personality_is_valid

def scrape(url: str):
    # Use specified browser to open the url
    options = FirefoxOptions()
    options.add_argument("--headless")
    browser = webdriver.Firefox(options)
    browser.get(url)
    serie: ScrapedSerie = None
    characters: list[ScrapedCharacter] = []

Usamos un bucle infinito porque se desconoce el número de páginas que se van a recorrer, entonces se recorrerá página por página hasta encontrar una vacía (la página al terminarse en contenido muestra una página vacía).

El contenido de la página se carga mediante Javascript, entonces hay que indicar a Selenium que debe esperar un tiempo a que los elementos a buscar aparezcan.

Luego se comienza obteniendo los datos de la serie, el título y su portada.

In [None]:
 try:
    page_number = 1
    while True:
        try:
            # Get serie name
            serie_name = (
                WebDriverWait(browser, 10)
                .until(
                    EC.presence_of_element_located(
                        (By.CLASS_NAME, "community-title")
                    )
                )
                .text
            )
            # Get serie cover art image
                cover_art = (
                    WebDriverWait(browser, 10)
                    .until(
                        EC.presence_of_element_located(
                            (By.CLASS_NAME, "summary")))
                    .find_element(By.CLASS_NAME, "avatar")
                    .find_element(By.TAG_NAME, "img")
                    .get_attribute("src")
                )

Se espera a que aparezcan las tarjetas con los personajes y su personalidad y se almacenan como `ScrapedCharacter`. Además, se comprueba que la personalidad sea válida, ya que puede ser que no la tengan y la marquen como "XXXX"

In [None]:
                # Get all character profile cards
                elements = WebDriverWait(browser, 10).until(
                    EC.presence_of_all_elements_located(
                        (By.CLASS_NAME, "profile-card"))
                )
                print("number of elements: ", len(elements))
                # Iterate over all cards and get the information
                for el in elements:
                    character_name = el.find_element(
                        By.CLASS_NAME, "info-name").text
                    character_avatar = el.find_element(
                        By.TAG_NAME, "img").get_attribute("src")
                    character_personality = el.find_element(
                        By.CLASS_NAME, "personality").text
                    # print("character: ", character_name,
                    #      " | avatar: ", character_avatar,
                    #      " | personality: ", character_personality)
                    if personality_is_valid(character_personality):
                        characters.append(ScrapedCharacter(
                            name=character_name, image=character_avatar,
                            personality=character_personality))


Para pasar de página se obtienen los enlaces de los números de página y se busca el que tenga el número siguiente a la página actual, se hace click sobre él, se incrementa el contador de página, se espera que desaparezcan las tarjetas actuales y a que carguen las nuevas. Entonces se sale del bucle sobre los números de página y se va a la siguiente iteración del bucle principal. Este bucle seguirá hasta que al pasar a la siguiente página se produzca una excepción por esperar demasiado tiempo a que aparezcan las nuevas tarjetas.

In [None]:
  # Get pagination elements
                pelements = WebDriverWait(browser, 10).until(
                    EC.presence_of_all_elements_located(
                        (By.CLASS_NAME, "rc-pagination-item")
                    )
                )
                print("pagination elements: ", len(elements))
                # Iterate over all pages and click on the next one if there's any
                for el in pelements:
                    print("page number checking: ", el.text)
                    if page_number + 1 == int(el.text):
                        el.click()
                        page_number += 1
                        print("waiting for staleness")
                        WebDriverWait(browser, 30).until(
                            EC.staleness_of(elements[0])
                        )
                        print("waiting for clickable")
                        WebDriverWait(browser, 10).until(
                            EC.element_to_be_clickable(
                                (By.CLASS_NAME, "profile-container")
                            )
                        )
                        print(50 * "%")
                        print("page: " + el.text)
                        break
            except TimeoutException as e:
                print("No more entries: " + e.strerror)
                print(e.with_traceback())
                break

Una vez finalizado el bucle principal se cierra el navegador utilizado y se devuelve una tupla con los datos de la serie y la lista de personajes.

In [None]:
    finally:
        print("Scrape reached end")
        browser.quit()
        print("serie: ", serie)
        print("characters: ", characters)
        return (serie, characters)