In [1]:
%pip install -U --quiet langgraph langsmith langchain_openai

Note: you may need to restart the kernel to use updated packages.


In [2]:
import os
from getpass import getpass

def _getpass(env_var: str):
    """Make sure ``env_var`` is set, prompting for it (without echo) when it is missing or empty."""
    already_set = bool(os.environ.get(env_var))
    if already_set:
        return
    os.environ[env_var] = getpass(f"{env_var}")

# Configure the LangChain tracing/project environment variables in one go.
os.environ.update(
    {
        "LANGCHAIN_TRACING_V2": "true",
        "LANGCHAIN_PROJECT": "Web-Voyager",
    }
)
# Prompt for any API key that is not already present in the environment.
for _secret_name in ("LANGCHAIN_API_KEY", "OPENAI_API_KEY"):
    _getpass(_secret_name)

In [1]:
#Install agent requirements
# %pip install --upgrade --quiet playwright >/dev/null
!playwright install

In [3]:
import nest_asyncio

# For running playwright in a Jupyter notebook
nest_asyncio.apply()

In [1]:
# Define Graph state
"""The state provides the inputs to each node in the graph
In this case, the agent will track the webpage object (within the browser), annotated images + bounding boxes, the user's initial request, and the messages containing the agent scratchpad, system prompt, and other information.
"""

from typing import List, Optional, TypedDict

from langchain_core.messages import BaseMessage, SystemMessage
from playwright.async_api import Page

class BBox(TypedDict):
    """One annotated page element.

    The tool functions read ``x`` and ``y`` as the point to click or hover.
    """
    # NOTE(review): presumably mirrors the objects returned by markPage() in
    # mark_page.js -- confirm the field list against that script.
    x: float
    y: float
    text: str
    type: str
    ariaLabel: str

class Prediction(TypedDict):
    """Parsed model output: the chosen action name and its arguments, if any."""
    action: str
    # None when the action was given without arguments.
    args: Optional[List[str]]

# This represents the state of the agent
# as it proceeds through execution
class AgentState(TypedDict):
    page: Page # This lets us interact with the web environment
    input: str # The user's request
    img: str # b64 encoded screenshot
    bboxes: List[BBox] # The bounding boxes from the browser annotation function
    prediction: Prediction # The Agent's output
    scratchpad: List[BaseMessage] # update_scratchpad stores one SystemMessage listing past observations
    observation: str # The most recent response from a tool


ModuleNotFoundError: No module named 'playwright'

# Define tools
The agent has 6 simple tools:

- Click
- Type
- Scroll
- Wait
- Go back
- Go to search engine (Google)

In [2]:
import asyncio
import platform

async def click(state: AgentState):
    """Click at the (x, y) point of the bounding box chosen by the agent.

    Expects ``state["prediction"]["args"]`` to hold exactly one item: the
    numerical label of the target box (``Click [Numerical_Label]``).

    Returns:
        An observation string. Bad arguments (wrong count, non-numeric label,
        label outside the available boxes) are reported in the returned string
        instead of being raised, so the agent can react on its next step.
    """
    page = state["page"]
    click_args = state["prediction"]["args"]
    if click_args is None or len(click_args) != 1:
        return f"Failed to click bounding box labeled as number {click_args}"
    try:
        bbox_id = int(click_args[0])
    except (TypeError, ValueError):
        return f"Failed to click bounding box labeled as number {click_args}"

    bboxes = state["bboxes"]
    # Labels are treated as 1-indexed here. Reject anything outside
    # 1..len(bboxes): label 0 would otherwise become index -1 and silently
    # click the *last* box.
    # NOTE(review): type_text indexes bboxes without the -1 offset -- confirm
    # which convention mark_page.js uses and align the tools.
    if not 1 <= bbox_id <= len(bboxes):
        return f"Error: no bbox for : {bbox_id}"
    bbox = bboxes[bbox_id - 1]
    x, y = bbox["x"], bbox["y"]
    await page.mouse.click(x, y)
    return f"Clicked {bbox_id}"

