In [1]:
import os
from pathlib import Path
from tempfile import mkdtemp
from warnings import filterwarnings
from rich.pretty import pprint
from dotenv import load_dotenv

import time
import copy
from typing import List
import pandas as pd
from pathlib import Path
from typing import Tuple
import base64
from pydantic import BaseModel
# from io import BytesIO
# import cv2
from matplotlib import pyplot as plt
import numpy as np
from pydantic import TypeAdapter

import requests

import PyPDF2
from pdf2image import convert_from_bytes
from IPython.display import display, Markdown, HTML, display_html
from PIL import Image, ImageDraw

from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.llms.huggingface_api import HuggingFaceInferenceAPI

# from llama_index.core import StorageContext, VectorStoreIndex
# from llama_index.vector_stores.milvus import MilvusVectorStore
from llama_index.core.node_parser import MarkdownNodeParser
from llama_index.readers.docling import DoclingReader

from llama_index.node_parser.docling import DoclingNodeParser
from llama_index.core import Document as LIDocument

from docling.datamodel.base_models import ConversionStatus
from docling.datamodel.document import ConversionResult
from docling.datamodel.settings import settings



from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import PdfPipelineOptions
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.models.ocr_mac_model import OcrMacOptions
from docling.models.tesseract_ocr_cli_model import TesseractCliOcrOptions
#from docling.models.tesseract_ocr_model import TesseractOcrOptions
from docling.datamodel.pipeline_options import (
    EasyOcrOptions,
    OcrMacOptions,
    PdfPipelineOptions,
    RapidOcrOptions,
    TesseractCliOcrOptions,
    TesseractOcrOptions,
    AcceleratorDevice,
    AcceleratorOptions,
)

from docling.datamodel.settings import settings

#from docling.backend.docling_parse_backend import DoclingParseDocumentBackend



  from .autonotebook import tqdm as notebook_tqdm


In [2]:
import json
import logging
import time
from pathlib import Path
from typing import Iterable

def _get_env_from_colab_or_os(key):
    try:
        from google.colab import userdata

        try:
            return userdata.get(key)
        except userdata.SecretNotFoundError:
            pass
    except ImportError:
        pass
    return os.getenv(key)

# Load variables from a local .env file (if present) into os.environ.
load_dotenv()

_log = logging.getLogger(__name__)

# Hide known-noisy third-party warnings.
filterwarnings(action="ignore", category=UserWarning, module="pydantic")
filterwarnings(action="ignore", category=FutureWarning, module="easyocr")

# Turn off HF tokenizers parallelism,
# see https://github.com/huggingface/transformers/issues/5486
os.environ.update(TOKENIZERS_PARALLELISM="false")

# Directory later cells read the extracted `<name>.parquet` document from.
OUTPUT_DIR = Path("..") / "results" / "extract"
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

#### PLEASE PROVIDE THE PATH OF SOURCE DOCUMENT
# Keep this relative to the notebook; avoid user-specific absolute paths.
SOURCE = pdf_file_path = "./pdfs/US_Visa_FAQ.pdf"


In [3]:
import os
import pathlib
from docling_core.types.doc.document import DoclingDocument as DLDocument

def read_parquet(file_path):
    """Load a Docling document stored in a parquet file.

    The first row's column ``"0"`` is read after rendering the frame with
    ``DataFrame.to_json(orient="records")``. That value is passed to
    ``DLDocument.model_validate_json``, so it is presumably the document's
    JSON string.

    Returns:
        DLDocument: the validated Docling document.
    """
    records = json.loads(pd.read_parquet(file_path).to_json(orient="records"))
    serialized_doc = records[0]["0"]
    return DLDocument.model_validate_json(serialized_doc)

# Collection name = source file name up to its first "." (e.g. "US_Visa_FAQ");
# the extracted Docling document is stored as OUTPUT_DIR/<name>.parquet.
collection_name = os.path.basename(SOURCE).split(".")[0]
parquet_file_path = OUTPUT_DIR.joinpath(f"{collection_name}.parquet")
dl_obj = read_parquet(parquet_file_path)

### Utility methods

In [4]:
import io
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from PIL import Image
import copy

from docling_core.types.doc.document import (
    DocItem,
    DocumentOrigin,
    LevelNumber,
    ListItem,
    SectionHeaderItem,
    TableItem,
    TextItem,
    PictureItem
)
from docling_core.types.doc.labels import DocItemLabel
from docling_core.types.doc.base import (
    CoordOrigin,
    Size,
    BoundingBox
)

# Image scale factor. Not referenced by any of the visible cells;
# NOTE(review): presumably meant for page-image rendering — confirm it is still needed.
IMAGE_RESOLUTION_SCALE = 2.0

def convert_to_image(pdf_path, page_num):
    """Render one page of a PDF file as an image.

    The requested page is copied into a new single-page PDF held in memory.
    That PDF is then rasterised with ``pdf2image.convert_from_bytes``.

    Args:
        pdf_path: Path of the PDF file to read.
        page_num: Zero-based index into ``PdfReader.pages``.

    Returns:
        The first image returned by ``convert_from_bytes``.
    """
    # Context managers close the file and the buffer even if reading, writing
    # or rasterising raises (the original leaked the file handle on errors).
    with open(pdf_path, "rb") as pdf_file:
        page = PyPDF2.PdfReader(pdf_file).pages[page_num]
        pdf_writer = PyPDF2.PdfWriter()
        pdf_writer.add_page(page)
        with io.BytesIO() as buffer:
            # Write while the source file is still open: the page may be read lazily.
            pdf_writer.write(buffer)
            single_page_pdf = buffer.getvalue()

    # Convert the single-page PDF bytes to an image.
    images = convert_from_bytes(single_page_pdf)
    return images[0]

def crop_image_from_pdf(page_image, page_size: Size, crop_coords):
    """Crop a region out of a rendered PDF page image.

    The page image is first resized to ``page_size`` (width/height truncated
    to ints). The crop box is then ``crop_coords.as_tuple()``. NOTE(review):
    this presumes ``crop_coords`` uses the same units and origin as
    ``page_size``; confirm.
    """
    target_dims = (int(page_size.width), int(page_size.height))
    resized_page = page_image.resize(target_dims)
    return resized_page.crop(crop_coords.as_tuple())

