# Chapter 17. Concurrency with Futures
- sequential
- concurrency
- asyncio
- (multiprocess)

## concurrent.futures.Future and asyncio.Future
Commons
- `.done()`
- `.add_done_callback()`
- `.result()`

Differences
- behavior of `.result()` when the future is not done.
    - current.futures.Future
        - Invoking `f.result()` will block the caller's thread until the result is ready.
        - If the future is not doen in the specified `timeout`, raises `TimeoutError`
    - asyncio.Future
        - does Not support timeout.
        - preferred to get the result by using `yield from`
            - This way does Not work with concurrency.futures.Future

# Example: Web Downloads in Three Styles

In [2]:
# The sequential download
import os
import time
import sys

import requests

POP20_CC = (
    'CN IN US ID BR PK NG BD RU JP '
    'MX PH VN ET EG DE IR TR CD FR'
).split()

BASE_URL = 'http://flupy.org/data/flags'

DEST_DIR = '/Users/tsubasa/tmp/garbage'

def save_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)

def get_flag(cc):
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    return resp.content

def show(text):
    print(text, end=' ')
    sys.stdout.flush()

def download_many(cc_list):
    for cc in sorted(cc_list):
        image = get_flag(cc)
        show(cc)
        save_flag(image, cc.lower() + '.gif')

def main():
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

main()

BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 
None flags downloaded in 14.27s


In [3]:
image = get_flag('CN')
type(image)

bytes

In [4]:
image

b'GIF89a\x90\x01\x0c\x01\xf7\x00\x00\xd32\x0c\xf2\x1e-\xfd\xb4I\xf7\xf46\xea\x1a\x17\xfd\xe5D\xf9\x1c7\xf0\x1d#\xea\xeb+\xeeT%\xfc\xd59\xf0$&\xf6#5\xf1\xec*\xeb\xec\x1d\xe8G\x1a\xec\x89,\xe7&+\xda\xf1%\xe8&6\xf3"+\xc5\xc9\xc8\xd9H\x10\xfe\xd9D\xda)(\xf5\xe34\xeb\xed\'\xf4\xeb=\xed\xf22\xfd\xc3H\xc8\xc7\xcc\xf2\xf2,\xea\x1c"\xe4##\xeb\xf2+\xe5o\x1d\xfd\xc9;\xf2\xe5+\xfb\xb7;\xfa\xea3\xed\xe6,\xc7\xc9\xc6\xee\x1a*\xfc\xe43\xeb$)\xf8\x87:\xed %\xfd\xeb=\xfc\xa8:\xe3\xeb+\xee\xeb;\xde:\x1b\xe9{!\xec\xee0\xe79"\xf9\x97:\xa8\xdf\xe4\xf4"\x1c\xfc\xe0<\xf4\xeb%\xef$\x1e\xea\x1d3\xe3KX\xe0MS\xfc\x1b-\xec\xb8*\xea%\x1f\xe4\xf1*\xea\x9a%\xee\xf1#\xe1%\x1d\xeb,%\xed\xf1;\xf0\xea0\xe2\xf2!\xf4\xe3=\xed"\x1e\xec\xe12\xe7_\x1f\xf5\x94-\xe3-$\xdb\xb9\xbb\xf0\xdd=\xdba\x12\xf2""\xeb*-\xde\xf3\x16\xd9HW\xea!$\xf9\xd9\'\xe5\xea#\xef\xe93\xe3\x1e \xec\xa59\xf6v6\xec\xac*\xeb\xe6%\xf9\xe1.\xf1\xee0\xf9\xf1D\xe2*0\xf0"A\xfe\xed/\xf6\xa0.\xec&(\xe6\xca&\xdf+\x1d\xf3k)\xcb\xc5\xb9\xfa!-\xf9\xed#\xf6\x84,\xf2 

In [5]:
# Downloading with concurrent.futures
from concurrent import futures

MAX_WORKERS = 20

def download_one(cc):
    image = get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc

def download_many(cc_list):
    workers = min(MAX_WORKERS, len(cc_list))
    with futures.ThreadPoolExecutor(workers) as executor:
        # multiple threads
        res = executor.map(download_one, sorted(cc_list))
    return len(list(res))

def main():
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

main()

VN JP DE RU TR IR CNBDET  IN  FRPK NG  CD IDEG  PH USBR  MX 
20 flags downloaded in 0.35s


In [6]:
# as_completed
from concurrent import futures

def download_many(cc_list):
    cc_list = cc_list[:5]
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        to_do = []
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc)
            to_do.append(future)
            msg = 'Scheduled for {}: {}'
            print(msg.format(cc, future))
        
        results = []
        for future in futures.as_completed(to_do):
            res = future.result()
            msg = '{} result: {!r}'
            print(msg.format(future, res))
            results.append(res)
    return len(results)

download_many(POP20_CC)

Scheduled for BR: <Future at 0x10db90128 state=running>
Scheduled for CN: <Future at 0x10d7b5978 state=running>
Scheduled for ID: <Future at 0x10d7b5fd0 state=running>
Scheduled for IN: <Future at 0x10db1ac88 state=pending>
Scheduled for US: <Future at 0x10db1ae48 state=pending>
BR <Future at 0x10db90128 state=finished returned str> result: 'BR'
ID <Future at 0x10d7b5fd0 state=finished returned str> result: 'ID'
CN <Future at 0x10d7b5978 state=finished returned str> result: 'CN'
IN <Future at 0x10db1ac88 state=finished returned str> result: 'IN'
US <Future at 0x10db1ae48 state=finished returned str> result: 'US'


