* **Workflow**
* **StartEvent**
* **StopEvent**
* **step**
* **Event**
* **Context**

In [65]:
from IPython.display import display, HTML, DisplayHandle
from helper import extract_html_content
import random
from helper import get_openai_api_key

In [66]:
api_key = get_openai_api_key()

In [67]:
api_key

In [68]:
from llama_index.core.workflow import (StartEvent, 
StopEvent,
Workflow,
step,
Context)

In [69]:
class MyWorkflow(Workflow):
    # declare a function as a step.
    @step
    async def my_step(self, ev:StartEvent) -> StopEvent:
        return StopEvent(result="Ikumar")


In [70]:
#Intiate the workflow
basic_workflow = MyWorkflow(timeout=10, verbose=False)
#run the workflow
result = await basic_workflow.run()
print(result)

Ikumar


In [71]:
#running asynchronously in .py file you have to install asyncio lib
# async def main():
#     w = MyWorkflow(timeout=10, verbose=False)
#     result = await w.run()
#     print(result)

In [72]:
#workflow are asynchronous by default

### visualize the workflow

In [73]:
from llama_index.utils.workflow import draw_all_possible_flows

In [74]:
draw_all_possible_flows(
    basic_workflow,
    filename="workflows/basic_workflow.html"
)

<class 'NoneType'>
<class 'llama_index.core.workflow.events.StopEvent'>
workflows/basic_workflow.html


In [75]:
html_content = extract_html_content("workflows/basic_workflow.html")
display(HTML(html_content), metadata=dict(isolated=True))

#### creating custom events

In [76]:
from llama_index.core.workflow import Event

In [77]:
class FirstEvent(Event):
    first_output:str

class SecondEvent(Event):
    second_output:str

In [78]:
class MyWorkflow(Workflow):
    @step
    async def step_one(self, ev:StartEvent)->FirstEvent:
        print(ev.first_input)
        return FirstEvent(first_output="first event completed")
    @step
    async def step_two(self, ev: FirstEvent)-> SecondEvent:
        print(ev.first_output)
        return SecondEvent(second_output="Second step complete")
    @step
    async def step_three(self, ev: SecondEvent)-> StopEvent:
        print(ev.second_output)
        return StopEvent(result="Workflow complete")


In [79]:
workflow = MyWorkflow(timeout=10, verbose=False)
result = await workflow.run(first_input="Start the workflow.")
print(result)

Start the workflow.
first event completed
Second step complete
Workflow complete


In [80]:
WORKFLOW_FILE = "workflows/custom_events.html"
draw_all_possible_flows(workflow, WORKFLOW_FILE)

<class 'NoneType'>
<class '__main__.FirstEvent'>
<class 'llama_index.core.workflow.events.StopEvent'>
<class '__main__.SecondEvent'>
workflows/custom_events.html


In [81]:
html_content = extract_html_content(WORKFLOW_FILE)
display(HTML(html_content), metadata=dict(isolated=True))

### creating loops

In [82]:
class LoopEvent(Event):
    loop_output:str

In [83]:
class MyWorkflow(Workflow):
    @step
    async def step_one(self, ev: StartEvent | LoopEvent) -> FirstEvent | LoopEvent:
        if random.randint(0, 1) == 0:
            print("bad thing happened")
            return LoopEvent(loop_output="back to step one")
        else: 
            print("good thing happened")
            return FirstEvent(first_output="first step completed")
    @step
    async def step_two(self, ev: FirstEvent) -> SecondEvent:
        print(ev.first_output)
        return SecondEvent(second_output="SEcond step completed")

    @step
    async def step_three(self, ev: SecondEvent)-> StopEvent:
        print(ev.second_output)
        return StopEvent(result="Workflow completed")


In [94]:
loop_workflow = MyWorkflow(timeout=10, verbose=False)
result = await loopworkflow.run(first_input="start the workflow.")
print(result)

bad thing happened


WorkflowTimeoutError: Operation timed out after 10 seconds

