In [1]:
!pip install llama-index


## API for workflow

In [1]:
import os
from dotenv import load_dotenv

load_dotenv()

True

In [2]:
import os

def get_openai_api_key():
    """Retrieve the OpenAI API key from environment variables."""
    return os.getenv('OPEN_AI_KEY')

def get_llama_cloud_api_key():
    """Retrieve the Llama Cloud API key from environment variables."""
    return os.getenv('LLAMA_CLOUD_API')

def extract_html_content(filename):
    """Read an HTML file and wrap its content in a fixed-height (800px) div."""
    try:
        with open(filename, 'r', encoding='utf-8') as file:
            html_content = file.read()
        # overflow: hidden clips anything taller than the container
        html_content = f""" <div style="width: 100%; height: 800px; overflow: hidden;"> {html_content} </div>"""
        return html_content
    except Exception as e:
        raise Exception(f"Error reading file: {e}") from e


In [3]:
api_key = get_openai_api_key()

## Basic imports

In [4]:
from IPython.display import display, HTML
import random

In [5]:
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Context
)

### Creating a Workflow

Under the hood, Workflows are regular Python classes. They are defined as a series of steps, each of which receives certain classes of events and emits certain classes of events.

Here's the most basic form of a workflow, with a single step:

1. **Step**:  
   A "step" is like a single task or action in a workflow. In the code, the `@step` decorator marks a function as a step in the workflow. This means the function will be executed as part of the workflow process. For example, `my_step` is a step that does something and returns a result.

2. **StartEvent**:  
   This is like a signal that tells the workflow to start. When the workflow begins, it receives a `StartEvent`. In the example, the `my_step` function takes this `StartEvent` as input, which means it will run when the workflow starts.

3. **StopEvent**:  
   This is like a signal that tells the workflow to stop. When a step finishes its task, it can return a `StopEvent` to indicate that the workflow is done. In the example, `my_step` returns a `StopEvent` with a result (in this case, the message "Hello, world!"), which means the workflow ends after this step.

4. **Async**:   
   The `async` keyword means that the function can run asynchronously. In simple terms, it allows the function to do its work without blocking other tasks. This is useful when you want to handle multiple things at once or wait for something (like a network request) without freezing the entire program. In the example, `my_step` is an asynchronous function, so it can perform tasks that might take some time (like waiting for data) without stopping everything else.
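
To make the point about `async` concrete, here is a minimal standard-library sketch. It does not use LlamaIndex, and `slow_task` and its delays are made up for illustration. Two coroutines that each wait are run together, so the total wall-clock time is close to one wait rather than the sum of both.

```python
import asyncio
import time


async def slow_task(name: str, delay: float) -> str:
    # `await` hands control back to the event loop while this task waits,
    # so other tasks can make progress in the meantime.
    await asyncio.sleep(delay)
    return f"{name} finished"


async def main() -> tuple[list[str], float]:
    started = time.perf_counter()
    # Run both coroutines concurrently; results come back in argument order.
    results = await asyncio.gather(
        slow_task("first", 0.3),
        slow_task("second", 0.3),
    )
    elapsed = time.perf_counter() - started
    return results, elapsed
```

In a notebook you would call `await main()`; in a plain script, `asyncio.run(main())`. The elapsed time should be roughly 0.3 seconds, not 0.6.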


In [8]:
class MyWorkflow(Workflow):
    # declare a function as a step
    @step
    async def my_step(self, ev: StartEvent) -> StopEvent:
        # do something here
        return StopEvent(result="Hello, world!")

In [9]:
# instantiate the workflow
basic_workflow = MyWorkflow(timeout=10, verbose=False)
# run the workflow
result = await basic_workflow.run()
print(result)

Hello, world!


Workflows are async by default, so you use `await` to get the result of the `run` command. This works in a notebook environment because a notebook already runs an event loop; in a plain Python script you need to import `asyncio` and wrap your code in an async function, like this:

```
async def main():
    w = MyWorkflow(timeout=10, verbose=False)
    result = await w.run()
    print(result)


if __name__ == "__main__":
    import asyncio
    asyncio.run(main())
```

## Visualizing workflow



In [9]:
!pip install llama-index-core
!pip install llama-index-utils-workflow

In [10]:
from llama_index.utils.workflow import draw_all_possible_flows

In [11]:
draw_all_possible_flows(
    basic_workflow,
    filename="basic_workflow.html"
)

<class 'NoneType'>
<class 'llama_index.core.workflow.events.StopEvent'>
basic_workflow.html


In [10]:
html_content = extract_html_content("basic_workflow.html")
# print(html_content)
display(HTML(html_content), metadata=dict(isolated=True))

## Creating Custom Events

Multiple steps are created by defining custom events that can be emitted by steps and trigger other steps. Let's define a simple 3-step workflow by defining two custom events, `FirstEvent` and `SecondEvent`. These classes can have any names and properties, but must inherit from `Event`.

You define a workflow by declaring which event each step accepts and which event it emits. That determines how data flows through the workflow.

In [12]:
from llama_index.core.workflow import Event

class FirstEvent(Event):
    first_output: str
        
class SecondEvent(Event):
    second_output: str

## Define workflow

Now you define the workflow itself. You do this by defining the input and output types on each step.

* `step_one` takes a `StartEvent` and returns a `FirstEvent`
* `step_two` takes a `FirstEvent` and returns a `SecondEvent`
* `step_three` takes a `SecondEvent` and returns a `StopEvent`

In [32]:
class MyWorkflow(Workflow):
    # `ev` is the step's parameter. Its type hint, `StartEvent`, tells the
    # workflow which event triggers this step, and `-> FirstEvent` declares
    # which event the step emits. Python itself does not enforce type hints
    # at runtime, but the `@step` decorator reads them to route events and
    # to validate the workflow.
    @step
    async def step_one(self, ev: StartEvent) -> FirstEvent:
        print(ev.first_input)  # set by the keyword argument passed to run()
        return FirstEvent(first_output="First step complete.")

    # Removing this @step raises a WorkflowValidationError: FirstEvent would
    # be produced with no step to consume it, and nothing would produce the
    # SecondEvent that step_three expects.
    @step
    async def step_two(self, ev: FirstEvent) -> SecondEvent:
        print(ev.first_output)
        return SecondEvent(second_output="Second step complete.")

    @step
    async def step_three(self, ev: SecondEvent) -> StopEvent:
        print(ev.second_output)
        return StopEvent(result="Workflow complete.")
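
The idea that annotations alone can describe the data flow can be sketched in plain Python. This is not how LlamaIndex implements routing internally; it is a toy dispatcher under stated assumptions, and `Start`, `First`, `Stop`, `build_routes`, and `run` are hypothetical names. Each step's `ev` annotation is read with `typing.get_type_hints` and used to decide which step receives which event.

```python
import asyncio
from dataclasses import dataclass
from typing import get_type_hints


@dataclass
class Start:
    first_input: str


@dataclass
class First:
    first_output: str


@dataclass
class Stop:
    result: str


async def step_one(ev: Start) -> First:
    return First(first_output=f"got {ev.first_input}")


async def step_two(ev: First) -> Stop:
    return Stop(result=f"done after: {ev.first_output}")


def build_routes(*steps):
    """Map each accepted event class to the step that declares it."""
    routes = {}
    for fn in steps:
        hints = get_type_hints(fn)
        routes[hints["ev"]] = fn
    return routes


async def run(routes, event):
    # Keep handing the latest event to whichever step accepts its type.
    while not isinstance(event, Stop):
        event = await routes[type(event)](event)
    return event.result
```

Calling `await run(build_routes(step_one, step_two), Start(first_input="hi"))` walks the chain from `Start` to `Stop` without any step naming the next one explicitly.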

*Note:* Properties of `StartEvent` and `StopEvent`:
- For `StartEvent`, you define its properties and pass in their values when you run the workflow as shown in the next cell.
- For `StopEvent`, by default, it only has one property `result`. You can always create a class that inherits `StopEvent` so you can customize what it returns.

In [33]:
workflow = MyWorkflow(timeout=10, verbose=False)
result = await workflow.run(first_input="Start the workflow.")
print(result)

Start the workflow.
First step complete.
Second step complete.
Workflow complete.


And you can visualize it just like you did before:

In [14]:
WORKFLOW_FILE = "workflows/custom_events.html"
draw_all_possible_flows(workflow, filename=WORKFLOW_FILE)

<class 'NoneType'>
<class '__main__.FirstEvent'>
<class 'llama_index.core.workflow.events.StopEvent'>
<class '__main__.SecondEvent'>
workflows/custom_events.html


In [15]:
html_content = extract_html_content(WORKFLOW_FILE)
display(HTML(html_content), metadata=dict(isolated=True))

## Creating Loops

However, there's not much point to a workflow if it just runs straight through! A key feature of Workflows is that they support branching and looping logic more simply and flexibly than graph-based approaches. To enable looping, let's create a new `LoopEvent`:

In [36]:
class LoopEvent(Event):
    loop_output: str

Now you'll edit your `step_one` to make a random decision about whether to execute serially or loop back:

In [37]:
class MyWorkflow(Workflow):
    @step
    async def step_one(self, ev: StartEvent | LoopEvent) -> FirstEvent | LoopEvent:
        if random.randint(0, 1) == 0:
            print("Bad thing happened")
            return LoopEvent(loop_output="Back to step one.")
        else:
            print("Good thing happened")
            return FirstEvent(first_output="First step complete.")

    @step
    async def step_two(self, ev: FirstEvent) -> SecondEvent:
        print(ev.first_output)
        return SecondEvent(second_output="Second step complete.")

    @step
    async def step_three(self, ev: SecondEvent) -> StopEvent:
        print(ev.second_output)
        return StopEvent(result="Workflow complete.")

Note the new type annotations on `step_one`: the step now accepts either a `StartEvent` or a `LoopEvent` to trigger the step, and it also emits either a `FirstEvent` or a `LoopEvent`.

You run it as usual. You might need to run it a couple of times to see the loop happen.

In [38]:
loop_workflow = MyWorkflow(timeout=10, verbose=False)
result = await loop_workflow.run(first_input="Start the workflow.")
print(result)

Bad thing happened
Good thing happened
First step complete.
Second step complete.
Workflow complete.
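
The run above looped once before succeeding, but with an unlucky streak the step could keep looping until the workflow's `timeout` is reached. One common refinement, sketched here in plain Python rather than LlamaIndex, is to carry an attempt counter in the loop event and give up after a fixed number of tries. The names `Loop`, `Done`, `GaveUp`, `flip`, and `max_attempts` are hypothetical.

```python
import random
from dataclasses import dataclass
from typing import Callable


@dataclass
class Loop:
    attempts: int  # how many tries have failed so far


@dataclass
class Done:
    attempts: int  # total tries, including the successful one


@dataclass
class GaveUp:
    attempts: int  # total tries before stopping


def step_one(ev: Loop, flip: Callable[[], int], max_attempts: int):
    """Return Done on success, Loop to retry, or GaveUp at the cap."""
    attempts = ev.attempts + 1
    if flip() == 1:
        return Done(attempts=attempts)
    if attempts >= max_attempts:
        return GaveUp(attempts=attempts)
    return Loop(attempts=attempts)


def run(flip: Callable[[], int] = lambda: random.randint(0, 1),
        max_attempts: int = 5):
    event = Loop(attempts=0)
    # Feed the loop event back into the same step until it emits something else.
    while isinstance(event, Loop):
        event = step_one(event, flip, max_attempts)
    return event
```

Passing `flip` as a parameter keeps the sketch testable: `run(flip=lambda: 0, max_attempts=3)` always ends in `GaveUp(attempts=3)`, while the default uses the same coin flip as the workflow above.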


In [39]:
WORKFLOW_FILE = "loop_events.html"
draw_all_possible_flows(loop_workflow, filename=WORKFLOW_FILE)
html_content = extract_html_content(WORKFLOW_FILE)
display(HTML(html_content), metadata=dict(isolated=True))

<class 'NoneType'>
<class '__main__.FirstEvent'>
<class '__main__.LoopEvent'>
<class 'llama_index.core.workflow.events.StopEvent'>
<class '__main__.SecondEvent'>
loop_events.html



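## Branching

The same mechanism supports branching: a step can declare several possible output events and return whichever one applies, and each event is routed to the step that accepts it. The workflow below randomly picks branch A or branch B.

In [41]: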
class BranchA1Event(Event):
    payload: str

class BranchA2Event(Event):
    payload: str

class BranchB1Event(Event):
    payload: str

class BranchB2Event(Event):
    payload: str

In [42]:
class BranchWorkflow(Workflow):
    @step
    async def start(self, ev: StartEvent) -> BranchA1Event | BranchB1Event:
        if random.randint(0, 1) == 0:
            print("Go to branch A")
            return BranchA1Event(payload="Branch A")
        else:
            print("Go to branch B")
            return BranchB1Event(payload="Branch B")

    @step
    async def step_a1(self, ev: BranchA1Event) -> BranchA2Event:
        print(ev.payload)
        return BranchA2Event(payload=ev.payload)

    @step
    async def step_b1(self, ev: BranchB1Event) -> BranchB2Event:
        print(ev.payload)
        return BranchB2Event(payload=ev.payload)

    @step
    async def step_a2(self, ev: BranchA2Event) -> StopEvent:
        print(ev.payload)
        return StopEvent(result="Branch A complete.")

    @step
    async def step_b2(self, ev: BranchB2Event) -> StopEvent:
        print(ev.payload)
        return StopEvent(result="Branch B complete.")

In [44]:
branch_workflow = BranchWorkflow(timeout=10, verbose=False)
result = await branch_workflow.run(first_input="Start the workflow.")
print(result)

Go to branch A
Branch A
Branch A
Branch A complete.


In [46]:
WORKFLOW_FILE = "branching.html"
draw_all_possible_flows(BranchWorkflow, filename=WORKFLOW_FILE)
html_content = extract_html_content(WORKFLOW_FILE)

<class 'NoneType'>
<class '__main__.BranchA1Event'>
<class '__main__.BranchB1Event'>
<class '__main__.BranchA2Event'>
<class 'llama_index.core.workflow.events.StopEvent'>
<class '__main__.BranchB2Event'>
<class 'llama_index.core.workflow.events.StopEvent'>
branching.html


In [47]:
display(HTML(html_content), metadata=dict(isolated=True))

## Concurrent Execution

The final form of flow control you can implement in workflows is concurrent execution. This allows you to efficiently run long-running tasks in parallel, and gather them together when they are needed. Let's see how this is done.

You'll be using a new concept, the `Context` object. This is a form of shared memory available to every step in a workflow: to access it, declare it as an argument to your step and it will be automatically populated.

In this example, you'll use `Context.send_event` rather than returning an event. This allows you to emit multiple events in parallel rather than returning just one as you did previously.

In [55]:
import asyncio

class StepTwoEvent(Event):
    # Each StepTwoEvent carries one query to be processed.
    query: str

class ParallelFlow(Workflow):
    # `ctx` is the shared Context, used here to send events.
    # `ev` is the StartEvent that kicks off the workflow.
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent:
        # Emit three StepTwoEvents, each with a different query.
        ctx.send_event(StepTwoEvent(query="Query 1"))
        ctx.send_event(StepTwoEvent(query="Query 2"))
        ctx.send_event(StepTwoEvent(query="Query 3"))

    @step(num_workers=4)
    async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StopEvent:
        print("Running slow query ", ev.query)
        await asyncio.sleep(random.randint(1, 5))

        return StopEvent(result=ev.query)

In [58]:
parallel_workflow = ParallelFlow(timeout=10, verbose=False)
result = await parallel_workflow.run()
print(result)

Running slow query  Query 1
Running slow query  Query 2
Running slow query  Query 3
Query 3
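
Only one query name is printed as the result because the workflow ends at the first `StopEvent`. A rough plain-asyncio analogue of that "first one wins" behaviour is sketched below. It is not LlamaIndex code; `slow_query`, `first_result`, and the fixed delays are assumptions chosen so the outcome is predictable.

```python
import asyncio


async def slow_query(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return name


async def first_result() -> str:
    tasks = [
        asyncio.create_task(slow_query("Query 1", 0.3)),
        asyncio.create_task(slow_query("Query 2", 0.01)),
        asyncio.create_task(slow_query("Query 3", 0.2)),
    ]
    # Wait only until the fastest task finishes.
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()  # the slower queries are abandoned
    # Let the cancellations settle before returning.
    await asyncio.gather(*pending, return_exceptions=True)
    return done.pop().result()
```

Here `Query 2` has the shortest delay, so it is the one returned. In the workflow above the delays are random, which is why the winning query can differ between runs.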


### Collecting events

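The run above returned a single query because the workflow stops as soon as any step returns a `StopEvent`, so the result is whichever query finished first. But what if you do want the output of all 3 events? Another method, `Context.collect_events`, exists for that purpose: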

In [59]:
class StepThreeEvent(Event):
    result: str

class ConcurrentFlow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent:
        ctx.send_event(StepTwoEvent(query="Query 1"))
        ctx.send_event(StepTwoEvent(query="Query 2"))
        ctx.send_event(StepTwoEvent(query="Query 3"))

    @step(num_workers=4)
    async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StepThreeEvent:
        print("Running query ", ev.query)
        await asyncio.sleep(random.randint(1, 5))
        return StepThreeEvent(result=ev.query)

    @step
    async def step_three(self, ctx: Context, ev: StepThreeEvent) -> StopEvent:
        # wait until we receive 3 events
        result = ctx.collect_events(ev, [StepThreeEvent] * 3)
        if result is None:
            print("Not all events received yet.")
            return None

        # do something with all 3 results together
        print(result)
        return StopEvent(result="Done")

In [60]:
w = ConcurrentFlow(timeout=10, verbose=False)
result = await w.run(message="Start the workflow.")
print(result)

Running query  Query 1
Running query  Query 2
Running query  Query 3
Not all events received yet.
Not all events received yet.
[StepThreeEvent(result='Query 1'), StepThreeEvent(result='Query 2'), StepThreeEvent(result='Query 3')]
Done


What `collect_events` does is store the events in the context until it has collected the number and type of events specified in its second argument. In this case, you've told it to wait for 3 events.

If an event fires and `collect_events` hasn't yet seen the right number of events, it returns `None`, so you tell `step_three` to do nothing in that case. When `collect_events` has received the right number of events, it returns them as a list, which you can see being printed in the final output.
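
The buffering behaviour described above can be imitated in a few lines of plain Python. This is a sketch of the idea, not the LlamaIndex implementation; `Collector`, `AComplete`, and `BComplete` are hypothetical names. Each call stores the incoming event and returns `None` until every slot in the expected list can be filled.

```python
from dataclasses import dataclass


@dataclass
class AComplete:
    result: str


@dataclass
class BComplete:
    result: str


class Collector:
    """Buffers events until every expected type slot can be filled."""

    def __init__(self):
        self._buffer = []

    def collect(self, ev, expected):
        self._buffer.append(ev)
        remaining = list(self._buffer)
        matched = []
        for wanted in expected:
            hit = next((e for e in remaining if type(e) is wanted), None)
            if hit is None:
                return None  # still waiting for at least one event
            remaining.remove(hit)
            matched.append(hit)
        self._buffer = remaining  # keep any extras for a later call
        return matched
```

In this sketch the returned list follows the order of `expected`, not the order in which the events arrived.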

*Note:* This flow control lets you perform map-reduce style tasks. To implement a map-reduce pattern, you would split your task into as many pieces as necessary and store that number in the `Context` with `await ctx.set("num_events", some_number)`. Then in `step_three` you would wait for the number of events stored in the context, retrieved with `await ctx.get("num_events")`, so you don't need to know in advance exactly how many concurrent steps you're taking. You'll do exactly this in a later lesson.
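
A plain-asyncio sketch of that map-reduce shape follows. It does not use LlamaIndex: the `shared` dict merely stands in for the `Context`, and `mapper`, `map_reduce`, and the word-length task are made up for illustration. The number of fan-out tasks is decided at run time, stored, and then used by the reducing side to know when everything has arrived.

```python
import asyncio


async def mapper(word: str, out: asyncio.Queue) -> None:
    await asyncio.sleep(0)  # placeholder for slow work
    await out.put(len(word))  # "map": one result per input


async def map_reduce(words: list[str]) -> int:
    # Record how many results to expect (the role ctx.set plays in the note).
    shared = {"num_events": len(words)}
    out: asyncio.Queue = asyncio.Queue()
    tasks = [asyncio.create_task(mapper(w, out)) for w in words]

    collected = []
    # Keep collecting until the stored count is reached.
    while len(collected) < shared["num_events"]:
        collected.append(await out.get())

    await asyncio.gather(*tasks)
    return sum(collected)  # "reduce": combine all results
```

For example, `await map_reduce(["a", "bb", "ccc"])` fans out three tasks and reduces their results to the total length 6.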

### Collecting different event types

You don't just have to wait for multiple events of the same kind. In this example, you'll emit 3 totally different events and collect them at the end.

In [61]:
class StepAEvent(Event):
    query: str

class StepACompleteEvent(Event):
    result: str

class StepBEvent(Event):
    query: str

class StepBCompleteEvent(Event):
    result: str

class StepCEvent(Event):
    query: str

class StepCCompleteEvent(Event):
    result: str

In [62]:
class ConcurrentFlow(Workflow):
    @step
    async def start(
        self, ctx: Context, ev: StartEvent
    ) -> StepAEvent | StepBEvent | StepCEvent:
        ctx.send_event(StepAEvent(query="Query 1"))
        ctx.send_event(StepBEvent(query="Query 2"))
        ctx.send_event(StepCEvent(query="Query 3"))

    @step
    async def step_a(self, ctx: Context, ev: StepAEvent) -> StepACompleteEvent:
        print("Doing something A-ish")
        return StepACompleteEvent(result=ev.query)

    @step
    async def step_b(self, ctx: Context, ev: StepBEvent) -> StepBCompleteEvent:
        print("Doing something B-ish")
        return StepBCompleteEvent(result=ev.query)

    @step
    async def step_c(self, ctx: Context, ev: StepCEvent) -> StepCCompleteEvent:
        print("Doing something C-ish")
        return StepCCompleteEvent(result=ev.query)

    @step
    async def step_three(
        self,
        ctx: Context,
        ev: StepACompleteEvent | StepBCompleteEvent | StepCCompleteEvent,
    ) -> StopEvent:
        print("Received event ", ev.result)

        # wait until we receive 3 events
        events = ctx.collect_events(
            ev,
            [StepCCompleteEvent, StepACompleteEvent, StepBCompleteEvent],
        )
        if (events is None):
            return None

        # do something with all 3 results together
        print("All events received: ", events)
        return StopEvent(result="Done")

When you run it, it will do all three things and wait for them in `step_three`.

In [63]:
w = ConcurrentFlow(timeout=10, verbose=False)
result = await w.run(message="Start the workflow.")
print(result)

Doing something A-ish
Doing something B-ish
Doing something C-ish
Received event  Query 1
Received event  Query 2
Received event  Query 3
All events received:  [StepCCompleteEvent(result='Query 3'), StepACompleteEvent(result='Query 1'), StepBCompleteEvent(result='Query 2')]
Done


This new flow has quite a pretty visualization:

In [64]:
WORKFLOW_FILE = "concurrent_different_events.html"
draw_all_possible_flows(w, filename=WORKFLOW_FILE)
html_content = extract_html_content(WORKFLOW_FILE)

<class 'NoneType'>
<class '__main__.StepAEvent'>
<class '__main__.StepBEvent'>
<class '__main__.StepCEvent'>
<class '__main__.StepACompleteEvent'>
<class '__main__.StepBCompleteEvent'>
<class '__main__.StepCCompleteEvent'>
<class 'llama_index.core.workflow.events.StopEvent'>
concurrent_different_events.html


In [65]:
display(HTML(html_content), metadata=dict(isolated=True))

## Streaming

In practical use, agents can take a long time to run. It's a poor user experience to have the user start a workflow and then wait a long time to see whether it works; it's better to give them some indication in real time that things are happening, even if the process is not complete.

To do this, Workflows allow streaming events back to the user. Here you'll use `Context.write_event_to_stream` to emit these events.

In [73]:
pip install llama-index-llms-groq

In [6]:
from llama_index.llms.groq import Groq
from llama_index.core import Settings


os.environ["GROQ_API_KEY"] = os.getenv('GROQ_API')
llm = Groq(model="llama3-70b-8192", api_key=os.environ["GROQ_API_KEY"])
Settings.llm = llm

In [7]:
from llama_index.core.workflow import Event

class FirstEvent(Event):
    first_output: str

class SecondEvent(Event):
    second_output: str
    response: str

class TextEvent(Event):
    delta: str

class ProgressEvent(Event):
    msg: str

The events you'll be streaming back are the "delta" responses from the LLM. When you ask an LLM to generate a streaming response, as you're doing here, it sends back each chunk of its response as it becomes available. Each chunk is exposed as the "delta". You're going to wrap each delta in a `TextEvent` and write it to the workflow's own stream.
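
The delta pattern can be tried without an API key using a stand-in for the LLM. The sketch below is plain asyncio, not LlamaIndex: `fake_llm_stream` is a hypothetical async generator that yields a reply one word at a time, an `asyncio.Queue` plays the role of the workflow's stream, and this `TextEvent` is a local dataclass rather than the workflow `Event` subclass.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class TextEvent:
    delta: str


async def fake_llm_stream(text: str):
    # Stand-in for a streaming LLM call: yield the reply one word at a time.
    for word in text.split(" "):
        await asyncio.sleep(0)
        yield word + " "


async def stream_to_queue(text: str, stream: asyncio.Queue) -> str:
    full = ""
    async for delta in fake_llm_stream(text):
        full += delta
        await stream.put(TextEvent(delta=delta))  # publish each chunk
    await stream.put(None)  # sentinel: no more events
    return full.strip()


async def main(text: str = "Call me Ishmael.") -> tuple[list[str], str]:
    stream: asyncio.Queue = asyncio.Queue()
    producer = asyncio.create_task(stream_to_queue(text, stream))
    seen = []
    while (ev := await stream.get()) is not None:
        seen.append(ev.delta)  # a UI would display each delta here
    return seen, await producer
```

The consumer sees each chunk as soon as it is produced, while the producer still returns the complete text at the end, which mirrors how the step later attaches the full response to its output event.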

In [68]:
from llama_index.llms.openai import OpenAI

class MyWorkflow(Workflow):
    @step
    async def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Step one is happening"))
        return FirstEvent(first_output="First step complete.")

    @step
    async def step_two(self, ctx: Context, ev: FirstEvent) -> SecondEvent:
        # Needs an OpenAI key with available quota; the Groq LLM stored in
        # Settings.llm above could be substituted here.
        llm = OpenAI(model="gpt-4o-mini", api_key=api_key)
        generator = await llm.astream_complete(
            "Please give me the first 50 words of Moby Dick, a book in the public domain."
        )
        async for response in generator:
            # Allow the workflow to stream this piece of response
            ctx.write_event_to_stream(TextEvent(delta=response.delta))
        return SecondEvent(
            second_output="Second step complete, full response attached",
            response=str(response),
        )

    @step
    async def step_three(self, ctx: Context, ev: SecondEvent) -> StopEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Step three is happening"))
        return StopEvent(result="Workflow complete.")

You can work with the emitted events by getting a handler from the `run` command, iterating over its `stream_events()`, and filtering for the types of events you want to see (you could print every event, but that would be quite noisy).

In this case you'll just print out the `ProgressEvent`s and the `TextEvent`s.

In [69]:
workflow = MyWorkflow(timeout=30, verbose=False)
handler = workflow.run(first_input="Start the workflow.")

async for ev in handler.stream_events():
    if isinstance(ev, ProgressEvent):
        print(ev.msg)
    if isinstance(ev, TextEvent):
        print(ev.delta, end="")

final_result = await handler
print("Final result = ", final_result)

Step one is happening


WorkflowRuntimeError: Error in step 'step_two': Error code: 429 - {'error': {'message': 'You exceeded your current quota, please check your plan and billing details. For more information on this error, read the docs: https://platform.openai.com/docs/guides/error-codes/api-errors.', 'type': 'insufficient_quota', 'param': None, 'code': 'insufficient_quota'}}
