# Workflows

- A workflow organizes your code into sequential, manageable steps.
- A workflow is created by defining Steps that are triggered by Events and that emit Events of their own to trigger further steps.

##### Workflows offer several key benefits:

- Clear organization of code into discrete steps
- Event-driven architecture for flexible control flow
- Type-safe communication between steps
- Built-in state management
- Support for both simple and complex agent interactions


In [2]:
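from llama_index.core.workflow import StartEvent, StopEvent, Workflow, step


class MyWorkflow(Workflow):
    # A single step: triggered by StartEvent, it ends the run by returning StopEvent
    @step
    async def my_step(self, ev: StartEvent) -> StopEvent:
        return StopEvent(result="Hello World")


w = MyWorkflow(timeout=10, verbose=False)  # timeout is in seconds
result = await w.run()  # top-level await works inside a notebook cell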

## Connecting Multiple Steps

- To connect multiple steps, we create custom events that carry data between them.
- Here, ProcessingEvent is emitted by the first step and consumed by the second, which transfers the output of the first step to the second step.


In [3]:
from llama_index.core.workflow import Event


# Custom event that carries the first step's output to the second step
class ProcessingEvent(Event):
    intermediate_result: str


class MultiStepWorkflow(Workflow):
    @step
    async def step_one(self, ev: StartEvent) -> ProcessingEvent:
        # Emitting ProcessingEvent triggers step_two
        return ProcessingEvent(intermediate_result="Step 1 complete")

    @step
    async def step_two(self, ev: ProcessingEvent) -> StopEvent:
        # Use the intermediate result
        final_result = f"Finished processing: {ev.intermediate_result}"
        return StopEvent(result=final_result)


w = MultiStepWorkflow(timeout=10, verbose=False)
result = await w.run()
result

'Finished processing: Step 1 complete'
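
To make the routing idea concrete, here is a minimal standard-library sketch of how events could be matched to steps through type hints. It is a toy illustration under stated assumptions, not how llama_index implements workflows: the `Toy*` event classes, `build_routes`, and `run_toy_workflow` are hypothetical names invented for this example.

```python
import asyncio
from dataclasses import dataclass
from typing import get_type_hints


# Hypothetical stand-ins for the library's events (not llama_index classes)
@dataclass
class ToyStartEvent:
    pass


@dataclass
class ToyProcessingEvent:
    intermediate_result: str


@dataclass
class ToyStopEvent:
    result: str


async def step_one(ev: ToyStartEvent) -> ToyProcessingEvent:
    return ToyProcessingEvent(intermediate_result="Step 1 complete")


async def step_two(ev: ToyProcessingEvent) -> ToyStopEvent:
    return ToyStopEvent(result=f"Finished processing: {ev.intermediate_result}")


def build_routes(steps):
    """Map each event type to the step whose `ev` parameter is annotated with it."""
    return {get_type_hints(fn)["ev"]: fn for fn in steps}


async def run_toy_workflow(steps):
    routes = build_routes(steps)
    event = ToyStartEvent()
    # Keep dispatching the latest event until a step emits the stop event
    while not isinstance(event, ToyStopEvent):
        event = await routes[type(event)](event)
    return event.result


toy_result = asyncio.run(run_toy_workflow([step_one, step_two]))
print(toy_result)  # Finished processing: Step 1 complete
```

`asyncio.run` is used so the sketch runs as a plain script. Inside a notebook cell, where an event loop is already running, use `await run_toy_workflow([step_one, step_two])` instead.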

## Loops and Branches

- Type hints are the most powerful part of workflows: they determine which step handles which event, which lets us build branches, loops, and joins for more complex workflows.
- In the example below, step_one accepts LoopEvent as input and can also return it, so the step loops back to itself until it emits a ProcessingEvent.


In [4]:
from llama_index.core.workflow import Event
import random


class ProcessingEvent(Event):
    intermediate_result: str


class LoopEvent(Event):
    loop_output: str


class MultiStepWorkflow(Workflow):
    @step
    async def step_one(self, ev: StartEvent | LoopEvent) -> ProcessingEvent | LoopEvent:
        if random.randint(0, 1) == 0:
            print("Bad thing happened")
            return LoopEvent(loop_output="Back to step one.")
        else:
            print("Good thing happened")
            return ProcessingEvent(intermediate_result="First step complete.")

    @step
    async def step_two(self, ev: ProcessingEvent) -> StopEvent:
        # Use the intermediate result
        final_result = f"Finished processing: {ev.intermediate_result}"
        return StopEvent(result=final_result)


w = MultiStepWorkflow(verbose=False)
result = await w.run()
result

Bad thing happened
Bad thing happened
Bad thing happened
Bad thing happened
Good thing happened


'Finished processing: First step complete.'

In [5]:
from llama_index.utils.workflow import draw_all_possible_flows

#w = ... # as defined in the previous section
draw_all_possible_flows(w, "flow.html")

flow.html
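
The graph of possible flows follows from the step annotations alone. The sketch below shows that idea with the standard library only: it reads the union type hints of two stub steps and lists the edges they imply. It is an illustration under stated assumptions and makes no claim about how draw_all_possible_flows works internally. The `Toy*` classes, `event_types`, and `possible_flows` are hypothetical names.

```python
from typing import Union, get_args, get_type_hints


# Hypothetical stand-ins for the events used in the loop example
class ToyStart: ...
class ToyLoop: ...
class ToyProcessing: ...
class ToyStop: ...


# Stub steps: only the annotations matter here
# (Union[A, B] is equivalent to A | B on Python 3.10+)
def step_one(ev: Union[ToyStart, ToyLoop]) -> Union[ToyProcessing, ToyLoop]: ...
def step_two(ev: ToyProcessing) -> ToyStop: ...


def event_types(annotation):
    """Return the members of a union annotation, or the single type itself."""
    members = get_args(annotation)
    return members if members else (annotation,)


def possible_flows(steps):
    """List (event, step) and (step, event) edges implied by the type hints."""
    edges = []
    for fn in steps:
        hints = get_type_hints(fn)
        for accepted in event_types(hints["ev"]):
            edges.append((accepted.__name__, fn.__name__))
        for emitted in event_types(hints["return"]):
            edges.append((fn.__name__, emitted.__name__))
    return edges


edges = possible_flows([step_one, step_two])
for source, target in edges:
    print(f"{source} -> {target}")
```

The pair of edges `ToyLoop -> step_one` and `step_one -> ToyLoop` is the loop; the two different return types of `step_one` form the branch.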


## State Management

- State management is useful when you want to keep track of the state of the workflow so that every step has access to the same state. The Context object passed to a step stores and retrieves that state.


In [6]:
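from llama_index.core.workflow import Context, StartEvent, StopEvent, Workflow, step


class StateManagementWorkflow(Workflow):
    @step
    async def query(self, ctx: Context, ev: StartEvent) -> StopEvent:
        # Store the query in the shared context
        await ctx.set("query", "What is the capital of France?")
        # Read it back; any other step that receives ctx could do the same
        query = await ctx.get("query")
        # Return the value read back from the context
        return StopEvent(result=query)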

## Automating Workflows with Multi-Agent Workflows

- We can use the AgentWorkflow class to create a multi-agent workflow.
- The AgentWorkflow uses Workflow Agents to allow you to create a system of one or more agents that can collaborate and hand off tasks to each other based on their specialized capabilities.
- One agent must be designated as the root agent in the AgentWorkflow constructor. When a user message comes in, it is first routed to the root agent.


In [7]:
from llama_index.core.agent.workflow import AgentWorkflow, ReActAgent
from llama_index.llms.huggingface_api import HuggingFaceInferenceAPI

# Define some tools
def add(a: int, b: int) -> int:
    """Add two numbers."""
    return a + b

def multiply(a: int, b: int) -> int:
    """Multiply two numbers."""
    return a * b

llm = HuggingFaceInferenceAPI(model_name="Qwen/Qwen2.5-Coder-32B-Instruct")

# we can pass functions directly without FunctionTool -- the fn/docstring are parsed for the name/description
multiply_agent = ReActAgent(
    name="multiply_agent",
    description="Is able to multiply two integers",
    system_prompt="A helpful assistant that can use a tool to multiply numbers.",
    tools=[multiply],
    llm=llm,
)

addition_agent = ReActAgent(
    name="add_agent",
    description="Is able to add two integers",
    system_prompt="A helpful assistant that can use a tool to add numbers.",
    tools=[add],
    llm=llm,
)

# Create the workflow
workflow = AgentWorkflow(
    agents=[multiply_agent, addition_agent],
    root_agent="multiply_agent",
)

# Run the system
response = await workflow.run(user_msg="Can you add 5 and 3?")

WorkflowRuntimeError: Error in step 'run_agent_step': 402, message='Payment Required', url='https://router.huggingface.co/hf-inference/models/Qwen/Qwen2.5-Coder-32B-Instruct/v1/chat/completions'

In [8]:
from llama_index.core.workflow import Context


# Tools that take a Context as first argument can read and update the workflow state
async def add(ctx: Context, a: int, b: int) -> int:
    """Add two numbers."""
    # Read the state, count this call, and write the state back
    cur_state = await ctx.get("state")
    cur_state["num_fn_calls"] += 1
    await ctx.set("state", cur_state)
    return a + b


async def multiply(ctx: Context, a: int, b: int) -> int:
    """Multiply two numbers."""
    cur_state = await ctx.get("state")
    cur_state["num_fn_calls"] += 1
    await ctx.set("state", cur_state)
    return a * b

In [9]:
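# Re-create the agents so they use the Context-aware tools defined above
multiply_agent = ReActAgent(
    name="multiply_agent", description="Is able to multiply two integers",
    system_prompt="A helpful assistant that can use a tool to multiply numbers.",
    tools=[multiply], llm=llm)
addition_agent = ReActAgent(
    name="add_agent", description="Is able to add two integers",
    system_prompt="A helpful assistant that can use a tool to add numbers.",
    tools=[add], llm=llm)
workflow = AgentWorkflow(
    agents=[multiply_agent, addition_agent],
    root_agent="multiply_agent",
    initial_state={"num_fn_calls": 0},
    state_prompt="Current state: {state}. User message: {msg}",
)
ctx = Context(workflow)
response = await workflow.run(user_msg="Can you add 5 and 3?", ctx=ctx)
state = await ctx.get("state")
print(state["num_fn_calls"])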

WorkflowRuntimeError: Error in step 'run_agent_step': 402, message='Payment Required', url='https://router.huggingface.co/hf-inference/models/Qwen/Qwen2.5-Coder-32B-Instruct/v1/chat/completions'
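
Because the hosted model call fails with a 402 error, the call counter never gets printed. The counting logic of the tools can still be checked offline. The sketch below assumes a hypothetical `StandInContext` that offers only the async `get` and `set` calls the tools use. It is not the llama_index Context, and no agent or LLM is involved. `counted_add` and `counted_multiply` repeat the tool bodies under new names so they do not shadow the notebook's tools.

```python
import asyncio


class StandInContext:
    """Hypothetical stand-in exposing only async get/set (not llama_index's Context)."""

    def __init__(self, initial_state):
        self._data = {"state": dict(initial_state)}

    async def get(self, key):
        return self._data[key]

    async def set(self, key, value):
        self._data[key] = value


async def counted_add(ctx, a: int, b: int) -> int:
    """Add two numbers and count the call in the shared state."""
    cur_state = await ctx.get("state")
    cur_state["num_fn_calls"] += 1
    await ctx.set("state", cur_state)
    return a + b


async def counted_multiply(ctx, a: int, b: int) -> int:
    """Multiply two numbers and count the call in the shared state."""
    cur_state = await ctx.get("state")
    cur_state["num_fn_calls"] += 1
    await ctx.set("state", cur_state)
    return a * b


async def main():
    ctx = StandInContext({"num_fn_calls": 0})
    total = await counted_add(ctx, 5, 3)
    product = await counted_multiply(ctx, total, 2)
    state = await ctx.get("state")
    return total, product, state["num_fn_calls"]


total, product, calls = asyncio.run(main())
print(total, product, calls)  # 8 16 2
```

Both tools share one state dictionary through the context, so the counter reflects calls made by either of them. In a notebook cell, replace `asyncio.run(main())` with `await main()`.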