def convert_to_pil_image(plt):
    """Snapshot a matplotlib figure into a PIL image.

    ``plt`` is any object exposing ``savefig`` (e.g. the ``pyplot`` module).
    The figure is rendered as PNG into an in-memory buffer and reopened with
    PIL. The image is deep-copied before the buffer is closed, presumably so
    the result does not depend on the buffer.
    """
    with io.BytesIO() as png_buffer:
        plt.savefig(png_buffer, format='png')
        png_buffer.seek(0)
        detached_image = copy.deepcopy(Image.open(png_buffer))
    return detached_image

def convert_to_img_bytes(pil_img):
    """Encode ``pil_img`` as PNG and return the encoded bytes."""
    with io.BytesIO() as sink:
        pil_img.save(sink, format='PNG')
        return sink.getvalue()
    

### Document Chunking (LlamaIndex)

In [5]:
import uuid
from llama_index.core import Document as LIDocument

# Reload the Docling document from OUTPUT_DIR/<source name>.parquet, where the
# name is the source file's basename up to its first ".".
file_name = os.path.basename(SOURCE).split(".")[0]
parquet_file_path = OUTPUT_DIR.joinpath(f"{file_name}.parquet")
dl_obj = read_parquet(parquet_file_path)

# Work on a private copy so `dl_obj` stays untouched.
copied_dl_obj2 = copy.deepcopy(dl_obj)
extra_info = None  # optional metadata for the LlamaIndex document below

def _uuid4_doc_id_gen(doc: DLDocument, file_path: str | Path) -> str:
    """Return a fresh random UUID4 string as a document id.

    ``doc`` and ``file_path`` are accepted for interface compatibility but ignored.
    """
    new_id = uuid.uuid4()
    return f"{new_id}"

# Serialise the Docling document to JSON and wrap it in one LlamaIndex
# Document. CustomNodeParser re-validates this JSON as a DoclingDocument.
text = json.dumps(copied_dl_obj2.export_to_dict())
li_doc = LIDocument(
    doc_id=_uuid4_doc_id_gen(doc=copied_dl_obj2, file_path=SOURCE),
    text=text,
)
li_doc.metadata = extra_info or {}
documents = [li_doc]

## OR DIRECTLY USING DOCLING READER, e.g.
#   reader = DoclingReader(export_type=DoclingReader.ExportType.JSON)
#   documents = reader.load_data(SOURCE)
# To customise OCR / table extraction, pass
#   doc_converter=DocumentConverter(
#       format_options={InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)}
#   )
# with a PdfPipelineOptions instance configured as needed.

pprint(documents, max_length=10, max_string=50, max_depth=4)



In [6]:
#
# Copyright IBM Corp. 2024 - 2025
# SPDX-License-Identifier: MIT
#

"""Chunker implementation leveraging the document structure."""

from __future__ import annotations

import logging
import re
from typing import Any, ClassVar, Final, Iterator, Literal, Optional

from pandas import DataFrame
from pydantic import Field, StringConstraints, field_validator
from typing_extensions import Annotated

from docling_core.search.package import VERSION_PATTERN
from docling_core.transforms.chunker import BaseChunk, BaseChunker, BaseMeta
from docling_core.types import DoclingDocument as DLDocument
from docling_core.types.doc.document import (
    DocItem,
    DocumentOrigin,
    LevelNumber,
    ListItem,
    SectionHeaderItem,
    TableItem,
    PictureItem,
    TextItem,
)
from docling_core.types.doc.labels import DocItemLabel

# Metadata schema version; DocMeta's version validator compares incoming versions against it.
_VERSION: Final = "1.0.0"

# Pydantic field aliases used by DocMeta (also referenced in its exclusion lists).
_KEY_SCHEMA_NAME = "schema_name"
_KEY_VERSION = "version"
_KEY_DOC_ITEMS = "doc_items"
_KEY_HEADINGS = "headings"
_KEY_CAPTIONS = "captions"
_KEY_ORIGIN = "origin"
_KEY_NODE_TYPE = "node_type"

# Module logger (not used by the code visible in this cell).
_logger = logging.getLogger(__name__)


class DocMeta(BaseMeta):
    """Chunk metadata produced by CustomHierarchicalChunker.

    Uses docling-core's ``DocMeta`` schema name. It adds ``node_type``: the
    class name of the Docling item(s) the chunk was built from, e.g.
    ``"TextItem"``, ``"TableItem"`` or ``"PictureItem"``.
    """

    schema_name: Literal["docling_core.transforms.chunker.DocMeta"] = Field(
        default="docling_core.transforms.chunker.DocMeta",
        alias=_KEY_SCHEMA_NAME,
    )
    version: Annotated[str, StringConstraints(pattern=VERSION_PATTERN, strict=True)] = (
        Field(
            default=_VERSION,
            alias=_KEY_VERSION,
        )
    )
    # Source Docling items of the chunk (at least one).
    doc_items: list[DocItem] = Field(
        alias=_KEY_DOC_ITEMS,
        min_length=1,
    )
    # Heading path in scope for the chunk, outermost first; None if there is none.
    headings: Optional[list[str]] = Field(
        default=None,
        alias=_KEY_HEADINGS,
        min_length=1,
    )
    # Caption texts of a table/picture chunk; None if there are none.
    captions: Optional[list[str]] = Field(
        default=None,
        alias=_KEY_CAPTIONS,
        min_length=1,
    )
    origin: Optional[DocumentOrigin] = Field(
        default=None,
        alias=_KEY_ORIGIN,
    )

    # Metadata keys hidden from embedding / LLM input; CustomNodeParser maps
    # them to LlamaIndex's excluded_*_metadata_keys.
    excluded_embed: ClassVar[list[str]] = [
        _KEY_SCHEMA_NAME,
        _KEY_VERSION,
        _KEY_DOC_ITEMS,
        _KEY_ORIGIN,
    ]
    excluded_llm: ClassVar[list[str]] = [
        _KEY_SCHEMA_NAME,
        _KEY_VERSION,
        _KEY_DOC_ITEMS,
        _KEY_ORIGIN,
    ]

    # Class name of the source Docling item(s), e.g. "TableItem".
    node_type: Optional[str] = Field(
        default=None,
        alias=_KEY_NODE_TYPE,
    )

    @field_validator(_KEY_VERSION)
    @classmethod
    def check_version_is_compatible(cls, v: str) -> str:
        """Accept ``v`` only if it is compatible with the current ``_VERSION``.

        Compatible means the same major version and a minor version that is
        not newer than ours. On success the stored version is normalised to
        ``_VERSION``.

        Raises:
            ValueError: if ``v`` or ``_VERSION`` does not match
                ``VERSION_PATTERN``, or if the versions are incompatible.
        """
        current_match = re.match(VERSION_PATTERN, _VERSION)
        doc_match = re.match(VERSION_PATTERN, v)
        if (
            doc_match is None
            or current_match is None
            or doc_match["major"] != current_match["major"]
            # Compare numerically: as strings "10" < "9", so a lexicographic
            # comparison misjudges multi-digit minor versions.
            or int(doc_match["minor"]) > int(current_match["minor"])
        ):
            raise ValueError(f"incompatible version {v} with schema version {_VERSION}")
        return _VERSION


