In [1]:
import json
import os
from typing import List, Dict
from tqdm import tqdm

import chromadb
from chromadb.config import Settings

from sentence_transformers import SentenceTransformer
import re
from bs4 import BeautifulSoup
from bs4.element import Tag


In [2]:

# Load the per-page extraction results. Each entry is later read via its
# 'blocks' and 'image' keys.
with open('image_extracted_gpu.json', mode='r', encoding='utf-8') as fh:
    pages = json.load(fh)

# Show how many pages were loaded.
len(pages)


402

In [3]:
def flatten_text_chunk(chunk):
    """Render a text chunk as one newline-separated string.

    Args:
        chunk: mapping with a "data_chunk" list; each item has a "type"
            string and a "content" value.

    Returns:
        One line per block in the form "<TYPE upper-cased>: <content>",
        joined with newlines (empty string when there are no blocks).
    """
    rendered = []
    for block in chunk["data_chunk"]:
        label = block['type'].upper()
        rendered.append(f"{label}: {block['content']}")
    return "\n".join(rendered)


In [4]:
# Split every page into at most one text chunk and at most one table chunk.
#   - text chunk : page_number / header / title / text / list blocks
#   - table chunk: table_caption / table blocks, emitted only when the page
#                  contains at least one table block
# Blocks with empty content and blocks of any other type are ignored.
text_chunks, table_chunks = [], []

for page in pages:
    text_blocks = []
    table_blocks = []
    last_page_number = None
    saw_table = False

    for raw_block in page['blocks']:
        kind = raw_block.get('type')
        body = raw_block.get('content')

        if not body:
            continue

        entry = {
            "type": kind,
            "content": body
        }

        if kind == 'page_number':
            # Remember the number so it can also label the table chunk.
            last_page_number = body
            text_blocks.append(entry)
        elif kind in ('header', 'title', 'text', 'list'):
            text_blocks.append(entry)
        elif kind == 'table_caption':
            table_blocks.append(entry)
        elif kind == 'table':
            saw_table = True
            table_blocks.append(entry)

    # Put the page number first in the table chunk so it can be looked up later.
    if saw_table and last_page_number:
        table_blocks.insert(0, {
            "type": "page_number",
            "content": last_page_number
        })

    if text_blocks:
        text_chunks.append({
            "image_name": page["image"],
            "data_chunk": text_blocks
        })

    if saw_table:
        table_chunks.append({
            "image_name": page["image"],
            "data_chunk": table_blocks
        })


In [6]:
def flatten_table_chunk(chunk):
    """Turn a table chunk into embeddable text plus the raw table HTML.

    Args:
        chunk: mapping with a "data_chunk" list of {"type", "content"} blocks.

    Returns:
        dict with:
          "embed_text": newline-joined lines -- "Page: ..." for page_number
              blocks, "Table caption: ..." for captions, and one
              " | "-separated line per non-empty <tr> of each table.
          "html": list of the untouched HTML strings of the table blocks.
    """
    embed_lines = []
    html_sources = []

    for block in chunk["data_chunk"]:
        kind = block["type"]

        if kind == "page_number":
            embed_lines.append(f"Page: {block['content']}")
            continue

        if kind == "table_caption":
            embed_lines.append(f"Table caption: {block['content']}")
            continue

        if kind != "table":
            continue

        markup = block["content"]
        html_sources.append(markup)  # keep the original HTML alongside the text

        # Flatten each row into "cell | cell | ..." using stripped cell text.
        for tr in BeautifulSoup(markup, "html.parser").find_all("tr"):
            cell_texts = [cell.get_text(strip=True) for cell in tr.find_all(["td", "th"])]
            if cell_texts:
                embed_lines.append(" | ".join(cell_texts))

    return {
        "embed_text": "\n".join(embed_lines),
        "html": html_sources
    }


In [7]:
def extract_page_number(chunk):
    """Return the content of the first non-empty page_number block, else None.

    Args:
        chunk: mapping with a "data_chunk" list of {"type", "content"} blocks.
    """
    candidates = (
        blk["content"]
        for blk in chunk["data_chunk"]
        if blk["type"] == "page_number" and blk["content"]
    )
    return next(candidates, None)


