HELPERWRAPPER

In [1]:
from collections import deque
from typing import Union

import numpy as np

import gymnasium as gym
from gymnasium.error import DependencyNotInstalled
from gymnasium.spaces import Box

from lz4.block import compress, decompress



class LazyFrames:
    """Ensures common frames are only stored once to optimize memory use.

    To further reduce memory use, lz4 compression of the individual frames can
    optionally be turned on.

    Note:
        This object should only be converted to a numpy array just before the forward pass.
    """

    __slots__ = ("frame_shape", "dtype", "shape", "lz4_compress", "_frames")

    def __init__(self, frames: list, lz4_compress: bool = False):
        """Lazyframe for a set of frames and if to apply lz4.

        Args:
            frames (list): The frames to convert to lazy frames; all are assumed to
                share the shape and dtype of ``frames[0]``.
            lz4_compress (bool): Use lz4 to compress the frames internally

        Raises:
            DependencyNotInstalled: lz4 is not installed
        """
        self.frame_shape = tuple(frames[0].shape)
        self.shape = (len(frames),) + self.frame_shape
        self.dtype = frames[0].dtype
        if lz4_compress:
            try:
                from lz4.block import compress
            except ImportError as e:
                raise DependencyNotInstalled(
                    "lz4 is not installed, run `pip install gymnasium[other]`"
                ) from e

            frames = [compress(frame) for frame in frames]
        self._frames = frames
        self.lz4_compress = lz4_compress

    def __array__(self, dtype=None, copy=None):
        """Gets a numpy array of stacked frames with specific dtype.

        Args:
            dtype: The dtype of the stacked frames
            copy: Accepted for the NumPy 2 ``__array__`` protocol. The result is always
                a freshly stacked array, so this flag does not change the outcome.

        Returns:
            The array of stacked frames with dtype
        """
        arr = self[:]
        if dtype is not None:
            return arr.astype(dtype)
        return arr

    def __len__(self):
        """Returns the number of frame stacks.

        Returns:
            The number of frame stacks
        """
        return self.shape[0]

    def __getitem__(self, int_or_slice: Union[int, slice]):
        """Gets the stacked frames for a particular index or slice.

        Args:
            int_or_slice: Index (Python or NumPy integer) or slice to get items for

        Returns:
            A single frame for an integer index, or the np.stacked frames for a slice
        """
        # NumPy integers must take the scalar path as well. Otherwise they would be
        # treated like a slice, and with lz4 on the compressed bytes would be iterated.
        if isinstance(int_or_slice, (int, np.integer)):
            return self._check_decompress(self._frames[int_or_slice])  # single frame
        return np.stack(
            [self._check_decompress(f) for f in self._frames[int_or_slice]], axis=0
        )

    def __eq__(self, other):
        """Checks that the current frames are equal to the other object (elementwise)."""
        return self.__array__() == other

    def _check_decompress(self, frame):
        """Return ``frame`` as an array, decompressing it first when lz4 is enabled."""
        if self.lz4_compress:
            from lz4.block import decompress

            return np.frombuffer(decompress(frame), dtype=self.dtype).reshape(
                self.frame_shape
            )
        return frame


