In [1]:
import os
import io
from typing import Optional, Tuple, Literal, Dict, Any
from dataclasses import dataclass
import base64
import requests
from tqdm import tqdm
from pdf2image import convert_from_bytes
from pikepdf import Pdf

def post_page(url: str, page_bytes: bytes, filename: str,
              char_unit: int = 100, max_spaces: int = 200, y_tolerance: int = 100,
              timeout: tuple[float, float] = (10, 120)) -> Dict[str, Any]:
    """
    Upload a single PDF page to the /read_pdf_text endpoint and return the decoded JSON reply.

    The page travels as the multipart field ``file`` (content type ``application/pdf``);
    ``char_unit``, ``max_spaces`` and ``y_tolerance`` are sent alongside it as string
    form fields. ``timeout`` is handed to ``requests.post`` unchanged.

    ``raise_for_status()`` is called on the response before its JSON body is decoded.
    """
    form_fields = {
        name: str(value)
        for name, value in (
            ("char_unit", char_unit),
            ("max_spaces", max_spaces),
            ("y_tolerance", y_tolerance),
        )
    }
    upload = {"file": (filename, io.BytesIO(page_bytes), "application/pdf")}
    response = requests.post(url, data=form_fields, files=upload, timeout=timeout)
    response.raise_for_status()
    return response.json()

def pdf_obj_to_bytes(pdf_obj):
    """Return the bytes of a new PDF that contains only the given page object."""
    single_page_pdf = Pdf.new()
    # Append the page object itself (no rasterization) so the text layer is preserved.
    single_page_pdf.pages.append(pdf_obj)
    out = io.BytesIO()
    single_page_pdf.save(out)
    return out.getvalue()

In [2]:
pdf_path = 'obfuscated_fake_cbiz_prof_10_pages_3.pdf'

# The upload filename is identical for every page, so derive it once instead of per iteration.
pg_filename = os.path.basename(pdf_path)
pg_file_name_without_ext = os.path.splitext(pg_filename)[0]

# One dict per page: page_number (1-based), pil_image, base64_image (PNG), full_text.
per_page_text_and_images = []

# Open the source PDF as a context manager so it is closed once every page has been
# copied out; each page is serialized to independent bytes inside the loop.
with Pdf.open(pdf_path) as src:
    n_pages = len(src.pages)

    for pg in tqdm(range(n_pages)):
        # processing each page
        pdf_page_obj = src.pages[pg]

        # convert pikepdf obj to bytes
        pg_bytes = pdf_obj_to_bytes(pdf_page_obj)

        # getting image in form PIL object
        images = convert_from_bytes(
            pg_bytes,
            dpi=300,
            single_file=True,
        )

        # converting PIL to bytes (PNG encoded)
        buffered = io.BytesIO()
        images[0].save(buffered, format="PNG", optimize=True)
        img_bytes = buffered.getvalue()

        # Encode image bytes to base64 string
        img_base64 = base64.b64encode(img_bytes).decode("utf-8")

        # getting full text from libre draw api
        res = post_page(url = "http://localhost:5000/read_pdf_text", page_bytes = pg_bytes, filename = pg_file_name_without_ext + '.pdf')

        per_page_text_and_images.append({ 'page_number': pg + 1, 'pil_image': images[0], 'base64_image': img_base64, 'full_text': res['extracted_text'] })

100%|██████████████████████████████████████████████████████████████████████████████████| 10/10 [00:24<00:00,  2.46s/it]


In [13]:
import os
from pathlib import Path
extraction_path = r'D:\ADP\Data_Extraction\runs\9e30bd5e-3c4e-4e24-a77c-7c972ac31124'
base = Path(extraction_path)

# os.listdir() gives entries in arbitrary order and also returns plain files, so pairing
# its index with the page index could write text into the wrong folder. Keep directories
# only and sort them; the zero-padded page_NNNN names sort in page order.
page_dirs = sorted(entry for entry in base.iterdir() if entry.is_dir())

# zip() stops at the shorter sequence, so an extra directory cannot cause an IndexError.
for pg, (page_dir, page_entry) in enumerate(zip(page_dirs, per_page_text_and_images)):
    page_num = pg + 1
    file_name = 'page_' + str(page_num) + '_full_text.md'
    out_path = page_dir / file_name
    # Explicit UTF-8: the platform default encoding may not represent every extracted character.
    out_path.write_text(page_entry['full_text'], encoding='utf-8')

