### Planning Single-Task Language Agents in a Closed Loop

Welcome to the first session notebook of the **Math & AI Institute AI Talks** series.
In this session, we will dive into the topic **"Planning single-task language agents in a closed loop".**

**Author:** Doğukan Uraz Tuna (@dtunai)

**Event Time:** 28 May, 2024 20:00 GMT+3

#### Requirements

In [None]:
%%capture --no-stderr
%pip install --quiet -U langchain langchain-community langchain-experimental langgraph langchain-openai tavily-python langchainhub duckduckgo_search

#### API Setup

In [None]:
import os

os.environ["TAVILY_API_KEY"] = ""
os.environ["OPENAI_API_KEY"] = ""
# os.environ["LANGCHAIN_API_KEY"] = "A"
# os.environ["LANGCHAIN_TRACING_V2"] = "true"
# os.environ["LANGCHAIN_PROJECT"] = "Plan-and-execute"

#### Tool Setup

In [None]:
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain.tools import DuckDuckGoSearchResults, DuckDuckGoSearchRun
from langchain.agents import (
    Tool,
    AgentExecutor,
    LLMSingleActionAgent,
    AgentOutputParser,
)

tools = [DuckDuckGoSearchResults(max_results=10)]

#### Hub & Agent Executor Setup

In [None]:
from langchain import hub
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent

prompt = hub.pull("wfh/react-agent-executor")
prompt.pretty_print()

llm = ChatOpenAI(model="gpt-3.5-turbo")
agent_executor = create_react_agent(llm, tools, messages_modifier=prompt)

#### Example

In [None]:
agent_executor.invoke({"messages": [("user", "Is covid vaccine safe?")]})

#### Modelling Plan-and-Execute Agent

In [None]:
from typing import List, Tuple, Annotated, TypedDict
import operator


class PlanExecute(TypedDict):
    input: str
    plan: List[str]
    past_steps: Annotated[List[Tuple], operator.add]
    response: str

In [None]:
from langchain_core.pydantic_v1 import BaseModel, Field


class Plan(BaseModel):
    """Plan to follow in future"""

    steps: List[str] = Field(
        description="different steps to follow, should be in sorted order"
    )

#### Planner Prompt Design (Example)

In [None]:
from langchain_core.prompts import ChatPromptTemplate

planner_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            """For the given objective, come up with a simple step by step plan. \
This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps. \
The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps.""",
        ),
        ("placeholder", "{messages}"),
    ]
)
planner = planner_prompt | ChatOpenAI(
    model="gpt-3.5-turbo", temperature=0
).with_structured_output(Plan)

In [None]:
print(planner_prompt)

In [None]:
planner.invoke(
    {
        "messages": [
            ("user", "what is the hometown of the current Australia open winner?")
        ]
    }
)

#### Replanning a real-world use-case agent

In [None]:
from typing import Union


class Response(BaseModel):
    """Response to user."""

    response: str


class Act(BaseModel):
    """Action to perform."""

    action: Union[Response, Plan] = Field(
        description="Action to perform. If you want to respond to user, use Response. "
        "If you need to further use tools to get the answer, use Plan."
    )


replanner_prompt = ChatPromptTemplate.from_template(
    """For the given objective, come up with a simple step by step plan. \
This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps. \
The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps.

Your objective was this:
{input}

Your original plan was this:
{plan}

You have currently done the follow steps:
{past_steps}

Update your plan accordingly. If no more steps are needed and you can return to the user, then respond with that. Otherwise, fill out the plan. Only add steps to the plan that still NEED to be done. Do not return previously done steps as part of the plan."""
)


replanner = replanner_prompt | ChatOpenAI(
    model="gpt-3.5-turbo", temperature=0
).with_structured_output(Act)

In [None]:
from typing import Literal


async def execute_step(state: PlanExecute):
    plan = state["plan"]
    plan_str = "\n".join(f"{i+1}. {step}" for i, step in enumerate(plan))
    task = plan[0]
    task_formatted = f"""For the following plan:
{plan_str}\n\nYou are tasked with executing step {1}, {task}."""
    agent_response = await agent_executor.ainvoke(
        {"messages": [("user", task_formatted)]}
    )
    return {
        "past_steps": (task, agent_response["messages"][-1].content),
    }


async def plan_step(state: PlanExecute):
    plan = await planner.ainvoke({"messages": [("user", state["input"])]})
    return {"plan": plan.steps}


async def replan_step(state: PlanExecute):
    output = await replanner.ainvoke(state)
    if isinstance(output.action, Response):
        return {"response": output.action.response}
    else:
        return {"plan": output.action.steps}


def should_end(state: PlanExecute) -> Literal["agent", "__end__"]:
    if "response" in state and state["response"]:
        return "__end__"
    else:
        return "agent"

In [None]:
from langgraph.graph import StateGraph

workflow = StateGraph(PlanExecute)
workflow.add_node("planner", plan_step)
workflow.add_node("agent", execute_step)
workflow.add_node("replan", replan_step)
workflow.set_entry_point("planner")
workflow.add_edge("planner", "agent")
workflow.add_edge("agent", "replan")

workflow.add_conditional_edges(
    "replan",
    should_end,
)
app = workflow.compile()

In [None]:
from IPython.display import Image, display

display(Image(app.get_graph(xray=True).draw_mermaid_png()))

In [None]:
config = {
    "recursion_limit": 50,
}

inputs = {
    "input": f"Provide investment recommendations for the next quarter based on current market trends and financial news. Recommend stock symbols.\n\
            Risk Tolerance: Medium\n\
            Investment Horizon: 3 Months\n\
            Sector Preferences: Technology, Renewable Energy\n"
}

response_dict = {}
past_steps_list = []

async for event in app.astream(inputs, config=config):
    for k, v in event.items():
        if k != "__end__":
            print(v)