# Module 6: Advanced Python Concepts

### The Scenario

Your application needs to:
- Process millions of records using all CPU cores
- Download 1000 files from an API without waiting sequentially
- Ensure database connections are always properly closed, even when errors occur

### The Goal

By the end of this module, you will:
- Use **multiprocessing** for CPU-bound parallelism
- Use **threading** for I/O-bound concurrency
- Master **asyncio** for high-concurrency applications
- Write **context managers** for safe resource handling

---

## Lesson 1: Multiprocessing (True Parallelism)

### The Problem

You need to process 10 million records. Your 8-core CPU sits at about 12% usage, with one core busy and seven idle, because Python's GIL allows only one thread to execute Python bytecode at a time.

### The "Aha!" Moment

Each **process** has its own Python interpreter and GIL. Multiprocessing bypasses the GIL entirely!

### When to Use

| Use Multiprocessing | Use Threading |
|---------------------|---------------|
| CPU-bound (math, processing) | I/O-bound (network, disk) |
| Need true parallelism | Need shared memory |
| Independent tasks | Lightweight concurrency |

In [1]:
import multiprocessing as mp
import time
import os

def cpu_intensive(n):
    """Simulate CPU-heavy work."""
    total = 0
    for i in range(n):
        total += i ** 2
    return total

# Show available cores
print(f"CPU cores available: {mp.cpu_count()}")

CPU cores available: 10


In [2]:
# Sequential processing
COUNT = 5_000_000

start = time.perf_counter()
results = [cpu_intensive(COUNT) for _ in range(4)]
seq_time = time.perf_counter() - start
print(f"Sequential: {seq_time:.2f}s")

Sequential: 0.63s


In [4]:
# Parallel with Pool
# Note: This may not show speedup in Jupyter due to how notebooks handle processes
# Run as a script for accurate timing
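# 'fork' is only available on POSIX systems (not Windows); it lets worker
# processes see functions that were defined in notebook cells
mp.set_start_method('fork', force=True)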
if __name__ == "__main__":
    start = time.perf_counter()
    
    with mp.Pool(processes=4) as pool:
        results = pool.map(cpu_intensive, [COUNT] * 4)
    
    parallel_time = time.perf_counter() - start
    print(f"Parallel (4 processes): {parallel_time:.2f}s")
    print(f"Speedup: {seq_time / parallel_time:.2f}x")

Parallel (4 processes): 0.35s
Speedup: 1.80x


### Multiprocessing Patterns

| Pattern | Use Case | Example |
|---------|----------|--------|
| `Pool.map()` | Apply function to list | `pool.map(fn, items)` |
| `Pool.starmap()` | Multiple arguments | `pool.starmap(fn, [(a,b), (c,d)])` |
| `Process` | Manual control | `p = Process(target=fn)` |
| `Queue` | Share data between processes | `q.put(data)` / `q.get()` |
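
Before `pool.map()` can help with a large dataset, the records have to be split into independent chunks. The following is a minimal sketch of that step, not a definitive implementation. The names `chunked`, `sum_of_squares`, and `parallel_sum_of_squares` are hypothetical, and the sum of squares is only a stand-in for real per-record work.

```python
import multiprocessing as mp

def chunked(records, chunk_size):
    """Split a list into consecutive chunks of at most chunk_size items."""
    return [records[i:i + chunk_size] for i in range(0, len(records), chunk_size)]

def sum_of_squares(chunk):
    """CPU-bound work on one chunk (a stand-in for real processing)."""
    return sum(x * x for x in chunk)

def parallel_sum_of_squares(records, chunk_size, processes=4):
    """Fan the chunks out to a pool, then combine the partial results."""
    with mp.Pool(processes=processes) as pool:
        partials = pool.map(sum_of_squares, chunked(records, chunk_size))
    return sum(partials)

# The chunking logic is plain Python, so it can be checked without a pool.
chunks = chunked(list(range(10)), chunk_size=4)
sequential_total = sum(sum_of_squares(c) for c in chunks)
print(chunks)            # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
print(sequential_total)  # 285
```

In a script, `parallel_sum_of_squares(records, chunk_size)` would be called under an `if __name__ == "__main__":` guard. Larger chunks mean less pickling overhead per task, while smaller chunks balance the load more evenly, so the right size is worth measuring.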

In [5]:
# Process with Queue for communication
def worker(queue, data):
    """Worker that puts result in queue."""
    result = sum(data)
    queue.put(result)

if __name__ == "__main__":
    queue = mp.Queue()
    data_chunks = [[1,2,3], [4,5,6], [7,8,9]]
    
    processes = []
    for chunk in data_chunks:
        p = mp.Process(target=worker, args=(queue, chunk))
        processes.append(p)
        p.start()
    
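    # Collect results before join(); they arrive in completion order,
    # which may differ from the order the processes were started in
    results = [queue.get() for _ in processes]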
    
    for p in processes:
        p.join()
    
    print(f"Results from workers: {results}")
    print(f"Total: {sum(results)}")

Results from workers: [6, 15, 24]
Total: 45


---

## Lesson 2: Threading (I/O Concurrency)

### The Problem

You need to download 100 files from an API. Each request takes 1 second. Sequential = 100 seconds. Can we do better?

### The "Aha!" Moment

The GIL is released during I/O operations! While one thread waits for a network response, others can run.

### Threading vs Multiprocessing

| Aspect | Threading | Multiprocessing |
|--------|-----------|----------------|
| Memory | Shared | Isolated |
| Overhead | Low | High |
| GIL | One thread runs Python code at a time | Each process has its own GIL |
| Best for | I/O-bound | CPU-bound |

In [None]:
import threading
import time

def download_file(file_id):
    """Simulate downloading a file (I/O-bound)."""
    time.sleep(0.5)  # Simulate network latency
    return f"file_{file_id}.txt"

# Sequential downloads
start = time.perf_counter()
results = [download_file(i) for i in range(6)]
seq_time = time.perf_counter() - start
print(f"Sequential: {seq_time:.2f}s")

In [None]:
# Threaded downloads
from concurrent.futures import ThreadPoolExecutor

start = time.perf_counter()

with ThreadPoolExecutor(max_workers=6) as executor:
    results = list(executor.map(download_file, range(6)))

thread_time = time.perf_counter() - start
print(f"Threaded: {thread_time:.2f}s")
print(f"Speedup: {seq_time / thread_time:.2f}x")
print(f"Results: {results}")

In [None]:
# Thread safety with Lock
counter = 0
lock = threading.Lock()

def increment_unsafe():
    global counter
    for _ in range(100000):
        counter += 1  # Race condition!

def increment_safe():
    global counter
    for _ in range(100000):
        with lock:
            counter += 1

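# Demonstrate race condition (lost updates depend on timing and interpreter
# version, so this may still print 200000 on some runs)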
counter = 0
threads = [threading.Thread(target=increment_unsafe) for _ in range(2)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Unsafe counter (expected 200000): {counter}")

# With lock
counter = 0
threads = [threading.Thread(target=increment_safe) for _ in range(2)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Safe counter (expected 200000): {counter}")

### ThreadPoolExecutor Methods

| Method | Use Case | Returns |
|--------|----------|--------|
| `map(fn, items)` | Apply to iterable | Iterator of results |
| `submit(fn, *args)` | Single task | Future object |
| `future.result()` | Wait for one task's result | The task's return value (re-raises its exception) |
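
The `submit()` and `future.result()` rows can be combined with `concurrent.futures.as_completed` to handle each download as soon as it finishes and to deal with failures one by one. This is a sketch under stated assumptions: `download_file` here is a stand-in that sleeps instead of doing network I/O, and the failure of file 3 is made up for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def download_file(file_id):
    """Stand-in for an I/O-bound download; file 3 fails on purpose."""
    time.sleep(0.05)
    if file_id == 3:
        raise ConnectionError(f"file_{file_id} unavailable")
    return f"file_{file_id}.txt"

succeeded, failed = [], []
with ThreadPoolExecutor(max_workers=4) as executor:
    # submit() returns a Future immediately; map each Future back to its input.
    futures = {executor.submit(download_file, i): i for i in range(6)}
    for future in as_completed(futures):
        file_id = futures[future]
        try:
            # result() returns the value or re-raises the worker's exception.
            succeeded.append(future.result())
        except ConnectionError:
            failed.append(file_id)

print(sorted(succeeded))
print(failed)
```

Unlike `executor.map()`, which yields results in input order and stops at the first exception, this pattern lets the remaining downloads finish even when one of them fails.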

---

## Lesson 3: AsyncIO (Event Loop Concurrency)

### The Problem

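Threading works for I/O, but 1000 threads is too many. Each thread needs its own stack (Linux typically reserves 8 MB of virtual memory per thread by default), and the operating system has to schedule every one of them.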

### The "Aha!" Moment

AsyncIO uses **cooperative multitasking** in a single thread. Coroutines voluntarily yield control at `await` points, allowing thousands of concurrent tasks with minimal overhead.

### Key Concepts

| Concept | Description |
|---------|-------------|
| `async def` | Define a coroutine function |
| `await` | Pause and yield control |
| Event Loop | Scheduler that runs coroutines |
| Task | Scheduled coroutine |
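
To make the idea of yielding at `await` points concrete, here is a small sketch in which two coroutines record their steps in a shared list. The `worker` coroutine and the `log` list are hypothetical names. The code is written for a plain script; in a notebook, `asyncio.run(main())` would be replaced with `await main()`.

```python
import asyncio

log = []

async def worker(name, steps):
    for step in range(steps):
        log.append(f"{name}{step}")
        # sleep(0) does no waiting; it only hands control back to the event loop.
        await asyncio.sleep(0)

async def main():
    await asyncio.gather(worker("A", 3), worker("B", 3))

asyncio.run(main())
print(log)
```

The entries from `A` and `B` end up interleaved even though only one thread is running: each `await` is a point where the event loop can switch to another coroutine. Code between two `await` points is never interrupted by another coroutine.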

In [None]:
import asyncio

async def fetch_data(item_id):
    """Simulate async I/O operation."""
    print(f"Starting fetch {item_id}")
    await asyncio.sleep(1)  # Non-blocking sleep
    print(f"Completed fetch {item_id}")
    return f"data_{item_id}"

# Run single coroutine (top-level await works in Jupyter/IPython;
# in a script, use asyncio.run(fetch_data(1)) instead)
result = await fetch_data(1)
print(f"Result: {result}")

In [None]:
# Run multiple coroutines concurrently
async def fetch_all():
    # These are coroutine objects; gather() wraps each one in a Task
    coros = [fetch_data(i) for i in range(5)]
    results = await asyncio.gather(*coros)
    return results

start = time.perf_counter()
results = await fetch_all()
elapsed = time.perf_counter() - start

print(f"\nAll results: {results}")
print(f"Total time: {elapsed:.2f}s (not 5s!)")
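
`gather()` starts every coroutine at once, which may overwhelm an API when there are hundreds of downloads. One common way to cap the number of requests in flight is `asyncio.Semaphore`. The sketch below assumes a limit of 10 and uses `asyncio.sleep` in place of a real request; `download`, `stats`, and `MAX_CONCURRENT` are hypothetical names. It is written for a script; in a notebook, use `await main()` instead of `asyncio.run(main())`.

```python
import asyncio

MAX_CONCURRENT = 10  # assumed limit, chosen only for this demo
stats = {"in_flight": 0, "peak": 0}

async def download(file_id, semaphore):
    # At most MAX_CONCURRENT coroutines can be inside this block at once.
    async with semaphore:
        stats["in_flight"] += 1
        stats["peak"] = max(stats["peak"], stats["in_flight"])
        await asyncio.sleep(0.01)  # stand-in for the network request
        stats["in_flight"] -= 1
    return f"file_{file_id}.txt"

async def main():
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)
    return await asyncio.gather(*(download(i, semaphore) for i in range(100)))

files = asyncio.run(main())
print(len(files), stats["peak"])
```

All 100 coroutines are created up front, which is cheap, but the semaphore ensures that no more than `MAX_CONCURRENT` of them are doing I/O at any moment.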

In [None]:
# Async context manager for resources
class AsyncDatabase:
    async def __aenter__(self):
        print("Connecting to database...")
        await asyncio.sleep(0.1)  # Simulate connection
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing database connection...")
        await asyncio.sleep(0.1)
    
    async def query(self, sql):
        await asyncio.sleep(0.1)
        return f"Result for: {sql}"

async def main():
    async with AsyncDatabase() as db:
        result = await db.query("SELECT * FROM users")
        print(result)

await main()

### AsyncIO Patterns

| Pattern | Use Case | Example |
|---------|----------|--------|
| `gather(*coros)` | Run concurrently, wait for all | `await gather(a(), b())` |
| `create_task(coro)` | Schedule without waiting | `task = create_task(fn())` |
| `wait_for(coro, timeout)` | With timeout | `await wait_for(fn(), 5.0)` |
| `as_completed(coros)` | Process as they finish | `for coro in as_completed(...)` |
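
The `create_task` and `wait_for` rows can be sketched together as follows. `slow_operation` is a hypothetical coroutine, and the delays are arbitrary values chosen so that the timeout fires. The code is written for a script; in a notebook, use `await main()` instead of `asyncio.run(main())`.

```python
import asyncio

async def slow_operation(delay):
    await asyncio.sleep(delay)
    return "done"

async def main():
    # create_task() schedules the coroutine right away; it runs in the background.
    background = asyncio.create_task(slow_operation(0.05))

    # wait_for() cancels the awaited coroutine if it exceeds the timeout.
    try:
        await asyncio.wait_for(slow_operation(1.0), timeout=0.1)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True

    # Awaiting the task returns its result (it has already finished here).
    return timed_out, await background

timed_out, background_result = asyncio.run(main())
print(timed_out, background_result)
```

A task created with `create_task()` should be kept in a variable and awaited (or otherwise referenced) at some point, so that its result or exception is not silently lost.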

In [None]:
# Process results as they complete
async def fetch_with_delay(item_id, delay):
    await asyncio.sleep(delay)
    return f"item_{item_id}"

async def process_as_completed():
    tasks = [
        fetch_with_delay(1, 0.3),
        fetch_with_delay(2, 0.1),
        fetch_with_delay(3, 0.2),
    ]
    
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(f"Got: {result}")

await process_as_completed()

---

## Lesson 4: Context Managers

### The Problem

You open a file, process it, but an exception occurs before you close it. The file handle leaks.

### The "Aha!" Moment

Context managers guarantee cleanup code runs, even if exceptions occur. The `with` statement handles `__enter__` and `__exit__` automatically.

### Common Built-in Context Managers

| Context Manager | Purpose |
|-----------------|--------|
| `open(file)` | File handling |
| `threading.Lock()` | Thread synchronization |
| `contextlib.suppress(exc)` | Ignore specific exceptions |
| `decimal.localcontext()` | Temporary decimal settings |

In [None]:
# Without context manager (dangerous!)
# f = open('file.txt')
# data = f.read()  # If this fails, file never closes!
# f.close()

# With context manager (safe!)
# with open('file.txt') as f:
#     data = f.read()  # File closes even if exception occurs

In [None]:
# Creating a context manager with class
class Timer:
    """Context manager to time code blocks."""
    
    def __enter__(self):
        self.start = time.perf_counter()
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.elapsed = time.perf_counter() - self.start
        print(f"Elapsed: {self.elapsed:.4f}s")
        return False  # Don't suppress exceptions

# Usage
with Timer():
    time.sleep(0.5)
    print("Doing work...")

In [None]:
# Creating a context manager with @contextmanager decorator
from contextlib import contextmanager

@contextmanager
def timer():
    """Simpler way to create context managers."""
    start = time.perf_counter()
    try:
        yield  # Code inside 'with' block runs here
    finally:
        elapsed = time.perf_counter() - start
        print(f"Elapsed: {elapsed:.4f}s")

with timer():
    time.sleep(0.3)
    print("Working...")

In [None]:
# Practical example: Database transaction
@contextmanager
def transaction(connection):
    """Ensure transaction commits or rolls back."""
    try:
        yield connection
        connection.commit()
        print("Transaction committed")
    except Exception as e:
        connection.rollback()
        print(f"Transaction rolled back: {e}")
        raise

# Mock connection for demo
class MockConnection:
    def commit(self): pass
    def rollback(self): pass
    def execute(self, sql): print(f"Executing: {sql}")

conn = MockConnection()
with transaction(conn) as c:
    c.execute("INSERT INTO users VALUES (1, 'Alice')")
    c.execute("UPDATE accounts SET balance = 100")

In [None]:
# Useful contextlib utilities
from contextlib import suppress, redirect_stdout
from io import StringIO

# suppress - ignore specific exceptions
with suppress(FileNotFoundError):
    # This won't raise an error
    open('nonexistent_file.txt')
print("Continued after suppressed error")

# redirect_stdout - capture print output
buffer = StringIO()
with redirect_stdout(buffer):
    print("This is captured")
    print("So is this")

print(f"Captured: {buffer.getvalue()!r}")

### Context Manager Methods

| Method | When Called | Purpose |
|--------|-------------|--------|
| `__enter__` | At `with` start | Setup, return resource |
| `__exit__` | At `with` end | Cleanup, handle exceptions |
| Return `True` from `__exit__` | After an exception in the block | Suppress the exception |
| Return `False` from `__exit__` | After an exception in the block | Let the exception propagate |
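
The effect of the `__exit__` return value can be seen in a short sketch. `SuppressValueError` is a hypothetical class written for this illustration; it behaves roughly like `contextlib.suppress(ValueError)`.

```python
class SuppressValueError:
    """Hypothetical context manager that swallows ValueError only."""

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # exc_type is None when the block finished without an exception.
        # A truthy return value suppresses the exception; falsy lets it propagate.
        return exc_type is not None and issubclass(exc_type, ValueError)

events = []

with SuppressValueError():
    events.append("before")
    raise ValueError("swallowed")
events.append("after suppressed block")

try:
    with SuppressValueError():
        raise KeyError("not swallowed")
except KeyError:
    events.append("KeyError propagated")

print(events)
```

Suppressing exceptions is rarely what you want in your own context managers. Returning `False` (or nothing at all) keeps errors visible while still running the cleanup code.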

---

## Summary

### Choosing the Right Concurrency Model

| Situation | Solution | Why |
|-----------|----------|-----|
| CPU-bound, independent tasks | `multiprocessing` | Bypasses GIL |
| I/O-bound, few tasks | `threading` | Simple, shared memory |
| I/O-bound, many tasks | `asyncio` | Lightweight, scalable |
| Mixed workload | Combine them | Use the right tool |
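
As one possible illustration of the "Combine them" row, blocking functions can be pushed onto worker threads from inside an asyncio program with `asyncio.to_thread` (available in Python 3.9 and later). `blocking_read` and the file names are hypothetical stand-ins for a library call that has no async API. The code is written for a script; in a notebook, use `await main()` instead of `asyncio.run(main())`.

```python
import asyncio
import time

def blocking_read(path):
    """Stand-in for a blocking call from a library with no async support."""
    time.sleep(0.1)
    return f"contents of {path}"

async def main():
    start = time.perf_counter()
    # Each call runs in a worker thread, so the event loop stays responsive.
    results = await asyncio.gather(
        asyncio.to_thread(blocking_read, "a.txt"),
        asyncio.to_thread(blocking_read, "b.txt"),
        asyncio.to_thread(blocking_read, "c.txt"),
    )
    return results, time.perf_counter() - start

results, elapsed = asyncio.run(main())
print(results)
print(f"Elapsed: {elapsed:.2f}s")
```

For CPU-bound work the same idea applies with processes: `loop.run_in_executor()` accepts a `ProcessPoolExecutor`, so an asyncio program can hand heavy computation to other cores.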

### Quick Reference

| Module | Key Classes/Functions |
|--------|----------------------|
| `multiprocessing` | `Pool`, `Process`, `Queue` |
| `threading` | `Thread`, `Lock`, `Event` |
| `concurrent.futures` | `ThreadPoolExecutor`, `ProcessPoolExecutor` |
| `asyncio` | `gather`, `create_task`, `sleep` |
| `contextlib` | `contextmanager`, `suppress`, `redirect_stdout` |

### Context Manager Patterns

| Pattern | Use Case |
|---------|----------|
| Class-based | Complex setup/teardown |
| `@contextmanager` | Simple cases |
| `async with` | Async resources |
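
The `@contextmanager` and `async with` rows can be combined: `contextlib.asynccontextmanager` builds an async context manager from an async generator. In this sketch, `database_connection` and the `events` list are hypothetical, and `asyncio.sleep` stands in for real connection work. The code is written for a script; in a notebook, use `await main()` instead of `asyncio.run(main())`.

```python
import asyncio
from contextlib import asynccontextmanager

events = []

@asynccontextmanager
async def database_connection(name):
    events.append(f"connect {name}")
    await asyncio.sleep(0.01)  # stand-in for opening the connection
    try:
        yield name  # the body of the 'async with' block runs here
    finally:
        await asyncio.sleep(0.01)  # stand-in for closing the connection
        events.append(f"close {name}")

async def main():
    try:
        async with database_connection("users_db") as conn:
            events.append(f"query on {conn}")
            raise RuntimeError("query failed")
    except RuntimeError:
        events.append("error handled")

asyncio.run(main())
print(events)
```

The `close` event is recorded before the error reaches the `except` clause, which is the same cleanup guarantee the class-based `AsyncDatabase` gives with `__aenter__` and `__aexit__`.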

---

**Next Module:** Modern Tooling and Packaging - pip, conda, uv, virtual environments