In [None]:
import copy
import random
import time

import numpy as np
import matplotlib.pyplot as plt

In [None]:
EPSILON = .1    # 探索率
ALPHA = .1      # 学習率
GAMMA = .90     # 割引率
ACTIONS = np.arange(4)  # 行動の集合

class Summon:

    def __init__(self, zero_list, population=2):
        self.agents = self.__generate_agents(zero_list, population)

    def __generate_agents(self, zero_list, population):
        agents = []
        for id in range(population):
            ini_state = random.choice(zero_list) # 初期状態（エージェントのスタート地点の位置）
            agents.append(
                QLearningAgent(
                    alpha=ALPHA,
                    gamma=GAMMA,
                    epsilon=EPSILON,
                    actions=ACTIONS,
                    observation=ini_state)) 
        return agents



class QLearningAgent:
    """
        Q学習 エージェント
    """

    def __init__(
            self,
            alpha=.2,
            epsilon=.1,
            gamma=.99,
            actions=None,
            observation=None):
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon
        self.reward_history = []
        self.actions = actions
        self.observation = observation
        self.state = str(observation)
        self.ini_state = str(observation)
        self.previous_state = None
        self.previous_action = None
        self.q_values = self._init_q_values()

    def _init_q_values(self):
        """
           Q テーブルの初期化
        """
        q_values = {}
        q_values[self.state] = np.repeat(0.0, len(self.actions))
        return q_values

    def init_state(self):
        """
            状態の初期化
        """
        self.previous_state = copy.deepcopy(self.ini_state)
        self.state = copy.deepcopy(self.ini_state)
        return self.state

    def act(self):
        # ε-greedy選択
        if np.random.uniform() < self.epsilon:  # random 行動
            action = np.random.randint(0, len(self.q_values[self.state]))
        else:   # greedy 行動
            action = np.argmax(self.q_values[self.state])

        self.previous_action = action
        return action

    def observe(self, next_state, reward=None):
        """
            次の状態と報酬の観測
        """
        next_state = str(next_state)
        if next_state not in self.q_values:  # 始めて訪れる状態であれば
            self.q_values[next_state] = np.repeat(0.0, len(self.actions))

        self.previous_state = copy.deepcopy(self.state)
        self.state = next_state

        if reward is not None:
            self.reward_history.append(reward)
            self.learn(reward)

    def learn(self, reward):
        """
            Q値の更新
        """
        q = self.q_values[self.previous_state][self.previous_action]  # Q(s, a)
        max_q = max(self.q_values[self.state])  # max Q(s')
        # Q(s, a) = Q(s, a) + alpha*(r+gamma*maxQ(s')-Q(s, a))
        self.q_values[self.previous_state][self.previous_action] = q + \
            (self.alpha * (reward + (self.gamma * max_q) - q))


