# Gemini (Vertex AI) — Auto schema discovery + schema‑constrained JSON extraction (PDF)

This notebook implements an **LLM‑native** pipeline using **only Gemini** (no PDF parsers like pdfplumber/camelot/tabula):

1) **Schema discovery**: Gemini reads the PDF and *induces* an optimal JSON schema/hierarchy for the document  
2) **Extraction**: Gemini extracts the document into that schema using **Structured Outputs** (`response_schema`)  
3) **Validation & persistence**: Validate the extracted JSON against the discovered schema and save artifacts

✅ Designed to be flexible across different PDF layouts (text + tables + numeric data).  
✅ Works with **Vertex AI** (`vertexai=True`) using ADC credentials (no API key in code).  
✅ Uses `model = "gemini-2.5-pro"`, `temperature = 0`.

> **Auth prerequisites (Vertex AI / ADC)**  
> - Either set `GOOGLE_APPLICATION_CREDENTIALS` in your `.env` to a service account JSON  
> - or run `gcloud auth application-default login` in your environment


In [None]:
%pip -q install -U google-genai jsonschema python-dotenv tqdm

## 0) Setup: environment + Vertex AI Gemini client

In [None]:
from dotenv import load_dotenv
_ = load_dotenv()

import os, json, hashlib
from pathlib import Path
from typing import Any, Dict, Tuple, Optional, List

from google import genai
from google.genai import types
from jsonschema import validate
from tqdm import tqdm

# Model and sampling settings shared by the Gemini calls in this notebook.
MODEL = "gemini-2.5-pro"
# Project and location are read from the environment (which load_dotenv() above
# populates from `.env`) instead of being hard-coded. When a variable is unset
# the value stays "", exactly as before.
# NOTE(review): an empty value presumably leaves project/location resolution to
# the SDK and ADC; confirm against the google-genai Client documentation.
PROJECT = os.getenv("GOOGLE_CLOUD_PROJECT", "")
LOCATION = os.getenv("GOOGLE_CLOUD_LOCATION", "")
TEMPERATURE = 0

# Vertex AI client (no API key passed explicitly)
client = genai.Client(
    vertexai=True,
    project=PROJECT,
    location=LOCATION,
)

def sha256_file(path: str) -> str:
    """Return the hexadecimal SHA-256 digest of the file at *path*.

    The file is opened in binary mode and consumed in 1 MiB pieces, so it
    never has to be held in memory in full.
    """
    piece_size = 1024 * 1024
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        while True:
            piece = stream.read(piece_size)
            if not piece:
                # An empty read means end of file.
                break
            digest.update(piece)
    return digest.hexdigest()


## 1) Helpers (no PDF parsing — only bytes/URI inputs)

In [None]:
def make_pdf_part(path_or_gcs_uri: str) -> types.Part:
    """Create a Gemini input Part for a PDF.

    Args:
        path_or_gcs_uri: Either a ``gs://`` URI or a local filesystem path.

    Returns:
        A ``types.Part`` tagged with the ``application/pdf`` MIME type. For a
        ``gs://`` URI the Part references the object by URI; otherwise the
        local file's bytes are read and embedded in the Part.

    Raises:
        OSError: If the local file cannot be read.
    """
    if path_or_gcs_uri.startswith("gs://"):
        # `file_uri` is passed by keyword: a positional URI is rejected by SDK
        # versions where the parameter is keyword-only.
        return types.Part.from_uri(file_uri=path_or_gcs_uri, mime_type="application/pdf")
    data = Path(path_or_gcs_uri).read_bytes()
    # Use keyword arguments for maximum compatibility across SDK versions
    return types.Part.from_bytes(data=data, mime_type="application/pdf")

