# 8. 비동기 I/O

## 배경
빅데이터 세계에서는 실제 코드 자체보다는 코드에 필요한 데이터를 얻어오는 작업이 병목이 된다. (I/O bound)
- I/O는 프로그램의 흐름의 짐이 될수 있다.
  - 파일이나 네트워크 소켓에서 데이터를 읽는 작업은 잠시 실행을 멈춰야 하고, 커널에 연산을 수행하도록 요청한 다음, 그 작업이 끝날때 까지 기다려야 한다. (I/O 대기)
  - I/O 연산 대부분은 CPU보다 수십배 느린 장치에서 일어난다. 커널과의 통신이 아무리 빨라도, 커널이 장치에서 결과를 가져와서 우리에게 전달하는데 상당한 시간이 걸린다.
- **비동기 I/O를 사용하면 I/O 연산이 완료되기를 기다리는 동안 다른 연산을 수행**하여 이런 유휴 시간을 활용할수 있다.

<img width="603" alt="스크린샷 2021-07-31 오후 3 29 25" src="https://user-images.githubusercontent.com/22383120/127735307-3d0c1f77-9d63-4bf2-97f7-b528ee36f43c.png">

  
## 비동기 I/O가 가능한 이유는?
- 프로그램이 I/O 대기 일때, 커널이 데이터를 읽어달라고 HDD나 Network 등에 요청하고, 이를 기다렸다가 데이터가 준비되면 커널에게 신호를 보내기 때문이다.
- 즉 기다리는 대신, 데이터에 대한 요청을 꺼낼수 있는 메커니즘 (이벤트 루프)을 만들고, 연산을 계속 수행하며 읽을 데이터가 준비되면 통지를 받는다.

# 8.1 비동기 프로그래밍 소개
## 일반적인 프로그램
- I/O 대기에 들어가면, 실행을 멈추고 커널이 I/O 요청과 관련된 저수준 연산을 처리 (컨텍스트 스위치)하며, I/O 연산이 끝날때가지 프로그램은 재개되지 않는다.
  - 컨텍스트 스위치란?
    - 멀티프로세스 환경에서 CPU가 하나의 프로세스를 실행하는 도중, 인터럽트 요청이 걸렸을때 기존까지의 작업을 저장한 뒤 다음 프로세스가 CPU 실행할수 있도록 프로세스의 상태를 교체하는 작업
  - 컨텍스트 스위치는 비싼 작업이며, 프로그램의 현재 상태를 저장해야 하고, CPU 사용을 포기해야 한다.
  
## 동시성 프로그램
- 실행 대상, 시점을 관리하는 이벤트 루프 (Queue)를 사용한다. 이벤트 루프는 실행할 함수의 목록에 지나지 않는다.

In [2]:
from queue import Queue
from functools import partial

eventloop = None

class EventLoop(Queue):
    def start(self):
        while True:
            function = self.get()
            function()

def do_hello():
    global eventloop
    print("Hello")
    eventloop.put(do_world)

def do_world():
    global eventloop
    print("World")
    eventloop.put(do_hello)
    
eventloop = EventLoop()
eventloop.put(do_hello)
eventloop.start()

Hello
World
Hello
World
Hello
World


eventloop.put(do_world) 호출은 do_world 함수에 대한 비동기 호출을 대략적으로 보여준다.
- 이연산은 `non-blocking` 이라 부르며, 즉시 do_hello가 끝나 결과값이 반환되지만 나중에 do_world 함수를 호출함을 보장한다.
- do_hello는 do_world가 끝날떄까지 기다리지 않는다.

**이벤트 루프, 비동기 I/O 개념을 같이 이용하면, 요청한 I/O 연산이 끝나기를 기다리는 동안 다른 함수를 실행하는 프로그램을 만들수 있다.**

## 이벤트 루프를 사용하는 프로그래밍은 `콜백, 퓨처` 라는 두 형태가 있다.

### 콜백: 각 함수를 호출할때 콜백이라는 인자를 넘긴다. 함수가 반환하는 대신, 그 값을 인자로 실어 콜백 함수를 호출한다.