In [52]:
# Human-reviewed extraction schema.
#   'Form_fields': flat list of form-field labels to extract.
#   'Tables': maps each table title to its ordered list of column headers.
# The whole dict is interpolated into the extractor and judge prompts, so every
# label below is prompt text: spelling and letter case are significant.
human_reviewed_schema = \
{
 'Form_fields': [
  'Company Name',
  'Company Code',  
  'Period',
  'Employee Name',
  'Address line 1',  
  'City',
  'State',
  'Zip',
  'Emp Id',
  'SSN',
  'DOB',
  'Gender',
  'Marital Status',
  'Status',
  'Position',
  'Title',
  'Pay Group',
  'Job Code',
  'Emp Type',
  'Statutory',
  'Seasonal',
  'Domestic Emp',
  'Probation',
  'Home #',
  'Work #',
  'Ext.',
  'Email',
  'Mail Stop',
  'Hire Date',
  'Rehire Date',
  'Term Date',
  'Term Reason',    
  'Adj Sen Date',
  'Pension',
  'Visa Type',
  'Exp',
  'Citizen',
  'I9 Reverify',
  'I9 Verified',
  'Deceased',
  'Tax Form',
  'WCC',
  'EEOC',
  'Supervisor ID',
  'Name (supervisor name)',
  'Def Comp',
  'Union',
  'Union Date',
  'Collect Dues',
  'Paid Init. Fees',
  'Veteran',
  'Legal Rep',
  'Nickname',
  'surname',  
  'Prior Last',
  'Disability',
  'Smoker',
  'AutoPay',
  'Pay Frequency',
  'OT Exempt',
  'Default Hours',
  'Locations',
  'Positions'],
 'Tables': {
  'Rate/Salary Information': [
   'RateCode',
   'Description',
   'Rate',
   'Salary',  
   'Effective Dates'],
  'Employee Tax Information': [
   'Employee Tax Code',
   'Employee Tax Description',
   'Status',
   "Add'l Amount",
   'Effective Dates',
   'Default'],
  'Employer Tax Information': [
   'Employer Tax Code',
   'Employer Tax Description',
   'Effective Dates',
   'Default'],
  'Deduction Information': [
   'Code',
   'Deduction',
   'Rate',
   'CalcCode',
   'Frequency',
   'Goal/Paid',
   'Min/Max/Annual Max',
   'Arrears',
   'Agency',   
   'Effective Dates'],
  'Direct Deposit Information': [
   'Sequence No.',
   'Transit No.',
   'Account No.',
   'Checking?',  
   'Account Name',
   'Amount Code', 
   'Amount',
   'Prenote Date',
   'Effective Dates',
   'Exclude Special'],
  'Fringe Benefit Information': ['ECode',
   'CalcCode',
   'Rate Code',
   'Rate',
   'Rate Per',
   'Amount',
   'Tabled?',
   'Units', 
   'Frequency',
   'Goal/Paid/Goal Bal.',
   'Min/Max/Ann. Max',
   'Effective Dates'],
  'Benefit Accrual Information': [
   'BCode',
   'Rate',
   'Amount',
   # NOTE(review): 'Hours' is listed twice in a row. Rows extracted as dicts cannot hold
   # two identical keys — confirm whether the form really has two 'Hours' columns.
   'Hours',
   'Hours',
   'Max/Carryover Max',
   'Length of Service',
   'Hours: Used/Avail/Total/Prob',
   'Dollars: Used/Avail/Total/Prob',   
   'Effective Dates'],
  'Review Information':[ 
   'Date',
   'Reviewer',
   'Rating',
   'Raise Amount',
   'New Pay Amnt',
   'New Pay Per',
   'New Position',
   'Effective Date',
   'Next Review'
  ],
  'Emergency Contact Information': [
   'Name',
   'Relationship',
   'Home Phone',
   'Work Phone',
   'Address', 
   'City',
   'State',
   'Zip',
   'Country']
  }
}

ERROR! Session/line number was not unique in database. History logging moved to new session 531


In [4]:
import os
import base64
from openai import AzureOpenAI
# from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from dotenv import load_dotenv

# Load variables from a .env file (if any) into the process environment before reading them.
load_dotenv()

# The endpoint has a built-in fallback URL; the API key has none and is None when unset.
endpoint = os.environ.get("ENDPOINT_URL", "https://aniru-metendec-eastus2.openai.azure.com/")
subscription_key = os.environ.get("AZURE_OPENAI_API_KEY")

# Shared Azure OpenAI client used by invoke().
client = AzureOpenAI(
    api_key=subscription_key,
    api_version="2025-03-01-preview",
    azure_endpoint=endpoint,
)

In [5]:
def extract_response_text(response):
    """
    Return the text of the first text-bearing content part of a 'message' output item.

    Walks ``response.output`` in order, skips every item whose ``type`` is not
    ``'message'``, and returns the ``text`` attribute of the first content part that
    has one. Returns None when the response is falsy, has no output, or contains no
    such part. Any exception raised while walking is printed and None is returned.
    """
    try:
        outputs = getattr(response, 'output', None) if response else None
        if not outputs:
            return None
        for item in outputs:
            # Only message items carry the model's answer.
            if getattr(item, 'type', None) != 'message':
                continue
            for part in getattr(item, 'content', None) or ():
                if hasattr(part, 'text'):
                    return part.text
        return None
    except Exception as e:
        print(f"Error extracting response text: {e}")
        return None

In [17]:
# pip install "tiktoken>=0.11.0"   (quote the spec so the shell does not treat >= as a redirect)
import tiktoken
from typing import Any, Dict, List, Tuple
import time

# ---------- helpers for robust usage extraction ----------

def _pick(obj, *names, default=None):
    """
    Return the first non-None attribute or dict key found on obj among names.
    Works for both SDK dataclass objects and raw dict responses.
    """
    if obj is None:
        return default
    for n in names:
        v = getattr(obj, n, None)
        if v is None and isinstance(obj, dict):
            v = obj.get(n)
        if v is not None:
            return v
    return default

def _cached_tokens_from_details(details) -> int:
    """
    Extract cached_tokens from usage details for both object and dict shapes.
    """
    if details is None:
        return 0
    if isinstance(details, dict):
        return details.get("cached_tokens", 0) or 0
    return getattr(details, "cached_tokens", 0) or 0

