In [1]:
import gymnasium as gym
import matplotlib
from matplotlib.animation import FuncAnimation
import matplotlib.pyplot as plt
import torch
import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import namedtuple, deque
from itertools import count
import random
import math
import pickle
from tqdm import tqdm

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
# Every network and tensor created below is placed on this device.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


In [2]:
def image_preprocessing(img):
  """Shrink a frame to 84x84, convert it to grayscale and scale it by 1/255.

  Args:
    img: image array accepted by ``cv2.resize``. It is converted with
      ``cv2.COLOR_RGB2GRAY``, so an RGB channel order is assumed.

  Returns:
    The resized single-channel image divided by 255.0.
  """
  resized = cv2.resize(img, dsize=(84, 84))
  gray = cv2.cvtColor(resized, cv2.COLOR_RGB2GRAY)
  return gray / 255.0

In [3]:
class CarEnvironment(gym.Wrapper):
  """Wrapper that adds warm-up steps, frame skipping and frame stacking.

  Observations returned by ``reset`` and ``step`` are stacks of the last
  ``stack_frames`` preprocessed frames (see ``image_preprocessing``), with
  the oldest frame first.

  Args:
    env: the environment to wrap.
    skip_frames: number of times each action is repeated in ``step``;
      must be at least 1.
    stack_frames: number of preprocessed frames kept in the stacked state.
    no_operation: number of steps taken with action 0 right after a reset
      (presumably the environment's "do nothing" action -- confirm against
      the wrapped environment's action space).
    **kwargs: forwarded to ``gym.Wrapper.__init__``.

  Raises:
    ValueError: if ``skip_frames`` is smaller than 1.
  """

  def __init__(self, env, skip_frames=2, stack_frames=4, no_operation=5, **kwargs):
    super().__init__(env, **kwargs)
    if skip_frames < 1:
      # With zero repetitions ``step`` would never call the wrapped
      # environment and would have no observation to return.
      raise ValueError("skip_frames must be at least 1")
    self._no_operation = no_operation
    self._skip_frames = skip_frames
    self._stack_frames = stack_frames

  def reset(self, *, seed=None, options=None):
    """Reset the wrapped environment and build the initial frame stack.

    Args:
      seed: optional seed forwarded to the wrapped environment's ``reset``.
      options: optional options forwarded to the wrapped environment's
        ``reset``.

    Returns:
      ``(stack_state, info)`` where ``stack_state`` is the first
      preprocessed frame repeated ``stack_frames`` times.
    """
    # Only forward the arguments that were actually given so that a plain
    # ``reset()`` call behaves exactly as it did before they were supported.
    reset_kwargs = {}
    if seed is not None:
      reset_kwargs["seed"] = seed
    if options is not None:
      reset_kwargs["options"] = options
    observation, info = self.env.reset(**reset_kwargs)

    # Warm-up: step with action 0 before handing control to the agent.
    for _ in range(self._no_operation):
      observation, reward, terminated, truncated, info = self.env.step(0)

    observation = image_preprocessing(observation)
    # No history exists yet, so the stack is filled with copies of one frame.
    self.stack_state = np.tile(observation, (self._stack_frames, 1, 1))
    return self.stack_state, info

  def step(self, action):
    """Repeat ``action`` for up to ``skip_frames`` steps and update the stack.

    Repetition stops early as soon as the wrapped environment reports
    ``terminated`` or ``truncated``.

    Returns:
      ``(stack_state, total_reward, terminated, truncated, info)`` where
      ``total_reward`` is the sum of the rewards of the repeated steps and
      the remaining values come from the last step taken.
    """
    total_reward = 0
    for _ in range(self._skip_frames):
      observation, reward, terminated, truncated, info = self.env.step(action)
      total_reward += reward
      if terminated or truncated:
        break

    observation = image_preprocessing(observation)
    # Drop the oldest frame and append the newest one at the end.
    self.stack_state = np.concatenate((self.stack_state[1:], observation[np.newaxis]), axis=0)
    return self.stack_state, total_reward, terminated, truncated, info