```python
from functools import partial

def save_value(value, callback):
    print(f"Saving {value} to database")
    save_result_to_db(result, callback) #db 의 결과를 callback 함수와 같이 넘겨준다.

def print_response(do_response):
    print("Response from database: {db_response}")
    
eventloop.put(partial(save_value, "Hello world", print_response))
```

1. save_result_to_db()는 비동기 함수로, 이 함수가 즉시 반환되면서 함수가 종료되고 다음 코드를 실행할수 있다.
2. 이후에 save_result_to_db()가 끝나면, print_response() 함수가 호출된다.

콜백은 함수의 결과를 받는 다른 함수가 더해지는 함수의 사슬 형태로 만들어 진다. 이게 무한정되면 콜백 지옥이라 부른다. (함수안에 함수, 함수안에 함수)

![스크린샷 2021-08-02 오후 5 14 01](https://user-images.githubusercontent.com/22383120/127942109-6dba81bd-bfba-43af-98fe-0a3dbb8bf218.png)

### 파이썬 3.4 이전
- 콜백 패러타임이 유행

### 파이썬 3.4 이후
- 퓨처 메커니즘을 파이썬 네이티브로 만들었다.
- await, async 키워드를 도입함으로써, 비동기 함수를 정의하고 결과를 기다릴수 있게 해준다.

![스크린샷 2021-08-02 오후 5 17 08](https://user-images.githubusercontent.com/22383120/127942054-602c46e7-1e0f-4020-9053-5c9297cb66d7.png)
 - 내부는 비동기적 작업이지만 동기 코드처럼 직관적이게 이해할수 있다.

### 퓨처: 비동기 함수는 실제 결과가 아니라 퓨처를 반환한다. 퓨처는 미래에 얻을 결과를 담은 프라미스라 한다.

프라미스?
- 비동기 작업의 최종적인 결과 (또는 에러)를 담고 있는 객체
- pending, fulfilled, rejected, settled 상태를 가진다.

비동기 함수가 반환하는 퓨처가 완료되어 필요한 값이 채워지길 기다려야 한다. (await 사용)
- 퓨처 객체에 요청한 데이터가 들어오기를 기다리는 동안 다른 계산을 수행할수 있다.

```python
async def save_value(value):
    print(f"Saving {value} to database")
    db_response = await save_result_to_db(result)
    print("Response from database:" {db_response}")
  
eventloop.put(partial(save_value, "Hello world", print))
```

- save_result_to_db()는 퓨처 타입을 반환하고, await 함으로써 값이 준비될때까지 save_value()는 잠시 멈출수 있다. 그리고 그 값이 준비되면 save_value가 재개되어 연산을 마무리 한다.

# 8.2 async / await 동작 방식

async/await함수를 **코루틴**이라 부른다. (coroutine)
- 코루틴? (cooperation + routines (=functions))
  - 실행을 일시중단(suspend), 재개 (resume)할수 있도록하여 비선점형 멀티태스킹을 위한 서브 루틴을 일반화하는 컴퓨터 프로그램 구성 요소
  - light-weight thread로, 협력적인 멀티태스킹 (비선점형 멀티태스킹)이 된다.
  - `async/await, yield 같은 키워드를 사용`한다.
- **파이썬에서 코루틴은 제네레이터와 같은 철학으로 구현**된다. (제너레이터도 다음 실행을 일시 중단하고 나중에 계속 실행할수 있는 장치가 있음)
- 즉, await 문은 함수의 yield 문과 기능면에서 비슷해진다. (현재 함수의 실행을 일시 중단하고, 다른 코드를 실행하기 때문)

동시성 코드를 실행할때는 이벤트 루프에 의존한다는 사실을 깨닫자.
- 대부분의 완전한 동시성 코드의 주 진입점은 이벤트 루프를 설정하는 것이지만, 이는 `전체 프로그램이 동시성이라고 가정하는 것`

다른 경우에 몇가지 퓨처를 프로그램 안에서 생성하고 임시 이벤트 루프를 시작해 퓨처를 관리하다, 이벤트 루프가 끝나면 프로그램을 다시 일반적인 방식으로 실행하게 할수 있다.
- asyncio.loop 모듈의 loop.run_until_complete(coro), loop.run_forever() 를 통해 이루어진다.

## 8.2.1 순차적 크롤러

In [30]:
import random
import string

import requests


def generate_urls(base_url, num_urls):
    for i in range(num_urls):
        yield base_url + "".join(random.sample(string.ascii_lowercase, 10))


def run_experiment(base_url, num_iter=1000):
    response_size = 0
    for url in generate_urls(base_url, num_iter):
        response = requests.get(url)
        response_size += len(response.text)
    return response_size


if __name__ == "__main__":
    import time

    delay = 100
    num_iter = 1000
    base_url = f"http://127.0.0.1:8080/add?name=serial&delay={delay}&"

    start = time.time()
    result = run_experiment(base_url, num_iter)
    end = time.time()
    print(f"Result: {result}, Time: {end - start}")

Result: 1000, Time: 107.43802499771118


가장 간단한 방법으로 1000개 요청중 앞서 요청한 url이 끝난 이후 다음 url을 처리한다. (동기 I/O 방식)

## 8.2.2 gevent

비동기 라이브러리로, 비동기 함수가 퓨처를 반환한다는 패러다임을 따른다.
- 즉 코드의 대부분 로직들은 동시에 실행할수 있다.
- gevent는 표준 I/O 함수를 몽키패치 (monkey patch)?? 해서 비동기적으로 만든다.

두 가지 메커니즘을 제공한다.
- 표준 라이브러리를 비동기 I/O 함수로 변경
- 동시 실행을 위해 사용할수 있는 Greenlet 객체를 제공 (그린렛)
  - 그린렛은 코루틴의 일종으로 스레드와 같다고 할수있다.

그린렛 동작 방식
- 모든 그린렛은 같은 물리 스레드에서 실행된다. 
- 여러 CPU에서 실행되는 대신 gevent는 한 CPU에서 실행되는 이벤트 루프를 통해 I/O를 대기하는 동안 그린렛의 실행을 전환한다.

In [2]:
import random
import string
import urllib.error
import urllib.parse
import urllib.request
from contextlib import closing

import gevent
from gevent import monkey
from gevent.lock import Semaphore

monkey.patch_socket()


def generate_urls(base_url, num_urls):
    for i in range(num_urls):
        yield base_url + "".join(random.sample(string.ascii_lowercase, 10))


def download(url, semaphore):
    with semaphore:  # <2> 한번에 최대의 100개의 그린렛이 HTTP GET 요청을 보내도록 제한하기 위해, 세마포어를 사용한다.
        with closing(urllib.request.urlopen(url)) as data:
            return data.read()


def chunked_requests(urls, chunk_size=100):
    semaphore = Semaphore(chunk_size)  # <1> 100의 사이즈를 가진 세마포어를 만든다.
    requests = [gevent.spawn(download, u, semaphore) for u in urls]  # <3> 함수와 인자를 받아서 그 함수를 실행하는 그린렛을 시작한다.
    for response in gevent.iwait(requests): # <4> iwait을 통해 이벤트 루프를 시작하고, 루프를 그린렛이 모두 끝날때까지 계속 실행한다. 이로인해 순차적으로 실행된다.
        yield response


def run_experiment(base_url, num_iter=1000):
    urls = generate_urls(base_url, num_iter)
    response_futures = chunked_requests(urls, 100) # 세마포어 수를 조정하여 요청의 개수를 조정할수 있다.
    response_size = sum(len(r.value) for r in response_futures)
    return response_size


if __name__ == "__main__":
    import time

    delay = 100
    num_iter = 1000
    base_url = f"http://127.0.0.1:8080/add?name=gevent&delay={delay}&"

    start = time.time()
    result = run_experiment(base_url, num_iter)
    end = time.time()
    print(f"Result: {result}, Time: {end - start}")

  and should_run_async(code)


Result: 1000, Time: 1.1788930892944336


동기 I/O 보다 성능이 월등히 빠르다.

이벤트 루프에 `과도한 부하를 걸면 성능이 떨어질수 있으므로`, 번거롭더라도 요청을 한꺼번에 보내지 않고 `정한 개수로 묶어서 보내는게 좋다.`
- 우리가 통신할 서버도 동시에 응답할수 있는 요청의 수에 한계가 있다.
- 여기 실험에서는 약 50밀리초라면 한번에 100개의 정도를 연결하는 것이 최적임을 알게되었다.
  - 더 많이 연결하면 `컨텍스트 스위칭이 자주 발생해서 프로그램에 불필요한 오버헤드가 생긴다.`

<img width="601" alt="스크린샷 2021-07-31 오후 5 50 50" src="https://user-images.githubusercontent.com/22383120/127735297-dee7bf42-7a4a-4db2-b149-ccdfa041a5f8.png">
- 이처럼 시간이 갑자기 올라가는 구간이 있는데, 그 이상의 동시 다운로드 수를 정하지 않는게 좋다.


<img width="598" alt="스크린샷 2021-07-31 오후 5 54 09" src="https://user-images.githubusercontent.com/22383120/127735276-334d247e-90ca-4ab4-b4d8-19fa83afa8ae.png">


100번째 요청에서 새로운 요청이 되지 않는 구간이 있는데, 이는 아직 앞에 http 요청이 처리가 되지 않았음을 뜻한다. (락에 걸림)
- 즉 이전 http 요청이 끝나고 세마포어의 락을 풀어줄때까지 새로운 요청을 할수 없다.

## 8.2.3 Tornado

파이썬 비동기 I/O에서 자주사용하며, HTTP 클라이언트와 서버를 위해 페이스북에서 개발한 패키지

async/await 도입전에 있었으며, 처음 `콜백을 통해 비동기적 동작을 수행`했었다. 

하지만 최근 코루틴을 사용하기로 결정했으며, 현재는 async/await 구문이나 파이썬 tornado.gen 모듈을 통해 코루틴을 쓸수있다.

In [None]:
import asyncio
import random
import string
from functools import partial

from tornado.httpclient import AsyncHTTPClient

AsyncHTTPClient.configure(
    "tornado.curl_httpclient.CurlAsyncHTTPClient", max_clients=100  # <1> HTTP 클라이언트 설정, 한꺼번에 처리할 요청 개수를 정함
)


def generate_urls(base_url, num_urls):
    for i in range(num_urls):
        yield base_url + "".join(random.sample(string.ascii_lowercase, 10))


async def run_experiment(base_url, num_iter=1000):
    http_client = AsyncHTTPClient()
    urls = generate_urls(base_url, num_iter)
    response_sum = 0
    tasks = [http_client.fetch(url) for url in urls]  # <2> 퓨처를 여러개 만들어 URL 내용을 얻는 작업을 이벤트 루프(큐)에 넣는다.
    for task in asyncio.as_completed(tasks):  # <3> tasks 리스에 있는 코루틴을 모두 실행하고 완료되면 yield 한다.
        response = await task  # <4> await 문을 통해 HTTP GET 요청을 기다리고, 작업이 완료된 결과를 즉시 반환한다.
        response_sum += len(response.body)
    return response_sum


if __name__ == "__main__":
    import time

    delay = 100
    num_iter = 1000
    run_func = partial(
        run_experiment,
        f"http://127.0.0.1:8080/add?name=tornado&delay={delay}&",
        num_iter,
    )

    start = time.time()
    result = asyncio.run(run_func)  # <5> 지정한 함수가 실행되는 동안만 IOLoop를 시작한다.
    end = time.time()
    print(f"Result: {result}, Time: {end - start}")

gevnet와 큰 차이점은?
- gevent의 이벤트 루프는 iwait 함수가 실행되는 동안에만 실행된다.
  - 주로 CPU를 쓰고 가끔 무거운 I/O를 할때 적합한다.
- tornado는 항상 이벤트루프가 실행되고 있으며, 비동기 I/O 뿐만 아니라 프로그램 전체 실행 흐름을 제어한다.
  - 대부분이 비동기적이여야 하는 I/O 위주의 어플리케이션에 가장 적합하다. (고성능 웹서버)
  
<img width="584" alt="스크린샷 2021-07-31 오후 6 09 21" src="https://user-images.githubusercontent.com/22383120/127735309-a8bdbeb4-a880-4749-bf81-af26e2ac3994.png">

toenado의 실행 결과는 gevent와 다르게 멈췄다 시작하는 (stop-and-go) 형태를 가진다. 
 - 이는 열린 연결의 수를 제한하는 내부 메커니즘이 끝나는 연결을 빠르게 처리하지 못한다는 뜻. (실제 실험 결과 gevent보다 약간 더 느리다.)
 - 자원을 과도하게 활용하거나 낭비한다는 의미.

## 8.2.4 aiohttp

aiohttp는 asyncio 라이브러리에 전적으로 의존해 만들어진 라이브러리
- aiohttp는 HTTP 클라이언트와 서버 기능 모두 제공하며, tornado와 비슷한 API를 사용한다.

In [None]:
import asyncio
import random
import string

import aiohttp


def generate_urls(base_url, num_urls):
    for i in range(num_urls):
        yield base_url + "".join(random.sample(string.ascii_lowercase, 10))


def chunked_http_client(num_chunks):
    semaphore = asyncio.Semaphore(num_chunks)  # <1> 세마포어의 수를 정한다.

    async def http_get(url, client_session):  # <2> 비동기적으로 파일을 다운로드하고, 세마포어 락을 준수하는 새로운 코루틴을 반환한다.
        nonlocal semaphore
        async with semaphore:
            async with client_session.request("GET", url) as response:
                return await response.content.read()

    return http_get


async def run_experiment(base_url, num_iter=1000):
    urls = generate_urls(base_url, num_iter)
    http_client = chunked_http_client(100)
    responses_sum = 0
    async with aiohttp.ClientSession() as client_session:
        tasks = [http_client(url, client_session) for url in urls]  # <3> 퓨처를 반환하고, 퓨처를 리스트에 저장해 진행 상황을 추적한다.
        for future in asyncio.as_completed(tasks):  # <4> 퓨처의 결과가 준비될때까지 기다린 다음, 결과를 이터레이션 한다.
            data = await future
            responses_sum += len(data)
    return responses_sum


if __name__ == "__main__":
    import time

    loop = asyncio.get_event_loop()
    delay = 100
    num_iter = 1000

    start = time.time()
    result = loop.run_until_complete(
        run_experiment(
            f"http://127.0.0.1:8080/add?name=asyncio&delay={delay}&", num_iter
        )
    )
    end = time.time()
    print(f"Result: {result}, Time: {end - start}")

async with을 사용하면 요청한 자원을 획득하려고 기다리는 동안 다른 코루틴을 실행할수 있다.
- 열린 세마포어 슬롯을 공유하거나 이미 호스트와 연결된 열린 연결을 공유하는 등의 일이 tornado에서보다 더 효율적(?) 이다.

아래 그래프 결과 tornade보다 부르러운 전환을 보여준다.

<img width="591" alt="스크린샷 2021-08-01 오후 4 16 17" src="https://user-images.githubusercontent.com/22383120/127762659-f9060e41-ff80-406b-b893-0530fb69c4ab.png">


aiohttp는 우리가 만든 요청의 세부 요소와 이벤트 루프를 잘 제어한다.

# 8.3 CPU 공유: I/O 부하

데이터베이스와 자주 통신하며 결과를 저장하는 CPU 위주로 예제를 살펴보자.
- bcrypt 해시 (비크립트 해시)를 계산하여 CPU 위주 작업의 크기를 늘린다.
- 즉, 프로그램의 상당한 양의 계산을 하면서, 계산의 결과를 데이터베이스에 저장하는 상황

<img width="555" alt="스크린샷 2021-08-01 오후 4 17 37" src="https://user-images.githubusercontent.com/22383120/127762690-98f0e464-edcf-46e7-9c0d-6334abb59007.png">


데이터베이스에 바라는 요구사항
1. 데이터베이스가 HTTP API를 제공한다.
2. 응답 시간은 100 밀리초
3. 데이터베이스는 한번에 여러 요청을 처리할수 있다. (Postgre, MongoDB, Riak 등)

여기서 100 밀리초는 우리 문제의 전환점을 과장해서 보여주기 위해 높게 잡았다.
- 전환점은 어느 CPU 작업 처리시 걸리는 시간이 I/O 작업보다 더 오래걸리는 지점
- 간단한 값을 저장하는 데이터베이스에서 10밀리초 이상의 응답 시간은 느리다고 볼수 있다.

## 8.3.1 순차 처리

In [13]:
import random
import string

import bcrypt
import requests


def do_task(i, difficulty):
    passwd = "".join(random.sample(string.ascii_lowercase, 10)).encode("utf8")
    salt = bcrypt.gensalt(difficulty) # <2> difficulty를 조정함으로써 암호 생성의 난이도를 높일수 있다.
    result = bcrypt.hashpw(passwd, salt)
    return result.decode("utf8")

def save_result_serial(result):
    url = f"http://127.0.0.1:8080/add"
    response = requests.post(url, data=result)
    return response.json()


def calculate_task_serial(num_iter, task_difficulty):
    for i in range(num_iter):
        result = do_task(i, task_difficulty)
        save_result_serial(result)
        
data = {
            "async": [],
            "serial": [],
            "no IO": [],
            "file IO": [],
            "batches": [],
            "async+uvloop": [],
        }
for difficulty, num_iter in ((8, 600), (10, 400), (11, 400), (12, 400)):
    print(f"Difficulty: {difficulty}")
    start = time.perf_counter()
    calculate_task_serial(num_iter, difficulty)
    t = time.perf_counter() - start
    print("Serial code took: {} {}s".format(num_iter, t))
    data["serial"].append((num_iter, difficulty, t))

Difficulty: 8
Serial code took: 600 75.34634159400048s
Difficulty: 10
Serial code took: 400 67.88798047399996s
Difficulty: 11
Serial code took: 400 89.51139227099975s
Difficulty: 12
Serial code took: 400 134.74418474699996s


순차처리 방식이다 보니 총 시간에서 최소 40초를 I/O에 쓴다. (약 56% 소모)
- 이는 I/O 대기를 하면서 다른 일을 할수 있는 시간을 날렸다.

하지만 난이도 올라갈수록 CPU에 점점더 많은 시간을 소비함에 따라, 순차적인 I/O의 전체 비율이 줄어든다.
- CPU에 시간을 더 많이 쓴다면, 위에 I/O 비율(56%) 더 적어질것이다.

<img width="593" alt="스크린샷 2021-08-01 오후 4 10 59" src="https://user-images.githubusercontent.com/22383120/127762535-659314ea-d7d8-4d3b-888f-30bcc7a66a7b.png">

- 이터레이션당 시간이 높을수록 CPU에서 시간을 많이 쓴다는 의미 (Difficulty 높음)
- Difficulty가 높을수록 CPU 시간이 대부분 차지하기 때문에, I/O 없음과 별로 차이가 없음을 뜻한다.
- Difficulty가 낮을수록 CPU 시간 보다 I/O가 대부분 차지하기 때문에, I/O 없음과 차이가 많이 난다.

ex)
- Difficulty 낮을때
  - 순차처리 -> CPU 시간 10초, IO 시간이 100초 = 총 소요시간 `110초`
  - IO 없음 -> CPU 시간 10초 = 총 소요시간 `10초`
  - 순차처리와 IO 없음의 `총 소요시간의 차이가 많이 난다.`
- Difficulty 높을때
  - 순차처리 -> CPU 시간이 1000초, I/O 시간이 100초 = 총 소요시간 `1,100초`
  - IO 없음 -> CPU 시간 1000초 = 총 소요시간 `1,000초`
  - 순차처리와 IO 없음의 `총 소요시간의 차이가 거의 나지 않는다.`

처음부터 `어떤 최적화 (CPU? I/O?)를 수행할지 선택`하는 것보다 `어떤 곳에서 부하가 많이 발생하는지 먼저 이해`해야 한다.
  - ex) 실행이 1시간 걸리는 CPU 작업와 1초 걸리는 I/O 작업이 있다면, I/O 작업의 속도를 높이려해도 원하는 속도 향상을 얻기 어렵다.