def extract_usage_counts(response):
    """
    Extract (input_tokens, output_tokens, cached_tokens) from an API response.

    Tolerates the naming differences between SDK variations:
    - input_tokens vs prompt_tokens
    - output_tokens vs completion_tokens
    - input_tokens_details vs prompt_tokens_details

    ``response`` may be an SDK object or a raw dict. ``usage`` is looked up through
    ``_pick`` so dict-shaped responses are read as well; a plain ``getattr`` never
    finds a dict key and would always report 0 tokens for them.

    Returns:
        Tuple ``(input_tokens, output_tokens, cached_tokens)``; every value falls
        back to 0 when the corresponding field is missing.
    """
    usage = _pick(response, "usage", default=None) or {}
    input_tokens  = _pick(usage, "input_tokens", "prompt_tokens", default=0)
    output_tokens = _pick(usage, "output_tokens", "completion_tokens", default=0)
    # Details may be objects (dataclasses) or dicts depending on client/SDK
    itd = _pick(usage, "input_tokens_details", "prompt_tokens_details", default=None)
    cached_tokens = _cached_tokens_from_details(itd)
    return input_tokens, output_tokens, cached_tokens

# ---------- tokenization utilities ----------

def _get_encoding(model: str):
    """
    Resolve a tiktoken encoding for ``model``.

    Tries, in order: the model-specific encoding, then "o200k_harmony", then
    "o200k_base". A failure of the last attempt is not caught.
    """
    try:
        return tiktoken.encoding_for_model(model)
    except Exception:
        pass  # model-specific lookup failed: fall through to the named encodings
    try:
        return tiktoken.get_encoding("o200k_harmony")
    except Exception:
        pass  # "o200k_harmony" could not be loaded: use the final fallback
    return tiktoken.get_encoding("o200k_base")

def estimate_text_tokens_from_input(input_messages: List[Dict[str, Any]], model: str) -> int:
    """
    Estimate how many tokens the textual parts of ``input_messages`` occupy.

    Counted: each non-None message role, every dict part whose type is "text" or
    "input_text", and every bare string part. Any other part (images, audio, ...)
    adds nothing, so base64 payloads and URLs are never tokenized as text.
    """
    encoder = _get_encoding(model)

    def n_tokens(text: str) -> int:
        return len(encoder.encode(text))

    token_count = 0
    for message in input_messages:
        role = message["role"] if "role" in message else None
        if role is not None:
            token_count += n_tokens(str(role))

        payload = message.get("content") or []
        # Content is either a list of parts (Responses API style) or a single value.
        parts = payload if isinstance(payload, list) else [payload]
        for part in parts:
            if isinstance(part, str):
                token_count += n_tokens(part)
            elif isinstance(part, dict) and part.get("type") in ("text", "input_text"):
                token_count += n_tokens(str(part.get("text", "")))
            # image/audio parts are counted server-side and are skipped here
    return token_count

# ---------- pricing and cost ----------

def model_prices_usd_per_million(model: str) -> Dict[str, float]:
    """
    Return USD-per-million-token rates keyed "in", "cached_in" and "out".

    Model names containing "gpt-5-mini" (case-insensitive) get the mini rates; every
    other name gets the GPT-5-like default. The figures are placeholders and must be
    replaced with current OpenAI or Azure pricing before production use.
    """
    is_mini = "gpt-5-mini" in model.lower()
    if is_mini:
        # Example placeholders; update from current pricing pages
        rates = (0.25, 0.025, 2.00)
    else:
        rates = (1.25, 0.125, 10.00)
    return dict(zip(("in", "cached_in", "out"), rates))

def compute_cost_usd(model: str, input_tokens: int, output_tokens: int, cached_tokens: int = 0) -> Tuple[float, float]:
    """
    Compute the USD cost of one call from its token counts.

    Args:
        model: model name, used to look up the per-million-token rates.
        input_tokens: total input tokens, cached ones included.
        output_tokens: total output tokens.
        cached_tokens: the part of ``input_tokens`` billed at the cached-input rate.

    Returns:
        ``(input_cost, output_cost)`` in USD, each rounded to 6 decimals.
    """
    prices = model_prices_usd_per_million(model)
    # Clamp the cached share to [0, input_tokens] once and use that same figure for both
    # the cached and the non-cached charge. Previously only the non-cached part was
    # clamped, so a negative or oversized cached count still skewed the cached charge.
    total_in = max(0, input_tokens)
    billable_cached = min(max(0, cached_tokens), total_in)
    non_cached_in = total_in - billable_cached
    cin = (non_cached_in * prices["in"] + billable_cached * prices["cached_in"]) / 1_000_000.0
    cout = (output_tokens * prices["out"]) / 1_000_000.0
    return round(cin, 6), round(cout, 6)

# ---------- main printer ----------