In [4]:
class CNN(nn.Module):
  """Small convolutional network mapping stacked frames to per-action values.

  Two convolution + ReLU stages (``self.conv``) are followed by a two-layer
  fully connected head (``self.fc``). The head expects ``32 * 9 * 9``
  flattened features, which is what the convolutions produce for 84x84
  inputs.

  Args:
    in_channels: number of channels of the input tensor.
    out_channels: size of the output vector.
    *args, **kwargs: forwarded to ``nn.Module.__init__``.
  """

  def __init__(self, in_channels, out_channels, *args, **kwargs):
    super().__init__(*args, **kwargs)
    # Length of the flattened convolutional output consumed by ``self.fc``.
    self._n_features = 32 * 9 * 9

    feature_layers = [
        nn.Conv2d(in_channels, 16, kernel_size=8, stride=4),
        nn.ReLU(),
        nn.Conv2d(16, 32, kernel_size=4, stride=2),
        nn.ReLU(),
    ]
    head_layers = [
        nn.Linear(self._n_features, 256),
        nn.ReLU(),
        nn.Linear(256, out_channels),
    ]

    # Attribute names and layer order define the state-dict keys, so they
    # are kept as ``conv`` / ``fc`` with the same positions.
    self.conv = nn.Sequential(*feature_layers)
    self.fc = nn.Sequential(*head_layers)

  def forward(self, x):
    """Return the ``out_channels`` outputs for every sample in ``x``."""
    feature_maps = self.conv(x)
    flattened = feature_maps.view((-1, self._n_features))
    return self.fc(flattened)

In [5]:
# A single stored experience: the state, the action taken in it, the state
# that followed and the reward received.
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