## 8.3.2 일괄 처리

In [None]:
import asyncio
import aiohttp

class AsyncBatcher(object):
    def __init__(self, batch_size):
        self.batch_size = batch_size
        self.batch = []
        self.client_session = None
        self.url = f"http://127.0.0.1:8080/add"

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.flush()

    def save(self, result):
        self.batch.append(result)
        if len(self.batch) == self.batch_size:
            self.flush()

    def flush(self):
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.__flush()) # <1> 비동기 함수 하나만 실행하려고 이벤트 루프를 시작할수 있다. 이후 코드는 일반적인 코드처럼 실행한다.

    async def __flush(self): # <2> 앞서 살펴본 aiohttp 예제와 비슷
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch(result, session) for result in self.batch]
            for task in asyncio.as_completed(tasks):
                await task
        self.batch.clear()

    async def fetch(self, result, session):
        async with session.post(self.url, data=result) as response:
            return await response.json()

def do_task(i, difficulty):
    passwd = "".join(random.sample(string.ascii_lowercase, 10)).encode("utf8")
    salt = bcrypt.gensalt(difficulty)
    result = bcrypt.hashpw(passwd, salt)
    return result.decode("utf8")
        
def calculate_task_batch(num_iter, task_difficulty):
    batcher = AsyncBatcher(100) # 결과 100개를 하나로 묶어서 데이터베이스에 비동기적으로 한번에 넣는다.
    for i in range(num_iter): 
        result = do_task(i, task_difficulty)
        batcher.save(result)
    batcher.flush()

