# 동시 처리와 병렬 처리 ── 여러 처리를 동시에 수행

## 순차 처리 실행

## 동시 처리 실행

#### 다중 스레드와 GIL

## 병렬 처리

## 파이썬과 동시 처리

### 동시 처리와 비동기 처리의 관계

# concurrent.futures 모듈 ── 동시 처리를 위한 고수준 인터페이스

## Future 클래스와 Executor 클래스 ── 비동기 처리 캡슐화와 실행

In [12]:
# ThreadPoolExecutor는 Executor의 구현 서브 클래스
from concurrent.futures import (
    ThreadPoolExecutor,
    Future
)

In [13]:
# 비동기로 수행할 처리
def func():
    """Trivial task used to demonstrate submitting work to an executor.

    Returns:
        The constant 1.
    """
    value = 1
    return value

In [14]:
# Hand the function to submit(), which runs it asynchronously.
# Using the executor as a context manager shuts it down (joining its worker
# thread) once the submitted work is done, instead of leaking an idle pool
# each time this cell is executed.
with ThreadPoolExecutor() as executor:
    future = executor.submit(func)
isinstance(future, Future)

True

In [15]:
# 비동기로 수행한 처리의 반환값 취득
future.result()

1

In [16]:
# 현재 상태 확인
future.done()

True

In [17]:
future.running()

False

In [18]:
future.cancelled()

False

## ThreadPoolExecutor 클래스 ── 스레드 기반 비동기 실행

### 스레드 기반 비동기 실행이 효과적일 때

## ThreadPoolExecutor 클래스를 이용한 다중 스레드 처리 실제 사례

In [19]:
# 대상 페이지 URL 목록
urls = [
    'https://twitter.com',
    'https://facebook.com',
    'https://instagram.com',
]

In [20]:
from hashlib import md5
from pathlib import Path
from urllib import request

In [21]:
def download(url):
    """Fetch *url* and store the response body in the current directory.

    The file is named after the MD5 hex digest of the URL, which keeps
    characters such as ``/`` out of the file name.

    Returns:
        tuple: ``(url, file_path)`` where ``file_path`` is
        ``'./<md5 hex digest of url>'``.
    """
    http_request = request.Request(url)
    digest = md5(url.encode('utf-8')).hexdigest()
    file_path = f'./{digest}'
    with request.urlopen(http_request) as response:
        payload = response.read()
        Path(file_path).write_bytes(payload)
        return url, file_path

In [22]:
# 동작 확인
download(urls[0])

URLError: <urlopen error [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1108)>

### 순차 처리 구현

In [23]:
import functools
import time
def elapsed_time(f):
    """Decorator that prints how long each call to *f* took.

    After *f* returns, prints ``"<function name>: <seconds>"`` and passes
    *f*'s return value through unchanged. Nothing is printed if *f* raises.
    """
    @functools.wraps(f)  # keep f's __name__ / __doc__ on the wrapper
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic and high-resolution, so it is the right
        # clock for measuring intervals (time.time() can jump with the
        # system clock).
        st = time.perf_counter()
        v = f(*args, **kwargs)
        print(f"{f.__name__}: {time.perf_counter() - st}")
        return v
    return wrapper

In [24]:
@elapsed_time
def get_sequential():
    """Download every URL in ``urls`` one at a time and print each result."""
    # map() is lazy, so each download still finishes (and is printed)
    # before the next one starts.
    for result in map(download, urls):
        print(result)

In [25]:
get_sequential()

URLError: <urlopen error [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1108)>

### 다중 스레드 구현

In [None]:
from concurrent.futures import (
    ThreadPoolExecutor,
    as_completed
)

