In [690]:
import os
import re
import json
import io
import requests
import ast
from typing import Optional
from datetime import date
from collections import deque

from dotenv import load_dotenv

from pydantic import BaseModel, Field
from bs4 import BeautifulSoup
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.messages import AIMessage
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_text_splitters import RecursiveCharacterTextSplitter
from unstructured.partition.html import partition_html

In [691]:
# **Approach 2:**

# 1. “Given the following HTML: `html`, write a parser using playwright that pulls out all sections involving clothing”
# 2. Validate and check the generated script for security
# 3. Execute the scripts against the HTML
# 4. “Given the following HTML snippet, extract `ClothingItem` from the data”

In [692]:
load_dotenv()

True

In [693]:
# TODO: Consider an HTML-aware text splitter to not cut off HTML tags
SEARCH_MAX_RESULTS = 1
CHUNK_SIZE = 20000
MIN_CHUNK_SIZE = 1000
CHUNK_OVERLAP = 100
MAX_RETRIES = 3
MAX_N_LINKS_TO_SEARCH = 3
SEARCH_QUERY = "Loro Piana"

In [694]:
generate_scraping_script_prompt = """
Given the following HTML:
{html}

Write a parser using BeautifulSoup that pulls out all sections involving clothing.
Return the function to extract in a function called `parse_clothing_sections`.
Put all your python code in a code block in markdown format i.e ```code ...```
Return only the python code, nothing else.

The function should look like this:
```python
async def parse_clothing_sections(html: str):
    ...
```

Parsing Script Python:
"""

In [695]:
parse_results_prompt = """
Given the following clothing search results:

Results:
{results}

Extract a list of ClothingItem objects from the results.
Here is the python class definition for ClothingItem:
{clothing_item_class}

Extract only a List[ClothingItem] and nothing else.

clothing_item_list = 
"""

In [696]:
# TODO: Consider fine tuning a smaller model on GPT-4o's output to train it to
# Parse clothing web pages consistently!!!
# That fine tuning would be the kicker feature to show that I really understand ML/AI engineering!!!
llm = ChatOpenAI(
    model="gpt-4o",
    api_key=os.getenv("OPENAI_API_KEY"),
    temperature=0.0,
    streaming=False,
)

fast_llm = ChatOpenAI(
    model="gpt-4o-mini",
    api_key=os.getenv("OPENAI_API_KEY"),
    temperature=0.0,
    streaming=False,
)

In [697]:
class ClothingItem(BaseModel):
    """
    A clothing item extracted from the internet.
    Includes a variety of possible metadata fields.
    """

    id: Optional[str] = Field(
        None, description="Unique identifier for the clothing item"
    )
    name: Optional[str] = Field(
        None, description="Descriptive name of the clothing item"
    )
    brand: Optional[str] = Field(
        None, description="Manufacturer or designer of the clothing item"
    )
    category: Optional[str] = Field(
        None, description="General category of the item, e.g., shirts, pants, dresses"
    )
    subcategory: Optional[str] = Field(
        None,
        description="More specific classification, e.g., t-shirt, jeans, cocktail dress",
    )
    price: Optional[float] = Field(None, description="Current price of the item")
    original_price: Optional[float] = Field(
        None, description="Original price if the item is on sale"
    )
    image_url: Optional[str] = Field(None, description="URL of the item's image")
    color: Optional[str] = Field(None, description="Primary color of the item")
    sizes: Optional[list[str]] = Field(
        None, description="Available sizes or size range"
    )
    material: Optional[str] = Field(
        None, description="Main fabric or material composition"
    )
    gender: Optional[str] = Field(
        None, description="Target gender if applicable (men's, women's, unisex)"
    )
    season: Optional[str] = Field(
        None,
        description="Appropriate season for the item, e.g., summer, winter, all-season",
    )
    style: Optional[str] = Field(
        None, description="Style of the item, e.g., casual, formal, sporty"
    )
    description: Optional[str] = Field(
        None, description="Detailed text description of the item"
    )
    care_instructions: Optional[str] = Field(
        None, description="Instructions for washing and maintaining the item"
    )
    availability: Optional[str] = Field(
        None, description="Availability status, e.g., in stock, out of stock, pre-order"
    )
    average_rating: Optional[float] = Field(
        None, description="Average customer rating of the item"
    )
    num_reviews: Optional[int] = Field(None, description="Number of customer reviews")
    tags: Optional[list[str]] = Field(
        None,
        description="Keywords associated with the item for searching and categorization",
    )
    dimensions: Optional[dict] = Field(
        None, description="Measurements of the item, e.g., length, width, sleeve length"
    )
    weight: Optional[float] = Field(
        None, description="Weight of the item, useful for shipping calculations"
    )
    release_date: Optional[date] = Field(
        None, description="Date when the item was first available or added to inventory"
    )
    sustainability_info: Optional[str] = Field(
        None, description="Information about eco-friendly or ethical production"
    )


