In [30]:
# import 'gymnasium' and 'minigrid' for our environment
import gymnasium as gym
import minigrid
from minigrid.wrappers import *

# import 'random' to generate random numbers
import random

# import 'numpy' for various mathematical, vector and matrix functions
import numpy as np

from os.path import exists

# import 'Pytorch' for all our neural network needs

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.tensorboard import SummaryWriter

# if gpu is to be used, otherwise use cpu
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device = torch.device("cpu")

# Import 'namedtuple' and 'deque' for Experience Replay Memory
from collections import namedtuple, deque
%load_ext tensorboard

The tensorboard extension is already loaded. To reload it, use:
  %reload_ext tensorboard


#### Necessary functions

In [16]:
# Build the 'MiniGrid-Empty-8x8-v0' environment, wrap it in ImgObsWrapper in a single
# expression, and reset it once so the cells below can use it straight away.
env = ImgObsWrapper(gym.make('MiniGrid-Empty-8x8-v0'))
env.reset()

# Extract channel 0 of the observation (the object_idx layer) as a matrix - explicit loop version
def extractObjectInformation(observation):
    """Copy channel 0 of a 3-D observation into a new matrix, one cell at a time.

    Args:
        observation: array whose ``.shape`` has exactly three entries
            ``(rows, cols, channels)`` and that supports ``observation[r, c, 0]``.

    Returns:
        A ``(rows, cols)`` array created with ``np.zeros`` (so it uses NumPy's
        default float dtype) where entry ``[r, c]`` equals ``observation[r, c, 0]``.
    """
    n_rows, n_cols, _ = observation.shape
    object_ids = np.zeros((n_rows, n_cols))
    # Walk over every (row, col) position and keep only the first channel's value
    for row, col in np.ndindex(n_rows, n_cols):
        object_ids[row, col] = observation[row, col, 0]
    return object_ids

# A more efficient way of extracting the same information, using plain numpy slicing
def extractObjectInformation2(observation):
    """Return channel 0 of a 3-D observation as a ``(rows, cols)`` matrix.

    This is the vectorised counterpart of ``extractObjectInformation``. The earlier
    reshape-based version read the data in Fortran order and wrote it back in C
    order, which produced the transpose of channel 0 for square inputs (and a
    scrambled matrix for non-square ones). Slicing the channel directly yields the
    same values as the loop version.

    Args:
        observation: NumPy array of shape ``(rows, cols, channels)``.

    Returns:
        A new array of shape ``(rows, cols)`` with
        ``result[r, c] == observation[r, c, 0]``. The dtype of ``observation`` is
        kept, and the result does not share memory with the input.

    Raises:
        ValueError: if ``observation`` is not 3-dimensional.
    """
    if len(observation.shape) != 3:
        raise ValueError(
            'expected a 3-D observation (rows, cols, channels), got shape %s'
            % (tuple(observation.shape),)
        )
    # copy() keeps the result independent of the environment's observation buffer
    return observation[:, :, 0].copy()

# Scale the observation by a known maximum, so values in [0, max_value] end up in [0, 1]
def normalize(observation, max_value):
    """Divide every element of ``observation`` by ``max_value``.

    Args:
        observation: array-like of numbers.
        max_value: divisor applied element-wise; the result only lies in [0, 1]
            if the inputs lie in [0, max_value].

    Returns:
        A new NumPy array holding the element-wise quotient.
    """
    values = np.array(observation)
    return np.true_divide(values, max_value)

# Flatten an observation matrix (e.g. [7,7]) into a single-row float tensor (e.g. [1,49])
def flatten(observation):
    """Turn an array-like observation into a ``(1, N)`` float tensor.

    Args:
        observation: array-like with ``N`` elements in total.

    Returns:
        A ``torch.float32`` tensor of shape ``(1, N)`` holding the elements in
        row-major order; the leading dimension acts as a batch axis of size one.
    """
    flat_values = np.array(observation).flatten()
    row = torch.from_numpy(flat_values).float()
    return row.unsqueeze(0)

# Combine all the preprocessing functions into a single pipeline
def preprocess(observation):
    """Convert a raw observation into the network's input tensor.

    Steps, in order: ``extractObjectInformation2`` (take the 2-D matrix),
    ``normalize`` with a maximum of 10.0, then ``flatten`` into a single-row tensor.

    Args:
        observation: raw observation accepted by ``extractObjectInformation2``.

    Returns:
        The tensor produced by ``flatten`` for the scaled matrix.
    """
    object_ids = extractObjectInformation2(observation)
    scaled = normalize(object_ids, 10.0)
    return flatten(scaled)

