## **DQN Implementation with pytorch**

https://wikidocs.net/172566

In [30]:
# Import Libraries
import gymnasium as gym

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import random
from collections import deque
from tqdm import tqdm

In [None]:
# Create Environment
env = gym.make("CartPole-v1")

In [None]:
print(env.action_space.n)
print(env.observation_space.shape)

2
(4,)


In [18]:
type(env.action_space)

In [23]:
env.action_space.sample()

0

In [24]:
env.reset()

(array([-0.03877578,  0.04721831,  0.01122632, -0.00451282], dtype=float32),
 {})

In [29]:
env.step(1) # cartpole에서 reward ->  1 per each time step

(array([-0.03783142,  0.24217747,  0.01113606, -0.29363266], dtype=float32),
 1.0,
 False,
 False,
 {})

In [61]:
# Implementation of Q-network
class Qnet(nn.Module):
  def __init__(self, state_size, action_size, hidden_size = 256):
    super(Qnet, self).__init__()

    self.layer1 = nn.Linear(state_size, hidden_size)     # Input : 4 parameters -> cart position, cart velocity, pole angle, pole velocity at tip
    self.layer2 = nn.Linear(hidden_size, hidden_size)
    self.layer3 = nn.Linear(hidden_size, action_size)   # Output : left or right
    self.act = nn.GELU()

  def forward(self, x):
    x = self.act(self.layer1(x))
    x = self.act(self.layer2(x))
    return self.layer3(x)

In [68]:
class DQNAgent():
  def __init__(self, env, buffer_limit = 2000, batch_size = 32, gamma = 0.99, lr = 0.001):
    self.env = env
    self.buffer_limit = buffer_limit
    self.replay_buffer = deque(maxlen = self.buffer_limit) # Replay Buffer for Replay Memory
    self.q_network = Qnet(
        state_size = self.env.observation_space.shape[0],
        action_size = self.env.action_space.n,
        hidden_size = 128
        )

    self.batch_size = batch_size
    self.gamma = gamma
    self.optimizer = optim.Adam(
        params = self.q_network.parameters(),
        lr = lr
    )

# Exploration - Exploitation trade-off
  def get_action(self, state, epsilon) -> int:
    if np.random.rand() <= epsilon: # -> Exploration
      return self.env.action_space.sample()

    else: # -> Exploitation
      q_value = self.q_network(torch.from_numpy(state).float().unsqueeze(0))[0]
      return torch.argmax(q_value).item()

  def append_sample(self, state, action, reward, next_state, done):
    self.replay_buffer.append((state, action, reward, next_state, done))

  def train_step(self):
    if len(self.replay_buffer) < self.batch_size:
      return

    mini_batch = random.sample(self.replay_buffer, self.batch_size)
    states, actions, rewards , next_states, dones = zip(*mini_batch)

    states = np.array(states, dtype=np.float32) # UserWarning: Creating a tensor from a list of numpy.ndarrays is extremely slow. Please consider converting the list to a single numpy.ndarray with numpy.array() before converting to a tensor
    actions = np.array(actions, dtype=np.int64)
    rewards = np.array(rewards, dtype=np.float32)
    next_states = np.array(next_states, dtype=np.float32)
    dones = np.array(dones, dtype=np.float32)

    states = torch.tensor(states, dtype = torch.float32)
    actions = torch.tensor(actions, dtype = torch.int64)
    rewards = torch.tensor(rewards, dtype = torch.float32)
    next_states = torch.tensor(next_states, dtype = torch.float32)
    dones = torch.tensor(dones, dtype = torch.float32)

    curr_Qs = self.q_network(states).gather(1, actions.unsqueeze(1)).squeeze(1)
    next_Qs = self.q_network(next_states).max(1)[0].detach() # .max() returns ((values), (indicies))
    target_Qs = rewards + self.gamma * next_Qs * (1 - dones)

    loss = F.mse_loss(curr_Qs, target_Qs)

    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()

