In [1]:
!pip install llama-index llama-index-vector-stores-chroma llama-index-utils-workflow llama-index-llms-huggingface-api pyvis -U -q

### Basic Workflow creation

We can start by creating a simple workflow. We use the StartEvent and StopEvent classes to define the start and stop of the workflow.

In [2]:
from llama_index.core.workflow import StartEvent, StopEvent, Workflow, step


class MyWorkflow(Workflow):
    @step
    async def my_step(self, ev: StartEvent) -> StopEvent:
        # do something here
        return StopEvent(result="Hello, world!")


w = MyWorkflow(timeout=10, verbose=False)
#As you can see, we will now run the workflow by calling w.run()
result = await w.run()
result

'Hello, world!'

### Connecting multiple steps

In [3]:
from llama_index.core.workflow import Event

class ProcessingEvent(Event):
    intermediate_result: str

class MultiStepWorkflow(Workflow):
    @step
    async def step_one(self, ev: StartEvent) -> ProcessingEvent:
        # Process initial data
        return ProcessingEvent(intermediate_result="Step 1 complete")
    # @step
    # async def step_two(self, ev: ProcessingEvent) -> ProcessingEvent:
    #     #process first step
    #     return ProcessingEvent(intermediate_result="step 2 complete")
    @step
    async def step_two(self, ev: ProcessingEvent) -> StopEvent:
        # Use the intermediate result
        final_result = f"Finished processing: {ev.intermediate_result}"
        return StopEvent(result=final_result)

w = MultiStepWorkflow(timeout=10, verbose=False)
result = await w.run()
result

'Finished processing: Step 1 complete'

### Loops and Branches

We can also use type hinting to create branches and loops. Note that we can use the | operator to specify that the step can return multiple types.

In [6]:
from llama_index.core.workflow import Event
import random


class ProcessingEvent(Event):
    intermediate_result: str


class LoopEvent(Event):
    loop_output: str


class MultiStepWorkflow(Workflow):
    @step
    async def step_one(self, ev: StartEvent | LoopEvent) -> ProcessingEvent | LoopEvent:
        if random.randint(0, 1) == 0:
            print("Bad thing happened")
            return LoopEvent(loop_output="Back to step one.")
        else:
            print("Good thing happened")
            return ProcessingEvent(intermediate_result="First step complete.")

    @step
    async def step_two(self, ev: ProcessingEvent) -> StopEvent:
        # Use the intermediate result
        final_result = f"Finished processing: {ev.intermediate_result}"
        return StopEvent(result=final_result)


w = MultiStepWorkflow(verbose=False)
result = await w.run()
result


Good thing happened


'Finished processing: First step complete.'

### Drawing Workflows

We can also draw workflows. Let’s use the draw_all_possible_flows function to draw the workflow. This stores the workflow in an HTML file.

In [8]:
from llama_index.utils.workflow import draw_all_possible_flows

w = MultiStepWorkflow(verbose=False) # as defined in the previous section
draw_all_possible_flows(w, "flow.html")

flow.html


In [9]:
from llama_index.utils.workflow import draw_all_possible_flows

draw_all_possible_flows(w)

workflow_all_flows.html


### State Management

Instead of passing the event information between steps, we can use the Context type hint to pass information between steps. This might be useful for long running workflows, where you want to store information between steps.

In [10]:
from llama_index.core.workflow import Event, Context
from llama_index.core.agent.workflow import ReActAgent


class ProcessingEvent(Event):
    intermediate_result: str


class MultiStepWorkflow(Workflow):
    @step
    async def step_one(self, ev: StartEvent, ctx: Context) -> ProcessingEvent:
        # Process initial data
        await ctx.set("query", "What is the capital of France?")
        return ProcessingEvent(intermediate_result="Step 1 complete")

    @step
    async def step_two(self, ev: ProcessingEvent, ctx: Context) -> StopEvent:
        # Use the intermediate result
        query = await ctx.get("query")
        print(f"Query: {query}")
        final_result = f"Finished processing: {ev.intermediate_result}"
        return StopEvent(result=final_result)


w = MultiStepWorkflow(timeout=10, verbose=False)
result = await w.run()
result

Query: What is the capital of France?


'Finished processing: Step 1 complete'

### Multi-Agent Workflows
We can also create multi-agent workflows. Here we define two agents, one that multiplies two integers and one that adds two integers.

In [11]:
from llama_index.llms.huggingface_api import HuggingFaceInferenceAPI
from llama_index.core.agent.workflow import AgentWorkflow

# Define some tools
def add(a: int, b: int) -> int:
    """Add two numbers."""
    return a + b

def multiply(a: int, b: int) -> int:
    """Multiply two numbers."""
    return a * b

llm = HuggingFaceInferenceAPI(model_name="Qwen/Qwen2.5-Coder-32B-Instruct")

# we can pass functions directly without FunctionTool -- the fn/docstring are parsed for the name/description
multiply_agent = ReActAgent(
    name="multiply_agent",
    description="Is able to multiply two integers",
    system_prompt="A helpful assistant that can use a tool to multiply numbers.",
    tools=[multiply], 
    llm=llm,
)

