### **Import necessary libraries**

In [1]:
import os
import warnings
from collections import deque, namedtuple

import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from gymnasium.wrappers import FrameStack
from torch.utils.tensorboard import SummaryWriter

from assignment3_utils import process_frame
# Suppress Warnings
warnings.filterwarnings("ignore")

### **DQN Architecture**

In [2]:
class DQN(nn.Module):
    """Convolutional Q-network: three conv layers followed by two linear layers."""

    def __init__(self, input_shape, num_actions):
        """
        Build the network layers.

        Parameters:
        - input_shape: (channels, height, width) of a single observation,
          e.g. (4, 84, 80).
        - num_actions: Size of the output layer (one Q-value per action).
        """
        super().__init__()
        in_channels = input_shape[0]

        # Convolutional feature extractor
        self.conv1 = nn.Conv2d(in_channels, 32, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)

        # Fully connected head; its input width is measured with a dummy pass
        flat_features = self._calculate_conv_output_size(input_shape)
        self.fc1 = nn.Linear(flat_features, 512)
        self.fc2 = nn.Linear(512, num_actions)

    def _calculate_conv_output_size(self, input_shape):
        """
        Measure how many features the conv stack emits for one observation.

        Parameters:
        - input_shape: (channels, height, width) of a single observation.

        Returns:
        - Number of elements in the flattened conv output (int).
        """
        with torch.no_grad():
            probe = torch.zeros(1, *input_shape)
            for layer in (self.conv1, self.conv2, self.conv3):
                probe = layer(probe)
            flattened = probe.view(1, -1)
            return flattened.size(1)

    def forward(self, x):
        """
        Compute Q-values for a batch of observations.

        Parameters:
        - x: Input tensor whose first dimension is the batch dimension.

        Returns:
        - Tensor with one row per batch item and one column per action.
        """
        # Conv layers, each followed by ReLU
        hidden = x
        for layer in (self.conv1, self.conv2, self.conv3):
            hidden = F.relu(layer(hidden))

        # Collapse everything except the batch dimension
        batch = hidden.size(0)
        hidden = hidden.view(batch, -1)

        # Linear head: ReLU on the hidden layer, raw outputs on the last one
        hidden = F.relu(self.fc1(hidden))
        q_values = self.fc2(hidden)
        return q_values


### **Replay memory class**

In [3]:
class ExperienceMemory:
    """Fixed-capacity replay buffer backed by a deque."""

    def __init__(self, capacity):
        """
        Create an empty buffer.

        Parameters:
        - capacity: Maximum number of experiences kept; once full, appending
          drops the oldest entry (deque ``maxlen`` behaviour).
        """
        self.buffer = deque(maxlen=capacity)

    def __len__(self):
        """Number of experiences currently stored."""
        return len(self.buffer)

    def append(self, experience):
        """
        Store one experience.

        Parameters:
        - experience: A 5-tuple ordered as
          (state, action, reward, done, next_state).
        """
        self.buffer.append(experience)

    def sample(self, batch_size):
        """
        Draw a random batch without replacement.

        Parameters:
        - batch_size: Requested number of experiences; capped at the number
          of experiences currently stored.

        Returns:
        - Tuple of numpy arrays: (states, actions, rewards, dones, next_states).
        """
        stored = len(self.buffer)
        # Never ask for more items than the buffer holds
        take = min(batch_size, stored)
        picks = np.random.choice(stored, take, replace=False)

        # Transpose the list of experiences into per-field columns
        chosen = [self.buffer[i] for i in picks]
        states, actions, rewards, dones, next_states = zip(*chosen)

        states_arr = np.array(states)
        actions_arr = np.array(actions, dtype=np.int64)
        rewards_arr = np.array(rewards, dtype=np.float32)
        dones_arr = np.array(dones, dtype=np.uint8)
        next_states_arr = np.array(next_states, dtype=np.float32)
        return (states_arr, actions_arr, rewards_arr, dones_arr, next_states_arr)


### **Agent class**