class FrameStack(gym.ObservationWrapper, gym.utils.RecordConstructorArgs):
    """Observation wrapper that stacks the observations in a rolling manner.

    For example, if the number of stacks is 4, then the returned observation contains
    the most recent 4 observations. For environment 'Pendulum-v1', the original observation
    is an array with shape [3], so if we stack 4 observations, the processed observation
    has shape [4, 3].

    Note:
        - To be memory efficient, the stacked observations are wrapped by :class:`LazyFrames`.
        - The observation space must be :class:`Box` type. If one uses :class:`Dict`
          as observation space, it should apply :class:`FlattenObservation` wrapper first.
        - After :meth:`reset` is called, the frame buffer will be filled with the initial observation.
          I.e. the observation returned by :meth:`reset` will consist of `num_stack` many identical frames.

    Example:
        >>> import gymnasium as gym
        >>> from gymnasium.wrappers import FrameStack
        >>> env = gym.make("CarRacing-v2")
        >>> env = FrameStack(env, 4)
        >>> env.observation_space
        Box(0, 255, (4, 96, 96, 3), uint8)
        >>> obs, _ = env.reset()
        >>> obs.shape
        (4, 96, 96, 3)
    """

    def __init__(
        self,
        env: gym.Env,
        num_stack: int,
        lz4_compress: bool = False,
    ):
        """Observation wrapper that stacks the observations in a rolling manner.

        Args:
            env (Env): The environment to apply the wrapper
            num_stack (int): The number of frames to stack
            lz4_compress (bool): Use lz4 to compress the frames internally
        """
        gym.utils.RecordConstructorArgs.__init__(
            self, num_stack=num_stack, lz4_compress=lz4_compress
        )
        gym.ObservationWrapper.__init__(self, env)

        self.num_stack = num_stack
        self.lz4_compress = lz4_compress

        # Rolling window: appending beyond maxlen evicts the oldest frame.
        self.frames = deque(maxlen=num_stack)

        # Stacked bounds are the per-frame bounds repeated along a new leading axis.
        low = np.repeat(self.observation_space.low[np.newaxis, ...], num_stack, axis=0)
        high = np.repeat(
            self.observation_space.high[np.newaxis, ...], num_stack, axis=0
        )
        self.observation_space = Box(
            low=low, high=high, dtype=self.observation_space.dtype
        )

    def observation(self, observation):
        """Converts the wrappers current frames to lazy frames.

        Args:
            observation: Ignored

        Returns:
            :class:`LazyFrames` object for the wrapper's frame buffer,  :attr:`self.frames`
        """
        assert len(self.frames) == self.num_stack, (len(self.frames), self.num_stack)
        return LazyFrames(list(self.frames), self.lz4_compress)

    def step(self, action):
        """Steps through the environment, appending the observation to the frame buffer.

        Args:
            action: The action to step through the environment with

        Returns:
            Stacked observations, reward, terminated, truncated, and information from the environment
        """
        observation, reward, terminated, truncated, info = self.env.step(action)
        self.frames.append(observation)
        return self.observation(None), reward, terminated, truncated, info

    def reset(self, **kwargs):
        """Reset the environment with kwargs.

        Args:
            **kwargs: The kwargs for the environment reset

        Returns:
            The stacked observations
        """
        obs, info = self.env.reset(**kwargs)

        # Fill the whole window with the first frame. Because maxlen == num_stack,
        # this also evicts every frame left over from the previous episode.
        for _ in range(self.num_stack):
            self.frames.append(obs)

        return self.observation(None), info


BUFFER

In [2]:
import numpy as np
import torch

class ReplayBuffer():
    """Fixed-capacity circular experience-replay memory.

    Transitions (state, action, reward, next_state, done) are written into
    preallocated NumPy arrays. Once ``max_size`` transitions have been stored,
    the oldest slots are overwritten. States and actions are stored as uint8, so
    state values and action indices are expected to fit in [0, 255].

    Args:
        max_size: Capacity in transitions.
        input_shape: Shape of a single state.
        device: Torch device that sampled batches are moved to.
    """

    def __init__(self, max_size, input_shape, device='cpu'):
        self.mem_size = max_size
        # Total number of transitions ever stored (not clipped to mem_size).
        self.mem_ctr = 0
        self.state_memory = np.zeros((self.mem_size, *input_shape), dtype=np.uint8)
        self.next_state_memory = np.zeros((self.mem_size, *input_shape), dtype=np.uint8)
        self.action_memory = np.zeros(self.mem_size, dtype=np.uint8)
        self.reward_memory = np.zeros(self.mem_size, dtype=np.float32)
        self.terminal_memory = np.zeros(self.mem_size, dtype=bool)

        self.device = device

    def can_sample(self, batch_size):
        """Return True once more than ``5 * batch_size`` transitions have been stored."""
        return self.mem_ctr > batch_size * 5

    def store_transition(self, state, action, reward, next_state, done):
        """Write one transition into the next circular slot, overwriting the oldest when full.

        ``action`` may be a Python int, a NumPy integer or a 0-d tensor.
        """
        index = self.mem_ctr % self.mem_size

        self.state_memory[index] = state
        self.next_state_memory[index] = next_state
        # int() handles ints, NumPy scalars and 0-d tensors without allocating a tensor per step.
        self.action_memory[index] = int(action)
        self.reward_memory[index] = reward
        self.terminal_memory[index] = done

        self.mem_ctr += 1

    def sample_buffer(self, batch_size):
        """Sample ``batch_size`` transitions uniformly, with replacement, from the filled slots.

        Returns:
            (states, actions, rewards, next_states, dones) as tensors on ``self.device``.
            States, actions and rewards are float32; dones are bool.

        Raises:
            ValueError: if nothing has been stored yet.
        """
        max_mem = min(self.mem_ctr, self.mem_size)
        if max_mem == 0:
            raise ValueError("cannot sample from an empty replay buffer")
        batch = np.random.choice(max_mem, batch_size)

        states = torch.as_tensor(self.state_memory[batch], dtype=torch.float32, device=self.device)
        next_states = torch.as_tensor(self.next_state_memory[batch], dtype=torch.float32, device=self.device)
        actions = torch.as_tensor(self.action_memory[batch], dtype=torch.float32, device=self.device)
        rewards = torch.as_tensor(self.reward_memory[batch], dtype=torch.float32, device=self.device)
        dones = torch.as_tensor(self.terminal_memory[batch], dtype=torch.bool, device=self.device)

        return states, actions, rewards, next_states, dones


