# Seminar 8: Concurrency Practices

## Multithreading

### Basic Multithreading in Python

![GIL](GIL-behaviour.png)

Historically, Python has GIL, so only one thread can be executed at a time.

In [None]:
import threading
import time


def simple_worker(time_wait_seconds: float) -> None:
    print(f"Thread {threading.current_thread().name} started. Sleeping for {time_wait_seconds:.3f} seconds")
    time.sleep(time_wait_seconds)
    print(f"Thread {threading.current_thread().name} finished. Exiting.")


thread1 = threading.Thread(target=simple_worker, args=(2,), name="Worker-1")
thread2 = threading.Thread(target=simple_worker, args=(5,), name="Worker-2")

print("Main: Starting threads")

thread1.start()
thread2.start()

print("Main: Waiting for threads to finish...")

thread1.join()
print("Main: Thread1 joined")
thread2.join()
print("Main: Thread2 joined")

In [None]:
# Sometimes due to mistakes/intentionally threads may never stop
def never_ending_thread() -> None:
    while True:
        time.sleep(2)
        print(f"Thread {threading.current_thread().name}: waiting")


thread = threading.Thread(target=never_ending_thread, name="Thread")
thread.start()
print("Main: Thread started")
thread.join() # Will wait forever in the main thread
print("Main: Thread joined")

In [None]:
# Solution: join with timeout
def never_ending_thread() -> None:
    while True:
        time.sleep(2)
        print(f"Thread {threading.current_thread().name}: waiting")


thread = threading.Thread(target=never_ending_thread, name="Thread")
thread.start()
print("Main: Thread started")
thread.join(timeout=5) # Always whenever you can, put timeouts for safety.
print("Main: Thread joined")

Good use case for threading is the parallel downloading

In [None]:
# Bad version (sequential download)

import requests
import time

urls = [
    'https://www.example.com',
    'https://www.python.org',
    'https://httpbin.org/get',
    'https://www.github.com',
    'https://www.stackoverflow.com'
]

def download_url(url):
    response = requests.get(url)
    print(f"Downloaded {url} with status {response.status_code}")

start_time = time.time()

# Sequential execution
for url in urls:
    download_url(url)

duration = time.time() - start_time
print(f"Downloaded {len(urls)} URLs in {duration:.2f} seconds sequentially.")

In [None]:
# Good version (multithreading)

import requests
import threading
import time

urls = [
    'https://www.example.com',
    'https://www.python.org',
    'https://httpbin.org/get',
    'https://www.github.com',
    'https://www.stackoverflow.com'
]

def download_url(url):
    response = requests.get(url)
    print(f"Downloaded {url} with status {response.status_code}")

start_time = time.time()
threads = []

for url in urls:
    thread = threading.Thread(target=download_url, args=(url,))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

duration = time.time() - start_time
print(f"Downloaded {len(urls)} URLs in {duration:.2f} seconds with threading.")

In [None]:
# Bonus: cleaner example (spoiler: joblib)

import requests
import time
from joblib import Parallel, delayed

urls = [
    'https://www.example.com',
    'https://www.python.org',
    'https://httpbin.org/get',
    'https://www.github.com',
    'https://www.stackoverflow.com'
]

def download_url(url):
    response = requests.get(url)
    print(f"Downloaded {url} with status {response.status_code}")
    return response.status_code

start_time = time.time()

results = Parallel(n_jobs=-1, backend="threading", prefer="threads")(
    delayed(download_url)(url) for url in urls
)

duration = time.time() - start_time
print(f"Downloaded {len(urls)} URLs in {duration:.2f} seconds with joblib threading.")
print(f"Results: {results}")

### Thread Synchronization

The most challenging part of a concurrent program is when threads need to communicate with each other. 

In [None]:
import threading
import time

bank_account = 100
print(f"Initial balance: ${bank_account}")

def withdraw(amount):
    global bank_account
    
    # Simulate some delay (e.g., checking the balance, processing the request)
    time.sleep(0.001)
    
    # --- CRITICAL SECTION START ---
    # Check if there's enough money
    if bank_account >= amount:
        # Simulate some processing
        time.sleep(0.001)
        # Withdraw the money
        bank_account -= amount
        print(f"{threading.current_thread().name} withdrew ${amount}. New balance: ${bank_account}")
    else:
        print(f"{threading.current_thread().name} failed to withdraw ${amount}. Insufficient funds.")
    # --- CRITICAL SECTION END ---

husband = threading.Thread(target=withdraw, args=(90,), name='Husband')
wife = threading.Thread(target=withdraw, args=(90,), name='Wife')

husband.start()
wife.start()

husband.join()
wife.join()

print(f"Final balance: ${bank_account}")

