# Параллельная разработка с библиотекой `concurrent.futures`

## Основы

Логично предположить, что для параллельной разработки на Python созданы какие-то библиотеки. Первой из них была `concurrent.futures`. Библиотека эта введена в Python начиная с версии 3.2, однако позже она была адаптирована для Python начиная с версии 2.5. Основной подход в работе с этой библиотекой воплощает следующую мысль Микеле Симионато (Michele Simionato):

_Люди, критикующие явную работу с несколькими потоками, обычно являются системными программистами, которые имеют в виду варианты использования, с которыми типичный прикладной программист никогда не столкнется в своей жизни. […] В 99% случаев прикладному программисту достаточно создавать группы независимых потоков и собирать результаты их работы в очереди_.

Далее мы рассмотрим такую сущность, как футуры. Если говорить кратко, то футура - это объект, предоставляющий возможность асинхронного выполнения операции. 

Начнем с простого примера. Допустим, нам надо написать программу, скачивающую с интернета какие-то файлы. Есть 2 подхода: либо мы качаем файлы последовательно, либо параллельно. Рассмотрим оба варианта реализации.

In [1]:
%%writefile flags_seq.py

import os
import time
import sys
import shutil
import requests

countries = 'CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR'.split()
base_url = 'http://flupy.org/data/flags'
dest_dir = './downloads'

def save_image(img, filename):
    path = os.path.join(dest_dir, filename)
    with open(path, 'wb') as fp:
        fp.write(img)

def show(text):
    #Печатаем без перехода на следующую строку
    print(f'{text}', end=' ')

def get_flag(country):
    url = f'{base_url}/{country}/{country}.gif'
    response = requests.get(url)
    return response.content

def init_dir(dest_dir):
    if os.path.exists(dest_dir):
        shutil.rmtree(dest_dir)
    os.mkdir(dest_dir)
    
def download_flag(country):
    image = get_flag(country.lower())
    save_image(image, f'{country.lower()}.gif')
    show(country)
    return country

def download_flags(country_lst):  
    downloaded = [download_flag(country) for country in sorted(country_lst)]
    return len(downloaded)

def main(downloader):
    init_dir(dest_dir)
    start_time = time.time()
    count = downloader(countries)
    time_taken = time.time() - start_time
    print(f'\n{count} flag(s) downloaded in {time_taken:.2f} seconds.')

if __name__ == '__main__':
    main(download_flags)

Overwriting flags_seq.py


In [2]:
%%bash

python3 flags_seq.py

BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 
20 flag(s) downloaded in 17.62 seconds.


Если качать последовательно, то на скачивание 20 маленьких `*.gif`-файлов уходит от 10 до 20 секунд. Посмотрим, сколько займет та же операция при использовании библиотеки `concurrent.futures`. Также, чтобы не переписывать код повторно, мы используем некоторые функции из написанного выше скрипта, выполняющего последовательное скачивание.

In [3]:
%%writefile flags_fut.py

from concurrent import futures
from flags_seq import main, download_flag

max_workers = 20

def download_flags(country_lst):
    worker_number = min(max_workers, len(country_lst))
    with futures.ThreadPoolExecutor(worker_number) as executor:
        downloaded_flags = executor.map(download_flag, sorted(country_lst))
    return len(list(downloaded_flags))

if __name__ == '__main__':
    main(download_flags)

Overwriting flags_fut.py


In [4]:
%%bash

python3 flags_fut.py

BD NG BR DE PH RU JP IN PK FR IR MX TR VN EG ET US ID CD CN 
20 flag(s) downloaded in 0.94 seconds.


В данном случае на скачивание всех флагов уходит не более 3 секунд.

Код выше явным образом футуры не использует, оставляя их под капотом. По факту футуры - это объекты, инкапсулирующие операции, которые могут быть выполнены аснхронно. Есть 2 класса футур - `concurrent.futures.Future` и `asyncio.Future`. Они, естественно, несколько различаются. У них есть следующие интересные методы:

* `.done()` - отвечает на вопрос, отработала ли футура  
* `.add_done_callback()` - назначает функцию, которая будет вызвана по завершении работы футуры  
* `.result()` - выдает результат работы футуры  

Мы можем попытаться более явно использовать футуры, 

In [5]:
%%writefile flags_fut2.py

from concurrent import futures
from flags_seq import main, download_flag

max_workers = 20

