#  **ICT303 - Assignment 2**

**Your name: Caleb Goss**

**Student ID: 34090158**

**Email: cjmeepers@gmail.com**

In this assignment, you will build and train a deep learning model for solving a problem of your choice.


You are required to:
- Think of a practical problem that you would like to solve. The problem can be related, but not limted to, object detection and recognition from images, text analysis, speech analysis, image unpainting, converting images to artistic painting, action recognition (from images or videos), image to text (i.e., generating textual description for images or videos), or text to image (generating images from text) etc.,
- Find an appropriate data set to train and test the model you will develop. Note that the dataset should contain enough data (with groundtruth labels) so that when used for training, the model can generalize well to unseen data.
- Design a neural network that will solve the problem
- Train the neural network on your training data and then evaluate its performance on test data
- Analyze the performance of the network you developed and discuss its limitations.

**What to submit:**
- A colab notebook that describes:
 - The problem you would like to solve **[10 Marks]**
 - The dataset that you will use to train and test the deep learning model that you will develop **[10 marks]**
 - A diagram that describes the architecture of the neural network that you developed **[10 marks]**
 - Performance curves - this is includes the loss curves as well as the accuracy **[10 marks]**
 - A discussion, analysis and justification of the different choices you made and their effect on the performance **[15 marks]**
 - A discussion, analysis of the limitations of your method. You can also show failure cases and try to understand why did it fail on these cases **[15 marks]**

- Source code that runs - this includes both code for training and testing **[30 marks]**

You also need to submit the dataset you used for training and testing, or alternatively provide the code that downloads the data.

Make sure you reference all sources from which you took information.

You are allowed to use existing neural networks (not required to implement them from scratch). But, you must customize the architecture to the problem you want to solve.


## **Training an AI to Master 2048: A Deep Q-Learning Approach**

### **Problem Description [10 marks]**


- **Problem:** 2048 is a sliding tile puzzle game played on a 4x4 grid.

- **How to Play:** Combine matching number tiles by sliding them in four possible directions (up, down, left, right).

- **Objective:** Reach the 2048 tile.

##### *Why 2048 is an Interesting AI Problem*

The game seems simple at first, but can prove challenging to reach the 2048 tile for several reasons.

1. **Complex Decision Making**
    - Each move can affect possibilities many moves later
    - What looks like a good move now might lead to game-ending situations
    - Players need to maintain organized patterns while adapting to random elements

2. **Challenging Learning Environment**
    - The 4x4 grid has 4^16 possible states
    - Random tile spawns make planning difficult
    - Success requires both short-term tactics and a long-term strategy
    - Mistakes early in the game might only become apparent much later

##### Why Deep Q-Learning?

After considering various approaches, I chose Deep Q-Learning (DQN) for several reasons.

1. **Natural Fit for the Problem**
    - Can handle the continuous state space of the board
    - Works well with discrete actions (up, down, left, right)
    - Can learn complex patterns through experience

2. **Learning from Experience**
    - Similar to how humans learn the game
    - Improves through trial and error
    - Can discover strategies we might not think of

3. **Balanced Learning Approach**
    - Explores new strategies while exploiting learned patterns
    - Can handle delayed rewards (important for 2048)
    - Provides clear feedback through Q-values

### **Dataset/Environment [10 marks]**

Unlike traditional machine learning problems, where we start with a dataset, reinforcement learning creates its dataset through an agent's interactions with its environment.

#### *State Representation: Making the Game Board Learnable*

One of the most crucial decisions was how to represent the game board to the AI.

```python
def get_state(self) -> np.ndarray:
    state = self._board().flatten()
    return np.log2(np.where(state > 0, state, 1)).astype(float)
```

*This code does the following to our state representation:*

1. **Flattening the Board**
    - Converts the 4x4 grid into a 16-element vector
    - Preserves all information while simplifying input
    - Makes it easier for the neural network to process

2. **Log2 Transformation**
    - Handles the exponential growth of tile values
    - 2048 becomes 11 instead of 2048 (2^11 = 2048)
    - Makes learning more stable by reducing value ranges

#### *Reward Design: Teaching the AI What Matters*

The reward system is crucial - it's how we tell the AI what "good" and "bad" moves look like.

