In [17]:
import os
from dotenv import load_dotenv

load_dotenv()
api_key = os.getenv("OPENAI_API_KEY")
tavily_api_key = os.getenv("TAVILY_API_KEY")


In [7]:
from llama_index.llms.openai import OpenAI

llm = OpenAI(model="gpt-4o-mini", api_key=api_key)

In [19]:
from tavily import AsyncTavilyClient
# Metadata: name of the function is one of most important
async def search_web(query: str) -> str:
  # Meta data: the below string says what the tool is good for
  """
  Useful for using the web to answer questions.
  """
  client = AsyncTavilyClient(api_key=tavily_api_key)
  # return str(await client.search(query))
  result = await client.search(query)
  return result['results'][0]['content'] if result['results'] else "No results found."

In [20]:
from llama_index.core.agent.workflow import AgentWorkflow

workflow = AgentWorkflow.from_tools_or_functions(
  # pass it an array of tools
  [search_web],
  llm=llm,
  system_prompt="You are a helpful assistant that answers questions. If you don't know the answer, you can search the web for information",
)

In [None]:
response = await workflow.run(user_msg="What is the weather in Madrid?")
print(str(response)) 

In [22]:
from llama_index.core.workflow import Context

#Configure a context to work with our wordflow
ctx = Context(workflow)

response = await workflow.run(
  user_msg="My name is Lydon, nice to meet you!", ctx=ctx #give the configured context to the workflow
)
print(str(response))

Nice to meet you, Lydon! How can I assist you today?


In [23]:
#run the workflow again with the same context
response = await workflow.run(user_msg="What is my name?", ctx=ctx)
print(str(response))

Your name is Lydon.


In [24]:
from llama_index.core.workflow import JsonPickleSerializer, JsonSerializer

#convert our Context to a dictionary object
ctx_dict = ctx.to_dict(serializer=JsonSerializer())

#create a new Context from teh dictionary
restored_ctx = Context.from_dict(
  workflow, ctx_dict, serializer=JsonSerializer()
)

In [25]:
response = await workflow.run(
  user_msg="Do you still remember my name?", ctx=restored_ctx
)
print(str(response))

Yes, I remember your name is Lydon.


In [None]:
from llama_index.core.agent.workflow import (
  AgentInput,
  AgentOutput,
  ToolCall,
  ToolCallResult,
  AgentStream,
)

handler = workflow.run(user_msg="What is the weather in Saskatoon?")

async for event in handler.stream_events():
  if isinstance(event, AgentStream):
      print(event.delta, end="", flush=True)
      # print(event.response)  # the current full response
      # print(event.raw)  # the raw llm api response
      # print(event.current_agent_name)  # the current agent name
    # elif isinstance(event, AgentInput):
    #    print(event.input)  # the current input messages
    #    print(event.current_agent_name)  # the current agent name
    # elif isinstance(event, AgentOutput):
    #    print(event.response)  # the current full response
    #    print(event.tool_calls)  # the selected tool calls, if any
    #    print(event.raw)  # the raw llm api response
    # elif isinstance(event, ToolCallResult):
    #    print(event.tool_name)  # the tool name
    #    print(event.tool_kwargs)  # the tool kwargs
    #    print(event.tool_output)  # the tool output
    # elif isinstance(event, ToolCall):
    #     print(event.tool_name)  # the tool name
    #     print(event.tool_kwargs)  # the tool kwargs

In [None]:
from llama_index.core.agent.workflow import (
    AgentInput,
    AgentOutput,
    ToolCall,
    ToolCallResult,
    AgentStream,
)

handler = workflow.run(user_msg="What is the weather in Saskatoon?")

