# 5주차 Dueling DQN


## [핵심 목표]

* Dueling DQN 개념 이해 및 Q-함수 구조 개선으로 학습 효율 증대
* Value Stream과 Advantage Stream 역할 이해 및 Dueling DQN 구현

## [Dueling DQN 아이디어]

* Q함수를 Value Stream과 Advantage Stream으로 분리
    * **Value Stream ($V(s)$)**: 상태 가치 예측, 상태 $s$의 좋음/나쁨 정도
    * **Advantage Stream ($A(s,a)$)**: 각 행동 Advantage 예측, 상태 $s$에서 행동 $a$ 선택 시 **평균 행동 대비** 얼마나 좋은지
* Q함수: Value Stream과 Advantage Stream 결합, Q값 계산
* 수식: $Q(s,a) = V(s) + A(s,a)$

## [Dueling DQN 장점]

* 상태 가치와 행동 Advantage 분리 학습, Q값 추정 정확도 향상
* **Value Stream**: 상태에 대한 일반 정보 학습, 다양한 행동에 대한 Q값 효율적 추정
* **Advantage Stream**: 특정 행동이 다른 행동보다 나은 정도 학습, 행동 간 미세 차이 구별 용이

## [Dueling DQN 네트워크]
* 네트워크 구조
\begin{array}{c}
\boxed{\text{State }(s)} \\
\downarrow \\
\boxed{\text{Feature Extraction }} \\
\swarrow \quad \searrow \quad \quad \quad \\
\boxed{\begin{array}{c} \text{Value Stream} \\ \downarrow \\ V(s) \end{array}} \quad
\boxed{\begin{array}{c} \text{Advantage Stream} \\ \downarrow \\ A(s, a_1), \dots, A(s, a_N) \end{array}} \\
\searrow \quad \swarrow \quad \quad \quad \\
\boxed{\text{Aggregation: } Q(s, a) = V(s) + \left(A(s, a) - \frac{1}{|\mathbb{A}|}\sum_{a'} A(s, a')\right)} \\
\downarrow \\
\boxed{Q(s, a_1), Q(s, a_2), \dots, Q(s, a_N)}
\end{array}

* 구성 요소

  1.  **State (s) (입력):**
      *   환경의 상태 정보 (예: 게임 화면, 센서 데이터)

  2.  **Feature Extraction (Shared Layers):**
      *   **종류:** Convolutional Layers (`Conv`, 이미지), Fully Connected Layers (`FC`, 벡터)
      *   **역할:** 입력 상태 $s$ $\rightarrow$ 유용한 특징 추출

  3.  **Value Stream (Separate Layers):**
      *   **종류:** 주로 Fully Connected Layers (`FC`)
      *   **역할:** 상태 가치 ($V(s)$) 추정
      *   **출력:** 단일 값 (scalar)

  4.  **Advantage Stream (Separate Layers):**
      *   **종류:** 주로 Fully Connected Layers (`FC`)
      *   **역할:** 각 행동의 이점 ($A(s, a)$) 추정
      *   **출력:** 행동 개수만큼의 값

  5.  **Aggregation (결합):**

      *   **수식:** $Q(s, a) = V(s) + \left( A(s, a) - \frac{1}{|\mathbb{A}|}\sum_{a'} A(s, a') \right)$
        * 안정성 위해 평균 Advantage 빼줌
        * $|\mathbb{A}|$: 가능한 행동의 개수
      *   **역할:** $V(s) + A(s, a) =$ 최종 Q-value


## [Dueling DQN 학습 과정]


### 1. Q값 계산

* 현재 Q값
  * 메인 네트워크($\theta$)를 사용하여 현재 상태에 대한 Q값을 계산
  * $Q(s, a; \theta)$
