In [1]:
from llama_index.core import VectorStoreIndex
from llama_index.core.workflow import Event
from llama_index.core.schema import NodeWithScore

class IngestEvent(Event):
    index: VectorStoreIndex
    
class RetrieverEvent(Event):
    nodes: list[NodeWithScore]

class RerankEvent(Event):
    nodes: list[NodeWithScore]

In [2]:
from llama_index.core.response_synthesizers import CompactAndRefine
from llama_index.core.postprocessor.llm_rerank import LLMRerank
from llama_index.core.workflow import Context, Workflow, StartEvent, StopEvent, step

from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
from llama_index.readers.wikipedia import WikipediaReader

import wikipedia
from wikipedia import PageError

class RAGWorkflow(Workflow):
    @step
    async def ingest(self, ctx: Context, ev: StartEvent) -> IngestEvent | None:
        query = ev.get("query", None)
        if query is None:
            return None

        await ctx.set("query", query)

        pages = wikipedia.search(query, results=10)
        if not pages:
            return None

        wiki_loader = WikipediaReader()
        documents = []
        for page in pages:
            try:
                doc = wiki_loader.load_data([page], lang_prefix="en")
                documents.extend(doc)
            except PageError:
                print(f"Skipping “{page}” (PageError).")

        index = VectorStoreIndex.from_documents(
            documents=documents,
            embed_model=OpenAIEmbedding(model_name="text-embedding-3-small"),
        )

        return IngestEvent(index=index)

    @step
    async def retrieve(self, ctx: Context, ev: IngestEvent) -> RetrieverEvent | None:
        index = ev.index
        query = await ctx.get("query", None)
        if (query is None) or (index is None):
            return None

        retriever = index.as_retriever(similarity_top_k=5)
        nodes = await retriever.aretrieve(query)
        print(f"Retrieved {len(nodes)} nodes.")
        return RetrieverEvent(nodes=nodes)

    @step
    async def rerank(self, ctx: Context, ev: RetrieverEvent) -> RerankEvent:
        ranker = LLMRerank(
            choice_batch_size=5,
            top_n=3,
            llm=OpenAI(model="gpt-4o"),
        )
        query = await ctx.get("query", default=None)
        new_nodes = ranker.postprocess_nodes(ev.nodes, query_str=query)
        print(f"Reranked nodes to {len(new_nodes)}")
        return RerankEvent(nodes=new_nodes)

    @step
    async def synthesize(self, ctx: Context, ev: RerankEvent) -> StopEvent:
        llm = OpenAI(model="gpt-4o")
        summarizer = CompactAndRefine(llm=llm, streaming=True, verbose=True)
        query = await ctx.get("query", default=None)
        
        response = await summarizer.asynthesize(query, nodes=ev.nodes)
        return StopEvent(result=response)

In [3]:
w = RAGWorkflow(timeout=60, verbose=True)

result = await w.run(query="What is a transformer, and how is it used in large language models?")
async for chunk in result.async_response_gen():
    print(chunk, end="", flush=True)

Running step ingest
Skipping “GPT-4” (PageError).
Skipping “GPT-3” (PageError).
Skipping “GPT-2” (PageError).


Retrying llama_index.embeddings.openai.base.OpenAIEmbedding._get_text_embeddings.<locals>._retryable_get_embeddings in 1.0 seconds as it raised APIConnectionError: Connection error..


WorkflowRuntimeError: Error in step 'ingest': Connection error.

In [13]:
from llama_index.utils.workflow import draw_all_possible_flows

draw_all_possible_flows(RAGWorkflow, filename="wikipedia_rag_workflow.html")

wikipedia_rag_workflow.html