class ReplayMemory(object):
    """Fixed-capacity buffer of ``Transition`` records.

    Once ``capacity`` entries are stored, each newly pushed transition
    evicts the oldest one.
    """

    def __init__(self, capacity):
        # A bounded deque discards items from the left when it is full.
        self.memory = deque(maxlen=capacity)

    def push(self, *args):
        """Store one transition; ``args`` are the ``Transition`` fields in order."""
        record = Transition(*args)
        self.memory.append(record)

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored transitions chosen at random."""
        return random.sample(self.memory, batch_size)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)

In [6]:
class DQN:
  """Deep Q-learning agent with a policy network, a target network and replay memory.

  ``network`` (Q) is the network being optimised; ``target_network`` (Q') is
  a copy that is only refreshed through ``copy_weights`` and is used to
  compute the bootstrap targets in ``train`` and the actions in evaluation
  mode.

  Args:
    action_space: object with a ``sample()`` method, used for random
      exploration actions.
    batch_size: number of transitions sampled per training step.
    gamma: discount factor applied to the next-state value.
    eps_start: exploration probability at step 0.
    eps_end: exploration probability approached as the step count grows.
    eps_decay: time constant (in ``select_action`` calls) of the
      exponential decay from ``eps_start`` to ``eps_end``.
    lr: learning rate of the AdamW optimiser.
  """

  def __init__(self, action_space, batch_size=256, gamma=0.99, eps_start=0.9, eps_end=0.05, eps_decay=1000, lr=0.001):
    self._n_observation = 4  # input channels of the networks
    self._n_actions = 5      # outputs of the networks
    self._action_space = action_space
    self._batch_size = batch_size
    self._gamma = gamma
    self._eps_start = eps_start
    self._eps_end = eps_end
    self._eps_decay = eps_decay
    self._lr = lr
    self._total_steps = 0     # number of select_action calls so far
    self._evaluate_loss = []  # loss of every optimisation step, in order
    self.network = CNN(self._n_observation, self._n_actions).to(device)
    self.target_network = CNN(self._n_observation, self._n_actions).to(device)
    # Both networks start from identical weights.
    self.target_network.load_state_dict(self.network.state_dict())
    self.optimizer = optim.AdamW(self.network.parameters(), lr=self._lr, amsgrad=True)
    self._memory = ReplayMemory(10000)

  def select_action(self, state, evaluation_phase=False):
    """Choose an action for ``state``.

    Called during training and evaluation whenever the agent interacts with
    the environment:

    * Evaluation mode (``evaluation_phase=True``): the action with the
      highest Q'-value (target network) is returned.
    * Exploitation: the action with the highest Q-value (policy network) is
      returned.
    * Exploration: an action sampled from the action space is returned.

    The exploration probability decays exponentially with the number of
    calls made so far; every call, including evaluation calls, advances
    that counter.

    Args:
      state: batched state tensor accepted by the networks.
      evaluation_phase: when true, always act greedily with the target
        network.

    Returns:
      A ``1 x 1`` long tensor holding the chosen action index.
    """
    # Random number deciding between exploration and exploitation.
    sample = random.random()

    # The more steps have been taken, the lower the exploration threshold.
    eps_threshold = self._eps_end + (self._eps_start - self._eps_end) * math.exp(-1. * self._total_steps / self._eps_decay)
    self._total_steps += 1

    if evaluation_phase:
      with torch.no_grad():
        return self.target_network(state).max(1).indices.view(1, 1)
    elif sample > eps_threshold:
      with torch.no_grad():
        return self.network(state).max(1).indices.view(1, 1)
    else:
      return torch.tensor([[self._action_space.sample()]], device=device, dtype=torch.long)

  def train(self):
    """Run one optimisation step on a random batch from the replay memory.

    Does nothing until the memory holds at least ``batch_size`` transitions.
    The loss of the step is appended to the list returned by ``get_loss``.
    """
    if len(self._memory) < self._batch_size:
        return

    # Sample a batch and turn the list of transitions into a transition of
    # tuples (one tuple per field).
    transitions = self._memory.sample(self._batch_size)
    batch = Transition(*zip(*transitions))

    # True for every sampled transition whose next state is not terminal.
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None, batch.next_state)), device=device, dtype=torch.bool)
    non_final_next_states = [s for s in batch.next_state if s is not None]

    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Q-values of the actions that were actually taken.
    state_action_values = self.network(state_batch).gather(1, action_batch)

    # max_a Q'(s', a) for non-terminal next states; terminal ones stay 0.
    next_state_values = torch.zeros(self._batch_size, device=device)
    # torch.cat rejects an empty list, so skip the target network entirely
    # when every sampled transition is terminal.
    if non_final_next_states:
      with torch.no_grad():
        next_state_values[non_final_mask] = self.target_network(torch.cat(non_final_next_states)).max(1).values

    # Bootstrap target: reward + gamma * max(Q').
    expected_state_action_values = (next_state_values * self._gamma) + reward_batch

    criterion = nn.SmoothL1Loss()
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

    # Back-propagate with element-wise gradient clipping.
    self.optimizer.zero_grad()
    loss.backward()

    torch.nn.utils.clip_grad_value_(self.network.parameters(), 100)
    self.optimizer.step()

    self._evaluate_loss.append(loss.item())

    return

  def copy_weights(self):
    """Overwrite the target network's weights with the policy network's."""
    self.target_network.load_state_dict(self.network.state_dict())

  def get_loss(self):
    """Return the list of losses recorded by ``train`` so far."""
    return self._evaluate_loss

  def save_model(self, i):
    """Save the target network's weights to ``model_weights_{i}.pth``."""
    torch.save(self.target_network.state_dict(), f'model_weights_{i}.pth')

  def load_model(self, i):
    """Load ``model_weights_{i}.pth`` into the target network.

    Only the target network is restored; the policy network keeps its
    current weights.
    """
    # ``map_location`` remaps the stored tensors onto the active device, so
    # a checkpoint written on a GPU can be loaded on a CPU-only machine.
    self.target_network.load_state_dict(torch.load(f'model_weights_{i}.pth', map_location=device))

In [7]:
class DynaQ:
    """DQN-style agent extended with a tabular transition model for planning.

    ``update_model`` records observed transitions in ``self.model``. After
    every optimisation step ``train`` calls ``simulate_experiences``, which
    replays ``model_learning_steps`` randomly chosen recorded transitions
    into the replay memory. If ``update_model`` is never called the model
    stays empty and the agent behaves like a plain DQN agent.

    Args:
        action_space: object with a ``sample()`` method, used for random
            exploration actions.
        batch_size: number of transitions sampled per training step.
        gamma: discount factor applied to the next-state value.
        eps_start: exploration probability at step 0.
        eps_end: exploration probability approached as the step count grows.
        eps_decay: time constant (in ``select_action`` calls) of the
            exponential decay from ``eps_start`` to ``eps_end``.
        lr: learning rate of the AdamW optimiser.
        model_learning_steps: number of simulated transitions pushed to the
            replay memory after each optimisation step.
    """

    def __init__(self, action_space, batch_size=1024, gamma=0.99, eps_start=0.9, eps_end=0.05, eps_decay=1000, lr=0.002, model_learning_steps=5):
        self._n_observation = 4  # input channels of the networks
        self._n_actions = 5      # outputs of the networks
        self._action_space = action_space
        self._batch_size = batch_size
        self._gamma = gamma
        self._eps_start = eps_start
        self._eps_end = eps_end
        self._eps_decay = eps_decay
        self._lr = lr
        self._total_steps = 0
        self._evaluate_loss = []  # Stores per-iteration losses
        self.network = CNN(self._n_observation, self._n_actions).to(device)
        self.target_network = CNN(self._n_observation, self._n_actions).to(device)
        self.target_network.load_state_dict(self.network.state_dict())
        self.optimizer = optim.AdamW(self.network.parameters(), lr=self._lr, amsgrad=True)
        self._memory = ReplayMemory(60000)
        self._model_learning_steps = model_learning_steps  # Number of simulated experiences for Dyna-Q
        # Learned environment model:
        # {flattened state tuple: {action: (flattened next state or None, reward)}}
        self.model = {}
        # Shape of the state tensors given to update_model. The model stores
        # states flattened, so this is needed to rebuild tensors the
        # networks (and torch.cat in train) can consume.
        self._state_shape = None

    def select_action(self, state, evaluation_phase=False):
        """Choose an action for ``state``.

        In evaluation mode the greedy action of the target network is
        returned. Otherwise the greedy action of the policy network is
        returned with probability ``1 - eps`` and a random action with
        probability ``eps``, where ``eps`` decays exponentially with the
        number of calls made so far.

        Returns:
            A ``1 x 1`` long tensor holding the chosen action index.
        """
        sample = random.random()
        eps_threshold = self._eps_end + (self._eps_start - self._eps_end) * math.exp(-1. * self._total_steps / self._eps_decay)
        self._total_steps += 1

        if evaluation_phase:
            with torch.no_grad():
                return self.target_network(state).max(1).indices.view(1, 1)
        elif sample > eps_threshold:
            with torch.no_grad():
                return self.network(state).max(1).indices.view(1, 1)
        else:
            return torch.tensor([[self._action_space.sample()]], device=device, dtype=torch.long)

    def train(self):
        """Run one optimisation step, then push simulated experiences.

        Does nothing until the replay memory holds at least ``batch_size``
        transitions.
        """
        if len(self._memory) < self._batch_size:
            return

        transitions = self._memory.sample(self._batch_size)
        batch = Transition(*zip(*transitions))

        # True for every sampled transition whose next state is not terminal.
        non_final_mask = torch.tensor(tuple(map(lambda s: s is not None, batch.next_state)), device=device, dtype=torch.bool)
        non_final_next_states = [s for s in batch.next_state if s is not None]

        state_batch = torch.cat(batch.state)
        action_batch = torch.cat(batch.action)
        reward_batch = torch.cat(batch.reward)

        state_action_values = self.network(state_batch).gather(1, action_batch)
        next_state_values = torch.zeros(self._batch_size, device=device)
        # torch.cat rejects an empty list, so skip the target network when
        # every sampled transition is terminal (their values stay 0).
        if non_final_next_states:
            with torch.no_grad():
                next_state_values[non_final_mask] = self.target_network(torch.cat(non_final_next_states)).max(1).values

        # Bootstrap target: reward + gamma * max(Q').
        expected_state_action_values = (next_state_values * self._gamma) + reward_batch

        criterion = nn.SmoothL1Loss()
        loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

        self.optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_value_(self.network.parameters(), 100)
        self.optimizer.step()

        self._evaluate_loss.append(loss.item())

        # Perform Dyna-Q model learning and simulation
        self.simulate_experiences()

    def _restore_state(self, flat_state):
        """Rebuild a state tensor from the flattened tuple kept in the model."""
        tensor = torch.tensor(flat_state, dtype=torch.float32, device=device)
        shape = getattr(self, '_state_shape', None)
        if shape is None:
            # No shape was recorded (model filled by hand): fall back to a
            # batch of one flat vector.
            return tensor.unsqueeze(0)
        return tensor.reshape(shape)

    def simulate_experiences(self):
        """Push ``model_learning_steps`` recorded transitions into the replay memory.

        Each simulated transition is a uniformly chosen recorded state, a
        uniformly chosen action recorded for that state, and the next state
        and reward last observed for that pair. Does nothing while the
        model is empty.
        """
        if len(self.model) == 0:
            return

        # The model does not change inside the loop, so list the keys once.
        known_states = list(self.model.keys())
        for _ in range(self._model_learning_steps):
            state = random.choice(known_states)
            action = random.choice(list(self.model[state].keys()))
            next_state, reward = self.model[state][action]

            state_tensor = self._restore_state(state)
            next_state_tensor = self._restore_state(next_state) if next_state is not None else None
            reward_tensor = torch.tensor([reward], device=device)

            self._memory.push(state_tensor, torch.tensor([[action]], device=device), next_state_tensor, reward_tensor)

    def update_model(self, state, action, next_state, reward):
        """Record an observed transition in the environment model.

        A later observation of the same ``(state, action)`` pair overwrites
        the earlier one.

        Args:
            state: state tensor; stored flattened as a tuple of floats.
            action: one-element tensor holding the action index.
            next_state: next state tensor, or ``None`` for a terminal
                transition.
            reward: one-element tensor holding the reward.
        """
        # NOTE(review): every key is a full flattened state, so the model
        # can grow large when states are image stacks.
        self._state_shape = tuple(state.shape)
        state_tuple = tuple(state.cpu().numpy().flatten())
        next_state_tuple = tuple(next_state.cpu().numpy().flatten()) if next_state is not None else None

        if state_tuple not in self.model:
            self.model[state_tuple] = {}
        self.model[state_tuple][action.item()] = (next_state_tuple, reward.item())

    def copy_weights(self):
        """Overwrite the target network's weights with the policy network's."""
        self.target_network.load_state_dict(self.network.state_dict())

    def get_loss(self):
        """Return the recorded losses, or ``[0.0]`` if no step has been run yet."""
        # Return losses if available, otherwise return a default value
        return self._evaluate_loss if self._evaluate_loss else [0.0]

    def save_model(self, i):
        """Save the target network's weights to ``model_weights_{i}.pth``."""
        torch.save(self.target_network.state_dict(), f'model_weights_{i}.pth')

    def load_model(self, i):
        """Load ``model_weights_{i}.pth`` into the target network (only)."""
        self.target_network.load_state_dict(torch.load(f'model_weights_{i}.pth', map_location=device))


In [None]:
rewards_per_episode = []
episode_duration = []
average_episode_loss = []

episodes = 3000
C = 5  # the target network is refreshed every C episodes

# A single wrapped environment is reused for every episode; reset() starts a
# fresh episode, so there is no need to build (and leak) a new one each time.
env = CarEnvironment(gym.make('CarRacing-v2', lap_complete_percent=0.95, continuous=False))
n_actions = env.action_space
agent = DynaQ(n_actions)

# NOTE(review): agent.update_model(...) is never called in this loop, so
# agent.model stays empty and agent.simulate_experiences() is a no-op; the
# agent effectively trains as plain DQN. Confirm whether that is intended.

try:
  for episode in tqdm(range(1, episodes + 1)):

    if episode % 10 == 0:
      print(f"{episode} episodes done")

    state, info = env.reset()

    state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)

    episode_total_reward = 0
    # Number of losses recorded before this episode, used below to average
    # only the losses produced during the episode itself.
    losses_before = len(agent._evaluate_loss)

    for t in count():
      action = agent.select_action(state)
      observation, reward, terminated, truncated, _ = env.step(action.item())
      reward = torch.tensor([reward], device=device)
      episode_total_reward += reward
      done = terminated or truncated

      if terminated:
        next_state = None
        # NOTE(review): this message assumes `terminated` means a completed
        # lap; confirm against the CarRacing-v2 termination conditions.
        print("Finished the lap successfully!")
      else:
        next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)

      agent._memory.push(state, action, next_state, reward)

      state = next_state

      agent.train()

      if done:
        if len(agent._memory) >= 128:
          episode_duration.append(t + 1)
          # Stored on the CPU so the pickled statistics can be loaded on a
          # machine without CUDA.
          rewards_per_episode.append(episode_total_reward.cpu())
          # get_loss() returns the whole history (or [0.0] when empty);
          # keep only this episode's part.
          episode_losses = agent.get_loss()[losses_before:] or [0.0]
          average_episode_loss.append(sum(episode_losses) / len(episode_losses))
        break

    # Checkpoint once per 100 episodes, after the episode has finished
    # (previously this ran on every step of such an episode).
    if episode % 100 == 0:
      agent.save_model(episode)
      with open('statistics.pkl', 'wb') as f:
        pickle.dump((episode_duration, rewards_per_episode, average_episode_loss), f)

    if episode % C == 0:
      agent.copy_weights()