class DocChunk(BaseChunk):
    """A chunk produced by CustomHierarchicalChunker: text plus DocMeta metadata.

    ``text`` is not declared here; it is inherited from BaseChunk (chunks are
    built as ``DocChunk(text=..., meta=DocMeta(...))``).
    """

    # Typed as this notebook's DocMeta, which adds `node_type`.
    meta: DocMeta


class CustomHierarchicalChunker(BaseChunker):
    r"""Chunker implementation leveraging the document layout.

    Walks ``dl_doc.iterate_items()`` in order and yields one ``DocChunk`` per
    text, table or picture item. When ``merge_list_items`` is set, each run of
    consecutive list items becomes a single chunk. Section headers and titles
    are not emitted as chunks; they only maintain the heading path attached to
    later chunks. Every chunk's ``meta.node_type`` is the class name of its
    Docling item(s).

    Args:
        merge_list_items (bool): Whether to merge successive list items.
            Defaults to True.
        delim (str): Delimiter to use for merging text. Defaults to "\n".
    """

    merge_list_items: bool = True
    delim: str = "\n"

    @classmethod
    def _clean_text(cls, text):
        """Normalise extracted text.

        Strips surrounding whitespace and removes ". . .", "--" and "/s/".
        Newlines, parentheses and U+00A0 (no-break space) become spaces.
        U+F0B7 (a private-use code point, often a symbol-font bullet) is
        mapped to "l". NOTE(review): not called by ``chunk`` (its use is
        commented out).
        """
        cleaned_text = text.strip()
        # Replacements are applied in this exact order.
        for old, new in (
            (". . .", ""),
            ("--", ""),
            ("\n", " "),
            ("(", " "),
            (")", " "),
            ("\xa0", " "),
            ("\uf0b7", "l"),
            ("/s/", ""),
        ):
            cleaned_text = cleaned_text.replace(old, new)
        return cleaned_text

    @classmethod
    def _triplet_serialize(cls, table_df: DataFrame) -> str:
        """Serialise a table as "<row header>, <column header> = <value>" triplets.

        Column labels act as column headers and the first column as row
        headers. Triplets are joined with ". ". NOTE(review): not called by
        ``chunk`` (its use is commented out).
        """
        # Work on a copy: inserting the header row used to mutate the caller's frame.
        table_df = table_df.copy()

        # copy header as first row and shift all rows by one
        table_df.loc[-1] = table_df.columns  # type: ignore[call-overload]
        table_df.index = table_df.index + 1
        table_df = table_df.sort_index()

        rows = [str(item).strip() for item in table_df.iloc[:, 0].to_list()]
        cols = [str(item).strip() for item in table_df.iloc[0, :].to_list()]

        nrows, ncols = table_df.shape
        texts = [
            f"{rows[i]}, {cols[j]} = {str(table_df.iloc[i, j]).strip()}"
            for i in range(1, nrows)
            for j in range(1, ncols)
        ]
        return ". ".join(texts)

    @staticmethod
    def _headings_in_scope(heading_by_level) -> Optional[list[str]]:
        """Return the current heading path ordered by level, or None if empty."""
        return [heading_by_level[k] for k in sorted(heading_by_level)] or None

    @staticmethod
    def _resolve_captions(item, dl_doc) -> Optional[list[str]]:
        """Return the texts of ``item``'s caption references, or None if it has none."""
        return [ref.resolve(dl_doc).text for ref in item.captions] or None

    def _list_chunk(self, list_items, heading_by_level, dl_doc) -> DocChunk:
        """Build one chunk from a run of consecutive list items."""
        return DocChunk(
            text=self.delim.join([i.text for i in list_items]),
            meta=DocMeta(
                doc_items=list_items,
                headings=self._headings_in_scope(heading_by_level),
                origin=dl_doc.origin,
                # Tag with the list items' own type. The original used the type
                # of the item that *ended* the run (None at end of document), so
                # a list directly before a table was labelled "TableItem" and
                # then never re-split downstream.
                node_type=type(list_items[0]).__name__,
            ),
        )

    def chunk(self, dl_doc: DLDocument, **kwargs: Any) -> Iterator[BaseChunk]:
        r"""Chunk the provided document.

        Args:
            dl_doc (DLDocument): document to chunk

        Yields:
            Iterator[Chunk]: iterator over extracted chunks
        """
        heading_by_level: dict[LevelNumber, str] = {}
        list_items: list[TextItem] = []
        for item, _ in dl_doc.iterate_items():
            if not isinstance(item, DocItem):
                continue
            captions = None
            node_type = type(item).__name__

            # first handle any merging needed
            if self.merge_list_items:
                if isinstance(item, ListItem) or (  # TODO remove when all captured as ListItem:
                    isinstance(item, TextItem) and item.label == DocItemLabel.LIST_ITEM
                ):
                    list_items.append(item)
                    continue
                elif list_items:  # a non-list item ends the current run
                    yield self._list_chunk(list_items, heading_by_level, dl_doc)
                    list_items = []  # reset

            if isinstance(item, SectionHeaderItem) or (
                isinstance(item, TextItem)
                and item.label in [DocItemLabel.SECTION_HEADER, DocItemLabel.TITLE]
            ):
                heading_level = (
                    item.level
                    if isinstance(item, SectionHeaderItem)
                    else (0 if item.label == DocItemLabel.TITLE else 1)
                )
                heading_by_level[heading_level] = item.text

                # remove headings of higher level as they just went out of scope
                for stale_level in [k for k in heading_by_level if k > heading_level]:
                    heading_by_level.pop(stale_level, None)
                continue

            if isinstance(item, TextItem) or (
                (not self.merge_list_items) and isinstance(item, ListItem)
            ):
                text = item.text
            elif isinstance(item, TableItem):
                table_df = item.export_to_dataframe()
                if table_df.shape[0] < 1 or table_df.shape[1] < 2:
                    # at least two cols needed, as first column contains row headers
                    continue
                # Tables are emitted as Markdown (triplet / clean-text variants unused).
                text = table_df.to_markdown()
                captions = self._resolve_captions(item, dl_doc)
            elif isinstance(item, PictureItem):
                # NOTE(review): presumably not every annotation type exposes
                # `.text` (e.g. classification results); reading it
                # unconditionally raised AttributeError, so skip those.
                annotation_texts = [
                    annotation.text
                    for annotation in (item.annotations or [])
                    if getattr(annotation, "text", None) is not None
                ]
                if annotation_texts:
                    text = "PICTURE METADATA: " + ", ".join(annotation_texts)
                else:
                    text = "NO PICTURE METADATA"
                captions = self._resolve_captions(item, dl_doc)
            else:
                continue

            yield DocChunk(
                text=text,
                meta=DocMeta(
                    doc_items=[item],
                    headings=self._headings_in_scope(heading_by_level),
                    captions=captions,
                    origin=dl_doc.origin,
                    node_type=node_type,
                ),
            )

        if self.merge_list_items and list_items:  # flush a trailing run of list items
            yield self._list_chunk(list_items, heading_by_level, dl_doc)

