## 7.3 DQN 구현

In [7]:
import warnings

warnings.filterwarnings(action='ignore') 

import argparse
import torch
import torch.nn
import numpy as np
import random
import gym
from collections import namedtuple
from collections import deque
from typing import List, Tuple

class DQN(torch.nn.Module):
    """Fully connected Q-network.

    The network is two hidden blocks (``Linear -> BatchNorm1d -> PReLU``)
    followed by a linear output layer that emits ``output_dim`` values per
    input row.
    """

    def __init__(self, input_dim: int, output_dim: int, hidden_dim: int) -> None:
        """Create the layers.

        Args:
            input_dim: Number of features in each input row.
            output_dim: Number of values produced per input row.
            hidden_dim: Width of both hidden blocks.
        """
        super().__init__()

        # Attribute names are kept as-is: they determine the state_dict keys.
        self.layer1 = self._hidden_block(input_dim, hidden_dim)
        self.layer2 = self._hidden_block(hidden_dim, hidden_dim)
        self.final = torch.nn.Linear(hidden_dim, output_dim)

    @staticmethod
    def _hidden_block(in_features: int, out_features: int) -> torch.nn.Sequential:
        """Return one ``Linear -> BatchNorm1d -> PReLU`` stack."""
        return torch.nn.Sequential(
            torch.nn.Linear(in_features, out_features),
            torch.nn.BatchNorm1d(out_features),
            torch.nn.PReLU(),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Run ``x`` through both hidden blocks and the output layer."""
        hidden = self.layer2(self.layer1(x))
        return self.final(hidden)


# One stored environment step: (state, action, reward, next_state, done).
Transition = namedtuple(
    "Transition",
    ("state", "action", "reward", "next_state", "done"),
)


class ReplayMemory(object):
    """Bounded store of ``Transition`` records with overwrite-oldest behavior.

    Records are kept in a plain list. Once ``capacity`` records are held,
    ``cursor`` wraps around and new records replace existing ones in
    insertion order.
    """

    def __init__(self, capacity: int) -> None:
        """Create an empty memory that holds at most ``capacity`` records."""
        self.memory = []
        self.cursor = 0
        self.capacity = capacity

    def push(self,
             state: np.ndarray,
             action: int,
             reward: int,
             next_state: np.ndarray,
             done: bool) -> None:
        """Store one transition at the cursor position and advance the cursor."""
        record = Transition(state, action, reward, next_state, done)
        storage = self.memory

        # Grow the list by one placeholder slot until capacity is reached.
        if len(self) < self.capacity:
            storage.append(None)

        write_at = self.cursor
        storage[write_at] = record
        self.cursor = (write_at + 1) % self.capacity

    def pop(self, batch_size: int) -> List[Transition]:
        """Return ``batch_size`` records sampled without replacement.

        Despite the name, nothing is removed from the memory.
        """
        return random.sample(self.memory, batch_size)

    def __len__(self) -> int:
        """Number of records currently stored."""
        return len(self.memory)


class Agent(object):
    """Epsilon-greedy agent that owns a ``DQN``, an MSE loss and an Adam optimizer."""

    def __init__(self, input_dim: int, output_dim: int, hidden_dim: int) -> None:
        """Build the Q-network and its training objects.

        Args:
            input_dim: Length of one flattened state vector.
            output_dim: Number of discrete actions (one Q-value per action).
            hidden_dim: Hidden width forwarded to ``DQN``.
        """
        self.dqn = DQN(input_dim, output_dim, hidden_dim)
        self.input_dim = input_dim
        self.output_dim = output_dim

        self.loss_fn = torch.nn.MSELoss()
        self.optim = torch.optim.Adam(self.dqn.parameters())

    def _to_variable(self, x: np.ndarray) -> torch.Tensor:
        """Convert ``x`` to a tensor via ``torch.Tensor``.

        ``torch.autograd.Variable`` is deprecated and simply returns the
        tensor it is given, so the tensor is returned directly.
        """
        return torch.Tensor(x)

    def get_action(self, states: np.ndarray, eps: float) -> int:
        """Pick an action epsilon-greedily.

        Args:
            states: A single state; it is reshaped to ``(-1, input_dim)``
                by ``get_Q``.
            eps: Probability of choosing a uniformly random action.

        Returns:
            The chosen action index as a plain ``int``.
        """
        if np.random.rand() < eps:
            return int(np.random.choice(self.output_dim))

        # Action selection never needs gradients.
        with torch.no_grad():
            scores = self.get_Q(states)
        _, argmax = torch.max(scores, 1)
        # ``Tensor.item()`` avoids converting a 1-d NumPy array to a scalar,
        # which newer NumPy releases deprecate. It still raises when more
        # than one state row was supplied.
        return int(argmax.item())

    def get_Q(self, states: np.ndarray) -> torch.FloatTensor:
        """Return the network output for ``states``.

        The input is reshaped to ``(-1, input_dim)`` and the network is put
        in evaluation mode before the forward pass.
        """
        states = self._to_variable(states.reshape(-1, self.input_dim))
        self.dqn.eval()
        return self.dqn(states)

    def train(self, Q_pred: torch.FloatTensor, Q_true: torch.FloatTensor) -> torch.Tensor:
        """Take one optimizer step on ``loss_fn(Q_pred, Q_true)``.

        Args:
            Q_pred: Predictions that are still attached to the autograd graph
                of ``self.dqn``.
            Q_true: Regression targets with the same shape as ``Q_pred``.

        Returns:
            The loss tensor (not a Python float).
        """
        # NOTE(review): if Q_pred came from get_Q, its forward pass already
        # ran in evaluation mode, so switching to training mode here does not
        # affect that pass (e.g. BatchNorm batch statistics) - confirm intent.
        self.dqn.train(mode=True)
        self.optim.zero_grad()
        loss = self.loss_fn(Q_pred, Q_true)
        loss.backward()
        self.optim.step()

        return loss


def train_helper(agent: Agent, minibatch: List[Transition], gamma: float) -> torch.Tensor:
    """Run one Q-learning update on ``minibatch``.

    The regression target equals the current prediction everywhere except at
    the action actually taken, where it is
    ``reward + gamma * max_a Q(next_state, a)``. The bootstrap term is zeroed
    for transitions flagged ``done``.

    Args:
        agent: Provides ``get_Q``, ``_to_variable`` and ``train``.
        minibatch: Transitions to learn from.
        gamma: Discount factor applied to the bootstrap term.

    Returns:
        Whatever ``agent.train`` returns (the loss).
    """
    states = np.vstack([t.state for t in minibatch])
    actions = np.array([t.action for t in minibatch])
    rewards = np.array([t.reward for t in minibatch])
    next_states = np.vstack([t.next_state for t in minibatch])
    # Force a boolean dtype: on integer flags ``~`` would be a bitwise NOT
    # (yielding -1 / -2) rather than a logical one.
    done = np.array([t.done for t in minibatch], dtype=bool)
    not_done = np.logical_not(done)

    Q_predict = agent.get_Q(states)

    # The bootstrap values are constants of the regression target, so no
    # autograd graph is needed for this forward pass.
    with torch.no_grad():
        next_q_max = np.max(agent.get_Q(next_states).detach().numpy(), axis=1)

    Q_target = Q_predict.detach().clone().numpy()
    rows = np.arange(len(Q_target))
    Q_target[rows, actions] = rewards + gamma * next_q_max * not_done
    Q_target = agent._to_variable(Q_target)

    return agent.train(Q_predict, Q_target)


def play_episode(env: gym.Env,
                 agent: Agent,
                 replay_memory: ReplayMemory,
                 eps: float,
                 batch_size: int,
                 gamma: float = 0.99) -> float:
    """Play one episode, storing transitions and training after each step.

    Args:
        env: Environment whose ``reset()`` returns an observation and whose
            ``step()`` returns ``(obs, reward, done, info)``.
        agent: Agent used for action selection and training.
        replay_memory: Buffer that receives every transition.
        eps: Exploration probability passed to ``agent.get_action``.
        batch_size: Minibatch size; training starts once the buffer holds
            more than this many transitions.
        gamma: Discount factor forwarded to ``train_helper``. Defaults to
            0.99, the value that was previously hard-coded.

    Returns:
        Sum of the environment rewards received during the episode (before
        the terminal-step reward override below).
    """
    s = env.reset()
    done = False
    total_reward = 0

    while not done:
        a = agent.get_action(s, eps)
        s2, r, done, info = env.step(a)

        total_reward += r

        # The stored reward for the final step is replaced by -1.
        # NOTE(review): this applies to every ``done``; if the environment
        # also ends episodes on a step limit, those are penalized too - confirm.
        if done:
            r = -1
        replay_memory.push(s, a, r, s2, done)

        if len(replay_memory) > batch_size:
            minibatch = replay_memory.pop(batch_size)
            train_helper(agent, minibatch, gamma)

        s = s2

    return total_reward


def get_env_dim(env: gym.Env) -> Tuple[int, int]:
    """Return ``(input_dim, output_dim)`` for ``env``.

    ``input_dim`` is the first entry of ``env.observation_space.shape`` and
    ``output_dim`` is ``env.action_space.n``.
    """
    observation_shape = env.observation_space.shape
    action_count = env.action_space.n

    return observation_shape[0], action_count


def epsilon_annealing(epsiode: int, max_episode: int, min_eps: float) -> float:
    """Linearly anneal epsilon from 1.0 down to ``min_eps``.

    Args:
        epsiode: Current episode index (the misspelled parameter name is kept
            so existing keyword callers continue to work).
        max_episode: Number of episodes over which epsilon falls from 1.0 to
            ``min_eps``.
        min_eps: Floor value; epsilon never drops below it.

    Returns:
        ``max(1.0 + slope * epsiode, min_eps)`` where
        ``slope = (min_eps - 1.0) / max_episode``. When ``max_episode`` is not
        positive there is no annealing window, so ``min_eps`` is returned.
    """
    # Guard the division below; previously max_episode == 0 raised
    # ZeroDivisionError.
    if max_episode <= 0:
        return min_eps

    slope = (min_eps - 1.0) / max_episode
    return max(slope * epsiode + 1.0, min_eps)


def main():
    """Train a DQN agent on CartPole-v0 and print per-episode progress.

    Runs at most 200 episodes with a 32-unit network, a 50000-entry replay
    memory and minibatches of 64. Epsilon is annealed from 1.0 to 0.01 over
    the first 50 episodes. Training stops early once the mean reward of the
    last 10 episodes reaches 200. The environment is always closed on exit.
    """
    # Create the environment before entering ``try``: if ``gym.make`` fails,
    # the error propagates as-is instead of being masked by a NameError from
    # ``env.close()`` in the ``finally`` clause.
    env = gym.make("CartPole-v0")
    try:
        #env = gym.wrappers.Monitor(env, directory="monitors", force=True)
        rewards = deque(maxlen=10)  # sliding window of recent episode rewards
        input_dim, output_dim = get_env_dim(env)
        agent = Agent(input_dim, output_dim, 32)
        replay_memory = ReplayMemory(50000)

        for i in range(200):
            eps = epsilon_annealing(i, 50, 0.01)
            r = play_episode(env, agent, replay_memory, eps, 64)
            print("[Episode: {:5}] Reward: {:5} 𝜺-greedy: {:5.2f}".format(i + 1, r, eps))

            rewards.append(r)

            # Only judge success once the window is full.
            if len(rewards) == rewards.maxlen and np.mean(rewards) >= 200:
                print("10 에피소드 연속 성공")
                print("Mean reward: {}".format(np.mean(rewards)))
                break
    finally:
        env.close()


if __name__ == '__main__':
    main()

[2020-01-07 00:41:06,337] Making new env: CartPole-v0


[Episode:     1] Reward:  27.0 𝜺-greedy:  1.00
[Episode:     2] Reward:  12.0 𝜺-greedy:  0.98
[Episode:     3] Reward:  17.0 𝜺-greedy:  0.96
[Episode:     4] Reward:  32.0 𝜺-greedy:  0.94
[Episode:     5] Reward:  38.0 𝜺-greedy:  0.92
[Episode:     6] Reward:  40.0 𝜺-greedy:  0.90
[Episode:     7] Reward:  22.0 𝜺-greedy:  0.88
[Episode:     8] Reward:  11.0 𝜺-greedy:  0.86
[Episode:     9] Reward:  15.0 𝜺-greedy:  0.84
[Episode:    10] Reward:  15.0 𝜺-greedy:  0.82
[Episode:    11] Reward:  47.0 𝜺-greedy:  0.80
[Episode:    12] Reward:  12.0 𝜺-greedy:  0.78
[Episode:    13] Reward:  11.0 𝜺-greedy:  0.76
[Episode:    14] Reward:  15.0 𝜺-greedy:  0.74
[Episode:    15] Reward:  28.0 𝜺-greedy:  0.72
[Episode:    16] Reward:  11.0 𝜺-greedy:  0.70
[Episode:    17] Reward:  15.0 𝜺-greedy:  0.68
[Episode:    18] Reward:  29.0 𝜺-greedy:  0.66
[Episode:    19] Reward:  15.0 𝜺-greedy:  0.64
[Episode:    20] Reward:  33.0 𝜺-greedy:  0.62
[Episode:    21] Reward:  21.0 𝜺-greedy:  0.60
[Episode:    