finally:
  # Release the environment even if training is interrupted.
  env.close()

agent.save_model(episodes)
with open('statistics.pkl', 'wb') as f:
  pickle.dump((episode_duration, rewards_per_episode, average_episode_loss), f)

  0%|          | 9/3000 [01:38<9:42:30, 11.69s/it]

10 episodes done


  1%|          | 19/3000 [03:37<9:47:54, 11.83s/it]

20 episodes done


  1%|          | 29/3000 [05:39<10:01:49, 12.15s/it]

30 episodes done


  1%|▏         | 39/3000 [07:42<10:03:52, 12.24s/it]

40 episodes done


  2%|▏         | 49/3000 [09:46<10:08:35, 12.37s/it]

50 episodes done


  2%|▏         | 59/3000 [11:50<10:05:03, 12.34s/it]

60 episodes done


  2%|▏         | 67/3000 [13:26<9:24:54, 11.56s/it] 

Finished the lap successfully!


  2%|▏         | 69/3000 [13:51<9:50:32, 12.09s/it]

70 episodes done


  2%|▏         | 73/3000 [14:38<9:27:13, 11.63s/it]

Finished the lap successfully!


  3%|▎         | 76/3000 [15:14<9:33:28, 11.77s/it]

Finished the lap successfully!


  3%|▎         | 79/3000 [15:46<8:26:32, 10.40s/it]

