* 실행환경: Colab
* python version: 3.9.9
* gymnasium: 1.0.0
* torch: 2.5.1+cu118

In [None]:
# %%capture
# %%bash
# pip install gymnasium

In [None]:
import gymnasium as gym
import os
import math
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count
from gymnasium.envs.registration import register
from gymnasium.envs.classic_control.acrobot import AcrobotEnv
import warnings

# Suppress all warnings so library notices do not clutter the notebook output.
warnings.filterwarnings('ignore')

# IPython's display helper is only imported when matplotlib uses an inline backend.
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display

# Interactive mode: figures can be redrawn while training is still running.
plt.ion()

# Pick the compute device: CUDA first, then Apple MPS, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")
print(device)

cuda


In [3]:
class CustomAcrobotEnv(AcrobotEnv):
    """Acrobot environment whose two link lengths are set at construction.

    Args:
        link1_length: Value stored on the instance as ``LINK_LENGTH_1``.
        link2_length: Value stored on the instance as ``LINK_LENGTH_2``.
        *args: Positional arguments forwarded unchanged to ``AcrobotEnv``.
        **kwargs: Keyword arguments forwarded unchanged to ``AcrobotEnv``.
    """

    def __init__(self, link1_length=1.0, link2_length=1.0, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Set after the parent constructor has run.
        # NOTE(review): presumably these replace AcrobotEnv's default link
        # lengths used by its dynamics/rendering -- confirm against gymnasium.
        self.LINK_LENGTH_1, self.LINK_LENGTH_2 = link1_length, link2_length

# Register the custom environment with gymnasium. The entry point refers to
# the class defined in this module; episodes are capped at 500 steps.
register(
    id='CustomAcrobot-v0',
    entry_point=f'{__name__}:CustomAcrobotEnv',
    max_episode_steps=500,
    kwargs=dict(link1_length=1.0, link2_length=1.0),
)

# Instantiate the registered environment and show its spec.
env = gym.make('CustomAcrobot-v0')
print(env.spec)

EnvSpec(id='CustomAcrobot-v0', entry_point='__main__:CustomAcrobotEnv', reward_threshold=None, nondeterministic=False, max_episode_steps=500, order_enforce=True, disable_env_checker=False, kwargs={'link1_length': 1.0, 'link2_length': 1.0}, namespace=None, name='CustomAcrobot', version=0, additional_wrappers=(), vector_entry_point=None)


In [4]:
SEED = 42

# Seed each random source used in this notebook: Python's `random`, NumPy,
# the environment's action-space sampler, and PyTorch.
random.seed(SEED)
np.random.seed(SEED)
env.action_space.seed(SEED)
torch.manual_seed(SEED)

<torch._C.Generator at 0x1d18a4f01b0>

In [5]:
def plot_rewards(show_result=False):
    """Redraw the reward curve from the module-level ``episode_rewards`` list.

    Plots the total reward of every recorded episode and, once at least 100
    episodes exist, a 100-episode moving average (zero-padded for the first
    99 episodes so both curves share the same x axis).

    Args:
        show_result: When an inline backend is active, ``False`` displays the
            figure and then clears the cell output (``wait=True``) so the
            next call replaces it; ``True`` displays the figure without
            clearing.
    """
    returns = torch.tensor(episode_rewards, dtype=torch.float)

    plt.clf()
    plt.title('Result')
    plt.xlabel('Episode')
    plt.ylabel('Total Reward')
    plt.plot(returns.numpy(), label='Episode Reward')

    # Moving average over a sliding window of 100 episodes.
    window = 100
    if len(returns) >= window:
        rolling = returns.unfold(0, window, 1).mean(1).flatten()
        padded = torch.cat((torch.zeros(window - 1), rolling))
        plt.plot(padded.numpy(), label='100 Episode Avg', linestyle='--')

    plt.legend()
    plt.pause(0.001)

    # Outside an inline backend there is nothing more to do.
    if not is_ipython:
        return

    display.display(plt.gcf())
    if not show_result:
        display.clear_output(wait=True)

In [6]:
class DuelingDQN(nn.Module):
    """Dueling Q-network: a shared MLP trunk feeding value and advantage heads.

    The trunk is three linear layers (``n_observations -> 256 -> 256 -> 128``)
    with LeakyReLU activations. Each head is ``128 -> 64 -> out`` with a
    LeakyReLU and ``Dropout(p=0.2)`` in between; the value head outputs one
    scalar and the advantage head outputs one value per action.

    Args:
        n_observations: Size of the observation vector fed to the first layer.
        n_actions: Number of discrete actions (width of the advantage head).
    """

    def __init__(self, n_observations, n_actions):
        super().__init__()
        self.fc1 = nn.Linear(n_observations, 256)
        self.fc2 = nn.Linear(256, 256)
        self.fc3 = nn.Linear(256, 128)

        # Head estimating the state value V(s).
        self.fc_value = nn.Sequential(
            nn.Linear(128, 64),
            nn.LeakyReLU(),
            nn.Dropout(p=0.2),
            nn.Linear(64, 1)
        )

        # Head estimating the per-action advantage A(s, a).
        self.fc_advantage = nn.Sequential(
            nn.Linear(128, 64),
            nn.LeakyReLU(),
            nn.Dropout(p=0.2),
            nn.Linear(64, n_actions)  # one output per action
        )

    def forward(self, x):
        """Return Q-values for ``x``.

        Args:
            x: Observation tensor whose last dimension is ``n_observations``;
                either a batch ``(B, n_observations)`` or a single unbatched
                observation ``(n_observations,)``.

        Returns:
            Tensor of Q-values with the same leading shape as ``x`` and a last
            dimension of ``n_actions``.
        """
        x = torch.nn.functional.leaky_relu(self.fc1(x))
        x = torch.nn.functional.leaky_relu(self.fc2(x))
        x = torch.nn.functional.leaky_relu(self.fc3(x))

        value = self.fc_value(x)
        advantage = self.fc_advantage(x)

        # Q = V + (A - mean_a A). Averaging over the last dimension (the
        # action axis) rather than a hard-coded dim 1 gives the same result
        # for batched input and also accepts an unbatched observation.
        q_values = value + advantage - advantage.mean(dim=-1, keepdim=True)
        return q_values

# One environment step: the state acted in, the action taken, the state that
# followed, and the reward received.
Transition = namedtuple(
    'Transition',
    ['state', 'action', 'next_state', 'reward'],
)


class ReplayMemory:
    """Fixed-capacity FIFO buffer of ``Transition`` records.

    Args:
        capacity: Maximum number of transitions kept; once full, appending a
            new transition discards the oldest one.
    """

    def __init__(self, capacity):
        self.memory = deque(maxlen=capacity)

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.memory)

    def push(self, *args):
        """Build a ``Transition`` from ``args`` and append it to the buffer."""
        record = Transition(*args)
        self.memory.append(record)

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored transitions chosen at random."""
        return random.sample(self.memory, batch_size)

In [None]:
def optimize_model():
    """Run one optimisation step of ``policy_net`` on a sampled replay batch.

    Uses the module-level ``memory``, ``BATCH_SIZE``, ``GAMMA``, ``device``,
    ``policy_net``, ``target_net`` and ``optimizer``. Does nothing until the
    replay memory holds at least ``BATCH_SIZE`` transitions.

    The target for each transition is ``reward + GAMMA * max_a Q_target(s', a)``,
    where the second term is zero for transitions whose ``next_state`` is
    ``None``. The loss is the Huber (SmoothL1) loss and gradients are clipped
    element-wise to ``[-100, 100]``.

    Returns:
        None.
    """
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)

    # Transpose: list of Transition -> one Transition of per-field tuples.
    batch = Transition(*zip(*transitions))

    # Successor states that exist (not None), plus a mask marking where they
    # sit inside the batch.
    non_final_next = [s for s in batch.next_state if s is not None]
    non_final_mask = torch.tensor([s is not None for s in batch.next_state],
                                  device=device, dtype=torch.bool)

    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Q(s, a) for the actions that were actually taken.
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # max_a Q_target(s', a); stays zero where there is no successor state.
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    # torch.cat raises on an empty list, so skip the target-network pass when
    # every sampled transition is terminal.
    if non_final_next:
        with torch.no_grad():
            non_final_next_states = torch.cat(non_final_next)
            next_state_values[non_final_mask] = target_net(non_final_next_states).max(1).values

    # Expected Q-values (bootstrapped targets).
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # Huber loss between predicted and target Q-values.
    criterion = nn.SmoothL1Loss()
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

    # Gradient step with element-wise gradient clipping.
    optimizer.zero_grad()
    loss.backward()
    torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
    optimizer.step()

def select_action(state):
    """Choose an action epsilon-greedily and advance the global step counter.

    The exploration threshold decays exponentially with the module-level
    ``steps_done``:
    ``EPS_END + (EPS_START - EPS_END) * exp(-steps_done / EPS_DECAY)``.

    Args:
        state: Observation tensor passed directly to ``policy_net``; the
            greedy branch takes ``max(1)``, so it must be batched
            (the training loop passes shape ``(1, n_observations)``).

    Returns:
        A ``(1, 1)`` ``torch.long`` tensor holding the chosen action index.
    """
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1

    if sample > eps_threshold:
        # Exploit: pick the action with the largest Q-value. The network is
        # switched to eval mode for this forward pass so its Dropout layers do
        # not randomly perturb the argmax; the previous mode is restored
        # afterwards so training is unaffected.
        was_training = policy_net.training
        policy_net.eval()
        try:
            with torch.no_grad():
                return policy_net(state).max(1).indices.view(1, 1)
        finally:
            policy_net.train(was_training)

    # Explore: uniformly random action (dominant early on, while epsilon is high).
    return torch.tensor([[env.action_space.sample()]], device=device, dtype=torch.long)

In [8]:
# Training hyperparameters
BATCH_SIZE = 128          # transitions sampled per optimisation step
GAMMA = 0.99              # discount factor applied to the next-state value
EPS_START = 1.0           # initial exploration rate
EPS_END = 0.1             # final exploration rate
EPS_DECAY = 20000  # larger values keep exploration going longer -> more new states explored
TAU = 0.005               # soft-update rate of the target network
LR = 5e-4                 # optimizer learning rate
memory_capacity = 50000  # larger values let the agent store more experience

# Observation / action sizes taken from the environment.
state, info = env.reset()
n_observations = len(state)  # 6
n_actions = env.action_space.n  # 3

# Policy network and a target network that starts as an exact copy of it.
policy_net = DuelingDQN(n_observations, n_actions).to(device)
target_net = DuelingDQN(n_observations, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)
memory = ReplayMemory(memory_capacity)

# Bookkeeping shared with select_action / plot_rewards / the training loop.
steps_done = 0
episode_rewards = []
score = 0.0

# Train longer when an accelerator is available.
has_accelerator = torch.cuda.is_available() or torch.backends.mps.is_available()
num_episodes = 1000 if has_accelerator else 50

In [None]:
# Main training loop. Runs episodes 0..num_episodes inclusive; each step
# stores the transition, performs one optimisation step and soft-updates the
# target network. Rewards are plotted every 10 episodes.
for episode in range(num_episodes+1):

    # Seed the environment only on the very first reset. Passing the same seed
    # to every reset would re-create the environment's RNG each time and make
    # every episode start from the identical initial state.
    if episode == 0:
        state, info = env.reset(seed=SEED)
    else:
        state, info = env.reset()
    state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)

    episode_reward = 0

    for t in count():  # runs until the episode ends
        if episode < 10:  # the first 10 episodes always act randomly
            action = torch.tensor([[env.action_space.sample()]], device=device, dtype=torch.long)
        else:
            action = select_action(state)

        next_state, reward, terminated, truncated, _ = env.step(action.item())
        # Explicit float32 keeps the reward dtype aligned with the network
        # outputs regardless of the Python/NumPy type returned by the env.
        reward = torch.tensor([reward], dtype=torch.float32, device=device)
        done = terminated or truncated

        score += reward.item()
        episode_reward += reward.item()

        # Only a true termination has no successor state. A time-limit
        # truncation still has a valid next state, so it is stored and its
        # value is bootstrapped in optimize_model instead of being zeroed.
        if terminated:
            next_state = None
        else:
            next_state = torch.tensor(next_state, dtype=torch.float32, device=device).unsqueeze(0)

        memory.push(state, action, next_state, reward)
        state = next_state

        # Update the policy network.
        optimize_model()

        # Soft-update the target network: theta' <- TAU*theta + (1-TAU)*theta'.
        target_net_state_dict = target_net.state_dict()
        policy_net_state_dict = policy_net.state_dict()
        for key in policy_net_state_dict:
            target_net_state_dict[key] = policy_net_state_dict[key]*TAU + target_net_state_dict[key]*(1-TAU)
        target_net.load_state_dict(target_net_state_dict)

        if done:
            episode_rewards.append(episode_reward)
            break

    if episode % 10 == 0 and episode != 0:
        plot_rewards()

print('Complete')
plot_rewards(show_result=True)
plt.ioff()
plt.show()
# Average over the episodes actually recorded (the loop runs num_episodes+1 of them).
print('Average Reward: {}'.format(sum(episode_rewards)/max(len(episode_rewards), 1)))

In [None]:
# Notebook cell expression: shows the list of per-episode total rewards
# collected by the training loop.
episode_rewards