# Setting up the notebook

In [None]:
pip install google-generativeai httpx pydantic PyMuPDF pdfplumber pytesseract ocrmypdf

In [61]:
import httpx
import os
import asyncio
import time
import json
import logging

import concurrent.futures

from typing import Dict, List, Any, Optional, Tuple
from pathlib import Path
import re
from datetime import datetime
import pathlib
import fitz
from pydantic import BaseModel, Field

import subprocess
import pdfplumber
import pytesseract
from PIL import Image

import google.generativeai as genai


In [62]:
gemini_client = genai.configure(api_key=os.getenv('GOOGLE_API_KEY'))
gemini_model = "gemini-2.5-flash-lite-preview-06-17"

In [63]:
person_name = "Amy"

input_pdf = f"Input Data/{person_name}/referral_package.pdf"
searchable_pdf = f"Input Data/{person_name}/generated/referral_package_searchable.pdf"
input_pa_form = f"Input Data/{person_name}/PA.pdf"
output_pa_form = f"Output Data/{person_name}/PA_filled.pdf"
lang = "eng"

In [64]:
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Extracting data from the Referral Form using OCR

In [65]:
# Uses OCRmyPDF to apply an OCR layer to a scanned PDF, return both extracted text and image-based OCR
# these values are used to generate a prompt for the Gemini model to answer the questions

def apply_ocrmypdf_to_input_pdf(input_pdf: str, output_pdf: str, lang: str = "eng"):
    os.makedirs(os.path.dirname(output_pdf), exist_ok=True)
    
    subprocess.run(
        [
            "ocrmypdf",
            "--language", 
            lang,
            "--deskew",
            "--rotate-pages",
            input_pdf,
            output_pdf,
        ],
        check=True,
    )

In [66]:
# Function to extract text from a searchable PDF using pdfplumber 
# and OCR on each page image using pytesseract and return a dictionary of pages with the text
pages: dict[str, dict[str, str]] = {}

def extract_text_searchable(pdf_path: str):
    """Extract text from a searchable PDF using pdfplumber."""
    with pdfplumber.open(pdf_path) as pdf:
        for i, page in enumerate(pdf.pages, 1):
            text = page.extract_text() or ""
            if "Page " + str(i) not in pages:
                pages["Page " + str(i)] = {"pdfplumber": "", "ocr": ""}
            pages["Page " + str(i)]["pdfplumber"] = text


def ocr_images_from_pdf(pdf_path: str, lang: str = "eng", dpi: int = 300):
    """Perform OCR on each PDF page image using pytesseract."""
    with pdfplumber.open(pdf_path) as pdf:
        for i, page in enumerate(pdf.pages, 1):
            pil_img = page.to_image(resolution=dpi).original
            gray = pil_img.convert("L").point(lambda x: 0 if x < 128 else 255)
            text = pytesseract.image_to_string(gray, lang=lang)
            if "Page " + str(i) not in pages:
                pages["Page " + str(i)] = {"pdfplumber": "", "ocr": ""}
            pages["Page " + str(i)]["ocr"] = text

In [67]:
apply_ocrmypdf_to_input_pdf(input_pdf, searchable_pdf, lang)

Start processing 8 pages concurrently
    7 page is facing ⇧, confidence 10.10 - no change
    8 page is facing ⇧, confidence 9.24 - no change
    6 page is facing ⇧, confidence 11.78 - no change
    3 page is facing ⇧, confidence 15.14 - rotation appears correct
    2 page is facing ⇧, confidence 11.38 - no change
    5 page is facing ⇧, confidence 14.18 - rotation appears correct
    4 page is facing ⇧, confidence 12.99 - no change
    1 page is facing ⇧, confidence 21.19 - rotation appears correct
    9 page is facing ⇧, confidence 15.27 - rotation appears correct
Postprocessing...
Image optimization ratio: 1.19 savings: 15.7%
Total file size ratio: 1.83 savings: 45.3%
Output file is a PDF/A-2B (as expected)


In [68]:
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as exe:
        fut1 = exe.submit(extract_text_searchable, searchable_pdf)
        fut2 = exe.submit(ocr_images_from_pdf, input_pdf, "eng")
        concurrent.futures.wait([fut1, fut2])

# Extracting the fields from PA Form

In [69]:
def extract_fields_with_positions(pdf_path):
    doc = fitz.open(pdf_path)
    fields = []
    for page_num, page in enumerate(doc, start=1):
        for w in page.widgets() or []:
            field = {
                "name": w.field_name,
                "type": "checkbox" if w.field_type == fitz.PDF_WIDGET_TYPE_CHECKBOX else "text",
                "value": w.field_value,
                "page": page_num,
                "field_type": w.field_type,
                "field_type_string": w.field_type_string,
                "field_label": w.field_label,
            }

            print(field)
            fields.append(field)

    fields_by_page = {}
    for field in fields:
        page_num = field['page']
        if page_num not in fields_by_page:
            fields_by_page[page_num] = []
        fields_by_page[page_num].append(field)
    return fields_by_page

