# Mistral OCR Workshop

This notebook demonstrates Optical Character Recognition using Mistral models on AWS.

## Introduction to Mistral OCR

**Mistral OCR** is a specialized optical character recognition model designed for extracting text, images, tables, and mathematical expressions from documents. Unlike traditional OCR models that only extract text, Mistral OCR comprehends each element of documents and returns ordered interleaved text and images in markdown format, making it ideal for multimodal document understanding and RAG systems.

### Key Features

- **State-of-the-Art Performance**: Achieves 94.89% overall accuracy on benchmarks, outperforming Google Document AI (83.42%), Azure OCR (89.52%), Gemini models (88-90%), and GPT-4o (89.77%)
- **Complex Document Understanding**: Excels at processing interleaved imagery, mathematical expressions, tables, and advanced layouts such as LaTeX formatting
- **Natively Multilingual**: Parses, understands, and transcribes thousands of scripts, fonts, and languages across all continents with 99%+ accuracy
- **Multi-format Support**: Process images (JPG, PNG, WebP, etc.) and multi-page PDF documents
- **Doc-as-Prompt & Structured Output**: Use documents as prompts and extract information into structured formats like JSON for downstream function calls and agent building
- **RAG-Ready**: Ideal model for use in Retrieval-Augmented Generation (RAG) systems with multimodal documents such as slides or complex PDFs
- **Production-Ready**: Deploy on Amazon SageMaker, la Plateforme, or self-host for organizations with stringent data privacy requirements

### Performance Highlights

Mistral OCR excels across multiple dimensions:

| Category | Mistral OCR 2503 | Best Competitor |
|----------|------------------|-----------------|
| Overall Accuracy | 94.89% | 90.23% (Gemini-1.5-Flash) |
| Mathematical Expressions | 94.29% | 89.11% (Gemini-1.5-Flash) |
| Scanned Documents | 98.96% | 96.15% (Gemini-1.5-Pro) |
| Tables | 96.12% | 91.70% (GPT-4o) |
| Multilingual | 99.02% | 97.31% (Azure OCR) |

### Use Cases

1. **Document Digitization**: Convert scanned documents, receipts, forms, and historical archives into searchable, AI-ready text
2. **Scientific Research**: Extract text, equations, tables, charts, and figures from scientific papers and journals
3. **Handwriting Recognition**: Process handwritten notes, whiteboard images, and forms
4. **Multimodal RAG Systems**: Build intelligent document understanding pipelines by combining Mistral OCR with LLMs like Mistral Small for analysis and summarization
5. **Multi-language Processing**: Handle documents from diverse linguistic backgrounds, from global organizations to hyperlocal businesses
6. **Structured Data Extraction**: Extract specific information from documents and format it into JSON for automated workflows and agent systems
7. **Cultural Heritage Preservation**: Digitize historical documents and artifacts for preservation and accessibility

This workshop will demonstrate how to deploy and use Mistral OCR on Amazon SageMaker for various document processing tasks, leveraging its industry-leading accuracy, speed, and versatility.



## Understanding Mistral OCR Response Format

Mistral OCR returns document content as **Markdown with interleaved text and images**, preserving the document structure and layout hierarchy. This format is optimized for downstream processing by LLMs and provides precise positioning information through bounding boxes.

### Supported Document Elements

Mistral OCR can extract and recognize a wide variety of document elements:

- **Standard typed text** - Regular printed text in any font
- **Multilingual text** - Mixed scripts (e.g., Asian and Roman characters)
- **Mathematical expressions** - Formulas, equations, and LaTeX notation
- **Handwriting** - Handwritten notes and annotations
- **Strikethrough text** - Text with strikethrough formatting
- **Diverse layout formats** - Complex page layouts and structures
- **Multi-column tables** - Tables with multiple columns and complex structures
- **Text with specific bounding boxes** - Precise spatial positioning
- **Text on colored backgrounds** - Text overlaid on colored regions
- **Form elements** - Checkboxes and circle selection fields

### Response Structure

Responses are returned in **Markdown format** with:
- Structural elements like pipes (`|`), LaTeX, and tables
- Layout cues that help LLMs understand document hierarchy
- Image placeholders embedded in the text
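
Because tables arrive as pipe-delimited Markdown, they can be turned back into records with a few lines of Python. The sketch below is illustrative only: `parse_markdown_table` is a hypothetical helper (not part of any Mistral or AWS SDK), the sample table is made up, and it assumes the string contains a single well-formed table.

```python
from typing import Dict, List


def parse_markdown_table(markdown: str) -> List[Dict[str, str]]:
    """Parse a single pipe-delimited Markdown table into a list of row dicts."""
    # Keep only lines that look like table rows (start and end with a pipe).
    rows = [
        line.strip()
        for line in markdown.splitlines()
        if line.strip().startswith("|") and line.strip().endswith("|")
    ]
    if len(rows) < 2:
        return []

    def split_row(row: str) -> List[str]:
        # Drop the outer pipes, then split on the inner ones.
        return [cell.strip() for cell in row[1:-1].split("|")]

    header = split_row(rows[0])
    records = []
    for row in rows[1:]:
        cells = split_row(row)
        # Skip the separator row, e.g. | --- | :---: |
        if all(cell and set(cell) <= set("-: ") for cell in cells):
            continue
        records.append(dict(zip(header, cells)))
    return records


# Made-up sample resembling OCR output for a small table
sample_table = """| Item | Qty | Price |
| --- | ---: | ---: |
| Widget | 2 | 9.50 |
| Gadget | 1 | 20.00 |"""

table_records = parse_markdown_table(sample_table)
print(table_records)
```

Cells are kept as strings; converting quantities or prices to numbers is left to the caller, since OCR output may contain currency symbols or thousands separators.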

#### Image Representation

Images within documents are represented as Markdown image syntax:
```markdown
![img-0.jpeg](img-0.jpeg)
```

The image ID (e.g., `img-0.jpeg`) maps to the `pages[n].images` array, which contains:
- **Bounding box coordinates** (`top_left_x`, `top_left_y`, `bottom_right_x`, `bottom_right_y`)
- **Base64-encoded payload** (optional)
- **Image annotations** (if applicable)

#### Example Response Structure

```python
{
  'index': 13,
  'markdown': "![img-13.jpeg](img-13.jpeg)\n\nFigure 11: Examples of model responses...",
  'images': [
    {
      'id': 'img-13.jpeg',
      'top_left_x': 294,
      'top_left_y': 512,
      'bottom_right_x': 1404,
      'bottom_right_y': 1568,
      'image_base64': None,
      'image_annotation': None
    }
  ],
  'dimensions': {
    'dpi': 200,
    'height': 2200,
    'width': 1700
  }
}
```

This structured format enables precise extraction, spatial understanding, and seamless integration with downstream AI pipelines.
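
As a minimal sketch of how the bounding boxes might be used, the snippet below looks up an image by ID and expresses its region relative to the page. `describe_image_region` is a hypothetical helper, and the code assumes the bounding-box coordinates are pixels in the same coordinate space as `dimensions`, which the example response suggests but does not state.

```python
from typing import Any, Dict, Optional

# Sample page copied from the example response structure above
sample_page = {
    "index": 13,
    "markdown": "![img-13.jpeg](img-13.jpeg)\n\nFigure 11: Examples of model responses...",
    "images": [
        {
            "id": "img-13.jpeg",
            "top_left_x": 294,
            "top_left_y": 512,
            "bottom_right_x": 1404,
            "bottom_right_y": 1568,
            "image_base64": None,
            "image_annotation": None,
        }
    ],
    "dimensions": {"dpi": 200, "height": 2200, "width": 1700},
}


def describe_image_region(page: Dict[str, Any], image_id: str) -> Optional[Dict[str, float]]:
    """Return the size and relative position of one image on a page, or None if absent."""
    image = next((img for img in page.get("images", []) if img["id"] == image_id), None)
    if image is None:
        return None

    width_px = image["bottom_right_x"] - image["top_left_x"]
    height_px = image["bottom_right_y"] - image["top_left_y"]
    page_w = page["dimensions"]["width"]
    page_h = page["dimensions"]["height"]

    return {
        "width_px": width_px,
        "height_px": height_px,
        "rel_left": image["top_left_x"] / page_w,   # 0.0 = left edge of the page
        "rel_top": image["top_left_y"] / page_h,    # 0.0 = top edge of the page
        "area_fraction": (width_px * height_px) / (page_w * page_h),
    }


region = describe_image_region(sample_page, "img-13.jpeg")
print(region)
```

Relative coordinates like these make it easier to compare regions across pages scanned at different resolutions.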

## Setup and Imports

First, let's import the necessary libraries for OCR processing.

In [None]:
import base64
import os
import boto3
import json
from typing import Optional, Dict, Any
from IPython.display import Markdown, display

## Helper Functions

These helper functions support image processing, model invocation, and post-response processing for the Mistral OCR model.
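
The payloads in this notebook embed the image as a base64 data URL whose MIME type should match the file. The sketch below shows one way to derive that MIME type from the file name instead of hard-coding it. `build_image_ocr_payload` is a hypothetical helper introduced here for illustration; the payload keys mirror those used in the cells of this notebook, and only image inputs are covered.

```python
import base64
import mimetypes
from typing import Any, Dict


def build_image_ocr_payload(
    data: bytes,
    file_name: str,
    model: str = "mistral-ocr-2505",
    include_image_base64: bool = False,
) -> Dict[str, Any]:
    """Build an OCR request payload with a data URL whose MIME type matches the file name."""
    mime_type, _ = mimetypes.guess_type(file_name)
    if mime_type is None or not mime_type.startswith("image/"):
        raise ValueError(f"Cannot infer an image MIME type from: {file_name}")

    encoded = base64.b64encode(data).decode("utf-8")
    return {
        "model": model,
        "document": {
            "type": "image_url",
            "image_url": f"data:{mime_type};base64,{encoded}",
        },
        "include_image_base64": include_image_base64,
    }


# Usage with placeholder bytes; in practice, read the bytes from the image file
example_payload = build_image_ocr_payload(b"abc", "scan.png")
print(example_payload["document"]["image_url"])
```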

In [None]:
def encode_local_file_base64(file_path: str, file_type: Optional[str] = None) -> str:
    """
    Encode a local file (image or PDF) to base64 string.
    
    Args:
        file_path: Path to the local file
        file_type: Type of file ('image' or 'pdf'). If None, inferred from extension.
    
    Returns:
        Base64 encoded string of the file
    """
    if file_type is None:
        ext = os.path.splitext(file_path)[1].lower()
        if ext == ".pdf":
            file_type = "pdf"
        elif ext in (".jpg", ".jpeg", ".png", ".gif", ".bmp", ".webp"):
            file_type = "image"
        else:
            raise ValueError(f"Unsupported file type from extension: {ext}")

    try:
        with open(file_path, "rb") as file:
            encoded_data = base64.b64encode(file.read()).decode("utf-8")
            return encoded_data
    except Exception as e:
        print(f"Failed to encode {file_type} at {file_path}: {e}")
        raise

def run_inference(client, endpoint_name: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """
    Invoke the SageMaker endpoint for OCR inference.
    
    Args:
        client: SageMaker runtime client
        endpoint_name: Name of the deployed endpoint
        payload: JSON payload containing the image data
        
    Returns:
        Dictionary containing parsed OCR results
    """
    try:
        inference_out = client.invoke_endpoint(
            EndpointName=endpoint_name,
            ContentType="application/json",
            Body=json.dumps(payload)
        )
        inference_resp_str = inference_out["Body"].read().decode("utf-8")
        return json.loads(inference_resp_str)
    except Exception as e:
        print(f"Inference error: {e}")
        raise


def replace_images_in_markdown(markdown_str: str, images_dict: dict) -> str:
    """
    Replace image placeholders in markdown with base64-encoded images.
    
    Args:
        markdown_str: Markdown string with image placeholders
        images_dict: Dictionary mapping image names to base64 strings
    
    Returns:
        Markdown string with embedded base64 images
    """
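    for img_name, base64_str in images_dict.items():
        # Skip images without a payload (e.g. when include_image_base64 is False);
        # otherwise the placeholder would be rewritten to the literal string "None".
        if not base64_str:
            continue
        markdown_str = markdown_str.replace(
            f"![{img_name}]({img_name})", f"![{img_name}]({base64_str})"
        )
    return markdown_str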

def get_combined_markdown(ocr_response: dict) -> str:
    """
    Combine OCR text and images into a single markdown document.
    
    Args:
        ocr_response: Response dictionary from OCR model
    
    Returns:
        Combined markdown string with embedded images
    """
    markdowns = []
    for page in ocr_response["pages"]:
        image_data = {img["id"]: img.get("image_base64") for img in page.get("images", [])}
        markdown_with_images = replace_images_in_markdown(page["markdown"], image_data)
        markdowns.append(markdown_with_images)
    return "\n\n".join(markdowns)

In [None]:
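# In the workshop, use the pre-built Mistral OCR endpoint for inference
import requests

# API Gateway endpoint URL (provided during the workshop session)
API_ENDPOINT = "<will provide in workshop>"
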
def run_inference_with_api(api_endpoint: str, payload: Dict[str, Any], timeout: int = 300) -> Dict[str, Any]:
    """
    Invoke the API Gateway endpoint for OCR inference using HTTP requests.
    
    Args:
        api_endpoint: URL of the API Gateway endpoint
        payload: JSON payload containing the image data
        timeout: Request timeout in seconds (default: 300)
        
    Returns:
        Dictionary containing parsed OCR results
    """
    try:
        response = requests.post(
            api_endpoint,
            json=payload,
            headers={"Content-Type": "application/json"},
            timeout=timeout
        )

        # Raise an exception for bad status codes
        response.raise_for_status()

        # Parse and return the JSON response
        return response.json()
    
    except requests.exceptions.Timeout:
        print(f"Request timeout after {timeout} seconds")
        raise
    
    except requests.exceptions.RequestException as e:
        print(f"API request error: {e}")
        if hasattr(e, 'response') and e.response is not None:
            print(f"Response status: {e.response.status_code}")
            print(f"Response body: {e.response.text}")
        raise

## Using Mistral OCR

Now let's use the Mistral OCR model to extract text from an image document. 

## Model Access and Deployment  

**Important Notice:**
The Mistral OCR model is available from AWS Marketplace via private offer. To trial the model, contact your account manager and provide your AWS account ID so that it can be whitelisted for model access.

Then follow this guide to deploy the Mistral OCR model on a SageMaker endpoint:
[Mistral OCR SageMaker Deployment Guide](https://github.com/aws-samples/mistral-on-aws/blob/main/Mistral%20OCR/Mistral-OCR-SageMaker-Deployment-example.ipynb)

**For Workshop Participants:**
During the workshop, we will use a pre-created API link to access the Mistral OCR model. The link will be provided during the workshop session.

In [None]:
# If your AWS account is whitelisted for model access and the Mistral OCR model is deployed as a SageMaker endpoint, uncomment and use the code below:

# MISTRAL_OCR_ENDPOINT_NAME = "mistral-ocr-endpoint"
# image_b64 = encode_local_file_base64(file_path="images/french.png")

# # Prepare the payload for Mistral OCR model
# payload = {
#     "model": "mistral-ocr-2505",
#     "document": {
#         "type": "image_url",
#         "image_url": f"data:image/png;base64,{image_b64}"  # french.png is a PNG file
#     }
# }

# # Create a client and invoke the endpoint
# sagemaker_client = boto3.client("sagemaker-runtime")
# result_parsed = run_inference(client=sagemaker_client, endpoint_name=MISTRAL_OCR_ENDPOINT_NAME, payload=payload)

# # Display final markdown content with embedded images
# display(Markdown(get_combined_markdown(result_parsed)))

## Example: Whiteboard/Handwriting OCR

Extract text from whiteboard images or handwritten notes using Mistral OCR.

In [None]:
# Process whiteboard image
whiteboard_b64 = encode_local_file_base64(file_path="images/whiteboard.png")

# Prepare the payload for Mistral OCR model
whiteboard_payload = {
    "model": "mistral-ocr-2505",
    "document": {
        "type": "image_url",
        "image_url": f"data:image/png;base64,{whiteboard_b64}"
    }
}

# # Invoke the endpoint
# whiteboard_result = run_inference(
#     client=sagemaker_client, 
#     endpoint_name=MISTRAL_OCR_ENDPOINT_NAME, 
#     payload=whiteboard_payload
# )

# In the workshop, invoke the pre-built API endpoint
whiteboard_result = run_inference_with_api(
    API_ENDPOINT,
    whiteboard_payload
)


# Display the extracted text
print("Extracted Whiteboard Content:")
display(Markdown(get_combined_markdown(whiteboard_result)))

### Example: Low-Resolution Handwriting Recognition

Now let's test Mistral OCR's ability to handle challenging, low-quality handwritten content. This demonstrates the model's robustness even when dealing with compressed or poor-quality images.

In [None]:
# Process low-resolution handwriting image
handwriting_b64 = encode_local_file_base64(file_path="images/handwriting_low_res_2_resize.jpg")

# Prepare the payload for Mistral OCR model
handwriting_payload = {
    "model": "mistral-ocr-2505",
    "document": {
        "type": "image_url",
        "image_url": f"data:image/jpeg;base64,{handwriting_b64}"
    }
}

# # Invoke the endpoint
# handwriting_result = run_inference(
#     client=sagemaker_client, 
#     endpoint_name=MISTRAL_OCR_ENDPOINT_NAME, 
#     payload=handwriting_payload
# )

# In the workshop, invoke the pre-built API endpoint
handwriting_result = run_inference_with_api(
    API_ENDPOINT,
    handwriting_payload
)

# Display the extracted text
print("Extracted Low-Resolution Handwriting Content:")
display(Markdown(get_combined_markdown(handwriting_result)))

## Example: Invoice Processing

Process invoice images to extract structured data such as vendor details, line items, totals, and dates. This is a common use case for automating accounts payable workflows and financial document processing.
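
Once the invoice has been converted to Markdown, simple `Label: value` lines can be pulled into a dictionary before handing the rest to an LLM. The snippet below is a rough heuristic sketch, not a complete invoice parser: `extract_labeled_fields` is a hypothetical helper, the sample text is made up, and real invoices vary widely in layout.

```python
import re
from typing import Dict

# A label starts with a letter, followed by a colon and a non-empty value
LABEL_VALUE = re.compile(r"^([A-Za-z][A-Za-z0-9 .#/]*?)\s*:\s*(\S.*)$")


def extract_labeled_fields(markdown: str) -> Dict[str, str]:
    """Collect 'Label: value' lines from OCR Markdown into a lowercase-keyed dict."""
    fields: Dict[str, str] = {}
    for line in markdown.splitlines():
        # Drop bold/italic markers so '**Total:** 10' reads as 'Total: 10'
        cleaned = line.replace("*", "").strip()
        match = LABEL_VALUE.match(cleaned)
        if match:
            key = match.group(1).strip().lower()
            # Keep the first occurrence of each label
            fields.setdefault(key, match.group(2).strip())
    return fields


# Made-up sample resembling OCR output for an invoice header
sample_invoice_md = """# INVOICE
**Invoice Number:** INV-1042
Date: 2024-03-15
| Item | Qty |
| --- | --- |
| Widget | 2 |
Total: $1,500.00"""

invoice_fields = extract_labeled_fields(sample_invoice_md)
print(invoice_fields)
```

Table rows and headings are ignored here because they do not start with a letter; line items are better handled by parsing the Markdown table itself.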

In [None]:
# Process a single invoice
invoice_b64 = encode_local_file_base64(file_path="images/invoice_1.jpg")

# Prepare the payload for Mistral OCR model
invoice_payload = {
    "model": "mistral-ocr-2505",
    "document": {
        "type": "image_url",
        "image_url": f"data:image/jpeg;base64,{invoice_b64}"
    }
}

# # Invoke the endpoint
# invoice_result = run_inference(
#     client=sagemaker_client, 
#     endpoint_name=MISTRAL_OCR_ENDPOINT_NAME, 
#     payload=invoice_payload
# )

# In the workshop, invoke the pre-built API endpoint
invoice_result = run_inference_with_api(
    API_ENDPOINT,
    invoice_payload
)

# Display the extracted invoice content
print("Extracted Invoice Content:")
display(Markdown(get_combined_markdown(invoice_result)))

## Example: Police Report OCR

Extract text and structured information from police report documents. This demonstrates Mistral OCR's ability to handle official documents with mixed layouts, stamps, and form fields.

In [None]:
# Process police report image
police_report_b64 = encode_local_file_base64(file_path="images/Sample-Police-Report.jpeg")

# Prepare the payload for Mistral OCR model
police_report_payload = {
    "model": "mistral-ocr-2505",
    "document": {
        "type": "image_url",
        "image_url": f"data:image/jpeg;base64,{police_report_b64}"
    }
}

# # Invoke the endpoint
# police_report_result = run_inference(
#     client=sagemaker_client, 
#     endpoint_name=MISTRAL_OCR_ENDPOINT_NAME, 
#     payload=police_report_payload
# )

# In Workshop, invoke the pre-built API endpoint 
police_report_result = run_inference_with_api(
    API_ENDPOINT, 
    police_report_payload
) 

# Display the extracted text
print("Extracted Police Report Content:")
display(Markdown(get_combined_markdown(police_report_result)))

## Example: Mathematical Formula OCR

Extract mathematical expressions and equations from images and convert them to accurate LaTeX notation. Mistral OCR excels at recognizing complex mathematical formulas, including integrals, summations, fractions, Greek letters, and special symbols, and automatically converts them into properly formatted LaTeX code. This is particularly valuable for digitizing scientific papers, educational materials, and technical documentation where mathematical precision is critical.
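
To work with the recognized formulas programmatically, the LaTeX spans can be separated from the surrounding Markdown. The sketch below assumes the OCR output delimits inline math with `$...$` and display math with `$$...$$`; `extract_latex` is a hypothetical helper, and the sample text is made up. Note that a lone currency symbol such as `$5` would confuse this simple pattern.

```python
import re
from typing import List, Tuple


def extract_latex(markdown: str) -> Tuple[List[str], List[str]]:
    """Return (display_math, inline_math) LaTeX snippets found in a Markdown string."""
    # Display math: $$ ... $$ (may span several lines)
    display = [m.strip() for m in re.findall(r"\$\$(.+?)\$\$", markdown, flags=re.DOTALL)]

    # Remove display blocks first so their delimiters are not matched as inline math
    without_display = re.sub(r"\$\$.+?\$\$", " ", markdown, flags=re.DOTALL)

    # Inline math: $ ... $ on a single line, ignoring escaped dollars (\$)
    inline = [m.strip() for m in re.findall(r"(?<!\\)\$(.+?)(?<!\\)\$", without_display)]
    return display, inline


# Made-up sample resembling OCR output for a page with formulas
sample_math_md = r"""The energy is $E = mc^2$ and the integral is

$$\int_0^1 x^2 \, dx = \frac{1}{3}$$
"""

display_math, inline_math = extract_latex(sample_math_md)
print("Display:", display_math)
print("Inline:", inline_math)
```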

In [None]:
# Process mathematical formula image
math_formula_b64 = encode_local_file_base64(file_path="images/math_formula.png")

# Prepare the payload for Mistral OCR model
math_formula_payload = {
    "model": "mistral-ocr-2505",
    "document": {
        "type": "image_url",
        "image_url": f"data:image/png;base64,{math_formula_b64}"
    }
}

# # Invoke the endpoint
# math_formula_result = run_inference(
#     client=sagemaker_client, 
#     endpoint_name=MISTRAL_OCR_ENDPOINT_NAME, 
#     payload=math_formula_payload
# )

# In Workshop, invoke the pre-built API endpoint 
math_formula_result = run_inference_with_api(
    API_ENDPOINT, 
    math_formula_payload
) 

# Display the extracted mathematical content with LaTeX
print("Extracted Mathematical Formula (LaTeX):")
display(Markdown(get_combined_markdown(math_formula_result)))

In [None]:
# In the workshop, please use the code below to access the pre-built Mistral OCR endpoint
import requests

# API Gateway endpoint URL
API_ENDPOINT = "<will provide in workshop>"

# Encode the image
image_b64 = encode_local_file_base64(file_path="images/french.png")

# Prepare the payload for API request
api_payload = {
    "model": "mistral-ocr-2505",
    "document": {
        "type": "image_url",
        "image_url": f"data:image/png;base64,{image_b64}"  # french.png is a PNG file
    },
    "include_image_base64": True  # Include embedded images in the output
}

# Make POST request to API Gateway
result_parsed = run_inference_with_api(API_ENDPOINT, api_payload)

# Display final markdown content with embedded images
print("OCR Result from API Gateway:")
display(Markdown(get_combined_markdown(result_parsed)))


## Document Understanding Pipeline with OCR and LLM

Combine Mistral OCR with Pixtral Large to build an intelligent document understanding pipeline that can extract text and answer questions about the content.
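
The hand-off between the two models is just text: the OCR Markdown is appended to the user's question and sent as a single user message. The sketch below shows that message construction in isolation. `build_converse_messages` and its `max_chars` cut-off are hypothetical additions for illustration; the message shape follows the one used by the pipeline in this notebook.

```python
from typing import Any, Dict, List


def build_converse_messages(
    user_prompt: str,
    markdown_doc: str,
    max_chars: int = 20000,  # hypothetical safety limit, not a documented model constraint
) -> List[Dict[str, Any]]:
    """Wrap a user question and OCR Markdown into a single-turn Converse message list."""
    if len(markdown_doc) > max_chars:
        # Truncate very long documents and leave a visible marker for the model
        markdown_doc = markdown_doc[:max_chars] + "\n\n[document truncated]"

    return [
        {
            "role": "user",
            "content": [
                {"text": f"{user_prompt}\n\n--- Document Content ---\n{markdown_doc}"}
            ],
        }
    ]


example_messages = build_converse_messages("Summarize this document", "# Title\n\nSome OCR text.")
print(example_messages[0]["content"][0]["text"])
```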

In [None]:
# Helper functions for Bedrock Runtime
bedrock_runtime = boto3.client("bedrock-runtime")

def bedrock_converse(system_prompt: str, messages: list, endpoint_arn: str, display_usage=False):
    """Invoke model using Converse API"""
    system = [{"text": system_prompt}]
    
    response = bedrock_runtime.converse(
        modelId=endpoint_arn,
        messages=messages,
        system=system,
        additionalModelRequestFields={"max_tokens": 2000, "temperature": 0.3}
    )

    output_content = ''.join(
        content['text'] for content in response['output']['message']['content']
    )

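    if display_usage:
        token_usage = response['usage']
        print(f"\tInput tokens: {token_usage['inputTokens']}")
        print(f"\tOutput tokens: {token_usage['outputTokens']}")
        print(f"\tLatency: {response['metrics']['latencyMs']}ms")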
    
    return output_content

def bedrock_converse_stream(system_prompt: str, messages: list, endpoint_arn: str):
    """Invoke model with streaming"""
    system = [{"text": system_prompt}]
    
    response = bedrock_runtime.converse_stream(
        modelId=endpoint_arn,
        messages=messages,
        system=system,
        additionalModelRequestFields={"max_tokens": 2000, "temperature": 0.3}
    )
    
    stream = response.get('stream')
    output_content = ''
    
    if stream:
        for event in stream:
            if 'messageStart' in event:
                print(f"\nRole: {event['messageStart']['role']}")
            
            if 'contentBlockDelta' in event:
                # Some deltas (e.g. tool use) carry no text, so default to an empty string
                text_chunk = event['contentBlockDelta']['delta'].get('text', '')
                print(text_chunk, end="")
                output_content += text_chunk
            
            if 'messageStop' in event:
                print(f"\nStop reason: {event['messageStop']['stopReason']}")
            
            if 'metadata' in event:
                metadata = event['metadata']
                if 'metrics' in metadata:
                    print(f"Latency: {metadata['metrics']['latencyMs']}ms")
    
    return output_content

In [None]:
def document_understanding_pipeline(
    image_path: str,
    user_prompt: str,
    ocr_endpoint: str=None,
    llm_endpoint: str=None
) -> str:
    """
    Full pipeline for document understanding from image input.

    Args:
        image_path: Local path to the document image
        user_prompt: What insights the user wants to extract
        ocr_endpoint: SageMaker endpoint for OCR model
        llm_endpoint: Bedrock endpoint for document understanding LLM

    Returns:
        Model-generated response with document insights
    """

    # Step 1: Encode local file using helper
    encoded_image = encode_local_file_base64(image_path)

    # payload = {
    #     "model": "mistral-ocr-2505",
    #     "document": {
    #         "type": "image_url",
    #         "image_url": f"data:image/jpeg;base64,{encoded_image}"
    #     }
    # }

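    # Match the data-URL MIME type to the file extension
    mime_type = "image/png" if image_path.lower().endswith(".png") else "image/jpeg"

    api_payload = {
        "model": "mistral-ocr-2505",
        "document": {
            "type": "image_url",
            # Use the image encoded from image_path, not a global variable
            "image_url": f"data:{mime_type};base64,{encoded_image}"
        },
        "include_image_base64": False  # Set to True to include embedded images in the output
    }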


    # Step 2: Run OCR model
    print("Running OCR model...")
    # ocr_result = run_inference(client=sagemaker_client, endpoint_name=ocr_endpoint, payload=payload) # SageMaker endpoint 
    ocr_result =  run_inference_with_api(API_ENDPOINT, api_payload) # Workshop pre-built API endpoint


    # Step 3: Convert OCR output to Markdown
    print("Formatting OCR output...")
    markdown_doc = get_combined_markdown(ocr_result)

    print("----- OCR Text  -----")
    display(Markdown(markdown_doc))

    # Step 4: Prepare LLM messages
    system_prompt = (
        "You are a document understanding assistant. The user will provide structured OCR content "
        "from a scanned document. Use that information to generate clear, factual insights that "
        "answer the user's request."
    )

    messages = [
        {
            "role": "user",
            "content": [{"text": f"{user_prompt}\n\n--- Document Content ---\n{markdown_doc}"}]
        }
    ]

    # Step 5: Call Bedrock LLM with streaming
    print("Running LLM for document insights...")
    insights = bedrock_converse_stream(system_prompt, messages, llm_endpoint)

    return insights

## Example: Document Summarization

Let's use the pipeline to extract and summarize a document.

In [None]:
image_path = "images/french.png"
user_prompt = "can you summarise this document"
# ocr_endpoint = MISTRAL_OCR_ENDPOINT_NAME
llm_endpoint = "us.mistral.pixtral-large-2502-v1:0" # Use Pixtral Large model from Bedrock

# document_understanding_pipeline(image_path, user_prompt, ocr_endpoint, llm_endpoint)
document_understanding_pipeline(image_path=image_path, user_prompt=user_prompt, llm_endpoint=llm_endpoint)

## Advanced: Multi-Agent Document Intelligence with OCR + Strands Agents

In this section, we'll go beyond simple OCR extraction to build a **sophisticated multi-agent document processing system** using **Strands Agents**. This demonstrates how to orchestrate multiple specialized AI agents that work together to analyze documents, enforce compliance rules, and generate actionable insights.

### What is Strands Agents?

**Strands Agents** is a Python framework for building agentic AI applications with tool use capabilities. It provides:
- **Agent Orchestration**: Coordinate multiple specialized agents in workflows
- **Tool Functions**: Decorate Python functions with `@tool` to make them callable by agents
- **Model Integration**: Works seamlessly with Amazon Bedrock models like Pixtral Large
- **State Management**: Track execution across multi-step workflows

Learn more: [https://github.com/strands-agents/strands-agents](https://github.com/strands-agents/strands-agents)

### What We'll Build: Invoice & Document Compliance Workflow

We'll create an **intelligent document processing pipeline** that:
1. **Extracts** text from invoices and official documents using Mistral OCR
2. **Classifies** document types automatically (invoice, report, form, etc.)
3. **Analyzes** documents with domain-specific agents
4. **Validates** against compliance rules (duplicate detection, anomaly detection)
5. **Synthesizes** findings into an executive summary with flagged issues
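
Duplicate detection in the validation step depends on comparing values that OCR may render differently, for example `$1,500.00` versus `1500`. The sketch below illustrates one possible normalization and grouping approach. The helper names and sample records are hypothetical, and the amount parsing assumes US-style formatting (comma as thousands separator, dot as decimal point).

```python
import re
from collections import defaultdict
from decimal import Decimal
from typing import Dict, List


def normalize_amount(text: str) -> Decimal:
    """Convert strings like '$1,500.00' or '1500' to a Decimal with two decimal places."""
    cleaned = re.sub(r"[^\d.\-]", "", text)  # drop currency symbols, commas, spaces
    return Decimal(cleaned).quantize(Decimal("0.01"))


def find_duplicate_groups(invoices: List[Dict[str, str]]) -> List[List[str]]:
    """Group document IDs that share the same normalized vendor, amount, and date."""
    groups = defaultdict(list)
    for invoice in invoices:
        key = (
            invoice["vendor"].strip().lower(),
            normalize_amount(invoice["amount"]),
            invoice["date"].strip(),
        )
        groups[key].append(invoice["doc_id"])
    return [doc_ids for doc_ids in groups.values() if len(doc_ids) > 1]


# Made-up records standing in for fields extracted from OCR text
sample_invoices = [
    {"doc_id": "doc_001", "vendor": "Acme Corp", "amount": "$1,500.00", "date": "2024-03-15"},
    {"doc_id": "doc_002", "vendor": "Globex", "amount": "250.00", "date": "2024-03-16"},
    {"doc_id": "doc_003", "vendor": " acme corp", "amount": "1500", "date": "2024-03-15"},
]

duplicate_groups = find_duplicate_groups(sample_invoices)
print(duplicate_groups)
```

Using `Decimal` rather than `float` avoids rounding surprises when comparing monetary values.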

### Real-World Use Cases

This pattern is applicable to:
- **Accounts Payable Automation**: Process invoices, detect duplicates, flag anomalies
- **Insurance Claims Processing**: Analyze claim documents and police reports
- **Legal Document Review**: Extract key information from contracts and forms
- **Compliance Auditing**: Validate documents against regulatory requirements
- **Research Paper Analysis**: Extract equations, citations, and key findings

### Multi-Agent Architecture

Our workflow uses 5 specialized agents:

1. **Document Triage Agent**: Classifies document type (invoice, police report, form)
2. **Invoice Analysis Agent**: Extracts vendor, amounts, line items, checks for issues
3. **Report Analysis Agent**: Analyzes official documents like police reports
4. **Compliance Agent**: Validates documents against business rules
5. **Synthesis Agent**: Generates executive summary with prioritized actions

### Why Multi-Agent vs Single-Agent?

| Single Agent | Multi-Agent Workflow |
|-------------|---------------------|
| Generic prompts | Specialized expertise per domain |
| Limited context window | Distributed processing |
| Hard to maintain | Modular and scalable |
| One-size-fits-all | Tailored analysis per document type |

### Workflow Steps

```
1. OCR Extraction → 2. Document Triage → 3. Specialized Analysis → 4. Compliance Check → 5. Synthesis
```
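
Independent of any agent framework, the five steps can be sketched as plain function composition. In the snippet below every callable is a stub standing in for the OCR call or an agent; `run_document_workflow` and the stub names are hypothetical and are not the Strands Agents API.

```python
from typing import Callable, Dict, List


def run_document_workflow(
    doc_id: str,
    extract: Callable[[str], str],
    triage: Callable[[str], str],
    analysts: Dict[str, Callable[[str], str]],
    compliance: Callable[[str, str], List[str]],
    synthesize: Callable[[str, str, List[str]], str],
) -> str:
    """Run the five workflow steps in order and return the final summary."""
    text = extract(doc_id)                               # 1. OCR extraction
    doc_type = triage(text)                              # 2. Document triage
    analyst = analysts.get(doc_type, analysts["other"])  # 3. Route to a specialized analyst
    analysis = analyst(text)
    issues = compliance(doc_type, analysis)              # 4. Compliance check
    return synthesize(doc_type, analysis, issues)        # 5. Synthesis


# Stub callables standing in for the OCR call and the agents
stub_analysts = {
    "invoice": lambda text: "invoice analysis",
    "other": lambda text: "generic analysis",
}

workflow_summary = run_document_workflow(
    "doc_001",
    extract=lambda doc_id: "INVOICE #42 Total: $100",
    triage=lambda text: "invoice" if "invoice" in text.lower() else "other",
    analysts=stub_analysts,
    compliance=lambda doc_type, analysis: [],
    synthesize=lambda doc_type, analysis, issues: f"{doc_type}: {analysis}; issues={len(issues)}",
)
print(workflow_summary)
```

Passing each step in as a callable keeps the routing logic testable without calling any model endpoint.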

Let's build it!

### Setup: Install and Import Strands Agents

First, let's install Strands Agents and import the necessary components.

In [None]:
# Install Strands Agents (if not already installed)
# !pip install "strands-agents>=0.1.6"  # quoted so the shell does not treat >= as a redirect

# Import Strands components
from strands import Agent
from strands.models import BedrockModel
from strands.tools import tool
from dataclasses import dataclass, field
from typing import List
from datetime import datetime

print("✅ Strands Agents imported successfully!")

In [None]:
# Document metadata (file paths for processing)
# The OCR text will be extracted dynamically when needed

DOCUMENTS_TO_PROCESS = {
    "doc_001": {
        "doc_id": "doc_001",
        "file_path": "images/invoice_1.jpg",
        "doc_type": "unknown"  # Will be classified by triage agent
    },
    "doc_002": {
        "doc_id": "doc_002", 
        "file_path": "images/invoice_2.jpg",
        "doc_type": "unknown"
    },
    "doc_003": {
        "doc_id": "doc_003",
        "file_path": "images/Invoice_3.jpg",
        "doc_type": "unknown"
    },
    "doc_004": {
        "doc_id": "doc_004",
        "file_path": "images/Sample-Police-Report.jpeg",
        "doc_type": "unknown"
    }
}

# Cache for extracted OCR results (to avoid re-processing same images)
OCR_CACHE = {}

print(f"✅ Document metadata created for {len(DOCUMENTS_TO_PROCESS)} documents")
print("\nDocuments to process:")
for doc_id, doc in DOCUMENTS_TO_PROCESS.items():
    print(f"  - {doc_id}: {doc['file_path']}")

In [None]:
@tool
def extract_document_text(doc_id: str) -> str:
    """
    Extract text from a document using Mistral OCR.
    Caches results to avoid re-processing.
    
    Args:
        doc_id: Document ID to extract text from
    
    Returns:
        JSON string with document details and extracted OCR text
    """
    if doc_id not in DOCUMENTS_TO_PROCESS:
        return json.dumps({"error": f"Document {doc_id} not found"})
    
    # Check cache first
    if doc_id in OCR_CACHE:
        print(f"  [Using cached OCR for {doc_id}]")
        return json.dumps(OCR_CACHE[doc_id], indent=2)
    
    # Extract from document
    doc_info = DOCUMENTS_TO_PROCESS[doc_id]
    file_path = doc_info['file_path']
    
    print(f"  [Running OCR on {file_path}...]")
    
    try:
        # Encode image
        image_b64 = encode_local_file_base64(file_path=file_path)
        
        # Prepare OCR payload
        ocr_payload = {
            "model": "mistral-ocr-2505",
            "document": {
                "type": "image_url",
                "image_url": f"data:image/jpeg;base64,{image_b64}"
            },
            "include_image_base64": False
        }
        
        # Call OCR endpoint
        ocr_result = run_inference_with_api(API_ENDPOINT, ocr_payload)
        
        # Extract markdown text
        ocr_text = get_combined_markdown(ocr_result)
        
        # Cache result
        result = {
            "doc_id": doc_id,
            "file_path": file_path,
            "ocr_text": ocr_text,
            "doc_type": doc_info['doc_type']
        }
        OCR_CACHE[doc_id] = result
        
        print(f"  [OCR complete: {len(ocr_text)} characters extracted]")
        return json.dumps(result, indent=2)
        
    except Exception as e:
        return json.dumps({"error": f"OCR extraction failed: {str(e)}"})


@tool
def list_documents(doc_type: str = None) -> str:
    """
    List all documents available for processing.
    
    Args:
        doc_type: Filter by document type (optional)
    
    Returns:
        JSON string with list of documents
    """
    docs = list(DOCUMENTS_TO_PROCESS.values())
    
    if doc_type and doc_type != "unknown":
        docs = [d for d in docs if d['doc_type'] == doc_type]
    
    result = [{"doc_id": d['doc_id'], "file_path": d['file_path'], "doc_type": d['doc_type']}
              for d in docs]
    
    return json.dumps(result, indent=2)


@tool
def check_duplicate_invoice(doc_id: str, vendor: str, amount: float, date: str) -> str:
    """
    Check if an invoice might be a duplicate by comparing with other processed invoices.
    
    Args:
        doc_id: Current document ID (to exclude from comparison)
        vendor: Vendor name
        amount: Invoice amount
        date: Invoice date
    
    Returns:
        JSON string with duplicate check results
    """
    matches = []
    
    # Check all cached documents
    for cached_doc_id, cached_doc in OCR_CACHE.items():
        if cached_doc_id == doc_id:
            continue  # Skip self
            
        # Simple heuristic: check if vendor, amount, and date all appear in the OCR text
        ocr_text = cached_doc.get('ocr_text', '').lower()
        
        if (vendor.lower() in ocr_text and 
            str(amount) in ocr_text and
            date.lower() in ocr_text):  # ocr_text is lowercased, so compare case-insensitively
            matches.append({
                "doc_id": cached_doc_id,
                "file_path": cached_doc.get('file_path'),
                "match_reason": "Vendor, amount, and date found in document"
            })
    
    result = {
        "is_duplicate": len(matches) > 0,
        "match_count": len(matches),
        "matches": matches,
        "note": "This is a heuristic check based on OCR text content"
    }
    
    return json.dumps(result, indent=2)


print("✅ Tool functions defined:")
print("  - extract_document_text(doc_id) - Calls OCR API dynamically")
print("  - list_documents(doc_type)")
print("  - check_duplicate_invoice(doc_id, vendor, amount, date)")

In [None]:
# Initialize Bedrock model for agents
bedrock_model_agents = BedrockModel(
    model_id="us.mistral.pixtral-large-2502-v1:0",
    streaming=False
)

def create_triage_agent():
    """Agent that classifies document types"""
    system_prompt = """You are a document classification specialist. 
Analyze document content and classify it as: invoice, police_report, contract, form, or other.
Provide brief reasoning for your classification."""
    
    agent = Agent(model=bedrock_model_agents, tools=[extract_document_text], system_prompt=system_prompt)
    return agent

def create_invoice_analyst_agent():
    """Agent specialized in invoice analysis"""
    system_prompt = """You are an invoice analysis specialist. Your role:
1. Extract key information (vendor, invoice number, amounts, dates)
2. Validate invoice structure and completeness
3. Check for anomalies (unusual amounts, missing fields, calculation errors)
4. Use check_duplicate_invoice to detect potential duplicates
5. Flag potential issues for review

Be thorough and flag anything suspicious."""
    
    agent = Agent(model=bedrock_model_agents, 
                 tools=[extract_document_text, check_duplicate_invoice], 
                 system_prompt=system_prompt)
    return agent

def create_report_analyst_agent():
    """Agent specialized in official report analysis"""
    system_prompt = """You are an official document analyst specializing in reports.
Extract key information such as:
- Report numbers and dates
- Incident types and locations  
- Key parties involved
- Financial impacts
- Status and follow-up actions

Provide structured summaries."""
    
    agent = Agent(model=bedrock_model_agents, tools=[extract_document_text], system_prompt=system_prompt)
    return agent

def create_compliance_agent():
    """Agent that validates compliance rules"""
    system_prompt = """You are a compliance validation specialist. Check documents for:
1. Duplicate submissions (use check_duplicate_invoice tool)
2. Missing required fields
3. Amounts exceeding thresholds ($5,000+)
4. Date inconsistencies
5. Policy violations

Assign risk levels: LOW, MEDIUM, HIGH, CRITICAL."""
    
    agent = Agent(model=bedrock_model_agents, 
                 tools=[list_documents, extract_document_text, check_duplicate_invoice], 
                 system_prompt=system_prompt)
    return agent

def create_synthesis_agent():
    """Agent that synthesizes findings"""
    system_prompt = """You are an executive synthesis specialist. Create:
1. Executive Summary
2. Key Findings (prioritized by risk)
3. Flagged Issues requiring immediate attention
4. Recommended Actions

Be concise and actionable."""
    
    agent = Agent(model=bedrock_model_agents, tools=[], system_prompt=system_prompt)
    return agent

print("✅ Specialized agents created:")
print("  - Triage Agent")
print("  - Invoice Analyst Agent")
print("  - Report Analyst Agent")
print("  - Compliance Agent")
print("  - Synthesis Agent")
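
The five factory functions above differ only in their system prompt and tool list, so one possible alternative is a table-driven factory. The sketch below is an illustration under assumptions, not the workshop's implementation: `AgentSpec`, `AGENT_SPECS`, `build_agent`, and `StubAgent` are hypothetical names, the prompts are abbreviated, and the stub class stands in for the real `Agent` so the example runs without AWS credentials.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List


@dataclass(frozen=True)
class AgentSpec:
    """Hypothetical declarative description of one specialized agent."""
    system_prompt: str
    tool_names: List[str]


# Illustrative subset of the agents defined above (prompts abbreviated)
AGENT_SPECS: Dict[str, AgentSpec] = {
    "triage": AgentSpec("You are a document classification specialist.",
                        ["extract_document_text"]),
    "synthesis": AgentSpec("You are an executive synthesis specialist.", []),
}


def build_agent(name: str, agent_cls: Callable[..., Any], model: Any,
                tool_registry: Dict[str, Callable]) -> Any:
    """Look up a spec by name and construct an agent with the matching tools."""
    spec = AGENT_SPECS[name]
    tools = [tool_registry[tool_name] for tool_name in spec.tool_names]
    return agent_cls(model=model, tools=tools, system_prompt=spec.system_prompt)


class StubAgent:
    """Stand-in for the real agent class; it only records its constructor arguments."""
    def __init__(self, model, tools, system_prompt):
        self.model = model
        self.tools = tools
        self.system_prompt = system_prompt


def fake_extract_document_text(doc_id: str) -> str:
    """Placeholder tool used only by this sketch."""
    return f"text of {doc_id}"


registry = {"extract_document_text": fake_extract_document_text}
triage = build_agent("triage", StubAgent, model="stub-model", tool_registry=registry)
print(len(triage.tools), triage.system_prompt)
```

In the notebook, the same function could presumably be called with the real `Agent` class and `bedrock_model_agents` in place of the stubs.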

### Workflow State Management

Define the state object to track the multi-agent workflow.

In [None]:
from dataclasses import dataclass, field
from typing import Any, Dict, List

@dataclass
class DocumentWorkflowState:
    """State object for document processing workflow"""
    document_ids: List[str]
    
    # Classification results
    document_classifications: Dict[str, str] = field(default_factory=dict)
    
    # Analysis results
    invoice_analysis: Dict[str, Any] = field(default_factory=dict)
    report_analysis: Dict[str, Any] = field(default_factory=dict)
    
    # Compliance results
    compliance_findings: List[str] = field(default_factory=list)  # raw text of compliance agent responses
    flagged_issues: List[Dict] = field(default_factory=list)
    
    # Final output
    executive_summary: str = ""
    
    # Tracking
    agents_invoked: List[str] = field(default_factory=list)
    execution_time: float = 0.0

print("✅ Workflow state management ready!")
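
The state object relies on `field(default_factory=...)` so that each workflow run gets its own dictionaries and lists. The sketch below shows that behavior and one way to snapshot the state as JSON. `MiniWorkflowState` is a trimmed, hypothetical copy of `DocumentWorkflowState` so the example is self-contained.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Dict, List


@dataclass
class MiniWorkflowState:
    """Trimmed, hypothetical copy of DocumentWorkflowState for illustration."""
    document_ids: List[str]
    document_classifications: Dict[str, str] = field(default_factory=dict)
    agents_invoked: List[str] = field(default_factory=list)
    execution_time: float = 0.0


first = MiniWorkflowState(document_ids=["doc_001"])
second = MiniWorkflowState(document_ids=["doc_002"])

first.document_classifications["doc_001"] = "invoice"
first.agents_invoked.append("triage")

# default_factory builds a fresh container per instance, so `second` is unaffected
print(second.document_classifications)  # {}
print(second.agents_invoked)            # []

# asdict() converts the dataclass to plain dicts and lists, which json can serialize
snapshot = json.dumps(asdict(first), indent=2)
print(snapshot)
```

A snapshot like this can be handy for logging a run or comparing two runs of the workflow.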

In [None]:
def run_document_workflow(document_ids: List[str]) -> DocumentWorkflowState:
    """
    Orchestrate multi-agent document processing workflow.
    
    Workflow Steps:
    1. Triage - Classify each document (calls OCR)
    2. Specialized Analysis - Route to appropriate analyst
    3. Compliance Check - Validate against rules
    4. Synthesis - Generate executive summary
    
    Args:
        document_ids: List of document IDs to process
        
    Returns:
        DocumentWorkflowState with complete analysis
    """
    import time
    start_time = time.time()
    
    state = DocumentWorkflowState(document_ids=document_ids)
    
    print("="*80)
    print("📄 DOCUMENT PROCESSING WORKFLOW")
    print("="*80)
    print(f"Processing {len(document_ids)} documents\n")
    
    # STEP 1: Triage and Classification
    print("\n📋 STEP 1: Document Triage & Classification")
    print("-"*80)
    triage_agent = create_triage_agent()
    state.agents_invoked.append("triage")
    
    for doc_id in document_ids:
        query = f"Classify the document type for document {doc_id}. Use extract_document_text to retrieve and analyze it."
        response = triage_agent(query)
        response_str = str(response)
        
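        # Determine classification with a keyword heuristic: the first keyword found
        # anywhere in the response wins, so "invoice" takes priority over "report"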
        if "invoice" in response_str.lower():
            doc_type = "invoice"
        elif "police" in response_str.lower() or "report" in response_str.lower():
            doc_type = "police_report"
        else:
            doc_type = "other"
        
        state.document_classifications[doc_id] = doc_type
        print(f"  ✓ {doc_id}: {doc_type}")
    
    # STEP 2: Specialized Analysis
    print("\n\n🔍 STEP 2: Specialized Analysis")
    print("-"*80)
    
    invoice_agent = create_invoice_analyst_agent()
    report_agent = create_report_analyst_agent()
    
    for doc_id, doc_type in state.document_classifications.items():
        if doc_type == "invoice":
            print(f"\n  Analyzing invoice {doc_id}...")
            query = f"Analyze invoice {doc_id}. Extract all details, check for anomalies, and use check_duplicate_invoice to detect duplicates."
            response = invoice_agent(query)
            state.invoice_analysis[doc_id] = str(response)
            state.agents_invoked.append(f"invoice_analyst_{doc_id}")
            print("  ✓ Invoice analysis complete")
            
        elif doc_type == "police_report":
            print(f"\n  Analyzing police report {doc_id}...")
            query = f"Analyze police report {doc_id}. Extract key incident details."
            response = report_agent(query)
            state.report_analysis[doc_id] = str(response)
            state.agents_invoked.append(f"report_analyst_{doc_id}")
            print("  ✓ Report analysis complete")
    
    # STEP 3: Compliance Check
    print("\n\n🛡️  STEP 3: Compliance Validation")
    print("-"*80)
    compliance_agent = create_compliance_agent()
    state.agents_invoked.append("compliance")
    
    query = "Review all processed invoices for compliance issues: duplicates, missing fields, amounts >$5000, anomalies. List all documents first."
    response = compliance_agent(query)
    state.compliance_findings = [str(response)]
    print("  ✓ Compliance check complete")
    
    # STEP 4: Synthesis
    print("\n\n💡 STEP 4: Executive Synthesis")
    print("-"*80)
    synthesis_agent = create_synthesis_agent()
    state.agents_invoked.append("synthesis")
    
    synthesis_context = f"""Based on document processing results:

DOCUMENTS PROCESSED: {len(document_ids)}
CLASSIFICATIONS: {state.document_classifications}

INVOICE ANALYSIS:
{state.invoice_analysis}

REPORT ANALYSIS:
{state.report_analysis}

COMPLIANCE FINDINGS:
{state.compliance_findings}

Create an executive summary with key findings and recommended actions."""
    
    response = synthesis_agent(synthesis_context)
    state.executive_summary = str(response)
    
    state.execution_time = time.time() - start_time
    
    # Display final results
    print("\n\n" + "="*80)
    print("📝 EXECUTIVE SUMMARY")
    print("="*80)
    print(state.executive_summary)
    
    print("\n" + "="*80)
    print("📊 WORKFLOW SUMMARY")
    print("="*80)
    print(f"Documents Processed: {len(document_ids)}")
    print(f"Agents Invoked: {len(state.agents_invoked)}")
    print(f"Total Execution Time: {state.execution_time:.2f}s")
    print(f"OCR Cache Entries: {len(OCR_CACHE)}")
    print("="*80)
    
    return state

print("✅ Workflow orchestrator ready!")
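
Step 1 of the orchestrator classifies a document by searching the triage response for keywords, so a response such as "this is a police report, not an invoice" would be routed as an invoice. A stricter alternative is sketched below. It assumes the triage prompt is extended to ask the model to end with a line of the form `CLASSIFICATION: <label>`; that convention, `ALLOWED_LABELS`, and `parse_classification` are assumptions for illustration and are not part of the workshop code.

```python
import re

# Labels taken from the triage agent's system prompt
ALLOWED_LABELS = ("invoice", "police_report", "contract", "form", "other")

# Matches a whole line such as "CLASSIFICATION: police_report" (case-insensitive)
LABEL_PATTERN = re.compile(
    r"^\s*CLASSIFICATION:\s*([a-z_ ]+?)\s*$",
    re.IGNORECASE | re.MULTILINE,
)


def parse_classification(response_text: str) -> str:
    """Extract the label from an explicit CLASSIFICATION line, defaulting to 'other'."""
    match = LABEL_PATTERN.search(response_text)
    if match is None:
        return "other"
    # Normalize "Police Report" -> "police_report"
    label = match.group(1).strip().lower().replace(" ", "_")
    return label if label in ALLOWED_LABELS else "other"


sample = "This is a police report, not an invoice.\nCLASSIFICATION: police_report"
print(parse_classification(sample))
```

Falling back to `"other"` when the line is missing or the label is unknown keeps unrecognized documents out of the specialized analysts rather than guessing a route.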

### Example: Process Multiple Documents

Run the complete workflow to process invoices and police reports.

In [None]:
# Run the workflow on all documents (3 invoices + 1 police report)
document_ids = ["doc_001", "doc_002", "doc_003", "doc_004"]

result = run_document_workflow(document_ids)
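
After a run, the returned state can be inspected programmatically. The sketch below is one way to summarize it, assuming only the `document_classifications` and `agents_invoked` fields defined earlier. `summarize_run` is a hypothetical helper, and the sample values are made up so the example runs without calling the workflow.

```python
from collections import Counter
from typing import Dict, List


def summarize_run(classifications: Dict[str, str], agents_invoked: List[str]) -> Dict[str, object]:
    """Count documents per type and list those that no specialized analyst handled."""
    type_counts = Counter(classifications.values())
    # Documents classified as "other" skip the specialized analysis step
    unrouted = sorted(doc_id for doc_id, doc_type in classifications.items()
                      if doc_type == "other")
    return {
        "type_counts": dict(type_counts),
        "unrouted": unrouted,
        "agent_calls": len(agents_invoked),
    }


# Made-up sample values; with a real run, pass
# result.document_classifications and result.agents_invoked instead
sample_classifications = {
    "doc_001": "invoice",
    "doc_002": "invoice",
    "doc_003": "other",
    "doc_004": "police_report",
}
sample_agents = ["triage", "invoice_analyst_doc_001", "invoice_analyst_doc_002",
                 "report_analyst_doc_004", "compliance", "synthesis"]

summary = summarize_run(sample_classifications, sample_agents)
print(summary)
```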