def print_usage_breakdown(response, model: str, input_messages: List[Dict[str, Any]], logger):
    """
    Log token usage and cost for one API call at INFO level.

    Token totals come from the API response. The text share is estimated locally from
    ``input_messages``; the image share is the remainder of the input total, floored
    at zero. Costs include the cached-input rate.
    """
    total_in, total_out, cached = extract_usage_counts(response)
    text_estimate = estimate_text_tokens_from_input(input_messages, model)
    image_estimate = max(0, total_in - text_estimate)
    input_cost, output_cost = compute_cost_usd(model, total_in, total_out, cached_tokens=cached)

    report_lines = (
        f"total input tokens: {total_in}",
        f"input text tokens: {text_estimate}  # estimated",
        f"input image tokens: {image_estimate}  # estimated (input - text)",
        f"cost for input: ${input_cost} USD",
        f"total output tokens: {total_out}",
        f"cost of output: ${output_cost} USD",
    )
    for line in report_lines:
        logger.info(line)


# Example: integrate into your invoke()
def invoke(agent, full_raw_text, base64_image, logger):
    """
    Send one page (image + raw text + prompts) to the model through the Responses API.

    Args:
        agent: dict with 'system_prompt', 'user_prompt' and 'model'; an optional
            'reasoning_effort' is forwarded as the reasoning effort.
        full_raw_text: text-layer dump of the page, sent under a FULL_RAW_TEXT heading.
        base64_image: base64-encoded page image, without a data-URL prefix.
        logger: logger that receives the timing, usage and failure messages.

    Returns:
        The API response object, or None when anything in the call fails. The failure
        is logged with its traceback; callers must handle the None.
    """
    try:
        # Declare the image's real media type instead of always claiming JPEG: the base64
        # text of a PNG starts with "iVBORw0KGgo" (its 8-byte signature) and a JPEG with
        # "/9j/". Anything unrecognized keeps the previous "image/jpeg" label.
        if base64_image.startswith("iVBORw0KGgo"):
            image_mime = "image/png"
        else:
            image_mime = "image/jpeg"

        inp = [
            {
                "role": "system",
                "content": [
                    {"type": "input_text", "text": agent['system_prompt']}
                ],
            },
            {
                "role": "user",
                "content": [
                    {
                        "type": "input_image",
                        "image_url": f"data:{image_mime};base64,{base64_image}",
                    },
                    {"type": "input_text", "text": "## FULL_RAW_TEXT:\n"+full_raw_text+"\n"},
                    {"type": "input_text", "text": agent['user_prompt']},
                ],
            },
        ]

        kwargs = dict(model=agent['model'], input=inp, text={"verbosity": "low"})
        if 'reasoning_effort' in agent:
            kwargs["reasoning"] = {"effort": agent['reasoning_effort']}
        start = time.time()
        resp = client.responses.create(**kwargs)
        # %-style arguments instead of an f-string that reuses its own quote character,
        # which is a SyntaxError on Python versions before 3.12.
        logger.info('llm %s api call took %s seconds', agent['model'], time.time()-start)
        # Print usage and cost after the call
        print_usage_breakdown(resp, agent['model'], inp, logger)
        return resp
    except Exception:
        # Best effort: record the failure (with traceback) in the page log and return None.
        logger.exception('llm api call failed')
        return None

In [127]:
# Agent config for the first-pass extraction call (consumed by invoke()).
# 'user_prompt' is an f-string: the schema is interpolated when this cell runs, and the
# doubled braces {{ }} are literal braces of the JSON example shown to the model.
extractor = {
    'user_prompt': f"""Extract data from this document using the following schema:
    
## Schema:
{human_reviewed_schema}

Instructions:
- First locate the form field or table in image to get its position and layout.
- Secondly it can happen that some text is partially visible or hidden or overlapped in image, for that parse the text from FULL_RAW_TEXT.
- Extract all form field values and table data as specified in the schema.

Respond only in JSON format with extracted data (no prose):
{{
 "Form_fields": {{
   "field1": "value1",
   "field2": "value2"
 }},
 "Tables": {{
   "table1": [
     {{"column1": "value", "column2": "value"}},
     {{"column1": "value", "column2": "value"}}
   ],
   "table2": [...]
 }}
}}""",
    'system_prompt': "You are a Document data extraction expert",
    'model': "gpt-5-mini",
    'reasoning_effort': 'medium'
}

In [None]:
- Prioritize image for inferring positional info of DATA_PART
- Prioritize text parsing from FULL_RAW_TEXT

- First locate the form field or table in image to get its position and layout.
- Second, some text may be partially visible, hidden, or overlapped in the image; in that case, parse the text from FULL_RAW_TEXT.
- Do not pick form field names or column names from schema as value of any other entry.
- For Form_fields names: if a field is neither visible in the image nor present in FULL_RAW_TEXT, and the extractor returned it empty, then that is correct.
- When validating tables strictly follow the column headers mentioned in schema.
- When validating tables prioritize/ground image first so that you won't introduce any new errors.
- Form_field names and column names are case sensitive.
- Assign a value to Form_field or column only if it is found around their name with exact match.
- All sections named as "Information" in image or FULL_RAW_TEXT are actually tables, do not confuse them with form fields.