# Meta/reference keywords removed from discovered schemas.
FORBIDDEN_SCHEMA_KEYS = {
    "$schema", "$id", "$defs", "$ref", "definitions"
}
# Composition/pattern keywords removed from discovered schemas.
FORBIDDEN_SCHEMA_CONSTRUCTS = {
    "oneOf", "anyOf", "allOf", "patternProperties", "dependentSchemas"
}

def sanitize_vertex_schema(schema: Dict[str, Any]) -> Dict[str, Any]:
    """Remove JSON-Schema meta/ref features that Vertex structured outputs may reject.

    Keywords listed in ``FORBIDDEN_SCHEMA_KEYS`` or
    ``FORBIDDEN_SCHEMA_CONSTRUCTS``, and any keyword starting with ``$``, are
    dropped at every schema level. The names inside a ``properties`` map are
    field names chosen for the document, not schema keywords, so they are
    always kept (a field called ``definitions`` survives); only the
    sub-schemas they map to are cleaned.

    Args:
        schema: The JSON schema to clean. It is not modified.

    Returns:
        A cleaned copy. ``type`` defaults to ``"object"`` and
        ``additionalProperties`` to ``True`` when absent at the top level.

    Raises:
        TypeError: If *schema* is not a dict.
    """
    if not isinstance(schema, dict):
        raise TypeError(f"schema must be a dict, got {type(schema).__name__}")

    def _is_dropped(keyword: str) -> bool:
        return (
            keyword in FORBIDDEN_SCHEMA_KEYS
            or keyword in FORBIDDEN_SCHEMA_CONSTRUCTS
            or keyword.startswith("$")
        )

    def _clean(node):
        if isinstance(node, dict):
            out = {}
            for k, v in node.items():
                if _is_dropped(k):
                    # Drop unsupported constructs rather than fail fast.
                    continue
                if k == "properties" and isinstance(v, dict):
                    # Keys here are user-facing field names: keep every one
                    # and sanitize only the sub-schema each maps to.
                    out[k] = {name: _clean(sub) for name, sub in v.items()}
                else:
                    out[k] = _clean(v)
            return out
        if isinstance(node, list):
            return [_clean(x) for x in node]
        return node

    cleaned = _clean(schema)

    # Ensure minimal sane top-level
    cleaned.setdefault("type", "object")
    cleaned.setdefault("additionalProperties", True)

    return cleaned

def safe_json_load(s: str) -> Any:
    """Parse JSON robustly; structured outputs should already be valid JSON.

    The text is first parsed as-is. If that fails, the outermost ``{...}`` or
    ``[...]`` span (first opener to last matching closer) is extracted and
    parsed, which tolerates markdown fences or commentary around the payload.
    Top-level arrays are supported as well as objects: when an array span
    encloses the object span, the array is tried first so that a wrapped
    ``[{...}]`` is not mistaken for its inner object.

    Args:
        s: Text expected to contain a JSON document.

    Returns:
        The decoded JSON value.

    Raises:
        json.JSONDecodeError: If neither the full text nor any extracted span
            is valid JSON (the error from parsing the full text is re-raised).
    """
    try:
        return json.loads(s)
    except json.JSONDecodeError:
        def _span(opener: str, closer: str):
            start, end = s.find(opener), s.rfind(closer)
            return (start, end + 1) if start != -1 and end > start else None

        obj_span = _span("{", "}")
        arr_span = _span("[", "]")
        candidates = [span for span in (obj_span, arr_span) if span is not None]
        # Prefer the array only when it strictly wraps the object span; in all
        # other cases the object is tried first, as the original fallback did.
        if obj_span and arr_span and arr_span[0] < obj_span[0] and arr_span[1] > obj_span[1]:
            candidates.reverse()

        for start, stop in candidates:
            try:
                return json.loads(s[start:stop])
            except json.JSONDecodeError:
                continue
        raise

def save_json(obj: Any, path: str) -> None:
    """Write *obj* to *path* as 2-space-indented UTF-8 JSON.

    Missing parent directories are created first. Non-ASCII characters are
    written as-is rather than escaped.
    """
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    with target.open("w", encoding="utf-8") as handle:
        json.dump(obj, handle, indent=2, ensure_ascii=False)


