# Broker Agent Experimental Code
This is a notebook for exploring the codebase and testing ideas.
The goal is to make sure we can connect to a website and scrape/browse/interact with the page.

In [1]:
import json
import re
import asyncio
import traceback

from langchain_ollama import ChatOllama
from playwright.async_api import async_playwright

from broker_agent.config.config import BrokerAgentConfig

In [2]:
# Load the project configuration once; later cells read OLLAMA_BASE_URL, llm,
# websites and script_generation_retries from this object.
config = BrokerAgentConfig.from_yaml_and_env()

In [3]:
# Shared chat-model client used by generate_playwright_script(); the endpoint
# and model name both come from the loaded configuration.
ollama = ChatOllama(base_url=config.OLLAMA_BASE_URL, model=config.llm)

In [4]:
async def extract_playwright_script(script_content):
    """
    Pull the body of the first ```playwright fenced block out of a text blob.

    Args:
        script_content (str): Text (for example a model reply) that may
            contain a ```playwright ... ``` fenced section

    Returns:
        str or None: Everything between the opening fence line and the next
        closing fence, or None when no such fenced section is present
    """
    fence_pattern = re.compile(r"```playwright\n(.*?)```", re.DOTALL)
    fence_match = fence_pattern.search(script_content)
    return fence_match.group(1) if fence_match else None

In [5]:
# TODO: May need to filter a11y tree to ensure best model understanding
def format_a11y_tree(tree_data, *, max_children=10, preview_children=5):
    """Format an accessibility tree for better readability.

    Args:
        tree_data (dict or str): Root node of the tree, either as a dictionary
            or as a JSON string that decodes to one. Each node is a dictionary
            with 'role', 'name' and, optionally, a 'children' list of nodes.
        max_children (int): A node with more children than this is summarized
            instead of being listed in full.
        preview_children (int): How many children are still shown for a
            summarized node.

    Returns:
        str: A markdown heading followed by a fenced block holding one
        indented "- role: "name"" line per rendered node.

    Raises:
        TypeError: If the input is neither a dictionary nor a JSON string
            that decodes to a dictionary.
        json.JSONDecodeError: If a string input is not valid JSON.
    """
    # The error message has always promised string support; honor it by
    # decoding JSON text into the dictionary form used below.
    if isinstance(tree_data, str):
        tree = json.loads(tree_data)
        if not isinstance(tree, dict):
            raise TypeError("JSON input must decode to a dictionary")
    elif isinstance(tree_data, dict):
        tree = tree_data
    else:
        raise TypeError("Input must be a dictionary or string")

    def format_node(node, depth=0):
        """Return the formatted lines for ``node`` and its descendants."""
        indent = "  " * depth

        # Fall back to placeholders so one incomplete node does not abort
        # the formatting of the whole tree.
        role = node.get('role', 'unknown')
        name = node.get('name', '')
        lines = [f'{indent}- {role}: "{name}"']

        children = node.get('children') or []
        if not children:
            return lines

        if len(children) > max_children:
            # Summarize wide nodes to keep the prompt size manageable.
            shown_children = children[:preview_children]
            hidden_count = len(children) - len(shown_children)
            lines.append(f"{indent}  ↳ Children ({len(children)} total):")
            for child in shown_children:
                lines.extend(format_node(child, depth + 2))
            if hidden_count > 0:
                lines.append(f"{indent}  ↳ ... {hidden_count} more children ...")
        else:
            lines.append(f"{indent}  ↳ Children:")
            for child in children:
                lines.extend(format_node(child, depth + 2))

        return lines

    formatted = format_node(tree)
    return "\n".join(["# Accessibility Tree Structure", "```", *formatted, "```"])