data = {
            "async": [],
            "serial": [],
            "no IO": [],
            "file IO": [],
            "batches": [],
            "async+uvloop": [],
        }

for difficulty, num_iter in ((8, 600), (10, 400), (11, 400), (12, 400)):
    print(f"Difficulty: {difficulty}")
    start = time.perf_counter()
    calculate_task_batch(num_iter, difficulty)
    t = time.perf_counter() - start
    print("Serial code took: {} {}s".format(num_iter, t))
    data["serial"].append((num_iter, difficulty, t))

위 예제를 통해 결과를 한 묶음으로 모아서 데이터베이스에 비동기적으로 전송한다.
- 하지만, 프로그램이 CPU 작업을 수행하지 않고 멈춘채 I/O 대기에 들어가지만, 이 시간동안 `한번에 하나가 아니라 여러 요청을 보낼수 있다.` (?)

이런식으로 일괄 처리하는 방식을 파이프라이닝이라 하며, I/O 작업의 부하를 낮추고 싶을때 도움이 된다.
- 파이프라이닝은 `비동기 I/O의 속도와 순차 프로그램의 작성 용이성을 잘 절충한 방식`이다.
- 파이프라이닝 시 사용할 적절한 묶음의 크기는 상황에 따라 달라지므로 최선의 결과를 얻으려면 프로파일링, 튜닝이 필요하다.