#### Setting the hyperparameters

In [45]:
### MODEL HYPERPARAMETERS
numActions = 3               # 3 possible actions: left, right, move forward
inputSize = 49               # size of the flattened input state (7x7 matrix of tile IDs)

### TRAINING HYPERPARAMETERS
alpha = 0.0002               # learning rate handed to the Adam optimizer
episodes = 5000              # Total episodes for training (NOTE: the training cell re-assigns this to 10)
batch_size = 128             # Mini-batch size sampled from replay memory (NOTE: later cells re-assign this to 2)
target_update = 20000        # Target network is re-synced whenever steps_done % target_update == 0

# Q learning hyperparameters
gamma = 0.90                 # Discounting rate

# Exploration parameters for epsilon greedy strategy:
#   epsilon = stop_epsilon + (start_epsilon - stop_epsilon) * exp(-steps_done / decay_rate)
start_epsilon = 1.0          # exploration probability at start
stop_epsilon = 0.01          # minimum exploration probability (NOTE: the evaluation cell re-assigns this to 0.0)
decay_rate = 100             # decay constant in the formula above; larger values make epsilon decay more slowly

### MEMORY HYPERPARAMETERS
pretrain_length = batch_size # Number of experiences stored in the Memory when initialized for the first time (not referenced by the cells shown here)
memorySize = 500000          # Maximum number of experiences the replay memory can keep

### TESTING HYPERPARAMETERS
# Evaluation hyperparameter
evalEpisodes = 1000          # Number of episodes to be used for evaluation (NOTE: the evaluation cell re-assigns this to 2)

# Change this to 'False' if you only want to evaluate a previously trained agent
# NOTE(review): none of the cells shown here reads this flag - confirm it is still used
train = True     

#### DQN

In [11]:
### Neural network model definition: three fully connected layers, one output per action
class DQN(nn.Module):
    """Fully connected Q-network that maps a state to one value per action.

    Args:
        inputSize: number of input features per state.
        numActions: number of outputs (one Q-value per action).
        hiddenLayerSize: indexable pair; element 0 is the width of the first
            hidden layer and element 1 the width of the second.
    """

    def __init__(self, inputSize, numActions, hiddenLayerSize=(512, 256)):
        super().__init__()
        first_width = hiddenLayerSize[0]
        second_width = hiddenLayerSize[1]
        # Layers are created in fc1 -> fc2 -> fc3 order; the attribute names double as
        # the state_dict keys, so they must stay stable.
        self.fc1 = nn.Linear(inputSize, first_width)
        self.fc2 = nn.Linear(first_width, second_width)
        self.fc3 = nn.Linear(second_width, numActions)

    def forward(self, x):
        """Compute the Q-values for a single state or for a batch of states.

        Args:
            x: input tensor whose last dimension is ``inputSize``; it is moved
                to the module-level ``device`` before use.

        Returns:
            Tensor with one row per input state and ``numActions`` columns
            (no activation is applied to the output layer).
        """
        hidden = F.relu(self.fc1(x.to(device)))
        hidden = F.relu(self.fc2(hidden))
        return self.fc3(hidden)
    
# Instantiate the policy network (the one being trained) and the target network
# (used to compute the TD targets). Both share the same architecture.

hiddenLayerSize = (128,128)
policy_net = DQN(inputSize, numActions, hiddenLayerSize)
target_net = DQN(inputSize, numActions, hiddenLayerSize)

# Copy the weights of the policy network to the target network so both start identical
target_net.load_state_dict(policy_net.state_dict())

# The target network is never trained directly (the optimizer only receives
# policy_net.parameters()), so switch it to evaluation mode.
target_net.eval()

DQN(
  (fc1): Linear(in_features=49, out_features=128, bias=True)
  (fc2): Linear(in_features=128, out_features=128, bias=True)
  (fc3): Linear(in_features=128, out_features=3, bias=True)
)

In [17]:
# Example: push one freshly reset observation through the policy network.
# Reset the environment to obtain an observation, then preprocess it into a state.
obs, _ = env.reset()
state = preprocess(obs)

