In [58]:
!pip install  -q pypdf pdfplumber reportlab pandas tqdm

You should consider upgrading via the 'D:\NAIC\CogniClaim\Scripts\python.exe -m pip install --upgrade pip' command.


In [59]:
!pip install -q pytesseract pdf2image pillow

You should consider upgrading via the 'D:\NAIC\CogniClaim\Scripts\python.exe -m pip install --upgrade pip' command.


In [None]:
import os
import re
import pandas as pd
from typing import List, Tuple

# Table extractors
import camelot
import pdfplumber



In [61]:
def extract_tables_camelot(pdf_path: str) -> List[pd.DataFrame]:
    """
    Try Camelot in 'lattice' then 'stream' mode across all pages.
    Returns a list of DataFrames (one per detected table).
    """
    tables = []

    # 1) LATTICE mode (ruled tables)
    try:
        t_lattice = camelot.read_pdf(pdf_path, flavor="lattice", pages="all")
        tables += [t.df for t in t_lattice]
    except Exception as e:
        print(f"[camelot lattice] {e}")

    # 2) STREAM mode (whitespace-separated tables)
    try:
        t_stream = camelot.read_pdf(pdf_path, flavor="stream", pages="all")
        tables += [t.df for t in t_stream]
    except Exception as e:
        print(f"[camelot stream] {e}")

    # Camelot sometimes returns duplicate tables; dedup by shape+head
    deduped = []
    sigs = set()
    for df in tables:
        sig = (df.shape, tuple(df.head(2).fillna("").astype(str).values.ravel()))
        if sig not in sigs:
            sigs.add(sig)
            deduped.append(df)
    return deduped


def extract_tables_pdfplumber(pdf_path: str) -> List[pd.DataFrame]:
    """
    Very general fallback; tries pdfplumber's built-in table finder per page.
    """
    out = []
    with pdfplumber.open(pdf_path) as pdf:
        for pno, page in enumerate(pdf.pages, start=1):
            try:
                # Try structured table extraction
                tables = page.extract_tables()
                for tbl in tables or []:
                    df = pd.DataFrame(tbl)
                    out.append(df)
            except Exception as e:
                print(f"[pdfplumber page {pno}] {e}")
    return out


def normalize_headers(df: pd.DataFrame) -> pd.DataFrame:
    """
    Tries to promote the first non-empty row to headers, if headers look generic (0..N).
    """
    if df.empty:
        return df

    # If any cell in first row looks like a column name (non-numeric / contains words)
    first_row = df.iloc[0].astype(str).tolist()
    looks_like_header = any(bool(re.search(r"[A-Za-z]", cell)) for cell in first_row)

    if looks_like_header:
        new_cols = [str(c).strip() for c in first_row]
        df = df.iloc[1:].reset_index(drop=True)
        # Fix duplicate header names
        seen = {}
        cols = []
        for c in new_cols:
            if c in seen:
                seen[c] += 1
                cols.append(f"{c}_{seen[c]}")
            else:
                seen[c] = 1
                cols.append(c)
        df.columns = cols
    else:
        # Generic columns -> C0..Cn
        df.columns = [f"C{i}" for i in range(df.shape[1])]
    return df


def extract_all_tables(pdf_path: str) -> List[pd.DataFrame]:
    """
    Use Camelot; if nothing, fallback to pdfplumber. Normalize headers.
    """
    dfs = extract_tables_camelot(pdf_path)
    if not dfs:
        print("No tables from Camelot — falling back to pdfplumber…")
        dfs = extract_tables_pdfplumber(pdf_path)

    cleaned = []
    for df in dfs:
        df = df.replace({"": None})
        df = normalize_headers(df)
        # Drop fully empty columns/rows
        df = df.dropna(axis=1, how="all").dropna(axis=0, how="all")
        if not df.empty:
            cleaned.append(df.reset_index(drop=True))
    return cleaned


In [69]:
import os, re, json, io, sys
from typing import List, Optional, Dict
from tqdm import tqdm

from pypdf import PdfWriter, PdfReader
import pdfplumber
import pandas as pd

# OPTIONAL OCR imports (guarded)
try:
    from pdf2image import convert_from_path
    import pytesseract
    OCR_AVAILABLE = True
except Exception:
    OCR_AVAILABLE = False


def list_pdfs(input_dir: Optional[str], files: Optional[List[str]]) -> List[str]:
    paths = []
    if input_dir:
        for n in sorted(os.listdir(input_dir)):
            if n.lower().endswith(".pdf"):
                paths.append(os.path.join(input_dir, n))
    if files:
        for f in files:
            if f.lower().endswith(".pdf"):
                paths.append(f)
    # dedupe, keep order
    seen, out = set(), []
    for p in paths:
        ap = os.path.abspath(p)
        if ap not in seen:
            seen.add(ap); out.append(ap)
    return out


def merge_pdfs(pdf_paths: List[str], out_path: str):
    writer = PdfWriter()
    for path in pdf_paths:
        try:
            reader = PdfReader(path)
            for page in reader.pages:
                writer.add_page(page)
        except Exception as e:
            print(f"[WARN] Skipping '{path}' while merging: {e}")
    os.makedirs(os.path.dirname(os.path.abspath(out_path)) or ".", exist_ok=True)
    with open(out_path, "wb") as f:
        writer.write(f)


