# concurrent.futures
https://docs.python.org/3/library/concurrent.futures.html

This module makes it possible to execute callables asynchronously, either in a `ProcessPoolExecutor` (most suitable for CPU-bound tasks, because code running in separate processes is not constrained by the GIL) or in a `ThreadPoolExecutor` (which helps if the tasks are I/O-bound).
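Both classes implement the abstract `concurrent.futures.Executor` interface, so code written against that interface can switch between threads and processes by changing a single class name. A minimal sketch; the helper `run_all` is a hypothetical name for illustration, not part of the module, and the demo uses threads so that it also runs outside a notebook without pickling concerns:

```python
import concurrent.futures


def run_all(executor_cls, fn, items, **executor_kwargs):
    """Apply fn to every item using the given Executor subclass.

    Hypothetical helper: works with ThreadPoolExecutor as well as with
    ProcessPoolExecutor (the latter requires fn and items to be picklable).
    """
    with executor_cls(**executor_kwargs) as executor:
        # Executor.map() returns the results in the order of the inputs.
        return list(executor.map(fn, items))


def square(x):
    return x * x


results = run_all(concurrent.futures.ThreadPoolExecutor, square, range(5), max_workers=2)
print(results)  # [0, 1, 4, 9, 16]
```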

In [1]:
import concurrent.futures

## Creating `Future` objects
Both executors provide a `submit()` method that schedules a callable and returns a `Future` representing the asynchronous computation.

In [2]:
with concurrent.futures.ProcessPoolExecutor() as executor:
    future = executor.submit(int, "123456789")
    assert isinstance(future, concurrent.futures.Future)
    
    # Wait for the asynchronous call to complete and retrieve the result
    print(future.result())

123456789
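Besides `result()`, a `Future` offers `done()`, `exception()` and `add_done_callback()`. A small sketch with a `ThreadPoolExecutor` (chosen so that it runs anywhere without pickling); `slow_double`, `on_done` and `collected` are illustrative names:

```python
import concurrent.futures
import time

collected = []  # filled by the done-callback


def slow_double(x):
    time.sleep(0.1)
    return 2 * x


def on_done(fut):
    # Runs once fut has finished: in the worker thread, or right away in
    # the calling thread if fut was already done when the callback was added.
    collected.append(fut.result())


with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    ok = executor.submit(slow_double, 21)
    bad = executor.submit(int, "not a number")  # raises ValueError in the worker
    ok.add_done_callback(on_done)

    print(ok.result())  # blocks until the call has finished, then prints 42
    # exception() also waits, but returns the exception instead of raising it
    print(repr(bad.exception()))

# Leaving the with block waits for all submitted work, so the callback has run.
print(collected)  # [42]
```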


## Speeding up CPU bound tasks with `ProcessPoolExecutor.map()`

First, we define a simple context manager for measuring elapsed wall-clock time. We could also use `%%time` or `%%timeit`, but their output can be misleading here because the CPU times they report cover only the notebook's own process, not the work done in child processes.

In [3]:
import time
import contextlib

@contextlib.contextmanager
def measure_time():
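    # perf_counter() is a monotonic, high-resolution clock intended for measuring durations
    start = time.perf_counter()
    yield
    end = time.perf_counter()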
    print(f"Elapsed time: {end - start:.03} seconds.")
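If the CPU time spent in the worker processes is of interest as well, `os.times()` reports user and system time both for the current process and for terminated child processes that have been waited for. A sketch of such a variant (`measure_times` is our own name); on Windows the children fields are always 0, and the whole `with ProcessPoolExecutor()` block should be wrapped so that the workers have exited and been reaped before the measurement ends:

```python
import contextlib
import os
import time


@contextlib.contextmanager
def measure_times():
    """Collect wall-clock time plus CPU time of this process and of reaped children.

    The yielded dict is filled in when the with block exits.
    """
    stats = {}
    wall_start = time.perf_counter()
    start = os.times()
    try:
        yield stats
    finally:
        end = os.times()
        stats["wall"] = time.perf_counter() - wall_start
        stats["cpu_self"] = (end.user - start.user) + (end.system - start.system)
        # Only children that have terminated and been waited for are counted;
        # these fields are always 0 on Windows.
        stats["cpu_children"] = (end.children_user - start.children_user) + (
            end.children_system - start.children_system
        )
        print(
            f"Wall: {stats['wall']:.3g} s, CPU (this process): {stats['cpu_self']:.3g} s, "
            f"CPU (children): {stats['cpu_children']:.3g} s"
        )


# Usage: a small CPU-bound loop in the current process.
with measure_times() as stats:
    sum(i * i for i in range(200_000))
```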

## CPU-bound task: determine all prime factors of a given number

In [4]:
import math

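def factors(n):
    """Return the prime factorization of n as a sorted tuple ((1,) for n == 1)."""
    assert n >= 1

    def candidates():
        # Trial divisors: 2, then the odd numbers up to the integer square root of n
        yield 2
        yield from range(3, math.isqrt(n) + 1, 2)

    def gen(n):
        for i in candidates():
            # Divide out each factor as often as it occurs
            while n % i == 0:
                yield i
                n //= i
                if n == 1:
                    return
        # Whatever remains is a prime larger than all trial divisors
        yield n

    return tuple(gen(n))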

In [5]:
factors(123456789)

(3, 3, 3607, 3803)

## Calculate the sum of all factors of a few numbers

In [6]:
few_numbers_large_prime_factors = (
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419)

### Calculate the factors sequentially on a single CPU core

In [7]:
with measure_time():
    print(sum(sum(factors(n)) for n in few_numbers_large_prime_factors))

568206055343329
Elapsed time: 6.47 seconds.


### Make use of multiple CPU cores with `ProcessPoolExecutor.map()`

In [8]:
with measure_time():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        print(sum(sum(result) for result in executor.map(factors, few_numbers_large_prime_factors)))

568206055343329
Elapsed time: 2.98 seconds.
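`Executor.map()` yields results in input order, so a slow early item holds back faster later ones. When the order does not matter, `submit()` combined with `concurrent.futures.as_completed()` yields each future as soon as it finishes. A sketch with threads, where `time.sleep()` with made-up delays stands in for work of different durations:

```python
import concurrent.futures
import time


def sleep_and_return(delay):
    time.sleep(delay)
    return delay


delays = [0.3, 0.1, 0.2]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # map(): results come back in the same order as the inputs.
    in_input_order = list(executor.map(sleep_and_return, delays))

    # as_completed(): futures are yielded in the order in which they finish.
    futures = [executor.submit(sleep_and_return, d) for d in delays]
    in_completion_order = [f.result() for f in concurrent.futures.as_completed(futures)]

print(in_input_order)       # [0.3, 0.1, 0.2]
print(in_completion_order)  # typically [0.1, 0.2, 0.3]
```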


### Compare with `ThreadPoolExecutor.map()`
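Using threads rather than processes does not provide a speed-up: the GIL allows only one thread at a time to execute Python bytecode, so the pure-Python computation effectively runs on a single CPU core. In this run, the threading overhead even made it slightly slower than the sequential version.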

In [9]:
with measure_time():
    with concurrent.futures.ThreadPoolExecutor() as executor:
        print(sum(sum(result) for result in executor.map(factors, few_numbers_large_prime_factors)))

568206055343329
Elapsed time: 7.47 seconds.
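Threads do pay off when the tasks spend most of their time waiting, because blocking calls such as `time.sleep()` or socket reads release the GIL. A sketch in which `time.sleep()` stands in for I/O (a simplifying assumption; `fake_io` is an illustrative name):

```python
import concurrent.futures
import time


def fake_io(seconds):
    # time.sleep() releases the GIL, like blocking network or disk I/O.
    time.sleep(seconds)
    return seconds


waits = [0.2] * 5

start = time.perf_counter()
for w in waits:
    fake_io(w)
sequential = time.perf_counter() - start

start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=len(waits)) as executor:
    list(executor.map(fake_io, waits))
threaded = time.perf_counter() - start

# Roughly 1.0 s vs. 0.2 s on an otherwise idle machine.
print(f"sequential: {sequential:.2f} s, threaded: {threaded:.2f} s")
```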


## Repeat the experiment with a larger set of small numbers

In [10]:
many_small_numbers = range(1, 40000)

In [11]:
with measure_time():
    print(sum(sum(factors(n)) for n in many_small_numbers))

139614694
Elapsed time: 0.322 seconds.


In this case, using `ProcessPoolExecutor.map()` as above makes things much slower. Most of the numbers can be factored very quickly, so the run time is dominated by the overhead of sending each argument to a worker process and each result back to the main process, one item at a time (the default `chunksize` is 1):

In [12]:
with measure_time():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        print(sum(sum(result) for result in executor.map(factors, many_small_numbers)))

139614694
Elapsed time: 10.8 seconds.


However, we can use the `chunksize` argument to group the numbers into larger chunks and reduce the overhead considerably:

In [13]:
with measure_time():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        print(sum(sum(result) for result in executor.map(factors, many_small_numbers, chunksize=1000)))

139614694
Elapsed time: 0.176 seconds.
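The effect of `chunksize` is easy to quantify: with the default `chunksize=1`, each of the 39,999 numbers makes its own round trip to a worker, while `chunksize=1000` bundles them into 40 batches. The hypothetical `chunked()` helper below only mimics this grouping (`ProcessPoolExecutor` does it internally). `pool_style_chunksize()` is likewise our own helper, modeled on the default that `multiprocessing.Pool.map()` computes when no `chunksize` is given (about four chunks per worker):

```python
import itertools


def chunked(iterable, size):
    """Yield tuples of up to `size` consecutive items (hypothetical helper)."""
    it = iter(iterable)
    while chunk := tuple(itertools.islice(it, size)):
        yield chunk


def pool_style_chunksize(n_items, n_workers):
    """Aim for about four chunks per worker, rounding up (hypothetical helper).

    ProcessPoolExecutor.map() itself defaults to chunksize=1.
    """
    chunksize, extra = divmod(n_items, n_workers * 4)
    return max(1, chunksize + (1 if extra else 0))


many_small_numbers = range(1, 40000)
chunks = list(chunked(many_small_numbers, 1000))
print(len(many_small_numbers), len(chunks), len(chunks[-1]))  # 39999 40 999
print(pool_style_chunksize(len(many_small_numbers), 8))      # 1250
```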


## Resilience against aborted child processes
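Since Python 3.3, a `BrokenProcessPool` exception is raised if one of the worker processes created by the pool terminates abruptly, so the main process does not freeze. To see this, the worker has to die for real: an exception raised by the callable, including the `SystemExit` raised by `sys.exit()`, is caught in the worker and re-raised in the main process when the result is retrieved. We therefore use `os._exit()`, which terminates the worker immediately. We also have to call `executor.map()` rather than the built-in `map()`, which would run `fail_sometimes()` in the notebook's own process.

In [14]:
import os

def fail_sometimes(n):
    if n % 5 == 0:
        # Terminate the worker process immediately, without any cleanup,
        # to simulate a crash (e.g. a process killed by the operating system)
        os._exit(1)
    return n

try:
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # executor.map() runs the calls in the worker processes
        tuple(executor.map(fail_sometimes, range(10)))
except concurrent.futures.process.BrokenProcessPool as e:
    print(f"Caught BrokenProcessPool: {e}")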
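`BrokenProcessPool` is a subclass of `concurrent.futures.BrokenExecutor` (Python 3.7+), and thread pools use the same mechanism: according to the documentation, if the `initializer` of a `ThreadPoolExecutor` raises, pending jobs fail with `BrokenThreadPool`, as does any later `submit()`. A sketch that catches the common base class; `failing_initializer` is a made-up name, and the pool also logs the initializer's traceback to stderr:

```python
import concurrent.futures
from concurrent.futures.process import BrokenProcessPool


def failing_initializer():
    # Hypothetical initializer that always fails, to break the pool on purpose.
    raise RuntimeError("simulated initialization failure")


caught = []

with concurrent.futures.ThreadPoolExecutor(
    max_workers=1, initializer=failing_initializer
) as executor:
    future = executor.submit(pow, 2, 10)
    try:
        future.result()  # the pending job fails instead of hanging
    except concurrent.futures.BrokenExecutor as e:
        caught.append(type(e).__name__)
    try:
        executor.submit(pow, 2, 10)  # later submissions are rejected
    except concurrent.futures.BrokenExecutor as e:
        caught.append(type(e).__name__)

print(caught)  # ['BrokenThreadPool', 'BrokenThreadPool']
print(issubclass(BrokenProcessPool, concurrent.futures.BrokenExecutor))  # True
```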


## Do not try the same with `multiprocessing`
In contrast, as of October 2019, `multiprocessing.Pool.map()` will usually hang if one of the worker processes dies, which can cause trouble if, e.g., the operating system's out-of-memory killer terminates one of them: https://bugs.python.org/issue9205

In [15]:
import multiprocessing

# Replace 'False' with 'True' in the line below to see the Jupyter kernel freeze.
if False:
    # The pool is only created when the experiment is actually run
    with multiprocessing.Pool() as p:
        tuple(p.map(fail_sometimes, range(10)))