### 並行処理と並列処理

### 逐次処理で実行

### 並行処理で実行する

### 並列処理で実行する

### Pythonと並行処理

### 並行処理と非同期処理の関係

### concurrent.futuresモジュール　　　並行処理のための高水準インタフェース

### FutureクラスとExecutorクラス　　　非同期処理のカプセル化と実行

In [1]:
# ThreadPoolExecutorはExecutorの具象サブクラス
from concurrent.futures import (
    ThreadPoolExecutor,
    Future
)

In [2]:
# 非同期に行いたい処理
def func():
    return 1

In [3]:
# 非同期に行いたい処理をsubmit()に渡す
future = ThreadPoolExecutor().submit(func)
isinstance(future, Future)

True

In [4]:
# 非同期で実行した処理の戻り値を取得
future.result()

1

In [5]:
# 現在の状態を確認する
future.done()

True

In [6]:
future.running()

False

In [7]:
future.cancelled()

False

### ThreadPoolExecutorクラス　　　スレッドベースの非同期実行

### スレッドベースの非同期実行が効果的なケース

### ThreadPoolExecutorクラスを利用したマルチスレッド処理の実例

In [38]:
# 対象ページのURL一覧
urls = [
    'https://twitter.com',
    'https://facebook.com',
    'https://instagram.com'
]

In [39]:
from hashlib import md5
from pathlib import Path
from urllib import request

In [40]:
def download(url):
    req = request.Request(url)
    # ファイrう名に/などが含まれないようにする
    name = md5(url.encode('utf-8')).hexdigest()
    file_path = './' + name
    with request.urlopen(req) as res:
        Path(file_path).write_bytes(res.read())
        return url, file_path

In [41]:
# 動きを確認
download(urls[0])

('https://twitter.com', './be8b09f7f1f66235a9c91986952483f0')

### 逐次処理で実装

In [42]:
import time
def elapsed_time(f):
    def wrapper(*args, **kwargs):
        st = time.time()
        v = f(*args, **kwargs)
        print(f"{f.__name__}: {time.time() - st}")
        return v
    return wrapper

In [43]:
@elapsed_time
def get_sequential():
    for url in urls:
        print(download(url))

In [44]:
get_sequential()

('https://twitter.com', './be8b09f7f1f66235a9c91986952483f0')
('https://facebook.com', './a023cfbf5f1c39bdf8407f28b60cd134')
('https://instagram.com', './09f8b89478d7e1046fa93c7ee4afa99e')
get_sequential: 2.4556055068969727


### マルチスレッドで実装

In [45]:
from concurrent.futures import (
    ThreadPoolExecutor,
    as_completed
)

In [46]:
@elapsed_time
def get_multi_thread():
    # max_workersのデフォルトはコア数x5
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(download, url)
                   for url in urls]
        for future in as_completed(futures):
            # 完了したものから取得できる
            print(future.result())

In [47]:
get_multi_thread()

('https://twitter.com', './be8b09f7f1f66235a9c91986952483f0')
('https://facebook.com', './a023cfbf5f1c39bdf8407f28b60cd134')
('https://instagram.com', './09f8b89478d7e1046fa93c7ee4afa99e')
get_multi_thread: 0.9749932289123535


### マルチスレッドの注意点

### マルチスレッド動作に問題がある実装

In [48]:
from concurrent.futures import (
    ThreadPoolExecutor,
    wait
)

In [49]:
class Counter:
    def __init__(self):
        self.count = 0
    def increment(self):
        self.count = self.count + 1

In [50]:
def count_up(counter):
    # 1,000,000回インクリメントする
    for _ in range(1000000):
        counter.increment()

In [51]:
counter = Counter()
threads = 2
with ThreadPoolExecutor() as e:
    # 2つのスレッドを用意し、それぞれでcount_upを呼び出す
    futures = [e.submit(count_up, counter)
               for _ in range(threads)]
    done, not_done = wait(futures)

In [52]:
# 数値をカンマ区切りで表示
# 2,000,000にはなっていない
print(f'{counter.count:,}')

1,668,293


### スレッドセーフな実装

In [53]:
import threading
class ThreadSafeCounter:
    # ロックを用意する
    lock = threading.Lock()
    def __init__(self):
        self.count = 0
    def increment(self):
        with self.lock:
            # 排他制御したい一連の処理をこのブロック内に書く
            self.count = self.count + 1

In [54]:
counter = ThreadSafeCounter()
threads = 2
with ThreadPoolExecutor() as e:
    futures = [e.submit(count_up, counter)
               for _ in range(threads)]
    done, not_done = wait(futures)

In [55]:
# 期待通りの値になっている
print(f'{counter.count:,}')

2,000,000


### ProcessPoolExecutorクラス　　　プロセスベースの非同期実行

