# Advanced Batch Document OCR with Hugging Face Jobs

This notebook runs a three-stage OCR pipeline on Hugging Face Jobs:

1. **Extract** – Run DeepSeek OCR over a dataset, save Markdown and crop detected figures
2. **Describe** – Generate captions for extracted figures  
3. **Assemble** – Enrich Markdown with figure captions

All stages share a single HF dataset repository. Each stage loads the dataset, processes it, and pushes updates back.


In [None]:
import os
import shutil
import tempfile
import time
from pathlib import Path

from huggingface_hub import HfApi, create_repo, fetch_job_logs, inspect_job, run_uv_job, whoami
from huggingface_hub._jobs_api import JobInfo, JobStage

In [None]:
# Configuration
# Account name reported by the Hub for the current credentials; both repos
# below are created under it.
USERNAME = whoami()["name"]

# Job runtime settings, consumed by launch(): container image, the hardware
# flavor used for the extract and describe stages, and the per-job timeout.
HUB_IMAGE = "vllm/vllm-openai:v0.12.0"
HARDWARE = "a100-large"
TIMEOUT = "3h"

# CODE_REPO receives the uploaded job code; DATASET_REPO is the dataset repo
# every stage is pointed at through HF_REPO_ID / SOURCE_REPO_ID.
CODE_REPO = f"{USERNAME}/deepseek-ocr-job-code"
DATASET_REPO = f"{USERNAME}/deepseek-ocr-dataset"

# Source dataset
# Passed to the extract stage as DATASET_NAME / DATASET_CONFIG / MAX_SAMPLES.
SOURCE_DATASET = "HuggingFaceM4/FineVision"
SOURCE_CONFIG = "olmOCR-mix-0225-documents"
MAX_SAMPLES = 20

# Echo the resolved settings so the target repos are visible before anything runs.
print(f"Code: {CODE_REPO} | Dataset: {DATASET_REPO}")
print(f"Source: {SOURCE_DATASET}/{SOURCE_CONFIG} ({MAX_SAMPLES} samples)")

In [None]:
# Base environment for all stages
# launch() overlays stage-specific variables on top of this dict and adds
# PIPELINE_STAGE. Every value is a string because it is sent as an environment
# variable. How each variable is interpreted is decided by hf_job_runner.py,
# which is not shown here; the group comments below only reflect the key names.
BASE_ENV = {
    # vLLM
    "MODEL_ID": "deepseek-ai/DeepSeek-OCR",
    "SERVED_MODEL_NAME": "deepseek-ocr",
    "HOST": "0.0.0.0",
    "PORT": "8000",
    "MAX_MODEL_LEN": "8192",
    "GPU_MEMORY_UTILIZATION": "0.90",
    "TENSOR_PARALLEL_SIZE": "1",
    # Code
    # Points at the repo this notebook uploads the job code to.
    "JOB_CODE_REPO": CODE_REPO,
    "JOB_CODE_REVISION": "main",
    "JOB_CODE_LOCAL_DIR": "/tmp/deepseek-ocr-job-code",
    # Auth
    # Read from the notebook's own environment; an empty string is sent when
    # HF_TOKEN is not set there.
    "HF_TOKEN": os.environ.get("HF_TOKEN", ""),
    # Prompts
    # Separate prompt / max-token / temperature settings for whole documents
    # (DOC_*) and for figures (FIGURE_*).
    "DOC_PROMPT": "<image>\n<|grounding|>Convert this document to Markdown.",
    "DOC_MAX_TOKENS": "4096",
    "DOC_TEMPERATURE": "0.1",
    "FIGURE_PROMPT": "<image>\nDescribe this image in detail.",
    "FIGURE_MAX_TOKENS": "512",
    "FIGURE_TEMPERATURE": "0.6",
}

In [None]:
# Upload code to HF Hub
# Files and directories to bundle. Relative entries are resolved against the
# current working directory; each one lands at the repo root under its own name.
CODE_PATHS = [
    Path("hf_job_runner.py"),
    Path("../llm_ocr"),
]

api = HfApi()
# Both repos are created as dataset repos; exist_ok=True lets this cell be re-run.
create_repo(repo_id=CODE_REPO, repo_type="dataset", exist_ok=True)
create_repo(repo_id=DATASET_REPO, repo_type="dataset", exist_ok=True)

# Stage the bundle in a temporary directory that is removed once the upload
# finishes (or fails), instead of leaving a stale copy behind on every run.
with tempfile.TemporaryDirectory(prefix="job-code-") as tmp_dir:
    bundle_dir = Path(tmp_dir)
    for path in CODE_PATHS:
        src = path if path.is_absolute() else Path.cwd() / path
        if src.is_dir():
            shutil.copytree(src, bundle_dir / path.name, dirs_exist_ok=True)
        else:
            shutil.copy2(src, bundle_dir / path.name)

    api.upload_folder(folder_path=str(bundle_dir), repo_id=CODE_REPO, repo_type="dataset")
print(f"Uploaded code to {CODE_REPO}")


