# Pythonで学ぶ強化学習[改訂第2版]　入門から実践まで　著:久保隆宏

## Code1

### ライブラリのインストール

In [5]:
import numpy as np
import random
from enum import Enum

### 1. クラス State

目的: グリッドワールド内のエージェントの位置を表現するためのクラスです。行（row）と列（column）のインデックスを保持します。

メソッド:

__init__: 初期状態を設定します。デフォルトでは行と列は -1 に設定されていますが、通常は有効なグリッド位置が設定されます。

__repr__: 状態の文字列表現を返します。デバッグやログ出力時に便利です。

**clone**: 現在の状態のコピーを作成します。状態の変更が他の参照に影響しないようにします。

__hash__ と __eq__: 状態をハッシュ可能にし、集合や辞書のキーとして使用可能にします。また、状態同士の比較を可能にします。

In [6]:
# 状態を表すクラス
class State:
    def __init__(self, row=-1, column=-1):
        self.row = row        # 行番号
        self.column = column  # 列番号

    def __repr__(self):
        return f"<State: [{self.row}, {self.column}]>"

    def clone(self):
        return State(self.row, self.column)

    def __hash__(self):
        # 状態をハッシュ化可能にするために必要
        return hash((self.row, self.column))

    def __eq__(self, other):
        # 状態同士の比較を可能にする
        return self.row == other.row and self.column == other.column


### 2. 列挙型 Action
目的: エージェントが取れる行動（UP, DOWN, LEFT, RIGHT）を定義します。各行動にはユニークな値が割り当てられています。

値:

UP = 1, DOWN = -1, LEFT = 2, RIGHT = -2

In [7]:
# 行動を定義する列挙型
class Action(Enum):
    UP = 1
    DOWN = -1
    LEFT = 2
    RIGHT = -2

### 3. クラス Environment
目的: グリッドワールドの環境を管理します。エージェントの位置、グリッドの状態、行動の遷移、報酬の計算などを担当します。

属性:

grid: グリッドの2次元リスト(形や障害物はmainで指定)。各セルの値はそのセルの属性を示します。

0: 通常のセル

1: ゴールセル（報酬 +1）

-1: 罰則セル（報酬 -1）

9: 障害物（エージェントは通過できません）

agent_state: エージェントの現在の状態（Stateオブジェクト）。

default_reward: 通常の移動に対するデフォルトの報酬（-0.04）。

move_prob: 指定した行動が成功する確率（デフォルトは0.8）。

