### Importing Packages

In [None]:
import logging
from pathlib import Path
from tempfile import mkdtemp

import requests
import torch
from docling_core.transforms.chunker.hierarchical_chunker import (
    ChunkingDocSerializer,
    ChunkingSerializerProvider,
)
from docling_core.transforms.chunker.tokenizer.huggingface import HuggingFaceTokenizer
from docling_core.transforms.serializer.markdown import MarkdownTableSerializer
from llama_index.core import SimpleDirectoryReader, StorageContext, VectorStoreIndex
from llama_index.core.data_structs import Node
from llama_index.core.response_synthesizers import get_response_synthesizer
from llama_index.core.schema import NodeWithScore, TransformComponent
from llama_index.core.vector_stores import MetadataFilter, MetadataFilters
from llama_index.core.vector_stores.types import VectorStoreQueryMode
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.llms.ollama import Ollama
from llama_index.node_parser.docling import DoclingNodeParser
from llama_index.readers.docling import DoclingReader
from llama_index.readers.elasticsearch import ElasticsearchReader
from llama_index.vector_stores.opensearch import (
    OpensearchVectorClient,
    OpensearchVectorStore,
)
from rich.console import Console
from rich.pretty import pprint
from transformers import AutoTokenizer

from docling.chunking import HybridChunker

# Set the root logger level to WARNING; records below that level are filtered
# unless a child logger configures its own level.
logging.getLogger().setLevel(logging.WARNING)

### Check if GPU is available

In [None]:
# Use the first CUDA GPU when PyTorch can see one; otherwise fall back to the CPU
# so this cell reports the situation instead of raising on GPU-less machines.
if torch.cuda.is_available():
    device = torch.device("cuda")
    print(f"CUDA GPU is enabled: {torch.cuda.get_device_name(0)}")
else:
    device = torch.device("cpu")
    print("CUDA GPU is not available; falling back to CPU.")

### Run Local OpenSearch Instance

💡 The version of the OpenSearch instance needs to be compatible with the version of the OpenSearch Python Client library, since this library is used by the LlamaIndex framework, which we leverage in this notebook.

```
docker run \
    -it \
    --pull always \
    -p 9200:9200 \
    -p 9600:9600 \
    -e "discovery.type=single-node" \
    -e DISABLE_INSTALL_DEMO_CONFIG=true \
    -e DISABLE_SECURITY_PLUGIN=true \
    --name opensearch-node \
    -d opensearchproject/opensearch:3.0.0
```

### Verify OpenSearch

In [None]:
# Probe the cluster root endpoint. The timeout keeps the cell from hanging when
# the container is not reachable, and raise_for_status() surfaces HTTP errors
# instead of silently printing an error body.
response = requests.get("http://localhost:9200", timeout=10)
response.raise_for_status()
print(response.text)

### Set up OpenSearch

In [None]:
# HTTP endpoint of the OpenSearch cluster (the local instance published on port 9200)
OPENSEARCH_ENDPOINT = "http://localhost:9200"
# name of the index that stores the Docling document vectors
OPENSEARCH_INDEX = "docling-index"

### Set Up Language Model (HuggingFace and Ollama)

- Embedding Model: IBM's Granite Embedding 30M for embedding generation.
- LLM: IBM's Granite 3.1 MoE for model inference.

In [None]:
# the embedding model
EMBED_MODEL = HuggingFaceEmbedding(
    model_name="ibm-granite/granite-embedding-30m-english"
)
# maximum chunk size in tokens (handed to the tokenizer used for chunking)
EMBED_MAX_TOKENS = 200
# the generation model
GEN_MODEL = Ollama(
    model="granite3.1-moe",
    request_timeout=120.0,
    # Manually set the context window to limit memory usage
    context_window=8000,
    # Set temperature to 0 for reproducibility of the results
    temperature=0.0,
)
# a sample document
SOURCE = "https://arxiv.org/pdf/2408.09869"

# Embed a throwaway string once to read off the vector length; the value is later
# passed as `dim` when the OpenSearch vector clients are created.
embed_dim = len(EMBED_MODEL.get_text_embedding("hi"))
print(f"The embedding dimension is {embed_dim}.")

