# Chapter 7: Concurrency and Parallelism (Part 1)

**Items 52-58**

---

## Item 52: Use subprocess to Manage Child Processes

### Overview

Python has battle-hardened libraries for running and managing child processes. This makes it a great language for:
- Gluing together other tools (command-line utilities)
- Graduating shell scripts to Python for readability and maintainability
- Consuming all CPU cores of a machine
- Driving and coordinating CPU-intensive workloads

### Key Concepts

**Best choice for managing child processes:** `subprocess` built-in module

**Two main approaches:**
1. `subprocess.run()` - Simple convenience function
2. `subprocess.Popen` - Advanced usage for pipelines

### Basic subprocess Usage with run()

In [None]:
import subprocess

# Simple subprocess execution
result = subprocess.run(
    ['echo', 'Hello from the child!'],
    capture_output=True,
    encoding='utf-8'
)

result.check_returncode()  # No exception means clean exit
print(result.stdout)

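Instead of calling `check_returncode()` yourself, you can pass `check=True` to `run()`. It raises `subprocess.CalledProcessError` when the child exits with a non-zero status. The sketch below uses a child Python interpreter (`sys.executable`) that exits with status 3, purely for illustration. `text=True` decodes the captured output as text, as the `encoding` argument does above.

```python
import subprocess
import sys


def run_failing_child():
    """Run a child that exits with status 3 and report that status."""
    try:
        subprocess.run(
            [sys.executable, '-c', 'import sys; sys.exit(3)'],
            capture_output=True,
            text=True,
            check=True,  # Raise CalledProcessError on a non-zero exit status
        )
    except subprocess.CalledProcessError as exc:
        return exc.returncode  # The child's exit status travels on the exception
    return 0


status = run_failing_child()
print('Child exited with status', status)
```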
### Enhanced Example: Running Multiple Commands

In [None]:
import subprocess

# Running multiple commands and collecting results
commands = [
    ['echo', 'First command'],
    ['echo', 'Second command'],
    ['echo', 'Third command']
]

results = []
for cmd in commands:
    result = subprocess.run(
        cmd,
        capture_output=True,
        encoding='utf-8'
    )
    results.append(result.stdout.strip())

print("All command outputs:")
for i, output in enumerate(results, 1):
    print(f"{i}. {output}")

### Using Popen for Non-blocking Execution

In [None]:
import subprocess
import time

# Non-blocking subprocess with Popen
proc = subprocess.Popen(['sleep', '1'])

while proc.poll() is None:
    print('Working...')
    # Some time-consuming work here
    time.sleep(0.3)

print('Exit status', proc.poll())

### Parallel Subprocess Execution

In [None]:
import subprocess
import time

# Start multiple processes in parallel
start = time.time()
sleep_procs = []

for _ in range(10):
    proc = subprocess.Popen(['sleep', '1'])
    sleep_procs.append(proc)

# Wait for all processes to complete
for proc in sleep_procs:
    proc.communicate()

end = time.time()
delta = end - start
print(f'Finished in {delta:.3} seconds')
print("Expected: ~1 second (the children sleep in parallel)")
print("Serial execution would take ~10 seconds")

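`sleep` is a UNIX utility, so the cell above will not run on Windows. The portable variant below uses a child Python interpreter as a stand-in for `sleep`. It starts every child before waiting on any of them, so the total time stays close to one child's duration instead of the sum of all of them.

```python
import subprocess
import sys
import time

# Stand-in for `sleep 0.3` that also works on Windows:
# a child Python interpreter that sleeps for 0.3 seconds
SLEEP_CMD = [sys.executable, '-c', 'import time; time.sleep(0.3)']

start = time.perf_counter()
procs = [subprocess.Popen(SLEEP_CMD) for _ in range(5)]  # Start all children first
for proc in procs:
    proc.wait()  # Then wait for each one to finish
elapsed = time.perf_counter() - start

print(f'5 children finished in {elapsed:.2f} seconds')
```

Calling `proc.wait()` inside the loop that starts the children would run them one after another again.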
### Piping Data Between Processes

In [None]:
import subprocess
import os

def run_encrypt(data):
    """Encrypt data using openssl (example)"""
    env = os.environ.copy()
    env['password'] = 'zf7ShyBhZOraQDdE/FiZpm/m/8f9X+M1'
    
    proc = subprocess.Popen(
        ['openssl', 'enc', '-des3', '-pass', 'env:password'],
        env=env,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE
    )
    
    proc.stdin.write(data)
    proc.stdin.flush()  # Ensure the child gets input
    return proc

# Example: Encrypt multiple pieces of data in parallel
procs = []
for _ in range(3):
    data = os.urandom(10)
    proc = run_encrypt(data)
    procs.append(proc)

# Collect results
for proc in procs:
    out, _ = proc.communicate()
    print(f"Encrypted output (last 10 bytes): {out[-10:]}")

### Creating Process Pipelines (UNIX-style)

In [None]:
import subprocess
import os

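def run_hash(input_stdin):
    """Generate a binary SHA-256 digest of the input stream"""
    # Whirlpool ('-whirlpool') is only available through the legacy
    # provider in OpenSSL 3.x, so a default-provider digest is used here
    return subprocess.Popen(
        ['openssl', 'dgst', '-sha256', '-binary'],
        stdin=input_stdin,
        stdout=subprocess.PIPE
    )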

# Create a pipeline: encrypt -> hash
encrypt_procs = []
hash_procs = []

for _ in range(3):
    data = os.urandom(100)
    
    # Start encryption
    encrypt_proc = run_encrypt(data)
    encrypt_procs.append(encrypt_proc)
    
    # Chain with hashing
    hash_proc = run_hash(encrypt_proc.stdout)
    hash_procs.append(hash_proc)
    
    # Close the parent's copy of the pipe so the encrypt child gets SIGPIPE
    # if the hash child exits early; set it to None so communicate() skips it
    encrypt_proc.stdout.close()
    encrypt_proc.stdout = None

# Wait for all processes
for proc in encrypt_procs:
    proc.communicate()
    assert proc.returncode == 0

for proc in hash_procs:
    out, _ = proc.communicate()
    print(f"Hash output (last 10 bytes): {out[-10:]}")
    assert proc.returncode == 0

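The pipeline above depends on the `openssl` CLI. You can try the same wiring without it, where one child's `stdout` becomes the next child's `stdin`. The sketch below uses two small Python child processes as hypothetical stand-ins. The first copies its input unchanged and the second prints the SHA-256 hex digest of what it reads.

```python
import hashlib
import subprocess
import sys

# Hypothetical stand-in for the encrypt step: copy stdin to stdout unchanged
COPY_CMD = [sys.executable, '-c',
            'import sys; sys.stdout.buffer.write(sys.stdin.buffer.read())']
# Hypothetical stand-in for the hash step: print the SHA-256 hex digest of stdin
HASH_CMD = [sys.executable, '-c',
            'import sys, hashlib; '
            'print(hashlib.sha256(sys.stdin.buffer.read()).hexdigest(), end="")']


def pipeline_digest(data: bytes) -> str:
    first = subprocess.Popen(COPY_CMD, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    # Connect the first child's stdout directly to the second child's stdin
    second = subprocess.Popen(HASH_CMD, stdin=first.stdout, stdout=subprocess.PIPE)
    # Close the parent's copy so only the second child reads from the pipe
    first.stdout.close()
    first.stdout = None
    # Feed the input, then close stdin so the first child sees end-of-file
    first.stdin.write(data)
    first.stdin.close()
    out, _ = second.communicate(timeout=10)
    first.wait(timeout=10)
    return out.decode('ascii')


digest = pipeline_digest(b'hello pipeline')
print(digest)
```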
### Handling Timeouts

In [None]:
import subprocess

# Using timeout to prevent hanging processes
proc = subprocess.Popen(['sleep', '10'])

try:
    proc.communicate(timeout=0.1)
except subprocess.TimeoutExpired:
    proc.terminate()
    proc.wait()

print('Exit status', proc.poll())
print('Process was terminated due to timeout')

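`subprocess.run()` also accepts a `timeout`. When the timeout expires, `run()` kills the child, waits for it, and then raises `subprocess.TimeoutExpired`, so you don't need to call `terminate()` yourself. The sketch below uses a child Python interpreter that would otherwise sleep for 10 seconds:

```python
import subprocess
import sys
import time

start = time.perf_counter()
try:
    subprocess.run(
        [sys.executable, '-c', 'import time; time.sleep(10)'],
        timeout=0.5,
    )
    timed_out = False
except subprocess.TimeoutExpired:
    # run() has already killed and reaped the child at this point
    timed_out = True
elapsed = time.perf_counter() - start

print('Timed out:', timed_out, f'after {elapsed:.2f} seconds')
```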
### Enhanced Example: Process Manager

In [None]:
import subprocess
from typing import List, Optional, Tuple

class ProcessManager:
    """Manages multiple child processes with timeout support"""
    
    def __init__(self):
        self.processes: List[subprocess.Popen] = []
    
    def start_process(self, command: List[str]) -> subprocess.Popen:
        """Start a new process with its stdout and stderr captured"""
        proc = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        self.processes.append(proc)
        return proc
    
    def wait_all(self, timeout: Optional[float] = None) -> List[Tuple[int, str, str]]:
        """Wait for each process in turn (the timeout applies per process)"""
        results = []
        
        for proc in self.processes:
            try:
                stdout, stderr = proc.communicate(timeout=timeout)
                results.append((
                    proc.returncode,
                    stdout.decode('utf-8') if stdout else '',
                    stderr.decode('utf-8') if stderr else ''
                ))
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.communicate()  # Reap the killed child and drain its pipes
                results.append((-1, '', 'Timeout'))
        
        return results

# Example usage
manager = ProcessManager()
manager.start_process(['echo', 'Process 1'])
manager.start_process(['echo', 'Process 2'])
manager.start_process(['echo', 'Process 3'])

results = manager.wait_all(timeout=5.0)
for i, (returncode, stdout, stderr) in enumerate(results, 1):
    print(f"Process {i}: returncode={returncode}, output={stdout.strip()}")

### Things to Remember

✦ Use the `subprocess` module to run child processes and manage their input and output streams.

✦ Child processes run in parallel with the Python interpreter, enabling you to maximize your usage of CPU cores.

✦ Use the `run` convenience function for simple usage, and the `Popen` class for advanced usage like UNIX-style pipelines.

✦ Use the `timeout` parameter of the `communicate` method to avoid deadlocks and hanging child processes.

---

## Item 53: Use Threads for Blocking I/O, Avoid for Parallelism

### Understanding CPython and the GIL

**CPython execution steps:**
1. Parse and compile source text into bytecode
2. Run bytecode using a stack-based interpreter

**Global Interpreter Lock (GIL):**
- Mutual-exclusion lock (mutex)
- Prevents preemptive multithreading from corrupting interpreter state
- Only one thread executes Python bytecode at a time
- Keeps interpreter internals (such as reference counts) consistent across thread switches

**Important:** The GIL prevents Python threads from running CPU-bound Python code in parallel on multiple cores!

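CPython asks the thread holding the GIL to release it at a regular interval so that other threads get a turn. You can read that interval with `sys.getswitchinterval()` and change it with `sys.setswitchinterval()`. It defaults to 0.005 seconds. The setting controls how often threads take turns, not how many of them run at once.

```python
import sys

# How long (in seconds) a thread may hold the GIL before CPython
# requests a switch to another runnable thread
interval = sys.getswitchinterval()
print(f'Switch interval: {interval} seconds')
```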
### Example: CPU-Bound Task (GIL Impact)

In [None]:
import time

def factorize(number):
    """Find all factors of a number"""
    for i in range(1, number + 1):
        if number % i == 0:
            yield i

# Serial execution
numbers = [2139079, 1214759, 1516637, 1852285]
start = time.time()

for number in numbers:
    list(factorize(number))

end = time.time()
delta = end - start
print(f'Took {delta:.3f} seconds (serial)')

### Attempting Multi-threaded Factorization

In [None]:
from threading import Thread
import time

class FactorizeThread(Thread):
    def __init__(self, number):
        super().__init__()
        self.number = number
    
    def run(self):
        self.factors = list(factorize(self.number))

# Multi-threaded execution
start = time.time()
threads = []

for number in numbers:
    thread = FactorizeThread(number)
    thread.start()
    threads.append(thread)

for thread in threads:
    thread.join()

end = time.time()
delta = end - start
print(f'Took {delta:.3f} seconds (multi-threaded)')
print('Expected a speedup, but the GIL keeps this about as slow as (or slower than) the serial version')

### Enhanced Example: Demonstrating GIL Impact

In [None]:
import time
from threading import Thread

def cpu_intensive_task(iterations):
    """CPU-intensive calculation"""
    result = 0
    for i in range(iterations):
        result += i ** 2
    return result

def benchmark_threads(num_threads, iterations):
    """Benchmark thread performance"""
    threads = []
    start = time.time()
    
    for _ in range(num_threads):
        thread = Thread(
            target=cpu_intensive_task,
            args=(iterations,)
        )
        thread.start()
        threads.append(thread)
    
    for thread in threads:
        thread.join()
    
    return time.time() - start

# Compare the same total amount of work done by one thread vs. four threads
iterations = 1000000
num_threads = 4

single_time = benchmark_threads(1, iterations)
multi_time = benchmark_threads(num_threads, iterations // num_threads)

print(f"Single thread: {single_time:.3f} seconds")
print(f"{num_threads} threads: {multi_time:.3f} seconds")
print(f"Speedup: {single_time / multi_time:.2f}x")
print(f"Ideal parallel speedup: {num_threads}x; actual is ~1x (or less) due to the GIL")

### Why Use Threads? Reason 1: Concurrent Execution

In [None]:
from threading import Thread
import time

def task_a():
    print("Task A: Starting")
    time.sleep(2)
    print("Task A: Completed")

def task_b():
    print("Task B: Starting")
    time.sleep(1)
    print("Task B: Completed")

# Without threads (sequential)
print("=== Sequential Execution ===")
start = time.time()
task_a()
task_b()
print(f"Total time: {time.time() - start:.3f} seconds\n")

# With threads (concurrent)
print("=== Concurrent Execution ===")
start = time.time()
thread_a = Thread(target=task_a)
thread_b = Thread(target=task_b)
thread_a.start()
thread_b.start()
thread_a.join()
thread_b.join()
print(f"Total time: {time.time() - start:.3f} seconds")

### Why Use Threads? Reason 2: Blocking I/O

In [None]:
import select
import socket
import time

def slow_systemcall():
    """Simulate a slow system call (e.g., network I/O)"""
    select.select([socket.socket()], [], [], 0.1)

# Serial execution of I/O operations
start = time.time()
for _ in range(5):
    slow_systemcall()
end = time.time()
delta = end - start
print(f'Serial I/O took {delta:.3f} seconds')

### Parallel I/O with Threads

In [None]:
from threading import Thread
import time

# Parallel I/O operations
start = time.time()
threads = []

for _ in range(5):
    thread = Thread(target=slow_systemcall)
    thread.start()
    threads.append(thread)

# Simulate computation while I/O happens
def compute_helicopter_location(index):
    """Simulate some computation"""
    pass

for i in range(5):
    compute_helicopter_location(i)

# Wait for all I/O to complete
for thread in threads:
    thread.join()

end = time.time()
delta = end - start
print(f'Parallel I/O took {delta:.3f} seconds')
print('Expected: roughly 5x faster than the serial version')

### Enhanced Example: Concurrent File Download Simulation

In [None]:
from threading import Thread
import time
from typing import List

class FileDownloader(Thread):
    """Simulate downloading a file"""
    
    def __init__(self, file_id: int, size_mb: float):
        super().__init__()
        self.file_id = file_id
        self.size_mb = size_mb
        self.download_time = None
    
    def run(self):
        """Simulate download (I/O operation)"""
        start = time.time()
        # Simulate network I/O
        time.sleep(self.size_mb * 0.1)  # 0.1 sec per MB
        self.download_time = time.time() - start
        print(f"File {self.file_id}: Downloaded {self.size_mb}MB in {self.download_time:.2f}s")

# Download files concurrently
files_to_download = [10, 15, 8, 12, 20]  # File sizes in MB

print("=== Sequential Downloads ===")
start = time.time()
for i, size in enumerate(files_to_download):
    downloader = FileDownloader(i, size)
    downloader.run()  # Calling run() directly executes it in this thread, one file at a time
sequential_time = time.time() - start
print(f"Total time: {sequential_time:.2f}s\n")

print("=== Concurrent Downloads ===")
start = time.time()
threads: List[FileDownloader] = []
for i, size in enumerate(files_to_download):
    downloader = FileDownloader(i, size)
    downloader.start()
    threads.append(downloader)

for thread in threads:
    thread.join()

concurrent_time = time.time() - start
print(f"\nTotal time: {concurrent_time:.2f}s")
print(f"Speedup: {sequential_time / concurrent_time:.2f}x")

### Things to Remember

✦ Python threads can't run in parallel on multiple CPU cores because of the global interpreter lock (GIL).

✦ Python threads are still useful despite the GIL because they provide an easy way to do multiple things seemingly at the same time.

✦ Use Python threads to make multiple system calls in parallel. This allows you to do blocking I/O at the same time as computation.

---

## Item 54: Use Lock to Prevent Data Races in Threads

### The GIL Misconception

**Important:** The GIL does NOT protect you from data races!

**Why?** Thread operations can be interrupted between any two bytecode instructions.

**Result:** Data structure invariants can be violated, leaving your program in a corrupted state.

### Example: Data Race Problem

In [None]:
class Counter:
    def __init__(self):
        self.count = 0
    
    def increment(self, offset):
        self.count += offset

# Simulate sensor readings
def worker(sensor_index, how_many, counter):
    for _ in range(how_many):
        # Read from the sensor
        counter.increment(1)

# Run with multiple threads
from threading import Thread

how_many = 10**5
counter = Counter()
threads = []

for i in range(5):
    thread = Thread(target=worker, args=(i, how_many, counter))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

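expected = how_many * 5
found = counter.count
print(f'Counter should be {expected}, got {found}')
if found < expected:
    print(f'Data race occurred! Lost {expected - found} increments')
else:
    # Depending on the CPython version and timing, the race may not
    # show up on every run; the code is still unsafe
    print('No increments were lost this time, but the code is still racy')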

### Understanding the Data Race

The operation `counter.count += 1` is actually three operations:

```python
value = getattr(counter, 'count')
result = value + 1
setattr(counter, 'count', result)
```
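You can see this split with the `dis` module. Opcode names vary between CPython versions (for example, `INPLACE_ADD` before 3.11 and `BINARY_OP` from 3.11 onward). In every version, though, the attribute load and the attribute store compile to separate instructions. Nothing in the language guarantees that another thread won't run between them.

```python
import dis


class Counter:
    def __init__(self):
        self.count = 0

    def increment(self, offset):
        self.count += offset


# Collect the opcode names CPython compiled for increment()
opnames = [instr.opname for instr in dis.get_instructions(Counter.increment)]
print(opnames)
```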

**Race condition example:**
```
Thread A: value_a = getattr(counter, 'count')  # Gets 0
[Context switch to Thread B]
Thread B: value_b = getattr(counter, 'count')  # Gets 0
Thread B: result_b = value_b + 1               # 1
Thread B: setattr(counter, 'count', result_b)  # Sets to 1
[Context switch to Thread A]
Thread A: result_a = value_a + 1               # 1
Thread A: setattr(counter, 'count', result_a)  # Sets to 1 (overwrites B's work!)
```

### Solution: Using Lock

In [None]:
from threading import Lock, Thread

class LockingCounter:
    def __init__(self):
        self.lock = Lock()
        self.count = 0
    
    def increment(self, offset):
        with self.lock:
            self.count += offset

# Run with Lock protection
counter = LockingCounter()
threads = []

for i in range(5):
    thread = Thread(target=worker, args=(i, how_many, counter))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

expected = how_many * 5
found = counter.count
print(f'Counter should be {expected}, got {found}')
print('Lock solved the problem!')

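The `with self.lock:` statement is shorthand for acquiring the lock and releasing it in a `finally` block. That guarantees the lock is freed even if the protected code raises. Written out by hand, it looks like this:

```python
from threading import Lock

lock = Lock()
count = 0


def increment_by_hand(offset):
    """Equivalent to: with lock: count += offset"""
    global count
    lock.acquire()
    try:
        count += offset
    finally:
        lock.release()  # Always runs, even if the body raises


increment_by_hand(5)
print(count, lock.locked())
```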
### Enhanced Example: Thread-safe Bank Account

In [None]:
from threading import Lock, Thread
import time
import random

class BankAccount:
    """Thread-safe bank account"""
    
    def __init__(self, initial_balance):
        self.lock = Lock()
        self.balance = initial_balance
    
    def deposit(self, amount):
        with self.lock:
            new_balance = self.balance + amount
            time.sleep(0.0001)  # Simulate processing time
            self.balance = new_balance
    
    def withdraw(self, amount):
        with self.lock:
            if self.balance >= amount:
                new_balance = self.balance - amount
                time.sleep(0.0001)  # Simulate processing time
                self.balance = new_balance
                return True
            return False
    
    def get_balance(self):
        with self.lock:
            return self.balance

def random_transactions(account, num_transactions):
    """Perform random deposits and withdrawals"""
    for _ in range(num_transactions):
        if random.choice([True, False]):
            account.deposit(random.randint(1, 100))
        else:
            account.withdraw(random.randint(1, 100))

# Test with multiple threads
account = BankAccount(1000)
print(f"Initial balance: ${account.get_balance()}")

threads = []
for i in range(10):
    thread = Thread(target=random_transactions, args=(account, 100))
    thread.start()
    threads.append(thread)

for thread in threads:
    thread.join()

print(f"Final balance: ${account.get_balance()}")
print("All transactions completed safely!")

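The account above never moves money between two accounts. A transfer must hold both accounts' locks at the same time. If one thread locks A and then B while another locks B and then A, each can end up waiting on the other forever. The minimal sketch below uses a hypothetical `Account` class and `transfer` helper, which are not part of the example above. It avoids that deadlock by always acquiring the two locks in the same order:

```python
from threading import Lock, Thread


class Account:
    """Hypothetical minimal account: a balance guarded by its own lock"""
    def __init__(self, balance):
        self.lock = Lock()
        self.balance = balance


def transfer(src, dst, amount):
    """Move amount from src to dst; return False if funds are insufficient"""
    if src is dst:
        return False  # Lock is not reentrant; locking it twice would hang
    # Always lock in one global order (here: by id) so two opposite
    # transfers can never each hold one lock while waiting for the other
    first, second = sorted((src, dst), key=id)
    with first.lock, second.lock:
        if src.balance < amount:
            return False
        src.balance -= amount
        dst.balance += amount
        return True


def move_repeatedly(src, dst, times):
    for _ in range(times):
        transfer(src, dst, 7)


a, b = Account(500), Account(500)
threads = [
    Thread(target=move_repeatedly, args=(a, b, 1000)),
    Thread(target=move_repeatedly, args=(b, a, 1000)),
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print('Total money:', a.balance + b.balance)  # Conserved across all transfers
```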
### Enhanced Example: Shared Resource Pool

In [None]:
from threading import Lock, Thread
import time
from typing import List, Optional

class ResourcePool:
    """Thread-safe resource pool"""
    
    def __init__(self, resources: List[str]):
        self.lock = Lock()
        self.available = list(resources)
        self.in_use = []
    
    def acquire(self) -> Optional[str]:
        """Acquire a resource from the pool, or return None if none is free"""
        with self.lock:
            if not self.available:
                return None
            resource = self.available.pop(0)
            self.in_use.append(resource)
            return resource
    
    def release(self, resource: str):
        """Release a resource back to the pool"""
        with self.lock:
            if resource in self.in_use:
                self.in_use.remove(resource)
                self.available.append(resource)
    
    def get_stats(self):
        """Get current pool statistics"""
        with self.lock:
            return {
                'available': len(self.available),
                'in_use': len(self.in_use),
                'total': len(self.available) + len(self.in_use)
            }

def use_resource(pool, worker_id, duration):
    """Worker that uses a resource"""
    resource = pool.acquire()
    if resource:
        print(f"Worker {worker_id}: Using {resource}")
        time.sleep(duration)
        pool.release(resource)
        print(f"Worker {worker_id}: Released {resource}")
    else:
        print(f"Worker {worker_id}: No resources available")

# Test resource pool
pool = ResourcePool(['Resource-A', 'Resource-B', 'Resource-C'])

threads = []
for i in range(5):
    thread = Thread(target=use_resource, args=(pool, i, 0.5))
    thread.start()
    threads.append(thread)

for thread in threads:
    thread.join()

print(f"\nFinal stats: {pool.get_stats()}")

### Things to Remember

✦ Even though Python has a global interpreter lock, you're still responsible for protecting against data races between the threads in your programs.

✦ Your programs will corrupt their data structures if you allow multiple threads to modify the same objects without mutual-exclusion locks (mutexes).

✦ Use the `Lock` class from the `threading` built-in module to enforce your program's invariants between multiple threads.

---

## Item 55: Use Queue to Coordinate Work Between Threads

### Pipeline Concept

**Pipeline:** An assembly line approach with multiple phases in serial
- Each phase has a specific function
- Work moves forward through phases
- Functions can operate concurrently
- Especially good for blocking I/O or subprocesses

**Example scenario:** Image processing pipeline
1. Download images
2. Resize images
3. Upload images

### Naive Implementation: Custom Queue

In [None]:
from collections import deque
from threading import Lock

class MyQueue:
    """Simple thread-safe queue"""
    
    def __init__(self):
        self.items = deque()
        self.lock = Lock()
    
    def put(self, item):
        """Producer adds items to the queue"""
        with self.lock:
            self.items.append(item)
    
    def get(self):
        """Consumer removes items from the queue"""
        with self.lock:
            return self.items.popleft()

# Example pipeline functions
def download(item):
    """Download an image"""
    return item

def resize(item):
    """Resize an image"""
    return item

def upload(item):
    """Upload an image"""
    return item

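`MyQueue.get` simply calls `deque.popleft()`, which raises `IndexError` on an empty deque instead of waiting for data. Every consumer therefore has to handle the empty case itself, which is exactly what the polling worker in the next cell does. The tiny illustration below uses a hypothetical `try_get` helper:

```python
from collections import deque


def try_get(items):
    """Hypothetical helper mimicking MyQueue.get without blocking"""
    try:
        return items.popleft()
    except IndexError:  # Raised immediately when the deque is empty
        return None


items = deque()
empty_result = try_get(items)   # Nothing queued yet
items.append('image-1')
first_result = try_get(items)   # Returns the queued item
print(empty_result, first_result)
```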
### Worker Thread with Busy Waiting (Problematic)

In [None]:
from threading import Thread
import time

class Worker(Thread):
    """Worker that polls for work (busy waiting)"""
    
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.polled_count = 0
        self.work_done = 0
    
    def run(self):
        while True:
            self.polled_count += 1
            try:
                item = self.in_queue.get()
            except IndexError:
                time.sleep(0.01)  # No work to do
            else:
                result = self.func(item)
                self.out_queue.put(result)
                self.work_done += 1

print("This implementation has problems:")
print("1. Busy waiting wastes CPU")
print("2. Hard to determine when all work is done")
print("3. No way to signal workers to exit")
print("4. Can cause memory explosion if producer is faster than consumer")

### Better Solution: Using Queue

In [None]:
from queue import Queue
from threading import Thread

# Queue eliminates busy waiting
my_queue = Queue()

def consumer():
    print('Consumer waiting')
    my_queue.get()  # Blocks until item available
    print('Consumer done')

thread = Thread(target=consumer)
thread.start()

print('Producer putting')
my_queue.put(object())
print('Producer done')
thread.join()

### Queue with Buffer Size

In [None]:
from queue import Queue
from threading import Thread
import time

my_queue = Queue(1)  # Buffer size of 1

def consumer():
    time.sleep(0.1)  # Wait
    my_queue.get()   # Runs second
    print('Consumer got 1')
    my_queue.get()   # Runs fourth
    print('Consumer got 2')
    print('Consumer done')

thread = Thread(target=consumer)
thread.start()

my_queue.put(object())  # Runs first
print('Producer put 1')
my_queue.put(object())  # Runs third (blocks until consumer gets first)
print('Producer put 2')
print('Producer done')
thread.join()

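Both blocking calls also accept a `timeout` (or `block=False`). When a bounded queue stays full, `put()` raises `queue.Full`. When a queue stays empty, `get()` raises `queue.Empty`. This lets a producer or consumer give up instead of blocking forever:

```python
from queue import Empty, Full, Queue

q = Queue(maxsize=1)
q.put('first')                     # Fills the single slot

try:
    q.put('second', timeout=0.05)  # No consumer frees the slot in time
    put_timed_out = False
except Full:
    put_timed_out = True

item = q.get()                     # Frees the slot

try:
    q.get(block=False)             # Queue is now empty; fail immediately
    get_failed = False
except Empty:
    get_failed = True

print(put_timed_out, item, get_failed)
```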
### Using task_done() and join()

In [None]:
from queue import Queue
from threading import Thread

in_queue = Queue()

def consumer():
    print('Consumer waiting')
    work = in_queue.get()  # Runs second
    print('Consumer working')
    # Doing work...
    print('Consumer done')
    in_queue.task_done()   # Runs third

thread = Thread(target=consumer)
thread.start()

print('Producer putting')
in_queue.put(object())  # Runs first
print('Producer waiting')
in_queue.join()         # Runs fourth (blocks until task_done called)
print('Producer done')
thread.join()

### Complete Pipeline with ClosableQueue

In [None]:
from queue import Queue
from threading import Thread

class ClosableQueue(Queue):
    """Queue that can signal end of data"""
    SENTINEL = object()
    
    def close(self):
        self.put(self.SENTINEL)
    
    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return  # Cause the thread to exit
                yield item
            finally:
                self.task_done()

class StoppableWorker(Thread):
    """Worker that can be stopped cleanly"""
    
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
    
    def run(self):
        for item in self.in_queue:
            result = self.func(item)
            self.out_queue.put(result)

print("ClosableQueue benefits:")
print("1. No busy waiting")
print("2. An optional maxsize bounds the buffer, preventing memory explosion")
print("3. Clean worker shutdown with sentinel value")
print("4. Progress tracking with task_done/join")

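You can check the sentinel protocol without any threads. The sketch below repeats the class so the cell runs on its own, then puts three items and one sentinel on the queue. Iteration stops at the sentinel. Because `__iter__` calls `task_done()` for every item it takes, including the sentinel, `join()` returns immediately afterward.

```python
from queue import Queue


class ClosableQueue(Queue):
    SENTINEL = object()

    def close(self):
        self.put(self.SENTINEL)

    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return  # Stop iterating at the sentinel
                yield item
            finally:
                self.task_done()  # Runs for real items and the sentinel


q = ClosableQueue()
for name in ['a', 'b', 'c']:
    q.put(name)
q.close()

drained = list(q)  # Consumes items until the sentinel
q.join()           # Returns at once: every get() had a matching task_done()
print(drained)
```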
### Enhanced Example: Image Processing Pipeline

In [None]:
from queue import Queue
from threading import Thread
import time

class ClosableQueue(Queue):
    SENTINEL = object()
    
    def close(self):
        self.put(self.SENTINEL)
    
    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return
                yield item
            finally:
                self.task_done()

class StoppableWorker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
    
    def run(self):
        for item in self.in_queue:
            result = self.func(item)
            self.out_queue.put(result)

# Pipeline functions with simulated I/O
def download(item):
    time.sleep(0.01)  # Simulate network I/O
    return f"Downloaded-{item}"

def resize(item):
    time.sleep(0.01)  # Stand-in for resizing work (a sleep, not real CPU load)
    return f"Resized-{item}"

def upload(item):
    time.sleep(0.01)  # Simulate network I/O
    return f"Uploaded-{item}"

# Create queues
download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
done_queue = ClosableQueue()

# Create worker threads
threads = [
    StoppableWorker(download, download_queue, resize_queue),
    StoppableWorker(resize, resize_queue, upload_queue),
    StoppableWorker(upload, upload_queue, done_queue),
]

# Start all workers
for thread in threads:
    thread.start()

# Add work to pipeline
for i in range(10):
    download_queue.put(f"Image-{i}")

# Close pipeline
download_queue.close()
download_queue.join()
resize_queue.close()
resize_queue.join()
upload_queue.close()
upload_queue.join()

# Collect results
print(f"{done_queue.qsize()} items finished")
print("\nSample results:")
for i in range(min(3, done_queue.qsize())):
    print(f"  {done_queue.get()}")

# Clean up
for thread in threads:
    thread.join()

### Multiple Workers Per Stage

In [None]:
def start_threads(count, *args):
    """Start multiple worker threads"""
    threads = [StoppableWorker(*args) for _ in range(count)]
    for thread in threads:
        thread.start()
    return threads

def stop_threads(closable_queue, threads):
    """Stop all worker threads cleanly"""
    for _ in threads:
        closable_queue.close()
    
    closable_queue.join()
    
    for thread in threads:
        thread.join()

# Create queues
download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
done_queue = ClosableQueue()

# Start multiple workers per stage
download_threads = start_threads(3, download, download_queue, resize_queue)
resize_threads = start_threads(4, resize, resize_queue, upload_queue)
upload_threads = start_threads(5, upload, upload_queue, done_queue)

# Add work
for i in range(100):
    download_queue.put(f"Image-{i}")

# Stop all stages
stop_threads(download_queue, download_threads)
stop_threads(resize_queue, resize_threads)
stop_threads(upload_queue, upload_threads)

print(f"{done_queue.qsize()} items finished")
print("\nParallel processing achieved with multiple workers per stage!")

### Things to Remember

✦ Pipelines are a great way to organize sequences of work—especially I/O-bound programs—that run concurrently using multiple Python threads.

✦ Be aware of the many problems in building concurrent pipelines: busy waiting, how to tell workers to stop, and potential memory explosion.

✦ The `Queue` class has all the facilities you need to build robust pipelines: blocking operations, buffer sizes, and joining.

---

## Item 56: Know How to Recognize When Concurrency Is Necessary

### The Challenge

As programs grow in scope and complexity:
- Requirements expand
- Single-threaded approach becomes limiting
- Need for concurrent execution emerges

**Most difficult change:** Moving from single-threaded to concurrent execution

### Example: Conway's Game of Life

**Rules:**
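- 2D grid of arbitrary size
- Each cell: alive (`*`) or empty (`-`)
- Each tick, every cell counts its eight neighbors and decides its next state
- State decisions:
  - A living cell dies if it has < 2 living neighbors (too few)
  - A living cell dies if it has > 3 living neighbors (too many)
  - An empty cell regenerates if it has exactly 3 living neighbors
  - Otherwise, the cell keeps its current state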

### Basic Grid Implementation

In [None]:
ALIVE = '*'
EMPTY = '-'

class Grid:
    """2D grid for Game of Life"""
    
    def __init__(self, height, width):
        self.height = height
        self.width = width
        self.rows = []
        for _ in range(self.height):
            self.rows.append([EMPTY] * self.width)
    
    def get(self, y, x):
        """Get cell state (wraps around edges)"""
        return self.rows[y % self.height][x % self.width]
    
    def set(self, y, x, state):
        """Set cell state"""
        self.rows[y % self.height][x % self.width] = state
    
    def __str__(self):
        """String representation"""
        result = []
        for row in self.rows:
            result.append(''.join(row))
        return '\n'.join(result)

# Create grid and set up a glider pattern
grid = Grid(5, 9)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)

print("Initial grid state:")
print(grid)

### Game Logic Functions

In [None]:
def count_neighbors(y, x, get):
    """Count living neighbors around a cell"""
    n_ = get(y - 1, x + 0)  # North
    ne = get(y - 1, x + 1)  # Northeast
    e_ = get(y + 0, x + 1)  # East
    se = get(y + 1, x + 1)  # Southeast
    s_ = get(y + 1, x + 0)  # South
    sw = get(y + 1, x - 1)  # Southwest
    w_ = get(y + 0, x - 1)  # West
    nw = get(y - 1, x - 1)  # Northwest
    
    neighbor_states = [n_, ne, e_, se, s_, sw, w_, nw]
    count = 0
    for state in neighbor_states:
        if state == ALIVE:
            count += 1
    return count

def game_logic(state, neighbors):
    """Determine next state based on current state and neighbors"""
    if state == ALIVE:
        if neighbors < 2:
            return EMPTY  # Die: Too few
        elif neighbors > 3:
            return EMPTY  # Die: Too many
    else:
        if neighbors == 3:
            return ALIVE  # Regenerate
    return state

def step_cell(y, x, get, set):
    """Update a single cell"""
    state = get(y, x)
    neighbors = count_neighbors(y, x, get)
    next_state = game_logic(state, neighbors)
    set(y, x, next_state)

# Test neighbor counting
neighbors = count_neighbors(2, 3, grid.get)
print(f"\nCell at (2,3) has {neighbors} neighbors")

### Single-Threaded Simulation

In [None]:
def simulate(grid):
    """Progress grid forward by one generation"""
    next_grid = Grid(grid.height, grid.width)
    for y in range(grid.height):
        for x in range(grid.width):
            step_cell(y, x, grid.get, next_grid.set)
    return next_grid

# Run simulation for 5 generations
print("\nSimulation over 5 generations:")
print("=" * 50)
for generation in range(5):
    print(f"\nGeneration {generation}:")
    print(grid)
    grid = simulate(grid)

print("\nNotice the glider pattern moving down and right!")

### The Problem: Adding I/O

**Scenario:** Need to add I/O to `game_logic` (e.g., multiplayer online game)

**Challenge with blocking I/O:**
- If I/O latency = 100ms per cell
- Grid with 45 cells (5×9): 45 × 0.1 s = 4.5 seconds per generation
- Grid with 10,000 cells: 10,000 × 0.1 s = 1,000 seconds (about 17 minutes) per generation

**Solution needed:** Parallel I/O

### Concurrency Concepts

**Fan-out:** Spawning concurrent lines of execution for each unit of work

**Fan-in:** Waiting for all concurrent units to finish before moving to next phase

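In its simplest form, fan-out starts one concurrent unit per piece of work, and fan-in waits for all of them to finish. The minimal sketch below creates one `Thread` per unit and uses a hypothetical `fake_io` function as a stand-in for a blocking call. Item 57 covers the drawbacks of creating a new thread for every unit like this.

```python
from threading import Thread
import time


def fake_io(index, results):
    """Hypothetical blocking call: sleep, then store a result in its own slot"""
    time.sleep(0.05)
    results[index] = index * index  # Each thread writes a distinct slot, so no lock


results = [None] * 8
threads = [Thread(target=fake_io, args=(i, results)) for i in range(8)]

start = time.perf_counter()
for thread in threads:  # Fan-out: start every unit of work
    thread.start()
for thread in threads:  # Fan-in: wait until every unit has finished
    thread.join()
elapsed = time.perf_counter() - start

print(results, f'{elapsed:.2f}s')
```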
### Enhanced Example: Performance Comparison

In [None]:
import time

def game_logic_with_io(state, neighbors):
    """Game logic with simulated I/O"""
    # Simulate blocking I/O
    time.sleep(0.001)  # 1ms per cell
    
    if state == ALIVE:
        if neighbors < 2:
            return EMPTY
        elif neighbors > 3:
            return EMPTY
    else:
        if neighbors == 3:
            return ALIVE
    return state

def simulate_with_io(grid):
    """Simulation with I/O delays"""
    next_grid = Grid(grid.height, grid.width)
    for y in range(grid.height):
        for x in range(grid.width):
            state = grid.get(y, x)
            neighbors = count_neighbors(y, x, grid.get)
            next_state = game_logic_with_io(state, neighbors)
            next_grid.set(y, x, next_state)
    return next_grid

# Create a small grid for demo
small_grid = Grid(3, 3)
small_grid.set(0, 1, ALIVE)
small_grid.set(1, 1, ALIVE)
small_grid.set(2, 1, ALIVE)

print("Performance with blocking I/O:")
print(f"Grid size: {small_grid.height}x{small_grid.width} = {small_grid.height * small_grid.width} cells")

start = time.time()
small_grid = simulate_with_io(small_grid)
duration = time.time() - start

print(f"Time per generation: {duration:.3f} seconds")
print(f"Time per cell: {duration / (small_grid.height * small_grid.width):.4f} seconds")
print("\nWith a 100x100 grid (10,000 cells) and 100ms I/O:")
print(f"Serial execution would take: {10000 * 0.1:.0f} seconds = {10000 * 0.1 / 60:.1f} minutes!")

### Things to Remember

✦ A program often grows to require multiple concurrent lines of execution as its scope and complexity increase.

✦ The most common types of concurrency coordination are fan-out (generating new units of concurrency) and fan-in (waiting for existing units of concurrency to complete).

✦ Python has many different ways of achieving fan-out and fan-in.

---

## Item 57: Avoid Creating New Thread Instances for On-demand Fan-out

### Problems with Using Thread for Fan-out

**Three major downsides:**

1. **Complexity:** Threads require special coordination tools (Locks)
2. **Memory:** ~8 MB of stack reserved per executing thread (a common default on Linux)
3. **Performance:** High startup cost and context switching overhead

### Thread-based Solution Attempt

In [None]:
from threading import Lock, Thread

class LockingGrid(Grid):
    """Thread-safe Grid with locking"""
    
    def __init__(self, height, width):
        super().__init__(height, width)
        self.lock = Lock()
    
    def __str__(self):
        with self.lock:
            return super().__str__()
    
    def get(self, y, x):
        with self.lock:
            return super().get(y, x)
    
    def set(self, y, x, state):
        with self.lock:
            return super().set(y, x, state)

print("LockingGrid adds thread safety but:")
print("1. Increased complexity (Lock overhead)")
print("2. Potential for lock contention")
print("3. Still need to create many threads")

### Threaded Simulation with Fan-out

In [None]:
from threading import Thread
import time

def simulate_threaded(grid):
    """Simulate with one thread per cell (fan-out)"""
    next_grid = LockingGrid(grid.height, grid.width)
    
    threads = []
    for y in range(grid.height):
        for x in range(grid.width):
            args = (y, x, grid.get, next_grid.set)
            thread = Thread(target=step_cell, args=args)
            thread.start()  # Fan out
            threads.append(thread)
    
    for thread in threads:
        thread.join()  # Fan in
    
    return next_grid

# Test with small grid
grid = LockingGrid(3, 3)
grid.set(0, 1, ALIVE)
grid.set(1, 1, ALIVE)
grid.set(2, 1, ALIVE)

print("Testing threaded simulation:")
start = time.time()
grid = simulate_threaded(grid)
duration = time.time() - start
print(f"Time: {duration:.4f} seconds")
print(f"\nAt ~8 MB of stack per thread, 10,000 cells would reserve {10000 * 8:,} MB ≈ {10000 * 8 / 1024:.1f} GB!")
print("This doesn't scale!")

### Problem: Exception Handling in Threads

In [None]:
import contextlib
import io
from threading import Thread

def game_logic_with_error(state, neighbors):
    """Game logic that raises an exception"""
    raise OSError('Problem with I/O')

# Capture stderr to see what happens
fake_stderr = io.StringIO()
with contextlib.redirect_stderr(fake_stderr):
    thread = Thread(
        target=game_logic_with_error,
        args=(ALIVE, 3)
    )
    thread.start()
    thread.join()

print("Thread exception output:")
print(fake_stderr.getvalue()[:200], "...")
print("\nProblem: The exception is printed to stderr by the thread's excepthook")
print("and is never re-raised in the thread that called join()!")
print("This makes debugging very difficult.")
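One common workaround is to store the exception inside `run()` and re-raise it from `join()`. The sketch below does this with a hypothetical `PropagatingThread` subclass, which is not part of the standard library or of the original code. `threading.excepthook` (Python 3.8+) only customizes how an uncaught exception is reported. It does not re-raise that exception in the joining thread, so a wrapper like this is needed if the caller should see the error.

```python
import threading

class PropagatingThread(threading.Thread):
    """Hypothetical helper: re-raises the target's exception from join()."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.exc = None  # Set by run() if the target raises

    def run(self):
        try:
            super().run()  # Invokes target(*args, **kwargs)
        except Exception as e:
            self.exc = e   # Stash it instead of letting excepthook print it

    def join(self, timeout=None):
        super().join(timeout)
        if self.exc is not None:
            raise self.exc  # Surface the failure in the joining thread

def game_logic_with_error(state, neighbors):
    raise OSError('Problem with I/O')

caught = None
thread = PropagatingThread(target=game_logic_with_error, args=('*', 3))  # '*' stands in for ALIVE
thread.start()
try:
    thread.join()
except OSError as e:
    caught = e
    print('Caught in main thread:', repr(e))

# A target that succeeds leaves exc as None, and join() returns normally
ok = PropagatingThread(target=sum, args=([1, 2, 3],))
ok.start()
ok.join()
```

Item 58's queue pipeline uses the same idea by hand: exceptions travel as values and are re-raised where results are consumed.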

### Enhanced Example: Thread Resource Usage

In [None]:
import threading
import time

def measure_thread_overhead():
    """Demonstrate thread creation overhead"""
    
    def dummy_work():
        time.sleep(0.001)
    
    # Measure thread creation time
    num_threads = 100
    
    start = time.time()
    threads = []
    for _ in range(num_threads):
        thread = threading.Thread(target=dummy_work)
        thread.start()
        threads.append(thread)
    
    for thread in threads:
        thread.join()
    
    duration = time.time() - start
    
    print(f"Created and joined {num_threads} threads")
    print(f"Total time: {duration:.3f} seconds")
    print(f"Average wall time per thread (incl. 1 ms sleep): {duration / num_threads * 1000:.2f} ms")
    print("\nExtrapolating to 10,000 cells updated 60 times/second:")
    print(f"Estimated thread cost: {10000 * 60 * (duration / num_threads):.1f} seconds per second of simulation")
    print("This is clearly unsustainable!")

measure_thread_overhead()

### Things to Remember

✦ Threads have many downsides: They're costly to start and run if you need a lot of them, they each require a significant amount of memory, and they require special tools like `Lock` instances for coordination.

✦ Threads do not provide a built-in way to raise exceptions back in the code that started a thread or that is waiting for one to finish, which makes them difficult to debug.

---

## Item 58: Understand How Using Queue for Concurrency Requires Refactoring

### Queue-based Pipeline Approach

**Benefits over Thread per task:**
- Fixed number of worker threads
- Controlled resource usage
- No repeated thread startup costs

**Drawbacks:**
- Significant refactoring required
- More complex code
- Manual exception handling
- Fixed parallelism level
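The "fixed parallelism level" drawback can be quantified with back-of-the-envelope arithmetic. The sketch assumes that each worker handles one blocking item at a time, that the blocking call releases the GIL, and that nothing else costs time. It therefore gives a best case, not a measurement. `best_case_seconds` is a hypothetical helper.

```python
import math

def best_case_seconds(num_items, num_workers, latency_s):
    """Lower bound on wall time for I/O-bound items handled by a fixed pool.

    Assumes each worker blocks on one item at a time and all other
    overhead (queueing, thread switching, computation) is zero.
    """
    rounds = math.ceil(num_items / num_workers)  # Waves of concurrent waits
    return rounds * latency_s

cells = 100 * 100   # 10,000-cell grid
latency = 0.1       # 100 ms of blocking I/O per cell
for workers in (1, 5, 100, 10_000):
    seconds = best_case_seconds(cells, workers, latency)
    print(f"{workers:>6} workers: {seconds:,.1f} s per generation")
```

For the 10,000-cell grid with 100 ms of I/O per cell, five workers bring a generation from 1,000 s down to 200 s at best. Reaching 0.1 s would take one thread per cell, which is exactly the approach Item 57 rules out.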

### Queue-based Implementation

In [None]:
from queue import Queue
from threading import Thread

class ClosableQueue(Queue):
    SENTINEL = object()
    
    def close(self):
        self.put(self.SENTINEL)
    
    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return
                yield item
            finally:
                self.task_done()

class StoppableWorker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
    
    def run(self):
        for item in self.in_queue:
            result = self.func(item)
            self.out_queue.put(result)

print("Queue-based approach requires:")
print("1. ClosableQueue implementation")
print("2. StoppableWorker implementation")
print("3. Manual exception handling")
print("4. Complex shutdown coordination")
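`ClosableQueue.__iter__` packs the whole worker protocol into a generator. Unrolled into an explicit loop over a plain `queue.Queue`, it looks roughly like the sketch below. The squaring work, `SENTINEL`, and `NUM_WORKERS` are illustrative stand-ins. Note two details: `task_done()` is called once for every `get()`, including the sentinel, and shutting down N workers takes N sentinels. The sketch also shows the "fixed number of worker threads" property, since 30 items are handled by at most 3 threads.

```python
import threading
from queue import Queue
from threading import Thread

SENTINEL = object()  # Unique marker that can never equal a real work item
NUM_WORKERS = 3

def worker(in_q, out_q):
    # Unrolled equivalent of `for item in in_queue:` in StoppableWorker.run
    while True:
        item = in_q.get()
        try:
            if item is SENTINEL:
                return  # Clean shutdown
            out_q.put((item * item, threading.get_ident()))
        finally:
            in_q.task_done()  # Exactly one task_done() per get()

in_q, out_q = Queue(), Queue()
pool = [Thread(target=worker, args=(in_q, out_q)) for _ in range(NUM_WORKERS)]
for t in pool:
    t.start()

for n in range(30):
    in_q.put(n)  # Fan out
in_q.join()      # Returns once all 30 items have been marked done

for _ in pool:
    in_q.put(SENTINEL)  # One sentinel per worker
for t in pool:
    t.join()

results = [out_q.get() for _ in range(30)]  # Fan in
squares = sorted(sq for sq, _ in results)
thread_ids = {ident for _, ident in results}
print(f"{len(squares)} results from {len(thread_ids)} worker thread(s)")
```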

### Game Logic Thread Wrapper

In [None]:
def game_logic_thread(item):
    """Wrapper for game logic with exception handling"""
    y, x, state, neighbors = item
    try:
        next_state = game_logic(state, neighbors)
    except Exception as e:
        next_state = e
    return (y, x, next_state)

# Create queues and workers
in_queue = ClosableQueue()
out_queue = ClosableQueue()

threads = []
for _ in range(5):  # Fixed number of workers
    thread = StoppableWorker(
        game_logic_thread,
        in_queue,
        out_queue
    )
    thread.start()
    threads.append(thread)

print(f"Started {len(threads)} worker threads")
print("Workers wait for items on in_queue")
print("Results go to out_queue")

### Pipeline Simulation Function

In [None]:
class SimulationError(Exception):
    pass

def simulate_pipeline(grid, in_queue, out_queue):
    """Simulate using queue-based pipeline"""
    # Fan out: Add all work to queue
    for y in range(grid.height):
        for x in range(grid.width):
            state = grid.get(y, x)
            neighbors = count_neighbors(y, x, grid.get)
            in_queue.put((y, x, state, neighbors))
    
    # Wait for processing
    in_queue.join()
    out_queue.close()
    
    # Fan in: Collect all results
    next_grid = Grid(grid.height, grid.width)
    for item in out_queue:
        y, x, next_state = item
        if isinstance(next_state, Exception):
            raise SimulationError(y, x) from next_state
        next_grid.set(y, x, next_state)
    
    return next_grid

print("Pipeline simulation:")
print("1. Adds all work to in_queue (fan-out)")
print("2. Workers process items in parallel")
print("3. Collects results from out_queue (fan-in)")
print("4. Re-raises any worker exception in the calling thread as SimulationError")
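Worker threads can't raise into the thread that drives the simulation. The pipeline therefore passes exceptions along as ordinary values and re-raises them during fan-in. The self-contained sketch below isolates that pattern. `flaky_logic`, `logic_wrapper`, and `collect` are hypothetical stand-ins for `game_logic`, `game_logic_thread`, and the fan-in loop of `simulate_pipeline`. The cell states `'*'` and `'-'` are also assumptions.

```python
from queue import Queue
from threading import Thread

class SimulationError(Exception):
    pass

def flaky_logic(state, neighbors):
    """Hypothetical stand-in for game_logic whose I/O fails for one input."""
    if neighbors == 3:
        raise OSError('Problem with I/O')
    return state

def logic_wrapper(item):
    """Worker side: return the exception as a value instead of raising it."""
    y, x, state, neighbors = item
    try:
        next_state = flaky_logic(state, neighbors)
    except Exception as e:
        next_state = e
    return (y, x, next_state)

def collect(results):
    """Fan-in side: re-raise the first failure in the calling thread."""
    cells = {}
    for y, x, next_state in results:
        if isinstance(next_state, Exception):
            raise SimulationError(y, x) from next_state
        cells[(y, x)] = next_state
    return cells

items = [(0, 0, '*', 2), (0, 1, '*', 3), (1, 0, '-', 1)]
out_q = Queue()
workers = [Thread(target=lambda it: out_q.put(logic_wrapper(it)), args=(item,))
           for item in items]
for t in workers:
    t.start()
for t in workers:
    t.join()

results = [out_q.get() for _ in items]
caught = None
try:
    collect(results)
except SimulationError as err:
    caught = err
    print('Failed cell:', err.args)
    print('Caused by:', repr(err.__cause__))
```

`raise ... from ...` chains the original `OSError` as `__cause__`. The traceback therefore shows both which cell failed and why.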

### Running the Pipeline

In [None]:
# Create initial grid
grid = Grid(5, 9)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)

print("Initial grid:")
print(grid)

# Run simulation
print("\nRunning 3 generations with pipeline:")
for generation in range(3):
    grid = simulate_pipeline(grid, in_queue, out_queue)
    print(f"\nGeneration {generation + 1}:")
    print(grid)

# Cleanup
for thread in threads:
    in_queue.close()
for thread in threads:
    thread.join()

print("\nPipeline completed successfully!")

### Enhanced Example: Multi-stage Pipeline

In [None]:
# Multi-stage pipeline for count_neighbors + game_logic

def count_neighbors_thread(item):
    """Thread wrapper for count_neighbors"""
    y, x, state, get = item
    try:
        neighbors = count_neighbors(y, x, get)
    except Exception as e:
        neighbors = e
    return (y, x, state, neighbors)

def game_logic_thread_v2(item):
    """Thread wrapper for game_logic (v2)"""
    y, x, state, neighbors = item
    if isinstance(neighbors, Exception):
        next_state = neighbors
    else:
        try:
            next_state = game_logic(state, neighbors)
        except Exception as e:
            next_state = e
    return (y, x, next_state)

# Create multi-stage pipeline
in_queue = ClosableQueue()
logic_queue = ClosableQueue()
out_queue = ClosableQueue()

threads = []

# Stage 1: count_neighbors workers
for _ in range(5):
    thread = StoppableWorker(
        count_neighbors_thread,
        in_queue,
        logic_queue
    )
    thread.start()
    threads.append(thread)

# Stage 2: game_logic workers
for _ in range(5):
    thread = StoppableWorker(
        game_logic_thread_v2,
        logic_queue,
        out_queue
    )
    thread.start()
    threads.append(thread)

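print("Multi-stage pipeline created:")
print("Stage 1: 5 workers for count_neighbors")
print("Stage 2: 5 workers for game_logic")
print("\nThis allows I/O in both stages to be parallelized!")

def simulate_phased_pipeline(grid, in_queue, logic_queue, out_queue):
    """Drive both stages; grid.get is shared across threads, so use LockingGrid"""
    for y in range(grid.height):
        for x in range(grid.width):
            state = grid.get(y, x)
            in_queue.put((y, x, state, grid.get))  # Fan out

    in_queue.join()     # Stage 1 done: all results are now in logic_queue
    logic_queue.join()  # Stage 2 done: pipeline sequencing
    out_queue.close()

    next_grid = LockingGrid(grid.height, grid.width)
    for y, x, next_state in out_queue:  # Fan in
        if isinstance(next_state, Exception):
            raise SimulationError(y, x) from next_state
        next_grid.set(y, x, next_state)
    return next_grid

# Run one generation through both stages
grid = LockingGrid(5, 9)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)
grid = simulate_phased_pipeline(grid, in_queue, logic_queue, out_queue)
print(grid)

# Cleanup: one sentinel per worker in each stage, then wait for all threads
for _ in range(5):
    in_queue.close()
for _ in range(5):
    logic_queue.close()
for thread in threads:
    thread.join()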

### Complexity Comparison

In [None]:
print("Code Complexity Comparison:")
print("="*50)
print("\nOriginal (Single-threaded):")
print("  - Grid class")
print("  - game_logic function")
print("  - simulate function")
print("  Total: ~50 lines")

print("\nWith Thread per cell:")
print("  - LockingGrid class (with Lock)")
print("  - game_logic function")
print("  - simulate_threaded function")
print("  - Exception handling")
print("  Total: ~80 lines")

print("\nWith Queue pipeline:")
print("  - Grid class")
print("  - ClosableQueue class")
print("  - StoppableWorker class")
print("  - game_logic_thread wrapper")
print("  - simulate_pipeline function")
print("  - Worker setup/teardown")
print("  - Exception propagation logic")
print("  Total: ~150 lines")

print("\nWith Multi-stage Queue pipeline:")
print("  - All of the above, plus:")
print("  - LockingGrid (for thread safety)")
print("  - count_neighbors_thread wrapper")
print("  - Additional queue coordination")
print("  - Stage sequencing logic")
print("  Total: ~200+ lines")

print("\n" + "="*50)
print("Queue approach trades simplicity for:")
print("  ✓ Better resource control")
print("  ✓ No thread creation overhead")
print("  ✓ Fixed memory usage")
print("  ✗ Significant complexity increase")
print("  ✗ Manual exception handling")
print("  ✗ Rigid parallelism (fixed worker count)")

### Things to Remember

✦ Using `Queue` instances with a fixed number of worker threads improves the scalability of fan-out and fan-in using threads.

✦ It takes a significant amount of work to refactor existing code to use `Queue`, especially when multiple stages of a pipeline are required.

✦ Using `Queue` fundamentally limits the total amount of I/O parallelism a program can leverage compared to alternative approaches provided by other built-in Python features and modules.

---

## Summary

### Key Takeaways from Items 52-58

**Item 52 - subprocess:**
- Use `subprocess` module for child processes
- `run()` for simple cases, `Popen` for advanced usage
- Enables parallel CPU utilization

**Item 53 - Threads and GIL:**
- The GIL prevents threads from executing Python bytecode in parallel
- Threads still work well for blocking I/O, which releases the GIL
- Not suitable for CPU-bound parallelism

**Item 54 - Lock:**
- GIL doesn't protect from data races
- Use `Lock` for thread synchronization
- Critical for shared mutable state

**Item 55 - Queue:**
- Coordinate work between threads
- Eliminates busy waiting
- Provides buffer management and joining

**Item 56 - Recognizing Concurrency:**
- Fan-out: spawn concurrent execution
- Fan-in: wait for completion
- Critical for I/O-heavy applications

**Item 57 - Avoid Thread Per Task:**
- High memory cost (~8MB/thread)
- Startup overhead
- Poor exception propagation

**Item 58 - Queue Refactoring:**
- Fixed worker pool better than thread-per-task
- Requires significant refactoring
- Trade-off: complexity vs scalability

---

## Next Steps

In Part 2, we'll cover:
- Item 59: ThreadPoolExecutor
- Item 60: Coroutines for I/O
- Item 61+: Advanced concurrency patterns