위 순차처리 보다 실행 시간이 훨씬 줄어든다.

===== iteration - 책에 있는데로 진행 =====

Difficulty: 8
- Serial code took: 600 9.204531800000002s

Difficulty: 10
- Serial code took: 400 21.750898011s

Difficulty: 11
- Serial code took: 400 41.946706219999996s

Difficulty: 12
- Serial code took: 400 84.48185938900001s


===== Iteration - 10,000 =====

Difficulty: 8
- Serial code took: 10000 152.74034377400002s

Dificculty: 10
- Serial code took: 10000 604.083965662s


<img width="595" alt="스크린샷 2021-08-01 오후 4 27 13" src="https://user-images.githubusercontent.com/22383120/127762970-063f14cb-09dc-4951-ab74-f7ffa21574a5.png">


## 8.3.3 완전한 비동기 처리

HTTP 서버 처럼 큰 I/O 위주 프로그램에 CPU 작업이 포함되었다면 완전한 비동기 해법이 필요할수 있다.
- API가 동시에 여러 연결을 효율적으로 처리하는 동시에 CPU 작업도 빠르길 원할때.

완전한 비동기 처리의 이점은?
- CPU 작업 도중에 I/O 작업도 할수 있다. 이는 전체 실행 시간에서 CPU 위주의 작업을 수행하는데 걸린 시간을 상쇄해버리는 효과가 있다.
- 일괄 처리와 비교했을때 `I/O 부하를 더 빨리 처리`하며, `작업의 반복이 많을수록 이는 더욱 벌어진다.`

