# Fluent Python
https://github.com/fluentpython/example-code

파이썬 용어집 

https://docs.python.org/ko/3/glossary.html

In [4]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

import pandas as pd
import numpy as np


## CHAPTER 18. Concurrency with asyncio

“concurrency” and “parallelism.”

asyncio 패키지: 이벤트 루프에 의해서 운용되는 코루틴을 이용해 concurrency 를 구현

python 3.4 에 추가됨

### Thread Versus Coroutine: A Comparison

파이썬에는 쓰레드를 종료시키는 API 가 정의되어 있지 않다. 스레드에 메시지를 보내서 종료시켜야한다. 

스레드 대신 @asyncio.coroutine 을 이용해 스레드 동작을 구현한다. 


In [1]:
import threading
import itertools
import time
import sys

# class Signal:
#     go = True
    
@asyncio.coroutine
def spin(msg, signal):
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))
#         time.sleep(.1)
#         if not signal.go:
#             break
        try:
            yield from asyncio.sleep(.1)
        except asyncio.CancelledError:
            # 스핀이 깨어난후 예외가 발생
            break
    write(' ' * len(status) + '\x08' * len(status))

@asyncio.coroutine
def slow_function():
    # 코루틴이 잠자면서 입출력을 수행하는 체 하는 동안 
    # 이벤트 루프가 진행될 수 있게 하기 위해 yield from 사용 
#    pretend waiting a long time for I/O
#    time.sleep(3)
    # 메인 루프의 제어 흐름을 처리하는데 
    # 메인 루프는 잠자고 난후에 코루틴을 계속 실행 
    yield from asyncio.sleep(3)
    return 42

@asyncio.coroutine
def supervisor():
#     signal = Signal()
#     spinner = threading.Thread(target=spin,
#                     args=('thinking!', signal))
#     print('spinner object:', spinner)
#     spinner.start()
#     result = slow_function()
#     signal.go = False
#     spinner.join()
    # spin() 코루틴 실행을 스케쥴링하고 
    # Task 객체 안에 넣어, task 객체를 즉시 반환
    spinner = asyncio.async(spin('thinking!'))
    # task 객체를 출력 
    print('spinner object:', spinner)
    # 함수를 실행해서 완료되면 값을 가져옴 
    # 그러는 동안 이벤트 루프는 계속 실행됨 
    # sleep(3) 을 실행 후 메인 루프로 제어권을 넘김 
    result = yield from slow_function()
    # task 취소, 코루틴이 중단된 고셍서 Cancelled 예외 발생 
    # 코루틴은 예외르 ㄹ잡아서 지연시키거나 취소요청을 거부 할수 있음 
    spinner.cancel()
    return result

def main():
#     result = supervisor()
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(supervisor())
    loop.close()
    print('Answer:', result)


NameError: name 'asyncio' is not defined

@asyncio.coroutine 데코레이터
- 필수는 아니지만 일반함수와 구분할 수 있도록 추가 해야한다. 
- 코루틴이 yield from 되지 않고 가지비 컬렉트되는 경우 경고 메시지를 출력해서 디버깅에 도움이 된다.
- 데코레이트된 제너레이터를 자동으로 가동하지 않는다. 


supervisor() 함수 비교
- Task 는 코루틴을 구동하고 Thread 는 콜러블을 호출한다. 
- Task 객체는 직접 생성하지 않고, 코루틴을 asyncio.async() 나 loop.create_task() 에 전달해서 가져온다. 
- Task 객체를 가져오면 이 객체는 이미 asyncio.asyn() 등에 의해 실행이 스케쥴링 되어 있다. Thread 객체는 start() 메서드를 호출해서 실행하라고 명력해야 한다. 
- 스레드화된 supervisor() 에서 slow_function() 은 평범한 함수로서, 스레드에 의해 직접 호출 된다. 
비동기 supervisor() 에서 slow_function() 은 yield from 으로 구동하는 코루틴이다. 
- 스레드는 외부에서 API 를 이용해서 중단시킬 수 없다. 
- loop.run_until_comlete() 로 코루틴을 실행해야 한다. 

코루틴의 경우, 기본적으로 인터럽트로부터 보호된다. yield 를 실행해 줘야 다른 부분이 실행된다. 여러 스레드의 연산을 동기화하기 위해 락을 잠그는대신, 실행되고 있는 코루틴 중 하나만 사용하면된다. 

제어권을 넘겨주고 싶을떈 yield 또는 yield from 을 이용해서 스케줄러에게 넘겨줄 수 있다. 

