# Pytorch RL - DQN(cartpole)

In [None]:
!pip3 install gymnasium[classic_control]

In [27]:
import gymnasium as gym
import math
import random
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

env = gym.make("CartPole-v1")

# set up matplotlib
is_ipython = 'inline' in matplotlib.get_backend() # interactive python 백엔드인지 확인하는 코드
if is_ipython:
    from IPython import display

plt.ion() # plot이 다이나믹하게 matplotlib과 상호작용 할 수 있게 해주는 설정

# if GPU is to be used
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [4]:
device

device(type='cuda')

## Replay Memory

### Replay Memory란?

에이전트가 경험하는 관측의 전환(transition)을 저장하고, 나중에 재사용할 수 있게 하는 기법이다. Experience Replay memory에서 무작위로 전환을 추출함으로서, 배치를 구성하는 전환이 서로 자동 상관관계를 갖지 않게 할 수 있다. 이는 DQN이 학습하는 과정을 매우 향상시키는 것으로 알려져 있다.

이를 구현하기 위해서는 2개의 클래스가 필요하다.

* transition : 환경에서 발생하는 하나의 전환을 표현하는 `named tuple`객체 입니다.
* ReplayMemory : 가장 최근에 관측된 전환을 저장하는 주기적인 버퍼입니다. 랜덤 추출을 위해 `.sample()`메소드를 구현해야 합니다.



In [25]:
Transition = namedtuple('Trainsition',
                        ('state', 'action', 'next_state', 'reward') # (S_t, a_t, S_{t+1}, r)
)

In [26]:
class ReplayMemory(object):
  def __init__(self, capacity):
     self.memory = deque([], maxlen=capacity)
  def push(self, *args):
    "save a transition"
    self.memory.append(Transition(*args))
  def sample(self, batch_size):
    return random.sample(self.memory, batch_size)
  def __len__(self):
    return len(self.memory)

#DQN 알고리즘

Q-learning의 핵심아래와 같다


 \begin{align}
 (1)\\
 Q^* : State
\times Action → R
\end{align}


(1)을 통해서 보상이 어떤 형태일지 알 수 있다. 그리고 이를 통해 보상을 최대화하는 정책을 만들어낼 수 있다.

\begin{align}
(2)\\
π^*(s) =  \rm{arg}\max_a Q^* (s, a)
\end{align}

(2) 하지만 환경 전체를 관측하는 것은 불가능하므로 실제로 우리는 $Q^*$ 함수에 접근 할 수 없다. 하지만 UFA(universal function approximator) 정리를 근거로 우리는 실제  $Q^*$를 근사하는 신경망을 만들어낼 수 있다.

\begin{align}
(3)\\
Q^{π}(s, a) = r + \gamma Q^{\pi}( \acute{s} , π( \acute{s}))
\end{align}

(3) 훈련 업데이트 룰을 위해 특정 정책에 관한 모든 $Q$ 함수는 벨만 방정식을 따른다는 사실을 활용할 것이다.