async for event in handler.stream_events():
    if isinstance(event, AgentInput):
       print("Agent input: ", event.input)  # the current input messages
       print("Agent name:", event.current_agent_name)  # the current agent name
    elif isinstance(event, AgentOutput):
       print("Agent output: ", event.response)  # the current full response
       print("Tool calls made: ", event.tool_calls)  # the selected tool calls, if any
       print("Raw LLM response: ", event.raw)  # the raw llm api response
    elif isinstance(event, ToolCallResult):
       print("Tool called: ", event.tool_name)  # the tool name
       print("Arguments to the tool: ", event.tool_kwargs)  # the tool kwargs
       print("Tool output: ", event.tool_output)  # the tool output

In [33]:
from llama_index.core.workflow import Context
#Note: To access the Context, the Context parameter should be the first parameter of the tool.

async def set_name(ctx: Context, name:str) -> str:
  state = await ctx.get("state")
  state["name"] = name
  await ctx.set("state", state)
  return f"name set to {name}"

workflow = AgentWorkflow.from_tools_or_functions(
  [set_name],
  llm=llm,
  system_prompt="You are a helpful assistant that can set a name.",
  initial_state={"name": "unset"},
)

ctx = Context(workflow)

response = await workflow.run(user_msg="My name is Lydon", ctx=ctx)
print(str(response))

state= await ctx.get("state")
print("Name as stored in state:", state["name"])

Your name has been set to Lydon.
Name as stored in state: Lydon


In [None]:
from llama_index.core.workflow import (
  Context,
  InputRequiredEvent,
  HumanResponseEvent,
)

# a tool that perfomrs a dangerous task
async def dangerous_task(ctx: Context) -> str:
  """
  A dangerous task that requires human confirmation.
  """

    #emit an event to the external stream to be captured
  ctx.write_event_to_stream(
    InputRequiredEvent(
      prefix="Are you sure you want to proceed?",
      user_name="Lydon",
    )
  )

  # wait until we see a HumanResponseEvent
  response = await ctx.wait_for_event(
    HumanResponseEvent, requirements={"user_name": "Lydon"}
  )
  
  # act on the input from the event
  if response.response.strip().lower() == "yes":
    return "Dangerous task completed successfully."
  else:
    return "Dangerous task aborted."

workflow = AgentWorkflow.from_tools_or_functions(
  [dangerous_task],
  llm=llm,
  system_prompt="You are a helpful assistant that can perform dangerous tasks.",
)




In [39]:
# the place to answer is at the top of VSCode

handler = workflow.run(user_msg="I want to proceed with the dangerous task.")

async for event in handler.stream_events():
    # capture InputRequiredEvent
    if isinstance(event, InputRequiredEvent):
        # capture keyboard input
        response = input(event.prefix)
        # send our response back
        handler.ctx.send_event(
            HumanResponseEvent(
                response=response,
                user_name=event.user_name,
            )
        )

response = await handler
print(str(response))

The dangerous task has been completed successfully. If you need anything else, feel free to ask!


The following code is the Multi Agent code

In [40]:
async def search_web(query: str) -> str:
    """Useful for using the web to answer questions."""
    client = AsyncTavilyClient(api_key=tavily_api_key)
    return str(await client.search(query))


async def record_notes(ctx: Context, notes: str, notes_title: str) -> str:
    """Useful for recording notes on a given topic."""
    current_state = await ctx.get("state")
    if "research_notes" not in current_state:
        current_state["research_notes"] = {}
    current_state["research_notes"][notes_title] = notes
    await ctx.set("state", current_state)
    return "Notes recorded."


async def write_report(ctx: Context, report_content: str) -> str:
    """Useful for writing a report on a given topic."""
    current_state = await ctx.get("state")
    current_state["report_content"] = report_content
    await ctx.set("state", current_state)
    return "Report written."


async def review_report(ctx: Context, review: str) -> str:
    """Useful for reviewing a report and providing feedback."""
    current_state = await ctx.get("state")
    current_state["review"] = review
    await ctx.set("state", current_state)
    return "Report reviewed."

In [41]:
from llama_index.core.agent.workflow import FunctionAgent, ReActAgent