What happened? Both threads checked if the bank has sufficient funds. But somehow it ended up with a negative balance.
The behavior that we observe is called **a Race Condition**: multiple threads interact with shared resources at the same time.

The part of the code where such crazy stuff can happen, is called **a Critical Section**.
We need to ensure that only one thread enters Critical Section at a time.
The most straightforward way to do that is Mutex.

![mutex](mutex.png)

In [None]:
# Solution

import threading
import time

bank_account = 100
print(f"Initial balance: ${bank_account}")

account_lock = threading.Lock()

def withdraw_safely(amount):
    global bank_account
    
    time.sleep(0.001)
    
    # lock.acquire()
    with account_lock:  # This automatically handles lock.acquire() and lock.release()
        # --- CRITICAL SECTION START ---
        print(f"{threading.current_thread().name} is checking balance...")
        
        if bank_account >= amount:
            time.sleep(0.001)
            bank_account -= amount
            print(f"{threading.current_thread().name} withdrew ${amount}. New balance: ${bank_account}")
        else:
            print(f"{threading.current_thread().name} failed to withdraw ${amount}. Insufficient funds.")
        # --- CRITICAL SECTION END ---
    # lock.release()

husband = threading.Thread(target=withdraw_safely, args=(90,), name='Husband')
wife = threading.Thread(target=withdraw_safely, args=(90,), name='Wife')

husband.start()
wife.start()

husband.join()
wife.join()

print(f"Final balance: ${bank_account}")

In [None]:
# Cleaner example
import threading
import time

# It's usually better to wrap locks inside the services, so that the main code doesn't even see the inner lock.
class BankAccount:
    def __init__(self, amount: int):
        self._amount = amount
        self._lock = threading.Lock()
    
    def withdraw(self, request_amount: int) -> bool:
        with self._lock:
            if request_amount >= self._amount:
                return False
            self._amount -= request_amount
            return True
    
    def get_balance(self) -> int:
        return self._amount

bank_account = BankAccount(amount=100)
print(f"Initial balance: ${bank_account.get_balance()}")


def withdraw_safely(amount):
    global bank_account
    
    time.sleep(0.001)
    
    success = bank_account.withdraw(amount)
    if success:
        print(f"{threading.current_thread().name} withdrew ${amount}. New balance: ${bank_account.get_balance()}")
    else:
        print(f"{threading.current_thread().name} failed to withdraw ${amount}. Insufficient funds.")
    
husband = threading.Thread(target=withdraw_safely, args=(90,), name='Husband')
wife = threading.Thread(target=withdraw_safely, args=(90,), name='Wife')

husband.start()
wife.start()

husband.join()
wife.join()

print(f"Final balance: ${bank_account.get_balance()}")

**The Dining Philosophers**

![Dining-philosophers](dining-philosophers.png)

In [None]:
import threading
import time
import random


class Philosopher:
    def __init__(self, index: int, left_fork: threading.Lock, right_fork: threading.Lock):
        self.index = index
        self._left_fork = left_fork
        self._right_fork = right_fork
    
    def think(self):
        print(f"Philosopher {self.index} is thinking...")
        time.sleep(random.uniform(0.1, 0.5))
    
    def pick_left_fork(self):
        print(f"Philosopher {self.index} is hungry, trying to pick up left fork {self.index}")
        self._left_fork.acquire()
        time.sleep(0.4)
    
    def pick_right_fork(self):
        print(f"Philosopher {self.index} is hungry, trying to pick up right fork {(self.index + 1) % PHILOSOPHERS_COUNT}")
        self._right_fork.acquire()
        time.sleep(0.4)
    
    def eat(self):
        print(f"Philosopher {self.index} is EATING!")
        time.sleep(random.uniform(0.2, 0.4))
    
    def release_forks(self):
        self._right_fork.release()
        print(f"Philosopher {self.index} put down right fork")
        self._left_fork.release()
        print(f"Philosopher {self.index} put down left fork")
    

PHILOSOPHERS_COUNT = 5
forks = [threading.Lock() for _ in range(PHILOSOPHERS_COUNT)]
philosophers = []
for i in range(PHILOSOPHERS_COUNT):
    philosophers.append(
        Philosopher(
            index=i,
            left_fork=forks[i],
            right_fork=forks[(i + 1) % PHILOSOPHERS_COUNT],
        )
    )


def philosopher_bad(philosopher: Philosopher):
    while True:
        philosopher.think()
        philosopher.pick_left_fork()
        philosopher.pick_right_fork()
        philosopher.eat()
        philosopher.release_forks()


philosopher_threads = []
print('Main: Starting simulation')
for philosopher in philosophers:
    philosopher_thread = threading.Thread(target=philosopher_bad, args=(philosopher,), daemon=True)
    philosopher_threads.append(philosopher_thread)
    philosopher_thread.start()