```python
if valid_move:
    # Merge reward, use log2 to prevent reward explosion
    move_reward = np.log2(move_score) if move_score > 0 else 0

    # Reward for achieving new highest tile
    new_tile_reward = (np.log2(max_tile) - np.log2(prev_max_tile)
                        if max_tile > prev_max_tile else 0)

    # Add a small reward for empty spaces
    empty_spaces_reward = 0.1 * np.sum(self._board() == 0)

    # Reward is the sum of all previous rewards plus a small bonus for each step
    reward = move_reward + new_tile_reward + empty_spaces_reward + 0.1
else:
    # Penalize invalid moves
    reward = -2
```

*Each component serves a specific purpose:*

1. **Move Reward**
    - Encourages combining tiles
    - Logarithmic scaling prevents reward explosion
    - Immediate feedback for good moves

2. **New Tile Reward**
    - Bonus for reaching new tile values
    - Encourages progression towards 2048

3. **Empty Spaces Reward**
    - Promotes board management
    - Keeps options open for future moves

4. **Invalid Move Penalty**
    - Discourages trying impossible moves
    - Helps learn board boundaries
    - Encourages efficient play

### **Network Architecture [10 marks]**

The DQN network is what powers the AI decision-making process.

#### *Core Architecture*

```python
class DQN(nn.Module):
    def __init__(self):
        super().__init__()

        self.network = nn.Sequential(
            # First hidden layer
            nn.Linear(16, 128),
            nn.LayerNorm(128),
            nn.ReLU(),
            nn.Dropout(),

            # Second hidden layer
            nn.Linear(128, 128),
            nn.LayerNorm(128),
            nn.ReLU(),
            nn.Dropout(),

            # Output layer
            nn.Linear(128, 4)
        )
```

*Let's break down why each component matters:*

##### *Network Components*

1. **Input Layer (16 neurons)** 
    - Matches the flattened board size (4x4)
    - Each neuron represents one tile position
    - Log2-transformed values keep input ranges manageable

2. **Hidden Layers (128 neurons each)** 
    - Two layers provide depth without complexity
    - 128 neurons offer enough capacity for pattern recognition
    - Consistent size maintains information flow

3. **Layer Normalization**
    - Stabalizes learning across different board states
    - Helps handle varying tile value ranges
    - Better than BatchNorm for RL cases (handles individual states better)

4. **ReLU Activation**
    - Fast, simple, and effective
    - Helps learn non-linear patterns
    - Prevents vanishing gradient problems

5. **Dropout**
    - Randomly deactivates neurons during training
    - Forces network to learn robust features
    - Prevents over-reliance on any single path
    - Adds noise and prevents co-adaptation (overfitting)

5. **Output Layer (4 neurons)** 
    - One Q-value for each possible move
    - No activation (raw Q-values)
    - Directly interpretable as action values

### **Performance Curves [10 marks]**

### **Design Decisions & Performance Impacts [15 marks]**

#### *Network Architecture*

- LayerNorm instead of BatchNorm
    - More stable training in reinforcement learning context
    - Better handles varying state distributions
- Dropouts for regularization
    - Prevents overfitting to specific board patterns

#### *Hyperparameters*

##### *Exploration Strategy*

- **Epsilon Start:** *0.1*
    - 10% initial exploration balances learning with competence

- **Epsilon End:** *0.001*
    - 0.1% final exploration maintains adaptability

- **Epsilon Decay:** *0.993*
    - Decay rate carefully calculated for 1000 episodes

##### *Memory & Batch Size*

- **Hidden Layers:** *[128, 128]*
    - Small enough to train quickly but deep enough to learn patterns
    - Two layers allow for hierarchical feature learning
    - 128 neurons provide enough capacity for 2048's state space
    - Same size layers maintain consistent representation power
- **Batch Size:** *128*
    - Large enough for stable gradient updates
    - Small enough for efficient computation
    - Common power of 2 size of GPU optimization
- **Buffer Size:** *100,000*
    - Games can be long (100s of moves)
    - Need enough diversity in experience
    - Stores roughly 500-1000 full games
    - Prevents overfitting to recent experience
- **Train Episodes:** *1000*
    - Enough episodes to learn basic strategies
    - Allows for exploration decay
    - Can see clear learning trends
    - Reasonable training time (10 - 20 minutes)
