## 17. Future를 이용한 동시성
스레드를 혹사하는 사람들은 일반적인 애플리케이션 프로그래머들이 평생 보지도 못할 유스케이스를 마음속에 담고 있는 시스템 프로그래머들이 대부분이다. 애플리케이션 프로그래머들의 99%는 여러 독립적인 스레드를 생성하고 결과를 큐에 수집하는 방법만 알고 있으면 된다. - 미셀 시미오나토, 파이썬 철학자 -

### 17.1 예제: 세가지 스타일의 웹 내려받기
순차적 작성 방식은 스레드를 사용하거나 asyncio를 사용하는 것보다 현저하게 속도가 떨어진다. asyncio는 18장에서 설명한다.

#### 17.1.1 순차 내려받기 스크립트
순차 내려받기에는 그리 새로운 것은 없다. 다른 스크립트와 비교를 하귀 위한 기준선이다.

In [1]:
""" [예제 17-2] 순차 내려받기 스크립트, 몇몇 함수는 다른 스크립트에서 재사용할 것이다. """
import os
import time
import sys

import requests # 표준 라이브러리에 속해 있지 않으므로 관례에 따라 os, time, sys 표준 라이브러리 모듈을
                # 먼저 임포트 하고 한 줄 띄운 후 임포트한다.

POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()

BASE_URL = 'http://flupy.org/data/flags' # 국기 이미지를 갖고 있는 웹사이트
DEST_DIR = 'downloads/'                  # 이미지를 저장할 디렉토리

def save_flag(img, filename):
    """ 
    img(바이트 시퀀스)를 DEST_DIR 안의 filename으로 저장
    """
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)
        
def get_flag(cc):
    """
    국가 코드를 인수로 받아서 URL을 만들고 이미지(이진 시퀀스)를 받아 리턴한다.
    """
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    return resp.content

def show(text):
    """
    진행 상황을 한 줄로 출력한다.
    파이썬은 개행 문자를 받기 전까지 문자열을 출력하지 않으므로 sys.stdout.flush()를 호출해서 
    버퍼에 남아있는 내용을 모두 화면에 출력하게 한다.
    """
    print(text, end=' ')
    sys.stdout.flush()
    
def download_many(cc_list):
    """
    국가 코드를 알파벳순으로 반복해서 이미지를 내려받기, 진행상황 표시, 저장을 진행한다.
    (동시성으로 개선될 핵심 부분이다.)
    """
    for cc in sorted(cc_list):
        image = get_flag(cc)
        show(cc)
        save_flag(image, cc.lower() + '.gif')
    return len(cc_list)
    
def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))
    
if __name__ == '__main__':
    main(download_many)

BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 
20 flags downloaded in 8.33s


#### 17.1.2 concurrent.futures로 내려받기
concurrent.futures 패키지의 가장 중요한 기능은 ThreadPoolExecutor와 ProcessPoolExecutor클래스가 담당한다. 이 클래스들은 콜러블 객체를 서로 다른 스레드나 프로세스에서 실행할 수 있게 해주는 인터페이스를 구현한다. 이 클래스들은 작업자 스레드나 작업자 프로세스를 관리하는 풀과 실행할 작업을 담은 큐를 가지고 있다. 따라서 아주 고수준의 인터페이스를 구현하고 있기 때문에 국기를 내려받는 간단한 프로그램을 구현할 때는 내부 작동과정을 알 필요가 없다.

In [3]:
""" [예제 17-3] futures.ThreadPoolExecutor()로 스레드화된 내려받기 스크립트 """
from concurrent import futures

MAX_WORKERS = 20 # 최대 스레드 수

def download_one(cc):
    image = get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc

def download_many(cc_list):
    workers = min(MAX_WORKERS, len(cc_list))
    
    # executor.__exit__() 메서드는 executor.shutdown(wait=True) 메서드를 호출하는데, 이 메서드는 모든 스레드가 완료될 때까지 블록된다.
    with futures.ThreadPoolExecutor(workers) as executor:
        # 여러 스레드에 의해 download_one 함수가 동시에 호출된다는 것을 제외하면 내장된 map() 함수와 유사하다
        # map() 메서드는 각 함수가 반환한 값을 가져올 수 있도록 반복할 수 있는 제너레이터를 반환한다.
        res = executor.map(download_one, sorted(cc_list))
        
    return len(list(res)) # 가져온 결과의 수를 반환한다. 스레드 중 하나라도 예외를 발생시키면 여기에서 예외가 발생한다. 

if __name__ == '__main__':
    main(download_many)

DE VNRU NG TR MX  FR EG PK IN ID BR BD CN JP ET PH CD US IR 
20 flags downloaded in 0.72s


#### 17.1.3 Future는 어디에 있나?
Future는 concurrent.futures와 asyncio의 내부에 있는 핵심 컴포넌트이지만 예제 17-3와 같이 사용자에게 드러나지 않는 경우가 있다. 이 절에서는 전반적인 Future의 특징에 대해 설명하고, Future를 이용한 예제 코드를 구현해본다. 


In [3]:
""" [예제 17-4] download_may() 함수 안의 executor.map()을 executor.submit()과 futures.as_completed()로 대체하기 """

def download_many(cc_list):
    cc_list = cc_list[:5] # 인구가 많이 5개국만 사용한다.
    
    with futures.ThreadPoolExecutor(max_workers=3) as executor: # 대기 중인 Future 객체를 출력해서 살펴보기 위해 max_workers를 3으로 하드코딩 한다.
        to_do = []
        for cc in sorted(cc_list): # 결과 순서가 바뀌는 것을 확인하기 위해 정렬한다.
            future = executor.submit(download_one, cc) # 콜러블이 실행되도록 스케줄링하고 이 작업을 나타내는 Future 객체를 반환한다.
            to_do.append(future)
            msg = 'Scheduled for {}: {}'
            print(msg.format(cc, future))
            
        results = []
        for future in futures.as_completed(to_do): # as_completed()는 완료된 Future 객체를 생성한다.
            res = future.result()                  # 결과를 가져온다
            msg = '{} result: {!r}'                
            print(msg.format(future, res))         # 결과를 출력한다.
            results.append(res)
            
    return len(results)

if __name__ == '__main__':
    main(download_many)

Scheduled for BR: <Future at 0x7f08a089abe0 state=running>
Scheduled for CN: <Future at 0x7f08a00663c8 state=running>
Scheduled for ID: <Future at 0x7f08b80e7978 state=running>
Scheduled for IN: <Future at 0x7f08a3167cc0 state=pending>
Scheduled for US: <Future at 0x7f08a3167ef0 state=pending>
CN <Future at 0x7f08a00663c8 state=finished returned str> result: 'CN'
ID <Future at 0x7f08b80e7978 state=finished returned str> result: 'ID'
BR <Future at 0x7f08a089abe0 state=finished returned str> result: 'BR'
IN <Future at 0x7f08a3167cc0 state=finished returned str> result: 'IN'
US <Future at 0x7f08a3167ef0 state=finished returned str> result: 'US'

5 flags downloaded in 0.48s


+ Future 객체가 알파벳순으로 스케줄링되었다. Future 객체의 상태를 보면 작업자 스레드 수를 최대 3으로 설정했기 때문에 처음 세 개만 실행중이다. 
+ 마지막 두 개의 Future객체는 대기 중이다.
+ 주 스레드의 download_many( ) 에서 첫 스레드 BR의 결과를 출력하기 전에 ID, BR 스레드가 국가 코드를 먼저 출력한다.

### 17.2 블로킹 I/O와 GIL
CPython 인터프리터는 내부적으로 <a href="https://ko.wikipedia.org/wiki/%EC%8A%A4%EB%A0%88%EB%93%9C_%EC%95%88%EC%A0%84">스레드 안전</a>하지 않으므로 전역 인터프리터 락(GIL)을 가지고 있으며, 한 번에 한 스레드만 파이썬 바이트코드를 실행하도록 제한한다. 그렇기 때문에 단일 파이썬 프로세스가 동시에 다중 CPU 코어를 사용할 수 없다.