In [70]:
pa_page_fields = extract_fields_with_positions(input_pa_form)

## Using Gemini to get PA form contexts

In [71]:
def form_pa_prompt(pa_fields):
    return f"""You are an expert medical document processing assistant specializing in Prior Authorization (PA) forms.
Given Input:
1. A list of PA fields (name, type, page, label, current value)
2. The full PA form PDF

Task:
For each field in page order:
1. Infer the explicit question:
   – Checkbox → yes/no question
   – Text → information request
   – Date → specify the referenced event/action
2. Generate concise (≤25 words) context including:
   section/category, primary vs. sub-question, whose data (patient/provider/insurer),
   any dependencies/conditional logic, clinical relevance
3. Preserve parent-child relationships and logical flow

Critical Requirements:
- Every field must have both question and context added
- Context must be specific and clinically relevant
- Maintain logical relationships between fields
- Preserve exact field names and labels
- Keep context concise but informative (25 words max)

IMPORTANT: RESPONSE FORMAT
Each output JSON object must only contain the fields name, type, page,
field_label, question, context in this format:
{{'name': 'CB1', 'type': 'checkbox', 'page': 2,
  'field_label': 'Start of treatment',
  'question': 'Is this a new treatment start for the patient?',
  'context': 'Initial checkbox in treatment timeline indicating whether patient begins new therapy.'}},
{{'name': 'T2', 'type': 'text', 'page': 2,
  'field_label': 'Start date: (MM)',
  'question': 'What is the treatment start date (MM)?',
  'context': 'Captures the calendar month when treatment commenced for scheduling.'}}

<PA_FORM_DATA>
{pa_fields}
</PA_FORM_DATA>

Only output valid JSON."""

In [72]:
from google import genai
from google.genai import types

client = genai.Client()

async def query_gemini_async(
    prompt: str,
    pdf_path: str,
    model: str = gemini_model,
) -> str:
    filepath = pathlib.Path(pdf_path)

    loop = asyncio.get_event_loop()
    response = await loop.run_in_executor(
        None,
        lambda: client.models.generate_content(
            model=model,
            contents=[
                types.Part.from_bytes(
                    data=filepath.read_bytes(),
                    mime_type="application/pdf",
                ),
                prompt,
            ],
        ),
    )

    return response.text or ""

pa_fields_with_context: dict[int, str] = {}

async def process_page(page: int):
    prompt = form_pa_prompt(pa_page_fields[page])

    result = await query_gemini_async(
        prompt,
        pdf_path=input_pa_form,
    )
    return page, result

tasks = [process_page(page) for page in pa_page_fields]
results = await asyncio.gather(*tasks)
for page, result in results:
    pa_fields_with_context[page] = result

In [73]:
from google import genai
from google.genai import types
client = genai.Client()

class PAFormAnswer(BaseModel):
    name: str
    page: int
    field_label: str
    answer: str = Field(
        description="answer to the question based on the referral package PDF"
    )


def query_gemini_for_answers(prompt, referral_package, model=gemini_model):
    referral_package_filepath = pathlib.Path(referral_package)

    response = client.models.generate_content(
        model=model,
        contents=[
            types.Part.from_bytes(
                data=referral_package_filepath.read_bytes(),
                mime_type='application/pdf',
            ),
            
            prompt
        ],
        config={
            "response_mime_type": "application/json",
            "response_schema": list[PAFormAnswer],
        }
    )

    return response.text

In [74]:
referral_package_prompt = """You are an expert medical document processing assistant specializing in prior authorization (PA) forms and medical documentation. You are given a list of PA form fields with their associated context and questions. Your task is to thoroughly analyze the provided PDF referral package and extract all relevant information to accurately fill out the PA form.

Please follow these guidelines:
1. Carefully review each field in the <PA_FORM_DATA> and understand its requirements
2. Extract exact dates, diagnoses, medications, dosages, and other clinical details from the referral package to fill out the <PA_FORM_DATA>
Use the OCR_EXTRACTED_TEXT to help you find the information you need & answer the questions correctly. It is important to override OCR_EXTRACTED_TEXT if a better answer is available.
3. Match the extracted information to the corresponding PA form fields
4. Pay special attention to:
   - Patient demographics and insurance information
   - Current diagnoses and ICD-10 codes
   - Medication details (name, strength, frequency, duration)
   - Previous treatments and their outcomes
   - Clinical assessments and test results
   - Provider information and signatures
5. If information is missing or unclear, indicate this explicitly
6. Format dates as MM/DD/YYYY unless otherwise specified
7. Use exact values and terminology from source documents when possible

<PA_FORM_DATA>
""" + str(pa_fields_with_context) + """
</PA_FORM_DATA>

<OCR_EXTRACTED_TEXT>
""" + str(pages) + """
</OCR_EXTRACTED_TEXT>

## IMPORTANT: RESPONSE FORMAT

FOR CHECKBOXES:
- They must be ALWAYS filled out with "Yes" or "No"

Each output JSON object should only contain the fields – name, page, field_label, answer – in the following format:
{'name': 'CB1', 'page': 2, 'field_label': 'Start of treatment', 'answer': 'answer to the question based on the referral package PDF'},
{'name': 'T2',   'page': 2, 'field_label': 'Start date: (MM)',   'answer': 'answer to the question based on the referral package PDF'}

Note: The Request Completed By field should always have today's date in the format MM/DD/YYYY

"""

