In [2]:
## Set up local
from enum import Enum
from typing import Optional

from pydantic import BaseModel


class LayoutItemLabel(str, Enum):

    # Surya
    BLANK = 'Blank'
    TEXT = 'Text'
    TEXTINLINEMATH = 'TextInlineMath'
    CODE = 'Code'
    SECTIONHEADER = 'SectionHeader'
    CAPTION = 'Caption'
    FOOTNOTE = 'Footnote'
    EQUATION = 'Equation'
    LISTITEM = 'ListItem'
    PAGEFOOTER = 'PageFooter'
    PAGEHEADER = 'PageHeader'
    PICTURE = 'Picture'
    FIGURE = 'Figure'
    TABLE = 'Table'
    FORM = 'Form'
    TABLEOFCONTENTS = 'TableOfContents'
    HANDWRITING = 'Handwriting'

    def __str__(self):
        return str(self.value)


class Cell(BaseModel):
    id: int
    text: Optional[str]
    bbox: tuple[float | int, float | int, float | int, float | int]


class ParsedPage(BaseModel):
    page_no: int
    cells: list[Cell]
    size: Optional[tuple[float, float]] = None


class Cluster(BaseModel):
    id: int
    label: LayoutItemLabel
    bbox: list[float]
    confidence: float = 1.0
    cells: list[Cell] = []
    content_text: Optional[str] = None
    content_raw: Optional[str] = None  # image and table is base64
    reading_order: str
    page_no: int
    next_id: Optional[int] = None

In [3]:
# Set up inputs
import json

with open("concat_clusters_output_ga.json", "r") as f:
    data = json.load(f)

clusters: list[Cluster]= []
for cluster in data["clusters"]:
    current_cluster = Cluster(
        id=cluster["id"],
        label=cluster["label"],
        bbox=cluster["bbox"],
        confidence=cluster["confidence"],
        cells=cluster["cells"],
        content_text=cluster["content_text"],
        content_raw=cluster["content_raw"],
        reading_order=cluster["reading_order"],
        page_no=cluster["page_no"],
        next_id=cluster["next_id"],
    )
    clusters.append(current_cluster)

print(len(data["clusters"]))
print(len(clusters))

26
26


In [4]:
## Convert from cluster to chunk
list_chunks: list[str] = []
for cluster in clusters:
    list_chunks.append(cluster.content_text)
print(len(list_chunks))

26


In [5]:
## Build simple sematic split
from llama_index.core.node_parser import (
    SentenceSplitter,
    SemanticSplitterNodeParser,
)
from llama_index.embeddings.openai import OpenAIEmbedding

import os

os.environ["OPENAI_API_KEY"] = ""

In [6]:
embed_model = OpenAIEmbedding()
splitter = SemanticSplitterNodeParser(
    buffer_size=1, breakpoint_percentile_threshold=95, embed_model=embed_model
)

# also baseline splitter
base_splitter = SentenceSplitter(chunk_size=600)

In [7]:
from typing import Any, List, Optional, Sequence, TypedDict
from typing_extensions import Annotated

import numpy as np
from llama_index.core.base.embeddings.base import BaseEmbedding
from llama_index.core.bridge.pydantic import Field, SerializeAsAny
from llama_index.core.callbacks.base import CallbackManager
from llama_index.core.node_parser import NodeParser
from llama_index.core.node_parser.node_utils import build_nodes_from_splits, default_id_func
from llama_index.core.schema import BaseNode, Document

class ChunkCombination(TypedDict):
    chunk: str
    index: int
    combined_chunk: str
    combined_chunk_embedding: List[float]

