In [None]:
import json
import logging
import os
import time
import asyncio
import re
from typing import Dict, List, Any, Optional, Tuple, Union
from pathlib import Path
from datetime import datetime
import traceback
import warnings

import google.generativeai as genai
from google.genai import types

import fitz
import PyPDF2

import httpx

from pydantic import BaseModel, Field, validator
from pydantic.types import constr, conint

import pathlib
import shutil
from io import BytesIO

# Notebook-wide logging: INFO and above, each record prefixed with timestamp,
# level and logger name.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(name)s - %(message)s'
)
logger = logging.getLogger(__name__)

# NOTE(review): this hides every UserWarning from every library for the rest of
# the session; consider narrowing it with `module=` or `message=`.
warnings.filterwarnings("ignore", category=UserWarning)


: 

In [None]:
from dotenv import load_dotenv
# Load variables from a local .env file (if one exists) into the environment.
load_dotenv()  

# os.getenv returns None when GEMINI_API_KEY is unset; that value is handed to
# genai.configure without any check in this cell.
api_key = os.getenv('GEMINI_API_KEY')
genai.configure(api_key=api_key)

# PA DATA

In [3]:
def extract_fields_with_positions(pdf_path):
    """Collect the form widgets of a PDF, grouped by 1-based page number.

    Despite the name, no widget coordinates are recorded; only the
    descriptive attributes listed below are copied out of each widget.

    Args:
        pdf_path: Path of the PDF form, passed unchanged to ``fitz.open``.

    Returns:
        dict mapping page number -> list of field dicts with the keys
        ``name``, ``type``, ``value``, ``page``, ``field_type``,
        ``field_type_string`` and ``field_label``. ``type`` is ``"checkbox"``
        for checkbox widgets and ``"text"`` for every other widget type.
        Pages without widgets do not appear in the result.
    """
    fields_by_page = {}
    doc = fitz.open(pdf_path)
    try:
        for page_num, page in enumerate(doc, start=1):
            # page.widgets() may yield nothing / None on pages without a form.
            for w in page.widgets() or []:
                is_checkbox = w.field_type == fitz.PDF_WIDGET_TYPE_CHECKBOX
                fields_by_page.setdefault(page_num, []).append({
                    "name": w.field_name,
                    "type": "checkbox" if is_checkbox else "text",
                    "value": w.field_value,
                    "page": page_num,
                    "field_type": w.field_type,
                    "field_type_string": w.field_type_string,
                    "field_label": w.field_label,
                })
    finally:
        # Only plain attribute values were copied, so the document can be
        # released here; previously the handle was never closed.
        doc.close()

    return fields_by_page

In [None]:
# Extract the widgets of the sample PA form, grouped by page number.
abdulla_pa_fields = extract_fields_with_positions("Input Data/Abdulla/PA.pdf")
# Bare last expression: renders the whole mapping as the cell output.
abdulla_pa_fields