In [None]:
@elapsed_time
def get_multi_thread():
    """Download every URL in ``urls`` on a thread pool, printing as each finishes.

    Uses 3 worker threads, matching the three entries in ``urls``. Each
    ``(url, file_path)`` tuple returned by ``download`` is printed in
    completion order, not submission order.
    """
    # max_workers is set explicitly here. When it is omitted, the default is
    # os.cpu_count() * 5 on Python 3.5-3.7 and min(32, os.cpu_count() + 4)
    # from Python 3.8 on.
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(download, url) for url in urls]
        for future in as_completed(futures):
            # as_completed yields each future as soon as it finishes.
            print(future.result())

In [None]:
get_multi_thread()

### 다중 스레드의 주의점

### 다중 스레드에서의 동작에 문제가 발생할 때

In [26]:
from concurrent.futures import (
    ThreadPoolExecutor,
    wait
)

In [27]:
class Counter:
    """Plain counter with no synchronization.

    ``increment`` reads ``count``, adds 1 and writes it back. That is a
    read-modify-write without a lock, so concurrent calls from several
    threads can lose updates.
    """
    def __init__(self):
        self.count = 0

    def increment(self):
        """Add 1 to ``count``."""
        self.count += 1

In [28]:
def count_up(counter, times=1000000):
    """Call ``counter.increment()`` *times* times (1,000,000 by default).

    Args:
        counter: any object with an ``increment()`` method, such as
            ``Counter`` or ``ThreadSafeCounter`` in this notebook.
        times: number of increments to perform. The default keeps the
            original behavior; smaller values make quick experiments cheap.
    """
    for _ in range(times):
        counter.increment()

In [29]:
counter = Counter()
threads = 2
with ThreadPoolExecutor() as e:
    # Submit `threads` (2) tasks that each call count_up on the SAME
    # unsynchronized counter, so their increments race.
    futures = [e.submit(count_up, counter)
               for _ in range(threads)]
    # Block until both tasks finish. The futures' results are never read,
    # so an exception raised inside count_up would go unnoticed here.
    done, not_done = wait(futures)

In [30]:
# 숫자값을 ,로 구분해서 표시
# 2,000,000이 되지 않음
print(f'{counter.count=:,}')

counter.count=1,737,177


### 스레드 세이프한 구현

In [31]:
import threading
class ThreadSafeCounter:
    """Counter whose ``increment`` may be called safely from several threads.

    Each instance owns its own lock, so independent counters never block
    each other.
    """
    def __init__(self):
        self.count = 0
        # Per-instance lock. A class-level Lock would be shared by every
        # ThreadSafeCounter and serialize unrelated counters for no reason.
        self.lock = threading.Lock()

    def increment(self):
        """Add 1 to ``count`` while holding this instance's lock."""
        with self.lock:
            # The read-modify-write below is not atomic on its own; the lock
            # ensures only one thread runs it at a time.
            self.count = self.count + 1

In [32]:
counter = ThreadSafeCounter()
threads = 2
with ThreadPoolExecutor() as e:
    # Submit `threads` (2) tasks that all increment the same lock-protected
    # counter; with the lock, no increments are lost.
    futures = [e.submit(count_up, counter)
               for _ in range(threads)]
    # Block until both tasks finish (results are not inspected).
    done, not_done = wait(futures)

In [33]:
# 기대한 값이 됨
print(f'{counter.count=:,}')

counter.count=2,000,000


## ProcessPoolExecutor 클래스 ── 프로세스 기반 비동기 실행

### 프로세스 기반 비동기 실행이 효과적일 때

## ProcessPoolExecutor 클래스를 이용한 다중 프로세스 처리 실제 사례

샘플 코드에서는 최종 파일만 배포합니다

(주:fib.py)
import sys

def fibonacci(n):
    """Return the n-th Fibonacci number (fibonacci(0) == 0, fibonacci(1) == 1).

    Computed iteratively in n loop iterations; for large n this is a
    purely CPU-bound workload. Non-positive n returns 0.
    """
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a

def main():
    """Print fibonacci(N), where N is the first command-line argument."""
    n = int(sys.argv[1])
    print(fibonacci(n))

if __name__ == '__main__':
    main()

# 적당한 값으로 조정할 것
!python3 fib.py 1000000

### 순차 처리로 구현

