In [1]:
"""
強化学習のマルコフ決定過程（MDP）を理解するためのサンプルプログラムです。

「機械学習スタートアップシリーズ Pythonで学ぶ強化学習 入門から実践まで」の
サンプルプログラム「baby-steps-of-rl-ja/DP」を参考に作っています。

ユーザーがエージェントとなって迷路を探索します。
攻略を通し、コードを読んで MDP の基本的な概念を学習していってください。

環境：迷路
エージェント：ユーザー
"""


from enum import Enum
import numpy as np


class State():
    """
    迷路の一つのセル＝状態を表すクラスです

    Attributes
    ----------
    row : int
        迷路全体の中のこのセルのy座標
    column : int
        迷路全体の中のこのセルのx座標
    """
    def __init__(self, row=-1, column=-1):
        self.row = row
        self.column = column

    def __repr__(self):
        return "<State: [{}, {}]>".format(self.row, self.column)

    def clone(self):
        return State(self.row, self.column)

    def __hash__(self):
        return hash((self.row, self.column))

    def __eq__(self, other):
        return self.row == other.row and self.column == other.column


class Action(Enum):
    """
    迷路で取れる行動を表すクラスです
    """

    UP = 1
    DOWN = -1
    LEFT = 2
    RIGHT = -2


class Environment():
    """
    迷路環境クラスです

    Attributes
    ----------
    grid : 2d-array
        迷路のマップ
    agent_state : State
        現在エージェントがいる迷路の位置
    default_reward: float
        エージェントが移動するたびに、セルの値に関係なく与えられる報酬
        つまり移動することにも小さな罰が与えられ、早くゴールする必要があることを意味する
    move_prob : float
        エージェントはこの確率で選択した方向に移動できる
        これは (1 - move_prob) の確率で選択したのと逆の方向に移動することを意味する

    Notes
    -----
    grid は二次元配列で、迷路のマップを表している
    マップはセルで表されていて、それぞれのセルの値は以下のような意味を持つ
         0: 普通のセル
         5: 壁セル、エージェントは通れない
         9: ゴールセル、ここに到達することでエピソードが終わり、ゴール報酬がもらえる
        -1: 罰セル、小さな負の報酬が与えられる

    単純な迷路のマップを以下に示す
    grid = [
        [0, 0, 0, 5, 9],
        [0, 5, 0, 5, 0],
        [0, 5, 0, 5, 0],
        [0, 5, 0, 0, 0],
    ]
    """
    def __init__(self, grid, move_prob=0.8):
        self.grid = grid
        self.agent_state = State()
        self.default_reward = -3
        self.move_prob = move_prob

    @property
    def row_length(self):
        return len(self.grid)

    @property
    def column_length(self):
        return len(self.grid[0])

    @property
    def actions(self):
        """
        環境の行動空間を表す
        この場合行動空間の大きさは４
        """
        return [Action.UP, Action.DOWN,
                Action.LEFT, Action.RIGHT]

    @property
    def states(self):
        """
        環境の状態空間を表す
        この場合状態空間の大きさは row x column
        """
        states = []
        for row in range(self.row_length):
            for column in range(self.column_length):
                states.append(State(row, column))
        return states

    def can_action_at(self, state):
        if self.grid[state.row][state.column] == 9:
            return False
        else:
            return True

    def _move(self, state, action):
        if not self.can_action_at(state):
            raise Exception("Can't move from here!")

        next_state = state.clone()

        # 行動する（移動する）
        if action == Action.UP:
            next_state.row -= 1
        elif action == Action.DOWN:
            next_state.row += 1
        elif action == Action.LEFT:
            next_state.column -= 1
        elif action == Action.RIGHT:
            next_state.column += 1

        # 壁の向こう側に移動しようとしていたら、何もしなかったことにする
        if not (0 <= next_state.row < self.row_length):
            next_state = state
        if not (0 <= next_state.column < self.column_length):
            next_state = state

        # ブロックセルの上に移動しようとしていたら、何もしなかったことにする
        if self.grid[next_state.row][next_state.column] == 5:
            next_state = state

        return next_state

    def transit_func(self, state, action):
        """
        状態遷移関数
        状態と行動を引数に次の状態とそこへの遷移確率を返す
        """
        transition_probs = {}
        if not self.can_action_at(state):
            # すでにゴールセルにいる
            return transition_probs

        opposite_direction = Action(action.value * -1)

        for a in self.actions:
            prob = 0
            if a == action:
                prob = self.move_prob
            elif a != opposite_direction:
                prob = (1 - self.move_prob) / 2

            next_state = self._move(state, a)
            if next_state not in transition_probs:
                transition_probs[next_state] = prob
            else:
                transition_probs[next_state] += prob

        print(f"Transit Probabilities: {transition_probs}")
        return transition_probs

    def reward_func(self, state):
        """
        報酬関数
        状態を引数にその状態に対する報酬を返す
        """
        reward = self.default_reward
        done = False

        # セルの値を参考に報酬を決める
        attribute = self.grid[state.row][state.column]
        if attribute == 9:
            # ゴール！ゴール報酬を与える
            reward = 100
            done = True
        elif attribute == -1:
            # 罰セル。負の報酬を与える
            reward = reward - 10

        return reward, done

    def transit(self, state, action):
        transition_probs = self.transit_func(state, action)
        if len(transition_probs) == 0:
            return None, None, True

        next_states = []
        probs = []
        for s in transition_probs:
            next_states.append(s)
            probs.append(transition_probs[s])

        next_state = np.random.choice(next_states, p=probs)
        reward, done = self.reward_func(next_state)
        return next_state, reward, done

    def interact(self, action):
        """
        行動を引数に新しい状態の観測結果を返す
        """
        next_state, reward, done = self.transit(self.agent_state, action)
        if next_state is not None:
            self.agent_state = next_state

        return next_state, reward, done

    def reset(self):
        # エージェントを初期位置（左下）に戻す
        self.agent_state = State(self.row_length - 1, 0)
        return self.agent_state