def download_flags(country_lst):
    worker_number = min(max_workers, len(country_lst))
    results = []
    with futures.ThreadPoolExecutor(worker_number) as executor:
        to_do = [executor.submit(download_flag, country) for country in country_lst]        
        for future in futures.as_completed(to_do):
            result = future.result()
            print(f'{future} result: {result}')
            results.append(result)
    return len(results)
    #    downloaded_flags = executor.map(download_flag, sorted(country_lst))
    #return len(list(downloaded_flags))

if __name__ == '__main__':
    main(download_flags)

Overwriting flags_fut2.py


In [6]:
%%bash

python3 flags_fut2.py

JP <Future at 0x7f9cd59c8850 state=finished returned str> result: JP
BD <Future at 0x7f9cd59dce50 state=finished returned str> result: BD
FR <Future at 0x7f9cd4794550 state=finished returned str> result: FR
US <Future at 0x7f9cd59ea8b0 state=finished returned str> result: US
RU <Future at 0x7f9cd59c83d0 state=finished returned str> result: RU
ET <Future at 0x7f9cd476c550 state=finished returned str> result: ET
NG <Future at 0x7f9cd4a61ee0 state=finished returned str> result: NG
EG <Future at 0x7f9cd4752790 state=finished returned str> result: EG
CN <Future at 0x7f9cd47424c0 state=finished returned str> result: CN
IN <Future at 0x7f9cd4742160 state=finished returned str> result: IN
MX <Future at 0x7f9cd4e2b940 state=finished returned str> result: MX
ID <Future at 0x7f9cd59d5ca0 state=finished returned str> result: ID
DE PH <Future at 0x7f9cd474ed90 state=finished returned str> result: DE
<Future at 0x7f9cd4d05940 state=finished returned str> result: PH
VN <Future at 0x7f9cd476ccd0 state

Как известно, в Python есть такой компонент, как Global Interpreter Lock (GIL), который блокирует одновременное использование вычислительных ресурсов несколькими потоками. При этом, как мы видели выше, многопоточный код скачал файлы существенно быстрее, чем однопоточный. Как такое произошло? Очевидно, что пользовательский код Python может как что-то считать сам, так и вызывать функции библиотек. Библиотеки могут тоже, как считать, так и вызывать другие библиотеки. Здесь важно вспомнить, что вызовы библиотек в конечном счете могут превращаться в вызовы API операционной системы, которые GIL не блокирует. Скачивание файла - это пример вызова API операционной системы, который GIL не блокирует. Поэтому и получается сильно быстрее. Если обобщить сказанное, то получается, что все операции ввода-вывода в Python хорошо параллелятся.

## Практический пример

То, что мы уже написали, является скорее учебным примером - здесь нет никакого контроля ошибок и программа предлагает скачать флаги захардкоженных стран. Понятно, что в реальной жизни все интереснее. Реализуем пример с красивым прогресс-баром, различными опциями и обработкой ошибок.

In [7]:
%%writefile country_codes.txt

CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR

Overwriting country_codes.txt


In [8]:
%%writefile flags2_common.py

"""Utilities for second set of flag examples.
"""

import os
import time
import sys
import string
import argparse
from collections import namedtuple
from enum import Enum


Result = namedtuple('Result', 'status data')

HTTPStatus = Enum('Status', 'ok not_found error')

POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()

DEFAULT_CONCUR_REQ = 1
MAX_CONCUR_REQ = 1

SERVERS = {
    'REMOTE': 'http://flupy.org/data/flags',
    'LOCAL':  'http://localhost:8001/flags',
    'DELAY':  'http://localhost:8002/flags',
    'ERROR':  'http://localhost:8003/flags',
}
DEFAULT_SERVER = 'REMOTE'

DEST_DIR = 'downloads/'
COUNTRY_CODES_FILE = 'country_codes.txt'


def save_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)


def initial_report(cc_list, actual_req, server_label):
    if len(cc_list) <= 10:
        cc_msg = ', '.join(cc_list)
    else:
        cc_msg = 'from {} to {}'.format(cc_list[0], cc_list[-1])
    print('{} site: {}'.format(server_label, SERVERS[server_label]))
    msg = 'Searching for {} flag{}: {}'
    plural = 's' if len(cc_list) != 1 else ''
    print(msg.format(len(cc_list), plural, cc_msg))
    plural = 's' if actual_req != 1 else ''
    msg = '{} concurrent connection{} will be used.'
    print(msg.format(actual_req, plural))