Finished the lap successfully!
80 episodes done


  3%|▎         | 80/3000 [15:52<7:20:36,  9.05s/it]

Finished the lap successfully!


  3%|▎         | 81/3000 [16:02<7:41:53,  9.49s/it]

Finished the lap successfully!


  3%|▎         | 84/3000 [16:38<8:53:16, 10.97s/it]

Finished the lap successfully!


  3%|▎         | 87/3000 [17:14<9:12:04, 11.37s/it]

Finished the lap successfully!


  3%|▎         | 88/3000 [17:20<7:58:08,  9.85s/it]

Finished the lap successfully!


  3%|▎         | 89/3000 [17:30<8:00:42,  9.91s/it]

Finished the lap successfully!
90 episodes done


  3%|▎         | 93/3000 [18:19<9:12:23, 11.40s/it]

Finished the lap successfully!


  3%|▎         | 96/3000 [18:52<8:35:36, 10.65s/it]

Finished the lap successfully!


  3%|▎         | 99/3000 [19:27<8:56:49, 11.10s/it]

Finished the lap successfully!
100 episodes done


  3%|▎         | 100/3000 [19:35<8:12:11, 10.18s/it]

Finished the lap successfully!


  3%|▎         | 104/3000 [20:19<8:00:50,  9.96s/it]