In [8]:
from typing import Any, Iterable, Protocol, Sequence, runtime_checkable
import uuid
import copy
from pydantic import BaseModel

from llama_index.core.schema import Document as LIDocument
from llama_index.core.node_parser import NodeParser

from docling_core.transforms.chunker import BaseChunker, HierarchicalChunker
from docling_core.types import DoclingDocument as DLDocument
from llama_index.core import Document as LIDocument
from llama_index.core.node_parser import NodeParser
from llama_index.core.schema import (
    BaseNode,
    NodeRelationship,
    RelatedNodeType,
    TextNode,
)
from llama_index.core.utils import get_tqdm_iterable

from llama_index.core.node_parser import (
    SentenceSplitter,
    SemanticSplitterNodeParser,
    TokenTextSplitter
)

class ChunkConfig(BaseModel):
    """Post-processing settings for CustomNodeParser.

    Attributes:
        split_at: ``"HEADER"`` first merges consecutive chunks that share the
            same headings, then re-splits long chunks. Any other value only
            re-splits.
        chunk_size: A chunk whose text is longer than this many characters is
            re-split. The value is also passed to SentenceSplitter as its
            ``chunk_size``.
        chunk_overlap: Overlap passed to SentenceSplitter.
    """

    split_at: str
    chunk_size: int
    chunk_overlap: int

    def __init__(self, chunk_size: int, chunk_overlap: int, split_at: str = "HEADER"):
        # Positional signature kept for existing callers; pydantic validates and
        # stores the values. The original then re-assigned the *raw* arguments.
        # Assignment is not validated by default, so that undid coercion
        # (e.g. chunk_size="512" stayed a str).
        super().__init__(chunk_size=chunk_size, chunk_overlap=chunk_overlap, split_at=split_at)