In [8]:
# 環境を表すクラス
class Environment:
    def __init__(self, grid, move_prob=0.8):
        self.grid = grid                    # 環境のグリッド（2Dリスト）
        self.agent_state = State()          # エージェントの現在の状態
        self.default_reward = -0.04        # デフォルトの報酬
        self.move_prob = move_prob          # 行動が成功する確率
        self.reset()                        # 環境の初期化

    """
    プロパティ:
    row_length: グリッドの行数。
    column_length: グリッドの列数。
    actions: 可能な行動のリスト（Action列挙型のリスト）。
    states: グリッド上の有効なすべての状態（障害物を除く）。
    """
    @property
    def row_length(self):
        return len(self.grid)  # グリッドの行数

    @property
    def column_length(self):
        return len(self.grid[0])  # グリッドの列数

    @property
    def actions(self):
        return [Action.UP, Action.DOWN, Action.LEFT, Action.RIGHT]  # 可能な行動のリスト

    @property
    def states(self):
        # グリッド上の有効なすべての状態を取得
        states = []
        for row in range(self.row_length):
            for column in range(self.column_length):
                if self.grid[row][column] != 9:  # 9は障害物を示す
                    states.append(State(row, column))
        return states

    """
    メソッド:
    """
    def transit_func(self, state, action):
        """
        現在の状態と行動に基づいて、可能な遷移先の状態とその確率を計算します。行動が成功する確率（move_prob）と失敗した場合に他の方向に動く確率を考慮します。
        """
        transition_probs = {}
        if not self.can_action_at(state):
            return transition_probs  # 行動できない場合は空の辞書を返す

        try:
            opposite_direction = Action(action.value * -1)  # 逆方向の行動を取得
        except ValueError:
            opposite_direction = None  # 逆方向が定義されていない場合

        for a in self.actions:
            prob = 0
            if a == action:
                prob = self.move_prob  # 指定された行動の成功確率
            elif opposite_direction and a != opposite_direction:
                prob = (1 - self.move_prob) / 2  # 他の行動の確率
            else:
                continue  # 逆方向の行動は確率に含めない

            next_state = self._move(state, a)  # 次の状態を計算
            if next_state not in transition_probs:
                transition_probs[next_state] = prob
            else:
                transition_probs[next_state] += prob  # 確率を累積

        return transition_probs

    def can_action_at(self, state):
        """
        指定された状態で行動が可能かどうかを判定します。
        0は通常のセル、9は障害物を示す。
        """
        return self.grid[state.row][state.column] == 0

    def _move(self, state, action):
        """
        指定された状態と行動に基づいて次の状態を計算します。
        """
        if not self.can_action_at(state):
            raise Exception("Can't move from here!")

        next_state = state.clone()

        # 行動に応じて状態を更新
        if action == Action.UP:
            next_state.row -= 1  # 上に移動
        elif action == Action.DOWN:
            next_state.row += 1  # 下に移動
        elif action == Action.LEFT:
            next_state.column -= 1  # 左に移動
        elif action == Action.RIGHT:
            next_state.column += 1  # 右に移動

        # グリッドの境界チェック
        if not (0 <= next_state.row < self.row_length):
            next_state = state  # 境界外なら移動しない
        if not (0 <= next_state.column < self.column_length):
            next_state = state  # 境界外なら移動しない
        # 障害物チェック
        if self.grid[next_state.row][next_state.column] == 9:
            next_state = state  # 障害物があれば移動しない

        return next_state

    def reward_func(self, state):
        """
        指定された状態に対する報酬とエピソードの終了判定を返します。
        """
        reward = self.default_reward  # デフォルトの報酬
        done = False

        attribute = self.grid[state.row][state.column]
        if attribute == 1:
            reward = 1   # ゴールへの報酬
            done = True  # エピソード終了
        elif attribute == -1:
            reward = -1  # 罰則の報酬
            done = True  # エピソード終了

        return reward, done

    def reset(self):
        """
        環境を初期状態にリセットします。
        エージェントの位置をグリッドの左下に設定。
        """
        self.agent_state = State(self.row_length - 1, 0)
        return self.agent_state

    def step(self, action):
        """
        エージェントを一歩進め、次の状態、報酬、終了フラグを返します。
        """
        next_state, reward, done = self.transit(self.agent_state, action)
        if next_state is not None:
            self.agent_state = next_state
        return next_state, reward, done

    def transit(self, state, action):
        """
        遷移確率に基づいて次の状態をサンプリングし、報酬と終了フラグを返します。
        """
        transition_probs = self.transit_func(state, action)
        if len(transition_probs) == 0:
            return None, None, True  # 行動できない場合はエピソード終了

        next_states = []
        probs = []
        for s in transition_probs:
            next_states.append(s)
            probs.append(transition_probs[s])

        # 確率に基づいて次の状態を選択
        next_state = np.random.choice(next_states, p=probs)
        reward, done = self.reward_func(next_state)
        return next_state, reward, done


### 4. クラス Agent
目的: 環境内で行動を選択するエージェントを表します。ここではランダムな行動選択ポリシーを実装しています。

属性:

actions: 環境で可能な行動のリスト。
メソッド:

policy: 現在の状態に基づいて行動を選択します。ここではランダムに行動を選択していますが、将来的にはより複雑なポリシー（例えば、価値反復法やQ学習）を実装することができます。

In [9]:
# エージェントを表すクラス
class Agent:
    def __init__(self, env):
        self.actions = env.actions  # 環境で可能な行動のリスト

    def policy(self, state):
        """
        現在の状態に基づいて行動を選択します。
        ここではランダムに行動を選択しています。
        """
        return np.random.choice(self.actions)


