# Chapter 11 Lab

## Setup

In [None]:
# Load API keys from .env file
from dotenv import load_dotenv
# Pull API keys from the local .env file into the process environment.
load_dotenv()

# Confirmation banner: two lines emitted by a single print call.
print(
    "✓ Environment variables loaded",
    "Ready to run lab exercises",
    sep="\n",
)

### Question 1: Load curated products

Load the curated backpack list that has been pre-tested for reliable extraction.

In [None]:
import pandas as pd

# Section banner.
print("=" * 60, "QUESTION 1: Load Curated Products", "=" * 60, sep="\n")

# Read the curated backpack list (pre-tested for reliable extraction).
df = pd.read_csv("../data/curated_backpacks.csv")

# Report how many products and distinct brands were loaded, then show the table.
print(f"\nLoaded {len(df)} products from {df['Brand Name'].nunique()} brands\n")
display(df)
print("\n✓ Product selection complete")

### Question 2: URL Discovery

Use SerpAPI to search for candidate product URLs (Listing 11.1), then use AI ranking (Listing 11.2) to select the best URL for each product.

In [None]:
import os
import time
import requests
import openai
from pydantic import BaseModel

print("="*60)
print("QUESTION 2: URL Discovery & Ranking")
print("="*60)

# Listing 11.1: Search for candidate product URLs with SerpAPI
SERPAPI_KEY = os.getenv("SERPAPI_KEY")

def search_product_urls(search_key: str, num_results: int = 5) -> list[dict]:
    """Search Google through SerpAPI and return candidate URLs for a product.

    Args:
        search_key: Query string, e.g. "<brand> <product name>".
        num_results: Maximum number of organic results to request.

    Returns:
        One dict per organic result with the keys ``position``, ``title``,
        ``url`` and ``snippet`` (the shape ``rank_urls_with_ai`` reads).
        The list is empty when the response has no organic results.

    Raises:
        requests.HTTPError: If SerpAPI answers with an error status code.
    """
    params = {"q": search_key, "api_key": SERPAPI_KEY, "num": num_results, "engine": "google"}
    resp = requests.get("https://serpapi.com/search", params=params, timeout=30)
    resp.raise_for_status()
    data = resp.json()

    candidates = []
    for result in data.get("organic_results", []):
        link = result.get("link")
        if not link:
            # A result without a link cannot be fetched later; skip it.
            continue
        candidates.append({
            "position": result.get("position"),
            "title": result.get("title", ""),
            "url": link,
            "snippet": result.get("snippet", ""),
        })
    return candidates

# Listing 11.2: Rank URLs with AI
# URLRanking is the structured-output schema passed as `response_format` below.
class URLRanking(BaseModel):  #E
    best_url: str      # URL the model selected from the candidates
    confidence: str    # model-reported confidence level (free text)
    reasoning: str     # brief explanation of the choice

def rank_urls_with_ai(search_key: str, candidates: list[dict], model: str = "gpt-4o-mini") -> URLRanking:  #F
    """Use an LLM to pick the best product page from search results.

    Args:
        search_key: The product query the candidates were found for.
        candidates: Dicts with ``position``, ``title``, ``url`` and ``snippet``.
        model: Chat model used for the ranking call.

    Returns:
        The parsed ``URLRanking`` produced by the model.
    """
    candidate_text = "".join(
        f"Position {c['position']}:\n"
        f"  Title: {c['title']}\n"
        f"  URL: {c['url']}\n"
        f"  Snippet: {c['snippet']}\n\n"
        for c in candidates
    )
    
    system_prompt = """You are a data engineering assistant helping build a product database.
Given a product search key and a list of candidate URLs from search results,
pick the single best URL for extracting structured product data.

Prefer:
1. Manufacturer or official brand pages
2. Major retailer pages (REI, Backcountry, Moosejaw) if manufacturer page unavailable
3. Pages likely to contain: product name, price, description, weight, images
4. Individual product pages over category or listing pages

Avoid:
- Review sites, forums, Reddit threads
- Category pages that list multiple products

Return the best URL, your confidence level, and a brief explanation."""
    
    user_prompt = f"Product: {search_key}\n\nCandidate URLs:\n{candidate_text}"
    
    response = openai.beta.chat.completions.parse(
        model=model,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        response_format=URLRanking,
    )
    return response.choices[0].message.parsed

# Process each product
print(f"\nDiscovering URLs for {len(df)} products...\n")

# Fixed column set so df_urls has a 'status' column even when df is empty.
URL_COLUMNS = ["brand_name", "product_name", "search_key", "url", "confidence", "status"]

