In [1]:
from IPython.display import display, HTML
from helper import extract_html_content
import random
from helper import get_openai_api_key

In [2]:
api_key = get_openai_api_key()

In [3]:
from llama_index.core.workflow import Workflow, StartEvent, StopEvent,step, Context

In [4]:
class MyWorkflow(Workflow):
    # declare a function as a step
    @step
    async def my_step(self, ev: StartEvent) -> StopEvent:
        # do something here
        return StopEvent(result="Hello World")

In [5]:
#initiate a Workflow
basic_workflow= MyWorkflow(timeout=10, verbose=False)
#Run the Workflow
result = await basic_workflow.run()
print(result)

Hello World


In [6]:
from llama_index.core.workflow import draw_all_possible_flows

In [7]:
draw_all_possible_flows(
    basic_workflow,
    filename= "Workflows/basic_workflow.html"
)

  draw_all_possible_flows(


Workflows/basic_workflow.html


In [8]:
html_content= extract_html_content("Workflows/basic_workflow.html")
display(HTML(html_content), metadata=dict(isolated=True))

In [9]:
from llama_index.core.workflow import Event

class FirstEvent(Event):
    first_output:str

class SecondEvent(Event):
    second_output:str

In [10]:
class MyWorkflow(Workflow):
    @step
    async def step_one(self, ev:StartEvent) -> FirstEvent:
        print(ev.first_input)
        return FirstEvent(first_output="First Output")
    @step
    async def step_tow(self, ev:FirstEvent) -> SecondEvent:
        print(ev.first_output)
        return SecondEvent(second_output="Second Output")
    @step
    async def step_tree(self, ev:SecondEvent)-> StopEvent:
        print(ev.second_output)
        return StopEvent(result="Workflow Completed")

In [11]:
workflow=MyWorkflow(timeout=10, verbose=False)
result= await workflow.run(first_input="First Input")
print(result)

First Input
First Output
Second Output
Workflow Completed


In [12]:
draw_all_possible_flows(workflow, filename="Workflows/complex_workflow.html")

Workflows/complex_workflow.html


  draw_all_possible_flows(workflow, filename="Workflows/complex_workflow.html")


In [13]:
html_content=extract_html_content("Workflows/complex_workflow.html")
display(HTML(html_content), metadata=dict(isolated=True))

In [14]:
class LoopEvent(Event):
    loop_output:str

In [15]:
class MyWorkflow(Workflow):
    @step
    async def step_one(self, ev:StartEvent | LoopEvent) -> FirstEvent | LoopEvent:
        if random.randint(0,1)==0:
            print("Looping back to step one")
            return LoopEvent(loop_output="Looping")
        else:
            print("No looping")
            return FirstEvent(first_output="First_step_complete")
    @step
    async def step_two(self, ev: FirstEvent) -> SecondEvent:
        print(ev.first_output)
        return SecondEvent(second_output="Second step complete.")
    @step
    async def step_three(self, ev:SecondEvent) -> StopEvent | LoopEvent:
        if random.randint(0,1)==0:
            print("Looping back to step one")
            return LoopEvent(loop_output="Looping")
        else:
            print("No looping")
            return StopEvent(third_output="Workflow complete")
    

In [16]:
loop_workflow=MyWorkflow(timeout=10, verbose=False)
result= await loop_workflow.run(first_input="Start workflow")
print(result)

No looping
First_step_complete
No looping
None


In [17]:
draw_all_possible_flows(loop_workflow, filename="Workflows/loop_workflow.html")

Workflows/loop_workflow.html


  draw_all_possible_flows(loop_workflow, filename="Workflows/loop_workflow.html")


In [18]:
html_content = extract_html_content("Workflows/loop_workflow.html")
display(HTML(html_content), metadata=dict(isolated=True))

In [19]:
class BranchA1Event(Event):
    payload: str

class BranchA2Event(Event):
    payload: str

class BranchB1Event(Event):
    payload: str

class BranchB2Event(Event):
    payload: str

In [20]:
class BranchWorkflow(Workflow):
    @step
    async def start(self, ev: StartEvent) -> BranchA1Event | BranchB1Event:
        if random.randint(0, 1) == 0:
            print("Go to branch A")
            return BranchA1Event(payload="Branch A")
        else:
            print("Go to branch B")
            return BranchB1Event(payload="Branch B")

    @step
    async def step_a1(self, ev: BranchA1Event) -> BranchA2Event:
        print(ev.payload)
        return BranchA2Event(payload=ev.payload)

    @step
    async def step_b1(self, ev: BranchB1Event) -> BranchB2Event:
        print(ev.payload)
        return BranchB2Event(payload=ev.payload)

    @step
    async def step_a2(self, ev: BranchA2Event) -> StopEvent | BranchB1Event:
        if random.randint(0, 1) == 0:
            print(ev.payload)
            return StopEvent(result="Branch A complete.")
        else:
            print(ev.payload)
            return BranchB1Event(payload="Branch A")

    @step
    async def step_b2(self, ev: BranchB2Event) -> StopEvent:
        print(ev.payload)
        return StopEvent(result="Branch B complete.")

In [21]:
WORKFLOW_FILE = "workflows/branching.html"
draw_all_possible_flows(BranchWorkflow, filename=WORKFLOW_FILE)
html_content = extract_html_content(WORKFLOW_FILE)

workflows/branching.html


  draw_all_possible_flows(BranchWorkflow, filename=WORKFLOW_FILE)


In [22]:
display(HTML(html_content), metadata=dict(isolated=True))

In [23]:
import asyncio
class StepTwoEvent(Event):
    query: str

class ParallelFlow(Workflow):
    @step
    async def start(self, ctx:Context, ev: StartEvent) -> StepTwoEvent:
        ctx.send_event(StepTwoEvent(query="First query"))
        ctx.send_event(StepTwoEvent(query="Second query"))
        ctx.send_event(StepTwoEvent(query="Third query"))

    @step(num_workers=4)
    async def step_two(self, ctx:Context, ev: StepTwoEvent) -> StopEvent:
        print(ev.query)
        await asyncio.sleep(random.randint(1, 5))

        return StopEvent(result=ev.query)

In [24]:
parallel_workflow = ParallelFlow(timeout=10, verbose=False)
result = await parallel_workflow.run()
print(result)

First query
Second query
Third query
First query


In [25]:
draw_all_possible_flows(parallel_workflow, filename="Workflows/parallel_workflow.html")

Workflows/parallel_workflow.html


  draw_all_possible_flows(parallel_workflow, filename="Workflows/parallel_workflow.html")


In [26]:
class StepThreeEvent(Event):
    result: str

class ConcurrentFlow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) ->StepTwoEvent:
        ctx.send_event(StepTwoEvent(query="First query"))
        ctx.send_event(StepTwoEvent(query="Second query"))
        ctx.send_event(StepTwoEvent(query="Third query"))

    @step(num_workers=4)
    async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StepThreeEvent:
        print(ev.query)
        await asyncio.sleep(random.randint(1, 5))
        return StepThreeEvent(result=ev.query)
    
    @step
    async def step_three(self, ctx: Context, ev: StepThreeEvent) -> StopEvent:
        #wait until 3 events are received
        result= ctx.collect_events(ev, [StepThreeEvent]*3)
        if result is None:
            print("Waiting for more events")
            return None
        
        print(result)
        return StopEvent(result="Workflow complete")

