Multiprocessing

In [None]:
'''Create and run processes
You create a process with multiprocessing.Process(). It takes two important arguments:

target: a callable object (function) for this process to be invoked when the process starts
args: the (function) arguments for the target function. This must be a tuple
Start a process with process.start()

Call process.join() to tell the program that it should wait for this process to complete before it continues with the rest of the code.'''

from multiprocessing import Process
import os

def square_numbers():
    for i in range(1000):
        result = i * i

        
if __name__ == "__main__":        
    processes = []
    num_processes = os.cpu_count()
    # number of CPUs on the machine. Usually a good choise for the number of processes

    # create processes and asign a function for each process
    for i in range(num_processes):
        process = Process(target=square_numbers)
        processes.append(process)

    # start all processes
    for process in processes:
        process.start()

    # wait for all processes to finish
    # block the main programm until these processes are finished
    for process in processes:
        process.join()

Share data between processes
Since processes don't live in the same memory space, they do not have access to the same (public) data. Thus, they need special shared memory objects to share data.

Data can be stored in a shared memory variable using Value or Array.

Value(type, value): Create a ctypes object of type type. Access the value with .target.
Array(type, value): Create a ctypes array with elements of type type. Access the values with [].
Task: Create two processes, each process should have access to a shared variable and modify it (in this case only increase it repeatedly by 1 for 100 times). Create another two processes that share an array and modify (increase) all the elements in the array.

In [1]:
from multiprocessing import Process, Value, Array
import time

def add_100(number):
    for _ in range(100):
        time.sleep(0.01)
        number.value += 1

def add_100_array(numbers):
    for _ in range(100):
        time.sleep(0.01)
        for i in range(len(numbers)):
            numbers[i] += 1


if __name__ == "__main__":

    shared_number = Value('i', 0) 
    print('Value at beginning:', shared_number.value)

    shared_array = Array('d', [0.0, 100.0, 200.0])
    print('Array at beginning:', shared_array[:])

    process1 = Process(target=add_100, args=(shared_number,))
    process2 = Process(target=add_100, args=(shared_number,))

    #process3 = Process(target=add_100_array, args=(shared_array,))
    #process4 = Process(target=add_100_array, args=(shared_array,))

    process1.start()
    process2.start()
    #process3.start()
    #process4.start()

    process1.join()
    process2.join()
    #process3.join()
    #process4.join()

    print('Value at end:', shared_number.value)
    print('Array at end:', shared_array[:])

    print('end main')

Value at beginning: 0
Array at beginning: [0.0, 100.0, 200.0]
Value at end: 0
Array at end: [0.0, 100.0, 200.0]
end main


In [4]:
# How to use Locks

# import Lock
from multiprocessing import Lock
from multiprocessing import Process, Value, Array
import time

def add_100(number, lock):
    for _ in range(100):
        time.sleep(0.01)
        # lock the state
        lock.acquire()
        
        number.value += 1
        
        # unlock the state
        lock.release()

def add_100_array(numbers, lock):
    for _ in range(100):
        time.sleep(0.01)
        for i in range(len(numbers)):
            with lock:
                numbers[i] += 1


if __name__ == "__main__":

    # create a lock
    lock = Lock()
    
    shared_number = Value('i', 0) 
    print('Value at beginning:', shared_number.value)

    shared_array = Array('d', [0.0, 100.0, 200.0])
    print('Array at beginning:', shared_array[:])

    # pass the lock to the target function
    process1 = Process(target=add_100, args=(shared_number, lock))
    process2 = Process(target=add_100, args=(shared_number, lock))

    process3 = Process(target=add_100_array, args=(shared_array, lock))
    process4 = Process(target=add_100_array, args=(shared_array, lock))

    process1.start()
    process2.start()
    process3.start()
    process4.start()

    process1.join()
    process2.join()
    process3.join()
    process4.join()

    print('Value at end:', shared_number.value)
    print('Array at end:', shared_array[:])

    print('end main')

Value at beginning: 0
Array at beginning: [0.0, 100.0, 200.0]
Value at end: 0
Array at end: [0.0, 100.0, 200.0]
end main


In [None]:
# Use the lock as a context manager
def add_100(number, lock):
    for _ in range(100):
        time.sleep(0.01)
        with lock:
            number.value += 1

In [None]:
# A queue is a linear data structure that follows the First In First Out (FIFO) principle..

from multiprocessing import Queue

# create queue
q = Queue()

# add elements
q.put(1) # 1
q.put(2) # 2 1
q.put(3) # 3 2 1 

# now q looks like this:
# back --> 3 2 1 --> front

# get and remove first element
first = q.get() # --> 1
print(first) 

# q looks like this:
# back --> 3 2 --> front

In [None]:
# Using a queue in multiprocessing

from multiprocessing import Process, Queue

def square(numbers, queue):
    for i in numbers:
        queue.put(i*i)

def make_negative(numbers, queue):
    for i in numbers:
        queue.put(i*- 1)

if __name__ == "__main__":
    
    numbers = range(1, 6)
    q = Queue()

    p1 = Process(target=square, args=(numbers,q))
    p2 = Process(target=make_negative, args=(numbers,q))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

    # order might not be sequential
    while not q.empty():
        print(q.get())
        
    print('end main')

In [None]:
# A Process Pool is a group of worker processes that execute tasks in parallel, managed automatically by Python.
# You submit tasks, and the pool distributes the work across multiple CPU cores.

from multiprocessing import Pool 

def cube(number):
    return number * number * number

    
if __name__ == "__main__":
    numbers = range(10)
    
    p = Pool()

    # by default this allocates the maximum number of available 
    # processors for this task --> os.cpu_count()
    result = p.map(cube,  numbers)
    
    # or 
    # result = [p.apply(cube, args=(i,)) for i in numbers]
    
    p.close()
    p.join()
    
    print(result)