time.sleep(5)
print('Main: Ending simulation')
for philosopher_thread in philosopher_threads:
    philosopher_thread.join(timeout=0.1)

With the naive approach we encountered a common scenario in multithreading, called **Deadlock**. No philosopher can eat, because everyone has grabbed his left fork.

In [None]:
# Good solution: always pick the lowest number fork first. Then, there will be no deadlocks.
PHILOSOPHERS_COUNT = 5
forks = [threading.Lock() for _ in range(PHILOSOPHERS_COUNT)]
philosophers = []
for i in range(PHILOSOPHERS_COUNT):
    philosophers.append(
        Philosopher(
            index=i,
            left_fork=forks[i],
            right_fork=forks[(i + 1) % PHILOSOPHERS_COUNT],
        )
    )

def philosopher_good(philosopher: Philosopher):
    while True:
        philosopher.think()
        if philosopher.index < PHILOSOPHERS_COUNT - 1:
            philosopher.pick_left_fork()
            philosopher.pick_right_fork()
        else:
            philosopher.pick_right_fork()
            philosopher.pick_left_fork()
        philosopher.eat()
        philosopher.release_forks()


philosopher_threads = []
print('Main: Starting simulation')
for philosopher in philosophers:
    philosopher_thread = threading.Thread(target=philosopher_good, args=(philosopher,), daemon=True)
    philosopher_threads.append(philosopher_thread)
    philosopher_thread.start()

time.sleep(5)
print('Main: Ending simulation')
for philosopher_thread in philosopher_threads:
    philosopher_thread.join(timeout=0.1)

In [None]:
# Bonus: Arbitrator solution with Semaphore: ask permission to eat

PHILOSOPHERS_COUNT = 5
forks = [threading.Lock() for _ in range(PHILOSOPHERS_COUNT)]
arbitrator = threading.Semaphore(PHILOSOPHERS_COUNT - 1)
philosophers = []
for i in range(PHILOSOPHERS_COUNT):
    philosophers.append(
        Philosopher(
            index=i,
            left_fork=forks[i],
            right_fork=forks[(i + 1) % PHILOSOPHERS_COUNT],
        )
    )
    
def philosopher_arbitrator(philosopher: Philosopher):
    while True:
        philosopher.think()
        arbitrator.acquire()
        philosopher.pick_left_fork()
        philosopher.pick_right_fork()
        philosopher.eat()
        philosopher.release_forks()
        arbitrator.release()


philosopher_threads = []
print('Main: Starting simulation')
for philosopher in philosophers:
    philosopher_thread = threading.Thread(target=philosopher_arbitrator, args=(philosopher,), daemon=True)
    philosopher_threads.append(philosopher_thread)
    philosopher_thread.start()

time.sleep(5)
print('Main: Ending simulation')
for philosopher_thread in philosopher_threads:
    philosopher_thread.join(timeout=0.1)

## Multiprocessing

### Bypassing the GIL

Problem with threads: they do not bypass GIL

In [None]:
import time
import math

def cpu_bound_task(number):
    return math.sqrt(number)

numbers = range(5_000_000)
start_time = time.time()
for number in numbers:
    cpu_bound_task(number)

end_time = time.time()

print(f"Sequential execution took {end_time - start_time:.2f} seconds.")

In [None]:
import time
import math
import threading

def cpu_bound_task(number):
    return math.sqrt(number)


def calculate_chunk(chunk):
    for number in chunk:
        cpu_bound_task(number)

numbers = range(5_000_000)
threads = []
start_time = time.time()
NUM_WORKERS = 12
chunk_size = (len(numbers) + NUM_WORKERS - 1) // NUM_WORKERS
chunks = [numbers[i:i + chunk_size] for i in range(0, len(numbers), chunk_size)]

for chunk in chunks:
    thread = threading.Thread(target=calculate_chunk, args=(chunk,))
    threads.append(thread)
    thread.start()

print(len(threads))
for thread in threads:
    thread.join()

end_time = time.time()

print(f"Multithreading execution took {end_time - start_time:.2f} seconds.")

In [None]:
import time
import math
import multiprocessing

def cpu_bound_task(number):
    return math.sqrt(number)


def calculate_chunk(chunk):
    for number in chunk:
        cpu_bound_task(number)

numbers = range(5_000_000)
# threads = []
processes = []
start_time = time.time()
NUM_WORKERS = 12
chunk_size = (len(numbers) + NUM_WORKERS - 1) // NUM_WORKERS
chunks = [numbers[i:i + chunk_size] for i in range(0, len(numbers), chunk_size)]

for chunk in chunks:
    # thread = threading.Thread(target=calculate_chunk, args=(chunk,))
    process = multiprocessing.Process(target=calculate_chunk, args=(chunk,))
    # threads.append(thread)
    processes.append(process)
    # thread.start()
    process.start()

