In [8]:
import os
from langchain_pymupdf4llm import PyMuPDF4LLMLoader
from langchain.text_splitter import MarkdownHeaderTextSplitter, RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from typing import List, Sequence, Any
from db.db_connection_pool_using_pycopg2 import get_connection, release_connection, close_pool
from db.db_connection_pool import get_engine, get_conn
from pydantic import BaseModel, Field
from typing import List, Optional, Dict
from db.schema import Document_Chunk
from datetime import datetime
from zoneinfo import ZoneInfo
import logging
import asyncio
from sqlalchemy import text
from fastembed import TextEmbedding
from dotenv import load_dotenv
import tqdm

In [7]:
load_dotenv(override=True)

True

In [8]:
# convert pdf to markdown
# https://pymupdf.readthedocs.io/en/latest/pymupdf4llm/api.html#pymupdf4llm-api
FOLDER_PATH = r"C:\Users\aibag\git_repo\policy_wording"

FILE_NAME = "state-home-comprehensive-contents-comprehensive-insurance-policy-wording-si6995-2-1224.PDF"


In [9]:
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)

logger = logging.getLogger(__name__)

current_nz_datetime = datetime.now(tz=ZoneInfo("Pacific/Auckland"))


In [10]:
# function to load pdf file and convert a pdf file to a markdown file
async def load_pdf_file(filepath:str, filename:str, mode:str ="single") -> List[Document]:

    full_path = os.path.join(filepath, filename)
    logger.info(f"Processing file: {full_path}")

    if not os.path.exists(full_path):
        raise FileNotFoundError(f"File not found: {full_path}")

    if not filename.lower().strip().endswith(".pdf"):
        raise TypeError ("Invalid File Type; only PDFs are allowed.")

    # custom pages_delimiter to identify where are ends of pages in single mode 
    # page = load each page as a Document object; single = load entire PDF as a single Document object
    doc_loader = PyMuPDF4LLMLoader(full_path 
                                    ,mode=mode
                                    ,pages_delimiter="<<-- PAGE BREAK -->>\n\n"
                                    ,table_strategy="lines_strict" # lines, text, lines_strict, lines_strict is default
                                    #,page_separators=True
                                   )
                            
    # lazy loading
    docs = []
    async for doc in doc_loader.alazy_load():
        docs.append(doc)

    logger.info(f"Successfully processed file: {filename}; Total Pages: {docs[0].metadata["total_pages"]}")

    return docs

doc_obj =  await load_pdf_file(FOLDER_PATH, FILE_NAME, mode="single")


2025-09-25 20:57:23,611 [INFO] __main__: Processing file: C:\Users\aibag\git_repo\policy_wording\state-home-comprehensive-contents-comprehensive-insurance-policy-wording-si6995-2-1224.PDF
2025-09-25 20:57:32,380 [INFO] __main__: Successfully processed file: state-home-comprehensive-contents-comprehensive-insurance-policy-wording-si6995-2-1224.PDF; Total Pages: 61


In [11]:
# functions to chunk/split a markdown file into chunks
def chunk_header_splitter(doc_contents)->list[Document]:

    headers_to_split_on = [
        ("#", "Header 1"),
        ("##", "Header 2"),
        ("###", "Header 3"),
        ('####', "Header 4"),
        ('#####', "Header 5"),
        ('######', "Header 6"),
        ('#######', "Header 7")
    ]

    markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on, strip_headers=True)

    return markdown_splitter.split_text(doc_contents)

def chunk_header_recursivesplitter(doc_contents)->list:
    text_splitter = RecursiveCharacterTextSplitter(
        separators=[". ", "! ", "? ", "\n\n", "\n", " ", ""],
        chunk_size=2000,
        chunk_overlap=200,
    )
    chunks = text_splitter.split_text(doc_contents)
    return chunks

print(chunk_header_splitter(doc_obj[0].page_content))
#print(chunk_header_recursivesplitter(doc[0].page_content))