In [134]:
# Agent config for the validation ("judge") call.
# 'user_prompt' is a template: prepare_prompt_for() fills in the upper-case placeholders
# DATA_PART, DATA_NAME, DATA_VALUE, DATA_PART_INSTRUCTIONS and OUTPUT_FORMAT.
# The 'form_field_*' entries are used when validating form fields and the 'table_*'
# entries when validating a table.
judge = {
    'user_prompt': f'## Schema:\n{human_reviewed_schema}' + \
'''    

The extractor said that DATA_PART DATA_NAME is/are: 
DATA_VALUE

validate above data against the ground truth image and text. finally tell me if correct or not.

Instructions and guidelines to follow during validation:
DATA_PART_INSTRUCTIONS

if a element is correct then return empty "feedback".

Respond only in below format (no prose):
OUTPUT_FORMAT
''',
    'system_prompt': "You are a strict data validation expert",
    'model': "gpt-5",
    'reasoning_effort': 'medium',
    'form_field_output_format': '''[
    {
     "data_name": <key name>,
     "status": correct | wrong, 
     "feedback": <observation>
    },
    ...
]''',
    'form_field_instructions': '''- While locating or parsing for a form field name, assign the found value only if it is not present in schema['Form_fields'].
- Do not use FULL_RAW_TEXT to decide whether a form field value is empty or not.
- Locate form field first in image, then decide if its empty or not and finally parse FULL_RAW_TEXT only to get its value.
- If extractor listed a form field as X then you must also look for exact matching field name in image or FULL_RAW_TEXT.
- If form fields are not visible in image or FULL_RAW_TEXT and extractor said they are empty then that is correct.
- When a form field name and a table name are semantically close as in schema, carefully distinguish between both of them.
- If a form field name and a table column name are same or semantically close do not pick field value from that table.''',
    'table_output_format': '''[
    {
    "row": specific row_number | all rows | multiple rows a,b,c | range of rows x-y
    "column": specific columne_name | all columns | multiple columns column1, column2 | range of columns column1-column5,
    "status": correct | wrong,
    "feedback": <observation>
    },
    ...
]''',
    'table_instructions':'''- Ground the image, locate table.
- For actual text parse FULL_RAW_TEXT but prioritize layout from grounded image.
- Strictly follow the column headers mentioned in schema for the concerned table.
- Compare the table grid of the extractor with that in image cell by cell, and check extractor's table for:
  - if any row(s) or column(s) shifting happened.
  - if any cells or rows or columns are missing.
  - if a cell value is wrong placed in another cell.
  - ignore absolute values like when 0 == 0.0
- Check if any form field values are wrongs placed in table or not.'''
}

# Agent config for the correction call that re-extracts data after the judge reported issues.
# 'user_prompt' is a template: prepare_prompt_for() fills in DATA_PART, DATA_NAME,
# DATA_VALUE, FEEDBACK and OUTPUT_FORMAT. This agent defines no '*_instructions' entries,
# so prepare_prompt_for() falls back to an empty instruction string for it.
extractor_with_feedback = {
    'user_prompt': '''
You previously extracted DATA_PART DATA_NAME as 
DATA_VALUE

but the Judge validated and gave below feedback:
FEEDBACK

- recheck your extraction based on feedback and correct your mistake.
- do not modify any other values which the Judge did not report.
- if any form field key or table not present in image just leave its value blank in output.

Respond only in below format (no prose):
OUTPUT_FORMAT
''',
    'system_prompt': "You are a Document data extraction expert who focuses on feedback and improves the extraction based on it.",
    'model': "gpt-5-mini",
    'reasoning_effort': 'medium',
    'form_field_output_format': "{ 'field_name1':'field_value1', 'field_name2':'field_value2' }",
    'table_output_format': '''[
  {"column1": "value", "column2": "value"},
  {"column1": "value", "column2": "value"}
]'''
}

- First main point
  - First sub-point
  * Second sub-point
* Second main point
  * Another sub-point

In [9]:
import itertools

def iter_dict_chunks(d, n):
    """
    Yield successive dicts holding at most ``n`` items of ``d``, in ``d``'s item order.

    The last chunk may be smaller; nothing is yielded for an empty dict.
    """
    remaining = iter(d.items())
    # Two-argument iter(): keep slicing until a slice comes back empty.
    for batch in iter(lambda: list(itertools.islice(remaining, n)), []):
        yield dict(batch)