class CustomNodeParser(NodeParser):
    """Docling format node parser.

    Splits the JSON format of `DoclingReader` into nodes corresponding
    to respective document elements from Docling's data model
    (paragraphs, headings, tables etc.), then post-processes the chunks
    according to ``chunk_config``:

    * ``split_at == "HEADER"``: consecutive chunks with identical headings are
      merged, then chunks longer than ``chunk_size`` characters are re-split.
    * otherwise: only the re-splitting step runs.

    Chunks tagged ``"TableItem"`` are never re-split. Each node's text is
    prefixed with a ``### heading >> subheading`` line when headings exist.

    Args:
        chunker (BaseChunker, optional): The chunker to use. Defaults to `HierarchicalChunker()`.
        id_func(NodeIDGenCallable, optional): The node ID generation function to use. Defaults to `_uuid4_node_id_gen`.
        chunk_config (ChunkConfig, optional): Post-processing settings.
            Defaults to ``ChunkConfig(chunk_size=512, chunk_overlap=30)``.
    """

    @runtime_checkable
    class NodeIDGenCallable(Protocol):
        def __call__(self, i: int, node: BaseNode) -> str:
            ...

    @staticmethod
    def _uuid4_node_id_gen(i: int, node: BaseNode) -> str:
        return str(uuid.uuid4())

    chunker: BaseChunker = HierarchicalChunker()
    id_func: NodeIDGenCallable = _uuid4_node_id_gen
    chunk_config: ChunkConfig = ChunkConfig(chunk_size=512, chunk_overlap=30)

    @staticmethod
    def _merge_chunk_into(target, chunk):
        """Append ``chunk``'s text and metadata onto ``target`` (mutates ``target``)."""
        # Picture chunks without annotations carry a placeholder. Once merged
        # with other content, show it as an image marker. The original did this
        # only for the earlier chunk, so a placeholder in the later chunk leaked
        # into the merged text verbatim.
        placeholder, marker = "NO PICTURE METADATA", "<--  IMAGE -->"
        target.text = (
            f"{target.text.replace(placeholder, marker)}\n\n"
            f"{chunk.text.replace(placeholder, marker)}"
        )
        if target.meta.doc_items and chunk.meta.doc_items:
            target.meta.doc_items.extend(chunk.meta.doc_items)
        if chunk.meta.captions:
            # Keep the later chunk's captions even when the earlier chunk has
            # none (the original dropped them in that case).
            target.meta.captions = (target.meta.captions or []) + list(chunk.meta.captions)
        # A merged chunk containing a table is treated as a table (never re-split).
        if "TableItem" in (target.meta.node_type, chunk.meta.node_type):
            target.meta.node_type = "TableItem"

    def _split_by_header(self, chunks):
        """Merge consecutive chunks that share exactly the same heading path.

        Chunks whose text has fewer than 7 characters are dropped (page numbers
        and similar noise).

        Args:
            chunks: Iterable of DocChunk objects in document order.

        Returns:
            list: The merged chunks. The input chunk objects are mutated.
        """
        merged_chunks = []
        for chunk in chunks:
            if len(chunk.text) < 7:
                ## We dont need very small text nodes (like page numbers etc.)
                continue

            previous = merged_chunks[-1] if merged_chunks else None
            if (
                previous is not None
                and previous.meta.headings == chunk.meta.headings
                and previous.text
                and chunk.text
            ):
                self._merge_chunk_into(previous, chunk)
            else:
                merged_chunks.append(chunk)
        return merged_chunks

    def _split_or_merge(self, chunks):
        """Re-split chunks whose text is longer than ``chunk_config.chunk_size`` characters.

        Chunks within the limit pass through unchanged, as do over-long
        ``"TableItem"`` chunks. Other over-long chunks are split with
        SentenceSplitter. When that yields several pieces, each piece becomes a
        deep copy of the chunk carrying the piece's text. Despite the name, no
        merging happens here.
        """
        text_splitter = SentenceSplitter(
            chunk_size=self.chunk_config.chunk_size,
            chunk_overlap=self.chunk_config.chunk_overlap,
        )

        updated_chunks = []
        for chunk in chunks:
            fits = bool(chunk.text) and len(chunk.text) <= self.chunk_config.chunk_size
            if fits or chunk.meta.node_type == "TableItem":
                # Do not split short chunks or tables.
                updated_chunks.append(chunk)
                continue

            pieces = text_splitter.split_text(chunk.text)
            if len(pieces) > 1:
                for piece in pieces:
                    split_chunk = copy.deepcopy(chunk)
                    split_chunk.text = piece
                    updated_chunks.append(split_chunk)
            else:
                updated_chunks.append(chunk)
        return updated_chunks

    def _parse_nodes(
        self,
        nodes: Sequence[BaseNode],
        show_progress: bool = False,
        **kwargs: Any,
    ) -> list[BaseNode]:
        """Turn Docling-JSON document nodes into post-processed TextNodes.

        Each input node's content is validated as a DoclingDocument and chunked
        with ``self.chunker``. The chunks are post-processed per
        ``chunk_config`` and wrapped as TextNodes. Each node's metadata is the
        chunk's exported DocMeta, with DocMeta's exclusion lists applied.
        """
        nodes_with_progress: Iterable[BaseNode] = get_tqdm_iterable(
            items=nodes, show_progress=show_progress, desc="Parsing nodes"
        )
        all_nodes: list[BaseNode] = []
        for input_node in nodes_with_progress:
            li_doc = LIDocument.model_validate(input_node)
            dl_doc: DLDocument = DLDocument.model_validate_json(li_doc.get_content())
            chunk_iter = self.chunker.chunk(dl_doc=dl_doc)

            if self.chunk_config.split_at == 'HEADER':
                # The merged chunks are fresh local objects, so no defensive
                # deep copy is needed before re-splitting.
                updated_chunks = self._split_or_merge(self._split_by_header(chunk_iter))
            else:
                updated_chunks = self._split_or_merge(chunk_iter)

            print(f"\n\nUPDATED CHUNKS SIZE: {len(updated_chunks)}\n\n")
            for i, chunk in enumerate(updated_chunks):
                metadata = chunk.meta.export_json_dict()
                excl_embed_keys = [k for k in chunk.meta.excluded_embed if k in metadata]
                excl_llm_keys = [k for k in chunk.meta.excluded_llm if k in metadata]

                text = chunk.text
                if chunk.meta.headings:
                    # Prefix the heading path so it is part of the embedded text.
                    text = f"### {' >> '.join(chunk.meta.headings)}\n\n{text}"

                node = TextNode(
                    id_=self.id_func(i=i, node=li_doc),
                    text=text,
                    excluded_embed_metadata_keys=excl_embed_keys,
                    excluded_llm_metadata_keys=excl_llm_keys,
                    # NOTE(review): no NodeRelationship.SOURCE link to `li_doc`
                    # is attached (it was commented out); confirm this is intended.
                )
                node.metadata = metadata
                all_nodes.append(node)
        return all_nodes
        

In [9]:
from llama_index.core.node_parser import (
    SentenceSplitter,
    SemanticSplitterNodeParser,
    TokenTextSplitter
)

# Structure-aware chunking: merge chunks under the same heading path, then
# re-split anything longer than 512 characters.
chunk_config: ChunkConfig = ChunkConfig(
    chunk_size=512,
    chunk_overlap=30,
    split_at="HEADER",
)
custom_node_parser = CustomNodeParser(
    node_meta_keys_allowed={"heading"},
    chunker=CustomHierarchicalChunker(),
    chunk_config=chunk_config,
)
chunked_nodes = custom_node_parser.get_nodes_from_documents(documents)

# Alternatives tried: a plain SentenceSplitter or TokenTextSplitter with the
# same chunk_size / chunk_overlap instead of the structure-aware parser.

print(f"Size of chunked_nodes: {len(chunked_nodes)}")

    



UPDATED CHUNKS SIZE: 178


Size of chunked_nodes: 178


## Saving Embeddings in MilvusDB

In [10]:

from pydantic import TypeAdapter
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.llms.huggingface_api import HuggingFaceInferenceAPI

from llama_index.vector_stores.milvus import MilvusVectorStore
from llama_index.core import VectorStoreIndex, StorageContext, load_index_from_storage
from llama_index.core.callbacks import (
    CallbackManager,
    LlamaDebugHandler
)