# print(len(threads))
# for thread in threads:
#     thread.join()

print(len(processes))
for process in processes:
    process.join()

end_time = time.time()

print(f"Multiprocessing execution took {end_time - start_time:.2f} seconds.")

In [None]:
# We don't have do the mapping ourselves: multiprocessing.Pool()

import math
import multiprocessing

def cpu_bound_task(number):
    return math.sqrt(number)

numbers = range(5_000_000)

with multiprocessing.Pool() as pool:
    results = pool.map(cpu_bound_task, numbers)

results[:10]

### Pools

In [None]:
def worker_task(x):
    time.sleep(0.1)  # Simulate work
    result = x * x
    return (x, result) # Return both input and result

In [None]:
def example_map():
    print("=== map() example ===")
    print("Processing all items, waiting for all, returning in order...")
    
    data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    
    start_time = time.time()
    
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(worker_task, data)
    
    end_time = time.time()
    
    print(f"Results: {results}")
    print(f"Time taken: {end_time - start_time:.2f} seconds")
    print()

example_map()

In [None]:
def example_map_async():
    print("=== map_async() example ===")
    print("Starting processing, returning immediately, can wait for results later...")
    
    data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    
    start_time = time.time()
    
    with multiprocessing.Pool(processes=4) as pool:
        async_result = pool.map_async(worker_task, data)
        
        print("Doing other work while processes run...")
        time.sleep(0.2)
        
        results = async_result.get()
    
    end_time = time.time()
    
    print(f"Results: {results}")
    print(f"Time taken: {end_time - start_time:.2f} seconds")
    print()
    
example_map_async()

In [None]:
def example_imap():
    print("=== imap() example ===")
    print("Processing items, yielding results IN ORDER as they complete...")
    
    data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    
    start_time = time.time()
    
    with multiprocessing.Pool(processes=4) as pool:
        results_iterator = pool.imap(worker_task, data)
        
        print("Results (will appear in original order, not completion order):")
        for i, result in enumerate(results_iterator, 1):
            print(f"  Received result {i}: {result}")
            time.sleep(0.05)  # Simulate processing results
    
    end_time = time.time()
    
    print(f"Time taken: {end_time - start_time:.2f} seconds")
    print()

example_imap()

In [None]:
import random

def variable_worker_task(x):
        sleep_time = random.uniform(0.05, 0.3)
        time.sleep(sleep_time)
        return (x, x * x, f"slept {sleep_time:.3f}s")


def example_imap_unordered():
    print("=== imap_unordered() example ===")
    print("Processing items, yielding results IN COMPLETION ORDER (fastest first)...")
    
    data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    
    start_time = time.time()
    
    with multiprocessing.Pool(processes=4) as pool:
        results_iterator = pool.imap_unordered(variable_worker_task, data)
        
        print("Results (will appear in completion order):")
        for i, result in enumerate(results_iterator, 1):
            print(f"  Received result {i}: {result}")
            time.sleep(0.05)  # Simulate processing results
    
    end_time = time.time()
    
    print(f"Time taken: {end_time - start_time:.2f} seconds")
    print()

example_imap_unordered()

In [None]:
print("=== When to Use Which Method ===")
print()
print("Use map():")
print("  - When you need ALL results before continuing")
print("  - When you need results in original order")
print("  - For simple parallel processing")
print()
print("Use map_async():")
print("  - When you want to start processing and do other work")
print("  - When you need non-blocking execution")
print()
print("Use imap():")
print("  - When processing LARGE datasets that don't fit in memory")
print("  - When you want to process results incrementally IN ORDER")
print("  - When you might need to stop early (break from loop)")
print()
print("Use imap_unordered():")
print("  - When you want to process results as soon as they're ready")
print("  - When order doesn't matter")
print("  - For progress monitoring or streaming results")
print("  - Often the fastest way to process all items")

### Basic IPC

In [None]:
import multiprocessing
import time

data = [0]

def worker():
    data[0] += 1
    print(f"[child] data = {data}")

if __name__ == "__main__":
    print(f"[parent before] data = {data}")
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()

    print(f"[parent after]  data = {data}")

We can see that communication between processes is not as easy as with threads. In reality, the process COPIES all the variables from the parent, and later modifies those copies only. 

In [None]:
# Queue: passing data between messages

import multiprocessing
import random
import time


def worker(q, x):
    time.sleep(random.uniform(0.1, 0.2)) # simulate computation
    q.put(x * x)

if __name__ == "__main__":
    q = multiprocessing.Queue()
    xs = [1, 2, 3, 4]

    procs = [multiprocessing.Process(target=worker, args=(q, x)) for x in xs]
    for p in procs: p.start()
    for p in procs: p.join()

    results = [q.get() for _ in xs]
    print(results) # Unordered

