# The multiprocessing module

The Global Interpreter Lock (GIL) is a mechanism used in CPython (the default implementation of the Python programming language) to ensure that only one thread executes Python bytecodes at a time. This lock is necessary because CPython's memory management is not thread-safe.

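In practice, this means that threads (which share the same memory space) cannot execute CPU-bound Python code in parallel in a standard CPython build, because they have to take turns holding the GIL. Extensions written in Cython or C can release the GIL, but the simpler route is to use separate processes.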

The GIL is not a bottleneck for multi-process programming, because each process runs in its own memory space with its own interpreter and its own GIL. Multiple processes can therefore execute Python bytecode in parallel, each on its own core, which for CPU-bound work can give better performance on multi-core systems than multi-threading.

In other words, multiprocessing allows you to bypass the GIL by creating separate processes, each with its own interpreter instance, which can run simultaneously on multiple cores. This makes it possible to take full advantage of multi-core systems and improve the performance of CPU-bound and parallelizable tasks in Python.

## The Process class

In [1]:
import multiprocessing
import time

def worker(number):
    """A function that runs in a separate process."""
    print(f"Worker {number} started")
    time.sleep(1)
    print(f"Worker {number} finished")

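# Guard the entry point: with the "spawn" start method, each child re-imports
# this module, and the guard stops it from starting processes again
if __name__ == "__main__":
    # Create a process for each worker
    processes = [multiprocessing.Process(target=worker, args=(i,)) for i in range(5)]

    # Start the processes
    for process in processes:
        process.start()

    # Wait for all processes to finish
    for process in processes:
        process.join()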

Worker 0 started
Worker 1 started
Worker 2 started
Worker 3 started
Worker 4 started
Worker 0 finished
Worker 1 finished
Worker 2 finished
Worker 3 finished
Worker 4 finished


### What is going on

This is one of many ways to do multiprocessing in Python.

- We create 5 new processes.
- Each process runs the function worker.
- We pass the arguments 0 through 4 (from range(5)) to the function, one per process.
- We call start to launch each process.
- We call join to block the main process until the given process has finished.
- Because the processes run concurrently, the order of the printed lines is not guaranteed.

## The Pool class

In [2]:
import multiprocessing

def worker(x):
    """A function that takes a number and squares it."""
    return x ** 2

if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool:
        # Use the map function to apply the worker function to each element of the list
        result = pool.map(worker, [1, 2, 3, 4, 5])

    print(result)


[1, 4, 9, 16, 25]


### What is going on

- Pool(processes=4) creates a pool of 4 worker processes
- pool.map then splits 5 jobs (one per element of the list) among those workers
- each job calls the function worker
    - it passes the corresponding element of the list as the argument
- pool.map returns the results in the same order as the input list, which is why result is [1, 4, 9, 16, 25]


## Locks

In [None]:
import multiprocessing
import time

def worker(worker_id, lock, resource):
    """A function that runs in a separate process."""
    for i in range(10):
        with lock:
            print(f"Worker {worker_id} is accessing the resource")
            time.sleep(0.1)
            resource.value = resource.value + 1
            print(f"Worker {worker_id} has updated the resource to {resource.value}")

if __name__ == "__main__":
    # Create a shared resource
    resource = multiprocessing.Value("i", 0)

    # Create a lock to synchronize access to the shared resource
    lock = multiprocessing.Lock()

    # Start the worker processes
    processes = [multiprocessing.Process(target=worker, args=(i, lock, resource)) for i in range(2)]
    for process in processes:
        process.start()

    # Wait for the worker processes to finish
    for process in processes:
        process.join()

## Communicating

There are many different ways to communicate between processes. In this first example we will use a pipe.

In [1]:
import multiprocessing

def worker(conn):
    """A function that runs in a separate process."""
    while True:
        message = conn.recv()
        if message == "close":
            break
        print(f"Worker received message: {message}")
        conn.send(f"Processed: {message}")
    print("Worker closing")

if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()

    # Start the worker process, handing it one end of the pipe
    process = multiprocessing.Process(target=worker, args=(child_conn,))
    process.start()

    # Send messages to the worker process
    for message in ["hello", "world", "close"]:
        parent_conn.send(message)
        if message != "close":
            response = parent_conn.recv()
            print(f"Parent received response: {response}")

    # Wait for the worker process to finish
    process.join()

Worker received message: hello
Worker received message: world
Worker closing
Parent received response: Processed: hello
Parent received response: Processed: world


This program calls Pipe, which returns a pair of connected connection objects (the two ends of a single two-way pipe), and then starts a worker process that runs the worker function with one of those ends. The main process sends messages to the worker process using the send method and receives responses using the recv method; the worker process listens for incoming messages with recv and replies with send. When the main process sends the message "close", the worker leaves its loop and exits, and process.join() returns.


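Another, more flexible, option is to use queues. A multiprocessing.Queue can be shared by many processes at once, so several workers can pull messages from the same queue.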

In [1]:
import multiprocessing
import time
import random

def worker(worker_id, queue):
    """A function that runs in a separate process."""
    while True:
        message = queue.get()
        if message == "close":
            break
        time.sleep(random.randint(0, 4))  # simulate a task that takes 0-4 seconds
        print(f"Processed by worker {worker_id}: {message}")
    print(f"Worker {worker_id} closing")

if __name__ == "__main__":
    queue = multiprocessing.Queue()

    # Start the worker processes
    processes = [multiprocessing.Process(target=worker, args=(i, queue)) for i in range(3)]
    for process in processes:
        process.start()

    # Send two messages, then one "close" per worker so that every worker exits
    for message in ["hello", "world", "close", "close", "close"]:
        queue.put(message)

    # Wait for the worker processes to finish
    for process in processes:
        process.join()


Processed by worker 1: worldWorker 2 closing

Worker 1 closing
Processed by worker 0: hello
Worker 0 closing


In [None]:
import multiprocessing

def worker_function(number):
    """A function that doubles its input."""
    return number * 2

if __name__ == "__main__":
    # Create a pool of processes
    with multiprocessing.Pool(processes=2) as pool:

        # Execute the worker_function with multiple inputs
        results = pool.map(worker_function, [5, 2, 1, 3])

    # Print the results
    for result in results:
        print(f"Result: {result}")

## Futures

In this example, concurrent.futures.ProcessPoolExecutor is used to execute worker_function asynchronously in multiple processes. The executor.submit method submits the function once for each input value and returns a Future object for each call; these are collected in a list. The concurrent.futures.wait function blocks until all the futures have finished (leaving the with block would also wait, because the executor shuts down with wait=True). Finally, the future.result method retrieves the return value from each Future object.

In [None]:
import concurrent.futures
import time

def worker_function(number):
    """A function that takes a long time to run."""
    print(f"worker_function({number}) starting")
    time.sleep(number)
    print(f"worker_function({number}) finished")
    return number

if __name__ == "__main__":
    # Create a list of input values
    inputs = [5, 2, 1, 3]

    # Use the ProcessPoolExecutor to execute the worker_function asynchronously using multiple processes
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # Submit the worker_function to be executed with each of the input values
        futures = [executor.submit(worker_function, number) for number in inputs]

        # Wait for all futures to finish
        concurrent.futures.wait(futures)

    # Print the results
    for future in futures:
        result = future.result()
        print(f"Result: {result}")