In [6]:
async def get_test_playwright_script() -> str:
    """Return a fixed sample reply containing a ```playwright fenced script.

    The embedded script defines an async ``main()`` that drives a ``page``
    object which it does not create itself, so the executing namespace has
    to supply it.
    """
    canned_reply = """
```playwright
async def main():
    # Get elements using aria-labels and roles
    price_button = page.get_by_role("button", name="Price")
    bedrooms_select = page.get_by_label('Beds / Baths')
    no_fee_checkbox = page.get_by_label('No Fee Only')

    # Click the Price button to open price filter
    await price_button.click()

    # Click the no fee checkbox since its under the pricing menu
    no_fee_checkbox = page.get_by_label('No Fee Only')
    await no_fee_checkbox.click()

    # Wait for and fill in min/max price fields
    await page.wait_for_selector("input[placeholder='No min']")
    min_price_input = page.get_by_placeholder("No min")
    await min_price_input.click()
    await min_price_input.fill("3000")

    max_price_input = page.get_by_placeholder("Max")
    await max_price_input.click()
    await max_price_input.fill("4000")
    
    # Press ESC to dismiss dropdown options that might be blocking the Done button
    await page.keyboard.press("Escape")

    # Select Studio from Bedrooms dropdown
    await bedrooms_select.click()
    studio_option = page.get_by_test_id("desktop-filter").get_by_text("Studio")
    await studio_option.click()

    # Press ESC to dismiss dropdown options that might be blocking the Done button
    await page.keyboard.press("Escape")
```
"""
    return canned_reply

In [7]:
# TODO: Do we need all the DOM content to generate a script?
# The following is the DOM content and accessibility tree of the website:
# <dom_content>
# {dom_content}
# </dom_content>

async def generate_playwright_script(website: str, dom: str, a11y_tree: dict, error_message: str) -> str:
    """Ask the LLM for a Playwright script that runs the apartment search.

    Args:
        website: URL of the page being automated. Currently not inserted
            into the prompt.
        dom: Raw DOM content of the page. Currently not inserted into the
            prompt.
        a11y_tree: Accessibility snapshot of the page as a dictionary; it is
            rendered with format_a11y_tree() before being placed in the prompt.
        error_message: Error text from a previous failed attempt, or an empty
            string on the first attempt.

    Returns:
        The raw text content of the model's reply.
    """
    # NOTE: the template is filled with str.format(), so any literal curly
    # brace added to it must be doubled ({{ / }}).
    generate_playwright_script_prompt = """
    Generate a Python Playwright script to search for a studio apartment
    in the $3000-$4000 range. Note the following playwright code has already been executed:

    <already_executed_code>
    await page.goto(website)
    dom_content = await page.content()
    a11y_tree = await page.accessibility.snapshot()
    </already_executed_code>

    <aria_labels>
    - 'Price'
    - 'Beds / Baths'
    - 'No min'
    - 'No max'
    - 'Options' (This one is a form for the min/max price in dollars)
    - 'No Fee Only'
    - 'Done' (This one sets the price level)
    - 'Search' (This one clicks the search button after setting the price and number of bedrooms)
    </aria_labels>
    
    <rules>
    - Start the script by grabbing elements with `page.get_by_role()` or `page.get_by_label()` or `page.get_by_text()` or similar functions since we already
    have the DOM content on the current page
        - Don't use `await` when calling these functions, just call them sync i.e `page.get_by_role("button", name="Price")` or `page.get_by_label("...")`
    - You also don't need to wait for the page to load i.e `page.wait_for_event("load")` since
    the page is already loaded
    - Click the 'No Fee Only' button before filling in the below input forms with min/max price
    - Use `await page.wait_for_selector("input[placeholder='No min']")` and `await page.wait_for_selector("input[placeholder='No max']")` to select the minimum and maximum price boxes
        - Then click them like `await page.get_by_placeholder("No min").click()`
        - Then enter fields i.e `await page.get_by_placeholder("No min").fill("3000")`
        - Finally, after entering the price, press escape to exit the price selection menu `await page.keyboard.press("Escape")`
    - Set a timeout of 5000ms (5 seconds) for all your actions taken on the DOM i.e `await page.goto(website, timeout=5000)`
    - Make sure to click elements like form fields before entering data into them
    - Use the `aria-label` attributes from the a11y tree to find the elements to interact with
    - The search script should handle clicking the 'Price' button, filling out the min and max price, 
    filling out the 'Bedrooms' dropdown and selecting 'Studio', and clicking the 'Search' button.
    - Make sure that the generated playwright code uses the async playwright API i.e (from playwright.async_api import async_playwright).
    - Don't run the code directly with asyncio.run() - just return the code in the main() function.
    - You don't need to use the page.goto() function since we are already on the target website.
    - Use the following test ID filter to filter by apartment type: `page.get_by_test_id("desktop-filter").get_by_text("Studio")`
    </rules>

    If there was an error in the previous playwright script, please use the following error message to generate a new playwright script:
    <error_message>
    {error_message}
    </error_message>

    <a11y_tree>
    {a11y_tree}
    </a11y_tree>

    Return the playwright script in markdown format like the following:
```playwright
async def main():
    # Get elements using aria-labels and roles
    price_button = page.get_by_role("button", name="Price")
    bedrooms_select = page.get_by_label('Beds / Baths')
    no_fee_checkbox = page.get_by_label('No Fee Only')

    # Click the Price button to open price filter
    await price_button.click()

    # Click the no fee checkbox since its under the pricing menu
    no_fee_checkbox = page.get_by_label('No Fee Only')
    await no_fee_checkbox.click()

    # Wait for and fill in min/max price fields
    await page.wait_for_selector("input[placeholder='No min']")
    min_price_input = page.get_by_placeholder("No min")
    await min_price_input.click()
    await min_price_input.fill("3000")

    max_price_input = page.get_by_placeholder("Max")
    await max_price_input.click()
    await max_price_input.fill("4000")
    
    # Press ESC to dismiss dropdown options that might be blocking the Done button
    await page.keyboard.press("Escape")

    # Select Studio from Bedrooms dropdown
    await bedrooms_select.click()
    studio_option = page.get_by_test_id("desktop-filter").get_by_text("Studio")
    await studio_option.click()

    # Press ESC to dismiss dropdown options that might be blocking the Done button
    await page.keyboard.press("Escape")
    ...
```

    Playwright script:
    """
    filled_prompt = generate_playwright_script_prompt.format(a11y_tree=format_a11y_tree(a11y_tree), error_message=error_message)
    print(f"Prompt = {filled_prompt}")
    return (await ollama.ainvoke(filled_prompt)).content

