# RAG Workflow with Reranking  

This notebook walks through setting up a `Workflow` to perform basic RAG with reranking.

## Designing the Workflow  

RAG with reranking consists of four clearly defined steps:

1. Indexing the data to create an index  
2. Using that index and a query to retrieve relevant text chunks  
3. Reranking the retrieved text chunks against the original query  
4. Synthesizing a final response from the reranked chunks  
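To make the four steps concrete before bringing in LlamaIndex, here is a library-free toy sketch. It is not how LlamaIndex works internally, and every function name in it is hypothetical: a bag-of-words term-count vector stands in for a real embedding model, a query-term overlap count stands in for the LLM reranker, and `synthesize` only assembles the prompt an LLM would receive.

```python
import math
import re
from collections import Counter


def tokenize(text: str) -> list[str]:
    """Lowercase and split into alphanumeric tokens."""
    return re.findall(r"[a-z0-9]+", text.lower())


def embed(text: str) -> Counter:
    """Toy 'embedding' (assumption): a bag-of-words term-count vector."""
    return Counter(tokenize(text))


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse term-count vectors."""
    dot = sum(count * b[term] for term, count in a.items())
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def build_index(chunks: list[str]) -> list[tuple[str, Counter]]:
    """Step 1: index each chunk together with its vector."""
    return [(chunk, embed(chunk)) for chunk in chunks]


def retrieve(index, query: str, top_k: int = 2) -> list[str]:
    """Step 2: return the top_k chunks most similar to the query."""
    q = embed(query)
    ranked = sorted(index, key=lambda item: cosine(q, item[1]), reverse=True)
    return [chunk for chunk, _ in ranked[:top_k]]


def rerank(query: str, chunks: list[str], top_n: int = 1) -> list[str]:
    """Step 3: rescore the candidates with a second scorer (query-term overlap here)."""
    q_terms = set(tokenize(query))
    return sorted(chunks, key=lambda c: len(q_terms & set(tokenize(c))), reverse=True)[:top_n]


def synthesize(query: str, chunks: list[str]) -> str:
    """Step 4: stand-in for an LLM call; only builds the prompt it would see."""
    context = "\n".join(chunks)
    return f"Context:\n{context}\n\nQuestion: {query}"


chunks = [
    "Llama 2 was pretrained on 2 trillion tokens.",
    "The model uses grouped-query attention.",
    "Fine-tuning used human preference data.",
]
query = "How many tokens was Llama 2 pretrained on?"
candidates = retrieve(build_index(chunks), query, top_k=2)
print(synthesize(query, rerank(query, candidates, top_n=1)))
```

The point of the split is that retrieval uses a cheap scorer over the whole index, while reranking applies a second (in the real workflow, far more expensive) scorer only to the few retrieved candidates.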

With this in mind, we can create events and workflow steps to follow this process!

## The Workflow Events  

To handle these steps, we need to define a few events:  
1. An event to pass retrieved nodes to the reranker  
2. An event to pass reranked nodes to the synthesizer  

The other steps will use the built-in `StartEvent` and `StopEvent` events.

In [1]:
from llama_index.core.workflow import Event
from llama_index.core.schema import NodeWithScore

class RetrieverEvent(Event):
    """Result of running retrieval"""

    nodes: list[NodeWithScore]


class RerankEvent(Event):
    """Result of running reranking on retrieved nodes"""

    nodes: list[NodeWithScore]

## The Workflow Itself  

With our events defined, we can construct our workflow and steps.  
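Note that the workflow validates itself using the type annotations on each step: the event type a step accepts (its `ev` parameter) and the event types it returns determine how the steps are connected, so accurate annotations matter.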
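As a rough illustration of why the annotations matter, the toy sketch below wires steps together purely from their `ev` parameter and return annotations, and refuses to build a workflow in which some returned event type has no consumer. It uses stdlib dataclasses as hypothetical stand-ins for the LlamaIndex event classes; it is not LlamaIndex's actual validation code.

```python
from dataclasses import dataclass
from typing import get_type_hints


@dataclass
class StartEvent:
    query: str


@dataclass
class RetrieverEvent:
    nodes: list[str]


@dataclass
class StopEvent:
    result: str


def retrieve(ev: StartEvent) -> RetrieverEvent:
    return RetrieverEvent(nodes=[f"chunk about {ev.query}"])


def synthesize(ev: RetrieverEvent) -> StopEvent:
    return StopEvent(result=" / ".join(ev.nodes))


def build_routes(steps):
    """Route each event type to the step whose `ev` parameter accepts it."""
    routes = {get_type_hints(fn)["ev"]: fn for fn in steps}
    # Validation: every event a step returns must be accepted by another step,
    # unless it is the StopEvent that ends the run
    for fn in steps:
        produced = get_type_hints(fn)["return"]
        if produced is not StopEvent and produced not in routes:
            raise TypeError(f"{fn.__name__} returns {produced.__name__}, but no step accepts it")
    return routes


def run(steps, start_event):
    """Follow events from step to step until a StopEvent is produced."""
    routes = build_routes(steps)
    event = start_event
    while not isinstance(event, StopEvent):
        event = routes[type(event)](event)
    return event.result


print(run([retrieve, synthesize], StartEvent(query="Llama 2")))  # chunk about Llama 2
```

Calling `build_routes([retrieve])` raises a `TypeError`, because nothing consumes the `RetrieverEvent` that `retrieve` returns. Catching such wiring mistakes before any step runs is the benefit of annotation-based validation.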

In [2]:
import os

# Restrict PyTorch to GPUs 0 and 1; this must be set before CUDA is initialized
os.environ["CUDA_VISIBLE_DEVICES"] = "0,1"

from llama_index.llms.huggingface import HuggingFaceLLM
from llama_index.embeddings.huggingface import HuggingFaceEmbedding

# Local Qwen2.5-14B-Instruct-1M checkpoint, sharded across the visible GPUs
llm = HuggingFaceLLM(
    model_name="/media/user/datadisk2/LLM_models/Qwen2.5-14B-Instruct-1M",
    tokenizer_name="/media/user/datadisk2/LLM_models/Qwen2.5-14B-Instruct-1M",
    device_map="auto",
    max_new_tokens=2048,
)

# Local all-MiniLM-L6-v2 sentence-embedding model on the first GPU
embed_model = HuggingFaceEmbedding(
    model_name="/media/user/datadisk2/Embedding_models/all-MiniLM-L6-v2",
    device="cuda",
    target_devices=["cuda:0"],
)

Loading checkpoint shards: 100%|██████████| 8/8 [00:18<00:00,  2.26s/it]


In [3]:
!mkdir -p data
!wget --user-agent "Mozilla" "https://arxiv.org/pdf/2307.09288.pdf" -O "data/llama2.pdf"

--2025-07-07 16:37:21--  https://arxiv.org/pdf/2307.09288.pdf
Resolving arxiv.org (arxiv.org)... 151.101.131.42, 151.101.3.42, 151.101.67.42, ...
Connecting to arxiv.org (arxiv.org)|151.101.131.42|:443... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: http://arxiv.org/pdf/2307.09288 [following]
--2025-07-07 16:37:21--  http://arxiv.org/pdf/2307.09288
Connecting to arxiv.org (arxiv.org)|151.101.131.42|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13661300 (13M) [application/pdf]
Saving to: `data/llama2.pdf'


2025-07-07 16:37:23 (10.5 MB/s) - `data/llama2.pdf' saved [13661300/13661300]



In [4]:
from llama_index.core import SimpleDirectoryReader, VectorStoreIndex
from llama_index.core.response_synthesizers import CompactAndRefine
from llama_index.core.postprocessor.llm_rerank import LLMRerank
from llama_index.core.workflow import (
    Context,
    Workflow,
    StartEvent,
    StopEvent,
    step,
)



class RAGWorkflow(Workflow):
    @step
    async def ingest(self, ctx: Context, ev: StartEvent) -> StopEvent | None:
        """Entry point to ingest a document, triggered by a StartEvent with `dirname`."""
        dirname = ev.get("dirname")
        if not dirname:
            return None

        documents = SimpleDirectoryReader(dirname).load_data()
        index = VectorStoreIndex.from_documents(
            documents=documents,
            embed_model=embed_model,
        )
        return StopEvent(result=index)

    @step
    async def retrieve(
        self, ctx: Context, ev: StartEvent
    ) -> RetrieverEvent | None:
        """Entry point for RAG, triggered by a StartEvent with `query` and `index`."""
        query = ev.get("query")
        index = ev.get("index")

        if not query:
            return None

        print(f"Query the database with: {query}")

        # store the query in the global context so later steps can read it
        await ctx.store.set("query", query)

        # the index is passed in on the StartEvent, e.g. w.run(query=..., index=index)
        if index is None:
            print("Index is empty, load some documents before querying!")
            return None

        retriever = index.as_retriever(similarity_top_k=2)
        nodes = await retriever.aretrieve(query)
        print(f"Retrieved {len(nodes)} nodes.")
        return RetrieverEvent(nodes=nodes)

    @step
    async def rerank(self, ctx: Context, ev: RetrieverEvent) -> RerankEvent:
        """Rerank the retrieved nodes against the stored query using the LLM."""
        query = await ctx.store.get("query", default=None)
        print(query, flush=True)

        # LLMRerank asks the LLM to score nodes in batches of `choice_batch_size`
        # and keeps at most `top_n` of them
        ranker = LLMRerank(choice_batch_size=5, top_n=3, llm=llm)
        new_nodes = ranker.postprocess_nodes(ev.nodes, query_str=query)
        print(f"Reranked nodes to {len(new_nodes)}")
        return RerankEvent(nodes=new_nodes)

    @step
    async def synthesize(self, ctx: Context, ev: RerankEvent) -> StopEvent:
        """Return a streaming response using reranked nodes."""
        summarizer = CompactAndRefine(
            llm=llm, 
            streaming=True, 
            verbose=True,
        )
        query = await ctx.store.get("query", default=None)

        response = await summarizer.asynthesize(query, nodes=ev.nodes)
        return StopEvent(result=response)
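In this workflow `LLMRerank` is configured with `choice_batch_size=5` and `top_n=3`, while the retriever returns only `similarity_top_k=2` nodes. The library-free sketch below shows, in simplified form, the batch-score-sort-truncate pattern an LLM reranker of this kind follows, which makes clear that `top_n` only caps the output: two nodes in means at most two nodes out. The `judge` callable and `overlap_judge` are hypothetical stand-ins for the LLM, which is prompted to pick and rate the relevant documents in each batch; this is not the library's source.

```python
from typing import Callable

ScoredNode = tuple[str, float]  # (node text, relevance score)


def batched_rerank(nodes: list[str], query: str,
                   judge: Callable[[str, list[str]], dict[int, float]],
                   choice_batch_size: int = 5, top_n: int = 3) -> list[ScoredNode]:
    """Score nodes batch by batch, then keep the top_n highest-scoring ones."""
    results: list[ScoredNode] = []
    for start in range(0, len(nodes), choice_batch_size):
        batch = nodes[start:start + choice_batch_size]
        # judge returns {1-based position in batch: relevance}; unpicked nodes are dropped
        picks = judge(query, batch)
        results.extend((batch[pos - 1], score) for pos, score in picks.items())
    # sort is stable, so equally scored nodes keep their retrieval order
    results.sort(key=lambda item: item[1], reverse=True)
    return results[:top_n]


def overlap_judge(query: str, batch: list[str]) -> dict[int, float]:
    """Hypothetical LLM stand-in: relevance = number of shared lowercase words."""
    q_words = set(query.lower().split())
    scores = {i + 1: float(len(q_words & set(text.lower().split())))
              for i, text in enumerate(batch)}
    return {pos: s for pos, s in scores.items() if s > 0}


retrieved = ["llama 2 pretraining data", "llama 2 fine-tuning"]  # similarity_top_k=2
print(batched_rerank(retrieved, "how was llama 2 trained", overlap_judge, top_n=3))
```

To let the reranker actually discard candidates, retrieve more nodes than `top_n`, for example by raising `similarity_top_k` above 3.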

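And that's it! Let's take a closer look at the workflow we wrote.  

- There are two entry points (the steps that accept `StartEvent`): `ingest` handles `dirname`, and `retrieve` handles `query`  
- Each step decides for itself whether to run; an entry point returns `None` when the `StartEvent` does not carry what it needs  
- The workflow context (`ctx.store`) holds the user query so that `rerank` and `synthesize` can read it  
- Nodes are passed between steps as `RetrieverEvent` and `RerankEvent`, and the final step returns a streaming response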
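The entry-point behaviour can be sketched without LlamaIndex: every step that accepts the start event receives it, and each one checks for its own key and returns `None` otherwise. `ToyStore` and `ToyContext` are hypothetical stand-ins that only loosely mirror the `ctx.store.set`/`ctx.store.get` calls used above, and a plain `dict` stands in for `StartEvent`. In a notebook, replace `asyncio.run(...)` with a top-level `await`, since an event loop is already running there.

```python
import asyncio


class ToyStore:
    """Hypothetical stand-in for the workflow context's key-value store."""

    def __init__(self) -> None:
        self._data: dict = {}

    async def set(self, key, value) -> None:
        self._data[key] = value

    async def get(self, key, default=None):
        return self._data.get(key, default)


class ToyContext:
    def __init__(self) -> None:
        self.store = ToyStore()


async def ingest(ctx: ToyContext, ev: dict):
    dirname = ev.get("dirname")
    if not dirname:
        return None  # this start event is not meant for ingestion
    return f"index built from {dirname}"


async def retrieve(ctx: ToyContext, ev: dict):
    query = ev.get("query")
    if not query:
        return None  # this start event is not meant for querying
    await ctx.store.set("query", query)  # later steps read the query from here
    return f"nodes for {query!r}"


async def run(**kwargs):
    """Deliver one start event to both entry points; keep whatever they emit."""
    ctx = ToyContext()
    start_event = dict(kwargs)
    results = await asyncio.gather(ingest(ctx, start_event), retrieve(ctx, start_event))
    emitted = [r for r in results if r is not None]
    return emitted, await ctx.store.get("query")


print(asyncio.run(run(dirname="data")))  # (['index built from data'], None)
print(asyncio.run(run(query="How was Llama2 trained?")))
```

Because the gating lives inside each step, one workflow class can serve both ingestion and querying, selected only by the keyword arguments passed to `run`.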

## Run the Workflow!

In [5]:
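# Local models can be slow, so raise the workflow timeout above its default (10 seconds)
w = RAGWorkflow(timeout=120)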

# Ingest the documents
index = await w.run(dirname="data")

In [6]:
# Run a query
result = await w.run(query="How was Llama2 trained?", index=index)
async for chunk in result.async_response_gen():
    print(chunk, end="", flush=True)

Query the database with: How was Llama2 trained?
Retrieved 2 nodes.
How was Llama2 trained?
Reranked nodes to 2
 Llama 2 was trained using custom training libraries, Meta's Research Super Cluster, and production clusters for pretraining. Fine-tuning, annotation, and evaluation were performed on third-party cloud compute. Specifically, the pretraining utilized a cumulative 3.3M GPU hours of computation on A100-80GB hardware (with a TDP of 350-400W), resulting in estimated total emissions of 539 tCO2eq, all of which were offset by Meta's sustainability program. The pretraining data consisted of 2 trillion tokens from publicly available sources, with a cutoff of September 2022, while the fine-tuning data included publicly available instruction datasets and over one million new human-annotated examples.
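The answer above was produced by `CompactAndRefine`. Roughly, this strategy first packs as many retrieved chunks as fit into each prompt (compact), then answers from the first packed context and refines that answer with each remaining one (refine). The sketch below is a simplified, library-free illustration under stated assumptions: it packs by a hypothetical character budget (`max_chars`) rather than by tokens, never splits an oversized chunk (the library re-splits text to fit the context window), and takes a caller-supplied `llm` function as a stand-in for the model. The prompt wording is invented.

```python
from typing import Callable, Optional


def compact(chunks: list[str], max_chars: int) -> list[str]:
    """Greedily merge consecutive chunks so each packed context stays within max_chars.

    A single chunk longer than max_chars is kept whole in this sketch.
    """
    packed: list[str] = []
    current = ""
    for chunk in chunks:
        candidate = f"{current}\n\n{chunk}" if current else chunk
        if not current or len(candidate) <= max_chars:
            current = candidate
        else:
            packed.append(current)
            current = chunk
    if current:
        packed.append(current)
    return packed


def compact_and_refine(query: str, chunks: list[str],
                       llm: Callable[[str], str], max_chars: int = 200) -> Optional[str]:
    """Answer from the first packed context, then refine with each later one."""
    answer = None
    for context in compact(chunks, max_chars):
        if answer is None:
            prompt = f"Context:\n{context}\n\nAnswer the question: {query}"
        else:
            prompt = (f"Existing answer: {answer}\n\nNew context:\n{context}\n\n"
                      f"Refine the existing answer to the question: {query}")
        answer = llm(prompt)
    return answer


calls: list[str] = []


def fake_llm(prompt: str) -> str:
    """Hypothetical LLM stand-in that records prompts and returns a numbered answer."""
    calls.append(prompt)
    return f"answer {len(calls)}"


print(compact_and_refine("How was Llama 2 trained?", ["a" * 80, "b" * 80, "c" * 80], fake_llm))
print(len(calls))  # three chunks packed into two contexts, so two LLM calls
```

Compacting first reduces the number of LLM calls, which matters with a local 14B model; the refine pass is what lets the answer draw on chunks that do not fit into a single prompt.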

References  
- [RAG Reranking](https://docs.llamaindex.ai/en/stable/examples/workflow/rag/)  
- [LLM Reranker Demonstration (Great Gatsby)](https://docs.llamaindex.ai/en/stable/examples/node_postprocessor/LLMReranker-Gatsby/)  
- [LLM Reranker Demonstration (2021 Lyft 10-k)](https://docs.llamaindex.ai/en/stable/examples/node_postprocessor/LLMReranker-Lyft-10k/)  
- [Structured LLM Reranker Demonstration (2021 Lyft 10-k)](https://docs.llamaindex.ai/en/stable/examples/node_postprocessor/Structured-LLMReranker-Lyft-10k/)