### 5. 関数 main
目的: 環境とエージェントを初期化し、指定されたエピソード数だけシミュレーションを実行します。各エピソードの終了時にエージェントの総報酬を出力します。

手順:

1.グリッドの定義:

*   3x4のグリッドを定義しています。
*   1 はゴールセル、-1 は罰則セル、9 は障害物を表します。
*   例えば、grid[0][3] = 1 は最上行の右端がゴールセルであることを示します。

2.環境とエージェントの初期化:
  

*   Environment オブジェクトを作成し、グリッドを渡します。
*   Agent オブジェクトを作成し、環境を渡します。

3.エピソードの実行:


*   指定されたエピソード数（ここでは10）だけループします。
*   各エピソードで環境をリセットし、エージェントの総報酬を初期化します。
*   エピソードが終了するまでループし、エージェントが選択した行動を環境に適用します。
*   エピソードの終了時に総報酬を出力します。
*   無限ループ防止: ステップ数が100を超えた場合、エピソードを強制終了します。

In [10]:
# メイン関数
def main():
    # グリッドの定義
    # 0: 通常のセル
    # 1: ゴール（報酬 +1）
    # -1: 罰則セル（報酬 -1）
    # 9: 障害物
    grid = [
        [0, 0, 0, 1],
        [0, 9, 0, -1],
        [0, 0, 0, 0]
    ]

    env = Environment(grid)  # 環境の初期化
    agent = Agent(env)       # エージェントの初期化

    num_episodes = 10  # エピソード数

    for i in range(num_episodes):
        state = env.reset()          # 環境のリセット
        total_reward = 0            # 総報酬の初期化
        done = False                # エピソード終了フラグ
        step_count = 0              # ステップ数のカウンタ（オプション）

        while not done:
            action = agent.policy(state)          # 行動の選択
            next_state, reward, done = env.step(action)  # 環境の更新
            total_reward += reward                # 報酬の累積
            state = next_state                    # 状態の更新
            step_count += 1

            # ステップ数が一定を超えたらエピソードを終了（無限ループ防止）
            if step_count > 100:
                print(f"Episode {i}: Exceeded step limit.")
                break

        print(f"Episode {i}: Agent Score: {total_reward}")



### 6.スクリプトの実行

if __name__ == "__main__": の条件下で main() 関数が呼び出され、スクリプトが実行されます。これにより、他のモジュールからインポートされた場合に main() が自動的に実行されないようになります。

In [11]:
# スクリプトとして実行された場合にmain()を呼び出す
if __name__ == "__main__":
    main()


Episode 0: Agent Score: -1.3599999999999999
Episode 1: Agent Score: -0.4000000000000006
Episode 2: Agent Score: -0.6400000000000008
Episode 3: Exceeded step limit.
Episode 3: Agent Score: -4.040000000000003
Episode 4: Agent Score: 0.52
Episode 5: Agent Score: -3.000000000000001
Episode 6: Agent Score: 0.31999999999999995
Episode 7: Agent Score: -1.9200000000000004
Episode 8: Agent Score: -3.4400000000000013
Episode 9: Agent Score: -1.2


## Code2

### bellman_equation

### 1.価値関数V(s)

目的: 状態 s における価値関数 V(s) を計算します。

動作:
* 現在の状態 s における即時報酬 R(s) を取得します。
* 割引率 gamma を用いて、次の状態における最大の価値 max_V_on_next_state(s) を計算し、これを即時報酬に加算します。

割引率 gamma: 将来の報酬に対する現在の価値の重要度を決定します。ここでは gamma=0.99 と設定されており、未来の報酬も高く評価されます。

In [12]:
def V(s, gamma=0.99):
  V = R(s) + gamma * max_V_on_next_state(s)
  return V

### 2.即時報酬関数R(s)

目的: 状態 s に対する即時報酬を返します。

動作:
*  状態が "happy_end" の場合、報酬 1 を返します。
*  状態が "bad_end" の場合、報酬 -1 を返します。
*  その他の状態では報酬 0 を返します。

