# **Python Book3**
**[효율적 개발로 이끄는 파이썬 실천기술 Jupyter Notebook](https://nbviewer.org/github/Jpub/fulfillPython/tree/main/)**

- [How to print colored text to the terminal](https://stackoverflow.com/questions/287871/how-to-print-colored-text-to-the-terminal)

<!-- ![](../data/cover.jpg) -->
<img src="http://image.kyobobook.co.kr/images/book/xlarge/872/x9791190665872.jpg" width="150" height="150" />

## **[Chapter 10 동시 처리](https://nbviewer.org/github/Jpub/fulfillPython/blob/main/10-concurrent-programming/interactive.ipynb)**
```python
# Notebook 환경에서는 이미 이벤트 루프안에서 동작하고 있기 때문에 
# asyncio.run()은 사용할 수 없습니다. 따라서 Notebook 환경에서는 
# asyncio.run(X)는 await X로 바꿔 읽기 바랍니다.
asyncio.run(main()) 
<_UnixSelectorEventLoop running=True closed=False debug=False>
```

## **1 동시 처리와 병렬처리**
### 01 **<span style="color:orange">concurrent.futures</span>** 모듈 
── 동시 처리를 위한 고수준 인터페이스
- 고수준의 추상회된 API를 제공
- 다중 스레드는 `threading` 모듈을, 다중 프로세스는 `multiprocess` 를 주로 이용했었다
- 현재는 상황과 관계없이 `concurrent.futures` 를 추천하고 있다

### 02 **<span style="color:orange">Future</span>** 클래스와 **<span style="color:orange">Executor</span>** 클래스 
── 비동기 처리 캡슐화 실행
- 호출 가능한 객체를 `Executor` 클래스의 `.submin()` 메서드로 전달
- `.submit()` 메서드는 `Future` 에 스케쥴링 포함된 인스턴스를 출력

In [1]:
# ThreadPoolExecutor는 Executor의 구현 서브 클래스
from concurrent.futures import (
    Future, ThreadPoolExecutor,
)

# 비동기로 수행할 처리
def func():
    return 1

# 비동기로 수행할 처리를 submit()에 전달
future = ThreadPoolExecutor().submit(func)
isinstance(future, Future)

# 비동기로 수행한 처리의 반환값 취득
print("비동기 처리 반환 값 :", future.result())
print("현재상태 :", future.done())
print("Future Status :", future.running())
print("Future Cancelled :", future.running())

비동기 처리 반환 값 : 1
현재상태 : True
Future Status : False
Future Cancelled : False


## 2 스레드 기반의 **<span style="color:orange">ThreadPoolExecutor</span>** 클래스
── 스레드 기반의 비동기 실행
- 비동기 실행은 `Executor` 의 서브클래스를 **구체화** 해야 합니다
- **구체화된 서브 클래스** 는 `concurrent.futures.ThreadPoolExecutor` 를 사용 합니다

### 01 **순차 처리의 구현**
다중 스레드 처리와 비교를 위한 예시

In [2]:
urls = ['https://twitter.com', 'https://facebook.com', 'https://instagram.com']
from hashlib import md5
from pathlib import Path
from urllib import request

def download(url):
    req = request.Request(url)
    # 파일 이름에 / 등이 포함되지 않도록 함
    name = md5(url.encode('utf-8')).hexdigest()
    file_path = './' + name
    with request.urlopen(req) as res:
        Path(file_path).write_bytes(res.read())
        return url, file_path


import time
def elapsed_time(f):
    def wrapper(*args, **kwargs):
        st = time.time()
        v = f(*args, **kwargs)
        print(f"{f.__name__}: {time.time() - st}")
        return v
    return wrapper

@elapsed_time
def get_sequential():
    [print(download(url)) for url in urls]
        
get_sequential()

('https://twitter.com', './be8b09f7f1f66235a9c91986952483f0')
('https://facebook.com', './a023cfbf5f1c39bdf8407f28b60cd134')
('https://instagram.com', './09f8b89478d7e1046fa93c7ee4afa99e')
get_sequential: 1.7453134059906006


### 02 **다중 스레드 구현**
`get_multi_thread()` 를 활용한 다중 스레드 구현
- `ThreadPoolExecutor` 클래스 인스턴스는 **<span style="color:orange">콘텍스트 관리자</span>** 로 **<span style="color:orange">with</span>** 문을 사용할 수 있다.
- `max_workers` 의 기본값은 코어 수 * 5 를 쓴다

In [3]:
from concurrent.futures import (
    ThreadPoolExecutor,
    as_completed
)

@elapsed_time
def get_multi_thread():
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [
            executor.submit(download, url)
            for url in urls
        ]
        
        # 완료된 것부터 얻을 수 있다
        [print(future.result()) for future in as_completed(futures)]     
            
get_multi_thread()

('https://twitter.com', './be8b09f7f1f66235a9c91986952483f0')
('https://facebook.com', './a023cfbf5f1c39bdf8407f28b60cd134')
('https://instagram.com', './09f8b89478d7e1046fa93c7ee4afa99e')
get_multi_thread: 0.7480037212371826


### 03-1 **다중 스레드 구현시 주의할 점**
다중 실행을 하면 동작하지 않는 경우가 발생하는데. **스레드 세이프한 구성** 으로 변경한다
- 100,000 번 반복을 2개 쓰레드로 반복하는 코드로, 200,000 이 아닌 다른 숫자를 출력한다


In [4]:
from concurrent.futures import (
    ThreadPoolExecutor,
    wait
)

class Counter:
    def __init__(self):
        self.count = 0
    def increment(self):
        self.count = self.count + 1

# 1,000,000회 증가
def count_up(counter):
    for _ in range(1000000):
        counter.increment()

counter = Counter()

# 2개의 스레드를 준비하고, 각각 count_up을 호출
threads = 2
with ThreadPoolExecutor() as e:    
    futures = [
        e.submit(count_up, counter)
        for _ in range(threads)
    ]
    done, not_done = wait(futures)

# 2,000,000 을 출력하지 않고 중간에 엉성하게 중복연산이 실행
print(f'{counter.count=:,}')

counter.count=1,765,225


### 03-2 **다중 스레드 구현시 주의할 점 2**
── 스레트 세이프티한 카운터 예시
- `threading.Lock` 객체를 활용하여 `exclusive control` 을 추가한다
- `Lock` 을 얻은으면 해당 스레드만 실행할 수 있고, `Lock` 의 처리가 끝나면 빠르게 해제된다
- `Lock` 이 뒤늦게 해제되는 상황을 방지하기 위해 `Lock` 객체는 `with` 문과 함께 사용한다

── 주의 할 점
- 실전에서는 성능저하를 최소화 위해, 부분 적으로만 `Lock` 객체를 감싸도록 한다

In [5]:
import threading
class ThreadSafeCounter:
    lock = threading.Lock() # 록을 준비
    
    def __init__(self):
        self.count = 0
    
    # with 로 감싼 `배타 제어처리`를 `Lock` 내부에 정의한다
    def increment(self):
        with self.lock:
            self.count = self.count + 1

counter = ThreadSafeCounter()
threads = 2
with ThreadPoolExecutor() as e:
    futures = [e.submit(count_up, counter)
               for _ in range(threads)]
    done, not_done = wait(futures)

print(f'{counter.count=:,}') # 기대한 값이 출력된다

counter.count=2,000,000


## 3 프로세스 기반의 **<span style="color:orange">ProcessPoolExecutor</span>** 클래스
── 프로세스 기반의 비동기 실행
- `concurrent.futures.ProcessPoolExecutor` 클래스를 사용하여 구체화한 서브 클래스를 정의 합니다
- `API` 및 사용법은 `ThreadPoolExecutor` 와 거의 동일해서, 클래스명만 변경하면 바로 전환 가능 합니다

### 01 **순차 처리의 구현**
다중 스레드 처리와 비교를 위한 예시

In [6]:
N = 100000

import os 
import time 
import sys

def fibonacci(n):
    a, b = 0, 1 
    for _ in range(n): 
        a, b = b, b + a 
    else: 
        return a

def elapsed_time(f):
    def wrapper(args, **kwargs):
        st = time.time()
        v = f(args, **kwargs) 
        print(f"{f.__name__}: {time.time() - st}")
        return v 
    return wrapper

@elapsed_time 
def get_sequential(nums):
    [ fibonacci(num)  for num in nums]

def main(n): 
    # n = int(sys.argv[1])
    print(f'Os Count is : {os.cpu_count()}')
    nums = [n] * os.cpu_count()
    get_sequential(nums)

main(N)

Os Count is : 12
get_sequential: 0.89145827293396


### 02 **다중 프로세스 구현**
다중 프로세스를 사용하여 병렬화 합니다

In [7]:
from concurrent.futures import (
    ProcessPoolExecutor, as_completed )

def fibonacci(n): 
    a, b = 0, 1 
    for _ in range(n): 
        a, b = b, b + a 
    else: return a

def elapsed_time(f):
    def wrapper(args, **kwargs): 
        st = time.time() 
        v = f(args, **kwargs) 
        print(f"{f.__name__}: {time.time() - st}") 
        return v 
    return wrapper

@elapsed_time 
def get_sequential(nums): 
    [fibonacci(num)  for num in nums]

@elapsed_time 
def get_multi_process(nums): 
    with ProcessPoolExecutor() as e: 
        futures = [e.submit(fibonacci, num) for num in nums] 
        [future.result()  for future in as_completed(futures)]

def main(n): 
    # n = int(sys.argv[1]) 
    nums = [n] * os.cpu_count() 
    get_multi_process(nums)

main(N)

get_multi_process: 0.23077726364135742


### 03 **pickle** 화 가능한 객체의 사용
`ProcessPoolExecutor` 클래스는 `queue` 를 사용하여 프로세스간 객체를 전달 합니다.
- `queue` 는 `multiprocessing.Queue` 클래스로 구현되고, `pickle` 형식으로 직렬화 됩니다
- `pickle` 화 불가능한 객체로 `lambda` 식으로 정의한 객체가 있습니다.

In [8]:
from concurrent.futures import (
    ProcessPoolExecutor,
    wait
)

func = lambda: 1

def main():
    with ProcessPoolExecutor() as e:
        future = e.submit(func)
        done, _ = wait([future])
    print(future.result())

try:
    main()
except Exception as e:
    import termcolor  # termcolor.COLORS
    print(termcolor.colored(e, 'red'))

[31mCan't pickle <function <lambda> at 0x7fe0a0107700>: attribute lookup <lambda> on __main__ failed[0m


### 04 **난수 사용법**


In [9]:
from concurrent.futures import (
    ProcessPoolExecutor,
    as_completed
)

import numpy as np
def use_numpy_random(): # 난수 생성기를 초기화
    return np.random.random()

def main():
    with ProcessPoolExecutor() as e:
        futures = [
            e.submit(use_numpy_random)
            for _ in range(3)]
        for future in as_completed(futures):
            print(future.result())

if __name__ == '__main__':
    main()

0.7548810825156606
0.7548810825156606
0.7548810825156606


In [10]:
from concurrent.futures import (
    ProcessPoolExecutor,
    as_completed
)

import random
def use_starndard_random():
    return random.random()

def main():
    with ProcessPoolExecutor() as e:
        futures = [
            e.submit(use_starndard_random)
            for _ in range(3)]
        for future in as_completed(futures):
            print(future.result())

if __name__ == '__main__':
    main()

0.21848652968019733
0.7947343618065531
0.9513253851747362


## 4 **<span style="color:orange">Asyncio</span>** 모듈
── 이벤트 루프를 사용한 동시처리
- 이벤트 루프를 사용하면 **단일 스레드** 에서도 동시처리가 가능해 진다
- 그리고 코드를 동기처리와 같이 서술하면 높은 가독성을 유지할 수 있다

### 01 **Async 를 활용한 코루틴**
- **<span style="color:orange">코루틴 (Co Routine)</span>** 은 **<span style="color:orange">서브루틴 (Subroutine)</span>** 과 같이 일련의 처리를 모아 둡니다
- **<span style="color:orange">서브루틴 (Subroutine)</span>** 은 파이썬에서는 함수에 해당하고, 한번 호출되면 마지막까지 한 번에 실행 합니다

In [11]:
async def coro():
    return 1

coro() # 반환값은 1이 아닌 코루틴 객체

<coroutine object coro at 0x7fe07807e0c0>

In [12]:
await coro()

1

### 02 **Asyncio 를 활용한 코루틴**
- **<span style="color:orange">코루틴 (Co Routine)</span>** 은 **<span style="color:orange">서브루틴 (Subroutine)</span>** 과 같이 일련의 처리를 모아 둡니다

In [13]:
import asyncio
import random

async def call_web_api(url):
    # Web API 처리를 여기에서는 슬립(sleep)으로 대신함
    print(f'send a request: {url}')
    await asyncio.sleep(random.random())
    print(f'got a response: {url}')
    return url

async def async_download(url): # await를 사용해 코루틴을 호출
    response = await call_web_api(url)
    return response

result = await async_download('https://twitter.com/')
result

send a request: https://twitter.com/
got a response: https://twitter.com/


'https://twitter.com/'

### 03 **Asyncio 를 활용한 코루틴 동시 실행하기**
- **<span style="color:orange">asyncio.gather()</span>** 에서 **여러 코루틴을 받아서 개별 스케줄링을** 실행 합니다
- 반환값은 **<span style="color:orange">awaitable</span>** 한 객체로, 처리를 완료하면 순서가 유지된 상태로 **코루틴** 결과를 **<span style="color:orange">list()</span>** 객체로 반환 합니다

In [14]:
async def main():
    task = asyncio.gather(
        async_download('https://twitter.com/'),
        async_download('https://facebook.com'),
        async_download('https://instagram.com'),
    )
    return await task

# Notebook 환경에서는 이미 이벤트 루프안에서 동작하고 있기 때문에 asyncio.run()은 사용할 수 없습니다. 
# 따라서 Notebook 환경에서는 asyncio.run(X)는 await X로 바꿔 읽기 바랍니다.>>> 
# result = asyncio.run(main())
result = await main()
result

send a request: https://twitter.com/
send a request: https://facebook.com
send a request: https://instagram.com
got a response: https://facebook.com
got a response: https://twitter.com/
got a response: https://instagram.com


['https://twitter.com/', 'https://facebook.com', 'https://instagram.com']

## 5 **코루틴 스케줄링과 실행**
── 코루틴을 위해서는 `이벤트  루프` 와 `테스크` 가 필요 합니다.
- **코루틴과 테스크** : `코루틴` -> (스케줄링 적용) -> `테크스`
- **이벤트 루프** : 이벤트 루프는 `I/O 이벤트` 에 맞춰서 `테스크` 실행을 제어 합니다

### 01 **이벤트 루프**
- **<span style="color:orange">이벤트 루프</span>** 는 **<span style="color:orange">asyncio</span>** 모듈의 핵심 개념으로 **<span style="color:orange">asyncio.run()</span>** 함수로 생성 합니다.
- 코루틴 내부에서는 현지 실행 중인 이벤트 루프를 **<span style="color:orange">asyncio.get_running_loop()</span>** 함수로 확인 합니다.

In [15]:
import asyncio
async def main():
    loop = asyncio.get_running_loop()
    print(loop)
    
await main()

<_UnixSelectorEventLoop running=True closed=False debug=False>


### 02 **테스크**
── 스케줄링한 코루틴의 테스크 실행
- 캡슐화 방법 3가지 : 1)`asyncio.run()`, 2) 코루틴 내부에서 `await` 로 실행, 3) `테크스` 를 만들어서 실행
- 테스크 (스케줄링한 코루틴 캡슐화) : `asyncio.Tast()` 클래스의 인스턴스, `asyncio.Future` 의 서브 클래스 입니다
- 결과값 출력 : `.result()` 를 사용하지 않고, 대신 `await` 키워드를 사용 합니다