In [None]:
class GridWorld:

    def __init__(self, x_max, y_max):

        self.x_max = x_max
        self.y_max = y_max
        self.filed_type = {
            "N": 0,  # 通常
            "G": 1,  # ゴール
            "W": 2,  # 壁
            "H": 3,  # 他の人間
        }
        self.actions = {
            "UP": 0,
            "DOWN": 1,
            "LEFT": 2,
            "RIGHT": 3
        }
        self.map = np.zeros((y_max,x_max))
        self.map[::2] = 2 
        self.map[:, ::2] = 0
        self.map[0,0] = 1

        self.zero_list = list(zip(*np.where( self.map < 1)))

        #self.start_pos = start_x,start_y   # エージェントのスタート地点(x, y)
        #self.agent_pos = copy.deepcopy(self.start_pos)  # エージェントがいる地点

    def step(self, start_x, start_y, action):
        """
            行動の実行
            状態, 報酬、ゴールしたかを返却
        """
        to_x, to_y = start_x, start_y

        # 移動可能かどうかの確認。移動不可能であれば、ポジションはそのままにマイナス報酬
        if self._is_possible_action(to_x, to_y, action) == False:
            self.agent_pos = to_x, to_y
            return self.agent_pos, -10, False

        if action == self.actions["UP"]:
            to_y += -1
        elif action == self.actions["DOWN"]:
            to_y += 1
        elif action == self.actions["LEFT"]:
            to_x += -1
        elif action == self.actions["RIGHT"]:
            to_x += 1

        is_goal = self._is_end_episode(to_x, to_y)  # エピソードの終了の確認
        reward = self._compute_reward(to_x, to_y)
        self.agent_pos = to_x, to_y
        return self.agent_pos, reward, is_goal

    def _is_end_episode(self, x, y):
        """
            x, yがエピソードの終了かの確認。
        """
        if self.map[y,x] == self.filed_type["G"]: # ゴール
            return True
        #elif self.map[y][x] == self.filed_type["T"]:    # トラップ
            return True
        else:
            return False

    def _is_wall(self, x, y):
        """
            x, yが壁または人間かどうかの確認
        """
        if self.map[y,x] == self.filed_type["W"]:
            return True
        elif self.map[y,x] == self.filed_type["H"]:
            return True
        else:
            return False

    def _is_possible_action(self, x, y, action):
        """
            実行可能な行動かどうかの判定
        """
        to_x = x
        to_y = y

        if action == self.actions["UP"]:
            print("下に行った")
            to_y += -1
            #print(to_y,to_x)
        elif action == self.actions["DOWN"]:
            print("上に行った")
            to_y += 1
            #print(to_y,to_x)
        elif action == self.actions["LEFT"]:
            print("左に行った")
            to_x += -1
            #print(to_y,to_x)
        elif action == self.actions["RIGHT"]:
            print("右に行った")
            to_x += 1
            #print(to_y,to_x)

        if self.map.shape[0] <= to_y or 0 > to_y:
            print("y行き過ぎ")
            return False
        elif self.map.shape[1] <= to_x or 0 > to_x:
            print("x行き過ぎ")
            return False
        elif self._is_wall(to_x, to_y):
            print("壁だった")
            #print(to_y,to_x)
            return False

        return True

    def _compute_reward(self, x, y):
        if self.map[y,x] == self.filed_type["N"]:
            return 0
        elif self.map[y,x] == self.filed_type["G"]:
            return 100
        #elif self.map[y,x] == self.filed_type["T"]:
            return -100

    def reset(self):
        self.map = np.zeros((self.y_max,self.x_max))
        self.map[::2] = 2 
        self.map[:, ::2] = 0
        self.map[0,0] = 1
        self.agent_pos = self.start_pos
        return self.start_pos


In [None]:
# 定数
NB_EPISODE = 1   # エピソード数
X_MAX = 15
Y_MAX = 18
POPULATION = 2
start = time.time()

if __name__ == '__main__':
    grid_env = GridWorld(   # grid worldの環境の初期化
        x_max=X_MAX,
        y_max=Y_MAX,)

    summon = Summon(    # エージェントの召喚
        zero_list=grid_env.zero_list,
        population=POPULATION)
    
    times = []
    is_end_episode = []
    for episode in range(NB_EPISODE):   # 実験
        #episode_reward = []  # 1エピソードの累積報酬
        for i in range(POPULATION):
            start_x = summon.agents[i].observation[0]
            start_y = summon.agents[i].observation[1]
            grid_env.map[start_x][start_y] = 3
            is_end_episode.append(False)
            print(start_x,start_y)
        plt.imshow(grid_env.map)
        plt.show()
        print(is_end_episode)
        while(False in is_end_episode):    # 全員がゴールするまで続ける
            for id in range(POPULATION):
                if is_end_episode[id] == False:
                    start_x = summon.agents[id].observation[0]
                    start_y = summon.agents[id].observation[1]
                    grid_env.map[start_x][start_y] = 0
                    print(start_x,start_y)
                    action = summon.agents[id].act()  # 行動選択
                    print(action)
                    state, reward, is_end_episode[id] = grid_env.step(start_x, start_y, action)
                else: grid_env.map[0,0] = 1
                if is_end_episode[id] == False:
                    grid_env.map[state[0]][state[1]] = 3
                    print(state,reward)
                    summon.agents[id].observe(state, reward)   # 状態と報酬の観測
                plt.imshow(grid_env.map)
                plt.show()
        print(is_end_episode)    
            #episode_reward.append(reward)
            #rewards.append(np.sum(episode_reward))  # このエピソードの平均報酬を与える
        #times.append(len(episode_reward)) #かかった時間をリストに追加
        state = grid_env.reset()  # 初期化
        #agent.observe(state)    # エージェントを初期位置に
        print(f"EP.{episode +1} End") #(t = {len(episode_reward)})"

    # 所要時間の計算
    print(f"time = {time.time()-start}")