5

# Blocking I/O and the GIL
- GIL; Global Interpreter Lock
    - The CPython interpreter is not thread-safe internally
    - can use only one thread at a time to execute Python bytecodes
    - So single Python process usually cannot use multiple CPU cores at the same time.
- However, all standard library functions that perform blocking I/O release the GIL when waiting for a result from the OS.
    - e.g, while on Python thread is waiting for a response from the network, the blocked I/O function releases the GIL so another thread can run.

-> Python threads are usable in I/O-bound applications, despite the GIL

# Launching Processes with concurrent.futures
- Both `ProcessPoolExecutor` and `ThreadPoolExecutor` implement the generic `Executor` interface
- `ProcessPoolExecutor`
    - enable truly parallel computations -> CPU-bound processing
        - bypassing the GIL and leveraging all available CPU cores

In [14]:
def download_many(cc_list):
    with futures.ProcessPoolExecutor() as executor:
         res = executor.map(download_one, sorted(cc_list))
    return len(list(res))

def main():
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

main()

FR BD CN DE EG ET CD BR JP IN NG ID IR PK PH MX RU TR US VN 
20 flags downloaded in 0.76s


# Experimenting with Executor.map
- `Executor.map` returns the results in the same order as the calls are started:
    - if the first call takes 10s to produce a result, and the other take 1s each, the code will block for 10s as it tries to retrieve the first result of the generator returned by `map`

# Executor.submit
- Should use this when no need to consider the result order

In [None]:
from time import sleep, strftime
from concurrent import futures


def dispaly(*args):
    print(strftime('[%H:%M:%S]'), end=' ')
    print(*args)


def loiter(n):
    msg = '{}loiter({}): doing nothing for {}s...'
    display(msg.format('  '*n, n, n))
    sleep(n)
    msg = '{}loiter({}): done.'
    display(msg.format('  '*n, n))
    return n * 10


def main():
    display('Script starting.')
    executor = futures.ThreadPoolExecutor(max_workers=3)
    results = executor.map(loiter, range(5))
    display('results:', results)
    display('Waiting for individual results:')
    for i, result in enumerate(results):
        display('result {}: {}'.format(i, result))


main()

In [18]:
import time
from tqdm import tqdm
for i in tqdm(range(1000)):
    time.sleep(.01)

100%|██████████| 1000/1000 [00:11<00:00, 87.48it/s]


## Error Handling in the flags2 Example

In [None]:
def get_flag(base_url, cc):
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    resp = requests.get(url)
    if resp.status_code != 200:
        resp.raise_for_status()
    return resp.content


def download_one(cc, base_url, verbose=False):
    try:
        image = get_flag(base_url, cc)
    except requests.exceptions.HTTPError as exc:
        res = exc.response
        if res.status_code == 404:
            status = HTTPStatus.not_found
            msg = 'not found'
        else:
            raise
    else:
        save_flag(image, cc.lower() + '.gif')
        status = HTTPStatus.ok
        msg = 'OK'

    if verbose:
        print(cc, msg)

    return Result(status, cc)


def download_many(cc_list, base_url, verbose, max_req):
    counter = collections.Counter()
    cc_iter = sorted(cc_list)
    if not verbose:
        cc_iter = tqdm.tqdm(cc_iter)
    for cc in cc_iter:
        try:
            res = download_one(cc, base_url, verbose)
        except requests.exceptions.HTTPError as exc:
            error_msg = 'HTTP error {res.status_code} - {res.reason}'
            error_msg = error_msg.format(res=exc.response)
        except requests.exceptions.ConnectionError as exc:
            error_msg = 'Connection error'
        else:
            error_msg = ''
            status = res.status

        if error_msg:
            status = HTTPStatus.error
        counter[status] += 1
        if verbose and error_msg:
            print('*** Error for {}: {}'.format(cc, error_msg))
    return counter

## Using futures.as_completed

In [None]:
import collections
from concurrent import futures
import requests
import tqdm

DEFAULT_CONCUR_REQ = 30
MAX_CONCUR_REQ = 1000


def download_many(cc_list, base_url, verbose, concur_req):
    counter = collections.Counter()
    with futures.ThreadPoolExecutor(max_workers=concur_req) as executor:
        to_do_map = {}
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc, base_url, verbose)
            to_do_map[future] = cc
        done_iter = futures.as_completed(to_do_map)
        
        if not verbose:
            done_iter = tqdm.tqdm(done_iter, total=len(cc_list))
        for future in done_iter:
            try:
                res = future.result()
            except requests.exceptions.HTTPError as exc:
                error_msg = 'HTTP {res.status_code} - {res.reason}'
                error_msg = error_msg.format(res=exc.response)
            except reqeusts.exceptions.ConnectionError as exc:
                error_msg = 'Connection error'
            else:
                error_msg = ''
                status = res.status
            
            if error_msg:
                status = HTTPStatus.error
            counter[status] += 1
            if verbose and error_msg:
                cc = to_do_map[future]
                print('*** Error for {}: {}'.format(cc, error_msg))
                
    return counter