In [None]:
# Helper functions
# Direct download URL of the runner script on the `main` revision of CODE_REPO
# (a dataset repo); launch() passes it to run_uv_job as the script to execute.
CODE_URL = f"https://huggingface.co/datasets/{CODE_REPO}/resolve/main/hf_job_runner.py"

def launch(stage: str, flavor: str, env: dict) -> JobInfo:
    """Submit one pipeline stage as a UV job and return its handle.

    The job environment is ``BASE_ENV`` overlaid with ``env`` (so ``env`` wins
    on conflicts), plus ``PIPELINE_STAGE`` set to ``stage``.

    Args:
        stage: Value sent to the job as ``PIPELINE_STAGE``.
        flavor: Hardware flavor forwarded to ``run_uv_job``.
        env: Stage-specific environment variables.

    Returns:
        The ``JobInfo`` returned by ``run_uv_job``.
    """
    full_env = {**BASE_ENV, **env, "PIPELINE_STAGE": stage}
    # Send the token through `secrets` instead of `env` so it is not stored as
    # a plain environment value on the job. Secrets are still exposed to the
    # job as environment variables, so the runner can keep reading HF_TOKEN.
    # An empty token is dropped rather than sent as an empty value.
    token = full_env.pop("HF_TOKEN", "")
    secrets = {"HF_TOKEN": token} if token else None
    job = run_uv_job(
        CODE_URL,
        image=HUB_IMAGE,
        flavor=flavor,
        env=full_env,
        secrets=secrets,
        timeout=TIMEOUT,
    )
    print(f"Launched {stage}: {job.url}")
    return job

def wait(job: JobInfo, poll: int = 60) -> JobInfo:
    """Poll ``job`` until it reaches a terminal stage and return its final info.

    The current stage is printed on every poll. The function returns for any
    terminal stage, not only success, so callers should check
    ``info.status.stage`` before assuming the job completed.

    Args:
        job: Handle of the job to watch (only ``job.id`` is used).
        poll: Seconds to sleep between polls.

    Returns:
        The ``JobInfo`` from the last ``inspect_job`` call.
    """
    # Stages after which a job no longer changes. Waiting for one of these,
    # rather than for "anything that is not RUNNING", keeps a job that has not
    # started running yet from being mistaken for a finished one.
    terminal = {JobStage.COMPLETED, JobStage.CANCELED, JobStage.ERROR, JobStage.DELETED}
    while True:
        info = inspect_job(job_id=job.id)
        stage = info.status.stage
        print(f"  {job.id}: {stage}")
        if stage in terminal:
            return info
        time.sleep(poll)

def logs(job: JobInfo, tail: int = 100):
    """Print the last ``tail`` log lines of ``job``.

    Args:
        job: Job whose logs are fetched (uses ``job.id`` and ``job.owner.name``).
        tail: Maximum number of trailing lines to print; ``0`` or less prints
            nothing.
    """
    from collections import deque

    # A bounded deque keeps only the newest `tail` entries while the stream is
    # consumed, so the full log is never held in memory. It also makes tail=0
    # mean "nothing", where the slice `[-0:]` used to return every line.
    recent = deque(
        fetch_job_logs(job_id=job.id, namespace=job.owner.name),
        maxlen=max(tail, 0),
    )
    for line in recent:
        # Normalise the terminator so output is one entry per line whether or
        # not the log entry carries its own trailing newline.
        print(line.rstrip("\n"))


# Import rendering utilities from llm_ocr
import sys
sys.path.insert(0, '..')  # Add parent directory for llm_ocr imports
from llm_ocr.document import render_sample_markdown, display_markdown


def _print_preview(label: str, text: str, limit: int = 500) -> None:
    """Print ``text`` under a ``label`` header, cut to ``limit`` characters."""
    print(f"\n{label} ({len(text)} chars):")
    print(text[:limit] + '...' if len(text) > limit else text)


def display_samples(dataset, num_samples: int = 2):
    """Print and render a preview of the first rows of ``dataset``.

    For each previewed row this shows the source image, the first 500
    characters of the Markdown and of the final Markdown, and up to two
    extracted figures, skipping whatever is missing or empty.

    Args:
        dataset: Object supporting ``len()``, integer indexing and a
            ``column_names`` attribute. Rows are assumed to be dict-like and to
            contain a ``sample_id`` key.
        num_samples: Maximum number of rows to preview.
    """
    # Rich rendering needs IPython; outside a notebook fall back to printing
    # the object so the text part of the preview still works.
    try:
        from IPython.display import display
    except ImportError:
        display = print

    print(f"Dataset: {len(dataset)} samples")
    print(f"Columns: {list(dataset.column_names)}")
    print()

    for i in range(min(num_samples, len(dataset))):
        sample = dataset[i]
        print(f"=== Sample {i}: {sample['sample_id']} ===")

        # Show source image if available
        if sample.get('source_image'):
            print("Source image:")
            display(sample['source_image'])

        # Markdown may live under either column name; prefer the first non-empty one.
        md = sample.get('document_markdown') or sample.get('document_markdown_text', '')
        if md:
            _print_preview("Markdown preview", md)

        final_md = sample.get('document_final_markdown') or sample.get('document_final_markdown_text', '')
        if final_md:
            _print_preview("Final markdown preview", final_md)

        # `or []` also covers a column that is present but None.
        figures = sample.get('extracted_figures') or []
        if figures:
            print(f"\nExtracted figures: {len(figures)}")
            for fig in figures[:2]:  # Show max 2 figures
                display(fig)
        print()