# Feed the state to the policy network to obtain one value per action
action_values = policy_net(state)
print('action_values: ', action_values)

# To get the action with the highest Q-value we use 'max' along dimension 1.
# The result is a pair: the first element holds the values, the second the indices.
best = action_values.max(1)
print('\nbest action: ', best)

# The index part of the pair is the greedy action
a = best[1]
print('\na: ', a)

action_values:  tensor([[-0.0493,  0.0913, -0.0486]], grad_fn=<AddmmBackward0>)

best action:  torch.return_types.max(
values=tensor([0.0913], grad_fn=<MaxBackward0>),
indices=tensor([1]))

a:  tensor([1])


In [18]:
## Function to epsilon-greedily select the next action based on the current state
def select_action(state):
    """Pick an action for ``state`` with an epsilon-greedy policy.

    The exploration probability decays exponentially with the module-level
    counter ``steps_done``::

        eps = stop_epsilon + (start_epsilon - stop_epsilon) * exp(-steps_done / decay_rate)

    With probability ``eps`` a uniformly random action is returned; otherwise
    the action with the highest value under ``policy_net`` is returned.

    Args:
        state: input tensor for ``policy_net``. The greedy branch returns one
            index per row, so a single-state batch (one row) is assumed in
            order to get a ``(1, 1)`` result.

    Returns:
        A tensor holding the chosen action index. The random branch always
        returns a ``torch.long`` tensor of shape ``(1, 1)``; the greedy branch
        returns the arg-max indices with a leading dimension added.
    """
    # 'math' is imported locally because no cell imports it explicitly; without this the
    # function only works when the name happens to leak in from elsewhere
    # (e.g. through a wildcard import).
    import math

    # generate a random number in [0, 1)
    sample = random.random()

    # calculate the epsilon threshold, based on the epsilon-start value, the epsilon-stop value,
    # the counter 'steps_done' and the epsilon decay constant (exponential decay)
    eps_threshold = stop_epsilon + (start_epsilon - stop_epsilon) * math.exp(-1. * steps_done / decay_rate)

    # compare the generated random number to the epsilon threshold
    if sample > eps_threshold:
        # act greedily towards the Q-values of our policy network, given the state;
        # no gradients are needed because we are only generating experience here
        with torch.no_grad():
            # max(1) returns (values, indices) per row; the indices are the greedy actions
            return policy_net(state).max(1)[1].unsqueeze(0)

    # otherwise select a random action with equal probability
    return torch.tensor([[random.randrange(numActions)]], device=device, dtype=torch.long)

In [19]:
from collections import namedtuple, deque

# One stored experience. The training loop stores None as 'nextState' when the
# transition ended the episode.
Transition = namedtuple(
    'Transition',
    ['currentState', 'action', 'nextState', 'reward'],
)


class ReplayMemory:
    """Bounded first-in-first-out buffer of ``Transition`` tuples."""

    def __init__(self, capacity):
        # A deque with maxlen silently drops the oldest entry once it is full
        self.memory = deque(maxlen=capacity)

    def push(self, *args):
        """Build a ``Transition`` from the positional arguments and store it."""
        transition = Transition(*args)
        self.memory.append(transition)

    def sample(self, batch_size):
        """Return ``batch_size`` stored transitions drawn at random without replacement."""
        return random.sample(self.memory, batch_size)

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.memory)
    
# Instantiate the replay memory with room for at most 'memorySize' transitions
memory = ReplayMemory(memorySize)

In [20]:
# TensorBoard writer used by the training loop to log the running average reward
writer = SummaryWriter()
# NOTE: optimize_model computes its loss with a function-local MSELoss and does not
# read this module-level variable.
criterion = nn.MSELoss()
# Adam optimizer over the policy network's parameters only (the target network is not trained)
optimizer = optim.Adam(policy_net.parameters(), lr=alpha)

