In [None]:
# Install/upgrade the packages this notebook uses: LangGraph, LangSmith, the
# Anthropic chat-model bindings, dotenv and langchain-classic.
%pip install -U --quiet langgraph langsmith langchain_anthropic dotenv langchain-classic

In [None]:
from dotenv import load_dotenv

# Load variables from a local .env file into the environment.
# NOTE(review): presumably this is where the Anthropic / LangSmith API keys come from — confirm.
load_dotenv()

In [None]:
# Install/upgrade the Playwright package (pip output discarded), then run
# `playwright install` to fetch the browsers it drives.
%pip install --upgrade --quiet  playwright > /dev/null
!playwright install

In [None]:
from IPython import display
from playwright.async_api import async_playwright

playwright = await async_playwright().start()
browser = await playwright.chromium.connect_over_cdp("ws://127.0.0.1:9222/devtools/browser/3084eb6d-4aeb-450c-94b7-80d108d7c47b")
context = browser.contexts[0]
page = context.pages[0] if context.pages else await context.new_page()
await page.goto("https://www.google.com") 

In [None]:
from typing import List, Literal, Optional
from typing_extensions import TypedDict

from langchain_core.messages import BaseMessage, SystemMessage
from playwright.async_api import Page


class BBox(TypedDict):
    """One annotated page element, as returned by the in-page ``markPage()`` call."""

    # Page coordinates of the element; the tools click / move the mouse to (x, y).
    x: float
    y: float
    # Element text; used in the description when ariaLabel is blank.
    text: str
    # Element kind; rendered as <type/> in the description given to the model.
    type: str
    # Accessibility label; preferred over text when it is non-blank.
    ariaLabel: str

# Every action name the agent may emit; the graph's conditional edges route on these.
options = ["Click", "Type", "Scroll", "Wait", "GoBack", "Google", "ANSWER", "retry"]


class Prediction(TypedDict):
    """Structured output of the model: the chosen action and its raw arguments."""

    # Subscripting Literal with a tuple builds the same type as Literal[*options],
    # but star-unpacking inside [] is a SyntaxError before Python 3.11.
    action: Literal[tuple(options)]
    # String arguments for the action, or None when none were produced.
    args: Optional[List[str]]

# This represents the state of the agent
# as it proceeds through execution
class AgentState(TypedDict):
    """State dictionary that every graph node receives and returns an updated copy of."""

    page: Page  # The Playwright web page lets us interact with the web environment
    input: str  # User request
    img: str  # b64 encoded screenshot
    bboxes: List[BBox]  # The bounding boxes from the browser annotation function
    bbox_descriptions: str  # Text listing of the bounding boxes, built by format_descriptions
    prediction: Prediction  # The Agent's output
    # A system message (or messages) containing the intermediate steps
    scratchpad: List[BaseMessage]
    observation: str  # The most recent response from a tool

In [None]:
import asyncio
import base64

from langchain_core.runnables import chain as chain_decorator

# JavaScript evaluated in the page on every step. It presumably defines the
# in-page markPage() / unmarkPage() helpers that mark_page calls to draw and
# remove the bounding-box annotations — confirm against mark_page.js.
# The encoding is explicit so the script is decoded the same way on every
# platform instead of using the locale's default.
with open("mark_page.js", encoding="utf-8") as f:
    mark_page_script = f.read()


async def _recover_after_evaluate_error(page, error: Exception) -> None:
    """Pause after a failed ``page.evaluate`` before the caller retries it.

    When the error text shows the page navigated away mid-call, also wait
    (best effort) for the new document's DOM to finish loading.
    """
    message = str(error)
    if "Execution context was destroyed" in message or "Target closed" in message:
        # A navigation is in flight: give it a moment, then wait for the new DOM.
        await asyncio.sleep(0.5)
        try:
            await page.wait_for_load_state("domcontentloaded", timeout=2000)
        except Exception:
            # Best effort only; the caller retries the evaluate either way.
            pass
    else:
        # Any other error: short pause, then let the caller retry.
        await asyncio.sleep(0.3)