In [None]:
# Stage 1: Extract
# Launched on the GPU flavor from HARDWARE. The job is pointed at the source
# dataset/config/split and at DATASET_REPO (HF_REPO_ID) as its output repo.
# All values are strings because they travel as environment variables; their
# exact meaning is defined by the job runner, which is not shown here.
stage1 = launch("extract", flavor=HARDWARE, env={
    "DATASET_NAME": SOURCE_DATASET,
    "DATASET_CONFIG": SOURCE_CONFIG,
    "DATASET_SPLIT": "train",
    "MAX_SAMPLES": str(MAX_SAMPLES),
    "OUTPUT_DIR": "./outputs",
    "EXTRACT_BATCH_SIZE": "256",
    "EXTRACT_MAX_CONCURRENCY": "8",
    "HF_REPO_ID": DATASET_REPO,
})


In [None]:
# Block until the extract job stops, then fail loudly unless it succeeded.
stage1_done = wait(stage1)
# wait() returns once the job is no longer running, whatever the outcome, so
# success must be checked before reporting it (and before later cells try to
# load a dataset that was never pushed).
if stage1_done.status.stage != JobStage.COMPLETED:
    raise RuntimeError(
        f"Extract job ended in stage {stage1_done.status.stage}: {stage1.url}"
    )
print(f"Extract complete: {DATASET_REPO}")


In [None]:
# Load and display samples after Extract
# Imported here so this preview cell can run on its own.
from datasets import load_dataset

# Pull the "train" split of the shared dataset repo and preview two rows.
ds_extract = load_dataset(DATASET_REPO, split="train")
display_samples(ds_extract, num_samples=2)

In [None]:
# Stage 2: Describe
# Updates dataset in place (same repo)
# SOURCE_REPO_ID and HF_REPO_ID both point at DATASET_REPO, i.e. the job is
# given the same repo as input and output. Runs on the GPU flavor from HARDWARE.
stage2 = launch("describe", flavor=HARDWARE, env={
    "OUTPUT_DIR": "./outputs",
    "DESCRIBE_BATCH_SIZE": "8",
    "DESCRIBE_MAX_CONCURRENCY": "4",
    "SOURCE_REPO_ID": DATASET_REPO,
    "HF_REPO_ID": DATASET_REPO,
})


In [None]:
# Block until the describe job stops, then fail loudly unless it succeeded.
stage2_done = wait(stage2)
# wait() returns once the job is no longer running, whatever the outcome, so
# success must be checked before reporting it.
if stage2_done.status.stage != JobStage.COMPLETED:
    raise RuntimeError(
        f"Describe job ended in stage {stage2_done.status.stage}: {stage2.url}"
    )
print(f"Describe complete: {DATASET_REPO}")


In [None]:

# Load and display samples after Describe
# Imported again so this preview cell can run on its own.
from datasets import load_dataset
# Reload the "train" split of the shared dataset repo and preview two rows.
ds_describe = load_dataset(DATASET_REPO, split="train")
display_samples(ds_describe, num_samples=2)

In [None]:
# Stage 3: Assemble
# Updates dataset in place + saves final markdown files
# Launched on the 'cpu-upgrade' flavor rather than HARDWARE. Input and output
# repo are both DATASET_REPO; HF_COMMIT_MESSAGE is passed through to the job
# (presumably used for the push commit; the runner is not shown here).
stage3 = launch("assemble", flavor='cpu-upgrade', env={
    "OUTPUT_DIR": "./outputs",
    "SOURCE_REPO_ID": DATASET_REPO,
    "HF_REPO_ID": DATASET_REPO,
    "HF_COMMIT_MESSAGE": "Add assembled documents with figure captions",
})


In [None]:
# Block until the assemble job stops, then fail loudly unless it succeeded.
stage3_done = wait(stage3)
# wait() returns once the job is no longer running, whatever the outcome, so
# success must be checked before declaring the pipeline complete.
if stage3_done.status.stage != JobStage.COMPLETED:
    raise RuntimeError(
        f"Assemble job ended in stage {stage3_done.status.stage}: {stage3.url}"
    )
print(f"Pipeline complete! Dataset: https://huggingface.co/datasets/{DATASET_REPO}")


In [None]:
# Load and display final samples after Assemble
# Import here as the other preview cells do: those cells are optional, and
# without this import skipping them would leave load_dataset undefined.
from datasets import load_dataset

ds_final = load_dataset(DATASET_REPO, split="train")
display_samples(ds_final, num_samples=2)