async def type_text(state: AgentState):
    """Replace the content of the chosen element with text and press Enter.

    Expects ``state["prediction"]["args"]`` to hold two items: the numerical
    label of the target box and the text to enter.

    Returns:
        An observation string. Bad arguments (wrong count, non-numeric label,
        label outside the available boxes) are reported in the returned string
        instead of being raised.
    """
    page = state["page"]
    type_args = state["prediction"]["args"]
    if type_args is None or len(type_args) != 2:
        return (f"Failed to type in element from bounding box labelled as number {type_args}")
    try:
        bbox_id = int(type_args[0])
    except (TypeError, ValueError):
        return (f"Failed to type in element from bounding box labelled as number {type_args}")

    bboxes = state["bboxes"]
    # NOTE(review): the label is used as a direct index here, whereas click and
    # scroll subtract 1 -- confirm which convention mark_page.js uses and align
    # the tools.
    if not 0 <= bbox_id < len(bboxes):
        return f"Error: no bbox for : {bbox_id}"
    bbox = bboxes[bbox_id]
    x, y = bbox["x"], bbox["y"]
    text_content = type_args[1]

    # Focus the element, then clear whatever it currently contains.
    await page.mouse.click(x, y)
    # Check if MacOS
    select_all = "Meta+A" if platform.system() == "Darwin" else "Control+A"
    await page.keyboard.press(select_all)
    await page.keyboard.press("Backspace")
    # keyboard.press() takes a single key name; arbitrary text must go through
    # keyboard.type().
    await page.keyboard.type(text_content)
    await page.keyboard.press("Enter")
    return f"Typed {text_content} and submitted"

async def scroll(state: AgentState):
    """Scroll the whole window or a single annotated element up or down.

    Expects ``state["prediction"]["args"]`` to hold two items: the target
    (``"WINDOW"`` in any case, or the numerical label of a box) and the
    direction (``"up"`` in any case scrolls up; anything else scrolls down).

    Returns:
        An observation string. Bad arguments are reported in the returned
        string instead of being raised.
    """
    page = state["page"]
    scroll_args = state["prediction"]["args"]
    if scroll_args is None or len(scroll_args) != 2:
        return "Failed to scroll due to incorrect arguments."

    target, direction = scroll_args
    scrolling_up = direction.lower() == "up"

    if target.upper() == "WINDOW":
        # Not sure the best value for this:
        scroll_amount = 500
        scroll_direction = -scroll_amount if scrolling_up else scroll_amount
        # The DOM method is `scrollBy` (case-sensitive) and needs the actual
        # number interpolated into the script.
        await page.evaluate(f"window.scrollBy(0, {scroll_direction})")
    else:
        # Scrolling within a specific element
        scroll_amount = 200
        try:
            target_id = int(target)
        except (TypeError, ValueError):
            return "Failed to scroll due to incorrect arguments."
        bboxes = state["bboxes"]
        # Labels are treated as 1-indexed here; label 0 would otherwise wrap
        # around to the last box via index -1.
        # NOTE(review): type_text indexes bboxes without the -1 offset --
        # confirm which convention mark_page.js uses and align the tools.
        if not 1 <= target_id <= len(bboxes):
            return f"Error: no bbox for : {target_id}"
        bbox = bboxes[target_id - 1]
        x, y = bbox["x"], bbox["y"]
        scroll_direction = -scroll_amount if scrolling_up else scroll_amount
        # Hover the element first so the wheel event is delivered to it.
        await page.mouse.move(x, y)
        await page.mouse.wheel(0, scroll_direction)

    return f"Scrolled {direction} in {'window' if target.upper() == 'WINDOW' else 'element'}"

async def wait(state: AgentState, sleep_time: float = 5):
    """Pause so the page has time to finish whatever it is doing.

    Args:
        state: Current agent state (not read by this tool).
        sleep_time: Seconds to sleep; defaults to the previous fixed value of 5.

    Returns:
        An observation string stating how long the tool waited.
    """
    await asyncio.sleep(sleep_time)
    return f"Waited for {sleep_time}s."

async def go_back(state: AgentState):
    """Step the page one entry back in its history and report the resulting URL."""
    page = state["page"]
    await page.go_back()
    # The URL is read only after the navigation call has completed.
    return f"Navigated back to page to {page.url}."

async def to_google(state: AgentState):
    """Point the current page at the Google home page."""
    await state["page"].goto("https://www.google.com/")
    return "Navigated to google.com"

NameError: name 'AgentState' is not defined

# Define Agent 

The agent is driven by a multi-modal model and decides the action to take at each step. It is composed of a few runnable objects:

1. A ```mark_page``` function to annotate the current page with bounding boxes
1. A prompt to hold the user question, annotated image, and agent scratchpad
1. GPT-4V to decide the next steps
1. Parsing logic to extract the action

Let's first define the annotation step:

## Browsing Annotations
This function annotates all buttons, inputs, text areas, etc. with numbered boxes. GPT-4V then just has to refer to a bounding box when taking actions, reducing the complexity of the overall task.