In [27]:
W= ConcurrentFlow(timeout=10, verbose=False)
result= await W.run(message="Start workflow")
print(result)

First query
Second query
Third query
Waiting for more events
Waiting for more events
[StepThreeEvent(result='Second query'), StepThreeEvent(result='Third query'), StepThreeEvent(result='First query')]
Workflow complete


In [28]:
class StepAEvent(Event):
    query: str

class StepACompleteEvent(Event):
    result: str

class StepBEvent(Event):
    query: str

class StepBCompleteEvent(Event):
    result: str

class StepCEvent(Event):
    query: str

class StepCCompleteEvent(Event):
    result: str

In [29]:
class ConcurrentFlow(Workflow):
    @step
    async def start(
        self, ctx: Context, ev: StartEvent
    ) -> StepAEvent | StepBEvent | StepCEvent:
        ctx.send_event(StepAEvent(query="Query 1"))
        ctx.send_event(StepBEvent(query="Query 2"))
        ctx.send_event(StepCEvent(query="Query 3"))

    @step
    async def step_a(self, ctx: Context, ev: StepAEvent) -> StepACompleteEvent:
        print("Doing something A-ish")
        return StepACompleteEvent(result=ev.query)

    @step
    async def step_b(self, ctx: Context, ev: StepBEvent) -> StepBCompleteEvent:
        print("Doing something B-ish")
        return StepBCompleteEvent(result=ev.query)

    @step
    async def step_c(self, ctx: Context, ev: StepCEvent) -> StepCCompleteEvent:
        print("Doing something C-ish")
        return StepCCompleteEvent(result=ev.query)

    @step
    async def step_three(
        self,
        ctx: Context,
        ev: StepACompleteEvent | StepBCompleteEvent | StepCCompleteEvent,
    ) -> StopEvent:
        print("Received event ", ev.result)

        events = ctx.collect_events(
            ev,
            [StepCCompleteEvent, StepACompleteEvent, StepBCompleteEvent],
        )
        if (events is None):
            return None

        print("All events received: ", events)
        return StopEvent(result="Done")