In [None]:
import multiprocessing

def worker(child_conn):
    x = child_conn.recv()      # blocks until parent sends
    child_conn.send(x + 10)    # reply
    child_conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()

    p = multiprocessing.Process(target=worker, args=(child_conn,))
    p.start()

    parent_conn.send(32)            # send to child
    reply = parent_conn.recv()      # receive from child
    print(reply)

    p.join()

In [None]:
# Deadlocks: here we go again

import multiprocessing
import time

def child(q_c2p, q_p2c):
    print('CHILD START')
    q_c2p.put("child: hello")     # succeeds (queue was empty)
    time.sleep(0.1)
    q_p2c.put("child: follow-up") # <-- blocks forever if parent has filled q_p2c
    print('CHILD FINISH')

q_p2c = multiprocessing.Queue(maxsize=1)   # parent -> child
q_c2p = multiprocessing.Queue(maxsize=1)   # child  -> parent

p = multiprocessing.Process(target=child, args=(q_c2p, q_p2c))
p.start()

print('PARENT START')
q_p2c.put("parent: hello")     # succeeds (queue was empty)
time.sleep(0.1)
q_c2p.put("parent: follow-up") # <-- blocks forever; child is also blocked above

print('PARENT FINISH')
print(q_c2p.get())
print(q_p2c.get())
p.join()

## Concurrency: Parallelizing Tasks

Now let's talk about parallelization in general and libraries to do that. As a toy example, we will be using (again) taking square root.

In [None]:
import time
import threading
import math
import multiprocessing
import joblib
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def worker_task(number: int):
    for i in range(1000):
        if number % 2 == 0:
            number = number // 2
        else:
            number = 3 * number + 1
    return number


START_NUM = 1
END_NUM = 50000 + 1
num_to_index = lambda num: num - 1
index_to_num = lambda idx: idx + 1


def naive_implementation():
    results = [0 for i in range(START_NUM, END_NUM)]
    for i, num in enumerate(range(START_NUM, END_NUM)):
        results[i] = worker_task(num)
    return results


def threading_implementation(max_workers=4):
    results = [0 for i in range(START_NUM, END_NUM)]
    threads = []
    numbers_per_thread = math.ceil(len(results) / max_workers)
    
    def thread_worker(start_idx, end_idx):
        for idx in range(start_idx, end_idx):
            results[idx] = worker_task(index_to_num(idx))
    
    for i in range(max_workers):
        start_idx = i * numbers_per_thread
        end_idx = min(start_idx + numbers_per_thread, len(results))
        thread = threading.Thread(target=thread_worker, args=(start_idx, end_idx))
        threads.append(thread)
        thread.start()
        
    for thread in threads:
        thread.join()
    return results


def multiprocessing_implementation(max_workers=4):
    results = [0 for i in range(START_NUM, END_NUM)]
    processes = []
    numbers_per_thread = math.ceil(len(results) / max_workers)
    result_queue = multiprocessing.Queue()
    
    def process_worker(start_idx, end_idx):
        batch_results = []
        for idx in range(start_idx, end_idx):
            batch_results.append((idx, worker_task(index_to_num(idx))))
        result_queue.put(batch_results)
    
    for i in range(max_workers):
        start_idx = i * numbers_per_thread
        end_idx = min(start_idx + numbers_per_thread, len(results))
        process = multiprocessing.Process(target=process_worker, args=(start_idx, end_idx))
        processes.append(process)
        process.start()
    
    for _ in range(max_workers):
        batch_results = result_queue.get()
        for idx, result in batch_results:
            results[idx] = result
    
    for process in processes:
        process.join()
    return results


def threadpool_implementation(max_workers=4):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(executor.map(worker_task, range(START_NUM, END_NUM)))
    return results


def processpool_implementation(max_workers=4):
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        results = list(executor.map(worker_task, range(START_NUM, END_NUM), chunksize=1000))
    return results


def joblib_threading_implementation(n_jobs=4):
    results = joblib.Parallel(n_jobs=n_jobs, backend='threading')(
        joblib.delayed(worker_task)(i) for i in range(START_NUM, END_NUM)
    )
    return results


def joblib_multiprocessing_implementation(n_jobs=4):
    results = joblib.Parallel(n_jobs=n_jobs, backend='multiprocessing')(
        joblib.delayed(worker_task)(i) for i in range(START_NUM, END_NUM)
    )
    return results


def joblib_loky_implementation(n_jobs=4):
    results = joblib.Parallel(n_jobs=n_jobs, backend='loky')(
        joblib.delayed(worker_task)(i) for i in range(START_NUM, END_NUM)
    )
    return results