In [None]:
import asyncio
import base64

from langchain_core.runnables import chain as chain_decorator

# Load the browser-side annotation script once. The encoding is given explicitly
# so the read does not depend on the platform's default locale encoding.
with open("mark_page.js", encoding="utf-8") as f:
    mark_page_script = f.read()

@chain_decorator
async def mark_page(page):
    """Annotate the page with bounding boxes and capture a screenshot of it.

    Injects ``mark_page_script``, calls ``markPage()`` (retrying up to 10 times,
    3 seconds apart, since the page may still be loading), takes a screenshot
    with the annotations visible, then removes them again with ``unmarkPage()``.

    Returns:
        A dict with ``"img"`` (base64-encoded screenshot, as ``str``) and
        ``"bboxes"`` (the value returned by ``markPage()``).

    Raises:
        RuntimeError: if ``markPage()`` fails on every attempt.
    """
    await page.evaluate(mark_page_script)
    last_error = None
    for _ in range(10):
        try:
            bboxes = await page.evaluate("markPage()")
            break
        except Exception as exc:
            # May be loading
            last_error = exc
            # Must be awaited; a bare asyncio.sleep(...) only creates a
            # coroutine object and never actually pauses.
            await asyncio.sleep(3)
    else:
        # Without this, a total failure would surface as an unbound `bboxes`.
        raise RuntimeError("markPage() did not succeed after 10 attempts") from last_error
    screenshot = await page.screenshot()
    # Ensure the bboxes dont follow us around
    await page.evaluate("unmarkPage()")
    return {
        "img": base64.b64encode(screenshot).decode(),
        "bboxes": bboxes,
    }

# Agent definition
Now we'll compose this function with the prompt, LLM, and output parser to complete our agent.


In [None]:
from langchain import hub
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI

async def annotate(state):
    """Run ``mark_page`` (with retries) on the state's page and merge its output into the state."""
    page_annotations = await mark_page.with_retry().ainvoke(state["page"])
    # Copy first so the incoming state is left untouched; annotation keys win on clashes.
    updated_state = dict(state)
    updated_state.update(page_annotations)
    return updated_state

def parse(text: str) -> dict:
    """Extract the action and its arguments from the model's reply.

    Only the last non-blank line is inspected; it must start with
    ``"Action: "``. The first word after the prefix is the action name, and
    the remainder (if any) is split on ``;`` into arguments, each with
    surrounding whitespace and square brackets removed.

    Returns:
        ``{"action": <name>, "args": <list of str or None>}`` on success, or
        ``{"action": "retry", "args": <error message str>}`` when the last line
        does not start with the expected prefix.
    """
    action_prefix = "Action: "
    last_line = text.strip().split("\n")[-1]
    if not last_line.startswith(action_prefix):
        return {"action": "retry", "args": f"Could not parse LLM Output: {text}"}

    action_str = last_line[len(action_prefix) :]
    split_output = action_str.split(" ", 1)
    if len(split_output) == 1:
        action, action_input = split_output[0], None
    else:
        action, action_input = split_output
    action = action.strip()
    if action_input is not None:
        # Strip whitespace *before* the brackets: after splitting "[1]; [text]"
        # on ";", the second piece is " [text]" and its leading space would
        # otherwise shield the "[" from being removed.
        action_input = [
            inp.strip().strip("[]") for inp in action_input.strip().split(";")
        ]
    return {"action": action, "args": action_input}

# Fetch the agent's prompt ("wfh/web-voyager") through langchain's hub module.
prompt = hub.pull("wfh/web-voyager")

In [None]:
# NOTE(review): confirm this model id is still served by the OpenAI API; swap in
# a current vision-capable model if it is not.
llm = ChatOpenAI(model="gpt-4-vision-preview", max_tokens=4096)
# The parsed output must land under the lowercase "prediction" key: that is the
# key declared in AgentState and read by the tools and by select_tool.
agent = annotate | RunnablePassthrough.assign(
    prediction=prompt | llm | StrOutputParser() | parse
)

# Define graph
We've created most of the important logic. We have one more function to define that will help us update the graph state after a tool is called.

In [None]:
import re

