# Robot Grid World Assessment part 2
**Written by**  
Egor Danilov (33411115),  
Yash Balchandani(33279950),  
Gautam Ravi Kumar(33197970),  
Jacob Wicklund (31265936)

In [3]:
import numpy as np
import time
import random
import matplotlib.pyplot as plt
from collections import defaultdict, namedtuple, deque
from IPython.display import clear_output
import torch
from torch import tensor, optim
import torch.nn as nn
from torch.nn.modules import activation
import torch.nn.functional as F
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [42]:
class GridWorld:
    """
    A class representing a grid world environment for a robot to navigate and complete tasks.

    Attributes:
        state_tuple: namedtuple class for representing the state of the environment.
        n: Size of the grid world.
        action_space: List of possible actions (0: up, 1: down, 2: left, 3: right).
        action_map: Dictionary mapping actions to their corresponding (row, column) changes.

    Methods:
        __init__(self, n: int = 5): Constructor to initialize the grid world.
        _next_robot_position(self, action): Calculates the next position of the robot based on the action taken.
        step(self, action): Performs an action in the environment and returns the new state, reward, and done flag.
        reward(self, action): Computes the reward based on the action and the current state.
        reset(self): Resets the environment to create a new grid world configuration.
        render(self): Renders the current state of the grid world.
    """
    ############################################################################################################################
    def __init__(self, n: int=5):
        """
        Initializes a GridWorld instance.

        Parameters:
        ===========
            n (int): Size of the grid world (default is 5).
        """
        self.n = n
        self.action_space = [0, 1, 2, 3]
        self.action_map_ = {
            0: -n,  # up
            1: n,   # down
            2: -1,  # left
            3: 1    # right
        }
    ############################################################################################################################
    def _next_robot_position(self, action):
        """
        Calculates the next position of the robot based on the action taken.
        If the robot tries to go out of bounds it is returned back.

        Parameters:
        ===========
            action (int): Action taken by the robot.

        Returns:
            tuple: New position of the robot.
        """
        # Calculate the next robot location based on the action
        next_robot_loc = (self.state[0] + self.action_map_[action][0],
                        self.state[1] + self.action_map_[action][1])

        # Ensure the robot stays within bounds
        if (0 <= next_robot_loc[0] <= self.n - 1 and
            0 <= next_robot_loc[1] <= self.n - 1):
            return next_robot_loc

        return self.state[0], self.state[1]

    ############################################################################################################################
    def step(self, action):
        """
        Perform an action in the environment.

        Parameters:
        ===========
            action (int): Action taken by the robot.

        Returns:
            tuple: New state, immediate reward, and done flag.
        """
        done = False
        picked = self.state[6]
        next_robot_loc = self._next_robot_position(action)

        # Check if robot has picked up the load
        if next_robot_loc == (self.state[2], self.state[3]):
            picked = True

        # Check if robot reached the destination with the load
        if next_robot_loc == (self.state[4], self.state[5]) and picked:
            done = True

        reward = self.reward(action)
        self.state = tensor([next_robot_loc[0], next_robot_loc[1],
                            self.state[2], self.state[3],
                            self.state[4], self.state[5],
                            picked])

        return self.state, reward, done

    ############################################################################################################################
    def render(self):
        """
        Renders the current state of the grid world.

        Returns:
            None
        """
        Robot_loc_x, Robot_loc_y, Load_loc_x, Load_loc_y, Final_loc_x, Final_loc_y, picked = self.state

        for i in range(self.n):
            for j in range(self.n):
                if (i, j) == (Robot_loc_x, Robot_loc_y):
                    print("R", end=" ")
                elif (i, j) == (Load_loc_x, Load_loc_y) and not picked:
                    print("L", end=" ")
                elif (i, j) == (Final_loc_x, Final_loc_y):
                    print("F", end=" ")
                else:
                    print(".", end=" ")
            print()

    ############################################################################################################################
    def reset(self):
        """
        Resets the environment. Randomly creates a new grid world configuration.

        Returns:
            None
        """
        r, l, f = np.random.choice(self.n * self.n, size=3, replace=False)
        Robot_loc_x, Robot_loc_y = r % self.n, r // self.n
        Load_loc_x, Load_loc_y   = l % self.n, l // self.n
        Final_loc_x, Final_loc_y = f % self.n, f // self.n
        picked = False
        locations = np.array([r,l,f]).reshape(-1)
        self.state = np.eye(self.n * self.n)[locations]
        self.state = np.append(self.state,picked)
        self.state = tensor([Robot_loc_x, Robot_loc_y,Load_loc_x, Load_loc_y,Final_loc_x, Final_loc_y,picked])
        return self.state

    ############################################################################################################################
    def reward(self, action):
        """
        Computes the immediate reward based on the action and the current state.

        Parameters:
        ===========
            action (int): Action taken by the robot.

        Returns:
            float: Reward value.
        """
        next_robot_loc = self._next_robot_position(action)

        # Reward for picking up the load
        if next_robot_loc == (self.state[2], self.state[3]) and not self.state[6]:
            return tensor(1.0)

        # Reward for delivering the load to the final location
        if next_robot_loc == (self.state[4], self.state[5]) and self.state[6]:
            return tensor(2.0)

        # Penalty for every step that is not the final location and not the load location
        return tensor(-0.2)

    ############################################################################################################################
    @property
    def state_size(self):
        return 49