Finished the lap successfully!


  4%|▎         | 105/3000 [20:26<7:13:38,  8.99s/it]

Finished the lap successfully!


  4%|▎         | 108/3000 [21:03<8:48:10, 10.96s/it]

Finished the lap successfully!


  4%|▎         | 109/3000 [21:12<8:24:26, 10.47s/it]

Finished the lap successfully!
110 episodes done


  4%|▍         | 115/3000 [22:25<8:55:01, 11.13s/it] 

Finished the lap successfully!


  4%|▍         | 116/3000 [22:34<8:22:49, 10.46s/it]

Finished the lap successfully!


  4%|▍         | 117/3000 [22:40<7:19:26,  9.15s/it]

Finished the lap successfully!


  4%|▍         | 118/3000 [22:48<6:56:17,  8.67s/it]

Finished the lap successfully!


  4%|▍         | 119/3000 [23:01<8:01:38, 10.03s/it]

120 episodes done


  4%|▍         | 120/3000 [23:06<6:53:06,  8.61s/it]

Finished the lap successfully!


  4%|▍         | 121/3000 [23:13<6:17:51,  7.87s/it]

Finished the lap successfully!


  4%|▍         | 122/3000 [23:20<6:04:34,  7.60s/it]

Finished the lap successfully!


  4%|▍         | 125/3000 [23:53<7:21:37,  9.22s/it]