## 2) Define a **meta‑schema** for the schema discovery step

We use Structured Outputs to force Gemini to return a **Schema Plan** JSON object with:
- `document_profile`: high-level structure signals
- `schema`: a **Vertex‑safe JSON schema** (no `$schema`, `$ref`, `$defs`, etc.)
- `system_instruction`: tailored extraction rules to use in the next step

This keeps schema discovery deterministic and machine‑readable.


In [None]:
# Sub-schema for the `document_profile` object: high-level structure signals
# reported about the document.
_DOCUMENT_PROFILE_SCHEMA: Dict[str, Any] = {
    "type": "object",
    "additionalProperties": False,
    "required": ["title_guess", "structure_style", "tables_present", "numbering_style"],
    "properties": {
        "title_guess": {"type": "string"},
        "structure_style": {
            "type": "string",
            "description": "e.g., 'numbered_sections', 'headings', 'freeform_blocks'",
            "enum": ["numbered_sections", "headings", "freeform_blocks", "mixed"],
        },
        "tables_present": {"type": "boolean"},
        "numbering_style": {
            "type": "string",
            "description": "e.g., '1/1.1', 'A/B', 'none'",
        },
        "notes": {"type": "string"},
    },
}

# Response schema for the discovery call. The model must return a profile, an
# extraction schema and a system instruction; `extraction_prompt` is optional.
SCHEMA_PLAN_SCHEMA: Dict[str, Any] = {
    "type": "object",
    "additionalProperties": False,
    "required": ["document_profile", "schema", "system_instruction"],
    "properties": {
        "document_profile": _DOCUMENT_PROFILE_SCHEMA,
        "schema": {
            "type": "object",
            "description": "Vertex-safe JSON schema to use for extraction (no $schema/$ref/$defs).",
            "additionalProperties": True,
        },
        "system_instruction": {
            "type": "string",
            "description": "System instruction to use during extraction.",
        },
        "extraction_prompt": {
            "type": "string",
            "description": "Optional user prompt to use during extraction.",
        },
    },
}

# Last expression of the cell: displays the schema in the notebook.
SCHEMA_PLAN_SCHEMA


## 3) Step 1 — Schema discovery (Gemini reads the PDF and proposes the best hierarchy)

**Important constraints (enforced in the prompt):**
- Do NOT use `$schema`, `$ref`, `$defs`
- Keep the schema **generic enough** to fit similar documents
- Prefer a clean hierarchy: `metadata` + `sections[]` + optional `subsections[]`
- Represent tables in a consistent way: `columns[]` + `rows[]` (rows as objects)

> You can reuse a discovered schema across many reports if they share the same format.


In [None]:
# Input document: a local path or a gs:// URI. Edit this to point at your PDF.
PDF_PATH = "/mnt/data/Report - Sample.pdf"  # change to your PDF path or gs:// URI
pdf_part = make_pdf_part(PDF_PATH)

# System prompt for the discovery call: describes the schema-induction task and
# the constraints the proposed schema must respect.
schema_discovery_system = """You are a schema induction engine for business reports.
Your job: read the provided PDF and propose an optimal JSON schema that represents the document's content and hierarchy.

Hard constraints for the schema you output:
- Output MUST be valid JSON.
- The schema MUST be Vertex-safe:
  - DO NOT use $schema, $id, $defs, $ref, definitions
  - DO NOT use oneOf/anyOf/allOf, patternProperties, dependentSchemas
  - Use only: type, properties, required, items, enum, additionalProperties, description
- The schema MUST be flexible for similar PDFs:
  - Use arrays for repeating structures (sections/subsections)
  - Use a generic table representation: {title, kind, columns, rows}
  - Include a citations field with page numbers for traceability.

Optimization goals:
- Discover a natural hierarchy (metadata → sections → subsections).
- If the doc has numbered headings, capture those numbers.
- If the doc has key/value tables, represent them as kind='kv_pairs'.
- If the doc has matrix tables, represent them as kind='matrix'.

Return a Schema Plan that conforms to the provided Schema Plan schema.
"""

