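This script shows how to use the Workflow system in the llama_index library to build and run complex flow-management logic. By defining event types, step functions, and a context object, you can implement patterns ranging from simple linear flows to concurrent execution, event collection, and nested workflows.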

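## Sequential workflows

A workflow is implemented by subclassing the Workflow class. The class contains one or more steps, and each step is decorated with @step.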

In [2]:
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
)

class MyWorkflow(Workflow):

    @step
    async def my_step(self,ev:StartEvent)->StopEvent:
        return StopEvent(result="hello world")

w=MyWorkflow(timeout=10,verbose=False)
result=await w.run()
print(result)

hello world


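The workflow above defines an entry step (the one that accepts StartEvent) and an end marker (StopEvent), which is important.

LlamaIndex can visualize a workflow with the code below. The visualization is written to an HTML file that you can open in a browser.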

![](../../../../../c:/CodeRepos/python/MyCode/LlamaIndex_learnning/doc/2024-12-19_105411.png)

In [3]:
from llama_index.utils.workflow import draw_all_possible_flows

draw_all_possible_flows(MyWorkflow, filename="basic_workflow.html")

<class 'NoneType'>
<class 'llama_index.core.workflow.events.StopEvent'>
basic_workflow.html


Going further, we can define different Events to build a multi-stage workflow.

In [4]:
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
)

# Define the events
class FirstEvent(Event):
    first_output: str

class SecondEvent(Event):
    second_output: str

# Define the workflow
class MyWorkflow(Workflow):
    @step
    async def step_one(self, ev: StartEvent) -> FirstEvent:
        print(ev.first_input)
        return FirstEvent(first_output="First step complete.")

    @step
    async def step_two(self, ev: FirstEvent) -> SecondEvent:
        print(ev.first_output)
        return SecondEvent(second_output="Second step complete.")

    @step
    async def step_three(self, ev: SecondEvent) -> StopEvent:
        print(ev.second_output)
        return StopEvent(result="Workflow complete.")


w = MyWorkflow(timeout=10, verbose=False)
result = await w.run(first_input="Start the workflow.")
print(result)

Start the workflow.
First step complete.
Second step complete.
Workflow complete.


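**Summary**:

- A Workflow is a flow that starts with StartEvent and ends with StopEvent
- Defining different Events lets you build multi-stage workflows
- Data can be passed from step to step within a workflow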

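## Branching and looping workflows

The examples above only show sequential execution. How do you build more complex workflows with branches and loops? The approach is the same: first define the Events to listen for, then build different workflows by varying which events each step accepts and returns.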

In [6]:
# Looping workflow
class LoopEvent(Event):
    loop_output: str

# Define the workflow
import random
class LoopWorkflow(Workflow):
    @step
    async def step_one(self, ev: StartEvent | LoopEvent) -> FirstEvent | LoopEvent:
        if random.randint(0, 1) == 0:
            print("Bad thing happened")
            return LoopEvent(loop_output="Back to step one.")
        else:
            print("Good thing happened")
            return FirstEvent(first_output="First step complete.")
        
    @step
    async def step_two(self, ev: FirstEvent) -> SecondEvent:
        print(ev.first_output)
        return SecondEvent(second_output="Second step complete.")

    @step
    async def step_three(self, ev: SecondEvent) -> StopEvent:
        print(ev.second_output)
        return StopEvent(result="Workflow complete.")
    
w = LoopWorkflow(timeout=10, verbose=False)
result = await w.run()
print(result)

draw_all_possible_flows(LoopWorkflow, filename="loop_workflow.html")

Bad thing happened
Bad thing happened
Bad thing happened
Good thing happened
First step complete.
Second step complete.
Workflow complete.
<class 'NoneType'>
<class '__main__.FirstEvent'>
<class '__main__.LoopEvent'>
<class 'llama_index.core.workflow.events.StopEvent'>
<class '__main__.SecondEvent'>
loop_workflow.html


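Running the code several times shows that the workflow follows one of two paths:

- A: one[FirstEvent] -> two[SecondEvent] -> three[StopEvent]
- B: one[LoopEvent] repeated a random number of times, then A

In other words: **the next step to run is determined by the event the current step returns and by the event type each step listens for. For example, step two listens for FirstEvent, so whenever the returned event is a FirstEvent, step two runs next. This is similar to how MetaGPT defines its workflow concept.**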
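
The routing rule described above can be illustrated with a toy dispatcher written in plain asyncio. This is only a sketch of the idea and not how llama_index is implemented: the classes `Event`, `StartEvent`, `LoopEvent`, `FirstEvent`, and `StopEvent` below are local stand-ins, and `accepted_types` and `run` are hypothetical helpers. The random choice is replaced by a fixed sequence so the run is repeatable.

```python
import asyncio
import typing


class Event:
    """Toy base event (a local stand-in, not llama_index's Event)."""


class StartEvent(Event):
    pass


class LoopEvent(Event):
    pass


class FirstEvent(Event):
    pass


class StopEvent(Event):
    def __init__(self, result):
        self.result = result


def accepted_types(fn):
    """Return the event classes named in the annotation of the `ev` parameter."""
    hint = typing.get_type_hints(fn)["ev"]
    # A Union yields its members; a plain class yields an empty tuple.
    return typing.get_args(hint) or (hint,)


async def run(steps, first_event):
    """Hand each returned event to the step whose `ev` annotation accepts it."""
    trace = []
    ev = first_event
    while not isinstance(ev, StopEvent):
        step = next(s for s in steps if isinstance(ev, accepted_types(s)))
        trace.append(step.__name__)
        ev = await step(ev)
    return ev.result, trace


# Fail twice, then succeed (deterministic replacement for random.randint).
attempts = iter([False, False, True])


async def step_one(ev: typing.Union[StartEvent, LoopEvent]) -> Event:
    return FirstEvent() if next(attempts) else LoopEvent()


async def step_two(ev: FirstEvent) -> StopEvent:
    return StopEvent(result="done")


result, trace = asyncio.run(run([step_one, step_two], StartEvent()))
print(result, trace)
```

Here `step_one` runs three times because it listens for both StartEvent and LoopEvent, and `step_two` only runs once a FirstEvent is returned.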

In [7]:
# Branching workflow
class BranchA1Event(Event):
    payload: str


class BranchA2Event(Event):
    payload: str


class BranchB1Event(Event):
    payload: str


class BranchB2Event(Event):
    payload: str


class BranchWorkflow(Workflow):
    @step
    async def start(self, ev: StartEvent) -> BranchA1Event | BranchB1Event:
        if random.randint(0, 1) == 0:
            print("Go to branch A")
            return BranchA1Event(payload="Branch A")
        else:
            print("Go to branch B")
            return BranchB1Event(payload="Branch B")

    @step
    async def step_a1(self, ev: BranchA1Event) -> BranchA2Event:
        print(ev.payload)
        return BranchA2Event(payload=ev.payload)

    @step
    async def step_b1(self, ev: BranchB1Event) -> BranchB2Event:
        print(ev.payload)
        return BranchB2Event(payload=ev.payload)

    @step
    async def step_a2(self, ev: BranchA2Event) -> StopEvent:
        print(ev.payload)
        return StopEvent(result="Branch A complete.")

    @step
    async def step_b2(self, ev: BranchB2Event) -> StopEvent:
        print(ev.payload)
        return StopEvent(result="Branch B complete.")

w = BranchWorkflow(timeout=10, verbose=False)
result = await w.run()
print(result)

draw_all_possible_flows(BranchWorkflow, filename="branch_workflow.html")

Go to branch A
Branch A
Branch A
Branch A complete.
<class 'NoneType'>
<class '__main__.BranchA1Event'>
<class '__main__.BranchB1Event'>
<class '__main__.BranchA2Event'>
<class 'llama_index.core.workflow.events.StopEvent'>
<class '__main__.BranchB2Event'>
<class 'llama_index.core.workflow.events.StopEvent'>
branch_workflow.html


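## Workflow context

In the examples above, state is passed along from step to step as the flow proceeds. To pass data between steps that are not directly connected, you would have to thread it through every step in between, which makes the code harder to read and maintain.

To solve this, workflows provide a globally available object, Context.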

In [1]:
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
    Context,
)

class SetupEvent(Event):
    query: str


class StepTwoEvent(Event):
    query: str


class StatefulFlow(Workflow):
    @step
    async def start(
        self, ctx: Context, ev: StartEvent
    ) -> SetupEvent | StepTwoEvent:
        db = await ctx.get("some_database", default=None)
        if db is None:
            print("Need to load data")
            return SetupEvent(query=ev.query)

        # do something with the query
        return StepTwoEvent(query=ev.query)

    @step
    async def setup(self, ctx: Context, ev: SetupEvent) -> StartEvent:
        # load data
        await ctx.set("some_database", [1, 2, 3])
        return StartEvent(query=ev.query)

    @step
    async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StopEvent:
        # do something with the data
        print("Data is ", await ctx.get("some_database"))

        return StopEvent(result=await ctx.get("some_database"))

w = StatefulFlow(timeout=10, verbose=False)
result = await w.run(query="Some query")
print(result)        

Need to load data
Data is  [1, 2, 3]
[1, 2, 3]


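## Streaming events

The Context of a LlamaIndex workflow can also emit events to a stream: a step calls ctx.write_event_to_stream, and the caller consumes the events with handler.stream_events() while the workflow is still running.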

In [1]:
import nest_asyncio
nest_asyncio.apply()

from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
    Context,
)
import asyncio
from llama_index.utils.workflow import draw_all_possible_flows
from llama_index.llms.ollama import Ollama

In [2]:
class FirstEvent(Event):
    first_output: str

class SecondEvent(Event):
    second_output: str
    response: str

class ProgressEvent(Event):
    msg: str

class MyWorkflow(Workflow):
    @step
    async def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Step one is happening"))
        return FirstEvent(first_output="First step complete.")

    @step
    async def step_two(self, ctx: Context, ev: FirstEvent) -> SecondEvent:
        llm  = Ollama(
            model="qwen2.5:latest", 
            request_timeout=360.0,
            base_url='http://localhost:11434')
        
        generator = await llm.astream_complete(
            "用中文给我讲2个笑话，每个笑话不超过15个字"
        )
        async for response in generator:
            # Allow the workflow to stream this piece of response
            ctx.write_event_to_stream(ProgressEvent(msg=response.delta))
        return SecondEvent(
            second_output="Second step complete, full response attached",
            response=str(response),
        )

    @step
    async def step_three(self, ctx: Context, ev: SecondEvent) -> StopEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Step three is happening"))
        return StopEvent(result="Workflow complete.")

In [4]:
async def main():
    w = MyWorkflow(timeout=30, verbose=True)
    handler = w.run(first_input="Start the workflow.")

    async for ev in handler.stream_events():
        if isinstance(ev, ProgressEvent):
            print(ev.msg)

    final_result = await handler
    print("Final result", final_result)

    draw_all_possible_flows(MyWorkflow, filename="streaming_workflow.html")


if __name__ == "__main__":
    asyncio.run(main())

Running step step_one
Step step_one produced event FirstEvent
Running step step_two


Step one is happening
1
.
 老
婆
做饭
咸
，
加
点
水
稀
释
一下
。

2
.
 
 why
 did
 the
 tomato
 turn
 red
?
 Because
 it
 saw
 the
 salad
 dressing
!

Step step_two produced event SecondEvent
Running step step_three
Step step_three produced event StopEvent
Step three is happening
Final result Workflow complete.
<class 'NoneType'>
<class '__main__.FirstEvent'>
<class 'llama_index.core.workflow.events.StopEvent'>
<class '__main__.SecondEvent'>
streaming_workflow.html
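
The producer/consumer pattern behind the streaming run above can be sketched with a plain `asyncio.Queue`. This is an analogy under stated assumptions, not the library's implementation: `worker` plays the role of a step that writes progress events, `main` plays the role of the caller iterating over the stream, and the `None` sentinel is this sketch's own way of signalling the end of the stream.

```python
import asyncio


async def worker(stream: asyncio.Queue) -> str:
    """Plays the role of a step: emits progress items, then returns a final result."""
    for chunk in ["Step one is happening", "tok-1", "tok-2"]:
        await stream.put(chunk)
        await asyncio.sleep(0)  # yield control so the consumer can run
    await stream.put(None)  # sentinel (sketch-only): no more progress items
    return "Workflow complete."


async def main():
    stream = asyncio.Queue()
    task = asyncio.create_task(worker(stream))  # runs in the background
    seen = []
    # Consume progress items as they arrive, before the final result is ready.
    while (item := await stream.get()) is not None:
        seen.append(item)
    return seen, await task


seen, final = asyncio.run(main())
print(seen, final)
```

The point of the pattern is that progress is observed incrementally, while the final result is still obtained by awaiting the task, just as the notebook awaits `handler` after draining `stream_events()`.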


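## Concurrent execution in workflows

LlamaIndex workflows can run steps concurrently: a step can emit several events with ctx.send_event, and @step(num_workers=4) allows up to four instances of the receiving step to run at the same time.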

In [None]:
import random

# Emit multiple events at once
class StepTwoEvent(Event):
    query: str

class ParallelFlow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent:
        ctx.send_event(StepTwoEvent(query="Query 1"))
        ctx.send_event(StepTwoEvent(query="Query 2"))
        ctx.send_event(StepTwoEvent(query="Query 3"))

    @step(num_workers=4)
    async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StopEvent:
        print("Running slow query ", ev.query)
        await asyncio.sleep(random.randint(1, 5))

        return StopEvent(result=ev.query)

In [11]:
async def main():
    w = ParallelFlow(timeout=30, verbose=True)
    await  w.run()

    draw_all_possible_flows(ParallelFlow, filename="parallel_workflow.html")


if __name__ == "__main__":
    asyncio.run(main())

Running step start
Step start produced no event
Running step step_two
Running slow query  Query 1
Running step step_two
Running slow query  Query 2
Running step step_two
Running slow query  Query 3


Step step_two produced event StopEvent
<class 'NoneType'>
<class '__main__.StepTwoEvent'>
<class 'llama_index.core.workflow.events.StopEvent'>
parallel_workflow.html
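
The log above shows three step_two calls starting but only one StopEvent being produced, which suggests the run ends as soon as the first call finishes. A rough plain-asyncio analogy for this "first result wins" behavior is sketched below. It does not use llama_index; `slow_query` and `first_result` are hypothetical names, and the fixed delays replace the random sleep so the outcome is repeatable.

```python
import asyncio


async def slow_query(name: str, delay: float) -> str:
    """Stand-in for a slow step: sleep, then return the query name."""
    await asyncio.sleep(delay)
    return name


async def first_result():
    tasks = [
        asyncio.create_task(slow_query("Query 1", 0.3)),
        asyncio.create_task(slow_query("Query 2", 0.05)),
        asyncio.create_task(slow_query("Query 3", 0.2)),
    ]
    # Return as soon as any one task completes.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    # Cancel the tasks that are still running and wait for the cancellation to settle.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return done.pop().result(), len(pending)


winner, cancelled = asyncio.run(first_result())
print(winner, cancelled)
```

If every result is needed rather than just the fastest one, the calls have to be gathered before the workflow stops.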


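In the flow above, step_two runs each time it receives an event. That suits some cases, but in others you need to wait until all calls have finished before continuing. For that, use collect_events, which returns None until every expected event has arrived.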

In [14]:
class StepThreeEvent(Event):
    result: str

class ConcurrentFlow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent:
        ctx.send_event(StepTwoEvent(query="Query 1"))
        ctx.send_event(StepTwoEvent(query="Query 2"))
        ctx.send_event(StepTwoEvent(query="Query 3"))

    @step(num_workers=4)
    async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StepThreeEvent:
        print("Running query ", ev.query)
        await asyncio.sleep(random.randint(1, 5))
        return StepThreeEvent(result=ev.query)

    @step
    async def step_three(self, ctx: Context, ev: StepThreeEvent) -> StopEvent:
        # wait until we receive 3 events
        result = ctx.collect_events(ev, [StepThreeEvent] * 3)
        if result is None:
            return None

        # do something with all 3 results together
        print('result:',result)
        return StopEvent(result="Done")

async def main():
    w = ConcurrentFlow(timeout=30, verbose=True)
    await  w.run()

    draw_all_possible_flows(ConcurrentFlow, filename="concurrent_workflow.html")


if __name__ == "__main__":
    asyncio.run(main())

Running step start
Step start produced no event
Running step step_two
Running query  Query 1
Running step step_two
Running query  Query 2
Running step step_two
Running query  Query 3


Step step_two produced event StepThreeEvent
Step step_two produced event StepThreeEvent
Running step step_three
Step step_three produced no event
Running step step_three
Step step_three produced no event
Step step_two produced event StepThreeEvent
Running step step_three
result: [StepThreeEvent(result='Query 1'), StepThreeEvent(result='Query 2'), StepThreeEvent(result='Query 3')]
Step step_three produced event StopEvent
<class 'NoneType'>
<class '__main__.StepTwoEvent'>
<class 'llama_index.core.workflow.events.StopEvent'>
<class '__main__.StepThreeEvent'>
concurrent_workflow.html


The example above waits for three events of the same type; you can also wait for events of different types.

In [15]:
class StepAEvent(Event):
    query: str

class StepBEvent(Event):
    query: str

class StepCEvent(Event):
    query: str

class StepACompleteEvent(Event):
    result: str

class StepBCompleteEvent(Event):
    result: str

class StepCCompleteEvent(Event):
    result: str

class ConcurrentFlow2(Workflow):
    @step
    async def start(
        self, ctx: Context, ev: StartEvent
    ) -> StepAEvent | StepBEvent | StepCEvent:
        ctx.send_event(StepAEvent(query="Query 1"))
        ctx.send_event(StepBEvent(query="Query 2"))
        ctx.send_event(StepCEvent(query="Query 3"))

    @step
    async def step_a(self, ctx: Context, ev: StepAEvent) -> StepACompleteEvent:
        print("Doing something A-ish")
        return StepACompleteEvent(result=ev.query)

    @step
    async def step_b(self, ctx: Context, ev: StepBEvent) -> StepBCompleteEvent:
        print("Doing something B-ish")
        return StepBCompleteEvent(result=ev.query)

    @step
    async def step_c(self, ctx: Context, ev: StepCEvent) -> StepCCompleteEvent:
        print("Doing something C-ish")
        return StepCCompleteEvent(result=ev.query)

    @step
    async def step_three(
        self,
        ctx: Context,
        ev: StepACompleteEvent | StepBCompleteEvent | StepCCompleteEvent,
    ) -> StopEvent:
        print("Received event ", ev.result)

        # wait until we receive 3 events
        if (
            ctx.collect_events(
                ev,
                [StepCCompleteEvent, StepACompleteEvent, StepBCompleteEvent],
            )
            is None
        ):
            return None

        # do something with all 3 results together
        return StopEvent(result="Done")

async def main():
    w = ConcurrentFlow2(timeout=30, verbose=True)
    await  w.run()

    draw_all_possible_flows(ConcurrentFlow2, filename="concurrent2_workflow.html")


if __name__ == "__main__":
    asyncio.run(main())

Running step start
Step start produced no event
Running step step_a
Doing something A-ish
Step step_a produced event StepACompleteEvent
Running step step_b
Doing something B-ish
Step step_b produced event StepBCompleteEvent
Running step step_c
Doing something C-ish
Step step_c produced event StepCCompleteEvent
Running step step_three
Received event  Query 1
Step step_three produced no event
Running step step_three
Received event  Query 2
Step step_three produced no event
Running step step_three
Received event  Query 3
Step step_three produced event StopEvent
<class 'NoneType'>
<class '__main__.StepAEvent'>
<class '__main__.StepBEvent'>
<class '__main__.StepCEvent'>
<class '__main__.StepACompleteEvent'>
<class '__main__.StepBCompleteEvent'>
<class '__main__.StepCCompleteEvent'>
<class 'llama_index.core.workflow.events.StopEvent'>
concurrent2_workflow.html
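
The buffering behavior seen in the two collect_events runs above (None until everything has arrived, then the full list) can be modeled with a small toy class. This is a sketch of the observed behavior only, not the library's code: `Collector`, `ToyEvent`, and the `AEvent`/`BEvent`/`CEvent` classes are hypothetical, and returning the events in the order of `expected` is a choice made by this sketch.

```python
from collections import Counter


class ToyEvent:
    """Local stand-in for an event carrying a result string."""

    def __init__(self, result):
        self.result = result


class AEvent(ToyEvent):
    pass


class BEvent(ToyEvent):
    pass


class CEvent(ToyEvent):
    pass


class Collector:
    """Toy buffer: returns None until every expected event type has arrived."""

    def __init__(self):
        self.buffer = []

    def collect(self, ev, expected):
        self.buffer.append(ev)
        have = Counter(type(e) for e in self.buffer)  # events buffered per type
        need = Counter(expected)                      # events required per type
        if any(have[t] < n for t, n in need.items()):
            return None
        # All present: hand them back in the order of `expected` and clear them.
        out = []
        for t in expected:
            match = next(e for e in self.buffer if type(e) is t)
            self.buffer.remove(match)
            out.append(match)
        return out


collector = Collector()
order = [CEvent, AEvent, BEvent]
a, b, c = AEvent("Query 1"), BEvent("Query 2"), CEvent("Query 3")
first = collector.collect(a, order)
second = collector.collect(b, order)
third = collector.collect(c, order)
print(first, second, [e.result for e in third])
```

The same class also covers the same-type case: passing `[AEvent] * 3` as `expected` returns None for the first two AEvent instances and a list of all three on the third call.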


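## Subclassing a custom workflow

A custom workflow is created by subclassing the Workflow class, and a custom workflow can itself serve as a base class. In effect you implement a base workflow, then subclass it to build a more complex workflow.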

In [16]:
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
    Context,
)

class Step2Event(Event):
    query: str

class Step3Event(Event):
    query: str

class MainWorkflow(Workflow):
    @step
    async def start(self, ev: StartEvent) -> Step2Event:
        print("Starting up")
        return Step2Event(query=ev.query)

    @step
    async def step_two(self, ev: Step2Event) -> Step3Event:
        print("Sending an email")
        return Step3Event(query=ev.query)

    @step
    async def step_three(self, ev: Step3Event) -> StopEvent:
        print("Finishing up")
        return StopEvent(result=ev.query)

w = MainWorkflow(timeout=10, verbose=False)
result = await w.run(query="Initial query")
print(result)

Starting up
Sending an email
Finishing up
Initial query


In [18]:
class Step2BEvent(Event):
    query: str

class CustomWorkflow(MainWorkflow):
    @step
    async def step_two(self, ev: Step2Event) -> Step2BEvent:
        print("Sending an email")
        return Step2BEvent(query=ev.query)

    @step
    async def step_two_b(self, ev: Step2BEvent) -> Step3Event:
        print("Also sending a text message")
        return Step3Event(query=ev.query)
    
w = CustomWorkflow(timeout=10, verbose=False)
result = await w.run(query="Initial query")
print(result)

Starting up
Sending an email
Also sending a text message
Finishing up
Initial query


## Nested workflows

Embed one workflow inside an existing workflow.

In [19]:
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
    Context,
)
from llama_index.utils.workflow import draw_all_possible_flows


class Step2Event(Event):
    query: str


class MainWorkflow(Workflow):
    @step
    async def start(
        self, ctx: Context, ev: StartEvent, reflection_workflow: Workflow
    ) -> Step2Event:
        print("Need to run reflection")
        res = await reflection_workflow.run(query=ev.query)

        return Step2Event(query=res)

    @step
    async def step_two(self, ctx: Context, ev: Step2Event) -> StopEvent:
        print("Query is ", ev.query)
        # do something with the query here
        return StopEvent(result=ev.query)

class ReflectionFlow(Workflow):
    @step
    async def sub_start(self, ctx: Context, ev: StartEvent) -> StopEvent:
        print("Doing custom reflection")
        return StopEvent(result="Improved query")

w = MainWorkflow(timeout=10, verbose=False)
w.add_workflows(reflection_workflow=ReflectionFlow())
result = await w.run(query="Initial query")
print(result)

Need to run reflection
Doing custom reflection
Query is  Improved query
Improved query
