# Parallel processing via the `multiprocessing` module

Multi-core CPUs have become the standard in modern computer architectures. We find them not only in supercomputer facilities but also in our desktop machines at home and in our laptops; even Apple's iPhone 5S shipped with a 1.3 GHz dual-core processor in 2013.

However, the default Python interpreter was designed with simplicity in mind and relies on a thread-safety mechanism, the so-called "GIL" (Global Interpreter Lock). To prevent conflicts between threads, the GIL allows only one thread to execute Python bytecode at any given time, so CPU-bound Python code effectively runs serially even when it uses multiple threads.

In this introduction to Python's `multiprocessing` module, we will see how we can spawn multiple subprocesses to avoid some of the GIL's disadvantages.
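
To make the effect of the GIL tangible, here is a minimal sketch that runs the same CPU-bound function first in four threads and then in four processes. The names `count_down` and `run_all` are made up for this illustration, and the script is meant to be run as a standalone file. On a standard CPython build with several cores, the process-based run is usually noticeably faster, but the exact timings depend on the machine.

```python
import multiprocessing as mp
import threading
import time


def count_down(n):
    """CPU-bound busy loop (illustrative workload); returns 0 when done."""
    while n > 0:
        n -= 1
    return n


def run_all(workers):
    """Start all workers, wait for them, and return the elapsed seconds."""
    start = time.perf_counter()
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return time.perf_counter() - start


if __name__ == "__main__":
    n, jobs = 2_000_000, 4
    threads = [threading.Thread(target=count_down, args=(n,)) for _ in range(jobs)]
    procs = [mp.Process(target=count_down, args=(n,)) for _ in range(jobs)]
    print(f"threads:   {run_all(threads):.2f} s")
    print(f"processes: {run_all(procs):.2f} s")
```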

<br>
<br>

## Sections

- [An introduction to parallel programming using Python's `multiprocessing` module](#An-introduction-to-parallel-programming-using-Python's-`multiprocessing`-module)
    - [Multi-Threading vs. Multi-Processing](#Multi-Threading-vs.-Multi-Processing)
- [Introduction to the `multiprocessing` module](#Introduction-to-the-multiprocessing-module)
    - [The `Process` class](#The-Process-class)
        - [How to retrieve results in a particular order](#How-to-retrieve-results-in-a-particular-order)
    - [The `Pool` class](#The-Pool-class)
- [Kernel density estimation as benchmarking function](#Kernel-density-estimation-as-benchmarking-function)
    - [The Parzen-window method in a nutshell](#The-Parzen-window-method-in-a-nutshell)
    - [Sample data and `timeit` benchmarks](#Sample-data-and-timeit-benchmarks)
    - [Benchmarking functions](#Benchmarking-functions)
    - [Preparing the plotting of the results](#Preparing-the-plotting-of-the-results)
- [Results](#Results)
- [Conclusion](#Conclusion)

<br>
<br>

###  Multi-Threading vs. Multi-Processing

Depending on the application, two common approaches in parallel programming are to run code via threads or via multiple processes. If we submit "jobs" to different threads, those jobs can be pictured as "sub-tasks" of a single process, and those threads will usually have access to the same memory areas (i.e., shared memory). This approach can easily lead to conflicts in case of improper synchronization, for example, if threads are writing to the same memory location at the same time.  

A safer approach (although it comes with additional overhead due to the communication between separate processes) is to submit multiple processes that work in completely separate memory locations (i.e., distributed memory): every process runs completely independently of the others.
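
The separation of memory can be sketched in a few lines. In this illustration (the `state` dictionary and the `child` function are hypothetical names), the subprocess changes its own copy of a module-level variable and has to send the value back through a queue; the parent's copy is unaffected. The script is assumed to be run as a standalone file.

```python
import multiprocessing as mp

# Module-level state: every process works on its own copy.
state = {"value": 0}


def child(queue):
    """Runs in the subprocess: modify the local copy and report it back."""
    state["value"] = 42
    queue.put(state["value"])


if __name__ == "__main__":
    queue = mp.Queue()
    p = mp.Process(target=child, args=(queue,))
    p.start()
    seen_by_child = queue.get()  # explicit communication between processes
    p.join()
    print("child: ", seen_by_child)    # 42
    print("parent:", state["value"])   # still 0
```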

Here, we will take a look at Python's [`multiprocessing`](https://docs.python.org/dev/library/multiprocessing.html) module and how we can use it to submit multiple processes that can run independently from each other in order to make best use of our CPU cores.

![](https://raw.githubusercontent.com/rasbt/python_reference/master/Images/multiprocessing_scheme.png)

<br>
<br>

# Introduction to the `multiprocessing` module

[[back to top](#Sections)]

The [multiprocessing](https://docs.python.org/dev/library/multiprocessing.html) module in Python's Standard Library has a lot of powerful features. If you want to read about all the nitty-gritty tips, tricks, and details, I recommend using the [official documentation](https://docs.python.org/dev/library/multiprocessing.html) as an entry point.  

In the following sections, I want to provide a brief overview of different approaches to show how the `multiprocessing` module can be used for parallel programming.

<br>
<br>

### The `Process` class

[[back to top](#Sections)]

The most basic approach is probably to use the `Process` class from the `multiprocessing` module.  
Here, we will use a simple queue function to generate four random strings in parallel.

In [1]:
%%file rand_string_.py

import random
import string
import os

def rand_string(length, output):
    """ Generates a random string of numbers, lower- and uppercase chars. """
    print('parent process:', os.getppid())
    print('process id:', os.getpid())
    rand_str = ''.join(random.choice(
                        string.ascii_lowercase 
                        + string.ascii_uppercase 
                        + string.digits)
                   for i in range(length))
    output.put(rand_str)

Overwriting rand_string_.py


In [2]:
import rand_string_

In [3]:
import multiprocessing as mp
import random
import string

random.seed(123)

# Define an output queue
output = mp.Queue()


# Setup a list of processes that we want to run
processes = [mp.Process(target=rand_string_.rand_string, args=(5, output))
             for x in range(4)]

# Run processes
for p in processes:
    p.start()

# Wait for the processes to finish
for p in processes:
    p.join()

# Get process results from the output queue
results = [output.get() for p in processes]

print(results)

['Y3S9B', '8IAV3', 'TpKIu', 'B5A7y']


1) Count how many times each Russian letter occurs in a text file (uppercase and lowercase characters are not distinguished). Apply the function to the file 'Tolstoy Lev. Voyna i mir. Kniga 1 - BooksCafe.Net.txt'.

In [36]:
rus_symb = set(chr(symb) for symb in range(ord('а'), ord('я')+1))
# rus_symb

In [35]:
from collections import Counter
with open("Tolstoy Lev. Voyna i mir. Kniga 1 - BooksCafe.Net.txt", encoding='cp1251') as f:
    c = Counter()
    for i in f:
        c += Counter(s for s in i.lower() if s in rus_symb)
c

Counter({'а': 97839,
         'б': 20531,
         'в': 52779,
         'г': 23341,
         'д': 35378,
         'е': 95018,
         'ж': 12046,
         'з': 20113,
         'и': 75749,
         'й': 13403,
         'к': 39920,
         'л': 58155,
         'м': 34933,
         'н': 75573,
         'о': 131184,
         'п': 29287,
         'р': 51223,
         'с': 61050,
         'т': 66898,
         'у': 32184,
         'ф': 2227,
         'х': 9985,
         'ц': 4090,
         'ч': 16468,
         'ш': 11266,
         'щ': 3336,
         'ъ': 521,
         'ы': 22020,
         'ь': 23191,
         'э': 3600,
         'ю': 7581,
         'я': 26303})

In [12]:
from collections import Counter
import re
import itertools as itt

In [2]:
def count_letters(file_name):
    with open(file_name, encoding='cp1251') as file:
        text = file.read().lower()
    txts = re.findall("[а-яА-ЯёЁ]", text)
    return Counter(txts)

In [10]:
def count_letters2(file_name):
    rus_symb = set(chr(symb) for symb in range(ord('а'), ord('я') + 1))
    with open(file_name, encoding='cp1251') as file:
        cnt = Counter(itt.chain.from_iterable((c for c in line.lower() if c in rus_symb) for line in file))
    return cnt

In [38]:
%%time
count_letters2("text//" + 'Tolstoy Lev. Voyna i mir. Kniga 1 - BooksCafe.Net.txt')

Wall time: 398 ms


Counter({'с': 61050,
         'п': 29287,
         'а': 97839,
         'и': 75749,
         'б': 20531,
         'о': 131184,
         'ч': 16468,
         'т': 66898,
         'к': 39920,
         'л': 58155,
         'н': 75573,
         'г': 23341,
         'у': 32184,
         'в': 52779,
         'е': 95018,
         'й': 13403,
         'э': 3600,
         'р': 51223,
         'ж': 12046,
         'д': 35378,
         'х': 9985,
         'ф': 2227,
         'м': 34933,
         'я': 26303,
         'ы': 22020,
         'ь': 23191,
         'з': 20113,
         'ю': 7581,
         'щ': 3336,
         'ш': 11266,
         'ц': 4090,
         'ъ': 521})

2) Count how many times each Russian letter occurs in all the text files in the 'text' folder. In the implementation, factor out a function that sums the results of processing the individual files. Determine how long the task takes for all the files in the 'text' folder.

In [13]:
%%timeit -n1

path = 'text//'
onlyfiles = [f for f in listdir(path) if isfile(join(path, f))]
count_lst = [count_letters2(join(path, filename)) for filename in onlyfiles]
b = reduce(op.add, count_lst)


3.22 s ± 45.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
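
The summing step can also be factored out into a small helper. The following is a minimal sketch; the name `merge_counts` is made up for this illustration. Passing an empty `Counter` as the initial value to `reduce` makes the helper return an empty result instead of raising an error when the folder contains no files.

```python
from collections import Counter
from functools import reduce
import operator


def merge_counts(counters):
    """Sum an iterable of Counter objects into a single Counter."""
    return reduce(operator.add, counters, Counter())


# Example with two small per-file results
print(merge_counts([Counter("аб"), Counter("ба")]))
```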


3) Solve task 2 by parallelizing the computation with the `multiprocessing` module. Create a separate process for each file. Determine how long the task takes for all the files in the 'text' folder.

In [8]:
%%file counter.py
from collections import Counter
from itertools import chain

def counting(file_name, output):
    rus = set(chr(symb) for symb in range(ord('а'), ord('я') + 1))
    with open(file_name, encoding='cp1251') as file:
        cnt = Counter(chain.from_iterable((c for c in line.lower() if c in rus) for line in file))
    output.put(cnt)


Overwriting counter.py


In [9]:
from functools import reduce
import operator as op
from os import listdir
from os.path import isfile, join

In [10]:
import counter
import multiprocessing as mp
onlyfiles = [join('text/', f) for f in listdir('text/') if isfile( join('text/', f))]

In [11]:
%%time

output = mp.Queue()

processes = [mp.Process(target=counter.counting, args=(file_name, output))
             for file_name in onlyfiles]

for p in processes:
    p.start()

for p in processes:
    p.join()

results = reduce(op.add, [output.get() for p in processes])

print(results)


Counter({'о': 1117484, 'е': 856915, 'а': 784852, 'и': 659252, 'н': 643029, 'т': 610901, 'с': 528249, 'в': 468208, 'л': 461034, 'р': 414101, 'к': 322315, 'м': 305385, 'д': 299571, 'у': 269110, 'п': 261291, 'я': 212182, 'ь': 197395, 'г': 183767, 'ы': 174732, 'б': 169798, 'ч': 166946, 'з': 159415, 'ж': 107274, 'й': 103544, 'ш': 85560, 'х': 79354, 'ю': 61580, 'ц': 32633, 'э': 32590, 'щ': 29041, 'ф': 18419, 'ъ': 2961})
Wall time: 1.48 s


4) Solve task 2 by parallelizing the computation with the `multiprocessing` module. Create a fixed number of processes (equal to the number of cores on the machine). Use one queue to pass the tasks to the processes and another queue to collect their answers. Determine how long the task takes for all the files in the 'text' folder.

In [12]:
%%file counter.py
from collections import Counter
from itertools import chain

def counting(queue, output):
    rus = set(chr(symb) for symb in range(ord('а'), ord('я') + 1))

    while True:
        job = queue.get()
        if job is None:  # sentinel: no more work for this process
            break

        with open(job, encoding='cp1251') as file:
            cnt = Counter(chain.from_iterable((c for c in line.lower() if c in rus) for line in file))
        output.put(cnt)


Overwriting counter.py


In [64]:
from functools import reduce
import operator as op
from os import listdir
from os.path import isfile, join
import counter
import multiprocessing as mp
onlyfiles = [join('text/', f) for f in listdir('text/') if isfile( join('text/', f))]

In [14]:
%%time
output = mp.Queue()  # results
queue = mp.Queue()   # tasks
CPU = mp.cpu_count()

for i in onlyfiles:
    queue.put(i)
for i in range(CPU):
    queue.put(None)
    
processes = [mp.Process(target=counter.counting, args=(queue, output)) \
             for i in range(CPU)]

for p in processes:
    p.start()
    
for p in processes:
    p.join()

# Fetch one result per file (not one per worker process)
results = reduce(op.add, [output.get() for _ in onlyfiles])

print(results)


Counter({'о': 1117484, 'е': 856915, 'а': 784852, 'и': 659252, 'н': 643029, 'т': 610901, 'с': 528249, 'в': 468208, 'л': 461034, 'р': 414101, 'к': 322315, 'м': 305385, 'д': 299571, 'у': 269110, 'п': 261291, 'я': 212182, 'ь': 197395, 'г': 183767, 'ы': 174732, 'б': 169798, 'ч': 166946, 'з': 159415, 'ж': 107274, 'й': 103544, 'ш': 85560, 'х': 79354, 'ю': 61580, 'ц': 32633, 'э': 32590, 'щ': 29041, 'ф': 18419, 'ъ': 2961})
Wall time: 1.41 s


5) Solve task 2 by parallelizing the computation with the `multiprocessing` module, this time using `Pool`. Determine how long the task takes for all the files in the 'text' folder.

In [61]:
%%file counter.py
from collections import Counter
from itertools import chain

def counting(file_name):
    rus = set(chr(symb) for symb in range(ord('а'), ord('я') + 1))
    with open(file_name, encoding='cp1251') as file:
        cnt = Counter(chain.from_iterable((c for c in line.lower() if c in rus) for line in file))
    return cnt


Overwriting counter.py


In [63]:
import counter

In [65]:
onlyfiles = [join('text/', f) for f in listdir('text/') if isfile( join('text/', f))]

In [67]:
%%time
pool = mp.Pool(mp.cpu_count())
result = [pool.apply_async(counter.counting, args=(i,)) for i in onlyfiles ]
pool.close()
pool.join()
result = reduce(op.add, [i.get() for i in result])
print(result)



Counter({'о': 1117484, 'е': 856915, 'а': 784852, 'и': 659252, 'н': 643029, 'т': 610901, 'с': 528249, 'в': 468208, 'л': 461034, 'р': 414101, 'к': 322315, 'м': 305385, 'д': 299571, 'у': 269110, 'п': 261291, 'я': 212182, 'ь': 197395, 'г': 183767, 'ы': 174732, 'б': 169798, 'ч': 166946, 'з': 159415, 'ж': 107274, 'й': 103544, 'ш': 85560, 'х': 79354, 'ю': 61580, 'ц': 32633, 'э': 32590, 'щ': 29041, 'ф': 18419, 'ъ': 2961})
Wall time: 3.14 s
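
As a possible variant, the same per-file counting can be written with `Pool.map` and a `with` block, which keeps the results in the order of the input files and cleans up the pool automatically. The sketch below is self-contained: instead of the 'text' folder it writes two tiny sample files to a temporary directory, and the names `count_file` and `write_samples` are hypothetical helpers for this illustration. It is assumed to be run as a standalone script.

```python
import multiprocessing as mp
import operator
import os
import tempfile
from collections import Counter
from functools import reduce

# Lowercase Russian letters from 'а' to 'я' (without 'ё', as in the cells above)
RUS = frozenset(chr(code) for code in range(ord('а'), ord('я') + 1))


def count_file(file_name):
    """Count Russian letters (case-insensitive) in one cp1251-encoded file."""
    with open(file_name, encoding='cp1251') as file:
        return Counter(c for line in file for c in line.lower() if c in RUS)


def write_samples(folder, texts):
    """Hypothetical helper: write each text to its own cp1251 file."""
    paths = []
    for i, text in enumerate(texts):
        path = os.path.join(folder, f"sample_{i}.txt")
        with open(path, "w", encoding="cp1251") as file:
            file.write(text)
        paths.append(path)
    return paths


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as folder:
        files = write_samples(folder, ["Война и мир", "Анна Каренина"])
        with mp.Pool(2) as pool:
            counts = pool.map(count_file, files)  # blocks until all files are done
    print(reduce(operator.add, counts, Counter()))
```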


In [10]:
pool = mp.Pool(processes=4)
results = [pool.apply(cube_.cube, args=(x,)) for x in range(1,7)]
print(results)

[1, 8, 27, 64, 125, 216]


6) Parallelize task 1 using `Pool`. Compare the run times of the parallel and the sequential solution.

In [28]:
%%file counter.py
from collections import Counter
from itertools import chain

def counting(slices):
    rus = set(chr(symb) for symb in range(ord('а'), ord('я') + 1))
    cnt = Counter((c for c in slices if c in rus))
    return cnt

Overwriting counter.py


In [49]:
from collections import Counter

def counting(slices):
    rus = set(chr(symb) for symb in range(ord('а'), ord('я') + 1))
    cnt = Counter((c for c in slices if c in rus))
    return cnt
                  
counting("абфбыафыбафыбабфыабфыабфыабфыба")

Counter({'а': 8, 'б': 9, 'ф': 7, 'ы': 7})

In [55]:
%%time
counting(file[b*3 : b*4])

Wall time: 91 ms


Counter({'е': 24598,
         'р': 12346,
         'а': 26541,
         'н': 19490,
         'с': 15184,
         'к': 9919,
         'и': 18708,
         'й': 3583,
         'з': 5067,
         'л': 14560,
         'о': 32668,
         'в': 12761,
         'т': 17341,
         'г': 5694,
         'д': 8923,
         'я': 6749,
         'ш': 3227,
         'у': 7790,
         'п': 7104,
         'х': 2495,
         'м': 9167,
         'э': 966,
         'ч': 4414,
         'ж': 3194,
         'ц': 810,
         'ь': 6079,
         'б': 5385,
         'ы': 5484,
         'ю': 2100,
         'щ': 798,
         'ф': 483,
         'ъ': 116})

In [29]:
import counter

In [30]:
import multiprocessing as mp

In [31]:
file_name = "text//Tolstoy Lev. Voyna i mir. Kniga 1 - BooksCafe.Net.txt"
file = open(file_name, 'r', encoding='cp1251').read().lower()


In [58]:
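%%time
cpu = mp.cpu_count()
b = len(file) // cpu

# Slice boundaries; the last slice runs to the end of the text so that
# the remainder of the integer division is not dropped
bounds = [(b * i, b * (i + 1) if i < cpu - 1 else len(file)) for i in range(cpu)]

pool = mp.Pool(cpu)
# Keep the AsyncResult objects so that the partial counts can be collected
async_results = [pool.apply_async(counter.counting, args=(file[start:stop],))
                 for start, stop in bounds]
pool.close()
pool.join()

print(reduce(op.add, [r.get() for r in async_results]))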

Counter({'о': 131184, 'а': 97839, 'е': 95018, 'и': 75749, 'н': 75573, 'т': 66898, 'с': 61050, 'л': 58155, 'в': 52779, 'р': 51223, 'к': 39920, 'д': 35378, 'м': 34933, 'у': 32184, 'п': 29287, 'я': 26303, 'г': 23341, 'ь': 23191, 'ы': 22020, 'б': 20531, 'з': 20113, 'ч': 16468, 'й': 13403, 'ж': 12046, 'ш': 11266, 'х': 9985, 'ю': 7581, 'ц': 4090, 'э': 3600, 'щ': 3336, 'ф': 2227, 'ъ': 521})
Wall time: 106 ms


In [None]:
%%file counter.py
from collections import Counter
from itertools import chain

def counting(slices, output):
    rus = set(chr(symb) for symb in range(ord('а'), ord('я') + 1))
    cnt = Counter(chain.from_iterable((c for c in slices if c in rus)))
    output.put(cnt)

In [None]:
import counter

<br>
<br>

### How to retrieve results in a particular order 

[[back to top](#Sections)]

The order of the results does not necessarily match the order of the processes in the `processes` list. Since we retrieve the results from the `Queue` sequentially via the `.get()` method, the order in which the processes finish determines the order of our results.  
E.g., if the second process had finished just before the first one, the `results` list could have been
`['PQpqM', 'yzQfA', 'SHZYV', 'PSNkD']` instead of `['yzQfA', 'PQpqM', 'SHZYV', 'PSNkD']`.

If our application required us to retrieve results in a particular order, one possibility would be to refer to the processes' `._identity` attribute. In this case, we could also simply use the values from our `range` object as position argument. The modified code would be:

In [4]:
%%file rand_string_2.py

import random
import string

def rand_string(length, pos, output):
    """ Generates a random string of numbers, lower- and uppercase chars. """
    rand_str = ''.join(random.choice(
                        string.ascii_lowercase 
                        + string.ascii_uppercase 
                        + string.digits)
                   for i in range(length))
    output.put((pos, rand_str))

Writing rand_string_2.py


In [5]:
import rand_string_2

In [6]:
# Define an output queue
output = mp.Queue()

# Setup a list of processes that we want to run
processes = [mp.Process(target=rand_string_2.rand_string, args=(5, x, output)) for x in range(4)]

# Run processes
for p in processes:
    p.start()

# Wait for the processes to finish
for p in processes:
    p.join()

# Get process results from the output queue
results = [output.get() for p in processes]

print(results)

[(0, 'uCIwk'), (2, 'ELkQg'), (1, 'wAM9T'), (3, 'uRmzN')]


And the retrieved results would be tuples, for example, `[(0, 'KAQo6'), (1, '5lUya'), (2, 'nj6Q0'), (3, 'QQvLr')]`   
or `[(1, '5lUya'), (3, 'QQvLr'), (0, 'KAQo6'), (2, 'nj6Q0')]`

To make sure that we retrieved the results in order, we could simply sort the results and optionally get rid of the position argument:

In [7]:
results.sort()
results = [r[1] for r in results]
print(results)

['uCIwk', 'wAM9T', 'ELkQg', 'uRmzN']


**A simpler way to maintain an ordered list of results is to use the `Pool.apply` and `Pool.map` functions which we will discuss in the next section.**

<br>
<br>

### The `Pool` class

[[back to top](#Sections)]

Another and more convenient approach for simple parallel processing tasks is provided by the `Pool` class.  

There are four methods that are particularly interesting:

    - Pool.apply
    
    - Pool.map
    
    - Pool.apply_async
    
    - Pool.map_async
    
The `Pool.apply` and `Pool.map` methods are basically parallel equivalents of Python 2's built-in [`apply`](https://docs.python.org/2/library/functions.html#apply) function (removed in Python 3) and the built-in [`map`](https://docs.python.org/2/library/functions.html#map) function.

Before we come to the `async` variants of the `Pool` methods, let us take a look at a simple example using `Pool.apply` and `Pool.map`. Here, we will set the number of processes to 4, which means that the `Pool` class will only allow 4 processes running at the same time.

In [8]:
%%file cube_.py

def cube(x):
    return x**3

Writing cube_.py


In [9]:
import cube_

In [11]:
pool = mp.Pool(processes=4)
results = pool.map(cube_.cube, range(1,7))
print(results)

[1, 8, 27, 64, 125, 216]


`Pool.map` and `Pool.apply` block the main program until the result is ready, which is quite useful if we want to obtain results in a particular order.   
In contrast, the `async` variants return immediately after submitting the tasks, so the main program can continue while the worker processes are running. 
One more difference is that we need to call the `get` method on the object returned by `apply_async()` in order to obtain the `return` value of the finished task.

In [12]:
pool = mp.Pool(processes=4)
results = [pool.apply_async(cube_.cube, args=(x,)) for x in range(1,7)]
output = [p.get() for p in results]
print(output)

[1, 8, 27, 64, 125, 216]
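
For completeness, here is a minimal sketch of `Pool.map_async`, the fourth method from the list above. It assumes the code is run as a standalone script, so the `cube` function is defined inline instead of being imported from `cube_.py`. The call returns a single result object right away, and its `get` method yields the whole list in input order once all tasks are done.

```python
import multiprocessing as mp


def cube(x):
    return x**3


if __name__ == "__main__":
    with mp.Pool(processes=4) as pool:
        async_result = pool.map_async(cube, range(1, 7))  # returns immediately
        # ... the main program could do other work here ...
        print(async_result.get(timeout=10))  # blocks until all results are ready
```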


<br>
<br>

1) Implement a parallel version of the multiplication of two matrices stored in NumPy arrays. Use `multiprocessing`'s `Pool` in the implementation. To make the operation more efficient, the way the data is stored may be transformed beforehand. Measure the run time of the operation for matrix sizes $N = 2^7, 2^8, 2^9$. Check that the result equals the result of `numpy.matmul`.