In [30]:
w = ConcurrentFlow(timeout=10, verbose=False)
result = await w.run(message="Start the workflow.")
print(result)

Doing something A-ish
Doing something B-ish
Doing something C-ish
Received event  Query 1
Received event  Query 2
Received event  Query 3
All events received:  [StepCCompleteEvent(result='Query 3'), StepACompleteEvent(result='Query 1'), StepBCompleteEvent(result='Query 2')]
Done


In [31]:
WORKFLOW_FILE = "workflows/concurrent_different_events.html"
draw_all_possible_flows(w, filename=WORKFLOW_FILE)
html_content = extract_html_content(WORKFLOW_FILE)

workflows/concurrent_different_events.html


  draw_all_possible_flows(w, filename=WORKFLOW_FILE)


In [32]:
display(HTML(html_content), metadata=dict(isolated=True))

In [33]:
from llama_index.llms.openai import OpenAI

In [34]:
class FirstEvent(Event):
    first_output: str

class SecondEvent(Event):
    second_output: str
    response: str

class TextEvent(Event):
    delta: str

class ProgressEvent(Event):
    msg: str

In [35]:
class MyWorkflow(Workflow):
    @step
    async def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Step one is happening"))
        return FirstEvent(first_output="First step complete.")

    @step
    async def step_two(self, ctx: Context, ev: FirstEvent) -> SecondEvent:
        llm = OpenAI(model="gpt-4o-mini", api_key=api_key) 
        generator = await llm.astream_complete(
            "Please give me the first 50 words of Moby Dick, a book in the public domain."
        )
        async for response in generator:
            ctx.write_event_to_stream(TextEvent(delta=response.delta))
        return SecondEvent(
            second_output="Second step complete, full response attached",
            response=str(response),
        )

    @step
    async def step_three(self, ctx: Context, ev: SecondEvent) -> StopEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Step three is happening"))
        return StopEvent(result="Workflow complete.")

In [None]:
workflow = MyWorkflow(timeout=30, verbose=False)
handler = workflow.run(first_input="Start the workflow.")

async for ev in handler.stream_events():
    if isinstance(ev, ProgressEvent):
        print(ev.msg)
    if isinstance(ev, TextEvent):
        print(ev.delta, end="")

final_result = await handler
print("Final result = ", final_result)