(주:fib.py)
import functools
import os
import time
import sys

def fibonacci(n):
    """Return the n-th Fibonacci number (fibonacci(0) == 0, fibonacci(1) == 1).

    Computed iteratively in n loop iterations; for large n this is a
    purely CPU-bound workload.
    """
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a

def elapsed_time(f):
    """Decorator that prints ``"<function name>: <seconds>"`` after each call."""
    @functools.wraps(f)  # keep f's __name__ / __doc__ on the wrapper
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic and high-resolution: right for intervals.
        st = time.perf_counter()
        v = f(*args, **kwargs)
        print(f"{f.__name__}: {time.perf_counter() - st}")
        return v
    return wrapper

@elapsed_time
def get_sequential(nums):
    """Compute and print fibonacci(num) for each num in *nums*, one by one."""
    for num in nums:
        print(fibonacci(num))

def main():
    """Run fibonacci(N) once per CPU, sequentially; N is the first CLI argument."""
    n = int(sys.argv[1])
    # os.cpu_count() returns None when the count cannot be determined.
    nums = [n] * (os.cpu_count() or 1)
    get_sequential(nums)

if __name__ == '__main__':
    main()

!python3 fib.py 1000000

### 다중 프로세스로 구현

(주:fib.py)
import functools
import os
import time
import sys
from concurrent.futures import (
    ProcessPoolExecutor,
    as_completed
)

def fibonacci(n):
    """Return the n-th Fibonacci number (fibonacci(0) == 0, fibonacci(1) == 1).

    Computed iteratively in n loop iterations; for large n this is a
    purely CPU-bound workload.
    """
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a

def elapsed_time(f):
    """Decorator that prints ``"<function name>: <seconds>"`` after each call."""
    @functools.wraps(f)  # keep f's __name__ / __doc__ on the wrapper
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic and high-resolution: right for intervals.
        st = time.perf_counter()
        v = f(*args, **kwargs)
        print(f"{f.__name__}: {time.perf_counter() - st}")
        return v
    return wrapper

@elapsed_time
def get_sequential(nums):
    """Compute and print fibonacci(num) for each num in *nums*, one by one."""
    for num in nums:
        print(fibonacci(num))

@elapsed_time
def get_multi_process(nums):
    """Compute fibonacci(num) for each num in *nums* on a process pool.

    Worker processes (os.cpu_count() of them by default) let this CPU-bound
    work run in parallel. Results are printed in completion order.
    fibonacci is a module-level function so it can be pickled and sent to
    the workers.
    """
    with ProcessPoolExecutor() as e:
        futures = [e.submit(fibonacci, num) for num in nums]
        for future in as_completed(futures):
            print(future.result())

def main():
    """Run fibonacci(N) once per CPU on a process pool; N is the first CLI argument."""
    n = int(sys.argv[1])
    # os.cpu_count() returns None when the count cannot be determined.
    nums = [n] * (os.cpu_count() or 1)
    get_multi_process(nums)

if __name__ == '__main__':
    main()

!python3 fib.py 1000000

In [34]:
!cat fib.py

import os
import time
import sys
from concurrent.futures import (
    ThreadPoolExecutor,
    as_completed
)

def fibonacci(n):
    a, b = 0, 1
    for _ in range(n):
        a, b = b, b + a
    else:
        return a

def elapsed_time(f):
    def wrapper(*args, **kwargs):
        st = time.time()
        v = f(*args, **kwargs)
        print(f"{f.__name__}: {time.time() - st}")
        return v
    return wrapper

@elapsed_time
def get_sequential(nums):
    for num in nums:
        print(fibonacci(num))

@elapsed_time
def get_multi_thread(nums):
    with ThreadPoolExecutor() as e:
        futures = [e.submit(fibonacci, num)
                   for num in nums]
        for future in as_completed(futures):
            print(future.result())

def main():
    n = int(sys.argv[1])
    nums = [n] * os.cpu_count()
    get_multi_thread(nums)