In [75]:
pa_form_answers = query_gemini_for_answers(
referral_package_prompt,
    input_pdf,
    model=gemini_model
)

2025-06-22 23:23:09,502 - INFO - AFC is enabled with max remote calls: 10.
2025-06-22 23:23:32,647 - INFO - HTTP Request: POST https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash-lite-preview-06-17:generateContent "HTTP/1.1 200 OK"
2025-06-22 23:23:32,669 - INFO - AFC remote call 1 is done.


In [76]:
print(pa_form_answers)

[
  {
    "name": "T1",
    "page": 1,
    "field_label": "Date:",
    "answer": "04/08/2023"
  },
  {
    "name": "T2",
    "page": 1,
    "field_label": "To:",
    "answer": "Foresight Health"
  },
  {
    "name": "T3",
    "page": 1,
    "field_label": "Re:",
    "answer": "Amy Chen"
  },
  {
    "name": "T4",
    "page": 1,
    "field_label": "Attention:",
    "answer": "UT Neurology"
  },
  {
    "name": "T5",
    "page": 1,
    "field_label": "From:",
    "answer": "UT Neurology Angela Nicole NPC/Catherine CMA"
  },
  {
    "name": "T6",
    "page": 2,
    "field_label": "Patient Name",
    "answer": "Chen, Amy"
  },
  {
    "name": "T7",
    "page": 2,
    "field_label": "Date of Birth",
    "answer": "05/23/1983"
  },
  {
    "name": "T8",
    "page": 2,
    "field_label": "Age",
    "answer": "39 y.o."
  },
  {
    "name": "T9",
    "page": 2,
    "field_label": "Social Security Number",
    "answer": "xxx-xx-8110"
  },
  {
    "name": "T10",
    "page": 2,
    "field_label": 

In [77]:
from io import BytesIO
import fitz

YES_WORDS = {"yes", "true", "on", "1", "checked"}

def is_checked(val) -> bool:
    return str(val).strip().lower() in YES_WORDS


def fill_pa_form(pdf_path, field_mapping, out_path):
    os.makedirs(os.path.dirname(out_path), exist_ok=True)
    
    doc = fitz.open(pdf_path)

    for page in doc:
        for w in page.widgets() or []:
            data = field_mapping.get(w.field_name)
            if data is None:
                continue


            val = data["value"] if isinstance(data, dict) else data

            if data['field_type'] == 'checkbox':
                on_token = w.on_state() or "Yes"
                w.field_value = on_token if is_checked(val) else "Off"
            else:
                w.field_value = str(val)

            w.update()                


    if out_path:
        doc.save(out_path,
                 deflate=True,
                 incremental=False,
                 encryption=fitz.PDF_ENCRYPT_KEEP)
        doc.close()
    else:
        buf = BytesIO()
        doc.save(buf,
                 deflate=True,
                 incremental=False,
                 encryption=fitz.PDF_ENCRYPT_KEEP)
        buf.seek(0)
        doc.close()
        return buf

In [None]:
import json
 
if isinstance(pa_form_answers, str):
    pa_form_answers = json.loads(pa_form_answers)

print(pa_page_fields.items())
answer_map = {(a["page"], a["name"]): a["answer"] for a in pa_form_answers}


field_mapping = {}

for page_no, page_fields in pa_page_fields.items():
    for fld in page_fields:
        ans = answer_map.get((page_no, fld["name"]))
        
        if ans is None or ans == 'N/A' or ans == 'Unknown' or ans == 'No':
            continue

        fld["value"] = ans

        field_mapping[fld["name"]] = {"value": ans, "field_type": fld["type"], "field_name": fld["name"]}

fill_pa_form(input_pa_form, field_mapping, output_pa_form)

dict_items([])