用途: エージェントが特定の状態に到達した際の報酬を定義します。

In [13]:
def R(s):
  if s == "happy_end":
    return 1
  elif s == "bad_end":
    return -1
  else:
    return 0

### 3. 最大次状態価値関数 max_V_on_next_state(s)
目的: 現在の状態 s から遷移可能な次の状態の中で、最大の価値 V を持つものを見つけ、その値を返します。

動作:
* もし状態 s が "happy_end" または "bad_end" であれば、次の価値は 0 とします（終端状態のため）。
* それ以外の場合、可能な行動（"up", "down"）に対して以下を実行:
  * 行動 a を取った場合の遷移確率を transit_func(s, a) で取得。
  * 各遷移先 next_state とその確率 prob に対して、V(next_state) を計算し、確率で重み付けして合計します。
  * 各行動に対する総合価値 v を values リストに追加。
* values が空でない場合、リスト内の最大値を返します。空の場合は 0 を返します。

In [14]:
def max_V_on_next_state(s):
  if s in ["happy_end", "bad_end"]:
    return 0

  actions = ["up", "down"]
  values = []
  for a in actions:
    transition_probs = transit_func(s,a)
    v = 0
    for next_state in transition_probs:
      prob = transition_probs[next_state]
      v += prob * V(next_state)
    values.append(v)
  return max(values)

### 4. 遷移関数 transit_func(s, a)
目的: 現在の状態 s と行動 a に基づいて、次の状態とその遷移確率を返します。

動作:
* 行動履歴の取得:
  * 状態 s は "state_up_down_up" のような文字列で表現されており、split("_")[1:] によって行動履歴（例: ["up", "down", "up"]）を取得。
* ゲームステップ数のチェック:
  * 定数 LIMIT_GAME_COUNT = 5 に基づき、行動履歴の長さが5に達しているかを確認。
  * もし5に達している場合:
    * 行動履歴内の "up" の回数をカウント。
    * "up" の回数が HAPPY_END_BOTDER = 4 以上であれば "happy_end"、そうでなければ "bad_end" に遷移。
    * 遷移確率は 1.0（確定的）。
* ゲームステップ数が制限に達していない場合:
  * 行動 a が "up" の場合、逆行動は "down"、逆に "down" の場合は "up" と設定。
  * 行動 a が成功する確率 MOVE_PROB = 0.9 で、逆行動が発生する確率は 1 - MOVE_PROB = 0.1 として遷移先を設定。
  * 具体的には、次の状態は "state_up"（行動 a が "up" の場合）で 0.9 の確率で遷移し、逆行動 "down" の場合は "state_down" に 0.1 の確率で遷移します。
* 内部関数 next_state(state, action):
  * 現在の状態 state に行動 action を適用した次の状態を生成します。例: "state_up"。

In [15]:
def transit_func(s, a):
  actions = s.split("_")[1:]
  LIMIT_GAME_COUNT = 5
  HAPPY_END_BOTDER = 4
  MOVE_PROB = 0.9

  def next_state(state, action):
    return "_".join([state, action])

  if len(actions) == LIMIT_GAME_COUNT:
    up_count = sum([1 if a == "up" else 0 for a in actions])
    state = "happy_end" if up_count >= HAPPY_END_BOTDER else "bad_end"
    prob = 1.0
    return {state: prob}
  else:
    opposite = "up" if a == "down" else "down"
    return {
        next_state(s, a): MOVE_PROB,
        next_state(s, opposite): 1 - MOVE_PROB
    }

### 5. メインブロック
目的: 関数 V をいくつかの状態に対して実行し、その結果を表示します。

動作:

* 初期状態 "state" における価値 V("state") を計算して表示。
* 行動 "up" を2回適用した状態 "state_up_up" における価値 V("state_up_up") を計算して表示。
* 行動 "down" を2回適用した状態 "state_down_down" における価値 V("state_down_down") を計算して表示。


In [16]:
if __name__ == "__main__":
  print(V("state"))
  print(V("state_up_up"))
  print(V("state_down_down"))