In [5]:
class QNetwork(nn.Module):
    """Q-Network model used for Deep Q-Learning."""
    ############################################################################################################################
    def __init__(self, state_size, action_size, fc1_units=64, fc2_units=32):
        """
        Initialize the Q-Network.

        Parameters:
        ===========
            state_size (int): Dimension of each state (input size).
            action_size (int): Dimension of each action (output size).
            fc1_units (int): Number of nodes in the first hidden layer.
            fc2_units (int): Number of nodes in the second hidden layer.
        """
        super(QNetwork, self).__init__()
        self.fc1 = nn.Linear(state_size, fc1_units)
        self.fc2 = nn.Linear(fc1_units, fc2_units)
        self.fc3 = nn.Linear(fc2_units, action_size)

    ############################################################################################################################
    def forward(self, state):
        """
        Build the neural network that maps state -> action values.

        Parameters:
        ===========
            state (Tensor): The input state for which to compute the action values.

        Returns:
            x (Tensor): The action values for the given state.
        """
        
        x = F.relu(self.fc1(state))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)

        return x

In [6]:
class ReplayBuffer:
    ############################################################################################################################
    def __init__(self, action_size, buffer_size, batch_size):
        """
        Initialize a ReplayBuffer object.

        Parameters:
        ===========
            action_size (int): Dimensionality of each action.
            buffer_size (int): Maximum number of experiences to store in the buffer.
            batch_size (int): Number of experiences to sample during training.
        """
        self.action_size = action_size # Maximum possible ways each action can be done.

        self.memory = deque(maxlen=buffer_size) #double ended queue for a short term memory of agent.

        self.batch_size = batch_size # Resampling of the past experience what is past experience? Named Tuple from part 1.

        # (Tagged) named tuple to represent a single experience in the replay memory
        self.experience = namedtuple("Experience", field_names=["state", "action", "reward", "next_state", "done"])

    ############################################################################################################################
    def __len__(self):
        """Return the current size of internal memory."""
        return len(self.memory) # That is the dbl qu length.

    ############################################################################################################################
    def add(self, state, action, reward, next_state, done):
        """
        Add a new experience to memory.
        """
        exp = self.experience(state, action, reward, next_state, done) # Experience tuple. Defined above.
        # Pass in the experience to the short term memory queue.
        self.memory.append(exp) # Passes the list of attributes into the memory dbl queue.

    ############################################################################################################################
    def sample(self): #Segway into the deep q network. This is the sampler method.
        """
        Randomly sample a batch of experiences from memory for training.
        """
        experiences = random.sample(self.memory, k=self.batch_size) # Sample all the past experiences to learn the most important qualities so we can delete the experience data.

        states = torch.from_numpy(np.vstack([e.state for e in experiences])).float().to(device) #retrieve the states from experience stored in double qu
        actions = torch.from_numpy(np.vstack([e.action for e in experiences])).long().to(device) # retrieve the actions
        rewards = torch.from_numpy(np.vstack([e.reward for e in experiences])).float().to(device) # the rewards
        next_states = torch.from_numpy(np.vstack([e.next_state for e in experiences])).float().to(device) # possible next moves, choosing the probabilistic best reward. retrieved from the DQN calculation
        dones = torch.from_numpy(np.vstack([e.done for e in experiences]).astype(np.uint8)).float().to(device) # task completed flag. IE shut down.

        return (states, actions, rewards, next_states, dones) # send this info back to caller


### Define Deep Q Learning Algorithm