In [8]:
def extract_html_tables(chunk):
    """Collect the non-empty content of every "table" block in the chunk.

    Args:
        chunk: mapping with a "data_chunk" list of {"type", "content"} blocks.

    Returns:
        List of table contents in their original order.
    """
    tables = []
    for blk in chunk["data_chunk"]:
        if blk["type"] != "table":
            continue
        html = blk["content"]
        if html:
            tables.append(html)
    return tables


In [9]:
from collections import defaultdict

# Group flattened table text by page number so the text documents built later
# can append it as context. Table chunks without a page number are skipped.
page_to_tables = defaultdict(list)

for table_chunk in table_chunks:
    page_no = extract_page_number(table_chunk)
    if page_no is not None:
        page_to_tables[page_no].append(flatten_table_chunk(table_chunk)["embed_text"])


In [25]:
def serialize_html_tables(html_tables):
    """Join HTML table strings into one string separated by blank lines.

    Args:
        html_tables: list of HTML strings; may be empty or None.

    Returns:
        The tables joined with "\\n\\n", or "" when nothing was passed.
    """
    return "\n\n".join(html_tables) if html_tables else ""


In [10]:
# Build the documents and metadata for the text index. Chunks without a page
# number are skipped. When tables were found for the same page number, their
# flattened text is appended under a "TABLE CONTEXT:" heading.
text_docs, text_metas = [], []

for text_chunk in text_chunks:
    page_no = extract_page_number(text_chunk)
    if page_no is None:
        continue

    doc_parts = [flatten_text_chunk(text_chunk)]
    if page_no in page_to_tables:
        doc_parts.append("\n\nTABLE CONTEXT:\n")
        doc_parts.append("\n".join(page_to_tables[page_no]))

    text_docs.append("".join(doc_parts))
    text_metas.append({
        "page": page_no,
        "image": text_chunk["image_name"],
        "type": "text"
    })


In [26]:
# Build the documents and metadata for the table index. The embedded document
# is the flattened table text; the metadata carries that same text plus the
# raw HTML serialized into a single string.
table_docs, table_metas = [], []

for table_chunk in table_chunks:
    page_no = extract_page_number(table_chunk)
    if page_no is None:
        continue

    embed_text = flatten_table_chunk(table_chunk)["embed_text"]
    html_blob = serialize_html_tables(extract_html_tables(table_chunk))

    table_docs.append(embed_text)
    table_metas.append({
        "page": page_no,
        "image": table_chunk["image_name"],
        "type": "table",
        # flattened text, identical to the embedded document
        "flattened_table_text": embed_text,
        # raw HTML of all tables in the chunk, as one string
        "html_table": html_blob
    })


In [12]:
# text_chunks = []

# for page in pages:
#     blocks = page["blocks"]
#     current_chunk = []

#     for block in blocks:
#         if block["type"] in ("title", "header"):
#             if current_chunk:
#                 text_chunks.append({
#                     "content": " ".join(current_chunk),
#                     "page": page["image"],
#                     "type": "text"
#                 })
#                 current_chunk = []

#         if block["type"] in ("text", "list"):
#             if isinstance(block["content"], str):
#                 current_chunk.append(block["content"])

#         if block["type"] == "table":
#             # stop text chunk before table
#             if current_chunk:
#                 text_chunks.append({
#                     "content": " ".join(current_chunk),
#                     "page": page["image"],
#                     "type": "text"
#                 })
#                 current_chunk = []

#     if current_chunk:
#         text_chunks.append({
#             "content": " ".join(current_chunk),
#             "page": page["image"],
#             "type": "text"
#         })


In [13]:
# text_chunks[32]

In [14]:
# def row_to_facts(row, headers, table_caption):
#     facts = []

#     for col, val in row.items():
#         if val:
#             facts.append(f"{col}: {val}")

#     return (
#         f"In table '{table_caption}', "
#         f"the row contains the following values: "
#         f"{'; '.join(facts)}."
#     )


