In [1]:
import pdfplumber
import re
from collections import Counter

def analyze_font_sizes(pdf):
    """
    Estimate the body-text font size of a PDF.

    Reads the ``size`` attribute of every word on every page, rounds each one
    to the nearest integer so near-identical sizes are grouped together, and
    returns the value that occurs most often.

    Args:
        pdf: Object exposing ``pages``; each page must provide
            ``extract_words(extra_attrs=[...])`` returning dicts that carry a
            ``size`` key (an opened pdfplumber PDF is what the caller passes).

    Returns:
        The most common rounded font size, or ``10`` when no word with a
        usable size is found. The result is always a single number, so the
        caller can safely scale it to derive heading thresholds.
    """
    size_counts = Counter(
        round(word['size'])
        for page in pdf.pages
        for word in page.extract_words(extra_attrs=["size"])
        if word['size']  # skip words whose size is missing or zero
    )

    if not size_counts:
        # Fallback for documents without any sized text.
        return 10

    # The most frequent size is taken to be the body text.
    return size_counts.most_common(1)[0][0]

def is_footer_or_header(text, page_height, y_position):
    """
    Decide whether a line is running header/footer boilerplate.

    A line is rejected only when BOTH hold:
      * it sits in the top 5% or the bottom 10% of the page, and
      * its text contains one of the known boilerplate keywords
        (case-sensitive substring match).

    Returns True for boilerplate, False otherwise.
    """
    top_margin_end = page_height * 0.05
    bottom_margin_start = page_height * 0.90

    inside_margin = y_position < top_margin_end or y_position > bottom_margin_start
    if not inside_margin:
        return False

    # Keywords tuned to the specific policy document this script was written for.
    boilerplate_markers = (
        "Page",
        "Copyright",
        "UnitedHealthcare",
        "Proprietary Information",
        "Effective",
    )
    for marker in boilerplate_markers:
        if marker in text:
            return True
    return False

def list_to_markdown_table(table_data):
    """
    Convert a list of rows (as returned by pdfplumber) into a Markdown table.

    The first row is used as the header. For every cell:
      * ``None`` becomes an empty string,
      * embedded newlines become spaces (a Markdown table row must be one line),
      * ``|`` is escaped as ``\\|`` so cell text cannot break the column layout.
    Rows shorter than the widest row are padded with empty cells so every
    line of the table has the same number of columns.

    Args:
        table_data: List of rows, each a list of cell values (or ``None``).

    Returns:
        The table as a string wrapped in a leading and trailing newline, or
        ``""`` when ``table_data`` is empty or ``None``.
    """
    if not table_data:
        return ""

    def _clean(cell):
        if cell is None:
            return ""
        return str(cell).replace('\n', ' ').replace('|', '\\|').strip()

    rows = [[_clean(cell) for cell in (row or [])] for row in table_data]

    # Pad ragged rows so header, separator and body agree on column count.
    width = max(len(row) for row in rows)
    rows = [row + [""] * (width - len(row)) for row in rows]

    lines = ["| " + " | ".join(row) + " |" for row in rows]
    # The separator goes directly under the header (first) row.
    lines.insert(1, "| " + " | ".join(["---"] * width) + " |")

    return "\n" + "\n".join(lines) + "\n"

def _word_in_any_bbox(word, bboxes):
    """Return True if the centre point of ``word`` lies strictly inside any bbox."""
    center_x = (word['x0'] + word['x1']) / 2
    center_y = (word['top'] + word['bottom']) / 2
    return any(
        bbox[0] < center_x < bbox[2] and bbox[1] < center_y < bbox[3]
        for bbox in bboxes
    )


def _page_text_to_markdown(page, table_bboxes, body_font_size):
    """Return the Markdown lines for the text on ``page`` that is outside all tables."""
    # "fontname" has to be requested here: the bold check below reads it from
    # each word, and without it the lookup always falls back to "" so no line
    # could ever be recognised as bold.
    words = page.extract_words(extra_attrs=["size", "fontname", "top", "bottom"])

    # Group words into lines keyed by their rounded "top" coordinate.
    lines = {}
    for w in words:
        if _word_in_any_bbox(w, table_bboxes):
            # Table text is rendered separately as a Markdown table.
            continue
        lines.setdefault(round(w['top']), []).append(w)

    text_content = []
    for y in sorted(lines):  # top-to-bottom reading order
        line_words = lines[y]
        line_text = " ".join(w['text'] for w in line_words)

        # Drop running headers/footers (page numbers, copyright lines, ...).
        if is_footer_or_header(line_text, page.height, y):
            continue

        # Heading level is decided by the largest font on the line,
        # relative to the detected body size.
        max_size = max(w['size'] for w in line_words)
        if max_size > body_font_size * 1.5:
            prefix = "# "  # H1
        elif max_size > body_font_size * 1.1:
            prefix = "## "  # H2
        else:
            prefix = ""

        # Simplified bold logic: only a body-size line whose every word uses a
        # font with "Bold" in its name is emphasised.
        is_bold = all("Bold" in (w.get("fontname") or "") for w in line_words)
        if is_bold and prefix == "":
            line_text = f"**{line_text}**"

        text_content.append(f"{prefix}{line_text}")

    return text_content