class SimpleClothingItem(BaseModel):
    name: str = Field(..., description="Descriptive name of the clothing item")
    price: Optional[float] = Field(None, description="Current price of the item")
    image_url: Optional[str] = Field(None, description="URL of the item's image")
    link: Optional[str] = Field(None, description="URL of the item's page")


class ClothingItemList(BaseModel):
    clothing_items: list[SimpleClothingItem]

In [698]:
def split_html(html: str) -> list[str]:
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        # min_chunk_size=MIN_CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
        length_function=len,
        is_separator_regex=False,
    )
    return text_splitter.split_text(html)

In [699]:
class SearchResult(BaseModel):
    url: str = Field(..., description="URL of the search result")
    html_content: Optional[str] = Field(
        None, description="HTML content of the search result"
    )


def get_search_results_from_url(url: str) -> list[SearchResult]:
    res = []
    response = requests.get(url)
    response.raise_for_status()
    html_content = response.text
    for split_html_content in split_html(html_content):
        res.append(SearchResult(url=url, html_content=split_html_content))
    return res


def get_search_results(search_query: str) -> list[SearchResult]:
    tavily_search = TavilySearchResults(
        max_results=SEARCH_MAX_RESULTS, include_raw_content=True
    )
    search_results = tavily_search.invoke({"query": search_query})
    res = []
    for search_result in search_results:
        print(f"Search result: {search_result}")
        url = search_result["url"]
        try:
            res.extend(get_search_results_from_url(url))
        except Exception as e:
            print(f"Error fetching HTML from {url}: {e}")
            continue
    return res

In [700]:
def get_test_html() -> str:
    with open("test_web_page.html", "r") as f:
        return f.read()

In [701]:
def extract_code_from_markdown(markdown: str) -> str:
    # Try to extract code blocks marked with ```python first
    if "```python" in markdown:
        return markdown.split("```python")[1].split("```")[0]
    # If no python blocks found, try generic code blocks
    elif "```" in markdown:
        return markdown.split("```")[1].split("```")[0]
    return ""

In [702]:
def get_parsing_script(search_results: dict) -> str:
    prompt = PromptTemplate(
        input_variables=["html"],
        template=generate_scraping_script_prompt,
    )
    extract_prompt = prompt.format(html=search_results["content"])
    # print(f"Extract prompt: {extract_prompt}")
    raw_res = AIMessage.model_validate(llm.invoke(extract_prompt)).content
    # print(f"Raw res: {raw_res}")
    return extract_code_from_markdown(raw_res)

In [703]:
def validate_parsing_script(script: str) -> bool:
    """
    Validate the generated parsing script for security concerns.
    Returns True if script passes security checks, False otherwise.
    """
    # List of forbidden operations/keywords that could be dangerous
    forbidden = [
        "import os",
        "import sys",
        "subprocess",
        "eval(",
        "exec(",
        "__import__",
        "open(",
        "write",
        "delete",
        "remove",
    ]

    # Check for forbidden operations
    for item in forbidden:
        if item in script.lower():
            print(f"Security violation: Found forbidden operation '{item}'")
            return False

    # Only allow importing safe parsing libraries
    allowed_imports = {
        "json",
        "asyncio",
        "playwright",
        "beautifulsoup4",
        "bs4",
        "lxml",
        "html.parser",
        "re",
    }

    import_lines = [
        line.strip()
        for line in script.split("\n")
        if line.strip().startswith("import") or line.strip().startswith("from")
    ]

    for line in import_lines:
        module = line.split()[1].split(".")[0]
        if module not in allowed_imports:
            print(f"Security violation: Unauthorized import '{module}'")
            return False

    return True

In [704]:
class FunctionExtractor(ast.NodeVisitor):
    def __init__(self):
        self.functions = []

    def visit_FunctionDef(self, node):
        # Collect regular function name and body
        self.functions.append({
            'name': node.name,
            'type': 'regular',
            'body': ast.unparse(node)
        })
        self.generic_visit(node)

    def visit_AsyncFunctionDef(self, node):
        # Collect async function name and body
        self.functions.append({
            'name': node.name,
            'type': 'async',
            'body': ast.unparse(node)
        })
        self.generic_visit(node)

def extract_functions_from_code(code):
    tree = ast.parse(code)
    extractor = FunctionExtractor()
    extractor.visit(tree)
    return extractor.functions