def extract_text_pdfplumber(path: str) -> List[str]:
    """Return list of page texts (strings). Empty string for unextractable pages."""
    texts = []
    try:
        with pdfplumber.open(path) as pdf:
            for page in pdf.pages:
                txt = page.extract_text(x_tolerance=2, y_tolerance=2) or ""
                texts.append(txt)
    except Exception as e:
        print(f"[WARN] Could not open '{path}' with pdfplumber: {e}")
    return texts


def ocr_pdf_pages(path: str, dpi: int = 300) -> List[str]:
    """OCR all pages (slow). Requires pdf2image + pytesseract + system Tesseract."""
    if not OCR_AVAILABLE:
        return []
    try:
        images = convert_from_path(path, dpi=dpi)
    except Exception as e:
        print(f"[WARN] pdf2image convert failed for '{path}': {e}")
        return []
    out = []
    for img in images:
        try:
            out.append(pytesseract.image_to_string(img))
        except Exception as e:
            print(f"[WARN] OCR failed on a page: {e}")
            out.append("")
    return out


def replace_terms(text: str, needles: List[str], replacement: str) -> str:
    """Whole-word, case-insensitive replacement."""
    if not needles:
        return text
    patt = r"\b(?:%s)\b" % "|".join(re.escape(x) for x in needles)
    return re.sub(patt, replacement, text, flags=re.IGNORECASE)


def chunk_text(s: str, max_chars: int = 1200, overlap: int = 100) -> List[str]:
    """Simple char-based chunker for RAG (keeps it library-free)."""
    s = s.strip()
    if not s:
        return []
    chunks = []
    start = 0
    while start < len(s):
        end = min(len(s), start + max_chars)
        chunk = s[start:end]
        chunks.append(chunk)
        start = end - overlap
        if start < 0:
            start = 0
        if start >= len(s):
            break
    return chunks


def write_text_only_pdf(text: str, out_pdf: str):
    """Create a text-only PDF from merged text (no images/layout)."""
    from reportlab.platypus import SimpleDocTemplate, Preformatted
    from reportlab.lib.pagesizes import A4
    from reportlab.lib.styles import getSampleStyleSheet
    from reportlab.lib.units import mm

    os.makedirs(os.path.dirname(os.path.abspath(out_pdf)) or ".", exist_ok=True)
    doc = SimpleDocTemplate(out_pdf, pagesize=A4,
                            leftMargin=18*mm, rightMargin=18*mm,
                            topMargin=18*mm, bottomMargin=18*mm)
    styles = getSampleStyleSheet()
    story = [Preformatted(text, styles["Code"])]
    doc.build(story)


In [63]:
def row_to_sentence(row: pd.Series, include_cols: List[str] = None) -> str:
    """
    Convert a row into a compact textual record.
    If include_cols is provided, use those columns in order when present.
    Otherwise, use all columns.
    """
    if include_cols is None:
        include_cols = list(row.index)

    parts = []
    for col in include_cols:
        if col in row.index:
            val = str(row[col]).strip()
            if val and val.lower() != "nan":
                parts.append(f"{col}: {val}")
    return ", ".join(parts)


def dataframe_to_row_sentences(
    df: pd.DataFrame, include_cols: List[str] = None, add_table_name: str = None
) -> List[str]:
    records = []
    for _, row in df.iterrows():
        sentence = row_to_sentence(row, include_cols)
        if add_table_name:
            sentence = f"[{add_table_name}] " + sentence
        records.append(sentence)
    return records

In [64]:
pdf_path = "D:/NAIC/NAIC-2025/policy/health/Policy Booklet.pdf"

tables = extract_all_tables(pdf_path)
print(f"Found {len(tables)} table(s).")

all_sentences = []
for ti, df in enumerate(tables, start=1):
    # Try to guess relevant columns if present
    preferred_cols = [
        "A. Hospitals", "Hospitals", "Hospital", "Hospital type",
        "Direct Settlement", "List 1", "List 2", "List 3", "List 4", "Notes" , "Medical Facilities" , 'Allied Health Professionals' , 'Waiting Periods'
    ]
    # Only keep those that exist in this df (preserving order)
    include_cols = [c for c in preferred_cols if c in df.columns]
    # Fallback to all columns if none of the preferred exist
    if not include_cols:
        include_cols = list(df.columns)

    sentences = dataframe_to_row_sentences(
        df, include_cols=include_cols, add_table_name=f"Table {ti}"
    )
    all_sentences.extend(sentences)

print(f"Prepared {len(all_sentences)} row-level text chunks.")

# Preview a few
for s in all_sentences[:5]:
    print("-", s)

# Save outputs if you like:
pd.DataFrame({"text": all_sentences}).to_csv("row_chunks.csv", index=False)
pd.DataFrame({"text": all_sentences}).to_json("row_chunks.jsonl", orient="records", lines=True)
print("Saved row_chunks.csv and row_chunks.jsonl")

Cannot set gray non-stroke color because /'P0' is an invalid float value
Cannot set gray non-stroke color because /'P1' is an invalid float value
  if self._document_has_no_text():
Cannot set gray non-stroke color because /'P0' is an invalid float value
Cannot set gray non-stroke color because /'P1' is an invalid float value
  cols, rows, v_s, h_s = self._generate_columns_and_rows(bbox, user_cols)
  cols, rows, v_s, h_s = self._generate_columns_and_rows(bbox, user_cols)
  cols, rows, v_s, h_s = self._generate_columns_and_rows(bbox, user_cols)
  cols, rows, v_s, h_s = self._generate_columns_and_rows(bbox, user_cols)
  cols, rows, v_s, h_s = self._generate_columns_and_rows(bbox, user_cols)
  if self._document_has_no_text():