* 타겟 Q값
  * 타겟 네트워크($\theta^-$)를 사용하여 다음 상태에서 최대 Q값을 계산
  * $y = r + \gamma \underset{a'}{max} \text{ } Q(s', a'; \theta^-)$
    * $\gamma$: 할인율

### 2. 손실 함수 계산

* MSE 손실
  * 계산된 현재 Q값과 타겟 Q값 사이의 MSE 손실을 계산
  * $L(\theta) = \mathbb{E}[(Q(s, a; \theta) - y)^2]$

### 3. 네트워크 업데이트

* 역전파
  * 계산된 손실 $L(\theta)$을 사용하여 메인 네트워크의 파라미터 $\theta$를 업데이트
  * $\theta \leftarrow \theta - \alpha \nabla_{\theta} L(\theta)$
    * $\alpha$: 학습률

### 4. 타겟 네트워크 업데이트

* 일정 스텝(step)마다 메인 네트워크의 파라미터를 타겟 네트워크에 복사하여 타겟 네트워크 업데이트합니다.
* $\theta^- \leftarrow \theta$


## [실습: Dueling DQN을 이용한 CartPole 실습]


### 1. 라이브러리 설치 및 가져오기
* `gym`: CartPole 환경 제공
* `torch`: 딥러닝 모델 및 연산 처리
* `numpy`: 배열 및 수치 연산
* `collections.deque`: 경험 리플레이 버퍼 구현

In [1]:
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque
import numpy as np
import imageio

### 2. GPU 설정 및 하이퍼파라미터 정의

*   **`BATCH_SIZE`**: 한 번 학습에 사용하는 샘플 수
*   **`GAMMA`**: 미래 보상 할인 정도 (0~1)
*   **`EPS_START`**: 초기 무작위 행동 확률
*   **`EPS_END`**: 최종 무작위 행동 확률
*   **`EPS_DECAY`**: 무작위 행동 확률 감소 속도
*   **`TARGET_UPDATE`**: Target Network 업데이트 간격
*   **`MEMORY_SIZE`**: Replay Buffer 저장 용량
*   **`LR`**: 가중치 업데이트 크기 (학습률)
*   **`NUM_EPISODES`**: 총 학습 에피소드 횟수
*   **`RENDER_EPISODE`**: 통계 출력 주기


In [2]:
# GPU 사용 가능 여부 확인 및 장치 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 하이퍼파라미터
BATCH_SIZE = 128
GAMMA = 0.99
EPS_START = 1.0
EPS_END = 0.01
EPS_DECAY = 0.995
TARGET_UPDATE = 5
MEMORY_SIZE = 30000
LR = 0.0003
NUM_EPISODES = 2000
PROGRESS_INTERVAL = 50

### 3. Dueling DQN 네트워크 정의

* **`DuelingDQN` 클래스**: PyTorch의 `nn.Module`을 상속받아 네트워크 구성.
* **`feature_layer`**: 입력 상태(state)를 처리하는 공통 특징 추출 레이어.
* **`value_stream`**: 상태의 가치(Value, V)를 추정하는 레이어.
* **`advantage_stream`**: 각 행동의 상대적 이점(Advantage, A)을 추정하는 레이어.
* **`forward` 메서드**: Value와 Advantage를 결합하여 최종 Q-value를 계산.

* **(Q 값 계산)**:
$Q(s, a) = V(s) + \left(A(s, a) - \frac{1}{|\mathbb{A}|}\sum_{a'} A(s, a')\right)$
  * $Q(s, a)$: 상태 $s$에서 행동 $a$를 선택했을 때의 Q-value
  * $V(s)$: 상태 $s$의 가치(Value)
  * $A(s, a)$: 상태 $s$에서 행동 $a$의 이점(Advantage)
  * $|\mathbb{A}|$: 행동 공간의 크기 (가능한 행동의 개수)

In [3]:
class DuelingDQN(nn.Module):
    def __init__(self, state_size, action_size, hidden_size):
        super(DuelingDQN, self).__init__()
        self.feature_layer = nn.Sequential(
            nn.Linear(state_size, hidden_size),
            nn.ReLU()
        )
        self.value_stream = nn.Sequential(
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, 1)
        )
        self.advantage_stream = nn.Sequential(
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, action_size)
        )

    def forward(self, state):
        x = self.feature_layer(state)
        value = self.value_stream(x)
        advantage = self.advantage_stream(x)
        q_values = value + (advantage - advantage.mean(dim=1, keepdim=True))
        return q_values