### Process Data Using Docling

A single PDF file is processed by the hybrid chunker to generate structured, hierarchical chunks suitable for downstream RAG tasks.

We will convert the original PDF file into a DoclingDocument format using a DoclingReader object. We specify the JSON export type to retain the document hierarchical structure as an input for the next step (chunking the document).

In [None]:
# Download the sample PDF into a fresh temporary directory.
tmp_dir_path = Path(mkdtemp())
# Bound the request and fail fast on HTTP errors, so an error page is never
# written to disk with a .pdf suffix and handed to the PDF reader.
req = requests.get(SOURCE, timeout=60)
req.raise_for_status()
with open(tmp_dir_path / f"{Path(SOURCE).name}.pdf", "wb") as out_file:
    out_file.write(req.content)

# JSON export keeps the hierarchical document structure for the chunking step.
reader = DoclingReader(export_type=DoclingReader.ExportType.JSON)
dir_reader = SimpleDirectoryReader(
    input_dir=tmp_dir_path,
    file_extractor={".pdf": reader},
)

# load the PDF files
documents = dir_reader.load_data()

ValueError: Directory /Home/mayur/Downloads/2408.09869v5.pdf does not exist.

### Load data into OpenSearch

Before loading the data into OpenSearch, we have to transform it.

- DoclingNodeParser: It executes the document-based chunking with the hybrid chunker, which leverages the tokenizer of the embedding model to ensure that the resulting chunks fit within the model input text limit.

- MetadataTransform: It is a custom transformation to ensure that generated chunk metadata is best formatted for indexing with OpenSearch

In [None]:
# create the hybrid chunker
# The tokenizer is loaded from the embedding model's name and carries the
# EMBED_MAX_TOKENS limit, so chunk sizes are measured with that tokenizer.
tokenizer = HuggingFaceTokenizer(
    tokenizer=AutoTokenizer.from_pretrained(EMBED_MODEL.model_name),
    max_tokens=EMBED_MAX_TOKENS,
)
chunker = HybridChunker(tokenizer=tokenizer)

# create a Docling node parser that delegates chunking to the hybrid chunker
node_parser = DoclingNodeParser(chunker=chunker)


# custom transformation that stringifies the origin hash to avoid out-of-range integers
class MetadataTransform(TransformComponent):
    """Convert each node's ``origin.binary_hash`` metadata value to a string."""

    def __call__(self, nodes, **kwargs):
        """Stringify ``metadata["origin"]["binary_hash"]`` on every node, in place.

        Nodes without an ``origin`` entry, or whose ``binary_hash`` is missing or
        ``None``, are left untouched. The same ``nodes`` object is returned.
        """
        for current in nodes:
            origin_meta = current.metadata.get("origin", {})
            hash_value = origin_meta.get("binary_hash", None)
            if hash_value is None:
                continue
            current.metadata["origin"]["binary_hash"] = str(hash_value)
        return nodes

### Embed and Insert Data

Using the OpenSearch vector client, we embed the document chunks and store them in the index.

The key action takes place in VectorStoreIndex:

- Read the documents.
- Apply the transformations.
- Generate the embeddings and index them in the vector store.

In [None]:
# OpensearchVectorClient stores text in this field by default
text_field = "content"
# OpensearchVectorClient stores embeddings in this field by default
embed_field = "embedding"

# Client bound to the target index; `dim` must equal the embedding model's
# vector length measured earlier.
client = OpensearchVectorClient(
    endpoint=OPENSEARCH_ENDPOINT,
    index=OPENSEARCH_INDEX,
    dim=embed_dim,
    engine="faiss",
    embedding_field=embed_field,
    text_field=text_field,
)

vector_store = OpensearchVectorStore(client)
storage_context = StorageContext.from_defaults(vector_store=vector_store)

# Chunk the documents with the node parser, normalize their metadata, embed the
# resulting nodes with EMBED_MODEL and write them to the vector store.
index = VectorStoreIndex.from_documents(
    documents=documents,
    transformations=[node_parser, MetadataTransform()],
    storage_context=storage_context,
    embed_model=EMBED_MODEL,
)