def convert_pdf_to_md(pdf_path, md_path):
    """
    Convert the PDF at ``pdf_path`` to Markdown and write it to ``md_path``.

    For every page:
      1. tables are located once; their bounding boxes are used to exclude
         table text from the running text, and their contents are rendered
         with ``list_to_markdown_table``;
      2. the remaining words are grouped into lines, running headers/footers
         are dropped, and lines are tagged as ``#``/``##`` headings or bold
         based on font size and font name;
      3. the page's text is emitted first and its tables are appended after
         it (tables are NOT interleaved at their original vertical position).

    Pages are joined with a ``---`` rule. Progress is printed to stdout.

    Args:
        pdf_path: Path of the PDF to read.
        md_path: Path of the Markdown file to create/overwrite (UTF-8).

    Exceptions raised while opening either file are not caught here; the
    script entry point handles ``FileNotFoundError``.
    """
    print(f"Processing {pdf_path}...")

    with pdfplumber.open(pdf_path) as pdf:
        body_font_size = analyze_font_sizes(pdf)
        print(f"Detected Body Font Size: ~{body_font_size}pt")

        markdown_content = []

        for page_number, page in enumerate(pdf.pages, start=1):
            print(f"Scanning Page {page_number}...")

            # Detect tables once and reuse the result for both the exclusion
            # boxes and the cell contents.
            tables = page.find_tables()
            table_bboxes = [t.bbox for t in tables]
            extracted_tables = [list_to_markdown_table(t.extract()) for t in tables]

            text_content = _page_text_to_markdown(page, table_bboxes, body_font_size)

            # Text first, then each table in its own blank-line-separated block.
            page_md = "\n".join(text_content) + "".join(
                f"\n\n{tbl}\n" for tbl in extracted_tables
            )
            markdown_content.append(page_md)

    # Save to file
    with open(md_path, "w", encoding="utf-8") as f:
        f.write("\n\n---\n\n".join(markdown_content))

    print(f"Successfully saved to {md_path}")

# --- Execution ---
if __name__ == "__main__":
    # Point these at the PDF to convert and the Markdown file to produce.
    input_pdf, output_md = "spinal_pain.pdf", "spinal_pain.md"

    try:
        convert_pdf_to_md(input_pdf, output_md)
    except FileNotFoundError:
        # Report a missing file as a friendly message instead of a traceback.
        print(f"Error: Could not find {input_pdf}. Make sure the file is in the same folder.")

Processing spinal_pain.pdf...
Detected Body Font Size: ~10pt
Scanning Page 1...
Scanning Page 2...
Scanning Page 3...
Scanning Page 4...
Scanning Page 5...
Scanning Page 6...
Scanning Page 7...
Scanning Page 8...
Scanning Page 9...
Scanning Page 10...
Scanning Page 11...
Scanning Page 12...
Scanning Page 13...
Scanning Page 14...
Successfully saved to spinal_pain.md


# RAG implementation

In [18]:
import os
import glob

from dotenv import load_dotenv
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_chroma import Chroma
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain_text_splitters import MarkdownHeaderTextSplitter
from langchain_core.messages import SystemMessage, HumanMessage
from langchain_openai import ChatOpenAI
from langchain_core.documents import Document 

import gradio as gr

load_dotenv(override=True)

True

In [11]:
# Using LangChain's loaders
from langchain_community.document_loaders import TextLoader

# Load the converted Markdown file as a single LangChain document.
loader = TextLoader(file_path="spinal_pain.md", encoding="utf-8")
documents = loader.load()

# Raw Markdown text of the first loaded document; this is what gets split by header.
md_text = documents[0].page_content

# print(f"Loaded {len(documents)} documents")
# print(type(documents[0]))

In [14]:
# Split the Markdown on level-2 ("##") headers; "H2" is the metadata label
# paired with that header level, and the header line itself is stripped from
# the chunk content.
splitter = MarkdownHeaderTextSplitter(
    strip_headers=True,
    headers_to_split_on=[("##", "H2")],
)
md_header_splits = splitter.split_text(md_text)

# Normalise every split into a Document that also carries the source file's
# metadata. Splits that are already Documents are updated in place; the dict
# branch is a defensive path for splitters that return plain dicts.
chunks = []
for split in md_header_splits:
    if not isinstance(split, dict):
        split.metadata.update(documents[0].metadata)
        chunks.append(split)
        continue
    chunks.append(
        Document(
            page_content=split["content"],
            metadata={**documents[0].metadata, **split["metadata"]},
        )
    )