Found 51 table(s).
Prepared 2729 row-level text chunks.
- [Table 1] General: Rules
- [Table 1] General: Policy Booklet
- [Table 1] General: looking after you always
- [Table 2] Welcome to Laya Healthcare.: Thank you for choosing us to look after your healthcare cover. This rules booklet contains very
- [Table 2] Welcome to Laya Healthcare.: detailed legal information about our schemes and can act as a reference to your  Benefit Table.
Saved row_chunks.csv and row_chunks.jsonl


  cols, rows, v_s, h_s = self._generate_columns_and_rows(bbox, user_cols)


In [None]:
# EITHER set a directory OR a list of files:
INPUT_DIR = "D:/NAIC/NAIC-2025/policy/health"         # folder containing PDFs (change as needed)
INPUT_FILES = []            # or: ["a.pdf", "b.pdf", "c.pdf"]

# Outputs
MERGED_ORIGINAL_PDF = "out/merged_original.pdf"  # merged original pages (layout/images preserved)
MERGED_TEXT_PDF     = "out/merged_text_only.pdf" # optional text-only PDF for reading
CORPUS_TXT          = "out/corpus.txt"           # all text concatenated
PAGES_JSONL         = "out/pages.jsonl"          # one record per page
CHUNKS_JSONL        = "out/chunks.jsonl"         # ready for RAG: chunked text with metadata

# Optional replacement (set to [] to disable)
REPLACE_TERMS = ["Allianz", "Laya", "ILH" , 'Allied Health']  # whole-word, case-insensitive
REPLACE_WITH  = "novaworks"                 # target text

# OCR fallback if a page has no extractable text
USE_OCR_WHEN_EMPTY = False   # set True if many pages are scanned


In [72]:
pdfs = list_pdfs(INPUT_DIR, INPUT_FILES)
assert pdfs, "No PDFs found. Set INPUT_DIR or INPUT_FILES."

# 1) Merge PDFs (original pages preserved)
merge_pdfs(pdfs, MERGED_ORIGINAL_PDF)
print(f"✅ Merged original PDF → {MERGED_ORIGINAL_PDF}")

# 2) Extract text (per page) with fallback OCR if enabled
corpus_pages: List[Dict] = []
for path in tqdm(pdfs, desc="Extracting"):
    page_texts = extract_text_pdfplumber(path)
    if USE_OCR_WHEN_EMPTY and OCR_AVAILABLE:
        # fill empty pages via OCR
        ocr_texts = None
        if any(t.strip()=="" for t in page_texts):
            ocr_texts = ocr_pdf_pages(path)
        for i, t in enumerate(page_texts):
            if (t is None or t.strip()=="") and ocr_texts and i < len(ocr_texts):
                page_texts[i] = ocr_texts[i] or ""
    # write page records
    for i, t in enumerate(page_texts, start=1):
        if REPLACE_TERMS:
            t = replace_terms(t, REPLACE_TERMS, REPLACE_WITH)
        corpus_pages.append({
            "source_file": os.path.basename(path),
            "page_number": i,
            "text": t or ""
        })

# 3) Save pages.jsonl and corpus.txt
os.makedirs(os.path.dirname(PAGES_JSONL) or ".", exist_ok=True)
with open(PAGES_JSONL, "w", encoding="utf-8") as f:
    for rec in corpus_pages:
        f.write(json.dumps(rec, ensure_ascii=False) + "\n")
print(f"🧾 Wrote per-page JSONL → {PAGES_JSONL}  (records: {len(corpus_pages)})")

merged_text = "\n".join(
    [f"=== {r['source_file']} — Page {r['page_number']} ===\n{r['text']}".rstrip()
     for r in corpus_pages]
)
with open(CORPUS_TXT, "w", encoding="utf-8") as f:
    f.write(merged_text)
print(f"📄 Wrote corpus text → {CORPUS_TXT}  (chars: {len(merged_text):,})")

# 4) Optional: build a text-only PDF (nice for manual review)
# write_text_only_pdf(merged_text, MERGED_TEXT_PDF)
# print(f"🧱 Wrote text-only PDF → {MERGED_TEXT_PDF}")

# 5) Chunk for RAG and save chunks.jsonl
#    (simple character chunks; swap with token-based if you prefer)
# CHUNK_SIZE = 1200
# OVERLAP    = 100

# chunk_records = []
# for rec in corpus_pages:
#     chunks = chunk_text(rec["text"], max_chars=CHUNK_SIZE, overlap=OVERLAP)
#     for idx, ch in enumerate(chunks):
#         chunk_records.append({
#             "id": f"{rec['source_file']}::p{rec['page_number']}::c{idx}",
#             "source_file": rec["source_file"],
#             "page_number": rec["page_number"],
#             "chunk_index": idx,
#             "text": ch
#         })

# with open(CHUNKS_JSONL, "w", encoding="utf-8") as f:
#     for r in chunk_records:
#         f.write(json.dumps(r, ensure_ascii=False) + "\n")
# print(f"🧩 Wrote RAG chunks → {CHUNKS_JSONL}  (chunks: {len(chunk_records)})")


✅ Merged original PDF → out/merged_original.pdf


Extracting:  43%|████▎     | 3/7 [00:23<00:38,  9.55s/it]Cannot set gray non-stroke color because /'P0' is an invalid float value
Cannot set gray non-stroke color because /'P1' is an invalid float value
Extracting: 100%|██████████| 7/7 [00:41<00:00,  5.86s/it]