### Building RAG

Assemble the RAG system, execute a query, and get the response.

In [None]:
# Fixed-width console so long answers wrap consistently in the cell output.
console = Console(width=88)

QUERY = "Which are the main AI models in Docling?"
# Query engine over the vector index, answering with the local generation model.
query_engine = index.as_query_engine(llm=GEN_MODEL)
res = query_engine.query(QUERY)

# The speaker markers were mis-encoded; restored to the intended emoji.
console.print(f"👤: {QUERY}\n🤖: {res.response.strip()}")

### Custom Serializer

Docling can extract the table content and process it for chunking, like other text elements.

In the following example, the response is generated from a retrieved chunk containing a table.

In [None]:
# A question whose answer lives in a table of the sample document.
QUERY = "What is the time to solution with the native backend on Intel?"
query_engine = index.as_query_engine(llm=GEN_MODEL)
res = query_engine.query(QUERY)
# The speaker markers were mis-encoded; restored to the intended emoji.
console.print(f"👤: {QUERY}\n🤖: {res.response.strip()}")

The result above was generated with the table serialized in a triplet format. Language models may perform better on complex tables if the structure is represented in a format that is widely adopted, like markdown.

For this purpose, we can leverage a custom serializer that transforms tables into Markdown format:

In [None]:
class MDTableSerializerProvider(ChunkingSerializerProvider):
    """Serializer provider whose chunking serializer renders tables as Markdown."""

    def get_serializer(self, doc):
        """Return a ``ChunkingDocSerializer`` for ``doc`` that uses a Markdown table serializer."""
        # swap in a different table serializer; everything else keeps its defaults
        markdown_tables = MarkdownTableSerializer()
        return ChunkingDocSerializer(doc=doc, table_serializer=markdown_tables)


# clear the database from the previous chunks
# NOTE(review): vector_store wraps this same client, so the second clear() may be
# redundant — confirm before removing either call.
client.clear()
vector_store.clear()

# Rebuild the chunker with the Markdown table serializer provider.
# NOTE(review): `tokenizer` was already created with max_tokens=EMBED_MAX_TOKENS;
# confirm whether HybridChunker still needs/honours the extra max_tokens argument.
chunker = HybridChunker(
    tokenizer=tokenizer,
    max_tokens=EMBED_MAX_TOKENS,
    serializer_provider=MDTableSerializerProvider(),
)
node_parser = DoclingNodeParser(chunker=chunker)
# Re-chunk, re-embed and re-insert the same documents into the same index.
index = VectorStoreIndex.from_documents(
    documents=documents,
    transformations=[node_parser, MetadataTransform()],
    storage_context=storage_context,
    embed_model=EMBED_MODEL,
)

**More accurate results after Custom Serialization**

In [None]:
# Ask the question currently held in QUERY against the re-built index.
query_engine = index.as_query_engine(llm=GEN_MODEL)
res = query_engine.query(QUERY)
# The speaker markers were mis-encoded; restored to the intended emoji.
console.print(f"👤: {QUERY}\n🤖: {res.response.strip()}")

### Filter Context Query

Since we are storing chunks in a hierarchical structure, we can leverage the document structure extracted by Docling to improve RAG performance, both for retrieval and for answering questions.

For example, we can use chunk metadata with layout information to run queries in a filter context, for high retrieval accuracy.

In [None]:
def display_nodes(nodes):
    """Pretty-print rank, score, text and Docling item references of retrieved nodes.

    Args:
        nodes: iterable of retrieved nodes exposing ``score``, ``text`` and a
            ``metadata["doc_items"]`` list of dicts with ``self_ref`` and
            ``label`` keys.
    """
    summary = [
        {
            "k": rank,
            "score": hit.score,
            "text": hit.text,
            "items": [
                {"ref": entry["self_ref"], "label": entry["label"]}
                for entry in hit.metadata["doc_items"]
            ],
        }
        for rank, hit in enumerate(nodes, start=1)
    ]
    # long strings are truncated to keep the output compact
    pprint(summary, max_string=200)