그렇지만 내장 함수나 C로 작성된 확장은 시간이 오래 걸리는 작업을 실행할 때 GIL을 해제할 수 있다(단지 제작자가 그렇게 구현하지 않을 뿐이다). 파이썬 표준 라이브러리의 모든 <a href="https://tech.peoplefund.co.kr/2017/08/02/non-blocking-asynchronous-concurrency.html">블로킹 입출력 함수</a>는 GIL를 해제해서 다른 스레드가 실행할 수 있게 한다. time.sleep() 함수도 GIL을 해제한다. 따라서 GIL을 사용하고 있더라도 파이썬 스레드는 입출력 위주의 애플리케이션에서는 엄청난 효용성이 있다.

### 17.3 concurrent.future로 프로세스 실행하기
ProcessPoolExecutor와 ThreadPoolExecutor는 모두 범용 Excutor 인터페이스를 구현하므로 concurrent.future를 사용하는 경우에는 스레드 기반 프로그램을 프로세스 기반의 프로그램으로 쉽게 변환할 수 있다. 그러나 국기를 내려받는 프로그램처럼 입출력 위주의 작업에서는 도움이 되지 않는다.

In [4]:
def download_many(cc_list):
    """
    ThreadPoolExecutor를 ProcessPoolExecutor로 변경
    인수는 os.cpu_count()가 반환하는 값을 사용하므로 대부분 직접 설정하지 않는다.
    """
    with futures.ProcessPoolExecutor() as executor:
        res = executor.map(download_one, sorted(cc_list))
        
    return len(list(res)) # 가져온 결과의 수를 반환한다. 스레드 중 하나라도 예외를 발생시키면 여기에서 예외가 발생한다. 

main(download_many)

CN CD BR DE FR BD EG ET ID IN MX PH JP NG PK IR RU TR VN US 
20 flags downloaded in 0.70s


### 17.4 Executor.map( ) 실험
Exectuor.map()은 사용하기 쉽지만, 호출한 순서 그대로 결과를 반환하는 특징이 있다. 이러한 특징은 상황에 따라 도움이 되기도 하고 아닐 수도 있다. 더 진행하기 전에 모든 결과가 필요한 경우라면 이 특징은 문제가 되지 않지만, submit()한 순서와 상관없이 완료되는 대로 결과를 가져오는 게 더 좋은 경우도 있다. 

완료되는 대로 결과를 가져오려면 [예제 17-4]에서 본 것처럼 Executor.submit() 메서드와 futures.as_completed() 함수를 함께 사용해야 한다. 
```
<TIP>
submit()이 다양한 콜로블과 인수를 제출할 수 있는 반면 executor.map()은 여러 인수에 동일한 콜러블을 실행하도록 설계되어 있으므로 executor.submit() / futures.as_completed() 조합이 executor.map()보다 융통성이 높다. 게다가 일부는 ThreadPoolExecutor 객체에서 다른 일부는 ProcessPoolExecutor 객체에서 가져오는 등 여러 실행자에서 가져온 Future 객체의 집합을 futures.as_completed()에 전달할 수 있다.
```

In [5]:
""" <예제 17-6> ThreadPoolExecutor.map() 메서드 사용 예 """
from time import sleep, strftime
from concurrent import futures

def display(*args):
    """
    자신이 받은 인수 앞에 [HH:MM:SS] 포맷의 타임스팸프를 찍어서 출력
    """
    print(strftime('[%H:%M:%S]'), end=' ')
    print(*args)
    
def loiter(n):
    """
    시작할 때 메세지를 출력하고 인수로 받은 n초 동안 잠자고 마지막 메세지를 출력
    """
    msg = '{}loiter({}): doing nothing for {}s...'
    display(msg.format('\t'*n, n, n))
    sleep(n)
    msg = '{}loiter({}): done'
    display(msg.format('\t'*n, n))
    return n * 10