In [120]:
import pandas as pd
def prepare_prompt_for(agent, data, feedback = None):
    """
    Return a shallow copy of ``agent`` whose 'user_prompt' has its placeholders filled in.

    Args:
        agent: agent config dict. 'user_prompt' is the template; the
            'form_field_output_format' / 'table_output_format' entries are required
            for the matching data part, and the 'form_field_instructions' /
            'table_instructions' entries are optional (default: empty string).
        data: dict with 'DATA_PART' ("Form_fields" or "Tables") and 'CHUNK'. For form
            fields CHUNK maps field name to value; for tables it maps one table name
            to its rows, and only the first entry is used.
        feedback: text for the FEEDBACK placeholder; None becomes an empty string.

    Returns:
        A copy of ``agent`` with 'user_prompt' rendered; ``agent`` itself is not modified.

    Raises:
        ValueError: if data['DATA_PART'] is neither "Form_fields" nor "Tables".

    Placeholders: DATA_PART_INSTRUCTIONS, DATA_VALUE, DATA_PART, DATA_NAME,
    OUTPUT_FORMAT, FEEDBACK.
    """
    # this method is dependent on prompt input and output structure
    import re  # local import keeps this cell self-contained

    data_part = data['DATA_PART']
    chunk = data['CHUNK']
    if data_part == "Form_fields":
        data_name = ''
        # Values may be numbers or None; format them rather than concatenating strings.
        data_value = '\n'.join(f"{k}:{'' if v is None else v}" for k, v in chunk.items())
        output_format = agent['form_field_output_format']
        data_part_instructions = agent.get('form_field_instructions', '')
    elif data_part == 'Tables':
        data_name = list(chunk.keys())[0]
        data_value = pd.DataFrame(chunk[data_name]).to_markdown(index=False)
        output_format = agent['table_output_format']
        data_part_instructions = agent.get('table_instructions', '')
    else:
        raise ValueError(f"unsupported DATA_PART: {data_part!r} (expected 'Form_fields' or 'Tables')")

    # Longer tokens come first so DATA_PART_INSTRUCTIONS is matched before its prefix DATA_PART.
    substitutions = {
        'DATA_PART_INSTRUCTIONS': data_part_instructions,
        'DATA_VALUE': data_value,
        'DATA_PART': data_part,
        'DATA_NAME': data_name,
        'OUTPUT_FORMAT': output_format,
        'FEEDBACK': feedback if feedback is not None else '',
    }
    # Substitute in one pass over the template. Chained str.replace calls would also
    # rewrite placeholder-looking text inside values inserted by an earlier replace
    # (for example an extracted cell that happens to contain "FEEDBACK").
    placeholder_re = re.compile('|'.join(re.escape(token) for token in substitutions))

    agent_copy = agent.copy()
    agent_copy['user_prompt'] = placeholder_re.sub(
        lambda match: substitutions[match.group(0)], agent['user_prompt']
    )
    return agent_copy

In [11]:
def prepare_table_feedback(issues):
    """
    Render the judge's table issues as a numbered plain-text list.

    Each issue dict must provide 'row', 'column' and 'feedback'. Every issue is followed
    by a dashed separator line; issues are joined with newlines.
    """
    blocks = []
    for number, issue in enumerate(issues, start=1):
        blocks.append(
            f"Issue {number}:\n"
            f"row: {issue['row']}\n"
            f"column: {issue['column']}\n"
            f"{issue['feedback']}\n"
            "-------------"
        )
    return '\n'.join(blocks)

In [12]:
import logging
from pathlib import Path
import uuid
import json
import sys
from contextlib import contextmanager

def setup_console_logging(level=logging.INFO):
    """
    Configure a single console (stdout) handler on the 'extract' parent logger.

    Call once at process start. Repeated calls never add a second console handler.

    Args:
        level: level applied to the 'extract' logger and to a newly created handler.
    """
    parent = logging.getLogger("extract")
    parent.setLevel(level)
    parent.propagate = False  # do not bubble to root to avoid duplicates

    # logging.FileHandler subclasses logging.StreamHandler, so a plain isinstance check
    # would mistake an attached file handler for the console handler and skip adding it.
    has_console = any(
        isinstance(h, logging.StreamHandler) and not isinstance(h, logging.FileHandler)
        for h in parent.handlers
    )
    if not has_console:
        ch = logging.StreamHandler(stream=sys.stdout)  # default is stderr if not specified
        ch.setLevel(level)
        ch.setFormatter(logging.Formatter("%(asctime)s | %(levelname)s | %(message)s"))
        parent.addHandler(ch)

def create_extraction_root(base_dir: Path) -> tuple[str, Path]:
    """
    Create a fresh run directory under ``base_dir`` named after a new UUID4.

    Missing parent directories are created. Returns ``(extraction_id, root_path)``.
    """
    run_id = f"{uuid.uuid4()}"
    run_root = base_dir.joinpath(run_id)
    # exist_ok=False: mkdir raises rather than reusing a directory that already exists.
    run_root.mkdir(parents=True, exist_ok=False)
    return run_id, run_root

@contextmanager
def page_logger(root: Path, extraction_id: str, page_num: int, level=logging.INFO):
    """
    Context manager that gives one page its own folder and log file.

    On entry it creates ``root/page_NNNN``, attaches a FileHandler writing
    ``log.md`` in that folder to the logger ``extract.<extraction_id>.page.NNNN``,
    and yields ``(logger, page_dir)``. On exit every FileHandler attached to that
    logger is closed and removed. Records still propagate to the parent 'extract'
    logger, which is where console output comes from.
    """
    page_tag = f"{page_num:04d}"
    page_dir = root / f"page_{page_tag}"
    page_dir.mkdir(parents=True, exist_ok=True)

    # Child of 'extract' so records propagate up to the console handler.
    logger = logging.getLogger(f"extract.{extraction_id}.page.{page_tag}")
    logger.setLevel(level)
    logger.propagate = True

    # delay=True: the log file is only opened when the first record is emitted.
    file_handler = logging.FileHandler(page_dir / "log.md", encoding="utf-8", delay=True)
    file_handler.setLevel(level)
    file_handler.setFormatter(logging.Formatter("%(asctime)s | %(levelname)s | %(message)s"))
    logger.addHandler(file_handler)

    try:
        yield logger, page_dir
    finally:
        # Only file handlers are cleaned up; any other handler on this logger stays.
        attached_file_handlers = [h for h in logger.handlers if isinstance(h, logging.FileHandler)]
        for handler in attached_file_handlers:
            try:
                handler.close()
            finally:
                logger.removeHandler(handler)