MODEL

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Model(nn.Module):
    """Convolutional Q-network: three conv layers, three fully connected layers and a linear head.

    Args:
        action_dim: Number of discrete actions (output size).
        hidden_dim: Width of the fully connected layers.
        observation_shape: (C, H, W) shape of one observation. C sets ``conv1``'s input
            channels; H and W determine the flattened conv output size.
    """

    def __init__(self, action_dim, hidden_dim=512, observation_shape=(4, 84, 84)):
        super().__init__()

        # Conv stack: kernel/stride 8/4 -> 4/2 -> 3/1.
        self.conv1 = nn.Conv2d(in_channels=observation_shape[0], out_channels=32, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=4, stride=2)
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1)

        # The flattened size depends on H and W, so measure it with a dummy forward pass.
        conv_output_size = self.calculate_conv_output(observation_shape)
        print("conv_output_size: ", conv_output_size)

        # Layer attribute names are the state_dict keys; renaming them breaks saved checkpoints.
        self.fc1 = nn.Linear(conv_output_size, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, hidden_dim)

        self.output = nn.Linear(hidden_dim, action_dim)

        self.apply(self.weights_init)

    def calculate_conv_output(self, shape):
        """Return the number of features the conv stack produces for one input of ``shape``."""
        with torch.no_grad():
            # Build the dummy input on the conv weights' device so this also works after .to(device).
            o = torch.zeros(1, *shape, device=self.conv1.weight.device)
            o = F.relu(self.conv1(o))
            o = F.relu(self.conv2(o))
            o = F.relu(self.conv3(o))
        return int(o.numel())

    def weights_init(self, m):
        """Kaiming-uniform init (ReLU gain) for conv/linear weights; zero biases."""
        if isinstance(m, (nn.Conv2d, nn.Linear)):
            nn.init.kaiming_uniform_(m.weight, nonlinearity='relu')
            if m.bias is not None:
                nn.init.constant_(m.bias, 0)

    def forward(self, x):
        """Map a (batch, C, H, W) tensor to (batch, action_dim) Q-values."""
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        return self.output(x)

    def save_the_model(self, filename='models/latest.pt'):
        """Save the state_dict to ``filename``, creating its parent directory if needed."""
        import os

        directory = os.path.dirname(filename)
        if directory:
            os.makedirs(directory, exist_ok=True)
        torch.save(self.state_dict(), filename)

    def load_the_model(self, filename='models/latest.pt'):
        """Load weights from ``filename`` if it exists; otherwise print a notice and keep current weights.

        Tensors are mapped onto this model's current device, so a checkpoint saved on a GPU
        also loads on a CPU-only machine.
        """
        device = next(self.parameters()).device
        try:
            state_dict = torch.load(filename, map_location=device)
        except FileNotFoundError:
            print(f"No weights file found at {filename}")
            return
        self.load_state_dict(state_dict)
        print(f"Loaded weights from {filename}")


def soft_update(target, source, tau=0.005):
    """Polyak-average ``source``'s parameters into ``target`` in place.

    Each target parameter becomes ``(1 - tau) * target + tau * source``; parameters
    are paired in ``.parameters()`` order, so both modules must share an architecture.
    """
    keep = 1.0 - tau
    for dst, src in zip(target.parameters(), source.parameters()):
        blended = dst.data * keep + src.data * tau
        dst.data.copy_(blended)


# CNN - Recognize Image
# FC layers

AGENT