# User-turn text sent together with the PDF part.
schema_discovery_user = """Analyze this PDF and generate a Schema Plan for extracting it into JSON.
Your schema should represent the best hierarchy you can infer from the document."""

# Structured-output settings: the reply must be JSON shaped like SCHEMA_PLAN_SCHEMA.
discovery_config = types.GenerateContentConfig(
    system_instruction=schema_discovery_system,
    response_mime_type="application/json",
    response_schema=SCHEMA_PLAN_SCHEMA,
    temperature=0,
)

# The user prompt is passed as a plain string (no Part.from_text).
resp_plan = client.models.generate_content(
    model=MODEL,
    contents=[pdf_part, schema_discovery_user],
    config=discovery_config,
)

schema_plan = safe_json_load(resp_plan.text)

# Strip unsupported keywords from the proposed schema before it is reused as a
# response_schema in the extraction step.
schema_plan["schema"] = sanitize_vertex_schema(schema_plan["schema"])

# Last expression of the cell: displays the inferred document profile.
schema_plan["document_profile"]


## 4) Step 2 — Schema‑constrained extraction using the discovered schema

We now call Gemini again with:
- `response_schema = schema_plan["schema"]`
- `response_mime_type = "application/json"`
- `system_instruction` tuned for the discovered structure

No external PDF parsing is performed; Gemini reads the PDF directly.


In [None]:
# The extraction system prompt is the one produced during schema discovery.
extraction_system = schema_plan["system_instruction"]

# Use the plan's own extraction prompt when it is present and non-empty;
# otherwise fall back to a generic instruction.
extraction_user = schema_plan.get("extraction_prompt")
if not extraction_user:
    extraction_user = "Extract this document into the provided JSON schema."

# Structured-output settings: the reply must be JSON shaped like the discovered schema.
extraction_config = types.GenerateContentConfig(
    system_instruction=extraction_system,
    response_mime_type="application/json",
    response_schema=schema_plan["schema"],
    temperature=TEMPERATURE,
)

# The user prompt is passed as a plain string again.
resp_data = client.models.generate_content(
    model=MODEL,
    contents=[pdf_part, extraction_user],
    config=extraction_config,
)

extracted = safe_json_load(resp_data.text)

# Last expression of the cell: top-level keys for a dict result, otherwise its type.
type(extracted) if not isinstance(extracted, dict) else list(extracted.keys())


## 5) Validate and save outputs

We validate the extracted JSON using `jsonschema.validate` against the discovered schema.

Then we save:
- `schema_plan.json`
- `extracted.json`

> If validation fails for some documents, you can add an automatic "repair" step: call Gemini with the validation error and ask it to re-output JSON that conforms to the same schema.


In [None]:
# Raises if the extracted JSON does not conform to the discovered schema.
validate(instance=extracted, schema=schema_plan["schema"])

out_dir = "/mnt/data/gemini_auto_schema_outputs"
report_id = Path(PDF_PATH).stem

# Content hash of the local input file; a gs:// input is not hashed and gets a
# fixed placeholder. The value is not used further in this cell.
if PDF_PATH.startswith("gs://"):
    report_hash = "gcs_uri"
else:
    report_hash = sha256_file(PDF_PATH)

# Both artifacts go into out_dir, named after the PDF's file stem.
out_root = Path(out_dir)
plan_path = str(out_root / f"{report_id}.schema_plan.json")
data_path = str(out_root / f"{report_id}.extracted.json")

save_json(schema_plan, plan_path)
save_json(extracted, data_path)

