In [78]:
import random
from enum import Enum
import numpy as np
import math
from collections import defaultdict


# 状態を表すクラス
class State():
    """Grid position of an agent, identified by ``(row, column)``.

    Instances compare and hash by value, so two ``State`` objects with the
    same coordinates are interchangeable as dictionary keys.

    Args:
        row: Row index of the cell (defaults to -1).
        column: Column index of the cell (defaults to -1).
    """

    def __init__(self, row=-1, column=-1):
        self.column = column
        self.row = row

    def __repr__(self):
        """Return a readable representation such as ``<State:[1, 2]>``."""
        return "<State:[{}, {}]>".format(self.row, self.column)

    # Backward-compatible alias: the method used to be exposed as ``repr()``.
    repr = __repr__

    def clone(self):
        """Return a new, independent ``State`` with the same coordinates."""
        return State(self.row, self.column)

    def __hash__(self):
        # NOTE: the hash is derived from mutable attributes; do not change
        # row/column of an instance while it is used as a dict key.
        return hash((self.row, self.column))

    def __eq__(self, other):
        """Return True when ``other`` is a ``State`` at the same cell."""
        if not isinstance(other, State):
            # Let Python fall back to its default handling instead of
            # raising AttributeError on e.g. ``state == None``.
            return NotImplemented
        return self.row == other.row and self.column == other.column

# 行動の定義


class Action(Enum):
    """Moves an agent can choose; each member's value is its integer index."""
    UP = 0
    DOWN = 1
    LEFT = 2
    RIGHT = 3
    STAY = 4

# Agent_Base


class Agent():
    """Base agent: observes an environment and acts uniformly at random.

    Args:
        env: Environment object; it is wrapped in an ``Observer`` and its
            ``actions`` attribute is stored as the list of available actions.
    """

    def __init__(self, env):
        self.observer = Observer(env)
        self.actions = env.actions

    def act(self, state):
        """Return one of ``self.actions`` chosen at random.

        ``state`` is accepted for interface compatibility but not used.
        """
        # TODO: return action probabilities instead, and delegate the
        # action selection to Planner.
        return random.choice(self.actions)

    def learn(self):
        """Learning hook; the base agent does not learn anything."""
        # TODO: decide what arguments learning needs and whether this role
        # can be moved to Planner.
        return None


class MonteCarloAgent(Agent):
    """Monte Carlo control agent with an epsilon-greedy policy.

    The agent records one episode of (state, action, reward) steps and, once
    the episode is over, updates its action-value table ``Q`` with the
    discounted return observed after every step.

    Args:
        env: Environment passed on to ``Agent``.
        epsilon: Probability of picking a random action in ``act``.
        gamma: Discount factor applied to future rewards in ``learn``.
    """

    def __init__(self, env, epsilon=0.1, gamma=0.9):
        super().__init__(env)
        self.epsilon = epsilon
        self.gamma = gamma
        self.reward_log = []       # one entry per finished episode
        self.experience_log = []   # steps of the episode in progress
        # Q[state][action_index]: running mean of observed returns.
        self.Q = defaultdict(lambda: [0] * len(self.actions))
        # N[state][action_index]: number of updates applied to Q[state][...].
        self.N = defaultdict(lambda: [0] * len(self.actions))

    def act(self, now_state):
        """Pick an action for ``now_state`` using epsilon-greedy selection.

        With probability ``epsilon`` (or when nothing has been learned for
        the state yet) a random action is returned; otherwise the action
        with the highest Q value is returned.
        """
        explore = np.random.random() < self.epsilon
        # ``in self.Q`` avoids creating a defaultdict entry just by looking.
        # ``any(...)`` instead of ``sum(...) != 0`` so that values which
        # happen to cancel out are still recognised as learned.
        if (not explore and now_state in self.Q
                and any(q != 0 for q in self.Q[now_state])):
            return trans_ntoa(int(np.argmax(self.Q[now_state])))
        return trans_ntoa(np.random.randint(len(self.actions)))

    def init_log(self):
        """Discard the recorded steps to start a fresh episode."""
        self.experience_log = []

    def experience_add(self, now_state, action, reward):
        """Record that ``action`` taken in ``now_state`` yielded ``reward``.

        The action is stored as its integer index.
        """
        self.experience_log.append(
            {"state": now_state, "action": trans_aton(action), "reward": reward})

    def reward_add(self, reward):
        """Append the reward of one episode to ``reward_log``."""
        self.reward_log.append(reward)

    def learn(self):
        """Update ``Q`` from the recorded episode; call after it has ended."""
        log = self.experience_log

        # Discounted return following each step, computed in a single
        # backward pass: G_i = r_i + gamma * G_{i+1}.
        returns = [0.0] * len(log)
        future = 0.0
        for i in range(len(log) - 1, -1, -1):
            future = log[i]["reward"] + self.gamma * future
            returns[i] = future

        # Apply the updates in chronological order (every occurrence of a
        # state/action pair counts), keeping Q as an incremental mean.
        for step, G in zip(log, returns):
            s, a = step["state"], step["action"]
            self.N[s][a] += 1
            alpha = 1 / self.N[s][a]
            self.Q[s][a] += alpha * (G - self.Q[s][a])
            