In [None]:
WORKFLOW_FILE = "workflows/loop_events.html"
draw_all_possible_flows(loop_workflow, filename=WORKFLOW_FILE)
html_content = extract_html_content(WORKFLOW_FILE)
display(HTML(html_content), metadata=dict(isolated=True))

### Branching

In [None]:
class BranchA1Event(Event):
    payload: str
class BranchA2Event(Event):
    payload: str
class BranchB1Event(Event):
    payload: str
class BranchB2Event(Event):
    payload: str

In [None]:
class BranchWorkflow(Workflow):
    @step
    async def start(self, ev: StartEvent) -> BranchA1Event | BranchB1Event:
        if random.randint(0, 1) == 0:
            print("Go to branch A")
            return BranchA1Event(payload="BRanch A")
        else:
            print("Go to branch B")
            return BranchB1Event(payload="Branch B")
    @step
    async def step_a1(eslf, ev: BranchA1Event) -> BranchA2Event:
        print(ev.patload)
        return BranchA2Event(payload=ev.payload)

    @step
    async def step_b1(self, ev: BranchB1Event) -> BranchB2Event:
        print(ev.payload)
        return BranchB2Event(payload=ev.payload)
    @step
    async def step_a2(self, ev: BranchA2Event) -> StopEvent:
        print(ev.payload)
        return StopEvent(result="Branch A complete.")
    @step
    async def step_b2(self, ev: BranchB2Event) -> StopEvent:
        print(ev.payload)
        return StopEvent(result="Branch B complete.")

In [None]:
WORKFLOW_FILE = "workflows/branching.html"
draw_all_possible_flows(BranchWorkflow, filename=WORKFLOW_FILE)
html_content = extract_html_content(WORKFLOW_FILE)

In [92]:
display(HTML(html_content), metadata=dict(isolated=True))

#### Concurent Execution

In [None]:
import asyncio

class StepTwoEvent(Event):
    query: str

class ParallelFlow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent:
        ctx.send_event(StepTwoEvent(query="Query 1"))
        ctx.send_event(StepTwoEvent(query="Query 2"))
        ctx.send_event(StepTwoEvent(query="Query 3"))

    @step(num_workers=4)
    async def step_wto(self, ctx: Context, ev: StepTwoEvent) -> StopEvent:
        print("Running slow query", ev.query)
        await asyncio.sleep(random.randint(1, 5))
        return StopEvent(result=ev.query)

In [None]:
parallel_workflow = ParallelFlow(timeout=10, verbose=False)
result = await parallel_workflow.run()
print(result)

#### Collecting events

In [None]:
class StepThreeEvent(Event):
    result: str

class ConcurrentFlow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent:
        ctx.send_event(StepTwoEvent(query="Query 1"))
        ctx.send_event(StepTwoEvent(query="Query 2"))
        ctx.send_event(StepTwoEvent(query="Query 3"))

    @step(num_workers=4)
    async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StepThreeEvent:
        print("Running query ", ev.query)
        await asyncio.sleep(random.randint(1, 5))
        return StepThreeEvent(result=ev.query)
    @step
    async def step_three(self, ctx: Context, ev: StepThreeEvent) -> StopEvent:
        # wait until received three events
        result = ctx.collect_events(ev, [StepThreeEvent] * 3)
        if result is None:
            print("Not all events received yet.")
            return None
        # do something with all 3 results together
        print(result)
        return StopEvent(result="DOne")

In [None]:
w = ConcurrentFlow(timeout=10, verbose=False)
result = await w.run(message="Start the workflow.")
print(result)

### Collecting different event types

In [87]:
class StepAEvent(Event):
    query: str
class StepAComplementEvent(Event):
    result: str

class StepBEvent(Event):
    query: str
class StepBCompleteEvent(Event):
    result: str

class StepCEvent(Event):
    query: str
class StepCCompleteEvent(Event):
    result:str