def update_scratchpad(state: AgentState):
    """After a tool is invoked, we want to update
    the scratchpad so the agent is aware of its previous steps.

    Appends ``"<step>. <observation>"`` to the running text, where the step
    number continues from the number at the start of the existing last line.

    Returns:
        A copy of ``state`` whose ``"scratchpad"`` is a one-element list holding
        a ``SystemMessage`` with the updated text.
    """
    old = state.get("scratchpad")
    if old:
        txt = old[0].content
        last_line = txt.rsplit("\n", 1)[-1]
        # Match the whole leading number so steps beyond 9 keep counting.
        step_match = re.match(r"\d+", last_line)
        # Fall back to 1 rather than crash if the last line is not numbered.
        step = int(step_match.group()) + 1 if step_match else 1
    else:
        txt = "Previous action observations: \n"
        step = 1
    txt += f"\n{step}. {state['observation']}"

    return {**state, "scratchpad": [SystemMessage(content=txt)]}

Compose everything into a graph

In [None]:
from langchain_core.runnables import RunnableLambda
from langgraph.graph import END, StateGraph

# Build the graph over the shared AgentState.
graph_builder = StateGraph(AgentState)

# The "agent" node is where execution starts.
graph_builder.add_node("agent", agent)
graph_builder.set_entry_point("agent")

# After the scratchpad has been updated, control goes back to the agent.
graph_builder.add_node("update_scratchpad", update_scratchpad)
graph_builder.add_edge("update_scratchpad", "agent")

# Maps the action name emitted by the agent to the coroutine implementing it.
tools = {
    "Click": click,
    "Type": type_text,
    "Scroll": scroll,
    "Wait": wait,
    "GoBack": go_back,
    "Google": to_google,
}

# The loop variable is `tool` (singular) so the `tools` mapping is not
# overwritten while it is being iterated.
for node_name, tool in tools.items():
    graph_builder.add_node(
        node_name,
        # The lambda ensures the function's string output is mapped to the
        # "observation" key in the AgentState
        RunnableLambda(tool) | (lambda observation: {"observation": observation}),
    )
    # Always return to the agent (by means of the update_scratchpad node)
    graph_builder.add_edge(node_name, "update_scratchpad")

def select_tool(state: AgentState):
    """Pick the node that runs after the agent, based on its predicted action.

    ``"retry"`` loops back to the agent, ``"ANSWER"`` ends the run, and any
    other action name is returned unchanged as the name of the node to run.
    """
    chosen_action = state["prediction"]["action"]
    if chosen_action == "retry":
        return "agent"
    if chosen_action != "ANSWER":
        return chosen_action
    return END

# select_tool decides which node runs after each "agent" step.
graph_builder.add_conditional_edges("agent", select_tool)

graph = graph_builder.compile()

# Run Agent
Now that we've created the whole agent executor, we can run it on a few questions.

In [None]:
import playwright
from IPython import display
from playwright.async_api import async_playwright

browser = await async_playwright().start()
# we will set headless=flase, to see the magic on the web
browser = await browser.chromium.launch(headless=False, args=None)
page = await browser.new_page()
_ = await page.goto("https://www.gooogle.com")

async def call_agent(question: str, page, max_steps: int = 150):
    """Run the agent graph on ``question`` and show its progress step by step.

    After every "agent" event the cell output is cleared and redrawn with the
    numbered list of actions taken so far plus the latest annotated screenshot.

    Args:
        question: The user's request, passed as the graph's ``"input"``.
        page: The browser page the agent operates on.
        max_steps: Passed to the graph as its ``"recursion_limit"``.

    Returns:
        The first argument of the agent's ANSWER action, or ``None`` if the
        stream ends without an answer (or the answer carried no arguments).
    """
    event_stream = graph.astream(
        {
            "page": page,
            "input": question,
            "scratchpad": [],
        },
        {
            "recursion_limit": max_steps
        },
    )
    final_answer = None
    steps = []
    async for event in event_stream:
        if "agent" not in event:
            continue
        # Fall back to an empty dict so the .get() calls below stay valid.
        pred = event["agent"].get("prediction") or {}
        action = pred.get("action")
        action_input = pred.get("args")
        display.clear_output(wait=False)
        steps.append(f"{len(steps) + 1}. {action}: {action_input}")
        print("\n".join(steps))
        display.display(display.Image(base64.b64decode(event["agent"]["img"])))
        # `action` is None when no prediction was present; guard before `in`.
        if action and "ANSWER" in action:
            final_answer = action_input[0] if action_input else None
            break
    return final_answer

In [None]:
res = await call_agent("could you explain the webvouyager paper (on arxiv)", page)
print(f"Final response: (res)")

In [None]:
res = await call_agent(
    "Search a one-way flight from New york to Kenya for"
    "I adult and analyse the price graph for the"
    "next 2 months",
    page,
)
print(f"Final response: {res}")