\begin{align}
(4)\\
\delta = Q (s, a) - (r + \gamma \max_a^{'} Q (\acute{s}, a))
\end{align}

(4)의 등호 양쪽에 있는 차이를 시차 오차라고 한다.


\begin{align}
(5)\\
\mathcal{L} = \frac{1}{|B|}\sum_{(s, a, s', r) \ \in \ B} \mathcal{L}(\delta)
\end{align}

\begin{align}
\text{where} \quad \mathcal{L}(\delta) = \begin{cases}
     \frac{1}{2}{\delta^2}  & \text{for } |\delta| \le 1, \\
     |\delta| - \frac{1}{2} & \text{otherwise.}
   \end{cases}
   \end{align}

   (5) 오차를 해결하기 위한 로스 함수로 Huber loss를 사용할 것이다. Huber loss는 오차가 작을 때는 MSE 처럼 동작하지만 오차가 크면 MAE처럼 동작한다. 이는 $Q$ 함수가 매우 많은 노이즈를 가지고 있을 때 아웃라이어에도 견고하게 동작할 수 있게 한다. 이때, 전환의 배치인 B를 분모로 두어서 계산한다.
  

## Q-network

간단한 feed forward 네크워크를 통해 q-network를 구현할 것이다.
* input : 현재 스크린과 이전스크린의 차이
*output : Q(s, left), Q(s, right) - 이는 상태에 대한 액션


In [16]:
class DQN(nn.Module):
  def __init__(self, n_observations, n_actions):
    super(DQN, self).__init__()
    self.layer1 = nn.Linear(n_observations, 128)
    self.layer2 = nn.Linear(128, 128)
    self.layer3 = nn.Linear(128, n_actions)

  def forward(self, x):
    x = F.relu(self.layer1(x))
    x = F.relu(self.layer2(x))
    return self.layer3(x)

## Training

### 하이퍼파라미터와 유틸리티

아래의 셀은 모델과 옵티마이저를 인스턴스화 할 것이다, 그리고 몇가지 도구들을 정의할 것이다.

* select_action : 입실론 그리디 정책에 따라 액션을 선택하는 도구이다. 때때로는 모델이 액션을 선택하도록 하고 때때로는 균일하게 하나의 액션을 랜덤하게 고를 것이다. 랜덤한 액션을 고르는 확률은 `EPS_START` 에서 시작해서 지수적으로 `EPS_END`로 감쇠할 것이다. `EPS_ DECAY`는 감쇠의 속도를 제어하는 인자이다.
* `plot_duration` :  이전의 100개의 에피소드의 평균값 - eval과정에서의 측정값 - 과 함께 에피소드의 지속을 표시하는 helper입니다. 해당 플롯은 트레이닝 루프가 포함된 셀 아래에 표시되고 에피소드마다 업데이트 됩니다.

In [23]:
# BATCH_SIZE 는 리플레이 버퍼에서 샘플되는 전환의 숫자입니다.
# GAMMA 는 할인율입니다.
# EPS_START 입실론의 초깃값입니다.
# EPS_END 입실론의 마지막 값입니다.
# EPS_DECAY 입실론이 지수적으로 감쇠하는 속도를 제어합니다.
# TAU 는 타겟 네트워크를 업데이트 하는 비율입니다.
# LR 학습률입니다.
BATCH_SIZE = 128
GAMMA = 0.99
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 1000
TAU = 0.005
LR = 1e-4

In [28]:
# 액션의 가짓수. 이산 액션이며 갯수는 2이다.
n_actions = env.action_space.n

#GYM의 상태 관측 값을 갖고 온다.
state, info = env.reset()

# state[ 0.03291552  0.02558019 -0.0495155   0.0120102 ], 크기 4의 리스트이다.
# info 는 비워져있는 딕트 혹은 셋이다.

n_observations = len(state) # 관측의 갯수


policy_net = DQN(n_observations, n_actions).to(device)
target_net = DQN(n_observations, n_actions).to(device) # 부록
target_net.load_state_dict(policy_net.state_dict()) # 폴리시 네트워크 파라미터를 복사하여 갖고 있는다

optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)
memory = ReplayMemory(10000) # replace 메모리 큐의 최대 크기 선언

steps_done = 0


### 부록 : 타겟 네트워크가 필요한 이유

>Q-learning에서는 추측으로 추측을 업데이트 한다. - from reference -

Q-learning의 핵심 아이디어는 Q-value 함수를 반복적으로 업데이트하여 최적의 Q-values를 찾는 것이다. 이건 매우 위험한 상관관계를 잠재적으로 유발할 수 있다. 즉, Q-value 업데이트 중에 나타나는 값들이 서로 상호 의존적이라는 점이다.

벨만 방정식은  $Q(s^{'}, a^{'})$를 통해 $Q(s, a)$의 값을 제공한다. s와 s' 사이에는 단 한단계의 차이만 있기 때문에 네트워크가 이를 구분하는 것은 쉽지 않다.

이러한 상호 의존성은 학습 과정에서 불안정성을 야기할 수 있다. 예를 들어, Q-value를 업데이트하면서 변화한 값들이 다음 업데이트에 영향을 미치면서 값들이 불규칙하게 수렴하거나 수렴하지 않을 수 있기 때문이다. 이런 문제를 해결하기 위해 타겟 네트워크(target network)라는 개념이 도입되었다.

타겟 네트워크는 현재 학습 중인 Q-value 함수와 별개로 존재하는 네트워크다. 주요한 역할은 안정성을 제공하는 것이다. Q-value 업데이트 시에는 타겟 네트워크의 Q-values를 사용하여 업데이트를 수행합니다. 그러나 **실제 학습은 현재의 Q-value 함수**를 기반으로 이루어진다. 이렇게 함으로써 업데이트가 안정적으로 이루어지며, 상호 의존성으로 인한 불안정성을 줄일 수 있다.

즉, Q-value 함수를 업데이트하기 위해 타겟 네트워크의 q-value를 사용하여 Q-value 함수의 파라미터를 업데이틑 시키는 것이다.

*Reference*

[dqn-targetnetwork](https://towardsdatascience.com/deep-q-network-dqn-ii-b6bf911b6b2c)



In [31]:

def select_action(state):
    global steps_done
    sample = random.random()
    #
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \
        math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if sample > eps_threshold: # 입실론 비율을 넘어서면
        with torch.no_grad():
          # max(1)을 통해서 각 로우의 가장 큰 칼럼 값이 반환 될 것이다.
          # max 메소드를 통해 나온 결과의 두 번째 칼럼은 최대 요소가 존재하던 인덱스이다.
          # 따라서 이를 통해 더 커다란 기대 보상을 얻을 수 있다.
            return policy_net(state).max(1)[1].view(1, 1)
    else:
        return torch.tensor([[env.action_space.sample()]], device=device, dtype=torch.long)


In [32]:
episode_durations = []

def plot_durations(show_result=False):
    plt.figure(1)
    durations_t = torch.tensor(episode_durations, dtype=torch.float)
    if show_result:
        plt.title('Result')
    else:
        plt.clf()
        plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(durations_t.numpy())
    # 100개의 에피소드의 평균을 가져와 그리기
    if len(durations_t) >= 100:
        means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)
    if is_ipython:
        if not show_result:
            display.display(plt.gcf())
            display.clear_output(wait=True)
        else:
            display.display(plt.gcf())

## 훈련 루프

1) 배치 샘플링

2) 텐서를 concat 하나로 만듦

3) $Q(S_t, a_t)$ 와 $V(S_{t+1}) = \max_a Q(s_{t+1}, a)$ 계산

4) (3)의 값을 결합하여 로스 함수에 넣음

5) 정의에 의해, 만약 입실론이 종료 상태에 다다르면 $V(s)= 0$

6) 추가적인 안정성을 위해 타겟 네트워크를 사용하여 $V(s_{t+1})$ 계산


In [None]:
def optimize_model():
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE) # (1) 배치 샘플링
    batch = Transition(*zip(*transitions)) ## 전환을 상태는 상태끼리, 액션은 액션끼리, 다음 상태는 다음 상태 끼리, 보상은 보상끼리 묶어줌

    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                          batch.next_state)), device=device, dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state
                                                if s is not None])
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
    # columns of actions taken. These are the actions which would've been taken
    # for each batch state according to policy_net
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # Compute V(s_{t+1}) for all next states.
    # Expected values of actions for non_final_next_states are computed based
    # on the "older" target_net; selecting their best reward with max(1)[0].
    # This is merged based on the mask, such that we'll have either the expected
    # state value or 0 in case the state was final.
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    with torch.no_grad():
        next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0]
    # Compute the expected Q values
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # Compute Huber loss
    criterion = nn.SmoothL1Loss()
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    # In-place gradient clipping
    torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
    optimizer.step()