# Lesson A3: Concurrency & Async Programming

**Duration**: 150-165 minutes  
**Stage**: Advanced (Mastery)  
**Prerequisites**: Lessons A1-A2 (Lifetimes, Smart Pointers)

---

## 📋 What You'll Learn

This lesson covers Rust's powerful concurrency features, from threads and message passing to async/await and futures. You'll learn to write safe, efficient concurrent code that leverages Rust's type system to prevent data races at compile time.

**Why this matters**: Concurrency is essential for modern applications, from web servers handling thousands of connections to data processing pipelines. Rust's "fearless concurrency" lets you write concurrent code that is both fast and safe: the ownership rules and the `Send`/`Sync` traits turn data races into compile-time errors. Deadlocks and logic-level race conditions are still possible, so careful design remains necessary.

---

## 🎯 Learning Objectives

By the end of this lesson, you will be able to:
1. Create and manage threads safely
2. Use channels for message passing
3. Implement shared-state concurrency with Arc/Mutex
4. Understand async/await syntax
5. Work with Futures and executors
6. Build concurrent applications with Tokio


In [None]:
// Basic thread creation and management

use std::thread;
use std::time::Duration;

fn basic_threads_demo() {
    println!("=== Basic Thread Creation ===");
    
    // Spawn a simple thread
    let handle = thread::spawn(|| {
        for i in 1..=5 {
            println!("Thread: count {}", i);
            thread::sleep(Duration::from_millis(100));
        }
    });
    
    // Main thread work
    for i in 1..=3 {
        println!("Main: count {}", i);
        thread::sleep(Duration::from_millis(150));
    }
    
    // Wait for thread to complete
    handle.join().unwrap();
    println!("Thread completed!\n");
    
    // Multiple threads
    let mut handles = vec![];
    
    for i in 0..3 {
        let handle = thread::spawn(move || {
            println!("Worker thread {} starting", i);
            thread::sleep(Duration::from_millis(100 * (i + 1) as u64));
            println!("Worker thread {} finished", i);
            i * i // Return value
        });
        handles.push(handle);
    }
    
    // Collect results
    let results: Vec<i32> = handles
        .into_iter()
        .map(|handle| handle.join().unwrap())
        .collect();
    
    println!("Thread results: {:?}", results);
}

basic_threads_demo();
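
The `handle.join().unwrap()` calls above panic in the main thread if a worker panicked. Because `join` returns a `Result`, a panic can also be observed and reported instead. A minimal sketch of that idea, where `run_worker` is a hypothetical helper introduced only for illustration:

```rust
use std::thread;

// Hypothetical helper (not part of the lesson code): runs a closure on a new
// thread and reports whether it finished normally or panicked.
fn run_worker<F>(work: F) -> Result<i32, String>
where
    F: FnOnce() -> i32 + Send + 'static,
{
    let handle = thread::spawn(work);

    // `join` returns Err if the thread panicked; the payload is a
    // `Box<dyn Any + Send>` that usually holds a `&str` or a `String`.
    handle.join().map_err(|payload| {
        if let Some(msg) = payload.downcast_ref::<&str>() {
            msg.to_string()
        } else if let Some(msg) = payload.downcast_ref::<String>() {
            msg.clone()
        } else {
            "unknown panic payload".to_string()
        }
    })
}
```

For example, `run_worker(|| 6 * 7)` yields `Ok(42)`, while a closure that calls `panic!("boom")` yields an `Err` carrying the message `boom`.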

### Message Passing with Channels

In [None]:
// Message passing with channels

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

#[derive(Debug, Clone)]
enum Message {
    Text(String),
    Number(i32),
    Quit,
}

fn channels_demo() {
    println!("\n=== Message Passing with Channels ===");
    
    // Simple channel example
    let (tx, rx) = mpsc::channel();
    
    thread::spawn(move || {
        let messages = vec![
            "Hello",
            "from", 
            "the",
            "thread"
        ];
        
        for msg in messages {
            tx.send(msg.to_string()).unwrap();
            thread::sleep(Duration::from_millis(100));
        }
    });
    
    // Receive messages
    for received in rx {
        println!("Received: {}", received);
    }
    
    // Multiple producers
    let (tx, rx) = mpsc::channel();
    
    for i in 0..3 {
        let tx_clone = tx.clone();
        thread::spawn(move || {
            for j in 0..3 {
                let msg = Message::Text(format!("Producer {} - Message {}", i, j));
                tx_clone.send(msg).unwrap();
                thread::sleep(Duration::from_millis(50));
            }
            tx_clone.send(Message::Number(i * 10)).unwrap();
        });
    }
    
    // Drop original sender
    drop(tx);
    
    println!("\nMultiple producers:");
    for received in rx {
        match received {
            Message::Text(text) => println!("Text: {}", text),
            Message::Number(num) => println!("Number: {}", num),
            Message::Quit => break,
        }
    }
}

channels_demo();
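
The `drop(tx)` in the multiple-producer example is what lets the receiving loop end: iteration over a receiver stops only when every sender, including the original, has been dropped. A small sketch that isolates this, using a hypothetical `sum_of_squares` helper:

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: each worker thread sends the square of its index and
// the caller sums whatever arrives on the channel.
fn sum_of_squares(workers: u32) -> u32 {
    let (tx, rx) = mpsc::channel();

    for i in 0..workers {
        let tx = tx.clone();
        thread::spawn(move || {
            // `send` fails only if the receiver is gone; ignore that case here.
            let _ = tx.send(i * i);
        });
    }

    // Without this drop the original sender stays alive and `rx.iter()`
    // would block forever after the last message.
    drop(tx);

    // The iterator ends once every clone of the sender has been dropped.
    rx.iter().sum()
}
```

With four workers the values 0, 1, 4 and 9 arrive in some unspecified order, and the sum is 14 regardless of that order.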

### Shared State with Arc and Mutex

In [None]:
// Shared state with Arc<Mutex<T>>

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

#[derive(Debug)]
struct Counter {
    value: i32,
    name: String,
}

impl Counter {
    fn new(name: String) -> Self {
        Counter { value: 0, name }
    }
    
    fn increment(&mut self) {
        self.value += 1;
        println!("{}: incremented to {}", self.name, self.value);
    }
    
    fn get_value(&self) -> i32 {
        self.value
    }
}

fn shared_state_demo() {
    println!("\n=== Shared State with Arc<Mutex<T>> ===");
    
    // Simple shared counter
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];
    
    for i in 0..5 {
        let counter_clone = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            for j in 0..3 {
                let mut num = counter_clone.lock().unwrap();
                *num += 1;
                println!("Thread {} increment {}: value = {}", i, j, *num);
                // Lock is automatically released when `num` goes out of scope
            }
        });
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
    
    println!("Final counter value: {}", *counter.lock().unwrap());
    
    // Complex shared state
    let shared_counter = Arc::new(Mutex::new(Counter::new("Shared".to_string())));
    let mut handles = vec![];
    
    for i in 0..3 {
        let counter_clone = Arc::clone(&shared_counter);
        let handle = thread::spawn(move || {
            for _ in 0..2 {
                {
                    let mut counter = counter_clone.lock().unwrap();
                    counter.increment();
                } // Lock released here
                thread::sleep(Duration::from_millis(10));
            }
        });
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
    
    let final_value = shared_counter.lock().unwrap().get_value();
    println!("\nFinal shared counter value: {}", final_value);
}

shared_state_demo();
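
The examples above take the lock once per increment. When each thread can compute a partial result on its own, one option is to do that work without the lock and hold the mutex only for the final merge. A sketch under that assumption, with `parallel_sum` as a hypothetical helper name:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical helper: sums 1..=n by splitting the numbers across `workers`
// threads according to their remainder modulo `workers`.
fn parallel_sum(n: u64, workers: u64) -> u64 {
    let total = Arc::new(Mutex::new(0u64));
    let mut handles = Vec::new();

    for w in 0..workers {
        let total = Arc::clone(&total);
        handles.push(thread::spawn(move || {
            // Compute the partial sum without holding the lock.
            let local: u64 = (1..=n).filter(|x| x % workers == w).sum();
            // Hold the lock only for this single addition.
            *total.lock().unwrap() += local;
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }

    // Copy the value out so the guard is released before returning.
    let result = *total.lock().unwrap();
    result
}
```

`parallel_sum(100, 4)` returns 5050, the same as a sequential sum. Each thread acquires the mutex once, which keeps contention low compared with locking inside the inner loop.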

In [None]:
// TODO: Complete the concurrent web scraper

use std::sync::{Arc, Mutex, mpsc};
use std::thread;
use std::time::Duration;
use std::collections::HashMap;

#[derive(Debug, Clone)]
struct WebPage {
    url: String,
    content: String,
    word_count: usize,
    load_time_ms: u64,
}

impl WebPage {
    fn new(url: String) -> Self {
        // Simulate web scraping
        let load_time = (url.len() as u64 * 10) + 50; // Simulate variable load times
        thread::sleep(Duration::from_millis(load_time));
        
        let content = format!("Content from {} with some sample text data", url);
        let word_count = content.split_whitespace().count();
        
        WebPage {
            url,
            content,
            word_count,
            load_time_ms: load_time,
        }
    }
}

#[derive(Debug)]
enum WorkerMessage {
    Scrape(String),
    Shutdown,
}

#[derive(Debug)]
enum ResultMessage {
    Success(WebPage),
    Error(String, String), // URL, Error message
    WorkerFinished(usize), // Worker ID
}

struct WebScraper {
    worker_count: usize,
    results: Arc<Mutex<Vec<WebPage>>>,
    errors: Arc<Mutex<Vec<(String, String)>>>,
    stats: Arc<Mutex<HashMap<String, usize>>>,
}

impl WebScraper {
    fn new(worker_count: usize) -> Self {
        WebScraper {
            worker_count,
            results: Arc::new(Mutex::new(Vec::new())),
            errors: Arc::new(Mutex::new(Vec::new())),
            stats: Arc::new(Mutex::new(HashMap::new())),
        }
    }
    
    fn scrape_urls(&self, urls: Vec<String>) {
        let (work_tx, work_rx) = mpsc::channel();
        let (result_tx, result_rx) = mpsc::channel();
        
        // Shared work receiver
        let work_rx = Arc::new(Mutex::new(work_rx));
        
        // Spawn worker threads
        let mut worker_handles = vec![];
        
        for worker_id in 0..self.worker_count {
            let work_rx_clone = Arc::clone(&work_rx);
            let result_tx_clone = result_tx.clone();
            
            let handle = thread::spawn(move || {
                println!("Worker {} started", worker_id);
                
                loop {
                    let message = {
                        let receiver = work_rx_clone.lock().unwrap();
                        receiver.recv()
                    };
                    
                    match message {
                        Ok(WorkerMessage::Scrape(url)) => {
                            println!("Worker {} scraping: {}", worker_id, url);
                            
                            // Simulate potential errors
                            if url.contains("error") {
                                let error_msg = format!("Failed to scrape {}", url);
                                result_tx_clone.send(ResultMessage::Error(url, error_msg)).unwrap();
                            } else {
                                let page = WebPage::new(url);
                                result_tx_clone.send(ResultMessage::Success(page)).unwrap();
                            }
                        }
                        Ok(WorkerMessage::Shutdown) => {
                            println!("Worker {} shutting down", worker_id);
                            result_tx_clone.send(ResultMessage::WorkerFinished(worker_id)).unwrap();
                            break;
                        }
                        Err(_) => {
                            println!("Worker {} channel closed", worker_id);
                            break;
                        }
                    }
                }
            });
            
            worker_handles.push(handle);
        }
        
        // Send work to workers
        for url in urls {
            work_tx.send(WorkerMessage::Scrape(url)).unwrap();
        }
        
        // Send shutdown messages
        for _ in 0..self.worker_count {
            work_tx.send(WorkerMessage::Shutdown).unwrap();
        }
        
        drop(work_tx); // Close the channel
        
        // Collect results
        let mut finished_workers = 0;
        
        for result in result_rx {
            match result {
                ResultMessage::Success(page) => {
                    println!("✅ Scraped: {} ({} words, {}ms)",
                            page.url, page.word_count, page.load_time_ms);
                    
                    self.results.lock().unwrap().push(page);
                    
                    // Update stats
                    let mut stats = self.stats.lock().unwrap();
                    *stats.entry("successful_scrapes".to_string()).or_insert(0) += 1;
                }
                ResultMessage::Error(url, error) => {
                    println!("❌ Error scraping {}: {}", url, error);
                    self.errors.lock().unwrap().push((url, error));
                    
                    // Update stats
                    let mut stats = self.stats.lock().unwrap();
                    *stats.entry("failed_scrapes".to_string()).or_insert(0) += 1;
                }
                ResultMessage::WorkerFinished(worker_id) => {
                    println!("Worker {} finished", worker_id);
                    finished_workers += 1;
                    
                    if finished_workers == self.worker_count {
                        break;
                    }
                }
            }
        }
        
        // Wait for all workers to complete
        for handle in worker_handles {
            handle.join().unwrap();
        }
    }
    
    fn print_summary(&self) {
        let results = self.results.lock().unwrap();
        let errors = self.errors.lock().unwrap();
        let stats = self.stats.lock().unwrap();
        
        println!("\n📊 Scraping Summary:");
        println!("  Successful scrapes: {}", results.len());
        println!("  Failed scrapes: {}", errors.len());
        
        if !results.is_empty() {
            let total_words: usize = results.iter().map(|p| p.word_count).sum();
            let avg_words = total_words / results.len();
            let total_time: u64 = results.iter().map(|p| p.load_time_ms).sum();
            let avg_time = total_time / results.len() as u64;
            
            println!("  Average words per page: {}", avg_words);
            println!("  Average load time: {}ms", avg_time);
        }
        
        if !errors.is_empty() {
            println!("\n❌ Errors:");
            for (url, error) in errors.iter() {
                println!("  {}: {}", url, error);
            }
        }
        
        println!("\n📈 Detailed Stats: {:?}", *stats);
    }
}

fn web_scraper_demo() {
    println!("\n=== Concurrent Web Scraper Demo ===");
    
    let scraper = WebScraper::new(3); // 3 worker threads
    
    let urls = vec![
        "https://example.com".to_string(),
        "https://rust-lang.org".to_string(),
        "https://github.com".to_string(),
        "https://stackoverflow.com".to_string(),
        "https://error-site.com".to_string(), // This will simulate an error
        "https://docs.rs".to_string(),
        "https://crates.io".to_string(),
        "https://another-error.com".to_string(), // Another error
    ];
    
    println!("Starting concurrent scraping of {} URLs with {} workers...", 
            urls.len(), scraper.worker_count);
    
    let start_time = std::time::Instant::now();
    scraper.scrape_urls(urls);
    let elapsed = start_time.elapsed();
    
    scraper.print_summary();
    println!("\n⏱️  Total execution time: {:?}", elapsed);
}

web_scraper_demo();

In [None]:
// Basic async functions and futures
// Note: This is conceptual code for learning. No async runtime is set up in this cell, so the futures are defined but never run

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

// Simple async function
async fn simple_async_function() -> String {
    "Hello from async function!".to_string()
}

// Async function with delay simulation
async fn delayed_computation(delay_ms: u64, value: i32) -> i32 {
    // In real async code, this would be:
    // tokio::time::sleep(Duration::from_millis(delay_ms)).await;
    
    println!("Starting computation with {}ms delay for value {}", delay_ms, value);
    
    // Simulate async work
    value * 2
}

// Custom Future implementation
struct DelayedValue {
    value: i32,
    delay: Duration,
    start_time: Option<Instant>,
}

impl DelayedValue {
    fn new(value: i32, delay: Duration) -> Self {
        DelayedValue {
            value,
            delay,
            start_time: None,
        }
    }
}

impl Future for DelayedValue {
    type Output = i32;
    
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let start_time = self.start_time.get_or_insert_with(Instant::now);
        
        if start_time.elapsed() >= self.delay {
            println!("DelayedValue ready: {}", self.value);
            Poll::Ready(self.value)
        } else {
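            // A real timer future would store the waker and wake it when the delay
            // expires. Waking immediately, as done here, makes the executor re-poll
            // in a busy loop until the delay has elapsed.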
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// Async error handling
#[derive(Debug)]
enum AsyncError {
    NetworkError(String),
    TimeoutError,
    ParseError(String),
}

impl std::fmt::Display for AsyncError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            AsyncError::NetworkError(msg) => write!(f, "Network error: {}", msg),
            AsyncError::TimeoutError => write!(f, "Operation timed out"),
            AsyncError::ParseError(msg) => write!(f, "Parse error: {}", msg),
        }
    }
}

impl std::error::Error for AsyncError {}

// Async function with error handling
async fn fetch_data(url: &str) -> Result<String, AsyncError> {
    println!("Fetching data from: {}", url);
    
    // Simulate different outcomes
    if url.contains("error") {
        Err(AsyncError::NetworkError("Connection failed".to_string()))
    } else if url.contains("timeout") {
        Err(AsyncError::TimeoutError)
    } else {
        Ok(format!("Data from {}", url))
    }
}

// Combining multiple async operations
async fn process_multiple_urls(urls: Vec<&str>) -> Vec<Result<String, AsyncError>> {
    let mut results = Vec::new();
    
    for url in urls {
        let result = fetch_data(url).await;
        results.push(result);
    }
    
    results
}

fn async_concepts_demo() {
    println!("=== Async Programming Concepts ===");
    
    // Note: In a real async environment, you would use:
    // #[tokio::main]
    // async fn main() {
    //     let result = simple_async_function().await;
    //     println!("Result: {}", result);
    // }
    
    println!("Async function concepts:");
    println!("- async fn returns impl Future<Output = T>");
    println!("- .await suspends execution until Future is ready");
    println!("- Futures are lazy - they don't run until polled (for example via .await)");
    println!("- Runtime manages task scheduling and I/O");
    
    // Demonstrate Future trait concepts
    println!("\nCustom Future implementation:");
    let delayed = DelayedValue::new(42, Duration::from_millis(100));
    println!("Created DelayedValue future (not yet executed)");
    
    // Error handling patterns
    println!("\nAsync error handling patterns:");
    println!("- Use Result<T, E> for fallible operations");
    println!("- ? operator works with async functions");
    println!("- Combine with try_join! for concurrent error handling");
}

async_concepts_demo();
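
The cell above defines futures but never drives them. To make the role of an executor concrete, here is a minimal single-future executor built only on the standard library. It is a teaching sketch, not a substitute for a runtime such as Tokio; `ThreadWaker`, `block_on` and `double` are names chosen for this illustration.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Waker that unparks the thread currently blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Hypothetical minimal executor: polls one future to completion on the
// calling thread, sleeping between polls until the waker fires.
fn block_on<F: Future>(future: F) -> F::Output {
    // Pin the future on the heap so it can be polled repeatedly.
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);

    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            // Park until a waker unparks us; a spurious wake-up just re-polls.
            Poll::Pending => thread::park(),
        }
    }
}

// Small async function used to exercise the executor.
async fn double(value: i32) -> i32 {
    value * 2
}
```

With this helper, `block_on(double(21))` evaluates to `42`. Passing the `DelayedValue` future from the cell above should also complete, although it busy-polls because that future wakes itself immediately.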

In [None]:
// Async patterns and combinators (conceptual)

use std::collections::HashMap;

// Async data structures
#[derive(Debug, Clone)]
struct AsyncTask {
    id: u32,
    name: String,
    duration_ms: u64,
    dependencies: Vec<u32>,
}

impl AsyncTask {
    fn new(id: u32, name: String, duration_ms: u64) -> Self {
        AsyncTask {
            id,
            name,
            duration_ms,
            dependencies: Vec::new(),
        }
    }
    
    fn with_dependencies(mut self, deps: Vec<u32>) -> Self {
        self.dependencies = deps;
        self
    }
}

// Async task executor (conceptual)
struct AsyncTaskExecutor {
    tasks: HashMap<u32, AsyncTask>,
    completed: HashMap<u32, String>,
}

impl AsyncTaskExecutor {
    fn new() -> Self {
        AsyncTaskExecutor {
            tasks: HashMap::new(),
            completed: HashMap::new(),
        }
    }
    
    fn add_task(&mut self, task: AsyncTask) {
        self.tasks.insert(task.id, task);
    }
    
    // Simulate async task execution
    async fn execute_task(&mut self, task_id: u32) -> Result<String, String> {
        let task = self.tasks.get(&task_id)
            .ok_or_else(|| format!("Task {} not found", task_id))?;
        
        // Check dependencies
        for dep_id in &task.dependencies {
            if !self.completed.contains_key(dep_id) {
                return Err(format!("Dependency {} not completed for task {}", dep_id, task_id));
            }
        }
        
        println!("Executing task {}: {} ({}ms)", task.id, task.name, task.duration_ms);
        
        // In real async code:
        // tokio::time::sleep(Duration::from_millis(task.duration_ms)).await;
        
        let result = format!("Completed: {}", task.name);
        self.completed.insert(task_id, result.clone());
        
        Ok(result)
    }
    
    // Execute all tasks respecting dependencies
    async fn execute_all(&mut self) -> Vec<Result<String, String>> {
        let mut results = Vec::new();
        let task_ids: Vec<u32> = self.tasks.keys().cloned().collect();
        
        // Simple sequential execution (in real code, use proper dependency resolution)
        for task_id in task_ids {
            let result = self.execute_task(task_id).await;
            results.push(result);
        }
        
        results
    }
}

// Async stream processing (conceptual)
struct AsyncDataProcessor {
    name: String,
}

impl AsyncDataProcessor {
    fn new(name: String) -> Self {
        AsyncDataProcessor { name }
    }
    
    // Process data asynchronously
    async fn process_item(&self, item: i32) -> Result<i32, String> {
        println!("{}: Processing item {}", self.name, item);
        
        // Simulate processing time and potential errors
        if item < 0 {
            Err(format!("Invalid item: {}", item))
        } else {
            Ok(item * 2)
        }
    }
    
    // Process multiple items
    async fn process_batch(&self, items: Vec<i32>) -> Vec<Result<i32, String>> {
        let mut results = Vec::new();
        
        for item in items {
            let result = self.process_item(item).await;
            results.push(result);
        }
        
        results
    }
    
    // Concurrent processing (conceptual)
    async fn process_concurrent(&self, items: Vec<i32>) -> Vec<Result<i32, String>> {
        println!("{}: Starting concurrent processing of {} items", self.name, items.len());
        
        // In real async code, using the futures crate:
        // let futures: Vec<_> = items.into_iter()
        //     .map(|item| self.process_item(item))
        //     .collect();
        // 
        // futures::future::join_all(futures).await
        
        // For demonstration, process sequentially
        self.process_batch(items).await
    }
}

// Async HTTP client simulation
struct AsyncHttpClient {
    base_url: String,
    timeout_ms: u64,
}

impl AsyncHttpClient {
    fn new(base_url: String, timeout_ms: u64) -> Self {
        AsyncHttpClient { base_url, timeout_ms }
    }
    
    async fn get(&self, path: &str) -> Result<String, AsyncError> {
        let url = format!("{}/{}", self.base_url, path);
        println!("GET {}", url);
        
        // Simulate network request
        if path.contains("slow") {
            // Simulate timeout
            Err(AsyncError::TimeoutError)
        } else if path.contains("error") {
            Err(AsyncError::NetworkError("404 Not Found".to_string()))
        } else {
            Ok(format!("Response from {}", url))
        }
    }
    
    async fn post(&self, path: &str, data: &str) -> Result<String, AsyncError> {
        let url = format!("{}/{}", self.base_url, path);
        println!("POST {} with data: {}", url, data);
        
        // Simulate POST request
        Ok(format!("Posted to {}: {}", url, data))
    }
    
    // Batch requests
    async fn batch_get(&self, paths: Vec<&str>) -> Vec<Result<String, AsyncError>> {
        let mut results = Vec::new();
        
        for path in paths {
            let result = self.get(path).await;
            results.push(result);
        }
        
        results
    }
}

fn async_patterns_demo() {
    println!("\n=== Async Patterns and Combinators ===");
    
    // Task execution patterns
    println!("Async task execution patterns:");
    println!("- Sequential: await each task in order");
    println!("- Concurrent: join_all to drive many futures at once");
    println!("- Racing: select! for first completion");
    println!("- Streaming: process data as it arrives");
    
    // Error handling patterns
    println!("\nAsync error handling:");
    println!("- try_join! fails fast on first error");
    println!("- join_all collects all results");
    println!("- timeout() adds time limits");
    println!("- retry() for resilient operations");
    
    // Common async patterns
    println!("\nCommon async patterns:");
    println!("- Producer/Consumer with async channels");
    println!("- Connection pooling for resources");
    println!("- Circuit breaker for fault tolerance");
    println!("- Rate limiting for API calls");
    
    // Performance considerations
    println!("\nPerformance considerations:");
    println!("- Async is great for I/O-bound tasks");
    println!("- Use threads for CPU-intensive work");
    println!("- Avoid blocking operations in async code");
    println!("- Consider task spawning overhead");
}

async_patterns_demo();
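
The difference between awaiting futures one after another and joining them can be shown without an external runtime. The sketch below is a simplified stand-in for what `futures::future::join` or `tokio::join!` provide; `join2`, `YieldNow`, `step`, `block_on` and `interleaving_demo` are hypothetical names written for this illustration.

```rust
use std::cell::RefCell;
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

struct ThreadWaker(Thread);
impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) { self.0.unpark(); }
}

// Minimal executor: polls one future on the current thread until it is ready.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}

// Future that returns Pending exactly once so other futures get a turn.
struct YieldNow { yielded: bool }
impl Future for YieldNow {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded { return Poll::Ready(()); }
        self.yielded = true;
        cx.waker().wake_by_ref(); // ask to be polled again soon
        Poll::Pending
    }
}

// Polls both futures on every wake-up and finishes when both are done.
async fn join2<A: Future, B: Future>(a: A, b: B) -> (A::Output, B::Output) {
    let (mut a, mut b) = (Box::pin(a), Box::pin(b));
    let (mut a_out, mut b_out) = (None, None);
    poll_fn(move |cx: &mut Context<'_>| {
        if a_out.is_none() {
            if let Poll::Ready(v) = a.as_mut().poll(cx) { a_out = Some(v); }
        }
        if b_out.is_none() {
            if let Poll::Ready(v) = b.as_mut().poll(cx) { b_out = Some(v); }
        }
        if a_out.is_some() && b_out.is_some() {
            Poll::Ready((a_out.take().unwrap(), b_out.take().unwrap()))
        } else {
            Poll::Pending
        }
    })
    .await
}

// Logs two steps, yielding to the executor after each one.
async fn step(name: &'static str, log: &RefCell<Vec<String>>) {
    for i in 0..2 {
        log.borrow_mut().push(format!("{}{}", name, i));
        YieldNow { yielded: false }.await;
    }
}

fn interleaving_demo() -> Vec<String> {
    let log = RefCell::new(Vec::new());
    block_on(join2(step("a", &log), step("b", &log)));
    log.into_inner()
}
```

`interleaving_demo()` returns `["a0", "b0", "a1", "b1"]`: the two futures take turns on a single thread. Awaiting them sequentially would instead log `a0, a1, b0, b1`. This is concurrency without parallelism, since everything runs on one thread.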

In [None]:
// TODO: Complete the async web service simulation

use std::collections::HashMap;
use std::time::{Duration, Instant};

#[derive(Debug, Clone)]
struct Request {
    id: u32,
    method: String,
    path: String,
    body: Option<String>,
    timestamp: Instant,
}

impl Request {
    fn new(id: u32, method: String, path: String) -> Self {
        Request {
            id,
            method,
            path,
            body: None,
            timestamp: Instant::now(),
        }
    }
    
    fn with_body(mut self, body: String) -> Self {
        self.body = Some(body);
        self
    }
}

#[derive(Debug, Clone)]
struct Response {
    status: u16,
    body: String,
    processing_time_ms: u64,
}

impl Response {
    fn ok(body: String, processing_time_ms: u64) -> Self {
        Response {
            status: 200,
            body,
            processing_time_ms,
        }
    }
    
    fn not_found(processing_time_ms: u64) -> Self {
        Response {
            status: 404,
            body: "Not Found".to_string(),
            processing_time_ms,
        }
    }
    
    fn error(message: String, processing_time_ms: u64) -> Self {
        Response {
            status: 500,
            body: message,
            processing_time_ms,
        }
    }
}

// Async web service
struct AsyncWebService {
    name: String,
    data_store: HashMap<String, String>,
    request_count: u32,
}

impl AsyncWebService {
    fn new(name: String) -> Self {
        let mut data_store = HashMap::new();
        data_store.insert("users/1".to_string(), "User 1 Data".to_string());
        data_store.insert("users/2".to_string(), "User 2 Data".to_string());
        data_store.insert("posts/1".to_string(), "Post 1 Content".to_string());
        
        AsyncWebService {
            name,
            data_store,
            request_count: 0,
        }
    }
    
    // Handle GET requests
    async fn handle_get(&mut self, path: &str) -> Response {
        let start_time = Instant::now();
        
        println!("{}: Handling GET {}", self.name, path);
        
        // Simulate processing time based on path
        let processing_delay = match path {
            p if p.contains("slow") => 200,
            p if p.contains("users") => 50,
            p if p.contains("posts") => 30,
            _ => 10,
        };
        
        // In real async code: tokio::time::sleep(Duration::from_millis(processing_delay)).await;
        
        let processing_time = start_time.elapsed().as_millis() as u64 + processing_delay;
        
        // Simulate different responses
        if path.contains("error") {
            Response::error("Internal Server Error".to_string(), processing_time)
        } else if let Some(data) = self.data_store.get(path) {
            Response::ok(data.clone(), processing_time)
        } else {
            Response::not_found(processing_time)
        }
    }
    
    // Handle POST requests
    async fn handle_post(&mut self, path: &str, body: &str) -> Response {
        let start_time = Instant::now();
        
        println!("{}: Handling POST {} with body: {}", self.name, path, body);
        
        // Simulate processing
        let processing_delay = 100; // POST operations are typically slower
        let processing_time = start_time.elapsed().as_millis() as u64 + processing_delay;
        
        if path.contains("error") {
            Response::error("Failed to create resource".to_string(), processing_time)
        } else {
            // Store the data
            self.data_store.insert(path.to_string(), body.to_string());
            Response::ok(format!("Created resource at {}", path), processing_time)
        }
    }
    
    // Main request handler
    async fn handle_request(&mut self, request: Request) -> Response {
        self.request_count += 1;
        
        println!("\n[Request {}] {} {} (ID: {})", 
                self.request_count, request.method, request.path, request.id);
        
        let response = match request.method.as_str() {
            "GET" => self.handle_get(&request.path).await,
            "POST" => {
                let body = request.body.as_deref().unwrap_or("");
                self.handle_post(&request.path, body).await
            }
            _ => Response::error("Method not allowed".to_string(), 1),
        };
        
        println!("[Response {}] Status: {}, Time: {}ms", 
                request.id, response.status, response.processing_time_ms);
        
        response
    }
    
    // Process multiple requests
    async fn process_requests(&mut self, requests: Vec<Request>) -> Vec<Response> {
        let mut responses = Vec::new();
        
        println!("\n🚀 Processing {} requests...", requests.len());
        
        for request in requests {
            let response = self.handle_request(request).await;
            responses.push(response);
        }
        
        responses
    }
    
    // Concurrent request processing (conceptual)
    async fn process_concurrent(&mut self, requests: Vec<Request>) -> Vec<Response> {
        println!("\n⚡ Processing {} requests concurrently...", requests.len());
        
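        // In real async code, concurrency needs shared access. `handle_request`
        // takes `&mut self`, so the futures below could not coexist as written.
        // With `&self` handlers and the mutable state behind a Mutex, the shape
        // (using the futures crate) would be:
        // let futures: Vec<_> = requests.into_iter()
        //     .map(|req| self.handle_request(req))
        //     .collect();
        // futures::future::join_all(futures).await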
        
        // For demonstration, process sequentially
        self.process_requests(requests).await
    }
    
    fn print_stats(&self, responses: &[Response]) {
        let total_requests = responses.len();
        let successful = responses.iter().filter(|r| r.status == 200).count();
        let errors = responses.iter().filter(|r| r.status >= 400).count();
        
        let total_time: u64 = responses.iter().map(|r| r.processing_time_ms).sum();
        let avg_time = if total_requests > 0 { total_time / total_requests as u64 } else { 0 };
        
        println!("\n📊 Service Statistics:");
        println!("  Total requests: {}", total_requests);
        println!("  Successful: {} ({:.1}%)", successful, 
                (successful as f64 / total_requests as f64) * 100.0);
        println!("  Errors: {} ({:.1}%)", errors, 
                (errors as f64 / total_requests as f64) * 100.0);
        println!("  Average response time: {}ms", avg_time);
        println!("  Total processing time: {}ms", total_time);
        println!("  Data store entries: {}", self.data_store.len());
    }
}

// Load balancer simulation
struct AsyncLoadBalancer {
    services: Vec<AsyncWebService>,
    current_service: usize,
}

impl AsyncLoadBalancer {
    fn new(service_names: Vec<String>) -> Self {
        let services = service_names.into_iter()
            .map(AsyncWebService::new)
            .collect();
        
        AsyncLoadBalancer {
            services,
            current_service: 0,
        }
    }
    
    // Round-robin load balancing
    async fn handle_request(&mut self, request: Request) -> Response {
        let service_index = self.current_service;
        self.current_service = (self.current_service + 1) % self.services.len();
        
        println!("🔄 Load balancer routing request {} to service {}",
                request.id, service_index);
        
        self.services[service_index].handle_request(request).await
    }
    
    async fn process_requests(&mut self, requests: Vec<Request>) -> Vec<Response> {
        let mut responses = Vec::new();
        
        for request in requests {
            let response = self.handle_request(request).await;
            responses.push(response);
        }
        
        responses
    }
}

fn async_web_service_demo() {
    println!("\n=== Async Web Service Simulation ===");
    
    // Create requests
    let requests = vec![
        Request::new(1, "GET".to_string(), "users/1".to_string()),
        Request::new(2, "GET".to_string(), "users/2".to_string()),
        Request::new(3, "POST".to_string(), "users/3".to_string())
            .with_body("New User Data".to_string()),
        Request::new(4, "GET".to_string(), "posts/1".to_string()),
        Request::new(5, "GET".to_string(), "nonexistent".to_string()),
        Request::new(6, "GET".to_string(), "error-endpoint".to_string()),
        Request::new(7, "GET".to_string(), "slow-endpoint".to_string()),
        Request::new(8, "POST".to_string(), "posts/2".to_string())
            .with_body("New Post Content".to_string()),
    ];
    
    println!("Created {} test requests", requests.len());
    
    // Note: In a real async environment, you would run this with:
    // let mut service = AsyncWebService::new("TestService".to_string());
    // let responses = service.process_requests(requests).await;
    // service.print_stats(&responses);
    
    println!("\n💡 In a real async environment, this would:");
    println!("  - Process requests concurrently");
    println!("  - Use actual async I/O operations");
    println!("  - Handle thousands of concurrent connections");
    println!("  - Provide much better performance than blocking I/O");
    
    println!("\n🔧 Key async patterns demonstrated:");
    println!("  - Async functions with .await");
    println!("  - Error handling with Result types");
    println!("  - State management in async contexts");
    println!("  - Request/response patterns");
    println!("  - Load balancing and service distribution");
}

async_web_service_demo();

---

## ⚠️ Common Pitfalls: Concurrency

### 1. Data Races with Shared Mutable State
```rust
use std::thread;

let mut counter = 0;
let handle = thread::spawn(|| {
    counter += 1;  // ❌ Error: closure may outlive the current function, but it borrows `counter`
});
```
**Solution:** Use `Arc<Mutex<T>>` for shared mutable state:
```rust
use std::sync::{Arc, Mutex};
use std::thread;

let counter = Arc::new(Mutex::new(0));
let counter_clone = Arc::clone(&counter);
let handle = thread::spawn(move || {
    let mut num = counter_clone.lock().unwrap();
    *num += 1;
});
handle.join().unwrap();  // Wait so the increment is visible to the main thread
```

### 2. Deadlocks from Lock Ordering
```rust
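use std::sync::{Arc, Mutex};
use std::thread;

// Arc lets both threads own a handle to each mutex
let lock_a = Arc::new(Mutex::new(1));
let lock_b = Arc::new(Mutex::new(2));

// Thread 1: locks A then B
let (a1, b1) = (Arc::clone(&lock_a), Arc::clone(&lock_b));
thread::spawn(move || {
    let _a = a1.lock().unwrap();
    let _b = b1.lock().unwrap();  // ❌ Potential deadlock
});

// Thread 2: locks B then A
let (a2, b2) = (Arc::clone(&lock_a), Arc::clone(&lock_b));
thread::spawn(move || {
    let _b = b2.lock().unwrap();
    let _a = a2.lock().unwrap();  // ❌ Potential deadlock
});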
```
**Solution:** Always acquire locks in the same order in every thread (for example, always A before B).
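
One way to enforce a consistent order is to derive it from the data itself. The sketch below assumes a hypothetical `Account` type with a unique `id`; the helper `transfer_ordered` always locks the account with the smaller `id` first, whichever direction the transfer goes. These names are illustrative and do not come from earlier in the lesson.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical account type, used only for this illustration.
struct Account {
    id: u32,
    balance: Mutex<i64>,
}

// Lock the account with the smaller id first, regardless of transfer
// direction, so two threads can never hold the locks in opposite order.
fn transfer_ordered(from: &Account, to: &Account, amount: i64) {
    if from.id == to.id {
        return; // Same account: locking it twice would deadlock
    }
    let (first, second) = if from.id < to.id { (from, to) } else { (to, from) };
    let mut g1 = first.balance.lock().unwrap();
    let mut g2 = second.balance.lock().unwrap();
    // Map the guards back to "from" and "to"
    let (from_bal, to_bal) = if from.id < to.id {
        (&mut *g1, &mut *g2)
    } else {
        (&mut *g2, &mut *g1)
    };
    *from_bal -= amount;
    *to_bal += amount;
}

// Two threads transfer in opposite directions; returns the final balances.
fn run_transfers() -> (i64, i64) {
    let a = Arc::new(Account { id: 1, balance: Mutex::new(100) });
    let b = Arc::new(Account { id: 2, balance: Mutex::new(100) });

    let (a1, b1) = (Arc::clone(&a), Arc::clone(&b));
    let t1 = thread::spawn(move || {
        for _ in 0..1000 {
            transfer_ordered(&a1, &b1, 1);
        }
    });
    let (a2, b2) = (Arc::clone(&a), Arc::clone(&b));
    let t2 = thread::spawn(move || {
        for _ in 0..1000 {
            transfer_ordered(&b2, &a2, 1);
        }
    });
    t1.join().unwrap();
    t2.join().unwrap();

    let final_a = *a.balance.lock().unwrap();
    let final_b = *b.balance.lock().unwrap();
    (final_a, final_b)
}
```

Calling `run_transfers()` should finish without hanging, because both threads take the locks in the same id order even though they transfer in opposite directions.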

### 3. Forgetting to Join Threads
```rust
thread::spawn(|| {
    println!("Important work!");
});  // ❌ Thread may not complete before main exits
```
**Solution:** Store handle and join:
```rust
let handle = thread::spawn(|| {
    println!("Important work!");
});
handle.join().unwrap();  // ✅ Wait for completion
```

### 4. Channel Sender/Receiver Lifetime Issues
```rust
use std::sync::mpsc;

let (tx, rx) = mpsc::channel::<i32>();
drop(tx);  // Drop the only sender
let result = rx.recv();  // ❌ Returns Err(RecvError): all senders disconnected
```
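**Solution:** Keep at least one sender alive while the receiver still expects messages, and treat an `Err` from `recv()` as the signal that the channel has closed.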
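
A disconnected channel is not always a bug: it is also the usual way to tell a receiver that no more data is coming. The following minimal sketch (the function name `drain_after_disconnect` is made up for this example) shows that messages already in the channel are still delivered after the sender is dropped, and that `recv()` returns `Err` only once the channel is both empty and disconnected.

```rust
use std::sync::mpsc;
use std::thread;

// Returns the messages received and whether a further recv() reported disconnection.
fn drain_after_disconnect() -> (Vec<i32>, bool) {
    let (tx, rx) = mpsc::channel::<i32>();

    let producer = thread::spawn(move || {
        for n in 1..=3 {
            tx.send(n).unwrap();
        }
        // `tx` is dropped here, disconnecting the channel
    });
    producer.join().unwrap();

    // Buffered messages are still delivered after the sender is gone
    let mut received = Vec::new();
    while let Ok(n) = rx.recv() {
        received.push(n);
    }

    // The channel is now empty and disconnected, so recv() returns Err
    let disconnected = rx.recv().is_err();
    (received, disconnected)
}
```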

### 5. Blocking in Async Context
```rust
use std::time::Duration;

async fn bad_async() {
    std::thread::sleep(Duration::from_secs(1));  // ❌ Blocks executor thread!
}
```
**Solution:** Use async sleep:
```rust
use std::time::Duration;

async fn good_async() {
    tokio::time::sleep(Duration::from_secs(1)).await;  // ✅ Yields to executor
}
```

### 6. Not Handling Panics in Threads
```rust
let handle = thread::spawn(|| {
    panic!("Thread panic!");
});
// Main thread continues...
handle.join().unwrap();  // ❌ join() returns Err, so unwrap() panics here too
```
**Solution:** Handle join result:
```rust
match handle.join() {
    Ok(_) => println!("Thread completed"),
    Err(e) => println!("Thread panicked: {:?}", e),
}
```
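
The `Err` value from `join()` is a boxed `Any` holding the panic payload. As a sketch (the helper name `join_and_describe` is hypothetical), the payload can be downcast to recover the panic message when it is a string, which is the case for ordinary `panic!` calls:

```rust
use std::thread;

// Spawns a panicking thread and turns the join result into a readable message.
fn join_and_describe() -> String {
    let handle = thread::spawn(|| {
        panic!("Thread panic!");
    });

    match handle.join() {
        Ok(()) => "completed".to_string(),
        Err(payload) => {
            // panic! payloads are typically &'static str or String
            if let Some(msg) = payload.downcast_ref::<&str>() {
                format!("panicked: {}", msg)
            } else if let Some(msg) = payload.downcast_ref::<String>() {
                format!("panicked: {}", msg)
            } else {
                "panicked with a non-string payload".to_string()
            }
        }
    }
}
```

The default panic hook still prints the panic to stderr; handling the `Err` only stops the panic from propagating into the joining thread.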

### 7. Send/Sync Trait Violations
```rust
use std::rc::Rc;
use std::thread;

let rc = Rc::new(5);
thread::spawn(move || {
    println!("{}", rc);  // ❌ Error: `Rc<i32>` cannot be sent between threads safely
});
```
**Solution:** Use thread-safe types:
- `Rc<T>` → `Arc<T>` (`Send + Sync` when `T: Send + Sync`)
- `RefCell<T>` → `Mutex<T>` or `RwLock<T>`
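
As an illustration of the second substitution, the sketch below shares a list between threads with `Arc<RwLock<Vec<String>>>`, where single-threaded code might use `Rc<RefCell<Vec<String>>>`. The function name `shared_list_demo` and the sample values are assumptions made for this example.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

// One writer appends an entry, then several readers observe it concurrently.
fn shared_list_demo() -> Vec<String> {
    let shared = Arc::new(RwLock::new(vec!["alpha".to_string()]));

    // Writer: takes the exclusive write lock
    let writer = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || {
            shared.write().unwrap().push("beta".to_string());
        })
    };
    writer.join().unwrap();

    // Readers: any number of read locks may be held at the same time
    let readers: Vec<_> = (0..3)
        .map(|_| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                let len = shared.read().unwrap().len();
                len
            })
        })
        .collect();
    for reader in readers {
        assert_eq!(reader.join().unwrap(), 2);
    }

    let snapshot = shared.read().unwrap().clone();
    snapshot
}
```

`RwLock` tends to suit read-heavy data; when most accesses are writes, a plain `Mutex` is usually simpler.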

### 8. Forgetting .await in Async Functions
```rust
async fn fetch_data() -> String {
    // ...
}

async fn process() {
    let data = fetch_data();  // ❌ Returns a Future; nothing runs yet
    println!("{}", data);  // Error: the Future doesn't implement Display
}
```
**Solution:** `.await` the call so the future runs and yields its value:
```rust
async fn process() {
    let data = fetch_data().await;  // ✅ Executes and waits
    println!("{}", data);
}
```
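
The laziness behind this pitfall can be observed using only the standard library. The sketch below is an illustration, not production code: `block_on` is a deliberately naive busy-polling executor written for this example (a real program would use a runtime such as Tokio), and this `fetch_data` takes a hypothetical `started` flag so the test can see when its body actually runs.

```rust
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A waker that does nothing; enough for a busy-polling demo executor.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Naive executor for illustration only: polls the future until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
        std::thread::yield_now();
    }
}

// Sets the flag only when the body actually runs.
async fn fetch_data(started: Arc<AtomicBool>) -> String {
    started.store(true, Ordering::SeqCst);
    "payload".to_string()
}

// Returns (started before polling, started after polling, fetched data).
fn lazy_future_demo() -> (bool, bool, String) {
    let started = Arc::new(AtomicBool::new(false));

    // Calling the async fn only builds the future; the body has not run
    let future = fetch_data(Arc::clone(&started));
    let before = started.load(Ordering::SeqCst);

    // Driving the future to completion runs the body
    let data = block_on(future);
    let after = started.load(Ordering::SeqCst);

    (before, after, data)
}
```

In `lazy_future_demo`, the flag is still `false` after `fetch_data(...)` is called and becomes `true` only once the future is polled, which is what `.await` does inside an async function.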

### 9. Mutex Poisoning
```rust
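use std::sync::{Arc, Mutex};
use std::thread;

let mutex = Arc::new(Mutex::new(0));
let mutex_clone = Arc::clone(&mutex);

let handle = thread::spawn(move || {
    let mut data = mutex_clone.lock().unwrap();
    *data += 1;
    panic!("Oops!");  // Panicking while holding the guard poisons the mutex
});
let _ = handle.join();  // Wait for the thread; ignore its panic here

// Later...
let data = mutex.lock().unwrap();  // ❌ Panic: mutex poisoned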
```
**Solution:** Handle poisoned mutex:
```rust
match mutex.lock() {
    Ok(guard) => { /* use guard */ },
    Err(poisoned) => {
        let guard = poisoned.into_inner();  // Recover data
        // Handle poisoned state
    }
}
```
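
When the protected data is still usable after a panic, the same recovery can be written more compactly with `unwrap_or_else`. This is a sketch under that assumption; the function name `recover_from_poison` is made up for the example, and whether recovering is safe depends on the invariants of your data.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Poisons a mutex on purpose, then recovers the inner value.
fn recover_from_poison() -> (bool, i32) {
    let mutex = Arc::new(Mutex::new(0));
    let mutex_clone = Arc::clone(&mutex);

    let result: thread::Result<()> = thread::spawn(move || {
        let mut data = mutex_clone.lock().unwrap();
        *data += 1;
        panic!("Oops!"); // Guard is dropped during unwinding, poisoning the mutex
    })
    .join();
    assert!(result.is_err());

    let was_poisoned = mutex.is_poisoned();

    // Take the guard whether or not the mutex is poisoned
    let guard = mutex
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    (was_poisoned, *guard)
}
```

The increment made before the panic is still visible, which is why poisoning exists: the data may have been left half-updated, and the caller has to decide whether to trust it.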

### 10. Race Conditions with Channels
```rust
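use std::sync::mpsc;
use std::thread;

let (tx, rx) = mpsc::channel();

thread::spawn(move || {
    tx.send(1).unwrap();
    tx.send(2).unwrap();
});

// ⚠️ A single sender preserves order, so this prints 1;
// the interleaving between different senders is not guaranteed
println!("{}", rx.recv().unwrap());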
```
**Note:** Messages from a single sender arrive in the order they were sent, but messages from multiple senders may interleave in any order.
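
The guarantee in this note can be checked with a small experiment. In the sketch below (the function name `per_sender_order_holds` and the `(sender_id, seq)` tagging scheme are assumptions for this example), three senders each emit a numbered sequence, and the receiver verifies that each sender's messages arrive in order even though the streams interleave unpredictably.

```rust
use std::sync::mpsc;
use std::thread;

// Returns true if every sender's messages arrived in the order they were sent.
fn per_sender_order_holds() -> bool {
    let (tx, rx) = mpsc::channel::<(u32, u32)>();

    for sender_id in 0..3 {
        let tx = tx.clone();
        thread::spawn(move || {
            for seq in 0..5 {
                tx.send((sender_id, seq)).unwrap();
            }
        });
    }
    drop(tx); // Drop the original so the receive loop ends when the workers finish

    // Track the next sequence number expected from each sender
    let mut next_expected = [0u32; 3];
    for (sender_id, seq) in rx {
        if seq != next_expected[sender_id as usize] {
            return false;
        }
        next_expected[sender_id as usize] += 1;
    }
    next_expected == [5, 5, 5]
}
```

If the receiver needs a total order across senders, the messages have to carry their own ordering information, such as a timestamp or a global sequence number.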

**📚 Rust Book References:**
- [Chapter 16 - Fearless Concurrency](https://doc.rust-lang.org/book/ch16-00-concurrency.html)
- [Chapter 16.1 - Using Threads](https://doc.rust-lang.org/book/ch16-01-threads.html)
- [Chapter 16.2 - Message Passing](https://doc.rust-lang.org/book/ch16-02-message-passing.html)
- [Chapter 16.3 - Shared-State Concurrency](https://doc.rust-lang.org/book/ch16-03-shared-state.html)