In [None]:
def benchmark_implementation(func, *args, **kwargs):
    start_time = time.time()
    results = func(*args, **kwargs)
    end_time = time.time()
    return end_time - start_time, results

implementations = [
    ("Naive Sequential", naive_implementation),
    ("Threading (4 threads)", lambda: threading_implementation(4)),
    ("Threading (12 threads)", lambda: threading_implementation(12)),
    ("Threading (100 threads)", lambda: threading_implementation(100)),
    ("Multiprocessing (4 proc)", lambda: multiprocessing_implementation(4)),
    ("Multiprocessing (12 proc)", lambda: multiprocessing_implementation(12)),
    ("ThreadPool (4 workers)", lambda: threadpool_implementation(4)),
    ("ThreadPool (12 workers)", lambda: threadpool_implementation(12)),
    ("ThreadPool (100 workers)", lambda: threadpool_implementation(100)),
    ("ProcessPool (4 workers)", lambda: processpool_implementation(4)),
    ("ProcessPool (12 workers)", lambda: processpool_implementation(12)),
    ("Joblib Threading (4)", lambda: joblib_threading_implementation(4)),
    ("Joblib Threading (12)", lambda: joblib_threading_implementation(12)),
    ("Joblib Threading (100)", lambda: joblib_threading_implementation(100)),
    ("Joblib Multiprocessing (4)", lambda: joblib_multiprocessing_implementation(4)),
    ("Joblib Multiprocessing (12)", lambda: joblib_multiprocessing_implementation(12)),
    ("Joblib Loky (4)", lambda: joblib_loky_implementation(4)),
    ("Joblib Loky (12)", lambda: joblib_loky_implementation(12)),
]

print("Benchmarking parallelization approaches (4 workers/threads):")
print("=" * 65)

_, reference_result = benchmark_implementation(naive_implementation)

results = []
for name, func in implementations:
    try:
        execution_time, result = benchmark_implementation(func)
        
        # Verify results match
        is_correct = all(
            ref == comp for ref, comp in zip(reference_result, result)
        )
        status = "✓" if is_correct else "✗"
        
        results.append((name, execution_time, status))
        print(f"{name:25s}: {execution_time:6.3f} seconds {status}")
        
    except Exception as e:
        results.append((name, float('inf'), f"Error: {str(e)}"))
        print(f"{name:25s}: FAILED - {str(e)}")

## Exercise: Mandelbrot Set

In this task, the goal is to create a Mandelbrot set. The simple implementation is given below. Now, we need to think how to parallelize it.

In [None]:
def create_mandelbrot_image_simple(
        x_min: float,
        x_max: float,
        y_min: float,
        y_max: float,
        width: int,
        height: int,
        max_iter: int = 200
):
    x_step = (x_max - x_min) / width
    y_step = (y_max - y_min) / height
    
    C = [[0 for x in range(width)] for y in range(height)]
    Z = [[0 for x in range(width)] for y in range(height)]
    active = [[True for x in range(width)] for y in range(height)]
    escape_iter = [[max_iter for x in range(width)] for y in range(height)]
    for i in range(height):
        for j in range(width):
            C[i][j] = (x_min + j * x_step) + 1j * (y_min + i * y_step)
    
    for iter_num in range(max_iter):
        for i in range(height):
            for j in range(width):
                if active[i][j]:
                    Z[i][j] = Z[i][j] * Z[i][j] + C[i][j]
                    if abs(Z[i][j]) > 2:
                        escape_iter[i][j] = iter_num
                        active[i][j] = False
    return escape_iter

In [None]:
import matplotlib.pyplot as plt
import time


def visualize(
        fn, 
        width=400,
        height=400,
        xmin=-2.5,
        xmax=1.0,
        y_center=0.0,
        **kwargs
):
    y_span = (xmax - xmin) * (height / width) / 2.0
    ymin, ymax = y_center - y_span, y_center + y_span
    
    time_start = time.time()
    img = fn(xmin, xmax, ymin, ymax, width, height, max_iter=250, **kwargs)
    print(f"Time: {time.time() - time_start:.3f} s")
    
    plt.figure(figsize=(6, 6))
    plt.imshow(img, extent=(xmin, xmax, ymin, ymax), origin="lower", cmap="turbo")
    plt.axis("off")
    plt.tight_layout(pad=0)

In [None]:
visualize(create_mandelbrot_image_simple)

We can notice that each pixel is generated independently. That gives us opportunity for parallelization. Let's perform multiprocessing by vertical coordinate (y)

In [None]:
import multiprocessing


