# Multiprocessing

Multiprocessing in Python leverages multiple processes to parallelize computation, with each process having its own memory space. 


<a href="https://colab.research.google.com/github/Ziaeemehr/workshop_hpcpy/blob/main/notebooks/multiprocessing/note_multiprocessing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


In [None]:
import os
import sys

# Check if running on Google Colab
try:
    from google.colab import drive
    IN_COLAB = True
    print("Running on Google Colab")
except ImportError:
    IN_COLAB = False
    print("Running locally")

# Clone repository if on Colab and not already cloned
if IN_COLAB:
    if not os.path.exists('/content/workshop_hpcpy'):
        print("Cloning workshop_hpcpy repository...")
        os.system('git clone https://github.com/Ziaeemehr/workshop_hpcpy.git /content/workshop_hpcpy')
    
    # Change to notebook directory
    os.chdir('/content/workshop_hpcpy/notebooks/multiprocessing')
    print(f"Working directory: {os.getcwd()}")

In [11]:
from time import sleep
import multiprocessing as mp

def square(num):
    # print the process ID
    print(f'ID: {mp.current_process().name}, num: {num}, square: {num * num} \n')
    sleep(1)

with mp.Pool(processes=4) as p:
    p.map(square, range(4))

ID: ForkPoolWorker-48, num: 3, square: 9 
ID: ForkPoolWorker-47, num: 0, square: 0 
ID: ForkPoolWorker-50, num: 1, square: 1 
ID: ForkPoolWorker-49, num: 2, square: 4 






Storing output in a list

In [12]:
import multiprocessing as mp

def square(num):
    return num * num

with mp.Pool(processes=4) as p:
    result =  p.map(square, range(10))
print(result)

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]


`Pool.apply_async()` allows you to execute a function asynchronously in a separate process

In [13]:
from multiprocessing import Pool


def task(n, m):
    return n * n + m

n = 4
with Pool(processes=4) as p:
    async_results = [
        p.apply_async(task, args=(i, j)) for i in range(n) for j in range(n // 2)
    ]
    results = [r.get() for r in async_results]
print(results)

[0, 1, 1, 2, 4, 5, 9, 10]


In [14]:
# Sequential version

for i in range(n):
    for j in range(n // 2):
        print(i, j, i * i + j)

0 0 0
0 1 1
1 0 1
1 1 2
2 0 4
2 1 5
3 0 9
3 1 10


### Adding progress bar

In [15]:
import tqdm

n = 20

with mp.Pool(processes=4) as pool:
    results = []
    for result in tqdm.tqdm(pool.map(square, range(n)), total=n):
        results.append(result)

100%|██████████| 20/20 [00:00<00:00, 205100.44it/s]


In [16]:
# Using apply_async with callback to update progress bar
import numpy as np

def unequal_task(n):
    sleep(np.random.randint(1, 4))
    return n * n

n = 10
with mp.Pool(processes=10) as pool:
    with tqdm.tqdm(total=n, desc="Processing") as pbar:
        async_res = [
            pool.apply_async(
                unequal_task, 
                args=(i,), 
                callback=lambda _: pbar.update(1))
            for i in range(n)
        ]
        results = [res.get() for res in async_res]

Processing: 100%|██████████| 10/10 [00:03<00:00,  3.32it/s]


### Sharing data among processes

If you want to share data between processes, Python's multiprocessing module provides mechanisms like Value and Array for shared memory.

Example below shows how multiple processes can safely update a shared counter using a Lock to avoid race conditions.

In [None]:
from multiprocessing import Value, Lock

# Each of the 3 processes attempts to withdraw money 5 times
# There are 3 processes created

def withdraw_money(balance, lock, amount):
    """Simulate multiple people withdrawing from a shared bank account"""
    for _ in range(5):
        with lock:
            if balance.value >= amount:
                balance.value -= amount
                print(f"Process {mp.current_process().name} withdrew ${amount}. Remaining: ${balance.value}")
            else:
                print(f"Process {mp.current_process().name} couldn't withdraw (insufficient funds)")


# Shared bank account balance
balance = Value('i', 100)  # Start with $100, shared integer
lock = Lock()  # Prevent race conditions

processes = [mp.Process(target=withdraw_money, args=(balance, lock, 10)) for _ in range(3)]
for p in processes:
    p.start()
for p in processes:
    p.join()

print(f"\nFinal balance: ${balance.value}")


Process Process-73 withdrew $10. Remaining: $90

Process Process-73 withdrew $10. Remaining: $80
Process Process-73 withdrew $10. Remaining: $70
Process Process-73 withdrew $10. Remaining: $60
Process Process-73 withdrew $10. Remaining: $50
Process Process-74 withdrew $10. Remaining: $40
Process Process-74 withdrew $10. Remaining: $30
Process Process-74 withdrew $10. Remaining: $20
Process Process-74 withdrew $10. Remaining: $10
Process Process-73 withdrew $10. Remaining: $80
Process Process-73 withdrew $10. Remaining: $70
Process Process-73 withdrew $10. Remaining: $60
Process Process-73 withdrew $10. Remaining: $50
Process Process-74 withdrew $10. Remaining: $40
Process Process-74 withdrew $10. Remaining: $30
Process Process-74 withdrew $10. Remaining: $20
Process Process-74 withdrew $10. Remaining: $10
Process Process-74 withdrew $10. Remaining: $0
Process Process-75 couldn't withdraw (insufficient funds)
Process Process-75 couldn't withdraw (insufficient funds)
Process Process-75 c

### Sharing a List using `Manager`
Here’s how you can use a Manager to share a list among multiple processes:

In [18]:
def process_document(task_id, completed_tasks, results_dict):
    """Simulate processing different documents and storing results"""
    result = f"Document_{task_id}_processed"
    completed_tasks.append(task_id)  # Add task ID to shared list
    results_dict[f"doc_{task_id}"] = result  # Store result in shared dictionary
    print(f"Process {mp.current_process().name} completed task {task_id}")


# Manager() creates a server process that manages shared data structures
# It allows multiple processes to safely access and modify the same objects
with mp.Manager() as manager:
    # Shared list: All processes can append task IDs here
    # Acts like a regular Python list, but works across processes
    completed_tasks = manager.list()
    
    # Shared dictionary: All processes can store key-value pairs here
    # Acts like a regular Python dict, but works across processes
    results_dict = manager.dict()
    
    # Create 4 processes, each will process one document
    processes = [
        mp.Process(target=process_document, args=(i, completed_tasks, results_dict)) 
        for i in range(4)
    ]
    
    # Start all 4 processes (they run in parallel)
    for p in processes:
        p.start()
    
    # Wait for all processes to complete before continuing
    for p in processes:
        p.join()
    
    # Print final results after all processes are done
    print(f"\nCompleted tasks: {list(completed_tasks)}")
    print(f"Results: {dict(results_dict)}")


Process Process-77 completed task 0
Process Process-78 completed task 1
Process Process-79 completed task 2
Process Process-80 completed task 3

Process Process-78 completed task 1
Process Process-79 completed task 2
Process Process-80 completed task 3



Completed tasks: [0, 1, 2, 3]
Results: {'doc_0': 'Document_0_processed', 'doc_1': 'Document_1_processed', 'doc_2': 'Document_2_processed', 'doc_3': 'Document_3_processed'}