In [21]:
## Training of the model
# NOTE: this re-assigns the global batch_size (128 in the hyperparameter cell);
# optimize_model reads the global, so it will sample mini-batches of 2 from here on.
batch_size = 2
def optimize_model():
    """Run one gradient step on ``policy_net`` using a mini-batch from replay memory.

    Reads the module-level ``memory``, ``batch_size``, ``policy_net``,
    ``target_net``, ``optimizer``, ``gamma`` and ``device``. Does nothing (and
    returns ``None``) while the replay memory holds fewer than ``batch_size``
    transitions.

    The TD target of a transition is ``reward + gamma * max_a Q_target(s', a)``;
    transitions whose ``nextState`` is ``None`` get a next-state value of 0.
    """
    # check if the replay memory has stored enough experience
    if len(memory) < batch_size:
        return

    # Sample a mini-batch and regroup it field-wise (a Transition of tuples)
    experience = memory.sample(batch_size)
    batch = Transition(*zip(*experience))

    # Calculate action-values Q(s, a) for the actions actually taken, using the policy network
    state_batch = torch.cat(batch.currentState)
    action_batch = torch.cat(batch.action)
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # Calculate TD-targets using the target network
    non_final_mask = torch.tensor([s is not None for s in batch.nextState],
                                  device=device, dtype=torch.bool)
    next_state_values = torch.zeros(batch_size, device=device)
    # torch.cat raises on an empty list, which happens when every sampled transition
    # is terminal; in that case all next-state values simply stay at zero.
    if non_final_mask.any():
        non_final_next_states = torch.cat([s for s in batch.nextState if s is not None])
        # No gradient should flow into the target network
        with torch.no_grad():
            next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0]
    # Cast the rewards to the value dtype so integer or double rewards cannot
    # produce a dtype mismatch in the loss
    reward_batch = torch.cat(batch.reward).to(next_state_values.dtype)
    TDtargets = (next_state_values * gamma) + reward_batch

    # Calculate loss between the predicted action-values and the TD-targets
    criterion = nn.MSELoss()
    loss = criterion(state_action_values, TDtargets.unsqueeze(1))

    # Make gradient descent step and update policy network
    optimizer.zero_grad()
    loss.backward()
    # Clamp every gradient element to [-1, 1] before the update
    torch.nn.utils.clip_grad_value_(policy_net.parameters(), 1.0)
    optimizer.step()

    
# Call the function once here; it returns immediately while the replay memory
# holds fewer than batch_size transitions.
optimize_model()

In [31]:
%tensorboard --logdir=runs

In [47]:
# Make the gym environment
env = gym.make('MiniGrid-Empty-8x8-v0')

# Use a wrapper so the observation only contains the grid information
env = ImgObsWrapper(env)

episodes = 10               # total number of training episodes (re-assigns the value from the hyperparameter cell)
# MiniGrid attributes are read from the unwrapped environment: attribute forwarding
# through wrappers is deprecated in Gymnasium 0.29 and removed in 1.0.
max_steps = env.unwrapped.max_steps  # maximum number of steps allowed before truncating episode
# NOTE(review): steps_done is incremented once per *episode* (see the end of the outer loop),
# so the epsilon schedule in select_action and the target-update check below both count
# episodes rather than environment steps - confirm this is the intended unit.
steps_done = 0

memory = ReplayMemory(memorySize) # Instantiate a fresh, empty replay memory
batch_size = 2
total_rewards = 0

print('Start training...')
for e in range(episodes):

    # reset the environment
    obs, _ = env.reset()

    # extract the current state from the observation
    state = preprocess(obs)

    for i in range(0, max_steps):

        # Choose an action epsilon-greedily for the current state
        action = select_action(state)
        a = action.item()

        # take action 'a', receive reward 'reward_', and observe next state 'obs'
        # 'done' indicates that the termination state was reached,
        # 'truncated' that the step limit was hit
        obs, reward_, done, truncated, info = env.step(a)
        # Explicit float dtype: the environment may return an int (0) or a float reward
        reward = torch.tensor([reward_], device=device, dtype=torch.float32)

        # if the episode is finished, the nextState is set to None to indicate that the
        # <s,a,r,s'> transition led to a terminating state; otherwise preprocess the
        # new observation into the next state
        if (done or truncated):
            nextState = None
        else:
            nextState = preprocess(obs)

        # Store the transition <s,a,r,s'> in the replay memory
        memory.push(state, action, nextState, reward)

        # Move to the next state. This must update 'state', the variable that
        # select_action and memory.push read; otherwise every step would keep
        # using the episode's initial state.
        state = nextState

        # Perform one step of the optimization (on the policy network) by
        # sampling a mini-batch and training the model on it
        optimize_model()

        # If the target update threshold is reached, update the target network,
        # copying all weights and biases in the policy network.
        # NOTE(review): steps_done is constant within an episode, so when the condition
        # holds (e.g. throughout episode 0) this copy runs on every step of that episode.
        if steps_done % target_update == 0:
            target_net.load_state_dict(policy_net.state_dict())

        # Episode finished when done or truncated is true
        if (done or truncated):
            if (done):
                # the agent reached its goal successfully
                print('Finished episode successfully taking %d steps and receiving reward %f' % (env.unwrapped.step_count, reward_))
            else:
                # the agent ran out of steps before reaching its goal
                print('Truncated episode taking %d steps and receiving reward %f' % (env.unwrapped.step_count, reward_))
            break

    # Per-episode bookkeeping: advance the counter and log the running average of
    # the final-step reward of each episode
    steps_done += 1
    total_rewards += reward_
    writer.add_scalar("Reward/train", total_rewards/(e+1), e)