def trans_ntoa(action_n):
    """Convert an action index (0-4) to its ``Action`` member.

    Returns ``None`` when ``action_n`` matches none of the known indices.
    """
    ordered = (Action.UP, Action.DOWN, Action.LEFT, Action.RIGHT, Action.STAY)
    for index, member in enumerate(ordered):
        if action_n == index:
            return member
    return None

def trans_aton(action):
    """Convert an ``Action`` member to its integer index (0-4).

    Returns ``None`` when ``action`` is not one of the known members.
    """
    ordered = (Action.UP, Action.DOWN, Action.LEFT, Action.RIGHT, Action.STAY)
    for index, member in enumerate(ordered):
        if action == member:
            return index
    return None

# 環境の情報取得のためのクラス


class Observer():
    """Thin wrapper through which an agent reads information from ``env``."""

    def __init__(self, env):
        self.env = env

    def get_state(self):
        """Return the environment's current ``agent_state``."""
        environment = self.env
        return environment.agent_state

    def reset(self):
        """Placeholder; does nothing yet."""
        # TODO
        return None

    def transform(self, state):
        """Placeholder for converting an observation into an easier form."""
        # TODO
        return None


class Planner():
    """Placeholder for learning and action-selection logic (not implemented)."""

    def __init__(self):
        """Create a planner; there is no state to initialise yet."""

    def learn(self, state, action, reward):
        """Learning step placeholder; currently does nothing."""
        return None

    def policy(self):
        """Action-selection placeholder; currently returns ``None``."""
        return None


class LogShow():
    """Helper for recording and displaying results (mostly unimplemented)."""

    def __init__(self, env):
        self.env = env

    def log_func(self):
        """Logging placeholder; currently does nothing."""
        return None

    def show_q_value(self, Q):
        """Placeholder for displaying ``Q``; only reads the grid size so far."""
        environment = self.env
        nrow = environment.row_length
        ncol = environment.column_length
        return None


In [83]:
#environment関連のクラス

#ライブラリimport
import numpy as np

