## CCnR Retrieval using GPT-4o

The AI agent browses web pages to search for relevant documents, confirms that they exist, and downloads them to the local system.

Note: The code only works on **macOS** due to bugs concerning Jupyter Notebook on Windows.

Test Run: https://drive.google.com/file/d/1sAWOf3a9bA1gzQR3pQD1WIhf0G2LEiaO/view?usp=sharing

In [None]:
%%capture --no-stderr
# Installing all required dependencies for LangChain, LangSmith and Playwright
try:
    !pip install -U --quiet langchain langchain_openai langchain_community langsmith langgraph langchainhub playwright langchain_experimental pandas openpyxl
    !playwright install

except Exception as e:
    print(f"An error occurred during installation: {e}")

In [None]:
import os
import getpass

# Set up environment variables for OpenAI, LangChain tracing, and project name.
def _set_if_undefined(var: str):
    # Skip the prompt when the variable is already set in the environment.
    if os.environ.get(var):
        return
    while True:
        value = getpass.getpass(f"Please provide your {var}: ")
        if value:
            os.environ[var] = value
            break
        else:
            print(f"Please enter a value for {var}.")

_set_if_undefined("OPENAI_API_KEY")
_set_if_undefined("LANGCHAIN_API_KEY")


os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "idkk3"

In [None]:
# Assign file paths for the mark_page.js file, the downloaded file, and the URL sheet

mark_page_path = "/content/mark_page_3.js"
download_location_path = ""
downloaded_file_path = download_location_path + "/CCnR_download.pdf" #Change according to need
excel_path = "/content/Without_captcha.xlsx" #Optional - only if we need to retrieve website urls from a sheet
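With `download_location_path` left as an empty string, the concatenation above produces `/CCnR_download.pdf`, which points at the filesystem root. A minimal sketch of one way to avoid that, assuming a hypothetical `build_download_path` helper (not part of this notebook) that falls back to the current working directory:

```python
from pathlib import Path


def build_download_path(folder: str, filename: str = "CCnR_download.pdf") -> Path:
    """Hypothetical helper: join folder and filename, defaulting to the current directory."""
    base = Path(folder) if folder else Path.cwd()
    return base / filename


# An empty folder resolves inside the working directory instead of "/".
print(build_download_path(""))
# "/tmp/docs" is an illustrative folder, not one used by the notebook.
print(build_download_path("/tmp/docs"))
```

The resulting `Path` can be passed to `str()` wherever a plain string path is expected.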

#### Install Agent requirements

The only additional requirement we have is the [playwright](https://playwright.dev/) browser.

In [None]:
import nest_asyncio
import sys

# Apply nested event loop patch if running in a Jupyter Notebook environment.
# This allows for seamless asynchronous use of Playwright.
if "ipykernel" in sys.modules:
    nest_asyncio.apply()

## Define Graph State

The state provides the inputs to each node in the graph.

In our case, the agent will track the webpage object (within the browser), annotated images + bounding boxes, the user's initial request, and the messages containing the agent scratchpad, system prompt, and other information.
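To make the idea of graph state concrete, the sketch below shows how a node's partial result can be folded into the state. `MiniState` and `merge` are hypothetical stand-ins used only for illustration; the real state also carries the Playwright `Page`, and LangGraph applies node updates itself.

```python
from typing import List, TypedDict


class MiniState(TypedDict, total=False):
    input: str          # the user's request
    bboxes: List[dict]  # annotated elements on the current page
    observation: str    # result string from the last tool call


def merge(state: MiniState, update: dict) -> MiniState:
    # Keys present in the update overwrite old values; all other keys are kept.
    return {**state, **update}


state: MiniState = {"input": "Find the CCnR document", "bboxes": []}
state = merge(state, {"observation": "Clicked 3"})
print(state)
```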


In [None]:
from typing import List, Optional, TypedDict
from langchain_core.messages import BaseMessage, SystemMessage, HumanMessage
from playwright.async_api import Page

# Represents a bounding box with coordinates, text content, type, and an ARIA label.
class BBox(TypedDict):
    x: float  # X-coordinate of the top-left corner
    y: float  # Y-coordinate of the top-left corner
    text: str  # Text content within the bounding box
    type: str  # Type of element (e.g., button, input)
    ariaLabel: str  # ARIA label for accessibility

# Represents the agent's predicted action and optional arguments.
class Prediction(TypedDict):
    action: str  # The predicted action to take (e.g., "click", "type")
    args: Optional[List[str]]  # Optional arguments for the action

# Represents the state of the LangChain agent during execution.
class AgentState(TypedDict):
    page: Page  # Playwright web page for interacting with the browser
    input: str  # User's input/request
    img: str  # Base64-encoded screenshot of the current page state (The accepted format of image by GPT-4o)
    bboxes: List[BBox]  # List of bounding boxes from the page
    prediction: Prediction  # Agent's prediction for the next action
    scratchpad: List[BaseMessage]  # Intermediate steps/messages in the agent's reasoning
    observation: str  # Most recent response from the LLM

## Define tools

The agent has 7 simple tools:

1. Click (at labeled box)
2. Type
3. Scroll
4. Wait
5. Go back
6. Go to search engine (Google)
7. Download (the document currently open)


We define them below as functions. Each function takes the state as input and returns a string indicating the status of its action.
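Several of the tools begin the same way: they turn the model's numeric label into an index into `bboxes` and report an error string if that fails. A sketch of that step in isolation, using a hypothetical `resolve_bbox` helper and made-up box data (neither is part of the notebook):

```python
from typing import List, Optional, Tuple


def resolve_bbox(bboxes: List[dict], label: str) -> Tuple[Optional[dict], str]:
    """Hypothetical helper: return (bbox, "") on success or (None, error message)."""
    try:
        index = int(label)
    except (TypeError, ValueError):
        return None, f"Error: '{label}' is not a numeric label. Please try again"
    if not 0 <= index < len(bboxes):
        return None, f"Error: no bbox for : {index}. Please try again"
    return bboxes[index], ""


boxes = [{"x": 10.0, "y": 20.0}, {"x": 55.5, "y": 80.0}]  # illustrative data
print(resolve_bbox(boxes, "1"))
print(resolve_bbox(boxes, "7"))
print(resolve_bbox(boxes, "abc"))
```

Unlike plain list indexing, this sketch also rejects negative labels, which would otherwise silently select a box counted from the end of the list.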

In [None]:
import asyncio
import platform


async def click(state: AgentState) -> str:
    # - Click [Numerical_Label]
    page = state["page"]
    click_args = state["prediction"]["args"]
    if click_args is None or len(click_args) != 1:
        return f"Failed to click bounding box labeled as number {click_args}"
    bbox_id = click_args[0]
    try:
        bbox_id = int(bbox_id)
        bbox = state["bboxes"][bbox_id]
    except (ValueError, IndexError):
        return f"Error: no bbox for : {bbox_id}. Please try again"
    x, y = bbox["x"], bbox["y"]
    await page.mouse.click(x, y)
    await asyncio.sleep(5)
    return f"Clicked {bbox_id}"

async def download_doc(state: AgentState) -> str:
    # Click at a fixed offset from the close button to hit the download button.
    # Works only for websites of the type 'pubrec6'.
    page = state["page"]
    bbox_id = 3  # Reference bounding box (the close button)
    try:
        bbox = state["bboxes"][bbox_id]
    except IndexError:
        return f"Error: no bbox for : {bbox_id}. Please try again"
    x, y = bbox["x"] - 49, bbox["y"] + 36  # Offset distance
    last_error = None
    for attempt in range(1, 4):  # Try to download up to 3 times
        try:
            # Start listening before clicking so the download event is not missed
            async with page.expect_download(timeout=15000) as download_info:
                await page.mouse.click(x, y)
            download = await download_info.value
            # Save the downloaded file to the specified path
            await download.save_as(downloaded_file_path)
            return f"Clicked {bbox_id}, download completed, You can rest now"
        except Exception as e:
            last_error = e  # Keep the error; `e` is cleared when the except block ends
            print(f"Couldn't download on attempt {attempt} because of {e}, retrying...")
    return f"Download failed due to {last_error}, please try again."

async def type_text(state: AgentState) -> str:
    # type [content] in [numerical_label] and hit enter
    page = state["page"]
    type_args = state["prediction"]["args"]
    if type_args is None or len(type_args) != 2:
        return (
            f"Failed to type in element from bounding box labeled as number {type_args}"
        )
    bbox_id = type_args[0]
    try:
        bbox_id = int(bbox_id)
        bbox = state["bboxes"][bbox_id]
    except (ValueError, IndexError):
        return f"Error: no bbox for : {bbox_id}. Please try again"
    x, y = bbox["x"], bbox["y"]
    text_content = type_args[1]
    await page.mouse.click(x, y)
    # Check if MacOS
    select_all = "Meta+A" if platform.system() == "Darwin" else "Control+A"
    await page.keyboard.press(select_all)
    await page.keyboard.press("Backspace")
    await page.keyboard.type(text_content)
    await page.keyboard.press("Enter")
    await asyncio.sleep(5)
    return f"Typed {text_content} and submitted"


async def scroll(state: AgentState) -> str:
    # Move mouse to location and scroll up or down
    page = state["page"]
    scroll_args = state["prediction"]["args"]
    if scroll_args is None or len(scroll_args) != 2:
        return "Failed to scroll due to incorrect arguments."

    target, direction = scroll_args

    try:
        if target.upper() == "WINDOW":
            # Not sure the best value for this:
            scroll_amount = 500
            scroll_direction = (
                -scroll_amount if direction.lower() == "up" else scroll_amount
            )
            await page.evaluate(f"window.scrollBy(0, {scroll_direction})")
        else:
            # Scrolling within a specific element
            scroll_amount = 40  #200
            target_id = int(target)
            bbox = state["bboxes"][target_id]
            x, y = bbox["x"], bbox["y"]
            scroll_direction = (
                -scroll_amount if direction.lower() == "up" else scroll_amount
            )
            await page.mouse.move(x, y)
            await page.mouse.wheel(0, scroll_direction)

        return f"Scrolled {direction} in {'window' if target.upper() == 'WINDOW' else 'element'}"
    except Exception:
        return f"The arguments provided {scroll_args} are invalid, please try again"


async def wait(state: AgentState) -> str:
    # Wait
    sleep_time = 5
    await asyncio.sleep(sleep_time)
    return f"Waited for {sleep_time}s."


async def go_back(state: AgentState) -> str:
    # Go back to previous webpage
    page = state["page"]
    await page.go_back()
    await asyncio.sleep(5)
    return f"Navigated back a page to {page.url}."


async def to_google(state: AgentState) -> str:
    # Go to google search engine homepage
    page = state["page"]
    await page.goto("https://www.google.com/")
    await asyncio.sleep(3)
    return "Navigated to google.com."

## Define Agent

The agent is driven by a multi-modal model and decides the action to take for each step. It is composed of a few runnable objects:

1. A `mark_page` function to annotate the current page with bounding boxes
2. A prompt to hold the user question, annotated image, and agent scratchpad
3. GPT-4o to decide the next steps
4. Parsing logic to extract the action for the tools defined


Let's first define the annotation step:
#### Browser Annotations

This function annotates all buttons, inputs, text areas, etc. with numbered bounding boxes. GPT-4o then just has to refer to a bounding box
when taking actions, reducing the complexity of the overall task.

In [None]:
import base64
import logging
from langchain_core.runnables import chain as chain_decorator
from playwright.async_api import Page, Error as PlaywrightError
from IPython.display import Image, display
import io

with open(mark_page_path) as f:
    mark_page_script = f.read()

@chain_decorator
async def mark_page(page: Page) -> dict:
    """
    Marks interactable elements on the page, retrieves bounding boxes, and takes a screenshot.

    Args:
        page: The Playwright Page object representing the web page.

    Returns:
        A dictionary containing:
            - "img": Base64-encoded string of the page screenshot.
            - "bboxes": List of bounding boxes for marked elements.
    """
    await page.evaluate(mark_page_script)

    for attempt in range(10):
        try:
            bboxes = await page.evaluate("markPage()")  # Apply the markPage() function on the webpage
            break
        except PlaywrightError as e:
            logging.warning(f"Error retrieving bounding boxes (attempt {attempt + 1}): {e}")  # Log error
            await asyncio.sleep(3) # The browser may still be loading
    else:
        raise Exception("Failed to retrieve bounding boxes even after multiple attempts.")  # Raise error if all attempts fail

    screenshot = await page.screenshot()
    img_stream = io.BytesIO(screenshot)
    img_bytes = img_stream.getvalue()
    display(Image(img_bytes))
    await page.evaluate("unmarkPage()") # Ensure the bboxes don't follow us around
    return {
        "img": base64.b64encode(screenshot).decode(),
        "bboxes": bboxes,
    }
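The `img` value is a base64 string because multimodal chat messages carry images as text. A minimal sketch of wrapping such a string as an image part in the OpenAI chat message format, assuming the prompt embeds it as a PNG data URL; `to_image_part` is a hypothetical helper and the bytes below are a placeholder, not a real screenshot:

```python
import base64


def to_image_part(png_bytes: bytes) -> dict:
    """Hypothetical helper: encode PNG bytes as an image_url content part."""
    b64 = base64.b64encode(png_bytes).decode()
    return {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{b64}"}}


fake_png = b"\x89PNG\r\n\x1a\n"  # PNG signature only; stands in for page.screenshot()
part = to_image_part(fake_png)
print(part["type"], part["image_url"]["url"][:22])
```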

#### Agent definition

Now we'll compose this function with the prompt, LLM, and output parser to complete our agent.

In [None]:
from langchain import hub
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain.prompts import ChatPromptTemplate, SystemMessagePromptTemplate

async def annotate(state):
    """
    Uses the above defined mark_page.js script to update the agent state.

    Args:
        state: The current agent state.

    Returns:
        The updated agent state with annotation data.
    """
    try:
        marked_page = await mark_page.with_retry().ainvoke(state["page"])
        return {**state, **marked_page}
    except Exception as e:
        return {**state, "error": f"Annotation failed: {e}"}

def format_descriptions(state):
    """
    Formats descriptions for bounding boxes in the agent state.

    Args:
        state: The current agent state.

    Returns:
        The updated agent state with formatted bounding box descriptions.
    """
    labels = []
    for i, bbox in enumerate(state["bboxes"]):
        text = bbox.get("ariaLabel") or bbox.get("text") or "Empty"  # Default to "Empty"
        el_type = bbox.get("type", "unknown")  # Default to "unknown"
        labels.append(f'{i} (<{el_type}/>): "{text}"')
    bbox_descriptions = "\nValid Bounding Boxes:\n" + "\n".join(labels)
    return {**state, "bbox_descriptions": bbox_descriptions}

def parse(text: str) -> dict:
    """
    Parses LLM output to extract an action and its arguments.

    Args:
        text: The raw LLM output text.

    Returns:
        A dictionary containing the extracted action and arguments (if any).
    """
    action_prefix = "Action: "
    if not text.strip().split("\n")[-1].startswith(action_prefix):
        logging.error(f"Unexpected LLM output format: {text}")
        return {"action": "retry", "args": f"Unexpected LLM output format: {text}"}
    action_block = text.strip().split("\n")[-1]

    action_str = action_block[len(action_prefix) :]
    split_output = action_str.split(" ", 1)
    if len(split_output) == 1:
        action, action_input = split_output[0], None
    else:
        action, action_input = split_output
    action = action.strip().rstrip(";")  # "ANSWER; ..." would otherwise parse as "ANSWER;"
    if action_input is not None:
        action_input = [
            inp.strip().strip("[]") for inp in action_input.strip().split(";")
        ]
    print({"action": action, "args": action_input})
    return {"action": action, "args": action_input}
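To show what the parsing step produces, here is a condensed sketch of the same rules applied to a few sample replies. `parse_action` is a hypothetical, simplified variant (for example, its `retry` result carries no message) and the sample replies are invented:

```python
def parse_action(text: str) -> dict:
    """Condensed sketch of the parsing rules; not a drop-in replacement for parse()."""
    prefix = "Action: "
    last_line = text.strip().split("\n")[-1]
    if not last_line.startswith(prefix):
        return {"action": "retry", "args": None}
    # The first word is the action name; the rest holds ';'-separated arguments.
    name, _, rest = last_line[len(prefix):].partition(" ")
    args = [part.strip().strip("[]") for part in rest.split(";")] if rest else None
    return {"action": name.strip(), "args": args}


print(parse_action("Thought: the name field is 4\nAction: Type 4; Congo Group"))
print(parse_action("Thought: the page is loading\nAction: Wait"))
print(parse_action("Thought: open the result\nAction: Click [12]"))
print(parse_action("I am not sure what to do"))
```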

In [None]:
updated_system_message = '''Imagine you are a robot browsing the web, just like humans. Now you need to complete a task. In each iteration, you will receive an Observation that includes a screenshot of a webpage and some texts. This screenshot will
feature Numerical Labels placed in the TOP LEFT corner of each Web Element. Carefully analyze the visual
information to identify the Numerical Label corresponding to the Web Element that requires interaction, then follow
the guidelines and choose one of the following actions:

1. Click a Web Element.
2. Delete existing content in a textbox and then type content.
3. Scroll up or down.
4. Wait
5. Go back
6. Download the document that is currently open.
7. Return to google to start over.
8. Respond with the final answer

Correspondingly, Action should STRICTLY follow the format:

- Click [Numerical_Label]
- Type [Numerical_Label]; [Content]
- Scroll [Numerical_Label or WINDOW]; [up or down]
- Wait
- GoBack
- Google
- Download
- ANSWER; [content]

Key Guidelines You MUST follow:

* Action guidelines *
1) Execute only one action per iteration.
2) When clicking or typing, ensure to select the correct bounding box.
3) Numeric labels lie in the top-left corner of their corresponding bounding boxes and are colored the same.

* Web Browsing Guidelines *
1) Don't interact with useless web elements like Login, Sign-in, donation that appear in Webpages
2) Select strategically to minimize time wasted.
3) Take your time to analyse the webpage before making a command. Don't be hasty.
4) Check the previous actions all the time, DO NOT REPEAT the same action as before, instead think of another action or check if waiting is required.
5) Check for loading screens on the page, if there is one, wait.
6) DON'T click on the arrow buttons to view.

Your reply should strictly follow the format:

Thought: {{Your brief thoughts (briefly summarize the info that will help ANSWER)}}
Action: {{One Action format you choose}}
Then the User will provide:
Observation: {{A labeled screenshot Given by User}}'''


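prompt = hub.pull("wfh/web-voyager:8b927604")
# Wrap the text explicitly so it replaces the pulled prompt's system message;
# a bare string is not interpreted as a system message.
new_prompt = ChatPromptTemplate(
    input_variables=prompt.input_variables,
    messages=[SystemMessagePromptTemplate.from_template(updated_system_message)] + prompt.messages[1:]
)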

In [None]:
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o", max_tokens=4096)

agent = annotate | RunnablePassthrough.assign(
    prediction=format_descriptions | new_prompt | llm | StrOutputParser() | parse # Creating a chain where the prediction is assigned by passing through the various functions
)

## Define graph

We've created most of the important logic. We have one more function to define that will help us update the graph state after a tool is called.

In [None]:
import re
from langchain_core.messages import SystemMessage


def update_scratchpad(state: AgentState) -> AgentState:
    """
    Updates the scratchpad with the latest observation and increments the step counter.

    Args:
        state: The current agent state containing the observation and scratchpad.

    Returns:
        The updated agent state with the modified scratchpad.
    """
    old_scratchpad = state.get("scratchpad", [])
    txt = old_scratchpad[0].content if old_scratchpad else "Previous action observations:\n"

    try:
        last_line = txt.rsplit("\n", 1)[-1]
        step = int(re.match(r"\d+", last_line).group()) + 1
    except (AttributeError, ValueError, IndexError):  # Handles cases where the pattern is not found or the scratchpad is empty
        print("Scratchpad empty or error extracting step from scratchpad. Starting from step 1.")
        step = 1

    txt += f"\n{step}. {state['observation']}"
    return {**state, "scratchpad": [SystemMessage(content=txt)]}  # Update the state with the modified scratchpad
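The step counter is recovered from the text itself: the last line of the scratchpad is expected to start with the previous step number. A sketch of that logic on plain strings, using a hypothetical `next_step` helper and invented observations:

```python
import re


def next_step(scratchpad_text: str) -> int:
    """Hypothetical helper mirroring the step-counting logic on plain strings."""
    last_line = scratchpad_text.rsplit("\n", 1)[-1]
    match = re.match(r"\d+", last_line)
    # Fall back to step 1 when the last line does not start with a number.
    return int(match.group()) + 1 if match else 1


text = "Previous action observations:\n"
for observation in ["Clicked 3", "Waited for 5s."]:
    text += f"\n{next_step(text)}. {observation}"
print(text)
```

One consequence of this design is that an observation containing a line break can reset the counter, because the last line then no longer begins with a step number.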


In [None]:
from langchain_core.runnables import RunnableLambda
from langgraph.graph import END, StateGraph


def sub_graph_builder(AgentState, step : int = 0):
    """
    Builds a subgraph within a StateGraph to represent a part of the LangChain agent's workflow.

    This function takes an AgentState and constructs a graph where:
        - The agent is the entry point.
        - Each tool (click, type, etc.) is a node connected to an 'update_scratchpad' node.
        - A 'select_tool' function determines the next node based on the agent's prediction.

    Args:
        AgentState: The state schema (the TypedDict class) used by the graph.
        step: Currently unused.

    Returns:
        The compiled StateGraph representing the subgraph.
    """

    graph_builder = StateGraph(AgentState)

    graph_builder.add_node("agent", agent)
    graph_builder.set_entry_point("agent")

    graph_builder.add_node("update_scratchpad", update_scratchpad)
    graph_builder.add_edge("update_scratchpad", "agent")

    tools = {
        "Click": click,
        "Type": type_text,
        "Scroll": scroll,
        "Wait": wait,
        "GoBack": go_back,
        "Google": to_google,
        "Download": download_doc
    }

    for node_name, tool in tools.items():
        graph_builder.add_node(
            node_name,
            # The lambda ensures the function's string output is mapped to the "observation"
            # key in the AgentState
            RunnableLambda(tool) | (lambda observation: {"observation": observation}),
        )
        # Always return to the agent (by means of the update-scratchpad node)
        graph_builder.add_edge(node_name, "update_scratchpad")


    def select_tool(state: AgentState):
        # Any time the agent completes, this function
        # is called to route the output to a tool or
        # to the end user.
        action = state["prediction"]["action"]
        if action == "ANSWER":
            return END
        if action == "retry":
            return "agent"
        return action


    graph_builder.add_conditional_edges("agent", select_tool)

    graph = graph_builder.compile()
    return graph

# Creating the subgraphs
graph = sub_graph_builder(AgentState, 1)
# CCnR_downloader_graph = sub_graph_builder(AgentState)
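The routing decision can be looked at on its own, without building a graph. The sketch below is a variation on `select_tool`, not the notebook's exact behavior: it assumes that any action name that is not a registered tool should be sent back to the agent instead of being returned as a node name. `route` and the `END` constant are stand-ins so that the example needs no LangGraph install.

```python
END = "__end__"  # stand-in for langgraph.graph.END


def route(prediction: dict, tool_names: set) -> str:
    """Hypothetical router: pick the next node from the agent's prediction."""
    action = prediction.get("action")
    if action == "ANSWER":
        return END
    if action in tool_names:
        return action
    # "retry" and any unrecognized action go back to the agent for another attempt.
    return "agent"


tool_names = {"Click", "Type", "Scroll", "Wait", "GoBack", "Google", "Download"}
print(route({"action": "Click", "args": ["3"]}, tool_names))
print(route({"action": "retry", "args": None}, tool_names))
print(route({"action": "ANSWER", "args": ["completed"]}, tool_names))
```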

In [None]:
from IPython.display import Image, display

# Display the structure of the workflow (comment this out to skip rendering)
display(Image(graph.get_graph(xray=True).draw_mermaid_png()))

In [None]:
import logging
from IPython.display import Image, display
from playwright.async_api import async_playwright, Error as PlaywrightError
import pandas as pd
import io
import base64

async def call_agent(agent_input: str, name, max_steps: int = 150) -> str:
    """
    Executes a LangChain agent based on a graph and user input,
    displaying the process and returning the final answer.

    Args:
        agent_input: Initial input for the agent.
        name: Sub-division name used to look up the county recorder URL in the Excel sheet.
        max_steps: Maximum number of steps to allow.

    Returns:
        The final answer produced by the agent, or None if no answer is found.
    """

    df = pd.read_excel(excel_path, engine = 'openpyxl')
    url = (df.loc[df['Sub_division'] == name, 'County_recorder_website']).iloc[0]
    # Initialize variables
    final_answer = None
    steps = []
    async with async_playwright() as p:
        try:
            browser = await p.chromium.launch(headless=True, args=None)
            page = await browser.new_page()
            await page.goto(url)
            await asyncio.sleep(5)
        except PlaywrightError as e:
            logging.error(f"Error during browser setup: {e}")  # Log any Playwright errors
            return "Please try calling the function again."

        event_stream = graph.astream(
            {
                "page": page,
                "input": agent_input,
                "scratchpad": [],
            },
            {
                "recursion_limit": max_steps,
            },
        )

        async for event in event_stream:
            if "agent" not in event:
                continue
            pred = event["agent"].get("prediction") or {}
            action = pred.get("action")
            action_input = pred.get("args")
            steps.append(f"{len(steps) + 1}. {action}: {action_input}")
            logging.info(f"Step {len(steps)}: {action} - {action_input}")  # Log the step
            print("\n".join(steps))
            try:
                # `display` and `Image` are imported directly from IPython.display
                display(Image(base64.b64decode(event["agent"]["img"])))
            except Exception as e:
                logging.warning(f"Error displaying image: {e}")  # Log image display errors

            if action and "ANSWER" in action:
                final_answer = action_input[0] if action_input else None
                break  # Exit loop when ANSWER is found

        try:
            await browser.close()
        except PlaywrightError as e:
            logging.error(f"Error closing browser: {e}")  # Log browser closing errors

    if final_answer is None:
        logging.warning("Agent did not find an answer.")  # Log if no answer found

    return final_answer


In [None]:
#Creating a prompt to feed in instructions for the specific UI of the website
sp_prompt_pubrec6 = ("You're employed to retrieve CCnR documents of a property. Your task here is to download "
                "the Covenants, Conditions and Restrictions for the input sub_division name. Here are the steps to follow for retrieving the same:\n"
                "1) STRICTLY type '(RES)' in the document type field\n"
                "2) Go to the name section. Then use the sub-division name provided by the human and enter it STRICTLY in the BUSINESS NAME type field. Make sure to give the correct field here; you've been getting it wrong before\n"
                "3) SCROLL DOWN multiple times, DO NOT CLICK on SEARCH button\n"
                "4) SCROLL DOWN to look for documents of the type Restriction and open them by clicking on the name provided.\n"
                "5) STRICTLY WAIT till you see a white paged document on the right side and STRICTLY DO NOT CLICK anything till the document loads.\n"
                "6) Wait till you can see a white paged document on the right side of the page.\n"
                "7) Check if the document is in fact covenants or declaration of condo and the name of the subdivision on the document matches.\n"
                "8) If and only IF the document is loaded must you give the command to Download the file.\n"
                "9) End the program and return completed once the download completes\n"
                "Here's what you need to do: "
                  )

In [None]:
HOA_name = 'CONGO GROUP LLC.'
human = f"Please download the relevant Covenants, Conditions, and Restrictions document for the sub_division named: {HOA_name}"

res = await call_agent(sp_prompt_pubrec6 + human, HOA_name)

print(res)