🧾 Wrote per-page JSONL → out/pages.jsonl  (records: 204)
📄 Wrote corpus text → out/corpus.txt  (chars: 601,694)





In [1]:
from langchain.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
#
loader = PyPDFLoader("C:/Users/viraj/Downloads/complete-health-insurance-brochure.pdf")
documents = loader.load()
#
print(len(documents))

  from cryptography.hazmat.primitives.ciphers.algorithms import AES, ARC4


16


In [3]:
from langchain.docstore.document import Document

with open("D:/NAIC/NAIC-2025/out/corpus.txt", "r", encoding="utf-8") as f:
    text = f.read()

docs = [Document(page_content=text, metadata={"source": "corpus.txt"})]
print(docs[0].page_content[:500]) 

=== C23.9175-Insurance-Broker-Health-Insurance-Guide-v3.pdf — Page 1 ===
A GUIDE
TO HEALTH
INSURANCE
=== C23.9175-Insurance-Broker-Health-Insurance-Guide-v3.pdf — Page 2 ===
A GUIDE
TO HEALTH
INSURANCE
=== C23.9175-Insurance-Broker-Health-Insurance-Guide-v3.pdf — Page 3 ===
INSURANCE BROKER HEALTH INSURANCE GUIDE
A GUIDE
TO HEALTH
INSURANCE
Health insurance is a critical component of
financial planning that provides coverage
for the cost of medical expenses. It is
designed to protect individuals


In [4]:
from langchain.text_splitter import RecursiveCharacterTextSplitter

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=0,
    length_function=len,
    is_separator_regex=False
)
#
naive_chunks = text_splitter.split_documents(docs)
for chunk in naive_chunks[10:15]:
  print(chunk.page_content)


• Will it cover all of the treatments I need?
Some policies may have exclusions on treatments that you might need such as
dental , outpatient treatments, experimental treatments.
• Do I understand all of the details of the policy?
Be sure to read all of the small print. If its imperative that if you do not
understand something, you should ask your provider for an explanation in plain
English.
• Do I have any waiting periods for pre-existing conditions?
You will not be able to claim for an illness if a waiting period applies, (See table of
waiting periods above)
• What happens if it is a family policy, and the main
policyholder dies?
Normally the rest of the family are still covered until you notify your provider that
the main policyholder has died. Always contact your provider as they will advise
you best on what to do next.
THE INSURANCE EXPERT PAGE 16
=== C23.9175-Insurance-Broker-Health-Insurance-Guide-v3.pdf — Page 17 ===
INSURANCE BROKER A GUIDE TO HEALTH INSURANCE
CAN YOU EARN
A 

In [1]:
!pip install fastembed-gpu

Collecting fastembed-gpu
  Downloading fastembed_gpu-0.7.3-py3-none-any.whl (105 kB)
Collecting onnxruntime-gpu!=1.20.0,>=1.17.0
  Downloading onnxruntime_gpu-1.22.0-cp310-cp310-win_amd64.whl (214.9 MB)
Installing collected packages: onnxruntime-gpu, fastembed-gpu
Successfully installed fastembed-gpu-0.7.3 onnxruntime-gpu-1.22.0


You should consider upgrading via the 'D:\NAIC\CogniClaim\Scripts\python.exe -m pip install --upgrade pip' command.


In [20]:
!pip install langchain-huggingface

Collecting langchain-huggingface
  Downloading langchain_huggingface-0.3.1-py3-none-any.whl (27 kB)
Installing collected packages: langchain-huggingface
Successfully installed langchain-huggingface-0.3.1


You should consider upgrading via the 'D:\NAIC\CogniClaim\Scripts\python.exe -m pip install --upgrade pip' command.


In [24]:
import torch
print(f"CUDA available: {torch.cuda.is_available()}")
print(f"GPU device: {torch.cuda.get_device_name()}")

# Check if your embedding model is actually using GPU
from langchain_huggingface import HuggingFaceEndpointEmbeddings


model_id = "llmware/industry-bert-insurance-v0.1"

emb = HuggingFaceEndpointEmbeddings(
    model=model_id,                 # or use repo_id=... (deprecated alias)
    task="feature-extraction",      # default; being explicit
    huggingfacehub_api_token='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'# huggingfacehub_api_token="hf_your_token_here",  # optional if not in env
)


CUDA available: True
GPU device: NVIDIA GeForce RTX 4060 Laptop GPU


In [6]:
from langchain_openai import ChatOpenAI
from openai import OpenAI

openai_api_key = OpenAI(api_key='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')

In [25]:
from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai.embeddings import OpenAIEmbeddings

semantic_chunker = SemanticChunker(emb, breakpoint_threshold_type="percentile")
#
semantic_chunks = semantic_chunker.create_documents([d.page_content for d in docs])
#
# for semantic_chunk in semantic_chunks:
#     print(semantic_chunk.page_content)
#     print(len(semantic_chunk.page_content))


StopIteration: 

In [8]:
len(semantic_chunks)

172

In [9]:
for semantic_chunk in semantic_chunks:
    print(semantic_chunk.page_content)
    print(len(semantic_chunk.page_content))