def final_report(cc_list, counter, start_time):
    elapsed = time.time() - start_time
    print('-' * 20)
    msg = '{} flag{} downloaded.'
    plural = 's' if counter[HTTPStatus.ok] != 1 else ''
    print(msg.format(counter[HTTPStatus.ok], plural))
    if counter[HTTPStatus.not_found]:
        print(counter[HTTPStatus.not_found], 'not found.')
    if counter[HTTPStatus.error]:
        plural = 's' if counter[HTTPStatus.error] != 1 else ''
        print('{} error{}.'.format(counter[HTTPStatus.error], plural))
    print('Elapsed time: {:.2f}s'.format(elapsed))


def expand_cc_args(every_cc, all_cc, cc_args, limit):
    codes = set()
    A_Z = string.ascii_uppercase
    if every_cc:
        codes.update(a+b for a in A_Z for b in A_Z)
    elif all_cc:
        with open(COUNTRY_CODES_FILE) as fp:
            text = fp.read()
        codes.update(text.split())
    else:
        for cc in (c.upper() for c in cc_args):
            if len(cc) == 1 and cc in A_Z:
                codes.update(cc+c for c in A_Z)
            elif len(cc) == 2 and all(c in A_Z for c in cc):
                codes.add(cc)
            else:
                msg = 'each CC argument must be A to Z or AA to ZZ.'
                raise ValueError('*** Usage error: '+msg)
    return sorted(codes)[:limit]


def process_args(default_concur_req):
    server_options = ', '.join(sorted(SERVERS))
    parser = argparse.ArgumentParser(
                description='Download flags for country codes. '
                'Default: top 20 countries by population.')
    parser.add_argument('cc', metavar='CC', nargs='*',
                help='country code or 1st letter (eg. B for BA...BZ)')
    parser.add_argument('-a', '--all', action='store_true',
                help='get all available flags (AD to ZW)')
    parser.add_argument('-e', '--every', action='store_true',
                help='get flags for every possible code (AA...ZZ)')
    parser.add_argument('-l', '--limit', metavar='N', type=int,
                help='limit to N first codes', default=sys.maxsize)
    parser.add_argument('-m', '--max_req', metavar='CONCURRENT', type=int,
                default=default_concur_req,
                help='maximum concurrent requests (default={})'
                      .format(default_concur_req))
    parser.add_argument('-s', '--server', metavar='LABEL',
                default=DEFAULT_SERVER,
                help='Server to hit; one of {} (default={})'
                      .format(server_options, DEFAULT_SERVER))
    parser.add_argument('-v', '--verbose', action='store_true',
                help='output detailed progress info')
    args = parser.parse_args()
    if args.max_req < 1:
        print('*** Usage error: --max_req CONCURRENT must be >= 1')
        parser.print_usage()
        sys.exit(1)
    if args.limit < 1:
        print('*** Usage error: --limit N must be >= 1')
        parser.print_usage()
        sys.exit(1)
    args.server = args.server.upper()
    if args.server not in SERVERS:
        print('*** Usage error: --server LABEL must be one of',
              server_options)
        parser.print_usage()
        sys.exit(1)
    try:
        cc_list = expand_cc_args(args.every, args.all, args.cc, args.limit)
    except ValueError as exc:
        print(exc.args[0])
        parser.print_usage()
        sys.exit(1)

    if not cc_list:
        cc_list = sorted(POP20_CC)
    return args, cc_list


def main(download_many, default_concur_req, max_concur_req):
    args, cc_list = process_args(default_concur_req)
    actual_req = min(args.max_req, max_concur_req, len(cc_list))
    initial_report(cc_list, actual_req, args.server)
    base_url = SERVERS[args.server]
    t0 = time.time()
    counter = download_many(cc_list, base_url, args.verbose, actual_req)
    assert sum(counter.values()) == len(cc_list), \
        'some downloads are unaccounted for'
    final_report(cc_list, counter, t0)


Overwriting flags2_common.py


In [9]:
%%writefile flags2_sequential.py

"""Download flags of countries (with error handling).

Sequential version

Sample run::

    $ python3 flags2_sequential.py -s DELAY b
    DELAY site: http://localhost:8002/flags
    Searching for 26 flags: from BA to BZ
    1 concurrent connection will be used.
    --------------------
    17 flags downloaded.
    9 not found.
    Elapsed time: 13.36s

"""