# Make sure the logged scalars are written to disk before TensorBoard reads them
writer.flush()
print('Done training...')

Start training...
Finished episode successfully taking 218 steps and receiving reward 0.233594
Truncated episode taking 256 steps and receiving reward 0.000000
Finished episode successfully taking 127 steps and receiving reward 0.553516
Finished episode successfully taking 45 steps and receiving reward 0.841797
Truncated episode taking 256 steps and receiving reward 0.000000
Finished episode successfully taking 203 steps and receiving reward 0.286328
Truncated episode taking 256 steps and receiving reward 0.000000
Finished episode successfully taking 170 steps and receiving reward 0.402344
Finished episode successfully taking 96 steps and receiving reward 0.662500
Truncated episode taking 256 steps and receiving reward 0.000000
Done training...


In [42]:
# evaluation loop
finishCounter = 0.0
totalSteps = 0.0
totalReward = 0.0

# Force select_action to act greedily: with stop_epsilon = 0.0 and a very large
# steps_done, the epsilon threshold evaluates to 0.
# NOTE: these assignments overwrite the globals used during training (and evalEpisodes
# from the hyperparameter cell); re-run the hyperparameter cell before training again.
steps_done = 1000000
stop_epsilon = 0.0
evalEpisodes = 2

for e in range(evalEpisodes):
    # Initialize the environment and state
    currentObs, _ = env.reset()
    currentState = preprocess(currentObs)

    # the main RL loop; MiniGrid attributes are read from the unwrapped environment because
    # attribute forwarding through wrappers is deprecated in Gymnasium 0.29 and removed in 1.0
    for i in range(0, env.unwrapped.max_steps):
        # Select and perform an action
        action = select_action(currentState)
        a = action.item()

        # take action 'a', receive reward 'reward', and observe next state 'obs'
        # 'done' indicates that the termination state was reached,
        # 'truncated' that the step limit was hit
        obs, reward, done, truncated, info = env.step(a)

        if (done or truncated):
            # Episode over: accumulate the statistics and report the outcome
            totalReward += reward
            totalSteps += env.unwrapped.step_count
            if (done):
                print('Finished evaluation episode %d with reward %f,  %d steps, reaching goal ' % (e, reward, env.unwrapped.step_count))
                finishCounter += 1
            if (truncated):
                print('Failed evaluation episode %d with reward %f, %d steps' % (e,reward, env.unwrapped.step_count))
            break

        # Move to the next state
        currentState = preprocess(obs)

# Print a summary of the evaluation results
print('Completion rate %.2f with average reward %0.4f and average steps %0.2f' % (finishCounter/evalEpisodes, totalReward/evalEpisodes,  totalSteps/evalEpisodes))

Failed evaluation episode 0 with reward 0.000000, 256 steps
Failed evaluation episode 1 with reward 0.000000, 256 steps
Failed evaluation episode 2 with reward 0.000000, 256 steps
Failed evaluation episode 3 with reward 0.000000, 256 steps
Failed evaluation episode 4 with reward 0.000000, 256 steps
Failed evaluation episode 5 with reward 0.000000, 256 steps
Failed evaluation episode 6 with reward 0.000000, 256 steps
Failed evaluation episode 7 with reward 0.000000, 256 steps
Failed evaluation episode 8 with reward 0.000000, 256 steps
Failed evaluation episode 9 with reward 0.000000, 256 steps
Completion rate 0.00 with average reward 0.0000 and average steps 1280.00