async def mark_page(state: AgentState) -> AgentState:
    """Annotate the current page and capture a screenshot of it.

    Steps: wait for the page to settle, inject ``mark_page_script``, call the
    in-page ``markPage()`` to obtain the bounding boxes, take a screenshot,
    then call ``unmarkPage()``.

    Args:
        state: Current agent state; only ``state["page"]`` is read.

    Returns:
        A copy of ``state`` with ``img`` set to the base64-encoded screenshot
        and ``bboxes`` set to the list returned by ``markPage()`` (an empty
        list when the script could not be injected or no boxes were returned).
    """
    page = state["page"]
    evaluate_attempts = 10

    # Wait until the page has settled (navigation finished): DOM loaded, then
    # network idle. Failures are tolerated; after the last retry we carry on.
    max_retries = 5
    for attempt in range(max_retries):
        try:
            await page.wait_for_load_state("domcontentloaded", timeout=3000)
            await asyncio.sleep(0.3)  # small extra settling time
            await page.wait_for_load_state("networkidle", timeout=3000)
            break
        except Exception:
            if attempt < max_retries - 1:
                await asyncio.sleep(0.5)

    # Inject the annotation script, retrying while the page is still navigating.
    for _ in range(evaluate_attempts):
        try:
            await page.evaluate(mark_page_script)
            break
        except Exception as exc:
            await _recover_after_evaluate_error(page, exc)
    else:
        # Every attempt failed: return a plain screenshot with no boxes.
        return {
            **state,
            "img": base64.b64encode(await page.screenshot()).decode(),
            "bboxes": [],
        }

    # Ask the page for its bounding boxes, retrying on errors and empty results.
    bboxes = []
    for attempt in range(evaluate_attempts):
        try:
            bboxes = await page.evaluate("markPage()")
        except Exception as exc:
            await _recover_after_evaluate_error(page, exc)
            continue
        if bboxes:
            break
        if attempt < evaluate_attempts - 1:
            # Nothing found yet: pause so the page can render before the next
            # try instead of re-running the call back-to-back.
            await asyncio.sleep(0.3)

    # Capture the page while the annotations are still drawn.
    screenshot = await page.screenshot()

    # Remove the annotations; errors are ignored because the page may already
    # have navigated away or its execution context may have been destroyed.
    try:
        await page.evaluate("unmarkPage()")
    except Exception:
        pass

    return {
        **state,
        "img": base64.b64encode(screenshot).decode(),
        "bboxes": bboxes if bboxes else [],
    }

In [None]:
def format_descriptions(state: AgentState) -> AgentState:
    """Render the bounding boxes as a numbered text listing for the prompt.

    Each box becomes ``<index> (<type/>): "<caption>"`` where the caption is
    the box's ``ariaLabel`` when that is non-blank and its ``text`` otherwise.

    Returns:
        A copy of ``state`` with ``bbox_descriptions`` set to the listing.
    """

    def describe(index, box):
        # Prefer the accessibility label; fall back to the element text.
        caption = box.get("ariaLabel") or ""
        if not caption.strip():
            caption = box["text"]
        kind = box.get("type")
        return f'{index} (<{kind}/>): "{caption}"'

    lines = [describe(index, box) for index, box in enumerate(state["bboxes"])]
    listing = "\nValid Bounding Boxes:\n" + "\n".join(lines)
    return {**state, "bbox_descriptions": listing}

In [None]:
from langchain_anthropic import ChatAnthropic
from langchain_classic import hub
from langchain_core.prompts import ChatPromptTemplate

# Pull the "wfh/web-voyager" prompt from the LangChain hub.
prompt = hub.pull("wfh/web-voyager")

# Anthropic chat model configured to return output structured as a Prediction.
llm = ChatAnthropic(model="claude-3-7-sonnet-latest").with_structured_output(Prediction)

def agent(state: AgentState) -> AgentState:
    """Pipe the prompt into the model, invoke it on the state, and store the result.

    Returns:
        A copy of ``state`` with ``prediction`` set to the chain's output.
    """
    pipeline = prompt | llm
    outcome = pipeline.invoke(state)
    return {**state, "prediction": outcome}

In [None]:
import asyncio
import platform


async def click(state: AgentState) -> AgentState:
    """Click the element whose bounding-box label the agent chose.

    Expected action format: ``Click [Numerical_Label]`` — i.e.
    ``prediction["args"]`` holds exactly one item, the index into
    ``state["bboxes"]``.

    Returns:
        A copy of ``state`` with ``observation`` describing the outcome. Bad
        arguments (wrong count, non-numeric, negative or out-of-range label)
        produce an error observation instead of raising.
    """
    page = state["page"]
    click_args = state["prediction"]["args"]
    if click_args is None or len(click_args) != 1:
        return {
            **state,
            "observation": f"Failed to click bounding box labeled as number {click_args}"
        }

    bbox_id = click_args[0]
    try:
        bbox_id = int(bbox_id)
        if bbox_id < 0:
            # A negative label would silently pick a box counted from the end.
            raise IndexError(bbox_id)
        bbox = state["bboxes"][bbox_id]
    except (ValueError, TypeError, IndexError, KeyError):
        return {
            **state,
            "observation": f"Error: no bbox for : {bbox_id}"
        }

    x, y = bbox["x"], bbox["y"]
    await page.mouse.click(x, y)
    return {
        **state,
        "observation": f"Clicked {bbox_id}"
    }


