In [1]:
!pip3 install gymnasium[classic_control]

Collecting gymnasium[classic_control]
  Downloading gymnasium-1.0.0-py3-none-any.whl.metadata (9.5 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium[classic_control])
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Downloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Downloading gymnasium-1.0.0-py3-none-any.whl (958 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m958.1/958.1 kB[0m [31m11.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-1.0.0


In [2]:
## 라이브러리 import
import gymnasium as gym
import collections
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import pickle
import os
from gym.wrappers.record_video import RecordVideo
import matplotlib.pyplot as plt

learning_rate = 0.005
gamma = 0.98
buffer_limit = 50000
batch_size = 64

class ReplayBuffer():
    def __init__(self):
        self.buffer = collections.deque(maxlen=buffer_limit)

    def put(self, transition):
        self.buffer.append(transition)

    def sample(self, n):
        mini_batch = random.sample(self.buffer, n)
        s_lst, a_lst, r_lst, s_prime_lst, done_mask_lst = [], [], [], [], []

        for transition in mini_batch:
            s, a, r, s_prime, done_mask = transition
            s_lst.append(s)
            a_lst.append([a])
            r_lst.append([r])
            s_prime_lst.append(s_prime)
            done_mask_lst.append([done_mask])

        return torch.tensor(s_lst, dtype=torch.float), torch.tensor(a_lst), \
               torch.tensor(r_lst), torch.tensor(s_prime_lst, dtype=torch.float), \
               torch.tensor(done_mask_lst)

    def size(self):
        return len(self.buffer)

class Qnet(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(Qnet, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, output_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

    def sample_action(self, obs, epsilon):
        out = self.forward(obs)
        coin = random.random()
        if coin < epsilon:
            return random.randint(0,1)
        else :
            return out.argmax().item()

def train(q, q_target, memory, optimizer):
    for i in range(10):
        s,a,r,s_prime,done_mask = memory.sample(batch_size)

        q_out = q(s)
        q_a = q_out.gather(1,a)
        max_q_prime = q_target(s_prime).max(1)[0].unsqueeze(1)
        target = r + gamma * max_q_prime * done_mask
        loss = F.smooth_l1_loss(q_a, target)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

def main():
    env = gym.make("MountainCar-v0", render_mode="rgb_array")

    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.n

    q = Qnet(state_dim, action_dim)
    q_target = Qnet(state_dim, action_dim)
    q_target.load_state_dict(q.state_dict())
    memory = ReplayBuffer()

    best_score = -float('inf')
    print_interval = 1
    score = 0.0
    avg_scores = []
    optimizer = optim.Adam(q.parameters(), lr=learning_rate)

    for n_epi in range(1000):
        epsilon = max(0.001, 0.1 - 0.0001 * (n_epi / 10))
        s, _ = env.reset()
        done = False
        episode_reward = 0
        t = 0

        while not done:
            a = q.sample_action(torch.from_numpy(s).float(), epsilon)
            step_result = env.step(a)  # 반환 값을 변수로 저장
            if len(step_result) == 5:
                s_prime, r, terminated, truncated, info = step_result
            else:  # truncated가 없는 경우
                s_prime, r, terminated, info = step_result
                truncated = False

            done = terminated or truncated  # 종료 조건 업데이트

            position, velocity = s_prime
            if velocity < 0 and a == 0:
                r += 2
            elif velocity > 0 and a == 2:
                r += 2
            elif velocity < 0 and a == 2:
                r -= 1
            elif velocity > 0 and a == 0:
                r -= 1
            if position >= 0.5:
                r += 100

            r += abs(velocity) * 0.1
            r += abs(position) * 0.1
            r += max(0, 200 - t) * 0.01

            if done and position < 0.5:
                r -= 50

            done_mask = 0.0 if done else 1.0
            memory.put((s, a, r, s_prime, done_mask))
            s = s_prime
            episode_reward += r
            t += 1

            if done:
                break

        score += episode_reward
        if memory.size() > 2000:
            train(q, q_target, memory, optimizer)

        if score >= best_score:
            best_score = score
            torch.save(q.state_dict(), f"dqn_mcar{n_epi}__.pth")
            print(f"New best score: {best_score}. Model saved.")

        if n_epi % print_interval == 0 and n_epi != 0:
            avg_score = score / print_interval
            avg_scores.append(avg_score)
            print("n_episode :{}, score : {:.1f}, n_buffer : {}, eps : {:.1f}%".format(
                n_epi, score / print_interval, memory.size(), epsilon * 100))
            score = 0.0

        if n_epi % 100 == 0:
            q_target.load_state_dict(q.state_dict())

        video_env = RecordVideo(gym.make('MountainCar-v0', render_mode="rgb_array"), video_folder="./dqn_videos", episode_trigger=lambda e: True)
        for episode in range(10):
            observation, _ = video_env.reset()
            done = False
            while not done:
                prob = agent(torch.from_numpy(observation).float().to(device))
                m = Categorical(prob)
                action = m.sample()
                observation, _, done, info = video_env.step(action.item())
            print(f"Recorded Episode {episode}")

        video_env.close()

    env.close()

    with open("dqn_scores.pkl", "wb") as f:
        pickle.dump(avg_scores, f)
    print("DQN 학습 결과 저장 완료: dqn_scores.pkl")

    plt.plot(avg_scores)
    plt.xlabel('Episodes')
    plt.ylabel('Average Score')
    plt.title('Average Score vs Episodes')
    plt.show()

if __name__ == '__main__':
    main()


New best score: -32.36760248302778. Model saved.


  deprecation(
  from pkg_resources import resource_stream, resource_exists
Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(pkg)
Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(pkg)
Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(pkg)


NameError: name 'agent' is not defined