research_agent = FunctionAgent(
    name="ResearchAgent",
    description="Useful for searching the web for information on a given topic and recording notes on the topic.",
    system_prompt=(
        "You are the ResearchAgent that can search the web for information on a given topic and record notes on the topic. "
        "Once notes are recorded and you are satisfied, you should hand off control to the WriteAgent to write a report on the topic."
    ),
    llm=llm,
    tools=[search_web, record_notes],
    # the below is a hint to help the agent know which other agent it can handoff to
    can_handoff_to=["WriteAgent"],
)

write_agent = FunctionAgent(
    name="WriteAgent",
    description="Useful for writing a report on a given topic.",
    system_prompt=(
        "You are the WriteAgent that can write a report on a given topic. "
        "Your report should be in a markdown format. The content should be grounded in the research notes. "
        "Once the report is written, you should get feedback at least once from the ReviewAgent."
    ),
    llm=llm,
    tools=[write_report],
    can_handoff_to=["ReviewAgent", "ResearchAgent"],
)

review_agent = FunctionAgent(
    name="ReviewAgent",
    description="Useful for reviewing a report and providing feedback.",
    system_prompt=(
        "You are the ReviewAgent that can review a report and provide feedback. "
        "Your feedback should either approve the current report or request changes for the WriteAgent to implement."
    ),
    llm=llm,
    tools=[review_report],
    can_handoff_to=["WriteAgent"],
)

In [42]:
from llama_index.core.agent.workflow import AgentWorkflow

agent_workflow = AgentWorkflow(
    agents=[research_agent, write_agent, review_agent],
    # root_agent is the one that it starts with
    root_agent=research_agent.name,
    initial_state={
        "research_notes": {},
        "report_content": "Not written yet.",
        "review": "Review required.",
    },
)

In [43]:
from llama_index.core.agent.workflow import (
    AgentInput,
    AgentOutput,
    ToolCall,
    ToolCallResult,
    AgentStream,
)

handler = agent_workflow.run(
    user_msg="Write me a report on the history of the web. Briefly describe the history of the world wide web, including the development of the internet and the development of the web, including 21st century developments"
)

current_agent = None
current_tool_calls = ""
async for event in handler.stream_events():
    if (
        hasattr(event, "current_agent_name")
        and event.current_agent_name != current_agent
    ):
        current_agent = event.current_agent_name
        print(f"\n{'='*50}")
        print(f"🤖 Agent: {current_agent}")
        print(f"{'='*50}\n")
    elif isinstance(event, AgentOutput):
        if event.response.content:
            print("📤 Output:", event.response.content)
        if event.tool_calls:
            print(
                "🛠️  Planning to use tools:",
                [call.tool_name for call in event.tool_calls],
            )
    elif isinstance(event, ToolCallResult):
        print(f"🔧 Tool Result ({event.tool_name}):")
        print(f"  Arguments: {event.tool_kwargs}")
        print(f"  Output: {event.tool_output}")
    elif isinstance(event, ToolCall):
        print(f"🔨 Calling Tool: {event.tool_name}")
        print(f"  With arguments: {event.tool_kwargs}")


🤖 Agent: ResearchAgent

🛠️  Planning to use tools: ['search_web']
🔨 Calling Tool: search_web
  With arguments: {'query': 'history of the world wide web development internet 21st century'}
