# File Location: docs/notebooks/05_async_programming.ipynb

# Asynchronous Programming in Python - Interactive Learning Notebook

Welcome to Asynchronous Programming! This notebook teaches you how to write concurrent Python code using async/await, asyncio, and related concepts.

## Learning Objectives

After completing this notebook, you will be able to:

- Understand the concepts of concurrency and asynchronous programming
- Use async/await syntax effectively
- Work with asyncio for I/O-bound operations
- Handle multiple tasks concurrently
- Implement async context managers and iterators
- Debug and optimize asynchronous code
- Choose between threading, multiprocessing, and asyncio

## Table of Contents

1. [Introduction to Async Programming](#introduction)
2. [Async/Await Basics](#async-await-basics)
3. [Asyncio Fundamentals](#asyncio-fundamentals)
4. [Working with Tasks](#working-with-tasks)
5. [Async Context Managers](#async-context-managers)
6. [Async Iterators and Generators](#async-iterators)
7. [Error Handling in Async Code](#error-handling)
8. [Performance and Debugging](#performance-debugging)
9. [Real-World Examples](#real-world-examples)
10. [Practice Exercises](#practice-exercises)

---

## 1. Introduction to Async Programming

### Why Asynchronous Programming?

```python
import time
import asyncio

def synchronous_example():
    """Demonstrate synchronous (blocking) operations"""
    print("Synchronous Example:")
    
    def download_file(filename, delay):
        print(f"Starting download: {filename}")
        time.sleep(delay)  # Simulate download time
        print(f"Finished download: {filename}")
        return f"Content of {filename}"
    
    start_time = time.time()
    
    # Downloads happen one after another (blocking)
    file1 = download_file("file1.txt", 2)
    file2 = download_file("file2.txt", 3)
    file3 = download_file("file3.txt", 1)
    
    total_time = time.time() - start_time
    print(f"Total time: {total_time:.2f} seconds\n")
    
    return [file1, file2, file3]

async def asynchronous_example():
    """Demonstrate asynchronous (non-blocking) operations"""
    print("Asynchronous Example:")
    
    async def download_file_async(filename, delay):
        print(f"Starting download: {filename}")
        await asyncio.sleep(delay)  # Non-blocking sleep
        print(f"Finished download: {filename}")
        return f"Content of {filename}"
    
    start_time = time.time()
    
    # Downloads happen concurrently (non-blocking)
    tasks = [
        download_file_async("file1.txt", 2),
        download_file_async("file2.txt", 3),
        download_file_async("file3.txt", 1)
    ]
    
    results = await asyncio.gather(*tasks)
    
    total_time = time.time() - start_time
    print(f"Total time: {total_time:.2f} seconds\n")
    
    return results

# Compare synchronous vs asynchronous
sync_results = synchronous_example()
async_results = asyncio.run(asynchronous_example())

print(f"Both approaches returned the same results: {sync_results == async_results}")
```

### Concurrency vs Parallelism

```python
def explain_concurrency_vs_parallelism():
    """
    Concurrency: Dealing with multiple things at once (interleaving)
    Parallelism: Doing multiple things at once (simultaneously)
    """
    
    print("Concurrency vs Parallelism:")
    print("=" * 40)
    
    print("Concurrency (asyncio):")
    print("  - Single thread, multiple tasks")
    print("  - Tasks yield control during I/O waits")
    print("  - Best for I/O-bound operations")
    print("  - Example: Web scraping, database queries")
    
    print("\nParallelism (multiprocessing):")
    print("  - Multiple processes, each with its own interpreter")
    print("  - True simultaneous execution on multiple CPU cores")
    print("  - Best for CPU-bound operations")
    print("  - Example: Mathematical calculations, image processing")
    
    print("\nThreading:")
    print("  - Multiple threads in same process")
    print("  - CPU-bound threads are serialized by the GIL in standard CPython builds")
    print("  - Good for I/O-bound work that relies on blocking libraries")

explain_concurrency_vs_parallelism()
```
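
As a rough illustration of the threading case above, blocking calls can be handed to worker threads so the event loop stays responsive. The sketch below assumes Python 3.9+ for `asyncio.to_thread`; `blocking_io` is a hypothetical stand-in for any library call that has no async version.

```python
import asyncio
import time

def blocking_io(delay):
    """Hypothetical blocking call (stands in for a library without async support)."""
    time.sleep(delay)
    return f"blocked for {delay}s"

async def offload_blocking_calls():
    start = time.perf_counter()
    # Each blocking call runs in a worker thread, so the event loop stays free
    results = await asyncio.gather(
        asyncio.to_thread(blocking_io, 0.2),
        asyncio.to_thread(blocking_io, 0.2),
    )
    elapsed = time.perf_counter() - start
    return results, elapsed

offload_results, offload_elapsed = asyncio.run(offload_blocking_calls())
print(offload_results)
print(f"Elapsed: {offload_elapsed:.2f} seconds")
```

Because `time.sleep` releases the GIL, the two calls overlap. For CPU-bound work, a process pool is usually the better fit.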

---

## 2. Async/Await Basics

### Basic Async Functions

```python
import asyncio
import random

async def simple_async_function():
    """Basic async function example"""
    print("Starting async function")
    await asyncio.sleep(1)  # Non-blocking sleep
    print("Async function completed")
    return "Result from async function"

async def async_function_with_params(name, delay):
    """Async function with parameters"""
    print(f"Processing {name}...")
    await asyncio.sleep(delay)
    result = f"Processed {name} in {delay} seconds"
    print(result)
    return result

async def demonstrate_basic_async():
    """Demonstrate basic async/await usage"""
    
    # Call a single async function
    result1 = await simple_async_function()
    print(f"Result: {result1}")
    
    # Call multiple async functions sequentially
    print("\nSequential execution:")
    loop = asyncio.get_running_loop()
    start_time = loop.time()
    
    result2 = await async_function_with_params("Task A", 1)
    result3 = await async_function_with_params("Task B", 2)
    
    sequential_time = loop.time() - start_time
    print(f"Sequential time: {sequential_time:.2f} seconds")
    
    # Call multiple async functions concurrently
    print("\nConcurrent execution:")
    start_time = loop.time()
    
    # These are coroutine objects; gather() wraps them in tasks and runs them concurrently
    task_a = async_function_with_params("Task C", 1)
    task_b = async_function_with_params("Task D", 2)
    
    results = await asyncio.gather(task_a, task_b)
    
    concurrent_time = loop.time() - start_time
    print(f"Concurrent time: {concurrent_time:.2f} seconds")
    print(f"Results: {results}")

# Run the demonstration
asyncio.run(demonstrate_basic_async())
```
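
One detail worth seeing in isolation: calling an `async def` function does not run its body. It only creates a coroutine object, which executes once it is awaited or handed to the event loop. A minimal sketch, where `greet` is a hypothetical example function:

```python
import asyncio
import inspect

async def greet(name):
    await asyncio.sleep(0)
    return f"Hello, {name}"

# Calling the function only creates a coroutine object; the body has not run yet
coro = greet("Ada")
is_coroutine = inspect.iscoroutine(coro)
print(f"Is a coroutine object: {is_coroutine}")

# The body executes only when the coroutine is awaited or given to the event loop
greeting = asyncio.run(coro)
print(greeting)
```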

### Async Function Patterns

```python
async def conditional_async_operation():
    """Async function with conditional logic"""
    should_succeed = random.choice([True, False])
    
    print("Starting conditional operation...")
    await asyncio.sleep(0.5)
    
    if should_succeed:
        print("Operation succeeded!")
        return "Success"
    else:
        print("Operation failed!")
        raise Exception("Random failure occurred")

async def retry_async_operation(max_retries=3):
    """Async function with retry logic"""
    for attempt in range(max_retries):
        try:
            print(f"Attempt {attempt + 1}")
            result = await conditional_async_operation()
            return result
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt == max_retries - 1:
                print("All attempts failed!")
                raise
            await asyncio.sleep(1)  # Wait before retry

async def timeout_example():
    """Demonstrate timeout handling"""
    
    async def slow_operation():
        print("Starting slow operation...")
        await asyncio.sleep(5)  # This takes 5 seconds
        return "Slow operation completed"
    
    try:
        # Set a timeout of 2 seconds
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
        print(f"Result: {result}")
    except asyncio.TimeoutError:
        print("Operation timed out!")

async def demonstrate_patterns():
    """Demonstrate various async patterns"""
    
    print("Retry pattern:")
    try:
        result = await retry_async_operation()
        print(f"Final result: {result}")
    except Exception as e:
        print(f"Operation failed completely: {e}")
    
    print("\nTimeout pattern:")
    await timeout_example()

asyncio.run(demonstrate_patterns())
```

---

## 3. Asyncio Fundamentals

### Event Loop and Tasks

```python
async def understanding_event_loop():
    """Understand how the event loop works"""
    
    # Get the current event loop
    loop = asyncio.get_running_loop()
    print(f"Current event loop: {loop}")
    
    # Schedule a callback
    def callback_function():
        print("Callback executed!")
    
    loop.call_later(1, callback_function)
    
    # Create tasks explicitly
    async def background_task(name, duration):
        print(f"Starting background task: {name}")
        await asyncio.sleep(duration)
        print(f"Completed background task: {name}")
        return f"Result from {name}"
    
    # Create tasks
    task1 = asyncio.create_task(background_task("Task 1", 2))
    task2 = asyncio.create_task(background_task("Task 2", 1))
    task3 = asyncio.create_task(background_task("Task 3", 3))
    
    print("Tasks created, starting execution...")
    
    # Wait for all tasks to complete
    results = await asyncio.gather(task1, task2, task3)
    print(f"All tasks completed: {results}")
    
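    # The callback scheduled with call_later(1, ...) already ran while the
    # tasks above were awaited; this extra sleep is only a safety margin
    await asyncio.sleep(1.5)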

asyncio.run(understanding_event_loop())
```

### Working with Futures

```python
async def working_with_futures():
    """Demonstrate working with futures"""
    
    loop = asyncio.get_running_loop()
    
    # Create a future manually
    future = loop.create_future()
    
    async def set_future_result():
        await asyncio.sleep(2)
        future.set_result("Future result is ready!")
    
    # Start the task that will set the future result
    # (keep a reference so the task cannot be garbage-collected mid-flight)
    setter_task = asyncio.create_task(set_future_result())
    
    print("Waiting for future...")
    result = await future
    print(f"Future result: {result}")
    
    # Future with exception
    future_with_error = loop.create_future()
    
    async def set_future_exception():
        await asyncio.sleep(1)
        future_with_error.set_exception(ValueError("Something went wrong!"))
    
    error_task = asyncio.create_task(set_future_exception())  # Keep a reference to the task
    
    try:
        await future_with_error
    except ValueError as e:
        print(f"Future raised exception: {e}")

asyncio.run(working_with_futures())
```

---

## 4. Working with Tasks

### Task Management

```python
async def task_management_examples():
    """Demonstrate various task management techniques"""
    
    async def worker_task(worker_id, work_duration):
        print(f"Worker {worker_id} starting work")
        await asyncio.sleep(work_duration)
        print(f"Worker {worker_id} finished work")
        return f"Result from worker {worker_id}"
    
    # 1. Creating and managing multiple tasks
    print("1. Creating multiple tasks:")
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker_task(i, random.uniform(1, 3)))
        tasks.append(task)
    
    # Wait for all tasks
    results = await asyncio.gather(*tasks)
    print(f"All workers completed: {results}")
    
    # 2. Task cancellation
    print("\n2. Task cancellation:")
    
    async def long_running_task():
        try:
            print("Long task started")
            await asyncio.sleep(10)  # This would take 10 seconds
            print("Long task completed")
        except asyncio.CancelledError:
            print("Long task was cancelled")
            raise  # Re-raise to properly handle cancellation
    
    task = asyncio.create_task(long_running_task())
    await asyncio.sleep(1)  # Let it run for 1 second
    task.cancel()  # Cancel the task
    
    try:
        await task
    except asyncio.CancelledError:
        print("Confirmed: task was cancelled")
    
    # 3. Running a batch of tasks under a shared timeout
    print("\n3. Running tasks with timeout:")
    
    async def task_with_timeout():
        tasks = [
            asyncio.create_task(worker_task(f"timeout_{i}", random.uniform(1, 4)))
            for i in range(3)
        ]
        
        try:
            results = await asyncio.wait_for(
                asyncio.gather(*tasks), 
                timeout=2.5
            )
            return results
        except asyncio.TimeoutError:
            print("Some tasks timed out!")
            # wait_for() already cancelled the gather and its child tasks;
            # this loop is a defensive check for anything still pending
            for task in tasks:
                if not task.done():
                    task.cancel()
            raise
    
    try:
        results = await task_with_timeout()
        print(f"All tasks completed within timeout: {results}")
    except asyncio.TimeoutError:
        print("Tasks timed out as expected")

asyncio.run(task_management_examples())
```
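
Python 3.11 added `asyncio.TaskGroup`, a structured alternative to `create_task` plus `gather`: every task started in the group is awaited when the `async with` block exits, and a failure in one task cancels its siblings. The sketch below is a minimal illustration rather than a full treatment. The function name `run_with_task_group` is hypothetical, and it falls back to `gather` on older interpreters so it still runs there.

```python
import asyncio

async def worker_task(worker_id, work_duration):
    await asyncio.sleep(work_duration)
    return f"Result from worker {worker_id}"

async def run_with_task_group():
    # asyncio.TaskGroup exists only on Python 3.11+; fall back to gather otherwise
    if hasattr(asyncio, "TaskGroup"):
        async with asyncio.TaskGroup() as tg:
            tasks = [
                tg.create_task(worker_task(i, 0.1 * (i + 1)))
                for i in range(3)
            ]
        # Leaving the block guarantees every task in the group has finished
        return [task.result() for task in tasks]
    return await asyncio.gather(
        *(worker_task(i, 0.1 * (i + 1)) for i in range(3))
    )

task_group_results = asyncio.run(run_with_task_group())
print(task_group_results)
```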

### Task Synchronization

```python
async def task_synchronization():
    """Demonstrate task synchronization primitives"""
    
    # 1. Locks
    print("1. Using asyncio.Lock:")
    
    shared_resource = {"counter": 0}
    lock = asyncio.Lock()
    
    async def increment_counter(worker_id):
        for _ in range(3):
            async with lock:  # Critical section
                current = shared_resource["counter"]
                print(f"Worker {worker_id} read counter: {current}")
                await asyncio.sleep(0.1)  # Simulate work
                shared_resource["counter"] = current + 1
                print(f"Worker {worker_id} incremented counter to: {shared_resource['counter']}")
    
    # Run multiple workers concurrently
    await asyncio.gather(
        increment_counter("A"),
        increment_counter("B"),
        increment_counter("C")
    )
    
    print(f"Final counter value: {shared_resource['counter']}")
    
    # 2. Semaphores
    print("\n2. Using asyncio.Semaphore:")
    
    # Limit to 2 concurrent operations
    semaphore = asyncio.Semaphore(2)
    
    async def limited_operation(operation_id):
        async with semaphore:
            print(f"Operation {operation_id} started (semaphore acquired)")
            await asyncio.sleep(1)
            print(f"Operation {operation_id} completed (semaphore released)")
    
    # Start 5 operations, but only 2 can run concurrently
    await asyncio.gather(*[
        limited_operation(i) for i in range(5)
    ])
    
    # 3. Events
    print("\n3. Using asyncio.Event:")
    
    event = asyncio.Event()
    
    async def waiter(waiter_id):
        print(f"Waiter {waiter_id} waiting for event...")
        await event.wait()
        print(f"Waiter {waiter_id} received event!")
    
    async def setter():
        await asyncio.sleep(2)
        print("Setting event...")
        event.set()
    
    # Start waiters and setter
    await asyncio.gather(
        waiter("1"),
        waiter("2"),
        waiter("3"),
        setter()
    )

asyncio.run(task_synchronization())
```
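
To see why the lock matters, it helps to run the same read, await, write sequence with and without it. In the sketch below (all function names are hypothetical), each worker yields to the event loop between reading and writing, so without a lock every worker reads the same stale value and updates are lost.

```python
import asyncio

async def unsafe_increment(state):
    current = state["counter"]
    await asyncio.sleep(0)  # Other tasks run here and read the same stale value
    state["counter"] = current + 1

async def safe_increment(state, lock):
    async with lock:  # Only one task at a time may read-modify-write
        current = state["counter"]
        await asyncio.sleep(0)
        state["counter"] = current + 1

async def compare_lock_usage():
    unsafe_state = {"counter": 0}
    await asyncio.gather(*(unsafe_increment(unsafe_state) for _ in range(3)))

    safe_state = {"counter": 0}
    lock = asyncio.Lock()
    await asyncio.gather(*(safe_increment(safe_state, lock) for _ in range(3)))

    return unsafe_state["counter"], safe_state["counter"]

unsafe_total, safe_total = asyncio.run(compare_lock_usage())
print(f"Without lock: {unsafe_total}, with lock: {safe_total}")
```

Even on a single thread, any `await` inside a critical section is a point where another task can interleave.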

---

## 5. Async Context Managers

### Creating Async Context Managers

```python
class AsyncDatabaseConnection:
    """Example async context manager for database connections"""
    
    def __init__(self, connection_string):
        self.connection_string = connection_string
        self.connection = None
    
    async def __aenter__(self):
        print(f"Connecting to database: {self.connection_string}")
        await asyncio.sleep(0.5)  # Simulate connection time
        self.connection = f"Connected to {self.connection_string}"
        print("Database connection established")
        return self
    
    async def __aexit__(self, exc_type, exc_value, traceback):
        print("Closing database connection")
        await asyncio.sleep(0.2)  # Simulate cleanup time
        self.connection = None
        print("Database connection closed")
        
        if exc_type is not None:
            print(f"Exception occurred: {exc_value}")
        
        return False  # Don't suppress exceptions
    
    async def execute_query(self, query):
        if not self.connection:
            raise Exception("Not connected to database")
        
        print(f"Executing query: {query}")
        await asyncio.sleep(0.3)  # Simulate query time
        return f"Result for: {query}"

class AsyncFileManager:
    """Async context manager for file operations"""
    
    def __init__(self, filename, mode='r'):
        self.filename = filename
        self.mode = mode
        self.file = None
    
    async def __aenter__(self):
        print(f"Opening file: {self.filename}")
        await asyncio.sleep(0.1)  # Simulate file opening
        # In real implementation, you'd use aiofiles
        self.file = f"Simulated file handle for {self.filename}"
        return self
    
    async def __aexit__(self, exc_type, exc_value, traceback):
        print(f"Closing file: {self.filename}")
        await asyncio.sleep(0.1)  # Simulate file closing
        self.file = None
    
    async def read_content(self):
        if not self.file:
            raise Exception("File not open")
        await asyncio.sleep(0.2)  # Simulate reading
        return f"Content from {self.filename}"

async def demonstrate_async_context_managers():
    """Demonstrate usage of async context managers"""
    
    # 1. Database connection example
    print("1. Async database context manager:")
    async with AsyncDatabaseConnection("postgresql://localhost:5432/mydb") as db:
        result1 = await db.execute_query("SELECT * FROM users")
        print(f"Query result: {result1}")
        
        result2 = await db.execute_query("SELECT * FROM orders")
        print(f"Query result: {result2}")
    
    print()  # Database automatically closed here
    
    # 2. File manager example
    print("2. Async file context manager:")
    async with AsyncFileManager("data.txt", "r") as file_mgr:
        content = await file_mgr.read_content()
        print(f"File content: {content}")
    
    print()  # File automatically closed here
    
    # 3. Exception handling
    print("3. Exception handling in async context manager:")
    try:
        async with AsyncDatabaseConnection("postgresql://localhost:5432/testdb") as db:
            await db.execute_query("SELECT * FROM users")
            raise ValueError("Something went wrong!")
    except ValueError as e:
        print(f"Caught exception: {e}")

asyncio.run(demonstrate_async_context_managers())
```

### Async Context Manager Decorators

```python
from contextlib import asynccontextmanager

@asynccontextmanager
async def async_timer():
    """Async context manager to time operations"""
    start_time = asyncio.get_running_loop().time()
    print("Timer started")
    try:
        yield
    finally:
        end_time = asyncio.get_running_loop().time()
        elapsed = end_time - start_time
        print(f"Timer ended. Elapsed time: {elapsed:.2f} seconds")

@asynccontextmanager
async def async_resource_pool(max_resources=3):
    """Async context manager for resource pooling"""
    semaphore = asyncio.Semaphore(max_resources)
    resources_used = 0
    
    async def acquire_resource():
        nonlocal resources_used
        await semaphore.acquire()
        resources_used += 1
        resource_id = resources_used
        print(f"Acquired resource #{resource_id}")
        return resource_id
    
    def release_resource(resource_id):
        print(f"Released resource #{resource_id}")
        semaphore.release()
    
    try:
        yield acquire_resource, release_resource
    finally:
        print("Resource pool cleanup complete")

async def demonstrate_contextmanager_decorator():
    """Demonstrate @asynccontextmanager decorator"""
    
    # 1. Timer example
    print("1. Async timer context manager:")
    async with async_timer():
        print("Doing some async work...")
        await asyncio.sleep(1.5)
        print("Work completed")
    
    print()
    
    # 2. Resource pool example
    print("2. Resource pool context manager:")
    async with async_resource_pool(max_resources=2) as (acquire, release):
        
        async def worker(worker_id):
            resource_id = await acquire()
            try:
                print(f"Worker {worker_id} using resource {resource_id}")
                await asyncio.sleep(1)
                print(f"Worker {worker_id} finished with resource {resource_id}")
            finally:
                release(resource_id)
        
        # Start 4 workers, but only 2 resources available
        await asyncio.gather(*[worker(i) for i in range(4)])

asyncio.run(demonstrate_contextmanager_decorator())
```

---

## 6. Async Iterators and Generators

### Async Iterators

```python
class AsyncNumberGenerator:
    """Async iterator that generates numbers with delays"""
    
    def __init__(self, start, end, delay=0.5):
        self.start = start
        self.end = end
        self.delay = delay
        self.current = start
    
    def __aiter__(self):
        return self
    
    async def __anext__(self):
        if self.current >= self.end:
            raise StopAsyncIteration
        
        await asyncio.sleep(self.delay)  # Simulate async work
        value = self.current
        self.current += 1
        print(f"Generated: {value}")
        return value

class AsyncDataStreamer:
    """Async iterator that simulates streaming data"""
    
    def __init__(self, data_source, chunk_size=3):
        self.data_source = data_source
        self.chunk_size = chunk_size
        self.index = 0
    
    def __aiter__(self):
        return self
    
    async def __anext__(self):
        if self.index >= len(self.data_source):
            raise StopAsyncIteration
        
        # Simulate network delay
        await asyncio.sleep(0.3)
        
        # Get next chunk
        chunk = self.data_source[self.index:self.index + self.chunk_size]
        self.index += self.chunk_size
        
        print(f"Streamed chunk: {chunk}")
        return chunk

async def demonstrate_async_iterators():
    """Demonstrate async iterator usage"""
    
    print("1. Async number generator:")
    async for number in AsyncNumberGenerator(1, 5, delay=0.3):
        print(f"Received number: {number}")
    
    print("\n2. Async data streamer:")
    data = list(range(10))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    
    async for chunk in AsyncDataStreamer(data, chunk_size=3):
        print(f"Processing chunk: {chunk}")
        # Simulate processing time
        await asyncio.sleep(0.2)
    
    print("\n3. Async comprehension:")
    # Async comprehension with async iterator
    squares = [
        x ** 2 async for x in AsyncNumberGenerator(1, 6, delay=0.1)
    ]
    print(f"Squares: {squares}")

asyncio.run(demonstrate_async_iterators())
```

### Async Generators

```python
async def async_fibonacci(n):
    """Async generator for Fibonacci sequence"""
    a, b = 0, 1
    count = 0
    
    while count < n:
        # Log before yielding so the message appears before the consumer handles the value
        print(f"Generated Fibonacci number: {a}")
        yield a
        await asyncio.sleep(0.2)  # Simulate computation delay
        a, b = b, a + b
        count += 1

async def async_file_reader(lines):
    """Async generator that simulates reading file lines"""
    for line_num, line in enumerate(lines, 1):
        # Simulate I/O delay
        await asyncio.sleep(0.1)
        yield line_num, line.strip()

async def async_api_paginator(total_pages, page_size=5):
    """Async generator that simulates API pagination"""
    for page in range(1, total_pages + 1):
        print(f"Fetching page {page}...")
        # Simulate API call delay
        await asyncio.sleep(0.5)
        
        # Generate sample data for this page
        start_item = (page - 1) * page_size + 1
        end_item = min(page * page_size, total_pages * page_size)
        items = [f"item_{i}" for i in range(start_item, end_item + 1)]
        
        yield {
            "page": page,
            "items": items,
            "total_pages": total_pages
        }

async def demonstrate_async_generators():
    """Demonstrate async generator usage"""
    
    print("1. Async Fibonacci generator:")
    async for fib_num in async_fibonacci(8):
        print(f"Fibonacci: {fib_num}")
    
    print("\n2. Async file reader:")
    sample_lines = [
        "First line of file",
        "Second line with data",
        "Third line here",
        "Final line of file"
    ]
    
    async for line_num, line_content in async_file_reader(sample_lines):
        print(f"Line {line_num}: {line_content}")
    
    print("\n3. Async API paginator:")
    all_items = []
    async for page_data in async_api_paginator(total_pages=3, page_size=4):
        print(f"Page {page_data['page']}: {page_data['items']}")
        all_items.extend(page_data['items'])
    
    print(f"Total items collected: {len(all_items)}")

asyncio.run(demonstrate_async_generators())
```
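
When a consumer stops iterating early, the async generator stays suspended at its last `yield` until it is closed. The sketch below shows one way to make cleanup explicit by calling `aclose()`. The generator `ticker` and its `log` parameter are hypothetical, used only to record when the `finally` block runs.

```python
import asyncio

async def ticker(limit, log):
    """Hypothetical async generator that records when its cleanup runs."""
    try:
        for i in range(limit):
            await asyncio.sleep(0)
            yield i
    finally:
        log.append("closed")

async def consume_partially():
    log = []
    gen = ticker(10, log)
    seen = []
    async for value in gen:
        seen.append(value)
        if value == 2:
            break  # Stop early; the generator is still suspended at its yield
    await gen.aclose()  # Run the generator's finally block now
    return seen, log

seen_values, cleanup_log = asyncio.run(consume_partially())
print(f"Seen: {seen_values}, cleanup: {cleanup_log}")
```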

---

## 7. Error Handling in Async Code

### Exception Handling Patterns

```python
async def error_handling_examples():
    """Demonstrate error handling in async code"""
    
    # 1. Basic exception handling
    async def risky_operation(should_fail=False):
        await asyncio.sleep(0.5)
        if should_fail:
            raise ValueError("Operation failed!")
        return "Operation succeeded"
    
    print("1. Basic async exception handling:")
    try:
        result = await risky_operation(should_fail=False)
        print(f"Success: {result}")
        
        result = await risky_operation(should_fail=True)
        print(f"This won't print: {result}")
    except ValueError as e:
        print(f"Caught exception: {e}")
    
    # 2. Handling exceptions in multiple tasks
    print("\n2. Exception handling with multiple tasks:")
    
    async def task_that_might_fail(task_id, should_fail=False):
        await asyncio.sleep(random.uniform(0.5, 1.5))
        if should_fail:
            raise Exception(f"Task {task_id} failed!")
        return f"Task {task_id} completed successfully"
    
    tasks = [
        task_that_might_fail(1, should_fail=False),
        task_that_might_fail(2, should_fail=True),
        task_that_might_fail(3, should_fail=False),
    ]
    
    # Using asyncio.gather with exception handling
    try:
        results = await asyncio.gather(*tasks, return_exceptions=True)
        print("Results with exceptions:")
        for i, result in enumerate(results, 1):
            if isinstance(result, Exception):
                print(f"  Task {i}: FAILED - {result}")
            else:
                print(f"  Task {i}: SUCCESS - {result}")
    except Exception as e:
        print(f"Unexpected error: {e}")
    
    # 3. Exception handling with timeout
    print("\n3. Exception handling with timeout:")
    
    async def slow_operation():
        await asyncio.sleep(3)  # Takes 3 seconds
        return "Slow operation completed"
    
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=1.0)
        print(f"Result: {result}")
    except asyncio.TimeoutError:
        print("Operation timed out")
    except Exception as e:
        print(f"Other error: {e}")

asyncio.run(error_handling_examples())
```
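
The `return_exceptions=True` flag used above changes how `asyncio.gather` reports failures. As a small comparison sketch (the `job` coroutine is hypothetical): in the default mode the first exception propagates to the caller, while with the flag set each failure is returned as a value in argument order.

```python
import asyncio

async def job(job_id, should_fail=False):
    await asyncio.sleep(0.01 * job_id)
    if should_fail:
        raise ValueError(f"Job {job_id} failed!")
    return f"Job {job_id} ok"

async def compare_gather_modes():
    # Default mode: the first exception propagates to the caller
    try:
        await asyncio.gather(job(1), job(2, should_fail=True), job(3))
        propagated = None
    except ValueError as e:
        propagated = str(e)

    # return_exceptions=True: failures come back as values, in argument order
    collected = await asyncio.gather(
        job(1), job(2, should_fail=True), job(3),
        return_exceptions=True,
    )
    return propagated, collected

propagated, collected = asyncio.run(compare_gather_modes())
print(f"Propagated: {propagated}")
print(f"Collected: {collected}")
```

Note that in the default mode the other awaitables are not cancelled when one fails; they keep running in the background.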

### Retry and Circuit Breaker Patterns

```python
import functools
import time  # Used by AsyncCircuitBreaker to track failure timestamps

def async_retry(max_attempts=3, delay=1.0, backoff=2.0):
    """Decorator for async retry logic"""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            current_delay = delay
            last_exception = None
            
            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    print(f"Attempt {attempt + 1} failed: {e}")
                    
                    if attempt < max_attempts - 1:
                        print(f"Retrying in {current_delay} seconds...")
                        await asyncio.sleep(current_delay)
                        current_delay *= backoff
                    else:
                        print(f"All {max_attempts} attempts failed")
            
            raise last_exception
        return wrapper
    return decorator

class AsyncCircuitBreaker:
    """Circuit breaker pattern for async operations"""
    
    def __init__(self, failure_threshold=5, recovery_timeout=60, expected_exception=Exception):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN
    
    async def call(self, func, *args, **kwargs):
        """Execute function with circuit breaker protection"""
        
        if self.state == "OPEN":
            if self._should_attempt_reset():
                self.state = "HALF_OPEN"
            else:
                raise Exception("Circuit breaker is OPEN")
        
        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except self.expected_exception:
            self._on_failure()
            raise  # Re-raise with the original traceback intact
    
    def _should_attempt_reset(self):
        return (
            self.last_failure_time is not None and
            time.time() - self.last_failure_time >= self.recovery_timeout
        )
    
    def _on_success(self):
        self.failure_count = 0
        self.state = "CLOSED"
    
    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        if self.failure_count >= self.failure_threshold:
            self.state = "OPEN"
            print(f"Circuit breaker opened after {self.failure_count} failures")

async def demonstrate_error_patterns():
    """Demonstrate retry and circuit breaker patterns"""
    
    # 1. Retry decorator
    print("1. Retry pattern:")
    
    @async_retry(max_attempts=3, delay=0.5, backoff=2.0)
    async def unreliable_service():
        if random.random() < 0.7:  # 70% chance of failure
            raise ConnectionError("Service unavailable")
        return "Service response"
    
    try:
        result = await unreliable_service()
        print(f"Service result: {result}")
    except ConnectionError as e:
        print(f"Service failed after retries: {e}")
    
    # 2. Circuit breaker pattern
    print("\n2. Circuit breaker pattern:")
    
    circuit_breaker = AsyncCircuitBreaker(failure_threshold=3, recovery_timeout=2)
    
    async def flaky_external_api():
        if random.random() < 0.8:  # 80% chance of failure
            raise ConnectionError("External API error")
        return "API response"
    
    # Simulate multiple calls
    for i in range(10):
        try:
            result = await circuit_breaker.call(flaky_external_api)
            print(f"Call {i+1}: {result}")
        except Exception as e:
            print(f"Call {i+1}: {e}")
        
        await asyncio.sleep(0.3)

asyncio.run(demonstrate_error_patterns())
```
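
The waits produced by the `async_retry` decorator follow a geometric schedule: the k-th wait is `delay * backoff ** k`, and no wait follows the final attempt. The helper below, `backoff_delays`, is a hypothetical function that mirrors that loop so the schedule can be inspected without sleeping.

```python
def backoff_delays(max_attempts=3, delay=1.0, backoff=2.0):
    """Return the waits between attempts, mirroring the loop in async_retry."""
    delays = []
    current_delay = delay
    # There is one wait after every failed attempt except the last
    for _ in range(max_attempts - 1):
        delays.append(current_delay)
        current_delay *= backoff
    return delays

print(backoff_delays(max_attempts=3, delay=0.5, backoff=2.0))  # [0.5, 1.0]
print(backoff_delays(max_attempts=5, delay=1.0, backoff=2.0))  # [1.0, 2.0, 4.0, 8.0]
```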

---

## 8. Performance and Debugging

### Performance Monitoring

```python
import cProfile
import pstats
from io import StringIO

async def performance_monitoring():
    """Demonstrate performance monitoring techniques"""
    
    # 1. Timing async operations
    async def timed_operation(name, duration):
        start_time = asyncio.get_running_loop().time()
        await asyncio.sleep(duration)
        end_time = asyncio.get_running_loop().time()
        elapsed = end_time - start_time
        print(f"{name} took {elapsed:.3f} seconds")
        return f"Result from {name}"
    
    print("1. Timing individual operations:")
    await timed_operation("Fast operation", 0.5)
    await timed_operation("Slow operation", 1.5)
    
    # 2. Profiling async code
    print("\n2. Profiling async code:")
    
    async def cpu_intensive_async():
        """Simulate CPU-intensive work in async function"""
        total = 0
        for i in range(100000):
            total += i * i
            if i % 10000 == 0:
                await asyncio.sleep(0)  # Yield control
        return total
    
    # Profile the async function
    profiler = cProfile.Profile()
    profiler.enable()
    
    result = await cpu_intensive_async()
    
    profiler.disable()
    
    # Get profiling results
    s = StringIO()
    ps = pstats.Stats(profiler, stream=s).sort_stats('cumulative')
    ps.print_stats(10)  # Top 10 functions
    
    print("Profiling results:")
    print(s.getvalue())
    
    # 3. Memory usage monitoring
    print("\n3. Memory usage patterns:")
    
    async def memory_intensive_task():
        data = []
        for i in range(1000):
            data.append([random.random() for _ in range(100)])
            if i % 100 == 0:
                await asyncio.sleep(0)  # Yield control
                print(f"Created {i+1} data chunks")
        return len(data)
    
    result = await memory_intensive_task()
    print(f"Created {result} data chunks")

asyncio.run(performance_monitoring())
```
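The memory step above builds data but does not measure it. The following is a minimal sketch of one way to take that measurement, assuming the standard-library `tracemalloc` module suits your needs. The helper names `build_chunks` and `measure_peak_memory` are hypothetical and introduced only for this illustration.

```python
import asyncio
import tracemalloc

async def build_chunks(count, size):
    """Allocate `count` lists of `size` floats, yielding to the loop periodically."""
    data = []
    for i in range(count):
        data.append([float(j) for j in range(size)])
        if i % 100 == 0:
            await asyncio.sleep(0)  # Yield control
    return data

async def measure_peak_memory(coro):
    """Await `coro` and return (result, current_bytes, peak_bytes).

    tracemalloc traces the whole process, so allocations made by other
    tasks that run during the awaits are included in the numbers.
    """
    tracemalloc.start()
    try:
        result = await coro
        current, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return result, current, peak

async def main():
    data, current, peak = await measure_peak_memory(build_chunks(1000, 100))
    print(f"Chunks: {len(data)}")
    print(f"Current: {current / 1024:.1f} KiB, peak: {peak / 1024:.1f} KiB")

asyncio.run(main())
```

Because tracing adds overhead, it is usually enabled only around the code under investigation.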

### Debugging Async Code

```python
import asyncio
import logging
import random

# Configure logging for async debugging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

logger = logging.getLogger(__name__)

async def debugging_examples():
    """Demonstrate debugging techniques for async code"""
    
    # 1. Logging in async functions
    async def logged_operation(operation_id):
        logger.info(f"Starting operation {operation_id}")
        try:
            await asyncio.sleep(random.uniform(0.5, 1.5))
            if random.random() < 0.3:  # 30% chance of failure
                raise ValueError(f"Operation {operation_id} failed")
            logger.info(f"Operation {operation_id} completed successfully")
            return f"Result from operation {operation_id}"
        except Exception as e:
            logger.error(f"Operation {operation_id} failed: {e}")
            raise
    
    print("1. Logging async operations:")
    tasks = [logged_operation(i) for i in range(5)]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Operation {i}: FAILED")
        else:
            print(f"Operation {i}: SUCCESS")
    
    # 2. Debug mode for asyncio
    print("\n2. Asyncio debug information:")
    
    # Get debug info about the event loop
    loop = asyncio.get_running_loop()
    print(f"Event loop debug mode: {loop.get_debug()}")
    
    # 3. Task inspection
    print("\n3. Task inspection:")
    
    async def long_running_task(task_id):
        print(f"Task {task_id} started")
        for i in range(5):
            print(f"Task {task_id} - step {i+1}")
            await asyncio.sleep(0.5)
        print(f"Task {task_id} completed")
        return f"Result from task {task_id}"
    
    # Start tasks
    task1 = asyncio.create_task(long_running_task("A"))
    task2 = asyncio.create_task(long_running_task("B"))
    
    # Wait a bit and inspect tasks
    await asyncio.sleep(1)
    
    print(f"Task 1 done: {task1.done()}")
    print(f"Task 2 done: {task2.done()}")
    
    # Wait for completion
    await asyncio.gather(task1, task2)

# Enable asyncio debug mode
asyncio.run(debugging_examples(), debug=True)
```
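Task inspection gets easier when tasks carry names. The sketch below assumes Python 3.8+ (for the `name` argument of `asyncio.create_task()`) and lists the unfinished tasks with `asyncio.all_tasks()`. The helper `pending_task_names` is a hypothetical name used only here.

```python
import asyncio

async def worker(delay):
    await asyncio.sleep(delay)
    return delay

def pending_task_names():
    """Return sorted names of unfinished tasks, excluding the calling task."""
    current = asyncio.current_task()
    return sorted(
        t.get_name() for t in asyncio.all_tasks() if t is not current
    )

async def main():
    fast = asyncio.create_task(worker(0.05), name="fast-worker")
    slow = asyncio.create_task(worker(0.3), name="slow-worker")

    before = pending_task_names()
    await fast
    after = pending_task_names()

    print(f"Pending at start: {before}")
    print(f"Pending after fast-worker finished: {after}")
    await slow
    return before, after

asyncio.run(main())
```

Named tasks also appear in the warnings that asyncio's debug mode emits, which helps trace a slow or never-awaited task back to its origin.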

---

## 9. Real-World Examples

### Web Scraping with Async

```python
# Note: In a real implementation, you'd use aiohttp and aiofiles.
# This is a simulation for educational purposes.
import asyncio
import random

async def async_web_scraper():
    """Simulate asynchronous web scraping"""
    
    async def fetch_url(session, url):
        """Simulate fetching a URL"""
        print(f"Fetching {url}...")
        await asyncio.sleep(random.uniform(0.5, 2.0))  # Simulate network delay
        
        # Simulate different response scenarios
        if random.random() < 0.1:  # 10% chance of failure
            raise Exception(f"Failed to fetch {url}")
        
        # Simulate response content
        content = f"Content from {url} (length: {random.randint(1000, 5000)} chars)"
        print(f"Successfully fetched {url}")
        return content
    
    async def process_content(content):
        """Simulate content processing"""
        await asyncio.sleep(0.2)  # Simulate processing time
        word_count = len(content.split())
        return {"content": content[:50] + "...", "word_count": word_count}
    
    urls = [
        "https://example.com/page1",
        "https://example.com/page2",
        "https://example.com/page3",
        "https://example.com/page4",
        "https://example.com/page5"
    ]
    
    # Simulate session (in real code, use aiohttp.ClientSession)
    session = "simulated_session"
    
    print("Starting concurrent web scraping...")
    
    # Fetch all URLs concurrently
    fetch_tasks = [fetch_url(session, url) for url in urls]
    contents = await asyncio.gather(*fetch_tasks, return_exceptions=True)
    
    # Pair each URL with its outcome so results stay attributable
    # even when some fetches fail
    successful = [
        (url, content) for url, content in zip(urls, contents)
        if not isinstance(content, Exception)
    ]
    
    print(f"Successfully fetched {len(successful)} out of {len(urls)} URLs")
    
    # Process content concurrently
    process_tasks = [process_content(content) for _, content in successful]
    results = await asyncio.gather(*process_tasks)
    
    print("Processing results:")
    for (url, _), result in zip(successful, results):
        print(f"  {url}: {result['word_count']} words")
    
    return results

asyncio.run(async_web_scraper())
```
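The scraper above launches every request at once. With a long URL list you would normally cap the number of requests in flight. The sketch below shows one way to do that with `asyncio.Semaphore`. The names `fetch_all` and `fake_fetch` are hypothetical, and `fake_fetch` stands in for a real HTTP call.

```python
import asyncio

async def fetch_all(urls, fetch, max_concurrent=2):
    """Fetch every URL with at most `max_concurrent` requests in flight.

    Returns a dict mapping each URL to its result or to the exception raised.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async def bounded_fetch(url):
        async with semaphore:
            return await fetch(url)

    outcomes = await asyncio.gather(
        *(bounded_fetch(url) for url in urls), return_exceptions=True
    )
    return dict(zip(urls, outcomes))

# Counters used only to observe the concurrency level in this demo
in_flight = 0
max_seen = 0

async def fake_fetch(url):
    """Simulated request (hypothetical); page3 always fails."""
    global in_flight, max_seen
    in_flight += 1
    max_seen = max(max_seen, in_flight)
    try:
        await asyncio.sleep(0.05)
        if url.endswith("page3"):
            raise ConnectionError(f"Failed to fetch {url}")
        return f"Content from {url}"
    finally:
        in_flight -= 1

async def main():
    urls = [f"https://example.com/page{i}" for i in range(1, 6)]
    results = await fetch_all(urls, fake_fetch, max_concurrent=2)
    for url, outcome in results.items():
        status = "FAILED" if isinstance(outcome, Exception) else "OK"
        print(f"{url}: {status}")
    print(f"Peak concurrent requests: {max_seen}")
    return results

asyncio.run(main())
```

Keying the outcomes by URL keeps each failure tied to the request that caused it.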

### Async Database Operations

```python
async def async_database_example():
    """Simulate async database operations"""
    
    class AsyncDatabase:
        """Simulated async database connection"""
        
        def __init__(self):
            self.connected = False
            self.data = {
                "users": [
                    {"id": 1, "name": "Alice", "email": "alice@example.com"},
                    {"id": 2, "name": "Bob", "email": "bob@example.com"},
                    {"id": 3, "name": "Charlie", "email": "charlie@example.com"}
                ]
            }
        
        async def connect(self):
            print("Connecting to database...")
            await asyncio.sleep(0.5)  # Simulate connection time
            self.connected = True
            print("Database connected")
        
        async def disconnect(self):
            print("Disconnecting from database...")
            await asyncio.sleep(0.2)
            self.connected = False
            print("Database disconnected")
        
        async def execute_query(self, query, params=None):
            if not self.connected:
                raise Exception("Database not connected")
            
            print(f"Executing query: {query}")
            await asyncio.sleep(random.uniform(0.1, 0.5))  # Simulate query time
            
            # Simulate different query types (matching is case-insensitive)
            normalized = query.upper()
            if "SELECT" in normalized:
                if "WHERE ID" in normalized:
                    user_id = params.get("id") if params else 1
                    user = next((u for u in self.data["users"] if u["id"] == user_id), None)
                    return [user] if user else []
                else:
                    return self.data["users"]
            elif "INSERT" in normalized:
                new_user = params if params else {"id": 999, "name": "New User"}
                self.data["users"].append(new_user)
                return {"inserted_id": new_user["id"]}
            
            return {"affected_rows": 1}
    
    async def fetch_user_data(db, user_ids):
        """Fetch multiple users concurrently"""
        async def fetch_single_user(user_id):
            result = await db.execute_query(
                "SELECT * FROM users WHERE id = %s", 
                {"id": user_id}
            )
            return result[0] if result else None
        
        tasks = [fetch_single_user(user_id) for user_id in user_ids]
        users = await asyncio.gather(*tasks)
        return [user for user in users if user is not None]
    
    # Main database operations
    db = AsyncDatabase()
    
    try:
        await db.connect()
        
        # Fetch all users
        print("\n1. Fetching all users:")
        all_users = await db.execute_query("SELECT * FROM users")
        for user in all_users:
            print(f"  {user}")
        
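        # Fetch specific users concurrently. The simulated connection allows
        # overlapping queries; some real drivers (asyncpg, for example) run one
        # query per connection at a time, so concurrent queries need a pool.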
        print("\n2. Fetching specific users concurrently:")
        user_ids = [1, 2, 3]
        users = await fetch_user_data(db, user_ids)
        for user in users:
            print(f"  Fetched: {user}")
        
        # Simulate batch operations
        print("\n3. Batch operations:")
        batch_tasks = [
            db.execute_query("SELECT * FROM users WHERE id = %s", {"id": 1}),
            db.execute_query("SELECT * FROM users WHERE id = %s", {"id": 2}),
            db.execute_query("INSERT INTO users VALUES (%s, %s)", {"id": 4, "name": "Diana"})
        ]
        
        results = await asyncio.gather(*batch_tasks)
        print(f"  Batch operation results: {len(results)} operations completed")
        
    finally:
        await db.disconnect()

asyncio.run(async_database_example())
```
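The `try`/`finally` around `connect()` and `disconnect()` can be packaged as an async context manager so callers cannot forget the cleanup. This is a minimal sketch using `contextlib.asynccontextmanager`. `FakeConnection` and `open_connection` are hypothetical names, not part of the example above.

```python
import asyncio
from contextlib import asynccontextmanager

class FakeConnection:
    """Hypothetical stand-in for a database connection."""

    def __init__(self):
        self.connected = False

    async def connect(self):
        await asyncio.sleep(0.01)  # Simulated connection time
        self.connected = True

    async def disconnect(self):
        await asyncio.sleep(0.01)
        self.connected = False

@asynccontextmanager
async def open_connection():
    """Connect on entry and always disconnect on exit, even after an error."""
    conn = FakeConnection()
    await conn.connect()
    try:
        yield conn
    finally:
        await conn.disconnect()

async def main():
    async with open_connection() as conn:
        print(f"Inside block, connected: {conn.connected}")
    print(f"After block, connected: {conn.connected}")

    try:
        async with open_connection() as failing:
            raise RuntimeError("query failed")
    except RuntimeError:
        print(f"After error, connected: {failing.connected}")
    return conn, failing

asyncio.run(main())
```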

---

## 10. Practice Exercises

### Exercise 1: Async Task Queue

```python
class AsyncTaskQueue:
    """Async task queue with worker pool"""
    
    def __init__(self, max_workers=3):
        self.max_workers = max_workers
        self.queue = asyncio.Queue()
        self.workers = []
        self.running = False
        self.results = {}
    
    async def add_task(self, task_id, coro):
        """Add a task to the queue"""
        await self.queue.put((task_id, coro))
        print(f"Added task {task_id} to queue")
    
    async def worker(self, worker_id):
        """Worker that processes tasks from the queue"""
        print(f"Worker {worker_id} started")
        while self.running:
            try:
                # Wait for a task with timeout
                task_id, coro = await asyncio.wait_for(
                    self.queue.get(), timeout=1.0
                )
                
                print(f"Worker {worker_id} processing task {task_id}")
                start_time = asyncio.get_event_loop().time()
                
                try:
                    result = await coro
                    end_time = asyncio.get_event_loop().time()
                    elapsed = end_time - start_time
                    
                    self.results[task_id] = {
                        "status": "completed",
                        "result": result,
                        "duration": elapsed,
                        "worker": worker_id
                    }
                    print(f"Worker {worker_id} completed task {task_id} in {elapsed:.2f}s")
                    
                except Exception as e:
                    self.results[task_id] = {
                        "status": "failed",
                        "error": str(e),
                        "worker": worker_id
                    }
                    print(f"Worker {worker_id} failed task {task_id}: {e}")
                
                finally:
                    self.queue.task_done()
                    
            except asyncio.TimeoutError:
                continue  # No tasks available, keep waiting
            except Exception as e:
                print(f"Worker {worker_id} error: {e}")
        
        print(f"Worker {worker_id} stopped")
    
    async def start(self):
        """Start the worker pool"""
        if self.running:
            return
        
        self.running = True
        self.workers = [
            asyncio.create_task(self.worker(i))
            for i in range(self.max_workers)
        ]
        print(f"Started {self.max_workers} workers")
    
    async def stop(self):
        """Stop the worker pool"""
        if not self.running:
            return
        
        self.running = False
        
        # Wait for all workers to finish
        await asyncio.gather(*self.workers, return_exceptions=True)
        print("All workers stopped")
    
    async def wait_for_completion(self):
        """Wait for all queued tasks to complete"""
        await self.queue.join()
    
    def get_results(self):
        """Get all task results"""
        return self.results.copy()

async def test_async_task_queue():
    """Test the async task queue"""
    
    # Create sample tasks
    async def sample_task(task_name, duration, should_fail=False):
        await asyncio.sleep(duration)
        if should_fail:
            raise ValueError(f"Task {task_name} intentionally failed")
        return f"Result from {task_name}"
    
    # Initialize and start the queue
    task_queue = AsyncTaskQueue(max_workers=2)
    await task_queue.start()
    
    # Add various tasks
    tasks_to_add = [
        ("task1", sample_task("task1", 1.0)),
        ("task2", sample_task("task2", 0.5)),
        ("task3", sample_task("task3", 1.5)),
        ("task4", sample_task("task4", 0.3, should_fail=True)),
        ("task5", sample_task("task5", 0.8)),
    ]
    
    # Add all tasks to queue
    for task_id, coro in tasks_to_add:
        await task_queue.add_task(task_id, coro)
    
    # Wait for all tasks to complete
    await task_queue.wait_for_completion()
    
    # Get and display results
    results = task_queue.get_results()
    print("\nTask Results:")
    for task_id, result in results.items():
        if result["status"] == "completed":
            print(f"  {task_id}: SUCCESS - {result['result']} (took {result['duration']:.2f}s, worker {result['worker']})")
        else:
            print(f"  {task_id}: FAILED - {result['error']} (worker {result['worker']})")
    
    # Stop the queue
    await task_queue.stop()

asyncio.run(test_async_task_queue())
```
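The workers above poll the queue with a one-second timeout so they can notice the `running` flag. A common alternative, sketched here under simplified assumptions, lets workers block on `queue.get()` indefinitely and cancels them once `queue.join()` confirms that all work is done. In this sketch the queue holds plain `(job_id, delay)` tuples rather than coroutine objects, and `run_jobs` is a hypothetical helper.

```python
import asyncio

async def worker(name, queue, results):
    """Process jobs forever; the caller cancels the worker when done."""
    while True:
        job_id, delay = await queue.get()
        try:
            await asyncio.sleep(delay)  # Simulated work
            results[job_id] = name
        finally:
            queue.task_done()

async def run_jobs(jobs, num_workers=2):
    queue = asyncio.Queue()
    results = {}
    for job in jobs:
        queue.put_nowait(job)

    workers = [
        asyncio.create_task(worker(f"worker-{i}", queue, results))
        for i in range(num_workers)
    ]

    await queue.join()  # Returns once every job has been marked done
    for w in workers:
        w.cancel()      # Workers are idle in queue.get(); stop them
    await asyncio.gather(*workers, return_exceptions=True)
    return results

results = asyncio.run(run_jobs([("a", 0.05), ("b", 0.01), ("c", 0.02)]))
print(results)
```

Shutdown is immediate because no worker has to wait for a polling timeout to expire.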

### Exercise 2: Async Rate Limiter

```python
import asyncio
import time

class AsyncRateLimiter:
    """Async rate limiter using token bucket algorithm"""
    
    def __init__(self, rate, capacity=None):
        """
        rate: tokens added per second
        capacity: maximum tokens (defaults to rate)
        """
        self.rate = rate
        self.capacity = rate if capacity is None else capacity
        self.tokens = self.capacity
        # time.monotonic() needs no running event loop, so the limiter
        # can be constructed outside a coroutine
        self.last_update = time.monotonic()
        self.lock = asyncio.Lock()
    
    def _refill(self):
        """Add the tokens earned since the last update"""
        now = time.monotonic()
        elapsed = now - self.last_update
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_update = now
    
    async def acquire(self, tokens=1):
        """Acquire tokens, waiting if necessary"""
        async with self.lock:
            self._refill()
            
            # If we don't have enough tokens, wait for the shortfall
            if self.tokens < tokens:
                wait_time = (tokens - self.tokens) / self.rate
                await asyncio.sleep(wait_time)
                # Refill again so the time spent waiting is counted exactly once
                self._refill()
            
            self.tokens = max(0.0, self.tokens - tokens)
    
    async def try_acquire(self, tokens=1):
        """Try to acquire tokens without waiting"""
        async with self.lock:
            self._refill()
            
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

async def test_rate_limiter():
    """Test the async rate limiter"""
    
    # Create rate limiter: 2 requests per second
    rate_limiter = AsyncRateLimiter(rate=2, capacity=5)
    
    async def rate_limited_operation(operation_id):
        """Simulate a rate-limited operation"""
        await rate_limiter.acquire()
        print(f"Operation {operation_id} executed at {asyncio.get_event_loop().time():.2f}")
        # Simulate some work
        await asyncio.sleep(0.1)
        return f"Result from operation {operation_id}"
    
    print("Testing rate limiter (2 ops/second):")
    
    # Start multiple operations
    tasks = [
        rate_limited_operation(i) for i in range(8)
    ]
    
    start_time = asyncio.get_event_loop().time()
    results = await asyncio.gather(*tasks)
    end_time = asyncio.get_event_loop().time()
    
    print(f"All operations completed in {end_time - start_time:.2f} seconds")
    print("Results:", len(results))

asyncio.run(test_rate_limiter())
```
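To check the limiter's output against expectations, it helps to work out the ideal token-bucket timeline by hand. The sketch below does that arithmetic without any sleeping. `schedule` is a hypothetical helper that assumes every request arrives at time zero and costs one token.

```python
def schedule(n, rate, capacity):
    """Return the earliest start time (seconds) of each of n requests issued at t=0."""
    tokens = float(capacity)  # The bucket starts full
    now = 0.0
    starts = []
    for _ in range(n):
        if tokens < 1:
            # Wait just long enough to earn the missing fraction of a token
            now += (1 - tokens) / rate
            tokens = 1.0
        tokens -= 1
        starts.append(now)
    return starts

print(schedule(8, rate=2, capacity=5))
# [0.0, 0.0, 0.0, 0.0, 0.0, 0.5, 1.0, 1.5]
```

With `rate=2` and `capacity=5`, the first five operations run as a burst and the remaining three are spaced half a second apart, so eight operations take about 1.5 seconds plus the simulated work.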

### Exercise 3: Async Producer-Consumer

```python
class AsyncProducerConsumer:
    """Async producer-consumer pattern with buffering"""
    
    def __init__(self, buffer_size=10):
        self.buffer = asyncio.Queue(maxsize=buffer_size)
        self.producers_finished = asyncio.Event()
        self.active_producers = 0
        self.producer_lock = asyncio.Lock()
    
    async def producer(self, producer_id, items):
        """Producer that adds items to the buffer"""
        async with self.producer_lock:
            self.active_producers += 1
        
        try:
            for item in items:
                await self.buffer.put((producer_id, item))
                print(f"Producer {producer_id} produced: {item}")
                await asyncio.sleep(random.uniform(0.1, 0.5))  # Simulate production time
        
        finally:
            async with self.producer_lock:
                self.active_producers -= 1
                if self.active_producers == 0:
                    self.producers_finished.set()
        
        print(f"Producer {producer_id} finished")
    
    async def consumer(self, consumer_id):
        """Consumer that processes items from the buffer"""
        processed_count = 0
        
        while True:
            try:
                # Wait for item or producer completion
                item_task = asyncio.create_task(self.buffer.get())
                finished_task = asyncio.create_task(self.producers_finished.wait())
                
                done, pending = await asyncio.wait(
                    [item_task, finished_task],
                    return_when=asyncio.FIRST_COMPLETED
                )
                
                # Cancel pending tasks
                for task in pending:
                    task.cancel()
                
                if item_task in done:
                    producer_id, item = await item_task
                    
                    # Process the item
                    print(f"Consumer {consumer_id} processing: {item} from producer {producer_id}")
                    await asyncio.sleep(random.uniform(0.2, 0.8))  # Simulate processing time
                    processed_count += 1
                    
                    self.buffer.task_done()
                
                # Check if producers are finished and buffer is empty
                if self.producers_finished.is_set() and self.buffer.empty():
                    break
                    
            except asyncio.CancelledError:
                break
        
        print(f"Consumer {consumer_id} finished, processed {processed_count} items")
        return processed_count

async def test_producer_consumer():
    """Test the async producer-consumer pattern"""
    
    pc = AsyncProducerConsumer(buffer_size=5)
    
    # Create producer data
    producer_data = {
        "A": [f"item_A_{i}" for i in range(5)],
        "B": [f"item_B_{i}" for i in range(7)],
        "C": [f"item_C_{i}" for i in range(3)]
    }
    
    # Start producers
    producer_tasks = [
        asyncio.create_task(pc.producer(prod_id, items))
        for prod_id, items in producer_data.items()
    ]
    
    # Start consumers
    consumer_tasks = [
        asyncio.create_task(pc.consumer(f"Consumer_{i}"))
        for i in range(3)
    ]
    
    # Wait for all producers to finish
    await asyncio.gather(*producer_tasks)
    
    # Wait for all items to be processed
    await pc.buffer.join()
    
    # Consumers normally exit on their own once producers are done and the
    # buffer is empty; cancel() is a safety net for any that are still waiting
    for task in consumer_tasks:
        task.cancel()
    
    # Collect results
    consumer_results = await asyncio.gather(*consumer_tasks, return_exceptions=True)
    total_processed = sum(result for result in consumer_results if isinstance(result, int))
    
    print(f"\nProduction completed. Total items processed: {total_processed}")

asyncio.run(test_producer_consumer())
```
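A simpler way to tell consumers that production is over is a sentinel value placed on the queue after the last real item. This replaces the `Event` plus `asyncio.wait()` coordination used above. The sketch below assumes one sentinel per consumer. `STOP` and `run_pipeline` are hypothetical names.

```python
import asyncio

STOP = object()  # Sentinel marking the end of the stream

async def producer(queue, items):
    for item in items:
        await queue.put(item)  # Waits while the buffer is full

async def consumer(queue, processed):
    while True:
        item = await queue.get()
        if item is STOP:
            break
        await asyncio.sleep(0.01)  # Simulated processing
        processed.append(item)

async def run_pipeline(batches, num_consumers=2):
    queue = asyncio.Queue(maxsize=5)
    processed = []

    consumers = [
        asyncio.create_task(consumer(queue, processed))
        for _ in range(num_consumers)
    ]

    # Run all producers to completion, then send one sentinel per consumer.
    # The queue is FIFO, so every real item is dequeued before any sentinel.
    await asyncio.gather(*(producer(queue, batch) for batch in batches))
    for _ in range(num_consumers):
        await queue.put(STOP)

    await asyncio.gather(*consumers)
    return processed

items = asyncio.run(run_pipeline([["a1", "a2"], ["b1", "b2", "b3"]]))
print(f"Processed {len(items)} items")
```

No task is cancelled here, so no item can be interrupted in the middle of processing.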

---

## Summary and Best Practices

### Key Concepts Mastered

```python
def async_programming_summary():
    """Summary of async programming concepts and best practices"""
    
    print("Asynchronous Programming in Python - Course Summary")
    print("=" * 55)
    
    concepts = {
        "Core Concepts": [
            "async/await syntax for defining and calling async functions",
            "Event loop as the heart of async execution",
            "Concurrency vs parallelism understanding",
            "I/O-bound vs CPU-bound task optimization"
        ],
        
        "Asyncio Fundamentals": [
            "Creating and managing tasks with asyncio.create_task()",
            "Coordinating multiple operations with asyncio.gather()",
            "Timeout handling with asyncio.wait_for()",
            "Task cancellation and cleanup"
        ],
        
        "Synchronization Primitives": [
            "asyncio.Lock for protecting shared resources",
            "asyncio.Semaphore for limiting concurrent operations",
            "asyncio.Event for signaling between tasks",
            "asyncio.Queue for producer-consumer patterns"
        ],
        
        "Advanced Patterns": [
            "Async context managers with __aenter__/__aexit__",
            "Async iterators and generators for streaming data",
            "Exception handling in concurrent environments",
            "Rate limiting and circuit breaker patterns"
        ],
        
        "Performance & Debugging": [
            "Profiling async code for bottlenecks",
            "Memory usage optimization in async applications",
            "Debugging techniques for concurrent code",
            "Monitoring and logging async operations"
        ]
    }
    
    print("\nConcepts Covered:")
    for category, items in concepts.items():
        print(f"\n{category}:")
        for item in items:
            print(f"  • {item}")
    
    print("\nBest Practices:")
    best_practices = [
        "Use async/await for I/O-bound operations only",
        "Avoid blocking operations in async functions",
        "Handle exceptions properly in concurrent code",
        "Use asyncio.gather() for coordinating multiple tasks",
        "Implement proper cleanup with context managers",
        "Monitor and limit concurrent operations",
        "Use structured concurrency patterns (asyncio.TaskGroup in Python 3.11+)",
        "Profile and optimize async code performance"
    ]
    
    for practice in best_practices:
        print(f"  • {practice}")
    
    print(f"\nCommon Pitfalls to Avoid:")
    pitfalls = [
        "Forgetting to use 'await' with async functions",
        "Using blocking I/O operations in async code",
        "Creating too many concurrent tasks without limits",
        "Not handling task cancellation properly",
        "Mixing synchronous and asynchronous code incorrectly",
        "Not considering exception propagation in gather()",
        "Using async for CPU-bound tasks",
        "Forgetting to close resources in async context managers"
    ]
    
    for pitfall in pitfalls:
        print(f"  • {pitfall}")

async_programming_summary()
```
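One pitfall from the list, exception propagation in `gather()`, is easiest to see side by side. This small sketch contrasts the default behavior with `return_exceptions=True`. The `job` coroutine is a hypothetical stand-in for real work.

```python
import asyncio

async def job(name, fail=False):
    await asyncio.sleep(0.01)
    if fail:
        raise ValueError(f"{name} failed")
    return name

async def main():
    # Default: the first exception propagates to the caller of gather()
    propagated = None
    try:
        await asyncio.gather(job("a"), job("b", fail=True), job("c"))
    except ValueError as exc:
        propagated = str(exc)

    # return_exceptions=True: exceptions are returned in argument order
    collected = await asyncio.gather(
        job("a"), job("b", fail=True), job("c"), return_exceptions=True
    )

    print(f"Propagated: {propagated}")
    print(f"Collected: {collected}")
    return propagated, collected

asyncio.run(main())
```

In the default mode the other awaitables are not cancelled when one fails. They keep running, but their results are no longer delivered through that `gather()` call.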

### When to Use Async Programming

```python
def when_to_use_async():
    """Guidelines for when async programming is beneficial"""
    
    print("\nWhen to Use Async Programming:")
    print("=" * 33)
    
    use_cases = {
        "Excellent for Async": [
            "Web scraping with multiple HTTP requests",
            "Database operations with high I/O wait",
            "File operations with multiple files",
            "Real-time applications (chat, gaming)",
            "API services handling many requests",
            "Data streaming and processing pipelines"
        ],
        
        "Not Suitable for Async": [
            "CPU-intensive mathematical calculations",
            "Image/video processing without I/O",
            "Simple scripts with minimal I/O",
            "Single-threaded CPU-bound algorithms",
            "Applications with mostly synchronous libraries"
        ],
        
        "Consider Alternatives": [
            "multiprocessing for CPU-bound parallel work",
            "threading for I/O with blocking libraries",
            "concurrent.futures for mixed workloads",
            "Synchronous code for simple, sequential tasks"
        ]
    }
    
    for category, examples in use_cases.items():
        print(f"\n{category}:")
        for example in examples:
            print(f"  • {example}")
    
    print("\nPerformance Considerations:")
    considerations = [
        "Async overhead: small for I/O-bound, significant for CPU-bound",
        "Memory usage: tasks are lighter than threads, but unbounded task creation still adds up",
        "Debugging complexity: async code can be harder to debug",
        "Library ecosystem: ensure async-compatible libraries exist",
        "Team expertise: consider learning curve for async patterns"
    ]
    
    for consideration in considerations:
        print(f"  • {consideration}")

when_to_use_async()
```
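When only a blocking library is available, the call can be moved to a worker thread so the event loop stays responsive. The sketch below assumes Python 3.9+ for `asyncio.to_thread()`. `blocking_read` is a hypothetical stand-in for such a library call.

```python
import asyncio
import time

def blocking_read(name, delay):
    """Stand-in for a blocking library call (hypothetical)."""
    time.sleep(delay)
    return f"{name} done"

async def main():
    start = time.perf_counter()
    # Each call runs in its own worker thread, so the two waits overlap
    results = await asyncio.gather(
        asyncio.to_thread(blocking_read, "first", 0.2),
        asyncio.to_thread(blocking_read, "second", 0.2),
    )
    elapsed = time.perf_counter() - start
    print(results, f"{elapsed:.2f}s")
    return results, elapsed

asyncio.run(main())
```

This helps with blocking I/O. CPU-bound work in threads still contends for the GIL, which is why the list above points to multiprocessing for that case.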

---

## Final Project Challenge

```python
async def final_project_example():
    """
    Final Project: Async Web Service Monitor
    
    This project combines multiple async concepts:
    - Concurrent health checks (simulated HTTP requests)
    - Concurrency limiting with a semaphore
    - Error handling
    - Data aggregation
    - Periodic status reporting
    """
    
    class WebServiceMonitor:
        """Monitor multiple web services concurrently"""
        
        def __init__(self, check_interval=10, max_concurrent=5):
            self.check_interval = check_interval
            self.semaphore = asyncio.Semaphore(max_concurrent)
            self.services = {}
            self.running = False
            self.results = {}
        
        def add_service(self, name, url, expected_status=200):
            """Add a service to monitor"""
            self.services[name] = {
                "url": url,
                "expected_status": expected_status,
                "check_count": 0,
                "success_count": 0,
                "last_check": None,
                "last_status": None
            }
        
        async def check_service(self, name, service_info):
            """Check a single service"""
            async with self.semaphore:
                try:
                    # Simulate HTTP request
                    await asyncio.sleep(random.uniform(0.1, 1.0))
                    
                    # Simulate response
                    status_code = random.choices(
                        [200, 404, 500, 503],
                        weights=[85, 5, 5, 5]
                    )[0]
                    
                    service_info["check_count"] += 1
                    service_info["last_check"] = asyncio.get_event_loop().time()
                    service_info["last_status"] = status_code
                    
                    if status_code == service_info["expected_status"]:
                        service_info["success_count"] += 1
                        return {"name": name, "status": "UP", "response_code": status_code}
                    else:
                        return {"name": name, "status": "DOWN", "response_code": status_code}
                
                except Exception as e:
                    return {"name": name, "status": "ERROR", "error": str(e)}
        
        async def monitor_loop(self):
            """Main monitoring loop"""
            while self.running:
                check_tasks = [
                    self.check_service(name, info)
                    for name, info in self.services.items()
                ]
                
                results = await asyncio.gather(*check_tasks, return_exceptions=True)
                
                # Process results
                timestamp = asyncio.get_event_loop().time()
                self.results[timestamp] = results
                
                # Report status
                up_count = sum(1 for r in results if isinstance(r, dict) and r.get("status") == "UP")
                total_count = len(results)
                
                print(f"Health Check: {up_count}/{total_count} services UP")
                
                # Wait for next check
                await asyncio.sleep(self.check_interval)
        
        async def start_monitoring(self, duration=30):
            """Start monitoring for specified duration"""
            self.running = True
            monitor_task = asyncio.create_task(self.monitor_loop())
            
            # Run for specified duration
            await asyncio.sleep(duration)
            
            self.running = False
            await monitor_task
        
        def get_service_stats(self):
            """Get statistics for all services"""
            stats = {}
            for name, info in self.services.items():
                if info["check_count"] > 0:
                    uptime = (info["success_count"] / info["check_count"]) * 100
                    stats[name] = {
                        "uptime_percentage": round(uptime, 2),
                        "total_checks": info["check_count"],
                        "successful_checks": info["success_count"],
                        "last_status": info["last_status"]
                    }
            return stats
    
    # Demo the monitoring system
    monitor = WebServiceMonitor(check_interval=2, max_concurrent=3)
    
    # Add services to monitor
    services_to_monitor = [
        ("Primary API", "https://api.primary.com/health"),
        ("User Service", "https://users.api.com/status"),
        ("Payment Gateway", "https://payments.secure.com/ping"),
        ("Database", "https://db.internal.com/health"),
        ("Cache Service", "https://cache.redis.com/status")
    ]
    
    for name, url in services_to_monitor:
        monitor.add_service(name, url)
    
    print("Starting Web Service Monitor...")
    print("Monitoring 5 services for 10 seconds...\n")
    
    # Start monitoring
    await monitor.start_monitoring(duration=10)
    
    # Get final statistics
    stats = monitor.get_service_stats()
    
    print("\nFinal Service Statistics:")
    print("-" * 40)
    for service_name, service_stats in stats.items():
        print(f"{service_name}:")
        print(f"  Uptime: {service_stats['uptime_percentage']}%")
        print(f"  Checks: {service_stats['successful_checks']}/{service_stats['total_checks']}")
        print(f"  Last Status: {service_stats['last_status']}")
        print()

asyncio.run(final_project_example())
```

---

## Course Completion

Congratulations! You've mastered asynchronous programming in Python. You now understand how to:

- Write concurrent code using async/await
- Handle I/O-bound operations efficiently
- Implement complex async patterns and synchronization
- Debug and optimize async applications
- Build real-world async systems

### Next Steps

1. **Practice with Real Projects**: Apply these concepts to web scraping, API development, or data processing projects
2. **Explore Advanced Libraries**: Learn aiohttp, aiofiles, asyncpg for specific use cases
3. **Study Production Patterns**: Investigate how frameworks like FastAPI use async patterns
4. **Performance Optimization**: Dive deeper into async performance profiling and optimization
5. **Distributed Systems**: Learn about async patterns in microservices and distributed computing

### Additional Resources

- **aiohttp**: Async HTTP client/server framework
- **FastAPI**: Modern async web framework
- **asyncpg**: Async PostgreSQL driver
- **aiofiles**: Async file operations
- **aiodns**: Async DNS resolution
- **asyncio documentation**: Official Python async programming guide

Happy async programming!