# Lesson 2: Building a Workflow

**Lesson objective**: Use event-based Workflows to define the flow of information around a system

In this lab, you'll learn about Workflow concepts.

In [13]:
# Click the Directory icon on the left and upload the requirements.txt
!pip install -r requirements.txt


<div style="background-color:#fff1d7; padding:15px;"> <b> Note</b>: Make sure to run the notebook cell by cell. Please try to avoid running all cells at once.</div>

In [None]:
# Add your utilities or helper functions to this file.

import os
from dotenv import load_dotenv, find_dotenv

# these expect to find a .env file at the directory above the lesson.
# the format for that file is (without the comment)
#API_KEYNAME=AStringThatIsTheLongAPIKeyFromSomeService
def load_env():
    _ = load_dotenv(find_dotenv())

def get_openai_api_key():
    load_env()
    openai_api_key = os.getenv("OPENAI_API_KEY")
    return openai_api_key

def get_llama_cloud_api_key():
    load_env()
    llama_cloud_api_key = os.getenv("LLAMA_CLOUD_API_KEY")
    return llama_cloud_api_key

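def extract_html_content(filename):
    """Read an HTML file and wrap it in a fixed-height div for notebook display."""
    try:
        with open(filename, 'r') as file:
            html_content = file.read()
            html_content = f""" <div style="width: 100%; height: 800px; overflow: hidden;"> {html_content} </div>"""
            return html_content
    except Exception as e:
        # chain the original exception so the real cause stays in the traceback
        raise Exception(f"Error reading file: {str(e)}") from e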


In [None]:
from IPython.display import display, HTML
from helper import extract_html_content
import random
from helper import get_openai_api_key


In [None]:
api_key = get_openai_api_key()


<div style="background-color:#fff6ff; padding:13px; border-width:3px; border-color:#efe6ef; border-style:solid; border-radius:6px">
<p> 💻 &nbsp; <b>To access <code>requirements.txt</code> and <code>helper.py</code> files:</b> 1) click on the <em>"File"</em> option on the top menu of the notebook and then 2) click on <em>"Open"</em>.</p>

<p> ⬇ &nbsp; <b>Download Notebooks:</b> 1) click on the <em>"File"</em> option on the top menu of the notebook and then 2) click on <em>"Download as"</em> and select <em>"Notebook (.ipynb)"</em>.</p>

<p> 📒 &nbsp; For more help, please see the <em>"Appendix – Tips and Help"</em> Lesson.</p>

</div>

<p style="background-color:#f7fff8; padding:15px; border-width:3px; border-color:#e0f0e0; border-style:solid; border-radius:6px"> 🚨
&nbsp; <b>Different Run Results:</b> The output generated by AI chat models can vary with each execution due to their dynamic, probabilistic nature. Don't be surprised if your results differ from those shown in the video.</p>

## Creating a Workflow

Under the hood, Workflows are regular Python classes. They are defined as a series of steps, each of which receives certain classes of events and emits certain classes of events.

Here's the most basic form of a workflow, with a single step:

In [None]:
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Context
)

In [None]:
class MyWorkflow(Workflow):
    # declare a function as a step
    @step
    async def my_step(self, ev: StartEvent) -> StopEvent:
        # do something here
        return StopEvent(result="Hello, world!")

This new `MyWorkflow` class:
* Uses the `@step` decorator to declare a function to be a step
* Has a single step called `my_step` which accepts a `StartEvent`. `StartEvent` is a special event which is always generated when a workflow first runs.
* `my_step` returns a `StopEvent`, which is another special event. When a `StopEvent` is emitted the workflow returns it and stops running.

*Note*: The **async** keyword defines asynchronous functions, which can be paused and resumed, allowing other tasks to run in the meantime.
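
To make the pause-and-resume behaviour concrete, here is a minimal sketch that uses only the standard library `asyncio` module. The `worker` and `main` functions are hypothetical names made up for this illustration and are not part of LlamaIndex.

```python
import asyncio


async def worker(name: str, delay: float, log: list[str]) -> str:
    log.append(f"{name} started")
    # `await` hands control back to the event loop while this task sleeps
    await asyncio.sleep(delay)
    log.append(f"{name} finished")
    return name


async def main() -> list[str]:
    log: list[str] = []
    # Both coroutines run concurrently; "slow" is paused while "fast" completes
    await asyncio.gather(worker("slow", 0.2, log), worker("fast", 0.05, log))
    return log


# Plain-script entry point; inside a notebook use `log = await main()` instead
log = asyncio.run(main())
print(log)
```

The log shows "slow" starting first but finishing last: while it was paused, the event loop ran "fast" to completion.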

You instantiate it and run your workflow like this:

In [None]:
# instantiate the workflow
basic_workflow = MyWorkflow(timeout=10, verbose=False)
# run the workflow
result = await basic_workflow.run()
print(result)

*Note*:
- The **await** keyword is used with async functions to pause execution until the specific asynchronous task is complete.
- The **timeout** argument represents the number of seconds after which the workflow execution will be halted.
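
As a rough illustration of what a timeout means for awaited work, the sketch below uses the standard library's `asyncio.wait_for`. It is not how LlamaIndex implements its `timeout` argument (that is not shown in this lesson); `slow_step` and `run_with_timeout` are hypothetical names used only here.

```python
import asyncio


async def slow_step() -> str:
    # Stand-in for a step that takes a while to finish
    await asyncio.sleep(0.2)
    return "finished"


async def run_with_timeout(timeout: float) -> str:
    try:
        # Give up on the awaited task once `timeout` seconds have passed
        return await asyncio.wait_for(slow_step(), timeout=timeout)
    except asyncio.TimeoutError:
        return "timed out"


async def main() -> tuple[str, str]:
    return await run_with_timeout(0.05), await run_with_timeout(1.0)


# Plain-script entry point; inside a notebook use `await main()` instead
short_result, long_result = asyncio.run(main())
print(short_result, long_result)
```

With a budget shorter than the step, the caller gets control back early; with a generous budget, the step completes normally.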


### Side Note

Workflows are async by default, so you use `await` to get the result of the `run` command. This works in a notebook environment; in a plain Python script you need to import `asyncio` and wrap your code in an async function, like this:

```
async def main():
    w = MyWorkflow(timeout=10, verbose=False)
    result = await w.run()
    print(result)


if __name__ == "__main__":
    import asyncio
    asyncio.run(main())
```

Since you're in a notebook right now, don't run the code above: the notebook already has an event loop running, so `asyncio.run()` would raise an error.
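
If you are curious what goes wrong, the following sketch reproduces the situation in a plain script by calling `asyncio.run()` from code that is already running inside an event loop, which is the position a notebook cell is in. The function names `step` and `notebook_like_cell` are invented for this illustration.

```python
import asyncio


async def step() -> str:
    return "Hello, world!"


async def notebook_like_cell() -> str:
    # This coroutine runs inside an event loop, much like notebook cells do
    coro = step()
    try:
        asyncio.run(coro)  # starting a second loop from here is not allowed
    except RuntimeError as exc:
        coro.close()  # tidy up so Python does not warn about an un-awaited coroutine
        return str(exc)
    return "no error"


message = asyncio.run(notebook_like_cell())
print(message)
```

The printed message explains that `asyncio.run()` cannot be called from a running event loop, which is why the notebook cells use a bare `await` instead.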

## Visualizing a workflow

A great feature of workflows is the built-in visualizer, which you will import now:

In [None]:
from llama_index.utils.workflow import draw_all_possible_flows

In [None]:
draw_all_possible_flows(
    basic_workflow,
    filename="workflows/basic_workflow.html"
)

You are provided with a helper function which displays the HTML page generated inside the notebook. On your own computer, you could just open the file in your browser.

In [None]:
html_content = extract_html_content("workflows/basic_workflow.html")
display(HTML(html_content), metadata=dict(isolated=True))

<div style="background-color:#fff1d7; padding:15px;"> <b> Note </b>: The visualized workflow might look slightly different from that of the video. If it doesn't display in the notebook, you can: 1) click on the <em>"File"</em> option on the top menu of the notebook and then 2) click on <em>"Open"</em>. The HTML file is in the folder "workflows".</div>

Of course, a flow with a single step is not very useful! Let's define a multi-step workflow.

## Creating Custom Events

Multiple steps are created by defining custom events that can be emitted by steps and trigger other steps. Let's define a simple 3-step workflow by defining two custom events, `FirstEvent` and `SecondEvent`. These classes can have any names and properties, but must inherit from `Event`:

In [None]:
from llama_index.core.workflow import Event

class FirstEvent(Event):
    first_output: str

class SecondEvent(Event):
    second_output: str

### Defining the workflow

Now you define the workflow itself. You do this by defining the input and output types on each step.

* `step_one` takes a `StartEvent` and returns a `FirstEvent`
* `step_two` takes a `FirstEvent` and returns a `SecondEvent`
* `step_three` takes a `SecondEvent` and returns a `StopEvent`

In [None]:
class MyWorkflow(Workflow):
    @step
    async def step_one(self, ev: StartEvent) -> FirstEvent:
        print(ev.first_input)
        return FirstEvent(first_output="First step complete.")

    @step
    async def step_two(self, ev: FirstEvent) -> SecondEvent:
        print(ev.first_output)
        return SecondEvent(second_output="Second step complete.")

    @step
    async def step_three(self, ev: SecondEvent) -> StopEvent:
        print(ev.second_output)
        return StopEvent(result="Workflow complete.")

*Note:* Properties of `StartEvent` and `StopEvent`:
- For `StartEvent`, you define its properties and pass in their values when you run the workflow as shown in the next cell.
- For `StopEvent`, by default, it only has one property `result`. You can always create a class that inherits `StopEvent` so you can customize what it returns.

You run this just like you ran the other workflows:

In [None]:
workflow = MyWorkflow(timeout=10, verbose=False)
result = await workflow.run(first_input="Start the workflow.")
print(result)

And you can visualize it just like you did before:

In [None]:
WORKFLOW_FILE = "workflows/custom_events.html"
draw_all_possible_flows(workflow, filename=WORKFLOW_FILE)

In [None]:
html_content = extract_html_content(WORKFLOW_FILE)
display(HTML(html_content), metadata=dict(isolated=True))
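
To build intuition for how type annotations alone can decide which step runs next, here is a toy dispatcher written with the standard library only. It is a conceptual sketch under simplifying assumptions, not LlamaIndex's implementation: the `Start`, `First`, and `Stop` dataclasses and the `ROUTES` table are made up for this example.

```python
import asyncio
from dataclasses import dataclass
from typing import get_type_hints


@dataclass
class Start:
    first_input: str


@dataclass
class First:
    first_output: str


@dataclass
class Stop:
    result: str


async def step_one(ev: Start) -> First:
    return First(first_output=f"got {ev.first_input!r}")


async def step_two(ev: First) -> Stop:
    return Stop(result=ev.first_output)


# Map each accepted event type to the step whose `ev` annotation declares it
ROUTES = {get_type_hints(fn)["ev"]: fn for fn in (step_one, step_two)}


async def run(event):
    trace = []
    # Keep routing emitted events until a Stop event appears
    while not isinstance(event, Stop):
        handler = ROUTES[type(event)]
        trace.append(handler.__name__)
        event = await handler(event)
    return event.result, trace


# Plain-script entry point; inside a notebook use `await run(...)` instead
result, trace = asyncio.run(run(Start(first_input="Start the workflow.")))
print(result, trace)
```

Nothing in `run` names the steps explicitly; the order falls out of which event type each step accepts and returns.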

## Creating Loops

However, there's not much point to a workflow if it just runs straight through! A key feature of Workflows is that they support branching and looping logic more simply and flexibly than graph-based approaches. To enable looping, let's create a new `LoopEvent`:

In [None]:
class LoopEvent(Event):
    loop_output: str

Now you'll edit your `step_one` to make a random decision about whether to execute serially or loop back:

In [None]:
class MyWorkflow(Workflow):
    @step
    async def step_one(self, ev: StartEvent | LoopEvent) -> FirstEvent | LoopEvent:
        if random.randint(0, 1) == 0:
            print("Bad thing happened")
            return LoopEvent(loop_output="Back to step one.")
        else:
            print("Good thing happened")
            return FirstEvent(first_output="First step complete.")

    @step
    async def step_two(self, ev: FirstEvent) -> SecondEvent:
        print(ev.first_output)
        return SecondEvent(second_output="Second step complete.")

    @step
    async def step_three(self, ev: SecondEvent) -> StopEvent:
        print(ev.second_output)
        return StopEvent(result="Workflow complete.")

Note the new type annotations on `step_one`: the step now accepts either a `StartEvent` or a `LoopEvent` to trigger the step, and it also emits either a `FirstEvent` or a `LoopEvent`.

You run it as usual. You might need to run it a couple of times to see the loop happen.

In [None]:
loop_workflow = MyWorkflow(timeout=10, verbose=False)
result = await loop_workflow.run(first_input="Start the workflow.")
print(result)

Your new, looping workflow visualizes like this:

In [None]:
WORKFLOW_FILE = "workflows/loop_events.html"
draw_all_possible_flows(loop_workflow, filename=WORKFLOW_FILE)
html_content = extract_html_content(WORKFLOW_FILE)
display(HTML(html_content), metadata=dict(isolated=True))
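
Because the loop depends on a coin flip, the number of passes through `step_one` varies between runs. The sketch below simulates just that control flow with the standard library, using a seeded random generator so the run is repeatable. The string labels stand in for the real event classes, and `run_until_first_event` is a hypothetical helper.

```python
import asyncio
import random


async def step_one(rng: random.Random) -> str:
    # Same coin flip as the lesson's step_one, with strings standing in for events
    return "LoopEvent" if rng.randint(0, 1) == 0 else "FirstEvent"


async def run_until_first_event(seed: int) -> list[str]:
    rng = random.Random(seed)
    emitted = []
    while True:
        event = await step_one(rng)
        emitted.append(event)
        if event == "FirstEvent":
            # The loop ends only when the step emits something other than LoopEvent
            return emitted


# Plain-script entry point; inside a notebook use `await run_until_first_event(0)`
emitted = asyncio.run(run_until_first_event(seed=0))
print(emitted)
```

Every entry before the last is a `LoopEvent`; try a few different seeds to see how the number of loops changes.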

## Branching

The same constructs that allow you to loop allow you to create branches. Here's a workflow that executes two different branches depending on an early decision:

In [None]:
class BranchA1Event(Event):
    payload: str

class BranchA2Event(Event):
    payload: str

class BranchB1Event(Event):
    payload: str

class BranchB2Event(Event):
    payload: str

In [None]:
class BranchWorkflow(Workflow):
    @step
    async def start(self, ev: StartEvent) -> BranchA1Event | BranchB1Event:
        if random.randint(0, 1) == 0:
            print("Go to branch A")
            return BranchA1Event(payload="Branch A")
        else:
            print("Go to branch B")
            return BranchB1Event(payload="Branch B")

    @step
    async def step_a1(self, ev: BranchA1Event) -> BranchA2Event:
        print(ev.payload)
        return BranchA2Event(payload=ev.payload)

    @step
    async def step_b1(self, ev: BranchB1Event) -> BranchB2Event:
        print(ev.payload)
        return BranchB2Event(payload=ev.payload)

    @step
    async def step_a2(self, ev: BranchA2Event) -> StopEvent:
        print(ev.payload)
        return StopEvent(result="Branch A complete.")

    @step
    async def step_b2(self, ev: BranchB2Event) -> StopEvent:
        print(ev.payload)
        return StopEvent(result="Branch B complete.")

You don't actually need to instantiate the workflow to visualize it; you can just pass the workflow class directly to the visualizer:

In [None]:
WORKFLOW_FILE = "workflows/branching.html"
draw_all_possible_flows(BranchWorkflow, filename=WORKFLOW_FILE)
html_content = extract_html_content(WORKFLOW_FILE)

In [None]:
display(HTML(html_content), metadata=dict(isolated=True))

## Concurrent Execution

The final form of flow control you can implement in workflows is concurrent execution. This allows you to efficiently run long-running tasks in parallel, and gather them together when they are needed. Let's see how this is done.

You'll be using a new concept, the `Context` object. This is a form of shared memory available to every step in a workflow: to access it, declare it as an argument to your step and it will be automatically populated.

In this example, you'll use `Context.send_event` rather than returning an event. This allows you to emit multiple events in parallel rather than returning just one as you did previously.

In [None]:
import asyncio

class StepTwoEvent(Event):
    query: str

class ParallelFlow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent:
        ctx.send_event(StepTwoEvent(query="Query 1"))
        ctx.send_event(StepTwoEvent(query="Query 2"))
        ctx.send_event(StepTwoEvent(query="Query 3"))

    @step(num_workers=4)
    async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StopEvent:
        print("Running slow query ", ev.query)
        await asyncio.sleep(random.randint(1, 5))

        return StopEvent(result=ev.query)

In [None]:
parallel_workflow = ParallelFlow(timeout=10, verbose=False)
result = await parallel_workflow.run()
print(result)
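
Only one query name is printed as the result, because the first `StopEvent` ends the workflow. The same "first finisher wins" behaviour can be sketched with plain `asyncio`. This is an analogy rather than LlamaIndex's internals: the fixed delays are chosen so the outcome is predictable, and cancelling the leftover tasks is a choice made by this sketch.

```python
import asyncio


async def slow_query(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return name


async def main() -> tuple[str, int]:
    tasks = [
        asyncio.create_task(slow_query("Query 1", 0.30)),
        asyncio.create_task(slow_query("Query 2", 0.05)),
        asyncio.create_task(slow_query("Query 3", 0.20)),
    ]
    # Return as soon as any one task has finished
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    # Wait for the cancellations to settle before leaving the event loop
    await asyncio.gather(*pending, return_exceptions=True)
    return done.pop().result(), len(pending)


# Plain-script entry point; inside a notebook use `await main()` instead
winner, abandoned = asyncio.run(main())
print(winner, abandoned)
```

The fastest query wins and the other two results are never seen.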

### Collecting events

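The workflow above stops as soon as the first `StopEvent` is emitted, so you only get the result of whichever query finishes first. But what if you want the output of all 3 events? Another method, `Context.collect_events`, exists for that purpose: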

In [None]:
class StepThreeEvent(Event):
    result: str

class ConcurrentFlow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent:
        ctx.send_event(StepTwoEvent(query="Query 1"))
        ctx.send_event(StepTwoEvent(query="Query 2"))
        ctx.send_event(StepTwoEvent(query="Query 3"))

    @step(num_workers=4)
    async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StepThreeEvent:
        print("Running query ", ev.query)
        await asyncio.sleep(random.randint(1, 5))
        return StepThreeEvent(result=ev.query)

    @step
    async def step_three(self, ctx: Context, ev: StepThreeEvent) -> StopEvent:
        # wait until we receive 3 events
        result = ctx.collect_events(ev, [StepThreeEvent] * 3)
        if result is None:
            print("Not all events received yet.")
            return None

        # do something with all 3 results together
        print(result)
        return StopEvent(result="Done")

In [None]:
w = ConcurrentFlow(timeout=10, verbose=False)
result = await w.run(message="Start the workflow.")
print(result)

What `collect_events` does is store the events in the context until it has collected the number and type of events specified in its second argument. In this case, you've told it to wait for 3 events.

If an event fires and `collect_events` hasn't yet seen the right number of events, it returns `None`, so you tell `step_three` to do nothing in that case. When `collect_events` receives the right number of events it returns them as a list, which you can see being printed in the final output.

*Note:* This flow control lets you perform map-reduce style tasks. To implement a map-reduce pattern, you would split your task up into as many steps as necessary, and use `Context` to store that number with `await ctx.set("num_events", some_number)`. Then in `step_three` you would wait for the number stored in the context using `await ctx.get("num_events")`. That way you don't need to know in advance exactly how many concurrent steps you're taking. You'll do exactly this in a later lesson.
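
The map-reduce idea can be sketched without LlamaIndex at all. In the illustration below a plain dictionary plays the role of the shared context, and the hypothetical `collect` helper mimics the "return `None` until everything has arrived" behaviour described above. All names here are assumptions made for the example.

```python
import asyncio


async def map_step(chunk: str) -> int:
    # Stand-in for a slow per-chunk task; here it just measures length
    await asyncio.sleep(0.01)
    return len(chunk)


def collect(state: dict, result: int):
    """Buffer results; return them only once the expected number has arrived."""
    state["buffer"].append(result)
    if len(state["buffer"]) < state["num_events"]:
        return None
    return state["buffer"]


async def main(chunks: list[str]):
    # The number of events to wait for is decided at run time, not hard-coded
    state = {"num_events": len(chunks), "buffer": []}
    outcomes = []
    for finished in asyncio.as_completed([map_step(c) for c in chunks]):
        outcomes.append(collect(state, await finished))
    return outcomes


# Plain-script entry point; inside a notebook use `await main([...])` instead
outcomes = asyncio.run(main(["alpha", "be", "gam"]))
total = sum(outcomes[-1])  # the "reduce" step
print(outcomes, total)
```

Every call to `collect` except the last returns `None`; the last one hands back all the buffered results so they can be reduced together.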

### Collecting different event types

You don't just have to wait for multiple events of the same kind. In this example, you'll emit 3 totally different events and collect them at the end.

In [None]:
class StepAEvent(Event):
    query: str

class StepACompleteEvent(Event):
    result: str

class StepBEvent(Event):
    query: str

class StepBCompleteEvent(Event):
    result: str

class StepCEvent(Event):
    query: str

class StepCCompleteEvent(Event):
    result: str

In [None]:
class ConcurrentFlow(Workflow):
    @step
    async def start(
        self, ctx: Context, ev: StartEvent
    ) -> StepAEvent | StepBEvent | StepCEvent:
        ctx.send_event(StepAEvent(query="Query 1"))
        ctx.send_event(StepBEvent(query="Query 2"))
        ctx.send_event(StepCEvent(query="Query 3"))

    @step
    async def step_a(self, ctx: Context, ev: StepAEvent) -> StepACompleteEvent:
        print("Doing something A-ish")
        return StepACompleteEvent(result=ev.query)

    @step
    async def step_b(self, ctx: Context, ev: StepBEvent) -> StepBCompleteEvent:
        print("Doing something B-ish")
        return StepBCompleteEvent(result=ev.query)

    @step
    async def step_c(self, ctx: Context, ev: StepCEvent) -> StepCCompleteEvent:
        print("Doing something C-ish")
        return StepCCompleteEvent(result=ev.query)

    @step
    async def step_three(
        self,
        ctx: Context,
        ev: StepACompleteEvent | StepBCompleteEvent | StepCCompleteEvent,
    ) -> StopEvent:
        print("Received event ", ev.result)

        # wait until we receive 3 events
        events = ctx.collect_events(
            ev,
            [StepCCompleteEvent, StepACompleteEvent, StepBCompleteEvent],
        )
        if events is None:
            return None

        # do something with all 3 results together
        print("All events received: ", events)
        return StopEvent(result="Done")

When you run it, it will do all three things and wait for them in `step_three`.

In [None]:
w = ConcurrentFlow(timeout=10, verbose=False)
result = await w.run(message="Start the workflow.")
print(result)

This new flow has quite a pretty visualization:

In [None]:
WORKFLOW_FILE = "workflows/concurrent_different_events.html"
draw_all_possible_flows(w, filename=WORKFLOW_FILE)
html_content = extract_html_content(WORKFLOW_FILE)

In [None]:
display(HTML(html_content), metadata=dict(isolated=True))
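
Collecting one event of each type can be pictured as a small buffer that checks, on every arrival, whether each requested type is now present. The toy `TypedCollector` below is a standard-library sketch of that idea, not LlamaIndex code. Returning the events in the order the types were requested is a design choice of this sketch, so check the library documentation before relying on any particular ordering.

```python
from dataclasses import dataclass


@dataclass
class ADone:
    result: str


@dataclass
class BDone:
    result: str


@dataclass
class CDone:
    result: str


class TypedCollector:
    def __init__(self, expected: list[type]):
        self.expected = expected
        self.buffer: list = []

    def collect(self, event):
        """Return one event per expected type, or None if any is still missing."""
        self.buffer.append(event)
        remaining = list(self.buffer)
        picked = []
        for wanted in self.expected:
            match = next((e for e in remaining if type(e) is wanted), None)
            if match is None:
                return None  # keep buffering until every type has arrived
            remaining.remove(match)
            picked.append(match)
        self.buffer = remaining
        return picked


collector = TypedCollector([CDone, ADone, BDone])
first = collector.collect(BDone("Query 2"))
second = collector.collect(ADone("Query 1"))
third = collector.collect(CDone("Query 3"))
print(first, second, third)
```

The events arrive as B, A, C, but the completed batch is ordered C, A, B to match the list of types passed to the collector.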

## Streaming

In practical use, agents can take a long time to run. It's a poor user experience to have the user execute a workflow and then wait a long time to see if it works or not; it's better to give them some indication that things are happening in real time, even if the process is not complete.

To do this, Workflows allow streaming events back to the user. Here you'll use `Context.write_event_to_stream` to emit these events.

In [None]:
from llama_index.llms.openai import OpenAI

In [None]:
class FirstEvent(Event):
    first_output: str

class SecondEvent(Event):
    second_output: str
    response: str

class TextEvent(Event):
    delta: str

class ProgressEvent(Event):
    msg: str

The specific events you'll be sending back are the "delta" responses from the LLM. When you ask an LLM to generate a streaming response, as you're doing here, it sends back each chunk of its response as it becomes available; each chunk is exposed as the `delta`. You're going to wrap each delta in a `TextEvent` and send it back to the Workflow's own stream.

In [None]:
class MyWorkflow(Workflow):
    @step
    async def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Step one is happening"))
        return FirstEvent(first_output="First step complete.")

    @step
    async def step_two(self, ctx: Context, ev: FirstEvent) -> SecondEvent:
        llm = OpenAI(model="gpt-4o-mini", api_key=api_key)
        generator = await llm.astream_complete(
            "Please give me the first 50 words of Moby Dick, a book in the public domain."
        )
        async for response in generator:
            # Allow the workflow to stream this piece of response
            ctx.write_event_to_stream(TextEvent(delta=response.delta))
        return SecondEvent(
            second_output="Second step complete, full response attached",
            response=str(response),
        )

    @step
    async def step_three(self, ctx: Context, ev: SecondEvent) -> StopEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Step three is happening"))
        return StopEvent(result="Workflow complete.")

You can work with the emitted events by getting a handler from the `run` command and then filtering its event stream for the types of events you want to see (you could print every event if you wanted to, but that would be quite noisy).

In this case you'll just print out the `ProgressEvent`s and the `TextEvent`s.

In [None]:
workflow = MyWorkflow(timeout=30, verbose=False)
handler = workflow.run(first_input="Start the workflow.")

async for ev in handler.stream_events():
    if isinstance(ev, ProgressEvent):
        print(ev.msg)
    if isinstance(ev, TextEvent):
        print(ev.delta, end="")

final_result = await handler
print("Final result = ", final_result)
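
The streaming pattern itself, with a producer writing small pieces to a stream while a consumer reads them as they arrive, can be tried without an API key. The sketch below uses a standard-library `asyncio.Queue` as the stream. The hard-coded chunks are made-up stand-ins for LLM deltas, and the `("text", ...)` and `("stop", ...)` tuples are a convention invented for this example.

```python
import asyncio


async def produce(stream: asyncio.Queue) -> str:
    full = ""
    for delta in ["Call ", "me ", "Ishmael."]:
        await asyncio.sleep(0.01)  # stand-in for waiting on the next LLM chunk
        full += delta
        stream.put_nowait(("text", delta))  # publish the chunk right away
    stream.put_nowait(("stop", full))  # signal that the stream is finished
    return full


async def main() -> tuple[list[str], str]:
    stream: asyncio.Queue = asyncio.Queue()
    producer = asyncio.create_task(produce(stream))
    deltas = []
    while True:
        kind, payload = await stream.get()
        if kind == "stop":
            break
        deltas.append(payload)  # a real UI would display each chunk here
    final = await producer  # the final result is still available at the end
    return deltas, final


# Plain-script entry point; inside a notebook use `await main()` instead
deltas, final = asyncio.run(main())
print(deltas, final)
```

The consumer sees each chunk as soon as it is produced, and the complete result is awaited separately once the stream ends, mirroring the `stream_events()` loop followed by `await handler` above.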

## Congratulations!

You've successfully built a number of increasingly complex Workflows and mastered the basic concepts. In the next lesson, you'll add RAG to your Workflow.

## Resource

[Guide to LlamaIndex's Workflows](https://docs.llamaindex.ai/en/stable/module_guides/workflow/)