In [15]:
# table_chunks = []
# table_row_chunks = []

# for page in pages:
#     blocks = page["blocks"]

#     for i, block in enumerate(blocks):
#         if block["type"] != "table":
#             continue

#         # --- caption ---
#         caption = ""
#         for j in range(i - 1, max(i - 6, -1), -1):
#             if blocks[j]["type"] == "table_caption":
#                 caption = blocks[j]["content"]
#                 break
#         if not caption:
#             for j in range(i + 1, min(i + 6, len(blocks))):
#                 if blocks[j]["type"] == "table_caption":
#                     caption = blocks[j]["content"]
#                     break

#         # --- context above ---
#         context_text = []
#         for j in range(i - 1, max(i - 6, -1), -1):
#             if blocks[j]["type"] == "text":
#                 ct = blocks[j].get("content")
#                 if isinstance(ct, str) and ct.strip():
#                     context_text.append(ct.strip())
#         context_text = context_text[::-1]

#         # --- parse rows ---
#         rows = parse_table_html(block["content"])
#         row_facts = []

#         for r in rows:
#             if "A_PDU parameter" in r and "Cvt" in r:
#                 fact = (
#                     f"Parameter {r['A_PDU parameter']} "
#                     f"({r.get('Parameter Name','')}) "
#                     f"has Cvt value {r['Cvt']} "
#                     f"and byte value {r.get('Byte value','')} "
#                     f"in {caption}."
#                 )
#                 row_facts.append(fact)

#                 # row-level index (IMPORTANT)
#                 table_row_chunks.append({
#                     "content": fact,
#                     "page": page["image"],
#                     "parameter": r["A_PDU parameter"],
#                     "table": caption
#                 })

#         # table-level chunk (still useful)
#         embedding_text = f"""
#                             Context:
#                             {' '.join(context_text)}
                            
#                             Table Caption:
#                             {caption}
                            
#                             Row Facts:
#                             {' '.join(row_facts)}
#                             """

#         table_chunks.append({
#             "content": embedding_text.strip(),
#             "page": page["image"],
#             "type": "table"
#         })


In [16]:
# table_row_chunks

In [17]:
# table_chunks

In [18]:
# Load the embedding model from a local directory; it is used for both the
# documents and the queries below.
model_dir = "./bge-large-en"
embedder = SentenceTransformer(model_dir)


In [19]:
# chroma_client = chromadb.Client(
#     Settings(persist_directory="./chroma_phase_type", anonymized_telemetry=False)
# )

# text_collection = chroma_client.create_collection("text_index1")
# table_collection = chroma_client.create_collection("table_index1")


In [20]:
import chromadb
from chromadb.config import Settings

# Open (or create) the on-disk Chroma store.
#
# `chromadb.Client(Settings(persist_directory=...))` builds an in-memory
# client in current Chroma releases, so nothing was written to
# ./chroma_mineru_fixed and the index was lost on kernel restart.
# `PersistentClient` is the documented way to get a store backed by `path`.
# NOTE(review): requires chromadb >= 0.4 -- confirm the installed version.
client = chromadb.PersistentClient(
    path="./chroma_mineru_fixed",
    settings=Settings(anonymized_telemetry=False),
)

# get_or_create_collection keeps this cell re-runnable; create_collection
# raises when the collection already exists.
# NOTE(review): with a persistent store, re-running the `add` cells against
# already-populated collections may need `upsert` instead -- confirm.
text_col = client.get_or_create_collection("text_index")
table_col = client.get_or_create_collection("table_index")


In [21]:
# Embed every text document (list[str]) with normalized output vectors.
text_embeddings = embedder.encode(text_docs, normalize_embeddings=True, show_progress_bar=True)


Batches:   0%|          | 0/13 [00:00<?, ?it/s]

In [22]:
# Store the text documents with their embeddings and metadata
# (page, image, type). IDs are positional: text_0, text_1, ...
text_ids = [f"text_{idx}" for idx, _ in enumerate(text_docs)]
text_col.add(
    ids=text_ids,
    documents=text_docs,
    embeddings=text_embeddings,
    metadatas=text_metas,
)


