## Workflows cookbook: Walking through all features of Workflows¶

- https://docs.llamaindex.ai/en/latest/examples/workflow/workflows_cookbook/

In [5]:
%load_ext dotenv
%dotenv

In [28]:
import random
import sys
from pathlib import Path

from IPython.display import IFrame

# requires llama-index>=0.12.47
from llama_index.core.workflow import (
    Context,
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)
from llama_index.llms.groq import Groq
from llama_index.utils.workflow import (
    draw_all_possible_flows,
    draw_most_recent_execution,
)

sys.path.append("..")

from src.constants import GROQ_MODEL_NAME
from src.settings import settings

base_assets_path = "assets/"
Path(base_assets_path).mkdir(exist_ok=True, parents=True)

# Workflow basics

In [21]:
class GroqGenerator(Workflow):
    @step
    async def generate(self, ev: StartEvent) -> StopEvent:
        llm = Groq(model=GROQ_MODEL_NAME, api_key=settings.GROQ_API)
        response = await llm.acomplete(ev.query)
        return StopEvent(result=str(response))


w = GroqGenerator(timeout=10, verbose=True)
result = await w.run(query="What's LlamaIndex in one sentence?")
print(result)

Running step generate
Step generate produced event StopEvent
LlamaIndex is a open-source library that enables efficient and scalable indexing and querying of large language models, allowing for fast and accurate retrieval of relevant information from massive datasets.


In [29]:
file_name = base_assets_path + "trivial_workflow.html"
draw_all_possible_flows(OpenAIGenerator, filename=file_name)

assets/trivial_workflow.html


In [37]:
display(IFrame(file_name, width=900, height=600))

## Loops and branches

In [38]:
class FailedEvent(Event):
    error: str


class QueryEvent(Event):
    query: str


class LoopExampleFlow(Workflow):
    @step
    async def answer_query(
        self, ev: StartEvent | QueryEvent
    ) -> FailedEvent | StopEvent:
        query = ev.query
        # try to answer the query
        random_number = random.randint(0, 1)
        if random_number == 0:
            return FailedEvent(error="Failed to answer the query.")
        else:
            return StopEvent(result="The answer to your query")

    @step
    async def improve_query(self, ev: FailedEvent) -> QueryEvent | StopEvent:
        # improve the query or decide it can't be fixed
        random_number = random.randint(0, 1)
        if random_number == 0:
            return QueryEvent(query="Improving query!")
        else:
            return StopEvent(result="Stop: Your query can't be fixed.")

In [50]:
for idx in range(4):
    print(f"{idx}: " + "-" * 10 + "Start!" + "-" * 10)
    l = LoopExampleFlow(timeout=10, verbose=True)
    result = await l.run(query="What's LlamaIndex?")
    print(result)

0: ----------Start!----------
Running step answer_query
Step answer_query produced event StopEvent
The answer to your query
1: ----------Start!----------
Running step answer_query
Step answer_query produced event FailedEvent
Running step improve_query
Step improve_query produced event QueryEvent
Running step answer_query
Step answer_query produced event StopEvent
The answer to your query
2: ----------Start!----------
Running step answer_query
Step answer_query produced event FailedEvent
Running step improve_query
Step improve_query produced event StopEvent
Stop: Your query can't be fixed.
3: ----------Start!----------
Running step answer_query
Step answer_query produced event StopEvent
The answer to your query


In [51]:
file_name = base_assets_path + "loop_workflow.html"
draw_all_possible_flows(LoopExampleFlow, filename=file_name)
display(IFrame(file_name, width=800, height=600))

assets/loop_workflow.html


## Maintaining state between events

In [57]:
class GlobalExampleFlow(Workflow):
    @step
    async def setup(self, ctx: Context, ev: StartEvent) -> QueryEvent:
        await ctx.store.set("some_database", ["A function", "a Lib", "a Class"])
        return QueryEvent(query=ev.query)

    @step
    async def query(self, ctx: Context, ev: QueryEvent) -> StopEvent:
        data = await ctx.store.get("some_database")
        result = f"The answer to your query is {data[1]}"
        return StopEvent(result=result)

In [58]:
g = GlobalExampleFlow(timeout=10, verbose=True)
result = await g.run(query="What's LlamaIndex?")
print(result)