In [4]:
class Agent:
    """
    DQN agent: owns the online and target networks, the replay memory, the
    optimizer, and the step-by-step interaction with the environment.
    """

    def __init__(self, env, start_epsilon, batch_size, gamma, root_dir, train_mode=False):
        """
        Initialize the Agent.

        Parameters:
        - env: The environment object.
        - start_epsilon: Initial epsilon value for epsilon-greedy policy.
        - batch_size: Number of samples per batch for training.
        - gamma: Discount factor for future rewards.
        - root_dir: path to model directory to save checkpoint
        - train_mode: Boolean indicating whether the agent is in training mode.
        """
        # Set device for PyTorch
        self.device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
        self.env = env
        self.gamma = gamma
        self.epsilon = start_epsilon
        self.batch_size = batch_size
        self.root_dir = root_dir
        self.replay_memory = ExperienceMemory(10000)
        self.model = DQN((4, 84, 80), env.action_space.n).to(self.device)
        self.target_model = DQN((4, 84, 80), env.action_space.n).to(self.device)
        # Start the target network as an exact copy of the online network;
        # otherwise the first TD targets come from unrelated random weights.
        self._sync_target_model()
        self.train_mode = train_mode
        self.episode = 0
        self.learns = 0
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.0001)
        # Named tuple used for every entry pushed into the replay memory
        self.replay = namedtuple('Replay', field_names=['state', 'action', 'reward', 'done', 'next_state'])
        self.reset()

    def _sync_target_model(self):
        """Copy the online network's weights into the target network."""
        self.target_model.load_state_dict(self.model.state_dict())

    def reset(self):
        """
        Reset the environment and initialize state, steps, and total reward.

        Note: the environment is re-seeded with the fixed seed 0 on every reset.
        """
        self.state = process_frame(self.env.reset(seed=0)[0])
        self.steps = 0
        self.total_reward = 0

    def act(self):
        """
        Choose an action based on the current state.

        In training mode a random action is taken with probability epsilon;
        otherwise the action with the highest predicted Q-value is chosen.

        Returns:
        - action: The chosen action.
        """
        if self.train_mode and np.random.random() < self.epsilon:
            return self.env.action_space.sample()
        state_tensor = torch.tensor(np.array(self.state), dtype=torch.float32).to(self.device)
        # Inference only: no autograd graph is needed to pick an action
        with torch.no_grad():
            q_values = self.model(state_tensor)
        return int(np.argmax(q_values.detach().cpu().numpy()))

    def train(self):
        """
        Perform one step of interaction with the environment and store the experience.

        Returns:
        - done: Boolean indicating if the episode is finished (terminated or truncated).
        - episode_reward: Total reward for the episode if done, else None.
        """
        action = self.act()
        next_state, reward, terminated, truncated, _ = self.env.step(action)
        next_state = process_frame(next_state)
        # np.squeeze(axis=0) requires a leading axis of size 1 on the processed frames.
        # Only `terminated` is stored as the done flag: a truncated episode did not
        # reach a terminal state, so its next state may still be bootstrapped from.
        self.replay_memory.append(
            self.replay(
                np.squeeze(self.state, axis=0),
                action,
                reward,
                terminated,
                np.squeeze(next_state, axis=0),
            )
        )
        self.state = next_state
        self.steps += 1
        self.total_reward += reward

        # The episode is over on either signal; ignoring `truncated` would keep
        # stepping an environment that must be reset.
        if terminated or truncated:
            episode_reward = self.total_reward
            print(f"Steps: {self.steps}, Score: {episode_reward}")
            self.episode += 1
            self.reset()
            return True, episode_reward

        return False, None

    def update_weights(self):
        """
        Update the weights of the model based on experiences sampled from the replay memory.

        Every 1000 updates the target network is refreshed from the online network.
        Does nothing if the replay memory is empty.
        """
        if len(self.replay_memory) == 0:
            return

        states, actions, rewards, dones, next_states = self.replay_memory.sample(self.batch_size)

        states_t = torch.tensor(states, dtype=torch.float32).to(self.device)
        next_states_t = torch.tensor(next_states, dtype=torch.float32).to(self.device)
        actions_t = torch.tensor(actions).to(self.device)
        rewards_t = torch.tensor(rewards).to(self.device)
        done_mask = torch.tensor(dones, dtype=torch.bool).to(self.device)

        # Q(s, a) of the actions that were actually taken
        current_q_values = self.model(states_t).gather(1, actions_t.unsqueeze(-1)).squeeze(-1)

        # TD targets are constants with respect to the online network's parameters
        with torch.no_grad():
            next_q_values = self.target_model(next_states_t).max(1)[0]
            # No future value after a terminal transition
            next_q_values = next_q_values.masked_fill(done_mask, 0.0)
            expected_q_values = rewards_t + self.gamma * next_q_values

        loss = F.mse_loss(current_q_values, expected_q_values)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        self.learns += 1

        if self.learns % 1000 == 0:
            self._sync_target_model()
            print(f"Episode {self.episode}: Target model weights updated")

    def demo(self, model_path):
        """
        Demonstrate the agent's performance using a pre-trained model.

        Runs a single episode until the environment reports termination or truncation.

        Parameters:
        - model_path: Path to the pre-trained model.
        """
        self.load(model_path)
        self.model.eval()

        self.reset()
        done = False
        steps = 0
        episode_reward = 0

        while not done:
            action = self.act()
            next_state, reward, terminated, truncated, _ = self.env.step(action)
            done = terminated or truncated
            self.state = process_frame(next_state)
            episode_reward += reward
            steps += 1

        print(f"Steps: {steps}, Reward: {episode_reward:.2f}")

    def save(self, path):
        """
        Save the current model's state dictionary to a file.

        Missing parent directories are created when `path` is a filesystem path.

        Parameters:
        - path: Path to save the model.
        """
        if isinstance(path, (str, os.PathLike)):
            parent = os.path.dirname(os.fspath(path))
            if parent:
                os.makedirs(parent, exist_ok=True)
        torch.save(self.model.state_dict(), path)

    def load(self, path):
        """
        Load a model's state dictionary from a file.

        Tensors are mapped onto this agent's device, so weights saved on a GPU
        can be loaded on a CPU-only machine. The target network is synced to
        the loaded weights.

        Parameters:
        - path: Path to load the model from.
        """
        self.model.load_state_dict(torch.load(path, map_location=self.device))
        self._sync_target_model()

    def save_checkpoint(self, episode):
        """
        Save a checkpoint of the current training state.

        The checkpoint is written to `<root_dir>/checkpoint.pth`; the directory
        is created if it does not exist.

        Parameters:
        - episode: Current episode number.
        """
        checkpoint = {
            'episode': episode,
            'model_state_dict': self.model.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'epsilon': self.epsilon
        }
        checkpoint_path = '{}/checkpoint.pth'.format(self.root_dir)
        os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)
        torch.save(checkpoint, checkpoint_path)
        print(f'Checkpoint saved at episode {episode}')

    def load_checkpoint(self, filename):
        """
        Load a checkpoint of the training state.

        Restores the online network, optimizer state and epsilon, then syncs
        the target network to the restored online network.

        Parameters:
        - filename: Path to the checkpoint file.

        Returns:
        - episode: Episode number from the checkpoint.
        """
        checkpoint = torch.load(filename, map_location=self.device)
        self.model.load_state_dict(checkpoint['model_state_dict'])
        self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        self.epsilon = checkpoint['epsilon']
        self._sync_target_model()
        return checkpoint['episode']