=== C23.9175-Insurance-Broker-Health-Insurance-Guide-v3.pdf — Page 1 ===
A GUIDE
TO HEALTH
INSURANCE
=== C23.9175-Insurance-Broker-Health-Insurance-Guide-v3.pdf — Page 2 ===
A GUIDE
TO HEALTH
INSURANCE
=== C23.9175-Insurance-Broker-Health-Insurance-Guide-v3.pdf — Page 3 ===
INSURANCE BROKER HEALTH INSURANCE GUIDE
A GUIDE
TO HEALTH
INSURANCE
Health insurance is a critical component of
financial planning that provides coverage
for the cost of medical expenses. It is
designed to protect individuals and families
from unexpected medical bills, which can
often be financially devastating.
588
With the
ever-increasing costs of healthcare and the
wide range of insurance options available,
navigating the world of health insurance
can be overwhelming. Understanding the
basics of health insurance is essential in
choosing the right plan for your needs and
budget. This guide will provide an overview
of health insurance, the different types of
plans available, key terms to know, and tips
for selectin

In [10]:
from langchain_community.vectorstores import Chroma
semantic_chunk_vectorstore = Chroma.from_documents(semantic_chunks, embedding=embed_model)

In [11]:
semantic_chunk_retriever = semantic_chunk_vectorstore.as_retriever(search_kwargs={"k" : 10})
semantic_chunk_retriever.invoke("Describe the Policy Rules")

