## Workflow for a ReAct Agent


### [Optional] Set up observability with Llamatrace


In [1]:
from opentelemetry.sdk import trace as trace_sdk
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
    OTLPSpanExporter as HTTPSpanExporter,
)
from openinference.instrumentation.llama_index import LlamaIndexInstrumentor
from dotenv import load_dotenv
import os

load_dotenv()

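# Phoenix API key for tracing, supplied through the OTEL_EXPORTER_OTLP_HEADERS variable
PHOENIX_API_KEY = os.getenv("OTEL_EXPORTER_OTLP_HEADERS")

# Fail early with a clear message if the OpenAI API key is missing
if not os.getenv("OPENAI_API_KEY"):
    raise RuntimeError("OPENAI_API_KEY is not set; add it to your environment or .env file")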

# Add Phoenix
span_phoenix_processor = SimpleSpanProcessor(
    HTTPSpanExporter(endpoint="https://app.phoenix.arize.com/v1/traces")
)

# Add them to the tracer
tracer_provider = trace_sdk.TracerProvider()
tracer_provider.add_span_processor(span_processor=span_phoenix_processor)

# Instrument the application
LlamaIndexInstrumentor().instrument(tracer_provider=tracer_provider)

### Designing the workflow
An agent consists of several steps:
1. Handling the latest incoming user message, including adding it to memory and preparing the chat history
2. Using the chat history and tools to construct a ReAct prompt
3. Calling the LLM with the ReAct prompt and parsing out function/tool calls
4. If there are no tool calls, returning the final response
5. If there are tool calls, executing them and then looping back to build a fresh ReAct prompt that includes the latest tool results
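
Before introducing the workflow machinery, the loop described above can be sketched in plain Python. This is only an illustration under stated assumptions: fake_llm is a hypothetical scripted stand-in for a real LLM call, and TOOLS and run_agent are made-up names that do not come from LlamaIndex.

```python
from typing import Callable

# Hypothetical tool registry (illustration only, not a LlamaIndex API)
TOOLS: dict[str, Callable[..., int]] = {
    "add": lambda x, y: x + y,
    "multiply": lambda x, y: x * y,
}


def fake_llm(reasoning: list[str]) -> dict:
    """Scripted stand-in for an LLM: decides based on how many observations exist."""
    if len(reasoning) == 0:
        return {"action": "multiply", "args": {"x": 120, "y": 4}}
    if len(reasoning) == 1:
        return {"action": "add", "args": {"x": 480, "y": 15}}
    return {"answer": f"The result is {reasoning[-1]}."}


def run_agent(user_msg: str, max_iters: int = 5) -> str:
    memory = [("user", user_msg)]        # step 1: store the user message
    reasoning: list[str] = []            # observations gathered so far
    for _ in range(max_iters):
        decision = fake_llm(reasoning)   # steps 2-3: prompt and parse
        if "answer" in decision:         # step 4: no tool call, so return
            memory.append(("assistant", decision["answer"]))
            return decision["answer"]
        tool = TOOLS[decision["action"]]  # step 5: run the tool and loop
        reasoning.append(str(tool(**decision["args"])))
    raise RuntimeError("no final answer within max_iters")


answer = run_agent("Calculate: (120*4) + 15")
```

The max_iters guard plays the same role as a timeout: a loop driven by model output needs some bound so that it cannot run forever.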

### The Workflow Events
To handle these steps, we need to define a few events:

1. An event to handle new messages and prepare the chat history
2. An event to prompt the LLM with the ReAct prompt
3. An event to trigger tool calls, if any
4. An event to handle the results of tool calls, if any

The other steps will use the built-in StartEvent and StopEvent events.

In addition to events, we will also use the global context to store the current ReAct reasoning.

In [2]:
from llama_index.core.llms import ChatMessage
from llama_index.core.tools import ToolSelection, ToolOutput
from llama_index.core.workflow import Event

class PrepEvent(Event):
    """Signals that the workflow is ready to build the next ReAct prompt."""


class InputEvent(Event):
    """Carries the formatted ReAct prompt (a list of chat messages) to the LLM."""

    input: list[ChatMessage]


class ToolCallEvent(Event):
    """Signals that the LLM requested one or more tool calls."""

    tool_calls: list[ToolSelection]


class FunctionOutputEvent(Event):
    """Carries the result of a tool invocation."""

    output: ToolOutput
 

### The Workflow Itself
With our events defined, we can construct our workflow and steps.

Note that the workflow automatically validates itself using type annotations, so the type annotations on our steps are very helpful!
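
To give a feel for why the annotations matter, here is a minimal sketch of how a step's accepted and produced event types can be read from its signature with the standard library. This is not the library's actual validation code; DemoPrepEvent, DemoInputEvent, and demo_step are hypothetical stand-ins used only for illustration.

```python
from typing import get_type_hints


class DemoPrepEvent:
    """Hypothetical stand-in for an input event type."""


class DemoInputEvent:
    """Hypothetical stand-in for an output event type."""


def demo_step(ev: DemoPrepEvent) -> DemoInputEvent:
    """A toy step: consumes one event type and produces another."""
    return DemoInputEvent()


# Read the annotations back from the function signature
hints = get_type_hints(demo_step)
accepts = hints["ev"]        # the event type this step consumes
produces = hints["return"]   # the event type this step emits
```

With this information for every step, a framework can check that each produced event type has some step that accepts it.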



In [3]:
from llama_index.core.workflow import (
    Workflow,
    Context,
    StartEvent,
    StopEvent,
    step
)
from llama_index.core.llms.llm import LLM
from typing import Any
from llama_index.core.tools.types import BaseTool
from llama_index.llms.openai import OpenAI
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.agent.react import ReActChatFormatter, ReActOutputParser
from llama_index.core.agent.react.types import (
    ActionReasoningStep, 
    ObservationReasoningStep
)

#### Breaking Down the @step Decorator and Workflow Concepts
The @step decorator in the ReActAgent class marks methods as workflow steps. Each step is an individual unit of work in the workflow.

**How Do the Steps Work?**

Each step:

1. Takes input in the form of an event (e.g., StartEvent, PrepEvent, InputEvent).
2. Processes the input data and performs some operations (e.g., storing user input, formatting chat history, or handling tool calls).
3. Produces and returns a new event (e.g., PrepEvent, InputEvent) that acts as the output for the next step in the workflow.

Think of it as a pipeline: each step takes an event as input, processes it, and returns another event for the next step.


**What Are ev and ctx?**

- ev (Event):

    - Represents the input event for the current step.
    - Contains the data that the current step needs to operate on.
    - Example: In new_user_msg, ev is a StartEvent that provides the input (the user's message).
- ctx (Context):

    - Acts as shared memory for the workflow.
    - Allows steps to share intermediate data that isn’t directly passed through events.
    - You can:
        - Store data: await ctx.set("key", value)
        - Retrieve data: await ctx.get("key")
      
These two mechanisms (ev and ctx) ensure steps can communicate with each other.


**How Do the Steps Communicate?**

- Direct Communication:

    - One step returns an event (e.g., PrepEvent), and the next step receives it as input.
    - Example:
        - new_user_msg returns a PrepEvent.
        - prepare_chat_history takes PrepEvent as input and uses it to proceed.
        
- Shared State (Context):

    - Steps use ctx to share data that persists across multiple steps.
    - Example:
        - new_user_msg clears current_reasoning in ctx.
        - Later steps like prepare_chat_history or handle_llm_input retrieve or update current_reasoning.
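
The two communication channels described above can be illustrated with a tiny synchronous dispatcher. This is a toy sketch, not how the Workflow engine is implemented (the real engine is asynchronous and routes events using the step annotations); Start, Prep, Stop, HANDLERS, and run are all hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class Start:
    text: str


@dataclass
class Prep:
    pass


@dataclass
class Stop:
    result: str


def on_start(ctx: dict, ev: Start) -> Prep:
    # Shared state: stash data in ctx instead of passing it on the event
    ctx["message"] = ev.text
    return Prep()


def on_prep(ctx: dict, ev: Prep) -> Stop:
    # A later step reads what an earlier step stored in ctx
    return Stop(result=ctx["message"].upper())


# Direct communication: the type of the returned event selects the next handler
HANDLERS = {Start: on_start, Prep: on_prep}


def run(ev) -> str:
    ctx: dict = {}
    while not isinstance(ev, Stop):
        ev = HANDLERS[type(ev)](ctx, ev)
    return ev.result


result = run(Start(text="hello"))
```

Here Prep carries no payload at all, just like PrepEvent: everything the second handler needs travels through the shared context.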


In [5]:

class ReActAgent(Workflow):
    def __init__(
        self,
        *args: Any,
        llm: LLM | None = None,
        tools: list[BaseTool] | None = None,
        extra_context: str | None = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(*args, **kwargs)
        self.tools = tools or []

        self.llm = llm or OpenAI()

        # Use self.llm so the memory is sized for the LLM actually in use
        self.memory = ChatMemoryBuffer.from_defaults(llm=self.llm)
        self.formatter = ReActChatFormatter(context=extra_context or "")
        self.output_parser = ReActOutputParser()
        self.sources = []

    @step
    async def new_user_msg(self, ctx: Context, ev: StartEvent) -> PrepEvent:
        # clear sources
        self.sources = []

        # get user input
        user_input = ev.input
        user_msg = ChatMessage(role="user", content=user_input)
        self.memory.put(user_msg)

        # clear current reasoning
        await ctx.set("current_reasoning", [])

        # Emit a PrepEvent so the workflow proceeds to prepare_chat_history
        return PrepEvent()

    @step
    async def prepare_chat_history(
        self, ctx: Context, ev: PrepEvent
    ) -> InputEvent:
        # get chat history
        chat_history = self.memory.get()
        current_reasoning = await ctx.get("current_reasoning", default=[])
        llm_input = self.formatter.format(
            self.tools, chat_history, current_reasoning=current_reasoning
        )
        return InputEvent(input=llm_input)

    @step
    async def handle_llm_input(
        self, ctx: Context, ev: InputEvent
    ) -> ToolCallEvent | StopEvent:
        chat_history = ev.input

        response = await self.llm.achat(chat_history)

        try:
            reasoning_step = self.output_parser.parse(response.message.content)
            (await ctx.get("current_reasoning", default=[])).append(
                reasoning_step
            )
            if reasoning_step.is_done:
                # The LLM produced a final answer: store it in memory, then
                # emit a StopEvent carrying the response, the sources
                # consulted, and the reasoning chain from ctx.
                self.memory.put(
                    ChatMessage(
                        role="assistant", content=reasoning_step.response
                    )
                )

                return StopEvent(
                    result={
                        "response": reasoning_step.response,
                        "sources": [*self.sources],
                        "reasoning": await ctx.get(
                            "current_reasoning", default=[]
                        ),
                    }
                )
            elif isinstance(reasoning_step, ActionReasoningStep):
                # The LLM wants to call a tool: extract the tool name and
                # arguments, then emit a ToolCallEvent describing the call.
                tool_name = reasoning_step.action
                tool_args = reasoning_step.action_input

                return ToolCallEvent(
                    tool_calls=[
                        ToolSelection(
                            tool_id="fake",
                            tool_name=tool_name,
                            tool_kwargs=tool_args,
                        )
                    ]
                )
        except Exception as e:
            # Appends an ObservationReasoningStep to current_reasoning, noting the error.
            # Allows the workflow to continue iterating.
            (await ctx.get("current_reasoning", default=[])).append(
                ObservationReasoningStep(
                    observation=f"There was an error in parsing my reasoning: {e}"
                )
            )

        # No final answer and no tool call (or parsing failed): emit a
        # PrepEvent to loop back and build a fresh prompt.
        return PrepEvent()

    @step
    async def handle_tool_calls(
        self, ctx: Context, ev: ToolCallEvent
    ) -> PrepEvent:
        tool_calls = ev.tool_calls
        tools_by_name = {tool.metadata.get_name(): tool for tool in self.tools}

        # call tools -- safely!
        for tool_call in tool_calls:
            tool = tools_by_name.get(tool_call.tool_name)
            if not tool:
                (await ctx.get("current_reasoning", default=[])).append(
                    ObservationReasoningStep(
                        observation=f"Tool {tool_call.tool_name} does not exist"
                    )
                )
                continue

            try:
                tool_output = tool(**tool_call.tool_kwargs)
                self.sources.append(tool_output)
                (await ctx.get("current_reasoning", default=[])).append(
                    ObservationReasoningStep(observation=tool_output.content)
                )
            except Exception as e:
                (await ctx.get("current_reasoning", default=[])).append(
                    ObservationReasoningStep(
                        observation=f"Error calling tool {tool.metadata.get_name()}: {e}"
                    )
                )

        
        # All tool calls are processed: emit a PrepEvent to start the next
        # reasoning iteration.
        return PrepEvent()

And that's it! Let's explore the workflow we wrote a bit.

**new_user_msg():** Adds the user message to memory and resets current_reasoning in the global context so that each run starts a fresh chain of reasoning.

**prepare_chat_history():** Prepares the ReAct prompt using the chat history, tools, and current reasoning (if any).

**handle_llm_input():** Prompts the LLM with our ReAct prompt and parses the output with ReActOutputParser. If the LLM gives a final answer, we stop and emit a StopEvent. If it requests a tool, we emit a ToolCallEvent to handle the tool call. If there is neither a tool call nor a final response (for example, when parsing fails), we emit a PrepEvent and loop again.

**handle_tool_calls():** Safely calls tools with error handling, adding the tool outputs to the current reasoning. Then, by emitting a PrepEvent, we loop around for another round of ReAct prompting and parsing.
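
To illustrate the parsing decision made in handle_llm_input, here is a rough sketch of splitting a ReAct-style reply into either a final answer or a tool call. It assumes a plain-text layout with "Action:", "Action Input:", and "Answer:" markers and a JSON object for the arguments; parse_react is a hypothetical helper, and the library's ReActOutputParser is more robust than this.

```python
import json
import re


def parse_react(text: str) -> dict:
    """Toy parser: returns a final answer or a tool call, else raises."""
    answer = re.search(r"Answer:\s*(.*)", text, re.DOTALL)
    if answer:
        return {"is_done": True, "response": answer.group(1).strip()}

    action = re.search(
        r"Action:\s*(\w+)\s*Action Input:\s*(\{.*\})", text, re.DOTALL
    )
    if action:
        return {
            "is_done": False,
            "action": action.group(1),
            "action_input": json.loads(action.group(2)),
        }

    # Neither marker found: the caller can record the error and loop again
    raise ValueError("could not find an answer or an action in the reply")


tool_step = parse_react(
    'Thought: I need to multiply.\n'
    'Action: multiply\n'
    'Action Input: {"x": 120, "y": 4}'
)
final_step = parse_react("Thought: I can answer now.\nAnswer: 495")
```

The three outcomes (answer, action, error) correspond to the three ways handle_llm_input can finish: StopEvent, ToolCallEvent, or a PrepEvent after recording the parsing error.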



#### Run the Workflow

NOTE: With loops, we need to be mindful of runtime. Here, we set a timeout of 120s.
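
As a generic illustration of why a timeout matters for looping code, the sketch below bounds a never-ending coroutine with asyncio.wait_for. This is not how the Workflow class implements its timeout argument; endless_loop and run_with_timeout are hypothetical names.

```python
import asyncio


async def endless_loop() -> None:
    """Stands in for an agent loop that never reaches a final answer."""
    while True:
        await asyncio.sleep(0.01)


async def run_with_timeout(seconds: float) -> str:
    try:
        await asyncio.wait_for(endless_loop(), timeout=seconds)
    except asyncio.TimeoutError:
        return "timed out"
    return "finished"


# In a script, asyncio.run starts the event loop; inside a notebook,
# use "await run_with_timeout(0.05)" instead because a loop is already running.
outcome = asyncio.run(run_with_timeout(0.05))
```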



In [6]:
from llama_index.core.tools import FunctionTool

def add(x: int, y: int) -> int:
    """Useful function to add two numbers."""
    return x + y


def multiply(x: int, y: int) -> int:
    """Useful function to multiply two numbers."""
    return x * y


tools = [
    FunctionTool.from_defaults(add),
    FunctionTool.from_defaults(multiply),
]

agent = ReActAgent(
    llm=OpenAI(model="gpt-4o-mini"), tools=tools, timeout=120, verbose=True
)

ret = await agent.run(input="Hello!")

Running step new_user_msg
Step new_user_msg produced event PrepEvent
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event StopEvent

In [7]:
print(ret["response"])


Hello! How can I assist you today?


In [10]:
ret = await agent.run(input="Calculate: (120*4) + 15")

Running step new_user_msg
Step new_user_msg produced event PrepEvent
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event ToolCallEvent
Running step handle_tool_calls
Step handle_tool_calls produced event PrepEvent
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event ToolCallEvent
Running step handle_tool_calls
Step handle_tool_calls produced event PrepEvent
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event StopEvent

In [11]:
print(ret["response"])


The result of (120 * 4) + 15 is 495.