async def type_text(state: AgentState) -> AgentState:
    """Replace the content of a labeled element with text and press Enter.

    ``prediction["args"]`` may be ``[label, text]`` or a single packed string
    ``"label;text"``. The element is clicked, its content selected and deleted,
    the text typed, and Enter pressed.

    Returns:
        A copy of ``state`` with ``observation`` describing the outcome. Missing
        or malformed arguments and unknown labels produce an error observation
        instead of raising.
    """
    page = state["page"]
    type_args = state["prediction"]["args"]

    # Unpack the single-string form. Split on the first ";" only, so the text
    # to type may itself contain semicolons.
    if type_args is not None and len(type_args) == 1 and isinstance(type_args[0], str):
        type_args = type_args[0].split(";", 1)

    if type_args is None or len(type_args) != 2:
        return {
            **state,
            "observation": f"Failed to type in element from bounding box labeled as number {type_args}"
        }

    bbox_id, text_content = type_args
    try:
        bbox_id = int(bbox_id)
        if bbox_id < 0:
            # A negative label would silently pick a box counted from the end.
            raise IndexError(bbox_id)
        bbox = state["bboxes"][bbox_id]
    except (ValueError, TypeError, IndexError, KeyError):
        return {
            **state,
            "observation": f"Error: no bbox for : {bbox_id}"
        }

    x, y = bbox["x"], bbox["y"]
    await page.mouse.click(x, y)
    # Select-all shortcut differs on macOS.
    select_all = "Meta+A" if platform.system() == "Darwin" else "Control+A"
    await page.keyboard.press(select_all)
    await page.keyboard.press("Backspace")
    await page.keyboard.type(text_content)
    await page.keyboard.press("Enter")
    return {
        **state,
        "observation": f"Typed {text_content} and submitted"
    }

async def scroll(state: AgentState) -> AgentState:
    """Scroll the window or a labeled element up or down.

    ``prediction["args"]`` may be ``[target, direction]`` or a single packed
    string ``"target;direction"``. ``target`` is ``WINDOW`` (case-insensitive)
    or a bounding-box label; any direction other than ``up`` scrolls down.

    Returns:
        A copy of ``state`` with ``observation`` describing the outcome. Missing
        or malformed arguments and unknown labels produce an error observation
        instead of raising.
    """
    page = state["page"]
    scroll_args = state["prediction"]["args"]

    # Unpack the single-string form "target;direction".
    if scroll_args is not None and len(scroll_args) == 1 and isinstance(scroll_args[0], str):
        scroll_args = scroll_args[0].split(";")

    if scroll_args is None or len(scroll_args) != 2:
        return {
            **state,
            "observation": "Failed to scroll due to incorrect arguments."
        }

    # Strip surrounding whitespace: "WINDOW; up" would otherwise yield " up",
    # which fails the comparison below and scrolls down instead.
    target, direction = (str(part).strip() for part in scroll_args)
    going_up = direction.lower() == "up"

    if target.upper() == "WINDOW":
        # Not sure the best value for this:
        scroll_amount = 500
        scroll_direction = -scroll_amount if going_up else scroll_amount
        await page.evaluate(f"window.scrollBy(0, {scroll_direction})")
    else:
        # Scrolling within a specific element
        scroll_amount = 200
        try:
            target_id = int(target)
            if target_id < 0:
                # A negative label would silently pick a box counted from the end.
                raise IndexError(target_id)
            bbox = state["bboxes"][target_id]
        except (ValueError, TypeError, IndexError, KeyError):
            return {
                **state,
                "observation": f"Error: no bbox for : {target}"
            }
        x, y = bbox["x"], bbox["y"]
        scroll_direction = -scroll_amount if going_up else scroll_amount
        # Hover over the element so the wheel event is delivered to it.
        await page.mouse.move(x, y)
        await page.mouse.wheel(0, scroll_direction)

    return {
        **state,
        "observation": f"Scrolled {direction} in {'window' if target.upper() == 'WINDOW' else 'element'}"
    }


