## Setup

In [None]:
# Install the SDK if needed
# %pip install docu-devs-api-client pydantic

In [None]:
import os
import json
from pydantic import BaseModel, Field
from docudevs import DocuDevsClient

# The token is taken from DOCUDEVS_API_KEY; a placeholder string is used
# when that variable is not set.
API_KEY = os.environ.get("DOCUDEVS_API_KEY", "your-api-key-here")
client = DocuDevsClient(token=API_KEY)

## The Problem: Long Documents

Long documents (50+ pages) are tough for AI to process in one shot:
- Context windows get overloaded
- The AI loses track of information
- You get incomplete or garbled results

**Map-Reduce** solves this by:
1. Breaking the document into chunks (e.g., 10 pages each)
2. Processing each chunk independently  
3. Combining the results into a clean output

**Real Example**: We'll extract the SLA service credit tables from the Azure SLA document —
a long, multi-page document in which every service has its own credit table, and those tables are scattered throughout.

## Define Your Schema

For this example, we want to extract service credit tables that look like:

| Uptime Percentage | Service Credit |
|-------------------|----------------|
| < 99.9%           | 25%            |
| < 99%             | 50%            |
| < 95%             | 100%           |

Each service in the Azure SLA has its own table like this.

In [None]:
from typing import Optional

# Row model: one uptime threshold paired with the credit granted for it.
# NOTE(review): pydantic presumably emits the class docstring and the Field
# descriptions into model_json_schema(); if so they are part of what is sent
# to the extraction service, so edit them with care — confirm.
class ServiceCreditEntry(BaseModel):
    """A single row in the service credit table."""
    # Kept as a string: the threshold carries a comparison operator and a
    # percent sign (see the examples in the description).
    uptime_percentage: str = Field(
        alias="UptimePercentage",
        description="Uptime threshold, e.g. '< 99.9%' or '>= 99.95%'"
    )
    # Kept as a string as well, including the percent sign.
    service_credit: str = Field(
        alias="ServiceCredit",
        description="Credit percentage, e.g. '25%'"
    )


# Wrapper model holding the list of credit rows for one service.
class ServiceCredits(BaseModel):
    """The service credits table for a service."""
    # Defaults to a fresh empty list, so a service without a table still
    # validates. The alias contains a space ("Service Credits").
    entries: list[ServiceCreditEntry] = Field(
        default_factory=list,
        alias="Service Credits"
    )


# Top-level record model: one Azure service together with its credit table.
class ServiceSla(BaseModel):
    """SLA information for a single Azure service."""
    # Required field. Its alias "Service Name" is the same key this notebook
    # passes as dedup_key and looks for in the returned records.
    service_name: str = Field(
        alias="Service Name",
        description="Name of the Azure service"
    )
    # Defaults to an empty ServiceCredits instance when absent.
    # NOTE(review): unlike the other fields this one declares no alias, so it
    # is presumably keyed as "service_credits" — confirm that is intended.
    service_credits: ServiceCredits = Field(default_factory=ServiceCredits)


# Build the per-service schema once, then wrap it as the item type of a JSON
# array so the extraction returns a list of ServiceSla objects.
# NOTE(review): any "$defs"/"$ref" entries produced for the nested models end
# up under "items" rather than at the array root — confirm the service
# resolves them there.
service_schema = ServiceSla.model_json_schema()
schema_json = json.dumps({"type": "array", "items": service_schema})

# Show only the first 500 characters of the pretty-printed item schema
schema_preview = json.dumps(service_schema, indent=2)
print("Schema preview:")
print(schema_preview[:500] + "...")

## Process with Map-Reduce

Key parameters:
- `pages_per_chunk`: How many pages to process at once
- `overlap_pages`: Pages to repeat between chunks (catches data spanning pages)
- `dedup_key`: Which field to use for deduplication (required when overlap > 0)

In [None]:
# Load the Azure SLA document
from pathlib import Path

# Read the whole .docx into memory; the path is relative to the current
# working directory.
sla_doc_path = Path("docs/azure-sla.docx")
sla_doc = sla_doc_path.read_bytes()
size_in_bytes = len(sla_doc)
print(f"Loaded Azure SLA: {size_in_bytes:,} bytes")

In [None]:
# Submit the map-reduce job
job_id = await client.submit_and_process_document_map_reduce(
    document=sla_doc,
    document_mime_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    
    # Row extraction (runs on each chunk)
    prompt="Extract all service SLAs with their service credit tables. Each service has a name and a table showing uptime percentages and corresponding service credits.",
    schema=schema_json,
    
    # Chunking configuration  
    pages_per_chunk=10,    # Process 10 pages at a time
    overlap_pages=1,       # Overlap to catch services spanning pages
    dedup_key="Service Name",  # Use service name to deduplicate
    
    # Use the smaller model for faster processing
    llm="MINI",
)

print(f"Job submitted: {job_id}")

In [None]:
# Wait for results
result = await client.wait_until_ready(job_id, result_format="json", poll_interval=10, timeout=600)

# Parse all extracted SLAs
records = result.get("records", [])
print(f"Extracted {len(records)} chunks of services")