class Maze():
    """Grid-world maze environment for a single agent.

    ``grid`` is a list of rows. Cell values are interpreted as follows:

    * ``0``  - empty cell; the agent can act from here
    * ``1``  - reward +10 and the episode ends
    * ``-1`` - reward -10 and the episode ends
    * ``9``  - wall; the agent cannot move into it

    Args:
        grid: Two-dimensional list describing the maze.
    """

    def __init__(self, grid):

        self.grid = grid

        self.agent_state = State()
        self.init_agent_state = State(0, 0)

        self.default_reward = -0.04
        self.collision_reward = -10  # for use in the multi-agent case

    @property
    def row_length(self):
        """Number of rows in the grid."""
        return len(self.grid)

    @property
    def column_length(self):
        """Number of columns in the grid (taken from the first row)."""
        return len(self.grid[0])

    @property
    def actions(self):
        """All actions available to the agent."""
        return [Action.UP, Action.DOWN, Action.LEFT, Action.RIGHT, Action.STAY]

    def reset(self):
        """Put the agent back on the start cell and return its state."""
        # Copy so the stored start position can never be modified through
        # the state object handed out to callers.
        self.agent_state = self.init_agent_state.clone()
        return self.agent_state

    def transit_func(self, state, action):
        """Return ``{next_state: probability}`` for ``action`` in ``state``.

        The chosen action is always executed (probability 1). An empty dict
        is returned when the agent cannot act from ``state``.
        """
        transition_probs = {}
        if not self.can_action_at(state):
            return transition_probs

        for candidate in self.actions:
            prob = 1 if candidate == action else 0
            next_state = self._move(state, candidate)
            # Different actions can lead to the same cell (e.g. when
            # blocked), so probabilities are accumulated per state.
            transition_probs[next_state] = transition_probs.get(next_state, 0) + prob
        return transition_probs

    def transit(self, state, action):
        """Sample a transition and return ``(next_state, reward, done)``.

        Returns ``(None, None, True)`` when no transition is possible.
        """
        transition_probs = self.transit_func(state, action)
        if not transition_probs:
            return None, None, True

        next_states = list(transition_probs)
        probs = [transition_probs[s] for s in next_states]

        # Draw an index rather than passing the State objects to numpy,
        # which would have to coerce them into an array first.
        index = np.random.choice(len(next_states), p=probs)
        next_state = next_states[index]

        reward, done = self.reward_func(next_state)
        return next_state, reward, done

    def step(self, action):
        """Advance the agent one step and return ``(next_state, reward, done)``."""
        # TODO
        next_state, reward, done = self.transit(self.agent_state, action)

        if next_state is not None:
            self.agent_state = next_state

        return next_state, reward, done

    def can_action_at(self, state):
        """Return True when ``state`` is an empty cell inside the grid."""
        # Explicit bounds check: negative coordinates (such as the default
        # State(-1, -1)) would otherwise wrap around via negative indexing.
        inside = (0 <= state.row < self.row_length
                  and 0 <= state.column < self.column_length)
        return inside and bool(self.grid[state.row][state.column] == 0)

    def _move(self, state, action):
        """Return the state reached by taking ``action`` from ``state``.

        When the move would leave the grid or enter a wall, the original
        ``state`` object is returned unchanged.

        Raises:
            Exception: If the agent cannot act from ``state``.
        """
        if not self.can_action_at(state):
            raise Exception("Can't move from here")

        next_state = state.clone()

        if action == Action.UP:
            next_state.row -= 1
        elif action == Action.DOWN:
            next_state.row += 1
        elif action == Action.LEFT:
            next_state.column -= 1
        elif action == Action.RIGHT:
            next_state.column += 1

        outside = not (0 <= next_state.row < self.row_length
                       and 0 <= next_state.column < self.column_length)
        # The bounds test must come first so the grid lookup is always valid.
        if outside or self.grid[next_state.row][next_state.column] == 9:
            return state

        return next_state

    def reward_func(self, state):
        """Return ``(reward, done)`` for arriving at ``state``."""
        reward = self.default_reward
        done = False

        attribute = self.grid[state.row][state.column]

        if attribute == 1:
            reward = 10
            done = True
        elif attribute == -1:
            reward = -10
            done = True

        return reward, done

In [110]:
# Training全体を受け持つクラス