- **Evaluation Episodes:** *20*
    - Enough episodes to account for randomness
    - Quick enough for frequent evaluation
    - Good balance of accuracy vs time
- **Evaluation Frequency:** *50*
    - Regular checkpoints of progress
    - Not too computationally expensive
    - Enough episodes to see trends
    - Good interval for saving models
- **Update Frequency:** *5*
    - Test

### **Discussion & Analysis of Limitations**

#### 1. Algorithm Limitations 

- DQN struggles with long-term planning
- Difficulty in connecting early moves to the final outcome
- Challenge of exploration in large state space

#### 2. Performance Bottlenecks

- Training efficiency
- Exploration vs exploitation balance
- Reward sparsity

### **Source Code [30 marks]**

#### Add Imports

In [None]:
%pip install tensorboardX

In [None]:
from typing import Any
from tensorboardX import SummaryWriter
from torch import nn

import torch
import random
import numpy as np

#### AI Source Code

##### Adjust Hyperparameters as necessary

In [None]:
HYPERPARAMETERS: dict = {
    "BATCH_SIZE": 128,
    "EPS_START": 0.05,
    "EPS_END": 1e-5,
    "EPS_DECAY": 0.9999,
    "LR": 5e-5,
    "HIDDEN_LAYERS": [128, 128]
}

##### Add DQN Model

In [None]:
class DQN(nn.Module):
    def __init__(self, input_size: int, output_size: int, hidden_layers: list[int]):
        super(DQN, self).__init__()
        
        # Board size is 4x4 
        
        # Possible tile values (0, 2, 4, 8, 
        #                       16, 32, 64, 128, 256, 512,
        #                       1024, 2048, 4096, 8192, 
        #                       16384, 32768)
        
        # Log2 (2^x) = (0, (0), 1 (2), 2 (4), 3 (8), 4 (16), 5 (32), 6 (64)
        #               7 (128), 8 (256), 9 (512), 
        #               10 (1024), 11 (2048), 12 (4096), 
        #               13 (8192), 14 (16384), 15 (32768))

        self.q_net = self.create_network(input_size, output_size, hidden_layers)

    def create_network(
        self, input_size: int, output_size: int, hidden_layers: list[int]
    ):
        layers = []
        in_features = input_size
        
        for hidden_size in hidden_layers:
            layers.append(nn.Linear(in_features, hidden_size))
            layers.append(nn.LayerNorm(hidden_size))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(0.3))

            in_features = hidden_size

        # Output layer
        layers.append(nn.Linear(in_features, output_size))

        return nn.Sequential(*layers)

    def forward(self, X):
        return self.q_net(X)

##### Add Replay Memory Buffer

In [None]:
from collections import namedtuple, deque

Transition = namedtuple(
    "Transition", ("state", "action", "next_state", "reward", "done")
)


class ReplayMemory:
    def __init__(self, capacity: int):
        self.capacity = capacity
        self.memory = deque([], maxlen=self.capacity)

    def push(self, *args):
        self.memory.append(Transition(*args))

    def sample(self, batch_size: int):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

##### Add DQN Agent

In [None]:
from torch.optim import Adam

Action = int

BATCH_SIZE = HYPERPARAMETERS.get("BATCH_SIZE", 128)
MEMORY_CAPACITY = HYPERPARAMETERS.get("BUFFER_SIZE", 100_000)
GAMMA = HYPERPARAMETERS.get("GAMMA", 0.99)
LEARNING_RATE = HYPERPARAMETERS.get("LEARNING_RATE", 1e-4)
HIDDEN_LAYERS = HYPERPARAMETERS.get("HIDDEN_LAYERS", [128, 128])

