# In [None]:
from vllm import LLM, SamplingParams
from vllm.model_executor.models.deepseek_ocr import NGramPerReqLogitsProcessor
from PIL import Image
import json
import os
from tqdm import tqdm
from transformers import AutoTokenizer
import Levenshtein  # For CER calculation
import pandas as pd

# Model identifier, passed to both the tokenizer loader and the vLLM engine.
MODEL_NAME = "deepseek-ai/DeepSeek-OCR"
# Benchmark annotation file and the directory its "image" entries are resolved against.
JSON_PATH = "./focus_benchmark_test/en_page_ocr.json"
IMAGE_DIR = "./focus_benchmark_test/en_pdf_png"
# Destination of the aggregated results table (written as JSON records).
OUTPUT_PATH = "./fox_results_table.json"
# Prompt text sent with every page image.
PROMPT = "<image>\nFree OCR."
# Upper bound on generated tokens per request.
MAX_TOKENS = 8192
PATCH_SIZE = 16  # Default patch edge used by estimate_vision_tokens
VISION_TOKENS_LIST = [64, 100, 1000]  # Vision-token counts used as compression denominators

# Read the whole benchmark annotation file into memory.
with open(JSON_PATH, "r", encoding="utf-8") as annotations_file:
    fox_data = json.loads(annotations_file.read())
print(f"Loaded {len(fox_data)} samples from Fox benchmark")


# Tokenizer of the same checkpoint; used to count ground-truth text tokens.
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, trust_remote_code=True)


# Engine configuration: prefix caching and the multimodal processor cache are
# both switched off, and the n-gram logits processor class is registered.
engine_options = {
    "model": MODEL_NAME,
    "enable_prefix_caching": False,
    "mm_processor_cache_gb": 0,
    "logits_processors": [NGramPerReqLogitsProcessor],
}
llm = LLM(**engine_options)


# Pair every readable page image with the prompt and its reference text.
model_inputs = []
for record in fox_data:
    image_name = record["image"]
    image_path = os.path.join(IMAGE_DIR, image_name)
    try:
        page_image = Image.open(image_path).convert("RGB")
    except Exception as err:
        # Best-effort: an unreadable page is reported and left out of the run.
        print(f"Skipping {image_name}: {err}")
    else:
        model_inputs.append(
            {
                "prompt": PROMPT,
                "multi_modal_data": {"image": page_image},
                "sample_id": image_name,
                "ground_truth": record["conversations"][1]["value"],
            }
        )

print(f"Prepared {len(model_inputs)} valid inputs")

# Per-request options; presumably consumed by NGramPerReqLogitsProcessor
# (verify against the vLLM DeepSeek-OCR model code).
ngram_options = {
    "ngram_size": 30,
    "window_size": 90,
    "whitelist_token_ids": {128821, 128822},  # whitelist: <td>, </td>
}

# temperature=0.0 requests greedy decoding in vLLM.
sampling_params = SamplingParams(
    temperature=0.0,
    max_tokens=MAX_TOKENS,
    extra_args=ngram_options,
    skip_special_tokens=True,
)

print("Running inference...")
model_outputs = llm.generate(model_inputs, sampling_params)

def cer_precision(pred_text, gt_text):
    """Compute CER-based precision (1 - CER), clamped to be non-negative.

    CER is the Levenshtein distance between the prediction and the ground
    truth divided by the ground-truth length; the divisor is at least 1 so
    an empty ground truth does not divide by zero.

    Args:
        pred_text: Text produced by the model.
        gt_text: Reference text.

    Returns:
        A float in [0.0, 1.0]; 1.0 means the texts are identical.
    """
    cer = Levenshtein.distance(pred_text, gt_text) / max(len(gt_text), 1)
    # The edit distance can exceed the reference length (e.g. a prediction far
    # longer than the ground truth), which would make 1 - CER negative and
    # distort averaged scores; floor the result at zero.
    return max(0.0, 1 - cer)

def estimate_vision_tokens(img, patch_size=PATCH_SIZE):
    """Count the whole patch_size x patch_size tiles that fit in the image.

    Args:
        img: Object exposing a ``size`` attribute of ``(width, height)``.
        patch_size: Edge length of one square patch.

    Returns:
        Number of full patches across multiplied by full patches down;
        partial patches at the right and bottom edges are not counted.
    """
    width, height = img.size
    patches_across = width // patch_size
    patches_down = height // patch_size
    return patches_across * patches_down

def text_token_range(num_tokens):
    """Return the 100-wide bucket label ("lower-upper") containing num_tokens.

    The lower bound is num_tokens rounded down to a multiple of 100 and the
    upper bound is 100 above it, e.g. 650 -> "600-700".
    """
    bucket_index, _ = divmod(num_tokens, 100)
    start = bucket_index * 100
    return "{}-{}".format(start, start + 100)

# One row per (page, vision-token count) pair; aggregated afterwards.
table_data = []

for inp, out in zip(model_inputs, model_outputs):
    pred_text = out.outputs[0].text
    gt_text = inp["ground_truth"]

    precision = cer_precision(pred_text, gt_text)
    # Count only the tokens of the reference text itself; the tokenizer's
    # default would also count any special tokens it adds (e.g. BOS).
    num_text_tokens = len(tokenizer.encode(gt_text, add_special_tokens=False))

    # These values do not depend on the vision-token count, so compute once.
    token_bucket = text_token_range(num_text_tokens)
    precision_pct = round(precision * 100, 1)

    # NOTE(review): inference ran once per page, so every vision-token row of
    # a page carries the same precision; only the compression ratio differs.
    for vt in VISION_TOKENS_LIST:
        compression = num_text_tokens / vt  # compute "×" as in paper
        table_data.append({
            "Text Tokens": token_bucket,
            "Vision Tokens": vt,
            "Precision (%)": precision_pct,
            "Compression (×)": round(compression, 1),
            "Pages": 1,
        })


# Without any rows the frame has no columns and groupby fails with an opaque
# KeyError; stop with an explicit message instead.
if not table_data:
    raise RuntimeError("No results to aggregate: table_data is empty")

df = pd.DataFrame(table_data)

# Average precision and compression, and count pages, per
# (text-token bucket, vision-token count) pair.
df_grouped = df.groupby(["Text Tokens", "Vision Tokens"]).agg({
    "Precision (%)": "mean",
    "Compression (×)": "mean",
    "Pages": "sum"
}).reset_index()

# groupby orders the bucket labels as strings ("1000-1100" before "200-300");
# re-sort by the numeric lower bound so the table reads in ascending order.
bucket_lower = df_grouped["Text Tokens"].str.split("-", n=1).str[0].astype(int)
df_grouped = (
    df_grouped.assign(_bucket_lower=bucket_lower)
    .sort_values(["_bucket_lower", "Vision Tokens"])
    .drop(columns="_bucket_lower")
    .reset_index(drop=True)
)


# dirname is empty for a bare filename, hence the "." fallback.
os.makedirs(os.path.dirname(OUTPUT_PATH) or ".", exist_ok=True)
df_grouped.to_json(OUTPUT_PATH, orient="records", indent=2)
print(f"Saved table to {OUTPUT_PATH}")