if __name__ == '__main__':
    main()

!python3 fib.py 1000000

### 다중 프로세스의 주의점

### pickle화 가능한 객체 사용

In [35]:
!cat unpickle.py

from concurrent.futures import (
    ProcessPoolExecutor,
    wait
)

func = lambda: 1

def main():
    with ProcessPoolExecutor() as e:
        future = e.submit(func)
        done, _ = wait([future])
    print(future.result())

if __name__ == '__main__':
    main()

In [36]:
!python3 unpickle.py

concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/queues.py", line 239, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function <lambda> at 0x7fb94801c310>: attribute lookup <lambda> on __main__ failed
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "unpickle.py", line 15, in <module>
    main()
  File "unpickle.py", line 12, in main
    print(future.result())
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/_base.py", line 432, in result
    return self.__get_result()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/_base.py", 

### 난수 사용법

In [37]:
!cat rand.py

from concurrent.futures import (
    ProcessPoolExecutor,
    as_completed
)
import numpy as np

def use_numpy_random():
    # 난수 생성기를 초기화할 때 이 행을 실행함
    # np.random.seed()
    return np.random.random()

def main():
    with ProcessPoolExecutor() as e:
        futures = [e.submit(use_numpy_random)
                   for _ in range(3)]
        for future in as_completed(futures):
            print(future.result())

if __name__ == '__main__':
    main()

In [38]:
!pip install numpy

Collecting numpy
  Downloading numpy-1.19.4-cp38-cp38-macosx_10_9_x86_64.whl (15.3 MB)
