## 🦜🔗 Überwachte Agenten

Manchmal möchte man, dass ein Agent etwas tun darf, aber nur nach Rückfrage. Typisch Anwendungsfälle sind z.B. Email-Versand oder der Zugriff auf das Betriebssystem.

In [None]:
from langchain import hub
from langchain.agents import create_openai_functions_agent
from langchain.tools.shell import ShellTool
from langgraph.prebuilt.tool_executor import ToolExecutor
from helpers import llm

tools = [ShellTool()]
prompt = hub.pull("reactagent/openai-functions-agent")
agent_runnable = create_openai_functions_agent(llm(temperature=0), tools, prompt)
tool_executor = ToolExecutor(tools)

In [None]:
from typing import TypedDict, Annotated, Union
from langchain_core.agents import AgentAction, AgentFinish
import operator

class AgentState(TypedDict):
    input: str
    agent_outcome: Union[AgentAction, AgentFinish, None]
    intermediate_steps: Annotated[list[tuple[AgentAction, str]], operator.add]

In [None]:
from langchain_core.agents import AgentFinish
from langchain_core.agents import AgentActionMessageLog

def run_agent(data):
    agent_outcome = agent_runnable.invoke(data)
    return {"agent_outcome": agent_outcome}

def execute_tools(data):
    agent_action :AgentActionMessageLog = data["agent_outcome"]
    if agent_action.tool == 'terminal':
        response = input(prompt=f"[y/n] continue with shell execution: {agent_action.tool_input}?")
        if response == "y":
            output = tool_executor.invoke(agent_action)
        else:
            output = "This specific terminal command not permitted by user. Try a different terminal command or return unfinished."
    return {"intermediate_steps": [(agent_action, str(output))]}

def should_continue(data):
    if isinstance(data["agent_outcome"], AgentFinish):
        return "end"
    else:
        return "continue"

In [None]:
from langgraph.graph import END, StateGraph

workflow = StateGraph(AgentState)

workflow.add_node("agent", run_agent)
workflow.add_node("action", execute_tools)
workflow.set_entry_point("agent")
workflow.add_conditional_edges(
    "agent",
    should_continue,
    {
        "continue": "action",
        "end": END,
    },
)
workflow.add_edge("action", "agent")

agent_executor = workflow.compile()


In [None]:

# Wir definieren eine kleine Funktion, die die Ausgabe formatiert.
# Diese Arbeit muss man üblicherweise selbst tun, weil LangChain ja nicht weiß, welches Format man am Ende braucht.
def formatted_output(data):
    return data.get("agent_outcome").return_values.get("output")
app = agent_executor | formatted_output


In [None]:

inputs={"input": "Count the lines of all python notebooks in the current directory. Use simple shell commands."}
output = app.invoke(inputs)
print(f"Agent result: {output}")


In [None]:
inputs={"input": "Send a mail with the subject 'donald trump hat doofe ohren' to donald@trump.com."}
output = app.invoke(inputs)
print(f"Agent result: {output}")

## Das hat funktioniert

Allerdings muss nun der Agent den State und alle Objekte rundherum so lange im Memory behalten, bis ein Nutzer endlich die Rückfrage beantwortet. Geht das besser?

Ja, mit Checkpoints.

In [None]:
def unsafe_execute_tools(data):
    agent_action : AgentActionMessageLog = data["agent_outcome"]
    output = tool_executor.invoke(agent_action)
    return {"intermediate_steps": [(agent_action, str(output))]}

In [None]:
from langgraph.checkpoint.sqlite import SqliteSaver
memory = SqliteSaver.from_conn_string(":memory:")

workflow = StateGraph(AgentState)

workflow.add_node("agent", run_agent)
workflow.add_node("action", unsafe_execute_tools)
workflow.set_entry_point("agent")
workflow.add_conditional_edges(
    "agent",
    should_continue,
    {
        "continue": "action",
        "end": END,
    },
)
workflow.add_edge("action", "agent")

checkpoint_agent_executor = workflow.compile(checkpointer=memory, interrupt_before=["action"])

In [None]:
inputs={"input": "Count the lines of all python notebooks in the current directory. Use simple shell commands."}
for event in checkpoint_agent_executor.stream(inputs, {"configurable": {"thread_id": "2"}}):
    for k, v in event.items():
        if k != "__end__":
            print(v)

In [None]:
inputs={"input": "Count the lines of all python notebooks in the current directory. Use simple shell commands."}
for event in checkpoint_agent_executor.stream(inputs, {"configurable": {"thread_id": "2"}}):
    for k, v in event.items():
        if k != "__end__":
            print(v)

In [None]:
for event in checkpoint_agent_executor.stream(None, {"configurable": {"thread_id": "2"}}):
    for k, v in event.items():
        if k != "__end__":
            print(v)