# Workflows in LlamaIndex
A Workflow in LlamaIndex is an event-driven framework that allows you to chain together different computational steps to achieve complex tasks, such as building a retrieval-augmented generation (RAG) system. The workflow is composed of steps, where each step handles specific types of events and can emit new events.

Workflows are asynchronous and event-driven: a step runs only when an event of a type it accepts is emitted. This makes multi-step processes straightforward to compose, follow, and debug.
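To make the event-driven idea concrete, here is a minimal sketch in plain Python. It is a toy model, not how LlamaIndex is implemented: the event classes (`Start`, `Upper`, `Stop`), the `HANDLERS` table, and the `run` loop are all hypothetical names chosen for illustration.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Start:
    text: str


@dataclass
class Upper:
    text: str


@dataclass
class Stop:
    result: str


async def to_upper(ev: Start) -> Upper:
    # Runs only when a Start event is dispatched.
    return Upper(text=ev.text.upper())


async def finish(ev: Upper) -> Stop:
    # Runs only when an Upper event is dispatched.
    return Stop(result=f"{ev.text}!")


# Map each event type to the step that handles it.
HANDLERS = {Start: to_upper, Upper: finish}


async def run(first_event):
    """Dispatch events to their handlers until a Stop event is produced."""
    event = first_event
    while not isinstance(event, Stop):
        event = await HANDLERS[type(event)](event)
    return event.result


result = asyncio.run(run(Start(text="hello")))
print(result)
```

Each handler is selected purely by the type of the event it receives, which is the property the rest of this notebook relies on.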

In [None]:
!pip install azure-identity
!pip install azure-search-documents==11.4.0
!pip install -U llama-index
!pip install llama-index-embeddings-azure-openai
!pip install llama-index-llms-azure-openai
!pip install llama-index-vector-stores-azureaisearch
!pip install nest-asyncio
!pip install python-dotenv

In [13]:
import os
import pprint

from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchClient
from azure.search.documents.indexes import SearchIndexClient
from dotenv import load_dotenv
from llama_index.core import (
    SimpleDirectoryReader,
    StorageContext,
    VectorStoreIndex,
    get_response_synthesizer,
)
from llama_index.core.callbacks import LlamaDebugHandler
from llama_index.core.postprocessor.llm_rerank import LLMRerank
from llama_index.core.response.notebook_utils import display_response
from llama_index.core.response_synthesizers import CompactAndRefine
from llama_index.core.schema import MetadataMode
from llama_index.core.workflow.context import Context
from llama_index.core.workflow.decorators import step
from llama_index.core.workflow.drawing import (
    draw_all_possible_flows,
    draw_most_recent_execution,
)
from llama_index.core.workflow.errors import (
    WorkflowRuntimeError,
    WorkflowTimeoutError,
    WorkflowValidationError,
)
from llama_index.core.workflow.events import Event, StartEvent, StopEvent
from llama_index.core.workflow.workflow import Workflow
from llama_index.embeddings.azure_openai import AzureOpenAIEmbedding
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.azure_openai import AzureOpenAI
from llama_index.llms.openai import OpenAI
from llama_index.vector_stores.azureaisearch import (
    AzureAISearchVectorStore,
    IndexManagement,
    MetadataIndexFieldType,
)

# Load environment variables
load_dotenv()

# Environment Variables
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY")
AZURE_OPENAI_CHAT_COMPLETION_DEPLOYED_MODEL_NAME = os.getenv("AZURE_OPENAI_CHAT_COMPLETION_DEPLOYED_MODEL_NAME") # I'm using GPT-3.5-turbo
AZURE_OPENAI_EMBEDDING_DEPLOYED_MODEL_NAME = os.getenv("AZURE_OPENAI_EMBEDDING_DEPLOYED_MODEL_NAME") # I'm using text-embedding-ada-002
SEARCH_SERVICE_ENDPOINT = os.getenv("AZURE_SEARCH_SERVICE_ENDPOINT")
SEARCH_SERVICE_API_KEY = os.getenv("AZURE_SEARCH_ADMIN_KEY")
INDEX_NAME = "llamindex-workflow-demo"

# Initialize Azure OpenAI and embedding models
llm = AzureOpenAI(
    model=AZURE_OPENAI_CHAT_COMPLETION_DEPLOYED_MODEL_NAME,
    deployment_name=AZURE_OPENAI_CHAT_COMPLETION_DEPLOYED_MODEL_NAME,
    api_key=AZURE_OPENAI_API_KEY,
    azure_endpoint=AZURE_OPENAI_ENDPOINT,
    api_version="2024-02-01"
)

embed_model = AzureOpenAIEmbedding(
    model=AZURE_OPENAI_EMBEDDING_DEPLOYED_MODEL_NAME,
    deployment_name=AZURE_OPENAI_EMBEDDING_DEPLOYED_MODEL_NAME,
    api_key=AZURE_OPENAI_API_KEY,
    azure_endpoint=AZURE_OPENAI_ENDPOINT,
    api_version="2024-02-01"
)

# Initialize search clients
credential = AzureKeyCredential(SEARCH_SERVICE_API_KEY)
index_client = SearchIndexClient(endpoint=SEARCH_SERVICE_ENDPOINT, credential=credential)
search_client = SearchClient(endpoint=SEARCH_SERVICE_ENDPOINT, index_name=INDEX_NAME, credential=credential)


In [3]:
vector_store = AzureAISearchVectorStore(
    search_or_index_client=index_client,
    # filterable_metadata_field_keys=metadata_fields,
    index_name=INDEX_NAME,
    index_management=IndexManagement.CREATE_IF_NOT_EXISTS, # use VALIDATE to validate the index schema if using one that already exists
    id_field_key="id",
    chunk_field_key="chunk",
    embedding_field_key="embedding",
    embedding_dimensionality=1536,  # Ensure this matches your embedding model
    metadata_string_field_key="metadata",
    doc_id_field_key="doc_id",
    language_analyzer="en.lucene",
    vector_algorithm_type="exhaustiveKnn",
)

## Use Existing Index
If you have an existing index, use this code:

In [None]:
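storage_context = StorageContext.from_defaults(vector_store=vector_store)
# An empty document list attaches to the existing index without ingesting anything new.
# Pass the Azure embedding model explicitly so queries are embedded with the same model.
index = VectorStoreIndex.from_documents(
    [],
    embed_model=embed_model,
    storage_context=storage_context,
)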

# Key Components of a Workflow
## Events
Events are the fundamental objects that are passed between the steps of a workflow. Events can carry data and signal to other steps that certain actions need to be taken. There are special events like `StartEvent` and `StopEvent`, and you can also define custom events to carry specific types of data.

In the example provided, two custom events are defined:

In [4]:
from llama_index.core.workflow import Event
from llama_index.core.schema import NodeWithScore


class RetrieverEvent(Event):
    """Result of running retrieval"""

    nodes: list[NodeWithScore]


class RerankEvent(Event):
    """Result of running reranking on retrieved nodes"""

    nodes: list[NodeWithScore]

- `RetrieverEvent`: carries the nodes retrieved from the vector store.
- `RerankEvent`: carries the nodes after they have been reranked.

## Steps
Steps are individual units of work within a workflow. Each step is defined as an asynchronous method and decorated with `@step()`. The decorator reads the step's type annotations to determine which event types it accepts and which it can emit, validates them, and ensures that the step runs only when an event of an accepted type arrives.

In the provided workflow, steps are defined to ingest documents, retrieve relevant information, rerank the retrieved nodes, and synthesize a final response:

In [6]:
class RAGWorkflow(Workflow):
    @step(pass_context=True)
    async def ingest(self, ctx: Context, ev: StartEvent) -> StopEvent | None:
        """Entry point to ingest a document, triggered by a StartEvent with `dirname`."""
        dirname = ev.get("dirname")
        if not dirname:
            return None

        documents = SimpleDirectoryReader(dirname).load_data()
        storage_context = StorageContext.from_defaults(vector_store=vector_store)
        ctx.data["index"] = VectorStoreIndex.from_documents(
            documents=documents,
            embed_model=embed_model,
            storage_context=storage_context,
        )
        return StopEvent(result=f"Indexed {len(documents)} documents.")


- **Ingest Step**: The `ingest` step handles the `StartEvent` to load and index documents from a specified directory. It uses `SimpleDirectoryReader` to load the documents and then creates a `VectorStoreIndex` from them with the embedding model. The index is stored in the workflow's context (`ctx.data["index"]`) for later retrieval.

In [7]:
@step(pass_context=True)
async def retrieve(self, ctx: Context, ev: StartEvent) -> RetrieverEvent | None:
    "Entry point for RAG, triggered by a StartEvent with `query`."
    query = ev.get("query")
    if not query:
        return None

    print(f"Query the database with: {query}")

    # store the query in the global context
    ctx.data["query"] = query

    # get the index from the global context
    index = ctx.data.get("index")
    if index is None:
        print("Index is empty, load some documents before querying!")
        return None

    retriever = index.as_retriever(similarity_top_k=2)
    nodes = retriever.retrieve(query)
    print(f"Retrieved {len(nodes)} nodes.")
    return RetrieverEvent(nodes=nodes)

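Note: executed on its own, this cell fails with `WorkflowValidationError: To decorate retrieve please pass a workflow instance to the @step() decorator.` because `retrieve` is shown here outside the `RAGWorkflow` class body. The complete class at the end of this notebook defines it as a method.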

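- **Retrieve Step**: The `retrieve` step is triggered by a `StartEvent` that contains a `query`. It stores the query in the context, fetches the most similar nodes from the index using a retriever configured with `similarity_top_k=2`, and passes them along in a `RetrieverEvent`. Because the reranker keeps up to three nodes (`top_n=3`), retrieving only two gives it nothing to discard; the complete class at the end of this notebook retrieves ten candidates instead.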
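As a rough picture of what a top-k similarity search does, the following sketch ranks a few hand-written vectors against a query vector. The vectors, the `top_k` helper, and the choice of cosine similarity are assumptions for illustration; in the notebook the search itself is performed by Azure AI Search.

```python
import math


def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k(query_vec, chunks, k=2):
    """Return the k (score, text) pairs most similar to the query vector."""
    scored = [(cosine(query_vec, vec), text) for text, vec in chunks]
    return sorted(scored, key=lambda pair: pair[0], reverse=True)[:k]


# Made-up three-dimensional "embeddings" for three text chunks.
chunks = [
    ("vaccines and treatments", [0.9, 0.1, 0.0]),
    ("infrastructure spending", [0.0, 0.2, 0.9]),
    ("new mask guidelines", [0.7, 0.3, 0.1]),
]
hits = top_k([1.0, 0.0, 0.0], chunks, k=2)
top_texts = [text for _, text in hits]
print(top_texts)
```

Raising `k` returns more candidates at the cost of more text for later stages to process.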

In [None]:
@step(pass_context=True)
async def rerank(self, ctx: Context, ev: RetrieverEvent) -> RerankEvent:
    # Rerank the nodes
    ranker = LLMRerank(choice_batch_size=5, top_n=3, llm=llm)
    print(ctx.data.get("query"), flush=True)
    new_nodes = ranker.postprocess_nodes(ev.nodes, query_str=ctx.data.get("query"))
    print(f"Reranked nodes to {len(new_nodes)}")
    return RerankEvent(nodes=new_nodes)

- **Rerank Step**: The `rerank` step takes the nodes from the `RetrieverEvent` and reranks them with `LLMRerank`, which uses the language model to judge relevance to the query. The reranked nodes are then passed along in a `RerankEvent`.
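The general shape of batched reranking can be sketched without an LLM. This assumes, based on the parameter names, that candidates are scored in groups of `choice_batch_size` and that the `top_n` best are kept; the keyword-overlap scorer and the helper names are hypothetical stand-ins for the LLM call.

```python
def rerank(nodes, score_batch, choice_batch_size=5, top_n=3):
    """Score nodes one batch at a time, then keep the top_n highest scoring."""
    scored = []
    for start in range(0, len(nodes), choice_batch_size):
        batch = nodes[start:start + choice_batch_size]
        # One scorer call per batch, mirroring one LLM request per batch.
        scored.extend(zip(score_batch(batch), batch))
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [node for _, node in scored[:top_n]]


def make_keyword_scorer(query):
    """Hypothetical scorer: count query words that appear in each text."""
    words = set(query.lower().split())
    calls = []  # records the size of every batch it was asked to score

    def score_batch(batch):
        calls.append(len(batch))
        return [len(words & set(text.lower().split())) for text in batch]

    return score_batch, calls


nodes = [
    "the budget deficit fell",
    "a plan for covid-19 vaccines",
    "roads and bridges",
    "the covid-19 plan continues",
    "a new plan",
    "jobs report",
    "weather tests",
]
scorer, calls = make_keyword_scorer("plan for covid-19")
best = rerank(nodes, scorer, choice_batch_size=5, top_n=3)
print(best, calls)
```

With seven candidates and a batch size of five, the scorer is called twice, which is why the batch size affects both cost and latency when the scorer is an LLM.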

In [None]:
@step(pass_context=True)
async def synthesize(self, ctx: Context, ev: RerankEvent) -> StopEvent:
    """Return a streaming response using reranked nodes."""
    summarizer = CompactAndRefine(llm=llm, streaming=True, verbose=True)
    query = ctx.data.get("query")

    response = await summarizer.asynthesize(query, nodes=ev.nodes)
    return StopEvent(result=response)

- **Synthesize Step**: The `synthesize` step generates the final response by passing the query and the reranked nodes to a `CompactAndRefine` response synthesizer backed by the same LLM, with streaming enabled. The result is returned in a `StopEvent`, which ends the workflow run.
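Since the synthesizer is created with `streaming=True`, the caller receives the answer in chunks. The sketch below shows the consuming side of that pattern with a hypothetical async generator, `fake_token_stream`, standing in for the real streaming response.

```python
import asyncio


async def fake_token_stream(text):
    """Hypothetical stand-in for a streaming LLM response."""
    for token in text.split(" "):
        await asyncio.sleep(0)  # yield control, as a network read would
        yield token + " "


async def collect(stream):
    """Accumulate streamed chunks into one string."""
    parts = []
    async for chunk in stream:
        parts.append(chunk)
    return "".join(parts).strip()


answer = asyncio.run(collect(fake_token_stream("vaccines treatments and tests")))
print(answer)
```

Printing each chunk as it arrives gives a live display, while collecting the chunks first is convenient when the full text is needed afterwards.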

## Context
The **Context** object allows steps in the workflow to share data. In the example, `ingest` stores the index under `ctx.data["index"]` and `retrieve` stores the query under `ctx.data["query"]`, so later steps such as `rerank` and `synthesize` can read the query without it being copied into every event. The example also relies on this data surviving between calls: the query run reads the index that the earlier ingestion run stored.
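A toy version of this shared state makes the ordering requirement visible. `ToyContext` and the two plain functions below are hypothetical simplifications: they keep the `data` dictionary from the example but drop events, async, and the vector store.

```python
class ToyContext:
    """Hypothetical stand-in for the workflow Context: a shared key-value store."""

    def __init__(self):
        self.data = {}


def ingest(ctx, documents):
    # Store the "index" where later steps can find it.
    ctx.data["index"] = dict(enumerate(documents))
    return f"Indexed {len(documents)} documents."


def retrieve(ctx, query):
    ctx.data["query"] = query
    index = ctx.data.get("index")
    if index is None:
        # Nothing has been ingested yet, so there is nothing to search.
        return None
    return [doc for doc in index.values() if query.lower() in doc.lower()]


ctx = ToyContext()
before = retrieve(ctx, "covid")          # no index yet
status = ingest(ctx, ["COVID-19 plan", "Budget"])
after = retrieve(ctx, "covid")           # index is now available
print(before, status, after)
```

Querying before ingesting returns nothing, which matches the "Index is empty" message printed by the real `retrieve` step.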

## Workflow Execution
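Once the steps are defined, run the workflow by creating an instance and calling its `run` method, which is asynchronous and must be awaited. Keyword arguments passed to `run` become fields of the `StartEvent`. Both `ingest` and `retrieve` accept a `StartEvent`; each checks for its own key (`dirname` or `query`) and returns `None` when that key is absent, so only the relevant branch continues. From there, each step runs as soon as the event it accepts is emitted.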

In [19]:
# Initialize Workflow
w = RAGWorkflow()

# Ingest the documents from the 'data/txt' directory
await w.run(dirname="data/txt")

# Run a query
result = await w.run(query="How was Llama2 trained?")
async for chunk in result.async_response_gen():
    print(chunk, end="", flush=True)


In this case:
- **Ingestion**: The workflow is first run with a directory name to ingest documents into the vector index.
- **Query Execution**: The workflow is then run again with a query to retrieve, rerank, and synthesize a response from the ingested documents.
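Top-level `await` works in a notebook cell, but a plain Python script needs an event loop to drive the same two calls. The sketch below shows that pattern with `asyncio.run`; `ToyWorkflow` is a hypothetical stand-in with an awaitable `run` method, used here so the example runs without Azure credentials.

```python
import asyncio


class ToyWorkflow:
    """Hypothetical stand-in exposing an awaitable run(), like the workflow above."""

    def __init__(self):
        self.index = None

    async def run(self, dirname=None, query=None):
        # Mirror the two entry points: each reacts only to its own keyword.
        if dirname:
            self.index = [f"{dirname}/doc{i}.txt" for i in range(2)]
            return f"Indexed {len(self.index)} documents."
        if query:
            if self.index is None:
                return "Index is empty, load some documents before querying!"
            return f"Answer to {query!r} from {len(self.index)} documents."
        return None


async def main():
    w = ToyWorkflow()
    ingest_result = await w.run(dirname="data/txt")
    query_result = await w.run(query="How was Llama2 trained?")
    return ingest_result, query_result


# In a script, asyncio.run starts the event loop; in a notebook, use `await main()`.
ingest_result, query_result = asyncio.run(main())
print(ingest_result)
print(query_result)
```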



## Conclusion
Workflows let you express a multi-stage task such as retrieval-augmented generation as a set of small steps connected by typed events. Custom events carry data between steps, the context holds shared state, and each step stays short and focused, which keeps the overall pipeline flexible and easy to follow.

In [21]:
# Define RAG Workflow with Reranking
class RAGWorkflow(Workflow):
    @step(pass_context=True)
    async def ingest(self, ctx: Context, ev: StartEvent) -> StopEvent | None:
        """Entry point to ingest a document, triggered by a StartEvent with `dirname`."""
        dirname = ev.get("dirname")
        if not dirname:
            return None

        documents = SimpleDirectoryReader(dirname).load_data()
        storage_context = StorageContext.from_defaults(vector_store=vector_store)
        ctx.data["index"] = VectorStoreIndex.from_documents(
            documents=documents,
            embed_model=embed_model,
            storage_context=storage_context,
        )
        return StopEvent(result=f"Indexed {len(documents)} documents.")

    @step(pass_context=True)
    async def retrieve(
        self, ctx: Context, ev: StartEvent
    ) -> RetrieverEvent | None:
        "Entry point for RAG, triggered by a StartEvent with `query`."
        query = ev.get("query")
        if not query:
            return None

        print(f"Query the database with: {query}")

        # store the query in the global context
        ctx.data["query"] = query

        # get the index from the global context
        index = ctx.data.get("index")
        if index is None:
            print("Index is empty, load some documents before querying!")
            return None

        retriever = index.as_retriever(similarity_top_k=10)
        nodes = retriever.retrieve(query)
        print(f"Retrieved {len(nodes)} nodes.")
        return RetrieverEvent(nodes=nodes)

    @step(pass_context=True)
    async def rerank(self, ctx: Context, ev: RetrieverEvent) -> RerankEvent:
        # Rerank the nodes
        ranker = LLMRerank(
            choice_batch_size=5, top_n=3, llm=llm
        )
        print(ctx.data.get("query"), flush=True)
        new_nodes = ranker.postprocess_nodes(
            ev.nodes, query_str=ctx.data.get("query")
        )
        print(f"Reranked nodes to {len(new_nodes)}")
        return RerankEvent(nodes=new_nodes)

    @step(pass_context=True)
    async def synthesize(self, ctx: Context, ev: RerankEvent) -> StopEvent:
        """Return a streaming response using reranked nodes."""
        summarizer = CompactAndRefine(llm=llm, streaming=True, verbose=True)
        query = ctx.data.get("query")

        response = await summarizer.asynthesize(query, nodes=ev.nodes)
        return StopEvent(result=response)

# Initialize Workflow
w = RAGWorkflow()

# Ingest the documents in 'data/txt' (here the directory contains state_of_the_union.txt)
await w.run(dirname="data/txt")

# Run a query
result = await w.run(query="does the president have a plan for covid-19?")

# Function to display custom response
def display_custom_response(response):
    print("=== GPT-4o-Generated Response ===")
    display_response(response)
    print("\n=== Details of Source Documents ===\n")
    for node in response.source_nodes:
        print(node.get_content(metadata_mode=MetadataMode.LLM))
        print("-" * 40 + "\n")

# Await and collect the full response
final_response = ""
async for chunk in result.async_response_gen():
    final_response += chunk

# Create a mock response object for display
class MockResponse:
    def __init__(self, response, source_nodes):
        self.response = response
        self.source_nodes = source_nodes

mock_response = MockResponse(final_response, result.source_nodes)

# Display the response using the custom display function
display_custom_response(mock_response)


Query the database with: does the president have a plan for covid-19?
Retrieved 1 nodes.
does the president have a plan for covid-19?
Reranked nodes to 1
=== GPT-4o-Generated Response ===


**`Final Response:`** Yes, the president has outlined a plan for COVID-19, which includes steps to stay protected with vaccines and treatments, prepare for new variants, end the shutdown of schools and businesses, and continue vaccinating the world.


=== Details of Source Documents ===

file_path: c:\Dev\azure-ai-search-python-playground\data\txt\state_of_the_union.txt

Because of the progress we’ve made, because of your resilience and the tools we have, tonight I can say  
we are moving forward safely, back to more normal routines.  

We’ve reached a new moment in the fight against COVID-19, with severe cases down to a level not seen since last July.  

Just a few days ago, the Centers for Disease Control and Prevention—the CDC—issued new mask guidelines. 

Under these new guidelines, most Americans in most of the country can now be mask free.   

And based on the projections, more of the country will reach that point across the next couple of weeks. 

Thanks to the progress we have made this past year, COVID-19 need no longer control our lives.  

I know some are talking about “living with COVID-19”. Tonight – I say that we will never just accept living with COVID-19. 

We will continue to combat the virus as we do other disease