**Chapter 17. Concurrency with Futures**

Here I introduce the concept of "futures" - objects representing the asynchronous execution of an operation. This powerful idea is the foundation not only of `concurrent.futures` but also of the `asyncio` package.

# Example: Web Downloads in Three Styles

Network I/O has high latency, so handling it efficiently requires concurrency: instead of wasting CPU cycles waiting, it's better to do something else until a response comes back from the network.

## A Sequential Download Script

The baseline is a script that downloads the flags one at a time:

In [1]:
import os
import sys
import time
import requests

In [2]:
POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()
BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = 'downloads/'

In [10]:
def save_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)

In [3]:
def get_flag(cc):
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    print('download %s' % url)
    resp = requests.get(url)
    return resp.content


In [4]:
def show(text):
    print(text, end=' ')

In [5]:
def download_many(cc_list):
    for cc in cc_list:
        image = get_flag(cc)
        show(cc)
        save_flag(image, cc.lower()+'.gif')
        
    return len(cc_list)

In [6]:
def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    print('\n{} flags downloaded in {:.2f}s'.format(count, elapsed))

## Downloading with concurrent.futures

The main features of the `concurrent.futures` package are the `ThreadPoolExecutor` and `ProcessPoolExecutor` classes, which implement an interface that allows you to submit callables for execution in different threads or processes, respectively. The classes manage an internal pool of worker threads or processes, and a queue of tasks to be executed.

In [7]:
import concurrent.futures  # a bare `import concurrent` does not load the futures submodule

In [8]:
MAX_WORKERS = 20

In [11]:
def download_one(cc):
    image = get_flag(cc)
    save_flag(image, cc.lower()+'.gif')

In [13]:
def download_many(cc_list):
    workers = min(MAX_WORKERS, len(cc_list))
    # Instantiate the ThreadPoolExecutor with that number of worker threads; the executor.__exit__
    # method will call executor.shutdown(wait=True), which will block until all threads
    # are done.
    with concurrent.futures.ThreadPoolExecutor(workers) as executor:
        # The map method is similar to the built-in map, except that the download_one
        # function will be called concurrently from multiple threads; it returns a generator
        # that can be iterated over to retrieve the value returned by each call.
        res = executor.map(download_one, sorted(cc_list))
        
    return len(list(res))

## Where Are the Futures?

Futures are essential components in the internals of concurrent.futures and asyncio.

As of Python 3.4, there are two classes named Future in the standard library: concurrent.futures.Future and asyncio.Future. They serve the same purpose: **an instance of either Future class represents a deferred computation that may or may not have completed.** This is similar to the Deferred class in Twisted, the Future class in Tornado, and Promise objects in JavaScript libraries.

Futures encapsulate pending operations so that they can be put in queues, their state of completion can be queried, and their results (or exception) can be retrieved when available.

**An important thing to know is that you and I should not create them: they are meant to be instantiated exclusively by the concurrency framework.** It's easy to understand why: a Future represents something that will eventually happen, and the only way to be sure that something will happen is to schedule its execution. Therefore, `concurrent.futures.Future` instances are created only as the result of scheduling something for execution with a `concurrent.futures.Executor` subclass. For example, the `Executor.submit()` method takes a callable, schedules it to run, and returns a future.
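A minimal sketch of that last point: the future comes from `submit()`, not from calling the `Future` constructor. The `square` function is a hypothetical stand-in for any callable you might schedule.

```python
import concurrent.futures


def square(n):
    """Hypothetical stand-in for any callable you might schedule."""
    return n * n


with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    # submit() schedules the callable and immediately hands back a future.
    future = executor.submit(square, 7)
    is_future = isinstance(future, concurrent.futures.Future)
    value = future.result()  # blocks until square(7) has run

print(is_future, value)
```

The client code only receives the future and reads from it; creating it and setting its result is left to the executor.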

Client code is not supposed to change the state of a future: the concurrency framework changes the state of a future when the computation it represents is done, and we can't control when that happens.

**Both types of Future have a `.done()` method that is nonblocking and returns a Boolean that tells you whether the callable linked to that future has executed or not. Instead of asking whether a future is done, client code usually asks to be notified. That's why both Future classes have an `.add_done_callback()` method: you give it a callable, and the callable will be invoked with the future as the single argument when the future is done.**
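Here is a small sketch of both methods on a `concurrent.futures.Future`. The `gate` event, `wait_for_gate`, and `on_done` are names I made up for the illustration; the event only keeps the task pending long enough to observe `done()` returning `False`.

```python
import concurrent.futures
import threading

gate = threading.Event()  # hypothetical helper: holds the task until released
notified = []


def wait_for_gate():
    gate.wait()
    return 'finished'


def on_done(fut):
    # Invoked with the future itself as the only argument.
    notified.append(fut.result())


with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(wait_for_gate)
    future.add_done_callback(on_done)
    done_before = future.done()  # returns at once; the task is still waiting
    gate.set()                   # let the task finish

# Leaving the with block waits for the worker, so the future is done by now.
done_after = future.done()
print(done_before, done_after, notified)
```

Note that the callback runs in the worker thread that completed the future, so it should be short and thread-safe.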

There is also a `.result()` method, which works the same in both classes when the future is done: it returns the result of the callable, or re-raises whatever exception might have been thrown when the callable was executed. However, when the future is not done, the behavior of the result method is very different between the two flavors of Future. In a `concurrent.futures.Future` instance, invoking `f.result()` will block the caller's thread until the result is ready. An optional timeout argument can be passed, and if the future is not done in the specified time, a `TimeoutError` exception is raised. The `asyncio.Future.result()` method does not support timeout, and the preferred way to get the result of a future in that library is to use `yield from` - which doesn't work with `concurrent.futures.Future` instances.
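The behavior of `result()` on a `concurrent.futures.Future` can be sketched as follows. The `slow` and `fail` functions and the delay values are assumptions chosen only to make the three cases visible: a timeout, a normal result, and a re-raised exception.

```python
import concurrent.futures
import time


def slow():
    time.sleep(0.5)
    return 'late result'


def fail():
    raise ValueError('boom')


with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow)
    try:
        future.result(timeout=0.05)  # too short: slow() is still sleeping
        timed_out = False
    except concurrent.futures.TimeoutError:
        timed_out = True
    value = future.result()  # no timeout: blocks until the result is ready

    failing = executor.submit(fail)
    try:
        failing.result()  # re-raises the exception from the worker thread
        error = None
    except ValueError as exc:
        error = str(exc)

print(timed_out, value, error)
```

A timeout does not cancel the task: the callable keeps running, and a later `result()` call still returns its value.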

To see futures at work, the next version of `download_many` replaces the higher-level `executor.map` call with two `for` loops: one to create and schedule the futures, the other to retrieve their results.

In [14]:
def download_many(cc_list):
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        to_do = []
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc)
            to_do.append(future)
            
        results = []
        for future in concurrent.futures.as_completed(to_do):
            res = future.result()
            results.append(res)
            
    return len(results)

Strictly speaking, none of the concurrent scripts tested so far performs downloads in parallel: the `concurrent.futures` examples use threads, and threads are limited by the GIL.

# Blocking I/O and the GIL

The CPython interpreter is not thread-safe internally, so it has a Global Interpreter Lock (GIL), which allows only one thread at a time to execute Python bytecode. That's why a single Python process usually cannot use multiple CPU cores at the same time.

**However, all standard library functions that perform blocking I/O release the GIL when waiting for a result from the OS. This means Python programs that are I/O bound can benefit from using threads at the Python level: while one Python thread is waiting for a response from the network, the blocked I/O function releases the GIL, so another thread can run.**
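A rough way to observe this is to time several blocking calls running in threads. The sketch below uses `time.sleep()` as a stand-in for a network wait; `nap` and the durations are hypothetical, and the exact elapsed time will vary by machine.

```python
import concurrent.futures
import time


def nap(seconds):
    # time.sleep() blocks this thread but releases the GIL while waiting.
    time.sleep(seconds)
    return seconds


naps = [0.2] * 5

t0 = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=len(naps)) as executor:
    results = list(executor.map(nap, naps))
elapsed = time.perf_counter() - t0

print('{} naps totalling {:.1f}s took {:.2f}s'.format(
    len(results), sum(naps), elapsed))
```

If the waits were serialized, the run would take about the sum of the naps; because the threads wait at the same time, it should take roughly as long as a single nap.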

> Every blocking I/O function in the Python standard library releases the GIL, allowing other threads to run. The `time.sleep()` function also releases the GIL. Therefore, Python threads are perfectly usable in I/O-bound applications, despite the GIL.

# Launching Processes with concurrent.futures

`concurrent.futures` does enable truly parallel computation because it supports distributing work among multiple Python processes using the `ProcessPoolExecutor` class - thus bypassing the GIL and leveraging all available CPU cores, if you need to do CPU-bound processing.

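Both executor classes implement the same interface, so switching from threads to processes only requires replacing `ThreadPoolExecutor` with `ProcessPoolExecutor`:

```python
from concurrent import futures


# Before: the work is distributed among worker threads.
def download_many(cc_list):
    with futures.ThreadPoolExecutor() as executor:
        pass


# After: the same code, with the work distributed among worker processes.
def download_many(cc_list):
    with futures.ProcessPoolExecutor() as executor:
        pass
```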

# Experimenting with Executor.map

The simplest way to run several callables concurrently is with the `Executor.map` method.

In [38]:
import time
import concurrent.futures

def display(*args):
    print(time.strftime('[%H:%M:%S]'), end=' ')
    print(*args)

def loiter(n):
    msg = '{}loiter({}): doing nothing for {}s...'
    display(msg.format('\t'*n, n, n))
    time.sleep(n)
    msg = '{}loiter({}): done.'
    display(msg.format('\t'*n, n))
    return n * 10

def main():
    display('Script starting')
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
    results = executor.map(loiter, range(5))
    display('results:', results)
    for i, result in enumerate(results):
        display('result {}: {}'.format(i, result))

main()

[09:52:22] Script starting
[09:52:22] loiter(0): doing nothing for 0s...
[09:52:22] loiter(0): done.
[09:52:22] 	loiter(1): doing nothing for 1s...[09:52:22]
[09:52:22][09:52:22] results: <generator object Executor.map.<locals>.result_iterator at 0x10f3087c8>
[09:52:22] result 0: 0
 		loiter(2): doing nothing for 2s...
 			loiter(3): doing nothing for 3s...
[09:52:23] 	loiter(1): done.
[09:52:23] 				loiter(4): doing nothing for 4s...
[09:52:23] result 1: 10
[09:52:24] 		loiter(2): done.
[09:52:24] result 2: 20
[09:52:25] 			loiter(3): done.
[09:52:25] result 3: 30
[09:52:27] 				loiter(4): done.
[09:52:27] result 4: 40


**The `Executor.map` method is easy to use, but it has a feature that may or may not be helpful, depending on your needs: it returns the results in exactly the same order as the calls are started. If the first call takes 10s to produce a result and the others take 1s each, your code will block for 10s as it tries to retrieve the first result from the generator returned by map.** After that, you'll get the remaining results without blocking because they will be done. That's OK when you must have all the results before proceeding, but often it's preferable to get the results as they are ready, regardless of the order in which they were submitted.
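The difference in ordering can be sketched by running the same calls both ways: once through `executor.map`, and once through `executor.submit` with `concurrent.futures.as_completed`. The `nap` function and the delays are hypothetical, with the slowest call deliberately submitted first.

```python
import concurrent.futures
import time


def nap(seconds):
    time.sleep(seconds)
    return seconds


delays = [0.6, 0.3, 0.1]  # the first call submitted is the slowest

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # map yields results in the order the calls were started,
    # so the first result is not available until the 0.6s call ends.
    in_call_order = list(executor.map(nap, delays))

    # as_completed yields each future as soon as it is done.
    to_do = [executor.submit(nap, d) for d in delays]
    in_finish_order = [f.result()
                       for f in concurrent.futures.as_completed(to_do)]

print('map:         ', in_call_order)
print('as_completed:', in_finish_order)
```

With delays spaced this far apart, I would expect the second list to come out shortest nap first, while the first list always matches the order of `delays`.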

> The combination of `executor.submit` and `futures.as_completed` is more flexible than `executor.map` because you can submit different callables and arguments, while `executor.map` is designed to run the same callable on different arguments. In addition, the set of futures you pass to `futures.as_completed` may come from more than one executor - perhaps some were created by a `ThreadPoolExecutor` instance while others are from a `ProcessPoolExecutor`.

# Chapter Summary