In [79]:
ENV_NAME = "CartPole-v1"
# Hyperparameters
BUFFER_LIMIT = 2000

MAX_EPISODES = 200
BATCH_SIZE  = 32

LEARNING_RATE = 0.001   # increase when cart keep moves one-way

epsilon = 1.0
MAX_EPSILON = 1.0                # Upper bound of epsilon
MIN_EPSILON = 0.01               # Lower bound of epsilon
DECAY_RATE = 0.005

GAMMA = 0.99                       # Discount factor

SEED = 1
# ------------------------------------------------------- #
env = gym.make(ENV_NAME)
agent = DQNAgent(
    env = env,
    buffer_limit = BUFFER_LIMIT,
    batch_size = BATCH_SIZE,
    gamma = GAMMA,
    lr = LEARNING_RATE
)

# ------------------------------------ train loop ------------------------------------ #
with tqdm(total = MAX_EPISODES, desc = "Episode") as pbar:
  for episode in range(MAX_EPISODES):
    score = []
    ## Reset environment and get first new observation
    state, _ = agent.env.reset(seed = SEED)
    episode_reward = 0
    done = False  # has the enviroment finished?

    while not done:
      action = agent.get_action(state, epsilon = epsilon)
      next_state, reward, done, _, _ = agent.env.step(action)
      agent.append_sample(state, action, reward, next_state, done)

      state = next_state # update state
      episode_reward += reward

      # for visualization (not necessary)
      if done:
        scores.append(episode_reward)
        pbar.set_postfix({'episode_reward': episode_reward})
        pbar.update(1)
        break

      if len(agent.replay_buffer) >= agent.batch_size:
        agent.train_step()

    # epsilon -= (MAX_EPSILON - MIN_EPSILON) / MAX_EPISODES # Linear decay
    epsilon = MIN_EPSILON + (MAX_EPSILON - MIN_EPSILON)*np.exp(-DECAY_RATE*episode)  # Exponential decay

Episode: 100%|██████████| 200/200 [00:48<00:00,  4.09it/s, episode_reward=155]


In [82]:
from gymnasium.wrappers import RecordVideo
from IPython.display import Video
import os

def evaluate_dqn_model(env_name, model, num_episodes=5, device="cpu"):

    video_folder = "./videos/"
    os.makedirs(video_folder, exist_ok=True)

    # Gymnasium 환경 생성
    env = RecordVideo(gym.make(env_name, render_mode="rgb_array"), video_folder=video_folder, episode_trigger=lambda x: True)

    for episode in range(num_episodes):
        state, _ = env.reset()  # 환경 초기화
        done = False
        total_reward = 0

        print(f"Episode {episode + 1}")
        while not done:

            # 행동 선택 (탐욕 정책)
            with torch.no_grad():
                action = model.get_action(state, epsilon = epsilon)

            # 환경에서 행동 수행
            next_state, reward, done, _, _ = env.step(action)

            # 보상 합산
            total_reward += reward

            # 다음 상태로 이동
            state = next_state

        print(f"Total Reward in Episode {episode + 1}: {total_reward}")

    env.close()

    video_path = os.path.join(video_folder, os.listdir(video_folder)[0])
    return video_path

# 모델 추론 실행
if __name__ == "__main__":
    ENV_NAME = "CartPole-v1"
    # MODEL_PATH = "dqn_cartpole.pth"  # 학습된 모델 경로
    video_path = evaluate_dqn_model(ENV_NAME, agent, num_episodes=5, device="cpu")

  logger.warn(


Episode 1
Total Reward in Episode 1: 117.0
Episode 2
Total Reward in Episode 2: 9.0
Episode 3
Total Reward in Episode 3: 182.0
Episode 4
Total Reward in Episode 4: 199.0
Episode 5
Total Reward in Episode 5: 194.0


AttributeError: 'Video' object has no attribute 'display'