# LongRAG Workflow

In [1]:
from dotenv import load_dotenv
load_dotenv()

True

### Import RagaAI Catalyst components for tracing

In [2]:
from ragaai_catalyst.tracers import Tracer
from ragaai_catalyst import RagaAICatalyst, init_tracing
import os

catalyst = RagaAICatalyst(
    access_key=os.getenv("RAGAAI_CATALYST_ACCESS_KEY"),
    secret_key=os.getenv("RAGAAI_CATALYST_SECRET_KEY"),
    base_url=os.getenv("RAGAAI_CATALYST_BASE_URL"),
)

# Initialize tracer
tracer = Tracer(
    project_name="llamaindex_tracing_examples",
    dataset_name="Long_rag",
    tracer_type="Agentic",
)

init_tracing(catalyst=catalyst, tracer=tracer)

INFO:httpx:HTTP Request: GET https://raw.githubusercontent.com/BerriAI/litellm/main/model_prices_and_context_window.json "HTTP/1.1 200 OK"


Token(s) set successfully


In [3]:
from typing import List, Dict, Optional, Set, FrozenSet

from llama_index.core.schema import BaseNode, TextNode
from llama_index.core.node_parser import SentenceSplitter

DEFAULT_CHUNK_SIZE = 4096  
DEFAULT_MAX_GROUP_SIZE = 20  
DEFAULT_SMALL_CHUNK_SIZE = 512 
DEFAULT_TOP_K = 8  


def split_doc(chunk_size: int, documents: List[BaseNode]) -> List[TextNode]:
    text_parser = SentenceSplitter(chunk_size=chunk_size)
    return text_parser.get_nodes_from_documents(documents)


def group_docs(
    nodes: List[str],
    adj: Dict[str, List[str]],
    max_group_size: Optional[int] = DEFAULT_MAX_GROUP_SIZE,
) -> Set[FrozenSet[str]]:
    docs = sorted(nodes, key=lambda node: len(adj[node]))
    groups = set()  
    for d in docs:
        related_groups = set()
        for r in adj[d]:
            for g in groups:
                if r in g:
                    related_groups = related_groups.union(frozenset([g]))

        gnew = {d}
        related_groupsl = sorted(related_groups, key=lambda el: len(el))
        for g in related_groupsl:
            if max_group_size is None or len(gnew) + len(g) <= max_group_size:
                gnew = gnew.union(g)
                if g in groups:
                    groups.remove(g)

        groups.add(frozenset(gnew))

    return groups


def get_grouped_docs(
    nodes: List[TextNode],
    max_group_size: Optional[int] = DEFAULT_MAX_GROUP_SIZE,
) -> List[TextNode]:

    nodes_str = [node.id_ for node in nodes]
    adj: Dict[str, List[str]] = {
        node.id_: [val.node_id for val in node.relationships.values()]
        for node in nodes
    }
    nodes_dict = {node.id_: node for node in nodes}

    res = group_docs(nodes_str, adj, max_group_size)

    ret_nodes = []
    for g in res:
        cur_node = TextNode()

        for node_id in g:
            cur_node.text += nodes_dict[node_id].text + "\n\n"
            cur_node.metadata.update(nodes_dict[node_id].metadata)

        ret_nodes.append(cur_node)

    return ret_nodes

In [4]:
from llama_index.core.retrievers import BaseRetriever
from llama_index.core.vector_stores.simple import BasePydanticVectorStore
from llama_index.core.schema import QueryBundle, NodeWithScore
from llama_index.core.vector_stores.types import VectorStoreQuery
from llama_index.core.settings import Settings


class LongRAGRetriever(BaseRetriever):

    def __init__(
        self,
        grouped_nodes: List[TextNode],
        small_toks: List[TextNode],
        vector_store: BasePydanticVectorStore,
        similarity_top_k: int = DEFAULT_TOP_K,
    ) -> None:

        self._grouped_nodes = grouped_nodes
        self._grouped_nodes_dict = {node.id_: node for node in grouped_nodes}
        self._small_toks = small_toks
        self._small_toks_dict = {node.id_: node for node in self._small_toks}

        self._similarity_top_k = similarity_top_k
        self._vec_store = vector_store
        self._embed_model = Settings.embed_model

    def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:

        query_embedding = self._embed_model.get_query_embedding(
            query_bundle.query_str
        )
        vector_store_query = VectorStoreQuery(
            query_embedding=query_embedding, similarity_top_k=500
        )

        query_res = self._vec_store.query(vector_store_query)

        top_parents_set: Set[str] = set()
        top_parents: List[NodeWithScore] = []
        for id_, similarity in zip(query_res.ids, query_res.similarities):
            cur_node = self._small_toks_dict[id_]
            parent_id = cur_node.ref_doc_id
            if parent_id not in top_parents_set:
                top_parents_set.add(parent_id)

                parent_node = self._grouped_nodes_dict[parent_id]
                node_with_score = NodeWithScore(
                    node=parent_node, score=similarity
                )
                top_parents.append(node_with_score)

                if len(top_parents_set) >= self._similarity_top_k:
                    break

        assert len(top_parents) == min(
            self._similarity_top_k, len(self._grouped_nodes)
        )

        return top_parents

