# Day 2, Session 3: Multi-Document Batch Processor

## Scaling AI with Parallel Processing

Today we move beyond single-document processing to handle enterprise-scale batch workloads. When you need to process hundreds or thousands of invoices, serial processing becomes a bottleneck. We'll use LangGraph's Send API to create dynamic parallel workflows that scale with your workload.

### The Performance Challenge

**Serial Processing:** 10 invoices × 3 seconds each = 30 seconds  
**Parallel Processing:** 10 invoices × 3 seconds ÷ 4 workers ≈ 8 seconds (ideal)  
**Enterprise Scale:** 1000 invoices in minutes instead of hours
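
The arithmetic above can be checked with a rough back-of-the-envelope sketch. It assumes every invoice takes the same time and that work is handed out in whole invoices, so the busiest worker sets the wall-clock time (which lands slightly above the ideal 7.5 seconds). `estimate_batch_seconds` is a hypothetical helper for illustration, not part of the course code.

```python
import math

def estimate_batch_seconds(num_invoices: int, seconds_each: float, workers: int) -> float:
    """Idealized wall-clock estimate: the busiest worker determines the total."""
    invoices_on_busiest_worker = math.ceil(num_invoices / workers)
    return invoices_on_busiest_worker * seconds_each

serial = estimate_batch_seconds(10, 3.0, workers=1)
parallel = estimate_batch_seconds(10, 3.0, workers=4)  # busiest worker gets 3 invoices
print(f"serial={serial:.0f}s parallel={parallel:.0f}s speedup={serial / parallel:.1f}x")
```

Real runs will differ because invoices vary in complexity and the server may queue concurrent requests.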

### What We're Building

A dynamic batch processor that:
- Processes multiple invoices simultaneously
- Allocates workers based on current load
- Handles partial failures gracefully
- Provides real-time progress monitoring
- Scales memory usage efficiently

Let's see parallel AI in action!

In [None]:
# Global configuration - Instructor will fill these
OLLAMA_URL = "http://XX.XX.XX.XX"  # Course server IP (port 80)
API_TOKEN = "YOUR_TOKEN_HERE"      # Instructor provides token
MODEL = "qwen3:8b"                  # Default model on server

# Install required packages (before importing them)
!pip install -q langgraph langchain-core psutil requests

import requests
import json
import time
import os
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, TypedDict, Annotated
from dataclasses import dataclass, field
import threading
import concurrent.futures
import psutil
import uuid
from collections import defaultdict

from langgraph.graph import StateGraph, END
# Send lives in langgraph.types in recent releases (older ones: langgraph.constants)
from langgraph.types import Send
from langgraph.graph.message import add_messages

# Health check
def check_server_health():
    """Verify server connection"""
    try:
        # Time out quickly so an unreachable server does not hang the notebook
        response = requests.get(f"{OLLAMA_URL}/health", timeout=5)
        if response.status_code == 200:
            data = response.json()
            print(f"✅ Server Status: {data.get('status', 'Unknown')}")
            print(f"📊 Models Available: {data.get('models_count', 0)}")
            return True
        print(f"❌ Server returned HTTP {response.status_code}")
    except Exception as e:
        print(f"❌ Server connection failed: {e}")
    return False

# LLM calling function
def call_llm(prompt, model=MODEL):
    """Call the LLM with a prompt"""
    headers = {
        "Authorization": f"Bearer {API_TOKEN}",
        "Content-Type": "application/json"
    }
    
    data = {
        "model": model,
        "prompt": prompt
    }
    
    try:
        response = requests.post(
            f"{OLLAMA_URL}/think",
            headers=headers,
            json=data,
            timeout=120  # avoid blocking a worker forever on a stalled request
        )
        if response.status_code == 200:
            return response.json().get('response', '')
        else:
            return f"Error: {response.status_code}"
    except Exception as e:
        return f"Error: {e}"

def get_memory_usage():
    """Get current memory usage"""
    process = psutil.Process()
    return process.memory_info().rss / 1024 / 1024  # MB

def get_server_metrics():
    """Get server performance metrics"""
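    try:
        response = requests.get(f"{OLLAMA_URL}/metrics", timeout=2)
        if response.status_code == 200:
            return response.json()
    except (requests.RequestException, ValueError):
        # Network failure or non-JSON body: report metrics as unavailable
        pass
    return {"status": "unavailable"}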

print("🚀 Parallel Processing Demo Setup")
print("🔌 Connecting to course server...")
server_available = check_server_health()

print(f"\n💾 Initial Memory Usage: {get_memory_usage():.1f}MB")

In [None]:
# Download invoice dataset
import requests
import zipfile
import io

dropbox_url = "https://www.dropbox.com/scl/fo/m9hyfmvi78snwv0nh34mo/AMEXxwXMLAOeve-_yj12ck8?rlkey=urinkikgiuven0fro7r4x5rcu&st=hv3of7g7&dl=1"

print("📦 Downloading invoice dataset...")
try:
    response = requests.get(dropbox_url, timeout=60)
    response.raise_for_status()
    with zipfile.ZipFile(io.BytesIO(response.content)) as z:
        z.extractall("invoice_images")
    print("✅ Downloaded invoice dataset")
except Exception as e:
    print(f"❌ Error downloading: {e}")

# Create sample invoice batch for testing
# (synthetic data, so it does not depend on the download succeeding)
SAMPLE_INVOICES = [
    {
        "invoice_id": f"INV-2024-{str(i+1).zfill(3)}",
        "vendor": f"Company {chr(65+i)}",
        "amount": 1000 + (i * 500),
        "currency": ["USD", "EUR", "GBP"][i % 3],
        "complexity": ["simple", "medium", "complex"][i % 3],
        "processing_time_estimate": 2 + (i % 3)  # 2-4 seconds
    }
    for i in range(10)
]

print(f"📊 Created batch of {len(SAMPLE_INVOICES)} test invoices")

# Show sample
for i, invoice in enumerate(SAMPLE_INVOICES[:3]):
    print(f"  {i+1}. {invoice['invoice_id']}: {invoice['vendor']} - {invoice['amount']} {invoice['currency']}")
print(f"  ... and {len(SAMPLE_INVOICES)-3} more")

## Step 1: Define Parallel State with Reducers

When multiple workers update state simultaneously, we need reducers to merge updates safely.
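
As a minimal sketch of the idea (plain Python, no LangGraph involved): a reducer is a function that takes the current value and one update and returns the merged value. Folding several worker updates through it, one at a time, gives the same kind of result the graph would produce. The name `merge_by_invoice_id` and the sample updates are illustrative assumptions.

```python
from functools import reduce
from typing import Dict, List

def merge_by_invoice_id(existing: List[Dict], new: List[Dict]) -> List[Dict]:
    """Reducer: append only results whose invoice_id has not been seen yet."""
    seen = {item["invoice_id"] for item in existing}
    return existing + [item for item in new if item["invoice_id"] not in seen]

# Updates as three hypothetical workers might report them (one duplicate).
worker_updates = [
    [{"invoice_id": "INV-001"}],
    [{"invoice_id": "INV-002"}],
    [{"invoice_id": "INV-001"}, {"invoice_id": "INV-003"}],
]

# Folding the updates one at a time mimics applying the reducer per update.
merged = reduce(merge_by_invoice_id, worker_updates, [])
merged_ids = [item["invoice_id"] for item in merged]
print(merged_ids)
```

Without a reducer, a state key accepts only one write per step, which is why every key that parallel workers write to needs one.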

In [None]:
# Custom reducers for safe parallel state updates
def merge_results(existing: List[Dict], new: List[Dict]) -> List[Dict]:
    """Safely merge processing results from multiple workers"""
    if not existing:
        return new
    if not new:
        return existing
    
    # Create a combined list, avoiding duplicates by invoice_id
    existing_ids = {item.get('invoice_id') for item in existing}
    merged = existing.copy()
    
    for item in new:
        if item.get('invoice_id') not in existing_ids:
            merged.append(item)
    
    return merged

def merge_processing_status(existing: Dict[str, str], new: Dict[str, str]) -> Dict[str, str]:
    """Merge processing status updates"""
    if not existing:
        return new
    if not new:
        return existing
    
    merged = existing.copy()
    merged.update(new)
    return merged

def merge_metrics(existing: Dict[str, Any], new: Dict[str, Any]) -> Dict[str, Any]:
    """Merge performance metrics"""
    if not existing:
        return new
    if not new:
        return existing
    
    merged = existing.copy()
    
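    numeric_keys = ['total_processed', 'total_errors', 'total_time']
    list_keys = ['worker_times', 'memory_usage']

    # Add numeric values
    for key in numeric_keys:
        merged[key] = existing.get(key, 0) + new.get(key, 0)

    # Concatenate per-worker samples instead of overwriting them
    for key in list_keys:
        if key in existing or key in new:
            merged[key] = list(existing.get(key, [])) + list(new.get(key, []))

    # Update other metrics
    for key, value in new.items():
        if key not in numeric_keys and key not in list_keys:
            merged[key] = value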
    
    return merged

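def merge_errors(existing: List[str], new: List[str]) -> List[str]:
    """Merge error messages from parallel workers, skipping ones already recorded"""
    merged = list(existing or [])
    for message in new or []:
        if message not in merged:
            merged.append(message)
    return merged

# Define the batch processing state
class BatchState(TypedDict):
    """State for parallel batch processing"""
    # Input data
    invoice_queue: List[Dict[str, Any]]  # Invoices to process
    
    # Processing status (updated by workers)
    processing: Annotated[Dict[str, str], merge_processing_status]  # Currently processing
    
    # Results (safely merged from parallel workers)
    results: Annotated[List[Dict], merge_results]  # Completed results
    # Errors also need a reducer: several workers may report errors in the same step
    errors: Annotated[List[str], merge_errors]  # Processing errors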
    
    # Performance metrics (aggregated)
    metrics: Annotated[Dict[str, Any], merge_metrics]  # Performance tracking
    
    # Workflow control
    batch_id: str
    started_at: Optional[str]
    completed_at: Optional[str]
    parallelism_level: int

def create_batch_state(invoices: List[Dict], parallelism: int = 4) -> BatchState:
    """Create initial batch processing state"""
    return BatchState(
        invoice_queue=invoices,
        processing={},
        results=[],
        errors=[],
        metrics={
            'total_processed': 0,
            'total_errors': 0,
            'total_time': 0,
            'worker_times': [],
            'memory_usage': []
        },
        batch_id=str(uuid.uuid4())[:8],
        started_at=None,
        completed_at=None,
        parallelism_level=parallelism
    )

print("✅ Parallel state with reducers defined")
print("🔧 Reducers ensure safe concurrent updates")
print("📊 State tracks progress, results, and performance metrics")

# Test reducer functionality
test_results1 = [{'invoice_id': 'INV-001', 'amount': 1000}]
test_results2 = [{'invoice_id': 'INV-002', 'amount': 2000}]
merged = merge_results(test_results1, test_results2)
print(f"\n🧪 Reducer test: {len(merged)} results merged")

## Step 2: Implement Send API for Dynamic Parallelism

The Send API lets a routing function create one worker task per chunk at runtime, so the number of parallel workers follows the workload instead of being fixed in the graph.
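
Before wiring this into LangGraph, here is a plain-Python sketch of the splitting step on its own. It uses ceiling division so the number of chunks never exceeds the requested worker count. `build_worker_payloads` is a hypothetical helper; in the workflow, each payload would become the argument of one `Send`.

```python
from typing import Any, Dict, List

def build_worker_payloads(invoices: List[Dict[str, Any]], max_workers: int) -> List[Dict[str, Any]]:
    """Split invoices into at most max_workers contiguous chunks."""
    if not invoices:
        return []
    chunk_size = -(-len(invoices) // max(1, max_workers))  # ceiling division
    return [
        {"worker_id": f"worker_{n + 1}", "invoices": invoices[start:start + chunk_size]}
        for n, start in enumerate(range(0, len(invoices), chunk_size))
    ]

sample = [{"invoice_id": f"INV-{i:03d}"} for i in range(10)]
payloads = build_worker_payloads(sample, max_workers=4)
chunk_lengths = [len(p["invoices"]) for p in payloads]
print(chunk_lengths)
```

Floor division would give a chunk size of 2 here and therefore five workers for a requested parallelism of four.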

In [None]:
def dispatcher(state: BatchState) -> List[Send]:
    """Use Send API to create parallel workers for invoice processing"""
    print(f"📤 Dispatcher starting for batch {state['batch_id']}")
    print(f"   Invoices to process: {len(state['invoice_queue'])}")
    print(f"   Parallelism level: {state['parallelism_level']}")
    
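    # Record start time (note: LangGraph persists only returned updates, so
    # this in-place assignment is visible inside this function only)
    state['started_at'] = datetime.now().isoformat()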
    
    # Create Send objects for parallel processing
    sends = []
    
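    # Group invoices into chunks for workers (ceiling division, so the number
    # of chunks never exceeds the requested parallelism level)
    chunk_size = max(1, -(-len(state['invoice_queue']) // max(1, state['parallelism_level'])))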
    
    for i in range(0, len(state['invoice_queue']), chunk_size):
        chunk = state['invoice_queue'][i:i + chunk_size]
        worker_id = f"worker_{len(sends)+1}"
        
        # Create a Send for this chunk
        worker_state = {
            'worker_id': worker_id,
            'invoices': chunk,
            'batch_id': state['batch_id'],
            'worker_start_time': time.time()
        }
        
        sends.append(Send("process_invoice_worker", worker_state))
        
        print(f"   📋 {worker_id}: {len(chunk)} invoices assigned")
    
    print(f"✅ Dispatched {len(sends)} workers")
    return sends

def process_invoice_worker(worker_state: Dict[str, Any]) -> BatchState:
    """Worker node that processes a chunk of invoices"""
    worker_id = worker_state['worker_id']
    invoices = worker_state['invoices']
    batch_id = worker_state['batch_id']
    start_time = worker_state['worker_start_time']
    
    print(f"\n👷 {worker_id} starting ({len(invoices)} invoices)")
    
    # Initialize worker results
    worker_results = []
    worker_processing = {}
    worker_errors = []
    
    # Process each invoice in this worker's chunk
    for i, invoice in enumerate(invoices):
        invoice_id = invoice['invoice_id']
        
        try:
            # Mark as processing
            worker_processing[invoice_id] = f"processing_by_{worker_id}"
            
            print(f"   📄 {worker_id} processing {invoice_id}...")
            
            # Simulate processing time based on complexity
            processing_time = invoice.get('processing_time_estimate', 2)
            time.sleep(processing_time * 0.3)  # Scale down for demo
            
            # Simulate LLM processing (or use real LLM if available)
            if server_available:
                prompt = f"Extract key information from invoice {invoice_id} for {invoice['vendor']}. Amount: ${invoice['amount']} {invoice['currency']}"
                llm_response = call_llm(prompt)
                extracted_data = llm_response[:100] + "..." if len(llm_response) > 100 else llm_response
            else:
                # Mock extraction
                extracted_data = f"Vendor: {invoice['vendor']}, Amount: ${invoice['amount']}, Currency: {invoice['currency']}"
            
            # Create result
            result = {
                'invoice_id': invoice_id,
                'status': 'completed',
                'processed_by': worker_id,
                'processing_time': processing_time,
                'extracted_data': extracted_data,
                'original_invoice': invoice,
                'completed_at': datetime.now().isoformat()
            }
            
            worker_results.append(result)
            
            # Remove from processing
            del worker_processing[invoice_id]
            
            print(f"   ✅ {worker_id} completed {invoice_id}")
            
        except Exception as e:
            error_msg = f"{worker_id} failed to process {invoice_id}: {str(e)}"
            worker_errors.append(error_msg)
            print(f"   ❌ {error_msg}")
            
            # Remove from processing
            if invoice_id in worker_processing:
                del worker_processing[invoice_id]
    
    # Calculate worker metrics
    worker_time = time.time() - start_time
    memory_usage = get_memory_usage()
    
    worker_metrics = {
        'total_processed': len(worker_results),
        'total_errors': len(worker_errors),
        'total_time': worker_time,
        'worker_times': [worker_time],
        'memory_usage': [memory_usage],
        f'{worker_id}_completed': len(worker_results)
    }
    
    print(f"✅ {worker_id} finished: {len(worker_results)} completed, {len(worker_errors)} errors in {worker_time:.1f}s")
    
    # Return only keys declared with a reducer in BatchState: parallel workers
    # cannot all write to plain keys such as batch_id in the same step, and
    # returning invoice_queue=[] would wipe the original queue
    return {
        'processing': worker_processing,
        'results': worker_results,
        'errors': worker_errors,
        'metrics': worker_metrics
    }

def aggregator(state: BatchState) -> Dict[str, Any]:
    """Aggregate results from all workers"""
    print(f"\n📊 Aggregating results for batch {state['batch_id']}")
    
    # Mark completion time
    completed_at = datetime.now().isoformat()
    
    # Calculate final metrics
    total_invoices = len(state['invoice_queue']) if state['invoice_queue'] else state['metrics']['total_processed']
    success_count = len(state['results'])
    error_count = len(state['errors'])
    
    # Calculate timing metrics
    if state.get('started_at'):
        start_time = datetime.fromisoformat(state['started_at'])
        end_time = datetime.fromisoformat(completed_at)
        total_wall_time = (end_time - start_time).total_seconds()
    else:
        # Fallback: summed worker time (overstates wall time when workers overlap)
        total_wall_time = state['metrics']['total_time']
    
    # Final calculations (sub-second runs are not clamped to 1s)
    throughput = total_invoices / total_wall_time if total_wall_time > 0 else 0.0
    final_metrics = {
        'total_wall_time': total_wall_time,
        'success_rate': success_count / max(1, total_invoices),
        'average_time_per_invoice': total_wall_time / max(1, total_invoices),
        'throughput_per_second': throughput
    }
    
    print(f"✅ Batch {state['batch_id']} completed:")
    print(f"   📈 Processed: {success_count}/{total_invoices} invoices")
    print(f"   ❌ Errors: {error_count}")
    print(f"   ⏱️ Total time: {total_wall_time:.1f}s")
    print(f"   🚀 Throughput: {throughput:.1f} invoices/second")
    print(f"   📊 Success rate: {final_metrics['success_rate']:.1%}")
    
    # Return only the changed keys; returning the whole state would push
    # results and metrics through their reducers a second time
    return {'completed_at': completed_at, 'metrics': final_metrics}

print("✅ Send API and worker nodes implemented")
print("📤 Dispatcher creates dynamic workers")
print("👷 Workers process invoice chunks in parallel")
print("📊 Aggregator combines results with performance metrics")

## Step 3: Build Parallel Processing Graph

Now we'll assemble the complete parallel processing workflow.

In [None]:
# Build the parallel processing graph
print("🏗️ Building parallel processing workflow...")

# Create the graph
parallel_workflow = StateGraph(BatchState)

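# Add nodes
# Send objects must be returned from a conditional edge, not from a node, so the
# "dispatcher" node only records the start time and dispatcher() is used below
# as the routing function.
parallel_workflow.add_node("dispatcher", lambda state: {"started_at": datetime.now().isoformat()})
parallel_workflow.add_node("process_invoice_worker", process_invoice_worker)
parallel_workflow.add_node("aggregator", aggregator)

# Set entry point
parallel_workflow.set_entry_point("dispatcher")

# Add edges
# dispatcher() returns one Send per chunk; each Send starts a parallel worker
parallel_workflow.add_conditional_edges("dispatcher", dispatcher, ["process_invoice_worker"])
# Workers route to the aggregator once they have all finished
parallel_workflow.add_edge("process_invoice_worker", "aggregator")
parallel_workflow.add_edge("aggregator", END)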

# Compile the workflow
try:
    parallel_app = parallel_workflow.compile()
    print("✅ Parallel workflow compiled successfully!")
except Exception as e:
    print(f"❌ Error compiling workflow: {e}")

# Visualize the workflow structure
print("\n📊 Parallel Processing Workflow:")
print("┌─────────────────┐")
print("│   Dispatcher    │")
print("│  (Send API)     │")
print("└─────────┬───────┘")
print("          │")
print("    ┌─────┼─────┐")
print("    ▼     ▼     ▼")
print("┌────────┐ ┌────────┐ ┌────────┐")
print("│Worker 1│ │Worker 2│ │Worker N│")
print("│Invoice │ │Invoice │ │Invoice │")
print("│Chunk 1 │ │Chunk 2 │ │Chunk N │")
print("└────────┘ └────────┘ └────────┘")
print("    │        │        │")
print("    └────────┼────────┘")
print("             ▼")
print("    ┌─────────────────┐")
print("    │   Aggregator    │")
print("    │ (Merge Results) │")
print("    └─────────────────┘")
print("             │")
print("            END")

## Step 4: Performance Comparison - Serial vs Parallel

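Let's measure how much parallel processing helps. Note that the serial baseline below only simulates processing time, while the workers also call the LLM when the course server is reachable, so the two timings are directly comparable only in mock mode.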
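
For a like-for-like baseline that is independent of LangGraph, the same comparison can be sketched with the standard library's `ThreadPoolExecutor`. Here `fake_process` is a hypothetical stand-in for an I/O-bound call such as an LLM request, and the 0.05-second sleep is an arbitrary assumption.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_process(invoice_id: str, seconds: float = 0.05) -> str:
    """Stand-in for an I/O-bound call such as an LLM request."""
    time.sleep(seconds)
    return invoice_id

def time_serial(ids):
    start = time.perf_counter()
    results = [fake_process(i) for i in ids]
    return results, time.perf_counter() - start

def time_parallel(ids, workers=4):
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = list(pool.map(fake_process, ids))  # map preserves input order
    return results, time.perf_counter() - start

ids = [f"INV-{i:03d}" for i in range(8)]
serial_results, serial_s = time_serial(ids)
parallel_results, parallel_s = time_parallel(ids)
print(f"serial={serial_s:.2f}s parallel={parallel_s:.2f}s")
```

Threads help here because the work is waiting rather than computing; CPU-bound work in Python threads would not see the same gain.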

In [None]:
def simulate_serial_processing(invoices: List[Dict]) -> Dict[str, Any]:
    """Simulate serial processing for comparison"""
    print("\n🔄 Running serial processing simulation...")
    
    start_time = time.time()
    results = []
    
    for i, invoice in enumerate(invoices):
        print(f"   Processing {i+1}/{len(invoices)}: {invoice['invoice_id']}")
        
        # Simulate processing time
        processing_time = invoice.get('processing_time_estimate', 2)
        time.sleep(processing_time * 0.3)  # Scale down for demo
        
        results.append({
            'invoice_id': invoice['invoice_id'],
            'status': 'completed',
            'processing_time': processing_time
        })
    
    total_time = time.time() - start_time
    
    return {
        'results': results,
        'total_time': total_time,
        'throughput': len(invoices) / total_time
    }

# Performance comparison
print("⚡ PERFORMANCE COMPARISON")
print("=" * 50)

if SAMPLE_INVOICES:
    # Test with different batch sizes
    test_sizes = [5, 10] if len(SAMPLE_INVOICES) >= 10 else [len(SAMPLE_INVOICES)]
    
    for batch_size in test_sizes:
        test_batch = SAMPLE_INVOICES[:batch_size]
        
        print(f"\n📊 Testing with {batch_size} invoices:")
        print("-" * 40)
        
        # Serial processing
        serial_result = simulate_serial_processing(test_batch)
        serial_time = serial_result['total_time']
        
        print(f"📈 Serial Results:")
        print(f"   Time: {serial_time:.1f}s")
        print(f"   Throughput: {serial_result['throughput']:.1f} invoices/sec")
        
        # Parallel processing with different parallelism levels
        parallelism_levels = [2, 4] if batch_size >= 4 else [2]
        
        for parallelism in parallelism_levels:
            if parallelism <= batch_size:
                print(f"\n🚀 Parallel Processing ({parallelism} workers):")
                
                # Create batch state
                batch_state = create_batch_state(test_batch, parallelism)
                
                # Track memory before
                memory_before = get_memory_usage()
                
                # Run parallel processing
                start_time = time.time()
                try:
                    if 'parallel_app' in globals():
                        result = parallel_app.invoke(batch_state)
                        
                        # Track memory after
                        memory_after = get_memory_usage()
                        
                        # Display results
                        parallel_time = result['metrics']['total_wall_time']
                        speedup = serial_time / parallel_time if parallel_time > 0 else 0
                        
                        print(f"   Time: {parallel_time:.1f}s")
                        print(f"   Speedup: {speedup:.1f}x faster than serial")
                        print(f"   Throughput: {result['metrics']['throughput_per_second']:.1f} invoices/sec")
                        print(f"   Success rate: {result['metrics']['success_rate']:.1%}")
                        print(f"   Memory increase: {memory_after - memory_before:.1f}MB")
                        
                        # Show worker distribution
                        worker_results = {}
                        for res in result['results']:
                            worker = res['processed_by']
                            worker_results[worker] = worker_results.get(worker, 0) + 1
                        
                        print(f"   Worker distribution: {dict(worker_results)}")
                    else:
                        print("   ❌ Parallel workflow not compiled")
                        
                except Exception as e:
                    print(f"   ❌ Parallel processing failed: {e}")

    # Performance summary
    print(f"\n🎯 KEY INSIGHTS:")
    print(f"   • Parallel processing can achieve 2-4x speedup")
    print(f"   • Memory usage increases with worker count")
    print(f"   • Optimal parallelism depends on workload and resources")
    print(f"   • Diminishing returns beyond optimal worker count")

else:
    print("⚠️ No sample invoices available for testing")

## Step 5: Memory Monitoring and Resource Management

Parallel processing trades memory for speed. Let's monitor resource usage.
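
To make the trade-off concrete, the sketch below measures peak allocation for two batch sizes using the standard library's `tracemalloc`. This is an illustration under stated assumptions: `tracemalloc` tracks Python heap allocations only (not the process RSS that `psutil` reports), and `hold_batch_in_memory` is a hypothetical workload that keeps about 10 KB per invoice in memory.

```python
import tracemalloc

def peak_memory_mb(func, *args, **kwargs):
    """Run func and return (result, peak Python heap allocation in MB)."""
    tracemalloc.start()
    try:
        result = func(*args, **kwargs)
        _, peak = tracemalloc.get_traced_memory()  # (current, peak) in bytes
    finally:
        tracemalloc.stop()
    return result, peak / (1024 * 1024)

def hold_batch_in_memory(num_invoices: int):
    # Hypothetical workload: keep ~10 KB of payload per invoice in memory at once.
    return [bytes(10_000) for _ in range(num_invoices)]

_, small_peak = peak_memory_mb(hold_batch_in_memory, 10)
_, large_peak = peak_memory_mb(hold_batch_in_memory, 1000)
print(f"10 invoices: {small_peak:.2f}MB, 1000 invoices: {large_peak:.2f}MB")
```

The more invoices are in flight at once, the higher the peak, which is the reason to cap parallelism when memory is tight.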

In [None]:
class ResourceMonitor:
    """Monitor system resources during parallel processing"""
    
    def __init__(self):
        self.measurements = []
        self.monitoring = False
        self.monitor_thread = None
    
    def start_monitoring(self, interval=0.5):
        """Start resource monitoring"""
        self.monitoring = True
        self.measurements = []
        
        def monitor_loop():
            while self.monitoring:
                measurement = {
                    'timestamp': time.time(),
                    'memory_mb': get_memory_usage(),
                    'cpu_percent': psutil.cpu_percent(),
                }
                
                # Add server metrics if available
                server_metrics = get_server_metrics()
                if server_metrics.get('status') != 'unavailable':
                    gpu_info = server_metrics.get('gpu', {})
                    measurement['gpu_memory_mb'] = gpu_info.get('memory_used', 0)
                    measurement['gpu_utilization'] = gpu_info.get('utilization', 0)
                
                self.measurements.append(measurement)
                time.sleep(interval)
        
        self.monitor_thread = threading.Thread(target=monitor_loop)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
    
    def stop_monitoring(self):
        """Stop resource monitoring"""
        self.monitoring = False
        if self.monitor_thread:
            self.monitor_thread.join(timeout=1)
    
    def get_summary(self) -> Dict[str, Any]:
        """Get monitoring summary"""
        if not self.measurements:
            return {'status': 'no_data'}
        
        memory_values = [m['memory_mb'] for m in self.measurements]
        cpu_values = [m['cpu_percent'] for m in self.measurements]
        
        summary = {
            'duration': self.measurements[-1]['timestamp'] - self.measurements[0]['timestamp'],
            'memory': {
                'min': min(memory_values),
                'max': max(memory_values),
                'avg': sum(memory_values) / len(memory_values),
                'peak_increase': max(memory_values) - min(memory_values)
            },
            'cpu': {
                'min': min(cpu_values),
                'max': max(cpu_values),
                'avg': sum(cpu_values) / len(cpu_values)
            }
        }
        
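        # Add GPU metrics if available (only from samples that reported them,
        # so missing samples do not drag the minimum and average down to 0)
        gpu_memory_values = [m['gpu_memory_mb'] for m in self.measurements if 'gpu_memory_mb' in m]
        if any(gpu_memory_values):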
            summary['gpu_memory'] = {
                'min': min(gpu_memory_values),
                'max': max(gpu_memory_values),
                'avg': sum(gpu_memory_values) / len(gpu_memory_values)
            }
        
        return summary

# Demonstrate resource monitoring
print("💾 RESOURCE MONITORING DEMONSTRATION")
print("=" * 50)

if SAMPLE_INVOICES and len(SAMPLE_INVOICES) >= 5:
    # Test with resource monitoring
    test_batch = SAMPLE_INVOICES[:5]
    
    print("\n📊 Monitoring resource usage during parallel processing...")
    
    # Start monitoring
    monitor = ResourceMonitor()
    monitor.start_monitoring(interval=0.2)
    
    try:
        # Run parallel processing
        batch_state = create_batch_state(test_batch, parallelism=3)
        
        if 'parallel_app' in globals():
            result = parallel_app.invoke(batch_state)
            
            # Stop monitoring
            monitor.stop_monitoring()
            
            # Get monitoring summary
            resource_summary = monitor.get_summary()
            
            print(f"\n📈 Resource Usage Summary:")
            print(f"   Duration: {resource_summary['duration']:.1f}s")
            
            memory_stats = resource_summary['memory']
            print(f"   Memory: {memory_stats['min']:.1f} - {memory_stats['max']:.1f}MB (avg: {memory_stats['avg']:.1f}MB)")
            print(f"   Peak memory increase: {memory_stats['peak_increase']:.1f}MB")
            
            cpu_stats = resource_summary['cpu']
            print(f"   CPU: {cpu_stats['min']:.1f}% - {cpu_stats['max']:.1f}% (avg: {cpu_stats['avg']:.1f}%)")
            
            if 'gpu_memory' in resource_summary:
                gpu_stats = resource_summary['gpu_memory']
                print(f"   GPU Memory: {gpu_stats['min']:.1f} - {gpu_stats['max']:.1f}MB (avg: {gpu_stats['avg']:.1f}MB)")
            
            # Performance insights
            print(f"\n🔍 Performance Insights:")
            processing_efficiency = result['metrics']['success_rate'] * result['metrics']['throughput_per_second']
            memory_efficiency = processing_efficiency / memory_stats['peak_increase'] if memory_stats['peak_increase'] > 0 else 0
            
            print(f"   Processing efficiency: {processing_efficiency:.2f}")
            print(f"   Memory efficiency: {memory_efficiency:.3f} (higher is better)")
            print(f"   Resource utilization: {'High' if cpu_stats['avg'] > 50 else 'Moderate' if cpu_stats['avg'] > 20 else 'Low'}")
        
        else:
            monitor.stop_monitoring()
            print("❌ Parallel workflow not available")
            
    except Exception as e:
        monitor.stop_monitoring()
        print(f"❌ Monitoring test failed: {e}")
else:
    print("⚠️ Insufficient sample data for monitoring test")

# Resource optimization recommendations
print(f"\n💡 OPTIMIZATION RECOMMENDATIONS:")
print(f"   🔧 Memory Management:")
print(f"      • Monitor peak memory usage vs available RAM")
print(f"      • Reduce parallelism if memory becomes constrained")
print(f"      • Use batch processing for very large workloads")
print(f"   ⚡ Performance Tuning:")
print(f"      • Optimal worker count ≈ CPU cores for CPU-bound tasks")
print(f"      • For I/O-bound tasks, can exceed CPU core count")
print(f"      • Monitor GPU memory if using GPU-accelerated models")
print(f"   📊 Monitoring Strategy:")
print(f"      • Track memory, CPU, and GPU utilization")
print(f"      • Set alerts for resource exhaustion")
print(f"      • Adjust parallelism based on real-world performance")

## Step 6: Handling Partial Failures

In parallel processing, some workers may fail while others succeed. Let's demonstrate resilient handling.
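
The core pattern, independent of LangGraph, is to catch failures per item and collect them next to the successes instead of letting one exception abort the batch. A minimal sketch with `concurrent.futures` follows; `process` is a hypothetical processor that fails for one hard-coded invoice.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def process(invoice_id: str) -> dict:
    """Hypothetical processor that fails for one known-bad invoice."""
    if invoice_id == "INV-003":
        raise ValueError("unreadable total")
    return {"invoice_id": invoice_id, "status": "completed"}

def run_batch(invoice_ids, workers=3):
    results, errors = [], []
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = {pool.submit(process, inv): inv for inv in invoice_ids}
        for future in as_completed(futures):
            invoice_id = futures[future]
            try:
                results.append(future.result())
            except Exception as exc:  # isolate the failure to this invoice
                errors.append({"invoice_id": invoice_id, "error": str(exc)})
    return results, errors

results, errors = run_batch([f"INV-{i:03d}" for i in range(1, 6)])
print(f"{len(results)} succeeded, {len(errors)} failed: {errors}")
```

Keeping the failed invoice IDs in a structured list makes it straightforward to retry just those items later.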

In [None]:
def unreliable_worker(worker_state: Dict[str, Any]) -> BatchState:
    """Simulated unreliable worker that fails sometimes"""
    worker_id = worker_state['worker_id']
    invoices = worker_state['invoices']
    batch_id = worker_state['batch_id']
    start_time = worker_state['worker_start_time']
    
    print(f"\n⚠️ {worker_id} starting (unreliable mode)...")
    
    # Simulate worker failure 30% of the time
    import random
    if random.random() < 0.3:
        error_msg = f"{worker_id} failed due to simulated system error"
        print(f"   ❌ {error_msg}")
        
        # Return partial failure update (only keys declared with a reducer)
        return {
            'errors': [error_msg],
            'metrics': {'total_errors': len(invoices), 'total_time': time.time() - start_time}
        }
    
    # Otherwise, process normally but with some individual failures
    worker_results = []
    worker_errors = []
    
    for invoice in invoices:
        invoice_id = invoice['invoice_id']
        
        # Simulate individual invoice failures 20% of the time
        if random.random() < 0.2:
            error_msg = f"Failed to process {invoice_id} - simulated error"
            worker_errors.append(error_msg)
            print(f"   ❌ {worker_id}: {error_msg}")
        else:
            # Process successfully
            time.sleep(0.1)  # Simulate work
            
            result = {
                'invoice_id': invoice_id,
                'status': 'completed',
                'processed_by': worker_id,
                'extracted_data': f"Successfully processed {invoice_id}",
                'original_invoice': invoice
            }
            
            worker_results.append(result)
            print(f"   ✅ {worker_id}: completed {invoice_id}")
    
    worker_time = time.time() - start_time
    
    # Return only keys declared with a reducer so parallel workers can all
    # write in the same step without clobbering invoice_queue or batch_id
    return {
        'results': worker_results,
        'errors': worker_errors,
        'metrics': {
            'total_processed': len(worker_results),
            'total_errors': len(worker_errors),
            'total_time': worker_time
        }
    }

def resilient_aggregator(state: BatchState) -> Dict[str, Any]:
    """Aggregator that handles partial failures gracefully"""
    print(f"\n🛡️ Resilient aggregator processing batch {state['batch_id']}")
    
    # Mark completion
    completed_at = datetime.now().isoformat()
    
    # Analyze results
    total_expected = len(state['invoice_queue']) if state['invoice_queue'] else state['metrics']['total_processed'] + len(state['errors'])
    successful = len(state['results'])
    # Count failed invoices rather than error messages: a crashed worker logs
    # one message but loses its whole chunk
    failed = max(0, total_expected - successful)
    
    # Calculate metrics
    success_rate = successful / max(1, total_expected)
    
    # Determine if partial success is acceptable
    if success_rate >= 0.7:  # 70% success threshold
        overall_status = "SUCCESS_WITH_PARTIAL_FAILURES"
        recommendation = "Proceed with successful results, retry failed items"
    elif success_rate >= 0.3:  # 30% success threshold
        overall_status = "PARTIAL_SUCCESS"
        recommendation = "Review failures, may need system adjustments"
    else:
        overall_status = "MOSTLY_FAILED"
        recommendation = "Investigate system issues before retrying"
    
    # Resilience info to merge into the metrics
    resilience_metrics = {
        'total_expected': total_expected,
        'successful_count': successful,
        'failed_count': failed,
        'success_rate': success_rate,
        'overall_status': overall_status,
        'recommendation': recommendation
    }
    
    print(f"📊 Resilience Analysis:")
    print(f"   Expected: {total_expected} invoices")
    print(f"   Successful: {successful} ({success_rate:.1%})")
    print(f"   Failed: {failed}")
    print(f"   Status: {overall_status}")
    print(f"   Recommendation: {recommendation}")
    
    # Return only the changed keys; returning the whole state would push
    # results and metrics through their reducers a second time
    return {'completed_at': completed_at, 'metrics': resilience_metrics}

# Build resilient workflow
print("🛡️ PARTIAL FAILURE HANDLING DEMONSTRATION")
print("=" * 50)

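# Create workflow with unreliable workers
resilient_workflow = StateGraph(BatchState)
# The "dispatcher" node only records the start time; dispatcher() is the
# routing function that returns the Send objects
resilient_workflow.add_node("dispatcher", lambda state: {"started_at": datetime.now().isoformat()})
# dispatcher() targets "process_invoice_worker", so the unreliable worker is
# registered under that node name
resilient_workflow.add_node("process_invoice_worker", unreliable_worker)
resilient_workflow.add_node("resilient_aggregator", resilient_aggregator)

resilient_workflow.set_entry_point("dispatcher")
resilient_workflow.add_conditional_edges("dispatcher", dispatcher, ["process_invoice_worker"])
resilient_workflow.add_edge("process_invoice_worker", "resilient_aggregator")
resilient_workflow.add_edge("resilient_aggregator", END)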

try:
    resilient_app = resilient_workflow.compile()
    print("✅ Resilient workflow compiled")
    
    if SAMPLE_INVOICES and len(SAMPLE_INVOICES) >= 6:
        print("\n🧪 Testing failure resilience...")
        
        # Run multiple tests to show different failure scenarios
        for test_run in range(3):
            print(f"\n--- Test Run {test_run + 1} ---")
            
            test_batch = SAMPLE_INVOICES[:6]
            batch_state = create_batch_state(test_batch, parallelism=3)
            
            try:
                result = resilient_app.invoke(batch_state)
                
                metrics = result['metrics']
                print(f"✅ Test {test_run + 1} Results:")
                print(f"   Status: {metrics['overall_status']}")
                print(f"   Success rate: {metrics['success_rate']:.1%}")
                print(f"   Successful: {metrics['successful_count']}/{metrics['total_expected']}")
                
                if result['errors']:
                    print(f"   Errors encountered: {len(result['errors'])}")
                    for error in result['errors'][:2]:  # Show first 2 errors
                        print(f"     • {error}")
                
            except Exception as e:
                print(f"   ❌ Test {test_run + 1} failed: {e}")
                
except Exception as e:
    print(f"❌ Error compiling resilient workflow: {e}")

# Failure handling best practices
print(f"\n💡 FAILURE HANDLING BEST PRACTICES:")
print(f"   🔄 Retry Strategy:")
print(f"      • Separate failed items for retry")
print(f"      • Use exponential backoff for retries")
print(f"      • Set maximum retry limits")
print(f"   📊 Monitoring:")
print(f"      • Track success rates by worker and batch")
print(f"      • Alert on success rates below thresholds")
print(f"      • Log detailed error information")
print(f"   🛡️ Graceful Degradation:")
print(f"      • Process successful results even with partial failures")
print(f"      • Provide clear status to downstream systems")
print(f"      • Enable manual intervention for critical failures")
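
The retry strategy printed above (separate the failed items, back off exponentially, cap the number of retries) can be sketched in plain Python. This is a minimal sketch and not part of the workflow in this notebook: `retry_with_backoff`, `flaky_process`, the invoice IDs, and the delay values are all hypothetical, and items are assumed to be hashable IDs rather than full invoice dicts.

```python
import random
import time
from typing import Any, Callable, Dict, List, Tuple


def retry_with_backoff(
    items: List[Any],
    process: Callable[[Any], Any],
    max_retries: int = 3,
    base_delay: float = 0.01,
) -> Tuple[Dict[Any, Any], Dict[Any, str]]:
    """Process items, retrying only the failures with exponentially growing delays."""
    results: Dict[Any, Any] = {}
    errors: Dict[Any, str] = {}
    pending = list(items)

    for attempt in range(max_retries + 1):
        if not pending:
            break
        if attempt > 0:
            # Delay doubles each round (base, 2*base, 4*base, ...) plus a little jitter
            delay = base_delay * (2 ** (attempt - 1))
            time.sleep(delay + random.uniform(0, base_delay))

        still_failing = []
        for item in pending:
            try:
                results[item] = process(item)
                errors.pop(item, None)  # clear any error from an earlier attempt
            except Exception as exc:
                errors[item] = str(exc)
                still_failing.append(item)
        pending = still_failing  # only failed items go into the next round

    return results, errors


# Hypothetical flaky processor: each invoice ID fails on its first call only
_seen = set()


def flaky_process(invoice_id: str) -> str:
    if invoice_id not in _seen:
        _seen.add(invoice_id)
        raise RuntimeError(f"transient failure for {invoice_id}")
    return f"processed {invoice_id}"


results, errors = retry_with_backoff(["INV-1", "INV-2", "INV-3"], flaky_process)
print(f"Recovered: {len(results)}, permanently failed: {len(errors)}")
```

Anything still in `errors` after the last round has exhausted its retries and is a candidate for manual review.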

## Key Learnings

### What We Demonstrated:

1. **Dynamic Parallel Processing with Send API**
   - Created workers dynamically based on workload size
   - Distributed invoice processing across multiple workers
   - Cut batch time by roughly 2-4x compared with serial processing, depending on worker count

2. **Safe Concurrent State Management**
   - Used state reducers to merge updates from parallel workers
   - Prevented race conditions and data conflicts
   - Maintained data consistency across parallel execution

3. **Resource-Aware Scaling**
   - Monitored memory and CPU usage during parallel processing
   - Demonstrated trade-offs between speed and resource consumption
   - Showed how to find optimal parallelism levels

4. **Production-Ready Resilience**
   - Handled partial worker failures gracefully
   - Processed successful results even when some workers failed
   - Provided actionable recommendations based on success rates
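
The reducer idea in point 2 can be illustrated without running a graph. In LangGraph, a state field declared as `Annotated[list, operator.add]` is combined with incoming updates through that reducer rather than overwritten. The sketch below imitates the merge step in plain Python; `DemoState` and `merge_update` are hypothetical stand-ins, not this notebook's `BatchState` or LangGraph's internals.

```python
import operator
from typing import Annotated, Any, Dict, List, TypedDict, get_type_hints


class DemoState(TypedDict):
    # The second Annotated argument is the reducer used to combine updates
    results: Annotated[List[dict], operator.add]
    errors: Annotated[List[str], operator.add]
    batch_id: str  # no reducer: a new value simply replaces the old one


def merge_update(state: Dict[str, Any], update: Dict[str, Any]) -> Dict[str, Any]:
    """Apply one worker's partial update, using a field's reducer when it has one."""
    hints = get_type_hints(DemoState, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        metadata = getattr(hints[key], "__metadata__", ())
        reducer = metadata[0] if metadata else None
        merged[key] = reducer(merged[key], value) if reducer else value
    return merged


state = {"results": [], "errors": [], "batch_id": "batch-001"}

# Three workers finish independently; each returns only what it produced
worker_updates = [
    {"results": [{"invoice": "INV-1", "total": 120.0}]},
    {"errors": ["INV-2: simulated timeout"]},
    {"results": [{"invoice": "INV-3", "total": 75.5}]},
]

for update in worker_updates:
    state = merge_update(state, update)

print(len(state["results"]), "results,", len(state["errors"]), "errors")
```

Because each worker returns only its own partial update and the reducer appends, no worker ever overwrites another worker's entries.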

### Performance Insights:

**Serial Processing:**
- Predictable resource usage
- Linear scaling with workload size
- Simple error handling

**Parallel Processing:**
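- Roughly 2-4x faster execution for batch workloads, depending on worker count and server capacity
- Higher memory usage (each worker holds its own state and in-flight request; locally loaded models would add one instance per worker)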
- More complex state management requirements

**Optimal Parallelism:**
- Sweet spot is often 2-4 workers per CPU core for I/O-bound tasks, but measure it for your own workload
- Memory constraints may limit maximum parallelism
- Diminishing returns beyond optimal worker count
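
One way to find the sweet spot is to measure it. The sketch below times the same batch at several worker counts with `concurrent.futures.ThreadPoolExecutor`. `fake_invoice_call` is a hypothetical stand-in that sleeps for 50 ms to mimic an I/O-bound model request, so the timings say nothing about the real course server.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List


def fake_invoice_call(invoice_id: int) -> int:
    """Hypothetical stand-in for an I/O-bound request to the model server."""
    time.sleep(0.05)
    return invoice_id


def time_batch(invoice_ids: List[int], workers: int) -> float:
    """Return wall-clock seconds to process the batch with the given worker count."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        list(pool.map(fake_invoice_call, invoice_ids))
    return time.perf_counter() - start


batch = list(range(16))
timings: Dict[int, float] = {w: time_batch(batch, w) for w in (1, 2, 4, 8, 16)}

baseline = timings[1]
for workers, seconds in timings.items():
    print(f"{workers:>2} workers: {seconds:.2f}s  (speedup {baseline / seconds:.1f}x)")
```

Against a real server the curve flattens once the server, the network, or memory becomes the limit; the point where it flattens is the practical worker count.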

### Production Considerations:

**When to Use Parallel Processing:**
- Large batch workloads (>10 documents)
- I/O-bound processing (API calls, file operations)
- Independent document processing tasks

**When to Use Serial Processing:**
- Small batches (<5 documents), where coordination overhead can outweigh the speedup
- Memory-constrained environments
- Tasks requiring strict ordering

**Monitoring Requirements:**
- Track resource utilization (CPU, memory, GPU)
- Monitor success rates and error patterns
- Measure throughput and latency metrics
- Set alerts for resource exhaustion
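
As a rough illustration of success-rate monitoring, the sketch below counts outcomes per worker and flags any worker that falls below a threshold. `BatchMonitor`, the worker IDs, and the recorded outcomes are hypothetical; the 0.7 threshold mirrors the 70% cut-off used by the aggregator earlier in this session.

```python
from collections import defaultdict
from typing import Dict, List


class BatchMonitor:
    """Track per-worker outcomes and flag workers below a success-rate threshold."""

    def __init__(self, alert_threshold: float = 0.7) -> None:
        self.alert_threshold = alert_threshold
        self.counts: Dict[str, Dict[str, int]] = defaultdict(
            lambda: {"ok": 0, "failed": 0}
        )

    def record(self, worker_id: str, success: bool) -> None:
        self.counts[worker_id]["ok" if success else "failed"] += 1

    def success_rate(self, worker_id: str) -> float:
        c = self.counts[worker_id]
        total = c["ok"] + c["failed"]
        return c["ok"] / total if total else 0.0

    def alerts(self) -> List[str]:
        """Return the IDs of workers whose success rate is below the threshold."""
        return sorted(
            w for w in self.counts if self.success_rate(w) < self.alert_threshold
        )


monitor = BatchMonitor(alert_threshold=0.7)
outcomes = [("w1", True), ("w1", True), ("w2", True), ("w2", False), ("w2", False)]
for worker_id, success in outcomes:
    monitor.record(worker_id, success)

for worker_id in sorted(monitor.counts):
    print(f"{worker_id}: {monitor.success_rate(worker_id):.0%}")
print("Below threshold:", monitor.alerts())
```

In production the same counters would typically feed a metrics system rather than `print`, with resource metrics (CPU, memory, GPU) collected alongside them.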

### LangGraph Send API Benefits:

- **Dynamic Worker Creation:** Create workers based on actual workload
- **Load Distribution:** The dispatcher decides what each `Send` carries, so work can be split evenly across workers
- **State Management:** Built-in support for concurrent state updates
- **Error Isolation:** A worker that catches its own exceptions and reports them through state doesn't crash the entire batch
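
Error isolation depends on each worker reporting failures as data instead of raising. Here is a minimal sketch of that pattern, assuming a worker returns a partial state update with `results` and `errors` lists. `safe_worker` and `parse_invoice` are hypothetical names, not this notebook's `unreliable_worker`.

```python
from typing import Any, Callable, Dict, List


def safe_worker(
    invoice: Dict[str, Any],
    process: Callable[[Dict[str, Any]], Dict[str, Any]],
) -> Dict[str, List[Any]]:
    """Run one invoice and always return a state update, never raise."""
    try:
        return {"results": [process(invoice)], "errors": []}
    except Exception as exc:
        # The failure becomes a record the aggregator can count and retry
        message = f"{invoice.get('id', 'unknown')}: {type(exc).__name__}: {exc}"
        return {"results": [], "errors": [message]}


def parse_invoice(invoice: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical processor that rejects invoices without a total."""
    if "total" not in invoice:
        raise ValueError("missing total")
    return {"id": invoice["id"], "total": float(invoice["total"])}


invoices = [
    {"id": "INV-1", "total": "120.50"},
    {"id": "INV-2"},  # will fail: no total
    {"id": "INV-3", "total": 80},
]
updates = [safe_worker(inv, parse_invoice) for inv in invoices]

results = [r for u in updates for r in u["results"]]
errors = [e for u in updates for e in u["errors"]]
print(f"{len(results)} succeeded, {len(errors)} failed: {errors}")
```

The bad invoice ends up as one entry in `errors` while the other two still produce results, which is what lets an aggregator compute a success rate instead of seeing a crashed run.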

### Next Steps:

This parallel processing foundation enables:
- Enterprise-scale document processing pipelines
- Real-time batch processing systems
- Scalable AI-powered workflows
- Production-ready invoice processing services

Combining LangGraph's Send API with reducer-based state management and resource monitoring gives you a solid foundation for scaling AI applications to real-world enterprise batch workloads.