# Workflow for a ReAct Agent

This notebook walks through setting up a `Workflow` to construct a ReAct agent.

ReAct agents work by prompting an LLM to either invoke tools/functions or return a final response. Between tool calls, the LLM reasons over each tool's output (an observation) before deciding on the next step and, eventually, handing the final response to the user.
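To make the Thought, Action, Observation cycle concrete, here is a minimal, framework-free sketch. The LLM is replaced by a hypothetical scripted function, the `add` tool is purely illustrative, and the regexes are a simplification: the notebook itself relies on LlamaIndex's `ReActChatFormatter` and `ReActOutputParser` for this.

```python
import json
import re
from typing import Callable

# Hypothetical tool registry: tool name -> plain Python function.
TOOLS: dict[str, Callable[..., str]] = {
    "add": lambda a, b: str(a + b),
}


def scripted_llm(transcript: str) -> str:
    """Stand-in for a real LLM: calls a tool first, then answers from the observation."""
    if "Observation:" not in transcript:
        return 'Thought: I should use a tool.\nAction: add\nAction Input: {"a": 2, "b": 3}'
    observation = transcript.rsplit("Observation:", 1)[1].strip()
    return f"Thought: I can answer without using any more tools.\nAnswer: {observation}"


def react_loop(question: str, llm: Callable[[str], str], max_steps: int = 5) -> str:
    transcript = f"Question: {question}"
    for _ in range(max_steps):
        output = llm(transcript)
        transcript += "\n" + output
        answer = re.search(r"Answer:\s*(.*)", output, re.DOTALL)
        if answer:  # final response: stop looping
            return answer.group(1).strip()
        action = re.search(r"Action:\s*(\w+)", output)
        action_input = re.search(r"Action Input:\s*(\{.*\})", output, re.DOTALL)
        if action and action_input:
            tool = TOOLS.get(action.group(1))
            kwargs = json.loads(action_input.group(1))
            observation = tool(**kwargs) if tool else f"Tool {action.group(1)} does not exist"
            # Feed the tool output back so the next LLM call can reason over it.
            transcript += f"\nObservation: {observation}"
    return "Stopped: step limit reached without a final answer."


print(react_loop("What is 2 + 3?", scripted_llm))  # prints 5
```

The `max_steps` cap is a design choice of this sketch that keeps a model that never answers from looping forever.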

## Installations

In [1]:
# # Necessary installations
# !pip install llama-index==0.12.17
# !pip install faiss-cpu==1.10.0
# !pip install llama-index-vector-stores-faiss==0.3.0
# !pip install llama-index-embeddings-azure-openai==0.3.0
# !pip install llama-index-llms-azure-openai==0.3.0
# !pip install llama-index-utils-workflow==0.2.1
# !pip install duckduckgo-search==7.3.2
# !pip install python-dotenv==1.0.1

Workflows make async a first-class citizen. Jupyter already runs an event loop, so the cells below can `await` the workflow directly. If you run the code in your own script instead, use `asyncio.run()` to start an event loop:

```python
import asyncio


async def main():
    # Put your async code here, for example:
    # result = await agent.run(input="Hello!")
    ...


if __name__ == "__main__":
    asyncio.run(main())
```

## Configure LLM and Embedding Model

In [2]:
from dotenv import load_dotenv
import os
# Load environment variables from the .env file
load_dotenv()

True

In [3]:
from llama_index.llms.azure_openai import AzureOpenAI
from llama_index.embeddings.azure_openai import AzureOpenAIEmbedding

api_key = os.getenv("OPENAI_API_KEY")
azure_endpoint = os.getenv("BASE_URL")
api_version = "2023-07-01-preview"
generation_deployment_name=os.getenv("GENERATION_MODEL")
embedding_deployment_name=os.getenv("EMBEDDING_MODEL")

# Initialize an instance of the AzureOpenAI class for generating text.
llm = AzureOpenAI(
    model="gpt-4o",
    deployment_name=generation_deployment_name,
    api_key=api_key,
    azure_endpoint=azure_endpoint,
    api_version=api_version,
)

# Initialize an instance of the AzureOpenAIEmbedding class for generating text embeddings.
embed_model = AzureOpenAIEmbedding(
    model="text-embedding-3-small",
    deployment_name=embedding_deployment_name,
    api_key=api_key,
    azure_endpoint=azure_endpoint,
    api_version=api_version,
)

In [4]:
from llama_index.core import Settings  # This class is used to configure global settings.

Settings.embed_model = embed_model
Settings.llm = llm

## Create Tools for Agent

We first define the individual tools. Once they are attached to the workflow, all of them are executed by the same `handle_tool_calls()` step, which we define later in the code.

For now, let's define the functions that our agent will have access to.

### Query Engine Tools

A query engine is a generic interface that lets you ask questions over your data.

1) The `lyft_engine` query engine answers questions using the 2021 10-K filing of Lyft, Inc.
2) The `uber_engine` query engine answers questions using the 2021 10-K filing of Uber Technologies, Inc.
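A vector query engine embeds the question, retrieves the most similar chunks of the indexed document, and passes them to the LLM to synthesize an answer. The retrieval half can be sketched in plain Python. The 2-D vectors below are made-up stand-ins for real embeddings, ranking uses cosine similarity (a common default), and this is a simplified illustration rather than LlamaIndex's implementation. Here `k` plays the role of the `similarity_top_k=3` argument used when the query engines are created.

```python
import math


def cosine(u: list[float], v: list[float]) -> float:
    """Cosine similarity between two vectors (0.0 if either is all zeros)."""
    dot = sum(a * b for a, b in zip(u, v))
    norm = math.sqrt(sum(a * a for a in u)) * math.sqrt(sum(b * b for b in v))
    return dot / norm if norm else 0.0


def top_k(query_vec: list[float], chunks: list[tuple[str, list[float]]], k: int = 3) -> list[str]:
    """Return the texts of the k chunks most similar to the query vector."""
    ranked = sorted(chunks, key=lambda chunk: cosine(query_vec, chunk[1]), reverse=True)
    return [text for text, _ in ranked[:k]]


# Made-up 2-D "embeddings" for four chunks of a filing.
chunks = [
    ("revenue grew", [0.9, 0.1]),
    ("risk factors", [0.1, 0.9]),
    ("active riders", [0.7, 0.3]),
    ("legal proceedings", [0.2, 0.8]),
]
print(top_k([1.0, 0.0], chunks, k=2))  # ['revenue grew', 'active riders']
```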

In [5]:
from llama_index.core import (
    SimpleDirectoryReader,
    VectorStoreIndex,
    StorageContext,
    load_index_from_storage,
)

In [6]:
# Define the directory where the index is (or will be) persisted
persist_dir_lyft = "./storage/lyft"
persist_dir_uber = "./storage/uber"

# Check if persistent storage exists
if os.path.exists(persist_dir_lyft) and os.path.exists(persist_dir_uber):
    # Load the storage context from disk
    storage_context_lyft = StorageContext.from_defaults(persist_dir=persist_dir_lyft)
    storage_context_uber = StorageContext.from_defaults(persist_dir=persist_dir_uber)
    # Load the index using the persistent storage context
    lyft_index = load_index_from_storage(storage_context_lyft)
    uber_index = load_index_from_storage(storage_context_uber)
    # set index_loaded variable to True
    index_loaded=True
else:
    index_loaded=False

In [7]:
#If the index is not in storage make the index from the documents
if not index_loaded:
    # load data
    lyft_docs = SimpleDirectoryReader(
        input_files=["./data/10k/lyft_2021.pdf"]
    ).load_data()
    uber_docs = SimpleDirectoryReader(
        input_files=["./data/10k/uber_2021.pdf"]
    ).load_data()

    # build index
    lyft_index = VectorStoreIndex.from_documents(lyft_docs)
    uber_index = VectorStoreIndex.from_documents(uber_docs)

    # persist index
    lyft_index.storage_context.persist(persist_dir=persist_dir_lyft)
    uber_index.storage_context.persist(persist_dir=persist_dir_uber)
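The two cells above implement a load-or-build cache: the expensive indexing (parsing the PDFs and embedding every chunk) runs only when nothing has been persisted yet. The same pattern in plain Python, with a JSON file standing in for LlamaIndex's storage context; the `load_or_build` helper and the `index.json` file name are hypothetical:

```python
import json
import os
import tempfile
from typing import Callable


def load_or_build(persist_dir: str, build: Callable[[], dict]) -> tuple[dict, bool]:
    """Load a cached artifact from persist_dir, or build and persist it.

    Returns (artifact, loaded_from_disk).
    """
    path = os.path.join(persist_dir, "index.json")  # hypothetical file name
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f), True
    artifact = build()  # the expensive step (e.g. parsing and embedding documents)
    os.makedirs(persist_dir, exist_ok=True)
    with open(path, "w") as f:
        json.dump(artifact, f)
    return artifact, False


with tempfile.TemporaryDirectory() as tmp:
    store = os.path.join(tmp, "lyft")
    first, loaded = load_or_build(store, lambda: {"chunks": ["revenue grew 36%"]})
    print(loaded)  # False: built and persisted
    second, loaded = load_or_build(store, lambda: {"chunks": []})
    print(loaded, second == first)  # True True: read back from disk
```

Calling the helper once per directory, as here, means a missing Lyft index does not force the Uber index to be rebuilt, whereas the notebook's single `index_loaded` flag rebuilds both.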

In [8]:
# Create a query engine for the Lyft index. 
lyft_engine = lyft_index.as_query_engine(similarity_top_k=3) #return the top 3 results

# Create a query engine for the Uber index.
uber_engine = uber_index.as_query_engine(similarity_top_k=3) #top 3 most similar results


### Web Search Tool

The web search tool gives the agent access to live DuckDuckGo search results. Its tool description tells the agent to use it for general-knowledge or up-to-date queries that the 10-K filings cannot answer.

In [9]:
from duckduckgo_search import DDGS

async def websearch(query: str) -> str:
    ddgs = DDGS() # Instantiate the DDGS class which provides DuckDuckGo search functionality.
    response = ddgs.text(query, region="us-en", max_results=5)  # keep the top 5 results
    context = ""
    for count, result in enumerate(response):
        context += f"Result {count}: {result['body']}\n"

    # Return the concatenated search result bodies as a single string.
    return context
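`DDGS().text` is a synchronous call, so inside the `async def websearch` above it blocks the event loop while it waits on the network. A minimal sketch of one common fix, offloading the call to a worker thread with `asyncio.to_thread` (Python 3.9+); `blocking_search` is a hypothetical stand-in for the real client. Run it as a standalone script; in a notebook, use `await main()` instead of `asyncio.run(main())`.

```python
import asyncio
import time


def blocking_search(query: str) -> list[dict]:
    """Hypothetical stand-in for a synchronous client such as DDGS().text(...)."""
    time.sleep(0.2)  # simulate network latency
    return [{"body": f"result for {query!r}"}]


async def websearch_nonblocking(query: str) -> str:
    # Run the blocking call in a worker thread so the event loop stays free.
    results = await asyncio.to_thread(blocking_search, query)
    return "".join(f"Result {i}: {r['body']}\n" for i, r in enumerate(results))


async def main() -> None:
    start = time.perf_counter()
    # The two searches overlap in separate threads instead of running back to back.
    outputs = await asyncio.gather(
        websearch_nonblocking("uber revenue"),
        websearch_nonblocking("lyft revenue"),
    )
    print(outputs)
    # Should be close to one simulated latency rather than two.
    print(f"elapsed: {time.perf_counter() - start:.2f}s")


asyncio.run(main())
```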

## Designing the Workflow

**ReAct agent workflow:**
- Receives a user message and updates the conversation memory.
- Formats a prompt using the conversation history and any ongoing reasoning.
- Sends the prompt to the LLM and parses its output.
- If the LLM provides a final response, it is returned; if it indicates a tool call, the appropriate tool is executed.
- The output from the tool is integrated back into the reasoning, and the loop repeats until a final response is produced.
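The loop described in the list above can be mirrored as a small event-driven state machine: each step accepts one event type and returns the next event, and the run ends on a `StopEvent`. The sketch below is hypothetical and framework-free (plain dataclasses, a scripted "LLM" that returns canned replies); it only imitates the shape of the LlamaIndex workflow built in this notebook.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class PrepEvent: ...


@dataclass
class InputEvent:
    prompt: str


@dataclass
class ToolCallEvent:
    tool_name: str


@dataclass
class StopEvent:
    result: str


@dataclass
class ToyReActWorkflow:
    """Hypothetical, framework-free mirror of the workflow steps."""
    replies: list[str]  # scripted "LLM" outputs, consumed in order
    reasoning: list[str] = field(default_factory=list)
    trace: list[str] = field(default_factory=list)

    def prepare(self, ev: PrepEvent) -> InputEvent:
        return InputEvent(prompt=" | ".join(self.reasoning))

    def handle_llm(self, ev: InputEvent) -> ToolCallEvent | StopEvent | PrepEvent:
        reply = self.replies.pop(0)
        self.reasoning.append(reply)
        if reply.startswith("Answer:"):
            return StopEvent(result=reply.removeprefix("Answer:").strip())
        if reply.startswith("Action:"):
            return ToolCallEvent(tool_name=reply.removeprefix("Action:").strip())
        return PrepEvent()  # unparseable output: loop again

    def handle_tool(self, ev: ToolCallEvent) -> PrepEvent:
        self.reasoning.append(f"Observation: ran {ev.tool_name}")
        return PrepEvent()

    def run(self) -> str:
        handlers = {PrepEvent: self.prepare, InputEvent: self.handle_llm, ToolCallEvent: self.handle_tool}
        ev = PrepEvent()
        while not isinstance(ev, StopEvent):
            self.trace.append(type(ev).__name__)
            ev = handlers[type(ev)](ev)  # dispatch on the event type
        return ev.result


wf = ToyReActWorkflow(replies=["Action: lyft_10k", "Action: uber_10k", "Answer: Uber grew faster."])
print(wf.run())
print(wf.trace)
```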

## The Workflow Events

To handle these steps, we need to define a few events:

In [11]:
from llama_index.core.llms import ChatMessage
from llama_index.core.tools import ToolSelection, ToolOutput
from llama_index.core.workflow import Event

class PrepEvent(Event):  # the LLM input needs to be (re)built
    pass


class InputEvent(Event):  # the prompt is ready to send to the LLM
    input: list[ChatMessage]


class ToolCallEvent(Event):  # the agent has decided to call a tool
    tool_calls: list[ToolSelection]

The other steps will use the built-in `StartEvent` and `StopEvent` events.

In addition to events, we will also use the global context to store the current ReAct reasoning.
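The steps read and write `current_reasoning` with `await ctx.get(...)` and `await ctx.set(...)`. The `ToyContext` class below is a hypothetical stand-in that mimics only that get/set-with-default behaviour; it is not LlamaIndex's `Context`.

```python
import asyncio
from typing import Any


class ToyContext:
    """Hypothetical in-memory stand-in for a workflow's shared state."""

    def __init__(self) -> None:
        self._state: dict[str, Any] = {}
        self._lock = asyncio.Lock()

    async def set(self, key: str, value: Any) -> None:
        async with self._lock:
            self._state[key] = value

    async def get(self, key: str, default: Any = None) -> Any:
        async with self._lock:
            return self._state.get(key, default)


async def demo() -> list[str]:
    ctx = ToyContext()
    await ctx.set("current_reasoning", [])  # reset at the start of a run
    reasoning = await ctx.get("current_reasoning", default=[])
    reasoning.append("Thought: look up Lyft revenue")
    await ctx.set("current_reasoning", reasoning)  # write the list back explicitly
    return await ctx.get("current_reasoning", default=[])


print(asyncio.run(demo()))  # ['Thought: look up Lyft revenue']
```

The agent code appends to the list returned by `ctx.get` without calling `ctx.set` again, which relies on the context handing back the same list object. Writing the list back explicitly, as `demo` does, does not depend on that detail.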

## The Workflow and its Steps

With our events defined, we can construct our workflow and steps. 

In [12]:
# Import necessary types and classes from typing and llama_index modules.
from typing import Any, List
from llama_index.core.agent.react import ReActChatFormatter, ReActOutputParser
from llama_index.core.agent.react.types import (
    ActionReasoningStep,
    ObservationReasoningStep,
)
from llama_index.core.llms.llm import LLM
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.tools.types import BaseTool
from llama_index.core.workflow import (
    Context,
    Workflow,
    StartEvent,
    StopEvent,
    step,
)

In [13]:
# Define the ReActAgent as a subclass of Workflow to orchestrate the ReAct prompting loop.
class ReActAgent(Workflow):
    def __init__(
        self,
        *args: Any,
        llm: LLM | None = None,
        tools: list[BaseTool] | None = None,
        extra_context: str | None = None,
        **kwargs: Any,
    ) -> None:

        # Initialize parent Workflow.
        super().__init__(*args, **kwargs)
        self.tools = tools or []
        self.llm = llm
        self.memory = ChatMemoryBuffer.from_defaults()
        self.formatter = ReActChatFormatter.from_defaults(
            context=extra_context or ""
        )
        self.output_parser = ReActOutputParser()
        self.sources = [] # Initialize an empty list to track sources (e.g., tool outputs).

    @step
    async def new_user_msg(self, ctx: Context, ev: StartEvent) -> PrepEvent:
        """Adds the user message to memory, and clears the global context ensuring a fresh start"""
        
        # Clear the sources list at the beginning of a new conversation round.
        self.sources = []

        # Extract the user input from the StartEvent.
        user_input = ev.input
        # Create a chat message for the user.
        user_msg = ChatMessage(role="user", content=user_input)
        # Add the user message to the conversation memory.
        self.memory.put(user_msg)

        # Clear the current reasoning stored in the context, ensuring a fresh start.
        await ctx.set("current_reasoning", [])

        # Emit a PrepEvent to indicate that preparation is complete.
        return PrepEvent()

    @step
    async def prepare_chat_history(self, ctx: Context, ev: PrepEvent) -> InputEvent:
        """Prepares the ReAct prompt using the chat history, available tools, and current reasoning (if any)."""
        
        # Retrieve the chat history from the memory buffer.
        chat_history = self.memory.get()
        # Get the current reasoning steps from the context (or default to an empty list).
        current_reasoning = await ctx.get("current_reasoning", default=[])
        # Format the input for the LLM by including the tools, chat history, and current reasoning.
        llm_input = self.formatter.format(
            self.tools, chat_history, current_reasoning=current_reasoning
        )
        # print("LLM Input",llm_input)
        # Return an InputEvent containing the prepared LLM prompt.
        return InputEvent(input=llm_input)

    @step
    async def handle_llm_input(self, ctx: Context, ev: InputEvent) -> ToolCallEvent | StopEvent | PrepEvent:
        """Prompts the LLM with our ReAct prompt and decides whether to emit a StopEvent, ToolCallEvent, or PrepEvent."""
        
        # Retrieve the prompt from the InputEvent.
        llm_input = ev.input
        # Send the prompt to the LLM and await its response.
        response = await self.llm.achat(llm_input)

        try:
            # Parse the LLM's response to extract a reasoning step.
            reasoning_step = self.output_parser.parse(response.message.content)
            # Append the new reasoning step to the context's current reasoning.
            (await ctx.get("current_reasoning", default=[])).append(
                reasoning_step
            )
            # If the reasoning step indicates that the LLM is done (i.e., no tool calls are required):
            if reasoning_step.is_done:
                # Add the final response to the conversation memory.
                self.memory.put(
                    ChatMessage(
                        role="assistant", content=reasoning_step.response
                    )
                )
                # Emit a StopEvent with the final response, sources, and reasoning.
                return StopEvent(
                    result={
                        "response": reasoning_step.response,
                        "sources": [*self.sources],
                        "reasoning": await ctx.get(
                            "current_reasoning", default=[]
                        ),
                    }
                )
            # If the reasoning step is an action that requires calling a tool:
            elif isinstance(reasoning_step, ActionReasoningStep):
                # Extract the tool name and its input arguments.
                tool_name = reasoning_step.action
                tool_args = reasoning_step.action_input
                # Emit a ToolCallEvent with the relevant tool call details.
                return ToolCallEvent(
                    tool_calls=[
                        ToolSelection(
                            tool_id=tool_name,
                            tool_name=tool_name,
                            tool_kwargs=tool_args,
                        )
                    ]
                )
        except Exception as e:
            # If there is an error in parsing the LLM's reasoning, log an observation step.
            (await ctx.get("current_reasoning", default=[])).append(
                ObservationReasoningStep(
                    observation=f"There was an error in parsing my reasoning: {e}"
                )
            )

        # If no final response or tool call was detected, emit a PrepEvent to repeat the reasoning process.
        return PrepEvent()

    @step
    async def handle_tool_calls(self, ctx: Context, ev: ToolCallEvent) -> PrepEvent:
        """Safely calls tools with error handling, adding the tool outputs to the current reasoning."""
        # Retrieve the list of tool calls from the event.
        tool_calls = ev.tool_calls
        # Map tools by their names for easy lookup.
        tools_by_name = {tool.metadata.get_name(): tool for tool in self.tools}

        # Iterate over each tool call.
        for tool_call in tool_calls:
            # Retrieve the tool instance based on its name.
            tool = tools_by_name.get(tool_call.tool_name)
            if not tool:
                # If the tool is not found, log an observation about the missing tool.
                (await ctx.get("current_reasoning", default=[])).append(
                    ObservationReasoningStep(
                        observation=f"Tool {tool_call.tool_name} does not exist"
                    )
                )
                continue

            try:
                # Call the tool asynchronously with the provided keyword arguments.
                tool_output = await tool.acall(**tool_call.tool_kwargs)
                # Add the tool's output to the list of sources.
                self.sources.append(tool_output)
                
                # Record the output of the tool as an observation in the current reasoning.
                (await ctx.get("current_reasoning", default=[])).append(
                    ObservationReasoningStep(observation=tool_output.content)
                )
            except Exception as e:
                # If the tool call fails, record an observation with the error details.
                (await ctx.get("current_reasoning", default=[])).append(
                    ObservationReasoningStep(
                        observation=f"Error calling tool {tool.metadata.get_name()}: {e}"
                    )
                )

        # Emit a PrepEvent to loop back and generate another prompt iteration.
        return PrepEvent()
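`ReActOutputParser` turns the LLM's raw text into typed reasoning steps. A simplified regex parser illustrates the idea. The `Action`/`Response` classes are hypothetical, and the text format is assumed to follow the Thought / Action / Action Input / Answer convention of LlamaIndex's ReAct prompt; the real parser handles more edge cases.

```python
from __future__ import annotations

import json
import re
from dataclasses import dataclass


@dataclass
class Action:
    thought: str
    action: str
    action_input: dict


@dataclass
class Response:
    thought: str
    response: str
    is_done: bool = True


def parse_react_output(text: str) -> Action | Response:
    """Simplified parser for 'Thought/Action/Action Input' or 'Thought/Answer' text."""
    thought = re.search(r"Thought:\s*(.*)", text)  # '.' stops at the end of the line
    thought_text = thought.group(1).strip() if thought else ""
    answer = re.search(r"Answer:\s*(.*)", text, re.DOTALL)
    if answer:
        return Response(thought=thought_text, response=answer.group(1).strip())
    action = re.search(r"Action:\s*(\S+)\s*\nAction Input:\s*(\{.*\})", text, re.DOTALL)
    if action:
        return Action(thought=thought_text, action=action.group(1),
                      action_input=json.loads(action.group(2)))
    raise ValueError(f"Could not parse output: {text!r}")


llm_text = (
    "Thought: I need Lyft's revenue data.\n"
    "Action: lyft_10k\n"
    'Action Input: {"input": "revenue growth in 2021"}'
)
print(parse_react_output(llm_text))
print(parse_react_output("Thought: I can answer now.\nAnswer: Uber grew faster."))
```

A parse failure raises `ValueError`, which corresponds to the `except` branch in `handle_llm_input` that records an error observation and loops back via `PrepEvent`.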


## Observability

In [14]:
from llama_index.utils.workflow import draw_all_possible_flows

draw_all_possible_flows(ReActAgent, filename="react_agent.html")

react_agent.html
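Each step declares the event it accepts and the events it can return in its type annotations, so the workflow's structure can be read statically; that is what a diagram like `react_agent.html` visualizes. A rough sketch of the idea with `typing.get_type_hints`, using bare stand-in classes. This illustrates the concept and is not how `draw_all_possible_flows` is implemented internally.

```python
import typing
from typing import Callable, Union


class PrepEvent: ...
class InputEvent: ...
class ToolCallEvent: ...
class StopEvent: ...


# Bodies are omitted: only the signatures matter here.
async def prepare_chat_history(ev: PrepEvent) -> InputEvent: ...
async def handle_llm_input(ev: InputEvent) -> Union[ToolCallEvent, StopEvent, PrepEvent]: ...
async def handle_tool_calls(ev: ToolCallEvent) -> PrepEvent: ...


def edges(steps: list[Callable]) -> list[tuple[str, str, str]]:
    """List (accepted event, step name, produced event) triples from annotations."""
    found = []
    for fn in steps:
        hints = typing.get_type_hints(fn)
        returned = hints["return"]
        outputs = typing.get_args(returned) or (returned,)  # unpack Union[...] if present
        found += [(hints["ev"].__name__, fn.__name__, out.__name__) for out in outputs]
    return found


for accepted, step_name, produced in edges([prepare_chat_history, handle_llm_input, handle_tool_calls]):
    print(f"{accepted} -> {step_name} -> {produced}")
```

An edge appears only if the step's return annotation lists that event, so every event a step can return should be included in its annotation.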


## Attach Tools to the Agent

Attach the tools we built earlier to the ReAct agent.

In [15]:
from llama_index.core.tools import QueryEngineTool, FunctionTool

# We have only defined tools that retrieve data for now. We could also define
# tools that take actions, such as sending an email or scheduling a meeting.
tools = [
    QueryEngineTool.from_defaults(
        lyft_engine,
        name="lyft_10k",
        description="Provides official financial data from Lyft's 2021 10-K filing, including revenue, expenses, profitability, growth trends, and key financial metrics.",
    ),
    QueryEngineTool.from_defaults(
        uber_engine,
        name="uber_10k",
        description="Provides official financial data from Uber's 2021 10-K filing, including revenue, operating costs, profit margins, market trends, and key financial indicators.",
    ),
    FunctionTool.from_defaults(
        async_fn=websearch,
        name="web_search",
        description="Provides real-time, live web search capabilities to fetch the most current and relevant information from the internet. Use this tool when a query requires up-to-date data, dynamic news, or broad context beyond the static datasets.",
    ),
]

In [16]:
agent = ReActAgent(
    llm=llm, tools=tools, timeout=120, verbose=True #we set a timeout of 120s.
)

## Run the Workflow!

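**NOTE:** Because the agent loops until the LLM produces a final answer, we need to be mindful of runtime. The `timeout=120` argument passed to `ReActAgent` above makes the workflow raise a timeout error if a single run takes longer than 120 seconds.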
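A wall-clock timeout bounds total runtime, but a model that never emits a final answer will still spin until the deadline. Capping the number of reasoning iterations is a complementary guard. The sketch below shows both with plain `asyncio`; `agent_loop` and `max_iterations` are hypothetical names, not parameters of the `ReActAgent` class defined above.

```python
import asyncio
from typing import Optional


async def agent_loop(max_iterations: Optional[int] = None) -> str:
    """Hypothetical loop whose 'LLM' never produces a final answer."""
    iteration = 0
    while True:
        iteration += 1
        if max_iterations is not None and iteration > max_iterations:
            return f"Gave up after {max_iterations} iterations."
        await asyncio.sleep(0.05)  # stand-in for one LLM + tool round trip


async def main() -> None:
    # Wall-clock bound, similar in spirit to ReActAgent(..., timeout=120).
    try:
        await asyncio.wait_for(agent_loop(), timeout=0.3)
    except asyncio.TimeoutError:
        print("Timed out.")
    # Iteration bound: stops after a fixed number of rounds regardless of latency.
    print(await agent_loop(max_iterations=3))


asyncio.run(main())
```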

In [28]:
result = await agent.run(input="Hello!")

Running step new_user_msg
Step new_user_msg produced event PrepEvent
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event StopEvent


In [30]:
question=[
    "Compare and contrast the revenue growth of Uber and Lyft in 2021, then give an analysis",
    "Compare and contrast the revenue growth of Uber and Lyft in 2021, then give an analysis. Compare it with new trends and techniques as well.",
    "What regulatory challenges are impacting the ride-sharing industry today?",
    "What are the best practices a company should follow to have high growth?",
    "Can you tell me about the risk factors of the company with the higher revenue?",
    "What is the forecast for the ride-sharing market in the next five years?",
    "What key financial risks are disclosed in Uber's 2021 10-K, and how are these risks being discussed in the latest financial news?"    
]

In [33]:
result = await agent.run(input=question[0])

Running step new_user_msg
Step new_user_msg produced event PrepEvent
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event ToolCallEvent
Running step handle_tool_calls
Step handle_tool_calls produced event PrepEvent
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event ToolCallEvent
Running step handle_tool_calls
Step handle_tool_calls produced event PrepEvent
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event StopEvent


In [20]:
print(result["response"])

In 2021, Uber and Lyft both experienced significant revenue growth as the economy began recovering from the impacts of the COVID-19 pandemic. Here's a comparison and analysis of their revenue growth:

1. **Revenue Growth Rate**:
   - **Uber**: Revenue grew by 57%, increasing from $11.1 billion in 2020 to $17.5 billion in 2021.
   - **Lyft**: Revenue grew by 36%, with an increase of $843.6 million compared to 2020.

   Uber's revenue growth rate was notably higher than Lyft's in 2021.

2. **Key Drivers of Growth**:
   - **Uber**: The growth was driven by a 56% increase in Gross Bookings, with Delivery Gross Bookings rising by 71% and Mobility Gross Bookings increasing by 38%. The delivery segment benefited from higher food delivery orders, larger basket sizes, and market expansion, while the mobility segment saw increased trip volumes as pandemic restrictions eased.
   - **Lyft**: Growth was primarily driven by a significant increase in the number of Active Riders as vaccines became wid

In [21]:
print(result["reasoning"])

[ActionReasoningStep(thought='The current language of the user is English. I need to gather data on the revenue growth of Uber and Lyft in 2021 from their respective 10-K filings to perform the comparison and analysis.', action='lyft_10k', action_input={'input': 'revenue growth in 2021'}), ObservationReasoningStep(observation='Revenue in 2021 grew by 36% compared to 2020, increasing by $843.6 million. This growth was primarily driven by a significant increase in the number of Active Riders as vaccines became more widely distributed and communities reopened. Additionally, revenue benefited from licensing and data access agreements starting in the second quarter of 2021. However, this growth was partially offset by investments in driver supply, which included increased driver incentives recorded as a reduction to revenue.', return_direct=False), ActionReasoningStep(thought="I now have Lyft's revenue growth data for 2021. Next, I need to gather Uber's revenue growth data for the same year

In [22]:
def format_reasoning(reasoning_steps):
    for step in reasoning_steps:
        # Use the class name as the heading
        step_type = type(step).__name__
        print(f"===== {step_type}====\n")
        
        # Check if the step object has attributes stored in __dict__
        if hasattr(step, '__dict__'):
            for key, value in step.__dict__.items():
                print(f"- **{key.capitalize()}**: {value}")
        else:
            # Fallback: simply print the step
            print(step)
        
        print("\n")

In [23]:
format_reasoning(result["reasoning"])

===== ActionReasoningStep====

- **Thought**: The current language of the user is English. I need to gather data on the revenue growth of Uber and Lyft in 2021 from their respective 10-K filings to perform the comparison and analysis.
- **Action**: lyft_10k
- **Action_input**: {'input': 'revenue growth in 2021'}


===== ObservationReasoningStep====

- **Observation**: Revenue in 2021 grew by 36% compared to 2020, increasing by $843.6 million. This growth was primarily driven by a significant increase in the number of Active Riders as vaccines became more widely distributed and communities reopened. Additionally, revenue benefited from licensing and data access agreements starting in the second quarter of 2021. However, this growth was partially offset by investments in driver supply, which included increased driver incentives recorded as a reduction to revenue.
- **Return_direct**: False


===== ActionReasoningStep====

- **Thought**: I now have Lyft's revenue growth data for 2021. Nex