async def wait(state: AgentState):
    """Sleep for a fixed five seconds and record that in the observation."""
    pause_seconds = 5
    await asyncio.sleep(pause_seconds)
    updated = dict(state)
    updated["observation"] = f"Waited for {pause_seconds}s."
    return updated


async def go_back(state: AgentState):
    """Navigate the page one step back and report the URL it is on afterwards."""
    browser_page = state["page"]
    await browser_page.go_back()
    # Read the URL only after the navigation call has completed.
    message = f"Navigated back a page to {browser_page.url}."
    updated = dict(state)
    updated["observation"] = message
    return updated


async def to_google(state: AgentState):
    """Load the Google home page in the agent's page and record that it did so."""
    browser_page = state["page"]
    await browser_page.goto("https://www.google.com/")
    updated = dict(state)
    updated["observation"] = "Navigated to google.com."
    return updated

In [None]:
def select_tool(state: AgentState):
    """Return the action name from the latest prediction; used to pick the next node."""
    return state["prediction"]["action"]

In [None]:
import re


def update_scratchpad(state: AgentState):
    """Append the latest tool observation to the scratchpad as a numbered step.

    After a tool is invoked, the scratchpad is updated so the agent is aware
    of its previous steps. The scratchpad is kept as a single system message
    whose text lists the observations as ``1. ...``, ``2. ...`` and so on.

    Returns:
        A copy of ``state`` whose ``scratchpad`` is a one-element list holding
        the rebuilt ``SystemMessage``.
    """
    old = state.get("scratchpad")
    if old:
        txt = old[0].content
        # Take the number of the last line that starts a step ("<n>. "). Looking
        # only at the final line breaks as soon as an observation spans several
        # lines, because its last line no longer starts with the step number.
        # Caveat: an observation line that itself starts with "<n>. " is
        # indistinguishable from a step line.
        numbered = re.findall(r"^(\d+)\. ", txt, flags=re.MULTILINE)
        step = int(numbered[-1]) + 1 if numbered else 1
    else:
        txt = "Previous action observations:\n"
        step = 1
    txt += f"\n{step}. {state['observation']}"

    return {**state, "scratchpad": [SystemMessage(content=txt)]}

In [None]:
from langchain_core.runnables import RunnableLambda

from langgraph.graph import END, START, StateGraph

# Build the agent loop:
#   mark_page -> format_descriptions -> agent -> <tool> -> update_scratchpad -> mark_page
graph_builder = StateGraph(AgentState)

# add node step
graph_builder.add_node("mark_page", RunnableLambda(mark_page))
graph_builder.add_node("format_descriptions", format_descriptions)
graph_builder.add_node("agent", agent)
graph_builder.add_node("click", click)
graph_builder.add_node("type_text", type_text)
graph_builder.add_node("scroll", scroll)
graph_builder.add_node("wait", wait)
graph_builder.add_node("go_back", go_back)
graph_builder.add_node("to_google", to_google)
graph_builder.add_node("update_scratchpad", update_scratchpad)

# add edge step
graph_builder.add_edge(START, "mark_page")
graph_builder.add_edge("mark_page", "format_descriptions")
graph_builder.add_edge("format_descriptions", "agent")
# Route on the action name chosen by the agent (see select_tool).
graph_builder.add_conditional_edges(
    "agent",
    select_tool,
    {
        "ANSWER": END,
        "retry": "mark_page",
        "Type": "type_text",
        "Scroll": "scroll",
        "Wait": "wait",
        "GoBack": "go_back",
        "Google": "to_google",
        "Click": "click",
    }
)
# Every tool feeds its observation into the scratchpad, which loops back to
# mark_page. "click" needs this edge too: without it the click node has no
# outgoing edge and the run stops right after the first click.
graph_builder.add_edge("click", "update_scratchpad")
graph_builder.add_edge("type_text", "update_scratchpad")
graph_builder.add_edge("scroll", "update_scratchpad")
graph_builder.add_edge("wait", "update_scratchpad")
graph_builder.add_edge("go_back", "update_scratchpad")
graph_builder.add_edge("to_google", "update_scratchpad")
graph_builder.add_edge("update_scratchpad", "mark_page")

graph = graph_builder.compile()
graph

In [None]:
async for event in graph.astream({
    "input": "What is the latest Elon Musk tweet?",
    "page": page,
    "scratchpad": [],
    }):

    if "mark_page" in event:
        display.clear_output(wait=False)
        display.display(display.Image(base64.b64decode(event["mark_page"]["img"])))

    if "agent" in event:
        print(event["agent"]["prediction"])