In [4]:
from buffer import ReplayBuffer
from model import Model, soft_update
import torch
import torch.optim as optim
import torch.nn.functional as F
import datetime
import time
from torch.utils.tensorboard import SummaryWriter
import random
import os
import cv2
import numpy as np

class Agent():
    """Double-DQN agent with ε-greedy exploration, a replay buffer and a soft-updated target network.

    Args:
        env: Gymnasium-style environment. ``env.action_space.n`` gives the number of actions,
            and observations are converted with :meth:`process_observation`.
        hidden_layer: Width of the Q-network's fully connected layers.
        learning_rate: Adam learning rate.
        step_repeat: Number of consecutive ``env.step`` calls each chosen action is repeated for.
        gamma: Discount factor used in the TD target.
    """

    def __init__(self, env, hidden_layer, learning_rate, step_repeat, gamma):
        self.env = env
        self.step_repeat = step_repeat
        self.gamma = gamma

        # One reset up front is needed only to discover the processed observation shape.
        obs, info = self.env.reset()
        obs = self.process_observation(obs)

        self.device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
        print(f'Loaded model on device {self.device}')

        self.memory = ReplayBuffer(max_size=500000, input_shape=obs.shape, device=self.device)

        self.model = Model(action_dim=env.action_space.n, hidden_dim=hidden_layer, observation_shape=obs.shape).to(self.device)
        self.target_model = Model(action_dim=env.action_space.n, hidden_dim=hidden_layer, observation_shape=obs.shape).to(self.device)
        # The target network starts as an exact copy of the online network.
        self.target_model.load_state_dict(self.model.state_dict())

        self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
        self.learning_rate = learning_rate

    def process_observation(self, obs):
        """Convert a channel-last observation to a float32 tensor with channels first.

        ``permute(2, 0, 1)`` moves the last axis to the front, so ``obs`` must be rank 3
        (presumably (H, W, C) image data; confirm for other wrappers).
        """
        obs = torch.tensor(obs, dtype=torch.float32).permute(2, 0, 1)
        return obs

    def _select_action(self, obs, epsilon):
        """Pick an ε-greedy action.

        Returns:
            ``(action, q_mean)``. ``q_mean`` is the mean Q-value of a greedy choice,
            or None when the action was sampled at random.
        """
        if random.random() < epsilon:
            return self.env.action_space.sample(), None
        # Acting needs no gradients; skipping graph construction saves memory and time.
        with torch.no_grad():
            q_values = self.model(obs.unsqueeze(0).to(self.device))[0]
        return torch.argmax(q_values, dim=-1).item(), q_values.mean().item()

    def _repeat_action(self, action):
        """Apply ``action`` for up to ``step_repeat`` env steps, summing the rewards.

        Stops as soon as the episode terminates or is truncated, because a finished
        episode must be reset rather than stepped.

        Returns:
            ``(next_obs, reward, terminated, truncated)`` for the last step taken.
        """
        reward = 0
        terminated = truncated = False
        for _ in range(self.step_repeat):
            next_obs, step_reward, terminated, truncated, _ = self.env.step(action=action)
            reward += step_reward
            if terminated or truncated:
                break
        return next_obs, reward, terminated, truncated

    def _learn(self, batch_size):
        """Run one Double-DQN gradient step on a replay minibatch.

        Returns:
            ``(loss, q_values, td_error)``. ``loss`` is a float; ``q_values`` is the
            online network's Q(s, ·) for the batch; ``td_error`` is target − Q(s, a).
            Both tensors are detached.
        """
        observations, actions, rewards, next_observations, dones = self.memory.sample_buffer(batch_size)
        dones = dones.unsqueeze(1).float()

        # Q(s, a) for the actions that were actually taken.
        q_values = self.model(observations)
        actions = actions.unsqueeze(1).long()
        qsa_batch = q_values.gather(1, actions)

        # Double DQN target: the online net chooses a', the target net evaluates it.
        with torch.no_grad():
            next_actions = torch.argmax(self.model(next_observations), dim=1, keepdim=True)
            next_q_values = self.target_model(next_observations).gather(1, next_actions)
            target_b = rewards.unsqueeze(1) + (1 - dones) * self.gamma * next_q_values

        td_error = (target_b - qsa_batch).detach()
        loss = F.mse_loss(qsa_batch, target_b)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return loss.item(), q_values.detach(), td_error

    def test(self):
        """Load the latest checkpoint and play one episode with ε = 0.05.

        Returns:
            The total (undiscounted) reward collected in the episode.
        """
        self.model.load_the_model()

        obs, info = self.env.reset()
        obs = self.process_observation(obs)

        episode_reward = 0
        done = False
        while not done:
            action, _ = self._select_action(obs, epsilon=0.05)
            next_obs, reward, terminated, truncated = self._repeat_action(action)
            # A truncated episode is over too; stepping it further is invalid.
            done = terminated or truncated
            obs = self.process_observation(next_obs)
            episode_reward += reward

        return episode_reward

    def train(self, episodes, max_episode_steps, summary_writer_suffix, batch_size, epsilon, epsilon_decay, min_epsilon):
        """Train the online network with Double DQN and log metrics to TensorBoard.

        Args:
            episodes: Number of episodes to run.
            max_episode_steps: Cap on agent decisions per episode. Each decision is up to
                ``step_repeat`` env steps.
            summary_writer_suffix: Appended to the timestamped TensorBoard run directory name.
            batch_size: Replay minibatch size. Learning starts once ``memory.can_sample`` allows it.
            epsilon: Initial ε for ε-greedy exploration.
            epsilon_decay: Factor epsilon is multiplied by after each episode.
            min_epsilon: Lower bound for epsilon.
        """
        # Save TensorBoard logs in the same directory as the script.
        script_dir = os.path.dirname(os.path.abspath(__file__))
        summary_writer_name = os.path.join(
            script_dir, 'runs',
            f'{datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")}_{summary_writer_suffix}'
        )
        writer = SummaryWriter(summary_writer_name)
        os.makedirs('models', exist_ok=True)

        total_steps = 0
        reward_history = []

        print(f"TensorBoard logs → {summary_writer_name}")

        try:
            for episode in range(episodes):
                obs, info = self.env.reset()
                obs = self.process_observation(obs)
                done = False
                episode_reward = 0
                episode_steps = 0
                episode_losses, episode_qvalues, td_errors, action_counts = [], [], [], []
                episode_start_time = time.time()

                while not done and episode_steps < max_episode_steps:
                    action, q_mean = self._select_action(obs, epsilon)
                    if q_mean is not None:
                        episode_qvalues.append(q_mean)
                    action_counts.append(action)

                    next_obs, reward, terminated, truncated = self._repeat_action(action)
                    next_obs = self.process_observation(next_obs)
                    # Only real termination cuts off bootstrapping, so store `terminated`.
                    # Truncation still ends the episode.
                    self.memory.store_transition(obs, action, reward, next_obs, terminated)
                    done = terminated or truncated
                    obs = next_obs

                    episode_reward += reward
                    episode_steps += 1
                    total_steps += 1

                    if self.memory.can_sample(batch_size):
                        loss_value, q_values, td_error = self._learn(batch_size)
                        abs_td = td_error.abs()

                        episode_losses.append(loss_value)
                        td_errors.extend(abs_td.cpu().numpy().flatten().tolist())

                        if episode_steps % 4 == 0:
                            soft_update(self.target_model, self.model)

                        # Per-step metrics
                        writer.add_scalar("Loss/step", loss_value, total_steps)
                        writer.add_scalar("QValue/mean", q_values.mean().item(), total_steps)
                        writer.add_scalar("QValue/max", q_values.max().item(), total_steps)
                        writer.add_scalar("TD_Error/mean", abs_td.mean().item(), total_steps)

                # Model checkpoint
                self.model.save_the_model()

                # Episode metrics. Append first so Avg(10) includes the episode just played.
                episode_time = time.time() - episode_start_time
                reward_history.append(episode_reward)
                avg_loss = np.mean(episode_losses) if episode_losses else 0
                avg_qvalue = np.mean(episode_qvalues) if episode_qvalues else 0
                avg_td_error = np.mean(td_errors) if td_errors else 0
                fps = episode_steps / episode_time if episode_time > 0 else 0
                avg_reward = np.mean(reward_history[-10:])

                # TensorBoard logs (episode level)
                writer.add_scalar('Score/Episode', episode_reward, episode)
                writer.add_scalar('Score/Avg10', avg_reward, episode)
                writer.add_scalar('Loss/Episode', avg_loss, episode)
                writer.add_scalar('QValue/Avg', avg_qvalue, episode)
                writer.add_scalar('TD_Error/EpisodeMean', avg_td_error, episode)
                writer.add_scalar('Epsilon', epsilon, episode)
                writer.add_scalar('Performance/FPS', fps, episode)
                writer.add_scalar('Performance/EpisodeTime', episode_time, episode)

                # Action distribution histogram (skipped if no action was taken)
                if action_counts:
                    writer.add_histogram('Action/Distribution', np.array(action_counts), episode)

                # Console output
                print(f"\nEpisode {episode + 1}/{episodes}")
                print(f"  Reward: {episode_reward:.2f}  |  Avg(10): {avg_reward:.2f}")
                print(f"  Avg Loss: {avg_loss:.6f}  |  Avg Q: {avg_qvalue:.3f}  |  TD Error: {avg_td_error:.4f}")
                print(f"  Steps: {episode_steps}  |  FPS: {fps:.2f}  |  Time: {episode_time:.2f}s")
                print(f"  Epsilon: {epsilon:.4f}")

                # Epsilon decay, clamped so it never drops below min_epsilon.
                epsilon = max(min_epsilon, epsilon * epsilon_decay)
        finally:
            # Flush and close the event file even if training is interrupted.
            writer.close()

        print("\n✅ Training complete! You can now view metrics with:")
        print(f"   tensorboard --logdir {os.path.join(script_dir, 'runs')}")