class Agent():
    """
    エージェントクラス
    今回は人間がエージェントであり方策です！
    """
    def __init__(self, env):
        self.actions = env.actions

    def policy(self, state):
        while True:
            _input = input('Action > ')
            if _input in ['q', 'quit', 'exit']:
                exit()
            elif _input == 'up':
                return Action.UP
            elif _input == 'down':
                return Action.DOWN
            elif _input == 'left':
                return Action.LEFT
            elif _input == 'right':
                return Action.RIGHT
            else:
                print("Invalid action input. "
                      "plese input one of 'up', 'down', 'left', 'right'")


def main(grid):
    env = Environment(grid)
    agent = Agent(env)

    # 環境を初期化し、初期状態を観測する
    state = env.reset()

    done = False
    gain = 0

    # 環境からエピソード終了信号が来るまで相互作用を行う
    while not done:
        print(f"New state: {state}")
        action = agent.policy(state)
        state, reward, done = env.interact(action)
        print(f"Reward: {reward}")
        gain += reward

    print(f"Goal! Your score is {gain}.")



In [None]:
# 簡単な迷路を解いてみます

grid = [
    [0, 0, 0, 5, 9],
    [0, 5, 0, 5, 0],
    [0, 5, 0, 5, 0],
    [0, 5, 0, 0, 0],
]
main(grid)

In [5]:
# 複雑な迷路を解いてみましょう
# 最大利得 61 を得られるか？！

import pickle

grid = pickle.loads(b'\x80\x04\x95h\x00\x00\x00\x00\x00\x00\x00]\x94(]\x94(J\xff\xff\xff\xffK\x00K\x00K\x00K\x05e]\x94(K\x05K\x00K\x05K\x00J\xff\xff\xff\xffe]\x94(K\x00K\x00K\x05K\x05K\x00e]\x94(K\x00K\x05K\tK\x00K\x00e]\x94(K\x00K\x05J\xff\xff\xff\xffK\x05K\x05e]\x94(K\x00J\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00K\x00ee.')
main(grid)

New state: <State: [5, 0]>
Action > left
Transit Probabilities: {<State: [4, 0]>: 0.09999999999999998, <State: [5, 0]>: 0.9, <State: [5, 1]>: 0}
Reward: -3
New state: <State: [4, 0]>
Action > up
Transit Probabilities: {<State: [3, 0]>: 0.8, <State: [5, 0]>: 0, <State: [4, 0]>: 0.19999999999999996}
Reward: -3
New state: <State: [3, 0]>
Action > up
Transit Probabilities: {<State: [2, 0]>: 0.8, <State: [4, 0]>: 0, <State: [3, 0]>: 0.19999999999999996}
Reward: -3
New state: <State: [2, 0]>
Action > left
Transit Probabilities: {<State: [2, 0]>: 0.9, <State: [3, 0]>: 0.09999999999999998, <State: [2, 1]>: 0}
Reward: -3
New state: <State: [2, 0]>


KeyboardInterrupt: 

In [4]:
# このセルを実行すると難しい方の迷路のマップを見ることが出来ます

attributes = {0: ' ', 5: '+', -1:'x', 9: 'G'}
for row in grid:
    print(f"|{'|'.join([attributes[col] for col in row])}|")

|x| | | |+|
|+| |+| |x|
| | |+|+| |
| |+|G| | |
| |+|x|+|+|
| |x|x| | |