In [None]:
# Plain vector retriever that returns only the single best-matching chunk.
retriever = index.as_retriever(similarity_top_k=1)

QUERY = "How does pypdfium perform?"
nodes = retriever.retrieve(QUERY)

print(QUERY)
display_nodes(nodes)

**Restrict the retrieval to only those chunks containing tabular data to retrieve more quantitative information**

In [None]:
# Metadata filter on the Docling item label stored with each chunk.
# NOTE(review): no operator is given, so the library default applies — presumably
# equality with "table"; confirm.
filters = MetadataFilters(
    filters=[MetadataFilter(key="doc_items.label", value="table")]
)

# Same QUERY as above, but retrieval is limited to chunks passing the filter.
table_retriever = index.as_retriever(filters=filters, similarity_top_k=1)
nodes = table_retriever.retrieve(QUERY)

print(QUERY)
display_nodes(nodes)

### Hybrid Search Retrieval with RRF

It combines keyword and semantic search to improve search relevance. To avoid relying on traditional score normalization techniques, the reciprocal rank fusion (RRF) feature on hybrid search can significantly improve the relevance of the retrieved chunks in our RAG system.

*Reciprocal Rank Fusion (RRF) is a powerful algorithm that merges ranked search results from multiple retrieval methods (like keyword and semantic search) into a single, superior list by focusing on rank positions rather than raw scores.*

First, create a search pipeline and specify RRF as the technique:

In [None]:
# Search pipeline definition: a score-ranker processor that combines the
# sub-query results with the "rrf" technique.
url = f"{OPENSEARCH_ENDPOINT}/_search/pipeline/rrf-pipeline"
headers = {"Content-Type": "application/json"}
body = {
    "description": "Post processor for hybrid RRF search",
    "phase_results_processors": [
        {"score-ranker-processor": {"combination": {"technique": "rrf"}}}
    ],
}

# Bound the request so the cell cannot hang on an unreachable cluster.
response = requests.put(url, json=body, headers=headers, timeout=30)
print(response.text)
# Print the server reply first (it carries the error details), then stop here if
# the pipeline was not created; later cells depend on "rrf-pipeline" existing.
response.raise_for_status()

In [None]:
# Second client: separate "-rrf" index, same field names and dimension, with
# searches routed through the "rrf-pipeline" search pipeline.
client_rrf = OpensearchVectorClient(
    endpoint=OPENSEARCH_ENDPOINT,
    index=f"{OPENSEARCH_INDEX}-rrf",
    dim=embed_dim,
    engine="faiss",
    embedding_field=embed_field,
    text_field=text_field,
    search_pipeline="rrf-pipeline",
)

vector_store_rrf = OpensearchVectorStore(client_rrf)
storage_context_rrf = StorageContext.from_defaults(vector_store=vector_store_rrf)
# Index the same documents with the same transformations into the new store.
index_hybrid = VectorStoreIndex.from_documents(
    documents=documents,
    transformations=[node_parser, MetadataTransform()],
    storage_context=storage_context_rrf,
    embed_model=EMBED_MODEL,
)

The first retriever, which relies entirely on semantic (vector) search, fails to catch the supporting chunk for the given question in the top 1 position. Note that we highlight a few expected keywords for illustration purposes.

In [None]:
QUERY = "Does Docling project provide a Dockerfile?"
# sentence expected in the supporting chunk, plus the rich markup used to highlight it
exp = "Docling also provides a Dockerfile"
start = "[bold yellow]"
end = "[/]"

# semantic-only retrieval of the three best chunks
retriever = index.as_retriever(similarity_top_k=3)
nodes = retriever.retrieve(QUERY)
for rank, hit in enumerate(nodes, start=1):
    marked_text = hit.text.strip().replace(exp, f"{start}{exp}{end}")
    console.print(f"*** k={rank} ***\n{marked_text}")

However, the retriever with the hybrid search pipeline effectively recognizes the key paragraph in the first position:

In [None]:
# hybrid (keyword + vector) retrieval of the three best chunks from the RRF index
retriever_rrf = index_hybrid.as_retriever(
    similarity_top_k=3, vector_store_query_mode=VectorStoreQueryMode.HYBRID
)
nodes = retriever_rrf.retrieve(QUERY)
for rank, hit in enumerate(nodes, start=1):
    marked_text = hit.text.strip().replace(exp, f"{start}{exp}{end}")
    console.print(f"*** k={rank} ***\n{marked_text}")

### Context Expansion

**Small Chunks**

- Increased retrieval precision
- Answers stay tightly focused on the question.
- Improves the accuracy, reduces hallucination, and speeds up inference.

**But, it may overlook the contextual information**

By preserving the document structure, Docling enables us to employ various strategies that use context for more accurate RAG performance.

For example, after identifying the most relevant chunk, you might include adjacent chunks from the same section as additional grounding material before generating the final answer.

In the following example, the top retrieved chunks do not contain all the information that is required to answer the question.

In [None]:
QUERY = "According to the tests with arXiv and IBM Redbooks, which backend should I use if I have limited resources and complex tables?"
# Query engine over the hybrid index: hybrid retrieval of the top 3 chunks,
# answered by the local generation model.
query_rrf = index_hybrid.as_query_engine(
    vector_store_query_mode=VectorStoreQueryMode.HYBRID,
    llm=GEN_MODEL,
    similarity_top_k=3,
)
res = query_rrf.query(QUERY)
# The speaker markers were mis-encoded; restored to the intended emoji.
console.print(f"👤: {QUERY}\n🤖: {res.response.strip()}")

In [None]:
# show the chunks the hybrid retriever returns for the current QUERY
nodes = retriever_rrf.retrieve(QUERY)
for rank, hit in enumerate(nodes, start=1):
    marked_text = hit.text.strip().replace(exp, f"{start}{exp}{end}")
    console.print(f"*** k={rank} ***\n{marked_text}")

💡 In a production setting, it may be preferable to persist the parsed documents (i.e., DoclingDocument objects) as JSON in an object store or database and then fetch them when you need to traverse the document for context-expansion scenarios. In this simplified example, however, we will query the OpenSearch index directly to obtain the required chunks.

In [None]:
# Headings and text of the best-ranked chunk; used to find its section neighbours.
top_headings = nodes[0].metadata["headings"]
top_text = nodes[0].text

# NOTE(review): `nodes` was retrieved from the "-rrf" index while this reader
# queries OPENSEARCH_INDEX; this works only while both indices hold the same
# chunks — confirm.
rdr = ElasticsearchReader(endpoint=OPENSEARCH_ENDPOINT, index=OPENSEARCH_INDEX)
# terms_set with minimum_should_match_script = params.num_terms: a chunk matches
# only when its headings contain every heading of the top chunk.
docs = rdr.load_data(
    field=text_field,
    query={
        "query": {
            "terms_set": {
                "metadata.headings.keyword": {
                    "terms": top_headings,
                    "minimum_should_match_script": {"source": "params.num_terms"},
                }
            }
        }
    },
)
# Keep the top chunk plus the hits directly before and after it in `docs`.
# NOTE(review): "adjacent" means adjacent in the order the search returned the
# hits; presumably that mirrors document order — confirm.
ext_nodes = []
for idx, item in enumerate(docs):
    if item.text == top_text:
        ext_nodes.append(NodeWithScore(node=Node(text=item.text), score=1.0))
        if idx > 0:
            ext_nodes.append(
                NodeWithScore(node=Node(text=docs[idx - 1].text), score=1.0)
            )
        if idx < len(docs) - 1:
            ext_nodes.append(
                NodeWithScore(node=Node(text=docs[idx + 1].text), score=1.0)
            )
        # only the first occurrence of the top chunk is expanded
        break

# Generate the answer from the expanded context instead of the retriever output.
synthesizer = get_response_synthesizer(llm=GEN_MODEL)
res = synthesizer.synthesize(query=QUERY, nodes=ext_nodes)
# The speaker markers were mis-encoded; restored to the intended emoji.
console.print(f"👤: {QUERY}\n🤖: {res.response.strip()}")