In [705]:
async def execute_parsing_script(script: str, html: str) -> list:
    local_namespace = {}
    try:
        # Execute the function body in the local namespace
        exec(script, globals(), local_namespace)
        parse_clothing_sections = local_namespace.get("parse_clothing_sections")
        if not parse_clothing_sections:
            raise ValueError("parse_clothing_sections function not found")
        return await parse_clothing_sections(html)
    except Exception as e:
        print(f"Error executing parsing script: {str(e)}")

In [706]:
def result_extractor(raw_results: str) -> ClothingItemList:
    structured_output_llm = llm.with_structured_output(ClothingItemList)
    
    for attempt in range(MAX_RETRIES):
        try:
            raw_res = structured_output_llm.invoke(raw_results)
            print(f"Raw res: {raw_res}")
            return ClothingItemList.model_validate(raw_res)
        except Exception as e:
            if attempt == MAX_RETRIES - 1:  # Last attempt
                raise
            print(f"Attempt {attempt + 1} failed: {str(e)}. Retrying...")

In [707]:
# search_results = split_html(get_test_html())
# print(f"len(search_results): {len(search_results)}")
# print(f"Search results: {search_results[100:110]}")
# print([res_len for res_len in [len(res) for res in search_results]])
# print(
#     f"Average chunk length: {sum(len(chunk) for chunk in search_results) / len(search_results)}"
# )

In [708]:
# TODO: Build a classifer to search results to see if a given result is about a clothing item
# "Is this in the main body of the HTML and is mainly text, or is it just a bunch of numbers?"

In [709]:
html_contains_clothing_info_prompt = """
Given the following HTML snippet:
{html}

Does this HTML contain information about a clothing item or links that are promising for finding clothing items?
Return "true" if it does, "false" otherwise.
"""

In [710]:
get_clothing_item_links_prompt = """
Given the following list of links:
{links}

Extract a list of links that are promising for finding clothing items.
"""

In [711]:
get_n_most_promising_links_prompt = """
Given the following list of links:
{links}

Return the {n} most promising links from the list for finding clothing items.
The link should be something the user clicks on to get to a clothing item page for purchasing.

Links:
"""

In [712]:
def contains_clothing_item_info_or_links(html_chunk: str) -> bool:
    """
    Returns True if the HTML chunk is about a clothing item
    or links that are promising for finding clothing items, False otherwise.
    """
    prompt = PromptTemplate(
        input_variables=["html"],
        template=html_contains_clothing_info_prompt,
    )
    extract_prompt = prompt.format(html=html_chunk)
    raw_res = AIMessage.model_validate(fast_llm.invoke(extract_prompt)).content
    return "true" in raw_res.lower()  # TODO: Consider structured output in the future

In [713]:
def extract_all_links(html_chunk: str) -> list[str]:
    """
    Returns a list of all links in the HTML chunk.
    """
    soup = BeautifulSoup(html_chunk, "html.parser")
    return [link.get("href") for link in soup.find_all("a")]

In [714]:
class ClothingItemLinks(BaseModel):
    links: list[str] = Field(
        ..., description="List of links that are promising for finding clothing items"
    )


def get_clothing_item_links(search_result: SearchResult) -> list[str]:
    """
    Returns a list of links that are promising for finding clothing items.
    """
    raw_links = extract_all_links(search_result.html_content)
    print(f"HTML content: {search_result.html_content}")
    all_links = []
    for link in raw_links:
        if link and not (link.startswith("http://") or link.startswith("https://")):
            all_links.append(search_result.url + link)
        else:
            all_links.append(link)
    print(
        f"All links: {all_links}"
    )  # NOTE: Extracting the links takes ~5 minutes, not a real-time operation
    prompt = PromptTemplate(
        input_variables=["links"],
        template=get_clothing_item_links_prompt,
    )
    get_links_prompt = prompt.format(links=all_links)
    link_extract_llm = llm.with_structured_output(ClothingItemLinks)
    raw_res = link_extract_llm.invoke(get_links_prompt)
    return ClothingItemLinks.model_validate(raw_res).links

In [715]:
# TODO: This is not a real-time feature, architect the frontend to show users search progress while the model is working
# TODO: Why is the search result turning up so much garbage in the HTML?

# TODO: Toplevel pages seem to not have much specific clothing into - will need to dive deeper into websites to get more specific info
# TODO: The scraping logic works, but needs to find web pages where the information actually is - the LLM needs to do some browser control
# Writing a playwright script is good for this!!!
# "Given this following web page, write a playwright script to follow the top N most promising links to get more information about clothing items"

In [716]:
# TODO: Implement BFS with the link traversal script

