## Executor.map() 실험
* 사용하기 쉬움
* 호출 순서대로 결과 반환
* 완료되는 대로 결과를 가져오려면 `Executor.submit()` 메서드와 `futures.as_completed()` 함수 사용해야 함
    * `Executor.map()`이 여러 인수에 동일 콜러블을 실행하도록 설계되어 있고, `submit()`은 다양한 콜러블과 인수를 제출할 수 있다는 점에서 보다 유연

In [1]:
from time import sleep, strftime
from concurrent import futures


# 출력을 위한 함수, 인수 앞에 타임스탬프 찍음
def display(*args):
    print(strftime('[%H:%M:%S]'), end=' ')
    print(*args)
    
# 인수 n만큼 sleep하고 n만큼 탭을 붙여 메시지 출력
def loiter(n):
    msg = '{}loiter({}): doing nothing for {}s...'
    display(msg.format('\t'*n, n, n))
    sleep(n)
    msg = '{}loiter({}): done'
    display(msg.format('\t'*n, n))
    return n * 10 # 결과 가져오는 방법을 보여주기 위해 반환

def main():
    display('Script starting.')
    executor = futures.ThreadPoolExecutor(max_workers=3) # 스레드 3개 생성
    results = executor.map(loiter, range(5)) # 5개 작업 요청, map() 논블로킹 메서드
    display('results: ', results)
    display('Waiting for individual results: ')
    for i, result in enumerate(results):
        display('result {}: {}'.format(i, result))

In [2]:
main()

[04:53:53] Script starting.
[04:53:53] loiter(0): doing nothing for 0s...
[04:53:53] loiter(0): done
[04:53:53] 	loiter(1): doing nothing for 1s...
[04:53:53] 		loiter(2): doing nothing for 2s...
[04:53:53][04:53:53] 			loiter(3): doing nothing for 3s...
 results:  <generator object Executor.map.<locals>.result_iterator at 0x7f0db2f88eb8>
[04:53:53] Waiting for individual results: 
[04:53:53] result 0: 0
[04:53:54] 	loiter(1): done
[04:53:54] 				loiter(4): doing nothing for 4s...
[04:53:54] result 1: 10
[04:53:55] 		loiter(2): done
[04:53:55] result 2: 20
[04:53:56] 			loiter(3): done
[04:53:56] result 3: 30
[04:53:58] 				loiter(4): done
[04:53:58] result 4: 40


* 동기 vs 비동기
    * 동기
        * 요청과 결과가 동시에 일어남
        * 함수의 결과를 호출한 쪽에서 처리
        * 대부분 함수는 동기 방식
    * 비동기
        * 요청과 결과가 동시에 일어나지 않음
        * 함수의 결과를 호출한 쪽에서 처리하지 않음
* 블로킹 vs 논블로킹
    * 블로킹
        * 자신의 수행결과까지 제어권을 가짐
    * 논블로킹
        * 자신이 호출되었을 때 제어권을 자신을 호출한 쪽으로 넘김
        * 자신을 호출한 쪽에서 다른 일 할 수 있음