In [88]:
class ConcurrentFlow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StepAEvent | StepBEvent | StepCEvent:
        ctx.send_event(StepAEvent(query="Query 1"))
        ctx.send_event(StepBEvent(query="Query 2"))
        ctx.send_event(StepCEvent(query="Query 3"))

    @step
    async def step_a(self, ctx: Context, ev: StepAEvent) -> StepAComplementEvent:
        print("DOing something A-ish")
        return StepAComplementEvent(result=ev.query)
    @step
    async def step_b(self, cvx: Context, ev: StepBEvent) -> StepBCompleteEvent:
        print("Doing something B-ish")
        return StepBCompleteEvent(result=ev.query)
    @step
    async def step_c(self, ctx: Context, ev: StepCEvent) -> StepCCompleteEvent:
        print("Doing something C-ish")
        return StepCCompleteEvent(result=ev.query)
    @step
    async def step_three(self, ctx:Context, ev: StepAComplementEvent | StepBCompleteEvent | StepCCompleteEvent) -> StopEvent:
        print("Recieved event ", ev,result)
        #wait until we recieve 3 events
        events = ctx.collect_events(ev, [StepCCompleteEvent, StepAComplementEvent, StepBCompleteEvent],)
        if events is None:
            return None
        # do something with all 3 results together
        print("All events recieved:", events)
        return StopEvent(result="Done")

In [89]:
w = ConcurrentFlow(timeout=10, verbose=False)
result = await w.run(message="Start the workflow.")
print(result)

DOing something A-ish
Doing something B-ish
Doing something C-ish
Recieved event  result='Query 1' Workflow complete
Recieved event  result='Query 2' Workflow complete
Recieved event  result='Query 3' Workflow complete
All events recieved: [StepCCompleteEvent(result='Query 3'), StepAComplementEvent(result='Query 1'), StepBCompleteEvent(result='Query 2')]
Done


In [90]:
WORKFLOW_FILE = "workflows/concurrent_different_events.html"
draw_all_possible_flows(w, filename=WORKFLOW_FILE)
html_content = extract_html_content(WORKFLOW_FILE)

<class 'NoneType'>
<class '__main__.StepAEvent'>
<class '__main__.StepBEvent'>
<class '__main__.StepCEvent'>
<class '__main__.StepAComplementEvent'>
<class '__main__.StepBCompleteEvent'>
<class '__main__.StepCCompleteEvent'>
<class 'llama_index.core.workflow.events.StopEvent'>
workflows/concurrent_different_events.html


In [91]:
display(HTML(html_content), metadata=dict(isolated=True))

### Sreaming

In [60]:
from llama_index.llms.openai import OpenAI

In [61]:
class FirstEvent(Event):
    first_output: str

class SecondEvent(Event):
    second_output: str
    response: str

class TextEvent(Event):
    delta: str

class ProgressEvent(Event):
    msg: str

In [62]:
class MyWorkflow(Workflow):
    @step
    async def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Step one is happening"))
        return FirstEvent(first_output="First step complete.")

    @step
    async def step_two(self, ctx: Context, ev: FirstEvent) -> SecondEvent:
        llm = OpenAI(model="gpt-4o-mini", api_key=api_key) 
        generator = await llm.astream_complete(
            "Please give me the first 50 words of Moby Dick, a book in the public domain."
        )
        async for response in generator:
            # Allow the workflow to stream this piece of response
            ctx.write_event_to_stream(TextEvent(delta=response.delta))
        return SecondEvent(
            second_output="Second step complete, full response attached",
            response=str(response),
        )

    @step
    async def step_three(self, ctx: Context, ev: SecondEvent) -> StopEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Step three is happening"))
        return StopEvent(result="Workflow complete.")

In [86]:
workflow = MyWorkflow(timeout=30, verbose=False)
handler = workflow.run(first_input="Start the workflow.")

async for ev in handler.stream_events():
    if isinstance(ev, ProgressEvent):
        print(ev.msg)
    if isinstance(ev, TextEvent):
        print(ev.delta, end="")

final_result = await handler
print("Final result = ", final_result)

bad thing happened
good thing happened
first step completed
SEcond step completed
Final result =  Workflow completed