yield 지점에서 중단되었을 때만 취소 할 수 있다. CancelledError 예외를 처리해서 마무리 하면된다. 



### asyncio.Future: Nonblocking by Design

BaseEventLoop.create_task(): 코루틴을 받아서 실행하기 위해 스케쥴링 하고, asyncio.Task 객체를 반환한다. 

Task : 코루틴을 래핑

asyncio.Future 에서 result() 메서드는 인수를 받지 않는다. 시관초과를 지정할수 없다. 

아직 실행이 완료 되지 않은 Future 객체의 result() 메서드를 호출하면 asyncio.InvalidStateError 가 발생한다. 

asyncio.Futrue 결과를 가져오기 위해 yield from 을 이용한다. 이벤트 루프를 블로킹 하지 않고 작업 완료를 기다리는 과정을 자동으로 처리해준다. 제어권을 넘겨주기 위해 사용한다. 

add_done_callback() 콜백을 호출하지 않고 지연된 작업이 완료되면, 이벤트 루프는 Future 객체의 결과를 설정하고 yield from  표현식은 지연된 코루틴 내부에서 반환된 값을 생성하고 실행을 계속 진행한다. 

asyncio.Future is designed to work with yield from


### Yielding from Futures, Tasks, and Coroutines

Task 객체 가져오는 방법
- asyncio.async(coro_or_future, *, loop=None)
- BaseEventLoop.create_task(coro)


In [2]:
import asyncio
def run_sync(coro_or_future):
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(coro_or_future)

# a = run_sync(some_coroutine())

### Downloading with asyncio and aiohttp


In [4]:
import asyncio
import aiohttp
from flags import BASE_URL, save_flag, show, main

@asyncio.coroutine
def get_flag(cc):
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    # 블로킹 연산은 코루틴됨, yield from 이용해 코루틴에 위임, 코루틴이 비동기 실행
    resp = yield from aiohttp.request('GET', url)
    # 응답내용을 읽는 것은 비동기 연산에서 구현 
    image = yield from resp.read()
    return image

@asyncio.coroutine
def download_one(cc):
    image = yield from get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc

def download_many(cc_list):
    loop = asyncio.get_event_loop() 
    to_do = [download_one(cc) for cc in sorted(cc_list)] # 제너레이터 객채 생성
    wait_coro = asyncio.wait(to_do) # 일종의 코루틴 자신에게 전달된 코루틴들이 완료되면 완료 됨
    res, _ = loop.run_until_complete(wait_coro) # wait_core 가 완료될때까지 이벤트 루프 실행. 브로킹됨 
    loop.close()
    return len(res)

ModuleNotFoundError: No module named 'flags'

wait() 가 관리하는 객체는 모두 Future 객체가 된다. 그리고 코루틴 제너레이터객체가 반환된다. 

loop.run_until_complete() 함수는 Future 객체나 코루틴을 받는다. 코루틴을 task 안에 래핑한다. 

wait_coro 는 실행이 완료되면 <실행 완료된 Future 들의 집합>, <실행이 완료되지 않은 Future 들의 집합> 튜플 반환 

yield from 사용법
-  yield from 으로 연결된 전체 코루틴 체인은 궁극적으로 가장 바깥쪽에 있느 ㄴ대표 제너레이터의 next() 나 send() 를 명시적 혹은 암묵적(for) 으로 출하는 호출자에 의해 구동된다. 
- 제너레이터는 yield 를 사용하는 단순 제너레이터나 반복형 객체 여야한다. 

asyncio API 특징 
- loop.run_until_complete() 와 같은 api 로 제너레터를 전달 해서 구동시킴 (next, send 필요 없음)
- 코루틴함수는 asyncio.sleep() 혹은 상위 프로토콜을 구현하는 라이브러리(get_flag() 의 resp = yield from aiohttp.request()) 에 yield from 을 호출하면서 끝나야 한다. 실제로 입출력을 수행하는 라이브러리 함수여야 한다. 

asyncio 이벤트 루프가 코루틴 체인을 구동하고, 코루틴 체인은 결국 저수준 비동기 입출력을 수행하는 라이브러리 함수에서 끝난다. 


### Running Circling Around Blocking Calls

블로킹 함수 : 디스크나 네트워크 입출력 수행

블로킹 함수가 전체 어플을 멈추지 않게 하는 방법
1. 블로킹을 스레드 별로 처리
2. 보든 블로킹 연산을 논블로킹 비동기 연산으로 바꿈

전통적으로 메모리 부담을 줄이기 위해 콜백으로 비동기 호출을 구현했다. (인터럽트 처럼) 