🔧 Tool Result (search_web):
  Arguments: {'query': 'history of the world wide web development internet 21st century'}
  Output: {'query': 'history of the world wide web development internet 21st century', 'follow_up_questions': None, 'answer': None, 'images': [], 'results': [{'title': 'World Wide Web | History, Uses & Benefits | Britannica', 'url': 'https://www.britannica.com/topic/World-Wide-Web', 'content': 'The Editors of Encyclopaedia Britannica Last Updated: Jan 6, 2025 • Article History Table of Contents Table of Contents Ask the Chatbot a Question Byname: the Web (Show more) See all related content World Wide Web (WWW), the leading information retrieval service of the Internet (the worldwide computer network). The Web gives users access to a vast array of mass media and content—via the deep we

In [44]:
state = await handler.ctx.get("state")
print(state["report_content"])

# History of the World Wide Web and Internet

## Introduction
The World Wide Web (WWW) is a system of interlinked hypertext documents accessed via the Internet. It has transformed the way we communicate, share information, and conduct business globally. This report outlines the history of the WWW, the development of the Internet, and significant advancements in the 21st century.

## Development of the Internet
The origins of the Internet can be traced back to the late 1960s with the development of ARPANET, a project funded by the U.S. Department of Defense. ARPANET utilized packet switching technology, which allowed multiple computers to communicate on a single network. This laid the groundwork for the modern Internet.

In the 1980s, the Internet began to expand beyond military and academic use, leading to the commercialization of the Internet in the 1990s. This period saw the introduction of user-friendly interfaces and the first web browsers, making the Internet accessible to the gen

In [45]:
print(state["review"])

The report on the history of the web is well-structured and covers the key developments in the history of the Internet and the World Wide Web. It effectively outlines the evolution from ARPANET to the commercialization of the Internet and highlights significant 21st-century advancements such as mobile technology, IoT, and blockchain. Overall, the report is informative and concise, making it suitable for readers seeking to understand the topic. I approve this report.


In [46]:
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
)

class MyWorkflow(Workflow):
    @step
    async def my_step(self, ev: StartEvent) -> StopEvent:
        # do something here
        return StopEvent(result="Hello, world!")

In [47]:
workflow = MyWorkflow(timeout=10, verbose=False)
result = await workflow.run()
print(result)

Hello, world!


In [49]:
from llama_index.core.workflow import Event

class FirstEvent(Event):
    first_output: str

class SecondEvent(Event):
    second_output: str

In [50]:
class MyWorkflow(Workflow):
    @step
    async def step_one(self, ev: StartEvent) -> FirstEvent:
        print(ev.first_input)
        return FirstEvent(first_output="First step complete.")

    @step
    async def step_two(self, ev: FirstEvent) -> SecondEvent:
        print(ev.first_output)
        return SecondEvent(second_output="Second step complete.")

    @step
    async def step_three(self, ev: SecondEvent) -> StopEvent:
        print(ev.second_output)
        return StopEvent(result="Workflow complete.")


w = MyWorkflow(timeout=10, verbose=False)
result = await w.run(first_input="Start the workflow.")
print(result)

Start the workflow.
First step complete.
Second step complete.
Workflow complete.


In [52]:
class LoopEvent(Event):
    loop_output: str

In [55]:
import random

class MyWorkflow(Workflow):

    @step
    async def step_one(self, ev: StartEvent | LoopEvent) -> FirstEvent | LoopEvent:
        if random.randint(0, 1) == 0:
            print("Bad thing happened")
            return LoopEvent(loop_output="Back to step one.")
        else:
            print("Good thing happened")
            return FirstEvent(first_output="First step complete.")

    @step
    async def step_two(self, ev: FirstEvent) -> SecondEvent:
        print(ev.first_output)
        return SecondEvent(second_output="Second step complete.")

    @step
    async def step_three(self, ev: SecondEvent) -> StopEvent:
        print(ev.second_output)
        return StopEvent(result="Workflow complete.")


w = MyWorkflow(timeout=10, verbose=False)
result = await w.run(first_input="Start the workflow.")
print(result)

Bad thing happened
Bad thing happened
Good thing happened
First step complete.
Second step complete.
Workflow complete.


In [None]:
from llama_index.utils.workflow import draw_all_possible_flows

draw_all_possible_flows(MyWorkflow, filename="basic_workflow.html")

In [57]:
class BranchA1Event(Event):
    payload: str


class BranchA2Event(Event):
    payload: str


class BranchB1Event(Event):
    payload: str


class BranchB2Event(Event):
    payload: str


class BranchWorkflow(Workflow):
    @step
    async def start(self, ev: StartEvent) -> BranchA1Event | BranchB1Event:
        if random.randint(0, 1) == 0:
            print("Go to branch A")
            return BranchA1Event(payload="Branch A")
        else:
            print("Go to branch B")
            return BranchB1Event(payload="Branch B")

    @step
    async def step_a1(self, ev: BranchA1Event) -> BranchA2Event:
        print(ev.payload)
        return BranchA2Event(payload=ev.payload)

    @step
    async def step_b1(self, ev: BranchB1Event) -> BranchB2Event:
        print(ev.payload)
        return BranchB2Event(payload=ev.payload)

    @step
    async def step_a2(self, ev: BranchA2Event) -> StopEvent:
        print(ev.payload)
        return StopEvent(result="Branch A complete.")

    @step
    async def step_b2(self, ev: BranchB2Event) -> StopEvent:
        print(ev.payload)
        return StopEvent(result="Branch B complete.")

In [59]:
from llama_index.utils.workflow import draw_all_possible_flows

draw_all_possible_flows(BranchWorkflow, filename="basic_workflow.html")

<class 'NoneType'>
<class '__main__.BranchA1Event'>
<class '__main__.BranchB1Event'>
<class '__main__.BranchA2Event'>
<class 'llama_index.core.workflow.events.StopEvent'>
<class '__main__.BranchB2Event'>
<class 'llama_index.core.workflow.events.StopEvent'>
basic_workflow.html


In [60]:
import asyncio

class StepTwoEvent(Event):
    query: str

class ParallelFlow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent:
        ctx.send_event(StepTwoEvent(query="Query 1"))
        ctx.send_event(StepTwoEvent(query="Query 2"))
        ctx.send_event(StepTwoEvent(query="Query 3"))

    @step(num_workers=4)
    async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StopEvent:
        print("Running slow query ", ev.query)
        await asyncio.sleep(random.randint(1, 5))

        return StopEvent(result=ev.query)

In [61]:
w = ParallelFlow(timeout=10, verbose=False)
result = await w.run(message="Start the workflow.")
print(result)

Running slow query  Query 1
Running slow query  Query 2
Running slow query  Query 3
Query 2


In [62]:
class StepThreeEvent(Event):
    result: str

class ConcurrentFlow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent:
        ctx.send_event(StepTwoEvent(query="Query 1"))
        ctx.send_event(StepTwoEvent(query="Query 2"))
        ctx.send_event(StepTwoEvent(query="Query 3"))

    @step(num_workers=4)
    async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StepThreeEvent:
        print("Running query ", ev.query)
        await asyncio.sleep(random.randint(1, 5))
        return StepThreeEvent(result=ev.query)

    @step
    async def step_three(self, ctx: Context, ev: StepThreeEvent) -> StopEvent:
        # wait until we receive 3 events
        result = ctx.collect_events(ev, [StepThreeEvent] * 3)
        if result is None:
            print("Not all events received yet.")
            return None

        # do something with all 3 results together
        print(result)
        return StopEvent(result="Done")

In [63]:
w = ConcurrentFlow(timeout=10, verbose=False)
result = await w.run(message="Start the workflow.")
print(result)

Running query  Query 1
Running query  Query 2
Running query  Query 3
Not all events received yet.
Not all events received yet.
[StepThreeEvent(result='Query 2'), StepThreeEvent(result='Query 3'), StepThreeEvent(result='Query 1')]
Done


In [64]:
class StepAEvent(Event):
    query: str

class StepACompleteEvent(Event):
    result: str

class StepBEvent(Event):
    query: str

class StepBCompleteEvent(Event):
    result: str

class StepCEvent(Event):
    query: str

class StepCCompleteEvent(Event):
    result: str

class ConcurrentFlow(Workflow):
    @step
    async def start(
        self, ctx: Context, ev: StartEvent
    ) -> StepAEvent | StepBEvent | StepCEvent:
        ctx.send_event(StepAEvent(query="Query 1"))
        ctx.send_event(StepBEvent(query="Query 2"))
        ctx.send_event(StepCEvent(query="Query 3"))

    @step
    async def step_a(self, ctx: Context, ev: StepAEvent) -> StepACompleteEvent:
        print("Doing something A-ish")
        return StepACompleteEvent(result=ev.query)

    @step
    async def step_b(self, ctx: Context, ev: StepBEvent) -> StepBCompleteEvent:
        print("Doing something B-ish")
        return StepBCompleteEvent(result=ev.query)

    @step
    async def step_c(self, ctx: Context, ev: StepCEvent) -> StepCCompleteEvent:
        print("Doing something C-ish")
        return StepCCompleteEvent(result=ev.query)

    @step
    async def step_three(
        self,
        ctx: Context,
        ev: StepACompleteEvent | StepBCompleteEvent | StepCCompleteEvent,
    ) -> StopEvent:
        print("Received event ", ev.result)

        # wait until we receive 3 events
        events = ctx.collect_events(
            ev,
            [StepCCompleteEvent, StepACompleteEvent, StepBCompleteEvent],
        )
        if (events is None):
            return None

        # do something with all 3 results together
        print("All events received: ", events)
        return StopEvent(result="Done")

In [65]:
w = ConcurrentFlow(timeout=10, verbose=False)
result = await w.run(message="Start the workflow.")
print(result)

Doing something A-ish
Doing something B-ish
Doing something C-ish
Received event  Query 1
Received event  Query 2
Received event  Query 3
All events received:  [StepCCompleteEvent(result='Query 3'), StepACompleteEvent(result='Query 1'), StepBCompleteEvent(result='Query 2')]
Done


In [66]:
from llama_index.utils.workflow import draw_all_possible_flows

draw_all_possible_flows(ConcurrentFlow, filename="basic_workflow.html")

<class 'NoneType'>
<class '__main__.StepAEvent'>
<class '__main__.StepBEvent'>
<class '__main__.StepCEvent'>
<class '__main__.StepACompleteEvent'>
<class '__main__.StepBCompleteEvent'>
<class '__main__.StepCCompleteEvent'>
<class 'llama_index.core.workflow.events.StopEvent'>
basic_workflow.html


In [67]:
class FirstEvent(Event):
    first_output: str

class SecondEvent(Event):
    second_output: str
    response: str

class TextEvent(Event):
    delta: str

class ProgressEvent(Event):
    msg: str

class MyWorkflow(Workflow):
    @step
    async def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Step one is happening"))
        return FirstEvent(first_output="First step complete.")

    @step
    async def step_two(self, ctx: Context, ev: FirstEvent) -> SecondEvent:
        llm = OpenAI(model="gpt-4o-mini", api_key=api_key) # the OpenAI key we set up at the beginning
        generator = await llm.astream_complete(
            "Please give me the first 50 words of Moby Dick, a book in the public domain."
        )
        async for response in generator:
            # Allow the workflow to stream this piece of response
            ctx.write_event_to_stream(TextEvent(delta=response.delta))
        return SecondEvent(
            second_output="Second step complete, full response attached",
            response=str(response),
        )

    @step
    async def step_three(self, ctx: Context, ev: SecondEvent) -> StopEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Step three is happening"))
        return StopEvent(result="Workflow complete.")

In [68]:
workflow = MyWorkflow(timeout=30, verbose=False)
handler = workflow.run(first_input="Start the workflow.")

async for ev in handler.stream_events():
    if isinstance(ev, ProgressEvent):
        print(ev.msg)
    if isinstance(ev, TextEvent):
        print(ev.delta, end="")

final_result = await handler
print("Final result = ", final_result)

Step one is happening
Sure! Here are the first 50 words of "Moby Dick" by Herman Melville:

"Call me Ishmael. Some years ago—never mind how long precisely—I had little or no money in my purse, and nothing particular to interest me on shore. So, whenever I find myself growing grim about the mouth; whenever it is a damp, drizzly November in my soul;"Step three is happening
Final result =  Workflow complete.


Multi Agent System

In [69]:
from llama_index.core.agent import FunctionCallingAgent as GenericFunctionCallingAgent
from llama_index.core.tools import FunctionTool

# convert our web search functions into a tool, the generic function calling agent doesn't do this automatically (yet)
search_web_tool = FunctionTool.from_defaults(fn=search_web)

research_agent = GenericFunctionCallingAgent.from_tools(
    tools=[search_web_tool],
    llm=llm,
    verbose=False,
    allow_parallel_tool_calls=False,
    system_prompt="You are an agent that does research by searching the web and then records the results of your research."
)
write_agent = GenericFunctionCallingAgent.from_tools(
    tools=[],
    llm=llm,
    verbose=False,
    allow_parallel_tool_calls=False,
    system_prompt="You are an agent that writes a report based on the results of research by another agent."
)
review_agent = GenericFunctionCallingAgent.from_tools(
    tools=[],
    llm=llm,
    verbose=False,
    allow_parallel_tool_calls=False,
    system_prompt="You are an agent that reviews a report written by a different agent."
)

In [70]:
class ResearchEvent(Event):
    prompt: str

class WriteEvent(Event):
    research: str

class ReviewEvent(Event):
    report: str

class ReviewResults(Event):
    review: str

class SimpleMultiAgentFlow(Workflow):

    @step
    async def setup(self, ev: StartEvent) -> ResearchEvent:
        self.research_agent = ev.research_agent
        self.write_agent = ev.write_agent
        self.review_agent = ev.review_agent
        return ResearchEvent(prompt=ev.prompt)

    @step
    async def research(self, ctx: Context, ev: ResearchEvent) -> WriteEvent:

        await ctx.set("prompt", ev.prompt)
        result = self.research_agent.chat(f"Do some research that another agent will use to write a report about this topic: <topic>{ev.prompt}</topic>. Just include the facts without making it into a full report.")

        return WriteEvent(research=str(result))

    @step
    async def write(self, ctx: Context, ev: WriteEvent) -> ReviewEvent:
        result = self.write_agent.chat(f"Write a report based on this research: <research>{ev.research}</research> and this topic: <topic>{await ctx.get('prompt')}</topic>")

        return ReviewEvent(report=str(result))

    @step
    async def review(self, ctx: Context, ev: ReviewEvent) -> StopEvent:
        result = self.review_agent.chat(f"Review this report: <report>{ev.report}</report>")

        ctx.write_event_to_stream(ReviewResults(review=str(result)))

        return StopEvent(result=ev.report)

In [71]:
import nest_asyncio
nest_asyncio.apply()
#                                            verbose is what will print out "Running step setup"
workflow = SimpleMultiAgentFlow(timeout=30, verbose=True)
handler = workflow.run(
    prompt="History of San Francisco",
    research_agent=research_agent,
    write_agent=write_agent,
    review_agent=review_agent
)

async for ev in handler.stream_events():
    if isinstance(ev, ReviewResults):
        print("==== The review ====")
        print(ev.review)

final_result = await handler
print("==== The report ====")
print(final_result)

Running step setup
Step setup produced event ResearchEvent
Running step research
Step research produced event WriteEvent
Running step write
Step write produced event ReviewEvent
Running step review
Step review produced event StopEvent
==== The review ====
### Review of the Report on the History of San Francisco

#### Overall Impression
The report provides a concise and informative overview of San Francisco's history, effectively covering key milestones from its indigenous roots to its modern status as a global city. The structure is logical, and the flow of information is coherent, making it easy for readers to follow the historical narrative.

#### Strengths
1. **Comprehensive Coverage**: The report successfully highlights significant periods in San Francisco's history, including indigenous history, Spanish colonization, Mexican rule, American annexation, the Gold Rush, the 1906 earthquake and fire, and modern developments. This breadth gives readers a well-rounded understanding of th

Add Reflection

In [72]:
class RewriteEvent(Event):
    review: str

# Our WriteEvent doesn't need research attached any more
class WriteEvent(Event):
    pass

class MultiAgentFlowWithReflection(Workflow):

    @step
    async def setup(self, ev: StartEvent) -> ResearchEvent:
        self.research_agent = ev.research_agent
        self.write_agent = ev.write_agent
        self.review_agent = ev.review_agent
        return ResearchEvent(prompt=ev.prompt)

    @step
    async def research(self, ctx: Context, ev: ResearchEvent) -> WriteEvent:

        await ctx.set("prompt", ev.prompt)
        result = self.research_agent.chat(f"Do some research that another agent will use to write a report about this topic: <topic>{ev.prompt}</topic>. Just include the facts without making it into a full report.")

        # store the research in the context for multiple uses
        await ctx.set("research", str(result))

        return WriteEvent()

    @step
    async def write(self, ctx: Context, ev: WriteEvent | RewriteEvent ) -> ReviewEvent:

        prompt = f"Write a report based on this research: <research>{await ctx.get('research')}</research> and this topic: <topic>{await ctx.get('prompt')}</topic> "

        # detect RewriteEvents and modify the prompt
        if isinstance(ev, RewriteEvent):
            print("Doing a rewrite!")
            prompt += f"This report has reviewed and the reviewer had this feedback, which you should take into account: <review>{ev.review}</review>"

        result = self.write_agent.chat(prompt)

        return ReviewEvent(report=str(result))

    @step
    async def review(self, ctx: Context, ev: ReviewEvent) -> StopEvent | RewriteEvent:
        result = self.review_agent.chat(f"Review this report: {ev.report}")

        # get the LLM to self-reflect
        try_again = llm.complete(f"This is a review of a report written by an agent. If you think this review is bad enough that the agent should try again, respond with just the word RETRY. If the review is good, reply with just the word CONTINUE. Here's the review: <review>{str(result)}</review>")

        if try_again.text == "RETRY":
            print("Reviewer said try again")
            return RewriteEvent(review=str(result))
        else:
            print("Reviewer thought it was good!")
            return StopEvent(result=ev.report)

In [73]:
workflow = MultiAgentFlowWithReflection(timeout=30, verbose=True)
handler = workflow.run(
    prompt="History of San Francisco",
    research_agent=research_agent,
    write_agent=write_agent,
    review_agent=review_agent
)

final_result = await handler
print("==== The report ====")
print(final_result)

Running step setup
Step setup produced event ResearchEvent
Running step research
Step research produced event WriteEvent
Running step write
Step write produced event ReviewEvent
Running step review
Reviewer thought it was good!
Step review produced event StopEvent
==== The report ====
# Report on the History of San Francisco

## Introduction
San Francisco, a city renowned for its stunning landscapes and rich cultural heritage, has a complex history that reflects its geographical significance and the diverse communities that have shaped it. This report outlines key historical events and milestones that have contributed to the development of San Francisco from its indigenous roots to its modern-day status.

## Geographical Significance
San Francisco is strategically located at the entrance to one of the largest natural harbors on the Pacific coast. This advantageous position has played a crucial role in its development as a center of maritime trade, facilitating economic growth and attra

In [75]:
from llama_index.utils.workflow import draw_all_possible_flows

draw_all_possible_flows(MultiAgentFlowWithReflection, filename="basic_workflow.html")

<class 'NoneType'>
<class '__main__.WriteEvent'>
<class 'llama_index.core.workflow.events.StopEvent'>
<class '__main__.RewriteEvent'>
<class '__main__.ResearchEvent'>
<class '__main__.ReviewEvent'>
basic_workflow.html