def save_nodes(
    nodes,
    collection_name="demo_collection",
    OVERWRITE=True,
    uri=None,
    embed_model_name="BAAI/bge-small-en-v1.5",
):
    """Embed ``nodes`` and store them in a Milvus collection.

    Args:
        nodes: LlamaIndex nodes to index.
        collection_name: Name of the Milvus collection to write to.
        OVERWRITE: Forwarded as ``overwrite`` to MilvusVectorStore (whether to
            overwrite an existing collection of the same name).
        uri: Milvus endpoint. Defaults to a ``docling.db`` file in a fresh
            temporary directory (a new one on every call).
        embed_model_name: HuggingFace embedding model. Its output length sets
            the collection's vector dimension.

    Extra MilvusVectorStore keyword arguments can be given as a JSON object in
    the ``MILVUS_KWARGS`` environment variable. They override the defaults
    built here.

    Returns:
        tuple: ``(index, vector_store)``.
    """
    embed_model = HuggingFaceEmbedding(model_name=embed_model_name)
    # Probe the model once to learn the embedding dimension for the collection.
    embed_dim = len(embed_model.get_text_embedding("hi"))

    # Prints a trace of the embedding calls when index construction ends.
    llama_debug = LlamaDebugHandler(print_trace_on_end=True)
    callback_manager = CallbackManager([llama_debug])

    store_kwargs = dict(
        # Only create a temp dir when no endpoint was supplied.
        uri=uri if uri is not None else str(Path(mkdtemp()) / "docling.db"),
        user=None,
        password=None,
        collection_name=collection_name,
        dim=embed_dim,
        overwrite=OVERWRITE,
        search_config=None,
        hybrid_ranker="RRFRanker",
        hybrid_ranker_params={"k": 60},
        max_length=65535,
    )
    # Env overrides take precedence. Previously, passing e.g. {"uri": ...}
    # raised "got multiple values for keyword argument".
    store_kwargs.update(TypeAdapter(dict).validate_json(os.environ.get("MILVUS_KWARGS", "{}")))
    vector_store = MilvusVectorStore(**store_kwargs)

    storage_context = StorageContext.from_defaults(vector_store=vector_store)
    index = VectorStoreIndex(
        nodes,
        storage_context=storage_context,
        callback_manager=callback_manager,
        embed_model=embed_model,
    )
    return index, vector_store

# Embed and index the chunks; OVERWRITE=True replaces an existing collection of the same name.
index, vector_store = save_nodes(nodes=chunked_nodes, OVERWRITE=True)


2025-03-06 20:05:58,177 [DEBUG][_create_connection]: Created new connection using: fa9581662e17453c9741619c2090c615 (async_milvus_client.py:600)


**********
Trace: index_construction
    |_CBEventType.EMBEDDING -> 0.369541 seconds
    |_CBEventType.EMBEDDING -> 0.147533 seconds
    |_CBEventType.EMBEDDING -> 0.270903 seconds
    |_CBEventType.EMBEDDING -> 0.200711 seconds
    |_CBEventType.EMBEDDING -> 0.284486 seconds
    |_CBEventType.EMBEDDING -> 0.164235 seconds
    |_CBEventType.EMBEDDING -> 0.196213 seconds
    |_CBEventType.EMBEDDING -> 0.262139 seconds
    |_CBEventType.EMBEDDING -> 0.225684 seconds
    |_CBEventType.EMBEDDING -> 0.156595 seconds
    |_CBEventType.EMBEDDING -> 0.30484 seconds
    |_CBEventType.EMBEDDING -> 0.322038 seconds
    |_CBEventType.EMBEDDING -> 0.198732 seconds
    |_CBEventType.EMBEDDING -> 0.310623 seconds
    |_CBEventType.EMBEDDING -> 0.19709 seconds
    |_CBEventType.EMBEDDING -> 0.339705 seconds
    |_CBEventType.EMBEDDING -> 0.159496 seconds
    |_CBEventType.EMBEDDING -> 0.571489 seconds
**********


## Retrieve Context from VectorDB (MilvusDB)

In [12]:
from llama_index.core.schema import QueryBundle


def fetch_context(query, index, filters=None, similarity_top_k=3):
    """Retrieve the chunks most similar to ``query`` and format them as prompt context.

    Args:
        query: Question to search the index with (a plain string; a LlamaIndex
            ``QueryBundle`` is also accepted by ``retrieve``).
        index: LlamaIndex index exposing ``as_retriever`` (in this notebook, the
            ``VectorStoreIndex`` returned by ``save_nodes``).
        filters: Optional metadata filters forwarded to the retriever;
            ``None`` disables filtering.
        similarity_top_k: Number of nearest chunks to retrieve (default 3, the
            value that used to be hard-coded).

    Returns:
        One ``"\\n[Document]\\n\\n<text>\\n[End]\\n\\n"`` section per retrieved
        node, in retrieval order, or ``""`` when nothing is retrieved.
    """
    retriever = index.as_retriever(
        verbose=True,
        filters=filters,
        similarity_top_k=similarity_top_k,
    )
    # Search with the caller's query — the previous version silently read the
    # notebook-global QUERY instead. The public ``retrieve`` API (rather than the
    # protected ``_retrieve`` hook) wraps a plain string in a QueryBundle itself
    # and fires the retriever callbacks.
    retrieved_nodes = retriever.retrieve(query)
    return "".join(
        f"\n[Document]\n\n{node.text}\n[End]\n\n" for node in retrieved_nodes
    )


# Smoke-test retrieval on a single FAQ question and show the raw context that
# will later be injected into the LLM prompt.
QUERY = "Is a denial under Section 214(b) permanent?"
context_str = fetch_context(query=QUERY, index=index)
print("CONTEXT: " + context_str)


CONTEXT: 
[Document]

### Q.3 Is a denial under Section 214(b) permanent?

No. The consular officer will reconsider a case if an applicant can show further convincing evidence of ties outside the United States. Unfortunately, some applicants will not qualify for a nonimmigrant visa, regardless of how many times they reapply, until their personal, professional, and financial circumstances change considerably.

