# Building an Agent around a Query Pipeline

In this cookbook we show you how to build an agent around a query pipeline.

Agents let you perform complex, sequential reasoning on top of any query DAG that you have set up. Conceptually, an agent is also one way to add a "loop" to the graph: the agent re-runs the pipeline step by step until it decides it is done.
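The following is a minimal pure-Python sketch of the "loop" idea. It assumes a hypothetical `step_fn` that runs one pass of the pipeline and returns a `(response, is_done)` pair. This is not LlamaIndex code; `run_agent_loop` and `toy_step` are illustrative names only.

```python
from typing import Any, Callable, Dict, List, Tuple

# One "step" is a single pass through the query DAG: it reads/updates shared
# state and returns (response, is_done).
StepFn = Callable[[Dict[str, Any]], Tuple[str, bool]]


def run_agent_loop(step_fn: StepFn, max_steps: int = 5) -> Tuple[str, List[str]]:
    """Re-run a one-step pipeline until it reports completion or max_steps is hit."""
    state: Dict[str, Any] = {}  # shared across steps, like the agent's state dict
    responses: List[str] = []
    response = ""
    for _ in range(max_steps):
        response, is_done = step_fn(state)
        responses.append(response)
        if is_done:
            break
    return response, responses


def toy_step(state: Dict[str, Any]) -> Tuple[str, bool]:
    # Hypothetical step that only reports completion on its third call.
    state["calls"] = state.get("calls", 0) + 1
    return f"step {state['calls']}", state["calls"] >= 3


final, history = run_agent_loop(toy_step)
print(final)    # step 3
print(history)  # ['step 1', 'step 2', 'step 3']
```

The `max_steps` cap guards against a step that never reports completion.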

We show you two examples of agents you can implement:
- a full ReAct agent that can do tool picking
- a "simple" agent that adds a retry layer around a text-to-SQL query engine

## Setup Data

We use the chinook database as sample data. [Source](https://www.sqlitetutorial.net/sqlite-sample-database/).

In [1]:
!curl "https://www.sqlitetutorial.net/wp-content/uploads/2018/03/chinook.zip" -o ./chinook.zip
!unzip ./chinook.zip

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  298k  100  298k    0     0  1035k      0 --:--:-- --:--:-- --:--:-- 1036k
Archive:  ./chinook.zip
  inflating: chinook.db              


In [1]:
from llama_index import SQLDatabase
from sqlalchemy import (
    create_engine,
    MetaData,
    Table,
    Column,
    String,
    Integer,
    select,
    column,
)

engine = create_engine("sqlite:///chinook.db")
sql_database = SQLDatabase(engine)

In [2]:
from llama_index.query_pipeline import QueryPipeline

### Setup Observability

We set up Arize Phoenix for observability.

In [3]:
# setup Arize Phoenix for logging/observability
import phoenix as px
import llama_index

px.launch_app()
llama_index.set_global_handler("arize_phoenix")

🌍 To view the Phoenix app in your browser, visit http://0.0.0.0:6006/
📺 To view the Phoenix app in a notebook, run `px.active_session().view()`
📖 For more information on how to use Phoenix, check out https://docs.arize.com/phoenix


## Setup Text-to-SQL Query Engine / Tool

Now we set up a simple text-to-SQL tool: given a query, it translates the text to SQL, executes the SQL against the database, and returns the result.
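The three stages can be sketched with Python's built-in `sqlite3` module. This is a toy illustration, not how `NLSQLTableQueryEngine` works internally. The tables and rows are made up, and `fake_text_to_sql` is a hypothetical stand-in for the LLM translation step. The real engine also turns the rows into a natural-language answer; that step is omitted here.

```python
import sqlite3
from typing import Callable, List, Tuple

# Toy in-memory tables loosely modeled on the chinook `artists` / `albums`
# tables; the rows are invented for illustration.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE artists (ArtistId INTEGER PRIMARY KEY, Name TEXT);
    CREATE TABLE albums (AlbumId INTEGER PRIMARY KEY, Title TEXT, ArtistId INTEGER);
    INSERT INTO artists VALUES (1, 'AC/DC'), (2, 'Accept');
    INSERT INTO albums VALUES (1, 'Album A', 1), (2, 'Album B', 1), (3, 'Album C', 2);
    """
)


def fake_text_to_sql(question: str) -> str:
    """Hypothetical stand-in for the LLM translation step (canned answers only)."""
    canned = {
        "How many albums does AC/DC have?": (
            "SELECT COUNT(*) FROM albums JOIN artists "
            "ON albums.ArtistId = artists.ArtistId WHERE artists.Name = 'AC/DC'"
        )
    }
    return canned[question]


def text_to_sql_query(
    question: str, translate: Callable[[str], str]
) -> Tuple[str, List[tuple]]:
    sql = translate(question)            # 1. translate text to SQL
    rows = conn.execute(sql).fetchall()  # 2. execute against the database
    return sql, rows                     # 3. return the SQL and the result rows


sql, rows = text_to_sql_query("How many albums does AC/DC have?", fake_text_to_sql)
print(rows)  # [(2,)]
```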

In [4]:
from llama_index.query_engine import NLSQLTableQueryEngine
from llama_index.tools.query_engine import QueryEngineTool

sql_query_engine = NLSQLTableQueryEngine(
    sql_database=sql_database,  # pass the SQLDatabase object, not the file name
    tables=["albums", "tracks", "artists"],
    verbose=True,
)
sql_tool = QueryEngineTool.from_defaults(
    query_engine=sql_query_engine,
    name="sql_tool",
    description=(
        "Useful for translating a natural language query into a SQL query"
    ),
)

## Setup ReAct Agent Pipeline

We now set up a ReAct pipeline for a single agent step using the Query Pipeline syntax. This is a multi-part process that does the following:
1. Takes in the agent inputs.
2. Calls the ReAct prompt with the LLM to generate the next action/tool (or return a response).
3. If a tool/action is selected, calls the tool pipeline to execute the tool and collect its output.
4. If a response is generated, returns that response.

Throughout this, we'll use a variety of agent-specific query components. Unlike normal query components, these are designed specifically for query pipelines used in a `QueryPipelineAgentWorker`:
- An `AgentInputComponent` that converts the agent inputs (Task, state dictionary) into a set of inputs for the query pipeline.
- An `AgentFnComponent`: a general processor that takes in the current Task and state, as well as any arbitrary inputs, and returns an output. In this cookbook we define a function component to format the ReAct prompt, but you can place this component anywhere in the pipeline.
- A `CustomAgentComponent`: similar to `AgentFnComponent`, but you implement `_run_component` to define your own logic, with access to Task and state. It is more verbose but more flexible than `AgentFnComponent` (e.g. you can define init variables, and callbacks are handled in the base class).

Note that any function passed into `AgentFnComponent` and `AgentInputComponent` MUST include `task` and `state` as input parameters, since the agent passes these in.

Note that the output of an agentic query pipeline MUST be `Tuple[AgentChatResponse, bool]`. You'll see this below.
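The sketch below illustrates these two rules. A hypothetical helper checks that a function accepts `task` and `state`, and a sample output function returns a `(response, is_done)` tuple. A plain string stands in for `AgentChatResponse`. `check_agent_fn` is not part of LlamaIndex, and the library may enforce these rules differently.

```python
import inspect
from typing import Any, Callable, Dict, Tuple


def check_agent_fn(fn: Callable[..., Any]) -> None:
    """Raise if fn does not accept the `task` and `state` arguments the agent passes in."""
    params = inspect.signature(fn).parameters
    missing = {"task", "state"} - set(params)
    if missing:
        raise TypeError(f"{fn.__name__} is missing required parameters: {sorted(missing)}")


def good_output_fn(task: Any, state: Dict[str, Any], output: str) -> Tuple[str, bool]:
    # Final component: returns (response, is_done).
    return output, True


def bad_fn(input: str) -> str:
    return input


check_agent_fn(good_output_fn)  # passes silently
try:
    check_agent_fn(bad_fn)
except TypeError as e:
    print(e)  # bad_fn is missing required parameters: ['state', 'task']
```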

### Define Agent Input Component

Here we define the agent input component, called at the beginning of every agent step. Besides passing along the input, we also do initialization/state modification.
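The same `state` dictionary is passed to every step of a task. As a result, initialization runs only once, and later steps see whatever earlier steps appended. The sketch below shows this behavior using a hypothetical `FakeTask` dataclass in place of `Task`, and plain strings in place of `ObservationReasoningStep`.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class FakeTask:
    """Hypothetical stand-in for llama_index's Task; only `input` is modeled."""

    input: str


def sketch_agent_input_fn(task: FakeTask, state: Dict[str, Any]) -> Dict[str, Any]:
    # Initialize on the first step only; later steps reuse the same dict.
    if "current_reasoning" not in state:
        state["current_reasoning"] = []
    # A plain string stands in for ObservationReasoningStep.
    state["current_reasoning"].append(f"Observation: {task.input}")
    return {"input": task.input}


demo_state: Dict[str, Any] = {}
demo_task = FakeTask(input="What are some tracks from the artist AC/DC? Limit it to 3")
first = sketch_agent_input_fn(demo_task, demo_state)
sketch_agent_input_fn(demo_task, demo_state)
print(first)
print(len(demo_state["current_reasoning"]))  # 2: state survives between steps
```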

In [5]:
from llama_index.agent.react.types import (
    ActionReasoningStep,
    ObservationReasoningStep,
    ResponseReasoningStep,
)
from llama_index.agent import Task, AgentChatResponse
from llama_index.query_pipeline import (
    AgentInputComponent,
    AgentFnComponent,
    CustomAgentComponent,
    ToolRunnerComponent,
    QueryComponent,
)
from llama_index.llms import MessageRole
from typing import Dict, Any, Optional, Tuple, List, cast


## Agent Input Component
## This is the component that produces agent inputs to the rest of the components
## Can also put initialization logic here.
def agent_input_fn(task: Task, state: Dict[str, Any]) -> Dict[str, Any]:
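    """Agent input function.

    Returns:
        A dictionary of output keys and values. If you specify src_key when
        defining links between this component and other components, make
        sure the src_key matches the specified output_key.

    """
    # initialize current_reasoning if it is not already in state
    if "current_reasoning" not in state:
        state["current_reasoning"] = []
    # create a reasoning step from the current task input
    reasoning_step = ObservationReasoningStep(observation=task.input)
    # append the new reasoning step to state["current_reasoning"]
    state["current_reasoning"].append(reasoning_step)
    # return a dict containing the task input
    return {"input": task.input}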



agent_input_component = AgentInputComponent(fn=agent_input_fn)

### Define Agent Prompt

Here we define the agent component that generates the ReAct prompt from the task, state, and tools. After the LLM generates its output, the output component (defined in the next section) parses that output into a structured object.
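Conceptually, the prompt component assembles three things: a system message describing the available tools, the prior chat history, and the reasoning steps so far. The sketch below is a simplified, hypothetical layout using plain dictionaries. It does not reproduce `ReActChatFormatter`'s actual template or role assignment. As an assumption for illustration, observations are sent with the user role and the LLM's own steps with the assistant role.

```python
from typing import Dict, List


def sketch_react_messages(
    tools: Dict[str, str],
    chat_history: List[Dict[str, str]],
    current_reasoning: List[str],
) -> List[Dict[str, str]]:
    """Assemble a simplified ReAct-style message list (hypothetical layout)."""
    tool_desc = "\n".join(
        f"> Tool Name: {name}\nTool Description: {desc}" for name, desc in tools.items()
    )
    messages = [{"role": "system", "content": "You have access to these tools:\n" + tool_desc}]
    messages.extend(chat_history)  # earlier turns of the conversation
    for step in current_reasoning:
        # Assumption: observations come from outside the LLM (user role),
        # while thoughts/actions are the LLM's own output (assistant role).
        role = "user" if step.startswith("Observation:") else "assistant"
        messages.append({"role": role, "content": step})
    return messages


msgs = sketch_react_messages(
    {"sql_tool": "Useful for translating a natural language query into a SQL query"},
    [],
    ["Observation: What are some tracks from the artist AC/DC? Limit it to 3"],
)
print([m["role"] for m in msgs])  # ['system', 'user']
```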

In [6]:
from llama_index.agent.react.formatter import ReActChatFormatter
from llama_index.query_pipeline import InputComponent, Link
from llama_index.llms import ChatMessage
from llama_index.tools import BaseTool


## define prompt function
def react_prompt_fn(
    task: Task, state: Dict[str, Any], input: str, tools: List[BaseTool]
) -> List[ChatMessage]:
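    """Generate the ReAct chat messages from the current task, state, input, and tools.

    Args:
        task: The current task object.
        state: A dictionary containing the current state.
        input: The user's input string.
        tools: The list of available tool objects.

    Returns:
        The generated list of chat messages.
    """
    # the input is already in state["current_reasoning"] (added by agent_input_fn)
    chat_formatter = ReActChatFormatter()
    # format the tools, chat history, and current reasoning into chat messages
    return chat_formatter.format(
        tools,
        chat_history=task.memory.get() + state["memory"].get_all(),  # task memory plus the agent's memory in state
        current_reasoning=state["current_reasoning"],  # reasoning steps so far
    )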



react_prompt_component = AgentFnComponent(
    fn=react_prompt_fn, partial_dict={"tools": [sql_tool]}
)

### Define Agent Output Parser + Tool Pipeline

Once the LLM gives an output, we have a decision tree:
1. If an answer is given, then we're done: process the output.
2. If an action is given, we need to execute the specified tool with the specified args, and then process the output.
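The decision tree can be sketched as a small parser over the ReAct text format shown in the run logs later in this notebook (`Thought:` / `Action:` / `Action Input:`, or `Answer:`). `ParsedStep` and `sketch_parse_react` are hypothetical, simplified stand-ins for the reasoning-step types and `ReActOutputParser`. The real parser handles more formats and edge cases.

```python
import json
import re
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class ParsedStep:
    """Hypothetical, simplified stand-in for the ReAct reasoning-step types."""

    thought: str
    action: Optional[str] = None
    action_input: Dict[str, Any] = field(default_factory=dict)
    response: Optional[str] = None

    @property
    def is_done(self) -> bool:
        return self.response is not None


def sketch_parse_react(text: str) -> ParsedStep:
    thought_match = re.search(r"Thought:\s*(.*)", text)
    thought = thought_match.group(1).strip() if thought_match else ""
    answer = re.search(r"Answer:\s*(.*)", text, re.DOTALL)
    if answer:
        # Branch 1: an answer was given, so the agent is done.
        return ParsedStep(thought=thought, response=answer.group(1).strip())
    action = re.search(r"Action:\s*(\S+)", text)
    action_input = re.search(r"Action Input:\s*(\{.*\})", text, re.DOTALL)
    if action is None or action_input is None:
        raise ValueError(f"Could not parse ReAct output: {text!r}")
    # Branch 2: an action was given; the tool must run before the next step.
    return ParsedStep(
        thought=thought,
        action=action.group(1),
        action_input=json.loads(action_input.group(1)),
    )


step = sketch_parse_react(
    "Thought: I need to use a tool to help me answer the question.\n"
    "Action: sql_tool\n"
    'Action Input: {"input": "What are some tracks from the artist AC/DC? Limit it to 3"}'
)
print(step.action, step.is_done)  # sql_tool False
```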

Tool calling can be done via the `ToolRunnerComponent` module. This is a standalone module that takes in a list of tools and can be "executed" with a specified tool name (every tool has a name) and tool input.

We implement this overall logic in `OutputAgentComponent`, which subclasses `CustomAgentComponent`.

Note: we also implement `sub_query_components` to pass through higher-level callback managers to the tool runner submodule.
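At its core, a tool runner looks up a callable by tool name and calls it with the parsed input. The class below is a hypothetical sketch of that idea, using a plain dictionary of Python functions. The real `ToolRunnerComponent` takes `BaseTool` objects and also carries a callback manager.

```python
from typing import Any, Callable, Dict


class SketchToolRunner:
    """Hypothetical sketch of a tool runner: look up a tool by name, then call it."""

    def __init__(self, tools: Dict[str, Callable[..., Any]]):
        self.tool_dict = dict(tools)  # tool name -> callable

    def run(self, tool_name: str, tool_input: Dict[str, Any]) -> Any:
        if tool_name not in self.tool_dict:
            raise ValueError(f"Unknown tool: {tool_name}")
        # Tool input is a dict of keyword arguments, e.g. {"input": "..."}.
        return self.tool_dict[tool_name](**tool_input)


def fake_sql_tool(input: str) -> str:
    # Hypothetical tool body; the real sql_tool queries the chinook database.
    return f"(pretend SQL result for: {input})"


runner = SketchToolRunner({"sql_tool": fake_sql_tool})
print(runner.run("sql_tool", {"input": "tracks by AC/DC"}))
# (pretend SQL result for: tracks by AC/DC)
```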


In [8]:
from typing import Set, Optional
from llama_index.agent.react.output_parser import ReActOutputParser


## Agent Output Component
## Process reasoning step/tool outputs, and return agent response
def finalize_fn(
    task: Task,
    state: Dict[str, Any],
    reasoning_step: Any,
    is_done: bool = False,
    tool_output: Optional[Any] = None,
) -> Tuple[AgentChatResponse, bool]:
    """Finalize function.

    Here we take the latest reasoning step, and a tool output (if provided),
    and return the agent output (and decide if agent is done).

    This function returns an `AgentChatResponse` and `is_done` tuple, and
    is the last component of the query pipeline. This is the expected
    return type for any query pipeline passed to `QueryPipelineAgentWorker`.

    """
    current_reasoning = state["current_reasoning"]
    current_reasoning.append(reasoning_step)
    # if tool_output is not None, add to current reasoning
    if tool_output is not None:
        observation_step = ObservationReasoningStep(
            observation=str(tool_output)
        )
        current_reasoning.append(observation_step)
    if isinstance(current_reasoning[-1], ResponseReasoningStep):
        response_step = cast(ResponseReasoningStep, current_reasoning[-1])
        response_str = response_step.response
    else:
        response_str = current_reasoning[-1].get_content()

    # if is_done, add to memory
    # NOTE: memory is a reserved keyword in `state`, but you can add your own too
    if is_done:
        state["memory"].put(
            ChatMessage(content=task.input, role=MessageRole.USER)
        )
        state["memory"].put(
            ChatMessage(content=response_str, role=MessageRole.ASSISTANT)
        )

    return AgentChatResponse(response=response_str), is_done


class OutputAgentComponent(CustomAgentComponent):
    """Output agent component."""

    tool_runner_component: ToolRunnerComponent
    output_parser: ReActOutputParser

    def __init__(self, tools, **kwargs):
        tool_runner_component = ToolRunnerComponent(tools)
        super().__init__(
            tool_runner_component=tool_runner_component,
            output_parser=ReActOutputParser(),
            **kwargs
        )

    def _run_component(self, **kwargs: Any) -> Any:
        """Run component."""
        chat_response = kwargs["chat_response"]
        task = kwargs["task"]
        state = kwargs["state"]
        reasoning_step = self.output_parser.parse(
            chat_response.message.content
        )
        if reasoning_step.is_done:
            return {
                "output": finalize_fn(
                    task, state, reasoning_step, is_done=True
                )
            }
        else:
            tool_output = self.tool_runner_component.run_component(
                tool_name=reasoning_step.action,
                tool_input=reasoning_step.action_input,
            )
            return {
                "output": finalize_fn(
                    task,
                    state,
                    reasoning_step,
                    is_done=False,
                    tool_output=tool_output,
                )
            }

    @property
    def _input_keys(self) -> Set[str]:
        return {"chat_response"}

    @property
    def _optional_input_keys(self) -> Set[str]:
        return {"is_done", "tool_output"}

    @property
    def _output_keys(self) -> Set[str]:
        return {"output"}

    @property
    def sub_query_components(self) -> List[QueryComponent]:
        return [self.tool_runner_component]


react_output_component = OutputAgentComponent([sql_tool])

### Stitch together Agent Query Pipeline

We can now stitch together the top-level agent pipeline: agent_input -> react_prompt -> llm -> react_output.

The last component, `react_output`, is the if-else component: it either finalizes the response or calls the tool runner sub-component.
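In a linear chain like this one, each module's output becomes the next module's input. The sketch below mimics that data flow with plain Python callables. It is a simplification. The real `QueryPipeline` routes values by input/output keys, and the last stand-in here only reports `is_done` instead of running a tool.

```python
from typing import Any, Callable, List


def run_chain(modules: List[Callable[[Any], Any]], value: Any) -> Any:
    """Feed each module's output into the next one, like a linear add_chain."""
    for module in modules:
        value = module(value)
    return value


# Hypothetical stand-ins for agent_input -> react_prompt -> llm -> react_output.
chain = [
    lambda user_input: {"input": user_input},
    lambda d: f"Question: {d['input']}",
    lambda prompt: "Answer: 42" if "meaning" in prompt else "Action: sql_tool",
    lambda text: (text, text.startswith("Answer:")),
]

print(run_chain(chain, "What is the meaning of life?"))  # ('Answer: 42', True)
print(run_chain(chain, "List AC/DC tracks"))              # ('Action: sql_tool', False)
```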

In [9]:
from llama_index.query_pipeline import QueryPipeline as QP
from llama_index.llms import OpenAI

qp = QP(
    modules={
        "agent_input": agent_input_component,
        "react_prompt": react_prompt_component,
        "llm": OpenAI(model="gpt-4-1106-preview"),
        "react_output": react_output_component,
    },
    verbose=True,
)
qp.add_chain(["agent_input", "react_prompt", "llm", "react_output"])

### Visualize Query Pipeline

In [10]:
from pyvis.network import Network

net = Network(notebook=True, cdn_resources="in_line", directed=True)
net.from_nx(qp.dag)
net.show("agent_dag.html")

agent_dag.html


### Setup Agent Worker around Text-to-SQL Query Pipeline

Here we set up an agent around the query pipeline: we wrap `qp` in a `QueryPipelineAgentWorker` and pass the worker to an `AgentRunner`.

In [11]:
from llama_index.agent import QueryPipelineAgentWorker, AgentRunner
from llama_index.callbacks import CallbackManager

agent_worker = QueryPipelineAgentWorker(qp)
agent = AgentRunner(agent_worker, callback_manager=CallbackManager([]))

In [12]:
agent_worker.agent_components

[AgentFnComponent(partial_dict={'tools': [<llama_index.tools.query_engine.QueryEngineTool object at 0x7f1d85c12be0>]}, fn=<function react_prompt_fn at 0x7f1d86709310>, async_fn=None),
 OutputAgentComponent(partial_dict={}, callback_manager=<llama_index.callbacks.base.CallbackManager object at 0x7f1d85dde070>, tool_runner_component=ToolRunnerComponent(partial_dict={}, tool_dict={'sql_tool': <llama_index.tools.query_engine.QueryEngineTool object at 0x7f1d85c12be0>}, callback_manager=<llama_index.callbacks.base.CallbackManager object at 0x7f1d85dde5b0>), output_parser=<llama_index.agent.react.output_parser.ReActOutputParser object at 0x7f1d85ddecd0>)]


### Run the Agent

Let's try the agent on some sample queries.

In [15]:
# start task
task = agent.create_task(
    "What are some tracks from the artist AC/DC? Limit it to 3"
)

In [16]:
step_output = agent.run_step(task.task_id)

> Running module agent_input with input:
state: {'sources': [], 'memory': ChatMemoryBuffer(token_limit=3000, tokenizer_fn=functools.partial(<bound method Encoding.encode of <Encoding 'cl100k_base'>>, allowed_special='all'), chat_store=SimpleChatSto...
task: task_id='a7019eff-cb16-4853-bf3e-6ede81ad9a17' input='What are some tracks from the artist AC/DC? Limit it to 3' memory=ChatMemoryBuffer(token_limit=3000, tokenizer_fn=functools.partial(<bound method ...

> Running module react_prompt with input:
input: What are some tracks from the artist AC/DC? Limit it to 3

> Running module llm with input:
messages: [ChatMessage(role=<MessageRole.SYSTEM: 'system'>, content='\nYou are designed to help with a variety of tasks, from answering questions     to providing summaries to other types of analyses.\n\n## Too...

> Running module react_output with input:
chat_response: assistant: Thought: I need to use a tool to help me answer the question.
Action: sql_tool
Action Input: {"input": "What are some tracks from the artist AC/DC? Limit it to 3"}


In [None]:
step_output = agent.run_step(task.task_id)

> Running module agent_input with input:
state: {'sources': [], 'memory': ChatMemoryBuffer(token_limit=3000, tokenizer_fn=functools.partial(<bound method Encoding.encode of <Encoding 'cl100k_base'>>, allowed_special='all'), chat_store=SimpleChatSto...
task: task_id='79a8d443-5707-4632-82b2-51fd253cd294' input='What are some tracks from the artist AC/DC? Limit it to 3' memory=ChatMemoryBuffer(token_limit=3000, tokenizer_fn=functools.partial(<bound method ...

> Running module react_prompt with input:
input: What are some tracks from the artist AC/DC? Limit it to 3

> Running module llm with input:
messages: [ChatMessage(role=<MessageRole.SYSTEM: 'system'>, content='\nYou are designed to help with a variety of tasks, from answering questions     to providing summaries to other types of analyses.\n\n## Too...

> Running module react_output with input:
chat_response: assistant: Thoug

In [None]:
step_output.is_last

True

In [None]:
response = agent.finalize_response(task.task_id)

In [None]:
print(str(response))

The top 3 tracks by AC/DC are "For Those About To Rock (We Salute You)", "Put The Finger On You", and "Let's Get It Up".


## Setup Simple Retry Agent Pipeline for Text-to-SQL 

Instead of the full ReAct pipeline that does tool picking, let's try a much simpler agent pipeline that only does text-to-SQL, with retry logic.

We use a simple text-based "retry" prompt that, given the user input and the previous conversation history, generates a modified query that should produce the right result.
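The retry logic can be sketched independently of LlamaIndex. The loop rewrites the query using the conversation history, answers it, and validates the result. It stops when validation passes or an iteration cap is reached. All callables below (`toy_rewrite`, `toy_answer`, and the validator lambda) are hypothetical stand-ins for the retry prompt, the text-to-SQL engine, and the validation prompt.

```python
from typing import Callable, List, Tuple


def retry_until_valid(
    user_input: str,
    rewrite: Callable[[str, str], str],
    answer: Callable[[str], str],
    is_valid: Callable[[str, str], bool],
    max_iter: int = 3,
) -> Tuple[str, List[str]]:
    """Rewrite -> answer -> validate, feeding failed attempts back as history."""
    convo_history: List[str] = [f"User: {user_input}"]
    result = ""
    for _ in range(max_iter):
        query = rewrite(user_input, "\n".join(convo_history))
        result = answer(query)
        convo_history.append(f"Assistant (response): {result}")
        if is_valid(user_input, result):
            break
    return result, convo_history


def toy_rewrite(user_input: str, history: str) -> str:
    # Number the attempt by counting previous assistant turns in the history.
    attempt = history.count("Assistant") + 1
    return f"{user_input} [attempt {attempt}]"


def toy_answer(query: str) -> str:
    # Pretend the first attempt returns an empty (zero) count.
    return "0" if query.endswith("[attempt 1]") else "2"


result, history = retry_until_valid(
    "How many albums did AC/DC release?", toy_rewrite, toy_answer, lambda q, r: r != "0"
)
print(result)        # 2
print(len(history))  # 3
```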

### Define Core Modules

- agent input
- retry prompt
- output processor (including a validation prompt)

In [None]:
from llama_index.llms import OpenAI

# llm = OpenAI(model="gpt-3.5-turbo")
llm = OpenAI(model="gpt-4-1106-preview")

In [None]:
from llama_index.agent import Task, AgentChatResponse
from typing import Dict, Any
from llama_index.query_pipeline import AgentInputComponent, AgentFnComponent


def agent_input_fn(task: Task, state: Dict[str, Any]) -> Dict:
    """Agent input function."""
    # on the first step, initialize the conversation history and retry counter
    if "convo_history" not in state:
        state["convo_history"] = []
        state["count"] = 0
    state["convo_history"].append(f"User: {task.input}")
    convo_history_str = "\n".join(state["convo_history"]) or "None"
    return {"input": task.input, "convo_history": convo_history_str}


agent_input_component = AgentInputComponent(fn=agent_input_fn)

In [None]:
from llama_index.prompts import PromptTemplate

retry_prompt_str = """\
You are trying to generate a proper natural language query given a user input.

This query will then be interpreted by a downstream text-to-SQL agent which
will convert the query to a SQL statement. If the agent triggers an error,
then that will be reflected in the current conversation history (see below).

If the conversation history is None, use the user input. If it's not None,
generate a new SQL query that avoids the problems of the previous SQL query.

Input: {input}
Convo history (failed attempts): 
{convo_history}

New input: """
retry_prompt = PromptTemplate(retry_prompt_str)

In [None]:
from llama_index.response import Response
from typing import Tuple

validate_prompt_str = """\
Given the user query, validate whether the inferred SQL query and response from executing the query is correct and answers the query.

Answer with YES or NO.

Query: {input}
Inferred SQL query: {sql_query}
SQL Response: {sql_response}

Result: """
validate_prompt = PromptTemplate(validate_prompt_str)

MAX_ITER = 3


def agent_output_fn(
    task: Task, state: Dict[str, Any], output: Response
) -> Tuple[AgentChatResponse, bool]:
    """Agent output component."""
    print(f"> Inferred SQL Query: {output.metadata['sql_query']}")
    print(f"> SQL Response: {str(output)}")
    state["convo_history"].append(
        f"Assistant (inferred SQL query): {output.metadata['sql_query']}"
    )
    state["convo_history"].append(f"Assistant (response): {str(output)}")

    # run a mini chain to get response
    validate_prompt_partial = validate_prompt.as_query_component(
        partial={
            "sql_query": output.metadata["sql_query"],
            "sql_response": str(output),
        }
    )
    qp = QP(chain=[validate_prompt_partial, llm])
    validate_output = qp.run(input=task.input)

    state["count"] += 1
    is_done = False
    if state["count"] >= MAX_ITER:
        is_done = True
    if "YES" in validate_output.message.content:
        is_done = True

    return AgentChatResponse(response=str(output)), is_done


agent_output_component = AgentFnComponent(fn=agent_output_fn)

In [None]:
from llama_index.query_pipeline import (
    QueryPipeline as QP,
    Link,
    InputComponent,
)

qp = QP(
    modules={
        "input": agent_input_component,
        "retry_prompt": retry_prompt,
        "llm": llm,
        "sql_query_engine": sql_query_engine,
        "output_component": agent_output_component,
    },
    verbose=True,
)
qp.add_link("input", "retry_prompt", src_key="input", dest_key="input")
qp.add_link(
    "input", "retry_prompt", src_key="convo_history", dest_key="convo_history"
)
qp.add_chain(["retry_prompt", "llm", "sql_query_engine", "output_component"])

### Visualize Query Pipeline

In [None]:
from pyvis.network import Network

net = Network(notebook=True, cdn_resources="in_line", directed=True)
net.from_nx(qp.dag)
net.show("agent_dag.html")

agent_dag.html


### Define Agent Worker

In [None]:
from llama_index.agent import QueryPipelineAgentWorker, AgentRunner
from llama_index.callbacks import CallbackManager

agent_worker = QueryPipelineAgentWorker(qp)
agent = AgentRunner(agent_worker, callback_manager=CallbackManager([]))

In [None]:
response = agent.chat(
    "How many albums did the artist who wrote 'Restless and Wild' release? (answer should be non-zero)?"
)
print(str(response))

> Running module input with input:
state: {'sources': [], 'memory': ChatMemoryBuffer(token_limit=3000, tokenizer_fn=functools.partial(<bound method Encoding.encode of <Encoding 'cl100k_base'>>, allowed_special='all'), chat_store=SimpleChatSto...
task: task_id='741c0d59-fa40-44a2-acab-cc4c36fdf0c7' input="How many albums did the artist who wrote 'Restless and Wild' release? (answer should be non-zero)?" memory=ChatMemoryBuffer(token_limit=3000, toke...

> Running module retry_prompt with input:
input: How many albums did the artist who wrote 'Restless and Wild' release? (answer should be non-zero)?
convo_history: User: How many albums did the artist who wrote 'Restless and Wild' release? (answer should be non-zero)?

> Running module llm with input:
messages: You are trying to generate a proper natural language query given a user input.

This query will then be interpreted by a downstream text-to-SQL agent w