# "Notes about `multiprocessing` in Python"

- toc: false
- badges: true
- categories: [coding, python, parallelism, thread, process]
- comments: true

## GIL, threading, multiprocessing

The GIL (Global Interpreter Lock) ensures that only one thread executes Python bytecode in a process at a time. Therefore, a multithreaded program written with the `threading` module cannot fully utilize multi-core machines for CPU-bound work. To run Python code in parallel, the `multiprocessing` module should be used instead.

> Since the GIL is released while a thread is blocked on I/O, `threading` is still an appropriate module if we want to run multiple I/O-bound tasks simultaneously.
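
To make the note above concrete, here is a minimal sketch in which `time.sleep()` stands in for a blocking I/O call (an assumption for illustration, since `sleep()` also releases the GIL). The names `fake_io` and `run_in_threads` are made up for this example. The four tasks should overlap, so the total time should be close to the duration of one task rather than the sum of all four.

```python
import threading
import time

def fake_io(seconds):
    # time.sleep() releases the GIL, much like a blocking read or network call
    time.sleep(seconds)

def run_in_threads(n_tasks=4, seconds=0.2):
    """Run n_tasks sleeping tasks in threads and return the elapsed wall time."""
    threads = [threading.Thread(target=fake_io, args=(seconds,)) for _ in range(n_tasks)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start

if __name__ == "__main__":
    elapsed = run_in_threads()
    print(f"4 tasks of 0.2 s each finished in {elapsed:.2f} s")
```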


## Process

It is quite easy to create a new process in Python. We basically pass a target function and all arguments needed by the invocation to the `Process` object. Use `start()` to start the process's activity (i.e. the invocation of the target function). Use `join()` to block the parent process until the child process exits.

In [26]:
from multiprocessing import Process
import time

def worker(s):
    print('Start sleeping for 2 seconds...')
    time.sleep(2)
    print('Sleeping ends.')
    print(s)

p = Process(target=worker, args=('Hello, world.',))
p.start()
print('before join(), the parent process is not blocked')
p.join()
print('After join()')

Start sleeping for 2 seconds...
before join(), the parent process is not blocked
Sleeping ends.
Hello, world.
After join()
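
The snippet above was run in a notebook on Linux. In a standalone script it is safer to create processes under an `if __name__ == "__main__":` guard, because with the `spawn` start method (the default on Windows and macOS) the child process imports the main module again. The following sketch shows that layout and also uses `is_alive()` to observe the child. The helper name `start_and_wait` is made up for this example.

```python
from multiprocessing import Process
import time

def worker(seconds):
    time.sleep(seconds)

def start_and_wait(seconds=0.5):
    """Start a child process and report whether it is alive before and after join()."""
    p = Process(target=worker, args=(seconds,))
    p.start()
    alive_before = p.is_alive()  # the child is still sleeping at this point
    p.join()                     # block until the child exits
    alive_after = p.is_alive()
    return alive_before, alive_after

if __name__ == "__main__":
    # Without this guard, a spawned child would re-run the process creation on import
    print(start_and_wait())
```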


If an exception is raised in the child process, its stack trace is printed to the console.

In [27]:
from multiprocessing import Process
import time

def worker(s):
    print(2 / 0)

p = Process(target=worker, args=('Hello, world.',))
p.start()
p.join()

Process Process-99:
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-27-8d12741476dc>", line 5, in worker
    print(2 / 0)
ZeroDivisionError: division by zero
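
The stack trace only goes to the console. If the parent process needs to react to the failure, one option is to check `exitcode` after `join()`. Here is a minimal sketch; `failing_worker`, `ok_worker`, and `run_and_get_exitcode` are names made up for this example.

```python
from multiprocessing import Process

def failing_worker():
    raise ValueError("boom")  # hypothetical failure inside the child

def ok_worker():
    pass

def run_and_get_exitcode(target):
    """Run target in a child process and return the child's exit code."""
    p = Process(target=target)
    p.start()
    p.join()
    # 0 means the target returned normally; 1 means it raised an uncaught exception
    return p.exitcode

if __name__ == "__main__":
    print(run_and_get_exitcode(ok_worker))
    print(run_and_get_exitcode(failing_worker))
```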


## Process Pool

We can also create a pool of processes. The following snippet creates a process pool of size 4, and we put 8 tasks into the pool.

In [28]:
from multiprocessing import Pool
import time
import random

def worker(i):
    time.sleep(random.random())
    print(i)

p = Pool(4)
for i in range(8):
    p.apply_async(worker, args=(i,))
p.close()
p.join()

0
4
2
3
1
7
6
5


When `apply_async()` is invoked, a new task is submitted to the pool, and it starts as soon as a worker in the pool is free. `Pool` also has a synchronous version of this method, `apply()`, which blocks the parent process until the result is ready. Therefore, to run tasks in parallel, `apply_async()` should be used. We also call `join()` to block the parent process after all tasks have been added to the pool. If we do not block the parent process, **the parent process might exit before the child processes finish their tasks**. That's why we call `join()` in the parent process. Note that `close()` must be called before `join()`.
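
The workers above only print. If the tasks return values, `apply_async()` gives back a result object whose `get()` method blocks until that task is done. A minimal sketch, with `square` and `compute_squares` as made-up names:

```python
from multiprocessing import Pool

def square(x):
    return x * x

def compute_squares(n=8, pool_size=4):
    """Submit n tasks to a pool and collect their return values in submission order."""
    with Pool(pool_size) as pool:
        async_results = [pool.apply_async(square, args=(i,)) for i in range(n)]
        # get() blocks until the task is done; call it while the pool is still alive,
        # because leaving the with-block terminates the pool
        return [r.get() for r in async_results]

if __name__ == "__main__":
    print(compute_squares())
```

For this simple pattern, `pool.map(square, range(8))` should give the same list with less code.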

Unlike `Process`, a `Pool` does not print the stack trace of an exception raised in a task, but we often need it when debugging a multiprocessing program. We can pass an error callback function to the `apply_async()` method and print the stack trace there. Note that callbacks run in the parent process, in the thread that handles results, so they should complete immediately; a time-consuming callback delays the handling of other results. The error callback function takes exactly one argument, the exception instance.

In [50]:
from multiprocessing import Pool
import traceback

def worker(i):
    print(i / 0)

def print_traceback(e):
    # print_exception() writes to stderr and returns None, so it is not wrapped in print()
    traceback.print_exception(type(e), e, e.__traceback__)

p = Pool(1)
for i in range(2):
    p.apply_async(worker, args=(i,), error_callback=print_traceback)
p.close()
p.join()

multiprocessing.pool.RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "<ipython-input-50-90874a7b31e2>", line 5, in worker
    print(i / 0)
ZeroDivisionError: division by zero
"""

The above exception was the direct cause of the following exception:

ZeroDivisionError: division by zero
multiprocessing.pool.RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "<ipython-input-50-90874a7b31e2>", line 5, in worker
    print(i / 0)
ZeroDivisionError: division by zero
"""

The above exception was the direct cause of the following exception:

ZeroDivisionError: division by zero
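
Another way to see exceptions from pool tasks is to call `get()` on the object returned by `apply_async()`: an exception raised in the worker is raised again in the parent at that point. A minimal sketch, where `divide` and `collect_errors` are made-up names:

```python
from multiprocessing import Pool

def divide(i):
    return i / 0

def collect_errors(n=2):
    """Run n failing tasks and return the names of the exceptions seen in the parent."""
    errors = []
    with Pool(1) as pool:
        results = [pool.apply_async(divide, args=(i,)) for i in range(n)]
        for r in results:
            try:
                r.get(timeout=10)  # re-raises the worker's exception here
            except ZeroDivisionError as e:
                errors.append(type(e).__name__)
    return errors

if __name__ == "__main__":
    print(collect_errors())
```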


## Communication Between Processes

I usually just use `Queue` for communication between processes, even when there are only two processes, a case where `Pipe` would also work. We don't have to worry about race conditions when using queues, since they are **thread and process safe** in Python. We will look at the one-producer-one-consumer case first.

In [31]:
from multiprocessing import Process, Queue
import time, random

def producer(q):
    for i in range(5):
        time.sleep(random.random())
        q.put(i)
    q.put('end') # to notify the consumer process that the producer process has exited
    print('producer exits')

def consumer(q):
    while True:
        product = q.get()
        if product == 'end':
            break
        print(product)
    print('consumer exits')

q = Queue()
p1 = Process(target=producer, args=(q,))
p2 = Process(target=consumer, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()

0
1
2
3
producer exits4

consumer exits


The `print()` invocations in different processes may race with each other, so output from two processes can end up on the same line. Other than that, everything works as expected. Note that the `get()` method of `Queue` blocks the calling process if no item in the queue is immediately available. We can avoid blocking by setting the `block` argument to `False` or simply by using `get_nowait()`, but then a `queue.Empty` exception is raised if no item is immediately available. If we only want to avoid blocking for too long, we can pass a `timeout` argument to `get()`. If the size of a queue is limited, the same approach also applies to the `put()` method, which raises `queue.Full` instead.
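
The non-blocking variants can be sketched as two small helpers. `try_get` and `try_put` are names made up for this example, and returning `None` for "no item" is an assumption that only works if `None` is never a real item.

```python
import queue
from multiprocessing import Queue

def try_get(q, timeout=None):
    """Return an item, or None if none is available (timeout=None means do not wait)."""
    try:
        if timeout is None:
            return q.get_nowait()
        return q.get(timeout=timeout)
    except queue.Empty:
        return None

def try_put(q, item):
    """Put an item without blocking; return False if the queue is full."""
    try:
        q.put_nowait(item)
        return True
    except queue.Full:
        return False

if __name__ == "__main__":
    q = Queue(maxsize=1)       # a bounded queue with room for one item
    print(try_get(q))          # nothing there yet
    print(try_put(q, "a"))     # fits
    print(try_put(q, "b"))     # queue is full
    print(try_get(q, timeout=5))
```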

We now look at the multi-producer-multi-consumer case, where we will use `Pool`.

In [56]:
from multiprocessing import Pool, Queue
import time, random
import traceback

def print_traceback(e):
    traceback.print_exception(type(e), e, e.__traceback__)

def producer(q, producer_id):
    for i in range(5):
        time.sleep(random.random())
        q.put(f'product {i} from producer {producer_id}')
    q.put('end') # to notify the consumer process that the producer process has exited
    print(f'producer {producer_id} exits')

def consumer(q, consumer_id):
    while True:
        product = q.get()
        if product == 'end':
            break
        print(f'{product} consumed by consumer {consumer_id}')
    print(f'consumer {consumer_id} exits')

q = Queue()
producer_count = 2
consumer_count = 2
producer_pool = Pool(producer_count)
consumer_pool = Pool(consumer_count)
for i in range(producer_count):
    producer_pool.apply_async(producer, args=(q, i), error_callback=print_traceback)
for i in range(consumer_count):
    consumer_pool.apply_async(consumer, args=(q, i), error_callback=print_traceback)
producer_pool.close()
consumer_pool.close()
producer_pool.join()
consumer_pool.join()

Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 537, in _handle_tasks
    put(task)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "/usr/lib/python3.8/multiprocessing/queues.py", line 58, in __getstate__
    context.assert_spawning(self)
  File "/usr/lib/python3.8/multiprocessing/context.py", line 359, in assert_spawning
    raise RuntimeError(
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 537, in _handle_tasks
    put(task)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "/usr/lib/python3.8/mu

We get a `RuntimeError` saying `Queue objects should only be shared between processes through inheritance`. This happens when a `multiprocessing.Queue` is passed as a task argument to a process pool. One solution is to use a queue created by a manager, `multiprocessing.Manager().Queue()`, among the pool workers instead.

In [65]:
from multiprocessing import Pool, Manager
import time, random
import traceback

def print_traceback(e):
    traceback.print_exception(type(e), e, e.__traceback__)

def producer(q, producer_id):
    for i in range(5):
        time.sleep(random.random())
        q.put(f'product {i} from producer {producer_id}')
    q.put('end') # to notify the consumer process that a producer process has exited
    print(f'producer {producer_id} exits')

def consumer(q, consumer_id):
    while True:
        product = q.get()
        if product == 'end':
            break
        print(f'{product} consumed by consumer {consumer_id}')
    print(f'consumer {consumer_id} exits')

manager = Manager()
q = manager.Queue()
producer_count = 2
consumer_count = 2
producer_pool = Pool(producer_count)
consumer_pool = Pool(consumer_count)
for i in range(producer_count):
    producer_pool.apply_async(producer, args=(q, i), error_callback=print_traceback)
for i in range(consumer_count):
    consumer_pool.apply_async(consumer, args=(q, i), error_callback=print_traceback)
producer_pool.close()
consumer_pool.close()
producer_pool.join()
consumer_pool.join()

product 0 from producer 1 consumed by consumer 0
product 0 from producer 0 consumed by consumer 1
product 1 from producer 1 consumed by consumer 0
product 1 from producer 0 consumed by consumer 1
product 2 from producer 0 consumed by consumer 0
product 2 from producer 1 consumed by consumer 1
product 3 from producer 0 consumed by consumer 0
product 3 from producer 1 consumed by consumer 1
consumer 1 exitsproduct 4 from producer 1 consumed by consumer 0

producer 1 exits
product 4 from producer 0 consumed by consumer 0
producer 0 exitsconsumer 0 exits
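
As an aside, the `RuntimeError` shown earlier mentions inheritance. As far as I understand, a plain `multiprocessing.Queue` can be used with a pool if it is handed to the workers when they are created, through the `initializer` and `initargs` parameters of `Pool`, rather than as a task argument. The sketch below is a separate, simplified example (not the producer-consumer program above); `init_worker`, `produce`, `run`, and the module-level `_queue` are names made up for it.

```python
import multiprocessing as mp

_queue = None  # set in each worker process by init_worker()

def init_worker(q):
    # Runs once in every worker when the pool starts; the queue arrives
    # as a process argument, not as a pickled task argument
    global _queue
    _queue = q

def produce(i):
    _queue.put(i * i)

def run(n=4):
    q = mp.Queue()
    with mp.Pool(2, initializer=init_worker, initargs=(q,)) as pool:
        pool.map(produce, range(n))                  # blocks until all tasks are done
        items = [q.get(timeout=5) for _ in range(n)]
    return sorted(items)

if __name__ == "__main__":
    print(run())
```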



## Lock

The last producer-consumer snippet in the previous section has several flaws. First, the `print()` invocations in different processes race with each other, which can garble the text printed to the console. Second, a consumer exits as soon as it receives an `end` signal, which is not what we want: products from one producer may be enqueued after the `end` signal from another producer. We want consumers to exit only after all producers have exited.

To handle the `print()` problem, we need a lock to avoid race conditions. To know whether all producers have exited or not, we need a shared value among consumers, which counts the number of exited producers. This shared value will be increased by 1 every time a consumer receives an `end` signal.

In [69]:
from multiprocessing import Pool, Manager
import queue
import time, random
import traceback

def print_traceback(e):
    traceback.print_exception(type(e), e, e.__traceback__)

def producer(q, producer_id, print_lock):
    for i in range(5):
        time.sleep(random.random())
        q.put(f'product {i} from producer {producer_id}')
    q.put('end') # to notify the consumer process that a producer process has exited
    with print_lock:
        print(f'producer {producer_id} exits')

def consumer(q, consumer_id, producer_count, print_lock, exited_producer_count, count_lock):
    while True:
        try:
            product = q.get_nowait()
        except queue.Empty: # catch only "queue is empty", not every exception
            if exited_producer_count.value == producer_count:
                # all producers have exited
                break
            else:
                continue
        if product == 'end':
            with count_lock:
                exited_producer_count.value += 1
            continue
        with print_lock:
            print(f'{product} consumed by consumer {consumer_id}')
    with print_lock:
        print(f'consumer {consumer_id} exits')

manager = Manager()
q = manager.Queue()
exited_producer_count = manager.Value('i', 0) # type code 'i' means 'signed int' in C type
print_lock = manager.Lock()
count_lock = manager.Lock()
producer_count = 2
consumer_count = 2
producer_pool = Pool(producer_count)
consumer_pool = Pool(consumer_count)
for i in range(producer_count):
    producer_pool.apply_async(producer, args=(q, i, print_lock), error_callback=print_traceback)
for i in range(consumer_count):
    consumer_pool.apply_async(consumer, args=(q, i, producer_count, print_lock, exited_producer_count, count_lock), error_callback=print_traceback)
producer_pool.close()
consumer_pool.close()
producer_pool.join()
consumer_pool.join()

product 0 from producer 1 consumed by consumer 0
product 0 from producer 0 consumed by consumer 0
product 1 from producer 1 consumed by consumer 0
product 2 from producer 1 consumed by consumer 1
product 1 from producer 0 consumed by consumer 0
product 3 from producer 1 consumed by consumer 0
product 2 from producer 0 consumed by consumer 0
product 4 from producer 1 consumed by consumer 0
producer 1 exits
product 3 from producer 0 consumed by consumer 1
producer 0 exits
product 4 from producer 0 consumed by consumer 0
consumer 1 exits
consumer 0 exits


We defined two locks. `print_lock` is acquired before each `print()` invocation. The other lock, `count_lock`, ensures that the increment of `exited_producer_count` is atomic. As with `Queue`, in a process pool we have to use a value created by a manager, `multiprocessing.Manager().Value()`, instead of `multiprocessing.Value`. As shown in the output, each `print()` invocation gets a line of its own, and consumers exit only after all producers have exited.
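
A different design, sketched here as an alternative rather than as a fix to the code above, avoids both the shared counter and the polling loop: the parent waits for all producers and then enqueues one end marker per consumer, so each consumer can use a blocking `get()`. This sketch uses plain `Process` objects and `multiprocessing.Queue`; the `SENTINEL` marker, the `results` queue, and `run` are made up for the example.

```python
from multiprocessing import Process, Queue

SENTINEL = None  # assumed end marker; real products must never be None

def producer(q, producer_id):
    for i in range(3):
        q.put(f'product {i} from producer {producer_id}')

def consumer(q, results):
    consumed = 0
    while True:
        product = q.get()      # blocking get, no polling needed
        if product is SENTINEL:
            break
        consumed += 1
    results.put(consumed)      # report how many products this consumer handled

def run(producer_count=2, consumer_count=2):
    q, results = Queue(), Queue()
    producers = [Process(target=producer, args=(q, i)) for i in range(producer_count)]
    consumers = [Process(target=consumer, args=(q, results)) for _ in range(consumer_count)]
    for p in producers + consumers:
        p.start()
    for p in producers:
        p.join()               # all products are enqueued once the producers exit
    for _ in consumers:
        q.put(SENTINEL)        # one end marker per consumer, after every product
    counts = [results.get() for _ in consumers]
    for c in consumers:
        c.join()
    return sum(counts)

if __name__ == "__main__":
    print(run())
```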

The following snippet demonstrates race conditions between processes.

In [93]:
from multiprocessing import Manager, Pool
import traceback

def print_traceback(e):
    traceback.print_exception(type(e), e, e.__traceback__)

def add(counter):
    for _ in range(100):
        counter.value += 1

def sub(counter):
    for _ in range(100):
        counter.value -= 1

manager = Manager()
counter = manager.Value('i', 0)
add_pool = Pool(4)
add_pool.starmap_async(add, [(counter,) for _ in range(4)], error_callback=print_traceback)
sub_pool = Pool(2)
sub_pool.starmap_async(sub, [(counter,) for _ in range(2)], error_callback=print_traceback)
add_pool.close()
sub_pool.close()
add_pool.join()
sub_pool.join()
print(counter.value)

49


We have four `add` workers and two `sub` workers. Since each worker changes the shared value by +100 or -100, the final result should be 200. However, the actual result is different because of race conditions: `counter.value += 1` is a read followed by a write, and another process can modify the value between the two.
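
To see how an update gets lost, the interleaving can be replayed by hand in a single process. This is only a simulation: `SimpleNamespace` stands in for the shared value, and the two "workers" are just local variables.

```python
from types import SimpleNamespace

def simulate_lost_update():
    """Replay one unlucky interleaving of two `value += 1` operations."""
    counter = SimpleNamespace(value=0)
    read_by_a = counter.value      # worker A reads 0
    read_by_b = counter.value      # worker B reads 0 before A writes back
    counter.value = read_by_a + 1  # A writes 1
    counter.value = read_by_b + 1  # B also writes 1, so A's increment is lost
    return counter.value

if __name__ == "__main__":
    print(simulate_lost_update())  # two increments, but the value is 1
```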

We wrap the augmented assignment with a lock created by `Manager().Lock()` in the following code, which gives us the correct output. Apart from the `with` statements we used previously, we can also explicitly call the `acquire()` and `release()` methods of `Lock`.

In [95]:
from multiprocessing import Manager, Pool
import traceback

def print_traceback(e):
    traceback.print_exception(type(e), e, e.__traceback__)

def add(counter, counter_lock):
    for _ in range(100):
        counter_lock.acquire()
        counter.value += 1
        counter_lock.release()

def sub(counter, counter_lock):
    for _ in range(100):
        counter_lock.acquire()
        counter.value -= 1
        counter_lock.release()

manager = Manager()
counter = manager.Value('i', 0)
counter_lock = manager.Lock()
add_pool = Pool(4)
add_pool.starmap_async(add, [(counter, counter_lock) for _ in range(4)], error_callback=print_traceback)
sub_pool = Pool(2)
sub_pool.starmap_async(sub, [(counter, counter_lock) for _ in range(2)], error_callback=print_traceback)
add_pool.close()
sub_pool.close()
add_pool.join()
sub_pool.join()
print(counter.value)

200
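
When the workers are plain `Process` objects instead of pool tasks, `multiprocessing.Value` can be used directly, and it comes with its own lock, available through `get_lock()`. The following sketch repeats the same experiment under that assumption; `change` and `run` are names made up for it.

```python
from multiprocessing import Process, Value

def change(counter, delta, times):
    for _ in range(times):
        with counter.get_lock():   # the lock that comes with the Value
            counter.value += delta

def run():
    counter = Value('i', 0)        # shared signed int, passed to children as an argument
    workers = [Process(target=change, args=(counter, +1, 100)) for _ in range(4)]
    workers += [Process(target=change, args=(counter, -1, 100)) for _ in range(2)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return counter.value

if __name__ == "__main__":
    print(run())
```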