An applicant refused under Section 214(b) should review carefully their situation and realistically evaluate their ties. They may write down on paper what qualifying ties they think they have which may not have been evaluated at the time of their interview with the consular officer. Also, if they have been refused, they should review what documents were submitted for the consul to consider. Applicants refused visas under section 214(b) may reapply for a visa. When they do, they will have to show further evidence of their ties or how their circumstances have changed since the time 

In [13]:
import getpass
from ibm_watsonx_ai import Credentials

# Resolve watsonx.ai credentials: prefer environment variables and fall back to
# an interactive prompt, so no secret is ever hard-coded in the notebook.
WX_ENDPOINT = "https://us-south.ml.cloud.ibm.com"

# `or` (rather than an `is None` check) also treats an empty env var as unset.
IBMCLOUD_API_KEY = os.environ.get("IBMCLOUD_API_KEY") or getpass.getpass(
    "Enter your IBM CLOUD API key and hit enter: "
)
# Alias set only AFTER the prompt; previously it was captured before the
# fallback and stayed None whenever the key was typed in interactively.
WATSONX_APIKEY = IBMCLOUD_API_KEY

credentials = Credentials(
    url=WX_ENDPOINT,
    api_key=IBMCLOUD_API_KEY,
)

WX_PROJECT_ID = os.environ.get("WX_PROJECT_ID") or getpass.getpass(
    "Enter your WX_PROJECT_ID and hit enter: "
)



In [14]:
from time import time

from ibm_watsonx_ai.foundation_models import Model
from ibm_watsonx_ai.foundation_models import ModelInference

import llama_index.core
from llama_index.llms.ibm import WatsonxLLM
from llama_index.core import Settings
from llama_index.core.callbacks import CallbackManager, LlamaDebugHandler, CBEventType, TokenCountingHandler
from llama_index.core import PromptTemplate

from ibm_watsonx_ai import APIClient, Credentials

def generate_response(payload, asynchronous=False):
    """Send one text-generation request to watsonx.ai and return its first result.

    Args:
        payload: dict with keys ``"model_id"`` (watsonx model id), ``"params"``
            (generation parameters passed to ``ModelInference``) and ``"input"``
            (the fully rendered prompt string).
        asynchronous: Currently ignored — the request is always sent with
            ``async_mode=False``. Kept so existing callers keep working.

    Returns:
        The first element of ``resp["results"]`` when the response contains any
        results (presumably a dict carrying ``"generated_text"``); otherwise the
        raw response, unchanged.
    """
    # Monotonic clock for durations; note the global name `time` is rebound to the
    # time() function by `from time import time` earlier in this cell.
    from time import perf_counter

    start_time = perf_counter()
    # A fresh client per call, authenticated from the notebook-level credentials.
    model = ModelInference(
        model_id=payload["model_id"],
        params=payload["params"],
        credentials={
            "apikey": IBMCLOUD_API_KEY,
            "url": WX_ENDPOINT,
        },
        project_id=WX_PROJECT_ID,
    )

    api_start_time = perf_counter()
    resp = model.generate(payload["input"], async_mode=False)
    end_time = perf_counter()
    llm_call_time = end_time - api_start_time
    total_time = end_time - start_time
    print(f"\n\n\nRESPONSE COMPLETED FOR {payload['model_id']}, in {llm_call_time} llm_call_time and {total_time} total_time \n")

    results = resp.get("results") if isinstance(resp, dict) else None
    if results:
        return results[0]
    return resp




In [18]:
# DEFAULT_PROMPT_TEMPLATE_STR = """<|start_of_role|>system<|end_of_role|>You are Granite, an AI language model developed by IBM in 2024. 
# You are a cautious assistant. You carefully follow instructions. You are helpful and harmless and you follow ethical guidelines and promote positive behavior. 
# You are a AI language model designed to function as a specialized Retrieval Augmented Generation (RAG) assistant. 
# When generating responses, prioritize correctness, i.e., ensure that your response is correct given the context and user query, and that it is grounded in the context. 
# Furthermore, make sure that the response is supported by the given document or context. Always make sure that your response is relevant to the question. 
# If an explanation is needed, first provide the explanation or reasoning, and then give the final answer. Avoid repeating information unless asked.

# Use the following pieces of context to answer the question.

# {context}<|end_of_text|>

# <|start_of_role|>user<|end_of_role|>{query}<|end_of_text|>
# <|start_of_role|>assistant<|end_of_role|>
# """

# DEFAULT_PROMPT_TEMPLATE_STR = """<|start_of_role|>system<|end_of_role|>You are Granite, an AI language model developed by IBM in 2024. 
# You are a cautious assistant. You carefully follow instructions. You are helpful and harmless and you follow ethical guidelines and promote positive behavior. 
# You are a AI language model designed to function as a specialized Retrieval Augmented Generation (RAG) assistant. 
# When generating responses, prioritize correctness, i.e., ensure that your response is correct given the context and user query, and that it is grounded in the context. 
# Furthermore, make sure that the response is supported by the given document or context. Always make sure that your response is relevant to the question. 
# Avoid repeating information unless asked.

# Use the following pieces of context to answer the question from a FAQ document.

# {context}<|end_of_text|>

# <|start_of_role|>user<|end_of_role|>{query}<|end_of_text|>
# <|start_of_role|>assistant<|end_of_role|>
# """

# Plain-text RAG prompt for Granite (no chat role tokens, unlike the commented-out
# variants above). Placeholders: {context} receives the "[Document] ... [End]"
# chunks built by fetch_context, {query} the user question. PROMPT_TEMPLATE maps
# LlamaIndex's context_str/query_str variables onto these names.
DEFAULT_PROMPT_TEMPLATE_STR = """You are Granite, an AI language model developed by IBM in 2024. 
You are a cautious assistant. You carefully follow instructions. You are helpful and harmless and you follow ethical guidelines and promote positive behavior. 
You are an AI language model designed to function as a specialized Retrieval Augmented Generation (RAG) assistant. 
When generating responses, prioritize correctness, i.e., ensure that your response is correct given the context and user query, and that it is grounded in the context. 
Furthermore, make sure that the response is supported by the given document or context. Always make sure that your response is relevant to the question. 
Avoid repeating information unless asked.

Use the following pieces of context to answer the question from a FAQ document.

{context}

Question: {query}
Answer:
"""



