<a href="https://colab.research.google.com/github/etomaro/RL/blob/main/Grid_world_V.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [84]:
import numpy as np
import matplotlib.pyplot as plt


class Agent():

    GAMMA = 0.9  # 割引率
    ACTIONS = ['right', 'up', 'left', 'down']
    # 移動の際に使う値
    act_dict = {
        'right': np.array([0,1]),
        'up': np.array([-1,0]),
        'left': np.array([0,-1]),
        'down': np.array([1,0])
    }
    # 方策(固定)
    pi_dict1 = {
        'right': 0.25,
        'up': 0.25,
        'left': 0.25,
        'down': 0.25
    }
    # 行動数
    num_action = len(ACTIONS)  # 4

    def __init__(self, array_or_list=np.array([0,0])):
        # listもnp.arrayでもいい
        if type(array_or_list) == list:
            array = np.array(array_or_list)
        else:
            array = array_or_list

        # 条件に当てはまらない場合エラーを返す
        assert (array[0] >= 0 and array[0] < 5 and \
                array[1] >= 0 and array[1] < 5)
 
        self.pos = array  # pos -> position


    # 現在位置を返す
    def get_pos(self):
        return self.pos
    
    # 現在位置をセットする
    def set_pos(self, array_or_list):
        # listもnp.arrayでもいい
        if type(array_or_list) == list:
            array = np.array(array_or_list)
        else:
            array = array_or_list

        # 条件に当てはまらない場合エラーを返す
        assert (array[0] >= 0 and array[0] < 5 and \
                array[1] >= 0 and array[1] < 5)
 
        self.pos = array  # pos -> position

    # 現在位置から移動
    def move(self, action):
        """
        - A - B -
        - - - - -
        - - - b -
        - - - - -
        - a - - -
        """
        # 移動量を取得
        move_coord = self.act_dict[action].copy()
    
        # A地点
        if (self.get_pos() == np.array([0,1])).all():
            pos_new = [4,1]  # a地点
        elif (self.get_pos() == np.array([0,3])).all():
            pos_new = [2,3]  # b地点
        else:
            pos_new = self.get_pos() + move_coord 
        
        # グリッドの外に出た場合の処理(出ないようにする)
        pos_new[0] = np.clip(pos_new[0], 0, 4)  # 0未満なら0に。4より上なら4に
        pos_new[1] = np.clip(pos_new[1], 0, 4)
        self.set_pos(pos_new)

    # 現在位置から移動することによる報酬。この関数では移動自体は行わない
    def reward(self, state, action):
        # A地点
        if (state == np.array([0,1])).all():
            r = 10
            return r
        # B地点
        if (state == np.array([0,3])).all():
            r = 5
            return r
        
        # グリッドの外に行く場合は罰則
        if state[0] == 0 and action == 'up':
            r = -1
        elif state[0] == 4 and action == 'down':
            r = -1
        elif state[1] == 0 and action == 'left':
            r = -1
        elif state[1] == 4 and action == 'right':
            r = -1
        else:
            r = 0

        return r
 
    # 方策(π)
    def pi(self, state, action):
        # 変数にstateを持っているが、今回は方策はstateに依存しない
        return self.pi_dict1[action]
    
    # 状態価値関数の算出
    def V_pi(self, state, n, out, iter_num):
        """
        state: 関数呼び出した時の状態
        out: 戻り値用。関数実行時は0を指定。※この引数を消してresult=0としてやった場合変わるかどうかテストする。
        n: 再帰関数の呼び出し回数が何回目か。最初の呼び出しは1
        iter_num: 再帰関数を何回呼び出す予定か。
        """
        if n == iter_num:  # 末端状態
            for action in self.ACTIONS:
                out += self.pi(state, action) * self.reward(state, action)
            return out
        else:
            for action in self.ACTIONS:
                out += self.pi(state, action) * self.reward(state, action)
                # 次の状態に遷移
                self.move(action)

                # 再帰
                out += self.pi(self.get_pos(), action) * \
                       self.V_pi(self.get_pos(), n+1, 0, iter_num) * self.GAMMA

                # stateを関数(または再帰関数)呼び出し時に戻す
                self.set_pos(state)

            return out

    # 全てのstateでの状態価値関数を算出
    def V_pi_all(self, n, out, iter_num):
        """
        argのstateは全てのstateでの価値を算出するので必要ない

        戻り値
          V_pi_all[2][3] -> (2,3)での状態価値
        """
        grid_pos_array = np.array([
            [0,0], [0,1], [0,2], [0,3], [0,4],
            [1,0], [1,1], [1,2], [1,3], [1,4],
            [2,0], [2,1], [2,2], [2,3], [2,4],
            [3,0], [3,1], [3,2], [3,3], [3,4],
            [4,0], [4,1], [4,2], [4,3], [4,4]
        ])

        # 戻り値を初期化
        result_V_all = np.zeros(shape=(5,5))

        # ループで5*5回Vを求めていく
        for pos in grid_pos_array:
            x = pos[0]
            y = pos[1]

            tmp_V = self.V_pi(
                state=pos,
                n=1,
                out=0,
                iter_num=iter_num
            )
            
            # 結果を挿入
            result_V_all[x][y] = tmp_V
        
        return result_V_all
        

    # 行動価値関数の算出
    def Q_pi(self):
        pass