class ChunkSemanticSplitterNodeParser(NodeParser):
    """Semantic node parser that works with pre-split chunks instead of sentences.
    
    Args:
        buffer_size (int): number of chunks to group together when evaluating semantic similarity
        embed_model (BaseEmbedding): embedding model to use
        include_metadata (bool): whether to include metadata in nodes
        include_prev_next_rel (bool): whether to include prev/next relationships
    """

    embed_model: SerializeAsAny[BaseEmbedding] = Field(
        description="The embedding model to use for semantic comparison",
    )

    buffer_size: int = Field(
        default=1,
        description=(
            "The number of chunks to group together when evaluating semantic similarity. "
            "Set to 1 to consider each chunk individually. "
            "Set to >1 to group chunks together."
        ),
    )

    breakpoint_percentile_threshold: int = Field(
        default=95,
        description=(
            "The percentile of cosine dissimilarity that must be exceeded between a "
            "group of chunks and the next to form a node. The smaller this "
            "number is, the more nodes will be generated"
        ),
    )

    def _parse_nodes(
        self,
        nodes: Sequence[BaseNode],
        show_progress: bool = False,
        **kwargs: Any,
    ) -> List[BaseNode]:
        """Implementation of abstract method from NodeParser."""
        all_nodes: List[BaseNode] = []
        for node in nodes:
            # Convert each node's text into chunks and process them
            chunks = [node.text]  # Or implement your own chunking logic here
            processed_nodes = self.parse_chunks(chunks, show_progress)
            all_nodes.extend(processed_nodes)
        return all_nodes

    @classmethod
    def from_defaults(
        cls,
        embed_model: Optional[BaseEmbedding] = None,
        breakpoint_percentile_threshold: Optional[int] = 95,
        buffer_size: Optional[int] = 1,
        include_metadata: bool = True,
        include_prev_next_rel: bool = True,
        callback_manager: Optional[CallbackManager] = None,
        id_func: Optional[callable] = None,
    ) -> "ChunkSemanticSplitterNodeParser":
        callback_manager = callback_manager or CallbackManager([])

        if embed_model is None:
            try:
                from llama_index.embeddings.openai import OpenAIEmbedding
                embed_model = OpenAIEmbedding()
            except ImportError:
                raise ImportError(
                    "`llama-index-embeddings-openai` package not found, "
                    "please run `pip install llama-index-embeddings-openai`"
                )

        id_func = id_func or default_id_func

        return cls(
            embed_model=embed_model,
            breakpoint_percentile_threshold=breakpoint_percentile_threshold,
            buffer_size=buffer_size,
            include_metadata=include_metadata,
            include_prev_next_rel=include_prev_next_rel,
            callback_manager=callback_manager,
            id_func=id_func,
        )

    def parse_chunks(
        self,
        chunks: List[str],
        show_progress: bool = False,
    ) -> List[BaseNode]:
        """Parse pre-split chunks into semantically related nodes."""
        # Create a dummy document to hold the chunks
        doc = Document(text="".join(chunks))
        
        chunk_groups = self._build_chunk_groups(chunks)
        
        # Get embeddings for chunk groups
        combined_chunk_embeddings = self.embed_model.get_text_embedding_batch(
            [g["combined_chunk"] for g in chunk_groups],
            show_progress=show_progress,
        )
        
        for i, embedding in enumerate(combined_chunk_embeddings):
            chunk_groups[i]["combined_chunk_embedding"] = embedding

        # Calculate semantic distances between groups
        distances = self._calculate_distances_between_chunk_groups(chunk_groups)
        
        # Build final chunks based on semantic similarity
        final_chunks = self._build_node_chunks(chunk_groups, distances)
        
        # Create nodes from the chunks
        nodes = build_nodes_from_splits(
            final_chunks,
            doc,
            id_func=self.id_func,
        )
        
        return nodes

    def _build_chunk_groups(self, chunks: List[str]) -> List[ChunkCombination]:
        """Build groups of chunks based on buffer size."""
        chunk_groups: List[ChunkCombination] = [
            {
                "chunk": x,
                "index": i,
                "combined_chunk": "",
                "combined_chunk_embedding": [],
            }
            for i, x in enumerate(chunks)
        ]

        # Group chunks together based on buffer size
        for i in range(len(chunk_groups)):
            combined_chunk = ""
            
            # Add previous chunks based on buffer size
            for j in range(i - self.buffer_size, i):
                if j >= 0:
                    combined_chunk += chunk_groups[j]["chunk"] + " "
            
            # Add current chunk
            combined_chunk += chunk_groups[i]["chunk"]
            
            # Add next chunks based on buffer size
            for j in range(i + 1, i + 1 + self.buffer_size):
                if j < len(chunk_groups):
                    combined_chunk += " " + chunk_groups[j]["chunk"]
            
            chunk_groups[i]["combined_chunk"] = combined_chunk.strip()

        return chunk_groups

    def _calculate_distances_between_chunk_groups(
        self, chunk_groups: List[ChunkCombination]
    ) -> List[float]:
        """Calculate semantic distances between consecutive chunk groups."""
        distances = []
        for i in range(len(chunk_groups) - 1):
            embedding_current = chunk_groups[i]["combined_chunk_embedding"]
            embedding_next = chunk_groups[i + 1]["combined_chunk_embedding"]
            
            similarity = self.embed_model.similarity(embedding_current, embedding_next)
            distance = 1 - similarity
            
            distances.append(distance)
            
        return distances

    def _build_node_chunks(
        self, chunk_groups: List[ChunkCombination], distances: List[float]
    ) -> List[str]:
        """Build final chunks based on semantic similarity breakpoints."""
        final_chunks = []
        
        if len(distances) > 0:
            breakpoint_distance_threshold = np.percentile(
                distances, self.breakpoint_percentile_threshold
            )

            indices_above_threshold = [
                i for i, x in enumerate(distances) if x > breakpoint_distance_threshold
            ]

            # Group chunks based on semantic breakpoints
            start_index = 0
            for index in indices_above_threshold:
                group = chunk_groups[start_index:index + 1]
                combined_text = " ".join([g["chunk"] for g in group])
                final_chunks.append(combined_text)
                start_index = index + 1

            # Add remaining chunks
            if start_index < len(chunk_groups):
                combined_text = " ".join(
                    [g["chunk"] for g in chunk_groups[start_index:]]
                )
                final_chunks.append(combined_text)
        else:
            # If only one chunk or no distances calculated, return the original chunk
            final_chunks = [" ".join([g["chunk"] for g in chunk_groups])]

        return final_chunks