def main():
    display('Script starting.')
    executor = futures.ThreadPoolExecutor(max_workers=3) # 5개의 작업을 요청했으나, 3개만 먼저 실행된다. 
    results = executor.map(loiter, range(5))
    display('result:', results) # 제너레이터가 생성된다.
    display('Waiting for individal results:')
    for i, result in enumerate(results):
        display('result {}: {}'.format(i, result)) # 결과를 순차적으로 반환한다.

if __name__ == '__main__':
    main()

[23:19:09] Script starting.
[23:19:09] loiter(0): doing nothing for 0s...
[23:19:09] loiter(0): done
[23:19:09][23:19:09] 		loiter(2): doing nothing for 2s...
 	loiter(1): doing nothing for 1s...
[23:19:09] result: <generator object Executor.map.<locals>.result_iterator at 0x7f08a314f9e8>
[23:19:09] Waiting for individal results:
[23:19:09] result 0: 0
[23:19:09] 			loiter(3): doing nothing for 3s...
[23:19:10] 	loiter(1): done
[23:19:10] 				loiter(4): doing nothing for 4s...
[23:19:10] result 1: 10
[23:19:11] 		loiter(2): done
[23:19:11] result 2: 20
[23:19:12] 			loiter(3): done
[23:19:12] result 3: 30
[23:19:14] 				loiter(4): done
[23:19:14] result 4: 40


### 17.5 진행 상황을 출력하고 에러를 처리하며 내려받기
다양한 에러 조건의 처리를 테스트하기 위해 다음과 같은 flags2 예제들을 만들었다.

In [4]:
"""Utilities for second set of flag examples.
"""

import os
import time
import sys
import string
import argparse
from collections import namedtuple
from enum import Enum


Result = namedtuple('Result', 'status data')

HTTPStatus = Enum('Status', 'ok not_found error')

POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()

DEFAULT_CONCUR_REQ = 1
MAX_CONCUR_REQ = 1

SERVERS = {
    'REMOTE': 'http://flupy.org/data/flags',
    'LOCAL':  'http://localhost:8001/flags',
    'DELAY':  'http://localhost:8002/flags',
    'ERROR':  'http://localhost:8003/flags',
}
DEFAULT_SERVER = 'LOCAL'

DEST_DIR = 'downloads/'
COUNTRY_CODES_FILE = 'country_codes.txt'


def save_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)


def initial_report(cc_list, actual_req, server_label):
    if len(cc_list) <= 10:
        cc_msg = ', '.join(cc_list)
    else:
        cc_msg = 'from {} to {}'.format(cc_list[0], cc_list[-1])
    print('{} site: {}'.format(server_label, SERVERS[server_label]))
    msg = 'Searching for {} flag{}: {}'
    plural = 's' if len(cc_list) != 1 else ''
    print(msg.format(len(cc_list), plural, cc_msg))
    plural = 's' if actual_req != 1 else ''
    msg = '{} concurrent connection{} will be used.'
    print(msg.format(actual_req, plural))


def final_report(cc_list, counter, start_time):
    elapsed = time.time() - start_time
    print('-' * 20)
    msg = '{} flag{} downloaded.'
    plural = 's' if counter[HTTPStatus.ok] != 1 else ''
    print(msg.format(counter[HTTPStatus.ok], plural))
    if counter[HTTPStatus.not_found]:
        print(counter[HTTPStatus.not_found], 'not found.')
    if counter[HTTPStatus.error]:
        plural = 's' if counter[HTTPStatus.error] != 1 else ''
        print('{} error{}.'.format(counter[HTTPStatus.error], plural))
    print('Elapsed time: {:.2f}s'.format(elapsed))


def expand_cc_args(every_cc, all_cc, cc_args, limit):
    codes = set()
    A_Z = string.ascii_uppercase
    if every_cc:
        codes.update(a+b for a in A_Z for b in A_Z)
    elif all_cc:
        with open(COUNTRY_CODES_FILE) as fp:
            text = fp.read()
        codes.update(text.split())
    else:
        for cc in (c.upper() for c in cc_args):
            if len(cc) == 1 and cc in A_Z:
                codes.update(cc+c for c in A_Z)
            elif len(cc) == 2 and all(c in A_Z for c in cc):
                codes.add(cc)
            else:
                msg = 'each CC argument must be A to Z or AA to ZZ.'
                raise ValueError('*** Usage error: '+msg)
    return sorted(codes)[:limit]


