### LlamaIndex workflows
Abstractions for building complex agentic workflows

In [6]:
!pip install --upgrade llama-index-core llama-index-llms-openai llama-index-utils-workflow python-dotenv

Collecting python-dotenv
  Using cached python_dotenv-1.0.1-py3-none-any.whl (19 kB)
Installing collected packages: python-dotenv
Successfully installed python-dotenv-1.0.1
You should consider upgrading via the '/Users/asthapuri/PycharmProjects/llamaindex_workflow/venv/bin/python -m pip install --upgrade pip' command.[0m


In [4]:
import os
from llama_index.core.workflow import (
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Context
)
import random
from llama_index.utils.workflow import draw_all_possible_flows
from llama_index.utils.workflow import draw_most_recent_execution
from llama_index.llms.openai import OpenAI

In [7]:
from llama_index.llms.openai import OpenAI
from dotenv import load_dotenv
load_dotenv()
os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY")

# Most workflows are defined as a class. It inherits from the Workflow class
# This is a very basic workflow. It starts, does one thing and then stops
class OpenAIGenerator(Workflow):
    # We can have as many steps as we want
    @step()
    async def generate(self, ev: StartEvent) -> StopEvent:
        llm = OpenAI(model="gpt-4o")
        response = await llm.acomplete(ev.query)
        return StopEvent(result=str(response))

w = OpenAIGenerator(timeout=60, verbose=False)
result = await w.run(query="What's LlamaIndex?")
print(result)

LlamaIndex, previously known as GPT Index, is a data framework designed to facilitate the connection between large language models (LLMs) and external data sources. It provides a suite of tools that allow users to ingest, structure, and query data from various sources such as documents, databases, and APIs. The framework is particularly useful for applications that require the integration of LLMs with specific datasets, enabling more efficient and contextually relevant responses.

Key features of LlamaIndex include:

1. **Data Connectors**: These allow for the ingestion of data from multiple sources, making it easier to gather and preprocess information.
2. **Indices**: LlamaIndex offers various types of indices to structure and organize the ingested data, which can then be used to optimize querying.
3. **Query Interface**: This feature enables users to perform complex queries on the structured data, leveraging the capabilities of LLMs to provide insightful and accurate answers.

Overa

In [8]:
draw_all_possible_flows(OpenAIGenerator, filename="trivial_workflow.html")

trivial_workflow.html


Lets make a little more complex workflow

In [9]:
# custom event
class FailedEvent(Event):
    error: str

# custom event
class QueryEvent(Event):
    query: str

class LoopExampleFlow(Workflow):

    # This can take a start event or a query event and emit a failed event or stop event
    @step()
    async def answer_query(self, ev: StartEvent | QueryEvent ) -> FailedEvent | StopEvent:
        query = ev.query
        # try to answer the query
        random_number = random.randint(0, 1)
        if (random_number == 0):
            return FailedEvent(error="Failed to answer the query.")
        else:
            return StopEvent(result="The answer to your query")

    @step()
    async def improve_query(self, ev: FailedEvent) -> QueryEvent | StopEvent:
        # improve the query or decide it can't be fixed
        random_number = random.randint(0, 1)
        if (random_number == 0):
            # llm will improve your query incase of a failed event
            return QueryEvent(query="Here's a better query.")
        else:
            return StopEvent(result="Your query can't be fixed.")

In [10]:
draw_all_possible_flows(LoopExampleFlow, filename="loop_workflow.html")

loop_workflow.html


In [12]:
# This time, setting verbose as True shows us all the steps
l = LoopExampleFlow(timeout=10, verbose=True)
result = await l.run(query="What's LlamaIndex?")
print(result)

Running step answer_query
Step answer_query produced event StopEvent
The answer to your query


In [13]:
# There is a global state which allows you to keep arbitrary data or functions around for use by all event handlers.
class GlobalExampleFlow(Workflow):

    # variable of type context will get passed to the function
    @step(pass_context=True)
    async def setup(self, ctx: Context, ev: StartEvent) -> QueryEvent:
        # load our data here. Attach arbitrary data to context.
        ctx.data["some_database"] = ["value1","value2","value3"]

        return QueryEvent(query=ev.query)

    @step(pass_context=True)
    async def query(self, ctx: Context, ev: QueryEvent) -> StopEvent:
        # use our data with our query
        data = ctx.data["some_database"]

        result = f"The answer to your query is {data[1]}"
        return StopEvent(result=result)

In [14]:
g = GlobalExampleFlow(timeout=10, verbose=True)
result = await g.run(query="What's LlamaIndex?")
print(result)

Running step setup
Step setup produced event QueryEvent
Running step query
Step query produced event StopEvent
The answer to your query is value2