addition_agent = ReActAgent(
    name="add_agent",
    description="Is able to add two integers",
    system_prompt="A helpful assistant that can use a tool to add numbers.",
    tools=[add], 
    llm=llm,
)

# Create the workflow
workflow = AgentWorkflow(
    agents=[multiply_agent, addition_agent],
    root_agent="multiply_agent"
)

# Run the system
response = await workflow.run(user_msg="Can you add 5 and 3?")
response

AgentOutput(response=ChatMessage(role=<MessageRole.ASSISTANT: 'assistant'>, additional_kwargs={}, blocks=[TextBlock(block_type='text', text='8')]), tool_calls=[ToolCallResult(tool_name='handoff', tool_kwargs={'to_agent': 'add_agent', 'reason': 'The user wants to add two numbers, and the add_agent is better suited for this task.'}, tool_id='edec989e-992a-4b81-bb52-f3660164f9cd', tool_output=ToolOutput(content='Agent add_agent is now handling the request due to the following reason: The user wants to add two numbers, and the add_agent is better suited for this task..\nPlease continue with the current request.', tool_name='handoff', raw_input={'args': (), 'kwargs': {'to_agent': 'add_agent', 'reason': 'The user wants to add two numbers, and the add_agent is better suited for this task.'}}, raw_output='Agent add_agent is now handling the request due to the following reason: The user wants to add two numbers, and the add_agent is better suited for this task..\nPlease continue with the curren

In [12]:
response1 = await workflow.run(user_msg="Can you multiply 5 and 3? and after having the answer, add it to 10?")
response1

AgentOutput(response=ChatMessage(role=<MessageRole.ASSISTANT: 'assistant'>, additional_kwargs={}, blocks=[TextBlock(block_type='text', text='15')]), tool_calls=[ToolCallResult(tool_name='multiply', tool_kwargs={'a': 5, 'b': 3}, tool_id='3d4a32d8-20ed-4477-b39e-63347cf75720', tool_output=ToolOutput(content='15', tool_name='multiply', raw_input={'args': (), 'kwargs': {'a': 5, 'b': 3}}, raw_output=15, is_error=False), return_direct=False)], raw=ChatCompletionStreamOutput(choices=[ChatCompletionStreamOutputChoice(delta=ChatCompletionStreamOutputDelta(role='assistant', content='5', tool_call_id=None, tool_calls=None), index=0, finish_reason=None, logprobs=None)], created=1744817682, id='', model='Qwen/Qwen2.5-Coder-32B-Instruct', system_fingerprint='3.2.1-sha-4d28897', usage=None, object='chat.completion.chunk'), current_agent_name='multiply_agent')

In [13]:
response2 = await workflow.run(user_msg="Can you multiply 5 and 3? and after having the answer, add it to 10?")
response2

AgentOutput(response=ChatMessage(role=<MessageRole.ASSISTANT: 'assistant'>, additional_kwargs={}, blocks=[TextBlock(block_type='text', text='The final result is 25.')]), tool_calls=[ToolCallResult(tool_name='multiply', tool_kwargs={'a': 5, 'b': 3}, tool_id='a7b52351-c41e-47d7-a04d-a6b915f36955', tool_output=ToolOutput(content='15', tool_name='multiply', raw_input={'args': (), 'kwargs': {'a': 5, 'b': 3}}, raw_output=15, is_error=False), return_direct=False), ToolCallResult(tool_name='handoff', tool_kwargs={'to_agent': 'add_agent', 'reason': 'I need to add 15 and 10'}, tool_id='cf53af9a-5479-4d4d-8fa8-843f90a9acd5', tool_output=ToolOutput(content='Agent add_agent is now handling the request due to the following reason: I need to add 15 and 10.\nPlease continue with the current request.', tool_name='handoff', raw_input={'args': (), 'kwargs': {'to_agent': 'add_agent', 'reason': 'I need to add 15 and 10'}}, raw_output='Agent add_agent is now handling the request due to the following reason:

Agent tools can also modify the workflow state we mentioned earlier. Before starting the workflow, we can provide an initial state dict that will be available to all agents. The state is stored in the state key of the workflow context. It will be injected into the state_prompt which augments each new user message.

Let’s inject a counter to count function calls by modifying the previous example:

In [17]:
from llama_index.core.workflow import Context

# Define some tools
async def add(ctx: Context, a: int, b: int) -> int:
    """Add two numbers."""
    # update our count
    cur_state = await ctx.get("state")
    cur_state["num_fn_calls"] += 1
    await ctx.set("state", cur_state)

    return a + b

async def multiply(ctx: Context, a: int, b: int) -> int:
    """Multiply two numbers."""
    # update our count
    cur_state = await ctx.get("state")
    cur_state["num_fn_calls"] += 1
    await ctx.set("state", cur_state)

    return a * b

...

workflow = AgentWorkflow(
    agents=[multiply_agent, addition_agent],
    root_agent="multiply_agent",
    initial_state={"num_fn_calls": 0},
    state_prompt="Current state: {state}. User message: {msg}",
)

# run the workflow with context
ctx = Context(workflow)
response = await workflow.run(user_msg="Can you add 5 and 3?", ctx=ctx)
print(response)

# pull out and inspect the state
state = await ctx.get("state")
print(state["num_fn_calls"])

8
0