In [8]:
# Initialize parser
parser = ChunkSemanticSplitterNodeParser.from_defaults(
    embed_model=embed_model,
    buffer_size=1,  
    breakpoint_percentile_threshold=95 
)

# Parse chunks into semantic nodes
nodes = parser.parse_chunks(list_chunks)

In [9]:
print(len(nodes))

3


In [10]:
for i, node in enumerate(nodes):
    print(f"Value Node {i}")
    print(str(node.text) + "\n")

Value Node 0
I. NỘI DUNG QUẢN LÝ THIẾT BỊ 1. Người phụ trách 1.1. Văn phòng Hà Nội:
+ Thiết bị làm việc:
Ms. Trần Phương Thảo
Ms. Nguyễn Thị Thùy Linh C
+ Thiết bị văn phòng:
Ms. Nguyễn Thị Thanh Hoa B
1.2. Văn phòng Đà Nẵng: Ms. Hồ Thị Thu Hiền B
Ms. Lê Thị Đăng Phúc
1.3. Văn phòng Hồ Chí Minh:
Ms. Huỳnh Ngọc Thảo Vy 2. Tài liệu liên quan:

Value Node 1
Hệ thống ASSET quản lý thiết bị: https://asset.sun-asterisk.vn/ Hướng dẫn sử dụng ASSET: [S_Asset] Hướng dẫn sử dụng cho FCOV Manager và FCOV Staff.pdf Quy định về việc sử dụng tài sản và xử lý mất hỏng tài sản công ty: [SHN_2024] QUY ĐỊNH VỀ VIỆC SỬ DỤNG TÀI SẢN VÀ XỬ LÝ MẤT, HỎNG T… Tiêu chuẩn cấp phát thiết bị: [SHN_2024] TIÊU CHUẨN CẤP THIẾT BỊ 2024 .docx 3.Quy trình cấp phát, thu hồi thiết bị: 3.1 Quy trình cấp phát thiết bị: - Nhân viên mới: GA cấp phát máy theo Tiêu chuẩn thiết bị vào ngày onboard - Nhân viên nghỉ sinh/ không lương comeback: Nhân viên tạo request, GA cấp máy theo tiêu chuẩn/ dựa vào request - Đối với trường hợp 