In [77]:
# 全ての状態価値関数の算出
"""
値を再計算する場合は「V_all_〇〇.npy」ファイルを削除する必要あり

処理時間
  iter_num:10 -> 17:03(1038s)
"""
import time
import os

agent = Agent()

# 計算するステップ数
iter_num = 10

_filepath = f'./V_all_{str(iter_num)}.npy'
is_file = os.path.isfile(_filepath)
if is_file:
    load_V = np.load(_filepath)
    print(load_V)
else:
    start = time.time()
    # 算出
    V_all = agent.V_pi_all(
        n=1, out=0, iter_num=iter_num
    )
    print('処理時間: ', time.time()-start, '\n')

    # ファイル作成
    np.save(_filepath, V_all)
    print('fileを作成しました')
    
    # 出力
    print("V_all\n", V_all)


KeyboardInterrupt: ignored

In [None]:
# テスト
# ------Agent------
agent = Agent(np.array([0,0]))

# 1 get_pos()のテスト
now_pos = agent.get_pos()
print("1 get_pos()のテストのテスト\nnow_pos: ", now_pos, "\n")

# 2 set_pos()のテスト
agent.set_pos(np.array([3,3]))
print("2 set_pos()のテスト\nnow_pos: ", agent.get_pos(), "\n")

# 3.1 move()のテスト (3,3) -> (2,3)
agent.move("up")
print("3.1 move()のテスト\nnow_pos: ", agent.get_pos(), "\n")

# 3.2 move()のテスト (4,4) -> right -> (4,4)
agent.set_pos(np.array([4,4]))
agent.move("right")
print("3.2 move()のテスト\nnow_pos: ", agent.get_pos(), "\n")

# 4.1 A地点からアクションした際の報酬と次の状態
agent.set_pos(np.array([0,1]))
reward = agent.reward(agent.get_pos(), "up")
agent.move("up")
print("4.1 A地点からアクションした際の報酬と次の状態\nreward: ", reward, "\nnow_pos: ", agent.get_pos(), "\n")

# 4.2 B地点からアクションした際の報酬と次の状態
agent.set_pos(np.array([0,3]))
reward = agent.reward(agent.get_pos(), "right")
agent.move("right")
print("4.2 B地点からアクションした際の報酬と次の状態\nreward: ", reward, "\nnow_pos: ", agent.get_pos(), "\n")

# 4.3 (4,4)からrightした時の報酬
agent.set_pos(np.array([4,4]))
reward = agent.reward(agent.get_pos(), "right")
agent.move("right")
print("4.3 (4,4)からrightした時の報酬報酬\nreward: ", reward, "\nnow_pos: ", agent.get_pos(), "\n")




1 get_pos()のテストのテスト
now_pos:  [0 0] 

2 set_pos()のテスト
now_pos:  [3 3] 

3.1 move()のテスト
now_pos:  [2 3] 

3.2 move()のテスト
now_pos:  [4 4] 

4.1 A地点からアクションした際の報酬と次の状態
reward:  10 
now_pos:  [4 1] 

4.2 B地点からアクションした際の報酬と次の状態
reward:  5 
now_pos:  [2 3] 

4.3 (4,4)からrightした時の報酬報酬
reward:  -1 
now_pos:  [4 4] 



In [83]:
# (0,0)地点での状態価値関数の算出
"""
iter=10の時の時
10回行動した時まで考える。
考慮する状態のパターンは4**10(104万8576)
再帰関数を呼ぶ(実行する)回数は回数は9回。関数自体は10回。

計算速度の結果
  iter_num: 2 -> 0.0008s
  iter_num: 5 -> 0.17s  
  iter_num:10 -> 37s
  iter_num:11 -> 163s
"""
import time 


agent = Agent(np.array([0,0]))

start = time.time()  # 計測開始
v00 = agent.V_pi(
    state=agent.get_pos(),
    n=1,
    out=0,
    iter_num=2
)
time_result = time.time() - start  # 計測終了

print("v00: ", v00)
print("計測時間: ", time_result)

v00:  1.46875
計測時間:  0.0008413791656494141


In [None]:
# fileの作成のテスト
import os
import numpy as np


# ------通常ファイル------
# fileの作成
_filepath = './test.txt'
filecontents = """
1 aiu
2 eo
3 kakiku
4 keko
"""