In [7]:
# Prompt template used to enrich one page of PA form fields with a question and
# a context sentence. It is rendered with str.format(page_fields=...), so
# {page_fields} is the only placeholder and the literal JSON braces in the
# examples are doubled ({{ ... }}).
PROMPT_PA = """You are an expert medical document processing assistant specializing in Prior Authorization (PA) form analysis and field mapping. Your task is to process and enrich PA form field data with detailed contextual information.

Given Input:
1. A structured dataset containing PA form field definitions including:
   - Field names (e.g. CB1, T1)
   - Field types (checkbox, text, etc.)
   - Page numbers
   - Field labels
   - Current values

2. The complete Prior Authorization form PDF document

Required Processing:
For each form field, analyze sequentially by page number and:

1. Extract the implicit question being asked by the field
   - For checkboxes: Frame the label as a yes/no question
   - For text fields: Frame as an information request
   - For dates: Specify what event/action the date refers to

2. Generate rich contextual information that includes:
   - The section/category the field belongs to
   - Whether it's a primary question or sub-question
   - Whose information is being requested (patient, provider, insurer)
   - Any dependencies on other fields
   - Clinical relevance of the requested information

<CRITICAL_REQUIREMENTS>
- Every field must have both question and context added
- Context must be specific and clinically relevant
- Maintain logical relationships between fields
- Preserve exact field names and labels
- Keep context concise but informative (25 words max)
- Only output valid JSON
</CRITICAL_REQUIREMENTS>

<RESPONSE_FORMAT>
Each output JSON object should only contain the fields - name, type, page, field_label, question, context in the following format:
{{"name": "CB1",
 "type": "checkbox",
 "page": 2,
 "field_label": "Start of treatment",
 "question": "Is this a new treatment start for the patient?",
 "context": "Initial checkbox in treatment timeline section indicating whether patient is beginning new therapy versus continuing existing treatment."}}
{{"name": "T2",
 "type": "text", 
 "page": 2,
 "field_label": "Start date: (MM)",
 "question": "What is the month of treatment start?",
 "context": "2-digit month format for planned medication initiation date in treatment scheduling section."}}
</RESPONSE_FORMAT>

<PA_FORM_DATA>
{page_fields}
</PA_FORM_DATA>

Return valid JSON array only. No explanations outside the JSON."""

In [None]:
async def query_gemini_async(prompt, pdf_path, model="gemini-2.5-flash"):
    """Upload a PDF and send it with a prompt to a Gemini model, off the event loop.

    The blocking SDK calls (file upload and ``generate_content``) run in the
    loop's default executor so other coroutines can progress meanwhile.

    Args:
        prompt: Text prompt sent after the uploaded file.
        pdf_path: Path of the PDF to upload; it is uploaded on every call.
        model: Name of the Gemini model to instantiate.

    Returns:
        The ``text`` attribute of the SDK response. The request asks for the
        ``application/json`` response MIME type.
    """
    filepath = pathlib.Path(pdf_path)
    # Inside a coroutine the running loop is the one to use; get_event_loop()
    # is the legacy spelling and is deprecated outside a running loop.
    loop = asyncio.get_running_loop()

    # Configure model for JSON output
    generation_config = genai.GenerationConfig(
        response_mime_type="application/json"
    )

    def _upload_and_generate():
        # Same order as before: build the model, upload the file, then generate.
        gemini = genai.GenerativeModel(model, generation_config=generation_config)
        uploaded = genai.upload_file(path=filepath)
        return gemini.generate_content([uploaded, prompt])

    response = await loop.run_in_executor(None, _upload_and_generate)

    return response.text

async def process_pa_fields_async(pa_fields_data, pdf_path):
    """Enrich every page of PA fields concurrently with one model call per page.

    Args:
        pa_fields_data: Mapping of page number -> list of field dicts.
        pdf_path: Path of the PA form PDF forwarded to ``query_gemini_async``.

    Returns:
        dict mapping page number -> the raw text returned for that page, in
        the iteration order of ``pa_fields_data``.
    """
    async def _enrich_page(number, fields):
        # Render the template for this page only, then query the model.
        page_prompt = PROMPT_PA.format(page_fields=json.dumps(fields))
        answer = await query_gemini_async(page_prompt, pdf_path)
        return number, answer

    # gather() schedules all page coroutines together and keeps their order.
    page_results = await asyncio.gather(
        *[_enrich_page(number, fields) for number, fields in pa_fields_data.items()]
    )

    enhanced_fields = {}
    for number, raw_answer in page_results:
        enhanced_fields[number] = raw_answer
        print(f"Page {number} processed")

    return enhanced_fields

# Usage: enrich all pages of the PA form. Top-level `await` only works inside an
# IPython/Jupyter cell. The result maps page number -> raw response text.
enhanced_pa_data = await process_pa_fields_async(abdulla_pa_fields, "Input Data/Abdulla/PA.pdf")



Page 2 processed
Page 3 processed
Page 4 processed
Page 5 processed


