# OpenAI Gym

In [1]:
import gym

env = gym.make('CartPole-v0')

In [2]:
state = env.reset()
print(state)  # カートの位置、速度、棒の角度、角速度

action_space = env.action_space 
print(action_space)  # 行動の次元数

[-0.04039582  0.00243626 -0.03664973  0.00888709]
Discrete(2)


In [3]:
action = 0  # or 1
next_state, reward, done, info = env.step(action)
print(next_state)

[-0.0403471  -0.19214144 -0.03647199  0.28978503]


In [4]:
import gym 
import time

env = gym.make('CartPole-v0')
state = env.reset()
done = False

while not done:
    env.render()
    time.sleep(0.1)
    action = env.action_space.sample()
    print(action)
    next_state, reward, done, info = env.step(action)
env.close()

1
0
1
0
1
0
1
1
1
0
0
0
0
1
0
0
0
0
0
1
1
0
1
0
0
0
0
1
0
0
1


# DQNのコア技術

### 経験再生
今の経験と、1秒後の経験の間に相関関係があるので、うまく学習できない。経験をいったんバッファに保存して、ランダムに取り出して学習する。

In [5]:
from collections import deque
import random
import numpy as np

class ReplayBuffer:
    def __init__(self, buffer_size, batch_size):
        self.buffer = deque(maxlen=buffer_size) # 最大のサイズを指定
        self.batch_size = batch_size

    def add(self, state, action, reward, next_state, done):
        data = (state, action, reward, next_state, done)
        self.buffer.append(data)

    def __len__(self):
        return len(self.buffer)

    def get_batch(self):
        data = random.sample(self.buffer, self.batch_size)

        state = np.stack([x[0] for x in data])
        action = np.array([x[1] for x in data])
        reward = np.array([x[2] for x in data])
        next_state = np.stack([x[3] for x in data])
        done = np.array([x[4] for x in data]).astype(np.int)
        return state, action, reward, next_state, done

In [6]:
import gym

env = gym.make('CartPole-v0')
replay_buffer = ReplayBuffer(buffer_size=10000, batch_size=32)
state = env.reset()
done = False

for episode in range(10):
    while not done:
        action = 0
        next_state, reward, done, info = env.step(action)
        replay_buffer.add(state, action, reward, next_state, done)
    state = env.reset()
    done = False

state, action, reward, next_state, done = replay_buffer.get_batch()
print(state.shape)  # (32, 4)
print(action.shape)  # (32,)
print(reward.shape)  # (32,)
print(next_state.shape)  # (32, 4)
print(done.shape)  # (32,)

(32, 4)
(32,)
(32,)
(32, 4)
(32,)


Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  done = np.array([x[4] for x in data]).astype(np.int)


### ターゲットネットワーク
教師学習とは違い、正解ラベルがQ関数を更新すると変動する為適用する。モデルを用意して、定期的に重みを同期して、このモデルを用いてターゲットを計算する。つまり、ターゲットを一定期間固定するためのテクニック。

In [7]:
import copy
from dezero import Model
from dezero import optimizers
import dezero.functions as F
import dezero.layers as L

class QNet(Model):
    def __init__(self, action_size):
        super().__init__()
        self.l1 = L.Linear(128)
        self.l2 = L.Linear(128)
        self.l3 = L.Linear(action_size)

    def forward(self, x):
        x = F.relu(self.l1(x))
        x = F.relu(self.l2(x))
        x = self.l3(x)
        return x

class DQNAgent:
    def __init__(self):
        self.gamma = 0.98
        self.lr = 0.0005
        self.epsilon = 0.05
        self.buffer_size = 100000
        self.batch_size = 32
        self.action_size = 2

        self.replay_buffer = ReplayBuffer(self.buffer_size, self.batch_size)
        self.qnet = QNet(self.action_size)
        self.qnet_target = QNet(self.action_size)
        self.optimizer = optimizers.Adam(self.lr)
        self.optimizer.setup(self.qnet)  # qnetだけを更新する

    def sync_qnet(self):
        self.qnet_target = copy.deepcopy(self.qnet)

    def get_action(self, state):
        if np.random.rand() < self.epsilon:
            return np.random.choice(self.action_size)
        else:
            state = state[np.newaxis, :]  # バッチの次元を追加
            qs = self.qnet(state)
            return qs.data.argmax()
        
    def update(self, state, action, reward, next_state, done):
        self.replay_buffer.add(state, action, reward, next_state, done)
        if len(self.replay_buffer) < self.batch_size:
            return

        state, action, reward, next_state, done = self.replay_buffer.get_batch()
        qs = self.qnet(state)  # 32x4のstateを与える、qsは32x2(2つの行動のq関数の値)
        q = qs[np.arange(self.batch_size), action]  # 取った行動のq関数の値を取得

        next_qs = self.qnet_target(next_state)  # qnet_targetを使って更新
        next_q = next_qs.max(axis=1)
        next_q.unchain()
        td_target = reward + (1 - done) * self.gamma * next_q  # doneをマスクとしてターゲットを計算

        loss = F.mean_squared_error(q, td_target)

        self.qnet.cleargrads()
        loss.backward()
        self.optimizer.update()

In [None]:
episodes = 300
sync_interval = 20
env = gym.make('CartPole-v0')
agent = DQNAgent()
reward_log = []

for episode in range(episodes):
    state = env.reset()
    done = False
    sum_reward = 0

    while not done:
        action = agent.get_action(state)
        next_state, reward, done, info = env.step(action)

        agent.update(state, action, reward, next_state, done)
        state = next_state
        sum_reward += reward

    if episode % sync_interval == 0:
        agent.sync_qnet() # 20episodeごとに同期

    reward_log.append(sum_reward)
    if episode % 10 == 0:
        print("episode :{}, total reward : {}, epsilon: {}".format(episode, sum_reward, agent.epsilon))

episode :0, total reward : 10.0, epsilon: 0.05
episode :10, total reward : 10.0, epsilon: 0.05


Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  done = np.array([x[4] for x in data]).astype(np.int)


episode :20, total reward : 12.0, epsilon: 0.05
episode :30, total reward : 9.0, epsilon: 0.05
episode :40, total reward : 9.0, epsilon: 0.05
episode :50, total reward : 11.0, epsilon: 0.05
episode :60, total reward : 13.0, epsilon: 0.05
episode :70, total reward : 11.0, epsilon: 0.05
episode :80, total reward : 13.0, epsilon: 0.05
episode :90, total reward : 9.0, epsilon: 0.05
episode :100, total reward : 10.0, epsilon: 0.05
episode :110, total reward : 9.0, epsilon: 0.05
episode :120, total reward : 9.0, epsilon: 0.05
episode :130, total reward : 9.0, epsilon: 0.05
episode :140, total reward : 60.0, epsilon: 0.05
episode :150, total reward : 130.0, epsilon: 0.05
episode :160, total reward : 91.0, epsilon: 0.05
episode :170, total reward : 173.0, epsilon: 0.05
episode :180, total reward : 197.0, epsilon: 0.05
episode :190, total reward : 200.0, epsilon: 0.05
episode :200, total reward : 200.0, epsilon: 0.05
episode :210, total reward : 145.0, epsilon: 0.05
episode :220, total reward :

In [None]:
agent.epsilon = 0  # greedy policy
state = env.reset()
done = False
sum_reward = 0

while not done:
    action = agent.get_action(state)
    next_state, reward, done, info = env.step(action)
    state = next_state
    sum_reward += reward
    env.render()
print('Total Reward:', sum_reward)

In [None]:
import matplotlib.pyplot as plt
plt.plot(reward_log)