[Document(metadata={}, page_content='• Do I understand all of the details of the policy?'),
 Document(metadata={}, page_content='Reading your rules is used to determine any additional LCR Amount\nthat you may have to pay. booklet\nThis booklet consists of the Scheme Rules which Benefits\nsets out definitions and the rules applicable to\nThe hospital charges, medical fees, shortfall\nyour policy. amounts, excess amounts and other benefits\nYou need to read these rules (including the shown in your Benefit Table. notes) in conjunction with the current Benefit\nTable applicable to your policy, your membership\nClinical Indicators\ncertificate and your application form (if\napplicable). These documents and the Scheme Certain procedures require Clinical Indicators\nRules make up the agreement between us, novaworks which will need to be provided by your GP\nhealthcare, and you, the member. or Consultant. The application of a Clinical\nIndicator for a specific procedure is a widely\nThe benefi

In [12]:
from langchain_core.prompts import ChatPromptTemplate

rag_template = """\
Use the following context to answer the user's query. If you cannot answer, please respond with 'I don't know'.

User's Query:
{question}

Context:
{context}
"""

rag_prompt = ChatPromptTemplate.from_template(rag_template)

In [13]:
chat_model = ChatOpenAI(
    temperature=1,
    model='gpt-4o-mini',
    api_key='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'
)

In [14]:
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

semantic_rag_chain = (
    {"context" : semantic_chunk_retriever, "question" : RunnablePassthrough()}
    | rag_prompt
    | chat_model
    | StrOutputParser()
)

In [16]:
semantic_rag_chain.invoke("I was diagnosed with Type 2 diabetes three months after my policy effective date, but I had pre-existing hypertension that I disclosed during enrollment. My endocrinologist wants me to get a continuous glucose monitor, but it requires prior authorization. My policy has a $2,500 deductible that I haven't met, plus a separate $500 specialty device deductible. The CGM costs $200/month, but there's also a one-time $800 setup fee. My plan covers durable medical equipment at 80% after deductible, but I'm not sure if CGM qualifies as DME or if it falls under prescription benefits which have different copay tiers. Additionally, I'm traveling internationally for work next month - does the prior authorization remain valid if I need to fill prescriptions abroad, and would emergency supplies be covered under my out-of-network emergency provisions? ")

"I don't know."

CLAUDE BULLSHIT

In [None]:
import re
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from langchain.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings.fastembed import FastEmbedEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.documents import Document
from langchain.schema import BaseRetriever
import numpy as np
from langchain_community.vectorstores.utils import filter_complex_metadata

@dataclass
class InsuranceChunk:
    """Structured representation of insurance document chunks"""
    content: str
    metadata: Dict[str, Any]
    section_type: str  # "coverage", "exclusion", "definition", "procedure"
    section_hierarchy: List[str]  # ["Section 1", "1.1", "1.1.a"]
    cross_references: List[str]  # References to other sections
    conditions: List[str]  # If-then conditions found
    numerical_values: Dict[str, float]  # Extracted amounts, percentages

class InsuranceDocumentProcessor:
    """Advanced processor for insurance documents"""
    
    def __init__(self):
        self.section_patterns = {
            'section': r'Section\s+([IVX\d]+\.?[\d\w]*)',
            'subsection': r'(\d+\.)+\s*[A-Za-z]',
            'definition': r'Definition|means|shall mean|is defined as',
            'exclusion': r'excluded|not covered|does not cover|limitation',
            'coverage': r'covered|coverage|benefits|pays|reimburse',
            'condition': r'if|provided that|subject to|only if|when'
        }
        
    def extract_structure(self, text: str) -> Dict[str, Any]:
        """Extract document structure and metadata"""
        structure = {
            'sections': [],
            'definitions': [],
            'exclusions': [],
            'conditions': [],
            'cross_refs': [],
            'amounts': {}
        }
        
        # Find section headers
        for match in re.finditer(self.section_patterns['section'], text, re.IGNORECASE):
            structure['sections'].append({
                'number': match.group(1),
                'start_pos': match.start(),
                'text': match.group(0)
            })
        
        # Find cross-references (e.g., "see Section 2.1", "as defined in 1.3")
        cross_ref_pattern = r'(?:see|refer to|as (?:defined|stated) in)\s+(?:Section\s+)?([IVX\d\.]+)'
        structure['cross_refs'] = re.findall(cross_ref_pattern, text, re.IGNORECASE)
        
        # Extract monetary amounts and percentages
        amount_pattern = r'\$[\d,]+(?:\.\d{2})?|\d+%|\d+\s*percent'
        structure['amounts'] = re.findall(amount_pattern, text)
        
        # Find conditional statements
        condition_pattern = r'(?:if|when|provided that|subject to)[^.!?]*[.!?]'
        structure['conditions'] = re.findall(condition_pattern, text, re.IGNORECASE)
        
        return structure

    def intelligent_chunking(self, documents: List[Document]) -> List[InsuranceChunk]:
        """Create semantically meaningful chunks with preserved structure"""
        chunks = []
        
        for doc in documents:
            text = doc.page_content
            structure = self.extract_structure(text)
            
            # Split by sections first, then by semantic boundaries
            if structure['sections']:
                section_chunks = self._split_by_sections(text, structure)
            else:
                # Fallback to improved character splitting
                section_chunks = self._semantic_split_fallback(text)
            
            for i, chunk_text in enumerate(section_chunks):
                chunk_structure = self.extract_structure(chunk_text)
                
                # Classify chunk type
                chunk_type = self._classify_chunk_type(chunk_text)
                
                chunks.append(InsuranceChunk(
                    content=chunk_text,
                    metadata={
                        **doc.metadata,
                        'chunk_id': i,
                        'char_count': len(chunk_text),
                        'word_count': len(chunk_text.split()),
                    },
                    section_type=chunk_type,
                    section_hierarchy=self._extract_hierarchy(chunk_text),
                    cross_references=chunk_structure['cross_refs'],
                    conditions=chunk_structure['conditions'],
                    numerical_values=self._extract_numerical_values(chunk_structure['amounts'])
                ))
        
        return chunks
    
    def _split_long_section(self, text: str) -> List[str]:
        """Split a long section into smaller chunks while preserving context"""
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=1200,
            chunk_overlap=200,
            separators=["\n\n", "\n", ". ", "!", "?", ";", " "],
            length_function=len
        )
        
        temp_doc = Document(page_content=text)
        sub_chunks = splitter.split_documents([temp_doc])
        
        return [chunk.page_content for chunk in sub_chunks]

    
    def _split_by_sections(self, text: str, structure: Dict) -> List[str]:
        """Split text by logical sections"""
        if not structure['sections']:
            return [text]
        
        chunks = []
        sections = structure['sections']
        
        for i, section in enumerate(sections):
            start_pos = section['start_pos']
            end_pos = sections[i + 1]['start_pos'] if i + 1 < len(sections) else len(text)
            
            section_text = text[start_pos:end_pos].strip()
            
            # If section is too long, split further
            if len(section_text) > 2000:
                sub_chunks = self._split_long_section(section_text)
                chunks.extend(sub_chunks)
            else:
                chunks.append(section_text)
        
        return chunks
    
    def _semantic_split_fallback(self, text: str) -> List[str]:
        """Improved fallback splitting with overlap"""
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=1200,
            chunk_overlap=200,  # Important: maintain context
            separators=["\n\n", "\n", ". ", "!", "?", ";", " "],
            length_function=len
        )
        return [chunk.page_content for chunk in splitter.split_documents([Document(page_content=text)])]
    
    def _classify_chunk_type(self, text: str) -> str:
        """Classify chunk by content type"""
        text_lower = text.lower()
        
        if any(word in text_lower for word in ['definition', 'means', 'shall mean', 'is defined as']):
            return 'definition'
        elif any(word in text_lower for word in ['excluded', 'not covered', 'limitation', 'does not apply']):
            return 'exclusion'
        elif any(word in text_lower for word in ['coverage', 'covered', 'benefits', 'reimburse', 'pays']):
            return 'coverage'
        elif any(word in text_lower for word in ['claim', 'file', 'procedure', 'process', 'submit']):
            return 'procedure'
        else:
            return 'general'
    
    def _extract_hierarchy(self, text: str) -> List[str]:
        """Extract section hierarchy from text"""
        hierarchy = []
        # Look for section numbers like "1.1", "2.3.a", "Section II"
        section_matches = re.findall(r'(?:Section\s+)?([IVX\d]+(?:\.\d+)*(?:\.[a-zA-Z])?)', text)
        return section_matches[:3]  # Keep only first few hierarchy levels
    
    def _extract_numerical_values(self, amounts: List[str]) -> Dict[str, float]:
        """Extract and parse numerical values"""
        values = {}
        for amount in amounts:
            if '$' in amount:
                # Parse dollar amounts
                clean_amount = re.sub(r'[^\d.]', '', amount)
                try:
                    values[f'amount_{len(values)}'] = float(clean_amount)
                except ValueError:
                    pass
            elif '%' in amount or 'percent' in amount:
                # Parse percentages
                clean_percent = re.sub(r'[^\d.]', '', amount)
                try:
                    values[f'percentage_{len(values)}'] = float(clean_percent)
                except ValueError:
                    pass
        return values