class Trainer():
    """Runs training episodes of an agent in an environment.

    Args:
        agent: Object providing ``act(state)``.
        env: Object providing ``reset()`` and ``step(action)``; ``step``
            returns ``(next_state, reward, done)``.
        episode: Number of episodes ``train`` runs.
        report_interval: Print the episode reward every this many episodes;
            a falsy value (0 or None) disables reporting.
    """

    def __init__(self, agent, env, episode=1, report_interval=50):
        self.env = env
        self.agent = agent

        self.episode = episode
        self.report_interval = report_interval

    def train(self):
        """Run ``self.episode`` episodes, reporting progress periodically."""
        for i in range(self.episode):
            # Every episode must be played; only the printing is periodic.
            total_reward = self.one_episode()
            if self.report_interval and i % self.report_interval == 0:
                print("Episode {}: Agent gets {} reward.".format(i+1, total_reward))

    def one_episode(self):
        """Play one episode until ``done`` and return the summed reward."""
        agent_state = self.env.reset()
        total_reward = 0
        done = False

        while not done:
            action = self.agent.act(agent_state)
            next_state, reward, done = self.env.step(action)
            # The environment may report no reward (None) when no
            # transition was possible; there is nothing to add then.
            if reward is not None:
                total_reward += reward
            agent_state = next_state

        return total_reward


class MonteCarloTrainer(Trainer):
    """Trainer for Monte Carlo agents that learn once per finished episode.

    Besides ``act``, the agent must provide ``init_log``, ``experience_add``,
    ``reward_add`` and ``learn``.

    Args:
        agent: The Monte Carlo agent to train.
        env: Environment providing ``reset()`` and ``step(action)``.
        episode: Number of episodes to run.
        report_interval: Reporting interval stored on the trainer.
    """

    def __init__(self, agent, env, episode=1, report_interval=50):
        super().__init__(agent, env, episode, report_interval)

    def one_episode(self):
        """Play one episode, let the agent learn from it, return its reward."""
        agent_state = self.env.reset()
        total_reward = 0
        done = False

        self.agent.init_log()
        while not done:
            action = self.agent.act(agent_state)
            next_state, reward, done = self.env.step(action)
            # A None reward means no transition happened, so there is no
            # experience to record for this step.
            if reward is not None:
                self.agent.experience_add(agent_state, action, reward)
                total_reward += reward
            agent_state = next_state

        # Log the reward of the whole episode, not just of its last step.
        self.agent.reward_add(total_reward)
        self.agent.learn()
        return total_reward


def main():
    """Train a Monte Carlo agent on a small fixed 4x4 maze."""
    # Environment layout, one inner list per row.
    maze_layout = [
        [0, 0, 0, 1],
        [0, 0, 0, -1],
        [9, 0, -1, -1],
        [0, 0, 0, 0],
    ]

    environment = Maze(maze_layout)
    learner = MonteCarloAgent(environment, epsilon=0.3)

    MonteCarloTrainer(learner, environment, 10000).train()

# Run the training only when this file is executed as a script.
if __name__ == "__main__":
    main()


Episode 1: Agent gets -10.44 reward.
Episode 51: Agent gets -3.6799999999998843 reward.
Episode 101: Agent gets -10.96 reward.
Episode 151: Agent gets -18.240000000000002 reward.
Episode 201: Agent gets 9.48 reward.
Episode 251: Agent gets 9.52 reward.
Episode 301: Agent gets 9.84 reward.
Episode 351: Agent gets 9.92 reward.
Episode 401: Agent gets 9.92 reward.
Episode 451: Agent gets 9.92 reward.
Episode 501: Agent gets 9.88 reward.
Episode 551: Agent gets 9.92 reward.
Episode 601: Agent gets -10.28 reward.
Episode 651: Agent gets 9.92 reward.
Episode 701: Agent gets 9.84 reward.
Episode 751: Agent gets 9.6 reward.
Episode 801: Agent gets 9.88 reward.
Episode 851: Agent gets 9.68 reward.
Episode 901: Agent gets 9.84 reward.
Episode 951: Agent gets 9.68 reward.
Episode 1001: Agent gets 9.76 reward.
Episode 1051: Agent gets 9.84 reward.
Episode 1101: Agent gets 9.92 reward.
Episode 1151: Agent gets 9.8 reward.
Episode 1201: Agent gets 9.92 reward.
Episode 1251: Agent gets 9.92 reward.
E