In [23]:
# Embed the flattened table text with normalized output vectors.
table_embeddings = embedder.encode(table_docs, normalize_embeddings=True, show_progress_bar=True)


Batches:   0%|          | 0/10 [00:00<?, ?it/s]

In [28]:
# Store the table documents with their embeddings and metadata
# (page, image, type, flattened_table_text, html_table).
# IDs are positional: table_0, table_1, ...
table_ids = [f"table_{idx}" for idx, _ in enumerate(table_docs)]
table_col.add(
    ids=table_ids,
    documents=table_docs,
    embeddings=table_embeddings,
    metadatas=table_metas,
)


In [29]:
def embed_query(query: str):
    """Embed a search query with the module-level `embedder`.

    The query is prefixed with a fixed retrieval instruction before encoding
    (presumably the query instruction recommended for BGE models -- confirm
    against the model card), and the output vector is normalized.

    Args:
        query: the user's search text.

    Returns:
        Whatever `embedder.encode` returns for a single string.
    """
    prompt = f"Represent this sentence for searching relevant passages: {query}"
    return embedder.encode(prompt, normalize_embeddings=True)


In [30]:
# Sample query used to sanity-check retrieval against both indexes.
query = "Negative response A_PDU parameter TA cvt value"
# Embedding of the sample query; reused by the table and text lookups below.
q_emb = embed_query(query)


In [31]:
# Look up the same query embedding in both indexes, requesting the five
# nearest entries together with their documents, metadata and distances.
top_k = 5
wanted_fields = ["documents", "metadatas", "distances"]

table_hits = table_col.query(query_embeddings=[q_emb], n_results=top_k, include=list(wanted_fields))
text_hits = text_col.query(query_embeddings=[q_emb], n_results=top_k, include=list(wanted_fields))


In [32]:
# Display the raw result dict returned by the table index.
table_hits

{'ids': [['table_2', 'table_11', 'table_81', 'table_301', 'table_226']],
 'embeddings': None,
 'documents': [['Page: 18\nTable caption: Table 3 â€” Negative response A_PDU\nA_PDU parameter | Parameter Name | Cvt | Byte value | Mnemonic\nSA | Source Address | M | 0xXXXX | SA\nTA | Target Address | M | 0xXXXX | TA\nTAtype | Target Address type | M | 0xXX | TAT\nRA | Remote Address (optional) | C | 0xXXXX | RA\nA_Data.A_PCI.NR_SI | Negative Response SID | M | 0x7F | SIDNR\nA_Data.A_PCI.SI | <Service Name> Request SID | M | 0xXX | SIDRQ\nA_Data.Parameters 1 | responseCode | M | 0xXX | NRC_\nM (Mandatory): In case the negative response A_PDU is issued then those A_PDU parameters shall be present. \nC (Conditional): The RA (Remote Address) PDU parameter is only present in case of remote addressing.\nAbbreviation | Description\nsuppressPosRspMsgIndicationBit | TRUE = server shall NOT send a positive response message (exception see Annex A.1 in definition of NRC 0x78)\nFALSE = server shall sen

In [33]:
# Display the raw result dict returned by the text index.
text_hits

{'ids': [['text_23', 'text_38', 'text_268', 'text_291', 'text_330']],
 'embeddings': None,
 'documents': [['HEADER: ISO 14229-1:2013(E)\nTITLE: 7.4 Negative response/confirmation service primitive\nTEXT: Each diagnostic service has a negative response/negative confirmation message specified with message A_Data bytes according to Table 3. The first A_Data byte (A_PCI.NR_SI) is always the specific negative response service identifier. The second A_Data byte (A_PCI.SI) shall be a copy of the service identifier value from the service request/indication message that the negative response message corresponds to.\nTEXT: The parameter responseCode is used in the negative response message to indicate why the diagnostic service failed or could not be completed in time. Values are defined in A.1.\nTITLE: 7.5 Server response implementation rules\nTITLE: 7.5.1 General definitions\nTEXT: The following subclauses specify the behaviour of the server when executing a service. The server and the client 