url_data = []  #G
for _, row in df.iterrows():
    search_key = f"{row['Brand Name']} {row['Product Name']}"
    print(f"  • {search_key}...")

    # Start from the "nothing found" record; upgraded on success, marked on error.
    record = {
        "brand_name": row['Brand Name'],
        "product_name": row['Product Name'],
        "search_key": search_key,
        "url": None,
        "confidence": None,
        "status": "no_results",
    }

    try:
        # Step 1: Search for URLs
        candidates = search_product_urls(search_key)

        # Step 2: Rank URLs with AI
        if candidates:
            ranking = rank_urls_with_ai(search_key, candidates)
            record.update({
                "url": ranking.best_url,
                "confidence": ranking.confidence,
                "status": "success",
            })
    except Exception as e:
        # Per-product boundary: record the failure and continue with the next product.
        print(f"    Error: {type(e).__name__}")
        record["status"] = f"error: {type(e).__name__}"

    url_data.append(record)
    time.sleep(1)  # Rate limiting

# df_urls is consumed by the Question 3 cell (filters on status == 'success').
df_urls = pd.DataFrame(url_data, columns=URL_COLUMNS)
print(f"\nURL Discovery Results:\n")
display(df_urls[["search_key", "url", "confidence", "status"]])

success_count = (df_urls['status'] == 'success').sum()
print(f"\n✓ {success_count}/{len(df_urls)} URLs discovered successfully")

### Question 3: HTML Cleaning & AI Extraction

Fetch HTML, apply aggressive cleaning (Listing 11.3), and extract product data with AI (Listing 11.6).

In [None]:
from bs4 import BeautifulSoup
from pydantic import Field
from typing import Optional

print("="*60)
print("QUESTION 3: HTML Cleaning & AI Extraction")
print("="*60)

# Listing 11.3: Aggressive HTML cleaning
REMOVE_TAGS = ["script", "style", "nav", "footer", "header", "iframe", "noscript", "svg", "form"]  #I
REMOVE_CLASSES = ["breadcrumb", "related-products", "recently-viewed", "newsletter", "cookie-banner", "site-footer", "site-header", "cart-drawer", "search-modal", "review", "reviews", "ratings"]

def clean_html_aggressive(html: str) -> str:  #J
    """Remove non-product HTML elements to reduce noise and token count.

    Args:
        html: Raw HTML of a product page.

    Returns:
        The remaining visible text, with fragments joined by single spaces.
    """
    soup = BeautifulSoup(html, "html.parser")
    
    # 1. Drop whole tags that never carry product data.
    for tag_name in REMOVE_TAGS:
        for element in soup.find_all(tag_name):
            element.decompose()
    
    # 2. Drop elements whose CSS class contains one of the noise patterns.
    #    BeautifulSoup calls a class_ function with one class *string* at a time
    #    (or None when the tag has no class), so test the string directly;
    #    joining it would interleave spaces between characters and never match.
    #    NOTE: this is a substring match, so "review" also hits e.g. "preview".
    def has_noise_class(css_class):
        if css_class is None:
            return False
        lowered = css_class.lower()
        return any(pattern in lowered for pattern in REMOVE_CLASSES)

    for element in soup.find_all(class_=has_noise_class):
        element.decompose()
    
    # 3. Drop elements left with no text and no nested image.
    for element in soup.find_all():
        if not element.get_text(strip=True) and not element.find("img"):
            element.decompose()
    
    return " ".join(soup.stripped_strings)

# Listing 11.5 & 11.6: Define schema and extraction function
# ProductExtraction is the structured-output schema handed to the model as
# `response_format` in extract_product_with_ai. The two name fields are
# required; every other field is optional and defaults to None.
class ProductExtraction(BaseModel):  #K
    # Required identification fields.
    product_name: str = Field(description="Full product name")
    brand_name: str = Field(description="Manufacturer or brand")
    # Optional fields: left as None when not provided.
    description: Optional[str] = Field(default=None, description="Product description")
    price: Optional[str] = Field(default=None, description="Current retail price")
    weight: Optional[str] = Field(default=None, description="Product weight with unit")
    primary_image_url: Optional[str] = Field(default=None, description="Main image URL")
    category: Optional[str] = Field(default=None, description="Product category")

# System prompt for the extraction call; field names mirror ProductExtraction.
EXTRACTION_PROMPT = """You are a product data extraction assistant for a data engineering pipeline.

Given the text content of a product web page, extract the following fields accurately:
- product_name: The full product name as displayed on the page
- brand_name: The manufacturer or brand
- description: A concise product description (1-3 sentences)
- price: The current retail price with currency symbol
- weight: The product weight with unit if available
- primary_image_url: The URL of the main product image if found in the text
- category: The product category (backpack, tent, sleeping bag, headlamp, etc.)

Rules:
- Only extract information that is explicitly present in the text
- Use null for any field you cannot find or confidently determine
- Do not guess or fabricate values
- For price, use the current or sale price, not the original price if both are shown
- For weight, include the unit (lbs, oz, kg, g)
- For category, use a simple label based on what the product is"""