In [44]:
class DQN():
    """Deep Q-Network (DQN) agent."""
    ############################################################################################################################
    def __init__(self, state_size, action_size,
                buffer_size: int=100000, batch_size: int=32,
                learning_rate: float=3e-4, gamma: float=0.99,
                start_sampling: int=100):

        self.state_size = state_size # number of possible states.
        self.action_size = action_size # how many possible actions.
        self.start_sampling = start_sampling # "experiences of the agent object".
        self.buffer_size = buffer_size # replay buffer.
        self.batch_size = batch_size # Size of the train batch...?
        self.learning_rate = learning_rate # Network optimizing parameter.
        self.gamma = gamma # Discounting the rewards where as the most immediate one is carrying the most weight.

        # Initialize two Q-Networks: local and target
        self.qnetwork_local = QNetwork(state_size, action_size).to(device) # primary train "expected value" network
        self.qnetwork_target = QNetwork(state_size, action_size).to(device) #secondary "expected value" network for stabilizing the dynamic behavior of the primary network.
        self.optimizer = optim.Adam(self.qnetwork_local.parameters(), lr=learning_rate) # This is a basic initializer for optimization of the NN.

        # Initialize short term memory
        self.memory = ReplayBuffer(action_size, buffer_size, batch_size) # User Defined above ______^

    ############################################################################################################################
    def act(self, state, eps=0.):
        """
        Choose an action based on the current state using epsilon-greedy policy.

        Parameters:
        ===========
            state (torch.Tensor): Current state.
            eps (float): Epsilon for epsilon-greedy action selection.
        """
        #state = state.float().unsqueeze(0).to(device) # here we take the state tensor and add another dimension at the start of this tensor
        self.qnetwork_local.eval() # this sets the nn from torch into evaluation mode.
        with torch.no_grad(): # Disable gradient tracking (local net doesn't learn from the calculation following:::)
            action_values = self.qnetwork_local(state) # compute the value of taking each act --- then store it.
        self.qnetwork_local.train() # return local q net into train mode (learn from acting in XYZ way)

        # Epsilon-greedy action selection
        if np.random.random() > eps: # zero and 1 random selection number comparison with epsilon
            return np.argmax(action_values.cpu().data.numpy())  # should epsilon be smaller, this retrieves the calculated values from processing unit
                                                                # puts it into a numpy array. picks the highest reward
        else:
            return random.choice(np.arange(self.action_size)) # randomly makes a choice for its next step.

    ############################################################################################################################
    def fit(self, env, n_episodes: int=5000, max_t: int=100,
            rolling_epochs: int=200, target_score=None,
            eps_start=1.0, eps_end=0.01, eps_decay=0.99):
        """
        Train the agent using deep Q-learning.

        Parameters:
        ===========
            env: Environment to train in.
            n_episodes (int): Maximum number of training episodes.
            max_t (int): Maximum number of time steps per episode.
            rolling_epochs (int): Number of episodes for calculating average score.
            target_score (float): Target score for the environment.
            eps_start (float): Starting value of epsilon.
            eps_end (float): Minimum value of epsilon.
            eps_decay (float): Decay factor for epsilon.
        """
        scores = []  # List to store scores (rewards) from each episode
        scores_window = deque(maxlen=rolling_epochs)  # Double ended que, once rolling_epochs is reached by one of the ends, the other end is bumped.
        begin = eps_start # The local variable is what gives flexibility from the input param.
        for epis in range(1, n_episodes + 1): # Simply for each episode...:
            state = env.reset() # Reset environment
            score = 0 # Agent's reward score.
            for _ in range(max_t): # For everything from 0 to the max allowed time
                action = self.act(state, begin) # Calling the action function.
                next_state, reward, done = env.step(action) # Calling the step function from the "grid" getting reward values for taking steps, getting items etc..
                self.step(state, action, reward, next_state, done) # Take actual step based on the above line's values.
                state = next_state # Updating the state variable
                score += reward # Add the step consequence.
                if done: # Done condition is package delivery.
                    break

            scores_window.append(score) # The last step's reward update is put into the double end queue.
            scores.append(score) # Adds the score to the agent's personal record.
            begin = max(eps_end, eps_decay * begin) # Reset the begin variable to the max of either the ending episode or decay times begin.

            # Display training progress
            print(f'\rEpisode {epis}, Average Score: {np.mean(scores_window):.2f}', end="")

            if epis % rolling_epochs == 0:
                print(f'\rEpisode {epis}, Average Score: {np.mean(scores_window):.2f}')

            if target_score and np.mean(scores_window) >= target_score:
                torch.save(self.qnetwork_local.state_dict(), 'checkpoint.pth')
                break

                    # Plot the scores
        fig = plt.figure(figsize=(6,3))
        plt.plot(range(len(scores)), scores)
        plt.ylabel('Score')
        plt.xlabel('Episode')
        plt.show()

    ############################################################################################################################
    def learn(self, experiences, gamma):
        """
        Update Q-network parameters using a batch of experience tuples.

        Parameters:
        ===========
            experiences (Tuple[torch.Tensor]): Tuple of (s, a, r, s', done).
            gamma (float): Discount factor.
        """
        states, actions, rewards, next_states, dones = experiences # Split the tensor into respective variables the named tuple of length 5

        Q_targets_next = self.qnetwork_target(next_states).detach().max(1)[0].unsqueeze(1) # Q (expected return) from the target network.
        # Detach the current matrix from a tensor, so its not included in updating.
        # calculate the maximum of the first 0th element which is the states, carrying their respective rewards from the map
        # unsqueeze (grow)_ is just the tensor reshape to add another dimension along the position 1 element
        # Giving the next best target for a future calculation.v

        # Compute Q targets for current states
        Q_targets = rewards + (gamma * Q_targets_next * (1 - dones)) # Best decision given the current rules and move limitation - initialized by the policy.
        # Also taking the Q-rewards out of the detached tensor and adding it into the second half of the bellmen equation
        # Get expected Q values from local network

        Q_expected = self.qnetwork_local(states).gather(1, actions) # Expected 'rewards' (equal to Q-rewards) out of all next possible states.

        # Optimize
        loss = F.mse_loss(Q_expected, Q_targets) # Neural network functions, computing the temporal difference error.
        # holding the knowledge of the best rewards for an mse loss between expectation and target. Minimizing this difference is the DQ goal.

        self.optimizer.zero_grad() # This is like a memory reset for the gradients, so previous runs aren't stored in new runs.
        loss.backward() # Do one backward pass through gradients to see the optimum path to best return.
        self.optimizer.step() # update "weights and biases" through the game board network. According to optimizer set point, Adam in this case.

        # Update target network
        self.update(self.qnetwork_local, self.qnetwork_target) # This aligns the local network (possible moves) with the target network "game board".

    ############################################################################################################################
    def step(self, state, action, reward, next_state, done):
        """Store experience in replay memory and learn if enough samples are available."""

        # Store experience in replay memory
        self.memory.add(state, action, reward, next_state, done) #Each agent has its own memory attribute, holding the parameters inside. Now i gotta know what replay buffer is doing.

        # Learn from a random subset of experiences if enough samples are available
        if len(self.memory) > self.start_sampling: # if theres open memory randomly fill it with samples
            experiences = self.memory.sample() # sampling the replay buffer obj from torch
            self.learn(experiences, self.gamma) # calls the learn method with above line executed and stored and the set future reward gamma discount rate.

    ############################################################################################################################
    def update(self, local_model, target_model):
        """Copy weights from local model to target model."""
        target_model.load_state_dict(local_model.state_dict())