In [16]:
async def coro(n):
    await asyncio.sleep(n)
    return n

async def main():
    task = asyncio.create_task(coro(1))
    print(task)
    return await task

# print() 시점에서는 아직 Pending 상태
await main()

<Task pending name='Task-11' coro=<coro() running at /tmp/ipykernel_37364/658887625.py:1>>


1

In [17]:
# 태스크를 작성해 실행
# 3초에 완료됨
async def main():
    task1 = asyncio.create_task(coro(1))
    task2 = asyncio.create_task(coro(2))
    task3 = asyncio.create_task(coro(3))
    print(await task1)
    print(await task2)
    print(await task3)
    
await main()

1
2
3


In [18]:
# 코루틴인 상태로 실행
# 6초에 완료됨
async def main():
    print(await coro(1))
    print(await coro(2))
    print(await coro(3))
    
await main()

1
2
3


### 03 **비동기 I/O**
── 이벤트 루프에 적합한 I/O 처리
- 비동기 예약이 정의되면, 

In [19]:
# 동기 I/O를 이용하는 처리의 태스크화
async def main():
    loop = asyncio.get_running_loop()
    # 동기 I/O를 이용하는 download에서 태스크를 작성
    futures = [loop.run_in_executor(
        None, download, url) for url in urls]
    for result in await asyncio.gather(*futures):
        print(result)
        
await main()

('https://twitter.com', './be8b09f7f1f66235a9c91986952483f0')
('https://facebook.com', './a023cfbf5f1c39bdf8407f28b60cd134')
('https://instagram.com', './09f8b89478d7e1046fa93c7ee4afa99e')


## 6 **Asyncio 모듈과 HTTP 통신**
── TCP, UDP, SSL 파이프 서브 프로세스 통신은 구현 되어 있고, HTTP 는 별도로 구현해야 합니다
- 직업 Asyncio 모듈을 사용하여 HTTP 클라이언트/ 서버를 구축 합니다

### 01 **aioHttp**
── 비동기 I/O 를 이용하는 HTTP 클라이언트 겸 서버 라이브러리
```
! pip install aiohttp
```

In [20]:
import aiohttp
import asyncio

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()
    
async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, 'http://python.org')
        print(html)
        
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
except Exception as e:
    import termcolor  # termcolor.COLORS
    print(termcolor.colored(e, 'red'))

[31mThis event loop is already running[0m


  print(termcolor.colored(e, 'red'))