def extract_product_with_ai(cleaned_text: str, model: str = "gpt-4o", max_chars: int = 8000) -> ProductExtraction:  #L
    """Extract product fields from cleaned page text using an LLM.

    Args:
        cleaned_text: Visible page text, e.g. from ``clean_html_aggressive``.
        model: Chat model used for the extraction call.
        max_chars: Only the first ``max_chars`` characters of ``cleaned_text``
            are sent to the model, which bounds the prompt size.

    Returns:
        The parsed ``ProductExtraction`` produced by the model.

    Raises:
        ValueError: If the model returned no structured output (e.g. a refusal).
    """
    response = openai.beta.chat.completions.parse(
        model=model,
        messages=[
            {"role": "system", "content": EXTRACTION_PROMPT},
            {"role": "user", "content": cleaned_text[:max_chars]},
        ],
        response_format=ProductExtraction,
    )
    message = response.choices[0].message
    if message.parsed is None:
        # Fail with an explicit message instead of returning None to the caller.
        refusal = getattr(message, "refusal", None)
        raise ValueError(f"Model returned no structured output (refusal: {refusal!r})")
    return message.parsed

# Process successful URL discoveries
successful_urls = df_urls[df_urls['status'] == 'success']
print(f"\nProcessing {len(successful_urls)} products...\n")

# Fixed column set: keeps the result table selectable below even when every
# extraction fails or there are no rows at all.
RESULT_COLUMNS = [
    "brand_name", "product_name", "url", "status",
    "extracted_name", "extracted_brand", "extracted_price",
    "extracted_weight", "extracted_category",
    "raw_chars", "cleaned_chars",
]

extraction_results = []  #M
for _, row in successful_urls.iterrows():
    # Start every record as a failure with empty fields; filled in on success.
    record = dict.fromkeys(RESULT_COLUMNS)
    record.update({
        "brand_name": row['brand_name'],
        "product_name": row['product_name'],
        "url": row['url'],
        "status": "error",
    })
    
    try:
        print(f"  • {row['product_name']}...")
        
        # Fetch & clean HTML. A browser-like User-Agent is sent, and HTTP
        # error responses are rejected so error pages are never passed to the LLM.
        page = requests.get(row['url'], timeout=10, headers={"User-Agent": "Mozilla/5.0"})
        page.raise_for_status()
        raw_html = page.text
        cleaned = clean_html_aggressive(raw_html)
        
        # Extract with AI
        extraction = extract_product_with_ai(cleaned)
        
        record.update({
            "extracted_name": extraction.product_name,
            "extracted_brand": extraction.brand_name,
            "extracted_price": extraction.price,
            "extracted_weight": extraction.weight,
            "extracted_category": extraction.category,
            "raw_chars": len(raw_html),
            "cleaned_chars": len(cleaned),
            "status": "success"
        })
    except Exception as e:
        # Per-product boundary: record the failure and continue with the next product.
        print(f"    Error: {type(e).__name__}")
        record["status"] = f"error: {type(e).__name__}"
    
    extraction_results.append(record)
    time.sleep(2)  # Rate limiting for API calls

results_df = pd.DataFrame(extraction_results, columns=RESULT_COLUMNS)  #N
print(f"\nExtraction Results:\n")
display(results_df[['product_name', 'status', 'extracted_price', 'extracted_weight', 'extracted_category']])

success_count = (results_df['status'] == 'success').sum()
print(f"\n✓ {success_count}/{len(results_df)} extractions successful")

### Question 4: Evaluate Results

Analyze extraction success rate and field coverage across all processed products.

In [None]:
# Section banner.
print("=" * 60, "QUESTION 4: Evaluate Results", "=" * 60, sep="\n")

if len(results_df) == 0:
    print("\nNo data to evaluate")
else:
    success_df = results_df[results_df['status'] == 'success']
    total = len(results_df)
    success_count = len(success_df)

    # Headline numbers: totals and overall success rate.
    print("\nExtraction Summary:\n")
    summary = pd.DataFrame({
        "Metric": ["Total products", "Successful", "Failed", "Success rate"],
        "Value": [total, success_count, total - success_count, f"{success_count / total:.0%}"],
    })
    display(summary)

    if success_count == 0:
        print("\nNo successful extractions to evaluate")
    else:
        # Per-field coverage: how many successful rows have a non-null value.
        fields = ['extracted_name', 'extracted_brand', 'extracted_price', 'extracted_weight', 'extracted_category']
        populated_counts = {
            name: success_df[name].notna().sum()
            for name in fields
            if name in success_df.columns
        }
        coverage_data = [
            {
                "Field": name.replace('extracted_', ''),
                "Populated": count,
                "Coverage": f"{count}/{success_count}",
                "Percentage": f"{count/success_count:.0%}",
            }
            for name, count in populated_counts.items()
        ]

        coverage_df = pd.DataFrame(coverage_data)
        print("\nField Coverage (of successful extractions):\n")
        display(coverage_df)

        # Mean share of populated cells across all evaluated fields.
        total_cells = len(coverage_df) * success_count
        avg_coverage = coverage_df['Populated'].sum() / total_cells * 100
        print(f"\n✓ Average field coverage: {avg_coverage:.0f}%")