응답을 기다리는 대신, 이벤트가 끝남을 알리는 함수를 등록하여 호출한 것을 논블로킹으로 만들수 있다. 

이벤트 루프가 응답을 받으면, 우리 코드를 다시 호출한다. 

### Enhancing the asyncio downloader Script Using asyncio.as_completed

loop.run_until.complete() 에 의해 구동되는 await()는 내려 받는 작업들이 모두 완료 된 후 결과를 반환한다. 

asyncio 프로그램에는 주 스레드가 하나만 있고 주 스레드에서 이벤트 루프를 실행하므로, 주 스레드 안에서 블로킹 함수를 호출하면 안된다.

모든 네트워크 접속에서 yield from 을 사용해야 한다. 

download_one() 을 asyncio.wait() 안에 래핑하고 loop.run_until_comlete() 에 전달함으로써 결국 download_many() 가 download_one() 을 구동했다. download_many() 에서 새로 만든 downloader_core() 코루틴으로 옮기고 download_many() ㄴ느 단지 이벤트 루프를 생성하고 downloader_core() 를 스케쥴링한다. 

In [1]:
import asyncio
import collections
import aiohttp
from aiohttp import web
import tqdm
from flags2_common import main, HTTPStatus, Result, save_flag
# default set low to avoid errors from remote site, such as
# 503 - Service Temporarily Unavailable
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000

class FetchError(Exception):
    def __init__(self, country_code):
        self.country_code = country_code

@asyncio.coroutine
def get_flag(base_url, cc):
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    resp = yield from aiohttp.request('GET', url)
    if resp.status == 200:
        image = yield from resp.read()
        return image
    elif resp.status == 404:
        raise web.HTTPNotFound()
    else:
        raise aiohttp.HttpProcessingError(
            code=resp.status, message=resp.reason,
            headers=resp.headers)

@asyncio.coroutine
def download_one(cc, base_url, semaphore, verbose):
    # semaphore 인수는 ayncio.Semaphore 객체로, 동시 요청 수를 제한하기 위한 동기화 장치
    try:
        with (yield from semaphore):
            # semaphore 를 yield from 표현식의 콘텍스트 관리자로 사용하므로, 시스템 전체가 블로킹되지 않게 한다. 
            # 단지 semaphore 카운터가 최대 허용수가 되면 코루틴이 브로킹 된다. 
            image = yield from get_flag(base_url, cc)
            # with 문장을 빠져나올 떄 semaphore 카운터가 감소되고, 세마포어 객체를 기다리고 있던 다른 코루틴 객체가 진행되도록 한다. 
    except web.HTTPNotFound:
        status = HTTPStatus.not_found
        msg = 'not found'
    except Exception as exc:
        raise FetchError(cc) from exc
        # 나머지 예외는 PEP3134 예외 체이닝과 내장된 역추적 제안서에 정의된 
        # raise X from Y 구문을 이용해서 연결된 원래 예외를 담은 fetcherror 로 만들어 전파
    else:
        save_flag(image, cc.lower() + '.gif')
        status = HTTPStatus.ok
        msg = 'OK'
        
    if verbose and msg:
        print(cc, msg)
    return Result(status, cc)

ModuleNotFoundError: No module named 'tqdm'

* 세마포어
    - 내부에 카운터를 가지고 있는 객체로서 acquire() 코루틴 메서드를 호출할 때마다 감소
    - release() 코루틴 메서드를 호출할떄 마다 증가

세마포어 설정 
```python
semaphore = asyncio.Semaphore(concur_req)
```

카운터가 0 보다 클때는 acuire() 를 호출해도 클로킹 되지 않지만, 카운터가 0 일떄 acuire() 를 호출하면 다른곳에서 release() 를 호출해서 카운터를 증가시켜줄 때까지 블로킹 된다. 

acuire, release 를 직접 호출하지 않고 콘텍스트 관리자를 사용
```python 
with (yield from semaphore):
    image = yield from get_flag(base_url, cc)
```

코루틴이 as_completed() 호출해야 하기 때문에 syncio.as_completed()에 의해 생성된 future 객체들의 결과를 yield from 을 이용해서 가져와야한다

main() 함수는 단지 일반 함수를 받을 뿐 코루틴을 받지 않기 떄문에 download_many() 를 코루틴으로 바꿀수 없다. 

downloader_core() 코루틴을 만들어서 as_completed() 루프를 실행했다. 