In [None]:
# md_header_splits = splitter.split_text(md_text)
# print("First split type:", type(md_header_splits[0]))
# print("First split keys/attributes:", dir(md_header_splits[0]))
# print("First split:", md_header_splits[0])


First split type: <class 'langchain_core.documents.base.Document'>
First split keys/attributes: ['__abstractmethods__', '__annotations__', '__class__', '__class_getitem__', '__class_vars__', '__copy__', '__deepcopy__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__fields__', '__fields_set__', '__firstlineno__', '__format__', '__ge__', '__get_pydantic_core_schema__', '__get_pydantic_json_schema__', '__getattr__', '__getattribute__', '__getstate__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__iter__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__pretty__', '__private_attributes__', '__pydantic_complete__', '__pydantic_computed_fields__', '__pydantic_core_schema__', '__pydantic_custom_init__', '__pydantic_decorators__', '__pydantic_extra__', '__pydantic_fields__', '__pydantic_fields_set__', '__pydantic_generic_metadata__', '__pydantic_init_subclass__', '__pydantic_on_complete__', '__pydantic_parent_namespace__', '__pydantic_post_init__', '__pydan

In [16]:

# Directory where the Chroma store is persisted.
db_name = "vector_db"

# Embedding model handed to the Chroma store.
# (Lighter alternative tried earlier: model_name="all-MiniLM-L6-v2".)
embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-large-en-v1.5")

# If a persisted store already exists, drop its collection first so the index
# built below contains only the current chunks.
if os.path.exists(db_name):
    Chroma(
        persist_directory=db_name,
        embedding_function=embeddings,
    ).delete_collection()

vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embeddings,
    persist_directory=db_name,
)

In [44]:
# Chat model reached through OpenRouter's OpenAI-compatible endpoint; note the
# key is read from the OPENAI_API_KEY environment variable.
# Previous configuration, kept for reference:
#   ChatGoogleGenerativeAI(model="gemini-3-flash-preview", temperature=0.3,
#                          max_output_tokens=512,
#                          google_api_key=os.getenv("GOOGLE_API_KEY"))
llm = ChatOpenAI(
    model="arcee-ai/trinity-large-preview:free",
    openai_api_key=os.getenv("OPENAI_API_KEY"),
    openai_api_base="https://openrouter.ai/api/v1",
    temperature=0.1,
)

# Similarity search returning the 3 closest chunks per query.
# (The variable name's spelling is kept: answer_question() looks it up as-is.)
retreiver = vectorstore.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 3},
)