In [None]:
def save_result_aiohttp(client_session):
    sem = asyncio.Semaphore(100)

    async def saver(result):
        nonlocal sem, client_session
        url = f"http://127.0.0.1:8080/add"
        async with sem:
            async with client_session.post(url, data=result) as response:
                return await response.json()

    return saver

async def calculate_task_aiohttp(num_iter, task_difficulty):
    tasks = []
    async with aiohttp.ClientSession() as client_session:
        saver = save_result_aiohttp(client_session)
        for i in range(num_iter):
            result = do_task(i, task_difficulty)
            task = asyncio.create_task(saver(result))  # <1> 데이터베이스에 저장시 즉시 await하지 않고, 이벤트 루프에 데이터베이스 저장요청을 넣고 함수가 끝나기 전 작업이 완료됐는지 확인함.
            tasks.append(task)
            await asyncio.sleep(0)  # <2> 이벤트 루프가 실행을 기다리는 작업을 처리할수 있도록 주 함수를 일시 중단한다. 이부분이 없다면 큐에 들어간 작업은 프로그램이 끝날때까지 실행되지 않는다.
        await asyncio.wait(tasks)  # <3> 완료되지 않은 작업을 기다린다.

===== iteration - 책에 있는데로 진행 =====

Difficulty: 8
- Serial code took: 600 9.165673459s

