# multiprocessing & parallelization

## 1. introduction

* a difficult module to apply (it's hard to see where it can be used effectively)
* Python is single-threaded and sequential by default

## 2. take advantage of multiple cpus

In [105]:
import multiprocessing

In [107]:
multiprocessing.cpu_count()

8
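
`multiprocessing.cpu_count()` reports the logical CPUs of the machine, which can be more than the current process is allowed to use (for example inside a container or under `taskset`). A minimal sketch of a more conservative count follows; the helper name `usable_cpu_count` is hypothetical, and `os.sched_getaffinity` only exists on some Unix platforms, so the code falls back to `cpu_count()` elsewhere.

```python
import multiprocessing
import os


def usable_cpu_count():
    """Return the number of CPUs this process may run on (hypothetical helper)."""
    # os.sched_getaffinity is only available on some Unix platforms.
    if hasattr(os, "sched_getaffinity"):
        return len(os.sched_getaffinity(0))
    # Fallback: logical CPUs of the whole machine.
    return multiprocessing.cpu_count()


print(usable_cpu_count())
```

The returned value could be passed as `processes=` when creating a pool.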

## 3. processing pool

Let's create a square function and a lot of random numbers...

In [108]:
import numpy as np

def square(x):
    return x * x

data = list(np.random.randint(0, 
                              high=10000, 
                              size=int(1e3)))

Processing it sequentially looks like this:

In [118]:
%%timeit

seq_square = list(map(square, data))

139 µs ± 2.69 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)


Now, let's use the multiprocessing module to square all these numbers in parallel...

In [119]:
%%timeit

pool = multiprocessing.Pool(processes=8)
result = pool.map(square, data)
pool.terminate()
pool.join()

144 ms ± 4.23 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


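What is happening? The parallel version is about 1000 times slower: creating the worker processes and the inter-process communication (arguments and results are pickled and sent between processes) cost far more than the squaring itself (https://stackoverflow.com/a/35862168).

Let's repeat the example adding a 0.01 second sleep...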
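
To get a feeling for that overhead without starting any process, the sketch below mimics what happens to a single task: the argument is pickled, the function runs, and the result is pickled back. This is only an illustration under simplifying assumptions (a real pool also batches tasks and pays for process start-up); `square_via_pickle` is a hypothetical helper, not part of `multiprocessing`.

```python
import pickle


def square(x):
    return x * x


def square_via_pickle(x):
    """Mimic one task crossing a process boundary (hypothetical helper)."""
    # The argument is serialized by the parent and rebuilt in the worker.
    arg = pickle.loads(pickle.dumps(x))
    result = square(arg)
    # The result is serialized by the worker and rebuilt in the parent.
    return pickle.loads(pickle.dumps(result))


values = list(range(1000))
direct = [square(v) for v in values]
round_trip = [square_via_pickle(v) for v in values]
```

Both lists hold the same numbers, but the second one does far more work per item than a single multiplication, which is the kind of cost a pool adds to very cheap tasks.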

In [121]:
import time

def square(x):
    time.sleep(0.01)
    return x * x

In [122]:
%%timeit

seq_square = list(map(square, data))

10.1 s ± 4.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [123]:
%%timeit

pool = multiprocessing.Pool(processes=8)
result = pool.map(square, data)
pool.terminate()
pool.join()

1.34 s ± 8.39 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
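
Since `time.sleep` releases the GIL, a workload that mostly waits like this one can also be spread over threads, which avoids pickling and process start-up. The following is a minimal sketch using `concurrent.futures.ThreadPoolExecutor` from the standard library; the names `slow_square` and `threaded_squares` and the default of 8 workers are illustrative assumptions, and no timing is claimed here.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x):
    time.sleep(0.01)  # stands in for waiting on I/O
    return x * x


def threaded_squares(values, workers=8):
    """Square the values with a thread pool (hypothetical helper)."""
    # Executor.map returns results in input order, like pool.map.
    with ThreadPoolExecutor(max_workers=workers) as executor:
        return list(executor.map(slow_square, values))


results = threaded_squares(range(40))
```

For CPU-bound functions threads would not help in the same way, so the process pool remains the tool to reach for there.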


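For pure number crunching, vectorizing with NumPy beats both approaches (compare with the 139 µs of the first sequential run, without the sleep):

In [128]: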
data = np.array(data)

In [129]:
%%timeit

np.square(data)

1.13 µs ± 44.2 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)


## 4. managing filesystem faster with multiprocessing

In [131]:
import os
import itertools

In [132]:
directories = ['./data/directories/' + 
               path for path in os.listdir('./data/directories/')]

Let's find Markdown files the straightforward (sequential) way:

In [137]:
def find_md_file(path):
    files = os.listdir(path)
    md_files = [path + '/' + file 
                for file in files 
                if file.endswith('.md')]
    return md_files

In [138]:
%%timeit

md_files = map(find_md_file, directories)
md_files_flatten = list(itertools.chain.from_iterable(md_files))

22.3 ms ± 268 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [139]:
%%timeit

pool = multiprocessing.Pool()
md_files = pool.map(find_md_file, directories)
pool.terminate()
pool.join()
md_files_flatten = list(itertools.chain.from_iterable(md_files))

143 ms ± 4.53 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


## 5. the best way to do it...

The Pythonic way to do this 😎:

In [140]:
from pathlib import Path

In [142]:
my_directory = Path('./data/directories/')

In [157]:
%%timeit
md_files_flatten = list(my_directory.glob('*/*.md'))

153 µs ± 2.9 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)


In [150]:
md_files_flatten[:10]

[PosixPath('data/directories/dir2/file8.md'),
 PosixPath('data/directories/dir2/file317.md'),
 PosixPath('data/directories/dir2/file194.md'),
 PosixPath('data/directories/dir2/file319.md'),
 PosixPath('data/directories/dir2/file122.md'),
 PosixPath('data/directories/dir2/file206.md'),
 PosixPath('data/directories/dir2/file155.md'),
 PosixPath('data/directories/dir2/file387.md'),
 PosixPath('data/directories/dir2/file350.md'),
 PosixPath('data/directories/dir2/file18.md')]

In [158]:
md_files_flatten[0].stat()

os.stat_result(st_mode=33188, st_ino=1718721, st_dev=2049, st_nlink=1, st_uid=1000, st_gid=1000, st_size=36, st_atime=1576920136, st_mtime=1547756228, st_ctime=1576905265)
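
The pattern `'*/*.md'` only matches files exactly one directory below the starting path. As a small sketch of the difference with a recursive search, the code below builds a throwaway tree with `tempfile` and compares `glob('*/*.md')` with `rglob('*.md')`; the helper `find_md_files` and the file names are made up for the illustration.

```python
import tempfile
from pathlib import Path


def find_md_files(root, recursive=False):
    """Return sorted .md paths one level below root, or at any depth."""
    root = Path(root)
    matches = root.rglob('*.md') if recursive else root.glob('*/*.md')
    return sorted(matches)


with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp)
    (base / 'dir1' / 'nested').mkdir(parents=True)
    (base / 'dir1' / 'a.md').write_text('one level down')
    (base / 'dir1' / 'nested' / 'b.md').write_text('two levels down')
    (base / 'dir1' / 'notes.txt').write_text('not markdown')

    one_level = [p.name for p in find_md_files(base)]
    any_depth = [p.name for p in find_md_files(base, recursive=True)]

print(one_level)  # ['a.md']
print(any_depth)  # ['a.md', 'b.md']
```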

## 6. other important concepts about multiprocessing

* spawn:

The parent process starts a fresh Python interpreter process. The child process will only inherit those resources necessary to run the process object's run() method. In particular, unnecessary file descriptors and handles from the parent process will not be inherited. Starting a process using this method is rather slow compared to using fork or forkserver.

Available on Unix and Windows. The default on Windows and macOS.

* fork:

The parent process uses os.fork() to fork the Python interpreter. The child process, when it begins, is effectively identical to the parent process. All resources of the parent are inherited by the child process. Note that safely forking a multithreaded process is problematic.

Available on Unix only. The default on Unix (except macOS) up to Python 3.13; Python 3.14 changed the default on those platforms to forkserver.

* forkserver:

When the program starts and selects the forkserver start method, a server process is started. From then on, whenever a new process is needed, the parent process connects to the server and requests that it fork a new process. The fork server process is single threaded so it is safe for it to use os.fork(). No unnecessary resources are inherited.

Available on Unix platforms which support passing file descriptors over Unix pipes.
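
The start method can be inspected and chosen from code. The sketch below only queries the available methods and builds a context object with `multiprocessing.get_context`; it does not start any process. Picking `'spawn'` here is just an example, chosen because it is available on every platform.

```python
import multiprocessing

# Start methods supported on this platform, e.g. fork, spawn, forkserver.
available = multiprocessing.get_all_start_methods()

# Currently selected method; None if nothing has been fixed yet.
current = multiprocessing.get_start_method(allow_none=True)

# A context behaves like the multiprocessing module itself, but is bound
# to one start method and leaves the global default untouched.
ctx = multiprocessing.get_context('spawn')

print(available, current, ctx.get_start_method())
```

From such a context, `ctx.Pool(...)` or `ctx.Process(...)` create workers with the chosen method. With spawn, the target function has to be importable by the child process, which can be a problem for functions defined interactively.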

## 7. process and queues

In [174]:
from multiprocessing import Process, Queue

def f(q, number):
    q.put(number)

q = Queue()

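# Start every worker first so that they can run concurrently;
# joining right after start() would run them one at a time.
processes = [Process(target=f, args=(q, np.random.randint(128)))
             for _ in range(10)]
for p in processes:
    p.start()

In [None]:
# Read one item per worker before joining the processes.
results = [q.get() for _ in processes]

for p in processes:
    p.join()

results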