# RAPTOR Implementation: A Comprehensive Guide to Recursive Abstractive Processing for Tree-Organized Retrieval

<a href="https://github.com/adithya-s-k/AI-Engineering.academy">
<img src="https://raw.githubusercontent.com/adithya-s-k/AI-Engineering.academy/main/assets/banner.png" width="50%">
</a>

Welcome to the RAPTOR Implementation guide! This notebook introduces Recursive Abstractive Processing for Tree-Organized Retrieval (RAPTOR), an indexing and retrieval technique designed for use with long-context language models. It walks step by step through implementing a RAPTOR system, with a focus on long documents.

## Introduction

RAPTOR takes a bottom-up approach to indexing and retrieving documents, and it is particularly suited to long-context language models. By recursively clustering and summarizing text segments, RAPTOR builds a hierarchical tree whose upper levels capture high-level abstractions and whose leaves keep the detailed text. This is especially valuable for complex thematic queries and multi-step reasoning tasks in question-answering systems.
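The recursive cluster-then-summarize loop described above can be sketched in a few lines. This is a toy illustration rather than the implementation used later in the notebook: `build_tree`, `pair_cluster`, and `first_words` are hypothetical names, the "clustering" simply pairs neighbouring texts, and the "summary" joins the first word of each text instead of calling an LLM. The level numbering is the sketch's own.

```python
from typing import Callable, Dict, List


def build_tree(
    chunks: List[str],
    cluster: Callable[[List[str]], List[List[str]]],
    summarize: Callable[[List[str]], str],
    max_depth: int = 3,
) -> Dict[int, List[str]]:
    """Return a mapping of level -> texts; level 0 holds the original chunks."""
    levels = {0: list(chunks)}
    current = list(chunks)
    for level in range(1, max_depth + 1):
        if len(current) <= 1:
            break  # a single node is already the root
        groups = cluster(current)
        current = [summarize(group) for group in groups]
        levels[level] = current
    return levels


# Hypothetical stand-ins: pair up neighbours instead of clustering embeddings,
# and join the first word of each text instead of calling an LLM.
def pair_cluster(texts: List[str]) -> List[List[str]]:
    return [texts[i : i + 2] for i in range(0, len(texts), 2)]


def first_words(texts: List[str]) -> str:
    return " ".join(text.split()[0] for text in texts)


tree = build_tree(
    ["alpha one", "beta two", "gamma three", "delta four"],
    pair_cluster,
    first_words,
)
```

Each pass shrinks the number of nodes, so the loop ends either at `max_depth` or when a single root summary remains.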

## Getting Started

To fully benefit from this notebook, you should have a good understanding of Python and be familiar with basic concepts of document indexing and retrieval. Don't worry if some advanced ideas are new to you – we'll guide you through each step of the RAPTOR process!

### Prerequisites

- Python 3.9+
- Jupyter Notebook or JupyterLab
- Familiarity with document indexing and retrieval concepts
- Understanding of vector embeddings and clustering algorithms
- Basic knowledge of natural language processing (NLP) concepts

## Notebook Contents

Our notebook is structured into the following main sections:

1. **Environment Set Up**: We'll guide you through setting up your Python environment with all the necessary libraries and dependencies for RAPTOR.

2. **Document Chunking**: Learn how to segment large documents into manageable chunks suitable for embedding and processing.

3. **Embedding Generation**: Understand how to generate embeddings for document chunks using various embedding models.

4. **Clustering Algorithm Implementation**: Dive into the process of clustering reduced-dimension embeddings using appropriate algorithms.

5. **Summarization Process**: Learn how to generate summaries for clustered chunks using large language models.

6. **Recursive Tree Construction**: Understand how to recursively build the hierarchical tree structure by repeating the clustering and summarization process.

7. **Retrieval Mechanism**: Explore how to perform efficient retrieval using the constructed tree structure.

8. **Integration with Long-Context LLMs**: Learn how to incorporate RAPTOR outputs into prompts for long-context language models.
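As a rough illustration of the document chunking step in the list above, the sketch below splits text into overlapping word windows. It is an assumption-laden stand-in: `chunk_words` is a hypothetical helper, and splitting on whitespace is a simplification of the sentence- or token-aware splitters that a library would normally provide.

```python
from typing import List


def chunk_words(text: str, chunk_size: int = 200, overlap: int = 20) -> List[str]:
    """Split text into chunks of up to chunk_size words that share `overlap` words."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be non-negative and smaller than chunk_size")
    words = text.split()
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start : start + chunk_size]))
        if start + chunk_size >= len(words):
            break  # the last window already reached the end of the text
    return chunks


sample = " ".join(f"w{i}" for i in range(10))
chunks = chunk_words(sample, chunk_size=4, overlap=1)
```

The overlap keeps a little shared context between neighbouring chunks, which can help when a relevant sentence straddles a chunk boundary.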


By the end of this notebook, you'll have a deep understanding of RAPTOR and be able to implement this advanced technique for efficient document retrieval and utilization in long-context language model applications.

In [1]:
import nest_asyncio

nest_asyncio.apply()

## Set up environment

In [2]:
from typing import Any, Dict, List, Optional
import asyncio
from enum import Enum

from llama_index.core import (
    StorageContext,
    VectorStoreIndex,
    get_response_synthesizer,
    load_index_from_storage,
)
from llama_index.core.base.response.schema import Response
from llama_index.core.base.base_retriever import BaseRetriever, QueryType
from llama_index.core.bridge.pydantic import BaseModel, Field
from llama_index.core.embeddings import BaseEmbedding
from llama_index.core.ingestion import run_transformations
from llama_index.core.llama_pack.base import BaseLlamaPack
from llama_index.core.llms.llm import LLM
from llama_index.core.response_synthesizers import BaseSynthesizer
from llama_index.core.schema import (
    BaseNode,
    NodeWithScore,
    QueryBundle,
    TextNode,
    TransformComponent,
)
from llama_index.core.vector_stores.types import (
    MetadataFilter,
    MetadataFilters,
    BasePydanticVectorStore,
)
from llama_index.packs.raptor.clustering import get_clusters




## Define prompt to summarize and the 2 query modes

In [3]:

DEFAULT_SUMMARY_PROMPT = (
    "Summarize the provided text, including as many key details as needed."
)


class QueryModes(str, Enum):
    """Query modes."""

    tree_traversal = "tree_traversal"
    collapsed = "collapsed"
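Because `QueryModes` subclasses both `str` and `Enum`, its members behave like plain strings in comparisons. The short sketch below repeats the definition so that it runs on its own and shows the two properties the rest of the notebook relies on: equality with a raw string, and lookup by value. The value `"breadth_first"` is a made-up example of an unsupported mode.

```python
from enum import Enum


class QueryModes(str, Enum):
    """Same definition as in the cell above."""

    tree_traversal = "tree_traversal"
    collapsed = "collapsed"


# Members compare equal to their plain-string values...
same_as_string = QueryModes.collapsed == "collapsed"
# ...and a string can be converted back into a member by value.
parsed = QueryModes("tree_traversal")

# Unknown values raise ValueError, which is a cheap way to validate user input.
try:
    QueryModes("breadth_first")
    rejected = False
except ValueError:
    rejected = True
```

This is why a check such as `mode == "tree_traversal"` works whether the caller passes the enum member or the bare string.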

## This class defines a `SummaryModule` that uses a response synthesizer to generate summaries for clusters of documents asynchronously.

In [4]:
class SummaryModule(BaseModel):
    response_synthesizer: BaseSynthesizer = Field(description="LLM")
    summary_prompt: str = Field(
        default=DEFAULT_SUMMARY_PROMPT,
        description="Summary prompt.",
    )
    num_workers: int = Field(
        default=4, description="Number of workers to generate summaries."
    )
    show_progress: bool = Field(default=True, description="Show progress.")

    class Config:
        arbitrary_types_allowed = True

    def __init__(
        self,
        llm: Optional[LLM] = None,
        summary_prompt: str = DEFAULT_SUMMARY_PROMPT,
        num_workers: int = 4,
    ) -> None:
        response_synthesizer = get_response_synthesizer(
            response_mode="tree_summarize", use_async=True, llm=llm
        )
        super().__init__(
            response_synthesizer=response_synthesizer,
            summary_prompt=summary_prompt,
            num_workers=num_workers,
        )

    async def generate_summaries(
        self, documents_per_cluster: List[List[BaseNode]]
    ) -> List[str]:
        """Generate summaries of documents per cluster.

        Args:
            documents_per_cluster (List[List[BaseNode]]): List of documents per cluster

        Returns:
            List[str]: List of summary for each cluster
        """
        jobs = []
        for documents in documents_per_cluster:
            with_scores = [NodeWithScore(node=doc, score=1.0) for doc in documents]
            jobs.append(
                self.response_synthesizer.asynthesize(self.summary_prompt, with_scores)
            )

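        semaphore = asyncio.Semaphore(self.num_workers)

        async def run_with_limit(job: Any) -> Any:
            # hold a semaphore slot for the duration of the job
            async with semaphore:
                return await job

        # run the jobs concurrently, with at most num_workers in flight at once;
        # gather returns the responses in the same order as the jobs
        responses = await asyncio.gather(*(run_with_limit(job) for job in jobs))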

        return [str(response) for response in responses]
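The concurrency limit that `num_workers` is meant to enforce can be illustrated in isolation. The sketch below is a minimal example, not part of `SummaryModule`: `run_bounded` and `fake_summary` are hypothetical names, and a short `asyncio.sleep` stands in for an LLM call. It combines `asyncio.Semaphore` with `asyncio.gather` and records the peak number of jobs running at once.

```python
import asyncio
from typing import Awaitable, Callable, List


async def run_bounded(
    factories: List[Callable[[], Awaitable[str]]], limit: int
) -> List[str]:
    """Run every job with at most `limit` in flight; results keep input order."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(factory: Callable[[], Awaitable[str]]) -> str:
        async with semaphore:
            return await factory()

    return await asyncio.gather(*(guarded(f) for f in factories))


async def demo() -> tuple:
    running = 0
    peak = 0

    async def fake_summary(i: int) -> str:
        # Hypothetical stand-in for an LLM call; tracks how many run at once.
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)
        running -= 1
        return f"summary {i}"

    jobs = [lambda i=i: fake_summary(i) for i in range(6)]
    results = await run_bounded(jobs, limit=2)
    return results, peak


results, peak = asyncio.run(demo())
```

Inside a notebook, where an event loop is already running, the final `asyncio.run` call depends on the `nest_asyncio.apply()` call made at the top of this notebook; alternatively, `await demo()` can be used directly in a cell.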

## This class defines a `RaptorRetriever` that indexes and retrieves documents using a hierarchical tree structure. It supports both collapsed and tree traversal retrieval modes, and can generate summaries for clusters of documents.

In [5]:
class RaptorRetriever(BaseRetriever):
    """Raptor indexing retriever."""

    def __init__(
        self,
        documents: List[BaseNode],
        tree_depth: int = 3,
        similarity_top_k: int = 2,
        llm: Optional[LLM] = None,
        embed_model: Optional[BaseEmbedding] = None,
        vector_store: Optional[BasePydanticVectorStore] = None,
        transformations: Optional[List[TransformComponent]] = None,
        summary_module: Optional[SummaryModule] = None,
        existing_index: Optional[VectorStoreIndex] = None,
        mode: QueryModes = QueryModes.collapsed,
        **kwargs: Any,
    ) -> None:
        """Init params."""
        super().__init__(
            **kwargs,
        )

        self.mode = mode
        self.summary_module = summary_module or SummaryModule(llm=llm)
        self.index = existing_index or VectorStoreIndex(
            nodes=[],
            storage_context=StorageContext.from_defaults(vector_store=vector_store),
            embed_model=embed_model,
            transformations=transformations,
        )
        self.tree_depth = tree_depth
        self.similarity_top_k = similarity_top_k

        if len(documents) > 0:
            asyncio.run(self.insert(documents))

    def _get_embeddings_per_level(self, level: int = 0) -> List[BaseNode]:
        """Retrieve the nodes stored at a given level of the abstraction tree.

        Args:
            level (int, optional): Target level. Defaults to 0, the first level of summaries created by `insert`.

        Returns:
            List[BaseNode]: Nodes stored at that level
        """
        # MetadataFilter is a pydantic model, so its fields must be passed by keyword
        filters = MetadataFilters(filters=[MetadataFilter(key="level", value=level)])

        # kind of janky, but should work with any vector index
        source_nodes = self.index.as_retriever(
            similarity_top_k=10000, filters=filters
        ).retrieve("retrieve")

        return [x.node for x in source_nodes]

    async def insert(self, documents: List[BaseNode]) -> None:
        """Insert the documents, plus recursively generated summaries of them, into the index for later retrieval.

        Args:
            documents (List[BaseNode]): List of Documents
        """
        embed_model = self.index._embed_model
        transformations = self.index._transformations

        cur_nodes = run_transformations(documents, transformations, in_place=False)
        for level in range(self.tree_depth):
            # get the embeddings for the current documents

            if self._verbose:
                print(f"Generating embeddings for level {level}.")

            embeddings = await embed_model.aget_text_embedding_batch(
                [node.get_content(metadata_mode="embed") for node in cur_nodes]
            )
            assert len(embeddings) == len(cur_nodes)
            id_to_embedding = {
                node.id_: embedding for node, embedding in zip(cur_nodes, embeddings)
            }

            if self._verbose:
                print(f"Performing clustering for level {level}.")

            # cluster the documents
            nodes_per_cluster = get_clusters(cur_nodes, id_to_embedding)

            if self._verbose:
                print(
                    f"Generating summaries for level {level} with {len(nodes_per_cluster)} clusters."
                )
            summaries_per_cluster = await self.summary_module.generate_summaries(
                nodes_per_cluster
            )

            if self._verbose:
                print(
                    f"Level {level} created summaries/clusters: {len(nodes_per_cluster)}"
                )

            # replace the current nodes with their summaries
            new_nodes = [
                TextNode(
                    text=summary,
                    metadata={"level": level},
                    excluded_embed_metadata_keys=["level"],
                    excluded_llm_metadata_keys=["level"],
                )
                for summary in summaries_per_cluster
            ]

            # insert the nodes with their embeddings and parent_id
            nodes_with_embeddings = []
            for cluster, summary_doc in zip(nodes_per_cluster, new_nodes):
                for node in cluster:
                    node.metadata["parent_id"] = summary_doc.id_
                    node.excluded_embed_metadata_keys.append("parent_id")
                    node.excluded_llm_metadata_keys.append("parent_id")
                    node.embedding = id_to_embedding[node.id_]
                    nodes_with_embeddings.append(node)

            self.index.insert_nodes(nodes_with_embeddings)

            # set the current nodes to the new nodes
            cur_nodes = new_nodes

        self.index.insert_nodes(cur_nodes)

    async def collapsed_retrieval(self, query_str: str) -> List[NodeWithScore]:
        """Query the index as a collapsed tree -- i.e. a single pool of nodes."""
        return await self.index.as_retriever(
            similarity_top_k=self.similarity_top_k
        ).aretrieve(query_str)

    async def tree_traversal_retrieval(self, query_str: str) -> List[NodeWithScore]:
        """Query the index as a tree, traversing the tree from the top down."""
        # get top k nodes for each level, starting with the top
        parent_ids = None
        nodes = []
        level = self.tree_depth - 1
        while level >= 0:
            # retrieve nodes at the current level
            if parent_ids is None:
                nodes = await self.index.as_retriever(
                    similarity_top_k=self.similarity_top_k,
                    filters=MetadataFilters(
                        filters=[MetadataFilter(key="level", value=level)]
                    ),
                ).aretrieve(query_str)

                parent_ids = [node.id_ for node in nodes]
                if self._verbose:
                    print(f"Retrieved parent IDs from level {level}: {parent_ids!s}")
            # retrieve nodes that are children of the nodes at the previous level
            elif parent_ids is not None and len(parent_ids) > 0:
                nested_nodes = await asyncio.gather(
                    *[
                        self.index.as_retriever(
                            similarity_top_k=self.similarity_top_k,
                            filters=MetadataFilters(
                                filters=[MetadataFilter(key="parent_id", value=id_)]
                            ),
                        ).aretrieve(query_str)
                        for id_ in parent_ids
                    ]
                )

                nodes = [node for nested in nested_nodes for node in nested]

                if self._verbose:
                    print(f"Retrieved {len(nodes)} from parents at level {level}.")

                level -= 1
                parent_ids = None
            else:
                # nothing was retrieved at this level, so stop instead of looping forever
                break

        return nodes

    def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        """Retrieve nodes given query and mode."""
        # not used, needed for type checking

    def retrieve(
        self, query_str_or_bundle: QueryType, mode: Optional[QueryModes] = None
    ) -> List[NodeWithScore]:
        """Retrieve nodes given query and mode."""
        if isinstance(query_str_or_bundle, QueryBundle):
            query_str = query_str_or_bundle.query_str
        else:
            query_str = query_str_or_bundle

        return asyncio.run(self.aretrieve(query_str, mode or self.mode))

    async def aretrieve(
        self, query_str_or_bundle: QueryType, mode: Optional[QueryModes] = None
    ) -> List[NodeWithScore]:
        """Retrieve nodes given query and mode."""
        if isinstance(query_str_or_bundle, QueryBundle):
            query_str = query_str_or_bundle.query_str
        else:
            query_str = query_str_or_bundle

        mode = mode or self.mode
        if mode == "tree_traversal":
            return await self.tree_traversal_retrieval(query_str)
        elif mode == "collapsed":
            return await self.collapsed_retrieval(query_str)
        else:
            raise ValueError(f"Invalid mode: {mode}")

    def persist(self, persist_dir: str) -> None:
        self.index.storage_context.persist(persist_dir=persist_dir)

    @classmethod
    def from_persist_dir(
        cls: "RaptorRetriever",
        persist_dir: str,
        embed_model: Optional[BaseEmbedding] = None,
        **kwargs: Any,
    ) -> "RaptorRetriever":
        storage_context = StorageContext.from_defaults(persist_dir=persist_dir)
        return cls(
            [],
            existing_index=load_index_from_storage(
                storage_context, embed_model=embed_model
            ),
            **kwargs,
        )
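To make the difference between the two retrieval modes concrete, here is a simplified in-memory sketch. It is not the `RaptorRetriever` logic above, which queries a vector index with metadata filters: `ToyNode`, `cosine`, `top_k`, `collapsed`, and `tree_traversal` are hypothetical names, the two-dimensional vectors are invented, and the descent returns only the nodes reached at the bottom of the tree.

```python
from dataclasses import dataclass
from math import sqrt
from typing import List, Optional, Sequence


@dataclass
class ToyNode:
    id_: str
    vector: Sequence[float]
    level: Optional[int] = None      # None marks a leaf chunk
    parent_id: Optional[str] = None  # None marks a root summary


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (sqrt(sum(x * x for x in a)) * sqrt(sum(y * y for y in b)))


def top_k(query: Sequence[float], nodes: List[ToyNode], k: int) -> List[ToyNode]:
    return sorted(nodes, key=lambda n: cosine(query, n.vector), reverse=True)[:k]


def collapsed(query: Sequence[float], nodes: List[ToyNode], k: int) -> List[ToyNode]:
    """Rank every node, summaries and leaves alike, in one pool."""
    return top_k(query, nodes, k)


def tree_traversal(
    query: Sequence[float], nodes: List[ToyNode], k: int, top_level: int
) -> List[ToyNode]:
    """Start from the best top-level summaries and descend through their children."""
    frontier = top_k(query, [n for n in nodes if n.level == top_level], k)
    while True:
        children: List[ToyNode] = []
        for parent in frontier:
            kids = [n for n in nodes if n.parent_id == parent.id_]
            children.extend(top_k(query, kids, k))
        if not children:
            return frontier  # no deeper nodes: the frontier holds the leaves
        frontier = children


nodes = [
    ToyNode("r", (1.0, 1.0), level=1),
    ToyNode("s1", (1.0, 0.0), level=0, parent_id="r"),
    ToyNode("s2", (0.0, 1.0), level=0, parent_id="r"),
    ToyNode("a", (1.0, 0.1), parent_id="s1"),
    ToyNode("b", (0.9, 0.5), parent_id="s1"),
    ToyNode("c", (0.1, 1.0), parent_id="s2"),
    ToyNode("d", (0.5, 0.9), parent_id="s2"),
]
query = (1.0, 0.0)

collapsed_ids = [n.id_ for n in collapsed(query, nodes, k=2)]
traversal_ids = [n.id_ for n in tree_traversal(query, nodes, k=1, top_level=1)]
```

In this toy data, collapsed retrieval can return a summary and a leaf side by side, while the traversal commits to one branch at each level and ends at a leaf under it.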


## This class defines a `RaptorPack` that initializes a `RaptorRetriever` with various parameters and provides methods to get modules and run retrieval queries.

In [6]:
class RaptorPack(BaseLlamaPack):
    """Raptor pack."""

    def __init__(
        self,
        documents: List[BaseNode],
        llm: Optional[LLM] = None,
        embed_model: Optional[BaseEmbedding] = None,
        vector_store: Optional[BasePydanticVectorStore] = None,
        similarity_top_k: int = 2,
        mode: QueryModes = QueryModes.collapsed,
        verbose: bool = True,
        **kwargs: Any,
    ) -> None:
        """Init params."""
        self.retriever = RaptorRetriever(
            documents,
            embed_model=embed_model,
            llm=llm,
            similarity_top_k=similarity_top_k,
            vector_store=vector_store,
            mode=mode,
            verbose=verbose,
            **kwargs,
        )

    def get_modules(self) -> Dict[str, Any]:
        """Get modules."""
        return {
            "retriever": self.retriever,
        }

    def run(
        self,
        query: str,
        mode: Optional[QueryModes] = None,
    ) -> Any:
        """Run the pipeline."""
        return self.retriever.retrieve(query, mode=mode)


## Import OpenAI and LlamaIndex libraries

In [15]:
import os
from typing import List
from llama_index.core import Document, Settings
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding

os.environ["OPENAI_API_KEY"] = ""  # paste your OpenAI API key here

Settings.llm = OpenAI(model="gpt-4o-mini")

## This code defines functions to fetch markdown content from a given URL and create sample documents using the fetched content.

In [23]:
import requests

def fetch_markdown_content(url: str) -> str:
    """Fetch markdown content from the Jina Reader."""
    # a timeout keeps the cell from hanging if the reader service is slow
    response = requests.get(f"https://r.jina.ai/{url}", timeout=30)
    response.raise_for_status()
    
    return response.text.strip()

def create_sample_documents() -> List[Document]:
    """Create a list of sample documents by fetching markdown content from Jina Reader."""
    
    urls = [
        "en.wikipedia.org/wiki/Artificial_intelligence", 
        "https://docs.crewai.com/getting-started/Start-a-New-CrewAI-Project-Template-Method/#customizing-your-project"
    ]
    
    documents = []
    for url in urls:
        markdown_text = fetch_markdown_content(url)
        documents.append(Document(text=markdown_text))
    
    return documents

sample_docs = create_sample_documents()
for doc in sample_docs:
    print(doc.text[:200], '\n')  


Title: Artificial intelligence

URL Source: http://en.wikipedia.org/wiki/Artificial_intelligence

Published Time: 2001-10-08T16:55:49Z

Markdown Content:
Jump to content
Main menu
Search
Appearance
Cr 

Title: Starting a New CrewAI Project - Using Template - crewAI

URL Source: https://docs.crewai.com/getting-started/Start-a-New-CrewAI-Project-Template-Method/

Markdown Content:
[](https://github.com 



## This code initializes a `RaptorPack` with sample documents and the `gpt-4o-mini` language model. It then runs a series of test queries against the `RaptorPack` and prints the results, including the content and score of each retrieved node.

In [24]:
documents = create_sample_documents()

# Initialize RaptorPack
raptor_pack = RaptorPack(
    documents,
    llm=OpenAI(model="gpt-4o-mini"),
    embed_model=OpenAIEmbedding(),
    similarity_top_k=2,
    mode=QueryModes.tree_traversal,
    verbose=True
)

# Test queries
queries = [
    "What is artificial intelligence?",
    "How to create an agent using yaml in CrewAI?",
    "How to achieve AGI?",
]

for query in queries:
    print(f"\nQuery: {query}")
    results = raptor_pack.run(query)
    for i, node in enumerate(results, 1):
        print(f"Result {i}:")
        print(f"Content: {node.node.get_content()}")
        print(f"Score: {node.score}")
        print("-" * 50)

       

Generating embeddings for level 0.
Performing clustering for level 0.
Generating summaries for level 0 with 9 clusters.
Level 0 created summaries/clusters: 9
Generating embeddings for level 1.
Performing clustering for level 1.
Generating summaries for level 1 with 1 clusters.
Level 1 created summaries/clusters: 1
Generating embeddings for level 2.
Performing clustering for level 2.
Generating summaries for level 2 with 1 clusters.
Level 2 created summaries/clusters: 1

Query: What is artificial intelligence?
Retrieved parent IDs from level 2: ['1de596a1-366f-4f6a-8a01-aa2cbd9f229c']
Retrieved 1 from parents at level 2.
Retrieved parent IDs from level 1: ['e60b28b2-2912-4188-9fee-171ce9884405']
Retrieved 2 from parents at level 1.
Retrieved parent IDs from level 0: ['db7f1f15-7990-4227-bf1f-6cfe44e4b3b5', '77164cf2-5fdf-4f43-88ca-32b77f145f84']
Retrieved 4 from parents at level 0.
Result 1:
Content: Title: Artificial intelligence

URL Source: http://en.wikipedia.org/wiki/Artificial_int