import collections

import requests
import tqdm

from flags2_common import main, save_flag, HTTPStatus, Result


DEFAULT_CONCUR_REQ = 1
MAX_CONCUR_REQ = 1

# BEGIN FLAGS2_BASIC_HTTP_FUNCTIONS
def get_flag(base_url, cc):
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    resp = requests.get(url)
    if resp.status_code != 200:  # <1>
        resp.raise_for_status()
    return resp.content


def download_one(cc, base_url, verbose=False):
    try:
        image = get_flag(base_url, cc)
    except requests.exceptions.HTTPError as exc:  # <2>
        res = exc.response
        if res.status_code == 404:
            status = HTTPStatus.not_found  # <3>
            msg = 'not found'
        else:  # <4>
            raise
    else:
        save_flag(image, cc.lower() + '.gif')
        status = HTTPStatus.ok
        msg = 'OK'

    if verbose:  # <5>
        print(cc, msg)

    return Result(status, cc)  # <6>
# END FLAGS2_BASIC_HTTP_FUNCTIONS

# BEGIN FLAGS2_DOWNLOAD_MANY_SEQUENTIAL
def download_many(cc_list, base_url, verbose, max_req):
    counter = collections.Counter()  # <1>
    cc_iter = sorted(cc_list)  # <2>
    if not verbose:
        cc_iter = tqdm.tqdm(cc_iter)  # <3>
    for cc in cc_iter:  # <4>
        try:
            res = download_one(cc, base_url, verbose)  # <5>
        except requests.exceptions.HTTPError as exc:  # <6>
            error_msg = 'HTTP error {res.status_code} - {res.reason}'
            error_msg = error_msg.format(res=exc.response)
        except requests.exceptions.ConnectionError as exc:  # <7>
            error_msg = 'Connection error'
        else:  # <8>
            error_msg = ''
            status = res.status

        if error_msg:
            status = HTTPStatus.error  # <9>
        counter[status] += 1  # <10>
        if verbose and error_msg: # <11>
            print('*** Error for {}: {}'.format(cc, error_msg))

    return counter  # <12>
# END FLAGS2_DOWNLOAD_MANY_SEQUENTIAL

if __name__ == '__main__':
    main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)

Overwriting flags2_sequential.py


In [10]:
%%writefile flags2_threadpool.py

"""Download flags of countries (with error handling).

ThreadPool version

Sample run::

    $ python3 flags2_threadpool.py -s ERROR -e
    ERROR site: http://localhost:8003/flags
    Searching for 676 flags: from AA to ZZ
    30 concurrent connections will be used.
    --------------------
    150 flags downloaded.
    361 not found.
    165 errors.
    Elapsed time: 7.46s

"""

# BEGIN FLAGS2_THREADPOOL
import collections
from concurrent import futures

import requests
import tqdm  # <1>

from flags2_common import main, HTTPStatus  # <2>
from flags2_sequential import download_one  # <3>

DEFAULT_CONCUR_REQ = 30  # <4>
MAX_CONCUR_REQ = 1000  # <5>


def download_many(cc_list, base_url, verbose, concur_req):
    counter = collections.Counter()
    with futures.ThreadPoolExecutor(max_workers=concur_req) as executor:  # <6>
        to_do_map = {}  # <7>
        for cc in sorted(cc_list):  # <8>
            future = executor.submit(download_one,
                            cc, base_url, verbose)  # <9>
            to_do_map[future] = cc  # <10>
        done_iter = futures.as_completed(to_do_map)  # <11>
        if not verbose:
            done_iter = tqdm.tqdm(done_iter, total=len(cc_list))  # <12>
        for future in done_iter:  # <13>
            try:
                res = future.result()  # <14>
            except requests.exceptions.HTTPError as exc:  # <15>
                error_msg = 'HTTP {res.status_code} - {res.reason}'
                error_msg = error_msg.format(res=exc.response)
            except requests.exceptions.ConnectionError as exc:
                error_msg = 'Connection error'
            else:
                error_msg = ''
                status = res.status

            if error_msg:
                status = HTTPStatus.error
            counter[status] += 1
            if verbose and error_msg:
                cc = to_do_map[future]  # <16>
                print('*** Error for {}: {}'.format(cc, error_msg))

    return counter


if __name__ == '__main__':
    main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
# END FLAGS2_THREADPOOL

Overwriting flags2_threadpool.py