# Last expression of the cell: displays where the artifacts were written.
plan_path, data_path


## 6) Optional: automatic repair on validation failure (Gemini-only)

If `jsonschema.validate(...)` raises an error, you can ask Gemini to **repair** the JSON.

This is helpful when the model:
- forgets a required field
- returns a value with the wrong type
- adds unexpected keys

The function below retries once with the validation error context.


In [None]:
from jsonschema.exceptions import ValidationError

def _summarize_validation_error(error: Exception, limit: int) -> str:
    """Return a compact, length-capped description of a validation failure."""
    message = getattr(error, "message", None)
    if not isinstance(message, str):
        # Not a jsonschema-style error: fall back to its plain string form.
        return str(error)[:limit]
    location = "/".join(str(step) for step in getattr(error, "absolute_path", ()))
    return f"{message} (at instance path: {location or '<root>'})"[:limit]


def repair_to_schema(
    pdf_part: types.Part,
    schema: Dict[str, Any],
    extraction_system: str,
    bad_json: Any,
    error: Exception,
    max_json_chars: int = 8000,
    max_error_chars: int = 4000,
) -> Any:
    """Ask Gemini to re-extract the PDF after a schema validation failure.

    Args:
        pdf_part: The PDF input Part, sent again so the model can re-read it.
        schema: Schema the output must conform to; used as ``response_schema``.
        extraction_system: Extraction system prompt, appended after the
            repair instructions.
        bad_json: The extraction result that failed validation.
        error: The validation failure. For errors exposing ``message`` and
            ``absolute_path`` (as jsonschema's ``ValidationError`` does) only
            those are sent, because the full string form of such an error also
            embeds dumps of the schema and the instance.
        max_json_chars: Maximum number of characters of *bad_json* included in
            the prompt.
        max_error_chars: Maximum number of characters of the error summary
            included in the prompt.

    Returns:
        The parsed JSON from the model's reply. It is not validated here.
    """
    repair_system = """You are a JSON repair engine.
You MUST output JSON that conforms to the provided schema.
Do not add commentary. Do not wrap JSON in markdown.
"""

    error_text = _summarize_validation_error(error, max_error_chars)
    # ensure_ascii=False keeps non-ASCII text readable and stops \uXXXX escapes
    # from using up the character budget before truncation.
    bad_json_text = json.dumps(bad_json, indent=2, ensure_ascii=False)[:max_json_chars]

    repair_user = f"""The previous extraction did not validate against the schema.

Validation error:
{error_text}

Here is the invalid JSON (may be incomplete):
{bad_json_text}

Re-extract/repair by reading the PDF again and output ONLY corrected JSON that conforms to the schema.
"""

    resp = client.models.generate_content(
        model=MODEL,
        contents=[pdf_part, repair_user],
        config=types.GenerateContentConfig(
            system_instruction=repair_system + "\n\n" + extraction_system,
            response_mime_type="application/json",
            response_schema=schema,
            temperature=0,
        ),
    )
    return safe_json_load(resp.text)

# Example usage:
# try:
#     validate(instance=extracted, schema=schema_plan["schema"])
# except ValidationError as e:
#     extracted = repair_to_schema(pdf_part, schema_plan["schema"], extraction_system, extracted, e)
#     validate(instance=extracted, schema=schema_plan["schema"])


## 7) Batch processing (many PDFs)

Two modes:
- `reuse_schema=True`: discover schema from the **first** PDF, reuse for the rest (fast, great for periodic reports)
- `reuse_schema=False`: discover a new schema **per PDF** (flexible for mixed document types)

Still **Gemini-only** (no PDF parsers).


