# Chapter 11: Edge Management - Interactive Demo

This notebook demonstrates the complete edge management system for on-device AI, including:

1. **Device-Specific Memory Management** with Watermark system
2. **Three-Tiered Context Compression** (500-token buffer, Summary Chain, Semantic Memory)
3. **Concurrency & Parallel Processing** (Thread-Safe Engine, TaskGroup concurrency)
4. **Database Performance Tuning** (WAL mode, connection pooling, batch operations)
5. **Production Architecture Patterns** (Error handling, graceful degradation, state persistence)

**Pocket Agents: A Practical Guide to On‑Device Artificial Intelligence**


## 🎯 **Learning Objectives**

By the end of this notebook, you will understand:

1. **Device-Specific Memory Management** - How to profile and optimize memory usage on edge devices
2. **Three-Tiered Context Compression** - Advanced context management with semantic memory
3. **Concurrency & Parallel Processing** - Thread-safe operations and TaskGroup optimization
4. **Database Performance Tuning** - SQLite optimization for edge RAG systems
5. **Production Architecture Patterns** - Error handling, graceful degradation, and state persistence

## 📋 **Prerequisites**

- Basic Python knowledge
- Understanding of memory management concepts
- Familiarity with concurrent programming
- Knowledge of database operations

## 🚀 **Let's Get Started!**


# 1. Device-Specific Memory Management

## Memory Profiling and Watermark System

Let's start by exploring how to profile device memory and implement the watermark protection system.
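
Before running the module, it can help to see the idea behind a watermark check in isolation. The following is a minimal sketch, not the `memory_management` implementation: `PressureLevel` and `classify_usage` are hypothetical names, and the sketch assumes the watermarks are fractions of total memory in use (70%, 85%, and 95%, matching the values configured later in this notebook).

```python
from enum import Enum


class PressureLevel(Enum):
    """Hypothetical levels; the module's WatermarkLevel may be defined differently."""
    NORMAL = "normal"
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"


def classify_usage(used_mb: float, total_mb: float,
                   low: float = 0.70, medium: float = 0.85,
                   high: float = 0.95) -> PressureLevel:
    """Return the highest watermark that the current usage ratio has crossed."""
    if total_mb <= 0:
        raise ValueError("total_mb must be positive")
    ratio = used_mb / total_mb
    if ratio >= high:
        return PressureLevel.HIGH
    if ratio >= medium:
        return PressureLevel.MEDIUM
    if ratio >= low:
        return PressureLevel.LOW
    return PressureLevel.NORMAL


# Example on an assumed 8000 MB device
for used in (4000, 6000, 7000, 7900):
    print(used, classify_usage(used, 8000).value)
```

Checking the highest threshold first means a single reading maps to exactly one level, which keeps any follow-up action (evicting caches, shrinking context, refusing new loads) unambiguous.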


In [None]:
# Import required modules
import sys
import os
sys.path.append('.')

from memory_management import (
    DeviceMemoryProfiler, 
    DeviceMemoryManager, 
    WatermarkConfig, 
    WatermarkLevel
)

# Initialize memory profiler
print("🔍 Profiling Device Memory...")
profiler = DeviceMemoryProfiler()
profiler.print_profile()

# Get structured profile
profile = profiler.get_profile()
print(f"\n📊 Device Profile Summary:")
print(f"  Total Memory: {profile.total_memory_mb:.1f} MB")
print(f"  Available Memory: {profile.available_memory_mb:.1f} MB")
print(f"  CPU Cores: {profile.cpu_count}")
print(f"  UMA Architecture: {profile.is_uma}")
print(f"  GPU Available: {profile.gpu_available}")


In [None]:
# Initialize Device Memory Manager with Watermark System
print("\n🛡️ Initializing Device Memory Manager with Watermark Protection...")

# Configure watermark system
watermark_config = WatermarkConfig(
    low_threshold=0.7,    # 70% usage
    medium_threshold=0.85, # 85% usage  
    high_threshold=0.95    # 95% usage
)

# Create memory manager
memory_manager = DeviceMemoryManager(profiler, watermark_config)

print(f"📏 Memory Boundaries:")
print(f"  Model Memory Limit: {memory_manager.model_memory_limit:.1f} MB")
print(f"  Safety Margin: {memory_manager.safety_margin:.1f} MB")
print(f"  KV Cache Budget: {memory_manager.kv_cache_budget:.1f} MB")

# Test model loading capability
test_model_sizes = [500, 1000, 2000, 4000, 8000]  # MB
print(f"\n🧪 Testing Model Loading Capability:")
for model_size in test_model_sizes:
    can_load = memory_manager.can_load_model(model_size)
    context_size = memory_manager.get_optimal_context_size(model_size)
    verdict = '✅ loadable' if can_load else '❌ too large'
    print(f"  {model_size:4d}MB model: {verdict}, {context_size} tokens context")
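
How `get_optimal_context_size` derives its token count is not shown in this notebook. One common way to turn a KV cache budget into a context length is to divide the budget by the per-token cache size of a transformer. The sketch below assumes that approach; the function names and the model shape (32 layers, 8 KV heads, head dimension 128, 16-bit values) are illustrative assumptions, not values from the module.

```python
def kv_bytes_per_token(n_layers: int, n_kv_heads: int, head_dim: int,
                       bytes_per_value: int = 2) -> int:
    """Bytes of KV cache one token occupies.

    The leading factor of 2 counts one key vector and one value vector
    per layer per KV head.
    """
    return 2 * n_layers * n_kv_heads * head_dim * bytes_per_value


def max_context_tokens(kv_budget_mb: float, per_token_bytes: int) -> int:
    """Largest token count whose KV cache fits inside the budget."""
    return int(kv_budget_mb * 1024 * 1024) // per_token_bytes


# Assumed model shape, for illustration only
per_token = kv_bytes_per_token(n_layers=32, n_kv_heads=8, head_dim=128)
print(per_token)                           # 131072 bytes (128 KiB) per token
print(max_context_tokens(512, per_token))  # 4096
```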


# 2. Three-Tiered Context Compression System

## Advanced Context Management with Semantic Memory

Now let's explore the three-tiered context system that provides intelligent context compression and semantic memory injection.
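
To make the first two tiers concrete, here is a minimal sketch of a raw buffer that spills its oldest messages into a summary chain once a token budget is exceeded. `TieredContextSketch` is a hypothetical class written for this illustration, not the `ThreeTieredContextSystem` API; the summarizer is a truncation placeholder, and the semantic memory tier is left out.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class TieredContextSketch:
    buffer_tokens: int = 500
    buffer: deque = field(default_factory=deque)   # Tier 1: (role, text, tokens)
    summaries: list = field(default_factory=list)  # Tier 2: summary chain
    used: int = 0                                  # tokens currently in the buffer

    def add_message(self, role: str, text: str, tokens: int) -> None:
        self.buffer.append((role, text, tokens))
        self.used += tokens
        # Evict the oldest messages until the raw buffer fits its budget,
        # always keeping at least the newest message.
        while self.used > self.buffer_tokens and len(self.buffer) > 1:
            old_role, old_text, old_tokens = self.buffer.popleft()
            self.used -= old_tokens
            self.summaries.append(self._summarize(old_role, old_text))

    @staticmethod
    def _summarize(role: str, text: str) -> str:
        # Placeholder: a real system would call a summarization model here.
        return f"{role}: {text[:40]}"

    def build_prompt(self, query: str) -> str:
        parts = ["[Summary]"] + self.summaries
        parts += ["[Recent]"] + [f"{r}: {t}" for r, t, _ in self.buffer]
        parts.append(f"user: {query}")
        return "\n".join(parts)


ctx = TieredContextSketch(buffer_tokens=50)
for i in range(4):
    ctx.add_message("user", f"message {i}", 20)
print(len(ctx.buffer), len(ctx.summaries), ctx.used)  # 2 2 40
```

Recent turns stay verbatim while older ones shrink, so the prompt keeps a bounded size without dropping the conversation's history outright.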


In [None]:
from context_optimization import ThreeTieredContextSystem, ContextConfig

# Configure three-tiered context system
print("🧠 Initializing Three-Tiered Context System...")

config = ContextConfig(
    max_context_tokens=2048,
    buffer_tokens=500,           # Tier 1: Raw Buffer
    system_prompt_tokens=100,
    compression_threshold=0.9    # Compress at 90% capacity
)

context_system = ThreeTieredContextSystem(config)

print(f"📋 Context System Configuration:")
print(f"  Max Context Tokens: {config.max_context_tokens}")
print(f"  Buffer Tokens: {config.buffer_tokens}")
print(f"  Compression Threshold: {config.compression_threshold}")

# Simulate a conversation
print(f"\n💬 Simulating Conversation...")

# Add messages to demonstrate the system
messages = [
    ("user", "Hello! I'm working on a Python project and need help with data analysis.", 20),
    ("assistant", "I'd be happy to help with your Python data analysis project! What specific aspects are you working on?", 25),
    ("user", "I'm trying to analyze sales data using pandas and create visualizations with matplotlib.", 18),
    ("assistant", "Great! Pandas and matplotlib are excellent tools for data analysis. What kind of sales data are you working with?", 22),
    ("user", "I have monthly sales data for the past two years and want to identify trends and seasonal patterns.", 19),
    ("assistant", "Perfect! For trend analysis, you can use pandas' rolling averages and for seasonal patterns, consider grouping by month or quarter.", 24)
]

for role, content, tokens in messages:
    context_system.add_message(role, content, tokens)
    print(f"  {role}: {content} ({tokens} tokens)")

print(f"\n📊 Context System Status:")
stats = context_system.get_statistics()
for key, value in stats.items():
    print(f"  {key}: {value}")


In [None]:
# Demonstrate semantic memory retrieval
print(f"\n🔍 Testing Semantic Memory Retrieval...")

# Test semantic matches
queries = ["Python data analysis", "sales data trends", "pandas matplotlib"]
for query in queries:
    matches = context_system.get_semantic_matches(query, top_k=3)
    print(f"  Query: '{query}'")
    print(f"    Found {len(matches)} semantic matches")
    for i, match in enumerate(matches):
        print(f"    {i+1}. {match['content'][:50]}... (score: {match['score']:.3f})")

# Generate final prompt
print(f"\n📝 Generating Final Prompt...")
final_prompt = context_system.get_final_prompt("Can you help me create a bar chart showing monthly sales?")

print(f"Final Prompt Length: {len(final_prompt)} characters")
print(f"Final Prompt Preview:")
print("=" * 50)
print((final_prompt[:500] + "...") if len(final_prompt) > 500 else final_prompt)
print("=" * 50)


# 3. Concurrency & Parallel Processing

## Thread-Safe Operations and TaskGroup Optimization

Let's explore how to implement thread-safe operations and optimize concurrent processing for edge AI systems.
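
The core idea behind capping concurrent work can be shown with the standard library alone. This sketch is not the `TaskGroupManager` implementation: `fake_embed` and `embed_all` are hypothetical helpers, and the sketch uses `asyncio.gather` with a semaphore so that it runs on older Python versions. On Python 3.11 and later, `asyncio.TaskGroup` expresses the same pattern and additionally cancels sibling tasks when one fails.

```python
import asyncio


async def fake_embed(text: str) -> list:
    # Stand-in for a real embedding call (assumption: the real call is awaitable)
    await asyncio.sleep(0.01)
    return [float(len(text)), float(text.count(" "))]


async def embed_all(texts, max_concurrent: int = 3) -> list:
    """Embed all texts, running at most max_concurrent calls at once."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def bounded(text: str) -> list:
        async with semaphore:  # blocks while max_concurrent calls are in flight
            return await fake_embed(text)

    # gather returns results in the same order as the inputs
    return await asyncio.gather(*(bounded(t) for t in texts))
```

In a notebook cell, call it with `vectors = await embed_all(texts)`; in a plain script, use `asyncio.run(embed_all(texts))` instead.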


In [None]:
from concurrency_management import ThreadSafeEngine, TaskGroupManager, ProcessorCoreOptimizer
import asyncio
import time

# Initialize Thread-Safe Engine
print("🔧 Initializing Thread-Safe LLM Engine...")
engine = ThreadSafeEngine(max_workers=4)
engine.start()

print(f"Engine Status: {engine.get_queue_status()}")

# Submit multiple concurrent requests
print(f"\n📤 Submitting Concurrent Requests...")
requests = [
    ("req_1", "What is artificial intelligence?", 100),
    ("req_2", "Explain machine learning algorithms", 150),
    ("req_3", "How does deep learning work?", 120),
    ("req_4", "What are neural networks?", 80)
]

for req_id, prompt, max_tokens in requests:
    engine.submit_request(req_id, prompt, max_tokens=max_tokens)
    print(f"  Submitted: {req_id} - {prompt[:30]}...")

print(f"\n📊 Queue Status After Submission:")
status = engine.get_queue_status()
for key, value in status.items():
    print(f"  {key}: {value}")

# Retrieve responses; stop the engine even if retrieval raises
print(f"\n📥 Retrieving Responses...")
responses = []
try:
    for i in range(len(requests)):
        response = engine.get_response(timeout=10)
        if response:
            responses.append(response)
            print(f"  {response['id']}: {response['result'][:50]}...")
        else:
            print(f"  Request {i+1}: Timeout or no response")
finally:
    # Always release the worker threads
    engine.stop()
    print(f"\n🛑 Engine stopped")


In [None]:
# Demonstrate TaskGroup Concurrency for RAG
print(f"\n⚙️ Demonstrating TaskGroup Concurrency for RAG...")

# Initialize TaskGroup Manager
task_manager = TaskGroupManager(max_concurrent=3)
print(f"TaskGroup Manager initialized with {task_manager.max_concurrent} concurrent tasks")

# Simulate concurrent embedding generation
print(f"\n🔤 Running Concurrent Embedding Generation...")
texts = [
    "Machine learning is a subset of artificial intelligence",
    "Deep learning uses neural networks with multiple layers", 
    "Natural language processing enables computers to understand text",
    "Computer vision allows machines to interpret visual information",
    "Reinforcement learning learns through trial and error"
]

# Run embedding generation concurrently
start_time = time.perf_counter()  # monotonic clock, better suited to short intervals
embeddings = await task_manager.run_embedding_generation(texts)
end_time = time.perf_counter()

print(f"Generated {len(embeddings)} embeddings in {end_time - start_time:.2f} seconds")
for i, embedding in enumerate(embeddings):
    print(f"  Text {i+1}: {texts[i][:40]}... -> {embedding[:20]}...")

# Simulate concurrent vector search
print(f"\n🔍 Running Concurrent Vector Search...")
start_time = time.perf_counter()
search_results = await task_manager.run_vector_search("machine learning neural networks", top_k=3)
end_time = time.perf_counter()

print(f"Found {len(search_results)} results in {end_time - start_time:.2f} seconds")
for i, result in enumerate(search_results):
    print(f"  Result {i+1}: {result['id']} (score: {result['score']:.3f})")


In [None]:
# Demonstrate Processor Core Optimization
print(f"\n🧠 Demonstrating Processor Core Optimization...")

# Initialize Processor Core Optimizer
optimizer = ProcessorCoreOptimizer()
print(f"Processor Core Optimizer initialized for {optimizer.cpu_count} CPU cores")

# Monitor core usage
print(f"\n📊 Monitoring CPU Core Usage...")
core_usage = optimizer.monitor_core_usage()
print(f"Current CPU Usage:")
for core, usage in core_usage.items():
    print(f"  {core}: {usage:.1f}%")

# Get optimization recommendations
print(f"\n💡 Getting Optimization Recommendations...")
recommendations = optimizer.get_optimization_recommendations()
for rec in recommendations:
    print(f"  - {rec}")

# Test workload-specific optimization
print(f"\n⚡ Testing Workload-Specific Optimization...")
workloads = ["cpu_bound", "io_bound", "memory_bound"]
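for workload in workloads:
    # Use a distinct name so the context system's `config` is not overwritten
    workload_config = optimizer.optimize_for_workload(workload, 1000, 2000)
    print(f"  {workload}: {workload_config['num_workers']} workers, {workload_config['threading_model']} threading")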


# 4. Database Performance Tuning

## SQLite Optimization for Edge RAG Systems

Let's explore how to optimize database performance for edge AI applications using SQLite with WAL mode and connection pooling.
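
The two techniques named above, WAL mode and batched inserts, can be tried with the standard `sqlite3` module before involving the optimizer class. This is a minimal sketch rather than what `EdgeDatabaseOptimizer` does internally; the function name, table layout, and the `synchronous=NORMAL` setting are assumptions chosen for the illustration.

```python
import os
import sqlite3
import tempfile


def wal_batch_insert_demo(n_docs: int = 100):
    """Create a throwaway database, enable WAL, and insert rows in one transaction."""
    with tempfile.TemporaryDirectory() as tmp:
        conn = sqlite3.connect(os.path.join(tmp, "demo.db"))
        try:
            # PRAGMA journal_mode returns the mode that is actually in effect
            mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
            conn.execute("PRAGMA synchronous=NORMAL")  # common pairing with WAL
            conn.execute("PRAGMA cache_size=-65536")   # negative means KiB: 64 MiB
            conn.execute("CREATE TABLE documents (title TEXT, content TEXT)")
            rows = [(f"Document {i}", f"Content {i}") for i in range(n_docs)]
            with conn:  # commits once for the whole batch, rolls back on error
                conn.executemany("INSERT INTO documents VALUES (?, ?)", rows)
            count = conn.execute("SELECT COUNT(*) FROM documents").fetchone()[0]
        finally:
            conn.close()  # close before the temporary directory is removed
    return mode, count


print(wal_batch_insert_demo())
```

Wrapping the whole batch in one transaction is what makes `executemany` fast: SQLite syncs to disk once per commit, not once per row.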


In [None]:
from database_optimization import EdgeDatabaseOptimizer, DatabaseConfig
import tempfile
import os
import time  # used for timing below; imported here so the cell runs on its own

# Create temporary database for demo
temp_db = tempfile.NamedTemporaryFile(suffix='.db', delete=False)
db_path = temp_db.name
temp_db.close()

print(f"💾 Initializing Edge Database Optimizer...")
print(f"Database Path: {db_path}")

# Configure database with optimizations
config = DatabaseConfig(
    db_path=db_path,
    connection_pool_size=5,
    enable_wal_mode=True,
    cache_size_mb=64,
    enable_mmap=True
)

db_optimizer = EdgeDatabaseOptimizer(config)
print(f"Database initialized with {db_optimizer.connection_count} connections")

# Demonstrate batch operations
print(f"\n📦 Demonstrating Batch Operations...")

# Batch insert documents
documents = [
    {
        'title': f'Document {i}',
        'content': f'This is the content of document {i} about machine learning and AI.',
        'type': 'text',
        'metadata': {'source': 'demo', 'index': i}
    }
    for i in range(100)
]

print(f"Inserting {len(documents)} documents...")
start_time = time.perf_counter()
inserted_docs = db_optimizer.batch_insert_documents(documents)
end_time = time.perf_counter()
print(f"Inserted {inserted_docs} documents in {end_time - start_time:.2f} seconds")

# Batch insert embeddings
embeddings = [
    {
        'vector_id': f'embed_{i}',
        'vector_data': f'embedding_data_{i}'.encode(),
        'metadata': {'doc_id': f'doc_{i}'}
    }
    for i in range(100)
]

print(f"Inserting {len(embeddings)} embeddings...")
start_time = time.perf_counter()
inserted_embeddings = db_optimizer.batch_insert_embeddings(embeddings)
end_time = time.perf_counter()
print(f"Inserted {inserted_embeddings} embeddings in {end_time - start_time:.2f} seconds")


In [None]:
# Demonstrate search operations
print(f"\n🔍 Demonstrating Search Operations...")

# Search documents
print(f"Searching documents...")
doc_results = db_optimizer.search_documents("machine learning", limit=5)
print(f"Found {len(doc_results)} documents:")
for i, doc in enumerate(doc_results):
    print(f"  {i+1}. {doc['title']} - {doc['content'][:50]}...")

# Search embeddings
print(f"\nSearching embeddings...")
embed_results = db_optimizer.search_embeddings("embed_", limit=5)
print(f"Found {len(embed_results)} embeddings:")
for i, embed in enumerate(embed_results):
    print(f"  {i+1}. {embed['vector_id']} - {embed['vector_data'][:20]}...")

# Get database statistics
print(f"\n📊 Database Statistics:")
stats = db_optimizer.get_database_stats()
for key, value in stats.items():
    print(f"  {key}: {value}")

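# Clean up the database and the -wal/-shm sidecar files that WAL mode creates.
# Pooled connections should be closed first; on Windows, open files cannot be deleted.
for suffix in ("", "-wal", "-shm"):
    path = db_path + suffix
    if os.path.exists(path):
        os.unlink(path)
print(f"\n🧹 Cleaned up temporary database")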


# 5. Production Architecture Patterns

## Error Handling, Graceful Degradation, and State Persistence

Finally, let's explore production-grade patterns for building resilient and self-healing edge AI systems.
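
State persistence is only useful if a crash in the middle of a save cannot corrupt the file. One widely used way to get that property is to write to a temporary file and rename it over the target. The sketch below shows that pattern with the standard library; `save_state_atomic` and `load_state` are hypothetical helpers, and nothing here is taken from `StatePersistenceManager`, whose internals are not shown in this notebook.

```python
import json
import os
import tempfile


def save_state_atomic(path: str, state: dict) -> None:
    """Write JSON to a temp file in the target's directory, then rename over the target."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(state, f)
            f.flush()
            os.fsync(f.fileno())    # push the bytes to disk before renaming
        os.replace(tmp_path, path)  # readers see the old file or the new one, never half
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


def load_state(path: str) -> dict:
    """Return the saved state, or an empty dict if the file is missing or unreadable."""
    try:
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return {}


with tempfile.TemporaryDirectory() as tmp:
    state_path = os.path.join(tmp, "agent_state.json")
    save_state_atomic(state_path, {"current_goal": "demo"})
    print(load_state(state_path))  # {'current_goal': 'demo'}
```

Falling back to an empty state on a missing or corrupt file is one form of graceful degradation: the agent restarts with no memory instead of failing to start.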


In [None]:
from production_patterns import (
    ProductionErrorHandler, 
    GracefulDegradationManager, 
    StatePersistenceManager,
    ErrorSeverity
)

# Initialize Production Systems
print("🏭 Initializing Production Architecture Systems...")

# Error Handler
error_handler = ProductionErrorHandler()
print(f"✅ Production Error Handler initialized")

# Graceful Degradation Manager
degradation_manager = GracefulDegradationManager(error_handler)
print(f"✅ Graceful Degradation Manager initialized")

# State Persistence Manager
state_manager = StatePersistenceManager("demo_agent_state.json")
print(f"✅ State Persistence Manager initialized")

# Demonstrate error handling
print(f"\n🚨 Demonstrating Error Handling...")

# Test different error severities
test_errors = [
    (ValueError("Invalid input parameter"), ErrorSeverity.LOW),
    (ConnectionError("Network timeout"), ErrorSeverity.MEDIUM),
    (MemoryError("Out of memory"), ErrorSeverity.HIGH),
    (SystemError("Critical system failure"), ErrorSeverity.CRITICAL)
]

for error, severity in test_errors:
    print(f"Handling {severity.value} error: {type(error).__name__}")
    error_handler.handle_error(error, {'operation': 'demo', 'component': 'test'}, severity)

# Get error statistics
print(f"\n📊 Error Statistics:")
error_stats = error_handler.get_error_stats()
for key, value in error_stats.items():
    print(f"  {key}: {value}")


In [None]:
# Demonstrate graceful degradation
print(f"\n📉 Demonstrating Graceful Degradation...")

# Show initial degradation status
initial_status = degradation_manager.get_degradation_status()
print(f"Initial degradation status: {initial_status}")

# Simulate increasing system stress
print(f"\nSimulating increasing system stress...")
for i in range(4):
    degradation_manager.increase_degradation()
    status = degradation_manager.get_degradation_status()
    print(f"  Stress level {i+1}: {status}")

# Simulate system recovery
print(f"\nSimulating system recovery...")
for i in range(2):
    degradation_manager.decrease_degradation()
    status = degradation_manager.get_degradation_status()
    print(f"  Recovery level {i+1}: {status}")

# Demonstrate state persistence
print(f"\n💾 Demonstrating State Persistence...")

# Set agent state
agent_state = {
    "current_goal": "Plan a multi-day trip to Mars",
    "conversation_summary": "User wants to travel to Mars and explore the red planet",
    "current_task": "Research Mars landing sites and travel logistics",
    "preferences": {
        "duration": "7 days",
        "budget": "unlimited",
        "interests": ["space exploration", "scientific research"]
    }
}

print(f"Setting agent state...")
for key, value in agent_state.items():
    state_manager.set_state(key, value)
    print(f"  {key}: {value}")

# Save state
state_manager.save_state()
print(f"✅ Agent state saved to {state_manager.storage_path}")

# Get state summary
summary = state_manager.get_state_summary()
print(f"\n📋 State Summary:")
for key, value in summary.items():
    print(f"  {key}: {value}")


In [None]:
# Simulate agent restart and state recovery
print(f"\n🔄 Simulating Agent Restart and State Recovery...")

# Create new state manager (simulating restart)
new_state_manager = StatePersistenceManager("demo_agent_state.json")
new_state_manager.load_state()

print(f"Recovered agent state after restart:")
recovered_goal = new_state_manager.get_state("current_goal")
recovered_summary = new_state_manager.get_state("conversation_summary")
recovered_task = new_state_manager.get_state("current_task")
recovered_preferences = new_state_manager.get_state("preferences")

print(f"  Goal: {recovered_goal}")
print(f"  Summary: {recovered_summary}")
print(f"  Task: {recovered_task}")
print(f"  Preferences: {recovered_preferences}")

# Clean up demo files
if os.path.exists("demo_agent_state.json"):
    os.remove("demo_agent_state.json")
    print(f"\n🧹 Cleaned up demo state file")

print(f"\n✅ Production Architecture Demo Complete!")


# 🎉 **Summary and Next Steps**

## What We've Learned

This demo covered five components of edge management for on-device AI:

### ✅ **Memory Management**
- Device-specific memory profiling with UMA support
- Watermark system for proactive memory protection
- Three non-negotiable memory boundaries
- Model loading capability assessment

### ✅ **Context Optimization**
- Three-tiered context compression system
- Raw buffer, summary chain, and semantic memory
- Dynamic context sizing based on device capabilities
- Intelligent compression triggers

### ✅ **Concurrency & Parallel Processing**
- Thread-safe LLM inference engine
- TaskGroup-based concurrent RAG operations
- Processor core utilization optimization
- Workload-specific performance tuning

### ✅ **Database Performance Tuning**
- SQLite WAL mode optimization
- Connection pooling for edge devices
- Batch operations for efficient data ingestion
- Database performance monitoring

### ✅ **Production Architecture Patterns**
- Production-grade error handling with severity levels
- Graceful degradation management
- Agent state persistence across restarts
- Self-healing system architecture

## 🚀 **Next Steps**

1. **Run the Complete Test Suite**
   ```bash
   pytest test_edge_management.py -v
   ```

2. **Explore Individual Modules**
   - `python memory_management.py`
   - `python context_optimization.py`
   - `python concurrency_management.py`
   - `python database_optimization.py`
   - `python production_patterns.py`

3. **Integrate into Your Projects**
   - Use these patterns in your own edge AI applications
   - Customize the configurations for your specific devices
   - Monitor and optimize based on real-world usage

4. **Continue Learning**
   - Explore Chapter 12: Agentic Best Practices
   - Implement the Hero Project with edge optimizations
   - Deploy to production environments

## 💡 **Key Takeaways**

- **Edge AI requires careful resource management** - Memory, context, and concurrency must be optimized for device constraints
- **Production systems need resilience** - Error handling, graceful degradation, and state persistence are essential
- **Performance optimization is workload-specific** - Different tasks require different optimization strategies
- **Monitoring and adaptation are crucial** - Systems must continuously monitor and adjust to maintain optimal performance

---

*"Efficient edge management is the difference between a prototype and a production system."*