In [8]:
async def execute_llm_script(script_code, context_objects):
    """Execute generated script source and await the ``main`` it defines.

    Args:
        script_code (str): Python source that is expected to define ``main``.
        context_objects (dict): Names made available to the script as
            globals. The mapping is shallow-copied, so the caller's
            dictionary is not modified by the script's definitions.

    Raises:
        ValueError: If ``script_code`` is None or blank, or if the script
            does not define a callable ``main``.
        Exception: Anything raised while executing the script or awaiting
            its ``main`` propagates to the caller unchanged.
    """
    # Fail with an actionable message instead of the opaque TypeError that
    # exec(None) raises when no script could be extracted from the reply.
    if script_code is None or (isinstance(script_code, str) and not script_code.strip()):
        raise ValueError(
            "No script code to execute; expected a ```playwright fenced block defining main()"
        )

    # Define a local namespace with the caller's objects
    namespace = context_objects.copy()

    # SECURITY: exec() runs the generated code with the full privileges of
    # this process. Only use this in a trusted or sandboxed environment.
    exec(script_code, namespace)

    # If the script defines a main function, call it
    entry_point = namespace.get('main')
    if not callable(entry_point):
        raise ValueError("No main function found in the script")
    await entry_point()

## Python vs. JS Playwright
If there are too few examples for Python Playwright, try the JS Playwright API instead.