class DQNAgent:
    def __init__(
        self,
        size: int,
        device=torch.device("cuda" if torch.cuda.is_available() else "cpu"),
    ):
        self.state_size = size * size
        self.action_size = 4
        
        self.device = device
        
        # Initialize replay memory and hyperparameters
        self.memory = ReplayMemory(MEMORY_CAPACITY)
        self.batch_size = BATCH_SIZE 
        self.gamma = GAMMA # Discount factor
        self.lr = LEARNING_RATE
        
        # Initialize policy and target networks
        self.policy_net = DQN(self.state_size, self.action_size, HIDDEN_LAYERS).to(device)
        self.target_net = DQN(self.state_size, self.action_size, HIDDEN_LAYERS).to(device)
        
        # Copy policy network weights to target network
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.target_net.eval()
        
        # Initialize optimizer and loss function
        self.optimizer = Adam(self.policy_net.parameters(), lr=self.lr, weight_decay=1e-9)
        self.criterion = nn.MSELoss() # Huber loss
        
    def act(self, state: np.ndarray, epsilon: float, valid_actions: list[int]):
        """
        Select an action using epsilon-greedy policy
        """
        if random.random() < epsilon:
            return random.choice(valid_actions)
        
        state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)
        
        with torch.no_grad():
            q_values = self.policy_net(state)
         
        # Filter Q-values for only valid actions and select best action
        valid_q_values = q_values[0][valid_actions]
        best_action_idx = valid_q_values.argmax().item()
        best_action = valid_actions[best_action_idx]
    
        return best_action
            
    def optimize_model(self) -> float:
        """
        Perform one step of optimization on the DQN
        Returns the loss value
        """
        if len(self.memory) < self.batch_size:
            return 0.0
        
        # Sample and prepare batch from memory buffer
        transitions = self.memory.sample(self.batch_size)
        batch = Transition(*zip(*transitions))
        
        # Create tensors for 
        state_batch = torch.from_numpy(np.stack(batch.state)).float().to(self.device)
        action_batch = torch.tensor(batch.action).long().to(self.device)
        reward_batch = torch.tensor(batch.reward).float().to(self.device)
        next_state_batch = torch.from_numpy(np.stack(batch.next_state)).float().to(self.device)
        done_batch = torch.tensor(batch.done).float().to(self.device)
        
        # Compute Q(s,a)
        q_values = self.policy_net(state_batch).gather(1, action_batch.unsqueeze(1))
        
        # Compute V(s') for all next states
        with torch.no_grad():
            next_q_values = self.target_net(next_state_batch).max(1)[0]
        
        # Compute expected Q values
        expected_q_values = (next_q_values * self.gamma * (1 - done_batch)) + reward_batch
        
        # Reset gradients
        self.optimizer.zero_grad()
            
        # Compute loss and optimize
        loss = self.criterion(q_values, expected_q_values.unsqueeze(1))
        loss.backward()        
        self.optimizer.step()
        
        return loss.item()
    
    def update_target_network(self):
        """Update the target network with the policy network's weights"""
        self.target_net.load_state_dict(self.policy_net.state_dict())
        
    def train(self):
        self.policy_net.train()

##### Add Training Module

In [None]:
from tqdm import tqdm
from collections import defaultdict

%load_ext tensorboard

MAX_EPISODES = 1000
UPDATE_FREQUENCY = HYPERPARAMETERS.get("UPDATE_FREQ", 10)
EPSILON_START = HYPERPARAMETERS.get("EPS_START", 0.9)
EPSILON_END = HYPERPARAMETERS.get("EPS_END", 0.05)
EPSILON_DECAY = HYPERPARAMETERS.get("EPS_DECAY", 0.99)