TRAIN

In [None]:
from agent import Agent
import gymnasium as gym
from gymnasium.wrappers import TransformObservation, ResizeObservation, GrayscaleObservation

import ale_py
import cv2

def to_grayscale(obs):
    """Convert an RGB frame to a single-channel grayscale frame using OpenCV."""
    gray_frame = cv2.cvtColor(obs, cv2.COLOR_RGB2GRAY)
    return gray_frame

# ---- Hyperparameters ---------------------------------------------------------
episodes = 2500
max_episode_steps = 5000      # cap on agent decisions per episode
hidden_layer = 128            # width of the Q-network's fully connected layers
learning_rate = 0.0001
step_repeat = 4               # env.step calls per chosen action
gamma = 0.99
batch_size = 32
epsilon = 1                   # ε-greedy: start fully random ...
min_epsilon = 0.1             # ... never explore less than this ...
epsilon_decay = 0.995         # ... decaying by this factor after each episode

# ---- Environment: Pong frames resized to 64x64 and converted to grayscale ----
# NOTE(review): ALE v5 environments apply their own frame skipping by default, so
# step_repeat may multiply with it. Confirm the effective frame skip is intended.
env = GrayscaleObservation(
    ResizeObservation(gym.make("ALE/Pong-v5", render_mode="rgb_array"), (64, 64)),
    keep_dim=True,
)