### Using with LLMs 


In [18]:
# test prompt
import json 


def get_prompt(input):
    OUTPUT_FORMAT = {
        "output": [
            {
            "start_chunk_id": 1,
            "end_chunk_id": 4},
            {
            "start_chunk_id": 5,
            "end_chunk_id": 9},
            {
            "start_chunk_id": 10,
            "end_chunk_id": 16}
        ]
    }
    
    
    INPUT_EXAMPLE_FORMAT =[
        {
            "id": 1,
            "text": "Nguyễn Văn A đã ra mắt từ ngày 1/1/2000 và đang làm việc tại Sun*.",
            "layout": "content"
        },
        {
            "id": 2,
            "text": "Trinh Văn A đã làm việc tại Sun*.",
            "layout": "content"
        },
    ]
    
    SYSTEM_MESSAGE = """
    # 1. Instruction
    You are an expert in analyzing document
    Given a list of text chunk, your mission is concatenate or split list of these text chunk to create list of meaningful paragraph 
    """

    USER_MESSAGE = """
    # 2. Required rules
    - Given a list of text chunk for example: {ex_input}. 
    - Your mission is to split these list of text chunk into multiple text chunks (we will call it as cluster). Text combined in each cluster have a meaningful and compleled text content. 
    - You can split based on meaning of section content or title layout. For example title text chunk can be the first text chunk of cluster or list text chunk point to same meaning or content will be in one cluster.
    - Your output is specify list of start and end chunk id for each cluster.
    - You must ensure when combine all text in text chunks in one cluster (join with new line character), each cluster have a meaningful and compleled content, each cluster has at least 4 text chunks and 300 tokens and has max token is 600 tokens
    - Chunk id in output must not duplicated. The start chunk id of cluster is the adjacent number of end chunker id of its previous cluster. for example: end_chunk_id =4, and the next start chunk id = 5 
    
    # 3. Input:
    {input_json}

    # 4. Output format: JSON
    {output_format}
    
    # 5. Explain input:
    - id: id of the text chunk, and it is also reading order of this chunk. Reading order start from low to high
    - text: text content of this chunk
    - layout: layout information of this chunk.
    """
    
    return SYSTEM_MESSAGE, USER_MESSAGE.format(output_format=OUTPUT_FORMAT, ex_input=INPUT_EXAMPLE_FORMAT, input_json=input)

In [19]:
clusters_llm: list[dict] = []
for cluster in clusters:
    clusters_llm.append(
        {
            "id": cluster.id,
            "text": cluster.content_text,
            "layout": cluster.label
        }
    )

In [20]:
system_prompt, user_prompt = get_prompt(clusters_llm)

In [21]:
from openai import OpenAI
client = OpenAI()

completion = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "system", "content": system_prompt},
        {
            "role": "user",
            "content": user_prompt
        }
    ]
)

print(completion.choices[0].message.content)

```json
{
    "output": [
        {"start_chunk_id": 0, "end_chunk_id": 4},
        {"start_chunk_id": 5, "end_chunk_id": 10},
        {"start_chunk_id": 11, "end_chunk_id": 18},
        {"start_chunk_id": 19, "end_chunk_id": 23},
        {"start_chunk_id": 24, "end_chunk_id": 25}
    ]
}
```
