# Lesson A4: Concurrency Fundamentals

**Duration**: 150-165 minutes  
**Stage**: Advanced (Mastery)

---

## 🎯 Learning Objectives

By the end of this lesson, you will be able to:
1. Create and manage threads using `std::thread`
2. Communicate between threads using channels (`mpsc`)
3. Share state safely with `Arc<T>` and `Mutex<T>`
4. Understand `Send` and `Sync` traits for thread safety
5. Apply concurrent programming patterns effectively

---

## 📋 Prerequisite Review

**Quick Check**: From our previous lessons:

1. How do smart pointers like `Rc<T>` and `Arc<T>` differ?
2. What is interior mutability with `RefCell<T>`?
3. How do you prevent reference cycles?
4. What are the benefits of `Box<T>` for heap allocation?

**Answers**: 1) `Rc<T>` is single-threaded, while `Arc<T>` updates its reference count atomically and is thread-safe; 2) mutating data through shared references, with the borrow rules checked at runtime; 3) use `Weak<T>` references; 4) heap allocation, recursive types, and moving large data without copying it.

---

## 🧠 Key Concepts

### Thread Safety in Rust

- **`Send` trait**: Types whose ownership can be transferred to another thread
- **`Sync` trait**: Types that can be safely referenced from multiple threads at once (`T` is `Sync` exactly when `&T` is `Send`)
- **`Arc<T>`**: Atomic reference counting for shared ownership across threads
- **`Mutex<T>`**: Mutual exclusion for thread-safe mutation
- **Channels**: Message passing for thread communication
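
The `Send` and `Sync` definitions above can be checked at compile time. The sketch below is one way to do that, not a standard API: `assert_send`, `assert_sync`, and `send_sync_demo` are hypothetical helper names introduced only for illustration.

```rust
use std::cell::RefCell;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical helpers: each call compiles only if `T` satisfies the bound.
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

fn send_sync_demo() -> i32 {
    // Arc<Mutex<i32>> can be moved to and shared between threads.
    assert_send::<Arc<Mutex<i32>>>();
    assert_sync::<Arc<Mutex<i32>>>();

    // RefCell<i32> can be moved to another thread, but not shared by reference.
    assert_send::<RefCell<i32>>();

    // Uncommenting either line below is expected to produce a compile error:
    // assert_send::<std::rc::Rc<i32>>();   // Rc<T> is not Send
    // assert_sync::<RefCell<i32>>();       // RefCell<T> is not Sync

    // Because Arc<Mutex<i32>> is Send, a clone can be moved into a thread.
    let shared = Arc::new(Mutex::new(41));
    let worker = Arc::clone(&shared);
    thread::spawn(move || {
        *worker.lock().unwrap() += 1;
    })
    .join()
    .unwrap();

    // Copy the value out so the guard is released before returning.
    let value = *shared.lock().unwrap();
    value
}
```

Calling `send_sync_demo()` should return 42. The point of the sketch is that violations of `Send` or `Sync` are rejected by the compiler rather than discovered at runtime.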

### Concurrency Models

- **Shared State**: Multiple threads access shared data with synchronization
- **Message Passing**: Threads communicate by sending messages
- **Actor Model**: Isolated actors communicate via messages
- **Data Parallelism**: Parallel processing of data collections
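
Shared state and message passing are demonstrated in the cells below. Data parallelism is not, so here is a minimal sketch of it, assuming scoped threads (`std::thread::scope`, available since Rust 1.63). The function name `parallel_sum` and its `workers` parameter are hypothetical.

```rust
use std::thread;

// Hypothetical helper: sums `data` by giving each thread one chunk of the slice.
fn parallel_sum(data: &[u64], workers: usize) -> u64 {
    if data.is_empty() {
        return 0;
    }
    // Use at least one worker; ceiling division so every element lands in a chunk.
    let workers = workers.max(1);
    let chunk_size = (data.len() + workers - 1) / workers;

    // Scoped threads may borrow `data` because they are joined before `scope` returns.
    thread::scope(|scope| {
        let handles: Vec<_> = data
            .chunks(chunk_size)
            .map(|chunk| scope.spawn(move || chunk.iter().sum::<u64>()))
            .collect();

        // Combine the partial sums from each thread.
        handles
            .into_iter()
            .map(|handle| handle.join().unwrap())
            .sum::<u64>()
    })
}
```

For example, `parallel_sum(&(1..=100).collect::<Vec<u64>>(), 4)` should give 5050. No `Arc` or `Mutex` is needed here because each thread only reads its own chunk.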

---

## 🔬 Live Code Exploration

### Basic Thread Creation and Management

In [None]:
// Basic thread creation and management

use std::thread;
use std::time::Duration;

fn basic_threads_demo() {
    println!("=== Basic Thread Creation ===");
    
    // Spawn a simple thread
    let handle = thread::spawn(|| {
        for i in 1..=5 {
            println!("Thread: count {}", i);
            thread::sleep(Duration::from_millis(100));
        }
    });
    
    // Main thread work
    for i in 1..=3 {
        println!("Main: count {}", i);
        thread::sleep(Duration::from_millis(150));
    }
    
    // Wait for thread to complete
    handle.join().unwrap();
    println!("Thread completed!\n");
    
    // Multiple threads
    let mut handles = vec![];
    
    for i in 0..3 {
        let handle = thread::spawn(move || {
            println!("Worker thread {} starting", i);
            thread::sleep(Duration::from_millis(100 * (i + 1) as u64));
            println!("Worker thread {} finished", i);
            i * i // Return value
        });
        handles.push(handle);
    }
    
    // Collect results
    let results: Vec<i32> = handles
        .into_iter()
        .map(|handle| handle.join().unwrap())
        .collect();
    
    println!("Thread results: {:?}", results);
}

basic_threads_demo();
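
The demo above calls `handle.join().unwrap()`, which assumes the thread never panics. `join()` returns a `Result` whose `Err` carries the panic payload, so a caller can inspect it instead. The sketch below shows one way to do that; `run_checked` is a hypothetical helper, not part of the standard library.

```rust
use std::thread;

// Hypothetical helper: runs `job` on a new thread and turns a panic into Err(String).
fn run_checked<F>(job: F) -> Result<i32, String>
where
    F: FnOnce() -> i32 + Send + 'static,
{
    thread::spawn(job).join().map_err(|payload| {
        // Panic payloads are usually a &str or a String; anything else is reported generically.
        if let Some(msg) = payload.downcast_ref::<&str>() {
            msg.to_string()
        } else if let Some(msg) = payload.downcast_ref::<String>() {
            msg.clone()
        } else {
            "unknown panic".to_string()
        }
    })
}
```

With this helper, `run_checked(|| 6 * 7)` should yield `Ok(42)`, while `run_checked(|| panic!("worker failed"))` should yield an `Err` containing the panic message. The panic is still printed to stderr by the default panic hook.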

### Message Passing with Channels

In [None]:
// Message passing with channels

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

#[derive(Debug, Clone)]
enum Message {
    Text(String),
    Number(i32),
    Quit,
}

fn channels_demo() {
    println!("\n=== Message Passing with Channels ===");
    
    // Simple channel example
    let (tx, rx) = mpsc::channel();
    
    thread::spawn(move || {
        let messages = vec![
            "Hello",
            "from", 
            "the",
            "thread"
        ];
        
        for msg in messages {
            tx.send(msg.to_string()).unwrap();
            thread::sleep(Duration::from_millis(100));
        }
    });
    
    // Receive messages; the loop ends when the spawned thread finishes and drops `tx`
    for received in rx {
        println!("Received: {}", received);
    }
    
    // Multiple producers
    let (tx, rx) = mpsc::channel();
    
    for i in 0..3 {
        let tx_clone = tx.clone();
        thread::spawn(move || {
            for j in 0..3 {
                let msg = Message::Text(format!("Producer {} - Message {}", i, j));
                tx_clone.send(msg).unwrap();
                thread::sleep(Duration::from_millis(50));
            }
            tx_clone.send(Message::Number(i * 10)).unwrap();
        });
    }
    
    // Drop the original sender: the receive loop below ends only once every sender is gone
    drop(tx);
    
    println!("\nMultiple producers:");
    for received in rx {
        match received {
            Message::Text(text) => println!("Text: {}", text),
            Message::Number(num) => println!("Number: {}", num),
            Message::Quit => break,
        }
    }
}

channels_demo();
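
`mpsc::channel()` as used above is unbounded, so a fast producer can queue messages without limit. When that is a concern, the standard library's `mpsc::sync_channel` offers a bounded alternative. The following is a minimal sketch under that assumption; the function name `bounded_channel_demo` and the capacity of 2 are illustrative choices.

```rust
use std::sync::mpsc;
use std::thread;

fn bounded_channel_demo() -> Vec<u32> {
    // Capacity 2: `send` blocks while two messages are already waiting in the buffer.
    let (tx, rx) = mpsc::sync_channel::<u32>(2);

    let producer = thread::spawn(move || {
        for n in 0..5 {
            tx.send(n).unwrap();
        }
        // `tx` is dropped here, which closes the channel.
    });

    // Collect until the channel is closed; a single producer keeps messages in send order.
    let received: Vec<u32> = rx.iter().collect();
    producer.join().unwrap();
    received
}
```

Calling `bounded_channel_demo()` should return `[0, 1, 2, 3, 4]`. The bounded buffer applies backpressure: the producer waits whenever the consumer falls behind.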

### Shared State with Arc and Mutex

In [None]:
// Shared state with Arc<Mutex<T>>

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

#[derive(Debug)]
struct Counter {
    value: i32,
    name: String,
}

impl Counter {
    fn new(name: String) -> Self {
        Counter { value: 0, name }
    }
    
    fn increment(&mut self) {
        self.value += 1;
        println!("{}: incremented to {}", self.name, self.value);
    }
    
    fn get_value(&self) -> i32 {
        self.value
    }
}

fn shared_state_demo() {
    println!("\n=== Shared State with Arc<Mutex<T>> ===");
    
    // Simple shared counter
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];
    
    for i in 0..5 {
        let counter_clone = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            for j in 0..3 {
                let mut num = counter_clone.lock().unwrap();
                *num += 1;
                println!("Thread {} increment {}: value = {}", i, j, *num);
                // Lock is automatically released when `num` goes out of scope
            }
        });
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
    
    println!("Final counter value: {}", *counter.lock().unwrap());
    
    // Complex shared state
    let shared_counter = Arc::new(Mutex::new(Counter::new("Shared".to_string())));
    let mut handles = vec![];
    
    for i in 0..3 {
        let counter_clone = Arc::clone(&shared_counter);
        let handle = thread::spawn(move || {
            for _ in 0..2 {
                {
                    let mut counter = counter_clone.lock().unwrap();
                    counter.increment();
                } // Lock released here
                thread::sleep(Duration::from_millis(10));
            }
        });
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
    
    let final_value = shared_counter.lock().unwrap().get_value();
    println!("\nFinal shared counter value: {}", final_value);
}

shared_state_demo();
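
Every `lock().unwrap()` above would panic if the mutex were poisoned, which happens when a thread panics while holding the lock. The sketch below illustrates that case and one possible recovery using `PoisonError::into_inner`. The function name `poisoned_lock_demo` is hypothetical, and whether recovering is appropriate depends on whether the protected data can still be trusted.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

fn poisoned_lock_demo() -> i32 {
    let data = Arc::new(Mutex::new(10));

    // This thread updates the value, then panics while still holding the lock.
    let worker = Arc::clone(&data);
    let result = thread::spawn(move || {
        let mut guard = worker.lock().unwrap();
        *guard += 1;
        panic!("simulated failure while holding the lock");
    })
    .join();
    assert!(result.is_err());

    // The mutex is now poisoned: lock() returns Err, but the guard is still reachable.
    let value = match data.lock() {
        Ok(guard) => *guard,
        Err(poisoned) => *poisoned.into_inner(),
    };
    value
}
```

Calling `poisoned_lock_demo()` should return 11, since the increment completed before the panic. Using `unwrap()` instead, as the demos above do, propagates the panic to every thread that later takes the lock.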

---

## 🎯 Guided Practice

### Exercise 1: Concurrent Web Scraper

Create a concurrent web scraper simulation using threads and channels.

In [None]:
// Concurrent web scraper simulation: a worker pool fed through a shared channel

use std::sync::{Arc, Mutex, mpsc};
use std::thread;
use std::time::Duration;
use std::collections::HashMap;

#[derive(Debug, Clone)]
struct WebPage {
    url: String,
    content: String,
    word_count: usize,
    load_time_ms: u64,
}

impl WebPage {
    fn new(url: String) -> Self {
        // Simulate web scraping
        let load_time = (url.len() as u64 * 10) + 50; // Simulate variable load times
        thread::sleep(Duration::from_millis(load_time));
        
        let content = format!("Content from {} with some sample text data", url);
        let word_count = content.split_whitespace().count();
        
        WebPage {
            url,
            content,
            word_count,
            load_time_ms: load_time,
        }
    }
}

#[derive(Debug)]
enum WorkerMessage {
    Scrape(String),
    Shutdown,
}

#[derive(Debug)]
enum ResultMessage {
    Success(WebPage),
    Error(String, String), // URL, Error message
    WorkerFinished(usize), // Worker ID
}

struct WebScraper {
    worker_count: usize,
    results: Arc<Mutex<Vec<WebPage>>>,
    errors: Arc<Mutex<Vec<(String, String)>>>,
    stats: Arc<Mutex<HashMap<String, usize>>>,
}

impl WebScraper {
    fn new(worker_count: usize) -> Self {
        WebScraper {
            worker_count,
            results: Arc::new(Mutex::new(Vec::new())),
            errors: Arc::new(Mutex::new(Vec::new())),
            stats: Arc::new(Mutex::new(HashMap::new())),
        }
    }
    
    fn scrape_urls(&self, urls: Vec<String>) {
        let (work_tx, work_rx) = mpsc::channel();
        let (result_tx, result_rx) = mpsc::channel();
        
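        // Shared work receiver: an mpsc Receiver cannot be cloned, so all workers
        // take turns using the single receiver behind an Arc<Mutex<..>>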
        let work_rx = Arc::new(Mutex::new(work_rx));
        
        // Spawn worker threads
        let mut worker_handles = vec![];
        
        for worker_id in 0..self.worker_count {
            let work_rx_clone = Arc::clone(&work_rx);
            let result_tx_clone = result_tx.clone();
            
            let handle = thread::spawn(move || {
                println!("Worker {} started", worker_id);
                
                loop {
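                    let message = {
                        // The guard is dropped at the end of this block, so the lock is
                        // held while waiting in recv() but not while scraping
                        let receiver = work_rx_clone.lock().unwrap();
                        receiver.recv()
                    };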
                    
                    match message {
                        Ok(WorkerMessage::Scrape(url)) => {
                            println!("Worker {} scraping: {}", worker_id, url);
                            
                            // Simulate potential errors
                            if url.contains("error") {
                                let error_msg = format!("Failed to scrape {}", url);
                                result_tx_clone.send(ResultMessage::Error(url, error_msg)).unwrap();
                            } else {
                                let page = WebPage::new(url);
                                result_tx_clone.send(ResultMessage::Success(page)).unwrap();
                            }
                        }
                        Ok(WorkerMessage::Shutdown) => {
                            println!("Worker {} shutting down", worker_id);
                            result_tx_clone.send(ResultMessage::WorkerFinished(worker_id)).unwrap();
                            break;
                        }
                        Err(_) => {
                            println!("Worker {} channel closed", worker_id);
                            break;
                        }
                    }
                }
            });
            
            worker_handles.push(handle);
        }
        
        // Send work to workers
        for url in urls {
            work_tx.send(WorkerMessage::Scrape(url)).unwrap();
        }
        
        // Send shutdown messages
        for _ in 0..self.worker_count {
            work_tx.send(WorkerMessage::Shutdown).unwrap();
        }
        
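        drop(work_tx); // Close the work channel
        // Drop the original result sender so result_rx closes once every worker exits,
        // even if a worker leaves without sending WorkerFinished
        drop(result_tx);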
        
        // Collect results
        let mut finished_workers = 0;
        
        for result in result_rx {
            match result {
                ResultMessage::Success(page) => {
                    println!("✅ Scraped: {} ({} words, {}ms)", 
                            page.url, page.word_count, page.load_time_ms);
                    
                    self.results.lock().unwrap().push(page);
                    
                    // Update stats
                    let mut stats = self.stats.lock().unwrap();
                    *stats.entry("successful_scrapes".to_string()).or_insert(0) += 1;
                }
                ResultMessage::Error(url, error) => {
                    println!("❌ Error scraping {}: {}", url, error);
                    self.errors.lock().unwrap().push((url, error));
                    
                    // Update stats
                    let mut stats = self.stats.lock().unwrap();
                    *stats.entry("failed_scrapes".to_string()).or_insert(0) += 1;
                }
                ResultMessage::WorkerFinished(worker_id) => {
                    println!("Worker {} finished", worker_id);
                    finished_workers += 1;
                    
                    if finished_workers == self.worker_count {
                        break;
                    }
                }
            }
        }
        
        // Wait for all workers to complete
        for handle in worker_handles {
            handle.join().unwrap();
        }
    }
    
    fn print_summary(&self) {
        let results = self.results.lock().unwrap();
        let errors = self.errors.lock().unwrap();
        let stats = self.stats.lock().unwrap();
        
        println!("\n📊 Scraping Summary:");
        println!("  Successful scrapes: {}", results.len());
        println!("  Failed scrapes: {}", errors.len());
        
        if !results.is_empty() {
            let total_words: usize = results.iter().map(|p| p.word_count).sum();
            let avg_words = total_words / results.len();
            let total_time: u64 = results.iter().map(|p| p.load_time_ms).sum();
            let avg_time = total_time / results.len() as u64;
            
            println!("  Average words per page: {}", avg_words);
            println!("  Average load time: {}ms", avg_time);
        }
        
        if !errors.is_empty() {
            println!("\n❌ Errors:");
            for (url, error) in errors.iter() {
                println!("  {}: {}", url, error);
            }
        }
        
        println!("\n📈 Detailed Stats: {:?}", *stats);
    }
}

fn web_scraper_demo() {
    println!("\n=== Concurrent Web Scraper Demo ===");
    
    let scraper = WebScraper::new(3); // 3 worker threads
    
    let urls = vec![
        "https://example.com".to_string(),
        "https://rust-lang.org".to_string(),
        "https://github.com".to_string(),
        "https://stackoverflow.com".to_string(),
        "https://error-site.com".to_string(), // This will simulate an error
        "https://docs.rs".to_string(),
        "https://crates.io".to_string(),
        "https://another-error.com".to_string(), // Another error
    ];
    
    println!("Starting concurrent scraping of {} URLs with {} workers...", 
            urls.len(), scraper.worker_count);
    
    let start_time = std::time::Instant::now();
    scraper.scrape_urls(urls);
    let elapsed = start_time.elapsed();
    
    scraper.print_summary();
    println!("\n⏱️  Total execution time: {:?}", elapsed);
}

web_scraper_demo();

---

## 🧪 Active Recall Checkpoint

**Test your understanding without looking back:**

1. What's the difference between the `Send` and `Sync` traits?
2. How do you share data between threads safely?
3. What's the difference between `Rc<T>` and `Arc<T>`?
4. How do channels work for thread communication?
5. When would you use message passing vs shared state?
6. What happens if you don't `join()` a thread?
7. How does `Mutex<T>` prevent data races?
8. What are the benefits of Rust's concurrency model?

**Write your answers below:**

**Your Answers:**
1. 
2. 
3. 
4. 
5. 
6. 
7. 
8. 

---

## 🤔 Reflection Prompt

Consider these questions:

1. **How does Rust's ownership system enable safe concurrency?**
2. **What are the trade-offs between message passing and shared state?**
3. **How do you decide when to use concurrency in your programs?**
4. **What challenges does concurrent programming introduce?**

Write your thoughts below:

**Your Reflections:**

1. 

2. 

3. 

4. 

---

## 🔮 Preview & Connections

### Coming Up Next: Async Programming

In our next lesson, you'll learn about:
- Async/await syntax and futures
- Async runtimes like Tokio
- Async I/O and networking
- Combining async with concurrency

### How This Connects
Concurrency fundamentals are essential for:
- Building high-performance applications
- Understanding async programming
- Working with parallel algorithms
- Creating scalable systems

---

## ✅ Expected Outcomes

**Self-Assessment Checklist** - Can you:

- [ ] Create and manage threads effectively?
- [ ] Use channels for thread communication?
- [ ] Share state safely with `Arc<T>` and `Mutex<T>`?
- [ ] Understand `Send` and `Sync` trait implications?
- [ ] Choose between message passing and shared state?
- [ ] Handle thread synchronization correctly?
- [ ] Design concurrent systems safely?

If you checked all boxes, excellent! You've mastered Rust's concurrency fundamentals.

---

**🎉 Outstanding Achievement!** You now understand how to write safe, concurrent programs in Rust!