# Understanding Python's asyncio

This notebook provides a comprehensive tutorial on Python's `asyncio` library, designed to teach you asynchronous programming concepts from the ground up.

## Contents

1. [Introduction to Asynchronous Programming](#1-introduction-to-asynchronous-programming)
2. [Asyncio Basics](#2-asyncio-basics)
3. [Working with Tasks and Futures](#3-working-with-tasks-and-futures)
4. [Advanced Asyncio Concepts](#4-advanced-asyncio-concepts)
5. [Practical Examples](#5-practical-examples)
6. [Best Practices and Patterns](#6-best-practices-and-patterns)

## 1. Introduction to Asynchronous Programming

### What is Asynchronous Programming?

Asynchronous programming is a programming paradigm that allows multiple operations to be executed concurrently (not necessarily in parallel) without blocking the main execution thread. In contrast to synchronous programming where operations are executed sequentially, asynchronous programming enables non-blocking execution, making it ideal for I/O-bound operations.

### Synchronous vs. Asynchronous: A Practical Example

Let's compare synchronous and asynchronous approaches to understand the difference:

In [None]:
import time

# Synchronous function to simulate an I/O operation
def sync_io_operation(operation_name, duration):
    print(f"Starting {operation_name}...")
    time.sleep(duration)  # Blocking operation
    print(f"Finished {operation_name} after {duration}s")
    return f"{operation_name} result"

# Simulating multiple synchronous operations
def run_sync_operations():
    start = time.time()
    
    result1 = sync_io_operation("Operation 1", 2)
    result2 = sync_io_operation("Operation 2", 1)
    result3 = sync_io_operation("Operation 3", 3)
    
    end = time.time()
    print(f"Total synchronous execution time: {end - start:.2f}s")
    return [result1, result2, result3]

# Run the synchronous operations
run_sync_operations()

Now, let's look at the same operations executed asynchronously using `asyncio`:

In [None]:
import asyncio

# Asynchronous function to simulate an I/O operation
async def async_io_operation(operation_name, duration):
    print(f"Starting {operation_name}...")
    await asyncio.sleep(duration)  # Non-blocking operation
    print(f"Finished {operation_name} after {duration}s")
    return f"{operation_name} result"

# Simulating multiple asynchronous operations
async def run_async_operations():
    start = time.time()
    
    # Create tasks to run concurrently
    task1 = asyncio.create_task(async_io_operation("Operation 1", 2))
    task2 = asyncio.create_task(async_io_operation("Operation 2", 1))
    task3 = asyncio.create_task(async_io_operation("Operation 3", 3))
    
    # Wait for all tasks to complete
    results = await asyncio.gather(task1, task2, task3)
    
    end = time.time()
    print(f"Total asynchronous execution time: {end - start:.2f}s")
    return results

# Run the asynchronous operations
await run_async_operations()

### Practice!

Now it's your turn to try the difference between synchronous and asynchronous code.

Complete the following exercise to simulate downloading files of different sizes. The `download_file` function already simulates synchronous downloads. Your task is to:

1. Complete the `async_download_file` function to make it asynchronous using `asyncio.sleep()`
2. Complete the `download_all_async` function to download all files concurrently using `asyncio.gather()`
3. Run both functions and compare the execution times

In [ ]:
import time

# This function simulates a synchronous file download
def download_file(file_name, file_size):
    print(f"Started downloading {file_name} ({file_size}MB)...")
    time.sleep(file_size / 10)  # Simulate download time (1 second per 10MB)
    print(f"Finished downloading {file_name}")
    return f"{file_name} - {file_size}MB"

# Complete this function to simulate asynchronous file download
async def async_download_file(file_name, file_size):
    print(f"Started downloading {file_name} ({file_size}MB)...")
    # TODO: Replace the line below with asyncio.sleep()
    time.sleep(file_size / 10)  
    print(f"Finished downloading {file_name}")
    return f"{file_name} - {file_size}MB"

# Synchronous download of all files
def download_all_sync(files):
    start = time.time()
    results = []
    
    for file_name, file_size in files:
        result = download_file(file_name, file_size)
        results.append(result)
    
    end = time.time()
    print(f"Synchronous download time: {end - start:.2f} seconds")
    return results

# Complete this function to download all files asynchronously
async def download_all_async(files):
    start = time.time()
    
    # TODO: Create tasks for each file download and use asyncio.gather() to run them concurrently
    results = []
    for file_name, file_size in files:
        result = await async_download_file(file_name, file_size)
        results.append(result)
    
    end = time.time()
    print(f"Asynchronous download time: {end - start:.2f} seconds")
    return results

# Files to download: (name, size in MB)
files_to_download = [
    ("document.pdf", 5),  # 0.5 seconds
    ("image.jpg", 10),    # 1 second
    ("video.mp4", 30)     # 3 seconds
]

# Run the synchronous download
sync_results = download_all_sync(files_to_download)

# TODO: Run the asynchronous download
# Uncomment and complete this line:
# async_results = await ...

# Compare results
print("\nSync download complete!")
# Uncomment this line:
# print("Async download complete!")

In [ ]:
### SOLUTION

# Here's the solution for the practice exercise
# It's hidden below - try to solve it yourself first!

#
#
#
#
#
# Scroll down to see the solution
#
#
#
#
#
#

import time
import asyncio

# This is the correct asynchronous implementation
async def async_download_file_solution(file_name, file_size):
    print(f"Started downloading {file_name} ({file_size}MB)...")
    await asyncio.sleep(file_size / 10)  # Non-blocking sleep
    print(f"Finished downloading {file_name}")
    return f"{file_name} - {file_size}MB"

# This is the correct asynchronous implementation for downloading all files
async def download_all_async_solution(files):
    start = time.time()
    
    # Create tasks for each file download
    tasks = [async_download_file_solution(file_name, file_size) 
             for file_name, file_size in files]
    
    # Run all tasks concurrently and wait for all to complete
    results = await asyncio.gather(*tasks)
    
    end = time.time()
    print(f"Asynchronous download time: {end - start:.2f} seconds")
    return results

# Run the asynchronous download with the solution
async def run_solution():
    files_to_download = [
        ("document.pdf", 5),  # 0.5 seconds
        ("image.jpg", 10),    # 1 second
        ("video.mp4", 30)     # 3 seconds
    ]
    print("\nRunning the solution:")
    async_results = await download_all_async_solution(files_to_download)
    print("Async download complete!")
    
    # Expected output: The async version should take around 3 seconds (time of the largest file)
    # rather than 4.5 seconds (sum of all download times) for the sync version

# Uncomment to see the solution in action
# await run_solution()

### When to Use Asyncio

Asyncio is particularly useful for:
- I/O-bound operations (network requests, file operations)
- Handling many concurrent connections (web servers, chat applications)
- Operations that involve waiting (APIs with rate limits, scheduled tasks)

It's less suitable for:
- CPU-bound tasks (use multiprocessing instead)
- Simple sequential operations with no waiting
- Small scripts with minimal I/O operations

## 2. Asyncio Basics

### Coroutines and the `async`/`await` Syntax

A coroutine is a specialized version of a Python generator function that can suspend and resume execution. They're created using the `async def` syntax.

In [None]:
# Define a coroutine
async def my_coroutine():
    print("Coroutine started")
    await asyncio.sleep(1)  # Give up control and allow other coroutines to run
    print("Coroutine resumed after 1 second")
    return "Coroutine completed"

# This won't execute the coroutine, it just creates a coroutine object
coro = my_coroutine()
print(f"Type of my_coroutine(): {type(coro)}")

# To actually run a coroutine, you need to schedule it on the event loop
result = await my_coroutine()
print(f"Result: {result}")

### Practice!

Time to practice creating and running multiple coroutines! In this exercise, you'll create a simple asynchronous countdown system.

Your tasks:
1. Complete the `countdown` coroutine to print a countdown from a given number to 1
2. Complete the `run_countdowns` function to create and run multiple countdowns concurrently
3. Compare the behavior with different waiting strategies

In [ ]:
async def countdown(name, start_value):
    """
    Coroutine that counts down from start_value to 1
    - name: A name for this countdown to identify it in output
    - start_value: Number to start counting down from
    """
    # TODO: Implement a countdown that:
    # 1. Prints the current count
    # 2. Waits for 0.5 seconds between counts using asyncio.sleep()
    # 3. Returns the total time taken when done
    pass

async def run_countdowns():
    """Run multiple countdowns concurrently"""
    print("Starting countdowns...")
    
    # TODO: Create three countdowns with different starting values:
    # - "Countdown A" starting from 5
    # - "Countdown B" starting from 3
    # - "Countdown C" starting from 7
    
    # TODO: Try both methods of running coroutines:
    # 1. Using asyncio.gather()
    # 2. Using asyncio.create_task() and awaiting separately
    
    print("All countdowns complete!")

# Run the exercise
# Uncomment this line to execute:
# await run_countdowns()

In [ ]:
### SOLUTION

# Here's the solution for the practice exercise
# It's hidden below - try to solve it yourself first!

#
#
#
#
#
# Scroll down to see the solution
#
#
#
#
#
#

import time

async def countdown_solution(name, start_value):
    """
    Coroutine that counts down from start_value to 1
    - name: A name for this countdown to identify it in output
    - start_value: Number to start counting down from
    """
    start_time = time.time()
    
    for i in range(start_value, 0, -1):
        print(f"{name}: {i}")
        await asyncio.sleep(0.5)  # Non-blocking wait
    
    elapsed = time.time() - start_time
    print(f"{name} complete in {elapsed:.2f} seconds")
    return name, elapsed

async def run_countdowns_solution():
    """Run multiple countdowns concurrently"""
    print("Starting countdowns...")
    
    print("\nMethod 1: Using asyncio.gather()")
    # Using asyncio.gather()
    results = await asyncio.gather(
        countdown_solution("Countdown A", 5),
        countdown_solution("Countdown B", 3),
        countdown_solution("Countdown C", 7)
    )
    
    for name, elapsed in results:
        print(f"Result: {name} took {elapsed:.2f} seconds")
    
    print("\nMethod 2: Using asyncio.create_task()")
    # Using create_task and awaiting individually
    task_a = asyncio.create_task(countdown_solution("Countdown A", 5))
    task_b = asyncio.create_task(countdown_solution("Countdown B", 3))
    task_c = asyncio.create_task(countdown_solution("Countdown C", 7))
    
    # Wait for all tasks to complete and get their results
    result_a = await task_a
    print(f"Task completed: {result_a[0]}")
    
    result_b = await task_b
    print(f"Task completed: {result_b[0]}")
    
    result_c = await task_c
    print(f"Task completed: {result_c[0]}")
    
    print("All countdowns complete!")

# Uncomment to see the solution in action
# await run_countdowns_solution()

### Understanding the Event Loop

The event loop is the core of every asyncio application. It's responsible for:
- Scheduling and running asyncio tasks
- Handling I/O events
- Running subprocesses
- Managing timeouts

In [None]:
# Get the current event loop
loop = asyncio.get_event_loop()

# In Jupyter notebooks, the event loop is already running (in IPython)
# In regular Python scripts, you would manually run the loop like this:
'''
# For Python 3.7+
asyncio.run(my_coroutine())  # Creates a new event loop and runs the coroutine

# For older Python versions
loop = asyncio.get_event_loop()
result = loop.run_until_complete(my_coroutine())
'''

### Running Multiple Coroutines

There are several ways to run multiple coroutines:

### Practice!

Let's practice working with Futures! In this exercise, you'll implement a simple task management system that uses Futures to represent pending tasks.

Your tasks:
1. Complete the `execute_task` function that takes a Future and sets its result after a delay
2. Complete the `monitor_task` function that reports on a Future's status until it's done
3. Complete the `task_manager` function to create and monitor multiple tasks

In [ ]:
import random

async def execute_task(future, task_name, delay):
    """
    Simulates task execution and sets the future result when done
    - future: The Future object to complete
    - task_name: Name of the task
    - delay: Time in seconds that the task will take to complete
    """
    # TODO: 
    # 1. Print message that the task has started
    # 2. Pause execution for 'delay' seconds
    # 3. Set the future's result with a message like "Task {task_name} completed in {delay} seconds"
    pass

async def monitor_task(future, task_name):
    """
    Monitors a future until it's done
    - future: The Future object to monitor
    - task_name: Name of the task
    """
    # TODO:
    # 1. Check the future's status (done or not) every 0.5 seconds
    # 2. Print updates about the task's status
    # 3. When the future is done, print its result and return
    pass

async def task_manager():
    """Creates and manages multiple concurrent tasks using Futures"""
    print("Starting task manager...")
    
    # TODO: 
    # 1. Create three Future objects
    # 2. Start the execute_task coroutine for each Future with random delays between 1-5 seconds
    # 3. Start the monitor_task coroutine for each Future
    # 4. Wait for all monitoring to complete
    # 5. Print a message that all tasks are complete
    
    print("Task manager completed!")

# Uncomment to run the task manager
# await task_manager()

In [ ]:
### SOLUTION

# Here's the solution for the practice exercise
# It's hidden below - try to solve it yourself first!

#
#
#
#
#
# Scroll down to see the solution
#
#
#
#
#
#

import random

async def execute_task_solution(future, task_name, delay):
    """
    Simulates task execution and sets the future result when done
    - future: The Future object to complete
    - task_name: Name of the task
    - delay: Time in seconds that the task will take to complete
    """
    print(f"[Executor] {task_name} started, will take {delay:.1f} seconds")
    await asyncio.sleep(delay)  # Simulate the task running
    result = f"Task {task_name} completed in {delay:.1f} seconds"
    future.set_result(result)  # Set the future's result
    print(f"[Executor] {task_name} completed")

async def monitor_task_solution(future, task_name):
    """
    Monitors a future until it's done
    - future: The Future object to monitor
    - task_name: Name of the task
    """
    count = 0
    while not future.done():
        count += 1
        print(f"[Monitor] {task_name}: Still running... (check {count})")
        await asyncio.sleep(0.5)  # Check status every 0.5 seconds
    
    # Once future is done, get the result
    result = future.result()
    print(f"[Monitor] {task_name}: COMPLETED with result: {result}")
    return result

async def task_manager_solution():
    """Creates and manages multiple concurrent tasks using Futures"""
    print("Starting task manager...")
    
    # Create futures
    future1 = asyncio.Future()
    future2 = asyncio.Future()
    future3 = asyncio.Future()
    
    # Create random delays for each task
    delay1 = random.uniform(1, 3)
    delay2 = random.uniform(1, 3)
    delay3 = random.uniform(1, 3)
    
    # Start execution tasks (these will set future results when done)
    asyncio.create_task(execute_task_solution(future1, "Database Backup", delay1))
    asyncio.create_task(execute_task_solution(future2, "File Indexing", delay2))
    asyncio.create_task(execute_task_solution(future3, "Image Processing", delay3))
    
    # Start monitoring tasks and wait for all futures to complete
    monitor_tasks = [
        monitor_task_solution(future1, "Database Backup"),
        monitor_task_solution(future2, "File Indexing"),
        monitor_task_solution(future3, "Image Processing")
    ]
    
    # Wait for all monitors to report completion
    results = await asyncio.gather(*monitor_tasks)
    
    print("\nAll tasks completed with results:")
    for i, result in enumerate(results, 1):
        print(f"  Task {i}: {result}")
    
    print("Task manager completed!")

# Uncomment to run the solution
# await task_manager_solution()

In [None]:
async def task1():
    await asyncio.sleep(1)
    print("Task 1 completed")
    return "Result 1"

async def task2():
    await asyncio.sleep(0.5)
    print("Task 2 completed")
    return "Result 2"

# Method 1: Using asyncio.gather()
print("Running tasks with asyncio.gather()")
results = await asyncio.gather(task1(), task2())
print(f"Results: {results}")

# Method 2: Creating and awaiting Tasks
print("\nRunning tasks with asyncio.create_task()")
t1 = asyncio.create_task(task1())
t2 = asyncio.create_task(task2())
await t1
await t2
print(f"Results: {t1.result()}, {t2.result()}")

### Practice!

Let's practice working with asynchronous context managers! In this exercise, you'll create a simple resource pool that manages connections to various services.

Your tasks:
1. Complete the `AsyncResource` class implementation with proper `__aenter__` and `__aexit__` methods
2. Create a decorator-based context manager using `@asynccontextmanager`
3. Use both approaches to manage resources in an application scenario

In [ ]:
from contextlib import asynccontextmanager

class DatabaseConnection:
    """A mock database connection class"""
    
    def __init__(self, db_name):
        self.db_name = db_name
        self.is_connected = False
        
    async def connect(self):
        print(f"Connecting to database {self.db_name}...")
        await asyncio.sleep(1)  # Simulate connection time
        self.is_connected = True
        print(f"Connected to {self.db_name}")
        
    async def disconnect(self):
        if self.is_connected:
            print(f"Disconnecting from {self.db_name}...")
            await asyncio.sleep(0.5)  # Simulate disconnect time
            self.is_connected = False
            print(f"Disconnected from {self.db_name}")
            
    async def execute_query(self, query):
        if not self.is_connected:
            raise RuntimeError("Not connected to database")
        print(f"Executing query on {self.db_name}: {query}")
        await asyncio.sleep(0.5)  # Simulate query execution
        return f"Results from {query} on {self.db_name}"


class AsyncDatabaseManager:
    """
    Async context manager for database connections
    """
    def __init__(self, db_name):
        self.connection = DatabaseConnection(db_name)
    
    # TODO: Implement __aenter__ and __aexit__ methods
    async def __aenter__(self):
        # Your code here
        pass
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Your code here
        pass


# TODO: Create a decorator-based context manager for the same purpose
@asynccontextmanager
async def database_connection(db_name):
    # Your code here
    pass


async def run_database_operations():
    """Run database operations using both context manager approaches"""
    # TODO: 
    # 1. Use the AsyncDatabaseManager class to connect to a database
    # 2. Execute a query within the context
    # 3. Verify that the connection is properly closed after exiting the context
    
    # TODO:
    # 4. Use the database_connection function to connect to another database
    # 5. Execute a different query within this context
    # 6. Also verify proper connection cleanup
    
    # TODO: 
    # 7. Demonstrate error handling by raising an exception within one of the contexts
    
    pass

# Uncomment to run the exercise
# await run_database_operations()

In [ ]:
### SOLUTION

# Here's the solution for the practice exercise
# It's hidden below - try to solve it yourself first!

#
#
#
#
#
# Scroll down to see the solution
#
#
#
#
#
#

from contextlib import asynccontextmanager

class AsyncDatabaseManager_Solution:
    """
    Async context manager for database connections
    """
    def __init__(self, db_name):
        self.connection = DatabaseConnection(db_name)
    
    async def __aenter__(self):
        # Connect to the database when entering the context
        await self.connection.connect()
        return self.connection  # Return the connection for use inside the context
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Disconnect when exiting the context, even if an error occurred
        await self.connection.disconnect()
        # By returning False, we ensure that any exceptions are propagated
        return False


@asynccontextmanager
async def database_connection_solution(db_name):
    # Create and connect to database
    connection = DatabaseConnection(db_name)
    try:
        await connection.connect()
        yield connection  # Provide the connection to the context
    finally:
        # Always disconnect, even if there was an error
        await connection.disconnect()


async def run_database_operations_solution():
    """Run database operations using both context manager approaches"""
    print("\n=== Using class-based async context manager ===")
    try:
        # Use the class-based context manager
        async with AsyncDatabaseManager_Solution("ProductDB") as db:
            result = await db.execute_query("SELECT * FROM products")
            print(f"Got result: {result}")
    except Exception as e:
        print(f"Error using class-based manager: {e}")
    
    print("\n=== Using decorator-based async context manager ===")
    try:
        # Use the decorator-based context manager
        async with database_connection_solution("UserDB") as db:
            result = await db.execute_query("SELECT * FROM users")
            print(f"Got result: {result}")
    except Exception as e:
        print(f"Error using decorator-based manager: {e}")
    
    print("\n=== Demonstrating error handling ===")
    try:
        # Demonstrate error handling
        async with AsyncDatabaseManager_Solution("LogDB") as db:
            print("About to raise an exception...")
            raise ValueError("Something went wrong!")
            # This line never executes
            result = await db.execute_query("SELECT * FROM logs")
    except ValueError as e:
        print(f"Successfully caught exception: {e}")
    
    print("\nAll database operations completed")

# Uncomment to run the solution
# await run_database_operations_solution()

## 3. Working with Tasks and Futures

### Tasks

A Task is a wrapper around a coroutine that schedules it to run on the event loop. Tasks allow you to run coroutines concurrently.

In [None]:
async def long_running_task(name, duration):
    print(f"{name} started")
    await asyncio.sleep(duration)
    print(f"{name} completed after {duration}s")
    return f"{name} result"

async def manage_tasks():
    # Create tasks
    task_a = asyncio.create_task(long_running_task("Task A", 3))
    task_b = asyncio.create_task(long_running_task("Task B", 2))
    task_c = asyncio.create_task(long_running_task("Task C", 1))
    
    # Wait for all tasks to complete
    await asyncio.gather(task_a, task_b, task_c)
    
    print(f"Task A result: {task_a.result()}")
    print(f"Task B result: {task_b.result()}")
    print(f"Task C result: {task_c.result()}")

await manage_tasks()

### Task Cancellation

You can cancel running tasks:

### Practice!

Let's practice using asyncio queues to build a producer-consumer system. In this exercise, you'll implement a simple task processing system where:

1. Multiple producers generate tasks and put them into a queue
2. Multiple consumers take tasks from the queue and process them
3. The system should handle backpressure (when producers are faster than consumers)

Your tasks:
1. Complete the producer function to generate tasks and add them to the queue
2. Complete the consumer function to process tasks from the queue
3. Complete the main function to set up and run the system

In [ ]:
import random

async def task_producer(queue, producer_id, num_tasks):
    """
    Produces tasks and puts them into the queue
    - queue: The asyncio.Queue to add tasks to
    - producer_id: ID to identify this producer
    - num_tasks: Number of tasks to produce
    """
    # TODO:
    # 1. Create a loop to generate 'num_tasks' tasks
    # 2. For each task, create a task description like f"Task {producer_id}-{task_num}"
    # 3. Put the task into the queue using queue.put()
    # 4. Add a small random delay between producing tasks (0.1-0.5 seconds)
    # 5. Print messages when starting/finishing production
    pass
    
async def task_consumer(queue, consumer_id, processing_time):
    """
    Consumes tasks from the queue and processes them
    - queue: The asyncio.Queue to get tasks from
    - consumer_id: ID to identify this consumer
    - processing_time: Time in seconds each task takes to process
    """
    # TODO:
    # 1. Create an infinite loop to continuously check for tasks
    # 2. Get tasks from the queue using queue.get()
    # 3. Process each task by waiting for 'processing_time' seconds
    # 4. Print messages when starting/finishing processing
    # 5. Call queue.task_done() after processing each task
    # 6. Add exception handling in case of errors
    pass

async def producer_consumer_demo():
    """
    Sets up and runs the producer-consumer system
    """
    # TODO:
    # 1. Create a queue with a maximum size of 5 tasks
    # 2. Create 2 producers that generate 5 tasks each
    # 3. Create 3 consumers with different processing times
    # 4. Start all producers and consumers
    # 5. Wait for all producers to finish
    # 6. Wait for the queue to be empty (all tasks processed)
    # 7. Cancel the consumer tasks (which are in infinite loops)
    # 8. Print a final summary message
    pass

# Uncomment to run the demo
# await producer_consumer_demo()

In [ ]:
### SOLUTION

# Here's the solution for the practice exercise
# It's hidden below - try to solve it yourself first!

#
#
#
#
#
# Scroll down to see the solution
#
#
#
#
#
#

import random

async def task_producer_solution(queue, producer_id, num_tasks):
    """
    Produces tasks and puts them into the queue
    - queue: The asyncio.Queue to add tasks to
    - producer_id: ID to identify this producer
    - num_tasks: Number of tasks to produce
    """
    print(f"Producer {producer_id}: Starting, will produce {num_tasks} tasks")
    
    for i in range(1, num_tasks + 1):
        # Create a task with producer ID and task number
        task = f"Task {producer_id}-{i}"
        
        # Put the task in the queue, this will wait if the queue is full
        print(f"Producer {producer_id}: Adding {task} to queue")
        await queue.put(task)
        
        # Random delay between tasks
        delay = random.uniform(0.1, 0.5)
        await asyncio.sleep(delay)
    
    print(f"Producer {producer_id}: Finished producing all {num_tasks} tasks")
    
async def task_consumer_solution(queue, consumer_id, processing_time):
    """
    Consumes tasks from the queue and processes them
    - queue: The asyncio.Queue to get tasks from
    - consumer_id: ID to identify this consumer
    - processing_time: Time in seconds each task takes to process
    """
    print(f"Consumer {consumer_id}: Starting (processing time: {processing_time}s per task)")
    
    # Track processed tasks for reporting
    tasks_processed = 0
    
    try:
        while True:  # Infinite loop to keep processing tasks
            # Get a task from the queue
            task = await queue.get()
            tasks_processed += 1
            
            # Process the task
            print(f"Consumer {consumer_id}: Processing {task}")
            await asyncio.sleep(processing_time)  # Simulate processing time
            print(f"Consumer {consumer_id}: Completed {task}")
            
            # Signal that the task is done
            queue.task_done()
    except asyncio.CancelledError:
        # Handle cancellation gracefully
        print(f"Consumer {consumer_id}: Shutting down after processing {tasks_processed} tasks")
        raise
    except Exception as e:
        print(f"Consumer {consumer_id}: Error processing task: {e}")
        queue.task_done()  # Don't forget to mark the task as done even on errors

async def producer_consumer_demo_solution():
    """
    Sets up and runs the producer-consumer system
    """
    # Create a queue with maximum size 5 to demonstrate backpressure
    queue = asyncio.Queue(maxsize=5)
    
    # Create and start producers
    producer_tasks = [
        asyncio.create_task(task_producer_solution(queue, 1, 5)),
        asyncio.create_task(task_producer_solution(queue, 2, 5))
    ]
    
    # Create and start consumers with different processing speeds
    consumer_tasks = [
        asyncio.create_task(task_consumer_solution(queue, 1, 0.7)),
        asyncio.create_task(task_consumer_solution(queue, 2, 1.0)),
        asyncio.create_task(task_consumer_solution(queue, 3, 0.5))
    ]
    
    # Wait for all producers to finish
    print("Waiting for producers to finish...")
    await asyncio.gather(*producer_tasks)
    print("All producers have finished!")
    
    # Wait for the queue to be empty (all tasks processed)
    print("Waiting for all tasks to be processed...")
    await queue.join()
    
    # All tasks are done, cancel the consumer tasks
    print("All tasks processed, cancelling consumers...")
    for consumer in consumer_tasks:
        consumer.cancel()
    
    # Wait for consumers to finish cancellation
    await asyncio.gather(*consumer_tasks, return_exceptions=True)
    
    print("\nProducer-Consumer demo completed")
    print(f"Total tasks produced: 10")
    print(f"Queue size at end: {queue.qsize()}")
    print(f"All tasks successfully processed!")

# Uncomment to run the solution
# await producer_consumer_demo_solution()

In [None]:
async def cancellable_task():
    try:
        print("Task started")
        while True:  # Infinite loop
            print("Working...")
            await asyncio.sleep(0.5)
    except asyncio.CancelledError:
        print("Task was cancelled!")
        raise  # Re-raise to properly handle cancellation

async def cancel_after_delay(task, delay):
    await asyncio.sleep(delay)
    task.cancel()
    print(f"Cancellation request sent after {delay}s")

async def demo_cancellation():
    task = asyncio.create_task(cancellable_task())
    # Schedule the task to be cancelled after 2 seconds
    asyncio.create_task(cancel_after_delay(task, 2))
    
    try:
        await task
    except asyncio.CancelledError:
        print("Main function caught the cancellation")

await demo_cancellation()

### Futures

A Future is a low-level awaitable object that represents an eventual result of an asynchronous operation. Tasks are a subclass of Future.

In [None]:
async def set_future_result(future, value, delay):
    await asyncio.sleep(delay)
    future.set_result(value)

async def future_demo():
    # Create a future
    future = asyncio.Future()
    
    # Schedule a coroutine to set the future's result
    asyncio.create_task(set_future_result(future, "Future result", 2))
    
    # Wait for the future to have a result
    print("Waiting for future result...")
    result = await future
    print(f"Got result: {result}")

await future_demo()

### Asynchronous Context Managers

Asyncio provides the asynchronous counterpart to Python's context managers. This allows resources to be properly managed asynchronously, especially when acquisition or release operations are I/O-bound.

In [None]:
# Define a class with __aenter__ and __aexit__ methods
class AsyncResource:
    def __init__(self, name):
        self.name = name
        
    async def __aenter__(self):
        print(f"Acquiring {self.name} asynchronously...")
        await asyncio.sleep(1)  # Simulate async resource acquisition
        print(f"{self.name} acquired")
        return self
        
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print(f"Releasing {self.name} asynchronously...")
        await asyncio.sleep(0.5)  # Simulate async resource release
        print(f"{self.name} released")
        # Returning False means exceptions propagate, True would suppress them
        return False
        
    async def use_resource(self):
        print(f"Using {self.name}")
        await asyncio.sleep(0.5)
        return f"{self.name} result"

# Using an async context manager with "async with"
async def async_context_demo():
    # Method 1: Using async with
    print("Using async with:")
    async with AsyncResource("Database Connection") as resource:
        result = await resource.use_resource()
        print(f"Got result: {result}")
    
    # Method 2: Using try/finally and manual enter/exit
    print("\nUsing manual enter/exit:")
    resource = AsyncResource("File Handle")
    try:
        await resource.__aenter__()
        result = await resource.use_resource()
        print(f"Got result: {result}")
    finally:
        await resource.__aexit__(None, None, None)
        
    # Method 3: Error handling demonstration
    print("\nHandling exceptions in async context managers:")
    try:
        async with AsyncResource("Network Connection") as resource:
            print("About to raise an exception...")
            raise ValueError("Something went wrong!")
            # This line never executes
            result = await resource.use_resource()
    except ValueError as e:
        print(f"Caught exception: {e}")

await async_context_demo()

Using `__aenter__` and `__aexit__` allows you to create asynchronous context managers that can:

1. Asynchronously acquire and release resources
2. Properly handle exceptions that occur within the context
3. Ensure resources are cleaned up even when errors occur

Common use cases include:
- Database connections requiring async setup/teardown
- Network resources with async initialization
- File I/O operations with aiofiles
- Connection pools that need to be acquired/released asynchronously

The `asynccontextmanager` decorator from `contextlib` provides an alternative way to create async context managers without implementing a full class:

In [None]:
from contextlib import asynccontextmanager

@asynccontextmanager
async def managed_resource(name):
    print(f"Acquiring {name}...")
    await asyncio.sleep(1)  # Simulate async acquisition
    try:
        yield name  # Provide the resource
    finally:
        print(f"Releasing {name}...")
        await asyncio.sleep(0.5)  # Simulate async release
        print(f"{name} released")

async def contextmanager_demo():
    async with managed_resource("API Connection") as resource:
        print(f"Using {resource}")
        await asyncio.sleep(0.5)
        print("Operation complete")

await contextmanager_demo()

### Waiting for Multiple Tasks

You can wait for multiple tasks with different strategies:

In [None]:
async def wait_demo():
    tasks = [
        asyncio.create_task(long_running_task("Task X", 3)),
        asyncio.create_task(long_running_task("Task Y", 1)),
        asyncio.create_task(long_running_task("Task Z", 2))
    ]
    
    # Wait for the first task to complete
    print("\nWaiting for the first task to complete...")
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    
    print(f"\n{len(done)} task(s) completed:")
    for task in done:
        print(f"  - {task.result()}")
        
    print(f"\n{len(pending)} task(s) still pending")
    
    # Wait for the remaining tasks
    print("\nWaiting for remaining tasks...")
    done, pending = await asyncio.wait(pending)
    
    print(f"\nAll tasks completed. Results:")
    for task in tasks:
        print(f"  - {task.result()}")

await wait_demo()

### Practice!

Let's practice proper error handling in asyncio applications. In this exercise, you'll build a system to process a batch of URLs, handling different types of errors that might occur.

Your tasks:
1. Complete the `fetch_url` function to handle connection errors and timeouts gracefully
2. Complete the `process_batch` function to process multiple URLs in parallel with proper error handling
3. Implement three approaches for handling errors in asyncio tasks

In [ ]:
class NotFoundException(Exception):
    """Raised when a resource is not found"""
    pass

class TimeoutException(Exception):
    """Raised when an operation times out"""
    pass

class ServerException(Exception):
    """Raised when a server error occurs"""
    pass

async def simulate_request(url):
    """
    Simulates an HTTP request that might succeed, fail, or timeout
    This is a helper function for the exercise - you don't need to modify it
    """
    # Delay to simulate network latency
    await asyncio.sleep(random.uniform(0.1, 0.5))
    
    # Simulate different response scenarios
    if "notfound" in url:
        raise NotFoundException(f"Resource not found: {url}")
    elif "timeout" in url:
        raise TimeoutException(f"Request timed out: {url}")
    elif "error" in url:
        raise ServerException(f"Server error for: {url}")
    elif "slow" in url:
        # This is a slow but ultimately successful request
        await asyncio.sleep(2)
        return f"Slow response from {url}"
    else:
        return f"Success! Data from {url}"

async def fetch_url(url, timeout=1.0):
    """
    Fetches a URL with proper error handling and timeout
    - url: The URL to fetch
    - timeout: Maximum time to wait before timing out
    
    Should return a tuple of (url, status, result)
    where status is 'success', 'not_found', 'timeout', or 'error'
    and result contains either the data or the error message
    """
    # TODO:
    # 1. Implement proper exception handling for different error types
    # 2. Add a timeout using asyncio.wait_for
    # 3. Return appropriate status and result values depending on the outcome
    pass

async def process_batch_method1():
    """
    Process multiple URLs using individual try/except blocks
    """
    print("Method 1: Individual try/except around each task")
    
    urls = [
        "https://example.com/api/data",
        "https://example.com/api/notfound",
        "https://example.com/api/timeout",
        "https://example.com/api/error",
        "https://example.com/api/slow"
    ]
    
    # TODO:
    # 1. Process each URL individually with its own try/except
    # 2. Print the results, showing success or the specific error for each URL
    pass

async def process_batch_method2():
    """
    Process multiple URLs using gather with return_exceptions=True
    """
    print("\nMethod 2: Using gather with return_exceptions=True")
    
    urls = [
        "https://example.com/api/data",
        "https://example.com/api/notfound",
        "https://example.com/api/timeout",
        "https://example.com/api/error",
        "https://example.com/api/slow"
    ]
    
    # TODO:
    # 1. Create tasks for each URL
    # 2. Use asyncio.gather with return_exceptions=True
    # 3. Process the results, distinguishing between successful results and exceptions
    pass

async def process_batch_method3():
    """
    Process multiple URLs using tasks with callbacks
    """
    print("\nMethod 3: Using tasks with callbacks")
    
    urls = [
        "https://example.com/api/data",
        "https://example.com/api/notfound",
        "https://example.com/api/timeout",
        "https://example.com/api/error",
        "https://example.com/api/slow"
    ]
    
    # TODO:
    # 1. Create a callback function to handle task completion and exceptions
    # 2. Create and start tasks for each URL
    # 3. Add the callback to each task
    # 4. Wait for all tasks to complete
    pass

async def error_handling_exercise():
    """Run all three methods of error handling"""
    await process_batch_method1()
    await process_batch_method2()
    await process_batch_method3()
    print("\nAll error handling methods demonstrated!")

# Uncomment to run the exercise
# await error_handling_exercise()

In [ ]:
### SOLUTION

# Here's the solution for the practice exercise
# It's hidden below - try to solve it yourself first!

#
#
#
#
#
# Scroll down to see the solution
#
#
#
#
#
#

async def fetch_url_solution(url, timeout=1.0):
    """
    Fetches a URL with proper error handling and timeout
    """
    try:
        # Try to fetch the URL with a timeout
        try:
            result = await asyncio.wait_for(simulate_request(url), timeout)
            return url, 'success', result
        except asyncio.TimeoutError:
            return url, 'timeout', f"Request timed out after {timeout} seconds"
            
    except NotFoundException as e:
        return url, 'not_found', str(e)
    except TimeoutException as e:
        return url, 'timeout', str(e)
    except ServerException as e:
        return url, 'error', str(e)
    except Exception as e:
        # Catch any other unexpected exceptions
        return url, 'error', f"Unexpected error: {str(e)}"

async def process_batch_method1_solution():
    """
    Process multiple URLs using individual try/except blocks
    """
    print("Method 1: Individual try/except around each task")
    
    urls = [
        "https://example.com/api/data",
        "https://example.com/api/notfound",
        "https://example.com/api/timeout",
        "https://example.com/api/error",
        "https://example.com/api/slow"
    ]
    
    results = []
    
    # Process each URL individually
    for url in urls:
        try:
            print(f"Processing {url}...")
            result = await fetch_url_solution(url)
            print(f"Result: {result[1]} - {result[2]}")
            results.append(result)
        except Exception as e:
            # This should never happen because fetch_url handles all exceptions
            print(f"Unexpected error processing {url}: {e}")
            results.append((url, 'error', str(e)))
    
    # Summary
    print("\nSummary:")
    for url, status, message in results:
        print(f"  {url}: {status}")

async def process_batch_method2_solution():
    """
    Process multiple URLs using gather with return_exceptions=True
    """
    print("\nMethod 2: Using gather with return_exceptions=True")
    
    urls = [
        "https://example.com/api/data",
        "https://example.com/api/notfound",
        "https://example.com/api/timeout",
        "https://example.com/api/error",
        "https://example.com/api/slow"
    ]
    
    # Create tasks for each URL
    tasks = [fetch_url_solution(url) for url in urls]
    
    # Wait for all tasks, getting either results or exceptions
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    # Process and display results
    print("\nResults:")
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            # This is an exception that wasn't caught in fetch_url
            print(f"  {urls[i]}: Error - {str(result)}")
        else:
            url, status, message = result
            print(f"  {url}: {status} - {message}")

async def process_batch_method3_solution():
    """
    Process multiple URLs using tasks with callbacks
    """
    print("\nMethod 3: Using tasks with callbacks")
    
    urls = [
        "https://example.com/api/data",
        "https://example.com/api/notfound",
        "https://example.com/api/timeout",
        "https://example.com/api/error",
        "https://example.com/api/slow"
    ]
    
    # Dictionary to track results
    results = {}
    
    # Callback function to handle task completion
    def task_done(task):
        try:
            url, status, message = task.result()
            results[url] = (status, message)
            print(f"Callback: {url} completed with status '{status}'")
        except Exception as e:
            # This happens if fetch_url itself raises an uncaught exception
            url = task.get_name()
            results[url] = ('error', str(e))
            print(f"Callback: Error in task for {url}: {e}")
    
    # Create and start tasks
    tasks = []
    for url in urls:
        # Create task with a name that identifies the URL
        task = asyncio.create_task(fetch_url_solution(url), name=url)
        # Add the callback to handle completion
        task.add_done_callback(task_done)
        tasks.append(task)
    
    # Wait for all tasks to complete
    await asyncio.gather(*tasks, return_exceptions=True)
    
    # Summary
    print("\nSummary from callbacks:")
    for url, (status, message) in results.items():
        print(f"  {url}: {status}")

async def error_handling_exercise_solution():
    """Run all three methods of error handling"""
    await process_batch_method1_solution()
    await process_batch_method2_solution()
    await process_batch_method3_solution()
    print("\nAll error handling methods demonstrated!")

# Uncomment to run the solution
# await error_handling_exercise_solution()

## 4. Advanced Asyncio Concepts

### Timeouts

You can add timeouts to operations to prevent them from running too long:

In [None]:
async def potentially_slow_operation():
    print("Slow operation started")
    await asyncio.sleep(5)  # Simulate a slow operation
    print("Slow operation finished")
    return "Operation result"

async def timeout_demo():
    try:
        # Set a 2-second timeout for the operation
        result = await asyncio.wait_for(potentially_slow_operation(), timeout=2)
        print(f"Got result: {result}")
    except asyncio.TimeoutError:
        print("Operation timed out!")

await timeout_demo()

### Asyncio Queues

Asyncio provides queue implementations for coordinating between producer and consumer coroutines:

In [None]:
async def producer(queue, name, items):
    for i in range(items):
        item = f"{name} item {i}"
        await queue.put(item)
        print(f"{name} produced: {item}")
        await asyncio.sleep(0.5)
    print(f"{name} finished producing")

async def consumer(queue, name, items_to_process):
    for _ in range(items_to_process):
        item = await queue.get()
        print(f"{name} consumed: {item}")
        queue.task_done()
        await asyncio.sleep(1)  # Simulate processing time
    print(f"{name} finished consuming")

async def queue_demo():
    # Create a queue
    queue = asyncio.Queue(maxsize=5)  # Limit queue size to 5 items
    
    # Create tasks for producers and consumers
    producer_tasks = [
        asyncio.create_task(producer(queue, "Producer 1", 4)),
        asyncio.create_task(producer(queue, "Producer 2", 3))
    ]
    
    consumer_tasks = [
        asyncio.create_task(consumer(queue, "Consumer 1", 3)),
        asyncio.create_task(consumer(queue, "Consumer 2", 4))
    ]
    
    # Wait for all producers to finish
    await asyncio.gather(*producer_tasks)
    
    # Wait for the queue to be fully processed
    await queue.join()
    
    # Wait for all consumers to finish
    await asyncio.gather(*consumer_tasks)
    
    print("Queue demo completed")

await queue_demo()

### Synchronization Primitives

Asyncio provides synchronization primitives similar to the ones in the `threading` module:

In [None]:
async def worker(lock, worker_id, shared_resource):
    print(f"Worker {worker_id} is waiting for the lock")
    async with lock:  # Acquire and release the lock automatically
        print(f"Worker {worker_id} acquired the lock")
        # Simulate working with a shared resource
        shared_resource.append(worker_id)
        print(f"Worker {worker_id} updated shared resource: {shared_resource}")
        await asyncio.sleep(1)  # Simulate some work
    print(f"Worker {worker_id} released the lock")

async def lock_demo():
    # Create a lock
    lock = asyncio.Lock()
    shared_resource = []
    
    # Create and run multiple workers concurrently
    workers = [worker(lock, i, shared_resource) for i in range(3)]
    await asyncio.gather(*workers)
    
    print(f"Final shared resource: {shared_resource}")

await lock_demo()

## 5. Practical Examples

### Asynchronous Web Requests

Let's use `aiohttp` to make concurrent web requests (install with `pip install aiohttp` if needed):

In [None]:
try:
    import aiohttp
except ImportError:
    print("aiohttp is not installed. Please install it with:")
    print("pip install aiohttp")
    print("\nFor now, we'll skip this example.")
else:
    async def fetch_url(session, url):
        print(f"Fetching {url}")
        async with session.get(url) as response:
            if response.status != 200:
                return f"Error: {response.status} for {url}"
            data = await response.text()
            return f"Fetched {len(data)} bytes from {url}"

    async def fetch_multiple_urls():
        urls = [
            "https://example.com",
            "https://python.org",
            "https://docs.python.org/"
        ]
        
        async with aiohttp.ClientSession() as session:
            tasks = [fetch_url(session, url) for url in urls]
            results = await asyncio.gather(*tasks)
            
            for i, result in enumerate(results):
                print(f"Result {i+1}: {result}")

    # Run the example if aiohttp is installed
    try:
        await fetch_multiple_urls()
    except NameError:
        pass  # Skip if aiohttp isn't installed

### File I/O with aiofiles

Let's use `aiofiles` for asynchronous file operations (install with `pip install aiofiles` if needed):

In [None]:
try:
    import aiofiles
except ImportError:
    print("aiofiles is not installed. Please install it with:")
    print("pip install aiofiles")
    print("\nFor now, we'll skip this example.")
else:
    async def write_file(filename, content):
        print(f"Writing to {filename}")
        async with aiofiles.open(filename, 'w') as file:
            await file.write(content)
        print(f"Finished writing to {filename}")

    async def read_file(filename):
        print(f"Reading from {filename}")
        async with aiofiles.open(filename, 'r') as file:
            content = await file.read()
        print(f"Read {len(content)} bytes from {filename}")
        return content

    async def file_io_demo():
        # Write files concurrently
        write_tasks = [
            write_file('asyncio_demo_1.txt', 'This is file 1 content\n' * 1000),
            write_file('asyncio_demo_2.txt', 'This is file 2 content\n' * 1000),
            write_file('asyncio_demo_3.txt', 'This is file 3 content\n' * 1000)
        ]
        await asyncio.gather(*write_tasks)
        
        # Read files concurrently
        read_tasks = [
            read_file('asyncio_demo_1.txt'),
            read_file('asyncio_demo_2.txt'),
            read_file('asyncio_demo_3.txt')
        ]
        results = await asyncio.gather(*read_tasks)
        
        # Clean up demo files (optional)
        import os
        for i in range(1, 4):
            os.remove(f'asyncio_demo_{i}.txt')
            
        print("File I/O demo completed")

    # Run the example if aiofiles is installed
    try:
        await file_io_demo()
    except NameError:
        pass  # Skip if aiofiles isn't installed

### Building a Simple Asynchronous Server

Let's build a very simple echo server using asyncio streams:

In [None]:
async def handle_client(reader, writer):
    # Get client address
    addr = writer.get_extra_info('peername')
    print(f"Client connected: {addr}")
    
    while True:
        # Read data from the client
        data = await reader.read(100)
        message = data.decode()
        
        if not data:  # Client disconnected
            break
            
        print(f"Received from {addr}: {message.strip()}")
        
        # Echo the message back to the client
        response = f"Echo: {message}"
        writer.write(response.encode())
        await writer.drain()
        
    # Close the connection
    print(f"Client disconnected: {addr}")
    writer.close()
    await writer.wait_closed()

async def run_server():
    server = await asyncio.start_server(
        handle_client, '127.0.0.1', 8888)
    
    # Get server address
    addr = server.sockets[0].getsockname()
    print(f"Server running on {addr}")
    
    print("To test the server:")
    print("1. Open another terminal")
    print("2. Use: telnet 127.0.0.1 8888")
    print("3. Type messages and see them echoed back")
    print("4. Press Ctrl+C here to stop the server")
    
    try:
        async with server:
            # This will keep the server running until cancelled
            await server.serve_forever()
    except asyncio.CancelledError:
        print("Server stopped")

# Uncomment to run the server (note: this will block the notebook until cancelled)
# await run_server()

### Simulating a Real-world Application: Web Crawler

Let's build a simple asynchronous web crawler that respects rate limits:

In [None]:
try:
    import aiohttp
    from bs4 import BeautifulSoup
except ImportError:
    print("Required packages not installed. Please install them with:")
    print("pip install aiohttp beautifulsoup4")
    print("\nSkipping this example.")
else:
    class AsyncCrawler:
        def __init__(self, base_url, max_urls=10, rate_limit=1):
            self.base_url = base_url
            self.to_visit = asyncio.Queue()
            self.visited = set()
            self.max_urls = max_urls
            self.rate_limit = rate_limit  # Seconds between requests
            self.session = None
            self.rate_limiter = asyncio.Semaphore(1)  # Only 1 request at a time
            
        async def __aenter__(self):
            self.session = aiohttp.ClientSession()
            await self.to_visit.put(self.base_url)
            return self
            
        async def __aexit__(self, exc_type, exc_val, exc_tb):
            assert self.session is not None, "Session is not initialized"
            await self.session.close()
            
        async def fetch_url(self, url):
            async with self.rate_limiter:
                print(f"Fetching: {url}")
                try:
                    assert self.session is not None, "Session must be initialized"
                    async with self.session.get(url) as response:
                        if response.status != 200:
                            print(f"Error: {response.status} for {url}")
                            return None
                            
                        html = await response.text()
                        print(f"Fetched {len(html)} bytes from {url}")
                        # Respect rate limit
                        await asyncio.sleep(self.rate_limit)
                        return html
                except Exception as e:
                    print(f"Error fetching {url}: {e}")
                    return None
                    
        def extract_links(self, html, url):
            soup = BeautifulSoup(html, 'html.parser')
            links = set()
            
            for link in soup.find_all('a', href=True):
                href = link['href']
                
                # Handle relative URLs
                if href.startswith('/'):
                    href = f"{self.base_url.rstrip('/')}{href}"
                    
                # Only follow links from the same domain
                if href.startswith(self.base_url) and href not in self.visited:
                    links.add(href)
                    
            print(f"Found {len(links)} new links on {url}")
            return links
            
        async def crawl(self):
            while len(self.visited) < self.max_urls:
                if self.to_visit.empty():
                    print("No more URLs to visit")
                    break
                    
                url = await self.to_visit.get()
                
                if url in self.visited:
                    continue
                    
                self.visited.add(url)
                html = await self.fetch_url(url)
                
                if html:
                    links = self.extract_links(html, url)
                    for link in links:
                        if len(self.visited) + self.to_visit.qsize() < self.max_urls:
                            await self.to_visit.put(link)
                            
            print(f"Crawling complete. Visited {len(self.visited)} URLs")
            return self.visited

    async def run_crawler_demo():
        print("Starting web crawler demo")
        async with AsyncCrawler("https://docs.python.org/", max_urls=5, rate_limit=1) as crawler:
            visited_urls = await crawler.crawl()
            print("\nVisited URLs:")
            for i, url in enumerate(visited_urls, 1):
                print(f"{i}. {url}")

    # Run the crawler example if required packages are installed
    try:
        # Uncomment to run the crawler demo
        # await run_crawler_demo()
        pass
    except NameError:
        pass  # Skip if required packages aren't installed

## 6. Best Practices and Patterns

### Error Handling in Asyncio

Proper error handling is crucial in asynchronous code:

In [None]:
async def might_fail(success_rate=0.5):
    import random
    await asyncio.sleep(1)  # Simulate some work
    if random.random() > success_rate:
        raise ValueError("Operation failed")
    return "Operation succeeded"

async def error_handling_demo():
    # Method 1: Try/except around individual tasks
    print("Method 1: Individual try/except:")
    for i in range(3):
        try:
            result = await might_fail(0.3)
            print(f"Task {i}: {result}")
        except ValueError as e:
            print(f"Task {i} failed: {e}")
    
    # Method 2: gather() with return_exceptions=True
    print("\nMethod 2: gather with return_exceptions=True:")
    tasks = [might_fail(0.3) for _ in range(3)]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i}: {result}")
    
    # Method 3: Creating tasks and handling exceptions with callbacks
    print("\nMethod 3: Task callbacks:")
    
    def handle_task_result(task):
        try:
            result = task.result()
            print(f"Task completed: {result}")
        except Exception as e:
            print(f"Task failed: {e}")
    
    tasks = []
    for i in range(3):
        task = asyncio.create_task(might_fail(0.3))
        task.add_done_callback(handle_task_result)
        tasks.append(task)
    
    # Wait for all tasks to complete
    await asyncio.gather(*tasks, return_exceptions=True)

await error_handling_demo()

### Debugging Asyncio Code

Asyncio provides debugging options to help track down issues:

In [None]:
# Enable debug mode
def debug_demo():
    """This is just informational - you would run this in a Python script, not in a notebook"""
    # In a Python script, you can enable debug mode:
    """
    import asyncio
    
    # Method 1: Environment variable
    # Set PYTHONASYNCIODEBUG=1 before running your script
    
    # Method 2: Using the event loop's debug mode
    asyncio.get_event_loop().set_debug(True)
    
    # Method 3: In Python 3.7+
    # asyncio.run(main(), debug=True)
    """
    
    # Debug features include:
    # - More detailed logging
    # - Warning about slow callbacks (>100ms)
    # - Resource tracking (detect unclosed resources)
    # - Exception handling improvements (more details about where exceptions occurred)

# Tips for effective asyncio debugging:
async def debugging_tips():
    """This is just for documentation - these are strategies you would apply in real code"""
    # 1. Use meaningful task names
    task = asyncio.create_task(long_running_task("Task A", 3), name="fetch_user_data")
    
    # 2. Use logging instead of print
    import logging
    logging.basicConfig(level=logging.DEBUG)
    logging.debug("Task started")
    
    # 3. Run with debug mode in scripts
    # asyncio.run(main(), debug=True)
    
    # 4. Use asyncio.current_task() to get the current task
    current = asyncio.current_task()
    assert current is not None, "No current task found"
    print(f"Current task: {current.get_name()}")
    
    # 5. Use asyncio.all_tasks() to see all tasks
    all_tasks = asyncio.all_tasks()
    print(f"Total tasks: {len(all_tasks)}")

### Common Asyncio Patterns

Here are some common patterns you'll see with asyncio:

In [None]:
# Pattern 1: Scatter-gather (process multiple items in parallel)
async def process_item(item):
    await asyncio.sleep(1)  # Simulate processing
    return f"Processed {item}"

async def scatter_gather_pattern(items):
    tasks = [process_item(item) for item in items]
    results = await asyncio.gather(*tasks)
    return results

# Pattern 2: Producer-Consumer with a queue
async def producer_consumer_pattern(num_producers, num_consumers, num_items):
    queue = asyncio.Queue()
    
    # Producer function
    async def producer(producer_id, n_items):
        for i in range(n_items):
            item = f"Producer {producer_id} - Item {i}"
            await queue.put(item)
            await asyncio.sleep(0.1)  # Simulate production time
        await queue.put(None)  # Sentinel to signal completion
    
    # Consumer function
    async def consumer(consumer_id):
        while True:
            item = await queue.get()
            if item is None:  # Check for sentinel
                await queue.put(None)  # Put sentinel back for other consumers
                break
            print(f"Consumer {consumer_id} processing {item}")
            await asyncio.sleep(0.5)  # Simulate consumption time
            queue.task_done()
    
    # Create and start producers
    producer_tasks = [asyncio.create_task(producer(i, num_items // num_producers)) 
                      for i in range(num_producers)]
    
    # Create and start consumers
    consumer_tasks = [asyncio.create_task(consumer(i)) 
                     for i in range(num_consumers)]
    
    # Wait for all producers to finish
    await asyncio.gather(*producer_tasks)
    
    # Wait for all consumers to finish
    await asyncio.gather(*consumer_tasks)

# Pattern 3: Throttled API calls with semaphores
async def api_call(url, semaphore):
    async with semaphore:  # Limit the number of concurrent API calls
        print(f"Calling API: {url}")
        await asyncio.sleep(1)  # Simulate API call
        return f"Result from {url}"

async def throttled_api_pattern(urls, max_concurrent=3):
    semaphore = asyncio.Semaphore(max_concurrent)
    tasks = [api_call(url, semaphore) for url in urls]
    results = await asyncio.gather(*tasks)
    return results

# Demonstrate a few of these patterns
async def demonstrate_patterns():
    print("Pattern 1: Scatter-Gather")
    items = ['item1', 'item2', 'item3', 'item4']
    results = await scatter_gather_pattern(items)
    print(f"Results: {results}")
    
    print("\nPattern 3: Throttled API Calls")
    urls = ['https://api.example.com/' + str(i) for i in range(1, 6)]
    results = await throttled_api_pattern(urls, max_concurrent=2)
    print(f"Results: {results}")
    
    # Pattern 2 is more verbose, so let's run a small version
    print("\nPattern 2: Producer-Consumer")
    await producer_consumer_pattern(num_producers=2, num_consumers=2, num_items=4)

await demonstrate_patterns()

### Common Pitfalls to Avoid

Here are some common mistakes when working with asyncio:

In [None]:
# Pitfall 1: Forgetting to await a coroutine
async def pitfall_not_awaiting():
    async def my_coro():
        await asyncio.sleep(1)
        print("Coroutine executed")
        return "Result"
    
    print("\nPitfall 1: Forgetting to await a coroutine")
    # Wrong: This doesn't execute the coroutine, just creates a coroutine object
    my_coro()  # This will show a warning in Python 3.8+
    
    # Correct: This executes the coroutine
    await my_coro()

# Pitfall 2: Blocking the event loop with CPU-bound or blocking operations
async def pitfall_blocking_loop():
    print("\nPitfall 2: Blocking the event loop")
    
    # Wrong: This blocks the event loop, preventing other coroutines from running
    def heavy_computation():
        print("Starting heavy computation...")
        # Simulate a CPU-intensive task
        result = 0
        for i in range(10_000_000):
            result += i
        print("Heavy computation finished")
        return result
    
    print("Wrong way (blocks the event loop):")
    start = time.time()
    result = heavy_computation()  # This blocks the event loop!
    print(f"Time taken: {time.time() - start:.2f}s")
    
    # Correct: Use run_in_executor for CPU-bound tasks
    print("\nCorrect way (using executor):")
    start = time.time()
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(None, heavy_computation)
    print(f"Time taken: {time.time() - start:.2f}s")

# Pitfall 3: Not handling exceptions properly in tasks
async def pitfall_unhandled_exceptions():
    print("\nPitfall 3: Not handling exceptions in tasks")
    
    async def will_fail():
        await asyncio.sleep(0.5)
        raise ValueError("Task failed!")
    
    # Wrong: Exceptions in tasks can be lost if not handled
    print("Wrong way (exception will be lost):")
    task = asyncio.create_task(will_fail())
    await asyncio.sleep(1)  # Wait long enough for the task to fail
    
    # Correct: Always await tasks to propagate exceptions
    print("\nCorrect way (catching the exception):")
    task = asyncio.create_task(will_fail())
    try:
        await task
    except ValueError as e:
        print(f"Caught exception: {e}")

# Let's show these pitfalls
async def show_pitfalls():
    await pitfall_not_awaiting()
    await pitfall_blocking_loop()
    await pitfall_unhandled_exceptions()

await show_pitfalls()

## Conclusion

Congratulations! You've completed this comprehensive tutorial on Python's asyncio library. Let's recap what we've learned:

1. **Asynchronous Programming Basics**
   - Understanding the difference between synchronous and asynchronous code
   - When to use asyncio (I/O-bound operations)

2. **Core Asyncio Concepts**
   - Coroutines and the `async`/`await` syntax
   - Event loops
   - Tasks and Futures

3. **Advanced Features**
   - Working with queues
   - Synchronization primitives
   - Error handling
   - Timeouts

4. **Practical Applications**
   - Asynchronous HTTP requests
   - File I/O operations
   - Building a simple server
   - Creating a web crawler

5. **Best Practices and Common Patterns**
   - Error handling strategies
   - Debugging techniques
   - Common patterns (scatter-gather, producer-consumer, throttling)
   - Pitfalls to avoid

Remember that asyncio is particularly powerful for I/O-bound applications but isn't suitable for CPU-bound tasks (use `multiprocessing` for those). With the knowledge gained from this tutorial, you should now be able to write efficient, concurrent code using Python's asyncio library.

### Further Resources

To continue learning about asyncio:

1. [Official asyncio documentation](https://docs.python.org/3/library/asyncio.html)
2. [PEP 3156 – Asynchronous IO Support](https://peps.python.org/pep-3156/)
3. [Real Python's asyncio tutorials](https://realpython.com/async-io-python/)
4. Popular asyncio libraries:
   - [aiohttp](https://docs.aiohttp.org/) – HTTP client/server
   - [FastAPI](https://fastapi.tiangolo.com/) – Web framework built on asyncio
   - [asyncpg](https://magicstack.github.io/asyncpg/) – PostgreSQL client
   - [aiomysql](https://aiomysql.readthedocs.io/) – MySQL client
   - [motor](https://motor.readthedocs.io/) – MongoDB client