https://huggingface.co/agents-course/notebooks/blob/main/unit2/llama-index/workflows.ipynb

In [1]:
# !pip install llama-index datasets llama-index-callbacks-arize-phoenix llama-index-vector-stores-chroma llama-index-utils-workflow llama-index-llms-huggingface-api pyvis -U -q

In [2]:
import random
from llama_index.llms.huggingface_api import HuggingFaceInferenceAPI
from llama_index.llms.mlx import MLXLLM
from llama_index.core.agent.workflow import ReActAgent, AgentWorkflow
from llama_index.core.workflow import (
    Context,
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    draw_all_possible_flows,
    step,
)

In [3]:
class MyWorkflow(Workflow):
    @step
    async def my_step(self, ev: StartEvent) -> StopEvent:
        # do something here
        return StopEvent(result="Hello, world!")

In [4]:
w = MyWorkflow(timeout=10, verbose=False)
result = await w.run()
result

'Hello, world!'

# Connecting Multiple Steps

In [5]:
class ProcessingEvent(Event):
    intermediate_result: str

In [6]:
class MultiStepWorkflow(Workflow):
    @step
    async def step_one(self, ev: StartEvent) -> ProcessingEvent:
        # Process initial data
        return ProcessingEvent(intermediate_result="Step 1 complete")

    @step
    async def step_two(self, ev: ProcessingEvent) -> StopEvent:
        # Use the intermediate result
        final_result = f"Finished processing: {ev.intermediate_result}"
        return StopEvent(result=final_result)

In [7]:
w = MultiStepWorkflow(timeout=10, verbose=False)
result = await w.run()
result

'Finished processing: Step 1 complete'

# Loops and Branches

In [8]:
from llama_index.core.workflow import Event

In [9]:
class ProcessingEvent(Event):
    intermediate_result: str

In [10]:
class LoopEvent(Event):
    loop_output: str

In [11]:
class MultiStepWorkflow(Workflow):
    @step
    async def step_one(self, ev: StartEvent) -> ProcessingEvent | LoopEvent:
        if random.randint(0, 1) == 0:
            print("Bad thing happened")
            return LoopEvent(loop_output="Back to step one.")
        else:
            print("Good thing happened")
            return ProcessingEvent(intermediate_result="First step complete.")

    @step
    async def step_two(self, ev: ProcessingEvent | LoopEvent) -> StopEvent:
        # Use the intermediate result
        final_result = f"Finished processing: {ev.intermediate_result}"
        return StopEvent(result=final_result)

In [12]:
w = MultiStepWorkflow(verbose=False)
result = await w.run()
result

Good thing happened


'Finished processing: First step complete.'

# Drawing Workflows

In [13]:
draw_all_possible_flows(w)

workflow_all_flows.html


  draw_all_possible_flows(w)


# State Management

In [14]:
class ProcessingEvent(Event):
    intermediate_result: str

In [15]:
class MultiStepWorkflow(Workflow):
    @step
    async def step_one(self, ev: StartEvent, ctx: Context) -> ProcessingEvent:
        # Process initial data
        await ctx.set("query", "What is the capital of France?")
        return ProcessingEvent(intermediate_result="Step 1 complete")

    @step
    async def step_two(self, ev: ProcessingEvent, ctx: Context) -> StopEvent:
        # Use the intermediate result
        query = await ctx.get("query")
        print(f"Query: {query}")
        final_result = f"Finished processing: {ev.intermediate_result}"
        return StopEvent(result=final_result)

In [16]:
w = MultiStepWorkflow(timeout=10, verbose=False)
result = await w.run()
result

Query: What is the capital of France?


'Finished processing: Step 1 complete'

# Multi-Agent Workflows

In [17]:
# Define some tools
def add(a: int, b: int) -> int:
    """Add two numbers."""
    return a + b

def multiply(a: int, b: int) -> int:
    """Multiply two numbers."""
    return a * b

In [18]:
# llm = MLXLLM(model_name="mlx-community/Qwen2.5-Coder-3B-Instruct-bf16")
llm = HuggingFaceInferenceAPI(model_name="Qwen/Qwen2.5-Coder-3B-Instruct")

In [19]:
# we can pass functions directly without FunctionTool -- the fn/docstring are parsed for the name/description
multiply_agent = ReActAgent(
    name="multiply_agent",
    description="Is able to multiply two integers",
    system_prompt="A helpful assistant that can use a tool to multiply numbers.",
    tools=[multiply], 
    llm=llm,
)

addition_agent = ReActAgent(
    name="add_agent",
    description="Is able to add two integers",
    system_prompt="A helpful assistant that can use a tool to add numbers.",
    tools=[add], 
    llm=llm,
)

In [20]:
# Create the workflow
workflow = AgentWorkflow(
    agents=[multiply_agent, addition_agent],
    root_agent="multiply_agent"
)

In [21]:
# Run the system
response = await workflow.run(user_msg="Can you add 5 and 3?")

In [22]:
response

AgentOutput(response=ChatMessage(role=<MessageRole.ASSISTANT: 'assistant'>, additional_kwargs={}, blocks=[TextBlock(block_type='text', text='8')]), tool_calls=[ToolCallResult(tool_name='handoff', tool_kwargs={'to_agent': 'add_agent', 'reason': 'The user wants to add two numbers.'}, tool_id='16760f5f-c680-4905-830f-aba168a6f9f8', tool_output=ToolOutput(content='Agent add_agent is now handling the request due to the following reason: The user wants to add two numbers..\nPlease continue with the current request.', tool_name='handoff', raw_input={'args': (), 'kwargs': {'to_agent': 'add_agent', 'reason': 'The user wants to add two numbers.'}}, raw_output='Agent add_agent is now handling the request due to the following reason: The user wants to add two numbers..\nPlease continue with the current request.', is_error=False), return_direct=True)], raw=ChatCompletionStreamOutput(choices=[ChatCompletionStreamOutputChoice(delta=ChatCompletionStreamOutputDelta(role='assistant', content='8', tool_c