### **Training Agent**

In [5]:
def train_agent(params):
    """
    Train the agent using the provided parameters.

    Runs exactly params["EPISODES"] episodes, logging per-episode reward and a
    moving average to TensorBoard, checkpointing every 10 episodes, and saving
    the final model weights under params["ROOT_DIR"].

    Args:
    params (dict): A dictionary containing training parameters. Keys read here:
        EPSILON_INIT, EPSILON_DECAY, EPSILON_MIN, BATCH_SIZE, GAMMA, ROOT_DIR,
        TRAINING_MODE, EPISODES, AVG_LAST, UPDATE_RATE.
    """
    # Create the output directory up front so a missing folder is caught
    # before training starts rather than at the first checkpoint.
    os.makedirs(params["ROOT_DIR"], exist_ok=True)

    # Environment setup
    env = FrameStack(gym.make('PongDeterministic-v4', render_mode='rgb_array'), num_stack=4)

    # Training setup
    writer = SummaryWriter()
    try:
        agent = Agent(env,
                      params["EPSILON_INIT"],
                      params["BATCH_SIZE"],
                      params["GAMMA"],
                      params["ROOT_DIR"],
                      params["TRAINING_MODE"])
        episode_rewards = []
        avg_window = params["AVG_LAST"]

        # Inclusive upper bound: episodes are numbered 1..EPISODES, matching
        # the episode count embedded in the saved model's filename.
        for episode in range(1, params["EPISODES"] + 1):
            terminate = False
            while not terminate:
                # Decay epsilon (once per environment step) down to the configured floor
                agent.epsilon = max(agent.epsilon * params["EPSILON_DECAY"], params["EPSILON_MIN"])

                # Perform one step of training
                terminate, reward = agent.train()

                if terminate:
                    # Record and log episode results
                    episode_rewards.append(reward)
                    mean_reward = round(np.mean(episode_rewards[-avg_window:]), 3)
                    writer.add_scalar('Reward/Episode', reward, episode)
                    writer.add_scalar(f'Average_Cumulative_Reward/Last_{avg_window}_Episodes', mean_reward, episode)
                    print(f"Episode {episode}, Average reward of the last {params['AVG_LAST']} episodes: {mean_reward}")

                # Update model weights once the replay memory holds at least UPDATE_RATE experiences
                if len(agent.replay_memory) >= params["UPDATE_RATE"]:
                    agent.update_weights()

            # Save checkpoint every 10 episodes
            if episode % 10 == 0:
                agent.save_checkpoint(episode)

        # Finalize training: persist the final weights
        model_path = f'{params["ROOT_DIR"]}/pongdeterministic_v4_batch{params["BATCH_SIZE"]}_rate{params["UPDATE_RATE"]}_ep{params["EPISODES"]}.pth'
        agent.save(model_path)
    finally:
        # Release the log writer and the environment even if training fails
        writer.flush()
        writer.close()
        env.close()