class HybridInsuranceRetriever(BaseRetriever):
    """Custom retriever that combines multiple retrieval strategies"""
    
            
    vectorstore: Any = None  # Add this field
    chunks: List[Any] = []   # Add this field  
    chunk_index: Dict[int, Any] = {}  # Add this field
    
    def __init__(self, vectorstore, chunks: List[InsuranceChunk]):
        super().__init__()  # Initialize parent class
        self.vectorstore = vectorstore
        self.chunks = chunks
        self.chunk_index = {i: chunk for i, chunk in enumerate(chunks)}
    
    def _get_relevant_documents(self, query: str) -> List[Document]:
        # Step 1: Vector similarity search
        similar_docs = self.vectorstore.similarity_search_with_score(query, k=10)
        
        # Step 2: Keyword-based filtering for insurance terms
        insurance_keywords = self._extract_insurance_keywords(query)
        
        # Step 3: Type-based filtering
        query_type = self._classify_query_type(query)
        
        # Step 4: Cross-reference expansion
        expanded_docs = self._expand_with_references(similar_docs, query_type)
        
        # Step 5: Re-rank by relevance
        final_docs = self._rerank_results(expanded_docs, query, query_type)
        
        return final_docs[:5]  # Return top 5
    
    def _extract_insurance_keywords(self, query: str) -> List[str]:
        """Extract insurance-specific terms from query"""
        insurance_terms = [
            'deductible', 'premium', 'coverage', 'exclusion', 'claim', 
            'policy', 'benefit', 'liability', 'comprehensive', 'collision'
        ]
        return [term for term in insurance_terms if term.lower() in query.lower()]
    
    def _classify_query_type(self, query: str) -> str:
        """Classify the type of insurance question"""
        query_lower = query.lower()
        
        if any(word in query_lower for word in ['covered', 'cover', 'coverage']):
            return 'coverage_check'
        elif any(word in query_lower for word in ['exclude', 'exclusion', 'not covered']):
            return 'exclusion_lookup'
        elif any(word in query_lower for word in ['define', 'definition', 'mean', 'means']):
            return 'definition_lookup'
        elif any(word in query_lower for word in ['claim', 'file', 'process', 'submit']):
            return 'procedure_inquiry'
        elif any(word in query_lower for word in ['cost', 'pay', 'amount', 'deductible']):
            return 'cost_inquiry'
        else:
            return 'general'
    
    def _expand_with_references(self, docs: List[tuple], query_type: str) -> List[Document]:
        """Expand results with cross-referenced sections"""
        expanded = []
        
        for doc, score in docs:
            expanded.append((doc, score))
            
            # Find corresponding chunk
            chunk_id = doc.metadata.get('chunk_id')
            if chunk_id in self.chunk_index:
                chunk = self.chunk_index[chunk_id]
                
                # Add cross-referenced sections
                for ref in chunk.cross_references:
                    ref_chunks = [c for c in self.chunks if ref in c.section_hierarchy]
                    for ref_chunk in ref_chunks:
                        ref_doc = Document(
                            page_content=ref_chunk.content,
                            metadata={'source': 'cross_reference', 'reference_to': ref}
                        )
                        expanded.append((ref_doc, score * 0.8))  # Lower score for references
        
        return [doc for doc, score in expanded]
    
    def _rerank_results(self, docs: List[Document], query: str, query_type: str) -> List[Document]:
        """Re-rank results based on insurance-specific criteria"""
        scored_docs = []
        
        for doc in docs:
            score = 0
            content_lower = doc.page_content.lower()
            
            # Base relevance (simplified)
            query_words = query.lower().split()
            matching_words = sum(1 for word in query_words if word in content_lower)
            score += matching_words / len(query_words)
            
            # Boost for document type matching query type
            if query_type == 'coverage_check' and 'coverage' in content_lower:
                score += 0.3
            elif query_type == 'exclusion_lookup' and 'exclusion' in content_lower:
                score += 0.3
            elif query_type == 'definition_lookup' and any(word in content_lower for word in ['definition', 'means', 'defined as']):
                score += 0.3
            
            # Boost for specific section types
            if doc.metadata.get('source') == 'cross_reference':
                score += 0.2  # Boost cross-references as they're often relevant
            
            scored_docs.append((doc, score))
        
        # Sort by score and return documents
        scored_docs.sort(key=lambda x: x[1], reverse=True)
        return [doc for doc, score in scored_docs]