* 일반적으로 동기 + 블로킹 / 비동기 + 논블로킹 조합이 많음
* [출처](https://victorydntmd.tistory.com/8)

## 진행 상황 출력하고 에러를 처리하며 내려받기
* tqdm 패키지 이용
    * tqdm 패키지는 텍스트 기반 진행 막대 애니메이트
    * 반복형을 인수로 받아 처리하며 남은 시간 추정해 보여줌
* [시연 영상](https://www.youtube.com/watch?v=M8Z65tAl5l4)
* 명령행 인터페이스 제공
    * `-s / --server` 옵션으로 url 지정
        * `LOCAL`: 기본값, http://localhost:8001/flags 사용
        * `REMOTE`: `http://flupy.org/data/flags` 사용, 너무 많은 요청을 동시에 보내지 않도록 유의, CDN에 의해 처음 이후 빨라짐
        * `DELAY`: http://localhost:8002/flags 사용, 응답 지연을 위해 Vaurien 프록시 사용
        * `ERROR`: http://localhost:8003/flags 사용, http 에러 발생시키고 응답을 지연시키는 프록시는 8003에서 처리, Vaurien 설정 변경해 적용

In [13]:
"""
flags2_common
Utilities for second set of flag examples.
"""

import os
import time
import sys
import string
import argparse
from collections import namedtuple
from enum import Enum


Result = namedtuple('Result', 'status data')

HTTPStatus = Enum('Status', 'ok not_found error')

POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()

DEFAULT_CONCUR_REQ = 1
MAX_CONCUR_REQ = 1

SERVERS = {
    'REMOTE': 'http://flupy.org/data/flags',
    'LOCAL':  'http://localhost:8001/flags',
    'DELAY':  'http://localhost:8002/flags',
    'ERROR':  'http://localhost:8003/flags',
}
DEFAULT_SERVER = 'LOCAL'

DEST_DIR = 'downloads/'
COUNTRY_CODES_FILE = 'country_codes.txt'


def save_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)


def initial_report(cc_list, actual_req, server_label):
    if len(cc_list) <= 10:
        cc_msg = ', '.join(cc_list)
    else:
        cc_msg = 'from {} to {}'.format(cc_list[0], cc_list[-1])
    print('{} site: {}'.format(server_label, SERVERS[server_label]))
    msg = 'Searching for {} flag{}: {}'
    plural = 's' if len(cc_list) != 1 else ''
    print(msg.format(len(cc_list), plural, cc_msg))
    plural = 's' if actual_req != 1 else ''
    msg = '{} concurrent connection{} will be used.'
    print(msg.format(actual_req, plural))


def final_report(cc_list, counter, start_time):
    elapsed = time.time() - start_time
    print('-' * 20)
    msg = '{} flag{} downloaded.'
    plural = 's' if counter[HTTPStatus.ok] != 1 else ''
    print(msg.format(counter[HTTPStatus.ok], plural))
    if counter[HTTPStatus.not_found]:
        print(counter[HTTPStatus.not_found], 'not found.')
    if counter[HTTPStatus.error]:
        plural = 's' if counter[HTTPStatus.error] != 1 else ''
        print('{} error{}.'.format(counter[HTTPStatus.error], plural))
    print('Elapsed time: {:.2f}s'.format(elapsed))


def expand_cc_args(every_cc, all_cc, cc_args, limit):
    codes = set()
    A_Z = string.ascii_uppercase
    if every_cc:
        codes.update(a+b for a in A_Z for b in A_Z)
    elif all_cc:
        with open(COUNTRY_CODES_FILE) as fp:
            text = fp.read()
        codes.update(text.split())
    else:
        for cc in (c.upper() for c in cc_args):
            if len(cc) == 1 and cc in A_Z:
                codes.update(cc+c for c in A_Z)
            elif len(cc) == 2 and all(c in A_Z for c in cc):
                codes.add(cc)
            else:
                msg = 'each CC argument must be A to Z or AA to ZZ.'
                raise ValueError('*** Usage error: '+msg)
    return sorted(codes)[:limit]


def process_args(default_concur_req):
    server_options = ', '.join(sorted(SERVERS))
    parser = argparse.ArgumentParser(
                description='Download flags for country codes. '
                'Default: top 20 countries by population.')
    parser.add_argument('cc', metavar='CC', nargs='*',
                help='country code or 1st letter (eg. B for BA...BZ)')
    parser.add_argument('-a', '--all', action='store_true',
                help='get all available flags (AD to ZW)')
    parser.add_argument('-e', '--every', action='store_true',
                help='get flags for every possible code (AA...ZZ)')
    parser.add_argument('-l', '--limit', metavar='N', type=int,
                help='limit to N first codes', default=sys.maxsize)
    parser.add_argument('-m', '--max_req', metavar='CONCURRENT', type=int,
                default=default_concur_req,
                help='maximum concurrent requests (default={})'
                      .format(default_concur_req))
    parser.add_argument('-s', '--server', metavar='LABEL',
                default=DEFAULT_SERVER,
                help='Server to hit; one of {} (default={})'
                      .format(server_options, DEFAULT_SERVER))
    parser.add_argument('-v', '--verbose', action='store_true',
                help='output detailed progress info')
    args = parser.parse_args()
    if args.max_req < 1:
        print('*** Usage error: --max_req CONCURRENT must be >= 1')
        parser.print_usage()
        sys.exit(1)
    if args.limit < 1:
        print('*** Usage error: --limit N must be >= 1')
        parser.print_usage()
        sys.exit(1)
    args.server = args.server.upper()
    if args.server not in SERVERS:
        print('*** Usage error: --server LABEL must be one of',
              server_options)
        parser.print_usage()
        sys.exit(1)
    try:
        cc_list = expand_cc_args(args.every, args.all, args.cc, args.limit)
    except ValueError as exc:
        print(exc.args[0])
        parser.print_usage()
        sys.exit(1)

    if not cc_list:
        cc_list = sorted(POP20_CC)
    return args, cc_list


def main(download_many, default_concur_req, max_concur_req):
    args, cc_list = process_args(default_concur_req)
    actual_req = min(args.max_req, max_concur_req, len(cc_list))
    initial_report(cc_list, actual_req, args.server)
    base_url = SERVERS[args.server]
    t0 = time.time()
    counter = download_many(cc_list, base_url, args.verbose, actual_req)
    assert sum(counter.values()) == len(cc_list), \
        'some downloads are unaccounted for'
    final_report(cc_list, counter, t0)

In [14]:
# 에러 처리
# flag2_sequential.py
import tqdm
import requests
import collections


def get_flag(base_url, cc):
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    resp = requests.get(url)
    if resp.status_code != 200:
        rep.raise_for_status() # http status code 200 아닌 경우 에러 발생
    return resp.content

def download_one(cc, base_url, verbose=False):
    try:
        image = get_flag(base_url, cc)
    except requests.exceptions.HTTPError as exec:
        res = exec.response
        if res.status_code == 404:
            status = HTTPStatus.not_found
            msg = 'not found'
        else:
            raise # http status code 4040 예외 제외 예외 다시 발생시켜 호출자에 전달
    else:
        save_flag(image, cc.lower() + '.gif')
        status = HTTPStatus.ok
        msg = 'OK'
        
    if verbose:
        print(cc, msg)
        
    return Result(status, cc)

def download_many(cc_list, base_url, verbose, max_req):
    counter = collections.Counter()
    cc_iter = sorted(cc_list)
    if not verbose:
        cc_iter = tqdm.tqdm(cc_iter)
    for cc in cc_iter:
        try:
            res = download_one(cc, base_url, verbose)
        except requests.HTTPError as exc:
            error_msg = 'HTTP erorr {res.status_code} - {res.reason}'
            error_msg = error_mgs.format(res=exc.response)
        except requests.exceptions.ConnectionError as exc:
            error_msg = 'Connection error'
        else:
            error_msg = ''
            status = res.status
            
        if error_msg:
            status = HTTPStatus.error
            
        # counter 객체는 각 처리 결과 발생 빈도수를 구함
        # 빈도수는 main에 반환해 최종 보고에 출력 
        counter[status] += 1
        if verbose and error_msg:
            print('*** Error for {}: {}'.format(cc, error_msg))
            
        return counter

In [15]:
# flag2_threadpool
# download_many() 함수만 구현 (나머지는 다른 모듈에서 가져옴)
# futures.as_completed() 사용
from concurrent import futures


DEFAULT_CONCUR_REQ = 30
MAX_CONCUR_REQ = 1000

def download_many(cc_list, base_url, verbose, concur_req):
    counter = collections.Counter()
    with futures.ThreadPoolExecutor(max_workers=concur_req) as executor:
        to_do_map = {}
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc, base_url, verbose)
            # future 객체와 국가 코드와 매핑해 future 객체 결과 가져와 처리가 쉬움
            to_do_map[future] = cc
        do_iter = futures.as_completed(to_do_map)
        if not verbose:
            done_iter = tqdm.tqdm(done_iter, total_len(cc_list))
        for future in done_iter:
            try:
                res = function.result()
            except requests.exceptions.HTTPError as exc:
                error_msg = 'HTTP erorr {res.status_code} - {res.reason}'
                error_msg = error_mgs.format(res=exc.response)
            except requests.exceptions.ConnectionError as exc:
                error_msg = 'Connection error'
            else:
                error_msg = ''
                status = res.status
            
        if error_msg:
            status = HTTPStatus.error
        counter[status] += 1
        if verbose and error_msg:
            cc = to_do_map[future]
            print('*** Error for {}: {}'.format(cc, error_msg))
            
        return counter

* [http status code 참고 자료](https://ko.wikipedia.org/wiki/HTTP_%EC%83%81%ED%83%9C_%EC%BD%94%EB%93%9C)
* `thread` 모듈 사용 중단 
* `threading` 모듈 사용 권장
* 스레드 간 데이터 전송 `queue` 모듈 사용
* 어플리케이션 구조가 `futures.ProcessPoolExecutor`에 안 맞는 경우 `multiprocessing` 사용
* [GIL 참고 자료](https://docs.python.org/ko/3/glossary.html#term-global-interpreter-lock)

* PEP3148
    * Executor 
        * callable과 그 인자로 비동기 요청을 받음
        * Future 반환
    * Future
        * 요청 실행한 바를 표현
    * [출처](https://www.python.org/dev/peps/pep-3148/)
    * [멀티프로세스 관련 PEP](https://www.python.org/dev/peps/pep-0371/)