def create_mandelbrot_image_multiprocess(
        x_min: float,
        x_max: float,
        y_min: float,
        y_max: float,
        width: int,
        height: int,
        max_iter: int = 200,
        num_workers: int = 4
):
    chunk_size = height // num_workers
    y_step = (y_max - y_min) / height
    
    chunks = [
        (
            x_min,
            x_max,
            y_min + i * y_step, 
            y_min + (i + chunk_size) * y_step,
            width,
            chunk_size,
            max_iter
        )
        for i in range(0, height, chunk_size)
    ]
    
    with multiprocessing.Pool(processes=num_workers) as pool:
        sub_images = pool.starmap(create_mandelbrot_image_simple, chunks)
    
    result = []
    for sub_image in sub_images:
        result += sub_image
    return result

In [None]:
visualize(create_mandelbrot_image_multiprocess, num_workers=12)

When working with arrays, it's too hard to parallelize everything by hand. Furthermore, python lists are not memory-efficient, so we might lose time on creating and modifying them. That's why usually numpy is used for working with arrays. You will learn it in more details later, but now you can check how it speeds up processing significantly.

In [None]:
import numpy as np


def create_mandelbrot_image_numpy(
        x_min: float,
        x_max: float,
        y_min: float,
        y_max: float,
        width: int,
        height: int,
        max_iter: int = 200
):
    
    # C = [[0 for x in range(width)] for y in range(height)]
    # for i in range(height):
    #     for j in range(width):
    #         C[i][j] = (x_min + j * x_step) + 1j * (y_min + i * y_step)
    x_range = np.linspace(x_min, x_max, width)
    y_range = np.linspace(y_min, y_max, height)
    C = x_range + 1j * y_range[:, None]
    
    # Z = [[0 for x in range(width)] for y in range(height)]
    Z = np.zeros_like(C)
    
    # active = [[True for x in range(width)] for y in range(height)]
    active = np.full_like(Z, fill_value=True, dtype=bool)
    
    # escape_iter = [[max_iter for x in range(width)] for y in range(height)]    
    escape_iter = np.full_like(C, fill_value=max_iter, dtype=np.uint16)
    
    for iter_num in range(max_iter):
        # for i in range(height):
        #     for j in range(width):
        #         if active[i][j]:
        #             Z[i][j] = Z[i][j] * Z[i][j] + C[i][j]
        #             if abs(Z[i][j]) > 2:
        #                 escape_iter[i][j] = iter_num
        #                 active[i][j] = False
        Z[active] = Z[active] * Z[active] + C[active]
        escape_mask = np.abs(Z) > 2
        escape_iter[escape_mask & active] = iter_num
        active[escape_mask] = False
        active = ~escape_mask
    
    return escape_iter

In [None]:
visualize(create_mandelbrot_image_numpy)

Now, let's combine numpy with multiprocessing

In [None]:
def create_mandelbrot_image_numpy_multiprocess(
        x_min: float,
        x_max: float,
        y_min: float,
        y_max: float,
        width: int,
        height: int,
        max_iter: int = 200,
        num_workers: int = 4
):
    chunk_size = height // num_workers
    y_step = (y_max - y_min) / height
    
    chunks = [
        (
            x_min,
            x_max,
            y_min + i * y_step, 
            y_min + (i + chunk_size) * y_step,
            width,
            chunk_size,
            max_iter
        )
        for i in range(0, height, chunk_size)
    ]
    
    with multiprocessing.Pool(processes=num_workers) as pool:
        sub_images = pool.starmap(create_mandelbrot_image_numpy, chunks)
    
    return np.vstack(sub_images)

In [None]:
visualize(create_mandelbrot_image_numpy_multiprocess, num_workers=12)

In [None]:
visualize(create_mandelbrot_image_numpy_multiprocess, num_workers=12, width=1280, height=720)

For heavy mathematical tasks, usually the best choice is compiling your code with numba. This makes it a lot faster, even than numpy version.

In [None]:
from numba import jit, prange

@jit(nopython=True, parallel=True, fastmath=True)
def create_mandelbrot_image_numba(
        x_min: float,
        x_max: float,
        y_min: float,
        y_max: float,
        width: int,
        height: int,
        max_iter: int = 200
):
    result = np.empty((height, width), np.uint16)
    dx = (x_max - x_min) / width
    dy = (y_max - y_min) / height

    for i in prange(height):
        y = y_min + i * dy
        row = result[i]
        for j in range(width):
            x = x_min + j * dx

            zx = 0.0
            zy = 0.0
            iter_num = 0
            while (zx*zx + zy*zy) <= 4.0 and iter_num < max_iter:
                zx_new  = zx*zx - zy*zy + x
                zy  = 2.0*zx*zy + y
                zx = zx_new
                iter_num += 1

            row[j] = iter_num
    return result

In [None]:
# First compile is slow: that's how JIT works
visualize(create_mandelbrot_image_numba)

