## 【FutureクラスとExcutorクラス - 非同期処理のカプセル化と実行】

### `ThreadPoolExecutor`は`Excutor`の具象サブクラス

In [1]:
from concurrent.futures import Future, ThreadPoolExecutor

### 非同期に行いたい処理

In [2]:
def func():
    return 1

### 非同期に行いたい処理(func)をsubmit()に渡す

In [11]:
future: Future = ThreadPoolExecutor().submit(func)
print(type(future))
print(f"{isinstance(future, Future)=}")

<class 'concurrent.futures._base.Future'>
isinstance(future, Future)=True


### 非同期に実行した処理の戻り値を取得

In [12]:
future.result()

1

### 現在の状態を確認する

In [13]:
future.done()

True

In [14]:
future.running()

False

In [15]:
future.cancelled()

False

## 【ThreadPoolExecutorクラスを利用したマルチスレッド処理の実例】

### download関数はURLをひとつ受け取り、そのページをファイルに保存する

In [18]:
# 対象ページのURL一覧
urls = [
    "https://twitter.com",
    "https://facebook.com",
    "https://instagram.com",
]
from hashlib import md5
from pathlib import Path
from urllib import request


def download(url):
    req = request.Request(url)
    # ファイル名に/などが含まれないようにする
    name = md5(url.encode("utf-8")).hexdigest()
    file_path = "./" + name
    with request.urlopen(req) as res:
        Path(file_path).write_bytes(res.read())
        return url, file_path

### 動きを確認

In [21]:
%%time
download(urls[0])

Wall time: 1.58 s


('https://twitter.com', './be8b09f7f1f66235a9c91986952483f0')

### 逐次処理で実装

In [22]:
%%time
def get_sequential():
    for url in urls:
        print(download(url))


get_sequential()

('https://twitter.com', './be8b09f7f1f66235a9c91986952483f0')
('https://facebook.com', './a023cfbf5f1c39bdf8407f28b60cd134')
('https://instagram.com', './09f8b89478d7e1046fa93c7ee4afa99e')
Wall time: 18.4 s


### マルチスレッドで実装

In [28]:
from concurrent.futures import ThreadPoolExecutor, as_completed


def get_multi_thread():
    # max_workersのデフォルトはコア数 * 5
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(download, url) for url in urls]
        for future in as_completed(futures):
            print(future.result())

In [29]:
%%time
get_multi_thread()

('https://twitter.com', './be8b09f7f1f66235a9c91986952483f0')
('https://instagram.com', './09f8b89478d7e1046fa93c7ee4afa99e')
('https://facebook.com', './a023cfbf5f1c39bdf8407f28b60cd134')
Wall time: 2.73 s


## ■マルチスレッドでの動作に問題がある実装

In [38]:
%%time
from concurrent.futures import ThreadPoolExecutor, wait


class Counter:
    def __init__(self):
        self.count = 0

    def increment(self):
        self.count = self.count + 1


def count_up(counter):
    # 1,000,000回インクリメント
    for _ in range(1_000_000):
        counter.increment()


counter = Counter()
threads = 2
with ThreadPoolExecutor() as e:
    futures = [e.submit(count_up, counter) for _ in range(threads)]
    done, not_done = wait(futures)

# 数値をカンマ区切りで表示
# 2,000,000にはなっていない
print(f"{counter.count=:,}")

counter.count=1,887,762
Wall time: 303 ms


### ■スレッドセーフな実装

In [39]:
%%time
import threading


class ThreadSafeCounter:
    # ロックを用意する
    lock = threading.Lock()

    def __init__(self):
        self.count = 0

    def increment(self):
        with self.lock:
            # 排他制御したい一連の処理をこのブロック内に書く
            self.count = self.count + 1


counter = ThreadSafeCounter()
threads = 2
with ThreadPoolExecutor() as e:
    futures = [e.submit(count_up, counter) for _ in range(threads)]
    done, not_done = wait(futures)

# 期待通りの値になっている
print(f"{counter.count=:,}")

counter.count=2,000,000
Wall time: 1.49 s


## 【ProcessPoolExecutorクラスを利用したマルチプロセス処理の実例】

In [16]:
%%time
!python fib.py 1000000

返される値は環境で異なる
6
Wall time: 39.2 s


In [20]:
%%time
!python fib.py 1000000

返される値は環境で異なる
os.cpu_count()=6
Wall time: 7.16 s