Finished the lap successfully!


  4%|▍         | 129/3000 [24:48<10:05:52, 12.66s/it]

130 episodes done


  4%|▍         | 130/3000 [24:55<8:52:43, 11.14s/it] 

Finished the lap successfully!


  4%|▍         | 134/3000 [25:50<10:25:09, 13.09s/it]

In [None]:
def plot_statistics(x, y, title, x_axis, y_axis):
    """Plot ``y`` against ``x``, save the figure as a PNG and display it.

    Args:
        x: values for the horizontal axis.
        y: values for the vertical axis.
        title: figure title; also used, with spaces replaced by
            underscores, as the name of the saved ``.png`` file.
        x_axis: label of the horizontal axis.
        y_axis: label of the vertical axis.
    """
    # Use underscores instead of spaces in the output file name.
    output_file = f'{title.replace(" ", "_")}.png'

    plt.plot(x, y)
    plt.title(title)
    plt.xlabel(x_axis)
    plt.ylabel(y_axis)
    plt.grid(True)

    plt.savefig(output_file)
    plt.show()
     

In [None]:
from PIL import Image as PILImage


def render2img(_img):
    """Convert a rendered RGB frame (array) into a PIL image for display."""
    return PILImage.fromarray(_img, "RGB")


eval_env = gym.make('CarRacing-v2', continuous=False, render_mode='rgb_array')
eval_env = CarEnvironment(eval_env)
n_actions = eval_env.action_space
agent = DynaQ(n_actions)
agent.load_model(3000)