In [None]:
# But now it's blazingly fast!!!
visualize(create_mandelbrot_image_numba)

Now, let's create a zooming video in the set. Remember, that we need to be memory-efficient as frames take a lot of memory. That's why, we'll use Generator

In [None]:
from typing import Generator
import numpy as np
from tqdm.notebook import tqdm


def generate_mandelbrot_video_frames(
        x_min_start: float,
        x_max_start: float,
        y_min_start: float,
        y_max_start: float,
        target_x: float,
        target_y: float,
        width: int,
        height: int,
        num_frames: int,
        zoom_factor_per_frame: float,
        max_iter: int = 200
) -> Generator[np.ndarray, None, None]:
    x_min, x_max, y_min, y_max = x_min_start, x_max_start, y_min_start, y_max_start
    for frame_num in tqdm(range(num_frames)):
        frame = create_mandelbrot_image_numba(
            x_min=x_min,
            x_max=x_max,
            y_min=y_min,
            y_max=y_max,
            width=width,
            height=height,
            max_iter=max_iter
        )
        
        yield frame

        current_center_x = (x_min + x_max) / 2
        current_center_y = (y_min + y_max) / 2

        current_width = x_max - x_min
        current_height = y_max - y_min

        new_width = current_width / zoom_factor_per_frame
        new_height = current_height / zoom_factor_per_frame

        zoom_center_weight = 0.1
        new_center_x = current_center_x * (1 - zoom_center_weight) + target_x * zoom_center_weight
        new_center_y = current_center_y * (1 - zoom_center_weight) + target_y * zoom_center_weight

        x_min = new_center_x - new_width / 2
        x_max = new_center_x + new_width / 2
        y_min = new_center_y - new_height / 2
        y_max = new_center_y + new_height / 2

Now, we create imageio VideoWriter context class for convenient video saving

In [None]:
import imageio
from dataclasses import dataclass, field
import numpy as np


@dataclass
class VideoWriter:
    output_path: str
    fps: float = 30
    codec: str = 'libx264'
    pix_fmt: str = 'yuv420p'
    _writer: imageio.core.format.Format.Writer | None = field(default=None, init=False, repr=False)
    
    def __enter__(self):
        self._writer = imageio.get_writer(
            self.output_path,
            format="FFMPEG",
            fps=self.fps,
            codec=self.codec,
            pixelformat=self.pix_fmt
        )
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        if self._writer is not None:
            self._writer.close()
            self._writer = None
    
    
    def add(self, frame: np.ndarray) -> None:
        if self._writer is None:
            raise RuntimeError("Must be used as context manager")
        
        frame = np.asarray(frame)
        if frame.ndim != 3 or frame.shape[2] != 3:
            raise ValueError(f"Unsupported frame shape: {frame.shape}")
        
        self._writer.append_data(frame)

In [None]:
import matplotlib.pyplot as plt
from PIL import Image

turbo_cmap = plt.colormaps['turbo']
def transform_frame(frame: np.ndarray, max_iter: int, scale_down: int | None = None):
    frame = np.asarray(frame)
    normalized = np.clip(frame.astype(np.float32) / max_iter, 0, 1)
    colored_frame = turbo_cmap(normalized)
    colored_frame_uint8 = (colored_frame[:, :, :3] * 255).astype(np.uint8)
    
    if scale_down is not None and scale_down > 1:
        h, w = colored_frame_uint8.shape[:2]
        colored_frame_uint8 = np.array(
            Image.fromarray(colored_frame_uint8).resize((w // scale_down, h // scale_down), resample=Image.LANCZOS)
        )
    
    return colored_frame_uint8

In [None]:
initial_view = (-2.0, 0.8, -1.25, 1.25) # x_min, x_max, y_min, y_max
target_point = (-0.743653884279361, 0.131822903750375)  # A interesting point near the "seahorse valley"
resolution = (640, 480)
video_length_frames = 800
zoom_per_frame = 1.04
fps = 30
max_iterations = 400
scale_down = 3

with VideoWriter("mandelbrot_zoom.mp4", fps=fps) as writer:
    for frame in generate_mandelbrot_video_frames(
        x_min_start=initial_view[0],
        x_max_start=initial_view[1],
        y_min_start=initial_view[2],
        y_max_start=initial_view[3],
        target_x=target_point[0],
        target_y=target_point[1],
        width=resolution[0] * scale_down,
        height=resolution[1] * scale_down,
        num_frames=video_length_frames,
        zoom_factor_per_frame=zoom_per_frame,
        max_iter=max_iterations
    ):
        transformed_frame = transform_frame(frame, max_iterations, scale_down=scale_down)
        writer.add(transformed_frame)

In [None]:
# from ipywidgets import Video
# 
# Video.from_file("mandelbrot_zoom.mp4", width=640, height=480)