def process_args(default_concur_req):
    server_options = ', '.join(sorted(SERVERS))
    parser = argparse.ArgumentParser(
                description='Download flags for country codes. '
                'Default: top 20 countries by population.')
    parser.add_argument('cc', metavar='CC', nargs='*',
                help='country code or 1st letter (eg. B for BA...BZ)')
    parser.add_argument('-a', '--all', action='store_true',
                help='get all available flags (AD to ZW)')
    parser.add_argument('-e', '--every', action='store_true',
                help='get flags for every possible code (AA...ZZ)')
    parser.add_argument('-l', '--limit', metavar='N', type=int,
                help='limit to N first codes', default=sys.maxsize)
    parser.add_argument('-m', '--max_req', metavar='CONCURRENT', type=int,
                default=default_concur_req,
                help='maximum concurrent requests (default={})'
                      .format(default_concur_req))
    parser.add_argument('-s', '--server', metavar='LABEL',
                default=DEFAULT_SERVER,
                help='Server to hit; one of {} (default={})'
                      .format(server_options, DEFAULT_SERVER))
    parser.add_argument('-v', '--verbose', action='store_true',
                help='output detailed progress info')
    args = parser.parse_args()
    if args.max_req < 1:
        print('*** Usage error: --max_req CONCURRENT must be >= 1')
        parser.print_usage()
        sys.exit(1)
    if args.limit < 1:
        print('*** Usage error: --limit N must be >= 1')
        parser.print_usage()
        sys.exit(1)
    args.server = args.server.upper()
    if args.server not in SERVERS:
        print('*** Usage error: --server LABEL must be one of',
              server_options)
        parser.print_usage()
        sys.exit(1)
    try:
        cc_list = expand_cc_args(args.every, args.all, args.cc, args.limit)
    except ValueError as exc:
        print(exc.args[0])
        parser.print_usage()
        sys.exit(1)

    if not cc_list:
        cc_list = sorted(POP20_CC)
    return args, cc_list


def main(download_many, default_concur_req, max_concur_req):
    args, cc_list = process_args(default_concur_req)
    actual_req = min(args.max_req, max_concur_req, len(cc_list))
    initial_report(cc_list, actual_req, args.server)
    base_url = SERVERS[args.server]
    t0 = time.time()
    counter = download_many(cc_list, base_url, args.verbose, actual_req)
    assert sum(counter.values()) == len(cc_list), \
        'some downloads are unaccounted for'
    final_report(cc_list, counter, t0)


In [5]:
"""Download flags of countries (with error handling).

Sequential version

Sample run::

    $ python3 flags2_sequential.py -s DELAY b
    DELAY site: http://localhost:8002/flags
    Searching for 26 flags: from BA to BZ
    1 concurrent connection will be used.
    --------------------
    17 flags downloaded.
    9 not found.
    Elapsed time: 13.36s

"""

import collections

import requests
import tqdm

DEFAULT_CONCUR_REQ = 1
MAX_CONCUR_REQ = 1

# BEGIN FLAGS2_BASIC_HTTP_FUNCTIONS
def get_flag(base_url, cc):
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    resp = requests.get(url)
    if resp.status_code != 200:  # <1>
        resp.raise_for_status()
    return resp.content


def download_one(cc, base_url, verbose=False):
    try:
        image = get_flag(base_url, cc)
    except requests.exceptions.HTTPError as exc:  # <2>
        res = exc.response
        if res.status_code == 404:
            status = HTTPStatus.not_found  # <3>
            msg = 'not found'
        else:  # <4>
            raise
    else:
        save_flag(image, cc.lower() + '.gif')
        status = HTTPStatus.ok
        msg = 'OK'

    if verbose:  # <5>
        print(cc, msg)

    return Result(status, cc)  # <6>