Difficulty: 10
- Serial code took: 400 22.156515983s

Difficulty: 11
- Serial code took: 400 42.457975987999994s

Difficulty: 12
- Serial code took: 400 84.15474196200002s


===== Iteration - 10,000 =====

Difficulty: 8
- Serial code took: 10000 137.318941296s

Difficulty: 10
- Serial code took: 10000 550.970170232s


### 위 결과처럼 `I/O 비율이 더 높은 Difficulty를 선택`하고 Iteration을 더 많이 돌렸을때, 완전한 비동기 처리가 일괄 처리보다 빠르다.



<img width="601" alt="스크린샷 2021-08-01 오후 4 27 53" src="https://user-images.githubusercontent.com/22383120/127762992-eeb80958-24d2-4b6e-992c-7d637c98e0cb.png">

CPU, I/O 작업을 25번 실행하면서 각 작업의시작과 끝을 표시했다.
- 초반 I/O 작업이 느린데 이는 서버에 처음 연결해야 하며, aiohttp의 ClientSession을 사용하므로 `연결이 캐시되어서 연결된 서버에 다시 접속하는 요청`은 더 빠르다.
- CPU 작업이 끝나자마자 다음 HTTP 요청이 발생하고, 나중에는 다른 CPU 작업이 끝나자마자 HTTP 요청이 완료됨을 볼수있다.

<img width="1453" alt="스크린샷 2021-08-01 오후 4 28 05" src="https://user-images.githubusercontent.com/22383120/127762994-ee154b26-bcc8-4962-89d1-c63f165bfc3f.png">