def start_page_logger(root: Path, extraction_id: str, page_num: int, level=logging.INFO):
    """
    Non-context-manager counterpart of ``page_logger``.

    Creates ``root/page_NNNN``, attaches a FileHandler writing ``log.md`` in that
    folder to the logger ``extract.<extraction_id>.page.NNNN`` and returns
    ``(logger, page_dir, file_handler)``. The caller is responsible for detaching and
    closing the returned handler (see ``stop_page_logger``).
    """
    page_tag = f"{page_num:04d}"
    page_dir = root / f"page_{page_tag}"
    page_dir.mkdir(parents=True, exist_ok=True)

    logger = logging.getLogger(f"extract.{extraction_id}.page.{page_tag}")
    logger.setLevel(level)
    logger.propagate = True  # let parent 'extract' print to console

    # delay=True: the log file is only opened when the first record is emitted.
    file_handler = logging.FileHandler(page_dir / "log.md", encoding="utf-8", delay=True)
    file_handler.setLevel(level)
    file_handler.setFormatter(logging.Formatter("%(asctime)s | %(levelname)s | %(message)s"))
    logger.addHandler(file_handler)
    return logger, page_dir, file_handler

def stop_page_logger(logger: logging.Logger, fh: logging.FileHandler):
    """
    Detach the per-page file handler ``fh`` from ``logger`` and close it.

    Counterpart of ``start_page_logger``. Other handlers on ``logger`` are left untouched.
    """
    # Detach and close only the per-page FileHandler
    try:
        logger.removeHandler(fh)  # detach first so the logger stops using the handler before it is closed
    finally:
        fh.close()  # always runs, even if removeHandler raised



In [135]:
judge_retry_max_attempts = 3
setup_console_logging(level=logging.INFO)
base_dir = Path("runs")
extraction_id, root = create_extraction_root(base_dir)


def _parse_llm_payload(response):
    """
    Turn the model's text answer into Python data without executing it.

    SECURITY: the answer is untrusted model output, and eval() on it would run arbitrary
    code. It is parsed as JSON first; if that fails it is parsed with ast.literal_eval,
    which accepts Python literals only (covers single-quoted dict answers).

    Raises:
        ValueError: if the response carries no text (for example because invoke()
            returned None) or the text is not a literal.
        SyntaxError: if the text is neither JSON nor valid Python literal syntax.
    """
    import ast

    text = extract_response_text(response)
    if text is None:
        raise ValueError("LLM call produced no text (request failed or response had no message output)")
    text = text.strip()
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return ast.literal_eval(text)


for page_num, page_data in enumerate(per_page_text_and_images):
    logger, pdir, fh = start_page_logger(root, extraction_id, page_num)
    try:
        logger.info('started initial extraction for page %s', page_num+1)
        initial_extraction_response = invoke(extractor, page_data['full_text'], page_data['base64_image'], logger)
        extracted_data = _parse_llm_payload(initial_extraction_response)
        with (pdir / "original_extraction.json").open("w", encoding="utf-8") as f:
            json.dump(extracted_data, f, indent=2, ensure_ascii=False)
        # processing each page's extracted data
        for data_part in extracted_data.keys():
            if data_part == 'Form_fields':
                logger.info('started validating form fields')
                # processing 10 form fields at a time.
                # Chunk a snapshot: the live dict is updated after every chunk, and a
                # correction that adds a key would otherwise break the lazy iteration
                # ("dictionary changed size during iteration").
                for chunk in iter_dict_chunks(dict(extracted_data[data_part]),10):
                    logger.info('-validating form fields: %s', list(chunk.keys()))
                    judge_retry_attempt = 1
                    while judge_retry_attempt <= judge_retry_max_attempts:
                        if judge_retry_attempt > 1: logger.info('-revalidating form fields: %s', list(chunk.keys()))
                        data = {
                            'DATA_PART' : data_part,
                            'CHUNK': chunk
                        }
                        judge_agent = prepare_prompt_for(judge, data)
                        judge_res = invoke(judge_agent, page_data['full_text'], page_data['base64_image'], logger)
                        form_field_feedback = _parse_llm_payload(judge_res)
                        form_field_issues = [ff_issue for ff_issue in form_field_feedback if ff_issue['status'] == 'wrong']
                        if not form_field_issues:
                            logger.info('done judging, ALL GOOD')
                            break

                        logger.info('--found issues: %s', form_field_issues)
                        judge_retry_attempt += 1
                        form_field_feedback_string = '\n'.join([f"{ffi['data_name']}:{ffi['feedback']}" for ffi in form_field_issues])
                        extractor_with_feedback_agent = prepare_prompt_for(extractor_with_feedback, data, form_field_feedback_string)

                        extractor_with_feedback_res = invoke(extractor_with_feedback_agent, page_data['full_text'], page_data['base64_image'], logger)

                        corrected_chunk = _parse_llm_payload(extractor_with_feedback_res)
                        # f-strings: values are not guaranteed to be str
                        logger.info('--previous was:\n%s', '\n'.join([f"{k} : {v}" for k,v in chunk.items()]))
                        logger.info('--corrected as:\n%s', '\n'.join([f"{k} : {v}" for k,v in corrected_chunk.items()]))
                        chunk.update(corrected_chunk)
                    else:
                        # Loop ended without a clean verdict: the last correction was not re-judged.
                        logger.warning('max judge attempts reached for form fields: %s', list(chunk.keys()))
                    extracted_data[data_part].update(chunk)
            if data_part == 'Tables':
                logger.info('started validating tables')
                # processing 1 table at a time (snapshot for the same reason as above)
                for chunk in iter_dict_chunks(dict(extracted_data[data_part]),1):
                    table_name, table_data = chunk.popitem()
                    logger.info('-validating table: %s', table_name)
                    if not table_data: continue # if table is empty do not check
                    judge_retry_attempt = 1
                    while judge_retry_attempt <= judge_retry_max_attempts:
                        # chunk is empty after popitem(), so log the table name itself
                        if judge_retry_attempt > 1: logger.info('-revalidating table: %s', table_name)
                        data = {
                            'DATA_PART' : data_part,
                            'CHUNK': {table_name: table_data}
                        }
                        judge_agent = prepare_prompt_for(judge, data)
                        judge_res = invoke(judge_agent, page_data['full_text'], page_data['base64_image'], logger)
                        tables_feedback = _parse_llm_payload(judge_res)
                        tables_issues = [t_issue for t_issue in tables_feedback if t_issue['status'] == 'wrong']
                        if not tables_issues:
                            logger.info('done judging, ALL GOOD')
                            break

                        logger.info('--found issues: %s', tables_issues)
                        judge_retry_attempt += 1
                        extractor_with_feedback_agent = prepare_prompt_for(extractor_with_feedback, data, prepare_table_feedback(tables_issues))

                        extractor_with_feedback_res = invoke(extractor_with_feedback_agent, page_data['full_text'], page_data['base64_image'], logger)

                        corrected_chunk = _parse_llm_payload(extractor_with_feedback_res)
                        logger.info('--previous was:\n%s', pd.DataFrame(table_data).to_markdown(index=False))
                        logger.info('--corrected as:\n%s', pd.DataFrame(corrected_chunk).to_markdown(index=False))
                        table_data = corrected_chunk
                    else:
                        # Loop ended without a clean verdict: the last correction was not re-judged.
                        logger.warning('max judge attempts reached for table: %s', table_name)
                    extracted_data[data_part][table_name] = table_data
        with (pdir / "corrected_extraction.json").open("w", encoding="utf-8") as f:
            json.dump(extracted_data, f, indent=2, ensure_ascii=False)
    finally:
        # Always release the per-page log file, including when a page fails midway.
        stop_page_logger(logger, fh)

