# Parallel Work: Threads and Processes

When to use threads vs processes, how to handle shared state, and practical examples using `concurrent.futures`.

In [3]:
import os
import time
import math
import threading
from queue import Queue
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

## Choosing Threads vs Processes
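- Python threads share memory and are well suited to I/O-bound work (waiting on disk or network). In CPython, the GIL lets only one thread execute Python bytecode at a time, but blocking I/O calls release it, so threads still overlap while they wait.
- Processes sidestep the GIL for CPU-bound work because each one runs its own interpreter. They do not share memory: arguments and results are pickled to move between processes, so they must be picklable.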
- Rule of thumb: I/O → threads; CPU → processes.
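A process pool pickles the callable and every argument and result to move them between processes, so a quick way to check whether something can cross that boundary is to try pickling it. A minimal sketch; `is_picklable` and `square` are helpers defined here for illustration, not library functions:

```python
import math
import pickle
import threading


def square(x: int) -> int:
    """Module-level function: pickled by reference to its importable name."""
    return x * x


def is_picklable(obj) -> bool:
    """Return True if obj survives pickle.dumps, which process pools rely on."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, TypeError, AttributeError):
        return False


print(is_picklable(square))            # module-level function
print(is_picklable(math.sqrt))         # built-in function
print(is_picklable((1, "a", [2.0])))   # plain data
print(is_picklable(lambda x: x * x))   # lambda: no importable name
print(is_picklable(threading.Lock()))  # lock: wraps an OS-level resource
```

Module-level functions and plain data pass; lambdas and locks do not, which is why process workers should be top-level functions that receive ordinary data.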

In [4]:
print(f"Logical CPUs: {os.cpu_count()}")

Logical CPUs: 20


## I/O-Bound Work with Threads
Use `ThreadPoolExecutor` when tasks mostly wait on I/O (network, disk, sleep).

In [9]:
def fake_download(idx: int) -> str:
    time.sleep(10)  # simulate waiting on network 10s
    return f"item-{idx}"

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=5) as pool:
    for result in pool.map(fake_download, range(10)):
        print("received", result)
elapsed = time.perf_counter() - start
print(f"Threaded I/O finished in {elapsed:.2f}s")

received item-0
received item-1
received item-2
received item-3
received item-4
received item-5
received item-6
received item-7
received item-8
received item-9
Threaded I/O finished in 60.00s
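`pool.map` yields results in input order. To handle each result as soon as it is ready, `submit` plus `as_completed` yields futures in completion order instead. The sketch below also times a sequential baseline; `fake_fetch` and its 0.1 s delay are hypothetical stand-ins for a real network call.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_fetch(idx: int, delay: float = 0.1) -> str:
    """Hypothetical network call: sleeping releases the GIL, like real I/O."""
    time.sleep(delay)
    return f"item-{idx}"


def run_sequential(n: int) -> float:
    start = time.perf_counter()
    for i in range(n):
        fake_fetch(i)
    return time.perf_counter() - start


def run_threaded(n: int, workers: int) -> tuple[list[str], float]:
    start = time.perf_counter()
    results = []
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = {pool.submit(fake_fetch, i): i for i in range(n)}
        for fut in as_completed(futures):  # completion order, not input order
            results.append(fut.result())
    return results, time.perf_counter() - start


seq_time = run_sequential(10)
results, thr_time = run_threaded(10, workers=5)
print(f"sequential: {seq_time:.2f}s, threaded: {thr_time:.2f}s")
```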


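Swapping `time.sleep` for a busy loop makes each task CPU-bound: the threads now compete for the GIL instead of waiting in parallel. Because this loop stops at a wall-clock deadline rather than after a fixed amount of work, the total time does not directly reveal the GIL's cost; a fixed workload per task is needed to measure that.

In [7]:
def fake_cpu_task(idx: int) -> str:
    t_end = time.time() + 10  # spin for 10 s of wall-clock time
    while time.time() < t_end:
        _ = math.sqrt(12345.6789)  # burn CPU; only one thread runs bytecode at a time
    return f"item-{idx}"

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=5) as pool:
    for result in pool.map(fake_cpu_task, range(10)):
        print("received", result)
elapsed = time.perf_counter() - start
print(f"Threaded CPU-bound work finished in {elapsed:.2f}s")

received item-0
received item-1
received item-2
received item-3
received item-4
received item-5
received item-6
received item-7
received item-8
received item-9
Threaded CPU-bound work finished in 60.51s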
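To see the GIL's cost directly, give every task the same fixed workload and compare a sequential run with a threaded one. On a standard (GIL-enabled) CPython build the two timings should come out roughly equal, since only one thread executes bytecode at a time. `cpu_task` and the sizes below are arbitrary choices for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def cpu_task(n: int) -> int:
    """A fixed amount of pure-Python work: the sum of squares below n."""
    total = 0
    for i in range(n):
        total += i * i
    return total


def run_sequential(n: int, tasks: int) -> list[int]:
    return [cpu_task(n) for _ in range(tasks)]


def run_threaded(n: int, tasks: int, workers: int = 5) -> list[int]:
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(cpu_task, [n] * tasks))


N, TASKS = 1_000_000, 5
for label, fn in [("sequential", run_sequential), ("threaded", run_threaded)]:
    start = time.perf_counter()
    results = fn(N, TASKS)
    print(f"{label}: {time.perf_counter() - start:.2f}s")
```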


## CPU-Bound Work with Processes
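`ProcessPoolExecutor` bypasses the GIL by running work in separate processes, so callables, arguments, and results must be picklable. Guard pool creation with `if __name__ == "__main__"` on platforms that use the `spawn` start method (the default on Windows and macOS).
This is tricky in Jupyter: under `spawn`, each worker imports the function it runs, and a function defined in a notebook cell has no module to import from. It is simpler to put the worker code in a `.py` module and call it from the notebook.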

In [47]:
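import importlib
import prime_pool  # local module: prime_pool.py next to this notebook

if __name__ == "__main__":
    # In a notebook __name__ is always "__main__", so this guard is a no-op here;
    # it matters when the same code runs as a script.
    # Reload to pick up edits to prime_pool.py, then call through the module:
    # a name bound earlier with `from prime_pool import ...` would still point
    # at the old function after the reload.
    importlib.reload(prime_pool)
    prime_pool.run_prime_pool(workers=10, n_inputs=60)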

Primes below 750,000: 60238
Primes below 755,000: 60618
Primes below 760,000: 60978
Primes below 765,000: 61354
Primes below 770,000: 61733
Primes below 775,000: 62103
Primes below 780,000: 62468
Primes below 785,000: 62838
Primes below 790,000: 63206
Primes below 795,000: 63581
Primes below 800,000: 63951
Primes below 805,000: 64319
Primes below 810,000: 64683
Primes below 815,000: 65052
Primes below 820,000: 65416
Primes below 825,000: 65796
Primes below 830,000: 66161
Primes below 835,000: 66533
Primes below 840,000: 66890
Primes below 845,000: 67258
Primes below 850,000: 67617
Primes below 855,000: 67975
Primes below 860,000: 68342
Primes below 865,000: 68716
Primes below 870,000: 69095
Primes below 875,000: 69448
Primes below 880,000: 69823
Primes below 885,000: 70186
Primes below 890,000: 70555
Primes below 895,000: 70918
Primes below 900,000: 71274
Primes below 905,000: 71635
Primes below 910,000: 72026
Primes below 915,000: 72367
Primes below 920,000: 72734
Primes below 925,000
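The `prime_pool` module itself is not shown. A hypothetical version consistent with the printed output might look like this; the trial-division counter and the `start`/`step` defaults (inferred from the 750,000, 755,000, ... limits above) are assumptions, not the author's code.

```python
# prime_pool.py -- hypothetical sketch; the notebook's real module is not shown.
import math
from concurrent.futures import ProcessPoolExecutor


def count_primes_below(limit: int) -> int:
    """Count primes < limit by trial division (deliberately CPU-heavy)."""
    count = 0
    for n in range(2, limit):
        # n is prime if no d in [2, isqrt(n)] divides it
        if all(n % d for d in range(2, math.isqrt(n) + 1)):
            count += 1
    return count


def run_prime_pool(workers: int = 4, n_inputs: int = 8,
                   start: int = 750_000, step: int = 5_000) -> list[int]:
    """Fan the limits out over worker processes and print each count."""
    limits = [start + i * step for i in range(n_inputs)]
    with ProcessPoolExecutor(max_workers=workers) as pool:
        # map pickles each limit to a worker and yields results in input order
        counts = list(pool.map(count_primes_below, limits))
    for limit, count in zip(limits, counts):
        print(f"Primes below {limit:,}: {count}")
    return counts


if __name__ == "__main__":
    # Needed when run as a script under spawn/forkserver: workers re-import the
    # main module, and this guard stops them from starting pools of their own.
    run_prime_pool()
```

Keeping `count_primes_below` at module level is what makes it picklable by reference, so workers can import it from `prime_pool`.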

## Shared State and Locks (Threads)
Threads share memory. Use locks to protect shared state; prefer message passing (queues) when possible.

In [35]:
counter = 0
lock = threading.Lock()

def add_many(n: int):
    global counter
    for _ in range(n):
        with lock:
            counter += 1
        # counter += 1  # unsafe without the lock: the read-modify-write can interleave and lose updates

threads = [threading.Thread(target=add_many, args=(100_000,)) for _ in range(5)]
start = time.perf_counter()
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start
print(f"Counter = {counter} (expected 500000)")
print(f"Locked increments completed in {elapsed:.2f}s")

Counter = 500000 (expected 500000)
Locked increments completed in 0.08s
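The queue-based alternative avoids the lock entirely: each thread counts in a local variable that no other thread touches, then hands its partial total to the main thread through a `Queue`. A minimal sketch; `count_locally` is a hypothetical helper:

```python
import threading
from queue import Queue


def count_locally(n: int, out: Queue) -> None:
    local = 0  # owned by this thread only, so no lock is needed
    for _ in range(n):
        local += 1
    out.put(local)  # hand the partial result back through a thread-safe queue


partials: Queue = Queue()
workers = [threading.Thread(target=count_locally, args=(100_000, partials))
           for _ in range(5)]
for t in workers:
    t.start()
for t in workers:
    t.join()

total = sum(partials.get() for _ in range(len(workers)))
print(f"Total = {total} (expected 500000)")
```

Because the threads never touch shared state until the final hand-off, there is nothing to race on and no per-increment locking cost.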


## Producer/Consumer with Queue
Use a `Queue` to safely hand off work between threads without manual locks.

In [None]:
import random
from queue import Empty

q = Queue()
results = []  # list.append is thread-safe in CPython, so workers can share it

# Producer: enqueue all tasks up front
for i in range(20):
    q.put(f"task-{i}")

# Consumers
def worker(name):
    while True:  # consume tasks until the queue is empty
        try:
            item = q.get_nowait()  # raises Empty instead of blocking
        except Empty:
            break  # nothing left to do
        
        time.sleep(0.1 + random.random() * 0.01)  # simulate variable work
        results.append((name, item))
        q.task_done()

threads = [threading.Thread(target=worker, args=(f"worker-{i}",)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print("Results (unordered):")
for entry in results:
    print(entry)

Results (unordered):
('worker-0', 'task-0')
('worker-2', 'task-2')
('worker-1', 'task-1')
('worker-0', 'task-3')
('worker-2', 'task-4')
('worker-1', 'task-5')
('worker-2', 'task-7')
('worker-0', 'task-6')
('worker-1', 'task-8')
('worker-2', 'task-9')
('worker-0', 'task-10')
('worker-1', 'task-11')
('worker-2', 'task-12')
('worker-1', 'task-14')
('worker-0', 'task-13')
('worker-1', 'task-16')
('worker-2', 'task-15')
('worker-0', 'task-17')
('worker-1', 'task-18')
('worker-2', 'task-19')


## Summary
- Threads excel at I/O-bound tasks; processes excel at CPU-bound tasks.
- Protect shared state with locks or, better, avoid sharing via queues.
- For process pools on platforms that use the `spawn` start method (Windows, macOS), keep pool creation under `if __name__ == "__main__"`.
- Keep functions small and picklable for process pools.