In [717]:
def get_n_most_promising_links(links: list[str], n: int) -> list[str]:
    """
    Returns the N most promising links from the list.
    """
    prompt = PromptTemplate(
        input_variables=["links", "n"],
        template=get_n_most_promising_links_prompt,
    )
    get_links_prompt = prompt.format(links=links, n=n)
    link_extract_llm = llm.with_structured_output(ClothingItemLinks)
    raw_res = link_extract_llm.invoke(get_links_prompt)
    return ClothingItemLinks.model_validate(raw_res).links

In [718]:
async def parse_search_result(search_result: SearchResult) -> list[ClothingItem]:
    print(f"Search chunk: {search_result}")
    try:
        parsing_script = get_parsing_script({"content": search_result})
    except Exception as e:
        print(f"Error getting parsing script: {str(e)}")
        raise e
    if validate_parsing_script(parsing_script):
        # print(f"Raw Parsing script: {parsing_script}")
        extracted_funcs = extract_functions_from_code(parsing_script)
        # print(f"Extracted functions: {extracted_funcs}")
        if len(extracted_funcs) != 1:
            print(f"Expected 1 function, got {len(extracted_funcs)}")
            raise e

        parse_clothing_sections_func = extracted_funcs[0]["body"]
        # print(f"Parsing script:\n{parse_clothing_sections_func}")
        results_safe = validate_parsing_script(parse_clothing_sections_func)
        if not results_safe:
            print("Parse clothing sections function failed security validation")
            raise e

        raw_results = await execute_parsing_script(
            parse_clothing_sections_func, search_result
        )
        print(f"Results: {raw_results}")
        if raw_results and len(raw_results) > 0:
            try:
                parsed_results = result_extractor(str(raw_results))
                print(f"Parsed results: {parsed_results}")
                return parsed_results.clothing_items
            except Exception as e:
                print(f"Error parsing results: {str(e)}")
                raise e
        else:
            return []
    else:
        print("Generated script failed security validation")
        return []

In [719]:
# TODO: Implement a depth limit on BFS to make sure it terminates
async def run_clothing_search_crawler(search_query: str) -> list[ClothingItem]:
    """
    Uses BFS and tavily search to find clothing items and crawl search results for
    promising clothing item links.
    """
    queue: deque[SearchResult] = deque()
    search_results = get_search_results(search_query)
    final_result: list[ClothingItem] = []
    # search_results = split_html(get_test_html())
    for search_result in search_results:
        queue.append(search_result)
    while queue:
        search_result = queue.popleft()
        print(f"Parent URL: {search_result.url}")
        if not contains_clothing_item_info_or_links(search_result):
            print(f"Skipping chunk: {search_result}")
            continue

        # Parse the current search result
        parsed_results = await parse_search_result(search_result)
        final_result.extend(parsed_results)

        # Continue BFS by adding the most promising links to the queue
        links = get_clothing_item_links(search_result)
        print(f"Links: {links}")
        if not links:
            print(f"No links found for {search_result.url}, continuing search")
            continue
        # TODO: Follow the most promising links or search the current page
        num_links_to_search = min(MAX_N_LINKS_TO_SEARCH, len(links))
        links_to_search = get_n_most_promising_links(links, num_links_to_search)
        print(f"Links to search: {links_to_search}")
        for link in links_to_search:
            try:
                queue.append(get_search_results_from_url(link))
            except Exception as e:
                print(f"Error fetching HTML from {link}: {e}")
                continue

    return final_result

In [720]:
result = await run_clothing_search_crawler(SEARCH_QUERY)
print(f"Result: {result}")

Search result: {'url': 'https://en.wikipedia.org/wiki/Loro_Piana', 'content': "After taking the lead of the company in the 1960s, Franco Loro Piana, grandson of Pietro, started to export high quality fabrics to Europe, America and Japan.[2]\nIn the 1970s, the company was directed by Franco's sons, Sergio and Pier Luigi, whose focus was on top quality fabrics development, including cashmere and extra fine wools. In the second half of this century, the family moved its activity to Valsesia and founded the wool spinning mill Fratelli Lora e Compagnia, followed by the wool spinning mill Zignone & C. in Quarona at the beginning of the 20th century.\n It has a total of 136 stores that are directly operated.[1][8]\nOperations[edit]\nLoro Piana is vertically integrated, and it handles all stages of production, from the harvesting of natural fibers to the delivery of the finished product to stores.[1]\n The company has three divisions: the textile division produces high quality textiles from ca

TypeError: 'NoneType' object is not iterable

In [515]:
# TODO: Consider using partition_html from unstructured to grab the HTML content for each page
# Seems to be more consistent that the custom parsers the AI is writing
# TODO: Compare partition_html vs. writing custom parsers