# Concurrency and parallelism



## Concurrency vs. parallelism

[Stack Overflow: What is the difference between concurrency and parallelism?](https://stackoverflow.com/questions/1050222/what-is-the-difference-between-concurrency-and-parallelism)
> **Concurrency** is when two or more tasks can start, run, and complete in overlapping time periods. It doesn't necessarily mean they'll ever both be running at the same instant. For example, multitasking on a single-core machine.

> **Parallelism** is when tasks literally run at the same time, e.g., on a multicore processor.

## Overview of Python concurrency modules

See [official docs: Concurrent Execution](https://docs.python.org/3/library/concurrency.html).

Built-in modules dealing with concurrency and parallelism:
* `threading`: low-level thread-based parallelism. Best avoided where possible because of the intrinsic complexity of threads (e.g. the [Dining philosophers problem](https://en.wikipedia.org/wiki/Dining_philosophers_problem)) and the limitations imposed by [CPython's Global Interpreter Lock (GIL)](https://realpython.com/python-gil/).
* `multiprocessing`: low-level process-based parallelism.
* `asyncio`: cooperative multitasking. Use it for web, streaming-data, or otherwise I/O-heavy applications.
* `concurrent.futures`: a high-level interface for launching parallel tasks. A convenient entry point for parallelizing CPU-heavy tasks.
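
To make "cooperative multitasking" a bit more concrete, here is a minimal sketch using `asyncio`. The `worker` coroutine and the `log` list are hypothetical names chosen for illustration. Each worker explicitly hands control back to the event loop with `await asyncio.sleep(0)`, so the two tasks interleave on a single thread: they are concurrent, but not parallel.

```python
import asyncio


async def worker(name, steps, log):
    """Hypothetical task: record each step, then yield to the event loop."""
    for step in range(steps):
        log.append((name, step))
        await asyncio.sleep(0)  # cooperative hand-over to other tasks


async def main():
    log = []
    # run both workers concurrently on the same thread
    await asyncio.gather(worker("a", 2, log), worker("b", 2, log))
    return log


log = asyncio.run(main())
print(log)
```

With the default event loop, the steps of `a` and `b` alternate in the log, which shows that neither task runs to completion before the other starts.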

## `concurrent.futures`

A nice, in-depth overview: [Effortless Concurrency with Python's concurrent.futures](https://rednafi.github.io/digressions/python/2020/04/21/python-concurrent-futures.html).

From the official docs:
> The concurrent.futures module provides a high-level interface for asynchronously executing **callables**.

In other words, you can conveniently
1. *execute functions* so that they can run *asynchronously*,
2. *interact* with the resulting (asynchronous) tasks.

There are two (abstract) classes that enable that:

### `Executor`

[`Executor`](https://docs.python.org/3/library/concurrent.futures.html#executor-objects) 
provides two methods that execute a function with some input arguments:
* **`submit`**`(fn, /, *args, **kwargs)`
* **`map`**`(func, *iterables, timeout=None, chunksize=1)`

Where the execution actually happens depends on the concrete executor. Built-in executors are
* `ThreadPoolExecutor`, which uses threads for executing tasks,
* `ProcessPoolExecutor`, which uses subprocesses for executing tasks.

`submit` executes a single function call with positional arguments `*args` and keyword arguments `**kwargs`.
I.e., the function `fn` is called as `fn(*args,  **kwargs)`.
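
As a small sketch of `submit`, the snippet below uses a `ThreadPoolExecutor` and a hypothetical helper `power` (not part of the original text) to show how positional and keyword arguments are forwarded to the callable.

```python
from concurrent.futures import ThreadPoolExecutor


def power(base, exponent=2):
    """Hypothetical helper used only for this illustration."""
    return base ** exponent


with ThreadPoolExecutor(max_workers=2) as pool:
    # submit returns a Future immediately; the call made is power(3, exponent=4)
    future = pool.submit(power, 3, exponent=4)
    result = future.result()  # blocks until the call has finished

print(result)  # 81
```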

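`map` (which is similar to the built-in function [`map`](https://docs.python.org/3/library/functions.html#map)) calls `func` once for each tuple of items taken from the `iterables`, paired up as `zip` would pair them. With a single iterable, `func` is simply applied to each of its items. Conceptually, the results are produced like
```python
for args in zip(*iterables):
    yield func(*args)
```
except that the iterables are collected immediately and the calls run asynchronously.
You may wonder how to pass additional, fixed arguments to `func` in this case. This can be done via `functools.partial`,
which binds the arguments up front and returns a new callable; the actual call is deferred, similarly to `Executor.submit`.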
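
The following sketch illustrates both points with a `ThreadPoolExecutor`: passing several iterables to `map`, and fixing a keyword argument with `functools.partial`. The helper `scale` is a hypothetical function introduced just for this example.

```python
import functools
from concurrent.futures import ThreadPoolExecutor


def scale(value, factor=1):
    """Hypothetical helper: multiply value by a fixed factor."""
    return value * factor


with ThreadPoolExecutor(max_workers=2) as pool:
    # two iterables are paired item by item: pow(2, 3), pow(3, 2), pow(4, 1)
    powers = list(pool.map(pow, [2, 3, 4], [3, 2, 1]))
    # bind factor=10 up front, then map over a single iterable
    scaled = list(pool.map(functools.partial(scale, factor=10), [1, 2, 3]))

print(powers)  # [8, 9, 4]
print(scaled)  # [10, 20, 30]
```

Results come back in the order of the inputs, regardless of which call finishes first.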

An executor can also be shut down explicitly via **`shutdown`**`(wait=True, *, cancel_futures=False)`.
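
Instead of calling `shutdown` by hand, an executor can be used as a context manager, which shuts it down (waiting for pending work) when the `with` block exits. A minimal sketch, with a `ThreadPoolExecutor` chosen only for illustration:

```python
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=2) as pool:
    future = pool.submit(sum, [1, 2, 3])
# leaving the with block is equivalent to pool.shutdown(wait=True)

total = future.result()

# a shut-down executor refuses new work
try:
    pool.submit(sum, [4, 5])
    rejected = False
except RuntimeError:
    rejected = True

print(total, rejected)  # 6 True
```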


### `Future`

[`Future` objects](https://docs.python.org/3/library/concurrent.futures.html#future-objects)
encapsulate the asynchronous execution of a callable 
and are created by `Executor.submit()`.

The fundamental methods, defining the `Future` protocol, are:
* `cancel()`
* `cancelled()`
* `running()`
* `done()`
* `result(timeout=None)`
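
The sketch below walks through these methods on a single-worker `ThreadPoolExecutor`. The function `blocking_task` and the two `threading.Event` objects are hypothetical scaffolding that keep the first task running long enough to inspect it, while the second task is still queued and can therefore be cancelled.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()  # set once the first task is actually running
release = threading.Event()  # set by us to let the task finish


def blocking_task(value):
    """Hypothetical task that waits until it is released."""
    started.set()
    release.wait(timeout=5)
    return value


with ThreadPoolExecutor(max_workers=1) as pool:
    first = pool.submit(blocking_task, "first")
    second = pool.submit(blocking_task, "second")  # queued behind first
    started.wait(timeout=5)

    first_running = first.running()      # True: the only worker is busy with it
    first_done = first.done()            # False: it has not finished yet
    first_cancel = first.cancel()        # False: a running call cannot be cancelled
    second_cancel = second.cancel()      # True: it was still pending

    release.set()
    first_result = first.result(timeout=5)

print(first_running, first_done, first_cancel, second_cancel, first_result)
```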


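On top of these classes, there are two [module functions](https://docs.python.org/3/library/concurrent.futures.html#module-functions)
for waiting on several `Future`s at once:
* **`wait`**`(fs, timeout=None, return_when=ALL_COMPLETED)` blocks until the `return_when` condition is met and returns two sets: the completed and the uncompleted futures.
* **`as_completed`**`(fs, timeout=None)` returns an iterator that yields the futures as they complete.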

Here, `fs` is always an iterable of `Future`s.
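
A minimal sketch of both functions, assuming a hypothetical `sleep_and_return` task whose sleep times are picked only to make the tasks finish at different moments:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed, wait


def sleep_and_return(seconds):
    """Hypothetical task: sleep, then return the sleep time."""
    time.sleep(seconds)
    return seconds


with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(sleep_and_return, s) for s in (0.3, 0.1, 0.2)]

    # as_completed yields each future as soon as it has finished
    completion_order = [f.result() for f in as_completed(futures)]

    # wait blocks (here: until all are done) and returns two sets
    done, not_done = wait(futures)

print(completion_order)
print(len(done), len(not_done))
```

The order in `completion_order` follows completion time rather than submission order, so the shortest sleep typically comes first.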

### An illustrative example

Let's demonstrate how we can distribute CPU-intensive hashing using `ProcessPoolExecutor`.

In [1]:
import hashlib
import concurrent.futures
import functools

In [2]:
# set parameters that will make the hashing a bit challenging
salt = b"salt"
n = 2 ** 7
r = 2 ** 10
p = 2 ** 5

hashlib.scrypt(b"my password", salt=salt, n=n, r=r, p=p)

b"^,\x84aX\xbdf|8\t\x9c\x9d'\xeb\x89\x90\xd4c!Q\xeb\xea\x95\xde\x18\xeb\xe4\x07\xd8#\xd1r\x056\x9f-^J\x96N\xfb\xbc\x1bwM\x94Fb\xff\xd2:+\xef\xa5\x9c\xb8\x15d\\KM\xd7\r?"

In [3]:
%timeit hashlib.scrypt(b'my password', salt=salt, n=n, r=r, p=p)

1.54 s ± 13.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [4]:
# create a process pool executor with an explicit maximum number of workers
executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)

In [5]:
# submit a single hash calculation
digest_future = executor.submit(hashlib.scrypt, b'my password', salt=salt, n=n, r=r, p=p)

In [6]:
# check whether the job is done
digest_future.done()

False

In [7]:
# retrieve the result
digest_future.result()

b"^,\x84aX\xbdf|8\t\x9c\x9d'\xeb\x89\x90\xd4c!Q\xeb\xea\x95\xde\x18\xeb\xe4\x07\xd8#\xd1r\x056\x9f-^J\x96N\xfb\xbc\x1bwM\x94Fb\xff\xd2:+\xef\xa5\x9c\xb8\x15d\\KM\xd7\r?"

In [8]:
# now we need to hash multiple passwords
num_passwords = 8
passwords = [f"password {i}".encode() for i in range(num_passwords)]
passwords

[b'password 0',
 b'password 1',
 b'password 2',
 b'password 3',
 b'password 4',
 b'password 5',
 b'password 6',
 b'password 7']

In [9]:
# to use map, we need a simple callable of a single argument
# this can be easily created via functools.partial
digest_func = functools.partial(hashlib.scrypt, salt=salt, n=n, r=r, p=p)

In [10]:
# now we can map this function to all the passwords
digests = executor.map(digest_func, passwords)

In [11]:
# digests is a generator
digests

<generator object _chain_from_iterable_of_lists at 0x7f4b2dd603d0>

In [12]:
# we can, for example, collect the results into a list
# list(digests) iterates over digests
list(digests)

[b'_0\x8f\xea"\xa0y\x7f=\x87\xf0\x9b\x07\x07#2\xc2\xc0\xbcc?1:\xb9\xb0KC\xe4@\x9f\xd1\xf2\xbd\xc7\\\xacY\x92\x0fR\x11\x8bkz?uV\xef\xe2S!b?`\xd6\x91\xab}3r\xb3W\xc4B',
 b'\x9a\xfb\xe7!\xcf\x04\xdbg)N\xa0\x7f\x9e,3\xfbd\xa9\xf5\xccW\x9a\x1bO\xde)`\xd8r\xf6a\tZ,s0\x93PE\xd3~\xe7\xd1\xf96$\xdf\xfa\xc8\xdd\xcc\xf0m#\x13Z,\xba\x85S\xd3\x10\xe7\x88',
 b'\xb7\xfa\x1a\x7f\x06\xed\xb1\xcb\xbe\xc0\xcdg\x16`\xdb\xa5\xe0\x10N=\x1c9\x9f\xactQ]\x18Q\xc9\xcd@\xa7X\xe1\xb1\x04p7\xf8,\r\x0f\xac\x8e\xab\xbfT\xbd\xe3\xb5#\xb9\x90L\xd2\xdd\x88\xc1W\xc1\xc5H\x99',
 b'9\xed\x88]B\xd9\xfcVf\xe3\xeb\xfb\x9c\xf0\xfe\xde\xc4\x10\xb7%\xfa\xa4\x85C\x82\xef\xaa@?C\xdf\xe9\x95(\xd4\x99\xa5t\xfc\x1d\xb3\xa5\xa7]r\x10\x95\xdf\n\xa4\x8f.\xb3\xe5\x14\x9e="\xaa()X\xa2\xbf',
 b'\xdb\xd3\x05\x8d\xe9\xd4F\xa0\xc1\xce\xe7\xa1\xc7w/\xfa\xb8\xc0\xa1)Z\x92NgU\xc5\x0b\x1c$(\x11\xca\xbf{\xd5\x91\xd5m\x1f\xf6\x85\xf0J/\xe4dU\xe9rw\xfdD\xae\xb5R\x8a%\xbbP\nNB2\xd6',
 b'\xa9\xb6\x8e\xe5\x08\xc0W!\xfb5/4_\x11\xe3\x13\xff\xc4iEN\xfc\x

In [13]:
# iterating and submitting the function on each item is similar to map
digest_futures = [executor.submit(digest_func, password) for password in passwords]
digest_futures

[<Future at 0x7f4b2dd69210 state=pending>,
 <Future at 0x7f4b2dd69650 state=pending>,
 <Future at 0x7f4b2dd5ff10 state=pending>,
 <Future at 0x7f4b2dd5fe10 state=pending>,
 <Future at 0x7f4b2dd5fe90 state=pending>,
 <Future at 0x7f4b2dd80dd0 state=pending>,
 <Future at 0x7f4b2dd80790 state=pending>,
 <Future at 0x7f4b2dd80450 state=pending>]

In [14]:
# we can then iterate over finished futures using as_completed
for digest_future in concurrent.futures.as_completed(digest_futures):
    print(digest_future.result())

b'_0\x8f\xea"\xa0y\x7f=\x87\xf0\x9b\x07\x07#2\xc2\xc0\xbcc?1:\xb9\xb0KC\xe4@\x9f\xd1\xf2\xbd\xc7\\\xacY\x92\x0fR\x11\x8bkz?uV\xef\xe2S!b?`\xd6\x91\xab}3r\xb3W\xc4B'
b'9\xed\x88]B\xd9\xfcVf\xe3\xeb\xfb\x9c\xf0\xfe\xde\xc4\x10\xb7%\xfa\xa4\x85C\x82\xef\xaa@?C\xdf\xe9\x95(\xd4\x99\xa5t\xfc\x1d\xb3\xa5\xa7]r\x10\x95\xdf\n\xa4\x8f.\xb3\xe5\x14\x9e="\xaa()X\xa2\xbf'
b'\xb7\xfa\x1a\x7f\x06\xed\xb1\xcb\xbe\xc0\xcdg\x16`\xdb\xa5\xe0\x10N=\x1c9\x9f\xactQ]\x18Q\xc9\xcd@\xa7X\xe1\xb1\x04p7\xf8,\r\x0f\xac\x8e\xab\xbfT\xbd\xe3\xb5#\xb9\x90L\xd2\xdd\x88\xc1W\xc1\xc5H\x99'
b'\x9a\xfb\xe7!\xcf\x04\xdbg)N\xa0\x7f\x9e,3\xfbd\xa9\xf5\xccW\x9a\x1bO\xde)`\xd8r\xf6a\tZ,s0\x93PE\xd3~\xe7\xd1\xf96$\xdf\xfa\xc8\xdd\xcc\xf0m#\x13Z,\xba\x85S\xd3\x10\xe7\x88'
b'\xdb\xd3\x05\x8d\xe9\xd4F\xa0\xc1\xce\xe7\xa1\xc7w/\xfa\xb8\xc0\xa1)Z\x92NgU\xc5\x0b\x1c$(\x11\xca\xbf{\xd5\x91\xd5m\x1f\xf6\x85\xf0J/\xe4dU\xe9rw\xfdD\xae\xb5R\x8a%\xbbP\nNB2\xd6'
b'\xa9\xb6\x8e\xe5\x08\xc0W!\xfb5/4_\x11\xe3\x13\xff\xc4iEN\xfc\xa6\xba\xbb\

In [15]:
%%timeit
# timing of the parallel execution
digest_futures = [executor.submit(digest_func, password) for password in passwords]
digests = [digest_future.result() for digest_future in concurrent.futures.as_completed(digest_futures)]

13 s ± 149 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [16]:
%%timeit
# vs serial execution without any executor
digests = [digest_func(password) for password in passwords]

12.8 s ± 175 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


**Exercise:** Use an executor and either `submit` or `map` to explicitly parallelize the SVD of an array of matrices, which are stored as a single NumPy array whose first dimension is the index of the matrix. We are interested in the singular values only, so `compute_uv=False` should be used to speed up and simplify the calculation. Play with the size of the problem to observe when (if at all) parallelization becomes efficient.

In [13]:
import numpy as np

In [75]:
# generate random matrices
size = 6, 1_000, 10
matrices = np.random.sample(size=size)

In [72]:
# SVD for all matrices
s = np.linalg.svd(matrices, compute_uv=False)

In [73]:
# check the shape
s.shape

In [74]:
# hint: individual decompositions can be calculated by
for i in range(size[0]):
    s_i = np.linalg.svd(matrices[i], compute_uv=False)
    np.testing.assert_allclose(s[i], s_i)

In [60]:
# Your code using executor

### Gotchas

* Threads and subprocesses have overheads due to their creation and communication. Hence, functions that run only for a very short time are not worth submitting to executors.
* Workers need access to the same packages, e.g. via the same (virtual) environment or `PYTHONPATH`.
* Some objects cannot be pickled, e.g. open files or lambda / interactively defined functions. This can be solved by using pickle alternatives like `dill` or `cloudpickle`, which are the primary choices for tools like `ipyparallel` or `dask` mentioned below.

In [14]:
def my_function(x):
    return x - 1


future = executor.submit(my_function, 1)

In [15]:
future.exception()

Meanwhile, in the Jupyter notebook logs:

```
Process SpawnProcess-3:
Traceback (most recent call last):
  File "/Users/kuba/miniconda3/envs/data/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Users/kuba/miniconda3/envs/data/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/kuba/miniconda3/envs/data/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
    call_item = call_queue.get(block=True)
  File "/Users/kuba/miniconda3/envs/data/lib/python3.8/multiprocessing/queues.py", line 116, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'my_function' on <module '__main__' (built-in)>
```
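
One way to catch some of these problems early is to try pickling an object before handing it to a process pool. The helper `is_picklable` below is a hypothetical convenience function, not part of any library. Note its limits: it detects objects that fail at pickling time (lambdas, open files), but not the case in the traceback above, where pickling succeeds in the notebook and the lookup of `my_function` fails later inside the worker process.

```python
import math
import pickle


def is_picklable(obj):
    """Hypothetical check: can obj be serialized with the standard pickle module?"""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


# a function importable by name from a module pickles by reference
module_level = is_picklable(math.sqrt)
# a lambda has no importable name, so the standard pickle refuses it
anonymous = is_picklable(lambda x: x - 1)

print(module_level, anonymous)  # True False
```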

### Beyond built-in executors

The abstract `Executor` and `Future` interfaces are simple and powerful enough to be adopted by
other implementations, which makes the `concurrent.futures` ecosystem very neat.


* [ipyparallel](https://ipyparallel.readthedocs.io/en/latest/) offers rich functionality for parallel computing on clusters (multiple nodes). The `ipyparallel.Client.executor` method returns an object with the `concurrent.futures.Executor` interface.
* [mpi4py.futures](https://mpi4py.readthedocs.io/en/stable/mpi4py.futures.html) provides the `concurrent.futures.Executor` interface for `mpi4py`, the reference MPI (Message Passing Interface) library for Python.
* `dask.distributed` is similar to `ipyparallel` and provides the `concurrent.futures.Executor` interface via the [`Client.get_executor`](https://distributed.dask.org/en/latest/api.html?highlight=executor#distributed.Client.get_executor) method. You will hear more about Dask soon.