### **Demonstrate Agent**

In [6]:
def demo_agent(params):
    """
    Demonstrate the agent's performance using the trained model.

    Loads the weights file whose name is derived from ROOT_DIR, BATCH_SIZE,
    UPDATE_RATE and EPISODES, then plays one episode with on-screen rendering.
    The environment is closed even if loading or playing fails.

    Args:
    params (dict): A dictionary containing demo parameters. Keys read here:
        EPSILON_INIT, BATCH_SIZE, GAMMA, ROOT_DIR, TRAINING_MODE, RENDER_FPS,
        UPDATE_RATE, EPISODES.
    """

    # Environment setup
    env = FrameStack(gym.make('PongDeterministic-v4', render_mode='human'), num_stack=4)
    try:
        agent = Agent(env,
                      params["EPSILON_INIT"],
                      params["BATCH_SIZE"],
                      params["GAMMA"],
                      params["ROOT_DIR"],
                      params["TRAINING_MODE"])

        # Set rendering frames per second
        env.metadata['render_fps'] = params["RENDER_FPS"]

        # Load and demonstrate the trained model
        model_path = f'{params["ROOT_DIR"]}/pongdeterministic_v4_batch{params["BATCH_SIZE"]}_rate{params["UPDATE_RATE"]}_ep{params["EPISODES"]}.pth'
        agent.demo(model_path)
    finally:
        # Always release the environment (and its render window)
        env.close()

### **Parameters**

In [8]:
# Shared configuration for the training and demonstration runs below.
params = dict(
    BATCH_SIZE=8,          # replay samples per weight update (8 by default, or 16)
    UPDATE_RATE=10,        # replay-memory size required before weight updates run (3, or 10 by default)
    TRAINING_MODE=None,    # True to train; False to watch the trained agent play Pong
    EPSILON_MIN=0.05,      # floor for the exploration rate
    EPSILON_DECAY=0.995,   # multiplicative epsilon decay applied every environment step
    GAMMA=0.95,            # discount factor
    EPISODES=1000,         # episode count (also part of the saved model's filename)
    EPSILON_INIT=1.0,      # starting exploration rate
    ROOT_DIR=None,         # output directory for checkpoints and models; set before running
    RENDER_FPS=60,         # frame rate used by the demonstration
    AVG_LAST=5,            # number of recent episodes in the reported mean reward
)

### **Train agent with default parameters**

In [10]:
# Training run with the default batch size and update rate; outputs go to "model".
params.update(
    BATCH_SIZE=8,
    UPDATE_RATE=10,
    TRAINING_MODE=True,
    ROOT_DIR="model",
)

train_agent(params)

### **Demonstrate agent**

In [1]:
# Demonstration run: greedy play using the model saved under "model".
params.update(TRAINING_MODE=False, ROOT_DIR="model")

demo_agent(params)