frames = []
scores = 0

# Install the seeded generator BEFORE the reset: a generator assigned after
# reset() cannot influence anything the reset has already randomised
# (presumably the track layout -- confirm against the environment).
eval_env.np_random = np.random.default_rng(42)
s, _ = eval_env.reset()

done, ret = False, 0

# `display` is the IPython built-in available inside notebooks.
handle = display(None, display_id=True)
try:
    while not done:
        _render = eval_env.render()
        handle.update(render2img(_render))
        frames.append(_render)
        s = torch.tensor(s, dtype=torch.float32, device=device).unsqueeze(0)
        # Evaluation mode: always the greedy action of the target network,
        # which is the network load_model() restored.
        a = agent.select_action(s, evaluation_phase=True)
        discrete_action = a.item() % 5
        s_prime, r, terminated, truncated, info = eval_env.step(discrete_action)
        s = s_prime
        ret += r
        done = terminated or truncated
        if terminated:
            print(terminated)
finally:
    # Release the environment even if the rollout fails.
    eval_env.close()

scores += ret

print(scores)
def animate(imgs, video_name, _return=True):
    """Write a sequence of RGB frames to a VP9 video file at 10 frames per second.

    Args:
        imgs: non-empty sequence of ``height x width x channels`` arrays in
            RGB order; the size of the first frame is used for the video.
        video_name: path of the output file. When ``None`` a random
            18-letter name with a ``.webm`` extension is generated.
        _return: when true (the default) the path of the written file is
            returned, so a generated name is not lost.

    Returns:
        The output path if ``_return`` is true, otherwise ``None``.

    Raises:
        ValueError: if ``imgs`` is empty.
    """
    if len(imgs) == 0:
        # Without a first frame the video size is unknown.
        raise ValueError("imgs must contain at least one frame")

    import cv2
    import string
    import random

    if video_name is None:
        video_name = ''.join(random.choice(string.ascii_letters) for i in range(18)) + '.webm'
    height, width, _ = imgs[0].shape
    fourcc = cv2.VideoWriter_fourcc(*'VP90')
    video = cv2.VideoWriter(video_name, fourcc, 10, (width, height))

    try:
        for img in imgs:
            # OpenCV writes frames in BGR order, so swap the channels of the
            # RGB input (the same swap the original BGR2RGB code performed).
            video.write(cv2.cvtColor(img, cv2.COLOR_RGB2BGR))
    finally:
        # Always finalise the file, even if a frame could not be written.
        video.release()

    return video_name if _return else None

In [None]:
animate(frames, None)

# NOTE: pickle.load can execute arbitrary code; only load a statistics.pkl
# that was written by the training cell of this notebook.
with open('statistics.pkl', 'rb') as f:
    data_tuple = pickle.load(f)

episode_duration, rewards_per_episode, average_episode_loss = data_tuple

# The training cell stores each episode reward as a one-element tensor
# (possibly on the GPU); float() turns tensors and plain numbers alike into
# Python floats that matplotlib can plot.
rewards_per_episode = [float(reward) for reward in rewards_per_episode]

# Build the x axes from the data actually recorded instead of a hard-coded
# episode count, so the plots work for any number of episodes.
x = list(range(len(rewards_per_episode)))
x_loss = list(range(len(average_episode_loss)))

plot_statistics(x, rewards_per_episode, "Rewards for every episode", "Episode", "Reward")
plot_statistics(x_loss, average_episode_loss, "Average loss for every episode", "Episode", "Average Loss")