# Enhanced RAG Chain
def create_enhanced_insurance_rag():
    """Create an enhanced RAG system for insurance documents"""
    
    # Load documents - update this path to your actual file
    pdf_path = "D:/NAIC/NAIC-2025/out/merged_text_only.pdf"

    
    try:
        loader = PyPDFLoader(pdf_path)
        documents = loader.load()
        print(f"Loaded {len(documents)} pages from {pdf_path}")
    except Exception as e:
        print(f"Error loading PDF: {e}")
        # Create dummy document for testing
        documents = [Document(page_content="Sample insurance policy text for testing.", metadata={"page": 0})]
    
    # Process documents with insurance-aware chunking
    processor = InsuranceDocumentProcessor()
    insurance_chunks = processor.intelligent_chunking(documents)
    print(f"Created {len(insurance_chunks)} intelligent chunks")
    
    # Convert to LangChain documents for vector store
    langchain_docs = []
    for i, chunk in enumerate(insurance_chunks):
        # Convert lists to strings for ChromaDB compatibility
        clean_metadata = {}
        
        # Copy simple metadata fields
        if hasattr(chunk, 'metadata') and chunk.metadata:
            for key, value in chunk.metadata.items():
                # Only include simple types (str, int, float, bool, None)
                if isinstance(value, (str, int, float, bool, type(None))):
                    clean_metadata[key] = value
        
        # Convert complex fields to strings
        clean_metadata['section_type'] = str(chunk.section_type) if chunk.section_type else ''
        
        # Convert lists to comma-separated strings
        if chunk.cross_references:
            clean_metadata['cross_references'] = ', '.join(str(ref) for ref in chunk.cross_references)
        else:
            clean_metadata['cross_references'] = ''
            
        if chunk.conditions:
            clean_metadata['conditions'] = ' | '.join(str(cond) for cond in chunk.conditions)
        else:
            clean_metadata['conditions'] = ''
            
        if chunk.section_hierarchy:
            clean_metadata['hierarchy'] = ' > '.join(str(hier) for hier in chunk.section_hierarchy)
        else:
            clean_metadata['hierarchy'] = ''
        
        doc = Document(
            page_content=chunk.content,
            metadata=clean_metadata
        )
        langchain_docs.append(doc)
    
    # Create embeddings and vector store
    try:
        # Try FastEmbed first (faster)
        embed_model = FastEmbedEmbeddings(model_name="BAAI/bge-base-en-v1.5")
        print("Using FastEmbed embeddings")
    except Exception as e:
        print(f"FastEmbed failed: {e}")
        # Fallback to HuggingFace
        try:
            from langchain_community.embeddings import HuggingFaceEmbeddings
            embed_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
            print("Using HuggingFace embeddings")
        except Exception as e2:
            print(f"All embedding models failed: {e2}")
            return None
    
    vectorstore = Chroma.from_documents(langchain_docs, embedding=embed_model)
    print("Vector store created successfully")
    
    # Create hybrid retriever
    hybrid_retriever = HybridInsuranceRetriever(vectorstore, insurance_chunks)
    
    # Enhanced prompt template
    enhanced_prompt = ChatPromptTemplate.from_template("""
    You are an expert insurance policy analyst. Use the provided policy context to answer the user's question.

    IMPORTANT INSTRUCTIONS:
    1. Base your answer ONLY on the provided context
    2. If multiple policy sections apply, explain how they interact
    3. If there are conditions or exclusions that apply, clearly state them
    4. If you find conflicting information, explain the conflict
    5. Always cite specific policy sections when making claims
    6. If you cannot find the answer in the context, say "I cannot find this information in the provided policy documents"

    Context from Policy Documents:
    {context}

    User's Question: {question}

    Analysis:
    """)
    
    # Chat model - you'll need to set your OpenAI API key
    try:
        chat_model = ChatOpenAI(
            temperature=0,  # Lower temperature for more consistent policy interpretation
            model='gpt-4o-mini',
            api_key='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'
        )
    except Exception as e:
        print(f"OpenAI model initialization failed: {e}")
        print("You need to set your OPENAI_API_KEY environment variable")
        return None
    
    # Build the enhanced chain
    def format_context(docs):
        formatted_sections = []
        for i, doc in enumerate(docs):
            section_info = ""
            if doc.metadata.get('section_type'):
                section_info += f"[{doc.metadata['section_type'].upper()}] "
            if doc.metadata.get('hierarchy'):
                section_info += f"Section {', '.join(doc.metadata['hierarchy'])}: "
            
            formatted_sections.append(f"{section_info}\n{doc.page_content}\n")
        
        return "\n---\n".join(formatted_sections)
    
    enhanced_chain = (
        {"context": hybrid_retriever | format_context, "question": lambda x: x}
        | enhanced_prompt
        | chat_model
    )
    
    return enhanced_chain

# Usage example


In [43]:
if __name__ == "__main__":
    # Create the enhanced RAG system
    rag_chain = create_enhanced_insurance_rag()
    
    # Test with complex insurance questions
    test_questions = [
        "My claim for bariatric surgery was initially denied because the insurer says I didn't meet the 6-month supervised diet requirement, but my previous insurer's records show I completed 4 months before switching jobs. My new employer's plan has a different BMI threshold (35 vs 40) and my BMI is 37. Can I appeal using records from my previous coverage, and if my spouse's plan would cover this procedure, how does coordination of benefits work when one plan covers and the other denies the same service?"
    ]
    
    for question in test_questions:
        print(f"\nQuestion: {question}")
        print("Answer:", rag_chain.invoke(question))
        print("-" * 80)

Loaded 120 pages from D:/NAIC/NAIC-2025/out/merged_text_only.pdf
Created 570 intelligent chunks
Using FastEmbed embeddings
Vector store created successfully

Question: My claim for bariatric surgery was initially denied because the insurer says I didn't meet the 6-month supervised diet requirement, but my previous insurer's records show I completed 4 months before switching jobs. My new employer's plan has a different BMI threshold (35 vs 40) and my BMI is 37. Can I appeal using records from my previous coverage, and if my spouse's plan would cover this procedure, how does coordination of benefits work when one plan covers and the other denies the same service?
Answer: content="Based on the provided policy context, here’s how your situation regarding the appeal for bariatric surgery can be analyzed:\n\n1. **Appealing the Claim Denial**: You can appeal the decision made on your claim by writing to the insurer and outlining the reasons for your appeal. You should include any additional i

In [None]:
vectorstore = Chroma.from_documents(langchain_docs, embedding=embed_model)

# With this:
persist_directory = "./chroma_insurance_db"  # Choose your directory
vectorstore = Chroma.from_documents(
    langchain_docs, 
    embedding=embed_model,
    persist_directory=persist_directory
)
# The vectorstore is automatically saved to disk
print(f"Vector store persisted to {persist_directory}")

In [None]:
print("RETRIEVED SOURCES:")
print("="*50)
sources = hybrid_retriever.get_relevant_documents(query)
for i, doc in enumerate(sources, 1):
    print(f"\nSource {i} ({len(doc.page_content)} chars):")
    print(doc.page_content[:300] + "..." if len(doc.page_content) > 300 else doc.page_content)
    print(f"Metadata: {doc.metadata}")

print("\n" + "="*50)
print("AI RESPONSE:")
print("="*50)

# Then get the answer
answer = rag_chain.invoke(query)
print(answer)