In [3]:
@asyncio.coroutine
def downloader_coro(cc_list, base_url, verbose, concur_req):
    counter = collections.Counter()
    # 동시에 concur_req 개까지 코루틴 실행하는 세마포어 생성
    semaphore = asyncio.Semaphore(concur_req)

    to_do = [download_one(cc, base_url, semaphore, verbose)
                for cc in sorted(cc_list)]
    to_do_iter = asyncio.as_completed(to_do)
    # 실행이 완료된 future 객체를 반환하는 반복자를 가져옴
    
    if not verbose:
        to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))
        # tqdm 함수를 래핑 진행상황 확인
        
    for future in to_do_iter: # 
        # 완료된 future 객체들을 반복, requests와 aiohttp HTTP 차이떄문에 예외처리에 대한 것만 달라짐
        try:
            res = yield from future
            # asyncio.Future 객체의 결과를 가져올 때는 객체의 result(0 메서드를 호출하는 것보다 yield from 을 사용하는 것이 쉬움
        except FetchError as exc:
            country_code = exc.country_code
            try:
                error_msg = exc.__cause__.args[0]
                # 원래 예외 (__cause__) 에서 오류 메시지 가져옴 
            except IndexError:
                error_msg = exc.__cause__.__class__.__name__
                # 연결된 예외 클래스 명을 에러 메시지로 사용 
            if verbose and error_msg:
                msg = '*** Error for {}: {}'
                print(msg.format(country_code, error_msg))
            status = HTTPStatus.error
        else:
            status = res.status
        
        counter[status] += 1
    return counter

def download_many(cc_list, base_url, verbose, concur_req):
    loop = asyncio.get_event_loop()
    coro = downloader_coro(cc_list, base_url, verbose, concur_req)
    counts = loop.run_until_complete(coro)
    loop.close()
    return counts

In Example 18-8, we could not use the mapping of futures to country codes we saw in
Example 17-14 because the futures returned by asyncio.as_completed are not necessarily
the same futures we pass into the as_completed call. Internally, the asyncio
machinery replaces the future objects we provide with others that will, in the end, produce
the same results.

### Using an Executor to Avoid Blocking the Event Loop

asyncio 이벤트 루프는 스레드 풀 실행자를 내부에 가지고 잇으며 run_in_executor() 메서드에 실행할 콜러블을 전달 할수 있다. 


In [4]:
@asyncio.coroutine
def download_one(cc, base_url, semaphore, verbose):
    try:
        with (yield from semaphore):
            image = yield from get_flag(base_url, cc)
    except web.HTTPNotFound:
        status = HTTPStatus.not_found
        msg = 'not found'
    except Exception as exc:
        raise FetchError(cc) from exc
    else:
        loop = asyncio.get_event_loop()
        # run_in_executor 첫번쨰 인수는 실행자 객체, 이벤트 루프의 기본 스레드 풀 실행자를 사용할떄는 None 
        # 나머지 인수는 콜러블 및 콜러블이 받을 위치 인수
        loop.run_in_executor(None,save_flag, image, cc.lower() + '.gif')
        status = HTTPStatus.ok
        msg = 'OK'
        
    if verbose and msg:
        print(cc, msg)
    return Result(status, cc)

### From Callbacks to Futures and Coroutines

코루틴이 어떻게 고전적인 콜백 스타일을 향상시키는지 설명

세개의 비동기 작업을 연속적으로 처리(Callback hell) 하기 위해 코루틴 체인에서 yield 를 세번 사용해서 이벤트 루프가 계속 실행할 수 있게 해주면 된다. 겨로가가 나오면 코루틴의 send() 메서드를 호출해서 코루틴을 활성화 하면된다. 


```python
# api_call1(request1, function (response1) {
#     // stage 1
#     var request2 = step1(response1);
    
#     api_call2(request2, function (response2) {
#         // stage 2
#         var request3 = step2(response2);
        
#         api_call3(request3, function (response3) {
#             // stage 3
#             step3(response3);
#         });
#     });
# });

@asyncio.coroutine
def three_stages(request1):
    response1 = yield from api_call1(request1)
    # stage 1
    request2 = step1(response1)
    response2 = yield from api_call2(request2)
    # stage 2
    request3 = step2(response2)
    response3 = yield from api_call3(request3)
    # stage 3
    step3(response3)
    
loop.create_task(three_stages(request1)) # must explicitly schedule execution
```

3 단계의 연산을 하나의 함수안에서 나란히 비동기식으로 호출하므로, 각각의 yield from 을 각기 별도의 try/exception 블록안에 넣어서 api_call1(), api_call2(), api_call3() 가 발생시키는 예외를 처리할 수 있다. 