### 4. Replay Buffer 정의

* **`ReplayBuffer` 클래스**: `deque`를 사용하여 고정된 크기의 버퍼를 생성
* **`push` 메서드**: 새로운 경험 (state, action, reward, next\_state, done)을 버퍼에 추가
* **`sample` 메서드**: 버퍼에서 무작위로 `batch_size`만큼의 경험을 샘플링하여 PyTorch 텐서로 변환 (GPU 사용 고려)
* **`__len__` 메서드**: 버퍼에 저장된 경험의 개수를 반환

In [4]:
class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        state = np.array(state)
        next_state = np.array(next_state)
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        state, action, reward, next_state, done = zip(*random.sample(self.buffer, batch_size))
        return torch.tensor(np.array(state), dtype=torch.float32, device=device), \
           torch.tensor(np.array(action), dtype=torch.int64, device=device), \
           torch.tensor(np.array(reward), dtype=torch.float32, device=device), \
           torch.tensor(np.array(next_state), dtype=torch.float32, device=device), \
           torch.tensor(np.array(done), dtype=torch.float32, device=device)

    def __len__(self):
        return len(self.buffer)

### 5. 학습 함수 정의
* **Epsilon-Greedy**: 탐험(Exploration)과 활용(Exploitation)의 균형을 맞추기 위해 사용
  * `epsilon` 값은 `EPS_START`에서 시작하여 `EPS_END`를 향해 지수적으로 감소(`EPS_DECAY` 사용)
  * 무작위 값이 `epsilon`보다 크면, 학습된 정책에 따라 행동 선택 (Exploitation)
  * 무작위 값이 `epsilon`보다 작거나 같으면, 무작위 행동 선택 (Exploration)

* **Replay Buffer**:
  * `push`: 매 스텝마다 얻은 경험(transition)을 Replay Buffer에 저장
  * `sample`: Replay Buffer에서 미니배치 크기만큼의 경험을 무작위로 추출하여 학습에 사용

* **Target Network**:
  * 학습의 안정성을 높이기 위해 사용
  * `TARGET_UPDATE` 에피소드마다 `policy_net`의 가중치를 `target_net`으로 복사