2025-09-02 22:28:59,623 | INFO | started initial extraction for page 1
2025-09-02 22:29:43,735 | INFO | llm gpt-5-mini api call took 44.10162377357483 seconds
2025-09-02 22:29:43,741 | INFO | total input tokens: 2989
2025-09-02 22:29:43,742 | INFO | input text tokens: 2212  # estimated
2025-09-02 22:29:43,743 | INFO | input image tokens: 777  # estimated (input - text)
2025-09-02 22:29:43,744 | INFO | cost for input: $0.000747 USD
2025-09-02 22:29:43,744 | INFO | total output tokens: 4403
2025-09-02 22:29:43,745 | INFO | cost of output: $0.008806 USD
2025-09-02 22:29:43,749 | INFO | started validating form fields
2025-09-02 22:29:43,749 | INFO | -validating form fields: ['Company Name', 'Company Code', 'Period', 'Employee Name', 'Address line 1', 'City', 'State', 'Zip', 'Emp Id', 'SSN']
2025-09-02 22:30:05,377 | INFO | llm gpt-5 api call took 21.626286029815674 seconds
2025-09-02 22:30:05,383 | INFO | total input tokens: 3173
2025-09-02 22:30:05,384 | INFO | input text tokens: 2400  # 

In [118]:
# Debug aid: show the fully rendered judge prompt. Relies on a `data` dict already present
# in the kernel (the extraction loop assigns one); it is not defined in this cell.
print(prepare_prompt_for(judge, data)['user_prompt'])

## Schema:
{'Form_fields': ['Company Name', 'Company Code', 'Period', 'Employee Name', 'Address line 1', 'City', 'State', 'Zip', 'Emp Id', 'SSN', 'DOB', 'Gender', 'Marital Status', 'Status', 'Position', 'Title', 'Pay Group', 'Job Code', 'Emp Type', 'Statutory', 'Seasonal', 'Domestic Emp', 'Probation', 'Home #', 'Work #', 'Ext.', 'Email', 'Mail Stop', 'Hire Date', 'Rehire Date', 'Term Date', 'Term Reason', 'Adj Sen Date', 'Pension', 'Visa Type', 'Exp', 'Citizen', 'I9 Reverify', 'I9 Verified', 'Deceased', 'Tax Form', 'WCC', 'EEOC', 'Supervisor ID', 'Name (supervisor name)', 'Def Comp', 'Union', 'Union Date', 'Collect Dues', 'Paid Init. Fees', 'Veteran', 'Legal Rep', 'Nickname', 'surname', 'Prior Last', 'Disability', 'Smoker', 'AutoPay', 'Pay Frequency', 'OT Exempt', 'Default Hours', 'Locations', 'Positions'], 'Tables': {'Rate/Salary Information': ['RateCode', 'Description', 'Rate', 'Salary', 'Effective Dates'], 'Employee Tax Information': ['Employee Tax Code', 'Employee Tax Description',