<a href="https://colab.research.google.com/github/hyeminboo/25-1-SelfDriving/blob/main/02_week4_DQN/02_week4_DQN_%EB%B6%80%ED%98%9C%EB%AF%BC.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

세 개의 레버가 달린 장치와 원숭이가 있다.

장치의 첫 번째 레버를 당기면 쓴 약이, 세 번째 레버를 당기면 바나나가 나온다.

한 번 당긴 레버는 자동으로 다시 올라간다고 가정할 때, 이 장치와 원숭이를 코드로 추상화하고, DQN을 이용해 학습시켜라.

In [2]:
import random
import numpy as np
import gym
from collections import deque
import torch
import torch.nn as nn
import torch.optim as optim

In [26]:
class LeverEnv(gym.Env):
    """One-step, stateless environment modelling a device with three levers.

    Every episode consists of a single lever pull. The reward is -100 for
    action 0, 10 for action 1 and 100 for any other action. The observation
    is always the dummy value ``[0.0]`` because the agent needs no state
    information to decide which lever to pull.
    """

    def __init__(self):
        super().__init__()
        self.action_space = gym.spaces.Discrete(3)
        # Declare the constant dummy observation so that the environment
        # exposes both spaces a gym.Env is expected to define.
        self.observation_space = gym.spaces.Box(
            low=0.0, high=0.0, shape=(1,), dtype=np.float32
        )

    @staticmethod
    def _dummy_observation():
        """Return a fresh copy of the constant observation ``[0.0]``."""
        return np.array([0.0], dtype=np.float32)

    def reset(self):
        """Start a new episode and return the dummy observation."""
        return self._dummy_observation()

    def step(self, action):
        """Pull one lever and end the episode.

        Args:
            action: Index of the lever to pull.

        Returns:
            A ``(observation, reward, done, info)`` tuple. ``observation`` is
            the dummy state, ``done`` is always ``True`` and ``info`` is an
            empty dict.
        """
        if action == 0:
            reward = -100
        elif action == 1:
            reward = 10
        else:
            reward = 100
        done = True  # a single pull terminates the episode
        return self._dummy_observation(), reward, done, {}

In [4]:
class QNetwork(nn.Module):
    """Small fully connected network that outputs one Q-value per action.

    Args:
        state_dim: Number of input features in a state vector.
        hidden_dim: Width of the single hidden layer.
        n_actions: Number of actions, i.e. the size of the output vector.

    The defaults (1, 32, 3) reproduce the original fixed architecture.
    """

    def __init__(self, state_dim=1, hidden_dim=32, n_actions=3):
        super().__init__()
        # The attribute stays named ``fc`` so state_dict keys are unchanged.
        self.fc = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, n_actions),  # one output per action
        )

    def forward(self, x):
        """Return the Q-values for state tensor ``x`` (last dim = state_dim)."""
        return self.fc(x)