* **Loss Function (MSE Loss)**:
  * TD Error(Temporal Difference Error)를 최소화하는 방향으로 학습
  * TD Target: $r + \gamma \max_{a'} Q_{\text{target}}(s', a')$
  * Loss: $\text{MSE}(Q(s, a), r + \gamma \max_{a'} Q_{\text{target}}(s', a'))$
    * $Q(s, a)$: `policy_net`을 사용하여 예측한 Q-value
    * $r$: 현재 스텝에서 얻은 보상
    * $\gamma$: 할인율 (Discount Factor)
    * $\max_{a'} Q_{\text{target}}(s', a')$: `target_net`을 사용하여 계산한 다음 상태($s'$)의 최대 Q-value

In [5]:
def train(env, policy_net, target_net, optimizer, replay_buffer):
    all_rewards = []

    for episode in range(NUM_EPISODES):
        state, _ = env.reset()
        total_reward = 0
        done = False

        while not done:
            # Epsilon-Greedy Exploration
            epsilon = max(EPS_END, EPS_START * (EPS_DECAY ** episode))
            if random.random() > epsilon:
                with torch.no_grad():
                    state_tensor = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
                    q_values = policy_net(state_tensor)
                    action = q_values.argmax(dim=1).item()
            else:
                action = env.action_space.sample()

            next_state, reward, terminated, truncated, _ = env.step(action)
            total_reward += reward
            done = terminated or truncated

            # Replay Buffer에 경험 저장
            replay_buffer.push(state, action, reward, next_state, done)
            state = next_state

            # Replay Buffer에서 샘플링 및 학습
            if len(replay_buffer) >= BATCH_SIZE:
                state_batch, action_batch, reward_batch, next_state_batch, done_batch = replay_buffer.sample(BATCH_SIZE)

                # Double DQN Target Q-value 계산
                with torch.no_grad():
                    next_actions = policy_net(next_state_batch).argmax(dim=1, keepdim=True)
                    next_q_values = target_net(next_state_batch).gather(1, next_actions).squeeze(1)
                    target_q_values = reward_batch + (1 - done_batch) * GAMMA * next_q_values

                # 현재 Q-value 계산
                q_values = policy_net(state_batch)
                q_values_for_actions = q_values.gather(1, action_batch.unsqueeze(1)).squeeze(1)

                # Loss 계산 및 최적화
                loss = nn.functional.mse_loss(q_values_for_actions, target_q_values)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

        all_rewards.append(total_reward)

        # Target Network 업데이트
        if (episode+1) % TARGET_UPDATE == 0:
            target_net.load_state_dict(policy_net.state_dict())

        # 평균 리워드 출력
        if (episode+1) % PROGRESS_INTERVAL == 0:
            print(f"Episode {episode+1}: Average Reward over last {PROGRESS_INTERVAL} episodes = {np.mean(all_rewards[-PROGRESS_INTERVAL:])}")

        # 오랜 기간 리워드가 충분히 크다면 학습 종료
        if np.min(all_rewards[-TARGET_UPDATE*2:]) >= 450:
            print(f"Solved in {episode+1} episodes with reward {total_reward}!")
            break


### 6. 학습 실행 및 결과 확인

  * **`imageio`**: GIF 이미지 생성을 위한 라이브러리
  * 환경 초기화: `gym.make("CartPole-v1")`로 CartPole 환경을 생성
  * 네트워크 생성: `policy_net` (학습용), `target_net` (안정성 향상)을 생성하고 GPU/CPU로 이동
  * 옵티마이저, Replay Buffer: `Adam` 옵티마이저와 `ReplayBuffer` 객체를 생성
  * `train` 함수 호출: 학습을 시작
  * **GIF 생성**: `imageio`를 사용하여 학습된 에이전트의 플레이를 GIF로 저장
  * `env.close()`: 환경을 종료


In [6]:
env = gym.make("CartPole-v1", render_mode = 'rgb_array')
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
hidden_size = 128

policy_net = DuelingDQN(state_size, action_size, hidden_size).to(device)
target_net = DuelingDQN(state_size, action_size, hidden_size).to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.Adam(policy_net.parameters(), lr=LR)
replay_buffer = ReplayBuffer(MEMORY_SIZE)

train(env, policy_net, target_net, optimizer, replay_buffer)

Episode 50: Average Reward over last 50 episodes = 27.6
Episode 100: Average Reward over last 50 episodes = 42.24
Episode 150: Average Reward over last 50 episodes = 93.7
Episode 200: Average Reward over last 50 episodes = 79.7
Episode 250: Average Reward over last 50 episodes = 116.52
Episode 300: Average Reward over last 50 episodes = 107.16
Episode 350: Average Reward over last 50 episodes = 128.58
Episode 400: Average Reward over last 50 episodes = 122.18
Episode 450: Average Reward over last 50 episodes = 185.16
Episode 500: Average Reward over last 50 episodes = 168.18
Episode 550: Average Reward over last 50 episodes = 291.82
Solved in 575 episodes with reward 500.0!


In [7]:
# 렌더링 및 GIF 저장
frames = []
state, _ = env.reset()
done = False
total_reward = 0
while not done:
    with torch.no_grad():
        state_tensor = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
        q_values = policy_net(state_tensor)
        action = q_values.argmax(dim=1).item()
    next_state, reward, terminated, truncated, _ = env.step(action)
    total_reward += reward
    state = next_state
    done = terminated or truncated
    frames.append(env.render())

imageio.mimsave('cartpole_simulation_dueling_double_DQN.gif', frames, duration=33)
print(f"Final episode reward: {total_reward}")
env.close()

Final episode reward: 399.0