[Document(metadata={}, page_content='<<-- PAGE BREAK -->>'), Document(metadata={'Header 6': '**Thank you for choosing State Insurance**'}, page_content='This policy wording, along with your policy schedule, contains all the information you need\nto know about your insurance cover. Please read these carefully and keep them on hand as\nyou will find them useful if you need to make a claim.'), Document(metadata={'Header 6': '**How to contact us**'}, page_content='In New Zealand, just call 0800 80 24 24. If you have any questions, need help, or want to\n[make a claim, our contact centres are available 7 days a week or visit state.co.nz](https://www.state.co.nz/)  \nCall us free from Australia 1 800 887 863  \nUnited States 1 800 593 9482  \nUnited Kingdom 0800 096 5308  \nCall us direct from Somewhere else overseas 64 9 969 1150  \nOur promise to customers includes communicating clearly.  \nThis document meets the WriteMark quality award, independent  \nproof we have achieved a high standa

In [12]:
# pydantic model of the vector db
class DocumentChunk(BaseModel):
    embedding: List[float]
    chunk_text: str
    metadata: Optional[Dict[str, str]] = Field(default_factory=dict)
    file_name: str
    tags: Optional[List[str]] = Field(default_factory=list)
    isActive: bool
    version: Optional[str] = None
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None


In [None]:
def chunk_document(doc_obj)->list[str]:

    # get metadata from the doc object 
    source_path = doc_obj[0].metadata.get("source", "")
    file_name = source_path.split("\\")[-1] if source_path else "unknown"

    doc_metadata = {
        "source"        : doc_obj[0].metadata.get("source"),
        "file_name"     : file_name,
        "total_pages"   : str(doc_obj[0].metadata.get("total_pages")),
        "creation_date" : doc_obj[0].metadata.get("creationdate"),
    }

    try:
        chunks = chunk_header_splitter(doc_obj[0].page_content)
        logger.info(f"Chunk Size for document '{file_name}': {len(chunks)}")
    except Exception as e:
        logger.error(f"Call to chunk_header_splitter failed: {e}")    

    doc_chunks_list = []

    for chunk in chunks:

        # skip pages that only have 30 char or less
        if len(chunk.page_content) < 30: 
            continue 

        # additional metadata 
        chunk_metadata = doc_metadata.copy()

        header_key, header_value = next(iter(chunk.metadata.items()), (None, None))

        chunk_metadata["header_key"] = header_key 
        chunk_metadata["header_value"] = header_value.replace("*","").replace("#","")

        # create an instance of DocumentChunk
        doc_chunk = DocumentChunk(
            embedding = [],
            chunk_text = chunk.page_content,
            metadata = chunk_metadata,
            file_name = file_name,
            isActive = True,
            version = "1",
            tags = ["Home", "State"],
            created_at = current_nz_datetime,
            updated_at= current_nz_datetime,
        )

        doc_chunks_list.append(doc_chunk)

    return doc_chunks_list

#print(chunk_document(doc_obj))

In [None]:
# enrich each chunks 



In [None]:
# initialised embedding
#BAAI/bge-small-en-v1.5 : 384 dim; BAAI/bge-base-en-v1.5 : 768 dim; BAAI/bge-base-en-v1.5 : 768 dim; BAAI/bge-large-en-v1.5 : 1024 dim; 
embedding_model = TextEmbedding(model_name="BAAI/bge-base-en-v1.5")  


# Progress bar for chunk enrichment within the current file
with tqdm(total=len(doc_chunks), desc=f"Enriching Chunks", leave=False) as pbar_chunks:
    for chunk in doc_chunks:
        # Apply enrichment logic (e.g., embeddings, metadata)
        enrichment_data = enrich_chunk(chunk)
        if enrichment_data:
            # Identify if chunk is a table (has HTML representation)
            is_table = 'text_as_html' in chunk.metadata.to_dict()
            content = chunk.metadata.text_as_html if is_table else chunk.text

            # Store final enriched chunk data
            final_chunk_data = {
                'source': f"{os.path.basename(os.path.dirname(os.path.dirname(file_path)))}/"
                            f"{os.path.basename(os.path.dirname(file_path))}",
                'content': content,
                'is_table': is_table,
                **enrichment_data
            }
            all_enriched_chunks.append(final_chunk_data)

        # Update chunk-level progress bar
        pbar_chunks.update(1)

# Update file-level progress bar
pbar_files.update(1)


[array([-3.49265672e-02, -1.19090453e-03, -9.83221363e-03, -4.09996025e-02,
        4.52825474e-03, -2.77397037e-02,  3.44226919e-02,  2.90391743e-02,
        6.88453838e-02, -9.32833739e-03, -1.58323161e-02, -2.71164887e-02,
        1.02300104e-02,  7.26642366e-03,  3.52182873e-02,  2.30059214e-02,
        2.15208121e-02, -7.03304932e-02, -3.36005799e-02,  2.17992701e-02,
        3.75785492e-02, -3.81885022e-02,  6.40121521e-03, -9.27529763e-03,
        1.44267678e-02,  5.92385884e-03,  6.53712917e-03, -2.05263197e-02,
       -9.56038572e-03, -6.64055645e-02,  1.26764616e-02,  2.94634905e-02,
        4.39698175e-02, -1.10123446e-02,  1.22123649e-02,  4.63300794e-02,
       -6.47613332e-02,  1.01570813e-02, -4.18482348e-02,  1.18676072e-03,
        3.97266522e-02,  9.43441689e-03, -1.49836838e-02,  1.27029810e-02,
       -1.77019625e-03, -1.78345609e-02, -2.54457425e-02, -1.19007574e-02,
        2.47827470e-02,  8.77142232e-03, -4.03366052e-02, -5.44716567e-02,
       -3.58812809e-02, 

In [15]:
# Using SQLALCHEMY
# create the engine
db_engine = get_engine()

In [16]:
# test
async with get_conn(db_engine) as conn:
    stmt = text("SELECT 1")
    result = await conn.execute(stmt)
    print(result.scalar())


2025-09-25 20:57:32,639 INFO sqlalchemy.engine.Engine select pg_catalog.version()


2025-09-25 20:57:32,639 [INFO] sqlalchemy.engine.Engine: select pg_catalog.version()


2025-09-25 20:57:32,642 INFO sqlalchemy.engine.Engine [raw sql] ()


2025-09-25 20:57:32,642 [INFO] sqlalchemy.engine.Engine: [raw sql] ()


2025-09-25 20:57:32,644 INFO sqlalchemy.engine.Engine select current_schema()


2025-09-25 20:57:32,644 [INFO] sqlalchemy.engine.Engine: select current_schema()


2025-09-25 20:57:32,644 INFO sqlalchemy.engine.Engine [raw sql] ()


2025-09-25 20:57:32,644 [INFO] sqlalchemy.engine.Engine: [raw sql] ()


2025-09-25 20:57:32,651 INFO sqlalchemy.engine.Engine show standard_conforming_strings


2025-09-25 20:57:32,651 [INFO] sqlalchemy.engine.Engine: show standard_conforming_strings


2025-09-25 20:57:32,651 INFO sqlalchemy.engine.Engine [raw sql] ()


2025-09-25 20:57:32,651 [INFO] sqlalchemy.engine.Engine: [raw sql] ()
2025-09-25 20:57:32,656 [INFO] db.db_connection_pool: DB connection opened


2025-09-25 20:57:32,657 INFO sqlalchemy.engine.Engine BEGIN (implicit)


2025-09-25 20:57:32,657 [INFO] sqlalchemy.engine.Engine: BEGIN (implicit)


2025-09-25 20:57:32,658 INFO sqlalchemy.engine.Engine SELECT 1


2025-09-25 20:57:32,658 [INFO] sqlalchemy.engine.Engine: SELECT 1


2025-09-25 20:57:32,659 INFO sqlalchemy.engine.Engine [generated in 0.00263s] ()


2025-09-25 20:57:32,659 [INFO] sqlalchemy.engine.Engine: [generated in 0.00263s] ()


1
2025-09-25 20:57:32,663 INFO sqlalchemy.engine.Engine ROLLBACK


2025-09-25 20:57:32,663 [INFO] sqlalchemy.engine.Engine: ROLLBACK
2025-09-25 20:57:32,665 [INFO] db.db_connection_pool: DB connection closed


In [None]:
# create schema and tables using sqlalchemy

async with get_conn(db_engine) as conn:
    # You can wrap everything in an explicit transaction if you want
    # an atomic create/commit block.
    # the transaction is automatically committed when the block exits
    async with conn.begin():   

        await conn.execute(text("SET search_path TO public, document;"))

        # Create schema
        await conn.execute(
            text("CREATE SCHEMA IF NOT EXISTS document;")
        )

        # Create table
        create_table_sql = """
            CREATE TABLE IF NOT EXISTS document.document_chunk (
                id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                embedding       VECTOR(1536),
                chunk_text      TEXT,
                doc_metadata    JSONB,
                file_name       TEXT,
                doc_tags        TEXT[],
                isActive        BOOLEAN,
                version         TEXT,
                created_at      TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at      TIMESTAMP
            );
        """
        await conn.execute(text(create_table_sql))

        # Create table index
        create_index_sql = """
            CREATE INDEX IF NOT EXISTS documents_embedding_idx
            ON document.document_chunk
            USING ivfflat (embedding vector_l2_ops)
            WITH (lists = 100);
        """
        await conn.execute(text(create_index_sql))

    logger.info("Document schema initialisation finished")

2025-09-25 20:57:37,324 [INFO] db.db_connection_pool: DB connection opened


2025-09-25 20:57:37,325 INFO sqlalchemy.engine.Engine BEGIN (implicit)


2025-09-25 20:57:37,325 [INFO] sqlalchemy.engine.Engine: BEGIN (implicit)


2025-09-25 20:57:37,326 INFO sqlalchemy.engine.Engine SET search_path TO public, document;


2025-09-25 20:57:37,326 [INFO] sqlalchemy.engine.Engine: SET search_path TO public, document;


2025-09-25 20:57:37,327 INFO sqlalchemy.engine.Engine [generated in 0.00090s] ()


2025-09-25 20:57:37,327 [INFO] sqlalchemy.engine.Engine: [generated in 0.00090s] ()


2025-09-25 20:57:37,332 INFO sqlalchemy.engine.Engine CREATE SCHEMA IF NOT EXISTS document;


2025-09-25 20:57:37,332 [INFO] sqlalchemy.engine.Engine: CREATE SCHEMA IF NOT EXISTS document;


2025-09-25 20:57:37,332 INFO sqlalchemy.engine.Engine [generated in 0.00092s] ()


2025-09-25 20:57:37,332 [INFO] sqlalchemy.engine.Engine: [generated in 0.00092s] ()


2025-09-25 20:57:37,334 INFO sqlalchemy.engine.Engine 
            CREATE TABLE IF NOT EXISTS document.document_chunk (
                id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                embedding       VECTOR(1536),
                chunk_text      TEXT,
                doc_metadata    JSONB,
                file_name       TEXT,
                doc_tags        TEXT[],
                isActive        BOOLEAN,
                version         TEXT,
                created_at      TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at      TIMESTAMP
            );
        


2025-09-25 20:57:37,334 [INFO] sqlalchemy.engine.Engine: 
            CREATE TABLE IF NOT EXISTS document.document_chunk (
                id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                embedding       VECTOR(1536),
                chunk_text      TEXT,
                doc_metadata    JSONB,
                file_name       TEXT,
                doc_tags        TEXT[],
                isActive        BOOLEAN,
                version         TEXT,
                created_at      TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at      TIMESTAMP
            );
        


2025-09-25 20:57:37,334 INFO sqlalchemy.engine.Engine [generated in 0.00076s] ()


2025-09-25 20:57:37,334 [INFO] sqlalchemy.engine.Engine: [generated in 0.00076s] ()


2025-09-25 20:57:37,337 INFO sqlalchemy.engine.Engine 
            CREATE INDEX IF NOT EXISTS documents_embedding_idx
            ON document.document_chunk
            USING ivfflat (embedding vector_l2_ops)
            WITH (lists = 100);
        


2025-09-25 20:57:37,337 [INFO] sqlalchemy.engine.Engine: 
            CREATE INDEX IF NOT EXISTS documents_embedding_idx
            ON document.document_chunk
            USING ivfflat (embedding vector_l2_ops)
            WITH (lists = 100);
        


2025-09-25 20:57:37,338 INFO sqlalchemy.engine.Engine [generated in 0.00116s] ()


2025-09-25 20:57:37,338 [INFO] sqlalchemy.engine.Engine: [generated in 0.00116s] ()


2025-09-25 20:57:37,342 INFO sqlalchemy.engine.Engine COMMIT


2025-09-25 20:57:37,342 [INFO] sqlalchemy.engine.Engine: COMMIT
2025-09-25 20:57:37,344 [INFO] __main__: Document schema initialisation finished
2025-09-25 20:57:37,346 [INFO] db.db_connection_pool: DB connection closed


In [None]:
# Batch insert the doc chunks to the document_chunk table
async def insert_chunk_batch(conn, rows: Sequence[Dict[str, Any]]):

    insert_sql = """
    INSERT INTO document.document_chunk
      (embedding, chunk_text, doc_metadata, file_name, doc_tags, isActive, version, created_at, updated_at)
    VALUES
      (:embedding, :chunktext, :docmetadata, :filename, :doctags, :isActive, :version, :createdat, :updatedat)
    """
    await conn.execute(text(insert_sql), rows)


async def ingest_chunks(engine, docchunks: List[Any]):

    async with get_conn(engine) as conn:

        # Prepare batches like OKA BATCH_SIZE=10
        BATCH_SIZE = 10

        for i in range(0, len(docchunks), BATCH_SIZE):
            batch = docchunks[i:i+BATCH_SIZE]

            texts = [d.chunktext for d in batch]
            embeddings = await client.aembed(texts)

            rows = []
            for d, emb in zip(batch, embeddings):
                # Align columns per DocumentChunkBase
                doctags = ",".join(d.tags) if getattr(d, "tags", None) else None
                rows.append({
                    "embedding": emb,
                    "chunktext": d.chunktext,
                    "docmetadata": json.dumps(d.metadata or {}) if isinstance(d.metadata, dict) else d.metadata,
                    "filename": getattr(d, "filename", None),
                    "doctags": doctags,
                    "isActive": getattr(d, "isActive", True),
                    "version": getattr(d, "version", None),
                    "createdat": getattr(d, "createdat", current_nz_datetime),
                    "updatedat": getattr(d, "updatedat", None),
                })

            async with conn.begin():
                await insert_chunk_batch(conn, rows)

<psycopg2.pool.ThreadedConnectionPool at 0x187e9afbc50>