# ParallelLLMManager Comprehensive Demo

This notebook demonstrates the **ParallelLLMManager** - an advanced solution for parallel processing of AWS Bedrock requests across multiple models and regions. We'll showcase concurrent execution, load balancing, fault tolerance, and performance optimization.

## Key Features

- 🚀 **Parallel Processing**: Execute multiple requests concurrently across regions
- 🌍 **Multi-Region Support**: Intelligent load distribution across AWS regions
- 🔄 **Load Balancing**: Round-robin, random, and least-loaded strategies
- 🛡️ **Fault Tolerance**: Configurable failure-handling strategies
- 📊 **Performance Analytics**: Detailed execution statistics and metrics
- 🖼️ **Multimodal Support**: Parallel processing of text, image, and video content
- ⚙️ **Flexible Configuration**: Customizable concurrency and timeout settings
- 🔧 **MessageBuilder Integration**: Clean, fluent interface for creating requests

## Setup and Imports

In [1]:
import sys
import json
import time
import asyncio
from pathlib import Path
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional
import random

# Add the src directory to path for imports
sys.path.append(str(Path.cwd().parent / "src"))

# Import ParallelLLMManager and related classes
from bestehorn_llmmanager import ParallelLLMManager
from bestehorn_llmmanager.bedrock.models.parallel_structures import (
    BedrockConverseRequest, 
    ParallelProcessingConfig,
    FailureHandlingStrategy,
    LoadBalancingStrategy
)
from bestehorn_llmmanager.bedrock.models.llm_manager_structures import (
    AuthConfig, 
    RetryConfig, 
    AuthenticationType, 
    RetryStrategy
)
from bestehorn_llmmanager.bedrock.exceptions.parallel_exceptions import (
    ParallelProcessingError, 
    ParallelExecutionError, 
    ParallelConfigurationError
)

# Import MessageBuilder components
from bestehorn_llmmanager import create_user_message, create_assistant_message, create_message
from bestehorn_llmmanager import RolesEnum, ImageFormatEnum, DocumentFormatEnum, VideoFormatEnum

# Configure logging for better visibility
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

print("‚úÖ Imports successful!")
print(f"üìÅ Working directory: {Path.cwd()}")
print("üîß MessageBuilder integration enabled for clean request creation!")

‚úÖ Imports successful!
üìÅ Working directory: d:\Users\bestem\Code Workspace\LLMManager\notebooks
üîß MessageBuilder integration enabled for clean request creation!


## Request Creation Functions

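These helpers build requests with the MessageBuilder fluent API, which avoids hand-assembled message dictionaries and the serialization bugs they invite. Two further helpers print formatted summaries of parallel results.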

In [2]:
def create_text_request_with_builder(text: str, inference_config: Optional[Dict[str, Any]] = None) -> BedrockConverseRequest:
    """Create a text-only BedrockConverseRequest using MessageBuilder.

    Args:
        text: Prompt text placed in a single user message.
        inference_config: Inference settings forwarded to the request
            (e.g. ``maxTokens``, ``temperature``). When ``None`` (the default),
            ``{"maxTokens": 500, "temperature": 0.7}`` is used. An explicitly
            passed dict -- even an empty one -- is used as-is.

    Returns:
        A BedrockConverseRequest wrapping one user message.
    """
    # `is None` rather than `or`, so an explicit empty config is not silently replaced.
    if inference_config is None:
        inference_config = {"maxTokens": 500, "temperature": 0.7}

    message = create_user_message().add_text(text).build()

    return BedrockConverseRequest(
        messages=[message],
        inference_config=inference_config
    )

def create_image_request_with_builder(text: str, image_path: str, inference_config: Optional[Dict[str, Any]] = None) -> BedrockConverseRequest:
    """Create an image-analysis BedrockConverseRequest using MessageBuilder.

    If any exception is raised while building the text+image message, the error is
    printed and a text-only request (noting that the image could not be loaded) is
    returned instead. The fallback carries the same inference settings as the image
    request would have, so callers' settings are not silently lost.

    Args:
        text: Prompt text placed before the image in the user message.
        image_path: Path to a local image file, passed to ``add_local_image`` with
            ``max_size_mb=5.0``.
        inference_config: Inference settings forwarded to the request. When ``None``
            (the default), ``{"maxTokens": 800, "temperature": 0.3}`` is used.

    Returns:
        A BedrockConverseRequest with text + image, or a text-only fallback request.
    """
    if inference_config is None:
        inference_config = {"maxTokens": 800, "temperature": 0.3}

    # Only message/image building is guarded: that is the failure the fallback is for.
    try:
        message = (
            create_user_message()
            .add_text(text)
            .add_local_image(path_to_local_file=image_path, max_size_mb=5.0)
            .build()
        )
    except Exception as e:
        # Best-effort for the demo: keep the batch going with a text-only request.
        print(f"‚ùå Error creating image request: {e}")
        return create_text_request_with_builder(
            f"{text} (Note: Image could not be loaded from {image_path})",
            inference_config=inference_config,
        )

    return BedrockConverseRequest(
        messages=[message],
        inference_config=inference_config
    )

def display_parallel_response(response, title="Parallel Response", max_warnings=3):
    """Print a formatted summary of a parallel execution result.

    Prints overall success, success rate, total duration and request counts, then --
    when present -- execution statistics, non-zero token usage, average API latency
    and warnings.

    Args:
        response: Result of ``parallel_manager.converse_parallel``. This function reads
            ``success``, ``total_duration_ms``, ``request_responses``,
            ``parallel_execution_stats`` and ``warnings``, and calls
            ``get_success_rate()``, ``get_successful_responses()``,
            ``get_failed_responses()``, ``get_total_tokens_used()`` and
            ``get_average_latency()``.
        title: Heading printed above the summary and underlined with ``=``.
        max_warnings: How many warnings to list individually; the rest are summarized
            as a count. Negative values are treated as 0. Defaults to 3.
    """
    print(f"\n{title}")
    print("=" * len(title))

    # Overall statistics
    print(f"‚úÖ Overall Success: {response.success}")
    print(f"üìä Success Rate: {response.get_success_rate():.1f}%")
    print(f"‚è±Ô∏è  Total Duration: {response.total_duration_ms:.2f}ms")
    print(f"üî¢ Total Requests: {len(response.request_responses)}")
    print(f"‚úÖ Successful Requests: {len(response.get_successful_responses())}")
    print(f"‚ùå Failed Requests: {len(response.get_failed_responses())}")

    # Execution statistics (skipped when the response carries none)
    stats = response.parallel_execution_stats
    if stats:
        print("\nüìà Execution Statistics:")
        print(f"   ‚ö° Avg Duration: {stats.average_request_duration_ms:.2f}ms")
        print(f"   üìä Max Duration: {stats.max_request_duration_ms:.2f}ms")
        print(f"   ‚ö° Min Duration: {stats.min_request_duration_ms:.2f}ms")
        print(f"   üîÄ Concurrent Executions: {stats.concurrent_executions}")

        if stats.region_distribution:
            print("   üåç Region Distribution:")
            for region, count in stats.region_distribution.items():
                print(f"      {region}: {count} requests")

    # Token usage: only non-zero counters are listed
    total_tokens = response.get_total_tokens_used()
    if any(total_tokens.values()):
        print("\nüéØ Total Token Usage:")
        for key, value in total_tokens.items():
            if value > 0:
                print(f"   {key}: {value:,}")

    # Average latency (falsy values such as None or 0 are not printed)
    avg_latency = response.get_average_latency()
    if avg_latency:
        print(f"\n‚ö° Average API Latency: {avg_latency:.2f}ms")

    # Warnings: list the first `max_warnings`, then summarize the remainder.
    # Named `warning_list` to avoid shadowing the stdlib `warnings` module.
    warning_list = response.warnings
    if warning_list:
        limit = max(max_warnings, 0)
        print(f"\n‚ö†Ô∏è  Warnings ({len(warning_list)}):")
        for warning in warning_list[:limit]:
            print(f"   - {warning}")
        if len(warning_list) > limit:
            print(f"   ... and {len(warning_list) - limit} more")

def display_individual_responses(response, max_responses=3):
    """Print a short preview of up to ``max_responses`` successful responses.

    Each preview shows request ID, model, region, duration and the response text,
    truncated to 200 characters. If more successful responses exist than are shown,
    a final line reports how many were omitted.

    Args:
        response: Parallel result whose ``get_successful_responses()`` is iterated as a
            mapping of request ID -> per-request response; each per-request response is
            read via ``model_used``, ``region_used``, ``total_duration_ms`` and
            ``get_content()``.
        max_responses: Maximum number of responses to preview. Negative values are
            treated as 0.
    """
    from itertools import islice

    successful_responses = response.get_successful_responses()
    # Clamp so a negative limit neither breaks islice nor inflates the "more" count below.
    shown_limit = max(max_responses, 0)

    print(f"\nüí¨ Sample Individual Responses (showing up to {shown_limit}):")
    print("-" * 50)

    for req_id, bedrock_response in islice(successful_responses.items(), shown_limit):
        print(f"\nüÜî Request ID: {req_id}")
        print(f"ü§ñ Model: {bedrock_response.model_used}")
        print(f"üåç Region: {bedrock_response.region_used}")
        print(f"‚è±Ô∏è  Duration: {bedrock_response.total_duration_ms:.2f}ms")

        content = bedrock_response.get_content()
        if content:
            # Truncate long responses to keep the preview compact
            if len(content) > 200:
                content = content[:200] + "..."
            print(f"üí≠ Response: {content}")

    remaining = len(successful_responses) - shown_limit
    if remaining > 0:
        print(f"\n... and {remaining} more successful responses")

# Confirmation that the helper definitions in this cell executed without error.
print("‚úÖ MessageBuilder-based helper functions defined!")

‚úÖ MessageBuilder-based helper functions defined!


## Initialize the ParallelLLMManager

We'll create a ParallelLLMManager instance with multiple models and regions for optimal parallel processing.

In [None]:
print("üöÄ Initializing ParallelLLMManager...")

# Define models for parallel processing
models = [
    "Claude Sonnet 4.5",
    "Claude Haiku 4.5",
    "Nova Pro",             # Multimodal capabilities
    "Nova Lite",            # Backup option
]

# Define regions for load distribution
regions = [
    "us-east-1",
    "us-west-2",
    "eu-west-1"
]

# Configure authentication from the "default" AWS credentials profile
auth_config = AuthConfig(
    auth_type=AuthenticationType.PROFILE,
    profile_name="default"
)

# Configure retry behavior
retry_config = RetryConfig(
    max_retries=2,
    retry_strategy=RetryStrategy.REGION_FIRST
)

# Configure parallel processing behavior
parallel_config = ParallelProcessingConfig(
    max_concurrent_requests=8,
    request_timeout_seconds=60,
    failure_handling_strategy=FailureHandlingStrategy.CONTINUE_ON_FAILURE,
    load_balancing_strategy=LoadBalancingStrategy.ROUND_ROBIN,
    failure_threshold=0.3  # 30% failure threshold
)

# Only construction is guarded. The troubleshooting tips concern credentials, model/region
# access and connectivity, so they should not be shown for unrelated errors raised later
# while printing or validating.
try:
    parallel_manager = ParallelLLMManager(
        models=models,
        regions=regions,
        auth_config=auth_config,
        retry_config=retry_config,
        parallel_config=parallel_config,
        timeout=30,
        # NOTE(review): presumably re-downloads model/region data on every run -- confirm.
        force_download=True
    )
except Exception as e:
    print(f"‚ùå Error initializing ParallelLLMManager: {e}")
    print("\nüí° Troubleshooting tips:")
    print("   1. Ensure AWS credentials are configured")
    print("   2. Check access to specified models and regions")
    print("   3. Verify network connectivity to AWS")
    raise

print("‚úÖ ParallelLLMManager initialized successfully!")
print(f"   ü§ñ Models: {len(parallel_manager.get_available_models())}")
print(f"   üåç Regions: {len(parallel_manager.get_available_regions())}")
print(f"   üîÄ Max Concurrent: {parallel_config.max_concurrent_requests}")
print(f"   ‚öñÔ∏è  Load Balancing: {parallel_config.load_balancing_strategy.value}")
print(f"\n{parallel_manager}")

# Validate configuration
validation = parallel_manager.validate_configuration()
print("\nüîç Configuration Validation:")
print(f"   Valid: {'‚úÖ' if validation['valid'] else '‚ùå'} {validation['valid']}")
print(f"   Auth Status: {validation['auth_status']}")
print(f"   Model-Region Combinations: {validation['model_region_combinations']}")
print(f"   Parallel Config Valid: {'‚úÖ' if validation['parallel_config_valid'] else '‚ùå'}")

# Merge general and parallel-specific findings once; .get() tolerates any key being absent.
all_warnings = validation.get('warnings', []) + validation.get('parallel_warnings', [])
if all_warnings:
    print(f"   ‚ö†Ô∏è  Warnings: {len(all_warnings)}")
    for warning in all_warnings[:2]:
        print(f"      - {warning}")
    if len(all_warnings) > 2:
        print(f"      ... and {len(all_warnings) - 2} more")

all_errors = validation.get('errors', []) + validation.get('parallel_errors', [])
if all_errors:
    print(f"   ‚ùå Errors: {len(all_errors)}")
    for error in all_errors:
        print(f"      - {error}")



üöÄ Initializing ParallelLLMManager...




‚úÖ ParallelLLMManager initialized successfully!
   ü§ñ Models: 4
   üåç Regions: 3
   üîÄ Max Concurrent: 8
   ‚öñÔ∏è  Load Balancing: round_robin

ParallelLLMManager(models=4, regions=3, max_concurrent=8, strategy=round_robin)

üîç Configuration Validation:
   Valid: ‚úÖ True
   Auth Status: profile
   Model-Region Combinations: 6
   Parallel Config Valid: ‚úÖ
      - High concurrency (8) compared to available regions (3)


## Example 1: Basic Parallel Text Processing 💬

Let's start with parallel processing of multiple text requests using the MessageBuilder system.

In [4]:
print("üí¨ Example 1: Basic Parallel Text Processing (MessageBuilder)")
print("=" * 58)

# Prompts for the parallel batch
text_prompts = [
    "Explain quantum computing in simple terms.",
    "What are the benefits of renewable energy?",
    "Describe the process of photosynthesis.",
    "How does machine learning work?",
    "What is the significance of DNA in genetics?",
    "Explain the water cycle."
]

# One single-user-message request per prompt, using the helper's default inference config
text_requests = [create_text_request_with_builder(text=prompt) for prompt in text_prompts]

print(f"üìù Created {len(text_requests)} text requests using MessageBuilder")
for i, (req, prompt) in enumerate(zip(text_requests, text_prompts), 1):
    print(f"   {i}. {req.request_id}: {prompt[:50]}...")

try:
    # Execute requests in parallel
    print(f"\nüöÄ Executing {len(text_requests)} requests in parallel...")
    # perf_counter is monotonic and high-resolution, unlike time.time(), so it is the
    # appropriate clock for elapsed-time measurements.
    start_time = time.perf_counter()

    parallel_response = parallel_manager.converse_parallel(
        requests=text_requests
        # target_regions_per_request is omitted, so the manager applies its own default.
        # NOTE(review): an earlier note assumed this auto-adjusts to
        # min(max_concurrent_requests=8, len(regions)=3) = 3 -- confirm in the library.
    )

    end_time = time.perf_counter()
    elapsed = end_time - start_time
    # Rough guess of ~3 s per request if run one after another -- NOT a measurement.
    # Example 3 below measures a real sequential baseline.
    sequential_estimate = len(text_requests) * 3

    print(f"‚úÖ Parallel execution completed in {elapsed:.2f} seconds")
    print(f"‚ö° Estimated sequential time: {sequential_estimate:.1f} seconds")
    print(f"üöÄ Estimated speed improvement (vs. rough estimate above): ~{sequential_estimate / elapsed:.1f}x faster")

    # Display results
    display_parallel_response(parallel_response, "üó£Ô∏è  Parallel Text Processing Results")
    display_individual_responses(parallel_response, max_responses=2)

except Exception as e:
    print(f"‚ùå Error in parallel text processing: {e}")
    print(f"   Type: {type(e).__name__}")



üí¨ Example 1: Basic Parallel Text Processing (MessageBuilder)
üìù Created 6 text requests using MessageBuilder
   1. req_5f6cb0b00185_1766147953802246: Explain quantum computing in simple terms....
   2. req_a5f1b5bc179c_1766147953802316: What are the benefits of renewable energy?...
   3. req_2e6abd38ef07_1766147953802359: Describe the process of photosynthesis....
   4. req_220a8f058e7d_1766147953802397: How does machine learning work?...
   5. req_12c3b25b8eaf_1766147953802436: What is the significance of DNA in genetics?...
   6. req_391b2dd20932_1766147953802473: Explain the water cycle....

üöÄ Executing 6 requests in parallel...
‚úÖ Parallel execution completed in 16.89 seconds
‚ö° Estimated sequential time: 18.0 seconds
üöÄ Speed improvement: ~1.1x faster

üó£Ô∏è  Parallel Text Processing Results
‚úÖ Overall Success: True
üìä Success Rate: 100.0%
‚è±Ô∏è  Total Duration: 16891.67ms
üî¢ Total Requests: 6
‚úÖ Successful Requests: 6
‚ùå Failed Requests: 0

üìà Execution St

## Example 2: Parallel Image Analysis 🖼️

Now let's demonstrate parallel processing of image analysis requests using MessageBuilder's local image functionality.

In [None]:
print("üñºÔ∏è  Example 2: Parallel Image Analysis (MessageBuilder)")
print("=" * 50)

# Image paths are relative to the notebook's working directory (printed in the setup cell).
eiffel_image_path = "../images/1200px-Tour_Eiffel_Wikimedia_Commons_(cropped).jpg"
tokyo_image_path = "../images/Tokyo_Tower_2023.jpg"

# Two prompts per landmark image
image_tasks = [
    {
        "text": "Analyze the architectural features and historical significance of this landmark.",
        "image_path": eiffel_image_path
    },
    {
        "text": "Describe the structural design and engineering aspects of this tower.",
        "image_path": tokyo_image_path
    },
    {
        "text": "What can you tell me about the lighting and atmospheric conditions in this image?",
        "image_path": eiffel_image_path
    },
    {
        "text": "Compare this tower's design to other famous towers around the world.",
        "image_path": tokyo_image_path
    }
]

# One text+image request per task. If an image cannot be attached, the helper prints an
# error and returns a text-only request instead.
image_requests = [
    create_image_request_with_builder(text=task["text"], image_path=task["image_path"])
    for task in image_tasks
]

print(f"üñºÔ∏è  Created {len(image_requests)} image analysis requests using MessageBuilder")
for i, task in enumerate(image_tasks, 1):
    image_name = Path(task["image_path"]).name
    print(f"   {i}. {image_name}: {task['text'][:40]}...")

try:
    # Execute image analysis in parallel
    print(f"\nüöÄ Executing {len(image_requests)} image analysis requests in parallel...")
    # Monotonic clock for elapsed time (time.time() can jump with system clock changes)
    start_time = time.perf_counter()

    image_response = parallel_manager.converse_parallel(
        requests=image_requests,
        # NOTE(review): 3 equals the number of configured regions; the earlier rationale
        # ("more regions for vision models") is unverified -- confirm what this controls.
        target_regions_per_request=3
    )

    end_time = time.perf_counter()

    print(f"‚úÖ Parallel image analysis completed in {end_time - start_time:.2f} seconds")

    # Display results
    display_parallel_response(image_response, "üñºÔ∏è  Parallel Image Analysis Results")
    display_individual_responses(image_response, max_responses=2)

except Exception as e:
    print(f"‚ùå Error in parallel image analysis: {e}")
    print(f"   Type: {type(e).__name__}")

## Example 3: Performance Benchmarking 📊

Let's benchmark parallel processing against sequential execution. We run the same MessageBuilder-built requests both ways and compare wall-clock time.

In [None]:
print("üìä Example 3: Performance Benchmarking (MessageBuilder)")
print("=" * 50)

# Create benchmark requests using MessageBuilder
benchmark_prompts = [
    "Summarize the history of computer science.",
    "Explain the principles of quantum mechanics.",
    "Describe the process of cellular respiration.",
    "What are the major causes of climate change?",
    "How do neural networks learn?",
    "Explain the theory of evolution.",
    "What is the importance of biodiversity?",
    "How does the internet work?"
]

benchmark_requests = [
    create_text_request_with_builder(
        text=prompt,
        inference_config={"maxTokens": 400, "temperature": 0.6}
    )
    for prompt in benchmark_prompts
]

print(f"üìù Created {len(benchmark_requests)} benchmark requests using MessageBuilder")

# Benchmark only a subset to keep the runtime reasonable; both tests use the same requests.
BENCHMARK_SIZE = 4
benchmark_subset = benchmark_requests[:BENCHMARK_SIZE]
n_benchmark = len(benchmark_subset)

# Test 1: Sequential execution using single requests.
# NOTE(review): this runs first, so any connection/client warm-up benefits the parallel
# run; consider alternating the order for a fairer comparison.
print("\nüêå Sequential Execution Test...")
sequential_start = time.perf_counter()  # monotonic clock for elapsed time
sequential_responses = []
sequential_success_count = 0

try:
    for i, request in enumerate(benchmark_subset, 1):
        print(f"   Processing request {i}/{n_benchmark}...")
        response = parallel_manager.converse_with_request(request)
        sequential_responses.append(response)
        if response.success:
            sequential_success_count += 1

    sequential_end = time.perf_counter()
    sequential_duration = sequential_end - sequential_start

    print(f"‚úÖ Sequential execution completed in {sequential_duration:.2f} seconds")
    print(f"   Success rate: {(sequential_success_count / n_benchmark) * 100:.1f}%")
    print(f"   Average per request: {sequential_duration / n_benchmark:.2f} seconds")

except Exception as e:
    print(f"‚ùå Error in sequential execution: {e}")
    sequential_duration = float('inf')  # Sentinel: skip the comparison below

# Test 2: Parallel execution
print("\nüöÄ Parallel Execution Test...")
parallel_start = time.perf_counter()

try:
    parallel_response = parallel_manager.converse_parallel(
        requests=benchmark_subset,  # Same requests as the sequential test
        target_regions_per_request=2
    )

    parallel_end = time.perf_counter()
    parallel_duration = parallel_end - parallel_start

    print(f"‚úÖ Parallel execution completed in {parallel_duration:.2f} seconds")
    print(f"   Success rate: {parallel_response.get_success_rate():.1f}%")

    # Performance comparison (only when the sequential baseline succeeded)
    if sequential_duration != float('inf'):
        speedup = sequential_duration / parallel_duration
        # The ideal speedup is bounded by how many requests can actually run at once:
        # the concurrency limit, or the number of requests if that is smaller.
        # Dividing by max_concurrent_requests alone understated efficiency when
        # fewer requests than the limit were submitted.
        max_parallelism = min(parallel_config.max_concurrent_requests, n_benchmark)
        efficiency = (speedup / max_parallelism) * 100

        print(f"\nüìà Performance Comparison:")
        print(f"   Sequential: {sequential_duration:.2f}s")
        print(f"   Parallel: {parallel_duration:.2f}s")
        print(f"   Speedup: {speedup:.2f}x")
        print(f"   Parallel Efficiency: {efficiency:.1f}%")
        print(f"   Time Saved: {sequential_duration - parallel_duration:.2f}s ({((sequential_duration - parallel_duration)/sequential_duration)*100:.1f}%)")

    # Display detailed parallel statistics
    display_parallel_response(parallel_response, "üìä Benchmark Results")

except Exception as e:
    print(f"‚ùå Error in parallel execution: {e}")