In [31]:
# System prompt for the policy-extraction LLM call.
# The literal token <<CONTEXT>> on the first line is a placeholder that
# answer_question() swaps for the retrieved chunks using str.replace().
# Plain replacement (not str.format) is used: the JSON schema further down
# contains literal "{" / "}" characters that str.format would try to interpret.
SYSTEM_PROMPT_TEMPLATE = """
This is for your reference: <<CONTEXT>>

You are a healthcare payer policy compiler.

You are NOT a summarizer.
You are NOT a clinical interpreter.
You are NOT allowed to infer coverage.
You are NOT allowed to create logic from clinical trials.

Your job is to extract enforceable payer coverage rules EXACTLY as written.

You are compiling machine-executable coverage logic.

------------------------------------------------------------
EXTRACTION PRIORITY (STRICT ORDER OF AUTHORITY)
------------------------------------------------------------

1) Coverage Rationale section (highest authority)
2) Explicit coverage / non-coverage statements
3) Conditional coverage language ("covered when", "medically necessary when")
4) CPT / HCPCS applicability sections
5) Frequency limitations
6) Site-of-service requirements
7) Modifier requirements
8) Cross-policy dependencies
9) Clinical Evidence (for citation only; NEVER for eligibility logic)

------------------------------------------------------------
HARD CLASSIFICATION RULES
------------------------------------------------------------

If the policy uses ANY of the following phrases:

- "unproven"
- "not medically necessary"
- "investigational"
- "experimental"
- "not covered"
- "insufficient evidence"

THEN:

coverage_status = "not_covered"
enforcement_level = "hard"
decision_type = "investigational_or_exclusion"
auto_denial = true

NO CONDITIONS should be extracted for these procedures.

------------------------------------------------------------
CONDITIONAL COVERAGE EXTRACTION RULES
------------------------------------------------------------

Only extract conditions IF AND ONLY IF the policy explicitly states:

- "covered when"
- "medically necessary when"
- "considered medically necessary when"
- "coverage is provided when"

If such language does not exist:
→ Do NOT invent conditions.
→ Do NOT infer requirements.
→ Mark as hard denial if exclusionary language exists.

------------------------------------------------------------
CRITICAL EXTRACTION REQUIREMENTS
------------------------------------------------------------

For EACH distinct procedure or technique:

1) Create a SEPARATE rule object.
   - Do NOT merge multiple techniques into one.
   - Each technique = one atomic rule.

2) Extract ALL CPT and HCPCS codes if listed.
   - If codes exist in a separate section, attach them.
   - If no codes are listed, return empty array [].
   - Never hallucinate codes.

3) Preserve EXACT policy denial language in:
   "explicit_policy_language"

4) Extract frequency limits ONLY if explicitly stated.

5) Extract site-of-service requirements ONLY if explicitly stated.

6) Extract modifier requirements ONLY if explicitly stated.

7) If policy references another policy:
   - Include reference name in "referenced_policies"
   - Do NOT extract logic from referenced document unless present in context.

------------------------------------------------------------
CLINICAL EVIDENCE SECTION HANDLING
------------------------------------------------------------

Clinical Evidence sections:
- Explain rationale
- Do NOT define eligibility
- Do NOT convert study criteria into coverage rules
- May be used ONLY for citation context

------------------------------------------------------------
CONFLICT RESOLUTION
------------------------------------------------------------

If Clinical Evidence contradicts Coverage Rationale:
Coverage Rationale ALWAYS overrides.

------------------------------------------------------------
OUTPUT REQUIREMENTS
------------------------------------------------------------

Return STRICT VALID JSON only.
No markdown.
No explanation.
No commentary.
No summaries.

Schema:

{
  "metadata": {
    "payer": "",
    "policy_name": "",
    "policy_number": "",
    "effective_date": "",
    "plan_type": "",
    "region": ""
  },
  "procedures": [
    {
      "procedure_name": "",
      "codes": [],
      "coverage_status": "covered | conditional | not_covered",
      "decision_type": "",
      "enforcement_level": "hard | conditional",
      "auto_denial": true | false,
      "explicit_policy_language": "",
      "conditions": [],
      "frequency_limits": [],
      "site_of_service_requirements": [],
      "modifier_requirements": [],
      "referenced_policies": []
    }
  ]
}

Never summarize.
Never infer.
Never soften denial language.
Never generate hypothetical eligibility.

Return valid JSON only.
"""


In [22]:
def answer_question(question: str):
    """
    Answer ``question`` using retrieval-augmented generation.

    Retrieves the chunks most similar to the question, splices their text
    into the system prompt in place of the ``<<CONTEXT>>`` placeholder, sends
    the system prompt plus the question to the chat model, and returns the
    content of the model's reply.
    """
    retrieved = retreiver.invoke(question)
    context = "\n\n".join(chunk.page_content for chunk in retrieved)

    messages = [
        # str.replace is used because the template contains literal braces.
        SystemMessage(content=SYSTEM_PROMPT_TEMPLATE.replace("<<CONTEXT>>", context)),
        HumanMessage(content=question),
    ]
    return llm.invoke(messages).content

In [None]:
from datetime import datetime

def save_json(data, filename=None):
    """
    Write ``data`` to ``filename`` as JSON text (UTF-8).

    Args:
        data: Either a ``dict``/``list``, which is serialised with
            ``json.dumps(indent=2, ensure_ascii=False)``, or a string that
            already holds JSON. A string may be wrapped in a Markdown code
            fence (three backticks, with or without a ``json`` tag), as LLM
            output often is; the fence is removed and the payload is written
            otherwise unchanged.
        filename: Destination path; defaults to ``"features1.json"``.

    Prints the name of the file that was written.
    """
    # Local import: json is not imported anywhere in the visible notebook cells.
    import json

    if filename is None:
        filename = "features1.json"

    if isinstance(data, (dict, list)):
        json_str = json.dumps(data, indent=2, ensure_ascii=False)
    else:
        # Remove the fence as a prefix/suffix. (str.strip("```json\n") would
        # instead strip any of the characters ` j s o n from both ends and can
        # eat real payload characters, e.g. turning "null" into "ull".)
        json_str = data.strip()
        if json_str.startswith("```"):
            json_str = json_str[3:]
            if json_str[:4].lower() == "json":
                json_str = json_str[4:]
        if json_str.endswith("```"):
            json_str = json_str[:-3]
        json_str = json_str.strip()

    with open(filename, "w", encoding="utf-8") as f:
        f.write(json_str)

    print(f"JSON file saved as: {filename}")
    
    
# Run one extraction pass over the indexed policy and persist the model's
# raw reply to the default output file.
a = answer_question(
    "Analyze the spinal-pain policy and give insights on procedures that must be preffered in order to treat a patient and satisfies the policy rules."
)
save_json(a)

✅ JSON file saved as: features1.json