[K     |████████████████████████████████| 15.3 MB 2.0 MB/s eta 0:00:01
[?25hInstalling collected packages: numpy
Successfully installed numpy-1.19.4


In [39]:
# macOS에서는 Python3.8로 실행
!python3 rand.py

0.5016572826575797
0.09325158216819784
0.2172261772080898


In [40]:
!docker run -it --rm -v $(pwd):/usr/src/app -w /usr/src/app python:3.8.1 bash -c 'pip install numpy; python3 rand.py'

Collecting numpy
  Downloading numpy-1.19.4-cp38-cp38-manylinux2010_x86_64.whl (14.5 MB)
[K     |████████████████████████████████| 14.5 MB 1.6 MB/s eta 0:00:01
[?25hInstalling collected packages: numpy
Successfully installed numpy-1.19.4
You should consider upgrading via the '/usr/local/bin/python -m pip install --upgrade pip' command.[0m
0.144542955879863
0.144542955879863
0.144542955879863


In [41]:
!cat standard_rand.py

from concurrent.futures import (
    ProcessPoolExecutor,
    as_completed
)
import random

def use_starndard_random():
    return random.random()

def main():
    with ProcessPoolExecutor() as e:
        futures = [e.submit(use_starndard_random)
                   for _ in range(3)]
        for future in as_completed(futures):
            print(future.result())

if __name__ == '__main__':
    main()

In [42]:
!docker run -it --rm -v $(pwd):/usr/src/app -w /usr/src/app python:3.8.1 python3 standard_rand.py

0.8048240847467094
0.16351998662010914
0.41609230306304734


# asyncio 모듈 ── 이벤트 루프를 이용한 동시 처리 수행

## 코루틴 ── 처리 도중에 중단, 다시 시작

### async 구문을 사용한 코루틴 구현

In [43]:
async def coro():
    """Coroutine function; awaiting the coroutine it creates yields 1."""
    result = 1
    return result

In [44]:
# 반환값은 1이 아닌 코루틴 객체
coro()

<coroutine object coro at 0x7fd060604540>

In [45]:
import asyncio

In [46]:
await coro()

1

### await 구문을 사용한 코루틴 호출과 중단

In [47]:
import asyncio
import random

In [48]:
async def call_web_api(url):
    """Simulate a Web API request for *url* and return *url* as the response.

    The network round-trip is stood in for by a non-blocking sleep of a
    random duration in [0, 1) seconds, during which the event loop is free
    to run other coroutines.
    """
    print(f'send a request: {url}')
    wait_seconds = random.random()
    await asyncio.sleep(wait_seconds)
    print(f'got a response: {url}')
    return url

In [49]:
async def async_download(url):
    """Await the simulated Web API call for *url* and return its response."""
    return await call_web_api(url)

In [50]:
result = await async_download('https://twitter.com/')

send a request: https://twitter.com/
got a response: https://twitter.com/


In [51]:
result

'https://twitter.com/'

### 코루틴 동시 실행

In [52]:
async def main():
    """Download three URLs concurrently and return their results.

    asyncio.gather runs the coroutines concurrently and returns a list of
    their results in argument order, regardless of which finishes first.
    """
    targets = (
        'https://twitter.com/',
        'https://facebook.com',
        'https://instagram.com',
    )
    coroutines = [async_download(target) for target in targets]
    return await asyncio.gather(*coroutines)

In [53]:
result = await main()

send a request: https://twitter.com/
send a request: https://facebook.com
send a request: https://instagram.com
got a response: https://facebook.com
got a response: https://twitter.com/
got a response: https://instagram.com


In [54]:
result

['https://twitter.com/', 'https://facebook.com', 'https://instagram.com']

## 코루틴 스케줄링과 실행

### 이벤트 루프 ── asyncio 모듈의 핵심 구조

In [55]:
import asyncio
async def main():
    """Print the event loop that is currently running this coroutine."""
    print(asyncio.get_running_loop())

In [56]:
await main()

<_UnixSelectorEventLoop running=True closed=False debug=False>


### 태스크 ── 스케줄링한 코루틴을 캡슐화

In [57]:
async def coro(n):
    """Wait *n* seconds without blocking the event loop, then return *n*."""
    delay = n
    await asyncio.sleep(delay)
    return delay

In [58]:
async def main():
    """Wrap coro(1) in a Task, print it, then await and return its result."""
    pending = asyncio.create_task(coro(1))
    # Nothing has been awaited since create_task, so the task is still pending.
    print(pending)
    result = await pending
    return result

In [59]:
# print() 시점에서는 아직 Pending 상태
await main()

<Task pending name='Task-9' coro=<coro() running at <ipython-input-57-adc0461ab5af>:1>>


1

In [60]:
# Create Tasks so all three coroutines are scheduled right away.
# Finishes in about 3 seconds (the longest sleep), not 1 + 2 + 3.
async def main():
    """Run coro(1), coro(2), coro(3) concurrently as Tasks; print each result."""
    tasks = [asyncio.create_task(coro(seconds)) for seconds in (1, 2, 3)]
    for task in tasks:
        print(await task)

In [61]:
await main()

1
2
3


In [62]:
# Await the bare coroutines one after another: each starts only when the
# previous one has finished, so this takes about 1 + 2 + 3 = 6 seconds.
async def main():
    """Await coro(1), coro(2), coro(3) sequentially and print each result."""
    for seconds in (1, 2, 3):
        print(await coro(seconds))

In [63]:
await main()

1
2
3


### 비동기 I/O ── 이벤트 루프에 적합한 I/O 처리

### 동기 I/O를 이용하는 처리의 태스크화

In [64]:
async def main():
    """Run the blocking ``download`` for every URL via the loop's default executor.

    run_in_executor(None, ...) hands each synchronous call to the event
    loop's default executor, so the downloads overlap instead of blocking
    the loop. gather waits for all of them and returns results in ``urls``
    order.
    """
    loop = asyncio.get_running_loop()
    pending = []
    for url in urls:
        pending.append(loop.run_in_executor(None, download, url))
    results = await asyncio.gather(*pending)
    for result in results:
        print(result)

In [65]:
await main()

URLError: <urlopen error [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1108)>

## asyncio 모듈과 HTTP 통신

### aiohttp ── 비동기 I/O를 이용하는 HTTP 클라이언트 겸 서버 라이브러리

# 정리