0.7880942034605892
0.9068026334400001
-0.96059601


### code2-5 Value Iteration

In [17]:
class Planner():

  def __init__(self, env):
    self.env = env
    self.log = []

  def initialize(self):
    self.evn.reset()
    self.log = []

  def plan(self, gamma=0.9, threshold=0.0001):
    raise Exception("Planner have to implements plan method.")

  def transitions_at(self, state, action):
    transition_probs = self.env.transit_func(state, action)
    for next_state in transition_probs:
      prob = transition_probs[next_state]
      reward, _ = self.env.reward_func(next_state)
      yield prob, next_state, reward

  def dict_to_grid(self, state_reward_dict):
    grid = []
    for i in range(self.env.row_length):
      row = [0] * self.env.column_length
      grid.append(row)
    for s in state_reward_dict:
      grid[s.row][s.column] = state_reward_dict[s]

    return grid

### code2-6

In [18]:
class ValueIterationPlanner(Planner):

  def __init__(self, env):
     super().__init__(env)

  def plan(self, gamma=0.9, threshold=0.0001):
    self.initialize()
    actions = self.env.actions
    V = {}
    for s in self.env.states:
      V[s] = 0

    while True:
      delta = 0
      self.log.append(self.dict_to_grid(V))
      for s in V:
        if not self.env.can_action_at(s):
          continue
        expected_rewards = []
        for a in actions:
          r = 0
          for prob, next_state, reward in self.transitions_at(s, a):
            r += prob * (reward + gamma * V[next_state])
          expected_rewards.append(r)
        max_reward = max(expected_rewards)
        delta = max(delta, abs(max_reward - V[s]))
        V[s] = max_reward
      if delta < threshold:
        break
    V_grid = self.dict_to_grid(V)
    return V_grid


### code2-7 localhost8888を立ち上げて起動させる(server)

### code2-8

In [19]:
class PolicyIterationPlanner(Planner):

  def __init__(self, env):
     super().__init__(env)
     self.policy = {}

  def initialize(self):
    super().initialize()
    self.policy = {}
    actions = self.env.actions
    states = self.env.next_states
    for s in states:
      self.policy[s] = {}
      for a in actions:
        self.policy[s][a] = 1 / len(actions)

### code2-9

In [21]:
def estimate_by_policy(self, gamma, threshold):
  V = {}
  for s in self.env.states:
    V[s] = 0

  while True:
    delta = 0
    for s in V:
      expected_rewards = []
      for a in self.policy[s]:
        action_prob = self.policy[s][a]
        r = 0
        for prob, next_state, reward in self.transitions_at(s, a):
          r += action_prob * prob * \
          (reward + gamma * V[next_state])
        expected_rewards.append(r)
      value = sum(expected_rewards)
      delta = max(delta, abs(value - V[s]))
      V[s] = value
    if delta < threshold:
      break
  return V

### code2-10

In [22]:
def plan(self, gamma=0.9, threshold=0.0001):
  self.initialize()
  states = self.env.states
  actions = self.env.actions

  def take_max_action(action_value_dict):
    return max(action_value_dict, key=action_value_dict.get)

  while True:
    update_stable = True
    V = self.estimate_by_policy(gamma, threshold)
    self.log.append(self.dict_to_grid(V))

    for s in states:
      policy_action = take_max_action(self.policy[s])
      action_rewards = {}
      for a in actions:
        r = 0
        for prob, next_state, reward in self.transitions_at(s, a):
          r += prob * (reward + gamma * V[next_state])
        action_rewards[a] = r
    best_action = take_max_action(action_rewards)
    if policy_action != best_action:
      update_stable = False
    for a in self.policy[s]:
      prob = 1 if a == best_action else 0
      self.policy[s][a] = prob
    if update_stable:
      break
  V_grid = self.dict_to_grid(V)
  return V_grid

### code2-11

In [23]:
class Environment():

  def __init__(self, grid, move_prob=0.8):
    self.grid = grid
    self.agent_state = State()
    self.default_reward = -0.84
    self.move_prob = move_prob
    self.reset()