# 📢 Disclaimer

This notebook contains material copied verbatim from the [LlamaIndex documentation](https://www.llamaindex.ai/)  
and was created with the assistance of ChatGPT.  

It is intended for educational purposes only.  
All copyrights and credits belong to the LlamaIndex team and their respective authors.



# Concurrent execution of workflows

In addition to looping and branching, workflows can run steps concurrently. This is useful when you have multiple steps that are independent of each other and spend most of their time in operations they `await`: while one step is waiting, the others can make progress.
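To see why the `await` matters, here is a minimal plain-`asyncio` sketch with no LlamaIndex involved. The coroutine name `slow_step` and the delays are made up for illustration. Each coroutine logs when it starts and when it finishes; because each one hands control back to the event loop at its `await`, all three start before any of them ends.

```python
import asyncio

log: list[str] = []


async def slow_step(name: str, delay: float) -> str:
    """Hypothetical stand-in for a step that awaits a slow operation."""
    log.append(f"start {name}")
    await asyncio.sleep(delay)  # control returns to the event loop here
    log.append(f"end {name}")
    return name


async def main() -> list[str]:
    # gather schedules all three coroutines on the same event loop
    return await asyncio.gather(
        slow_step("A", 0.03),
        slow_step("B", 0.02),
        slow_step("C", 0.01),
    )


results = asyncio.run(main())
print(log)
print(results)
```

In a notebook, where an event loop is already running, you would write `results = await main()` instead of `asyncio.run(main())`.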


## Emitting multiple events

In our examples so far, we've only emitted one event from each step. But there are many cases where you would want to run steps concurrently. To do this, a step needs to emit multiple events, which you can do using `ctx.send_event`.

In [7]:
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
    Context,
)
import asyncio
import random
from llama_index.llms.openai import OpenAI
from llama_index.utils.workflow import draw_all_possible_flows

In [23]:
class StepTwoEvent(Event):
    query: str

class ParellelFlow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent:
        ctx.send_event(StepTwoEvent(query="Query 1"))
        ctx.send_event(StepTwoEvent(query="Query 2"))
        ctx.send_event(StepTwoEvent(query="Query 3"))

    @step(num_workers=4)
    async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StopEvent:
        print("Running slow query ", ev.query)
        await asyncio.sleep(random.randint(1, 5))
        return StopEvent(result=ev.query)

w = ParellelFlow(timeout=300, verbose=True)
result = await w.run()  # awaiting the run yields the StopEvent's result
print(result)


Running step start
Step start produced no event
Running step step_two
Running slow query  Query 1
Running step step_two
Running slow query  Query 2
Running step step_two
Running slow query  Query 3
Step step_two produced event StopEvent
Query 3


In this example, our `start` step emits three `StepTwoEvent`s. The `step_two` step is decorated with `num_workers=4`, which tells the workflow to run up to 4 instances of this step concurrently (4 is also the default).
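As a rough mental model of a worker limit, the sketch below caps concurrency with an `asyncio.Semaphore`. This is a plain-`asyncio` analogy, not a description of how LlamaIndex implements `num_workers`; `MAX_WORKERS`, `handle`, `active` and `peak` are hypothetical names.

```python
import asyncio

MAX_WORKERS = 4  # mirrors num_workers=4 in the example above
active = 0       # handlers currently running
peak = 0         # highest value `active` ever reached


async def handle(query: str, limit: asyncio.Semaphore) -> str:
    global active, peak
    async with limit:  # at most MAX_WORKERS handlers are inside this block
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)  # stand-in for a slow query
        active -= 1
    return query


async def main() -> list[str]:
    limit = asyncio.Semaphore(MAX_WORKERS)
    queries = [f"Query {i}" for i in range(1, 11)]
    return await asyncio.gather(*(handle(q, limit) for q in queries))


results = asyncio.run(main())
print(len(results), peak)
```

Ten queries are submitted, but `peak` never exceeds the limit: the remaining handlers wait until a slot frees up.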

In [24]:
draw_all_possible_flows(ParellelFlow, filename="Concurrent_execution.html")

Concurrent_execution.html




In [26]:
from IPython.display import HTML, display
display(HTML(open('Concurrent_execution.html', encoding='utf-8').read()))


---


✅ **If a step emits its events with `ctx.send_event()`, it doesn't need to `return` one.**  
✅ **If a step does NOT call `ctx.send_event()`**, **it must `return` the event it wants the workflow to process next** (or `None` when it has nothing to emit yet).

---

# 📚 Full Detailed Explanation:

| Situation | What to Do | Why |
|:---|:---|:---|
| You call `ctx.send_event(event)` | ❌ No need to `return`. | You already **sent** the event manually to the workflow. |
| You don't call `ctx.send_event()` | ✅ You must `return event`. | The workflow system **expects the step to return** the next event automatically. |

---

# 🎯 Two Styles of Writing Steps:

### 1. Sending events manually (`ctx.send_event` style)

```python
@step
async def my_step(self, ctx: Context, ev: SomeEvent) -> NextEvent:
    # the return annotation declares the emitted event type, as in `start` above
    ctx.send_event(NextEvent(output="Hello"))
```
✅ Here, you **send** the event yourself, **no return** needed.

---

### 2. Returning an event (return-style)

```python
@step
async def my_step(self, ctx: Context, ev: SomeEvent) -> NextEvent:
    return NextEvent(output="Hello")
```
✅ Here, **you don't call `send_event`** —  
you **just `return`** the event object.  
The **workflow engine automatically** takes your returned event and processes it.

---

# 📈 Why have both options?

| Option | When to Use |
|:---|:---|
| `ctx.send_event` | When you want to **send multiple events** (fan-out) or do more complex things. |
| `return event` | When you just need to **send one simple event** and move on. |

✅ `return` is **simpler** if you're sending **one event**.  
✅ `send_event` is **more powerful** if you're **sending many events**, **doing concurrent steps**, or **custom orchestration**.
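The two styles can be compared in a toy dispatcher. This is a simplified model written for illustration, not LlamaIndex's engine: `ToyEvent`, `ToyContext` and `run_step` are hypothetical names. It assumes that a returned event ends up on the same queue as one sent with `send_event`, which is the behaviour the table above describes.

```python
from collections import deque
from dataclasses import dataclass


@dataclass
class ToyEvent:
    payload: str


class ToyContext:
    """Hypothetical stand-in for a workflow context: just an event queue."""

    def __init__(self) -> None:
        self.queue = deque()

    def send_event(self, ev: ToyEvent) -> None:
        self.queue.append(ev)


def run_step(step, ctx: ToyContext, ev: ToyEvent) -> None:
    """Run one step; a returned event is queued exactly like a sent one."""
    returned = step(ctx, ev)
    if returned is not None:
        ctx.send_event(returned)


def return_style(ctx: ToyContext, ev: ToyEvent) -> ToyEvent:
    # one event in, one event out
    return ToyEvent(payload=f"{ev.payload} -> returned")


def send_style(ctx: ToyContext, ev: ToyEvent) -> None:
    # fan-out: one incoming event produces two outgoing events
    ctx.send_event(ToyEvent(payload=f"{ev.payload} -> sent 1"))
    ctx.send_event(ToyEvent(payload=f"{ev.payload} -> sent 2"))


ctx = ToyContext()
run_step(return_style, ctx, ToyEvent("start"))
run_step(send_style, ctx, ToyEvent("start"))
payloads = [ev.payload for ev in ctx.queue]
print(payloads)
```

The return style can only ever put one event on the queue per call, which is why fan-out needs `send_event`.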

---

# 🧠 Real-world analogy:

| Real-world Action | Workflow Action |
|:---|:---|
| Handing a document directly to a person | `return event` |
| Mailing multiple packages through a post office | `ctx.send_event(event)`, called multiple times |

---

# 🔥 TL;DR:
| Step Behavior | Explanation |
|:---|:---|
| Call `ctx.send_event()` | ➔ No need to `return` anything |
| Do not call `ctx.send_event()` | ➔ Must `return` an event |

---

✅ **In the `start` step of the example above**:  
because it **manually sends multiple events**,  
**no return is needed**.


## Collecting events

If you execute the previous example, you'll note that the workflow stops after whichever query is first to complete. Sometimes that's useful, but other times you'll want to wait for all your slow operations to complete before moving on to another step. You can do this using `ctx.collect_events`.
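The difference between "stop at the first result" and "wait for everything" can be sketched in plain `asyncio`. This is an analogy only: the function names and fixed delays are hypothetical, and the explicit cancellation is part of the sketch rather than a statement about how LlamaIndex shuts down in-flight steps.

```python
import asyncio


async def query(name: str, delay: float) -> str:
    await asyncio.sleep(delay)  # stand-in for a slow operation
    return name


async def first_only() -> str:
    tasks = [
        asyncio.create_task(query("Query 1", 0.05)),
        asyncio.create_task(query("Query 2", 0.01)),
        asyncio.create_task(query("Query 3", 0.09)),
    ]
    # return as soon as any one task has finished
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    # let the cancellations settle before returning
    await asyncio.gather(*pending, return_exceptions=True)
    return done.pop().result()


async def wait_for_all() -> list[str]:
    # gather resolves only once every coroutine has finished
    return await asyncio.gather(
        query("Query 1", 0.05),
        query("Query 2", 0.01),
        query("Query 3", 0.09),
    )


first = asyncio.run(first_only())
everything = asyncio.run(wait_for_all())
print(first)
print(everything)
```

With these delays the fastest query wins in `first_only`, while `wait_for_all` returns all three results.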

In [30]:
class StepTwoEvent(Event):
    query: str

class StepThreeEvent(Event):
    result: str

class ConcurrentFlow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent:
        ctx.send_event(StepTwoEvent(query="Query 1"))
        ctx.send_event(StepTwoEvent(query="Query 2"))
        ctx.send_event(StepTwoEvent(query="Query 3"))

    @step(num_workers=4)
    async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StepThreeEvent:
        print("Running query ", ev.query)
        await asyncio.sleep(random.randint(1, 5))
        return StepThreeEvent(result=ev.query)

    @step
    async def step_three(self, ctx: Context, ev: StepThreeEvent) -> StopEvent:
        # wait until we receive 3 events
        result = ctx.collect_events(ev, [StepThreeEvent] * 3)
        if result is None:
            return None

        # do something with all 3 results together
        print(result)
        return StopEvent(result="Done")

w = ConcurrentFlow(timeout=300, verbose=False)
result = await w.run()  # awaiting the run yields the StopEvent's result
print(result)

Running query  Query 1
Running query  Query 2
Running query  Query 3
[StepThreeEvent(result='Query 1'), StepThreeEvent(result='Query 2'), StepThreeEvent(result='Query 3')]
Done


The `collect_events` method lives on the `Context` and takes the event that triggered the step and an array of event types to wait for. In this case, we are awaiting 3 events of the same `StepThreeEvent` type.

The `step_three` step is fired every time a `StepThreeEvent` is received, but `collect_events` will return `None` until all 3 events have been received. At that point, the step will continue and you can do something with all 3 results together.

The result returned from `collect_events` is an array of the events that were collected, in the order that they were received.
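The "return `None` until complete" behaviour can be modelled with a small buffer. This is a simplified sketch, not the library's implementation: `ToyCollector` and its `collect` method are hypothetical, and the `StepThreeEvent` here is a plain dataclass standing in for the workflow event of the same name.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StepThreeEvent:
    result: str


class ToyCollector:
    """Hypothetical buffer that mimics the 'None until complete' behaviour."""

    def __init__(self) -> None:
        self._buffer: list = []

    def collect(self, ev: StepThreeEvent, count: int) -> Optional[list]:
        self._buffer.append(ev)
        if len(self._buffer) < count:
            return None  # still waiting for more events
        # enough events have arrived: hand them back in arrival order
        collected = self._buffer[:count]
        self._buffer = self._buffer[count:]
        return collected


collector = ToyCollector()
outcomes = [
    collector.collect(StepThreeEvent(result=f"Query {i}"), 3) for i in (2, 1, 3)
]
print(outcomes)
```

The first two calls return `None`; the third returns all three events in the order they arrived.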

In [31]:
draw_all_possible_flows(ConcurrentFlow, filename="Collecting_events.html")

Collecting_events.html




In [34]:
from IPython.display import HTML, display
display(HTML(open('Collecting_events.html', encoding='utf-8').read()))

## Multiple event types

Of course, you do not need to wait for the same type of event. You can wait for any combination of event types you like, as in this example:

In [33]:
class StepAEvent(Event):
    query: str

class StepBEvent(Event):
    query: str

class StepCEvent(Event):
    query: str

class StepACompleteEvent(Event):
    result: str

class StepBCompleteEvent(Event):
    result: str

class StepCCompleteEvent(Event):
    result: str

class ConcurrentFlow(Workflow):
    @step
    async def start(
        self, ctx: Context, ev: StartEvent
    ) -> StepAEvent | StepBEvent | StepCEvent:
        ctx.send_event(StepAEvent(query="Query 1"))
        ctx.send_event(StepBEvent(query="Query 2"))
        ctx.send_event(StepCEvent(query="Query 3"))

    @step
    async def step_a(self, ctx: Context, ev: StepAEvent) -> StepACompleteEvent:
        print("Doing something A-ish")
        return StepACompleteEvent(result=ev.query)

    @step
    async def step_b(self, ctx: Context, ev: StepBEvent) -> StepBCompleteEvent:
        print("Doing something B-ish")
        return StepBCompleteEvent(result=ev.query)

    @step
    async def step_c(self, ctx: Context, ev: StepCEvent) -> StepCCompleteEvent:
        print("Doing something C-ish")
        return StepCCompleteEvent(result=ev.query)

    @step
    async def step_three(
        self,
        ctx: Context,
        ev: StepACompleteEvent | StepBCompleteEvent | StepCCompleteEvent,
    ) -> StopEvent:
        print("Received event ", ev.result)

        # wait until we receive 3 events
        if (
            ctx.collect_events(
                ev,
                [StepCCompleteEvent, StepACompleteEvent, StepBCompleteEvent],
            )
            is None
        ):
            return None

        # do something with all 3 results together
        return StopEvent(result="Done")

w = ConcurrentFlow(timeout=300, verbose=False)
result = await w.run()  # awaiting the run yields the StopEvent's result
print(result)

Doing something A-ish
Doing something B-ish
Doing something C-ish
Received event  Query 1
Received event  Query 2
Received event  Query 3
Done


In [35]:
draw_all_possible_flows(ConcurrentFlow, filename="Multiple_event_types.html")

Multiple_event_types.html




In [36]:
from IPython.display import HTML, display
display(HTML(open('Multiple_event_types.html', encoding='utf-8').read()))

There are several changes we've made to handle multiple event types:

- `start` is now declared as emitting 3 different event types
- `step_three` is now declared as accepting 3 different event types
- `collect_events` now takes a list of three different event types to wait for

Note that the order of the event types in the array passed to `collect_events` is important. The events will be returned in the order they are passed to `collect_events`, regardless of when they were received.
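The ordering rule can be illustrated with a toy model. This is a sketch under assumptions, not the library's code: the `collect` function, the `buffer` dict and the three dataclasses are hypothetical, and the model assumes every type in `expected` is distinct.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class AComplete:
    result: str


@dataclass
class BComplete:
    result: str


@dataclass
class CComplete:
    result: str


def collect(buffer: dict, ev: object, expected: list) -> Optional[list]:
    """Toy model: assumes every type in `expected` is distinct."""
    buffer.setdefault(type(ev), []).append(ev)
    if any(not buffer.get(ev_type) for ev_type in expected):
        return None  # at least one expected type has not arrived yet
    # build the result in the order of `expected`, not in arrival order
    return [buffer[ev_type].pop(0) for ev_type in expected]


buffer: dict = {}
expected = [CComplete, AComplete, BComplete]
arrivals = [AComplete("Query 1"), BComplete("Query 2"), CComplete("Query 3")]
outcomes = [collect(buffer, ev, expected) for ev in arrivals]
print(outcomes[-1])
```

The events arrive as A, B, C, but the final result lists them as C, A, B, matching the order of `expected`.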