In [1]:
import numpy as np
import multiprocessing as mp
import time
import concurrent.futures as cf

## Multiprocessing

Number of processors available

In [5]:
mp.cpu_count()

8

Slow function with process names

In [6]:
def f1(x):
    worker = mp.current_process()
    time.sleep(np.random.uniform(0, 5))
    res = x**2
    return worker.name, res

Creating a process pool

In [7]:
p = mp.Pool()

Blocking calls to `map`

In [8]:
p.map(f1, range(10))

[('ForkPoolWorker-1', 0),
 ('ForkPoolWorker-2', 1),
 ('ForkPoolWorker-3', 4),
 ('ForkPoolWorker-4', 9),
 ('ForkPoolWorker-5', 16),
 ('ForkPoolWorker-6', 25),
 ('ForkPoolWorker-7', 36),
 ('ForkPoolWorker-8', 49),
 ('ForkPoolWorker-3', 64),
 ('ForkPoolWorker-4', 81)]

Non-blocking calls to `map_async`

In [9]:
res = p.map_async(f1, range(10))

In [10]:
res.get()

[('ForkPoolWorker-1', 0),
 ('ForkPoolWorker-5', 1),
 ('ForkPoolWorker-2', 4),
 ('ForkPoolWorker-8', 9),
 ('ForkPoolWorker-6', 16),
 ('ForkPoolWorker-7', 25),
 ('ForkPoolWorker-3', 36),
 ('ForkPoolWorker-4', 49),
 ('ForkPoolWorker-2', 64),
 ('ForkPoolWorker-1', 81)]

Closing the pool

In [11]:
p.close()

Using a context manager

In [12]:
with mp.Pool() as p:
    rest = p.map(f1, range(1, 11))
res

<multiprocessing.pool.MapResult at 0x7f5545f45278>

Limiting number of workers

In [13]:
with mp.Pool(4) as p:
    res = p.map(f1, range(10))
res

[('ForkPoolWorker-17', 0),
 ('ForkPoolWorker-18', 1),
 ('ForkPoolWorker-19', 4),
 ('ForkPoolWorker-20', 9),
 ('ForkPoolWorker-19', 16),
 ('ForkPoolWorker-17', 25),
 ('ForkPoolWorker-18', 36),
 ('ForkPoolWorker-20', 49),
 ('ForkPoolWorker-17', 64),
 ('ForkPoolWorker-19', 81)]

Using `star_map`

In [2]:
def f2(x, y):
    return x + y

In [3]:
pairs = np.random.randint(0, 10, (10, 2))
pairs

array([[2, 3],
       [0, 6],
       [9, 5],
       [1, 4],
       [4, 9],
       [1, 7],
       [4, 5],
       [1, 2],
       [9, 9],
       [0, 0]])

In [4]:
with mp.Pool() as p:
    res = p.starmap(f2, pairs)

In [5]:
res

[5, 6, 14, 5, 13, 8, 9, 3, 18, 0]

In [6]:
def f2_(args):
    return f2(*args)

In [8]:
f2_([1,2])

3

In [9]:
with mp.Pool() as p:
    res = p.map(f2_, pairs)
res

[5, 6, 14, 5, 13, 8, 9, 3, 18, 0]

Shared memrory ??

In [10]:
x = mp.Value('i', 0)

In [11]:
x

<Synchronized wrapper for c_int(0)>

In [12]:
def count(i):
    x.value += 1

In [13]:
with mp.Pool() as p:
    p.map(count, range(1000))

In [14]:
x.value

846

Replicated shared memory

In [15]:
xs = mp.Array('i', mp.cpu_count())

In [20]:
def count(i):
    i = mp.current_process()._identity[0] % mp.cpu_count()
    xs[i] += 1

In [21]:
with mp.Pool() as p:
    p.map(count, range(1000))

In [22]:
xs[:]

[0, 0, 0, 0, 104, 544, 0, 352]

In [23]:
np.sum(xs[:])

1000

## Concurrent futures

Uisng concurrent futures

In [24]:
%%time
np.random.seed(123)
with cf.ProcessPoolExecutor(4) as p:
    res = p.map(f1, range(10))

CPU times: user 12 ms, sys: 28 ms, total: 40 ms
Wall time: 6.09 s


In [25]:
res

<generator object _chain_from_iterable_of_lists at 0x7f697c8b3fc0>

In [26]:
list(res)

[('Process-45', 0),
 ('Process-46', 1),
 ('Process-47', 4),
 ('Process-48', 9),
 ('Process-45', 16),
 ('Process-47', 25),
 ('Process-46', 36),
 ('Process-48', 49),
 ('Process-45', 64),
 ('Process-47', 81)]

Functions with multiple arguments

In [27]:
def f2_(x):
    return f2(*x)

In [28]:
with cf.ProcessPoolExecutor(4) as p:
    res = p.map(f2_, pairs)

In [29]:
list(res)

[2, 17, 3, 13, 8, 11, 7, 10, 6, 11]

In [30]:
%%time
np.random.seed(123)
with cf.ThreadPoolExecutor(4) as p:
    res = p.map(f1, range(10))

CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 7.67 s


In [31]:
res

<generator object Executor.map.<locals>.result_iterator at 0x7f697c875eb8>

In [32]:
list(res)

[('MainProcess', 0),
 ('MainProcess', 1),
 ('MainProcess', 4),
 ('MainProcess', 9),
 ('MainProcess', 16),
 ('MainProcess', 25),
 ('MainProcess', 36),
 ('MainProcess', 49),
 ('MainProcess', 64),
 ('MainProcess', 81)]