In [5]:
class ReplayBuffer:
    """Fixed-capacity FIFO store of ``(s, a, r, s', done)`` transitions.

    Args:
        maxlen: Maximum number of transitions kept; once full, the oldest
            transition is dropped for every new one pushed.
    """

    def __init__(self, maxlen=1000):
        self.buffer = deque(maxlen=maxlen)

    def push(self, state, action, reward, next_state, done):
        """Store a single transition; ``action`` is coerced to ``int``."""
        self.buffer.append((state, int(action), reward, next_state, done))

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions uniformly at random.

        Returns:
            ``(states, actions, rewards, next_states, dones)`` tensors.
            ``actions`` is int64; all others are float32, with ``dones``
            encoded as 0.0 / 1.0.

        Raises:
            ValueError: If ``batch_size`` exceeds the number of stored
                transitions (raised by ``random.sample``).
        """
        batch = random.sample(self.buffer, batch_size)

        def float_column(index):
            # Pack the column into one contiguous ndarray first: building a
            # tensor directly from a list of ndarrays is slow and makes
            # PyTorch emit a UserWarning.
            column = np.asarray([item[index] for item in batch], dtype=np.float32)
            return torch.from_numpy(column)

        states = float_column(0)
        actions = torch.as_tensor([item[1] for item in batch], dtype=torch.int64)
        rewards = float_column(2)
        next_states = float_column(3)
        dones = float_column(4)

        return states, actions, rewards, next_states, dones

    def __len__(self):
        return len(self.buffer)

In [18]:
def train(episodes=1000, batch_size=32, gamma=0.9, epsilon=0.1,
          update_target=20, lr=0.001):
    """Train a DQN agent on ``LeverEnv`` and return the online Q-network.

    Args:
        episodes: Number of episodes to run.
        batch_size: Transitions per gradient step; learning starts once the
            replay buffer holds at least this many transitions.
        gamma: Discount factor applied to the bootstrapped next-state value.
        epsilon: Probability of taking a random action (epsilon-greedy).
        update_target: Copy the online weights into the target network every
            this many episodes.
        lr: Learning rate of the Adam optimizer.

    Returns:
        The trained online ``QNetwork``.
    """
    env = LeverEnv()
    qnet = QNetwork()
    target_net = QNetwork()
    target_net.load_state_dict(qnet.state_dict())
    target_net.eval()  # the target network is never trained directly

    buffer = ReplayBuffer()
    optimizer = optim.Adam(qnet.parameters(), lr=lr)
    loss_fn = nn.MSELoss()

    for episode in range(episodes):
        state = env.reset()
        done = False
        total_reward = 0

        while not done:
            # Epsilon-greedy action selection.
            if random.random() < epsilon:
                action = env.action_space.sample()
            else:
                with torch.no_grad():
                    q_vals = qnet(torch.FloatTensor(state))  # Q-values of the current state
                    action = torch.argmax(q_vals).item()  # pick the highest-valued action

            next_state, reward, done, _ = env.step(action)
            buffer.push(state, action, reward, next_state, done)
            state = next_state
            total_reward += reward

            if len(buffer) >= batch_size:
                s, a, r, s_, d = buffer.sample(batch_size)

                # Q(s, a) of the actions actually taken. squeeze(1) keeps the
                # batch dimension even when batch_size == 1, so the shape
                # always matches ``target``.
                q_values = qnet(s).gather(1, a.unsqueeze(1)).squeeze(1)

                with torch.no_grad():
                    next_q_values = target_net(s_).max(1)[0]
                    # (1 - d) removes the bootstrap term for terminal transitions.
                    target = r + gamma * next_q_values * (1 - d)

                loss = loss_fn(q_values, target)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

        # Periodically sync the target network with the online network.
        if (episode + 1) % update_target == 0:
            target_net.load_state_dict(qnet.state_dict())

        # float() formats the scalar the same way the former np.mean() did.
        print(f"Episode {episode+1}, Reward: {float(total_reward)}")

    return qnet

In [6]:
def test(qnet):
    """Print the learned Q-value of each lever and the greedy lever choice.

    Args:
        qnet: Trained Q-network mapping a state tensor to per-lever Q-values.
    """
    env = LeverEnv()
    state = env.reset()

    # One forward pass on the environment's observation serves both the
    # per-lever report and the greedy decision.
    with torch.no_grad():
        q_vals = qnet(torch.FloatTensor(state))

    for a, q_val in enumerate(q_vals.tolist()):
        print(f"Lever {a}: Q value = {q_val:.2f}")

    action = torch.argmax(q_vals).item()
    print(f"Lever {action}을 당김")

In [None]:
# Train the agent, then print the learned Q-values and the greedy lever choice.
qnet = train()
test(qnet)

# reward : -1, 0, 1

  self.buffer.append((state, int(action), reward, next_state, done))


Episode 20, Total Reward: 1
Episode 40, Total Reward: 1
Episode 60, Total Reward: 1
Episode 80, Total Reward: 1
Episode 100, Total Reward: 1
Episode 120, Total Reward: 1
Episode 140, Total Reward: 1
Episode 160, Total Reward: 1
Episode 180, Total Reward: 1
Episode 200, Total Reward: 1
Episode 220, Total Reward: 1
Episode 240, Total Reward: 1
Episode 260, Total Reward: 1
Episode 280, Total Reward: 1
Episode 300, Total Reward: 1
Lever 0: Q value = 1.00
Lever 1: Q value = 0.03
Lever 2: Q value = 1.00
Lever 2을 당김


In [27]:
# Train the agent again, then print the learned Q-values and the greedy lever choice.
qnet = train()
test(qnet)

# reward : -100 10 100

Episode 1, Reward: 10.0
Episode 2, Reward: 10.0
Episode 3, Reward: 10.0
Episode 4, Reward: 10.0
Episode 5, Reward: 10.0
Episode 6, Reward: 10.0
Episode 7, Reward: 10.0
Episode 8, Reward: 10.0
Episode 9, Reward: 10.0
Episode 10, Reward: 10.0
Episode 11, Reward: 10.0
Episode 12, Reward: 10.0
Episode 13, Reward: 10.0
Episode 14, Reward: 10.0
Episode 15, Reward: 10.0
Episode 16, Reward: 10.0
Episode 17, Reward: 10.0
Episode 18, Reward: 10.0
Episode 19, Reward: 10.0
Episode 20, Reward: 10.0
Episode 21, Reward: 10.0
Episode 22, Reward: 10.0
Episode 23, Reward: 10.0
Episode 24, Reward: -100.0
Episode 25, Reward: 10.0
Episode 26, Reward: -100.0
Episode 27, Reward: 10.0
Episode 28, Reward: 10.0
Episode 29, Reward: 10.0
Episode 30, Reward: 10.0
Episode 31, Reward: 10.0
Episode 32, Reward: 10.0
Episode 33, Reward: 10.0
Episode 34, Reward: 10.0
Episode 35, Reward: 10.0
Episode 36, Reward: 10.0
Episode 37, Reward: 10.0
Episode 38, Reward: 10.0
Episode 39, Reward: 10.0
Episode 40, Reward: 10.0
Episo