# ---- Agent and training run ----------------------------------------------------
agent = Agent(
    env,
    hidden_layer=hidden_layer,
    learning_rate=learning_rate,
    step_repeat=step_repeat,
    gamma=gamma,
)

summary_writer_suffix = f'dqn_lr={learning_rate}_hl={hidden_layer}_bs={batch_size}'

agent.train(
    episodes=episodes,
    max_episode_steps=max_episode_steps,
    summary_writer_suffix=summary_writer_suffix,
    batch_size=batch_size,
    epsilon=epsilon,
    epsilon_decay=epsilon_decay,
    min_epsilon=min_epsilon,
)

TEST

In [None]:
# from agent import Agent
# import gymnasium as gym
# from gymnasium.wrappers import GrayscaleObservation, ResizeObservation
# from gymnasium.wrappers import TransformObservation, ResizeObservation

# import ale_py
# import cv2  

# def to_grayscale(obs):
#     return cv2.cvtColor(obs, cv2.COLOR_RGB2GRAY)


# episodes = 10000
# max_episode_steps = 10000
# hidden_layer = 128
# learning_rate = 0.0001
# step_repeat = 4
# gamma = 0.99
# batch_size = 64
# epsilon = 1
# min_epsilon = 0.1
# epsilon_decay = 0.995


# env = gym.make("ALE/Pong-v5", render_mode="rgb_array")

# env = ResizeObservation(env, (64, 64))

# env = GrayscaleObservation(env,  keep_dim=True)

# # env = TransformObservation(env,  f=to_grayscale)

# agent = Agent(env, hidden_layer=hidden_layer,
#               learning_rate=learning_rate, step_repeat=step_repeat,
#               gamma=gamma)

# agent.test()