class Trainer:
    def __init__(self, env, agent: DQNAgent, 
                 writer: SummaryWriter | None):
        self.env = env
        self.agent = agent
        self.writer = writer
        
        # self.scheduler = optim.lr_scheduler.ReduceLROnPlateau(self.agent.optimizer)
        
        self.max_episodes = MAX_EPISODES
        
        # Update every 100 steps
        self.update_frequency = UPDATE_FREQUENCY
        
        self.epsilon_start = EPSILON_START
        self.epsilon_end = EPSILON_END
        self.epsilon_decay = EPSILON_DECAY
        
        self.epsilon = self.epsilon_start

    def train(self):
        tile_frequencies = defaultdict(int)
        
        progress_bar = tqdm(range(self.max_episodes), desc="Training")
        
        for episode in progress_bar:
            state = self.env.reset()
            
            episode_loss = 0
            episode_steps = 0
            episode_reward = 0
            done = False
            
            self.agent.train()
            
            while not done:
                action = self.agent.act(state, self.epsilon, self.env.get_valid_actions())
                next_state, reward, done = self.env.step(action)
                
                self.agent.memory.push(state, action, next_state, reward, done)
                
                # Move to the next state
                state = next_state
                
                # Perform one step of the optimization (on the policy network)
                episode_loss += self.agent.optimize_model()
        
                episode_steps += 1
                episode_reward += reward
                    
            episode_loss /= episode_steps
            
            # Update progress bar
            progress_bar.set_postfix({"reward": f"{episode_reward:.2f}"})
            
            score = self.env.get_score()
            max_tile = self.env.get_max_tile()
                
            tile_frequencies[max_tile] += 1
                
            # Log metrics to TensorBoard
            self.writer.add_scalar("Reward/Train", episode_reward, episode)
            self.writer.add_scalar("Score/Train", score, episode)
            self.writer.add_scalar("Score/Tile/Train", max_tile, episode)
            self.writer.add_scalar("Steps/Train", episode_steps, episode)
            self.writer.add_scalar("Loss/Train", episode_loss, episode)
            self.writer.add_scalar("Epsilon", self.epsilon, episode)
            
            # Log tile frequencies
            for tile in [512, 1024, 2048]:
                self.writer.add_scalar(f"Tile/Frequency/{tile}", tile_frequencies[tile], episode)

            self.epsilon = max(self.epsilon_end, self.epsilon * self.epsilon_decay)
            
            if episode % self.update_frequency == 0:
                self.agent.update_target_network()

#### 2048 Game Environment & Logic Source Code

##### Add 2048 Game Board

In [None]:
class Board:
    def __init__(self, size=4):
        self.size = size
        self.board = np.zeros((self.size, self.size), dtype=int)

    def __getitem__(self, key):
        return self.board[key]

    def __setitem__(self, key, value):
        self.board[key] = value

    def get_board(self) -> np.ndarray:
        return self.board

    def set_board(self, board: np.ndarray):
        self.board = board

    def reset(self):
        self.board = np.zeros((self.size, self.size), dtype=int)

        self.spawn_tile()
        self.spawn_tile()

    # Add new tile to the board.
    def spawn_tile(self):
        # Find all empty cells in board
        empty_cells = np.argwhere(self.board == 0)

        # If there are empty cells on the board
        if empty_cells.size > 0:
            i, j = random.choice(empty_cells)
            self.board[i, j] = 2 if random.random() < 0.9 else 4

    def __str__(self):
        return "\n".join(
            [" ".join([f"{tile:4}" for tile in row]) for row in self.board]
        )

##### Add 2048 Game Board Logic

In [None]:
ACTION_UP = 0
ACTION_DOWN = 1
ACTION_LEFT = 2
ACTION_RIGHT = 3

ACTION_SET = (ACTION_UP, ACTION_DOWN, ACTION_LEFT, ACTION_RIGHT)

class BoardLogic:
    @staticmethod
    def move(board: np.ndarray, action: int) -> tuple[np.ndarray, bool, int]:
        """
        Perform a move on the board.

        Args:
            board (np.ndarray): The current game board.
            action (int): The action to perform (UP, DOWN, LEFT, RIGHT).

        Returns:
            Tuple[np.ndarray, bool, int]: New board state, whether move was valid, and score.
        """
        prev_board = board.copy()
        new_board = board.copy()
        
        score: int = 0

        def merge(row):
            nonlocal score

            # Remove zeros and get non-zero values
            row = row[row != 0]

            # Merge adjacent equal values
            for i in range(len(row) - 1):
                if row[i] == row[i + 1]:
                    row[i] *= 2
                    score += row[i]
                    row[i + 1] = 0

            # Remove zeros again and pad with zeros
            row = row[row != 0]
            return np.pad(row, (0, 4 - len(row)), "constant")

        if action == ACTION_UP:
            new_board = np.apply_along_axis(merge, 0, new_board)
        elif action == ACTION_DOWN:
            new_board = np.apply_along_axis(lambda x: merge(x[::-1])[::-1], 0, new_board)
        elif action == ACTION_LEFT:
            new_board = np.apply_along_axis(merge, 1, new_board)
        elif action == ACTION_RIGHT:
            new_board = np.apply_along_axis(lambda x: merge(x[::-1])[::-1], 1, new_board)
        else:
            raise ValueError(f"Invalid action: {action}")

        is_valid_move: bool = not np.array_equal(prev_board, new_board)

        return new_board, is_valid_move, score
    
    @staticmethod
    def count_similar_tiles(board: np.ndarray) -> int:
        count = 0
        
        for i in range(board.shape[0]):
            for j in range(board.shape[1]):
                if i < board.shape[0] - 1 and board[i, j] == board[i+1, j] and board[i, j] != 0:
                    count += 1
                if j < board.shape[1] - 1 and board[i, j] == board[i, j+1] and board[i, j] != 0:
                    count += 1
        return count
    
    @staticmethod
    def sum_different_tiles(board: np.ndarray) -> int:
        total = 0
        
        for i in range(board.shape[0]):
            for j in range(board.shape[1]):
                if i < board.shape[0] - 1:
                    total += abs(board[i, j] - board[i+1, j])
                if j < board.shape[1] - 1:
                    total += abs(board[i, j] - board[i, j+1])
                    
        return total

    @staticmethod
    def game_over(board: np.ndarray) -> bool:
        # Check for empty spaces
        if np.any(board == 0):
            return False

        # Check for possible vertical or horizontal merges
        if np.any(board[:, :-1] == board[:, 1:]) or np.any(board[:-1, :] == board[1:, :]):
            return False

        return True