# END FLAGS2_BASIC_HTTP_FUNCTIONS

# BEGIN FLAGS2_DOWNLOAD_MANY_SEQUENTIAL
def download_many(cc_list, base_url, verbose, max_req):
    counter = collections.Counter()  # <1>
    cc_iter = sorted(cc_list)  # <2>
    if not verbose:
        cc_iter = tqdm.tqdm(cc_iter)  # <3>
    for cc in cc_iter:  # <4>
        try:
            res = download_one(cc, base_url, verbose)  # <5>
        except requests.exceptions.HTTPError as exc:  # <6>
            error_msg = 'HTTP error {res.status_code} - {res.reason}'
            error_msg = error_msg.format(res=exc.response)
        except requests.exceptions.ConnectionError as exc:  # <7>
            error_msg = 'Connection error'
        else:  # <8>
            error_msg = ''
            status = res.status

        if error_msg:
            status = HTTPStatus.error  # <9>
        counter[status] += 1  # <10>
        if verbose and error_msg: # <11>
            print('*** Error for {}: {}'.format(cc, error_msg))

    return counter  # <12>
# END FLAGS2_DOWNLOAD_MANY_SEQUENTIAL

if __name__ == '__main__':
    sys.argv = [sys.argv[0], '-s', 'REMOTE']
    main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)

  0%|          | 0/20 [00:00<?, ?it/s]

REMOTE site: http://flupy.org/data/flags
Searching for 20 flags: from BD to VN
1 concurrent connection will be used.


100%|██████████| 20/20 [00:09<00:00,  1.89it/s]

--------------------
20 flags downloaded.
Elapsed time: 9.83s





In [10]:
"""Download flags of countries (with error handling).

ThreadPool version

Sample run::

    $ python3 flags2_threadpool.py -s ERROR -e
    ERROR site: http://localhost:8003/flags
    Searching for 676 flags: from AA to ZZ
    30 concurrent connections will be used.
    --------------------
    150 flags downloaded.
    361 not found.
    165 errors.
    Elapsed time: 7.46s

"""

# BEGIN FLAGS2_THREADPOOL
import collections
from concurrent import futures

import requests
import tqdm  # <1>

DEFAULT_CONCUR_REQ = 1  # <4>
MAX_CONCUR_REQ = 1000  # <5>


def download_many(cc_list, base_url, verbose, concur_req):
    counter = collections.Counter()
    with futures.ThreadPoolExecutor(max_workers=concur_req) as executor:  # <6>
        to_do_map = {}  # <7>
        for cc in sorted(cc_list):  # <8>
            future = executor.submit(download_one,
                            cc, base_url, verbose)  # <9>
            to_do_map[future] = cc  # <10>
        done_iter = futures.as_completed(to_do_map)  # <11>
        if not verbose:
            done_iter = tqdm.tqdm(done_iter, total=len(cc_list))  # <12>
        for future in done_iter:  # <13>
            try:
                res = future.result()  # <14>
            except requests.exceptions.HTTPError as exc:  # <15>
                error_msg = 'HTTP {res.status_code} - {res.reason}'
                error_msg = error_msg.format(res=exc.response)
            except requests.exceptions.ConnectionError as exc:
                error_msg = 'Connection error'
            else:
                error_msg = ''
                status = res.status

            if error_msg:
                status = HTTPStatus.error
            counter[status] += 1
            if verbose and error_msg:
                cc = to_do_map[future]  # <16>
                print('*** Error for {}: {}'.format(cc, error_msg))

    return counter


if __name__ == '__main__':
    sys.argv = [sys.argv[0], '-s', 'REMOTE']
    main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
# END FLAGS2_THREADPOOL

  0%|          | 0/20 [00:00<?, ?it/s]

REMOTE site: http://flupy.org/data/flags
Searching for 20 flags: from BD to VN
1 concurrent connection will be used.


100%|██████████| 20/20 [00:08<00:00,  2.39it/s]

--------------------
20 flags downloaded.
Elapsed time: 8.88s



