# 🚀 **AI Resilience Masterclass: Building Unbreakable LLM Systems**
## **6-Hour Deep Dive into Error Handling, Rate Limiting & Self-Healing AI**

---

### 🎯 **Welcome to Your Journey Towards Bulletproof AI!**

```ascii
    🤖 Your AI System Journey
    ┌─────────────────────┐
    │   Fragile System    │ ──► Error Fundamentals
    └─────────────────────┘
              │
              ▼
    ┌─────────────────────┐
    │  Resilient System   │ ──► Graceful Degradation
    └─────────────────────┘
              │
              ▼
    ┌─────────────────────┐
    │ Self-Healing System │ ──► Autonomous Recovery
    └─────────────────────┘
```

### 🎓 **What You'll Master Today**

| Module | Time | What You'll Build | Target Outcome |
|--------|------|-------------------|-------------------|
| **1. Error Olympics** | 45min | Custom error handlers with retry logic | 99.9% uptime |
| **2. Graceful Degradation** | 60min | Multi-tier fallback system | Zero downtime |
| **3. Rate Limit Ninja** | 45min | Smart quota management | 50% cost reduction |
| **4. Circuit Breakers** | 60min | Self-protecting systems | Auto-recovery |
| **5. Observability Hub** | 45min | Real-time monitoring | Predictive maintenance |
| **6. Self-Healing AI** | 75min | Autonomous recovery system | 24/7 reliability |

### 🛠️ **Your Toolkit for Today**
- 🔑 OpenAI API (the only external service we need!)
- 📊 Real-time dashboards
- 🎮 Interactive experiments
- 💾 Production-ready patterns
- 🚨 Live monitoring & alerts

### ⚡ **Why This Matters**
- **Netflix**: 99.99% availability = resilient systems
- **OpenAI**: Handles millions of requests = smart rate limiting
- **Google**: Self-healing infrastructure = autonomous recovery

**Today, you'll build systems that match these standards!**

## 📋 **Pre-Flight Checklist**
### Let's ensure your environment is ready for takeoff! 🚁

In [1]:
# 🎯 Cell 1: Environment Setup & Validation
# This cell sets up EVERYTHING you need for the workshop!
import subprocess
import sys
import os
from datetime import datetime
print("🚀 AI Resilience Workshop - System Check")
print("="*50)
print(f"⏰ Workshop Start Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("="*50)

# Install required packages
packages = [
    'openai>=1.0.0',
    'tenacity',  # For retry logic
    'ratelimit',  # For rate limiting
    'prometheus-client',  # For metrics
    'plotly',  # For interactive visualizations
    'pandas',
    'numpy',
    'ipywidgets',
    'rich',  # For beautiful terminal output
    'httpx',  # For advanced HTTP handling
    'asyncio',  # Note: asyncio already ships with the standard library, so this entry is redundant
    'aiohttp',
    'python-dotenv'
]

print("\n📦 Installing required packages...")
for package in packages:
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", package])
    print(f"  ✅ {package.split('>')[0]} installed")

# Import everything we need
import openai
import json
import time
import asyncio
import logging
from typing import Optional, Dict, Any, List, Callable
from dataclasses import dataclass, field
from enum import Enum
from collections import deque, defaultdict
from datetime import datetime, timedelta
import random
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import traceback
import warnings
warnings.filterwarnings('ignore')

# Data science imports
import pandas as pd
import numpy as np
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import ipywidgets as widgets
from IPython.display import display, HTML, clear_output

# Resilience imports
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
    before,  # Submodule with before-attempt hooks (e.g. before_log)
    after    # Submodule with after-attempt hooks (e.g. after_log)
)

print("\n🔑 OpenAI API Key Setup")
print("-"*40)

# Interactive API key setup
if 'OPENAI_API_KEY' not in os.environ:
    from getpass import getpass  # Hides the key while it is typed

    print("⚠️  No API key found in environment")
    print("Please enter your OpenAI API key:")
    api_key = getpass("API Key: ").strip()
    os.environ['OPENAI_API_KEY'] = api_key
    openai.api_key = api_key
else:
    openai.api_key = os.environ['OPENAI_API_KEY']
    print("✅ API key loaded from environment")

# Test API connection
print("\n🔍 Testing OpenAI API connection...")
try:
    client = openai.OpenAI()
    models = client.models.list()
    print("✅ Successfully connected to OpenAI API!")
    print(f"   Available models: {len(list(models))} models found")
except Exception as e:
    print(f"❌ Connection failed: {e}")
    print("   Please check your API key and try again")

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('AIResilience')

print("\n✨ Environment ready! Let's build resilient AI systems!")
print("="*50)

🚀 AI Resilience Workshop - System Check
⏰ Workshop Start Time: 2025-09-19 19:43:32

📦 Installing required packages...
  ✅ openai installed
  ✅ tenacity installed
  ✅ ratelimit installed
  ✅ prometheus-client installed
  ✅ plotly installed
  ✅ pandas installed
  ✅ numpy installed
  ✅ ipywidgets installed
  ✅ rich installed
  ✅ httpx installed
  ✅ asyncio installed
  ✅ aiohttp installed
  ✅ python-dotenv installed

🔑 OpenAI API Key Setup
----------------------------------------
✅ API key loaded from environment

🔍 Testing OpenAI API connection...
✅ Successfully connected to OpenAI API!
   Available models: 88 models found

✨ Environment ready! Let's build resilient AI systems!


# 🏋️ **Module 1: The Error Olympics**
## Master Every Type of Failure Your AI Will Face!

---

## 🎪 **Welcome to the Failure Zoo!**
### Where Every Error Has a Story (and a Solution)

In the wild world of AI systems, errors aren't just bugs – they're opportunities to build resilience! Let's meet the entire cast of characters that will test your system's mettle.

---

## 🎭 **Act 1: The Main Cast of Error Characters**

### 🐢 **The Timeout Turtle** - *Chronicus Slowpokeus*
**Natural Habitat:** Network edges and overloaded servers  
**Favorite Food:** Your patience  
**Warning Signs:** 
- Requests taking >30 seconds
- Users refreshing repeatedly
- That spinning wheel of doom

**Theory Behind the Beast:**
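Timeouts occur when a response does not arrive before the client's deadline, usually because of network latency or an overloaded server. Response times follow a **long-tail distribution**: most requests are fast, but a small fraction are dramatically slower, and those are the ones that hit the timeout.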

```
The Timeout Lifecycle
Stage 1: Request sent         (0s)    😊
Stage 2: Waiting...           (10s)   😐
Stage 3: Still waiting...     (30s)   😰
Stage 4: Timeout!             (60s)   💀
```

**Survival Strategy:**
- Set aggressive timeouts (fail fast!)
- Implement exponential backoff
- Use circuit breakers to detect chronic slowness
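
The "fail fast" advice above can be sketched with the standard library alone. This is a minimal illustration, not the workshop's implementation: `call_with_deadline` and `slow_call` are hypothetical names, and the abandoned worker thread keeps running in the background after the deadline passes. The OpenAI client has a `timeout` option of its own; this sketch is deliberately library-agnostic.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout


def call_with_deadline(fn, timeout_s, *args, **kwargs):
    """Run fn in a worker thread and give up after timeout_s seconds."""
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(fn, *args, **kwargs)
    try:
        return future.result(timeout=timeout_s)
    except FutureTimeout:
        raise TimeoutError(f"No response within {timeout_s}s") from None
    finally:
        # Do not wait for the abandoned worker; it finishes in the background
        executor.shutdown(wait=False)


def slow_call():
    """Hypothetical stand-in for a slow API request."""
    time.sleep(0.5)
    return "late answer"


try:
    call_with_deadline(slow_call, timeout_s=0.1)
    outcome = "completed"
except TimeoutError:
    outcome = "timed out"

print(outcome)
```

A short deadline like this pairs naturally with retries: the caller learns quickly that something is wrong and can back off or fall back instead of hanging.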

---

### 🚦 **The Rate Limit Riot** - *Velocitus Restrictus*
**Natural Habitat:** API gateways and billing departments  
**Favorite Saying:** "Whoa there, speedster!"  
**Damage Type:** 429 Too Many Requests

**The Mathematics of Rate Limiting:**
```
Tokens Available = min(Bucket Capacity, Tokens Left + Refill Rate × Seconds Since Last Refill)
Refill Rate = Tokens added per second
Wait Time = max(0, Tokens Needed - Tokens Available) / Refill Rate
```

**Visual Representation:**
```
Bucket at start:    [🪙🪙🪙🪙🪙] Full (5/5 tokens)
After 3 requests:   [🪙🪙⚫⚫⚫] Partial (2/5 tokens)
After rate limit:   [⚫⚫⚫⚫⚫] Empty (0/5 tokens)
                    ↓ Time passes...
After refill:       [🪙🪙🪙⚫⚫] Refilling (3/5 tokens)
```
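
The bucket arithmetic above can be sketched as a small class. This is an illustrative token bucket under stated assumptions: the `TokenBucket` name, its parameters, and the injectable `clock` (used here so the example runs instantly) are hypothetical and not part of any library.

```python
import time


class TokenBucket:
    """Client-side token bucket: capacity tokens, refilled at refill_rate per second."""

    def __init__(self, capacity, refill_rate, clock=time.monotonic):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.clock = clock
        self.tokens = float(capacity)
        self.last_refill = clock()

    def _refill(self):
        now = self.clock()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now

    def try_acquire(self, needed=1):
        """Take tokens if available; return False instead of blocking."""
        self._refill()
        if self.tokens >= needed:
            self.tokens -= needed
            return True
        return False

    def wait_time(self, needed=1):
        """Seconds until `needed` tokens will be available."""
        self._refill()
        return max(0.0, (needed - self.tokens) / self.refill_rate)


# A fake clock keeps the demo deterministic
fake_now = [0.0]
bucket = TokenBucket(capacity=5, refill_rate=1.0, clock=lambda: fake_now[0])

granted = [bucket.try_acquire() for _ in range(6)]  # sixth request is refused
wait = bucket.wait_time()                           # time until one token returns
fake_now[0] += 3.0                                  # three seconds pass
refilled = bucket.try_acquire(3)                    # three tokens are back
print(granted, wait, refilled)
```

Checking `wait_time()` before sending lets a client pause proactively rather than discovering the limit through a 429 response.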

**Pro Tips:**
- Implement request queuing
- Distribute load across multiple API keys, but only where your provider's terms allow it
- Cache aggressively to reduce API calls

---

### 🌊 **The Token Tsunami** - *Contextus Overloadicus*
**Natural Habitat:** Long conversation threads  
**Special Attack:** Context window overflow  
**Weakness:** Summarization magic

**Token Economics 101:**
| Model | Context Window (tokens) | Cost Impact | Use Case |
|-------|-------------------------|-------------|----------|
| GPT-4 (original) | 8,192 | 💰💰💰 | Complex reasoning |
| GPT-4-32k | 32,768 | 💰💰💰💰💰 | Document analysis |
| GPT-3.5 Turbo (original) | 4,096 | 💰 | Quick responses |
| GPT-4o | 128,000 | 💰💰 | Default model in this workshop |

**The Token Overflow Equation:**
```
Total Tokens = Prompt Tokens (system + conversation messages) + Completion Tokens
If Total Tokens > Context Window → 💥 ERROR
```

**Defensive Strategies:**
1. **The Sliding Window:** Keep only recent context
2. **The Summarizer:** Compress older messages
3. **The Chunker:** Split into smaller requests
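
The sliding-window strategy can be sketched as follows. This is a rough illustration: `estimate_tokens` uses an assumed heuristic of about four characters per token rather than a real tokenizer, and both function names are hypothetical.

```python
def estimate_tokens(text):
    """Rough heuristic (assumption): about 4 characters per token."""
    return max(1, len(text) // 4)


def trim_to_budget(messages, max_tokens):
    """Keep system messages plus the most recent messages that fit the budget."""
    system = [m for m in messages if m["role"] == "system"]
    others = [m for m in messages if m["role"] != "system"]
    budget = max_tokens - sum(estimate_tokens(m["content"]) for m in system)

    kept = []
    for message in reversed(others):  # walk from newest to oldest
        cost = estimate_tokens(message["content"])
        if cost > budget:
            break  # everything older is dropped as well
        kept.append(message)
        budget -= cost
    return system + list(reversed(kept))


history = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "a" * 400},
    {"role": "assistant", "content": "b" * 400},
    {"role": "user", "content": "c" * 40},
]
trimmed = trim_to_budget(history, max_tokens=120)
print([m["role"] for m in trimmed])
```

In a real system the dropped messages could be passed to a summarizer first, which combines the first two strategies in the list.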

---

### 😴 **The Service Slumber** - *API Hibernatus*
**Natural Habitat:** Cloud regions during maintenance  
**Symptoms:** 503 Service Unavailable  
**Duration:** 30 seconds to ∞

**The Anatomy of Downtime:**
```
99.9% Uptime = 8.76 hours downtime/year
99.99% Uptime = 52.56 minutes downtime/year
99.999% Uptime = 5.26 minutes downtime/year

Your Reality: Murphy's Law applies 100% of the time
```
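
The downtime figures above follow from simple arithmetic, sketched here with a hypothetical helper and a 365-day year:

```python
HOURS_PER_YEAR = 365 * 24  # 8,760 hours, ignoring leap years


def downtime_minutes_per_year(availability_percent):
    """Minutes of allowed downtime per year at a given availability."""
    return (1 - availability_percent / 100) * HOURS_PER_YEAR * 60


for target in (99.9, 99.99, 99.999):
    minutes = downtime_minutes_per_year(target)
    print(f"{target}% uptime allows {minutes:.2f} minutes ({minutes / 60:.2f} hours) per year")
```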

**Recovery Patterns:**
```
The Resilience Ladder
Level 1: Retry ────────→ "Try, try again"
Level 2: Fallback ─────→ "Use backup service"
Level 3: Degrade ──────→ "Reduce functionality"
Level 4: Cache ────────→ "Use stale data"
Level 5: Queue ────────→ "Process later"
```

---

### 🎭 **The Format Fiasco** - *Parseus Impossibilus*
**Natural Habitat:** JSON responses and XML nightmares  
**Favorite Trick:** Returning HTML when you expect JSON  
**Battle Cry:** "undefined is not a function!"

**Common Format Failures:**
```
Expected: {"result": "success", "data": {...}}
Reality:  "<html><body>503 Service Unavailable</body></html>"
Result:   JSON.parse() → 💥
```
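
In Python, the same failure can be contained by validating the body before trusting it. A minimal sketch, assuming the response arrives as a string; `parse_json_response` is a hypothetical helper name:

```python
import json


def parse_json_response(body):
    """Return (data, error) and never raise on a malformed body."""
    try:
        data = json.loads(body)
    except (json.JSONDecodeError, TypeError) as exc:
        return None, f"Response was not valid JSON: {exc}"
    if not isinstance(data, dict):
        return None, f"Expected a JSON object, got {type(data).__name__}"
    return data, None


ok, err = parse_json_response('{"result": "success", "data": {}}')
bad, bad_err = parse_json_response("<html><body>503 Service Unavailable</body></html>")
print(ok, err)
print(bad, bad_err)
```

Returning an error value instead of raising lets the caller decide whether to retry, fall back, or surface a friendly message.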

---

## 📚 **Act 2: The Science of Failure**

### **The Error Probability Matrix**
```
                High Frequency │ Low Frequency
    High Impact ┌──────────────┼──────────────┐
                │   CRITICAL   │   PREPARE    │
                │ Rate Limits  │ Outages      │
                ├──────────────┼──────────────┤
    Low Impact  │   OPTIMIZE   │   ACCEPT     │
                │ Timeouts     │ Format Errors│
                └──────────────┴──────────────┘
```

### **The Resilience Pyramid**
```
         ╱╲          Prevent
        ╱  ╲         (Input validation, rate limiting)
       ╱────╲        
      ╱      ╲       Detect
     ╱ DETECT ╲      (Health checks, monitoring)
    ╱──────────╲     
   ╱            ╲    Respond
  ╱   RESPOND    ╲   (Retry, fallback, circuit break)
 ╱────────────────╲  
╱     RECOVER      ╲ Recover
└──────────────────┘ (Self-heal, scale, alert)
```

---

## 🎮 **Act 3: The Training Ground**

### **Error Handling Kata** 🥋
Practice these patterns until they become muscle memory:

**Pattern 1: The Graceful Retry**
```python
import time

for attempt in range(3):
    try:
        result = risky_operation()
        break
    except TemporaryError:
        if attempt == 2:
            # Out of retries: fall back instead of sleeping again
            result = use_fallback()
            break
        time.sleep(2 ** attempt)  # Exponential backoff: 1s, then 2s
```
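
A self-contained variant of the retry pattern is sketched below. It adds random jitter to the backoff so that many clients do not retry in lockstep. `TemporaryError`, `retry_with_backoff`, and `flaky` are hypothetical names, and the `sleep` parameter is injectable only so the demo runs without waiting.

```python
import random
import time


class TemporaryError(Exception):
    """Stand-in for a transient failure (hypothetical)."""


def retry_with_backoff(operation, max_attempts=3, base_delay=1.0, sleep=time.sleep):
    """Retry operation on TemporaryError with exponential backoff and jitter."""
    for attempt in range(max_attempts):
        try:
            return operation()
        except TemporaryError:
            if attempt == max_attempts - 1:
                raise  # out of attempts, let the caller decide
            # Pick a random delay up to the exponential ceiling
            delay = random.uniform(0, base_delay * 2 ** attempt)
            sleep(delay)


calls = {"count": 0}


def flaky():
    """Fails twice, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise TemporaryError("try again")
    return "ok"


delays = []
result = retry_with_backoff(flaky, sleep=delays.append)  # record delays instead of sleeping
print(result, calls["count"], delays)
```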

**Pattern 2: The Circuit Breaker**
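```python
def call_with_breaker(circuit, make_request, cached_response, threshold=5):
    # Short-circuit while the breaker is open
    if circuit.is_open():
        return cached_response
    try:
        response = make_request()
    except Exception:
        circuit.record_failure()
        if circuit.failure_count >= threshold:
            circuit.open()
        return cached_response  # Degrade instead of propagating the failure
    circuit.record_success()
    return response
```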
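
The `circuit` object in the pattern above is left undefined. One possible shape for it is sketched here, as an assumption rather than the workshop's implementation: the `CircuitBreaker` class, its parameters, and the half-open behavior after `reset_timeout` are all illustrative choices.

```python
import time


class CircuitBreaker:
    """Opens after repeated failures, then allows a trial call after a cool-down."""

    def __init__(self, failure_threshold=3, reset_timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.failure_count = 0
        self.opened_at = None  # None means the circuit is closed

    def is_open(self):
        if self.opened_at is None:
            return False
        # After the cool-down, let one trial request through (half-open)
        return self.clock() - self.opened_at < self.reset_timeout

    def record_success(self):
        self.failure_count = 0
        self.opened_at = None

    def record_failure(self):
        self.failure_count += 1
        if self.failure_count >= self.failure_threshold:
            self.opened_at = self.clock()

    def call(self, operation, fallback):
        if self.is_open():
            return fallback()
        try:
            result = operation()
        except Exception:
            self.record_failure()
            return fallback()
        self.record_success()
        return result


fake_now = [0.0]  # fake clock so the demo needs no real waiting
breaker = CircuitBreaker(failure_threshold=3, reset_timeout=30.0, clock=lambda: fake_now[0])
attempts = []


def failing():
    attempts.append("call")
    raise ConnectionError("service down")


for _ in range(5):
    breaker.call(failing, fallback=lambda: "cached")

fake_now[0] += 30.0  # cool-down elapses
recovered = breaker.call(lambda: "fresh", fallback=lambda: "cached")
print(len(attempts), recovered, breaker.is_open())
```

Only the first three of the five calls reach the failing service; the rest are answered from the fallback while the circuit is open.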

---

## 🏆 **Act 4: The Championship Round**

### **Error Olympics Events:**

**🏃 Sprint Recovery** (Fastest error recovery)
- Gold: <100ms fallback
- Silver: <500ms retry
- Bronze: <2s degradation

**🏋️ Load Bearing** (Most concurrent errors handled)
- Gold: 1000+ errors/second
- Silver: 100+ errors/second
- Bronze: 10+ errors/second

**🤸 Gymnastics** (Most graceful degradation)
- Gold: Seamless user experience
- Silver: Minor feature reduction
- Bronze: Maintenance mode

---

## 🎯 **Act 5: The Wisdom Wall**

### **Ancient Proverbs of Error Handling:**

> "A timeout in time saves nine retries" - *Ancient DevOps Wisdom*

> "He who fails fast, recovers faster" - *The Circuit Breaker's Creed*

> "Cache is king when the API is down" - *The Fallback Manifesto*

### **The Error Handler's Oath:**
```
I solemnly swear to:
✓ Never ignore an error silently
✓ Always provide meaningful error messages
✓ Log everything, panic about nothing
✓ Test my error paths as much as happy paths
✓ Remember that users don't care about my stack trace
```

---

## 🎨 **Act 6: The Visual Guide**

### **Error Flow Visualization:**
```
User Request
    ↓
[Primary Service]
    ↓
  Error? ──No──→ Success! 🎉
    ↓Yes
[Retry Logic]
    ↓
  Works? ──Yes─→ Success! 🎉
    ↓No
[Fallback Service]
    ↓
  Works? ──Yes─→ Degraded Success 😊
    ↓No
[Cache Check]
    ↓
  Found? ──Yes─→ Stale Success 😐
    ↓No
[Error Response]
    ↓
Graceful Failure 😢
```
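
The flow above can be sketched as a single function. This is a simplified illustration: `handle_request` and its arguments are hypothetical, the cache is a plain dict keyed by prompt, and backoff between retries is omitted for brevity.

```python
def handle_request(prompt, primary, fallback, cache, retries=2):
    """Walk the flow: primary (with retries), then fallback, then cache, then error."""
    for _ in range(retries + 1):
        try:
            return {"status": "success", "answer": primary(prompt)}
        except Exception:
            continue  # retry the primary service

    try:
        return {"status": "degraded", "answer": fallback(prompt)}
    except Exception:
        pass  # fall through to the cache

    if prompt in cache:
        return {"status": "stale", "answer": cache[prompt]}

    return {
        "status": "error",
        "answer": "Sorry, we could not answer right now. Please try again shortly.",
    }


def down(prompt):
    """Hypothetical service that is always unavailable."""
    raise ConnectionError("unavailable")


cache = {"hello": "Hi there!"}
degraded = handle_request("hello", primary=down, fallback=lambda p: "short answer", cache=cache)
stale = handle_request("hello", primary=down, fallback=down, cache=cache)
failed = handle_request("new question", primary=down, fallback=down, cache=cache)
print(degraded["status"], stale["status"], failed["status"])
```

Returning the `status` alongside the answer makes it possible to tell users honestly when they are seeing a degraded or stale response.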

---

## 📊 **Act 7: The Metrics That Matter**

### **Your Error Dashboard:**
```
┌─────────────────────────────────────┐
│ 🚨 ERROR RATE: 0.1%                │
│ ⏱️  P99 LATENCY: 2.3s               │
│ 🔄 RETRY SUCCESS: 87%               │
│ 💾 CACHE HIT: 45%                  │
│ 🔌 CIRCUIT STATE: [CLOSED]          │
└─────────────────────────────────────┘
```
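
Two of the dashboard numbers can be computed from raw samples as sketched below. The helper names are hypothetical, the latency data is made up for the demo, and `percentile` uses the nearest-rank definition, which is one of several common conventions.

```python
import math


def error_rate(total_requests, failed_requests):
    """Fraction of requests that failed."""
    return failed_requests / total_requests if total_requests else 0.0


def percentile(samples, pct):
    """Nearest-rank percentile: smallest value with at least pct% of samples at or below it."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    rank = math.ceil(pct * len(ordered) / 100)
    return ordered[max(rank, 1) - 1]


latencies = [0.2] * 98 + [2.3, 4.0]  # seconds, illustrative data
p50 = percentile(latencies, 50)
p99 = percentile(latencies, 99)
rate = error_rate(total_requests=1000, failed_requests=1)
print(f"P50={p50}s  P99={p99}s  error rate={rate:.1%}")
```

The gap between P50 and P99 here shows why averages hide the slow tail that users actually complain about.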

### **SLA Reality Check:**
- **99% uptime** = Your customers notice
- **99.9% uptime** = Your boss notices
- **99.99% uptime** = Nobody notices (until it breaks)
- **99.999% uptime** = You're probably lying

---

## 🎓 **Graduation Ceremony**

### **You've Mastered Error Handling When:**
- [ ] Errors make you curious, not panicked
- [ ] Your logs tell a story, not a mystery
- [ ] Users see helpful messages, not stack traces
- [ ] Your system degrades gracefully, not catastrophically
- [ ] You measure everything and assume nothing



## 🚀 **Next Steps: From Theory to Practice**

Ready to implement these patterns? Head to the code cells below where we'll build:
1. A retry mechanism that actually works
2. Circuit breakers that save your bacon
3. Fallback chains that never let you down
4. Monitoring that tells you what's really happening

Remember: **Every error handled well is a future outage prevented!**

---

*"In the face of ambiguity, refuse the temptation to guess. In the face of errors, refuse the temptation to ignore."* - The Zen of Error Handling

In [2]:
# 🎯 Cell 2: The Complete Error Taxonomy
# Let's build a comprehensive error handling system!

from enum import Enum, auto
from dataclasses import dataclass
from typing import Optional, Any, Callable, List, Dict
import time
import random
import openai
import traceback
from collections import deque, defaultdict
from datetime import datetime
import pandas as pd

class ErrorType(Enum):
    """Complete taxonomy of AI system errors"""
    # Network errors
    TIMEOUT = auto()
    CONNECTION_ERROR = auto()
    DNS_FAILURE = auto()
    
    # API errors
    RATE_LIMIT = auto()
    QUOTA_EXCEEDED = auto()
    AUTHENTICATION = auto()
    
    # Model errors
    CONTEXT_LENGTH = auto()
    INVALID_INPUT = auto()
    CONTENT_FILTER = auto()
    
    # Service errors
    SERVICE_UNAVAILABLE = auto()
    INTERNAL_ERROR = auto()
    
    # Data errors
    PARSING_ERROR = auto()
    VALIDATION_ERROR = auto()

@dataclass
class ErrorContext:
    """Rich context for error handling decisions"""
    error_type: ErrorType
    timestamp: float
    attempt_number: int
    error_message: str
    request_data: Optional[Dict[str, Any]] = None
    response_data: Optional[Dict[str, Any]] = None
    traceback: Optional[str] = None
    
    @property
    def is_retryable(self) -> bool:
        """Determine if error is worth retrying"""
        # Errors that will fail the same way no matter how often we retry
        non_retryable = {
            ErrorType.AUTHENTICATION,
            ErrorType.QUOTA_EXCEEDED,
            ErrorType.INVALID_INPUT,
            ErrorType.CONTENT_FILTER,
            ErrorType.CONTEXT_LENGTH
        }
        return self.error_type not in non_retryable

class ResilientAIClient:
    """Production-grade resilient OpenAI client"""
    
    def __init__(self):
        self.client = openai.OpenAI()
        self.error_history = deque(maxlen=100)
        self.metrics = defaultdict(int)
        self.callbacks = {}
        
    def register_error_callback(self, error_type: ErrorType, callback: Callable):
        """Register custom error handlers"""
        self.callbacks[error_type] = callback
        
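    def classify_error(self, exception: Exception) -> ErrorType:
        """Classify an exception by SDK type first, then by message text"""
        # Typed exceptions from the openai>=1.0 SDK are the most reliable signal.
        # APITimeoutError subclasses APIConnectionError, so check it first.
        if isinstance(exception, openai.RateLimitError):
            return ErrorType.RATE_LIMIT
        if isinstance(exception, (openai.APITimeoutError, TimeoutError)):
            return ErrorType.TIMEOUT
        if isinstance(exception, openai.APIConnectionError):
            return ErrorType.CONNECTION_ERROR
        if isinstance(exception, openai.AuthenticationError):
            return ErrorType.AUTHENTICATION

        # Fall back to message matching (this also covers simulated errors)
        error_str = str(exception).lower()
        if "rate limit" in error_str or "429" in error_str:
            return ErrorType.RATE_LIMIT
        elif "timeout" in error_str or "timed out" in error_str:
            return ErrorType.TIMEOUT
        elif "connection" in error_str:
            return ErrorType.CONNECTION_ERROR
        elif "context length" in error_str or "context_length" in error_str:
            return ErrorType.CONTEXT_LENGTH
        elif "503" in error_str or "unavailable" in error_str:
            return ErrorType.SERVICE_UNAVAILABLE
        elif "authentication" in error_str or "401" in error_str:
            return ErrorType.AUTHENTICATION
        else:
            # Unrecognized errors (including HTTP 500) count as internal errors
            return ErrorType.INTERNAL_ERROR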
    
    async def call_with_resilience(
        self,
        messages: List[Dict[str, str]],
        model: str = "gpt-4o",
        max_retries: int = 3,
        **kwargs
    ):
        """Make API call with comprehensive error handling"""
        
        for attempt in range(max_retries):
            try:
                # Record attempt
                self.metrics['total_attempts'] += 1
                
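                # Make the actual API call. The sync client runs in a worker
                # thread (asyncio.to_thread, Python 3.9+) so the event loop is not blocked.
                response = await asyncio.to_thread(
                    self.client.chat.completions.create,
                    model=model,
                    messages=messages,
                    **kwargs
                )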
                
                # Success!
                self.metrics['successful_calls'] += 1
                return response
                
            except Exception as e:
                # Classify the error
                error_type = self.classify_error(e)
                
                # Create error context
                context = ErrorContext(
                    error_type=error_type,
                    timestamp=time.time(),
                    attempt_number=attempt + 1,
                    error_message=str(e),
                    request_data={'messages': messages, 'model': model},
                    traceback=traceback.format_exc()
                )
                
                # Record error
                self.error_history.append(context)
                self.metrics[f'error_{error_type.name}'] += 1
                
                # Execute callback if registered
                if error_type in self.callbacks:
                    self.callbacks[error_type](context)
                
                # Decide on retry
                if not context.is_retryable or attempt == max_retries - 1:
                    raise
                
                # Calculate backoff
                wait_time = self._calculate_backoff(attempt, error_type)
                print(f"⏳ Retry {attempt + 1}/{max_retries} after {wait_time:.1f}s")
                await asyncio.sleep(wait_time)  # Non-blocking wait inside the coroutine
    
    def _calculate_backoff(self, attempt: int, error_type: ErrorType) -> float:
        """Exponential backoff with jitter, tuned per error type"""
        base_wait = 1.0
        jitter = random.uniform(0, 1)  # Spreads out retries from concurrent clients

        # Different growth rates for different errors
        if error_type == ErrorType.RATE_LIMIT:
            # Rate limits: double the wait on each attempt
            return base_wait * (2 ** attempt) + jitter
        elif error_type == ErrorType.SERVICE_UNAVAILABLE:
            # Service issues: back off more steeply
            return base_wait * (3 ** attempt) + jitter
        else:
            # Everything else: gentler growth
            return base_wait * (1.5 ** attempt) + jitter
    
    def get_error_report(self) -> pd.DataFrame:
        """Generate error analytics report"""
        if not self.error_history:
            return pd.DataFrame()
        
        data = []
        for error in self.error_history:
            data.append({
                'timestamp': datetime.fromtimestamp(error.timestamp),
                'type': error.error_type.name,
                'attempt': error.attempt_number,
                'retryable': error.is_retryable,
                'message': error.error_message[:50]
            })
        
        return pd.DataFrame(data)

# Create our resilient client
resilient_client = ResilientAIClient()

# Register some custom error handlers
def handle_rate_limit(context: ErrorContext):
    print(f"🚦 Rate limit hit! Waiting longer...")
    print(f"   Error: {context.error_message}")

def handle_timeout(context: ErrorContext):
    print(f"⏱️ Timeout occurred on attempt {context.attempt_number}")

resilient_client.register_error_callback(ErrorType.RATE_LIMIT, handle_rate_limit)
resilient_client.register_error_callback(ErrorType.TIMEOUT, handle_timeout)

print("✅ Resilient AI Client initialized!")
print(f"📊 Error types tracked: {len(ErrorType)}")
print(f"🔄 Retry logic: Exponential backoff with jitter")
print(f"📈 Metrics collection: Enabled")
print(f"🤖 Default model: gpt-4o")

✅ Resilient AI Client initialized!
📊 Error types tracked: 13
🔄 Retry logic: Exponential backoff with jitter
📈 Metrics collection: Enabled
🤖 Default model: gpt-4o


### 🧪 **Live Error Simulation Lab**
#### Let's trigger real errors and see our handlers in action!

In [3]:
# 🎯 Cell 3: Interactive Error Simulator
# Experience different failure modes in a controlled environment!

import openai
import time
import ipywidgets as widgets
from IPython.display import display, clear_output

class ErrorSimulator:
    """Simulate various API errors for testing"""
    
    def __init__(self):
        self.simulation_mode = True
        self.error_probability = 0.5
        self.client = openai.OpenAI()
        
    def simulate_error(self, error_type: str):
        """Simulate specific error types"""
        
        # Create simple exceptions that mimic OpenAI errors
        errors = {
            'rate_limit': Exception("Error code: 429 - Rate limit exceeded. Please retry after 1 second."),
            'timeout': TimeoutError("Request timeout after 30s"),
            'context_length': Exception("Error: Maximum context length (8192 tokens) exceeded"),
            'service_unavailable': Exception("Error code: 503 - The server is temporarily unavailable"),
            'auth': Exception("Error code: 401 - Incorrect API key provided")
        }
        
        if error_type in errors:
            raise errors[error_type]
    
    def make_call_with_chaos(self, messages, error_type=None):
        """Make API call with optional chaos engineering"""
        
        # Raise the requested error when simulation mode is on
        if self.simulation_mode and error_type:
            self.simulate_error(error_type)
        
        # Otherwise make real call
        return self.client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            max_tokens=50
        )

# Create simulator
simulator = ErrorSimulator()

# Interactive error testing widget
print("🎮 Interactive Error Testing Console")
print("="*50)

error_dropdown = widgets.Dropdown(
    options=['none', 'rate_limit', 'timeout', 'context_length', 'service_unavailable', 'auth'],
    value='none',
    description='Error Type:',
    style={'description_width': 'initial'}
)

test_button = widgets.Button(
    description='🧪 Trigger Error',
    button_style='danger',
    tooltip='Simulate the selected error',
    layout=widgets.Layout(width='200px')
)

output = widgets.Output()

def test_error_handling(b):
    with output:
        clear_output()
        error_type = error_dropdown.value
        
        print(f"\n🔬 Testing: {error_type}")
        print("-"*40)
        
        messages = [{"role": "user", "content": "Hello! Please respond with a brief greeting."}]
        
        try:
            if error_type == 'none':
                # Make real API call
                print("📡 Making real API call to gpt-4o...")
                response = simulator.client.chat.completions.create(
                    model="gpt-4o",
                    messages=messages,
                    max_tokens=50
                )
                print("✅ Success! Response received:")
                print(f"   {response.choices[0].message.content}")
            else:
                # Simulate error
                print(f"💥 Simulating {error_type} error...")
                simulator.make_call_with_chaos(messages, error_type)
                
        except Exception as e:
            print(f"❌ Error caught: {type(e).__name__}")
            print(f"   Message: {str(e)}")
            
            # Now show recovery
            print("\n🔄 Attempting recovery...")
            
            # Classify and handle
            error_class = resilient_client.classify_error(e)
            print(f"   Error classified as: {error_class.name}")
            
            if error_class == ErrorType.RATE_LIMIT:
                print("   Strategy: Exponential backoff with jitter")
                print("   Waiting 2 seconds...")
                time.sleep(2)
                print("   ✅ Ready to retry!")
                
            elif error_class == ErrorType.TIMEOUT:
                print("   Strategy: Reduce payload size and retry")
                print("   ✅ Optimized request prepared")
                
            elif error_class == ErrorType.CONTEXT_LENGTH:
                print("   Strategy: Truncate context and retry")
                print("   ✅ Context reduced by 50%")
                
            elif error_class == ErrorType.AUTHENTICATION:
                print("   Strategy: Check API key and credentials")
                print("   ❌ Cannot retry - authentication required")
                
            elif error_class == ErrorType.SERVICE_UNAVAILABLE:
                print("   Strategy: Use fallback service or wait")
                print("   Waiting 3 seconds for service recovery...")
                time.sleep(3)
                print("   ✅ Ready to retry with circuit breaker")
                
            else:
                print("   Strategy: Wait and retry with fallback model")
                print("   ✅ Fallback strategy ready")
            
            # Show what would happen next
            print("\n📝 Next steps:")
            if error_class in [ErrorType.RATE_LIMIT, ErrorType.TIMEOUT, ErrorType.SERVICE_UNAVAILABLE]:
                print("   1. Retry with exponential backoff")
                print("   2. If fails, try alternative model")
                print("   3. If still fails, return cached response")
            elif error_class == ErrorType.CONTEXT_LENGTH:
                print("   1. Truncate oldest messages")
                print("   2. Summarize context if possible")
                print("   3. Retry with reduced context")
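            elif error_class == ErrorType.AUTHENTICATION:
                print("   1. Notify user of auth issue")
                print("   2. Provide instructions to fix")
                print("   3. Cannot proceed without valid credentials")
            else:
                print("   1. Log the error with full context")
                print("   2. Retry once with the fallback model")
                print("   3. If still fails, return a graceful error message")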

test_button.on_click(test_error_handling)

# Display the interface
display(widgets.VBox([
    widgets.HTML("<h3>🎯 Select an error type and click to simulate:</h3>"),
    widgets.HTML("<p style='color: #666;'>Choose 'none' to make a real API call, or select an error to simulate</p>"),
    error_dropdown,
    test_button,
    output
]))

print("\n💡 Tips:")
print("• Select 'none' to test a real API call to gpt-4o")
print("• Try different error types to see recovery strategies")
print("• Notice how different errors require different handling approaches")

🎮 Interactive Error Testing Console



💡 Tips:
• Select 'none' to test a real API call to gpt-4o
• Try different error types to see recovery strategies
• Notice how different errors require different handling approaches


# 🎭 **Module 2: Graceful Degradation Mastery**
## The Art of Failing Elegantly 🩰

---

## 🏰 **The Degradation Castle**
Your AI system has multiple defensive walls. When one falls, another stands ready:

```
         👑 The Keep (Premium)
         └─ GPT-4o: Full features, $$$
              ↓ fails
         🛡️ Inner Wall (Standard)  
         └─ GPT-4o mini: Core features, $$
              ↓ fails
         ⚔️ Outer Wall (Cache)
         └─ Previous responses, ¢
              ↓ fails  
         🌉 The Moat (Static)
         └─ Template responses, free
              ↓ fails
         🏳️ Honest error message
```

---

## 📊 **The Service Ladder**

| Level | Reliability | Speed | Cost | User Experience |
|-------|------------|-------|------|-----------------|
| Premium | 95% | 2-3s | $$$ | Full features 😊 |
| Standard | 98% | 1-2s | $$ | Core features 🙂 |
| Cache | 99.9% | <100ms | ¢ | Recent responses 😐 |
| Static | 99.99% | <10ms | Free | Basic templates 😔 |
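
Stacking tiers raises overall availability because a request fails only when every tier fails. The arithmetic is sketched below using the illustrative reliabilities from the table. It assumes tier failures are independent, which is optimistic when tiers share a provider or network path; `combined_availability` is a hypothetical helper.

```python
def combined_availability(tier_reliabilities):
    """Probability that at least one tier answers, assuming independent failures."""
    all_fail = 1.0
    for reliability in tier_reliabilities:
        all_fail *= 1.0 - reliability
    return 1.0 - all_fail


premium_only = combined_availability([0.95])
two_tiers = combined_availability([0.95, 0.98])
all_tiers = combined_availability([0.95, 0.98, 0.999, 0.9999])
print(f"Premium only:       {premium_only:.4%}")
print(f"Premium + Standard: {two_tiers:.4%}")
print(f"All four tiers:     {all_tiers:.8%}")
```

Even two imperfect model tiers together reach roughly 99.9% under this assumption, which is the core argument for layered fallbacks.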

---

## 🎯 **Core Principle: The 80/20 Rule**

**80% of user value comes from 20% of features**

Essential (Keep Alive):
- Basic text responses
- Core functionality  
- Simple queries

Nice-to-Have (Can Degrade):
- Advanced analytics
- Real-time updates
- Complex formatting

---

## ✅ **Good Patterns**

### The Waterfall
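```python
async def handle_with_fallback(request):
    services = [premium_ai, standard_ai, cache, static]
    for service in services:
        try:
            return await service.handle(request)
        except Exception as exc:
            # Log the failure, then fall through to the next tier
            logger.warning("%s failed: %s", service, exc)
    raise RuntimeError("All service tiers failed")
```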

### Feature Flags
```python
if system_degraded:
    disable_features(['analytics', 'formatting'])
    keep_features(['basic_response', 'cache'])
```
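
A runnable version of the feature-flag idea might look like the sketch below. The feature names, the `active_features` helper, and the `render` function are all hypothetical and exist only to show essential features surviving while optional ones are switched off.

```python
ESSENTIAL = {"basic_response", "cache"}
OPTIONAL = {"analytics", "formatting", "realtime_updates"}


def active_features(system_degraded):
    """Essential features always stay on; optional ones are dropped when degraded."""
    return set(ESSENTIAL) if system_degraded else ESSENTIAL | OPTIONAL


def render(answer, features):
    """Apply optional formatting only when the flag is active."""
    if "formatting" in features:
        return f"**Answer:** {answer}"
    return answer


normal = render("42", active_features(system_degraded=False))
degraded = render("42", active_features(system_degraded=True))
print(normal)
print(degraded)
```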

---

## ❌ **Anti-Patterns to Avoid**

1. **The Cliff** - All or nothing approach
2. **Silent Degradation** - Not telling users
3. **Lying Fallback** - Fake responses

---

## 📈 **Degradation in Action**

```
Normal:    [🟢 Premium ] → Full response with all features
Degraded:  [🟡 Standard] → "Using simplified AI (high demand)"  
Emergency: [🔴 Cache   ] → "Here's a recent similar response"
```

---

## 🎓 **Key Lessons**

1. **Partial success > Complete failure**
2. **Fast mediocrity > Slow excellence**
3. **Honest communication > Silent degradation**
4. **Something > Nothing**

---

## 📊 **Success Metrics**

```
✅ Availability: 99.95%
✅ Graceful failures: 98%  
✅ User satisfaction during degradation: 4.2/5
✅ Cost savings: 40%
```

---

## 🏆 **Real-World Examples**

- **Netflix:** 4K → HD → SD during peak hours
- **Twitter:** Full timeline → Cached timeline under load
- **Your System:** Premium AI → Standard → Cache → Static

---

## 💡 **Remember**

> "Users don't care about your architecture, they care about getting answers."

> "The best degradation is the one users don't notice."

---

**Your Toolkit:**
- 🏰 Multiple fallback layers
- 🎯 Core feature preservation
- 📊 Clear communication
- ✅ Tested degradation paths

**Next:** Rate Limiting - Because sometimes "slow down" is the answer! 🚦

In [4]:
# 🎯 Cell 4: Multi-Tier Fallback System
# Build a production-grade graceful degradation system!

from enum import Enum, auto
from abc import ABC, abstractmethod
from typing import Optional, Dict, Any, List
from collections import defaultdict, deque
import hashlib
import json
import time
import openai
import random

class ServiceTier(Enum):
    """Service quality tiers"""
    PREMIUM = auto()
    STANDARD = auto()
    BASIC = auto()
    CACHE = auto()
    STATIC = auto()

class FallbackStrategy(ABC):
    """Abstract base for fallback strategies"""
    
    @abstractmethod
    def execute(self, request: Dict[str, Any]) -> Any:
        pass
    
    @abstractmethod
    def can_handle(self, request: Dict[str, Any]) -> bool:
        pass

class ResponseCache:
    """Intelligent response caching"""
    
    def __init__(self, max_size: int = 1000):
        self.cache = {}
        self.access_times = {}
        self.max_size = max_size
        
    def _get_key(self, messages: List[Dict]) -> str:
        """Generate cache key from messages"""
        content = json.dumps(messages, sort_keys=True)
        return hashlib.md5(content.encode()).hexdigest()
    
    def get(self, messages: List[Dict]) -> Optional[str]:
        """Retrieve cached response"""
        key = self._get_key(messages)
        if key in self.cache:
            self.access_times[key] = time.time()
            print(f"   🎯 Cache hit for key: {key[:8]}...")
            return self.cache[key]
        return None
    
    def set(self, messages: List[Dict], response: str):
        """Cache a response"""
        key = self._get_key(messages)
        # Evict the least recently used entry only when a new key would overflow the cache
        if key not in self.cache and len(self.cache) >= self.max_size:
            lru_key = min(self.access_times, key=self.access_times.get)
            del self.cache[lru_key]
            del self.access_times[lru_key]
        
        self.cache[key] = response
        self.access_times[key] = time.time()
        print(f"   💾 Response cached with key: {key[:8]}...")

class GracefulDegradationService:
    """Multi-tier service with automatic fallback"""
    
    def __init__(self):
        self.client = openai.OpenAI()
        self.cache = ResponseCache()
        self.metrics = defaultdict(int)
        self.tier_history = deque(maxlen=100)
        
        # Define tier configurations
        self.tier_configs = {
            ServiceTier.PREMIUM: {
                'model': 'gpt-4o',  # Premium tier
                'max_tokens': 500,
                'temperature': 0.7,
                'timeout': 30
            },
            ServiceTier.STANDARD: {
                'model': 'gpt-4o-mini',  # Standard tier
                'max_tokens': 300,
                'temperature': 0.7,
                'timeout': 20
            },
            ServiceTier.BASIC: {
                'model': 'gpt-3.5-turbo',  # Basic tier
                'max_tokens': 100,
                'temperature': 0.5,
                'timeout': 10
            }
        }
        
        # Static fallbacks for common queries
        self.static_responses = {
            'greeting': "Hello! I'm experiencing high demand but I'm here to help.",
            'help': "I can assist with various tasks. Please be patient during high load.",
            'error': "I'm currently experiencing difficulties. Please try again shortly.",
            'general': "I'm operating in limited mode. Your query has been noted."
        }
        
        # Control simulation
        self.simulate_failures = False
        self.failure_tiers = []
    
    def classify_query(self, messages: List[Dict]) -> str:
        """Classify query type for static fallback"""
        if not messages:
            return 'general'
            
        last_message = messages[-1].get('content', '').lower()
        
        if any(word in last_message for word in ['hello', 'hi', 'hey']):
            return 'greeting'
        elif 'help' in last_message:
            return 'help'
        else:
            return 'general'
    
    def execute_with_fallback(
        self,
        messages: List[Dict],
        preferred_tier: ServiceTier = ServiceTier.PREMIUM
    ) -> Dict[str, Any]:
        """Execute request with automatic degradation - SYNCHRONOUS VERSION"""
        
        start_time = time.time()
        tiers_to_try = list(ServiceTier)
        
        # Start from preferred tier
        start_index = tiers_to_try.index(preferred_tier)
        tiers_to_try = tiers_to_try[start_index:]
        
        print(f"🔄 Starting with tier: {preferred_tier.name}")
        print(f"   Available fallback chain: {[t.name for t in tiers_to_try]}")
        
        for tier in tiers_to_try:
            try:
                self.metrics[f'attempt_{tier.name}'] += 1
                print(f"\n   Trying tier: {tier.name}...")
                
                # Simulate failures if enabled
                if self.simulate_failures and tier in self.failure_tiers:
                    print(f"   💥 Simulating failure for {tier.name}")
                    raise Exception(f"Simulated failure for tier {tier.name}")
                
                # Cache tier: serve a previously stored response if one exists
                if tier == ServiceTier.CACHE:
                    cached = self.cache.get(messages)
                    if cached:
                        self.metrics['cache_hits'] += 1
                        print(f"   ✅ Cache hit!")
                        return {
                            'content': cached,
                            'tier': tier.name,
                            'cached': True,
                            'latency': time.time() - start_time
                        }
                    else:
                        print(f"   ❌ Cache miss, trying next tier...")
                        continue  # Try next tier
                
                # Try static response
                elif tier == ServiceTier.STATIC:
                    query_type = self.classify_query(messages)
                    response = self.static_responses.get(
                        query_type, 
                        self.static_responses['error']
                    )
                    print(f"   ✅ Using static response for '{query_type}' query")
                    return {
                        'content': response,
                        'tier': tier.name,
                        'static': True,
                        'latency': time.time() - start_time
                    }
                
                # Try API call
                else:
                    config = self.tier_configs[tier]
                    print(f"   📡 Calling {config['model']} API...")
                    
                    response = self.client.chat.completions.create(
                        messages=messages,
                        **config
                    )
                    
                    content = response.choices[0].message.content
                    
                    # Cache successful responses
                    self.cache.set(messages, content)
                    
                    self.metrics[f'success_{tier.name}'] += 1
                    self.tier_history.append(tier)
                    
                    print(f"   ✅ Success with {tier.name}!")
                    
                    return {
                        'content': content,
                        'tier': tier.name,
                        'cached': False,
                        'latency': time.time() - start_time,
                        'model': config['model']
                    }
                    
            except Exception as e:
                print(f"   ⚠️ Tier {tier.name} failed: {str(e)[:50]}")
                self.metrics[f'failure_{tier.name}'] += 1
                continue
        
        # Ultimate fallback
        print(f"   ❌ All tiers failed!")
        return {
            'content': "Service temporarily unavailable. Please try again.",
            'tier': 'FALLBACK',
            'error': True,
            'latency': time.time() - start_time
        }
    
    def get_service_metrics(self) -> Dict[str, Any]:
        """Get detailed service metrics"""
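        # Note: this sums per-tier attempts, so a request that falls through
        # three tiers is counted three times in 'total_requests'
        total_requests = sum(v for k, v in self.metrics.items() if k.startswith('attempt_'))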
        
        if total_requests == 0:
            return {}
        
        tier_distribution = {}
        for tier in ServiceTier:
            successes = self.metrics.get(f'success_{tier.name}', 0)
            attempts = self.metrics.get(f'attempt_{tier.name}', 0)
            failures = self.metrics.get(f'failure_{tier.name}', 0)
            tier_distribution[tier.name] = {
                'attempts': attempts,
                'successes': successes,
                'failures': failures,
                'success_rate': successes / attempts if attempts > 0 else 0
            }
        
        return {
            'total_requests': total_requests,
            'cache_hits': self.metrics.get('cache_hits', 0),
            'cache_hit_rate': self.metrics.get('cache_hits', 0) / total_requests if total_requests > 0 else 0,
            'tier_distribution': tier_distribution
        }

# Initialize the service
degradation_service = GracefulDegradationService()

print("🏰 Graceful Degradation Service Initialized!")
print("\n📊 Service Tiers:")
for tier in ServiceTier:
    if tier in degradation_service.tier_configs:
        config = degradation_service.tier_configs[tier]
        print(f"  • {tier.name}: {config['model']} ({config['max_tokens']} tokens)")
    else:
        print(f"  • {tier.name}: Special handling")

print("\n✨ Features:")
print("  • Automatic tier fallback")
print("  • Response caching")
print("  • Static fallbacks")
print("  • Detailed metrics")
print("\n✅ Ready for testing!")

🏰 Graceful Degradation Service Initialized!

📊 Service Tiers:
  • PREMIUM: gpt-4o (500 tokens)
  • STANDARD: gpt-4o-mini (300 tokens)
  • BASIC: gpt-3.5-turbo (100 tokens)
  • CACHE: Special handling
  • STATIC: Special handling

✨ Features:
  • Automatic tier fallback
  • Response caching
  • Static fallbacks
  • Detailed metrics

✅ Ready for testing!
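
`ResponseCache` finds its least recently used entry by scanning every timestamp with `min()`, which is fine for 1,000 entries but grows linearly with cache size. As a hedged alternative, the sketch below uses `collections.OrderedDict` for constant-time eviction. `LRUResponseCache` is a hypothetical name introduced here; nothing else in the workshop depends on it.

```python
import hashlib
import json
from collections import OrderedDict
from typing import Dict, List, Optional

class LRUResponseCache:
    """Exact-match response cache with O(1) least-recently-used eviction."""

    def __init__(self, max_size: int = 1000):
        self.max_size = max_size
        self._entries: "OrderedDict[str, str]" = OrderedDict()

    @staticmethod
    def _get_key(messages: List[Dict]) -> str:
        # Same keying idea as ResponseCache: hash the serialized messages.
        content = json.dumps(messages, sort_keys=True)
        return hashlib.sha256(content.encode()).hexdigest()

    def get(self, messages: List[Dict]) -> Optional[str]:
        key = self._get_key(messages)
        if key not in self._entries:
            return None
        self._entries.move_to_end(key)  # mark as most recently used
        return self._entries[key]

    def set(self, messages: List[Dict], response: str) -> None:
        key = self._get_key(messages)
        self._entries[key] = response
        self._entries.move_to_end(key)
        while len(self._entries) > self.max_size:
            self._entries.popitem(last=False)  # drop the least recently used entry

    def __len__(self) -> int:
        return len(self._entries)


cache = LRUResponseCache(max_size=2)
q1 = [{"role": "user", "content": "first"}]
q2 = [{"role": "user", "content": "second"}]
q3 = [{"role": "user", "content": "third"}]
cache.set(q1, "answer 1")
cache.set(q2, "answer 2")
cache.get(q1)              # touch q1 so q2 becomes the eviction candidate
cache.set(q3, "answer 3")  # cache is full, so q2 is evicted
print(cache.get(q1), cache.get(q2), cache.get(q3))
```

Both versions only match byte-identical message lists, so a rephrased question is still a cache miss.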


### 🎮 **Interactive Degradation Tester**
#### Watch your system gracefully handle failures!

In [5]:
# 🎯 Cell 5: Live Degradation Testing Dashboard
# See graceful degradation in action with real API calls!

import ipywidgets as widgets
from IPython.display import display, clear_output
from datetime import datetime

print("🎮 Graceful Degradation Testing Dashboard")
print("="*50)

# Create interactive controls
query_input = widgets.Textarea(
    value='What is the meaning of life?',
    placeholder='Enter your query...',
    description='Query:',
    layout=widgets.Layout(width='500px', height='80px')
)

tier_selector = widgets.RadioButtons(
    options=['PREMIUM', 'STANDARD', 'BASIC', 'CACHE', 'STATIC'],
    value='PREMIUM',
    description='Start Tier:',
    style={'description_width': 'initial'}
)

# Failure simulation checkboxes
failure_premium = widgets.Checkbox(value=False, description='Fail PREMIUM')
failure_standard = widgets.Checkbox(value=False, description='Fail STANDARD')
failure_basic = widgets.Checkbox(value=False, description='Fail BASIC')

test_button = widgets.Button(
    description='🚀 Test Degradation',
    button_style='success',
    layout=widgets.Layout(width='200px')
)

clear_cache_button = widgets.Button(
    description='🗑️ Clear Cache',
    button_style='warning',
    layout=widgets.Layout(width='150px')
)

output_area = widgets.Output()
metrics_output = widgets.Output()

# Test history for visualization
test_history = []

def run_degradation_test(b):
    """Run the degradation test - SYNCHRONOUS"""
    with output_area:
        clear_output()
        
        print("🔄 Testing degradation system...")
        print("="*40)
        
        messages = [
            {"role": "system", "content": "You are a helpful assistant. Keep responses brief."},
            {"role": "user", "content": query_input.value}
        ]
        
        # Get starting tier
        start_tier = ServiceTier[tier_selector.value]
        
        # Configure failure simulation
        degradation_service.simulate_failures = any([
            failure_premium.value,
            failure_standard.value,
            failure_basic.value
        ])
        
        degradation_service.failure_tiers = []
        if failure_premium.value:
            degradation_service.failure_tiers.append(ServiceTier.PREMIUM)
        if failure_standard.value:
            degradation_service.failure_tiers.append(ServiceTier.STANDARD)
        if failure_basic.value:
            degradation_service.failure_tiers.append(ServiceTier.BASIC)
        
        if degradation_service.failure_tiers:
            print(f"⚠️ Simulating failures for: {[t.name for t in degradation_service.failure_tiers]}")
        
        try:
            # Execute with fallback - SYNCHRONOUS call
            result = degradation_service.execute_with_fallback(
                messages, 
                preferred_tier=start_tier
            )
            
            # Display result
            print(f"\n✅ Response received!")
            print("="*40)
            print(f"\n📊 Result Details:")
            print(f"  • Final Tier: {result['tier']}")
            print(f"  • Latency: {result['latency']:.2f}s")
            print(f"  • From Cache: {result.get('cached', False)}")
            print(f"  • Static Response: {result.get('static', False)}")
            if 'model' in result:
                print(f"  • Model Used: {result['model']}")
            
            print(f"\n💬 Response:")
            print("-"*40)
            # Display full response or truncated version
            content = result['content']
            if len(content) > 300:
                print(f"{content[:300]}...")
                print(f"\n[Response truncated - {len(content)} total characters]")
            else:
                print(content)
            
            # Store in history
            test_history.append({
                'timestamp': datetime.now(),
                'query': query_input.value[:30],
                'tier': result['tier'],
                'latency': result['latency'],
                'cached': result.get('cached', False)
            })
            
        except Exception as e:
            print(f"❌ Unexpected error: {e}")
            import traceback
            traceback.print_exc()
        
        finally:
            # Reset failure simulation
            degradation_service.simulate_failures = False
            degradation_service.failure_tiers = []
    
    # Update metrics display
    update_metrics()

def clear_cache(b):
    """Clear the response cache"""
    with output_area:
        clear_output()
        cache_size = len(degradation_service.cache.cache)
        degradation_service.cache.cache.clear()
        degradation_service.cache.access_times.clear()
        print(f"🗑️ Cleared {cache_size} cached responses")
    update_metrics()

def update_metrics():
    """Update metrics display"""
    with metrics_output:
        clear_output()
        
        metrics = degradation_service.get_service_metrics()
        if metrics:
            print("\n📈 Service Metrics")
            print("="*40)
            print(f"Total Requests: {metrics.get('total_requests', 0)}")
            print(f"Cache Hits: {metrics.get('cache_hits', 0)}")
            print(f"Cache Hit Rate: {metrics.get('cache_hit_rate', 0):.1%}")
            print(f"Current Cache Size: {len(degradation_service.cache.cache)}")
            
            print("\n📊 Tier Performance:")
            for tier_name, stats in metrics.get('tier_distribution', {}).items():
                if stats['attempts'] > 0:
                    print(f"\n  {tier_name}:")
                    print(f"    Attempts: {stats['attempts']}")
                    print(f"    Successes: {stats['successes']}")
                    print(f"    Failures: {stats['failures']}")
                    print(f"    Success Rate: {stats['success_rate']:.1%}")
        
        if test_history:
            print("\n📝 Recent Tests:")
            print("-"*40)
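            for test in test_history[-3:]:  # Show last 3 tests
                # Build the timing label first: the :.1f format spec cannot be applied to the string 'cached'
                timing = 'cached' if test['cached'] else f"{test['latency']:.1f}s"
                print(f"  • {test['timestamp'].strftime('%H:%M:%S')}: "
                      f"{test['query']}... → {test['tier']} ({timing})")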

# Connect buttons
test_button.on_click(run_degradation_test)
clear_cache_button.on_click(clear_cache)

# Create the dashboard
dashboard = widgets.VBox([
    widgets.HTML("<h3>🏰 Test Graceful Degradation</h3>"),
    widgets.HTML("<p style='color: #666;'>Enter a query and select options to test the fallback system</p>"),
    query_input,
    widgets.HBox([
        tier_selector, 
        widgets.VBox([
            widgets.HTML("<b>Simulate Failures:</b>"),
            failure_premium,
            failure_standard,
            failure_basic
        ])
    ]),
    widgets.HBox([test_button, clear_cache_button]),
    widgets.HBox([output_area, metrics_output])
])

display(dashboard)

print("\n💡 How to Test:")
print("  1. Run a query normally first (no failures) to populate the cache")
print("  2. Run the same query with Start Tier set to CACHE to see a cache hit")
print("  3. Enable failure checkboxes to force fallbacks")
print("  4. Try starting from different tiers")
print("  5. Watch how the system degrades gracefully!")
print("\n⚡ Pro tip: Check multiple failure boxes to see cascading fallbacks!")

🎮 Graceful Degradation Testing Dashboard
==================================================

VBox(children=(HTML(value='<h3>🏰 Test Graceful Degradation</h3>'), HTML(value="<p style='color: #666;'>Enter a…


💡 How to Test:
  1. Run a query normally first (no failures) to populate the cache
  2. Run the same query with Start Tier set to CACHE to see a cache hit
  3. Enable failure checkboxes to force fallbacks
  4. Try starting from different tiers
  5. Watch how the system degrades gracefully!

⚡ Pro tip: Check multiple failure boxes to see cascading fallbacks!
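
The dashboard is handy for exploration, but degradation paths are easier to keep working when they are also checked without widgets or network calls. The sketch below is a simplified stand-in for `execute_with_fallback`, not the service itself: the tier handlers are stub functions, and `run_chain`, `make_handlers`, `failing`, and the scenario table are hypothetical names introduced for illustration.

```python
from typing import Callable, Dict, List, Optional, Tuple

TIER_ORDER = ["PREMIUM", "STANDARD", "BASIC", "CACHE", "STATIC"]
Handler = Callable[[str], Optional[str]]

def run_chain(handlers: Dict[str, Handler], query: str,
              start_tier: str = "PREMIUM") -> Tuple[str, str]:
    """Walk the tiers in order; return (tier, content) from the first one that answers."""
    for tier in TIER_ORDER[TIER_ORDER.index(start_tier):]:
        try:
            content = handlers[tier](query)
        except Exception:
            continue  # tier failed, degrade to the next one
        if content is not None:  # None means "no answer here", e.g. a cache miss
            return tier, content
    return "FALLBACK", "Service temporarily unavailable. Please try again."

def failing(query: str) -> str:
    raise RuntimeError("simulated outage")

def make_handlers(failed: List[str], cached: Dict[str, str]) -> Dict[str, Handler]:
    """Build stub handlers; tiers listed in `failed` raise instead of answering."""
    handlers: Dict[str, Handler] = {
        "PREMIUM": lambda q: f"premium answer to {q}",
        "STANDARD": lambda q: f"standard answer to {q}",
        "BASIC": lambda q: f"basic answer to {q}",
        "CACHE": lambda q: cached.get(q),
        "STATIC": lambda q: "I'm operating in limited mode.",
    }
    for tier in failed:
        handlers[tier] = failing
    return handlers

# Each scenario: (failed tiers, cache contents, tier expected to answer)
scenarios = [
    ([], {}, "PREMIUM"),
    (["PREMIUM"], {}, "STANDARD"),
    (["PREMIUM", "STANDARD", "BASIC"], {"hi": "cached hello"}, "CACHE"),
    (["PREMIUM", "STANDARD", "BASIC"], {}, "STATIC"),
]

results = []
for failed, cached, expected in scenarios:
    tier, _ = run_chain(make_handlers(failed, cached), "hi")
    results.append(tier)
    print(f"failed={failed or 'none'} -> answered by {tier} (expected {expected})")
```

Keeping the scenarios in a table makes it cheap to add a row whenever a new tier or failure mode is introduced.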


---

## 🚦 **Module 3: Rate Limiting & Quota Management**
### Master the Art of API Traffic Control

```python
# The Rate Limit Dance 💃 (pseudocode: every name below is a placeholder)
while tokens_remain:
    if rate_limit_allows():
        make_request()
    else:
        graceful_wait()
```
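
The loop above is pseudocode. A minimal runnable version might look like the sketch below, which assumes a deliberately tiny token bucket (`SimpleBucket`) and a stand-in `make_request` function. Both are hypothetical and much simpler than the limiters built in the next cell.

```python
import time

class SimpleBucket:
    """Tiny token bucket: refills at `rate` tokens per second, up to `capacity`."""

    def __init__(self, capacity: int, rate: float):
        self.capacity = capacity
        self.rate = rate
        self.tokens = float(capacity)
        self.last = time.monotonic()

    def _refill(self) -> None:
        now = time.monotonic()
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now

    def try_acquire(self) -> bool:
        """Take one token if available."""
        self._refill()
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

    def wait_time(self) -> float:
        """Seconds until one full token is available."""
        self._refill()
        return max(0.0, (1 - self.tokens) / self.rate)

def make_request(i: int) -> str:
    return f"response {i}"  # stand-in for a real API call

bucket = SimpleBucket(capacity=2, rate=20.0)  # burst of 2, then 20 requests per second
pending = list(range(5))
responses = []
waits = 0

while pending:                     # work remains
    if bucket.try_acquire():       # the rate limit allows a request
        responses.append(make_request(pending.pop(0)))
    else:                          # otherwise wait just long enough for one token
        waits += 1
        time.sleep(bucket.wait_time())

print(f"{len(responses)} requests sent, paused {waits} times")
```

Sleeping for the computed wait time, rather than a fixed interval, keeps the loop from either hammering the limiter or idling longer than it needs to.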

In [6]:
# 🎯 Cell 6: Advanced Rate Limiting System
# Build a production-grade rate limiter with multiple algorithms!

from enum import Enum, auto
from collections import deque, defaultdict
from typing import Any, Dict  # used in SmartRateLimiter type hints
import threading
from datetime import datetime, timedelta
import time
import random
import pandas as pd
import numpy as np

class RateLimitAlgorithm(Enum):
    """Rate limiting algorithms"""
    TOKEN_BUCKET = auto()
    SLIDING_WINDOW = auto()
    FIXED_WINDOW = auto()
    LEAKY_BUCKET = auto()
    ADAPTIVE = auto()  # New: Adaptive rate limiting

class TokenBucket:
    """Token bucket rate limiter with burst support"""
    
    def __init__(self, capacity: int, refill_rate: float, burst_multiplier: float = 1.5):
        self.capacity = capacity
        self.refill_rate = refill_rate  # tokens per second
        self.tokens = capacity
        self.last_refill = time.time()
        self.lock = threading.Lock()
        self.burst_capacity = capacity * burst_multiplier
        self.burst_tokens = 0
        
        # Analytics
        self.consumed_tokens = 0
        self.rejected_tokens = 0
        self.total_requests = 0
    
    def _refill(self):
        """Refill tokens based on elapsed time"""
        now = time.time()
        elapsed = now - self.last_refill
        
        new_tokens = elapsed * self.refill_rate
        self.tokens = min(self.capacity, self.tokens + new_tokens)
        
        # Slowly refill burst tokens
        if self.burst_tokens < self.burst_capacity - self.capacity:
            self.burst_tokens = min(self.burst_capacity - self.capacity, 
                                   self.burst_tokens + new_tokens * 0.1)
        
        self.last_refill = now
    
    def consume(self, tokens: int = 1) -> bool:
        """Try to consume tokens with burst support"""
        with self.lock:
            self._refill()
            self.total_requests += 1
            
            # Try regular tokens first
            if self.tokens >= tokens:
                self.tokens -= tokens
                self.consumed_tokens += tokens
                return True
            
            # Try burst tokens
            total_available = self.tokens + self.burst_tokens
            if total_available >= tokens:
                needed_from_burst = tokens - self.tokens
                self.burst_tokens -= needed_from_burst
                self.tokens = 0
                self.consumed_tokens += tokens
                return True
            
            self.rejected_tokens += tokens
            return False
    
    def wait_time(self, tokens: int = 1) -> float:
        """Calculate wait time for tokens"""
        with self.lock:
            self._refill()
            
            total_available = self.tokens + self.burst_tokens
            if total_available >= tokens:
                return 0
            
            needed = tokens - total_available
            return needed / self.refill_rate
    
    def get_stats(self) -> dict:
        """Get current statistics"""
        with self.lock:
            self._refill()
            return {
                'tokens': self.tokens,
                'burst_tokens': self.burst_tokens,
                'capacity': self.capacity,
                'burst_capacity': self.burst_capacity,
                'consumed': self.consumed_tokens,
                'rejected': self.rejected_tokens,
                'total_requests': self.total_requests,
                'acceptance_rate': (self.consumed_tokens / 
                                   (self.consumed_tokens + self.rejected_tokens) 
                                   if (self.consumed_tokens + self.rejected_tokens) > 0 else 1.0)
            }

class SlidingWindowRateLimiter:
    """Enhanced sliding window with request distribution tracking"""
    
    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = deque()
        self.lock = threading.Lock()
        
        # Analytics
        self.accepted = 0
        self.rejected = 0
        self.request_distribution = defaultdict(int)
    
    def _clean_old_requests(self):
        """Remove requests outside the window"""
        cutoff = time.time() - self.window_seconds
        removed = 0
        while self.requests and self.requests[0] < cutoff:
            self.requests.popleft()
            removed += 1
        return removed
    
    def allow_request(self) -> bool:
        """Check if request is allowed"""
        with self.lock:
            self._clean_old_requests()
            
            current_second = int(time.time())
            
            if len(self.requests) < self.max_requests:
                self.requests.append(time.time())
                self.accepted += 1
                self.request_distribution[current_second] += 1
                return True
            
            self.rejected += 1
            return False
    
    def wait_time(self) -> float:
        """Calculate wait time until next slot"""
        with self.lock:
            self._clean_old_requests()
            
            if len(self.requests) < self.max_requests:
                return 0
            
            oldest = self.requests[0]
            wait = self.window_seconds - (time.time() - oldest)
            return max(0, wait)
    
    def get_stats(self) -> dict:
        """Get current statistics"""
        with self.lock:
            self._clean_old_requests()
            return {
                'current_requests': len(self.requests),
                'max_requests': self.max_requests,
                'utilization': len(self.requests) / self.max_requests * 100,
                'accepted': self.accepted,
                'rejected': self.rejected,
                'acceptance_rate': self.accepted / (self.accepted + self.rejected) 
                                  if (self.accepted + self.rejected) > 0 else 1.0
            }

class AdaptiveRateLimiter:
    """Self-adjusting rate limiter based on error rates"""
    
    def __init__(self, initial_rate: int, min_rate: int, max_rate: int):
        self.current_rate = initial_rate
        self.min_rate = min_rate
        self.max_rate = max_rate
        self.error_window = deque(maxlen=100)
        self.success_window = deque(maxlen=100)
        self.adjustment_interval = 10  # seconds
        self.last_adjustment = time.time()
        self.lock = threading.Lock()
        
        # Underlying limiter
        self.limiter = SlidingWindowRateLimiter(initial_rate, 60)
    
    def record_result(self, success: bool):
        """Record request result for adaptation"""
        with self.lock:
            if success:
                self.success_window.append(time.time())
            else:
                self.error_window.append(time.time())
            
            # Check if we should adjust
            if time.time() - self.last_adjustment > self.adjustment_interval:
                self._adjust_rate()
    
    def _adjust_rate(self):
        """Adjust rate based on error rate"""
        total = len(self.error_window) + len(self.success_window)
        if total == 0:
            return
        
        error_rate = len(self.error_window) / total
        
        # Adjust based on error rate
        if error_rate > 0.1:  # More than 10% errors
            # Decrease rate
            self.current_rate = max(self.min_rate, int(self.current_rate * 0.9))
        elif error_rate < 0.01:  # Less than 1% errors
            # Increase rate
            self.current_rate = min(self.max_rate, int(self.current_rate * 1.1))
        
        # Update the underlying limiter in place so the current window's
        # request history and counters are preserved
        with self.limiter.lock:
            self.limiter.max_requests = self.current_rate
        self.last_adjustment = time.time()
    
    def allow_request(self) -> bool:
        """Check if request is allowed"""
        return self.limiter.allow_request()
    
    def get_stats(self) -> dict:
        """Get current statistics"""
        with self.lock:
            total = len(self.error_window) + len(self.success_window)
            error_rate = len(self.error_window) / total if total > 0 else 0
            
            return {
                'current_rate': self.current_rate,
                'min_rate': self.min_rate,
                'max_rate': self.max_rate,
                'error_rate': error_rate,
                'success_rate': 1 - error_rate,
                **self.limiter.get_stats()
            }

class SmartRateLimiter:
    """Intelligent multi-algorithm rate limiter with predictive analytics"""
    
    def __init__(self):
        # Illustrative OpenAI-style limits; real limits depend on the model and your account's usage tier
        self.limits = {
            'gpt-4o': {
                'rpm': 500,      # Requests per minute
                'tpm': 30000,    # Tokens per minute
                'rpd': 10000     # Requests per day
            },
            'gpt-4o-mini': {
                'rpm': 500,
                'tpm': 200000,   
                'rpd': 50000
            },
            'gpt-3.5-turbo': {
                'rpm': 500,
                'tpm': 200000,
                'rpd': 100000
            }
        }
        
        # Initialize rate limiters for each model
        self.request_limiters = {}
        self.token_limiters = {}
        self.adaptive_limiters = {}
        
        for model, limits in self.limits.items():
            # Request limiter (sliding window)
            self.request_limiters[model] = SlidingWindowRateLimiter(
                max_requests=limits['rpm'],
                window_seconds=60
            )
            
            # Token limiter (token bucket with burst)
            self.token_limiters[model] = TokenBucket(
                capacity=limits['tpm'],
                refill_rate=limits['tpm'] / 60,
                burst_multiplier=1.5
            )
            
            # Adaptive limiter
            self.adaptive_limiters[model] = AdaptiveRateLimiter(
                initial_rate=limits['rpm'],
                min_rate=limits['rpm'] // 10,
                max_rate=limits['rpm'] * 2
            )
        
        # Comprehensive metrics
        self.metrics = defaultdict(lambda: defaultdict(int))
        self.request_history = deque(maxlen=10000)
        self.prediction_data = defaultdict(list)
        
        # Circuit breaker
        self.circuit_breakers = defaultdict(lambda: {'state': 'closed', 'failures': 0, 'last_failure': None})
    
    def estimate_tokens(self, text: str) -> int:
        """Rough token estimate; use a real tokenizer (e.g. tiktoken) for exact counts"""
        # Heuristic for English text: about 1.3 tokens per word and 4 characters per token
        words = len(text.split())
        chars = len(text)
        
        # Average of word-based and char-based estimation
        word_estimate = words * 1.3
        char_estimate = chars / 4
        
        return int((word_estimate + char_estimate) / 2)
    
    def check_limits(self, model: str, estimated_tokens: int) -> Dict[str, Any]:
        """Enhanced limit checking with circuit breaker"""
        result = {
            'allowed': False,
            'wait_time': 0,
            'reason': None,
            'suggestions': []
        }
        
        if model not in self.request_limiters:
            result['allowed'] = True
            return result
        
        # Check circuit breaker
        breaker = self.circuit_breakers[model]
        if breaker['state'] == 'open':
            if breaker['last_failure'] and time.time() - breaker['last_failure'] > 30:
                # Try to close circuit
                breaker['state'] = 'half-open'
            else:
                result['reason'] = 'circuit_breaker_open'
                result['wait_time'] = max(0, 30 - (time.time() - breaker['last_failure']))
                result['suggestions'].append("Service temporarily disabled due to errors")
                return result
        
        # Check request limit
        request_limiter = self.request_limiters[model]
        if not request_limiter.allow_request():
            result['wait_time'] = request_limiter.wait_time()
            result['reason'] = 'request_limit'
            result['suggestions'].append(f"Reduce request rate or wait {result['wait_time']:.1f}s")
            return result
        
        # Check token limit
        token_limiter = self.token_limiters[model]
        if not token_limiter.consume(estimated_tokens):
            result['wait_time'] = token_limiter.wait_time(estimated_tokens)
            result['reason'] = 'token_limit'
            result['suggestions'].append(f"Reduce prompt size or use a different model")
            # Roll back the request slot reserved above (under the limiter's lock)
            with request_limiter.lock:
                request_limiter.requests.pop()
                request_limiter.accepted -= 1
            return result
        
        # Check adaptive limit
        if not self.adaptive_limiters[model].allow_request():
            result['wait_time'] = 1.0
            result['reason'] = 'adaptive_limit'
            result['suggestions'].append("System is auto-adjusting rates based on performance")
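            # Roll back the request slot and tokens reserved above (under each limiter's lock)
            with request_limiter.lock:
                request_limiter.requests.pop()
                request_limiter.accepted -= 1
            with token_limiter.lock:
                token_limiter.tokens += estimated_tokens
                token_limiter.consumed_tokens -= estimated_tokens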
            return result
        
        result['allowed'] = True
        return result
    
    def record_usage(self, model: str, tokens_used: int, latency: float, success: bool = True):
        """Record comprehensive usage metrics"""
        self.metrics[model]['requests'] += 1
        self.metrics[model]['tokens'] += tokens_used
        self.metrics[model]['total_latency'] += latency
        
        if success:
            self.metrics[model]['successes'] += 1
            self.circuit_breakers[model]['failures'] = 0
            if self.circuit_breakers[model]['state'] == 'half-open':
                self.circuit_breakers[model]['state'] = 'closed'
        else:
            self.metrics[model]['failures'] += 1
            self.circuit_breakers[model]['failures'] += 1
            self.circuit_breakers[model]['last_failure'] = time.time()
            
            if self.circuit_breakers[model]['failures'] >= 5:
                self.circuit_breakers[model]['state'] = 'open'
        
        self.adaptive_limiters[model].record_result(success)
        
        self.request_history.append({
            'timestamp': datetime.now(),
            'model': model,
            'tokens': tokens_used,
            'latency': latency,
            'success': success
        })
        
        # Update prediction data
        hour = datetime.now().hour
        self.prediction_data[model].append({
            'hour': hour,
            'tokens': tokens_used,
            'latency': latency
        })
    
    def predict_usage(self, model: str, hours_ahead: int = 1) -> dict:
        """Predict future usage based on patterns"""
        if model not in self.prediction_data or len(self.prediction_data[model]) < 10:
            return {'predicted_load': 'insufficient_data'}
        
        # Simple prediction based on recent patterns
        # list() keeps the slice valid even if the history is stored in a deque
        recent = list(self.prediction_data[model])[-100:]
        avg_tokens = sum(r['tokens'] for r in recent) / len(recent)
        avg_latency = sum(r['latency'] for r in recent) / len(recent)
        
        current_hour = datetime.now().hour
        target_hour = (current_hour + hours_ahead) % 24
        
        # Hour-based patterns
        hour_data = [r for r in recent if r['hour'] == target_hour]
        if hour_data:
            predicted_tokens = sum(r['tokens'] for r in hour_data) / len(hour_data)
            predicted_load = 'high' if predicted_tokens > avg_tokens * 1.2 else 'normal'
        else:
            predicted_load = 'normal'
        
        return {
            'predicted_load': predicted_load,
            'predicted_tokens': predicted_tokens if hour_data else avg_tokens,
            'predicted_latency': avg_latency,
            'confidence': min(len(recent) / 100, 1.0)
        }
    
    def get_comprehensive_stats(self) -> dict:
        """Get detailed statistics for all models"""
        stats = {}
        
        for model in self.limits.keys():
            request_stats = self.request_limiters[model].get_stats()
            token_stats = self.token_limiters[model].get_stats()
            adaptive_stats = self.adaptive_limiters[model].get_stats()
            
            # Calculate overall health score
            health_score = (
                request_stats['acceptance_rate'] * 0.3 +
                token_stats['acceptance_rate'] * 0.3 +
                adaptive_stats['success_rate'] * 0.2 +
                (1 - request_stats['utilization'] / 100) * 0.2
            ) * 100
            
            stats[model] = {
                'request_limiter': request_stats,
                'token_limiter': token_stats,
                'adaptive_limiter': adaptive_stats,
                'circuit_breaker': self.circuit_breakers[model]['state'],
                'health_score': health_score,
                'total_requests': self.metrics[model]['requests'],
                'total_tokens': self.metrics[model]['tokens'],
                'avg_latency': (self.metrics[model]['total_latency'] / 
                               self.metrics[model]['requests'] 
                               if self.metrics[model]['requests'] > 0 else 0),
                'success_rate': (self.metrics[model]['successes'] / 
                                self.metrics[model]['requests'] 
                                if self.metrics[model]['requests'] > 0 else 1.0)
            }
        
        return stats

# Initialize the advanced rate limiter
rate_limiter = SmartRateLimiter()

print("🚀 Advanced Smart Rate Limiter Initialized!")
print("\n📊 Rate Limits per Model:")
for model, limits in rate_limiter.limits.items():
    print(f"\n  {model}:")
    print(f"    • {limits['rpm']:,} requests/minute")
    print(f"    • {limits['tpm']:,} tokens/minute")
    print(f"    • {limits['rpd']:,} requests/day")

print("\n🎯 Advanced Features:")
print("  • Token bucket with burst capacity")
print("  • Sliding window with distribution tracking")
print("  • Adaptive rate limiting based on performance")
print("  • Circuit breaker pattern for fault tolerance")
print("  • Predictive analytics for usage patterns")
print("  • Comprehensive health scoring")
print("  • Real-time performance monitoring")

print("\n✅ System ready for production use!")

🚀 Advanced Smart Rate Limiter Initialized!

📊 Rate Limits per Model:

  gpt-4o:
    • 500 requests/minute
    • 30,000 tokens/minute
    • 10,000 requests/day

  gpt-4o-mini:
    • 500 requests/minute
    • 200,000 tokens/minute
    • 50,000 requests/day

  gpt-3.5-turbo:
    • 500 requests/minute
    • 200,000 tokens/minute
    • 100,000 requests/day

🎯 Advanced Features:
  • Token bucket with burst capacity
  • Sliding window with distribution tracking
  • Adaptive rate limiting based on performance
  • Circuit breaker pattern for fault tolerance
  • Predictive analytics for usage patterns
  • Comprehensive health scoring
  • Real-time performance monitoring

✅ System ready for production use!
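
The "token bucket with burst capacity" feature listed above can be pictured with a minimal sketch. The class name `TokenBucketSketch`, its parameters, and the injectable `clock` are assumptions made for illustration; this is not the internals of `SmartRateLimiter`, only one common way to build the idea.

```python
import time


class TokenBucketSketch:
    """Hypothetical token bucket: refills at a steady rate, allows bursts up to capacity."""

    def __init__(self, rate_per_sec: float, capacity: float, clock=time.monotonic):
        self.rate_per_sec = rate_per_sec
        self.capacity = capacity
        self.tokens = capacity      # start full so an initial burst is allowed
        self.clock = clock          # injectable so the sketch can be tested without sleeping
        self.last_refill = clock()

    def _refill(self) -> None:
        # Add tokens for the time elapsed since the last refill, capped at capacity
        now = self.clock()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate_per_sec)
        self.last_refill = now

    def try_consume(self, amount: float) -> bool:
        """Take `amount` tokens if available; the balance never goes negative."""
        self._refill()
        if amount <= self.tokens:
            self.tokens -= amount
            return True
        return False

    def wait_time(self, amount: float) -> float:
        """Seconds until `amount` tokens would be available (0.0 if available now)."""
        self._refill()
        deficit = amount - self.tokens
        return max(0.0, deficit / self.rate_per_sec)
```

With `rate_per_sec=10` and `capacity=100`, a caller can spend 100 tokens at once and must then wait for the bucket to refill at 10 tokens per second. Capacity sets the burst size; the refill rate sets the sustained throughput.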


### 📊 **Live Rate Limit Monitor**
#### Real-time visualization of your API usage!

In [7]:
# 🎯 Cell 7: Ultimate Interactive Rate Limit Dashboard
# Monitor and test rate limiting with advanced visualizations!

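import plotly.graph_objects as go
from plotly.subplots import make_subplots
import ipywidgets as widgets
from IPython.display import display, clear_output, HTML
from collections import defaultdict  # used for live_metrics and rejection counts
import math  # used by the 'wave' stress pattern
import threading
import time
import random
from datetime import datetime, timedelta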

class UltimateRateLimitDashboard:
    """Advanced real-time rate limit monitoring and testing dashboard"""
    
    def __init__(self, rate_limiter: SmartRateLimiter):
        self.rate_limiter = rate_limiter
        self.test_running = False
        self.test_thread = None
        self.test_history = []
        self.live_metrics = defaultdict(list)
        
        # Color schemes for beautiful visualizations
        self.colors = {
            'success': '#00d084',
            'warning': '#ff9800',
            'danger': '#ff3860',
            'info': '#3273dc',
            'dark': '#363636',
            'light': '#f5f5f5'
        }
    
    def create_live_dashboard(self):
        """Create comprehensive real-time dashboard"""
        stats = self.rate_limiter.get_comprehensive_stats()
        
        # Create subplots with different chart types
        fig = make_subplots(
            rows=3, cols=3,
            subplot_titles=(
                '🎯 Health Scores', '📊 Request Limits', '💰 Token Limits',
                '🔄 Adaptive Rates', '⚡ Circuit Breakers', '📈 Success Rates',
                '⏱️ Latency Distribution', '🔮 Usage Prediction', '🏆 Performance Leaderboard'
            ),
            specs=[
                [{'type': 'bar'}, {'type': 'indicator'}, {'type': 'indicator'}],
                [{'type': 'scatter'}, {'type': 'pie'}, {'type': 'bar'}],
                [{'type': 'box'}, {'type': 'scatter'}, {'type': 'table'}]
            ],
            vertical_spacing=0.12,
            horizontal_spacing=0.15
        )
        
        models = list(self.rate_limiter.limits.keys())
        
        # 1. Health Scores - Beautiful bar chart
        health_scores = [stats[m]['health_score'] for m in models]
        colors = [self.colors['success'] if s > 75 else 
                 self.colors['warning'] if s > 50 else 
                 self.colors['danger'] for s in health_scores]
        
        fig.add_trace(
            go.Bar(
                x=models,
                y=health_scores,
                marker_color=colors,
                text=[f'{s:.1f}%' for s in health_scores],
                textposition='outside',
                name='Health'
            ),
            row=1, col=1
        )
        
        # 2. Request Limits - Gauge (shows the first configured model only)
        model = models[0]
        request_util = stats[model]['request_limiter']['utilization']
        fig.add_trace(
            go.Indicator(
                mode="gauge+number+delta",
                value=request_util,
                title={'text': f"{model} Requests"},
                delta={'reference': 50},
                gauge={
                    'axis': {'range': [0, 100]},
                    'bar': {'color': self._get_gauge_color(request_util)},
                    'steps': [
                        {'range': [0, 50], 'color': self.colors['light']},
                        {'range': [50, 80], 'color': '#ffeb3b'},
                        {'range': [80, 100], 'color': '#ffcdd2'}
                    ],
                    'threshold': {
                        'line': {'color': "red", 'width': 4},
                        'thickness': 0.75,
                        'value': 90
                    }
                }
            ),
            row=1, col=2
        )
        
        # 3. Token Limits - Gauge (same model as the request gauge)
        token_util = (1 - stats[model]['token_limiter']['tokens'] / 
                     stats[model]['token_limiter']['capacity']) * 100
        fig.add_trace(
            go.Indicator(
                mode="gauge+number",
                value=token_util,
                title={'text': f"{model} Tokens"},
                number={'suffix': "% used"},
                gauge={
                    'axis': {'range': [0, 100]},
                    'bar': {'color': self._get_gauge_color(token_util)},
                    'bgcolor': "white",
                    'borderwidth': 2,
                    'bordercolor': "gray",
                    'steps': [
                        {'range': [0, 50], 'color': 'lightgray'},
                        {'range': [50, 80], 'color': 'gray'}
                    ],
                    'threshold': {
                        'line': {'color': "red", 'width': 4},
                        'thickness': 0.75,
                        'value': 95
                    }
                }
            ),
            row=1, col=3
        )
        
        # 4. Adaptive Rate Changes Over Time
        if self.live_metrics['adaptive_rates']:
            times = list(range(len(self.live_metrics['adaptive_rates'])))
            for m in models:
                rates = [r.get(m, 0) for r in self.live_metrics['adaptive_rates']]
                fig.add_trace(
                    go.Scatter(
                        x=times[-50:],  # Last 50 points
                        y=rates[-50:],
                        mode='lines+markers',
                        name=m,
                        line=dict(width=2)
                    ),
                    row=2, col=1
                )
        
        # 5. Circuit Breaker States - Pie chart
        breaker_states = {'Open': 0, 'Closed': 0, 'Half-Open': 0}
        for m in models:
            state = stats[m]['circuit_breaker']
            breaker_states[state.title()] = breaker_states.get(state.title(), 0) + 1
        
        fig.add_trace(
            go.Pie(
                labels=list(breaker_states.keys()),
                values=list(breaker_states.values()),
                hole=.4,
                marker_colors=[self.colors['danger'], self.colors['success'], self.colors['warning']],
                textinfo='label+percent'
            ),
            row=2, col=2
        )
        
        # 6. Success Rates Comparison
        success_rates = [stats[m]['success_rate'] * 100 for m in models]
        fig.add_trace(
            go.Bar(
                x=models,
                y=success_rates,
                marker_color=[self.colors['success'] if r > 95 else 
                             self.colors['warning'] if r > 90 else 
                             self.colors['danger'] for r in success_rates],
                text=[f'{r:.1f}%' for r in success_rates],
                textposition='outside',
                name='Success Rate'
            ),
            row=2, col=3
        )
        
        # 7. Latency Distribution
        if self.rate_limiter.request_history:
            recent_requests = list(self.rate_limiter.request_history)[-100:]
            for m in models:
                model_latencies = [r['latency'] for r in recent_requests if r['model'] == m]
                if model_latencies:
                    fig.add_trace(
                        go.Box(
                            y=model_latencies,
                            name=m,
                            boxmean='sd',
                            marker_color=self.colors['info']
                        ),
                        row=3, col=1
                    )
        
        # 8. Usage Prediction
        # NOTE: the plotted curve is randomly generated placeholder data.
        # predict_usage() only decides whether a model has enough history to be shown.
        for m in models:
            prediction = self.rate_limiter.predict_usage(m, hours_ahead=1)
            if prediction['predicted_load'] != 'insufficient_data':
                x = list(range(24))
                y = [random.uniform(100, 500) for _ in range(24)]  # Simulated hourly pattern
                
                fig.add_trace(
                    go.Scatter(
                        x=x,
                        y=y,
                        mode='lines',
                        name=m,
                        fill='tozeroy',
                        line=dict(width=2)
                    ),
                    row=3, col=2
                )
        
        # 9. Performance Leaderboard
        leaderboard_data = []
        for m in models:
            leaderboard_data.append([
                m,
                f"{stats[m]['health_score']:.1f}%",
                f"{stats[m]['success_rate']*100:.1f}%",
                f"{stats[m]['avg_latency']:.2f}s",
                f"{stats[m]['total_requests']:,}"
            ])
        
        fig.add_trace(
            go.Table(
                header=dict(
                    values=['Model', 'Health', 'Success', 'Latency', 'Requests'],
                    fill_color=self.colors['dark'],
                    font_color='white',
                    align='center'
                ),
                cells=dict(
                    values=list(zip(*leaderboard_data)),
                    fill_color='lavender',
                    align='center'
                )
            ),
            row=3, col=3
        )
        
        # Update layout for beautiful appearance
        fig.update_layout(
            height=900,
            showlegend=False,
            title_text="<b>🚀 Ultimate Rate Limit Monitor</b>",
            title_font_size=24,
            title_x=0.5,
            title_xanchor='center',
            paper_bgcolor=self.colors['light'],
            plot_bgcolor='white',
            font=dict(size=11)
        )
        
        # Update axes
        fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor='LightGray')
        fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='LightGray')
        
        return fig
    
    def _get_gauge_color(self, value):
        """Get color based on value"""
        if value < 50:
            return self.colors['success']
        elif value < 80:
            return self.colors['warning']
        else:
            return self.colors['danger']
    
    def run_stress_test(self, model: str, scenario: str, duration: int):
        """Run different stress test scenarios"""
        
        scenarios = {
            'steady': {'rate': 10, 'pattern': 'constant', 'tokens': (50, 100)},
            'burst': {'rate': 50, 'pattern': 'burst', 'tokens': (100, 500)},
            'wave': {'rate': 20, 'pattern': 'wave', 'tokens': (50, 200)},
            'chaos': {'rate': 30, 'pattern': 'random', 'tokens': (10, 1000)},
            'ddos': {'rate': 100, 'pattern': 'attack', 'tokens': (10, 50)}
        }
        
        config = scenarios.get(scenario, scenarios['steady'])
        
        print(f"\n🔥 Starting {scenario.upper()} stress test on {model}")
        print(f"   Pattern: {config['pattern']}")
        print(f"   Base Rate: {config['rate']} req/s")
        print(f"   Duration: {duration}s")
        print("-" * 50)
        
        start_time = time.time()
        results = {
            'accepted': 0,
            'rejected': 0,
            'by_reason': defaultdict(int)
        }
        
        self.test_running = True
        last_print_time = time.time()
        
        # Progress bar for better visibility
        progress_interval = duration / 20  # Update 20 times during test
        next_progress = progress_interval
        
        while time.time() - start_time < duration and self.test_running:
            # Calculate current rate based on pattern
            elapsed = time.time() - start_time
            
            if config['pattern'] == 'burst':
                # Burst every 5 seconds
                current_rate = config['rate'] * 3 if int(elapsed) % 5 == 0 else config['rate'] / 2
            elif config['pattern'] == 'wave':
                # Sine wave pattern
                import math
                current_rate = config['rate'] * (1 + 0.5 * math.sin(elapsed))
            elif config['pattern'] == 'random':
                current_rate = random.uniform(1, config['rate'] * 2)
            else:
                current_rate = config['rate']
            
            # Generate request
            tokens = random.randint(*config['tokens'])
            check = self.rate_limiter.check_limits(model, tokens)
            
            if check['allowed']:
                results['accepted'] += 1
                # Simulate processing
                latency = random.uniform(0.1, 1.0)
                success = random.random() > 0.05  # 95% success rate
                self.rate_limiter.record_usage(model, tokens, latency, success)
            else:
                results['rejected'] += 1
                results['by_reason'][check['reason']] += 1
            
            # Update live metrics
            stats = self.rate_limiter.get_comprehensive_stats()
            self.live_metrics['adaptive_rates'].append({
                m: stats[m]['adaptive_limiter']['current_rate'] for m in stats
            })
            
            # Print progress updates (better for Jupyter)
            if elapsed >= next_progress or time.time() - last_print_time > 1:
                progress = (elapsed / duration) * 100
                total = results['accepted'] + results['rejected']
                accept_rate = (results['accepted'] / total * 100) if total > 0 else 0
                
                # Create progress bar
                bar_length = 30
                filled = int(bar_length * progress / 100)
                bar = '█' * filled + '░' * (bar_length - filled)
                
                print(f"Progress: {bar} {progress:.0f}% | "
                      f"✅ {results['accepted']} | ❌ {results['rejected']} | "
                      f"Rate: {accept_rate:.1f}% accepted")
                
                next_progress += progress_interval
                last_print_time = time.time()
            
            # Sleep to control rate
            sleep_time = 1 / current_rate
            time.sleep(max(0.01, sleep_time))
        
        self.test_running = False
        
        # Print results
        total = results['accepted'] + results['rejected']
        print(f"\n{'='*50}")
        print(f"📊 STRESS TEST COMPLETE - {scenario.upper()}")
        print(f"{'='*50}")
        print(f"Total Requests: {total:,}")
        print(f"Accepted: {results['accepted']:,} ({results['accepted']/total*100:.1f}%)")
        print(f"Rejected: {results['rejected']:,} ({results['rejected']/total*100:.1f}%)")
        
        if results['by_reason']:
            print(f"\nRejection Breakdown:")
            for reason, count in results['by_reason'].items():
                percentage = count/results['rejected']*100 if results['rejected'] > 0 else 0
                print(f"  • {reason}: {count:,} ({percentage:.1f}%)")
        
        # Show final stats
        final_stats = self.rate_limiter.get_comprehensive_stats()[model]
        print(f"\nFinal Model Stats:")
        print(f"  • Health Score: {final_stats['health_score']:.1f}%")
        print(f"  • Circuit Breaker: {final_stats['circuit_breaker']}")
        print(f"  • Adaptive Rate: {final_stats['adaptive_limiter']['current_rate']}")
        print(f"  • Tokens Remaining: {final_stats['token_limiter']['tokens']:.0f}/{final_stats['token_limiter']['capacity']}")
        
        return results

# Create the ultimate dashboard
dashboard = UltimateRateLimitDashboard(rate_limiter)

# Create interactive controls
print("🎮 Ultimate Rate Limit Control Center")
print("="*50)

# Model selector
model_selector = widgets.Dropdown(
    options=['gpt-4o', 'gpt-4o-mini', 'gpt-3.5-turbo'],
    value='gpt-4o-mini',
    description='Model:',
    style={'description_width': 'initial'}
)

# Scenario selector with descriptions
scenario_selector = widgets.RadioButtons(
    options=[
        ('💚 Steady - Constant load', 'steady'),
        ('💥 Burst - Sudden spikes', 'burst'),
        ('🌊 Wave - Oscillating pattern', 'wave'),
        ('🎲 Chaos - Random mayhem', 'chaos'),
        ('☠️ DDoS - Attack simulation', 'ddos')
    ],
    value='steady',
    description='Scenario:',
    style={'description_width': 'initial'}
)

# Duration slider
duration_slider = widgets.IntSlider(
    value=15,
    min=5,
    max=60,
    step=5,
    description='Duration (s):',
    style={'description_width': 'initial'}
)

# Control buttons
start_button = widgets.Button(
    description='🚀 Start Test',
    button_style='success',
    layout=widgets.Layout(width='150px')
)

stop_button = widgets.Button(
    description='🛑 Stop Test',
    button_style='danger',
    layout=widgets.Layout(width='150px')
)

refresh_button = widgets.Button(
    description='🔄 Refresh Dashboard',
    button_style='info',
    layout=widgets.Layout(width='150px')
)

reset_button = widgets.Button(
    description='🗑️ Reset All',
    button_style='warning',
    layout=widgets.Layout(width='150px')
)

# Output areas
test_output = widgets.Output()
dashboard_output = widgets.Output()

# Button handlers
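def start_test(b):
    with test_output:
        clear_output()
        if not dashboard.test_running:
            # Runs synchronously so progress prints appear in order. The kernel is
            # typically busy until the test returns, so a Stop click is only
            # handled after the run finishes.
            dashboard.run_stress_test(
                model_selector.value, 
                scenario_selector.value, 
                duration_slider.value
            )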

def stop_test(b):
    dashboard.test_running = False
    with test_output:
        print("\n⏹️ Stopping test...")
        time.sleep(0.5)
        print("Test stopped by user")

def refresh_dashboard(b):
    with dashboard_output:
        clear_output()
        fig = dashboard.create_live_dashboard()
        fig.show()

def reset_all(b):
    with test_output:
        clear_output()
        # Reset metrics
        rate_limiter.metrics.clear()
        rate_limiter.request_history.clear()
        dashboard.live_metrics.clear()
        print("♻️ All metrics reset!")
    refresh_dashboard(b)

# Connect handlers
start_button.on_click(start_test)
stop_button.on_click(stop_test)
refresh_button.on_click(refresh_dashboard)
reset_button.on_click(reset_all)

# Quick Test Function for immediate feedback
def quick_test():
    """Run a quick test to show the system working"""
    print("🚀 Running Quick Rate Limit Test...")
    print("-" * 50)
    
    model = 'gpt-4o-mini'
    test_requests = 20
    
    for i in range(test_requests):
        tokens = random.randint(100, 500)
        check = rate_limiter.check_limits(model, tokens)
        
        if check['allowed']:
            print(f"✅ Request {i+1}: ALLOWED ({tokens} tokens)")
            rate_limiter.record_usage(model, tokens, random.uniform(0.1, 0.5), True)
        else:
            print(f"❌ Request {i+1}: BLOCKED - {check['reason']} (wait {check['wait_time']:.1f}s)")
        
        time.sleep(0.1)  # Small delay between requests
    
    # Show stats
    stats = rate_limiter.get_comprehensive_stats()[model]
    print("\n📊 Quick Test Results:")
    print(f"  Health Score: {stats['health_score']:.1f}%")
    print(f"  Token Usage: {stats['token_limiter']['tokens']:.0f}/{stats['token_limiter']['capacity']}")
    print(f"  Request Count: {stats['request_limiter']['current_requests']}/{stats['request_limiter']['max_requests']}")

# Add quick test button
quick_test_button = widgets.Button(
    description='⚡ Quick Test',
    button_style='primary',
    layout=widgets.Layout(width='150px')
)

def run_quick_test(b):
    with test_output:
        clear_output()
        quick_test()

quick_test_button.on_click(run_quick_test)

# Create enhanced layout
control_panel = widgets.VBox([
    widgets.HTML("<h2>🎛️ Control Panel</h2>"),
    widgets.HBox([
        widgets.VBox([model_selector, duration_slider]),
        scenario_selector
    ]),
    widgets.HBox([start_button, stop_button, refresh_button, reset_button, quick_test_button]),
    test_output
])

# Display everything
display(control_panel)
display(dashboard_output)

# Show initial dashboard
with dashboard_output:
    fig = dashboard.create_live_dashboard()
    fig.show()

print("\n💡 Pro Tips:")
print("  🎯 Start with 'Steady' to baseline performance")
print("  💥 Try 'Burst' to see adaptive rate limiting in action")
print("  🌊 'Wave' pattern shows how the system handles oscillating load")
print("  🎲 'Chaos' tests unpredictable patterns")
print("  ☠️ 'DDoS' triggers circuit breakers and protection mechanisms")
print("\n🔄 Refresh the dashboard during tests to see live updates!")
print("📈 Watch how different algorithms work together to maintain stability!")

🎮 Ultimate Rate Limit Control Center


VBox(children=(HTML(value='<h2>🎛️ Control Panel</h2>'), HBox(children=(VBox(children=(Dropdown(description='Mo…

Output()


💡 Pro Tips:
  🎯 Start with 'Steady' to baseline performance
  💥 Try 'Burst' to see adaptive rate limiting in action
  🌊 'Wave' pattern shows how the system handles oscillating load
  🎲 'Chaos' tests unpredictable patterns
  ☠️ 'DDoS' triggers circuit breakers and protection mechanisms

🔄 Refresh the dashboard during tests to see live updates!
📈 Watch how different algorithms work together to maintain stability!
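
The stress scenarios differ mainly in how the target request rate changes over time. Below is a minimal sketch of that mapping as a pure function, mirroring the patterns used in `run_stress_test`. The names `rate_for_pattern` and `sleep_interval`, and the injectable `rng` argument, are assumptions added here so the logic can be tested in isolation.

```python
import math
import random


def rate_for_pattern(pattern: str, base_rate: float, elapsed: float, rng=random) -> float:
    """Return the target requests/second for a load pattern at `elapsed` seconds."""
    if pattern == 'burst':
        # Triple the rate during the first second of every 5-second window, halve it otherwise
        return base_rate * 3 if int(elapsed) % 5 == 0 else base_rate / 2
    if pattern == 'wave':
        # Sine wave oscillating between 0.5x and 1.5x the base rate
        return base_rate * (1 + 0.5 * math.sin(elapsed))
    if pattern == 'random':
        return rng.uniform(1, base_rate * 2)
    # 'constant' and any unrecognized pattern (such as 'attack') use the base rate
    return base_rate


def sleep_interval(rate: float, min_sleep: float = 0.01) -> float:
    """Delay between requests for a given rate, floored so the loop never spins."""
    return max(min_sleep, 1 / rate)
```

Because of the 0.01 s floor, the loop cannot issue more than roughly 100 requests per second no matter how high the target rate is. That is worth remembering when reading the results of the burst and DDoS scenarios.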


---

## 🔌 **Module 4: Circuit Breakers & Self-Protection**
### Build Systems That Protect Themselves!

```
Circuit Breaker States:
┌────────┐  Threshold exceeded  ┌──────┐  Timeout elapsed  ┌───────────┐
│ CLOSED │ ───────────────────► │ OPEN │ ────────────────► │ HALF-OPEN │
└────────┘                      └──────┘ ◄──────────────── └───────────┘
     ▲                                      Probe fails           │
     └────────────────────── Probe succeeds ─────────────────────┘
```
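
The usual three-state circuit breaker (closed, open, half-open) can be sketched in a few lines. This is a minimal illustration under stated assumptions, not the breaker class used by the demo below: the name `MiniCircuitBreaker`, the `recovery_timeout` parameter, and the injectable `clock` are hypothetical, and the default threshold of 5 simply echoes the value used in `record_usage`.

```python
import time


class MiniCircuitBreaker:
    """Hypothetical three-state breaker: closed -> open -> half_open -> closed."""

    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 30.0,
                 clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock          # injectable so transitions can be tested without sleeping
        self.state = 'closed'
        self.failures = 0
        self.opened_at = None

    def allow_request(self) -> bool:
        """Open rejects calls until the timeout elapses; other states admit them.

        A fuller version would limit how many probes half-open lets through.
        """
        if self.state == 'open':
            if self.clock() - self.opened_at >= self.recovery_timeout:
                self.state = 'half_open'    # let a probe request through
                return True
            return False
        return True

    def record_success(self) -> None:
        # Any success resets the failure count; a successful probe closes the circuit
        self.failures = 0
        if self.state == 'half_open':
            self.state = 'closed'

    def record_failure(self) -> None:
        self.failures += 1
        # A failed probe reopens immediately; otherwise open once the threshold is reached
        if self.state == 'half_open' or self.failures >= self.failure_threshold:
            self.state = 'open'
            self.opened_at = self.clock()
```

Callers are expected to check `allow_request()` before each call and report the outcome with `record_success()` or `record_failure()`. Resetting the counter on success means the threshold counts consecutive failures rather than total failures.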

In [9]:
# 🎯 Cell 9: Interactive Circuit Breaker Live Demo & Visualization
# Experience circuit breakers in action with real-time visual feedback!

import plotly.graph_objects as go
from plotly.subplots import make_subplots
import ipywidgets as widgets
from IPython.display import display, clear_output, HTML
from collections import deque  # bounded buffers for the live data series
import threading
import time
import random
from datetime import datetime, timedelta
import queue

class CircuitBreakerLiveDemo:
    """Interactive circuit breaker demonstration with live visualization"""
    
    def __init__(self, resilient_system):
        self.system = resilient_system
        self.demo_running = False
        self.demo_thread = None
        self.event_queue = queue.Queue(maxsize=1000)
        self.visualization_thread = None
        self.visualization_running = False
        
        # Live data storage
        self.live_data = {
            'timestamps': deque(maxlen=100),
            'states': {model: deque(maxlen=100) for model in self.system.breakers},
            'success_rates': {model: deque(maxlen=100) for model in self.system.breakers},
            'latencies': {model: deque(maxlen=100) for model in self.system.breakers},
            'events': deque(maxlen=50)
        }
        
        # State colors for visualization
        self.state_colors = {
            'closed': '#00d084',      # Green
            'open': '#ff3860',        # Red
            'half_open': '#ffdd57',   # Yellow
            'degraded': '#ff9800',    # Orange
            'forced_open': '#9c27b0'  # Purple
        }
        
        # Demo scenarios
        self.scenarios = {
            'normal': {
                'name': '😊 Normal Operation',
                'description': 'Everything works perfectly',
                'failure_rate': 0.0,
                'latency_range': (0.5, 1.5),
                'pattern': 'steady'
            },
            'degrading': {
                'name': '📉 Gradual Degradation',
                'description': 'Service slowly degrades over time',
                'failure_rate': 0.1,
                'latency_range': (1.0, 3.0),
                'pattern': 'increasing'
            },
            'cascade': {
                'name': '💥 Cascade Failure',
                'description': 'One failure triggers others',
                'failure_rate': 0.3,
                'latency_range': (2.0, 5.0),
                'pattern': 'cascade'
            },
            'recovery': {
                'name': '🔄 Failure & Recovery',
                'description': 'Service fails then recovers',
                'failure_rate': 0.5,
                'latency_range': (1.0, 10.0),
                'pattern': 'wave'
            },
            'chaos': {
                'name': '🎲 Chaos Monkey',
                'description': 'Random failures everywhere!',
                'failure_rate': 0.7,
                'latency_range': (0.1, 15.0),
                'pattern': 'random'
            }
        }
    
    def create_live_visualization(self):
        """Create comprehensive live visualization dashboard"""
        
        # Create figure with subplots
        fig = make_subplots(
            rows=3, cols=3,
            subplot_titles=(
                '🎭 Circuit States', '📈 Success Rates', '⏱️ Latency Trends',
                '🎯 Current Status', '📊 Failure Breakdown', '🔄 State Transitions',
                '📝 Live Event Log', '💪 System Health', '🏆 Model Performance'
            ),
            specs=[
                [{'type': 'scatter'}, {'type': 'scatter'}, {'type': 'scatter'}],
                [{'type': 'indicator'}, {'type': 'pie'}, {'type': 'bar'}],
                [{'type': 'table', 'rowspan': 1}, {'type': 'indicator'}, {'type': 'bar'}]
            ],
            vertical_spacing=0.12,
            horizontal_spacing=0.12,
            row_heights=[0.35, 0.35, 0.3]
        )
        
        models = list(self.system.breakers.keys())
        current_time = datetime.now()
        
        # 1. Circuit States Timeline (Visual state representation)
        for model in models:
            if self.live_data['timestamps']:
                states = self.live_data['states'].get(model, [])
                if states:
                    # Convert states to numeric values for plotting
                    state_values = []
                    colors = []
                    for state in states:
                        if state == 'closed':
                            state_values.append(3)
                            colors.append(self.state_colors['closed'])
                        elif state == 'degraded':
                            state_values.append(2)
                            colors.append(self.state_colors['degraded'])
                        elif state == 'half_open':
                            state_values.append(1)
                            colors.append(self.state_colors['half_open'])
                        else:  # open or forced_open
                            state_values.append(0)
                            colors.append(self.state_colors['open'])
                    
                    fig.add_trace(
                        go.Scatter(
                            x=list(self.live_data['timestamps']),
                            y=state_values,
                            mode='lines+markers',
                            name=model,
                            line=dict(width=3),
                            marker=dict(size=8, color=colors),  # one color per point, matching its state
                            hovertemplate='%{y}<extra></extra>'
                        ),
                        row=1, col=1
                    )
        
        # 2. Success Rates Over Time
        for model in models:
            if self.live_data['timestamps'] and model in self.live_data['success_rates']:
                rates = list(self.live_data['success_rates'][model])
                if rates:
                    fig.add_trace(
                        go.Scatter(
                            x=list(self.live_data['timestamps']),
                            y=[r * 100 for r in rates],
                            mode='lines',
                            name=model,
                            line=dict(width=2),
                            fill='tozeroy',
                            opacity=0.7
                        ),
                        row=1, col=2
                    )
        
        # 3. Latency Trends
        for model in models:
            if self.live_data['timestamps'] and model in self.live_data['latencies']:
                latencies = list(self.live_data['latencies'][model])
                if latencies:
                    fig.add_trace(
                        go.Scatter(
                            x=list(self.live_data['timestamps']),
                            y=latencies,
                            mode='lines+markers',
                            name=model,
                            line=dict(width=2)
                        ),
                        row=1, col=3
                    )
        
        # 4. Current Status Indicator (for primary model)
        primary_model = models[0]
        breaker = self.system.breakers[primary_model]
        status = breaker.get_detailed_status()
        
        # Create a compound indicator showing multiple metrics
        fig.add_trace(
            go.Indicator(
                mode="number+gauge+delta",
                value=status['metrics']['total_calls'],
                title={'text': f"{primary_model}<br>Total Calls"},
                delta={'reference': 100, 'position': "top"},
                gauge={
                    'shape': "bullet",
                    'axis': {'range': [None, 1000]},
                    'bar': {'color': self.state_colors.get(status['state'], 'gray')},
                    'steps': [
                        {'range': [0, 250], 'color': "lightgray"},
                        {'range': [250, 750], 'color': "gray"}
                    ],
                    'threshold': {
                        'line': {'color': "red", 'width': 2},
                        'thickness': 0.75,
                        'value': 900
                    }
                }
            ),
            row=2, col=1
        )
        
        # 5. Failure Breakdown Pie Chart
        failure_data = {}
        for model in models:
            breaker = self.system.breakers[model]
            for failure_type, count in breaker.metrics.failures_by_type.items():
                failure_data[failure_type.name] = failure_data.get(failure_type.name, 0) + count
        
        if failure_data:
            fig.add_trace(
                go.Pie(
                    labels=list(failure_data.keys()),
                    values=list(failure_data.values()),
                    hole=0.4,
                    marker=dict(colors=['#ff6b6b', '#4ecdc4', '#45b7d1', '#fdcb6e', '#6c5ce7']),
                    textinfo='label+percent'
                ),
                row=2, col=2
            )
        
        # 6. State Transition History
        # Each history record already describes one transition (it carries both
        # 'from' and 'to'), so count records directly and merge counts across models.
        transitions = {}
        for model in models:
            breaker = self.system.breakers[model]
            for record in breaker.state_history:
                key = (record['from'], record['to'])
                transitions[key] = transitions.get(key, 0) + 1
        
        transition_data = {'From': [], 'To': [], 'Count': []}
        for (from_state, to_state), count in transitions.items():
            transition_data['From'].append(from_state)
            transition_data['To'].append(to_state)
            transition_data['Count'].append(count)
        
        if transition_data['Count']:
            fig.add_trace(
                go.Bar(
                    x=[f"{f}→{t}" for f, t in zip(transition_data['From'], transition_data['To'])],
                    y=transition_data['Count'],
                    marker_color='#3498db'
                ),
                row=2, col=3
            )
        
        # 7. Live Event Log
        events_data = []
        for event in list(self.live_data['events'])[-10:]:  # Last 10 events
            events_data.append([
                event.get('time', '').strftime('%H:%M:%S') if isinstance(event.get('time'), datetime) else '',
                event.get('model', ''),
                event.get('event', ''),
                event.get('details', '')
            ])
        
        fig.add_trace(
            go.Table(
                header=dict(
                    values=['Time', 'Model', 'Event', 'Details'],
                    fill_color='#34495e',
                    font_color='white',
                    align='left',
                    height=25
                ),
                cells=dict(
                    values=list(zip(*events_data)) if events_data else [[], [], [], []],
                    fill_color='#ecf0f1',
                    align='left',
                    height=20,
                    font_size=10
                )
            ),
            row=3, col=1
        )
        
        # 8. System Health Score
        health_scores = []
        for model in models:
            breaker = self.system.breakers[model]
            metrics = breaker.metrics
            health = (metrics.success_rate * 0.5 + 
                     (1 - metrics.slow_call_rate) * 0.3 +
                     (1 - len(breaker.sliding_window) / breaker.config.sliding_window_size) * 0.2)
            health_scores.append(health * 100)
        
        avg_health = sum(health_scores) / len(health_scores) if health_scores else 0
        
        fig.add_trace(
            go.Indicator(
                mode="gauge+number",
                value=avg_health,
                title={'text': "System Health"},
                gauge={
                    'axis': {'range': [0, 100]},
                    'bar': {'color': self._get_health_color(avg_health)},
                    'steps': [
                        {'range': [0, 50], 'color': "#ffebee"},
                        {'range': [50, 80], 'color': "#fff3e0"},
                        {'range': [80, 100], 'color': "#e8f5e9"}
                    ],
                    'threshold': {
                        'line': {'color': "red", 'width': 4},
                        'thickness': 0.75,
                        'value': 30
                    }
                }
            ),
            row=3, col=2
        )
        
        # 9. Model Performance Comparison
        perf_data = []
        for model in models:
            breaker = self.system.breakers[model]
            perf_data.append({
                'model': model,
                'success': breaker.metrics.success_rate * 100,
                'speed': 100 - min(breaker.metrics.average_duration * 10, 100)  # Convert to speed score
            })
        
        if perf_data:
            fig.add_trace(
                go.Bar(
                    x=[d['model'] for d in perf_data],
                    y=[d['success'] for d in perf_data],
                    name='Success Rate',
                    marker_color='#2ecc71'
                ),
                row=3, col=3
            )
            fig.add_trace(
                go.Bar(
                    x=[d['model'] for d in perf_data],
                    y=[d['speed'] for d in perf_data],
                    name='Speed Score',
                    marker_color='#3498db'
                ),
                row=3, col=3
            )
        
        # Update layout
        fig.update_layout(
            height=900,
            showlegend=True,
            title_text="<b>🚀 Circuit Breaker Live Monitor</b>",
            title_font_size=24,
            title_x=0.5,
            title_xanchor='center',
            plot_bgcolor='white',
            paper_bgcolor='#f8f9fa',
            font=dict(size=10),
            margin=dict(l=50, r=50, t=80, b=50)
        )
        
        # Update axes
        fig.update_xaxes(showgrid=True, gridwidth=0.5, gridcolor='#e0e0e0')
        fig.update_yaxes(showgrid=True, gridwidth=0.5, gridcolor='#e0e0e0')
        
        # Custom y-axis for state chart
        fig.update_yaxes(
            ticktext=['Open', 'Half-Open', 'Degraded', 'Closed'],
            tickvals=[0, 1, 2, 3],
            row=1, col=1
        )
        
        return fig
    
    def _get_health_color(self, health):
        """Get color based on health score"""
        if health > 80:
            return '#2ecc71'
        elif health > 60:
            return '#f39c12'
        elif health > 40:
            return '#e67e22'
        else:
            return '#e74c3c'
    
    def simulate_scenario(self, scenario_key, duration=30):
        """Run a demo scenario"""
        scenario = self.scenarios[scenario_key]
        self.demo_running = True
        
        print(f"\n🎬 Starting Demo: {scenario['name']}")
        print(f"   {scenario['description']}")
        print(f"   Duration: {duration} seconds")
        print("="*50)
        
        start_time = time.time()
        request_count = 0
        
        # Add initial event
        self.add_event('all', 'Demo Started', f"Running {scenario['name']}")
        
        while time.time() - start_time < duration and self.demo_running:
            elapsed = time.time() - start_time
            progress = elapsed / duration
            
            # Select model based on scenario pattern
            if scenario['pattern'] == 'cascade':
                # Start with one model, then cascade to others
                if progress < 0.3:
                    model = 'gpt-4o'
                elif progress < 0.6:
                    model = random.choice(['gpt-4o', 'gpt-4o-mini'])
                else:
                    model = random.choice(list(self.system.breakers.keys()))
            else:
                model = random.choice(list(self.system.breakers.keys()))
            
            # Adjust failure rate based on pattern
            if scenario['pattern'] == 'increasing':
                current_failure_rate = scenario['failure_rate'] * progress
            elif scenario['pattern'] == 'wave':
                import math
                current_failure_rate = scenario['failure_rate'] * (0.5 + 0.5 * math.sin(elapsed))
            else:
                current_failure_rate = scenario['failure_rate']
            
            # Simulate request
            request_count += 1
            should_fail = random.random() < current_failure_rate
            latency = random.uniform(*scenario['latency_range'])
            
            if should_fail:
                latency = latency * 2  # Failed requests take longer
            
            # Simulate the call through the circuit breaker
            breaker = self.system.breakers[model]
            
            try:
                if should_fail:
                    # Inject a failure
                    def failing_call():
                        time.sleep(latency)
                        raise Exception("Simulated failure")
                    
                    breaker.call(failing_call)
                else:
                    # Successful call
                    def successful_call():
                        time.sleep(latency)
                        return "Success"
                    
                    breaker.call(successful_call)
                    
                # Record success
                self.add_event(model, 'Success', f"Latency: {latency:.2f}s")
                
            except Exception as e:
                # Record failure
                failure_type = breaker._classify_failure(e)
                self.add_event(model, 'Failed', f"{failure_type.name}")
            
            # Update live data
            self.update_live_data()
            
            # Print status
            if request_count % 10 == 0:
                print(f"📊 Requests: {request_count} | Time: {elapsed:.1f}s", end='\r')
            
            # Control request rate
            time.sleep(random.uniform(0.1, 0.5))
        
        self.demo_running = False
        
        print(f"\n\n✅ Demo Complete!")
        print(f"   Total Requests: {request_count}")
        print("="*50)
        
        # Final status
        self.print_final_status()
    
    def add_event(self, model, event_type, details):
        """Add event to the event log"""
        event = {
            'time': datetime.now(),
            'model': model,
            'event': event_type,
            'details': details
        }
        self.live_data['events'].append(event)
        
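        # Also add to queue for processing; drop the event if the queue cannot accept it.
        # Catch Exception rather than using a bare except, so KeyboardInterrupt still propagates.
        try:
            self.event_queue.put_nowait(event)
        except Exception:
            pass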
    
    def update_live_data(self):
        """Update live data for visualization"""
        timestamp = datetime.now()
        self.live_data['timestamps'].append(timestamp)
        
        for model, breaker in self.system.breakers.items():
            # State
            self.live_data['states'][model].append(breaker.state.value)
            
            # Success rate
            self.live_data['success_rates'][model].append(breaker.metrics.success_rate)
            
            # Average latency
            self.live_data['latencies'][model].append(breaker.metrics.average_duration)
    
    def print_final_status(self):
        """Print detailed final status"""
        print("\n📊 Final Circuit Breaker Status:")
        print("-"*50)
        
        for model, breaker in self.system.breakers.items():
            status = breaker.get_detailed_status()
            print(f"\n{model}:")
            print(f"  State: {status['state'].upper()}")
            print(f"  Total Calls: {status['metrics']['total_calls']}")
            print(f"  Success Rate: {status['metrics']['success_rate']}")
            print(f"  Avg Duration: {status['metrics']['avg_duration']}")
            
            if status['failures_by_type']:
                print(f"  Failures:")
                for failure_type, count in status['failures_by_type'].items():
                    print(f"    - {failure_type}: {count}")

# Create the demo system
demo = CircuitBreakerLiveDemo(resilient_system)

# Create interactive controls
print("🎮 Circuit Breaker Interactive Demo")
print("="*50)

# Scenario selector
scenario_cards = []
for key, scenario in demo.scenarios.items():
    card_html = f"""
    <div style='border: 2px solid #ddd; padding: 10px; margin: 5px; border-radius: 8px;'>
        <h4>{scenario['name']}</h4>
        <p style='color: #666;'>{scenario['description']}</p>
        <small>Failure Rate: {scenario['failure_rate']*100:.0f}%</small>
    </div>
    """
    scenario_cards.append((card_html, key))

scenario_selector = widgets.RadioButtons(
    options=[(demo.scenarios[key]['name'], key) for _, key in scenario_cards],  # show names, keep keys as values
    value='normal',
    description='',
    layout=widgets.Layout(width='600px')
)

# Duration slider
duration_slider = widgets.IntSlider(
    value=20,
    min=10,
    max=60,
    step=5,
    description='Duration:',
    style={'description_width': 'initial'}
)

# Control buttons
start_demo_button = widgets.Button(
    description='🎬 Start Demo',
    button_style='success',
    layout=widgets.Layout(width='150px')
)

stop_demo_button = widgets.Button(
    description='⏹️ Stop Demo',
    button_style='danger',
    layout=widgets.Layout(width='150px')
)

refresh_viz_button = widgets.Button(
    description='🔄 Refresh Viz',
    button_style='info',
    layout=widgets.Layout(width='150px')
)

reset_system_button = widgets.Button(
    description='♻️ Reset System',
    button_style='warning',
    layout=widgets.Layout(width='150px')
)

# Manual controls
manual_controls = widgets.HBox([
    widgets.Dropdown(
        options=list(resilient_system.breakers.keys()),
        description='Model:',
        value='gpt-4o'
    ),
    widgets.Button(description='🔴 Force Open', button_style='danger', layout=widgets.Layout(width='120px')),
    widgets.Button(description='🟢 Force Close', button_style='success', layout=widgets.Layout(width='120px')),
    widgets.Button(description='🏥 Health Check', button_style='primary', layout=widgets.Layout(width='120px'))
])

# Output areas
demo_output = widgets.Output()
viz_output = widgets.Output()

# Button handlers
def start_demo(b):
    if not demo.demo_running:
        with demo_output:
            clear_output()
            demo.demo_thread = threading.Thread(
                target=demo.simulate_scenario,
                args=(scenario_selector.value, duration_slider.value)
            )
            demo.demo_thread.start()

def stop_demo(b):
    demo.demo_running = False
    with demo_output:
        print("\n⏹️ Demo stopped by user")

def refresh_viz(b):
    with viz_output:
        clear_output()
        fig = demo.create_live_visualization()
        fig.show()

def reset_system(b):
    with demo_output:
        clear_output()
        for breaker in resilient_system.breakers.values():
            breaker.reset()
        demo.live_data = {
            'timestamps': deque(maxlen=100),
            'states': {model: deque(maxlen=100) for model in resilient_system.breakers},
            'success_rates': {model: deque(maxlen=100) for model in resilient_system.breakers},
            'latencies': {model: deque(maxlen=100) for model in resilient_system.breakers},
            'events': deque(maxlen=50)
        }
        print("♻️ System reset complete!")
    refresh_viz(b)

def force_open(b):
    model = manual_controls.children[0].value
    resilient_system.breakers[model].force_open("Manual test")
    demo.add_event(model, 'Forced Open', 'Manual intervention')
    demo.update_live_data()
    refresh_viz(b)

def force_close(b):
    model = manual_controls.children[0].value
    resilient_system.breakers[model].force_close()
    demo.add_event(model, 'Forced Close', 'Manual intervention')
    demo.update_live_data()
    refresh_viz(b)

def health_check(b):
    with demo_output:
        clear_output()
        resilient_system.run_health_check()

# Connect handlers
start_demo_button.on_click(start_demo)
stop_demo_button.on_click(stop_demo)
refresh_viz_button.on_click(refresh_viz)
reset_system_button.on_click(reset_system)
manual_controls.children[1].on_click(force_open)
manual_controls.children[2].on_click(force_close)
manual_controls.children[3].on_click(health_check)

# Create the layout
scenario_display = widgets.VBox([
    widgets.HTML("<h3>📚 Select a Scenario:</h3>"),
    widgets.HTML(f"""
    <div style='display: flex; flex-wrap: wrap;'>
        {''.join([s[0] for s in scenario_cards])}
    </div>
    """),
    scenario_selector,
    duration_slider
])

control_panel = widgets.VBox([
    widgets.HTML("<h2>🎛️ Circuit Breaker Control Center</h2>"),
    scenario_display,
    widgets.HBox([start_demo_button, stop_demo_button, refresh_viz_button, reset_system_button]),
    widgets.HTML("<h3>🔧 Manual Controls:</h3>"),
    manual_controls,
    demo_output
])

# Display everything
display(control_panel)
display(viz_output)

# Show initial visualization
with viz_output:
    fig = demo.create_live_visualization()
    fig.show()

# Auto-refresh visualization during demos
def auto_refresh_viz():
    while True:
        if demo.demo_running:
            time.sleep(1)  # Update every second
            with viz_output:
                clear_output(wait=True)
                fig = demo.create_live_visualization()
                fig.show()
        else:
            time.sleep(2)

# Start auto-refresh in background
auto_refresh_thread = threading.Thread(target=auto_refresh_viz, daemon=True)
auto_refresh_thread.start()

print("\n🎯 How to Use:")
print("1. Select a scenario to see different failure patterns")
print("2. Click 'Start Demo' to begin the simulation")
print("3. Watch the live visualization update in real-time!")
print("4. Try manual controls to force state changes")
print("5. Use 'Health Check' to test all models")
print("\n💡 Watch for:")
print("• State transitions (green → orange → red)")
print("• Success rates dropping and recovering")
print("• Failure patterns in the pie chart")
print("• Events appearing in the live log")
print("• System health score changes")

NameError: name 'resilient_system' is not defined
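
The demo cell above varies the failure probability according to the scenario's `pattern`. As a minimal standalone sketch of that rule (the function name `failure_rate_at` and its signature are assumptions made for illustration, not part of the notebook's classes), the three cases can be written as:

```python
import math


def failure_rate_at(pattern: str, base_rate: float, elapsed: float, duration: float) -> float:
    """Return the failure probability for a pattern at a given moment of the demo.

    Hypothetical helper: it mirrors the branching inside simulate_scenario,
    but is not defined anywhere in the original notebook.
    """
    progress = elapsed / duration
    if pattern == "increasing":
        # Ramps linearly from 0 up to base_rate over the run.
        return base_rate * progress
    if pattern == "wave":
        # Oscillates between 0 and base_rate with a period of 2*pi seconds.
        return base_rate * (0.5 + 0.5 * math.sin(elapsed))
    # Any other pattern keeps the configured rate constant.
    return base_rate


# Sample the "increasing" pattern at the start, middle, and end of a 20-second run.
for second in (0, 10, 20):
    print(second, round(failure_rate_at("increasing", 0.6, second, 20), 2))
```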

---

## 🤖 **Module 5: Building Self-Healing AI Systems**
### The Ultimate Goal: Systems That Fix Themselves!

```python
# The Self-Healing Loop
while system.is_running():
    health = monitor.check_health()
    if not health.is_healthy:
        diagnosis = analyzer.diagnose(health)
        remedy = healer.prescribe(diagnosis)
        executor.apply_remedy(remedy)
```
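
The loop above is schematic: `system`, `monitor`, `analyzer`, `healer`, and `executor` are not defined at this point. A minimal runnable sketch of the same monitor, diagnose, prescribe, apply cycle might look like the following. Every name here (`Health`, `diagnose`, `prescribe`, `REMEDIES`, `run_healing_loop`) is hypothetical, and the thresholds are illustrative assumptions rather than values the full system is required to use.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Iterable, List


@dataclass
class Health:
    """One health reading (hypothetical, simplified to two signals)."""
    error_rate: float    # fraction of failed requests, 0.0 to 1.0
    latency_p95: float   # seconds

    @property
    def is_healthy(self) -> bool:
        # Assumed thresholds for this sketch: 10% errors, 3 s p95 latency.
        return self.error_rate <= 0.1 and self.latency_p95 <= 3.0


def diagnose(health: Health) -> List[str]:
    """Turn a reading into a list of named issues."""
    issues = []
    if health.error_rate > 0.1:
        issues.append("high_error_rate")
    if health.latency_p95 > 3.0:
        issues.append("high_latency")
    return issues


# Assumed mapping from issue to remedy; a real system would map to callables.
REMEDIES: Dict[str, str] = {
    "high_error_rate": "activate_fallback",
    "high_latency": "expand_cache",
}


def prescribe(issues: List[str]) -> List[str]:
    """Look up one remedy per diagnosed issue."""
    return [REMEDIES[issue] for issue in issues]


def run_healing_loop(readings: Iterable[Health],
                     apply_remedy: Callable[[str], None]) -> List[str]:
    """Process readings in order and apply remedies for unhealthy ones."""
    applied = []
    for health in readings:
        if health.is_healthy:
            continue
        for remedy in prescribe(diagnose(health)):
            apply_remedy(remedy)
            applied.append(remedy)
    return applied


readings = [Health(0.02, 1.5), Health(0.30, 2.0), Health(0.05, 6.0)]
log: List[str] = []
applied = run_healing_loop(readings, log.append)
print(applied)
```

Feeding the loop a finite list of readings, instead of polling inside `while system.is_running()`, keeps the sketch deterministic and easy to test; a long-running version would wrap the same body in a timed loop on a background thread.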

In [10]:
# 🎯 Cell 10: Self-Healing AI System with Stable Demo
# Monitors simulated health metrics, applies healing actions, and renders a dashboard without flickering

from enum import Enum, auto
from dataclasses import dataclass, field
from collections import defaultdict, deque
from typing import Optional, Dict, Any, List, Callable
import threading
import time
import random
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import ipywidgets as widgets
from IPython.display import display, clear_output, HTML
import json
import psutil

class HealthStatus(Enum):
    """System health states with emoji indicators"""
    HEALTHY = ("healthy", "💚", "#00d084")
    DEGRADED = ("degraded", "💛", "#ff9800")
    CRITICAL = ("critical", "🔴", "#ff3860")
    RECOVERING = ("recovering", "🔄", "#3273dc")
    MAINTENANCE = ("maintenance", "🔧", "#9c27b0")

class HealingAction(Enum):
    """Types of healing actions the system can take"""
    CIRCUIT_BREAKER_ADJUST = auto()
    CACHE_EXPANSION = auto()
    LOAD_SHEDDING = auto()
    FALLBACK_ACTIVATION = auto()
    RATE_LIMIT_ADJUST = auto()
    TIMEOUT_ADJUST = auto()
    RETRY_POLICY_CHANGE = auto()
    RESOURCE_SCALING = auto()
    GARBAGE_COLLECTION = auto()
    DIAGNOSTIC_MODE = auto()

@dataclass
class HealthMetrics:
    """Comprehensive system health metrics"""
    timestamp: datetime
    error_rate: float
    latency_p50: float
    latency_p95: float
    latency_p99: float
    success_rate: float
    throughput: float
    queue_depth: int
    active_connections: int
    memory_usage: float
    cpu_usage: float
    cache_hit_rate: float
    circuit_breaker_trips: int
    healing_actions_taken: int
    status: HealthStatus
    
    @property
    def health_score(self) -> float:
        """Calculate overall health score (0-100)"""
        weights = {
            'error_rate': 0.3,
            'latency': 0.2,
            'throughput': 0.2,
            'resources': 0.15,
            'stability': 0.15
        }
        
        # Error rate component
        error_score = (1 - min(self.error_rate * 2, 1)) * 100 * weights['error_rate']
        
        # Latency component
        latency_score = 100
        if self.latency_p95 > 5:
            latency_score -= min((self.latency_p95 - 5) * 10, 50)
        latency_score = max(0, latency_score) * weights['latency']
        
        # Throughput component
        throughput_score = min(self.throughput / 10, 1) * 100 * weights['throughput']
        
        # Resource usage component
        resource_score = 100
        if self.memory_usage > 80:
            resource_score -= min((self.memory_usage - 80), 30)
        if self.cpu_usage > 80:
            resource_score -= min((self.cpu_usage - 80), 30)
        resource_score = max(0, resource_score) * weights['resources']
        
        # Stability component
        stability_score = max(0, 100 - self.circuit_breaker_trips * 10) * weights['stability']
        
        total = error_score + latency_score + throughput_score + resource_score + stability_score
        return min(100, max(0, total))
    
    def get_diagnosis(self) -> List[str]:
        """Get list of current issues"""
        issues = []
        
        if self.error_rate > 0.1:
            issues.append(f"High error rate: {self.error_rate:.1%}")
        if self.latency_p95 > 3:
            issues.append(f"High latency: {self.latency_p95:.1f}s")
        if self.memory_usage > 80:
            issues.append(f"High memory: {self.memory_usage:.0f}%")
        if self.queue_depth > 50:
            issues.append(f"Queue backup: {self.queue_depth} items")
        if self.circuit_breaker_trips > 2:
            issues.append(f"Circuit instability: {self.circuit_breaker_trips} trips")
        
        return issues

class SelfHealingSystem:
    """Self-healing AI system with monitoring and auto-recovery"""
    
    def __init__(self):
        # Health monitoring
        self.health_history = deque(maxlen=100)
        self.healing_log = deque(maxlen=50)
        self.event_stream = deque(maxlen=50)
        
        # Monitoring state
        self.is_monitoring = False
        self.monitor_thread = None
        self.healer_thread = None
        
        # Configuration
        self.config = {
            'monitor_interval': 1,
            'healing_interval': 3,
            'health_threshold': 70,
            'auto_heal': True,
            'aggressive_healing': False
        }
        
        # Metrics storage
        self.metrics = {
            'requests': deque(maxlen=100),
            'errors': deque(maxlen=50),
            'healings': deque(maxlen=20)
        }
        
        # Simulation
        self.simulation_active = False
        self.simulation_scenario = 'normal'
        self.simulation_time = 0
        
    def start_monitoring(self):
        """Start the self-healing system"""
        if self.is_monitoring:
            print("System already running!")
            return
            
        self.is_monitoring = True
        self.simulation_time = 0
        
        # Start monitor thread
        self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self.monitor_thread.start()
        
        # Start healer thread
        self.healer_thread = threading.Thread(target=self._healer_loop, daemon=True)
        self.healer_thread.start()
        
        self._log_event("System", "Started", "Self-healing activated")
        
    def stop_monitoring(self):
        """Stop the monitoring system"""
        self.is_monitoring = False
        self._log_event("System", "Stopped", "Monitoring stopped")
        
    def _monitor_loop(self):
        """Main monitoring loop"""
        while self.is_monitoring:
            try:
                metrics = self._collect_metrics()
                self.health_history.append(metrics)
                
                if metrics.health_score < self.config['health_threshold']:
                    self._log_event("Monitor", "Alert", f"Health: {metrics.health_score:.1f}")
                
                self.simulation_time += 1
                
            except Exception as e:
                print(f"Monitor error: {e}")
            
            time.sleep(self.config['monitor_interval'])
    
    def _healer_loop(self):
        """Automatic healing loop"""
        while self.is_monitoring:
            try:
                if self.config['auto_heal'] and len(self.health_history) > 0:
                    latest = self.health_history[-1]
                    
                    if latest.status != HealthStatus.HEALTHY:
                        self._perform_healing(latest)
                
            except Exception as e:
                print(f"Healer error: {e}")
            
            time.sleep(self.config['healing_interval'])
    
    def _collect_metrics(self) -> HealthMetrics:
        """Collect metrics based on simulation scenario"""
        
        if self.simulation_scenario == 'degradation':
            # Gradual degradation
            progress = min(self.simulation_time / 30, 1)
            error_rate = min(0.4, progress * 0.5)
            latency_p95 = 1 + progress * 8
            
        elif self.simulation_scenario == 'spike':
            # Traffic spike pattern
            if self.simulation_time % 20 < 8:
                error_rate = 0.3
                latency_p95 = 6
            else:
                error_rate = 0.05
                latency_p95 = 2
                
        elif self.simulation_scenario == 'recovery':
            # Failure and recovery
            cycle = self.simulation_time % 40
            if cycle < 10:
                error_rate = 0.35
                latency_p95 = 7
            elif cycle < 25:
                error_rate = max(0.02, 0.35 - (cycle - 10) * 0.02)
                latency_p95 = max(1.5, 7 - (cycle - 10) * 0.3)
            else:
                error_rate = 0.02
                latency_p95 = 1.5
                
        elif self.simulation_scenario == 'chaos':
            # Random chaos
            error_rate = random.uniform(0, 0.45)
            latency_p95 = random.uniform(1, 10)
            
        else:  # normal
            error_rate = random.uniform(0, 0.05)
            latency_p95 = random.uniform(0.8, 2.5)
        
        # Calculate derived metrics
        latency_p50 = latency_p95 * 0.6
        latency_p99 = latency_p95 * 1.3
        success_rate = 1 - error_rate
        throughput = max(1, 20 * success_rate * random.uniform(0.8, 1.2))
        
        # Determine status
        if error_rate > 0.25 or latency_p95 > 5:
            status = HealthStatus.CRITICAL
        elif error_rate > 0.1 or latency_p95 > 3:
            status = HealthStatus.DEGRADED
        elif error_rate > 0.05:
            status = HealthStatus.RECOVERING
        else:
            status = HealthStatus.HEALTHY
        
        return HealthMetrics(
            timestamp=datetime.now(),
            error_rate=error_rate,
            latency_p50=latency_p50,
            latency_p95=latency_p95,
            latency_p99=latency_p99,
            success_rate=success_rate,
            throughput=throughput,
            queue_depth=random.randint(0, 100),
            active_connections=random.randint(10, 100),
            memory_usage=random.uniform(30, 70),
            cpu_usage=random.uniform(20, 60),
            cache_hit_rate=random.uniform(0.6, 0.95),
            circuit_breaker_trips=random.randint(0, 5) if error_rate > 0.2 else 0,
            healing_actions_taken=len(self.healing_log),
            status=status
        )
    
    def _perform_healing(self, metrics: HealthMetrics):
        """Perform automatic healing"""
        issues = metrics.get_diagnosis()
        
        if not issues:
            return
        
        # Apply appropriate healing
        healing_actions = []
        
        if metrics.error_rate > 0.2:
            healing_actions.append("Circuit breakers tightened")
            healing_actions.append("Fallback activated")
            
        if metrics.latency_p95 > 5:
            healing_actions.append("Timeout reduced")
            healing_actions.append("Cache expanded")
            
        if metrics.memory_usage > 80:
            healing_actions.append("Garbage collection forced")
            
        if healing_actions:
            self._apply_healing(healing_actions)
    
    def _apply_healing(self, actions: List[str]):
        """Apply and log healing actions"""
        healing_record = {
            'timestamp': datetime.now(),
            'actions': actions
        }
        
        self.healing_log.append(healing_record)
        
        for action in actions:
            self._log_event("Healer", "Applied", action)
    
    def _log_event(self, component: str, action: str, details: str):
        """Log system events"""
        self.event_stream.append({
            'timestamp': datetime.now(),
            'component': component,
            'action': action,
            'details': details
        })
    
    def inject_failure(self, failure_type: str):
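        """Inject a failure for testing"""
        # Record ten failed, slow requests. Note: _collect_metrics() derives its values
        # from the simulation scenario, so these records do not change the reported health.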
        for _ in range(10):
            self.metrics['requests'].append({
                'success': False,
                'latency': 10,
                'timestamp': datetime.now()
            })
        
        self._log_event("Test", "Injected", failure_type)

# Create the system
system = SelfHealingSystem()

# Dashboard class
class Dashboard:
    """Dashboard for visualization"""
    
    def __init__(self, system):
        self.system = system
    
    def create_visualization(self):
        """Create the dashboard"""
        if len(self.system.health_history) == 0:
            return go.Figure().add_annotation(text="No data yet - Start the system first!")
        
        # Create figure
        fig = make_subplots(
            rows=2, cols=3,
            subplot_titles=(
                'Health Score', 'Error Rate %', 'Latency (seconds)',
                'System Status', 'Recent Events', 'Healing Actions'
            ),
            specs=[
                [{'type': 'indicator'}, {'type': 'scatter'}, {'type': 'scatter'}],
                [{'type': 'indicator'}, {'type': 'table'}, {'type': 'bar'}]
            ]
        )
        
        # Get data
        history = list(self.system.health_history)
        latest = history[-1]
        
        # 1. Health Score Gauge
        fig.add_trace(
            go.Indicator(
                mode="gauge+number",
                value=latest.health_score,
                title={'text': "Health"},
                gauge={
                    'axis': {'range': [0, 100]},
                    'bar': {'color': latest.status.value[2]},
                    'steps': [
                        {'range': [0, 50], 'color': "#ffcdd2"},
                        {'range': [50, 70], 'color': "#fff9c4"},
                        {'range': [70, 100], 'color': "#c8e6c9"}
                    ],
                    'threshold': {
                        'line': {'color': "red", 'width': 2},
                        'thickness': 0.75,
                        'value': 30
                    }
                }
            ),
            row=1, col=1
        )
        
        # 2. Error Rate
        timestamps = [h.timestamp for h in history[-50:]]
        error_rates = [h.error_rate * 100 for h in history[-50:]]
        
        fig.add_trace(
            go.Scatter(
                x=timestamps,
                y=error_rates,
                mode='lines',
                name='Error Rate',
                line=dict(color='red', width=2),
                fill='tozeroy'
            ),
            row=1, col=2
        )
        
        # 3. Latency
        latencies = [h.latency_p95 for h in history[-50:]]
        
        fig.add_trace(
            go.Scatter(
                x=timestamps,
                y=latencies,
                mode='lines',
                name='P95 Latency',
                line=dict(color='blue', width=2)
            ),
            row=1, col=3
        )
        
        # 4. Status Indicator
        status_text = latest.status.value[0].upper()
        status_emoji = latest.status.value[1]
        
        fig.add_trace(
            go.Indicator(
                mode="number",
                value=len(self.system.healing_log),
                title={'text': f"{status_emoji} {status_text}<br>Total Healings"},
                number={'font': {'size': 40}}
            ),
            row=2, col=1
        )
        
        # 5. Event Log
        events_data = []
        for event in list(self.system.event_stream)[-8:]:
            events_data.append([
                event['timestamp'].strftime('%H:%M:%S'),
                event['component'],
                event['details'][:30]
            ])
        
        fig.add_trace(
            go.Table(
                header=dict(
                    values=['Time', 'Component', 'Event'],
                    fill_color='paleturquoise',
                    align='left'
                ),
                cells=dict(
                    values=list(zip(*events_data)) if events_data else [[], [], []],
                    fill_color='lavender',
                    align='left'
                )
            ),
            row=2, col=2
        )
        
        # 6. Healing Actions Count
        healing_counts = {}
        for record in self.system.healing_log:
            for action in record['actions']:
                key = action.split()[0]
                healing_counts[key] = healing_counts.get(key, 0) + 1
        
        if healing_counts:
            fig.add_trace(
                go.Bar(
                    x=list(healing_counts.keys()),
                    y=list(healing_counts.values()),
                    marker_color='green'
                ),
                row=2, col=3
            )
        
        # Update layout
        fig.update_layout(
            height=600,
            showlegend=False,
            title_text="Self-Healing System Dashboard"
        )
        
        return fig

# Create dashboard
dashboard = Dashboard(system)

# Interactive controls
print("🎮 Self-Healing System Control Panel")
print("="*50)

# Scenario selector
scenario_selector = widgets.Dropdown(
    options=[
        ('Normal Operations', 'normal'),
        ('Gradual Degradation', 'degradation'),
        ('Traffic Spike', 'spike'),
        ('Failure & Recovery', 'recovery'),
        ('Chaos Mode', 'chaos')
    ],
    value='normal',
    description='Scenario:'
)

# Control buttons
start_button = widgets.Button(
    description='▶️ Start',
    button_style='success',
    layout=widgets.Layout(width='100px')
)

stop_button = widgets.Button(
    description='⏹️ Stop',
    button_style='danger',
    layout=widgets.Layout(width='100px')
)

refresh_button = widgets.Button(
    description='🔄 Refresh',
    button_style='info',
    layout=widgets.Layout(width='100px')
)

inject_button = widgets.Button(
    description='💥 Inject Failure',
    button_style='warning',
    layout=widgets.Layout(width='120px')
)

# Settings
auto_heal_toggle = widgets.Checkbox(
    value=True,
    description='Auto-Heal'
)

# Output
output = widgets.Output()
viz_output = widgets.Output()

# Handlers
def start_system(b):
    with output:
        clear_output()
        system.simulation_scenario = scenario_selector.value
        system.simulation_active = True
        system.start_monitoring()
        print(f"✅ Started with scenario: {scenario_selector.value}")
        print("Click 'Refresh' to update visualization")

def stop_system(b):
    with output:
        clear_output()
        system.stop_monitoring()
        print("⏹️ System stopped")

def refresh_viz(b):
    with viz_output:
        clear_output(wait=True)
        try:
            fig = dashboard.create_visualization()
            fig.show()
        except Exception as e:
            print(f"Visualization error: {e}")

def inject_failure(b):
    system.inject_failure("manual")
    with output:
        print("💥 Failure injected!")

def update_settings(change):
    system.config['auto_heal'] = auto_heal_toggle.value

# Connect handlers
start_button.on_click(start_system)
stop_button.on_click(stop_system)
refresh_button.on_click(refresh_viz)
inject_button.on_click(inject_failure)
auto_heal_toggle.observe(update_settings, names='value')

# Layout
controls = widgets.VBox([
    widgets.HTML("<h3>Control Panel</h3>"),
    scenario_selector,
    widgets.HBox([start_button, stop_button, refresh_button]),
    widgets.HBox([auto_heal_toggle, inject_button]),
    output
])

display(controls)
display(viz_output)

# Initial viz
with viz_output:
    fig = dashboard.create_visualization()
    fig.show()

print("\n📖 Instructions:")
print("1. Select a scenario")
print("2. Click 'Start' to begin monitoring")
print("3. Click 'Refresh' periodically to update the dashboard")
print("4. Watch the system heal itself!")
print("5. Try injecting failures to test healing")
print("\nNote: Manual refresh prevents flickering")

🎮 Self-Healing System Control Panel




📖 Instructions:
1. Select a scenario
2. Click 'Start' to begin monitoring
3. Click 'Refresh' periodically to update the dashboard
4. Watch the system heal itself!
5. Try injecting failures to test healing

Note: Manual refresh prevents flickering
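
**🧪 Try the healing rules in isolation.** The thresholds that `_perform_healing` checks in Cell 10 can be pulled into a pure function, which makes them easy to test without starting the monitoring thread. This is a minimal sketch, not part of the workshop code: `plan_healing` is a hypothetical helper, and it assumes `error_rate` is a fraction (0 to 1), `latency_p95` is in seconds, and `memory_usage` is a percentage.

```python
from typing import List


def plan_healing(error_rate: float, latency_p95: float, memory_usage: float) -> List[str]:
    """Return the healing actions implied by the given metrics.

    Hypothetical helper that mirrors the thresholds used in Cell 10.
    """
    actions: List[str] = []
    if error_rate > 0.2:      # more than 20% of requests failing
        actions += ["Circuit breakers tightened", "Fallback activated"]
    if latency_p95 > 5:       # P95 latency above 5 seconds
        actions += ["Timeout reduced", "Cache expanded"]
    if memory_usage > 80:     # memory usage above 80 (assumed percent)
        actions.append("Garbage collection forced")
    return actions


# A system with a high error rate but healthy latency and memory
print(plan_healing(error_rate=0.35, latency_p95=2.0, memory_usage=50))
```

Keeping the decision separate from the logging makes it possible to unit test the rules and to tune the thresholds without touching the dashboard.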


### 🎮 **Final Boss: Complete System Test**
#### Put everything together and watch your resilient AI handle chaos!
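
**🧮 How a chaos run is scored.** The cell below scores each run in `_calculate_score`: the scenario's base points are scaled by the success rate (capped at 1.5x), a latency bonus of up to 100 is added, and the total is multiplied by a difficulty factor. The sketch below restates that formula as a standalone function so you can experiment with it. `chaos_score` is a hypothetical name used only here, and the input values are made up for illustration.

```python
def chaos_score(points: int, difficulty: int, success_rate: float, avg_latency: float) -> int:
    """Standalone restatement of the scoring formula used in Cell 11 (illustrative)."""
    # Success rate multiplier, between 0.0 and 1.5
    success_multiplier = min(1.5, success_rate * 1.5)
    # Latency bonus: 100 at zero latency, 0 at 10 seconds or more
    latency_bonus = max(0, 100 - avg_latency * 10)
    # Each difficulty level above 1 adds 20%
    difficulty_multiplier = 1 + (difficulty - 1) * 0.2
    return int((points * success_multiplier + latency_bonus) * difficulty_multiplier)


# Byzantine Fault (700 points, difficulty 6) with a 50% success rate
# and a 2-second average latency: (700 * 0.75 + 80) * 2.0 = 1210
print(chaos_score(points=700, difficulty=6, success_rate=0.5, avg_latency=2.0))
```

Note that difficulty multiplies the latency bonus too, so on hard scenarios a fast system earns noticeably more than a slow one with the same success rate.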

In [11]:
# 🎯 Cell 11: Ultimate Chaos Engineering System with Gamification
# Advanced chaos testing with live visualizations, achievements, and interactive controls!

import plotly.graph_objects as go
from plotly.subplots import make_subplots
import ipywidgets as widgets
from IPython.display import display, clear_output, HTML
import threading
import time
import random
import numpy as np
from datetime import datetime, timedelta
from collections import deque, defaultdict
from enum import Enum, auto
import json

class ChaosType(Enum):
    """Types of chaos that can be injected"""
    API_FLOOD = auto()
    SERVICE_OUTAGE = auto()
    LATENCY_SPIKE = auto()
    RANDOM_ERRORS = auto()
    MEMORY_PRESSURE = auto()
    NETWORK_PARTITION = auto()
    CASCADE_FAILURE = auto()
    BYZANTINE_FAULT = auto()
    TOTAL_CHAOS = auto()

class Achievement:
    """Achievement system for chaos engineering"""
    def __init__(self, name, description, icon, condition):
        self.name = name
        self.description = description
        self.icon = icon
        self.condition = condition
        self.unlocked = False
        self.unlock_time = None

class ChaosEngineeringSystem:
    """Advanced chaos engineering system with gamification"""
    
    def __init__(self):
        # Chaos scenarios with detailed configurations
        self.chaos_scenarios = {
            '🌊 API Flood': {
                'type': ChaosType.API_FLOOD,
                'description': 'Massive request surge (100-500 req/s)',
                'difficulty': 3,
                'duration': 15,
                'params': {'rate_multiplier': 10, 'burst_size': 100},
                'points': 300
            },
            '💥 Service Outage': {
                'type': ChaosType.SERVICE_OUTAGE,
                'description': 'Complete service failure for primary models',
                'difficulty': 4,
                'duration': 20,
                'params': {'failure_rate': 0.95, 'affected_services': ['primary']},
                'points': 400
            },
            '🐌 Latency Spike': {
                'type': ChaosType.LATENCY_SPIKE,
                'description': 'Extreme latency (10-30 second delays)',
                'difficulty': 2,
                'duration': 15,
                'params': {'latency_range': (10, 30), 'spike_probability': 0.7},
                'points': 200
            },
            '🎲 Random Errors': {
                'type': ChaosType.RANDOM_ERRORS,
                'description': 'Unpredictable 50-80% failure rate',
                'difficulty': 3,
                'duration': 20,
                'params': {'error_rate_range': (0.5, 0.8), 'error_types': ['timeout', '500', '429']},
                'points': 350
            },
            '💾 Memory Pressure': {
                'type': ChaosType.MEMORY_PRESSURE,
                'description': 'Simulate memory leak and resource exhaustion',
                'difficulty': 4,
                'duration': 25,
                'params': {'memory_growth_rate': 0.05, 'gc_failure_rate': 0.3},
                'points': 450
            },
            '🌐 Network Partition': {
                'type': ChaosType.NETWORK_PARTITION,
                'description': 'Network splits and connectivity issues',
                'difficulty': 5,
                'duration': 20,
                'params': {'partition_probability': 0.4, 'recovery_time': 5},
                'points': 500
            },
            '🔥 Cascade Failure': {
                'type': ChaosType.CASCADE_FAILURE,
                'description': 'One failure triggers chain reaction',
                'difficulty': 5,
                'duration': 30,
                'params': {'initial_failure': 'primary', 'cascade_probability': 0.8},
                'points': 600
            },
            '👹 Byzantine Fault': {
                'type': ChaosType.BYZANTINE_FAULT,
                'description': 'Services give wrong responses',
                'difficulty': 6,
                'duration': 25,
                'params': {'corruption_rate': 0.3, 'inconsistency_rate': 0.5},
                'points': 700
            },
            '🌪️ TOTAL CHAOS': {
                'type': ChaosType.TOTAL_CHAOS,
                'description': 'ALL problems simultaneously!',
                'difficulty': 10,
                'duration': 60,
                'params': {'all_chaos': True, 'intensity': 'maximum'},
                'points': 1000
            }
        }
        
        # Initialize achievements
        self.achievements = self._init_achievements()
        
        # Chaos state
        self.chaos_active = False
        self.current_scenario = None
        self.chaos_thread = None
        self.chaos_start_time = None
        
        # Metrics tracking
        self.metrics = {
            'total_chaos_runs': 0,
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'fallback_uses': 0,
            'healing_triggers': 0,
            'circuit_breaks': 0,
            'avg_response_time': 0,
            'uptime_percentage': 100
        }
        
        # Live data for visualization
        self.live_data = {
            'timestamps': deque(maxlen=100),
            'success_rate': deque(maxlen=100),
            'latency': deque(maxlen=100),
            'health_score': deque(maxlen=100),
            'chaos_intensity': deque(maxlen=100),
            'events': deque(maxlen=50)
        }
        
        # Player stats
        self.player_stats = {
            'level': 1,
            'experience': 0,
            'total_score': 0,
            'chaos_mastery': 0,
            'resilience_rating': 100,
            'scenarios_completed': [],
            'best_scores': {}
        }
        
        # System reference (would connect to actual self-healing system)
        self.system_health = 100
        self.system_status = "READY"
        
    def _init_achievements(self):
        """Initialize achievement system"""
        return [
            Achievement(
                "First Blood", 
                "Complete your first chaos scenario",
                "🩸",
                lambda s: s['total_chaos_runs'] >= 1
            ),
            Achievement(
                "Survivor",
                "Maintain 80% success rate during chaos",
                "🛡️",
                lambda s: s['successful_requests'] / max(s['total_requests'], 1) > 0.8
            ),
            Achievement(
                "Chaos Master",
                "Complete all chaos scenarios",
                "👑",
                lambda s: len(s.get('scenarios_completed', [])) >= 9
            ),
            Achievement(
                "Unbreakable",
                "Achieve 95% uptime during Total Chaos",
                "💎",
                lambda s: s.get('total_chaos_uptime', 0) > 0.95
            ),
            Achievement(
                "Speed Demon",
                "Complete a scenario with <2s avg response time",
                "⚡",
                lambda s: s['avg_response_time'] < 2 and s['total_chaos_runs'] > 0
            ),
            Achievement(
                "Healer",
                "Trigger 100+ healing actions",
                "💊",
                lambda s: s['healing_triggers'] >= 100
            ),
            Achievement(
                "Circuit Breaker",
                "Trip 50+ circuit breakers",
                "⚡",
                lambda s: s['circuit_breaks'] >= 50
            ),
            Achievement(
                "Resilience God",
                "Reach level 10",
                "🌟",
                lambda s: s.get('level', 1) >= 10
            )
        ]
    
    def start_chaos(self, scenario_name, intensity=1.0):
        """Start a chaos scenario"""
        if self.chaos_active:
            return "Chaos already active!"
        
        self.current_scenario = self.chaos_scenarios[scenario_name]
        self.chaos_active = True
        self.chaos_start_time = time.time()
        self.metrics['total_chaos_runs'] += 1
        
        # Start chaos thread
        self.chaos_thread = threading.Thread(
            target=self._run_chaos,
            args=(scenario_name, intensity),
            daemon=True
        )
        self.chaos_thread.start()
        
        # Log event
        self._log_event("CHAOS", "Started", f"{scenario_name} (Intensity: {intensity:.1f})")
        
        return f"Chaos scenario '{scenario_name}' started!"
    
    def _run_chaos(self, scenario_name, intensity):
        """Run the chaos scenario"""
        scenario = self.current_scenario
        duration = scenario['duration']
        start_time = time.time()
        
        # Track scenario-specific metrics
        scenario_metrics = {
            'requests': 0,
            'successes': 0,
            'failures': 0,
            'total_latency': 0
        }
        
        while time.time() - start_time < duration and self.chaos_active:
            # Simulate chaos effects
            chaos_intensity = self._calculate_chaos_intensity(
                scenario['type'], 
                (time.time() - start_time) / duration,
                intensity
            )
            
            # Simulate request
            success, latency = self._simulate_request(scenario['type'], chaos_intensity)
            
            # Update metrics
            scenario_metrics['requests'] += 1
            if success:
                scenario_metrics['successes'] += 1
                self.metrics['successful_requests'] += 1
            else:
                scenario_metrics['failures'] += 1
                self.metrics['failed_requests'] += 1
                
            scenario_metrics['total_latency'] += latency
            self.metrics['total_requests'] += 1
            
            # Update live data
            self._update_live_data(success, latency, chaos_intensity)
            
            # Simulate system response
            self._simulate_system_response(success, chaos_intensity)
            
            # Small delay
            time.sleep(0.1)
        
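        # Calculate final score
        success_rate = scenario_metrics['successes'] / max(scenario_metrics['requests'], 1)
        avg_latency = scenario_metrics['total_latency'] / max(scenario_metrics['requests'], 1)
        
        # Publish run results so latency- and uptime-based achievements see real values
        # (otherwise 'avg_response_time' stays 0 and 'total_chaos_uptime' is never set)
        self.metrics['avg_response_time'] = avg_latency
        if scenario['type'] == ChaosType.TOTAL_CHAOS:
            # Assumption: the run's success rate serves as its uptime figure
            self.metrics['total_chaos_uptime'] = success_rate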
        
        score = self._calculate_score(scenario, success_rate, avg_latency)
        
        # Update player stats
        self._update_player_stats(scenario_name, score, success_rate)
        
        # Check achievements
        self._check_achievements()
        
        self.chaos_active = False
        self._log_event("CHAOS", "Completed", f"Score: {score}")
        
    def _calculate_chaos_intensity(self, chaos_type, progress, base_intensity):
        """Calculate current chaos intensity based on scenario and progress"""
        if chaos_type == ChaosType.CASCADE_FAILURE:
            # Exponential growth for cascade
            return min(1.0, base_intensity * (1.5 ** (progress * 3)))
        elif chaos_type == ChaosType.API_FLOOD:
            # Wave pattern for flood
            import math
            return base_intensity * (1 + 0.5 * math.sin(progress * 10))
        elif chaos_type == ChaosType.TOTAL_CHAOS:
            # Random spikes for total chaos
            return base_intensity * random.uniform(0.5, 1.5)
        else:
            # Default linear progression
            return base_intensity * (0.5 + progress * 0.5)
    
    def _simulate_request(self, chaos_type, intensity):
        """Simulate a request under chaos conditions"""
        # Base success rate and latency
        base_success_rate = 0.95
        base_latency = 1.0
        
        # Apply chaos effects
        if chaos_type == ChaosType.SERVICE_OUTAGE:
            success_rate = 1 - intensity * 0.9
            latency = base_latency * (1 + intensity * 10)
        elif chaos_type == ChaosType.LATENCY_SPIKE:
            success_rate = base_success_rate
            latency = base_latency + intensity * 20
        elif chaos_type == ChaosType.RANDOM_ERRORS:
            success_rate = 1 - random.uniform(0, intensity)
            latency = base_latency * random.uniform(1, 5)
        elif chaos_type == ChaosType.TOTAL_CHAOS:
            success_rate = random.uniform(0.1, 0.9)
            latency = random.uniform(0.5, 30)
        else:
            success_rate = base_success_rate - intensity * 0.3
            latency = base_latency * (1 + intensity * 3)
        
        success = random.random() < success_rate
        
        return success, latency
    
    def _simulate_system_response(self, request_success, chaos_intensity):
        """Simulate system's self-healing response"""
        # Simulate healing triggers
        if not request_success and random.random() < 0.5:
            self.metrics['healing_triggers'] += 1
            self._log_event("HEAL", "Triggered", "Auto-healing activated")
        
        # Simulate circuit breaker
        if chaos_intensity > 0.7 and random.random() < 0.3:
            self.metrics['circuit_breaks'] += 1
            self._log_event("CIRCUIT", "Tripped", "Circuit breaker activated")
        
        # Update system health
        if request_success:
            self.system_health = min(100, self.system_health + 0.5)
        else:
            self.system_health = max(0, self.system_health - 2)
        
        # Update system status
        if self.system_health > 80:
            self.system_status = "HEALTHY"
        elif self.system_health > 50:
            self.system_status = "DEGRADED"
        elif self.system_health > 20:
            self.system_status = "CRITICAL"
        else:
            self.system_status = "FAILING"
    
    def _update_live_data(self, success, latency, chaos_intensity):
        """Update live visualization data"""
        timestamp = datetime.now()
        
        self.live_data['timestamps'].append(timestamp)
        
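        # Calculate rolling success rate over the last 10 request outcomes.
        # The outcome window is created on first use so __init__ stays unchanged.
        if not hasattr(self, '_recent_outcomes'):
            self._recent_outcomes = deque(maxlen=10)
        self._recent_outcomes.append(1 if success else 0)
        success_rate = sum(self._recent_outcomes) / len(self._recent_outcomes)
        self.live_data['success_rate'].append(success_rate * 100)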
        
        self.live_data['latency'].append(latency)
        self.live_data['health_score'].append(self.system_health)
        self.live_data['chaos_intensity'].append(chaos_intensity * 100)
    
    def _calculate_score(self, scenario, success_rate, avg_latency):
        """Calculate score for chaos run"""
        base_score = scenario['points']
        
        # Success rate multiplier (0.0 - 1.5)
        success_multiplier = min(1.5, success_rate * 1.5)
        
        # Latency bonus (lower is better)
        latency_bonus = max(0, 100 - avg_latency * 10)
        
        # Difficulty multiplier
        difficulty_multiplier = 1 + (scenario['difficulty'] - 1) * 0.2
        
        final_score = int(
            (base_score * success_multiplier + latency_bonus) * difficulty_multiplier
        )
        
        return final_score
    
    def _update_player_stats(self, scenario_name, score, success_rate):
        """Update player statistics"""
        self.player_stats['total_score'] += score
        
        # Update experience and level
        self.player_stats['experience'] += score // 10
        new_level = 1 + self.player_stats['experience'] // 1000
        if new_level > self.player_stats['level']:
            self.player_stats['level'] = new_level
            self._log_event("LEVEL", "Level Up!", f"Reached level {new_level}")
        
        # Track completed scenarios
        if scenario_name not in self.player_stats['scenarios_completed']:
            self.player_stats['scenarios_completed'].append(scenario_name)
        
        # Update best scores
        if scenario_name not in self.player_stats['best_scores'] or \
           score > self.player_stats['best_scores'][scenario_name]:
            self.player_stats['best_scores'][scenario_name] = score
        
        # Update chaos mastery
        self.player_stats['chaos_mastery'] = min(100, 
            self.player_stats['chaos_mastery'] + success_rate * 10)
        
        # Update resilience rating
        self.player_stats['resilience_rating'] = int(
            (self.system_health + success_rate * 100) / 2
        )
    
    def _check_achievements(self):
        """Check and unlock achievements"""
        stats = {
            **self.metrics,
            **self.player_stats
        }
        
        for achievement in self.achievements:
            if not achievement.unlocked and achievement.condition(stats):
                achievement.unlocked = True
                achievement.unlock_time = datetime.now()
                self._log_event("ACHIEVEMENT", achievement.name, achievement.description)
    
    def _log_event(self, category, action, details):
        """Log an event"""
        event = {
            'timestamp': datetime.now(),
            'category': category,
            'action': action,
            'details': details
        }
        self.live_data['events'].append(event)
    
    def stop_chaos(self):
        """Stop the current chaos scenario"""
        self.chaos_active = False
        self._log_event("CHAOS", "Stopped", "Manually terminated")
    
    def get_dashboard_data(self):
        """Get data for dashboard visualization"""
        return {
            'metrics': self.metrics,
            'live_data': self.live_data,
            'player_stats': self.player_stats,
            'system_health': self.system_health,
            'system_status': self.system_status,
            'achievements': self.achievements,
            'chaos_active': self.chaos_active,
            'current_scenario': self.current_scenario
        }

# Create chaos engineering system
chaos_system = ChaosEngineeringSystem()

# Dashboard class
class ChaosDashboard:
    """Interactive dashboard for chaos engineering"""
    
    def __init__(self, chaos_system):
        self.system = chaos_system
    
    def create_visualization(self):
        """Create comprehensive chaos dashboard"""
        data = self.system.get_dashboard_data()
        
        # Create figure with subplots
        fig = make_subplots(
            rows=3, cols=4,
            subplot_titles=(
                'System Health', 'Chaos Intensity', 'Success Rate', 'Response Latency',
                'Player Level', 'Score', 'Achievements', 'Resilience Rating',
                'Event Log', 'Live Metrics', 'Chaos Progress', 'Leaderboard'
            ),
            specs=[
                [{'type': 'indicator'}, {'type': 'scatter'}, {'type': 'scatter'}, {'type': 'scatter'}],
                [{'type': 'indicator'}, {'type': 'indicator'}, {'type': 'bar'}, {'type': 'indicator'}],
                [{'type': 'table'}, {'type': 'scatter'}, {'type': 'indicator'}, {'type': 'table'}]
            ],
            vertical_spacing=0.12,
            horizontal_spacing=0.1
        )
        
        live_data = data['live_data']
        
        # 1. System Health Gauge
        fig.add_trace(
            go.Indicator(
                mode="gauge+number",
                value=data['system_health'],
                title={'text': "Health"},
                gauge={
                    'axis': {'range': [0, 100]},
                    'bar': {'color': self._get_health_color(data['system_health'])},
                    'steps': [
                        {'range': [0, 30], 'color': "#ffebee"},
                        {'range': [30, 70], 'color': "#fff3e0"},
                        {'range': [70, 100], 'color': "#e8f5e9"}
                    ],
                    'threshold': {
                        'line': {'color': "red", 'width': 4},
                        'thickness': 0.75,
                        'value': 20
                    }
                }
            ),
            row=1, col=1
        )
        
        # 2. Chaos Intensity Timeline
        if len(live_data['timestamps']) > 0:
            fig.add_trace(
                go.Scatter(
                    x=list(live_data['timestamps']),
                    y=list(live_data['chaos_intensity']),
                    mode='lines',
                    name='Chaos',
                    line=dict(color='red', width=2),
                    fill='tozeroy',
                    fillcolor='rgba(255,0,0,0.1)'
                ),
                row=1, col=2
            )
        
        # 3. Success Rate Timeline
        if len(live_data['timestamps']) > 0:
            fig.add_trace(
                go.Scatter(
                    x=list(live_data['timestamps']),
                    y=list(live_data['success_rate']),
                    mode='lines',
                    name='Success',
                    line=dict(color='green', width=2),
                    fill='tozeroy',
                    fillcolor='rgba(0,255,0,0.1)'
                ),
                row=1, col=3
            )
        
        # 4. Latency Timeline
        if len(live_data['timestamps']) > 0:
            fig.add_trace(
                go.Scatter(
                    x=list(live_data['timestamps']),
                    y=list(live_data['latency']),
                    mode='lines+markers',
                    name='Latency',
                    line=dict(color='blue', width=2)
                ),
                row=1, col=4
            )
        
        # 5. Player Level
        player_stats = data['player_stats']
        fig.add_trace(
            go.Indicator(
                mode="number+delta",
                value=player_stats['level'],
                title={'text': f"Level {player_stats['level']}"},
                delta={'reference': player_stats['level'] - 1},
                number={'font': {'size': 40}}
            ),
            row=2, col=1
        )
        
        # 6. Total Score
        fig.add_trace(
            go.Indicator(
                mode="number",
                value=player_stats['total_score'],
                title={'text': "Total Score"},
                number={'font': {'size': 35, 'color': '#FFD700'}}
            ),
            row=2, col=2
        )
        
        # 7. Achievements Progress
        unlocked = sum(1 for a in data['achievements'] if a.unlocked)
        total = len(data['achievements'])
        
        fig.add_trace(
            go.Bar(
                x=['Unlocked', 'Locked'],
                y=[unlocked, total - unlocked],
                marker=dict(color=['#4CAF50', '#9E9E9E']),
                text=[f'{unlocked}', f'{total - unlocked}'],
                textposition='auto'
            ),
            row=2, col=3
        )
        
        # 8. Resilience Rating
        fig.add_trace(
            go.Indicator(
                mode="gauge+number",
                value=player_stats['resilience_rating'],
                title={'text': "Resilience"},
                gauge={
                    'axis': {'range': [0, 100]},
                    'bar': {'color': '#3498db'},
                    'bgcolor': "white"
                }
            ),
            row=2, col=4
        )
        
        # 9. Event Log
        events_data = []
        for event in list(live_data['events'])[-8:]:
            emoji = {
                'CHAOS': '🌪️',
                'HEAL': '💊',
                'CIRCUIT': '⚡',
                'ACHIEVEMENT': '🏆',
                'LEVEL': '⬆️'
            }.get(event['category'], '📝')
            
            events_data.append([
                event['timestamp'].strftime('%H:%M:%S'),
                emoji,
                event['action'][:15],
                event['details'][:25]
            ])
        
        fig.add_trace(
            go.Table(
                header=dict(
                    values=['Time', '', 'Action', 'Details'],
                    fill_color='#34495e',
                    font_color='white',
                    align='left',
                    height=20
                ),
                cells=dict(
                    values=list(zip(*events_data)) if events_data else [[], [], [], []],
                    fill_color='#ecf0f1',
                    align='left',
                    height=18,
                    font_size=9
                )
            ),
            row=3, col=1
        )
        
        # 10. Live Metrics Summary
        if len(live_data['timestamps']) > 0:
            # Combined metrics view
            fig.add_trace(
                go.Scatter(
                    x=list(live_data['timestamps'])[-30:],
                    y=list(live_data['health_score'])[-30:],
                    mode='lines',
                    name='Health',
                    line=dict(color='green', width=3)
                ),
                row=3, col=2
            )
        
        # 11. Chaos Progress
        if data['chaos_active'] and data['current_scenario']:
            elapsed = time.time() - self.system.chaos_start_time
            duration = data['current_scenario']['duration']
            # Guard against a zero or missing duration to avoid ZeroDivisionError
            progress = min(100, (elapsed / duration) * 100) if duration else 0
        else:
            progress = 0
        
        fig.add_trace(
            go.Indicator(
                mode="number+gauge",
                value=progress,
                title={'text': "Progress %"},
                gauge={
                    'axis': {'range': [0, 100]},
                    'bar': {'color': '#ff9800'}
                }
            ),
            row=3, col=3
        )
        
        # 12. Best Scores Leaderboard
        best_scores = player_stats.get('best_scores', {})
        if best_scores:
            scenarios = list(best_scores.keys())[:5]
            scores = [best_scores[s] for s in scenarios]
            
            fig.add_trace(
                go.Table(
                    header=dict(
                        values=['Scenario', 'Best Score'],
                        fill_color='#2c3e50',
                        font_color='white',
                        align='center'
                    ),
                    cells=dict(
                        values=[scenarios, scores],
                        fill_color='#34495e',
                        font_color='white',
                        align='center'
                    )
                ),
                row=3, col=4
            )
        
        # Update layout
        fig.update_layout(
            height=800,
            showlegend=False,
            title_text="<b>🌪️ CHAOS ENGINEERING COMMAND CENTER</b>",
            title_font_size=20,
            title_x=0.5,
            paper_bgcolor='#1e1e1e',
            plot_bgcolor='#2d2d2d',
            font=dict(color='white', size=10)
        )
        
        return fig
    
    def _get_health_color(self, health):
        """Get color based on health value"""
        if health > 80:
            return '#4CAF50'
        elif health > 50:
            return '#FFC107'
        elif health > 20:
            return '#FF5722'
        else:
            return '#F44336'

# Create dashboard
dashboard = ChaosDashboard(chaos_system)

# Interactive controls
print("🎮 ULTIMATE CHAOS ENGINEERING CONSOLE")
print("="*50)

# Scenario selector with cards
scenario_selector = widgets.RadioButtons(
    options=[(name, name) for name in chaos_system.chaos_scenarios.keys()],
    value='🌊 API Flood',
    layout=widgets.Layout(width='300px')
)

# Intensity slider
intensity_slider = widgets.FloatSlider(
    value=1.0,
    min=0.5,
    max=2.0,
    step=0.1,
    description='Intensity:',
    style={'description_width': 'initial'},
    readout_format='.1f'
)

# Control buttons
start_button = widgets.Button(
    description='🔥 UNLEASH CHAOS',
    button_style='danger',
    layout=widgets.Layout(width='150px', height='40px')
)

stop_button = widgets.Button(
    description='⏹️ STOP',
    button_style='warning',
    layout=widgets.Layout(width='100px')
)

refresh_button = widgets.Button(
    description='🔄 REFRESH',
    button_style='info',
    layout=widgets.Layout(width='100px')
)

# Output areas
status_output = widgets.Output()
viz_output = widgets.Output()
achievement_output = widgets.Output()

# Handlers
def start_chaos(b):
    with status_output:
        clear_output()
        scenario = scenario_selector.value
        intensity = intensity_slider.value
        
        print(f"🌪️ INITIATING CHAOS: {scenario}")
        print(f"⚡ Intensity: {intensity:.1f}x")
        print(f"⏱️ Duration: {chaos_system.chaos_scenarios[scenario]['duration']}s")
        print(f"🏆 Potential Points: {chaos_system.chaos_scenarios[scenario]['points']}")
        
        result = chaos_system.start_chaos(scenario, intensity)
        print(f"\n{result}")

def stop_chaos(b):
    chaos_system.stop_chaos()
    with status_output:
        print("\n⏹️ Chaos terminated")

def refresh_viz(b):
    with viz_output:
        clear_output(wait=True)
        fig = dashboard.create_visualization()
        fig.show()
    
    # Update achievements display
    with achievement_output:
        clear_output()
        print("🏆 ACHIEVEMENTS")
        print("-"*30)
        for ach in chaos_system.achievements:
            if ach.unlocked:
                print(f"{ach.icon} {ach.name} ✅")
            else:
                print(f"🔒 {ach.name} - {ach.description}")

# Connect handlers
start_button.on_click(start_chaos)
stop_button.on_click(stop_chaos)
refresh_button.on_click(refresh_viz)

# Create scenario info display
def update_scenario_info(change):
    with status_output:
        clear_output()
        scenario = chaos_system.chaos_scenarios[scenario_selector.value]
        print(f"📋 SCENARIO: {scenario_selector.value}")
        print(f"📝 {scenario['description']}")
        print(f"⚡ Difficulty: {'⭐' * scenario['difficulty']}")
        print(f"⏱️ Duration: {scenario['duration']}s")
        print(f"🏆 Points: {scenario['points']}")

scenario_selector.observe(update_scenario_info, names='value')

# Layout
controls = widgets.VBox([
    widgets.HTML("<h2 style='color: white;'>🎮 CHAOS CONTROL CENTER</h2>"),
    widgets.HBox([
        widgets.VBox([
            widgets.HTML("<b style='color: white;'>Select Chaos Type:</b>"),
            scenario_selector,
            intensity_slider
        ]),
        widgets.VBox([
            status_output,
            widgets.HBox([start_button, stop_button, refresh_button])
        ])
    ]),
    achievement_output
], layout=widgets.Layout(
    padding='20px',
    border='2px solid #333',
    background_color='#1e1e1e'
))

display(HTML("<style>.widget-label { color: white !important; }</style>"))
display(controls)
display(viz_output)

# Initial display
update_scenario_info(None)
with viz_output:
    fig = dashboard.create_visualization()
    fig.show()

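# Auto-refresh
# A stop flag lets the background thread exit cleanly: call stop_refresh.set().
stop_refresh = threading.Event()

def auto_refresh(dash=dashboard, out=viz_output):
    # Bind the dashboard and output widget now, so re-using the name
    # `dashboard` in a later cell does not change what this thread draws.
    while not stop_refresh.is_set():
        if chaos_system.chaos_active:
            with out:
                clear_output(wait=True)
                dash.create_visualization().show()
            stop_refresh.wait(1)
        else:
            stop_refresh.wait(2)

refresh_thread = threading.Thread(target=auto_refresh, daemon=True)
refresh_thread.start()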

print("\n🎯 OBJECTIVES:")
print("• Maintain >80% success rate to earn achievements")
print("• Complete all scenarios to become Chaos Master")
print("• Reach Level 10 for ultimate bragging rights")
print("• Compete for high scores in each scenario")
print("\n💡 TIPS:")
print("• Start with lower intensity to learn patterns")
print("• Watch system health and adjust strategy")
print("• Healing triggers boost your score")
print("• Each scenario has unique challenges")
print("\n🏆 Ready to master chaos? Select a scenario and UNLEASH!")

🎮 ULTIMATE CHAOS ENGINEERING CONSOLE




🎯 OBJECTIVES:
• Maintain >80% success rate to earn achievements
• Complete all scenarios to become Chaos Master
• Reach Level 10 for ultimate bragging rights
• Compete for high scores in each scenario

💡 TIPS:
• Start with lower intensity to learn patterns
• Watch system health and adjust strategy
• Healing triggers boost your score
• Each scenario has unique challenges

🏆 Ready to master chaos? Select a scenario and UNLEASH!


---

## 🎓 **Workshop Summary & Production Checklist**
### You've Built an Unbreakable AI System! 🏆

### ✅ **What You've Mastered**

1. **🛡️ Error Handling**
   - Complete error taxonomy
   - Smart retry strategies
   - Context-aware recovery

2. **🎭 Graceful Degradation**
   - Multi-tier fallback systems
   - Intelligent caching
   - Service quality management

3. **🚦 Rate Limiting**
   - Token bucket algorithm
   - Sliding window implementation
   - Smart quota management

4. **🔌 Circuit Breakers**
   - State management
   - Automatic recovery
   - Component protection

5. **🤖 Self-Healing Systems**
   - Health monitoring
   - Auto-diagnosis
   - Autonomous recovery
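
The sliding window listed under Rate Limiting can be sketched as follows. This is a minimal illustration, not the workshop's own implementation: the class name `SlidingWindowRateLimiter`, its parameters, and the injectable `clock` argument are assumptions made for this example. It keeps the timestamps of accepted requests and rejects a request when the window is already full.

```python
import threading
import time
from collections import deque
from typing import Callable


class SlidingWindowRateLimiter:
    """Allow at most `max_requests` calls in any `window_seconds` interval."""

    def __init__(self, max_requests: int, window_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.clock = clock  # injectable so tests do not need to sleep
        self.timestamps = deque()  # accepted request times, oldest first
        self.lock = threading.Lock()

    def acquire(self) -> bool:
        """Return True and record the request if the window has room."""
        with self.lock:
            now = self.clock()
            # Evict timestamps that have slid out of the window
            while self.timestamps and now - self.timestamps[0] >= self.window_seconds:
                self.timestamps.popleft()
            if len(self.timestamps) < self.max_requests:
                self.timestamps.append(now)
                return True
            return False


# Usage with a fake clock (hypothetical values)
fake_now = [0.0]
limiter = SlidingWindowRateLimiter(max_requests=2, window_seconds=10,
                                   clock=lambda: fake_now[0])
print([limiter.acquire() for _ in range(3)])  # the third call is rejected
fake_now[0] = 10.0
print(limiter.acquire())  # the window has slid past the first two requests
```

Compared with a token bucket, this variant enforces a hard cap per window and allows no bursts beyond it, at the cost of storing one timestamp per accepted request.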

### 📋 **Production Deployment Checklist**

```python
production_ready = {
    '✅ Error Handling': True,
    '✅ Rate Limiting': True,
    '✅ Circuit Breakers': True,
    '✅ Monitoring': True,
    '✅ Self-Healing': True,
    '✅ Chaos Tested': True
}
```
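
A checklist like this is most useful when something actually reads it. The sketch below shows one way to turn it into a deployment gate. The helper `missing_checks` and the sample values are hypothetical, and the failing entry is only there to show the output.

```python
def missing_checks(checklist: dict) -> list:
    """Return the names of checklist items that are not yet satisfied."""
    return [name for name, done in checklist.items() if not done]


# Hypothetical checklist with one gap, for illustration only
example_checklist = {
    'Error Handling': True,
    'Rate Limiting': True,
    'Circuit Breakers': False,
}

missing = missing_checks(example_checklist)
if missing:
    print("Not ready for production, missing:", ", ".join(missing))
else:
    print("All checks passed")
```

In a real pipeline each value would come from an automated test result rather than a hard-coded `True`.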

### 🚀 **Your Next Steps**

1. **Deploy to Production**
   - Use environment variables for API keys
   - Set up proper logging infrastructure
   - Configure alerts and monitoring

2. **Scale Your System**
   - Implement distributed rate limiting
   - Add database-backed circuit breakers
   - Create multi-region failover

3. **Advanced Features**
   - Predictive failure detection
   - ML-based anomaly detection
   - Automated capacity planning
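
For the first step, reading the API key from the environment, a minimal sketch might look like this. `OPENAI_API_KEY` is the variable name the OpenAI SDK conventionally reads. The helper `load_api_key` and its `env` parameter are assumptions for this example, not part of the workshop code.

```python
import os
from typing import Mapping, Optional


def load_api_key(env: Optional[Mapping[str, str]] = None,
                 var_name: str = "OPENAI_API_KEY") -> str:
    """Read the API key from the environment and fail fast if it is missing."""
    env = os.environ if env is None else env
    key = env.get(var_name, "").strip()
    if not key:
        raise RuntimeError(
            f"{var_name} is not set; export it before starting the service"
        )
    return key


# Usage with an explicit mapping (placeholder key, for illustration only)
print(load_api_key({"OPENAI_API_KEY": "sk-placeholder"}))
```

Failing at startup surfaces a missing key before the first request rather than in the middle of serving traffic.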

### 🎁 **Bonus: Your Production Toolkit**

In [32]:
# 🎯 Cell 12: Complete Production Resilient AI System - Live Demo
# Run this cell to see the enterprise system in action!

import time
import threading
import random
import hashlib
from datetime import datetime
from collections import deque
from dataclasses import dataclass
from enum import Enum
from typing import Dict, Optional, Any

import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import ipywidgets as widgets
from IPython.display import display, clear_output

# ============================================================================
# CORE SYSTEM COMPONENTS
# ============================================================================

@dataclass
class ModelConfig:
    """Configuration for AI models"""
    name: str
    max_tokens: int = 1000
    rate_limit: int = 60  # requests per minute
    cost_per_token: float = 0.00002
    priority: int = 1

@dataclass  
class SystemConfig:
    """System configuration"""
    cache_enabled: bool = True
    cache_ttl: int = 300
    max_cache_size: int = 100
    circuit_breaker_threshold: int = 5
    circuit_breaker_timeout: int = 30
    health_check_interval: int = 5
    auto_heal: bool = True

class HealthStatus(Enum):
    """System health states"""
    HEALTHY = ("healthy", "🟢", "#4CAF50")
    DEGRADED = ("degraded", "🟡", "#FFC107")
    CRITICAL = ("critical", "🔴", "#F44336")

class CircuitBreakerState(Enum):
    """Circuit breaker states"""
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

# ============================================================================
# CIRCUIT BREAKER
# ============================================================================

class CircuitBreaker:
    """Circuit breaker implementation"""
    
    def __init__(self, name: str, threshold: int = 5, timeout: int = 30):
        self.name = name
        self.threshold = threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitBreakerState.CLOSED
        self.success_count = 0
        self.total_calls = 0
        self.lock = threading.RLock()
    
    def call(self, func, *args, **kwargs):
        """Execute function with circuit breaker protection"""
        with self.lock:
            self.total_calls += 1
            
            # Check if circuit is open
            if self.state == CircuitBreakerState.OPEN:
                if self.last_failure_time and time.time() - self.last_failure_time > self.timeout:
                    self.state = CircuitBreakerState.HALF_OPEN
                    self.success_count = 0
                else:
                    raise Exception(f"Circuit breaker {self.name} is OPEN")
        
        try:
            # Run the protected call outside the lock so slow calls do not block other threads
            result = func(*args, **kwargs)
            
            with self.lock:
                if self.state == CircuitBreakerState.HALF_OPEN:
                    # Close again after three consecutive successful probes
                    self.success_count += 1
                    if self.success_count >= 3:
                        self.state = CircuitBreakerState.CLOSED
                        self.failure_count = 0
                elif self.state == CircuitBreakerState.CLOSED:
                    # Successes gradually forgive earlier failures
                    self.failure_count = max(0, self.failure_count - 1)
            
            return result
            
        except Exception:
            with self.lock:
                self.failure_count += 1
                self.last_failure_time = time.time()
                
                # A failed probe in HALF_OPEN reopens the circuit immediately
                if (self.state == CircuitBreakerState.HALF_OPEN
                        or self.failure_count >= self.threshold):
                    self.state = CircuitBreakerState.OPEN
                    self.success_count = 0
            raise

# ============================================================================
# RATE LIMITER
# ============================================================================

class TokenBucketRateLimiter:
    """Token bucket rate limiter"""
    
    def __init__(self, rate: int):
        self.rate = rate  # tokens per minute
        self.capacity = rate
        self.tokens = self.capacity
        self.last_refill = time.time()
        self.lock = threading.Lock()
    
    def acquire(self, tokens: int = 1) -> bool:
        """Try to acquire tokens"""
        with self.lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False
    
    def _refill(self):
        """Refill tokens based on time elapsed"""
        now = time.time()
        elapsed = now - self.last_refill
        new_tokens = elapsed * (self.rate / 60)
        self.tokens = min(self.capacity, self.tokens + new_tokens)
        self.last_refill = now

# ============================================================================
# LRU CACHE
# ============================================================================

class LRUCache:
    """Simple LRU cache"""
    
    def __init__(self, capacity: int, ttl: int):
        self.capacity = capacity
        self.ttl = ttl
        self.cache = {}
        self.order = deque()
        self.lock = threading.RLock()
        self.hits = 0
        self.misses = 0
    
    def get(self, key: str) -> Optional[Any]:
        """Get value from cache"""
        with self.lock:
            if key in self.cache:
                value, timestamp = self.cache[key]
                if time.time() - timestamp < self.ttl:
                    self.order.remove(key)
                    self.order.append(key)
                    self.hits += 1
                    return value
                else:
                    del self.cache[key]
                    self.order.remove(key)
            self.misses += 1
            return None
    
    def put(self, key: str, value: Any):
        """Put value in cache"""
        with self.lock:
            if key in self.cache:
                self.order.remove(key)
            elif len(self.cache) >= self.capacity:
                oldest = self.order.popleft()
                del self.cache[oldest]
            
            self.cache[key] = (value, time.time())
            self.order.append(key)

# ============================================================================
# PRODUCTION RESILIENT AI SYSTEM
# ============================================================================

class ProductionResilientAI:
    """Production-ready resilient AI system"""
    
    def __init__(self, config: SystemConfig = None):
        self.config = config or SystemConfig()
        
        # Models
        self.models = {
            'primary': ModelConfig('gpt-4o', rate_limit=100),
            'fallback': ModelConfig('gpt-4o-mini', rate_limit=200),
            'emergency': ModelConfig('gpt-3.5-turbo', rate_limit=500)
        }
        
        # Initialize components
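        self.circuit_breakers = {
            name: CircuitBreaker(
                name,
                threshold=self.config.circuit_breaker_threshold,
                # Pass the configured timeout; otherwise the default of 30s is always used
                timeout=self.config.circuit_breaker_timeout
            )
            for name in self.models.keys()
        }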
        
        self.rate_limiters = {
            name: TokenBucketRateLimiter(model.rate_limit)
            for name, model in self.models.items()
        }
        
        self.cache = LRUCache(
            self.config.max_cache_size,
            self.config.cache_ttl
        ) if self.config.cache_enabled else None
        
        # Metrics
        self.metrics = {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'cache_hits': 0,
            'cache_misses': 0,
            'total_latency': 0,
            'request_history': deque(maxlen=100),
            'error_history': deque(maxlen=50)
        }
        
        # Health monitoring
        self.health_status = HealthStatus.HEALTHY
        self.health_score = 100
        self.health_history = deque(maxlen=100)
        
        # Start monitoring
        self.monitoring_active = True
        self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self.monitor_thread.start()
        
        print("✅ Production Resilient AI System initialized")
    
    def _monitor_loop(self):
        """Background health monitoring"""
        while self.monitoring_active:
            try:
                self._update_health()
                if self.config.auto_heal and self.health_score < 70:
                    self._trigger_healing()
            except Exception as e:
                print(f"Monitor error: {e}")
            time.sleep(self.config.health_check_interval)
    
    def _update_health(self):
        """Update health score and status"""
        # Calculate health score based on metrics
        score = 100
        
        if self.metrics['total_requests'] > 0:
            error_rate = self.metrics['failed_requests'] / self.metrics['total_requests']
            score -= error_rate * 50
            
            avg_latency = self.metrics['total_latency'] / self.metrics['total_requests']
            if avg_latency > 3:
                score -= min((avg_latency - 3) * 10, 30)
        
        # Check circuit breakers
        open_breakers = sum(1 for cb in self.circuit_breakers.values() 
                          if cb.state == CircuitBreakerState.OPEN)
        score -= open_breakers * 20
        
        self.health_score = max(0, min(100, score))
        
        # Update status
        if self.health_score >= 80:
            self.health_status = HealthStatus.HEALTHY
        elif self.health_score >= 50:
            self.health_status = HealthStatus.DEGRADED
        else:
            self.health_status = HealthStatus.CRITICAL
        
        # Record history
        self.health_history.append({
            'timestamp': datetime.now(),
            'score': self.health_score,
            'status': self.health_status
        })
    
    def _trigger_healing(self):
        """Trigger self-healing actions"""
        print(f"🏥 Triggering self-healing (health: {self.health_score:.1f})")
        
        # Move open breakers to HALF_OPEN so the next calls act as recovery probes
        for cb in self.circuit_breakers.values():
            with cb.lock:
                if cb.state == CircuitBreakerState.OPEN and cb.failure_count < 10:
                    cb.state = CircuitBreakerState.HALF_OPEN
                    cb.success_count = 0
                    print(f"  Reset circuit breaker: {cb.name}")
        
        # Clear the cache when it is more than 80% full
        if self.cache:
            with self.cache.lock:
                if len(self.cache.cache) > self.config.max_cache_size * 0.8:
                    self.cache.cache.clear()
                    self.cache.order.clear()
                    print("  Cleared cache")
    
    def make_request(self, message: str, simulate_failure: bool = False) -> Dict[str, Any]:
        """Make a resilient API request"""
        start_time = time.time()
        self.metrics['total_requests'] += 1
        
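        # Generate cache key (MD5 serves only as a fast fingerprint here, not for security)
        cache_key = hashlib.md5(message.encode()).hexdigest()
        
        # Check cache; compare against None so an empty cached value still counts as a hit
        if self.cache:
            cached = self.cache.get(cache_key)
            if cached is not None: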
                self.metrics['cache_hits'] += 1
                self.metrics['successful_requests'] += 1
                return {
                    'content': cached,
                    'cached': True,
                    'latency': time.time() - start_time,
                    'model': 'cache'
                }
            self.metrics['cache_misses'] += 1
        
        # Try models in order
        for model_name, model_config in self.models.items():
            try:
                # Check rate limit
                if not self.rate_limiters[model_name].acquire():
                    continue
                
                # Use circuit breaker
                def api_call():
                    # Simulate API call
                    if simulate_failure and random.random() < 0.5:
                        raise Exception("Simulated API failure")
                    
                    # Simulate latency
                    latency = random.uniform(0.5, 2.5)
                    time.sleep(latency)
                    
                    return f"Response from {model_name}: {message[:50]}..."
                
                response = self.circuit_breakers[model_name].call(api_call)
                
                # Cache successful response
                if self.cache:
                    self.cache.put(cache_key, response)
                
                # Update metrics
                latency = time.time() - start_time
                self.metrics['successful_requests'] += 1
                self.metrics['total_latency'] += latency
                self.metrics['request_history'].append({
                    'timestamp': datetime.now(),
                    'model': model_name,
                    'latency': latency,
                    'success': True
                })
                
                return {
                    'content': response,
                    'cached': False,
                    'latency': latency,
                    'model': model_name
                }
                
            except Exception as e:
                self.metrics['error_history'].append({
                    'timestamp': datetime.now(),
                    'model': model_name,
                    'error': str(e)
                })
                continue
        
        # All models failed
        self.metrics['failed_requests'] += 1
        return {
            'content': 'Service temporarily unavailable',
            'error': True,
            'latency': time.time() - start_time
        }
    
    def get_dashboard_data(self) -> Dict[str, Any]:
        """Get data for dashboard"""
        cache_hit_rate = 0
        if self.cache and (self.metrics['cache_hits'] + self.metrics['cache_misses']) > 0:
            cache_hit_rate = self.metrics['cache_hits'] / (
                self.metrics['cache_hits'] + self.metrics['cache_misses']
            )
        
        return {
            'health_score': self.health_score,
            'health_status': self.health_status,
            'metrics': self.metrics,
            'circuit_breakers': {
                name: {
                    'state': cb.state.value,
                    'failures': cb.failure_count,
                    'total_calls': cb.total_calls
                }
                for name, cb in self.circuit_breakers.items()
            },
            'cache_hit_rate': cache_hit_rate,
            'health_history': list(self.health_history)
        }

# ============================================================================
# INTERACTIVE DASHBOARD
# ============================================================================

class InteractiveDashboard:
    """Interactive system dashboard"""
    
    def __init__(self, system: ProductionResilientAI):
        self.system = system
    
    def create_visualization(self):
        """Create dashboard visualization"""
        data = self.system.get_dashboard_data()
        
        fig = make_subplots(
            rows=2, cols=3,
            subplot_titles=(
                'Health Score', 'Health Score History', 'Circuit Breakers',
                'Cache Performance', 'Latency Distribution', 'System Metrics'
            ),
            specs=[
                [{'type': 'indicator'}, {'type': 'scatter'}, {'type': 'bar'}],
                [{'type': 'indicator'}, {'type': 'scatter'}, {'type': 'table'}]
            ]
        )
        
        # Health Score Gauge
        status = data['health_status']
        fig.add_trace(
            go.Indicator(
                mode="gauge+number",
                value=data['health_score'],
                title={'text': f"{status.value[1]} System Health"},
                gauge={
                    'axis': {'range': [0, 100]},
                    'bar': {'color': status.value[2]},
                    'steps': [
                        {'range': [0, 50], 'color': "#ffebee"},
                        {'range': [50, 80], 'color': "#fff9c4"},
                        {'range': [80, 100], 'color': "#c8e6c9"}
                    ]
                }
            ),
            row=1, col=1
        )
        
        # Health score timeline (the recorded health history, not a per-request success rate)
        if data['health_history']:
            timestamps = [h['timestamp'] for h in data['health_history']]
            scores = [h['score'] for h in data['health_history']]
            
            fig.add_trace(
                go.Scatter(
                    x=timestamps,
                    y=scores,
                    mode='lines',
                    line=dict(color='green', width=2),
                    fill='tozeroy'
                ),
                row=1, col=2
            )
        
        # Circuit Breakers Bar Chart (bar height = failure count, label = state)
        cb_names = list(data['circuit_breakers'].keys())
        cb_failures = [data['circuit_breakers'][n]['failures'] for n in cb_names]
        # Green = closed, orange = half-open (probing), red = open
        state_colors = {'closed': 'green', 'half_open': 'orange', 'open': 'red'}
        colors = [state_colors.get(data['circuit_breakers'][n]['state'], 'red')
                  for n in cb_names]
        
        fig.add_trace(
            go.Bar(
                x=cb_names,
                y=cb_failures,
                marker_color=colors,
                text=[data['circuit_breakers'][n]['state'] for n in cb_names],
                textposition='auto'
            ),
            row=1, col=3
        )
        
        # Cache Hit Rate Gauge
        fig.add_trace(
            go.Indicator(
                mode="gauge+number",
                value=data['cache_hit_rate'] * 100,
                title={'text': "Cache Hit Rate %"},
                gauge={'axis': {'range': [0, 100]}}
            ),
            row=2, col=1
        )
        
        # Latency Distribution
        if data['metrics']['request_history']:
            latencies = [r['latency'] for r in data['metrics']['request_history']]
            fig.add_trace(
                go.Histogram(
                    x=latencies,
                    nbinsx=20,
                    marker_color='blue'
                ),
                row=2, col=2
            )
        
        # Metrics Table
        metrics_data = [
            ['Total Requests', data['metrics']['total_requests']],
            ['Successful', data['metrics']['successful_requests']],
            ['Failed', data['metrics']['failed_requests']],
            ['Cache Hits', data['metrics']['cache_hits']],
            ['Cache Misses', data['metrics']['cache_misses']]
        ]
        
        fig.add_trace(
            go.Table(
                cells=dict(
                    values=list(zip(*metrics_data)),
                    fill_color='lavender',
                    align='left'
                )
            ),
            row=2, col=3
        )
        
        fig.update_layout(height=600, showlegend=False, title_text="Production AI System Dashboard")
        
        return fig

# ============================================================================
# INITIALIZE SYSTEM
# ============================================================================

print("🚀 Initializing Production Resilient AI System Demo...")
system = ProductionResilientAI()
dashboard = InteractiveDashboard(system)

# ============================================================================
# INTERACTIVE CONTROLS
# ============================================================================

print("\n📊 Interactive Control Panel")
print("="*50)

# Test controls
test_message = widgets.Textarea(
    value="What is the meaning of life?",
    placeholder="Enter test message",
    description='Message:',
    layout=widgets.Layout(width='400px')
)

simulate_failure = widgets.Checkbox(
    value=False,
    description='Simulate Failures'
)

num_requests = widgets.IntSlider(
    value=10,
    min=1,
    max=50,
    description='Requests:'
)

# Buttons
single_test = widgets.Button(
    description='Send Request',
    button_style='primary',
    layout=widgets.Layout(width='120px')
)

load_test = widgets.Button(
    description='Run Load Test',
    button_style='success',
    layout=widgets.Layout(width='120px')
)

refresh_dash = widgets.Button(
    description='Refresh Dashboard',
    button_style='info',
    layout=widgets.Layout(width='140px')
)

reset_system = widgets.Button(
    description='Reset System',
    button_style='warning',
    layout=widgets.Layout(width='120px')
)

# Output areas
output = widgets.Output()
dashboard_output = widgets.Output()

# Event handlers
def send_single_request(b):
    with output:
        clear_output()
        print("📤 Sending request...")
        result = system.make_request(
            test_message.value,
            simulate_failure=simulate_failure.value
        )
        if result.get('error'):
            # Every model in the fallback chain failed or was rate limited
            print(f"❌ Request failed: {result['content']}")
            print(f"  Latency: {result.get('latency', 0):.2f}s")
        else:
            print("✅ Response received:")
            print(f"  Model: {result.get('model', 'unknown')}")
            print(f"  Latency: {result.get('latency', 0):.2f}s")
            print(f"  Cached: {result.get('cached', False)}")
            print(f"  Content: {result['content'][:100]}...")

def run_load_test(b):
    with output:
        clear_output()
        print(f"🔄 Running load test with {num_requests.value} requests...")
        
        results = {'success': 0, 'failed': 0, 'latencies': []}
        
        for i in range(num_requests.value):
            result = system.make_request(
                f"Test message {i}",
                simulate_failure=simulate_failure.value
            )
            
            if result.get('error'):
                results['failed'] += 1
            else:
                results['success'] += 1
                results['latencies'].append(result['latency'])
            
            print(f"  Progress: {i+1}/{num_requests.value}", end='\r')
        
        print(f"\n📊 Load Test Results:")
        print(f"  Successful: {results['success']}")
        print(f"  Failed: {results['failed']}")
        if results['latencies']:
            print(f"  Avg Latency: {np.mean(results['latencies']):.2f}s")
            print(f"  P95 Latency: {np.percentile(results['latencies'], 95):.2f}s")

def refresh_dashboard(b):
    with dashboard_output:
        # wait=True swaps the old figure for the new one without a blank flash
        clear_output(wait=True)
        fig = dashboard.create_visualization()
        fig.show()

def reset_system_handler(b):
    with output:
        clear_output()
        # Reset metrics
        system.metrics = {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'cache_hits': 0,
            'cache_misses': 0,
            'total_latency': 0,
            'request_history': deque(maxlen=100),
            'error_history': deque(maxlen=50)
        }
        # Reset circuit breakers
        for cb in system.circuit_breakers.values():
            cb.state = CircuitBreakerState.CLOSED
            cb.failure_count = 0
        print("♻️ System reset complete!")

# Connect handlers
single_test.on_click(send_single_request)
load_test.on_click(run_load_test)
refresh_dash.on_click(refresh_dashboard)
reset_system.on_click(reset_system_handler)

# Layout
controls = widgets.VBox([
    widgets.HTML("<h3>🎛️ Test Controls</h3>"),
    test_message,
    widgets.HBox([simulate_failure, num_requests]),
    widgets.HBox([single_test, load_test, refresh_dash, reset_system]),
    output
])

display(controls)
display(dashboard_output)

# Initial dashboard
with dashboard_output:
    fig = dashboard.create_visualization()
    fig.show()

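# Auto-refresh dashboard every 5 seconds.
# Call stop_refresh.set() before re-running this cell; otherwise each run
# leaves another refresh thread looping in the background.
stop_refresh = threading.Event()

def auto_refresh():
    # Event.wait returns False on timeout and True once the event is set
    while not stop_refresh.wait(5):
        with dashboard_output:
            clear_output(wait=True)
            fig = dashboard.create_visualization()
            fig.show()

refresh_thread = threading.Thread(target=auto_refresh, daemon=True)
refresh_thread.start()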

print("\n✨ System Ready!")
print("\n📚 Features Demonstrated:")
print("  ✅ Multi-model fallback chain")
print("  ✅ Circuit breaker protection")
print("  ✅ Rate limiting")
print("  ✅ LRU caching")
print("  ✅ Health monitoring")
print("  ✅ Self-healing")
print("  ✅ Real-time metrics")
print("\n🎯 Try:")
print("  1. Send single requests")
print("  2. Run load tests")
print("  3. Enable failure simulation")
print("  4. Watch circuit breakers trip")
print("  5. See cache performance")
print("  6. Monitor health score")

🚀 Initializing Production Resilient AI System Demo...
✅ Production Resilient AI System initialized

📊 Interactive Control Panel



✨ System Ready!

📚 Features Demonstrated:
  ✅ Multi-model fallback chain
  ✅ Circuit breaker protection
  ✅ Rate limiting
  ✅ LRU caching
  ✅ Health monitoring
  ✅ Self-healing
  ✅ Real-time metrics

🎯 Try:
  1. Send single requests
  2. Run load tests
  3. Enable failure simulation
  4. Watch circuit breakers trip
  5. See cache performance
  6. Monitor health score


---

## 🎉 **Congratulations! Workshop Complete!**
### You've Built a Production-Grade Resilient AI System! 🏆

### 📚 **What You've Accomplished**

Over the past 6 hours, you've:

1. **Mastered Error Handling** - Built comprehensive error recovery systems
2. **Implemented Graceful Degradation** - Created multi-tier fallback mechanisms
3. **Conquered Rate Limiting** - Developed intelligent quota management
4. **Deployed Circuit Breakers** - Protected your system from cascading failures
5. **Created Self-Healing AI** - Built autonomous recovery capabilities
6. **Survived Chaos Engineering** - Tested your system under extreme conditions

### 🚀 **Your Production Toolkit**

You now have:
- ✅ Complete resilient AI implementation
- ✅ Production-ready error handlers
- ✅ Real-time monitoring dashboard
- ✅ Self-healing mechanisms
- ✅ Chaos testing framework

### 📈 **Next Steps**

1. **Deploy to Production**
   ```python
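   # Use the ProductionResilientAI class; read the key from the environment
   # instead of hard-coding it (assumes OPENAI_API_KEY is set)
   import os
   ai_system = ProductionResilientAI(api_key=os.environ["OPENAI_API_KEY"])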
   ```

2. **Customize for Your Use Case**
   - Adjust rate limits
   - Configure circuit breakers
   - Add custom error handlers

3. **Scale Your System**
   - Add Redis for distributed caching
   - Implement database persistence
   - Deploy to Kubernetes for auto-scaling
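
Moving to a distributed cache is easier if the cache sits behind a small interface. The sketch below is one way to do that; `CacheBackend`, `InMemoryLRUCache`, and `cached_call` are hypothetical names for illustration, not part of the workshop's `ProductionResilientAI` class. A Redis-backed class would only need to provide the same `get` and `set` methods.

```python
from collections import OrderedDict
from typing import Callable, Optional, Protocol


class CacheBackend(Protocol):
    """Hypothetical interface: anything with get/set can back the response cache."""

    def get(self, key: str) -> Optional[str]: ...

    def set(self, key: str, value: str) -> None: ...


class InMemoryLRUCache:
    """Single-process LRU cache; evicts the least recently used entry when full."""

    def __init__(self, max_size: int = 100) -> None:
        if max_size <= 0:
            raise ValueError("max_size must be positive")
        self._max_size = max_size
        self._entries: "OrderedDict[str, str]" = OrderedDict()

    def get(self, key: str) -> Optional[str]:
        if key not in self._entries:
            return None
        self._entries.move_to_end(key)  # mark as most recently used
        return self._entries[key]

    def set(self, key: str, value: str) -> None:
        self._entries[key] = value
        self._entries.move_to_end(key)
        if len(self._entries) > self._max_size:
            self._entries.popitem(last=False)  # drop the least recently used entry


def cached_call(cache: CacheBackend, prompt: str,
                compute: Callable[[str], str]) -> str:
    """Return the cached answer for prompt, computing and storing it on a miss."""
    hit = cache.get(prompt)
    if hit is not None:
        return hit
    answer = compute(prompt)
    cache.set(prompt, answer)
    return answer
```

Because `cached_call` depends only on the interface, swapping the in-memory cache for a shared one should not require changes to the calling code.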

### 🎓 **Certificate of Completion**

```
╔═══════════════════════════════════════════════╗
║                                               ║
║          CERTIFICATE OF ACHIEVEMENT           ║
║                                               ║
║     AI Resilience & Self-Healing Systems      ║
║                                               ║
║            You have successfully:             ║
║                                               ║
║    ✓ Built production-grade error handling    ║
║    ✓ Implemented graceful degradation         ║
║    ✓ Mastered rate limiting strategies        ║
║    ✓ Deployed circuit breaker protection      ║
║    ✓ Created self-healing AI systems          ║
║                                               ║
║         Your systems are unbreakable!         ║
║                                               ║
╚═══════════════════════════════════════════════╝
```

### 🙏 **Thank You!**

Thank you for joining this workshop! Your AI systems are now:
- 💪 Resilient
- 🛡️ Self-protecting
- 🤖 Self-healing
- 🚀 Production-ready

### 📬 **Keep Learning**

Continue your journey:
- Experiment with different failure scenarios
- Share your implementations
- Build even more resilient systems
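
One low-risk way to experiment with failure scenarios is to inject faults into a stand-in function before touching a real API. The sketch below is a minimal harness under stated assumptions: `make_flaky`, `call_with_retry`, `run_scenario`, and `InjectedFailure` are hypothetical helpers written for this example, and the "model" is a simple echo function rather than an OpenAI call.

```python
import random
import time


class InjectedFailure(Exception):
    """Raised by the flaky wrapper to simulate an upstream error."""


def make_flaky(func, failure_rate, rng):
    """Wrap func so that each call fails with probability failure_rate."""
    def wrapper(*args, **kwargs):
        if rng.random() < failure_rate:
            raise InjectedFailure("simulated upstream failure")
        return func(*args, **kwargs)
    return wrapper


def call_with_retry(func, *args, max_attempts=4, base_delay=0.01, sleep=time.sleep):
    """Retry func with exponential backoff; re-raise after the final attempt."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func(*args)
        except InjectedFailure:
            if attempt == max_attempts:
                raise
            # Delay doubles after each failed attempt: base, 2*base, 4*base, ...
            sleep(base_delay * 2 ** (attempt - 1))


def run_scenario(failure_rate, requests=200, seed=0):
    """Return the fraction of requests that succeed despite injected failures."""
    rng = random.Random(seed)  # seeded so a scenario can be replayed exactly
    flaky_echo = make_flaky(lambda text: text, failure_rate, rng)
    succeeded = 0
    for i in range(requests):
        try:
            # Skip real sleeping so the experiment runs instantly
            call_with_retry(flaky_echo, f"message {i}", sleep=lambda _: None)
            succeeded += 1
        except InjectedFailure:
            pass
    return succeeded / requests


if __name__ == "__main__":
    for rate in (0.0, 0.3, 0.6, 0.9):
        print(f"failure rate {rate:.1f} -> success ratio {run_scenario(rate):.2f}")
```

Comparing the success ratio across failure rates and `max_attempts` values gives a feel for how much retrying alone can absorb before fallbacks or circuit breakers are needed.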

**Remember: The best AI systems aren't those that never fail, but those that fail gracefully and recover automatically!**

---

🌟 **Happy Building! May your systems never go down!** 🌟