##### Add 2048 Game Environment

In [None]:
from abc import ABC, abstractmethod

# Abstract environment class (who cares)
class IEnv(ABC):
    @abstractmethod
    def reset(self):
        pass

    @abstractmethod
    def step(self, action):
        pass


class GameEnv(IEnv):
    def __init__(self, board_size: int):
        self.board_size = board_size
        self.board = Board(size=board_size)
        
        self.score = 0

    def reset(self):
        self.board.reset()

        self.score = 0

        return self.get_state()
    
    def _board(self):
        return self.board.get_board()
    
    def get_valid_actions(self):
        valid_actions = []
        
        for action in ACTION_SET:
            _, valid_move, _ = BoardLogic.move(self._board(), action)
            
            if valid_move:
                valid_actions.append(action)
                
        return valid_actions

    def step(self, action) -> tuple[np.ndarray, float, bool]:
        """Execute one step in the environment.
        
        Args:
            action (int): The action to take (0: up, 1: right, 2: down, 3: left)
            
        Returns:
            tuple: (next_state, reward, done)
                - next_state: The new board state after the action
                - reward: The reward received for the action
                - done: Whether the game is over
        """
        
        prev_max_tile = np.max(self._board())
        
        new_board, valid_move, move_score = BoardLogic.move(self._board(), action)

        if valid_move:
            self.board.set_board(new_board)
            self.board.spawn_tile()
            
            self.score += move_score
            
            max_tile = np.max(self._board())
            
            # Merge reward, use log2 to prevent reward explosion
            move_reward = np.log2(move_score) if move_score > 0 else 0
            
            # Add an expontential reward that scales with the magnitude of improvement
            new_tile_reward = (2 ** (np.log2(max_tile) - np.log2(prev_max_tile)) 
                               if max_tile > prev_max_tile else 0)
            
            # Add a small reward for empty spaces
            empty_spaces_reward = 0.1 * np.sum(self._board() == 0)
            
            # Reward is the sum of all previous reward plus a small bonus for each step
            reward = move_reward + new_tile_reward + empty_spaces_reward + 0.1
        else:
            # Penalize invalid moves
            reward = -2
        
        done = BoardLogic.game_over(self._board())
        
        return self.get_state(), reward, done

    def get_state(self) -> np.ndarray:
        state = self._board().flatten()
        return np.log2(np.where(state > 0, state, 1)).astype(int)

    def get_score(self) -> int:
        return self.score

    def get_max_tile(self) -> int:
        return np.max(self._board())

#### Main

In [None]:
BOARD_SIZE = 4

def main() -> int:
    env = GameEnv(BOARD_SIZE)
    agent = DQNAgent(BOARD_SIZE)
    
    writer = SummaryWriter()
    
    trainer = Trainer(env, agent, writer)
    trainer.train()
    
    writer.close()
    
    return 0

if __name__ == "__main__":
    main()

#### Load Tensorboard

In [None]:
%tensorboard --logdir runs