Running step setup
Step setup produced event QueryEvent
Running step query
Step query produced event StopEvent
The answer to your query is a Lib


In [56]:
file_name = base_assets_path + "global_workflow.html"
draw_all_possible_flows(GlobalExampleFlow, filename=file_name)
display(IFrame(file_name, width=800, height=600))

assets/global_workflow.html


## Wait - not working

In [170]:
class WaitExampleFlow(Workflow):
    @step
    async def setup(self, ctx: Context, ev: StartEvent) -> StopEvent:
        if hasattr(ev, "data"):
            print("|-> Set DATA in store")
            await ctx.store.set("data", ev.data)

        return StopEvent(result=None)

    @step
    async def query(self, ctx: Context, ev: StartEvent) -> StopEvent:
        if hasattr(ev, "query"):
            data = await ctx.store.get("data", None)
            print(f"|-> Await for store get {data=}")
            if data:
                return StopEvent(result=f"Got the data {data}")
            else:
                print("|-> there's non data yet")
                return None
        else:
            print("|-> this isn't a query")
            return None

In [173]:
w = WaitExampleFlow(verbose=True)
result = await w.run(query="Can I kick it?")
if result is None:
    print("No you can't")
print("\n---- data ----")
result = await w.run(data="Yes you can")
print("\n---- Query ----")
result = await w.run(query="Can I kick it?")
print(result)

Running step query
|-> Await for store get data=None
|-> there's non data yet
Step query produced no event
Running step setup
Step setup produced event StopEvent
No you can't

---- data ----
Running step query
|-> this isn't a query
Step query produced no event
Running step setup
|-> Set DATA in store
Step setup produced event StopEvent

---- Query ----
Running step query
|-> Await for store get data=None
|-> there's non data yet
Step query produced no event
Running step setup
Step setup produced event StopEvent
None


In [174]:
file_name = base_assets_path + "wait_workflow.html"
draw_all_possible_flows(WaitExampleFlow, filename=file_name)
display(IFrame(file_name, width=800, height=600))

assets/wait_workflow.html


## Waiting for one or more events¶

In [182]:
class InputEvent(Event):
    input: str


class SetupEvent(Event):
    error: bool


class QueryEvent(Event):
    query: str


class CollectExampleFlow(Workflow):
    @step
    async def setup(self, ctx: Context, ev: StartEvent) -> SetupEvent:
        # generically start everything up
        if not hasattr(self, "setup") or not self.setup:
            self.setup = True
            print("I got set up")
        return SetupEvent(error=False)

    @step
    async def collect_input(self, ev: StartEvent) -> InputEvent:
        if hasattr(ev, "input"):
            # perhaps validate the input
            print("I got some input")
            return InputEvent(input=ev.input)

    @step
    async def parse_query(self, ev: StartEvent) -> QueryEvent:
        if hasattr(ev, "query"):
            # parse the query in some way
            print("I got a query")
            return QueryEvent(query=ev.query)

    @step
    async def run_query(
        self, ctx: Context, ev: InputEvent | SetupEvent | QueryEvent
    ) -> StopEvent | None:
        ready = ctx.collect_events(ev, [QueryEvent, InputEvent, SetupEvent])
        print(f"-> {ready=} ")
        if ready is None:
            print("Not enough events yet")
            return None

        # run the query
        print("Now I have all the events")
        print(ready)

        result = f"Ran query '{ready[0].query}' on input '{ready[1].input}'"
        return StopEvent(result=result)

In [183]:
c = CollectExampleFlow()
result = await c.run(input="Here's some input", query="Here's my question")
print(result)

I got some input
I got a query
-> ready=None 
Not enough events yet
-> ready=None 
Not enough events yet
-> ready=[QueryEvent(query="Here's my question"), InputEvent(input="Here's some input"), SetupEvent(error=False)] 
Now I have all the events
[QueryEvent(query="Here's my question"), InputEvent(input="Here's some input"), SetupEvent(error=False)]
Ran query 'Here's my question' on input 'Here's some input'


In [184]:
file_name = base_assets_path + "collect_workflow.html"
draw_all_possible_flows(CollectExampleFlow, filename=file_name)
display(IFrame(file_name, width=800, height=600))

assets/collect_workflow.html