In [16]:

# Long-form "contemplator" reasoning prompt: asks the model for an extended internal
# monologue inside <contemplator> tags followed by a <final_answer> section.
# Uses the same {context}/{query} placeholders as DEFAULT_PROMPT_TEMPLATE_STR, so it
# can be swapped into PromptTemplate with the same template_var_mappings.
# NOTE(review): not used by the generation cell that follows (which builds
# PROMPT_TEMPLATE from DEFAULT_PROMPT_TEMPLATE_STR) — confirm whether it is used elsewhere.
# NOTE(review): it demands >= 10,000 characters of reasoning plus a JSON final answer,
# which cannot fit in the max_new_tokens=300 budget used for the default prompt;
# raise that limit if this template is used.
# NOTE(review): "The goal is to reach a conclusion, but to explore thoroughly..." reads
# like a garbled "not to reach a conclusion"; the text is left unchanged here.
REASONING_PROMPT_TEMPLATE_STR = """You are an expert that engages in extremely thorough, self-questioning reasoning. Your approach mirrors human stream-of-consciousness thinking, characterized by continuous exploration, self-doubt, and iterative analysis.

## Core Principles

1. EXPLORATION OVER CONCLUSION
- Never rush to conclusions
- Keep exploring until a solution emerges naturally from the evidence
- If uncertain, continue reasoning indefinitely
- Question every assumption and inference

2. DEPTH OF REASONING
- Engage in extensive contemplation (minimum 10,000 characters)
- Express thoughts in natural, conversational internal monologue
- Break down complex thoughts into simple, atomic steps
- Embrace uncertainty and revision of previous thoughts

3. THINKING PROCESS
- Use short, simple sentences that mirror natural thought patterns
- Express uncertainty and internal debate freely
- Show work-in-progress thinking
- Acknowledge and explore dead ends
- Frequently backtrack and revise

4. PERSISTENCE
- Value thorough exploration over quick resolution

## Output Format

Your responses must follow this exact structure given below. Make sure to always include the final answer.

```
<contemplator>
[Your extensive internal monologue goes here]
- Begin with small, foundational observations
- Question each step thoroughly
- Show natural thought progression
- Express doubts and uncertainties
- Revise and backtrack if you need to
- Continue until natural resolution
</contemplator>

<final_answer>
[Only provided if reasoning naturally converges to a conclusion]
- Clear, concise summary of findings
- Acknowledge remaining uncertainties
- Note if conclusion feels premature
- Make sure final output is in JSON format
</final_answer>
```

## Style Guidelines

Your internal monologue should reflect these characteristics:

1. Natural Thought Flow
```
"Hmm... let me think about this..."
"Wait, that doesn't seem right..."
"Maybe I should approach this differently..."
"Going back to what I thought earlier..."
```

2. Progressive Building
```
"Starting with the basics..."
"Building on that last point..."
"This connects to what I noticed earlier..."
"Let me break this down further..."
```

## Key Requirements

1. Never skip the extensive contemplation phase
2. Show all work and thinking
3. Embrace uncertainty and revision
4. Use natural, conversational internal monologue
5. Don't force conclusions
6. Persist through multiple attempts
7. Break down complex thoughts
8. Revise freely and feel free to backtrack
9. Make sure final output is in JSON format

Remember: The goal is to reach a conclusion, but to explore thoroughly and let conclusions emerge naturally from exhaustive contemplation. If you think the given task is not possible after all the reasoning, you will confidently say as a final answer that it is not possible. 

[Document]
{context}
[End]

{query}

ASSISTANT: 
"""


In [19]:
# Build the final RAG prompt and ask Granite to answer it.
PROMPT_TEMPLATE = PromptTemplate(
    template=DEFAULT_PROMPT_TEMPLATE_STR,
    # The template string uses {query}/{context}; map LlamaIndex's standard
    # query_str/context_str variable names onto them.
    template_var_mappings={"query_str": "query", "context_str": "context"},
)

# Other example questions:
# QUERY = "How do I read and understand my visa?"
# QUERY = "How can an applicant prove 'strong ties?'"
QUERY = "Is a denial under Section 214(b) permanent?"

context_str = fetch_context(QUERY, index)
prompt = PROMPT_TEMPLATE.format(context_str=context_str, query_str=QUERY)
print(f"------------- PROMPT----------------: \n{prompt}\n")

model_id = "ibm/granite-3-8b-instruct"

payload = {
    "model_id": model_id,
    "input": prompt,
    "params": {
        "decoding_method": "greedy",  # no sampling
        "min_new_tokens": 10,
        "max_new_tokens": 300,
        "repetition_penalty": 1,
        "stop_sequences": ["<|end_of_text|>"],
    },
}

print("------------- GENERATING RESPONSE----------------: \n")
resp = generate_response(payload, False)
pprint(resp)
# generate_response hands back the raw service response when it reports no
# results; that response has no "generated_text" key, so avoid a KeyError here
# (the full response is already shown by pprint above).
print(resp.get("generated_text", "<no generated_text in response>"))


------------- PROMPT----------------: 
You are Granite, an AI language model developed by IBM in 2024. 
You are a cautious assistant. You carefully follow instructions. You are helpful and harmless and you follow ethical guidelines and promote positive behavior. 
You are a AI language model designed to function as a specialized Retrieval Augmented Generation (RAG) assistant. 
When generating responses, prioritize correctness, i.e., ensure that your response is correct given the context and user query, and that it is grounded in the context. 
Furthermore, make sure that the response is supported by the given document or context. Always make sure that your response is relevant to the question. 
Avoid repeating information unless asked.

Use the following pieces of context to answer the question from a FAQ document.


[Document]

### Q.3 Is a denial under Section 214(b) permanent?

No. The consular officer will reconsider a case if an applicant can show further convincing evidence of ties


No, a denial under Section 214(b) is not permanent. Applicants can reapply for a visa and provide further evidence of their ties or how their circumstances have changed since the original application. However, they will be charged a nonrefundable application fee each time they apply. It is recommended that applicants review their situation, evaluate their ties, and consider what additional information they can present to establish their residence and strong ties abroad before reapplying.
