# o'reillyのツノガレイ強化学習の本

## 多腕バンディット問題、動的計画法

## [目次](TableOfContents.ipynb)
- [環境準備](#環境準備)
  - [インストール](#インストール)
  - [インポート](#インポート)
  - [共通関数](#共通関数)
- [多腕バンディット問題](#多腕バンディット問題)
  - [期待値＝報酬の平均として価値を求める実装](#期待値＝報酬の平均として価値を求める実装)
    - [naive implementation](#naive_implementation)
    - [incremental implementation](#incremental_implementation)
  - [クラス定義](#クラス定義)
    - [スロット台](#プレーヤー)
    - [プレーヤー](#プレーヤー)
  - [プレーヤーがスロットしつつ強化学習](#プレーヤーがスロットしつつ強化学習)
    - [1000ステップで強化学習](#1000ステップで強化学習)
      - [1000ステップの強化学習を１回実行](#1000ステップの強化学習を１回実行)
      - [1000ステップの強化学習を10回実行](#1000ステップの強化学習を10回実行)
    - [1000ステップで強化学習を200回繰り返した平均で評価](#1000ステップで強化学習を200回繰り返した平均で評価)
      - [探索の確率を10％にして実行](#探索の確立を10％にして実行)
      - [探索の確率を1～30％にして実行](#探索の確率を1～30％にして実行)
  - [非定常な多腕バンディット問題](#非定常な多腕バンディット問題)
    - [コードの修正](#コードの修正)
      - [スロット台](#スロット台)
      - [プレイヤー](#プレイヤー)
    - [コードの実行](#コードの実行)
- [動的計画法（方策反復法、価値反復法）](#動的計画法（方策反復法、価値反復法）)
  - [連立方程式ソルバをベルマン作用素的に解く](#連立方程式ソルバをベルマン作用素的に解く)
    - [普通に実装](#普通に実装)
    - [上書き方式の実装](#上書き方式の実装)
  - [3*4マスのグリッド・ワールド](#3*4マスのグリッド・ワールド)
    - [状態価値関数のランダム初期化＆表示](#状態価値関数のランダム初期化＆表示)
    - [方策反復](#方策反復)
      - [方策評価、状態価値関数の推定](#方策評価、状態価値関数の推定)
      - [方策改善、状態価値関数の変化](#方策改善、状態価値関数の変化)
    - [価値反復](#価値反復)


## 参考
- https://github.com/oreilly-japan/deep-learning-from-scratch-4/tree/master/ch01
- [強化学習（Reinforcement Learning） - .NET 開発基盤部会 Wiki](https://dotnetdevelopmentinfrastructure.osscons.jp/index.php?%E5%BC%B7%E5%8C%96%E5%AD%A6%E7%BF%92%EF%BC%88Reinforcement%20Learning%EF%BC%89)

## 環境準備

### インストール

In [None]:
!pip install numpy
!pip install matplotlib
!pip install dezerogym

### インポート

In [None]:
import numpy as np
import matplotlib.pyplot as plt

In [None]:
import warnings
warnings.filterwarnings('ignore')
import sys, os
sys.path.append(os.pardir)  # 親ディレクトリのファイルをインポートするための設定

### 共通関数
greedy法、貪欲法

In [None]:
def argmax(d):
    """d (dict)"""
    max_value = max(d.values())
    max_key = -1
    for key, value in d.items():
        if value == max_value:
            max_key = key
    return max_key

In [None]:
def greedy_policy(V, env, gamma):
    pi = {}

    for state in env.states():
        action_values = {}

        for action in env.actions():
            next_state = env.next_state(state, action)
            r = env.reward(state, action, next_state)
            value = r + gamma * V[next_state]
            action_values[action] = value

        max_action = argmax(action_values)
        action_probs = {0: 0, 1: 0, 2: 0, 3: 0}
        action_probs[max_action] = 1.0
        pi[state] = action_probs
    return pi

## 多腕バンディット問題

### 期待値＝報酬の平均として価値を求める実装

#### naive_implementation

In [None]:
np.random.seed(0)
rewards = []

for n in range(1, 11):
    reward = np.random.rand()
    rewards.append(reward)
    Q = sum(rewards) / n
    print(Q)

#### incremental_implementation
インクリメンタルな実装は計算量が少なくて済む。

In [None]:
np.random.seed(0)
Q = 0

for n in range(1, 11):
    reward = np.random.rand()
    Q = Q + (reward - Q) / n
    print(Q)

### クラス定義

#### スロット台

In [None]:
class Bandit:
    def __init__(self, arms=10):
        self.rates = np.random.rand(arms)

    def play(self, arm):
        rate = self.rates[arm]
        if rate > np.random.rand():
            return 1
        else:
            return 0

#### プレーヤー

In [None]:
class Agent:
    def __init__(self, epsilon, action_size=10):
        self.epsilon = epsilon           # ε貪欲法
        self.Qs = np.zeros(action_size)  # 価値
        self.ns = np.zeros(action_size)  # 台毎のカウンタ

    def update(self, action, reward):
        self.ns[action] += 1 # 台毎のカウンタをインクリメント
        self.Qs[action] = self.Qs[action] + (1 / self.ns[action]) * (reward - self.Qs[action])
        
    def get_action(self):
        if np.random.rand() < self.epsilon:
            return np.random.randint(0, len(self.Qs))
        return np.argmax(self.Qs)

### プレーヤーがスロットしつつ強化学習

#### 1000ステップで強化学習

In [None]:
def execute1():
    steps = 1000
    epsilon = 0.1

    # 台の初期化（10台）
    bandit = Bandit()
    agent = Agent(epsilon)
    total_reward = 0
    total_rewards = []
    rates = []

    for step in range(steps):
        # 台の選択
        action = agent.get_action()
        # 台のプレイ
        reward = bandit.play(action)
        # 台の価値の更新
        agent.update(action, reward)
        # 報酬の総額＝収益
        total_reward += reward

        # グラフ描画の履歴
        total_rewards.append(total_reward)
        rates.append(total_reward / (step + 1))

    # グラフ描画の履歴を戻す
    return total_rewards, rates

##### 1000ステップの強化学習を１回実行

In [None]:
total_rewards, rates = execute1()

plt.ylabel('Total reward')
plt.xlabel('Steps')
plt.plot(total_rewards)
plt.show()

plt.ylabel('Rates')
plt.xlabel('Steps')
plt.plot(rates)
plt.show()

##### 1000ステップの強化学習を10回実行

In [None]:
plt.ylabel('Rates')
plt.xlabel('Steps')

for cnt in range(10):
    total_rewards, rates = execute1()
    color = plt.cm.jet(cnt / 10)
    plt.plot(rates, color=color)

plt.show()

#### 1000ステップの強化学習を200回繰り返した平均で評価

In [None]:
def execute2(epsilon):
    runs = 200
    steps = 1000
    all_rates = np.zeros((runs, steps))  # (2000, 1000)

    # 強化学習を200回繰り返した平均で評価
    for run in range(runs):
        bandit = Bandit()
        agent = Agent(epsilon)
        total_reward = 0
        rates = []

        # 1000ステップ
        for step in range(steps):
            action = agent.get_action()
            reward = bandit.play(action)
            agent.update(action, reward)
            total_reward += reward
            rates.append(total_reward / (step + 1))

        all_rates[run] = rates

    return np.average(all_rates, axis=0)

##### 探索の確立を10％にして実行
epsilon = 0.1

In [None]:
epsilon = 0.1
avg_rates = execute2(epsilon)
plt.ylabel('Rates')
plt.xlabel('Steps')
plt.plot(avg_rates)
plt.show()

##### 探索の確率を1～30％にして実行
epsilon = 0.01, 0.1, 0.3

In [None]:
avg_rates1 = execute2(0.01)
avg_rates2 = execute2(0.1)
avg_rates3 = execute2(0.3)

plt.plot(avg_rates1, color='blue', label='0.01')
plt.plot(avg_rates2, color='red', label='0.1')
plt.plot(avg_rates3, color='green', label='0.3')

plt.ylabel('Rates')
plt.xlabel('Steps')
plt.legend()
plt.show()

### 非定常な多腕バンディット問題
非定常問題では、スロット台の勝率が時間とともに変化する。

#### コードの修正

##### スロット台
プレイの度に、勝率を変化させる。

In [None]:
class NonStatBandit:
    def __init__(self, arms=10):
        self.arms = arms
        self.rates = np.random.rand(arms)

    def play(self, arm):
        rate = self.rates[arm]
        self.rates += 0.1 * np.random.randn(self.arms)  # Add noise
        if rate > np.random.rand():
            return 1
        else:
            return 0

##### プレイヤー
価値の更新のアルゴリズムを変更する。
- 元のアルゴリズムは試行回数が増えるほど、最新の結果の影響度は低くなっていた。
- 変更後のアルゴリズムは、最新の結果の影響度は試行回数の影響を受けないように変化させている。

In [None]:
class AlphaAgent:
    def __init__(self, epsilon, alpha, actions=10):
        self.epsilon = epsilon      # ε貪欲法
        self.Qs = np.zeros(actions) # 価値
        
        # 学習率を 1/n から 0 ≦ α ≦ 1 に変更。
        self.alpha = alpha

    def update(self, action, reward):
        self.Qs[action] = self.Qs[action] + self.alpha  * (reward - self.Qs[action])

    def get_action(self):
        if np.random.rand() < self.epsilon:
            return np.random.randint(0, len(self.Qs))
        return np.argmax(self.Qs)

#### コードの実行
非定常問題に合わせて価値の更新のアルゴリズムを変更した方が、勝率が伸びる。

In [None]:
runs = 200
steps = 1000
epsilon = 0.1
alpha = 0.8
agent_types = ['sample average', 'alpha const update']
results = {}

for agent_type in agent_types:
    all_rates = np.zeros((runs, steps))  # (200, 1000)

    # 強化学習を200回繰り返した平均で評価
    for run in range(runs):
        if agent_type == 'sample average':
            agent = Agent(epsilon)
        else:
            agent = AlphaAgent(epsilon, alpha)

        # 台の初期化（10台）
        bandit = NonStatBandit()
        total_reward = 0
        rates = []
        
        # 1000ステップ
        for step in range(steps):
            action = agent.get_action()
            reward = bandit.play(action)
            agent.update(action, reward)
            total_reward += reward
            rates.append(total_reward / (step + 1))

        all_rates[run] = rates

    avg_rates = np.average(all_rates, axis=0)
    results[agent_type] = avg_rates

# plot
plt.figure()
plt.ylabel('Average Rates')
plt.xlabel('Steps')
for key, avg_rates in results.items():
    plt.plot(avg_rates, label=key)
plt.legend()
plt.show()

## 動的計画法（方策反復法、価値反復法）

### 連立方程式ソルバをベルマン作用素的に解く

#### 普通に実装

In [None]:
V = {'L1': 0.0, 'L2': 0.0}
new_V = V.copy()

cnt = 0
while True:
    new_V['L1'] = 0.5 * (-1 + 0.9 * V['L1']) + 0.5 * (1 + 0.9 * V['L2'])
    new_V['L2'] = 0.5 * (0 + 0.9 * V['L1']) + 0.5 * (-1 + 0.9 * V['L2'])

    delta = abs(new_V['L1'] - V['L1'])
    delta = max(delta, abs(new_V['L2'] - V['L2']))
    V = new_V.copy()

    cnt += 1
    if delta < 0.0001:
        print(V)
        print(cnt)
        break

#### 上書き方式の実装
コッチのほうが少々早く収束する。

In [None]:
V = {'L1': 0.0, 'L2': 0.0}

cnt = 0
while True:
    t = 0.5 * (-1 + 0.9 * V['L1']) + 0.5 * (1 + 0.9 * V['L2'])
    delta = abs(t - V['L1'])
    V['L1'] = t

    t = 0.5 * (0 + 0.9 * V['L1']) + 0.5 * (-1 + 0.9 * V['L2'])
    delta = max(delta, abs(t - V['L2']))
    V['L2'] = t

    cnt += 1
    if delta < 0.0001:
        print(V)
        print(cnt)
        break

### 3*4マスのグリッド・ワールド

#### 状態価値関数のランダム初期化＆表示

In [None]:
import numpy as np
from dezerogym.gridworld import GridWorld

env = GridWorld()
V = {}
for state in env.states():
    V[state] = np.random.randn()
env.render_v(V)

#### 方策反復
方策評価（ベルマン作用素で状態価値関数を推定）、方策改善を繰り返す。

##### 方策評価、状態価値関数の推定
爆弾の周辺が赤く、リンゴの周辺が緑になる。

###### 定義

In [None]:
from collections import defaultdict
from dezerogym.gridworld import GridWorld

# 方策評価の1ステップ
def eval_onestep(pi, V, env, gamma=0.9):
    
    # 全状態を処理
    for state in env.states():
        
        # ゴールの
        if state == env.goal_state:
            V[state] = 0 # 状態価値は0（その前に報酬がある）
            continue

        # 固定の確率的方策を使用する
        action_probs = pi[state]
        
        # 以下の実装が ≒ 方策評価
        # ベルマン作用素で状態価値関数の推定
        new_V = 0
        for action, action_prob in action_probs.items():
            next_state = env.next_state(state, action)
            reward = env.reward(state, action, next_state)
            # 報酬や状態遷移確率は=1なので表記されていない。
            new_V += action_prob * (reward + gamma * V[next_state])
            
        # 新しい状態価値関数
        V[state] = new_V
        
    return V # 新しい状態価値関数

# 方策評価
def policy_eval(pi, V, env, gamma, threshold=0.001):
    
    # 収束するまで実行
    while True:
        old_V = V.copy()
        
        # 全状態の評価
        V = eval_onestep(pi, V, env, gamma)

        # 収束したか確認
        delta = 0
        for state in V.keys():
            t = abs(V[state] - old_V[state])
            if delta < t:
                delta = t

        if delta < threshold:
            break
    return V

###### 実行

In [None]:
# グリッドワールドのインスタンスを作成
env = GridWorld()

# 割引率を指定
gamma = 0.9

# 確率的方策を指定
pi = defaultdict(lambda: {0: 0.25, 1: 0.25, 2: 0.25, 3: 0.25})

# 状態価値関数を初期化
V = defaultdict(lambda: 0)

# 方策反復の方策評価（状態価値関数を更新）
V = policy_eval(pi, V, env, gamma)

# 状態価値関数のヒートマップを作図
env.render_v(V, pi)

##### 方策改善、状態価値関数の変化
方策反復で方策改善を行った後の方策 → 状態価値関数の変化

###### 実行

In [None]:
# 方策反復
def policy_iter(env, gamma, threshold=0.001, is_render=True):
    
    # 方策
    pi = defaultdict(lambda: {0: 0.25, 1: 0.25, 2: 0.25, 3: 0.25})
    # 状態価値関数
    V = defaultdict(lambda: 0)

    # 方策反復
    while True:
        
        # 方策評価
        V = policy_eval(pi, V, env, gamma, threshold)
        # 方策改善
        new_pi = greedy_policy(V, env, gamma)

        # 状態価値関数を描画
        if is_render:
            env.render_v(V, pi)

        # 方策が収束するまで継続
        if new_pi == pi:
            break
        pi = new_pi

    return pi

###### 実行

In [None]:
# グリッドワールドのインスタンスを作成
env = GridWorld()

# 割引率を指定
gamma = 0.9

# 方策反復
pi = policy_iter(env, gamma)

#### 価値反復
ベルマン最適作用素で最適価値関数を求め最適方策を決める。

###### 定義

In [None]:
# 方策評価の1ステップ
def value_iter_onestep(V, env, gamma):
    
    # 全状態を処理
    for state in env.states():
        
        # ゴールの
        if state == env.goal_state:
            V[state] = 0 # 状態価値は0（その前に報酬がある）
            continue

        # 以下の実装が ≒ 方策評価
        # ベルマン最適作用素で状態価値関数の推定
        action_values = []
        for action in env.actions():
            next_state = env.next_state(state, action)
            reward = env.reward(state, action, next_state)
            # 報酬や状態遷移確率は=1なので表記されていない。
            value = reward + gamma * V[next_state]
            action_values.append(value)

        # 新しい状態価値関数
        V[state] = max(action_values)
    
    return V # 新しい状態価値関数

# 価値反復（＝方策評価の反復で方策反復はしない）
def value_iter(V, env, gamma, threshold=0.001, is_render=True):
    
    # 状態価値関数
    V = defaultdict(lambda: 0)

    # 収束するまで実行
    while True:
        
        # 状態価値関数を描画
        if is_render:
            env.render_v(V)

        old_V = V.copy()
        
        # 全状態の評価
        V = value_iter_onestep(V, env, gamma)

        # 収束したか確認
        delta = 0
        for state in V.keys():
            t = abs(V[state] - old_V[state])
            if delta < t:
                delta = t

        if delta < threshold:
            break
    
    # 最適方策を描画
    pi = greedy_policy(V, env, gamma)
    env.render_v(V, pi)

###### 実行

In [None]:
# グリッドワールドのインスタンスを作成
env = GridWorld()

# 割引率を指定
gamma = 0.9

# 価値反復
value_iter(V, env, gamma)