# 書き込み
# fileが存在しない場合のみ実行する。これがないと同じファイル名で毎度上書きされる
is_file = os.path.isfile(_filepath)
if is_file:
    print('同じ名前のfileが既に存在します')
else:
    with open(_filepath, 'w') as f:
        f.write(filecontents)
    print('fileを作成しました')

# ファイルの読み込み
with open(_filepath, 'r') as f:
    print(f.read())


# ------numpyファイル------
# numpy.arrayのタイプを確認 -> numpy.ndarray
num_type = np.array([1,2,3])
print(type(num_type))

# file(numpy.array)の作成
_filepath_num = 'test.npy'
filecontents_num = np.array([1,2,3,4,5])

# 書き込み
is_file_num = os.path.isfile(_filepath_num)
if is_file_num:
    print('既にnpファイルは存在しています')
else:
    np.save(_filepath_num, filecontents_num)
    print('npファイルを作成しました')

# 読み込み
load_num = np.load(_filepath_num)
print(load_num)
print(type(load_num))





fileを作成しました

1 aiu
2 eo
3 kakiku
4 keko

<class 'numpy.ndarray'>
既にnpファイルは存在しています
[1 2 3 4 5]
<class 'numpy.ndarray'>


In [None]:
# numpyテスト
import numpy as np 

# 型のテスト
array = np.array([1,2,3])
print('array: ', array, '\n', 'arrayのタイプ: ', type(array), '\n')

array_multi = np.array([[1,2,3], [4,5,6], [7,8,9]])
print('array_multi: ', array_multi, '\n', 'array_multiのタイプ', type(array_multi), '\n')
print('array_multi[0]のタイプ: ', type(array_multi[0]), '\n')

# 追加
array = np.array([1,2])
array_new = np.append(array, 3)
print('array: ', array, '\n', 'array_new: ', array_new, '\n')

array_multi = np.array([[1,2], [3,4]])
array_multi_new = np.append(array_multi, [5,6])
print('array_multi: ', array_multi, '\n', 'array_multi_new', array_multi_new, '\n')

# 2次元配列の追加(axis=0, 追加先と追加する次元が一致する必要あり, 追加する値はnp.array()する必要なくリストでいい)
array_multi2 = np.array([[1,2], [3,4]])
array_multi2_new = np.append(array_multi2, [[5,6]], axis=0)
print('array_multi2: ', array_multi2, "\n", 'array_multi2_new: ', array_multi2_new, '\n')
print('array_multi2_new[0]のタイプ: ', type(array_multi2_new[0]), '\n', 'array_muulti2_new[2]のタイプ', type(array_multi2_new[2]), '\n')

# list->numpy
list1 = [1,2,3]
array1 = np.array(list1)
print("------list->numpyのテスト------")
print('list1:', list1, '\n', 'array1: ', array1)
print('list1のタイプ: ', type(list1), '\n', 'array1のタイプ: ', type(array1), '\n')

list2 = [[1,2], [3,4]]
array2 = np.array(list2)
print('list2:', list2, '\n', 'array2: ', array2)
print('list2のタイプ: ', type(list2), '\n', 'array2のタイプ: ', type(array2))

array:  [1 2 3] 
 arrayのタイプ:  <class 'numpy.ndarray'> 

array_multi:  [[1 2 3]
 [4 5 6]
 [7 8 9]] 
 array_multiのタイプ <class 'numpy.ndarray'> 

array_multi[0]のタイプ:  <class 'numpy.ndarray'> 

array:  [1 2] 
 array_new:  [1 2 3] 

array_multi:  [[1 2]
 [3 4]] 
 array_multi_new [1 2 3 4 5 6] 

array_multi2:  [[1 2]
 [3 4]] 
 array_multi2_new:  [[1 2]
 [3 4]
 [5 6]] 

array_multi2_new[0]のタイプ:  <class 'numpy.ndarray'> 
 array_muulti2_new[2]のタイプ <class 'numpy.ndarray'> 

------list->numpyのテスト------
list1: [1, 2, 3] 
 array1:  [1 2 3]
list1のタイプ:  <class 'list'> 
 array1のタイプ:  <class 'numpy.ndarray'> 

list2: [[1, 2], [3, 4]] 
 array2:  [[1 2]
 [3 4]]
list2のタイプ:  <class 'list'> 
 array2のタイプ:  <class 'numpy.ndarray'>


In [None]:
a = [[]]
a.append([[9]])

print(a)


[[], [[9]]]


In [None]:
import numpy as np

result = np.zeros(shape=(5,5))
print(result)
result[2][3] = 1
print(result)

a = np.array([[1,2], [3,4], [5,6]])
for i in a:
    print(i)

[[0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]]
[[0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]
 [0. 0. 0. 1. 0.]
 [0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]]
[1 2]
[3 4]
[5 6]


In [None]:
ai = 44
print(f'kakikuke{str(ai)}ko')

kakikuke44ko
