### For reference

- [Python Requests](https://requests.readthedocs.io/en/latest/)
- [Beautiful Soup](https://www.crummy.com/software/BeautifulSoup/bs4/doc/)
- [SQLAlchemy](https://docs.sqlalchemy.org/en/20/index.html)
- [Selenium](https://www.selenium.dev/), [selenium-python](https://selenium-python.readthedocs.io/)
- [Bachelor Cast](https://abc.com/shows/the-bachelor/cast)

In [None]:
MAIN_URL = "https://abc.com/shows/the-bachelor/cast"

### What do we want?

#### Prepare data structure for results

In [None]:
# Use dictionary or dataclass here (don't overthink it)
# Choose a data structure according to what you'll do with the data
from dataclasses import dataclass
from typing import Literal, Optional
from pathlib import Path

@dataclass
class CastMember:
    """Record describing one person listed on the show's cast pages.

    The first six fields have no default and must always be supplied; the
    remaining fields fall back to the defaults declared below.
    """

    name: str
    age: int
    occupation: str
    bio: str
    hometown: str
    state: str
    country: str = "USA"
    role: Literal["contestant", "bachelor", "host"] = "contestant"  # only these three values are allowed; lets us store the whole cast, not just contestants
    photo: Optional[str] = None  # image location (URL or local file path); None until one is set
    page: str = MAIN_URL  # web page on ABC's website; defaults to the main cast page

# Where we'll store pictures
IMG_DIR = Path("./img")
IMG_DIR.mkdir(exist_ok=True)

#### Convenience utilities (for later)

In [None]:
import shutil
from dataclasses import asdict
from pprint import pprint

import requests


def print_cast_member(member):
    """Pretty-print a dataclass instance as a dict, keeping field declaration order."""
    as_mapping = asdict(member)
    pprint(as_mapping, sort_dicts=False)


# Simple wrapper around requests.get() to print http status code
def get_url(*args, **kwargs) -> requests.Response:
    """Issue a GET request and echo what was sent and what came back.

    All arguments are forwarded unchanged to ``requests.get``. Two lines are
    printed: the request method and URL, then the status code and reason.

    Returns:
        The response object, untouched.
    """
    reply = requests.get(*args, **kwargs)
    sent = reply.request

    print(sent.method, sent.url)
    print(reply.status_code, reply.reason)

    return reply


# If we want to download images
def download_file(url: str, dest: str):
    """Stream the resource at ``url`` into the local file ``dest``.

    Args:
        url: Address of the file to download.
        dest: Local path to write to; opened in ``"wb"`` mode, so an existing
            file is overwritten.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status, so an
            error page is never saved in place of the requested file.
    """
    with requests.get(url, stream=True) as response:
        # Fail before the destination file is created or truncated
        response.raise_for_status()

        # The raw urllib3 stream bypasses requests' content decoding; without this
        # a gzip/deflate-encoded body would be written to disk still compressed
        response.raw.decode_content = True

        with Path(dest).open(mode="wb") as f:
            shutil.copyfileobj(response.raw, f)

### Inspect page manually
* One div element with class `tilegroup--castlist`
* For each cast member an anchor element with class `tile--person`
* Watch the scope of variables in your cells. Try to use unique global names to avoid surprises.

In [None]:
from bs4 import BeautifulSoup

# Make remote request. Can also be POST with parameters
main_response = get_url(MAIN_URL)

# Parse content of request response (page)
main_soup = BeautifulSoup(main_response.content, "html5lib")

### Find our target elements (nodes in the DOM tree)

#### Find cast element

In [None]:
# Get main cast element
cast_list = main_soup.find("div", class_="tilegroup--castlist")

In [None]:
# Confirm there's only one such element
len(main_soup.find_all("div", class_="tilegroup--castlist"))

#### Find cast member elements

In [None]:
children = list(cast_list.children)
len(children)

In [None]:
persons = main_soup.find_all("a", class_="tile--person")

In [None]:
len(persons)

In [None]:
[person["aria-label"] for person in persons]

# Of course can also print each item individually
# for person in persons:
#     print(person["aria-label"])

#### Why only 15??

#### Iterate over paginated results

In [None]:
URLS = [f"{MAIN_URL}?page={i}" for i in range(1, 4)]
URLS

In [None]:
def get_person_elements(url):
    """Return every ``<a class="tile--person">`` element found on the page at ``url``."""
    markup = requests.get(url).content
    return BeautifulSoup(markup, "html5lib").find_all("a", class_="tile--person")

In [None]:
persons = [person for url in URLS for person in get_person_elements(url)]

In [None]:
# Confirm results
[person["aria-label"] for person in persons]

In [None]:
len(persons)

### Retrieve cast member bios

In [None]:
# Examine anchor href attributes
links = [person["href"] for person in persons]
links

In [None]:
# Links are root-relative urls
# Extract root domain
import tldextract  # overkill here but good to know

base_url = "https://" + tldextract.extract(MAIN_URL).fqdn

In [None]:
links = [base_url + person["href"] for person in persons]
links

In [None]:
# Request and parse utility
def get_soup(url):
    """Fetch ``url`` through ``get_url`` and return the page parsed with html5lib."""
    response = get_url(url)
    return BeautifulSoup(response.content, "html5lib")

### Start with bachelor and host

#### Bachelor

#### Inspect page manually
* Stuff we want is inside `<section class="actor">...</section>`
* In practice you could just copy-paste here (one-off)

In [None]:
# Get our page
bachelor_soup = get_soup(links[0])

In [None]:
# Narrow down to our desired section
bachelor_section = bachelor_soup.find("section", class_="actor")

#### Get the name

In [None]:
bachelor_name = bachelor_section.find("div", class_="actor__name").string
bachelor_name

In [None]:
bachelor_section.find("div", class_="actor__bio")

#### Get the bio

In [None]:
# Remove markup
bachelor_bio = bachelor_section.find("div", class_="actor__bio").get_text()
print(bachelor_bio)

#### Get the picture

In [None]:
bachelor_section.find("picture").img

In [None]:
encoded_image = bachelor_section.find("picture").img['src'].split('base64,')[1]
encoded_image

#### What is this?...

In [None]:
from base64 import b64decode

# Local file path
path = IMG_DIR / f"{bachelor_name}.gif"

path.write_bytes(b64decode(encoded_image))

In [None]:
# Nevermind, delete
path.unlink(missing_ok=True)

In [None]:
# Make new cast member, fill missing bits from bio
bachelor = CastMember(
    name=bachelor_name,
    age=26,
    occupation="Tech Executive",
    bio=bachelor_bio.strip(),  # stripping could be done in __post_init__() if we wanted to be fancy
    hometown="Anaheim Hills",
    state="California",
    role="bachelor",
    page=links[0]
)

In [None]:
# Convenient access to attributes, autocomplete
bachelor.name, bachelor.age

In [None]:
# Easy to convert to dictionary
from dataclasses import asdict

asdict(bachelor)

#### Same with host

In [None]:
host_section = get_soup(links[1]).find("section", class_="actor")
host_name = host_section.find("div", class_="actor__name").string
host_bio = host_section.find("div", class_="actor__bio").get_text()

# https://en.wikipedia.org/wiki/Jesse_Palmer
host = CastMember(
    name=host_name,
    age=44,
    occupation="Television Personality",
    bio=host_bio.strip(),
    hometown="Toronto",
    state="Ontario",
    country="Canada",
    role="host",
    page=links[1]
)

### The contestants

#### Try the first one manually

In [None]:
# generic global names for this exercise, would clean up IRL
section = get_soup(links[2]).find("section", class_="actor")

In [None]:
name = section.find("div", class_="actor__name").string
name

In [None]:
details = section.find("div", class_="actor__description").contents[0]
details

In [None]:
details.contents

In [None]:
age, _, occupation, _, home = details.contents

In [None]:
hometown, state = home.split(",")

In [None]:
bio = section.find("div", class_="actor__bio").get_text()

In [None]:
aly = CastMember(
    name=name,
    age=int(age),
    occupation=occupation.strip(),
    bio=bio.strip(),
    hometown=hometown.strip(),
    state=state.strip(),
    page=links[2]
)

In [None]:
print_cast_member(aly)

#### Iterate to get everyone

In [None]:
# Prepare the list
CAST = [bachelor, host]

In [None]:
# Build a CastMember for every remaining link and append it to CAST.
# links[0] and links[1] were handled separately above (bachelor and host).
for link in links[2:]:
    # Copy paste from above
    section = get_soup(link).find("section", class_="actor")
    name = section.find("div", class_="actor__name").string
    details = section.find("div", class_="actor__description").contents[0]
    bio = section.find("div", class_="actor__bio").get_text()

    # Unpacking requires exactly five child nodes; any other count raises ValueError.
    # NOTE(review): the two discarded nodes are presumably separators (e.g. <br>) - confirm
    age, _, occupation, _, home = details.contents

    # Requires exactly one comma in the home string, otherwise ValueError
    hometown, state = home.split(",")

    # country and role are not passed, so the dataclass defaults apply
    CAST.append(
        CastMember(
            name=name.strip(),
            age=int(age),
            occupation=occupation.strip(),
            bio=bio.strip(),
            hometown=hometown.strip(),
            state=state.strip(),
            page=link
        )
    )

In [None]:
len(CAST)

### Check our results

In [None]:
# Table formatting is just for looks
from prettytable import PrettyTable
table = PrettyTable()

table.field_names = ["Name", "Age", "Hometown"]
table.add_rows([[member.name, member.age, f"{member.hometown}, {member.state}"] for member in CAST])

print(table)

### Now try to get the pics

#### Set up selenium

In [None]:
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager

#### Try it on Zach first

- https://selenium-python.readthedocs.io/waits.html
- https://selenium-python.readthedocs.io/locating-elements.html#locating-by-xpath

In [None]:
# Zach's page
url = CAST[0].page

# Options for our browser
options = Options()
options.add_argument("--incognito")
options.add_argument("--headless")

# Create a web driver
driver = webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options)

# Get Zach's page
driver.get(url)
print(driver.title)

# How we locate our image element on the page
xpath = "//picture/img[@alt='Zach Shallcross']"

# Wait for our image element to be updated by JS
element = WebDriverWait(driver, 5).until(
    EC.text_to_be_present_in_element_attribute((By.XPATH, xpath), "src", ".jpg")
)

# KISS version of this could be:
# import time
# time.sleep(5)  # JS runs while python sleeps
# pic = driver.find_element(By.TAG_NAME, "picture")
# img = pic.find_element(By.TAG_NAME, "img")

# Access our image and confirm source
img = driver.find_element(By.XPATH, xpath)
print(img.get_attribute("src"))

# Close browser session
driver.quit()

#### Now for everyone

In [None]:
# Take our code above and put it in a function
def get_cast_member_photo(member: CastMember) -> str:
    """Fetch a cast member's photo with a headless Chrome session and save it locally.

    Opens ``member.page``, waits up to 5 seconds for the ``<picture>`` image whose
    ``alt`` attribute equals ``member.name`` to get a ``src`` containing ".jpg",
    downloads that image to ``./img/<name>.jpg`` and stores the path in
    ``member.photo``.

    Args:
        member: Cast member to process; ``photo`` is updated in place on success.

    Returns:
        The local path the image was saved to.

    Raises:
        Exception: Any error from the browser or the download is re-raised after
            a failure message has been printed.
    """
    # Stays None if the browser never starts, so cleanup below cannot fail with
    # an UnboundLocalError that would hide the real error
    driver = None

    try:
        # Options for our browser
        options = Options()
        options.add_argument("--incognito")
        options.add_argument("--headless")

        # Create a web driver
        driver = webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options)

        # Get the page
        driver.get(member.page)

        # How we locate our image element on the page
        xpath = f"//picture/img[@alt='{member.name}']"

        # Wait for our image element to be updated by JS
        WebDriverWait(driver, 5).until(
            EC.text_to_be_present_in_element_attribute((By.XPATH, xpath), "src", ".jpg")
        )

        # Access our image
        img = driver.find_element(By.XPATH, xpath)
        img_link = img.get_attribute("src")

        # Where we'll store our image locally
        local_path = f"./img/{member.name}.jpg"

        # Download the image
        download_file(img_link, local_path)

        # Update cast member
        member.photo = local_path

        # Show success
        print(f"Done extracting photo for {member.name}: {img_link}")

        return local_path

    except Exception:
        # Show error, then let the caller (or the future) see the original exception
        print(f"Unable to extract photo for {member.name}")
        raise

    finally:
        # Close browser session, if one was started
        if driver is not None:
            driver.quit()

#### Parallel I/O
- https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures

In [None]:
import os

# Limited resources on mybinder
MAX_THREADS = 1 if os.getenv('BINDER_LAUNCH_HOST') == 'https://mybinder.org/' else None

In [None]:
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
    for member in CAST:
        print(f"submitting job for {member.name}")
        
        executor.submit(get_cast_member_photo, member)

#### Why errors??

- Inspect pages with errors...
- Looks like we need to tweak our xpath expression
- https://devhints.io/xpath#string-functions

#### Try again with our updated code

In [None]:
# Modify our function slightly (only the xpath line)
def get_cast_member_photo(member: CastMember) -> str:
    """Fetch a cast member's photo with a headless Chrome session and save it locally.

    Opens ``member.page``, waits up to 5 seconds for the ``<picture>`` image whose
    ``alt`` attribute starts with ``member.name`` to get a ``src`` containing
    ".jpg", downloads that image to ``./img/<name>.jpg`` and stores the path in
    ``member.photo``.

    Args:
        member: Cast member to process; ``photo`` is updated in place on success.

    Returns:
        The local path the image was saved to.

    Raises:
        Exception: Any error from the browser or the download is re-raised after
            a failure message has been printed.
    """
    # Stays None if the browser never starts, so cleanup below cannot fail with
    # an UnboundLocalError that would hide the real error
    driver = None

    try:
        # Options for our browser
        options = Options()
        options.add_argument("--incognito")
        options.add_argument("--headless")

        # Create a web driver
        driver = webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options)

        # Get the page
        driver.get(member.page)

        # How we locate our image element on the page
        # (prefix match: some alt texts carry extra text after the name)
        xpath = f"//picture/img[starts-with(@alt, '{member.name}')]"

        # Wait for our image element to be updated by JS
        WebDriverWait(driver, 5).until(
            EC.text_to_be_present_in_element_attribute((By.XPATH, xpath), "src", ".jpg")
        )

        # Access our image
        img = driver.find_element(By.XPATH, xpath)
        img_link = img.get_attribute("src")

        # Where we'll store our image locally
        local_path = f"./img/{member.name}.jpg"

        # Download the image
        download_file(img_link, local_path)

        # Update cast member
        member.photo = local_path

        # Show success
        print(f"Done extracting photo for {member.name}: {img_link}")

        return local_path

    except Exception:
        # Show error, then let the caller (or the future) see the original exception
        print(f"Unable to extract photo for {member.name}")
        raise

    finally:
        # Close browser session, if one was started
        if driver is not None:
            driver.quit()

In [None]:
with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
    for member in CAST:
        # Only those with missing photo
        if not member.photo:
            print(f"submitting job for {member.name}")

            executor.submit(get_cast_member_photo, member)

#### Display our results

In [None]:
# HTML rendering is completely optional
from IPython.display import display_html

styling = """
<style>
.container {
    display: grid;
    grid-template-columns: repeat(4, 1fr);
    gap: 40px 20px;
}
.info {
    grid-column: 1;
    display: flex;
    flex-direction: column;
    align-items: center;
}
.bio {
    grid-column: 2/5;
}
.name {
    font-weight: bold;
}
</style>
"""

from html import escape

# One HTML fragment per cast member. The values come from scraped pages, so each
# one is converted to text and HTML-escaped before being placed into the markup;
# otherwise a stray "<", "&" or quote in a bio or name would break the layout
# (or inject markup into the notebook output).
rows = [
    '''
    <div class="info">
        <img src="{photo}" />
        <div class="name">{name}</div>
        <div>{age}</div>
        <div>{hometown}, {state}</div>
    </div>
    <div class="bio">
        {bio}
    </div>
    '''.format(**{field: escape(str(value)) for field, value in asdict(member).items()}) for member in CAST]


html = styling + '''<div class="container">''' + "".join(rows) + "</div>"

display_html(html, raw=True)

### Store results in database

In [None]:
from pathlib import Path

db_filename = "bachelor.sqlite3"

Path(db_filename).unlink(missing_ok=True)

In [None]:
from sqlalchemy import create_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, Session


engine = create_engine(f"sqlite:///{db_filename}", echo=False)


class Base(DeclarativeBase):
    """Declarative base class for the ORM models defined in this notebook."""
    pass

In [None]:
# Declarative style similar to dataclass
class Member(Base):
    """ORM model stored in the ``cast_member`` table.

    Apart from the ``id`` primary key, the column names match the fields of the
    CastMember dataclass, so a row can be built with ``Member(**asdict(member))``.
    """

    __tablename__ = "cast_member"

    # Primary key; not part of the dataclass, so it is never passed in explicitly
    id: Mapped[int] = mapped_column(primary_key=True)

    name: Mapped[str]
    age: Mapped[int]
    occupation: Mapped[str]
    bio: Mapped[str]
    hometown: Mapped[str]
    state: Mapped[str]
    country: Mapped[str] = mapped_column(default="USA")  # defaults are not necessary here since we've defined them in our dataclass
    role: Mapped[str] = mapped_column(default="contestant")
    photo: Mapped[Optional[str]]  # Optional annotation: the only column typed to allow a missing value
    page: Mapped[str]

In [None]:
Base.metadata.create_all(engine)

In [None]:
with Session(engine) as session:
    session.add_all([Member(**asdict(member)) for member in CAST])
    
    session.commit()

#### We can now query our DB

In [None]:
!sqlite3 {db_filename} ".tables"

In [None]:
# Contestants' names
!sqlite3 {db_filename} "select name from cast_member where role = 'contestant'"

In [None]:
# Anyone over 30?
!sqlite3 {db_filename} "select name, age from cast_member where role = 'contestant' and age >= 30"

In [None]:
# Where is everyone from?
!sqlite3 {db_filename} "select state, count(id) as total from cast_member where role = 'contestant' group by state order by total desc"

#### Or we can use the ORM

In [None]:
from sqlalchemy import select, func, desc, and_

# Create a query
query = select(Member).where(
    and_(
        Member.age >= 30,
        Member.role == "contestant"
    )
)

# Printing it shows the SQL
print(query)

In [None]:
# Run the query (note that we use scalars here, see https://blog.miguelgrinberg.com/post/what-s-new-in-sqlalchemy-2-0)
# The session used for the inserts was closed when its `with` block ended, so open
# a fresh one here; the context manager releases the connection once we are done
with Session(engine) as session:
    for member in session.scalars(query):
        print(member.name, member.age)

In [None]:
# Home states query (extra parentheses for multiline statement)
query = (
    select(
        Member.state, func.count(Member.id).label("total"))
            .where(Member.role == "contestant")
            .group_by(Member.state)
            .order_by(desc("total"))
)

print(query)

In [None]:
# Run our query (results are tuples)
# Use a fresh session scoped to this cell rather than the one closed after the
# inserts; the context manager releases the connection once we are done
with Session(engine) as session:
    for result in session.execute(query):
        print(result)