In [2]:
# Importing the Required Libraries 
from IPython.display import display, HTML
from helper import extract_html_content
import random
from helper import get_openai_api_key

# LlamaIndex Libraries 
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Context
)
# For Visualization of Workflows 
from llama_index.utils.workflow import draw_all_possible_flows
# For defining Custom Events 
from llama_index.core.workflow import Event

In [3]:
# Getting the OpenAPI Key 
api_key = get_openai_api_key()

## Creating a Workflow 

In [7]:
class MyWorkflow(Workflow):
    # declare a function as a step
    @step
    async def my_step(self, ev: StartEvent) -> StopEvent:
        # do something here
        return StopEvent(result="Hello, world!")

In [8]:
# instantiate the workflow
basic_workflow = MyWorkflow(timeout=10, verbose=False)
# run the workflow
result = await basic_workflow.run()
print(result)

Hello, world!


In [12]:
draw_all_possible_flows(
    basic_workflow, 
    filename="basic_workflow.html"
)

basic_workflow.html


In [4]:
class FirstEvent(Event):
    first_output: str

class SecondEvent(Event):
    second_output: str

In [19]:
class MyWorkflow(Workflow):
    @step
    async def step_one(self, ev:StartEvent) -> FirstEvent : 
        print(ev.first_input)
        return FirstEvent(first_output="First step complete")

    @step
    async def step_two(self, ev:FirstEvent) -> SecondEvent :
        print(ev.first_output)
        return SecondEvent(second_output="Second step complete")

    @step
    async def step_three(self, ev:SecondEvent) -> StopEvent:
        print(ev.second_output)
        return StopEvent(result="Workflow complete")

In [20]:
workflow = MyWorkflow(timeout=10, verbose=False)
result = await workflow.run(first_input="Start the workflow.")
print(result)

Start the workflow.
First step complete
Second step complete
Workflow complete


In [22]:
WORKFLOW_FILE = "custom_events.html"
draw_all_possible_flows(workflow, filename=WORKFLOW_FILE)

custom_events.html


In [24]:
# html_content = extract_html_content(WORKFLOW_FILE)
# display(HTML(html_content), metadata=dict(isolated=True))

## Creating Loops 

In [7]:
class LoopEvent(Event):
    loop_output: str

class MyWorkflow(Workflow):
    @step
    async def step_one(self, ev:StartEvent | LoopEvent) -> FirstEvent :         
        if random.randint(0, 1) == 0:
            print("Bad thing happened")
            return LoopEvent(loop_output="Back to step one.")
        else:
            print("Good thing happened")
            return FirstEvent(first_output="First step complete.")


    @step
    async def step_two(self, ev:FirstEvent) -> SecondEvent :
        print(ev.first_output)
        return SecondEvent(second_output="Second step complete")

    @step
    async def step_three(self, ev:SecondEvent) -> StopEvent:
        print(ev.second_output)
        return StopEvent(result="Workflow complete") 

In [11]:
loop_workflow = MyWorkflow(timeout=10, verbose=False)
result = await loop_workflow.run(first_input="Start the workflow.")
print(result)

WorkflowValidationError: The following events are consumed but never produced: LoopEvent

In [12]:
WORKFLOW_FILE = "loop_events.html"
draw_all_possible_flows(loop_workflow, filename=WORKFLOW_FILE)
html_content = extract_html_content(WORKFLOW_FILE)
display(HTML(html_content), metadata=dict(isolated=True))

loop_events.html


## Concurrent Execution 

In [13]:
import asyncio

class StepTwoEvent(Event):
    query: str

class ParallelFlow(Workflow):
    @step
    async def start(self, ctx: Context, ev:StartEvent) -> StepTwoEvent:
        ctx.send_event(StepTwoEvent(query="Query 1"))
        ctx.send_event(StepTwoEvent(query="Query 2"))
        ctx.send_event(StepTwoEvent(query="Query 3"))

    @step(num_workers=4)
    async def step_two(self,ctx: Context, ev:StepTwoEvent) -> StopEvent:
        print("Running Slow Query : ", ev.query)
        await asyncio.sleep(random.randint(1, 5))
        return StopEvent(result=ev.query)

parallel_workflow = ParallelFlow(timeout=10, verbose=False)
result = await parallel_workflow.run()
print(result)            

Running Slow Query :  Query 1
Running Slow Query :  Query 2
Running Slow Query :  Query 3
Query 1


## Collecting Events 

In [None]:
class StepThreeEvent(Event):
    result: str

class ConcurrentFlow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent:
        ctx.send_event(StepTwoEvent(query="Query 1"))
        ctx.send_event(StepTwoEvent(query="Query 2"))
        ctx.send_event(StepTwoEvent(query="Query 3"))

    @step
    async def 