マルチプロ背sうで非同期処理を行う場合、具象サブクラスにはconcurrent.futures.ProcessPoolExecutorクラスを利用します。APIや使い方はThreadPoolExecutorクラスとほぼ同じです。つまり、利用するクラス名を変更するだけで、マルチスレッドとマルチプロセスを簡単に切り替えられます。これはcouncurrent.futuresモジュールの特徴の1つです。

### プロセスベースの非同期実行が効果的なケース

マルチプロセスは、I/Oバウンドな処理だけでなく数値計算などのCPU場運dおな処理の高速化にも有効です。これは、マルチプロセスであればGILの制約を受けずに、複数コアを同時に使って並列処理を行えるためです。

### ProcessPoolExecutorクラスを利用したマルチプロセス処理の実例

(注:fib.py) import sys

def fibonacci(n): a, b = 0, 1 for _ in range(n): a, b = b, b + a else: return a

def main(): n = int(sys.argv[1]) print(fibonacci(n))

if name == 'main': main()


### 適当な値に調整すること

$python3 fib.py 1000000

### 逐次処理で実装

### マルチプロセスで実装

### マルチプロセスの注意点

### pickle化できるオブジェクトを使う

### 乱数の取り扱い方

### asyncioモジュール　　　イベントループを利用した並行処理を行う

### コルーチン　　　処理の途中で中断,再開する

### async構文を使ったコルーチンの実装

In [56]:
async def coro():
    return 1

In [57]:
# 戻り値は1ではなくコルーチンオブジェクト
coro()

<coroutine object coro at 0x000001F4F44F08C8>

In [58]:
import asyncio

In [59]:
await coro()

1

### await構文を使ったコルーチンの呼び出しと中断

In [60]:
import asyncio
import random

In [61]:
async def call_web_api(url):
    # Web APIの処理をここではスリープで代用
    print(f'send a request: {url}')
    await asyncio.sleep(random.random())
    print(f'got a response: {url}')
    return url

In [62]:
async def async_download(url):
    # awaitを使ってコルーチンを呼び出す
    response = await call_web_api(url)
    return response

In [63]:
result = await async_download('https://twitter.com/')

send a request: https://twitter.com/
got a response: https://twitter.com/


In [64]:
result

'https://twitter.com/'

### コルーチンの並行実行

In [65]:
async def main():
    task = asyncio.gather(
        async_download('https://twitter.com/'),
        async_download('https://facebook.com'),
        async_download('https://instagram.com')
    )
    return await task

In [66]:
result = await main()

send a request: https://twitter.com/
send a request: https://facebook.com
send a request: https://instagram.com
got a response: https://facebook.com
got a response: https://instagram.com
got a response: https://twitter.com/


In [67]:
result

['https://twitter.com/', 'https://facebook.com', 'https://instagram.com']

### コルーチンのスケジューリング

### イベントループ　　　asyncioモジュールの中心的な機構

In [68]:
import asyncio
async def main():
    loop = asyncio.get_running_loop()
    print(loop)

In [69]:
await main()

<_WindowsSelectorEventLoop running=True closed=False debug=False>


### タスク　　　スケジューリングしたコルーチンをカプセル化

In [70]:
async def coro(n):
    await asyncio.sleep(n)
    return n

In [71]:
async def main():
    task = asyncio.create_task(coro(1))
    print(task)
    return await task

In [72]:
await main()

<Task pending coro=<coro() running at <ipython-input-70-adc0461ab5af>:1>>


1

In [73]:
# タスクを作成して実行
# 3秒で完了する
async def main():
    task1 = asyncio.create_task(coro(1))
    task2 = asyncio.create_task(coro(2))
    task3 = asyncio.create_task(coro(3))
    print(await task1)
    print(await task2)
    print(await task3)

In [74]:
await main()

1
2
3


In [75]:
# コルーチンのまま実行
# こちらは6秒かかる
async def main():
    print(await coro(1))
    print(await coro(2))
    print(await coro(3))

In [76]:
await main()

1
2
3


### 非同期I/O　　　イベントループに適したI/O処理

### 同期I/Oを利用する処理のタスク化

In [77]:
async def main():
    loop = asyncio.get_running_loop()
    # 同期I/Oを利用するdownloadからタスクを作成
    futures = [loop.run_in_executor(
        None, download, url) for url in urls]
    for result in await asyncio.gather(*futures):
        print(result)

In [78]:
await main()

('https://twitter.com', './be8b09f7f1f66235a9c91986952483f0')
('https://facebook.com', './a023cfbf5f1c39bdf8407f28b60cd134')
('https://instagram.com', './09f8b89478d7e1046fa93c7ee4afa99e')


### asyncioモジュールとHTTP通信

### aiohttp---非同期I/Oを利用するHTTPクライアント兼サーバライブラリ

### 本章のまとめ