In [5]:
from typing import Iterable, Any

from llama_index.core import VectorStoreIndex
from llama_index.core.llms import LLM
from llama_index.core.workflow import Event


class LoadNodeEvent(Event):
    small_nodes: Iterable[TextNode]
    grouped_nodes: list[TextNode]
    index: VectorStoreIndex
    similarity_top_k: int
    llm: LLM

In [6]:
from llama_index.core.workflow import (
    Workflow,
    step,
    StartEvent,
    StopEvent,
    Context,
)
from llama_index.core import SimpleDirectoryReader
from llama_index.core.query_engine import RetrieverQueryEngine


class LongRAGWorkflow(Workflow):

    @step
    async def ingest(self, ev: StartEvent) -> LoadNodeEvent | None:

        data_dir: str = ev.get("data_dir")
        llm: LLM = ev.get("llm")
        chunk_size: int | None = ev.get("chunk_size")
        similarity_top_k: int = ev.get("similarity_top_k")
        small_chunk_size: int = ev.get("small_chunk_size")
        index: VectorStoreIndex | None = ev.get("index")
        index_kwargs: dict[str, Any] | None = ev.get("index_kwargs")

        if any(
            i is None
            for i in [data_dir, llm, similarity_top_k, small_chunk_size]
        ):
            return None

        if not index:
            docs = SimpleDirectoryReader(data_dir).load_data()
            if chunk_size is not None:
                nodes = split_doc(
                    chunk_size, docs
                )  
                grouped_nodes = get_grouped_docs(
                    nodes
                )  
            else:
                grouped_nodes = docs

            small_nodes = split_doc(small_chunk_size, grouped_nodes)

            index_kwargs = index_kwargs or {}
            index = VectorStoreIndex(small_nodes, **index_kwargs)
        else:
            small_nodes = index.docstore.docs.values()
            grouped_nodes = get_grouped_docs(small_nodes, None)

        return LoadNodeEvent(
            small_nodes=small_nodes,
            grouped_nodes=grouped_nodes,
            index=index,
            similarity_top_k=similarity_top_k,
            llm=llm,
        )

    @step
    async def make_query_engine(
        self, ctx: Context, ev: LoadNodeEvent
    ) -> StopEvent:
        retriever = LongRAGRetriever(
            grouped_nodes=ev.grouped_nodes,
            small_toks=ev.small_nodes,
            similarity_top_k=ev.similarity_top_k,
            vector_store=ev.index.vector_store,
        )
        query_eng = RetrieverQueryEngine.from_args(retriever, ev.llm)

        return StopEvent(
            result={
                "retriever": retriever,
                "query_engine": query_eng,
                "index": ev.index,
            }
        )

    @step
    async def query(self, ctx: Context, ev: StartEvent) -> StopEvent | None:
        query_str: str | None = ev.get("query_str")
        query_eng = ev.get("query_eng")

        if query_str is None:
            return None

        result = query_eng.query(query_str)
        return StopEvent(result=result)

In [7]:
from llama_index.llms.openai import OpenAI
from IPython.display import display, Markdown

In [8]:
wf = LongRAGWorkflow(timeout=60)
llm = OpenAI("gpt-4o-mini")
data_dir = "data"

result = await wf.run(
data_dir=data_dir,
llm=llm,
chunk_size=DEFAULT_CHUNK_SIZE,
similarity_top_k=DEFAULT_TOP_K,
small_chunk_size=DEFAULT_SMALL_CHUNK_SIZE)

INFO:httpx:HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"


### Run the workflow with RagaAI Catalyst

In [9]:
with tracer:
    res = await wf.run(
        query_str="what is IR?",
        query_eng=result["query_engine"],
    )
    display(Markdown(str(res)))



INFO:httpx:HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"


IR, or Indistinguishability Rate, is a metric proposed to measure the quality of individual entries in generated datasets. It assesses how often a strong language model fails to identify a synthetically generated entry as being of lower quality when combined with a few real dataset entries. A high IR indicates that the generated data closely matches the properties of real-world data, suggesting that the discriminator is randomly guessing, while a low IR indicates that the generated data is out-of-distribution from real-world data.

INFO:ragaai_catalyst.tracers.agentic_tracing.utils.zip_list_of_unique_files: Zip file created successfully.
INFO:ragaai_catalyst.tracers.agentic_tracing.tracers.base: Traces saved successfully.


Uploading agentic traces...
Code already exists