In [9]:
# Inspect the per-page results; each value is the raw JSON text returned for that page.
enhanced_pa_data

{2: '[\n  {\n    "name": "CB1",\n    "type": "checkbox",\n    "page": 2,\n    "field_label": "Start of treatment",\n    "question": "Is this request for the start of a new treatment for the patient?",\n    "context": "Initial checkbox in treatment timeline section indicating whether patient is beginning new therapy."\n  },\n  {\n    "name": "T2",\n    "type": "text",\n    "page": 2,\n    "field_label": "Start date: (MM)",\n    "question": "What is the two-digit month for the treatment start date?",\n    "context": "Month component for the planned medication initiation date in treatment scheduling section."\n  },\n  {\n    "name": "T3",\n    "type": "text",\n    "page": 2,\n    "field_label": "Start date: (DD)",\n    "question": "What is the two-digit day for the treatment start date?",\n    "context": "Day component for the planned medication initiation date in treatment scheduling section."\n  },\n  {\n    "name": "T4",\n    "type": "text",\n    "page": 2,\n    "field_label": "Start d

In [19]:
# Decode each page's raw JSON response into Python objects.
all_pages_data = {
    page_num: json.loads(json_response)
    for page_num, json_response in enhanced_pa_data.items()
}

# Save as one pretty JSON file. ensure_ascii=False writes non-ASCII characters
# verbatim, so the encoding is pinned to UTF-8 instead of the platform default.
with open("pa_all_fields.json", "w", encoding="utf-8") as f:
    json.dump(all_pages_data, f, indent=2, ensure_ascii=False)

print("Saved enhanced PA fields")

Saved enhanced PA fields


# MEDICAL DATA

In [22]:
# Prompt template asking the model to answer PA form fields from the referral
# package PDF. It is rendered with str.format(pa_form_fields=...), so
# {pa_form_fields} is the only placeholder and the literal JSON braces in the
# response example are doubled ({{ ... }}).
REFERRAL_PACKAGE_PROMPT = """You are an expert medical document processing assistant specializing in Prior Authorization (PA) forms and medical documentation. You are given a list of PA form fields with their associated context and questions. Your task is to thoroughly analyze the provided PDF referral package and extract all relevant information to accurately fill out the PA form.

## CRITICAL INSTRUCTIONS:
1. **NEVER leave answer fields empty or null** - always provide a specific value
2. **For missing information**: Use "Not documented" or "Not specified" instead of empty strings
3. **For checkbox fields**: Always answer with either "Yes" or "No" (never true/false or empty)
4. **For text fields**: Provide the exact information or "Not available" if truly missing
5. **For dates**: Use MM/DD/YYYY format (unless format is specified) or "Not specified" if date is missing 
6. **Be thorough**: Review the ENTIRE document multiple times to find all relevant information

## DETAILED EXTRACTION GUIDELINES:

### Patient Information:
- Extract ALL demographic details (name, DOB, address, phone, insurance)
- Look for patient information in headers, footers, cover pages, and forms
- Check multiple pages for complete contact information

### Medical Information:
- **Diagnoses**: Extract primary and secondary diagnoses with ICD-10 codes if available
- **Medications**: Include exact drug names, strengths, frequencies, routes of administration
- **Treatment History**: Look for previous medications tried, dates, outcomes, failures
- **Clinical Notes**: Extract relevant symptoms, assessments, lab results
- **Provider Details**: Include all prescribing physicians, NPIs, addresses, phone numbers

### Administrative Details:
- **Insurance**: Member IDs, group numbers, prior authorization numbers
- **Facility Information**: Infusion centers, pharmacies, administration locations
- **Dates**: Treatment start dates, last treatment dates, prescription dates

## ANSWER FORMAT REQUIREMENTS:

**For Checkbox Fields (CB prefixes):**
- Answer ONLY with "Yes" or "No" 
- If unclear, use clinical judgment based on available information
- Example: If asking about "Start of treatment" and document shows new prescription → "Yes"

**For Text Fields (T prefixes):**
- Provide exact values from the document
- For dates: Use MM/DD/YYYY format (e.g., "05/22/2024") unless format is specified 
- For names: Use exact spelling and format from document
- For missing info: Use "Not documented" instead of leaving blank

**For Yes/No Questions:**
- Base answers on clinical evidence in the document
- If patient has the condition/medication/history mentioned → "Yes"
- If explicitly stated they don't have it or no evidence found → "No"

## VALIDATION CHECKLIST:
Before submitting, ensure:
- ✓ Every field has a non-empty answer
- ✓ All checkbox answers are "Yes" or "No"
- ✓ All dates follow MM/DD/YYYY format
- ✓ Patient demographics are complete
- ✓ Medication information is detailed and accurate
- ✓ No fields are left with null, empty strings, or boolean values

<PA_FORM_DATA>
{pa_form_fields}
</PA_FORM_DATA>

<RESPONSE FORMAT>
[
  {{
    "name": "CB1",
    "page": 2,
    "field_label": "Start of treatment",
    "answer": "Yes"
  }},
  {{
    "name": "T2",
    "page": 2,
    "field_label": "Start date: (MM)",
    "answer": "05"
  }}
]
</RESPONSE FORMAT>

**CRITICAL**: Every field must have a specific answer - no empty strings, no null values, no boolean true/false."""

In [60]:
from pydantic import BaseModel, Field, ValidationError
from typing import List, Dict, Any
import json, re, asyncio, pathlib
import google.generativeai as genai

# ---------- Schema ----------

class PAFormAnswer(BaseModel):
    """One answered PA form field, as expected in the model's JSON reply.

    Each element of the reply is validated against this schema by
    parse_and_validate_answers.
    """
    # Field identifier; the prompt examples use names such as "CB1" and "T2".
    name: str
    # Page number the field belongs to.
    page: int
    # Label text of the field (e.g. "Start of treatment" in the prompt example).
    field_label: str
    # Always a string: "Yes"/"No", a value, or a placeholder per the prompt rules.
    answer: str = Field(description="answer to the question based on the referral package PDF")

# ---------- Parsing + Validation ----------

def _safe_json_loads(text: str) -> Any:
    s = text.strip()

    # 1) Strip ```json ... ``` or ``` ... ``` fences if present
    m = re.search(r"```(?:json)?\s*(.*?)\s*```", s, re.S | re.I)
    if m:
        s = m.group(1).strip()

    # 2) Keep only outermost JSON array or object if extra prose exists
    # Prefer array (your prompt returns a list)
    a1, a2 = s.find('['), s.rfind(']')
    o1, o2 = s.find('{'), s.rfind('}')
    if a1 != -1 and a2 != -1 and a2 > a1:
        s = s[a1:a2+1]
    elif o1 != -1 and o2 != -1 and o2 > o1:
        s = s[o1:o2+1]

    return json.loads(s)

def parse_and_validate_answers(response_text: str) -> "List[PAFormAnswer]":
    """Decode a model reply and validate every element as a ``PAFormAnswer``.

    Args:
        response_text: Raw reply text, decoded with ``_safe_json_loads``.

    Returns:
        List of validated ``PAFormAnswer`` objects, in reply order. A single
        JSON object is treated as a one-element list.

    Raises:
        ValueError: If the decoded value is neither an array nor an object, if
            an element is not a JSON object, or if an element fails schema
            validation. The message names the index of the offending element.
    """
    data = _safe_json_loads(response_text)

    # If model returned a single object, wrap into list
    if isinstance(data, dict):
        data = [data]
    if not isinstance(data, list):
        raise ValueError("Model output is not a JSON array or object.")

    validated: List[PAFormAnswer] = []
    for i, item in enumerate(data):
        # `**item` on a string/number/list would raise a bare TypeError that
        # escapes the handler below, so reject such elements explicitly.
        if not isinstance(item, dict):
            raise ValueError(
                f"Model output element at index {i} is not a JSON object "
                f"(got {type(item).__name__})."
            )
        try:
            validated.append(PAFormAnswer(**item))
        except ValidationError as e:
            # Surface which element failed
            raise ValueError(f"Pydantic validation failed at index {i}: {e}") from e
    return validated

# ---------- Gemini calls (page-by-page) ----------

async def fill_single_page_from_referral(page_num: int,
                                         page_fields: list,
                                         referral_pdf_path: str,
                                         model: str = "gemini-2.5-flash"):
    """Ask the model to answer one page of PA fields from the referral PDF.

    The blocking SDK calls run in the event loop's default executor.

    Args:
        page_num: Page number; returned unchanged so callers can key results.
        page_fields: Field definitions for this page, embedded in the prompt
            as indented JSON.
        referral_pdf_path: Path of the referral package PDF. It is uploaded on
            every call, i.e. once per page.
        model: Name of the Gemini model to instantiate.

    Returns:
        Tuple ``(page_num, answers)`` where ``answers`` is a list of plain
        dicts produced by ``model_dump()`` on each validated answer.

    Raises:
        ValueError: Propagated from ``parse_and_validate_answers`` when the
            reply cannot be parsed or validated.
    """
    prompt = REFERRAL_PACKAGE_PROMPT.format(
        pa_form_fields=json.dumps(page_fields, indent=2)
    )

    # Inside a coroutine the running loop is the one to use; get_event_loop()
    # is the legacy spelling and is deprecated outside a running loop.
    loop = asyncio.get_running_loop()

    def _upload():
        return genai.upload_file(path=pathlib.Path(referral_pdf_path))

    uploaded = await loop.run_in_executor(None, _upload)

    def _generate():
        gemini = genai.GenerativeModel(
            model,
            generation_config=genai.GenerationConfig(
                response_mime_type="application/json"
            ),
        )
        return gemini.generate_content([uploaded, prompt])

    response = await loop.run_in_executor(None, _generate)

    answers = parse_and_validate_answers(response.text)  # robust parse + pydantic
    return page_num, [a.model_dump() for a in answers]   # return plain dicts

async def fill_pa_pages_sequential(enhanced_fields_by_page: Dict,
                                   referral_pdf_path: str,
                                   model: str = "gemini-2.5-flash"):
    """Answer the PA form one page at a time, in ascending page order.

    Accepted input shapes:

    * ``{page: [field, ...]}`` - used as is;
    * ``{page: "<JSON text>"}`` - each string is decoded first, so the raw
      per-page replies of the enrichment step can be passed directly (before,
      such strings were re-encoded into an escaped string literal inside the
      prompt);
    * ``{field_name: field}`` - regrouped by ``int(field["page"])``.

    The shape is detected from the first value of the mapping.

    Args:
        enhanced_fields_by_page: Field definitions in one of the shapes above.
        referral_pdf_path: Path of the referral package PDF.
        model: Name of the Gemini model, forwarded per page.

    Returns:
        dict mapping page number -> list of answer dicts; ``{}`` for empty input.
    """
    if not enhanced_fields_by_page:
        return {}

    first_value = next(iter(enhanced_fields_by_page.values()))

    by_page: Dict[Any, Any] = {}
    if isinstance(first_value, dict):  # looks like field objects
        for f in enhanced_fields_by_page.values():
            by_page.setdefault(int(f["page"]), []).append(f)
    else:
        for page, fields in enhanced_fields_by_page.items():
            if isinstance(fields, str):
                try:
                    fields = json.loads(fields)
                except json.JSONDecodeError:
                    # Keep the earlier best-effort behavior for non-JSON text.
                    print(f"Page {page}: field data is not valid JSON; sending it as raw text")
            by_page[page] = fields

    filled_pages: Dict[int, List[Dict[str, Any]]] = {}
    for page_num in sorted(by_page):
        page_ret, page_results = await fill_single_page_from_referral(
            page_num=page_num,
            page_fields=by_page[page_num],
            referral_pdf_path=referral_pdf_path,
            model=model,
        )
        filled_pages[page_ret] = page_results
        print(f"Page {page_ret} completed")

    return filled_pages

# Fill the PA form fields page by page from the referral package (top-level
# `await` requires an IPython/Jupyter cell).
# NOTE(review): enhanced_pa_data holds one raw JSON string per page (see its
# displayed value), not the {page: [...]} shape; the parsed equivalent is
# all_pages_data - confirm which one is intended here.
filled_results = await fill_pa_pages_sequential(
    enhanced_pa_data,                          # documented shapes: {page:[...]} or {name: {...}}
    "Input Data/Abdulla/referral_package.pdf"
)

Page 2 completed
Page 3 completed
Page 4 completed
Page 5 completed


In [50]:
# Inspect the validated answers per page.
# NOTE(review): the rendered output contains names and phone numbers taken from
# the referral package - clear the output before sharing this notebook.
filled_results

{2: [{'name': 'CB1',
   'page': 2,
   'field_label': 'Start of treatment',
   'answer': 'Yes'},
  {'name': 'T2', 'page': 2, 'field_label': 'Start date: (MM)', 'answer': '05'},
  {'name': 'T3', 'page': 2, 'field_label': 'Start date: (DD)', 'answer': '22'},
  {'name': 'T4',
   'page': 2,
   'field_label': 'Start date: (YYYY)',
   'answer': '2024'},
  {'name': 'CB5',
   'page': 2,
   'field_label': 'Continuation of therapy',
   'answer': 'No'},
  {'name': 'T6',
   'page': 2,
   'field_label': 'Date of last treatment: (MM)',
   'answer': 'Not documented'},
  {'name': 'T7',
   'page': 2,
   'field_label': 'Date of last treatment: (DD)',
   'answer': 'Not documented'},
  {'name': 'T8',
   'page': 2,
   'field_label': 'Date of last treatment: (YYYY)',
   'answer': 'Not documented'},
  {'name': 'T9',
   'page': 2,
   'field_label': 'Precertification Requested By:',
   'answer': 'Erfan Rostami, BSN, RN'},
  {'name': 'T10',
   'page': 2,
   'field_label': 'Phone:',
   'answer': '615-343-1176'},


In [51]:
# Count answers across all pages
# Answers the prompt tells the model to use when information is missing.
placeholder_answers = {"not documented", "not specified", "not available"}

total_fields = 0
answered = 0
not_documented = 0

for page_fields in filled_results.values():
    for field in page_fields:
        total_fields += 1
        if field["answer"].strip().lower() in placeholder_answers:
            not_documented += 1
        else:
            answered += 1

print(f"Total fields: {total_fields}")
print(f"Answered: {answered}")
print(f"Not documented: {not_documented}")
if total_fields:
    print(f"Answered %: {answered/total_fields*100:.2f}%")
    print(f"Not documented %: {not_documented/total_fields*100:.2f}%")
else:
    # An empty result set would otherwise raise ZeroDivisionError.
    print("No fields to summarize; percentages are undefined.")


Total fields: 335
Answered: 263
Not documented: 72
Answered %: 78.51%
Not documented %: 21.49%


In [64]:
import fitz  # PyMuPDF
from pathlib import Path

def build_answer_index(filled_results: dict) -> dict:
    """
    Flatten per-page answers into a lookup keyed by field name.

    filled_results: {page_num: [ {name, page, field_label, answer}, ...], ...}
    returns: {field_name: answer}

    Names and answers are converted to stripped strings. Entries without a
    usable name (missing, None or blank) are skipped, and a missing or None
    answer becomes "" rather than the literal text "None". When a name occurs
    more than once, the last occurrence wins.
    """
    idx = {}
    for items in filled_results.values():
        for it in items:
            raw_name = it.get("name")
            name = "" if raw_name is None else str(raw_name).strip()
            if not name:
                continue
            raw_answer = it.get("answer")
            idx[name] = "" if raw_answer is None else str(raw_answer).strip()
    return idx

def _bool_from_yes_no(s: str) -> bool:
    return str(s).strip().lower() in ("yes", "y", "true", "checked", "on", "1")

def fill_pa_pdf_from_answers(
    pa_pdf_in: str,
    filled_results: dict,
    out_pdf: str = None,
    make_flattened_copy: bool = False
) -> str:
    """
    Fills the PDF fields using answers. Returns the path to the filled PDF.
    - pa_pdf_in: path to the original PA PDF
    - filled_results: your dict keyed by page number with answer dicts
    - out_pdf: optional output path; defaults to '<name>_filled.pdf'
    - make_flattened_copy: also write a '<name>_filled_flat.pdf'

    Widgets are matched to answers by field name. Checkboxes receive a boolean
    derived from the answer, so the code does not assume that the widget's
    on-state is called "Yes"; every other widget receives the answer text.
    A widget that cannot be written is reported and skipped. Both documents
    are closed even when saving fails.

    For the flattened copy, ``Document.bake()`` is used when the installed
    PyMuPDF provides it, turning the widgets into page content. Without it the
    copy is only a cleaned re-save and still contains editable fields.
    """
    pa_pdf_in = str(pa_pdf_in)
    out_pdf = out_pdf or str(Path(pa_pdf_in).with_name(Path(pa_pdf_in).stem + "_filled.pdf"))

    answers_by_name = build_answer_index(filled_results)

    filled = 0
    missing = []

    doc = fitz.open(pa_pdf_in)
    try:
        for page in doc:  # iterate pages
            for w in page.widgets() or []:
                fname = w.field_name or ""
                if not fname:
                    continue
                if fname not in answers_by_name:
                    missing.append(fname)
                    continue

                ans = answers_by_name[fname]

                try:
                    if w.field_type == fitz.PDF_WIDGET_TYPE_CHECKBOX:
                        # Boolean value: PyMuPDF resolves the actual on-state name.
                        w.field_value = _bool_from_yes_no(ans)
                    else:
                        # Text / other entry
                        w.field_value = ans
                    w.update()
                    filled += 1
                except Exception as e:
                    # Report the real failure; the former fallback called
                    # helpers that do not appear in the PyMuPDF Widget API
                    # and would have hidden the original error.
                    print(f"Could not write field '{fname}': {e}")

        # Save the filled, editable form
        doc.save(out_pdf, deflate=True)
    finally:
        doc.close()

    print(f"Filled {filled} widgets. Missing answers for {len(missing)} fields.")
    if missing:
        # optional: inspect a few missing names to refine mapping
        print("Examples of fields with no answer:", missing[:10])

    if make_flattened_copy:
        flat_path = str(Path(out_pdf).with_name(Path(out_pdf).stem + "_flat.pdf"))
        d2 = fitz.open(out_pdf)
        try:
            bake = getattr(d2, "bake", None)
            if callable(bake):
                # Convert widgets into permanent page content (true flatten).
                bake(annots=False, widgets=True)
            else:
                print("Document.bake() is unavailable; the copy still contains editable fields.")
            d2.save(flat_path, deflate=True, garbage=4, clean=True)
        finally:
            d2.close()
        print(f"Flattened copy written to: {flat_path}")

    return out_pdf


In [65]:
# Write the answers into the PA form and also request the "_flat" copy.
filled_pdf_path = fill_pa_pdf_from_answers(
    pa_pdf_in="Input Data/Abdulla/PA.pdf",
    filled_results=filled_results,   
    out_pdf="Input Data/Abdulla/PA_filled.pdf",
    make_flattened_copy=True         
)
print("Saved:", filled_pdf_path) # open it with Adobe 


MuPDF error: format error: partial block in aes filter

Filled 335 widgets. Missing answers for 0 fields.
Flattened copy written to: Input Data/Abdulla/PA_filled_flat.pdf
Saved: Input Data/Abdulla/PA_filled.pdf
