# Vision and Reasoning

This notebook implements a two-stage pipeline for digitizing and validating hand-filled forms using open-source vision and language models. It recreates the architecture from OpenAI's Model Selection Guide and adds an automated evaluation stage that scores each run, so output quality can be tracked alongside cost and latency.

## Architecture Overview and Enhancements

We adopt the original's robust two-stage workflow:

**Stage 1: Vision-based OCR Extraction**
*   A powerful vision-capable LLM performs an initial, high-accuracy OCR on the form image.
*   The key principle is to extract text literally and preserve ambiguity (e.g., `'jsmithl@gmail.com OR jsmith1@gmail.com'`). It does not guess or infer missing information.

**Stage 2: Reasoning and Refinement**
*   A cost-effective reasoning LLM takes the JSON from Stage 1 as input.
*   Using a suite of tools (like web search and email validation), it resolves ambiguities, fills in missing information (like zip codes), and ensures the data conforms to the final schema.

**Our Enhancements:**
*   **Automated Quality Evaluation Agent:** A new, final stage in which an evaluation LLM compares the outputs of Stage 1 and Stage 2 and assigns granular scores for accuracy, inference quality, and completeness.
*   **Comprehensive Final Summary:** A final DataFrame that consolidates all operational metrics (cost, latency) and our new quality scores, providing a holistic view of the entire process.

## 1. Setup and Configuration

In [None]:
# %pip install -qU openai pandas tqdm pydantic

In [None]:
import os
import json
import re
import time
import uuid
import logging
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from pathlib import Path
import pandas as pd
from openai import OpenAI
from tqdm.auto import tqdm
from IPython.display import display, Markdown, Image
from pydantic import BaseModel, Field

# --- LLM Configuration ---
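# Read the key from the environment when available; the variable name NEBIUS_API_KEY is an assumed convention.
API_KEY = os.environ.get("NEBIUS_API_KEY", "API_KEY")  # Falls back to a placeholder; replace with your actual API key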
BASE_URL = "https://api.studio.nebius.com/v1/"

# NOTE: The chosen vision model MUST support image inputs (i.e., it must be a multimodal model such as LLaVA or CogVLM).
MODEL_VISION = "google/gemma-3-27b-it" # A powerful multimodal model for OCR
MODEL_REASONING = "Qwen/Qwen3-14B"      # A smaller, cheaper model for refinement
MODEL_EVALUATE = "Qwen/Qwen3-235B-A22B"    # A powerful model for accurate evaluation

client = OpenAI(api_key=API_KEY, base_url=BASE_URL)

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

metrics_log = []

### 1.1. Utilities and Data Structures

We start by defining our target data schema using Pydantic, which ensures that the LLM's output is structured correctly. We also include a robust helper function for parsing JSON from model responses.

In [2]:
class PersonContact(BaseModel):
    name: str
    home_phone: str
    work_phone: str
    cell_phone: str
    email: str

class Address(BaseModel):
    street: str
    city: str
    state: str
    zip: str
    county: str

class DwellingDetails(BaseModel):
    coverage_a_limit: str
    companion_policy_expiration_date: str
    occupancy_of_dwelling: str
    type_of_policy: str
    unrepaired_structural_damage: bool
    construction_type: str
    roof_type: str
    foundation_type: str
    has_post_and_pier_or_post_and_beam_foundation: bool
    cripple_walls: bool
    number_of_stories: str
    living_space_over_garage: bool
    number_of_chimneys: str
    square_footage: str
    year_of_construction: str
    anchored_to_foundation: bool
    water_heater_secured: bool

class InsuranceFormData(BaseModel):
    applicant: PersonContact
    co_applicant: PersonContact
    risk_address: Address
    mailing_address_if_different_than_risk_address: Address
    participating_insurer: str
    companion_policy_number: str
    dwelling_details: DwellingDetails
    effective_date: str
    expiration_date: str

def parse_json_from_response(text: str) -> Dict[str, Any]:
    match = re.search(r'```(?:json)?\s*({.*?})\s*```', text, re.S)
    if match:
        json_str = match.group(1)
    else:
        start = text.find('{')
        end = text.rfind('}')
        if start != -1 and end != -1:
            json_str = text[start:end+1]
        else:
            return {"raw_text": text}
    try:
        return json.loads(json_str)
    except json.JSONDecodeError:
        logging.warning(f"Failed to parse JSON from response: {text}")
        return {"raw_text": text}
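
To make the fallback order concrete, here is a minimal standalone sketch of the three cases the helper is meant to handle: a fenced JSON block, bare JSON embedded in prose, and a reply with no JSON at all. The name `extract_json` and the sample replies are illustrative stand-ins rather than part of the notebook, and the fence marker is built indirectly only so the example can sit inside a fenced block.

```python
import json
import re
from typing import Any, Dict

FENCE = "`" * 3  # three backticks, built indirectly to avoid closing this block


def extract_json(text: str) -> Dict[str, Any]:
    """Illustrative stand-in that mirrors parse_json_from_response's fallback order."""
    # 1) Prefer a fenced block, optionally tagged as json.
    fenced = re.search(FENCE + r"(?:json)?\s*({.*?})\s*" + FENCE, text, re.S)
    if fenced:
        candidate = fenced.group(1)
    else:
        # 2) Otherwise take everything from the first '{' to the last '}'.
        start, end = text.find("{"), text.rfind("}")
        if start == -1 or end < start:
            return {"raw_text": text}
        candidate = text[start:end + 1]
    # 3) If the candidate is not valid JSON, hand back the raw text instead of raising.
    try:
        return json.loads(candidate)
    except json.JSONDecodeError:
        return {"raw_text": text}


fenced_reply = "Here you go:\n" + FENCE + 'json\n{"zip": "94103"}\n' + FENCE
bare_reply = 'The result is {"zip": "94103"} as requested.'
no_json_reply = "I could not read the form."

print(extract_json(fenced_reply))
print(extract_json(bare_reply))
print(extract_json(no_json_reply))
```

Returning `{"raw_text": ...}` instead of raising keeps the pipeline running, but downstream code then has to check for that key before treating the result as form data.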

## 2. Stage 1: Vision-based OCR Extraction

In this stage, we send the image of the insurance form to our powerful multimodal LLM. The prompt is carefully crafted to instruct the model to perform a literal transcription, explicitly preserving any ambiguities it finds and leaving blank fields empty. This prevents the model from making premature assumptions and provides a clean, raw extraction for the next stage.
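
Because Stage 1 marks ambiguity with the literal separator `' OR '` and leaves blank fields as empty strings, its output can be audited mechanically before any refinement happens. The sketch below is one possible way to do that; `walk_leaves`, `audit_ocr_output`, and the sample dictionary are hypothetical names and data introduced for illustration.

```python
from typing import Any, Dict, Iterator, List, Tuple


def walk_leaves(data: Dict[str, Any], prefix: str = "") -> Iterator[Tuple[str, Any]]:
    """Yield (dotted_path, value) for every non-dict value in a nested dict."""
    for key, value in data.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            yield from walk_leaves(value, path)
        else:
            yield path, value


def audit_ocr_output(data: Dict[str, Any]) -> Dict[str, List[str]]:
    """Group string fields into 'ambiguous' (contain ' OR ') and 'blank' (empty)."""
    report: Dict[str, List[str]] = {"ambiguous": [], "blank": []}
    for path, value in walk_leaves(data):
        if not isinstance(value, str):
            continue  # booleans and other types carry no ambiguity marker
        if " OR " in value:
            report["ambiguous"].append(path)
        elif value == "":
            report["blank"].append(path)
    return report


# Hypothetical excerpt in the shape the Stage 1 prompt asks for.
sample = {
    "applicant": {
        "name": "Smith, James L",
        "email": "jsmithl@gmail.com OR jsmith1@gmail.com",
    },
    "risk_address": {"street": "855 Brannan St", "zip": "", "county": ""},
}

report = audit_ocr_output(sample)
print(report)
```

A report like this gives a quick sanity check on whether the vision model actually followed the ambiguity and blank-field instructions for a given form.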

In [3]:
FORM_IMAGE_URL = "https://drive.usercontent.google.com/download?id=1-tZ526AW3mX1qthvgi8spaaxxeqFG5_6"
display(Image(url=FORM_IMAGE_URL, width=600))

def run_ocr_stage(image_url: str) -> Dict[str, Any]:
    logging.info("--- Starting Stage 1: Vision-based OCR Extraction ---")
    ocr_prompt = f"""You are an expert at processing insurance forms. OCR the data from the user-provided image into a structured JSON format that conforms to the following Pydantic schema:
    {InsuranceFormData.schema_json(indent=2)}

    IMPORTANT INSTRUCTIONS:
    1. Fill out the fields as literally and exactly as possible from the image.
    2. If a field is blank on the form, leave the corresponding JSON string field empty ('').
    3. If a written character is ambiguous (e.g., could be 'l' or '1', 'o' or '0'), include all possibilities in the string, separated by ' OR '. This is especially important for email addresses.
    4. Do NOT infer or guess any information that is not explicitly written on the form."""
    
    messages = [
        {"role": "user", "content": [
            {"type": "text", "text": ocr_prompt},
            {"type": "image_url", "image_url": {"url": image_url, "detail": "high"}}
        ]}
    ]
    
    start_time = time.time()
    response = client.chat.completions.create(model=MODEL_VISION, messages=messages, temperature=0.0)
    latency = time.time() - start_time
    
    response_text = response.choices[0].message.content
    p_tokens, c_tokens = response.usage.prompt_tokens, response.usage.completion_tokens
    metrics_log.append({"step": "ocr_extraction", "model": MODEL_VISION, "latency_s": latency, "prompt_tokens": p_tokens, "completion_tokens": c_tokens, "total_tokens": p_tokens + c_tokens})
    
    return parse_json_from_response(response_text)

stage1_output = run_ocr_stage(FORM_IMAGE_URL)
display(Markdown("### Stage 1 OCR Output (with ambiguities):"))
display(stage1_output)

2025-08-18 09:28:25,759 - INFO - --- Starting Stage 1: Vision-based OCR Extraction ---
C:\Users\faree\AppData\Local\Temp\ipykernel_14240\3772573333.py:7: PydanticDeprecatedSince20: The `schema_json` method is deprecated; use `model_json_schema` and json.dumps instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.11/migration/
  {InsuranceFormData.schema_json(indent=2)}
2025-08-18 09:28:37,501 - INFO - HTTP Request: POST https://api.studio.nebius.com/v1/chat/completions "HTTP/1.1 200 OK"


### Stage 1 OCR Output (with ambiguities):

{'applicant': {'name': 'Smith, James L',
  'home_phone': '510 331 5555',
  'work_phone': '',
  'cell_phone': '510 212 5555',
  'email': 'jsmith@gmail.com'},
 'co_applicant': {'name': 'Roberts, Jesse T',
  'home_phone': '510 331 5555',
  'work_phone': '415 626 5555',
  'cell_phone': '',
  'email': 'jrobertsjr@gmail.com'},
 'risk_address': {'street': '855 Brannan St',
  'city': 'San Francisco',
  'state': 'CA',
  'zip': '',
  'county': ''},
 'mailing_address_if_different_than_risk_address': {'street': '',
  'city': '',
  'state': '',
  'zip': '',
  'county': ''},
 'participating_insurer': 'Agne Insurance Co',
 'companion_policy_number': '8126918',
 'dwelling_details': {'coverage_a_limit': '$900,000',
  'companion_policy_expiration_date': '5/31/27',
  'occupancy_of_dwelling': 'Owner',
  'type_of_policy': 'Homeowners',
  'unrepaired_structural_damage': False,
  'construction_type': 'Wood Shake OR Other',
  'roof_type': 'Wood Shake OR Other',
  'foundation_type': 'Slab',
  'has_post_and_pie

## 3. Stage 2: Reasoning and Refinement

Now, we pass the raw, ambiguous JSON from Stage 1 to a second, more specialized reasoning agent. This agent's job is to act as a data validation expert. It uses a set of tools to resolve the ambiguities and fill in the missing information.
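
In the notebook the reasoning model decides which tool to call and when. As a deterministic illustration of the underlying rule (split the candidates, keep the one a validator accepts), the following sketch may help. `resolve_ambiguity` and `demo_validator` are hypothetical helpers, and the validator simply hard-codes one accepted address for the demo.

```python
from typing import Callable


def resolve_ambiguity(value: str, is_valid: Callable[[str], bool]) -> str:
    """Return the single candidate accepted by is_valid, else the value unchanged."""
    candidates = [c.strip() for c in value.split(" OR ")]
    if len(candidates) < 2:
        return value  # nothing to resolve
    accepted = [c for c in candidates if is_valid(c)]
    # Collapse only when exactly one candidate passes; otherwise keep the
    # ambiguity visible so it can be reviewed later.
    return accepted[0] if len(accepted) == 1 else value


def demo_validator(email: str) -> bool:
    """Stand-in for a validation tool; accepts one hard-coded address."""
    return email == "jsmithl@gmail.com"


resolved = resolve_ambiguity("jsmithl@gmail.com OR jsmith1@gmail.com", demo_validator)
unresolved = resolve_ambiguity("Wood Shake OR Other", demo_validator)
print(resolved)
print(unresolved)
```

Leaving a value untouched when zero or several candidates validate is a design choice made here for safety: it avoids silently picking a wrong answer.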

In [4]:
# Mock Tools for Stage 2
def validate_email(email: str) -> bool:
    """Mock function to validate an email. In a real system, this could check a database or a validation service."""
    logging.info(f"VALIDATING EMAIL: {email}")
    # For this demo, we'll pretend only 'jsmithl@gmail.com' is the valid one.
    return email == "jsmithl@gmail.com"

def search_web(query: str) -> str:
    """Mock function for web search. In a real system, this would call a search API."""
    logging.info(f"SEARCHING WEB FOR: {query}")
    if "855 Brannan St" in query:
        return "The full address is 855 Brannan St, San Francisco, CA 94103. This is in San Francisco County."
    return "No information found."

TOOL_DISPATCHER = {"validate_email": validate_email, "search_web": search_web}

def get_tool_manifest():
    return [
        {"type": "function", "function": {"name": "validate_email", "description": "Check if an email address is valid and exists.", "parameters": {"type": "object", "properties": {"email": {"type": "string"}}, "required": ["email"]}}},
        {"type": "function", "function": {"name": "search_web", "description": "Perform a web search to find missing information like zip codes or counties.", "parameters": {"type": "object", "properties": {"query": {"type": "string"}}, "required": ["query"]}}}
    ]
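
A single tool dispatch can be exercised offline, without calling any model, to see what the agent would receive back on success and on failure. This is a minimal sketch under stated assumptions: `dispatch_tool_call`, `lookup_zip`, and `registry` are hypothetical names, and the lookup tool knows only one address.

```python
import json
from typing import Any, Callable, Dict


def dispatch_tool_call(name: str, arguments_json: str,
                       registry: Dict[str, Callable[..., Any]]) -> str:
    """Run one tool call and return a JSON string suitable for a 'tool' message."""
    try:
        args = json.loads(arguments_json)
        result = registry[name](**args)
        return json.dumps(result)
    except Exception as exc:  # unknown tool, malformed JSON, wrong arguments, ...
        # Report the failure as data so the model can react instead of the loop crashing.
        return json.dumps({"error": f"{type(exc).__name__}: {exc}"})


def lookup_zip(query: str) -> str:
    """Hypothetical offline tool that only knows one address."""
    return "94103" if "855 Brannan St" in query else "No information found."


registry = {"lookup_zip": lookup_zip}

ok = dispatch_tool_call("lookup_zip", '{"query": "855 Brannan St San Francisco"}', registry)
unknown = dispatch_tool_call("send_fax", "{}", registry)
print(ok)
print(unknown)
```

Including the exception type in the error payload makes it easier to tell an unknown tool name apart from malformed arguments when reading the conversation log.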

In [5]:
def run_refinement_stage(ocr_data: Dict[str, Any]) -> Dict[str, Any]:
    logging.info("--- Starting Stage 2: Reasoning and Refinement ---")
    refinement_prompt = """You are a data validation expert. You have been given a raw JSON output from an OCR system. Your job is to clean, validate, and complete it.
    1. Examine each field for ambiguities (e.g., text containing ' OR '). Use the provided tools to resolve them to a single, correct value.
    2. Identify any missing information (e.g., empty strings for `zip` or `county`). Use tools to find the correct information.
    3. If the mailing address is empty, assume it is the same as the risk address and fill it in accordingly.
    4. Return the final, cleaned, and validated JSON object that conforms perfectly to the Pydantic schema. Do not include the schema in your response."""
    
    messages = [
        {"role": "system", "content": refinement_prompt},
        {"role": "user", "content": f"Here is the raw OCR data. Please refine it:\n\n{json.dumps(ocr_data, indent=2)}"}
    ]
    
    # This loop handles the conversation with the tool-using agent
    for i in range(5): # Max 5 turns of conversation
        start_time = time.time()
        response = client.chat.completions.create(model=MODEL_REASONING, messages=messages, tools=get_tool_manifest(), tool_choice="auto")
        latency = time.time() - start_time
        msg = response.choices[0].message
        messages.append(msg)
        
        p_tokens, c_tokens = response.usage.prompt_tokens, response.usage.completion_tokens
        metrics_log.append({"step": f"refinement_turn_{i}", "model": MODEL_REASONING, "latency_s": latency, "prompt_tokens": p_tokens, "completion_tokens": c_tokens, "total_tokens": p_tokens + c_tokens})
        
        if not msg.tool_calls:
            logging.info("Refinement agent has finished reasoning.")
            return parse_json_from_response(msg.content)

        logging.info(f"Refinement agent requested {len(msg.tool_calls)} tool call(s)...")
        for tool_call in msg.tool_calls:
            function_name = tool_call.function.name
            try:
                args = json.loads(tool_call.function.arguments)
                result = TOOL_DISPATCHER[function_name](**args)
                messages.append({"role": "tool", "tool_call_id": tool_call.id, "content": json.dumps(result)})
            except Exception as e:
                messages.append({"role": "tool", "tool_call_id": tool_call.id, "content": json.dumps({"error": str(e)})})
    
    return {"error": "Exceeded maximum tool call limit."}

stage2_output = run_refinement_stage(stage1_output)
display(Markdown("### Stage 2 Refined Output:"))
display(stage2_output)

2025-08-18 09:29:05,952 - INFO - --- Starting Stage 2: Reasoning and Refinement ---
2025-08-18 09:29:15,788 - INFO - HTTP Request: POST https://api.studio.nebius.com/v1/chat/completions "HTTP/1.1 200 OK"
2025-08-18 09:29:15,811 - INFO - Refinement agent requested 1 tool call(s)...
2025-08-18 09:29:15,811 - INFO - SEARCHING WEB FOR: 855 Brannan St San Francisco CA zip code
2025-08-18 09:29:27,358 - INFO - HTTP Request: POST https://api.studio.nebius.com/v1/chat/completions "HTTP/1.1 200 OK"
2025-08-18 09:29:27,373 - INFO - Refinement agent has finished reasoning.


### Stage 2 Refined Output:

{'applicant': {'name': 'Smith, James L',
  'home_phone': '510 331 5555',
  'work_phone': '',
  'cell_phone': '510 212 5555',
  'email': 'jsmith@gmail.com'},
 'co_applicant': {'name': 'Roberts, Jesse T',
  'home_phone': '510 331 5555',
  'work_phone': '415 626 5555',
  'cell_phone': '',
  'email': 'jrobertsjr@gmail.com'},
 'risk_address': {'street': '855 Brannan St',
  'city': 'San Francisco',
  'state': 'CA',
  'zip': '94103',
  'county': 'San Francisco'},
 'mailing_address_if_different_than_risk_address': {'street': '855 Brannan St',
  'city': 'San Francisco',
  'state': 'CA',
  'zip': '94103',
  'county': 'San Francisco'},
 'participating_insurer': 'Agne Insurance Co',
 'companion_policy_number': '8126918',
 'dwelling_details': {'coverage_a_limit': '$900,000',
  'companion_policy_expiration_date': '5/31/27',
  'occupancy_of_dwelling': 'Owner',
  'type_of_policy': 'Homeowners',
  'unrepaired_structural_damage': False,
  'construction_type': 'Wood Shake OR Other',
  'roof_type': 'Wood 

## 4. Automated Quality Evaluation

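This is our added evaluation step. An evaluation model acts as a quality assurance specialist: it compares the raw Stage 1 extraction with the refined Stage 2 output and reports on the pipeline's performance for this specific form. Because the evaluator sees only the two JSON objects, not the form image or any ground truth, its scores reflect its judgment of the changes between stages rather than measured accuracy.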
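
Of the scores requested from the evaluator, completeness is defined as a plain ratio (non-empty fields over total fields), so it can also be computed directly as a cross-check. In the recorded run the evaluator reported 1.0 even though the refined output still shows empty strings for fields such as `applicant.work_phone`, which suggests a deterministic count is worth having. The sketch below is one way to compute it; `count_fields`, `completeness`, and the sample data are hypothetical, and treating `False` as a filled value is an assumption.

```python
from typing import Any, Dict, Tuple


def count_fields(data: Dict[str, Any]) -> Tuple[int, int]:
    """Return (non_empty, total) over all leaf fields of a nested dict."""
    non_empty = total = 0
    for value in data.values():
        if isinstance(value, dict):
            sub_non_empty, sub_total = count_fields(value)
            non_empty += sub_non_empty
            total += sub_total
        else:
            total += 1
            # Assumption: booleans count as filled even when False;
            # only '' and None are treated as empty.
            if value not in ("", None):
                non_empty += 1
    return non_empty, total


def completeness(data: Dict[str, Any]) -> float:
    """Ratio of non-empty leaf fields to all leaf fields (0.0 for an empty dict)."""
    non_empty, total = count_fields(data)
    return non_empty / total if total else 0.0


# Hypothetical excerpt of a refined record.
sample = {
    "applicant": {"name": "Smith, James L", "work_phone": ""},
    "risk_address": {"zip": "94103", "county": "San Francisco"},
    "dwelling_details": {"cripple_walls": False},
}

print(f"{completeness(sample):.2f}")
```

A large gap between this number and the evaluator's `completeness_score` is itself a useful signal that the evaluation output deserves a second look.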

In [6]:
def evaluate_extraction_quality(raw_data: Dict, final_data: Dict) -> Dict:
    logging.info("--- Starting Stage 3: Automated Quality Evaluation ---")
    eval_prompt = f"""You are a Quality Assurance expert. Compare the 'RAW OCR DATA' with the 'FINAL REFINED DATA' and provide a detailed evaluation.

    RAW OCR DATA:
    {json.dumps(raw_data, indent=2)}

    FINAL REFINED DATA:
    {json.dumps(final_data, indent=2)}

    Provide your evaluation as a JSON object with the following structure:
    {{
      "field_accuracy_score": {{ "score": float (0.0-1.0), "justification": "Assess the overall accuracy of the final fields compared to the raw data." }},
      "inference_quality_score": {{ "score": float (0.0-1.0), "justification": "Specifically assess how well the model filled in MISSING or resolved AMBIGUOUS fields (e.g., zip code, email)." }},
      "completeness_score": {{ "score": float (0.0-1.0), "justification": "Calculate the ratio of non-empty fields in the final data to the total number of fields." }},
      "overall_confidence_score": {{ "score": float (0.0-1.0), "justification": "Your holistic confidence in the final data's correctness." }},
      "fields_requiring_human_review": ["list", "of", "field_names", "that still seem uncertain or could not be verified"]
    }}
    """

    messages = [{"role": "user", "content": eval_prompt}]
    start_time = time.time()
    response = client.chat.completions.create(model=MODEL_EVALUATE, messages=messages, temperature=0.0)
    latency = time.time() - start_time

    response_text = response.choices[0].message.content
    p_tokens, c_tokens = response.usage.prompt_tokens, response.usage.completion_tokens
    metrics_log.append({"step": "quality_evaluation", "model": MODEL_EVALUATE, "latency_s": latency, "prompt_tokens": p_tokens, "completion_tokens": c_tokens, "total_tokens": p_tokens + c_tokens})

    return parse_json_from_response(response_text)

quality_assessment = evaluate_extraction_quality(stage1_output, stage2_output)
display(Markdown("### Stage 3 Automated Quality Report:"))
display(quality_assessment)

2025-08-18 09:29:30,181 - INFO - --- Starting Stage 3: Automated Quality Evaluation ---
2025-08-18 09:31:42,917 - INFO - HTTP Request: POST https://api.studio.nebius.com/v1/chat/completions "HTTP/1.1 200 OK"


### Stage 3 Automated Quality Report:

{'field_accuracy_score': {'score': 0.9,
  'justification': "The final data accurately preserves all original information from the RAW OCR DATA. The only discrepancies are in the 'mailing_address_if_different_than_risk_address' fields, which were filled with the same values as the risk address despite the field name implying it should only be populated if different. The risk address zip and county were correctly inferred as '94103' and 'San Francisco', respectively."},
 'inference_quality_score': {'score': 0.6,
  'justification': 'The model successfully filled missing zip and county fields for the risk address with accurate data. However, the inference to populate the mailing address with identical values to the risk address is questionable, as the field definition implies it should remain blank if the addresses are the same. This creates potential inaccuracies in the mailing address section.'},
 'completeness_score': {'score': 1.0,
  'justification': 'All fields in the FINAL REFINED DA

## 5. Final Analysis and Summary

In [7]:
model_prices_per_million_tokens = {
    "google/gemma-3-27b-it": {
        "input": 0.10,  # Example price in USD per 1M input tokens
        "output": 0.30   # Example price in USD per 1M output tokens
    },
    "Qwen/Qwen3-14B": {
        "input": 0.08,  # Example price
        "output": 0.24   # Example price
    },
    "Qwen/Qwen3-235B-A22B": {
        "input": 0.20,  # Example price for a large model
        "output": 0.60   # Example price for a large model
    }
}

In [8]:
# 5.2 Per-Step Operational Metrics
if metrics_log:
    df_metrics = pd.DataFrame(metrics_log)

    def calculate_cost(row):
        prices = model_prices_per_million_tokens.get(row['model'], {"input": 0, "output": 0})
        return (row['prompt_tokens'] / 1_000_000) * prices['input'] + (row['completion_tokens'] / 1_000_000) * prices['output']

    df_metrics['cost_usd'] = df_metrics.apply(calculate_cost, axis=1)
    
    display(Markdown("### Per-Step Performance and Cost Analysis"))
    display(df_metrics)
else:
    logging.info("No metrics were logged.")

### Per-Step Performance and Cost Analysis

| | step | model | latency_s | prompt_tokens | completion_tokens | total_tokens | cost_usd |
|---|---|---|---|---|---|---|---|
| 0 | ocr_extraction | google/gemma-3-27b-it | 11.770877 | 1947 | 593 | 2540 | 0.000373 |
| 1 | refinement_turn_0 | Qwen/Qwen3-14B | 9.855705 | 882 | 771 | 1653 | 0.000256 |
| 2 | refinement_turn_1 | Qwen/Qwen3-14B | 11.558692 | 1696 | 942 | 2638 | 0.000362 |
| 3 | quality_evaluation | Qwen/Qwen3-235B-A22B | 132.759243 | 1309 | 4236 | 5545 | 0.002803 |
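
The per-step cost figures follow directly from the token counts and the example prices defined earlier, and can be reproduced by hand. The sketch below reworks two rows of the recorded run; `step_cost_usd` is a hypothetical standalone version of the notebook's `calculate_cost`, and the prices are the notebook's example values rather than quoted rates.

```python
def step_cost_usd(prompt_tokens: int, completion_tokens: int,
                  input_price: float, output_price: float) -> float:
    """Cost of one call, with prices given in USD per 1M tokens."""
    return (prompt_tokens / 1_000_000) * input_price \
        + (completion_tokens / 1_000_000) * output_price


# OCR step: 1947 prompt tokens and 593 completion tokens at 0.10 / 0.30 per 1M.
ocr_cost = step_cost_usd(1947, 593, 0.10, 0.30)

# Evaluation step: 1309 prompt tokens and 4236 completion tokens at 0.20 / 0.60 per 1M.
eval_cost = step_cost_usd(1309, 4236, 0.20, 0.60)

print(f"{ocr_cost:.6f}")   # 0.000373
print(f"{eval_cost:.6f}")  # 0.002803
```

The evaluation call dominates the total mainly because of its completion tokens, which are billed at the highest output price in this example price list.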


In [9]:
# 5.3 Final Run Summary
if metrics_log and quality_assessment:
    def get_score(assessment, key):
        if isinstance(assessment, dict) and key in assessment and isinstance(assessment[key], dict):
            return assessment[key].get('score', 0.0)
        return 0.0

    summary_data = {
        'Metric': [
            'Total Latency (s)',
            'Total Cost (USD)',
            'Total Tokens',
            '--- Quality Scores (AI) ---',
            'Field Accuracy Score',
            'Inference Quality Score',
            'Completeness Score',
            '**Overall Confidence Score**',
            'Fields for Human Review'
        ],
        'Value': [
            f"{df_metrics['latency_s'].sum():.2f}",
            f"${df_metrics['cost_usd'].sum():.6f}",
            f"{df_metrics['total_tokens'].sum():,}",
            '---',
            f"{get_score(quality_assessment, 'field_accuracy_score'):.2f}",
            f"{get_score(quality_assessment, 'inference_quality_score'):.2f}",
            f"{get_score(quality_assessment, 'completeness_score'):.2f}",
            f"**{get_score(quality_assessment, 'overall_confidence_score'):.2f}**",
            ', '.join(quality_assessment.get('fields_requiring_human_review', [])) or 'None'
        ]
    }
    
    df_summary = pd.DataFrame(summary_data).set_index('Metric')
    
    display(Markdown("### Final Run Summary"))
    display(df_summary)
else:
    logging.info("Cannot generate summary as no metrics were logged or quality assessment failed.")

### Final Run Summary

| Metric | Value |
|---|---|
| Total Latency (s) | 165.94 |
| Total Cost (USD) | $0.003793 |
| Total Tokens | 12376 |
| --- Quality Scores (AI) --- | --- |
| Field Accuracy Score | 0.90 |
| Inference Quality Score | 0.60 |
| Completeness Score | 1.00 |
| **Overall Confidence Score** | **0.85** |
| Fields for Human Review | mailing_address_if_different_than_risk_address... |


## 6. Conclusion

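This notebook has demonstrated a complete, end-to-end two-stage pipeline for intelligent form processing. Separating literal OCR (Stage 1) from logical refinement (Stage 2) gives each model a narrow task: the vision model only transcribes, and the cheaper reasoning model only validates and completes. In the recorded run, the whole pipeline including evaluation cost about $0.0038 and took about 166 seconds, with most of both spent in the evaluation step.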

**Key Takeaways:**

1.  **The Two-Stage Architecture Excels:** Using a powerful vision model for initial extraction and a cheaper reasoning model for refinement is a highly effective pattern for complex document processing tasks.
2.  **Tools are Essential for Reasoning:** The refinement stage's ability to fill in missing information is entirely dependent on its access to tools like web search, demonstrating that reasoning is most powerful when grounded in external data.
3.  **Automated Evaluation Provides Actionable Insights:** By adding a final evaluation agent, we transform the pipeline from a black box into a transparent system that quantifies its own performance and provides clear signals for when human intervention is necessary.
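
One way to act on the evaluation signal is a small routing rule that sends a form to a human whenever the evaluator's confidence is low, any field is flagged, or the report could not be parsed. This is a sketch under stated assumptions: `review_decision` is a hypothetical helper, and the 0.8 threshold is an arbitrary illustrative value that would need tuning against real data.

```python
from typing import Any, Dict, List


def review_decision(assessment: Dict[str, Any], min_confidence: float = 0.8) -> Dict[str, Any]:
    """Decide whether a form should go to a human, based on the evaluator's report."""
    confidence_block = assessment.get("overall_confidence_score")
    score = confidence_block.get("score") if isinstance(confidence_block, dict) else None
    flagged: List[str] = list(assessment.get("fields_requiring_human_review") or [])
    # A missing or non-numeric score is treated as a failed evaluation,
    # so the form is routed to review rather than silently accepted.
    score_ok = isinstance(score, (int, float)) and score >= min_confidence
    return {"needs_review": (not score_ok) or bool(flagged), "fields": flagged}


clean = {"overall_confidence_score": {"score": 0.95}, "fields_requiring_human_review": []}
flagged = {
    "overall_confidence_score": {"score": 0.85},
    "fields_requiring_human_review": ["mailing_address_if_different_than_risk_address"],
}
unparsed = {"raw_text": "model reply that was not valid JSON"}

print(review_decision(clean))
print(review_decision(flagged))
print(review_decision(unparsed))
```

Treating an unparseable report as "needs review" differs from defaulting a missing score to 0.0 in a summary table: it keeps a failed evaluation from being mistaken for a genuinely low score.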