# Flatten all services from all chunks
all_services: list[ServiceSla] = []
for record in records:
    if isinstance(record, list):
        for item in record:
            all_services.append(ServiceSla.model_validate(item))
    elif isinstance(record, dict) and "Service Name" in record:
        all_services.append(ServiceSla.model_validate(record))

print(f"Total services found: {len(all_services)}\n")

# Display a few examples
for sla in all_services[:3]:
    print(f"ðŸ“‹ {sla.service_name}")
    for entry in sla.service_credits.entries[:3]:
        print(f"   {entry.uptime_percentage}: {entry.service_credit} credit")
    if len(sla.service_credits.entries) > 3:
        print(f"   ... and {len(sla.service_credits.entries) - 3} more tiers")
    print()

if len(all_services) > 3:
    print(f"... and {len(all_services) - 3} more services")

## Monitoring Progress

Map-reduce jobs split a document into many chunks, so even though the chunks are processed in parallel, a long document can take a while. Here's how to monitor progress:

In [None]:
import asyncio

async def process_with_progress(
    document_bytes: bytes,
    mime_type: str,
    schema: str,
    prompt: str,
    *,
    pages_per_chunk: int = 10,
    overlap_pages: int = 1,
    dedup_key: str = "Service Name",
    poll_interval: float = 2,
):
    """Submit a map-reduce job and print progress until it finishes.

    Uses the module-level ``client``. The job status is polled every
    ``poll_interval`` seconds; when the status object exposes
    ``map_reduce_status`` the completed/total chunk counts are printed,
    otherwise the raw status string is printed.

    Args:
        document_bytes: Raw bytes of the document to process.
        mime_type: MIME type passed as ``document_mime_type``.
        schema: JSON schema string passed as ``schema``.
        prompt: Extraction prompt passed as ``prompt``.
        pages_per_chunk: Pages per chunk (default 10).
        overlap_pages: Pages repeated between chunks (default 1).
        dedup_key: Field used for deduplication (default "Service Name").
        poll_interval: Seconds to sleep between status checks (default 2).

    Returns:
        Whatever ``client.wait_until_ready(job_id, result_format="json")``
        returns for the completed job.

    Raises:
        RuntimeError: If the job reports status ``"ERROR"``.
    """
    job_id = await client.submit_and_process_document_map_reduce(
        document=document_bytes,
        document_mime_type=mime_type,
        prompt=prompt,
        schema=schema,
        pages_per_chunk=pages_per_chunk,
        overlap_pages=overlap_pages,
        dedup_key=dedup_key,
    )

    print(f"Job started: {job_id}")

    while True:
        status_response = await client.status(job_id)
        job_status = status_response.parsed

        if job_status.status == "COMPLETED":
            print("\n✓ Done!")
            break
        if job_status.status == "ERROR":
            print(f"\n✗ Error: {job_status.error}")
            # Stop here: previously the loop only broke out and the code went
            # on to wait for the result of a job already known to have failed.
            raise RuntimeError(f"Map-reduce job {job_id} failed: {job_status.error}")

        # Check map-reduce progress if available; "\r" rewrites the same
        # output line on every poll instead of scrolling.
        mr_status = getattr(job_status, "map_reduce_status", None)
        if mr_status:
            completed = getattr(mr_status, "completed_chunks", 0)
            total = getattr(mr_status, "total_chunks", 0)
            print(f"\rProgress: {completed}/{total} chunks", end="", flush=True)
        else:
            print(f"\rStatus: {job_status.status}", end="", flush=True)

        await asyncio.sleep(poll_interval)

    # Reached only after COMPLETED: fetch the finished result as JSON
    return await client.wait_until_ready(job_id, result_format="json")

# Example usage (commented out to avoid re-running):
# result = await process_with_progress(
#     sla_doc,
#     "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
#     schema_json,
#     "Extract all service SLAs with their service credit tables."
# )

## Configuration Options

| Option | What it does | When to use |
|--------|--------------|-------------|
| `pages_per_chunk` | Pages processed together | Larger = more context but slower. 5-10 is a good start. |
| `overlap_pages` | Pages repeated between chunks | Use 1 when data might span page breaks |
| `dedup_key` | Field for deduplication | Required when overlap > 0. Pick a unique field. |
| `llm` | Model to use | `"MINI"` for faster/cheaper, default for best quality |

## Tips for Map-Reduce

1. **Choose the right chunk size**: Start with 5-10 pages. If you're missing context, increase it.

2. **Always use dedup_key with overlap**: Otherwise you'll get duplicate entries from overlapping pages.

3. **Pick a truly unique dedup field**: Service name, ID, or SKU work well. Generic descriptions don't.

4. **Use MINI for large documents**: Faster and cheaper for straightforward extraction tasks.

5. **Test on a sample first**: Try a few pages to validate your schema before processing the whole doc.

## Next Steps

- **[Knowledge Search](03-knowledge-search.ipynb)**: Enrich extractions with your own reference data
- **[Operations](04-operations.ipynb)**: Run error analysis and ask follow-up questions