In [None]:
async def main():
    """Visit each configured website and run an LLM-generated Playwright script on it.

    For every URL in ``config.websites`` a new page is opened. The page is
    loaded, its DOM and accessibility snapshot are captured, a script is
    requested from the LLM and executed against the page. A failed execution
    is retried, with the error fed back into the next prompt, until
    ``config.script_generation_retries`` attempts have been used. Errors for
    one website are printed and do not stop the remaining websites.
    """
    # The context manager stops the Playwright driver even if an error escapes.
    async with async_playwright() as playwright:
        browser = await playwright.chromium.launch(headless=False)
        try:
            for website in config.websites:
                print(f"Navigating to {website}")
                page = await browser.new_page()

                try:
                    retry_count = 0
                    error_message = ""

                    while retry_count < config.script_generation_retries:
                        # Reload on every attempt so each generated script
                        # starts from a fresh page state.
                        await page.goto(website, timeout=50000)
                        dom_content = await page.content()
                        print(f"DOM content length: {len(dom_content)} characters")
                        a11y_tree = await page.accessibility.snapshot()
                        print(
                            f"Accessibility tree obtained with {len(a11y_tree.get('children', []))} root elements"
                        )
                        print(a11y_tree)

                        print("Generating Playwright script...")
                        script = await generate_playwright_script(website, dom_content, a11y_tree, error_message)

                        # NOTE: Re-enable if testing a playwright script
                        # script = await get_test_playwright_script()
                        print(script)

                        # Extract and execute the playwright script
                        extracted_script = await extract_playwright_script(script)
                        try:
                            # Execute the extracted script against the current page
                            exec_globals = {
                                "page": page,
                                "asyncio": asyncio,
                                "playwright": playwright,
                            }
                            # TODO: Is there a more secure way to do this than exec?
                            # TODO: How to handle dynamic playwright execution?
                            await execute_llm_script(extracted_script, exec_globals)
                        except Exception as script_error:
                            retry_count += 1
                            error_message = str(script_error) + "\n" + traceback.format_exc()
                            print(f"Error executing generated script (attempt {retry_count}/{config.script_generation_retries}): {script_error}")
                            print(f"Script execution stack trace:\n{traceback.format_exc()}")
                            if retry_count >= config.script_generation_retries:
                                # Keep the last script failure attached as the cause.
                                raise Exception("Maximum retry attempts reached") from script_error
                            print("Retrying in 2 seconds...")
                            await asyncio.sleep(2)
                        else:
                            print("Script execution completed")
                            break  # Exit the retry loop on success

                except Exception as e:
                    print(f"Error processing {website}: {e}")
                    print(f"Call stack:\n{traceback.format_exc()}")
                finally:
                    # Close the page exactly once per website, after all
                    # attempts. Closing it inside the retry loop would make
                    # the next attempt navigate a closed page.
                    await page.close()

                    # TODO: Implement exponential backoff for politeness when visiting
                    await asyncio.sleep(2)
        finally:
            await browser.close()


# Top-level await: this only works where the cell runner permits it (as in a
# notebook). gather() wraps the single coroutine, so the cell's value is a
# one-element list holding main()'s return value.
await asyncio.gather(main())

Navigating to https://streeteasy.com/for-rent/nyc
DOM content length: 1218917 characters
Accessibility tree obtained with 379 root elements
{'role': 'WebArea', 'name': 'NYC Apartments for Rent - Updated Daily | StreetEasy', 'children': [{'role': 'link', 'name': 'Skip Navigation'}, {'role': 'link', 'name': 'StreetEasy Logo'}, {'role': 'link', 'name': 'Advertise Link'}, {'role': 'button', 'name': 'Sign In / Register'}, {'role': 'button', 'name': 'Rent', 'haspopup': 'menu'}, {'role': 'button', 'name': 'Buy', 'haspopup': 'menu'}, {'role': 'link', 'name': 'Sell Your Home', 'haspopup': 'menu'}, {'role': 'link', 'name': 'Buildings', 'haspopup': 'menu'}, {'role': 'button', 'name': 'Resources', 'haspopup': 'menu'}, {'role': 'link', 'name': 'Blog', 'haspopup': 'menu'}, {'role': 'text', 'name': 'e.g. address, building, agent'}, {'role': 'textbox', 'name': 'e.g. address, building, agent'}, {'role': 'button', 'name': 'Search'}, {'role': 'heading', 'name': 'LOCATION', 'level': 5}, {'role': 'button',

[None]