### Agent Instantiation. Policy learning and Visualisation.

In [45]:
# Initialize a 4x4 (n^2) grid world environment.
env = GridWorld(4)

# Set agent's personal state size to the environment's state size.
# The possible actions in this environment set to 4: up, down, left, right.
policy = DQN(state_size=env.state_size, action_size=4) # Use the DQN class to govern agent object.

# Train agent object with GridWorld environment.
# Training process for n_episodes or until desired score is reached.
# over the last 'rolling_epochs' (defaulted to 200 in the DQN class) reaches or exceeds 2.2.
policy.fit(env=env, n_episodes=2500, target_score=2.2)

TypeError: linear(): argument 'input' (position 1) must be Tensor, not numpy.ndarray

In [41]:
env.reset()
len(env.state)

49

### Test the Agent

In [None]:
# load pretrained weights
policy.qnetwork_local.load_state_dict(torch.load('checkpoint.pth'))

<All keys matched successfully>

In [None]:
# Visualise 5 episodes using trained agent

for i in range(5):
    state = env.reset()
    for j in range(20):
        action = policy.act(state)
        state, reward, done = env.step(action)
        clear_output(wait=True)
        env.render()
        time.sleep(1)
        if done:
            break

. . . . 
. . . . 
. R . . 
. . . . 