In [None]:
def discover_schema_plan(pdf_part: types.Part) -> Dict[str, Any]:
    """Run schema discovery on *pdf_part* and return the resulting Schema Plan.

    Sends the PDF with the module-level discovery prompts, constrains the
    reply to ``SCHEMA_PLAN_SCHEMA``, parses it, and replaces the plan's
    ``schema`` entry with its sanitized form.
    """
    discovery_config = types.GenerateContentConfig(
        system_instruction=schema_discovery_system,
        response_mime_type="application/json",
        response_schema=SCHEMA_PLAN_SCHEMA,
        temperature=0,
    )
    response = client.models.generate_content(
        model=MODEL,
        contents=[pdf_part, schema_discovery_user],
        config=discovery_config,
    )
    discovered = safe_json_load(response.text)
    # The proposed schema is reused as a response_schema later, so strip
    # unsupported keywords from it now.
    discovered["schema"] = sanitize_vertex_schema(discovered["schema"])
    return discovered

def extract_with_plan(pdf_part: types.Part, plan: Dict[str, Any]) -> Any:
    """Extract *pdf_part* into JSON constrained by the schema in *plan*.

    The plan's ``system_instruction`` is the system prompt. Its
    ``extraction_prompt`` is the user prompt when present and non-empty;
    otherwise a generic instruction is used. Returns the parsed JSON reply.
    """
    user_prompt = plan.get("extraction_prompt")
    if not user_prompt:
        user_prompt = "Extract this document into the provided JSON schema."

    extraction_config = types.GenerateContentConfig(
        system_instruction=plan["system_instruction"],
        response_mime_type="application/json",
        response_schema=plan["schema"],
        temperature=TEMPERATURE,
    )
    response = client.models.generate_content(
        model=MODEL,
        contents=[pdf_part, user_prompt],
        config=extraction_config,
    )
    return safe_json_load(response.text)

def process_pdfs(pdf_paths: List[str], out_dir: str, reuse_schema: bool = True) -> List[Tuple[str, str, str]]:
    """Discover a schema, extract, validate and save results for each PDF.

    Args:
        pdf_paths: Local paths or ``gs://`` URIs of the PDFs to process.
        out_dir: Directory that receives ``<stem>.schema_plan.json`` and
            ``<stem>.extracted.json`` for each PDF, where ``<stem>`` is the
            PDF's file stem.
        reuse_schema: When true, the Schema Plan discovered for the first PDF
            is reused for all later ones; when false, a plan is discovered
            for every PDF.

    Returns:
        A list of ``(pdf_path, schema_plan_path, extracted_path)`` tuples, one
        per processed PDF, in input order.

    Raises:
        ValidationError: If the extraction is still invalid after one repair
            attempt. Any other exception (for example, one raised for an
            invalid schema) propagates immediately without a repair attempt.
    """
    results: List[Tuple[str, str, str]] = []
    plan: Optional[Dict[str, Any]] = None
    out_root = Path(out_dir)

    for pdf_path in tqdm(pdf_paths):
        pdf_part = make_pdf_part(pdf_path)

        if plan is None or not reuse_schema:
            plan = discover_schema_plan(pdf_part)

        extracted = extract_with_plan(pdf_part, plan)

        # Validate; if the instance does not conform, repair once. Only a
        # genuine validation failure triggers the extra model call; other
        # errors cannot be fixed by re-extraction.
        try:
            validate(instance=extracted, schema=plan["schema"])
        except ValidationError as e:
            extracted = repair_to_schema(pdf_part, plan["schema"], plan["system_instruction"], extracted, e)
            validate(instance=extracted, schema=plan["schema"])

        rid = Path(pdf_path).stem
        plan_path = str(out_root / f"{rid}.schema_plan.json")
        data_path = str(out_root / f"{rid}.extracted.json")
        save_json(plan, plan_path)
        save_json(extracted, data_path)

        results.append((pdf_path, plan_path, data_path))

    return results

# Example:
# pdfs = ["/path/to/r1.pdf", "/path/to/r2.pdf"]
# results = process_pdfs(pdfs, out_dir="/path/to/out", reuse_schema=True)
# results[:2]
