In [None]:
import math
import random
from collections import namedtuple, deque
from os.path import exists

import gymnasium as gym
import minigrid
# NOTE: wildcard import kept for compatibility (it provides ImgObsWrapper);
# it stays ahead of the numpy/torch imports so those names keep precedence.
from minigrid.wrappers import *
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

### Pre-processing functions

In [None]:
def extractObjectInformation2(observation):
    """Extract channel 0 of a 3-D observation as a (rows, cols) grid.

    The values of ``observation[:, :, 0]`` are read in column-major
    (Fortran) order and then laid out row by row into a ``rows x cols``
    array.  For a square grid this is the transpose of channel 0.

    Args:
        observation: array with exactly three dimensions
            ``(rows, cols, channels)``.

    Returns:
        A 2-D array of shape ``(rows, cols)`` with the same dtype as the input.
    """
    n_rows, n_cols, _n_channels = observation.shape
    # Column-major read-out of the first channel (row index varies fastest).
    channel0_colmajor = observation[:, :, 0].ravel(order='F')
    # Row-major write into the output grid.
    return channel0_colmajor.reshape(n_rows, n_cols)

def normalize(observation, max_value):
    """Divide every entry of ``observation`` by ``max_value``.

    Args:
        observation: array-like of numbers.
        max_value: divisor applied element-wise.

    Returns:
        A new NumPy array holding ``observation / max_value``.
    """
    values = np.array(observation)
    scaled = values / max_value
    return scaled

def flatten(observation):
    """Turn an observation into a float32 tensor of shape ``(1, N)``.

    The observation is copied into a NumPy array, flattened in row-major
    order, converted to a torch tensor and given a leading batch dimension.
    """
    flat_values = np.array(observation).flatten()
    as_tensor = torch.from_numpy(flat_values).to(torch.float32)
    # Prepend the batch dimension: (N,) -> (1, N).
    return as_tensor[None, :]

def preprocess(observation):
    """Convert a raw image observation into the network's input tensor.

    Pipeline: keep channel 0 as a 2-D grid, divide by 10.0, then flatten to
    a float tensor of shape ``(1, rows * cols)``.
    """
    object_grid = extractObjectInformation2(observation)
    # 10.0 is the scaling constant used throughout this notebook
    # (presumably the largest object index in the observation; TODO confirm).
    scaled_grid = normalize(object_grid, 10.0)
    return flatten(scaled_grid)

### Select action function & Optimise model function

In [None]:
def select_action(state, policy_net, numActions, steps_done, start_epsilon, stop_epsilon, decay_rate):
    """Pick an action with an exponentially decaying epsilon-greedy rule.

    The exploration threshold is
    ``stop_epsilon + (start_epsilon - stop_epsilon) * exp(-steps_done / decay_rate)``.

    Args:
        state: input passed unchanged to ``policy_net``.
        policy_net: callable returning action values with actions along dim 1.
        numActions: number of discrete actions to draw from when exploring.
        steps_done: number of steps taken so far (drives the decay).
        start_epsilon: threshold at ``steps_done == 0``.
        stop_epsilon: value the threshold decays towards.
        decay_rate: time constant of the exponential decay.

    Returns:
        A ``torch.long`` tensor of shape ``(1, 1)`` for a single-row ``state``.
    """
    draw = random.random()
    decay = math.exp(-1. * steps_done / decay_rate)
    eps_threshold = stop_epsilon+(start_epsilon-stop_epsilon)*decay

    # Explore unless the draw strictly exceeds the threshold.
    explore = not (draw > eps_threshold)
    if explore:
        return torch.tensor([[random.randrange(numActions)]], device=device, dtype=torch.long)

    # Exploit: index of the largest action value, no gradient tracking needed.
    with torch.no_grad():
        action_values = policy_net(state)
        _, best_action = action_values.max(1)
        return best_action.unsqueeze(0)

In [None]:
def optimize_model(memory, policy_net, target_net, optimizer, criterion, gamma, batch_size, pretrain_length):
    """Run one DQN gradient step on a mini-batch drawn from replay memory.

    Nothing happens until ``memory`` holds at least ``pretrain_length``
    transitions.

    Args:
        memory: replay buffer supporting ``len()`` and ``sample(batch_size)``;
            each sampled item is a 4-tuple
            ``(currentState, action, nextState, reward)`` where ``nextState``
            is ``None`` for the last transition of an episode.
        policy_net: network being trained; provides Q(s, a).
        target_net: network used to evaluate next-state values.
        optimizer: optimizer over ``policy_net`` parameters.
        criterion: loss called as ``criterion(prediction, target)``.
        gamma: discount factor.
        batch_size: number of transitions to sample.
        pretrain_length: minimum buffer size before training starts.

    Returns:
        None.
    """
    if len(memory) < pretrain_length:
        return

    # Sample a mini-batch and regroup it field by field.
    experience = memory.sample(batch_size)
    states, actions, next_states, rewards = zip(*experience)

    # Action-values Q(s, a) from the policy network for the actions taken.
    state_batch = torch.cat(states)
    action_batch = torch.cat(actions)
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # Build every auxiliary tensor on the device/dtype of the network output,
    # so the function does not depend on a module-level device global.
    q_device = state_action_values.device
    q_dtype = state_action_values.dtype

    # Rewards may arrive as int or float tensors; bring them to one dtype.
    reward_batch = torch.cat([r.to(device=q_device, dtype=q_dtype) for r in rewards])

    # TD-targets from the target network; terminal transitions keep value 0.
    non_final_mask = torch.tensor([s is not None for s in next_states],
                                  device=q_device, dtype=torch.bool)
    next_state_values = torch.zeros(len(experience), device=q_device, dtype=q_dtype)
    # Guard: torch.cat raises on an empty list when every sample is terminal.
    if non_final_mask.any():
        non_final_next_states = torch.cat([s for s in next_states if s is not None])
        with torch.no_grad():
            next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0]
    td_targets = ((next_state_values * gamma) + reward_batch).unsqueeze(1)

    # Calculate loss
    loss = criterion(state_action_values, td_targets)

    # Make a gradient descent step and update the policy network.
    optimizer.zero_grad()
    loss.backward()
    # Clamp each gradient element to [-1, 1]; parameters without a gradient are skipped.
    nn.utils.clip_grad_value_(policy_net.parameters(), 1.0)
    optimizer.step()

### Hyperparameters

### Create NN and Memory

In [None]:
class DQN(nn.Module):
    """Fully connected Q-network: two ReLU hidden layers and a linear output.

    Args:
        inputSize: number of input features per state.
        numActions: number of outputs (one action value per action).
        hiddenLayerSize: pair ``(first, second)`` of hidden layer widths.
    """

    def __init__(self, inputSize, numActions, hiddenLayerSize=(512, 256)):
        super().__init__()
        self.fc1 = nn.Linear(inputSize, hiddenLayerSize[0])
        self.fc2 = nn.Linear(hiddenLayerSize[0], hiddenLayerSize[1])
        self.fc3 = nn.Linear(hiddenLayerSize[1], numActions)

    def forward(self, x):
        """Return action values of shape ``(batch, numActions)`` for ``x``."""
        # Move the input to wherever the parameters live.  Using a module-level
        # device here would break whenever the network itself was not moved to
        # that device (e.g. a CPU model on a CUDA-capable machine).
        x = x.to(self.fc1.weight.device)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)

# One replay-memory record: state, action taken, resulting state, reward received.
Transition = namedtuple('Transition', 'currentState action nextState reward')

class ReplayMemory:
    """Bounded store of transitions with uniform random sampling.

    Backed by a ``deque`` with ``maxlen=capacity``: once full, appending a
    new transition discards the oldest one.
    """

    def __init__(self, capacity):
        self.memory = deque(maxlen=capacity)

    def push(self, *args):
        """Save a transition"""
        transition = Transition(*args)
        self.memory.append(transition)

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored transitions chosen at random."""
        return random.sample(self.memory, k=batch_size)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)

### Training

Our algorithm is summarized below:
<br>
* Initialize the weights of the policy network and target networks
* Initialize the environment
* Set the decay rate (used to calculate the exploration threshold $\epsilon_{\textrm{threshold}}$)
* Set fixed Q-value target update threshold
* Set total training steps to 0
* Create replay memory $\mathcal{D}$
<br><br>
* **For** episode = 1 to max_episode **do**
    * Set step to 0
    * Make new episode
    * Observe the first state $s$
    <br><br>
    * **While** {(not done) and (step < max_steps)} **do**:
        * With probability $\epsilon_{\textrm{threshold}}$ select a random action $a$, otherwise select $a = \mathrm{argmax}_{a'} Q(s,a')$
        * Increment the total training steps 
        * Execute action $a$ in the environment, observe reward $r$ and new state $s'$
        * Store transition $<s, a, r, s'>$ in replay memory $\mathcal{D}$
        * **If** size($\mathcal{D}$) >= mini-batch size $N$: 
            * Sample random mini-batch from $\mathcal{D}$: $<s_i, a_i, r_i, s_i'>$ with $i=1\ldots N$
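            * Set TD-target $\hat{Q}_i = r_i$ if the episode ends at $s_i'$, otherwise set $\hat{Q}_i = r_i + \gamma \max_{a'}{Q_{\textrm{target}}(s_i', a')}$, where $Q_{\textrm{target}}$ is the target network
            * Make a gradient descent step on the policy network with loss $(\hat{Q}_i - Q(s_i, a_i))^2$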
        * **If** the total number of training steps is a multiple of the fixed Q-value target update threshold:
            * Copy parameters from policy network to target network
    * **endwhile**
    <br><br>
* **endfor**

In [None]:
def train(env, policy_net, target_net, memory, optimizer, criterion, gamma, batch_size, target_update, start_epsilon, stop_epsilon, decay_rate, episodes, pretrain_length, numActions):
    """Train ``policy_net`` with DQN for a fixed number of episodes.

    Each step selects an epsilon-greedy action, stores the transition in
    ``memory``, performs one optimisation step and, every ``target_update``
    steps, copies the policy weights into ``target_net``.

    Args:
        env: environment whose ``reset()`` returns ``(obs, info)`` and whose
            ``step(a)`` returns ``(obs, reward, done, truncated, info)``; the
            base environment must expose ``max_steps`` and ``step_count``.
        policy_net: network being trained.
        target_net: network used for TD-targets.
        memory: replay buffer with ``push`` / ``sample`` / ``len``.
        optimizer: optimizer over ``policy_net`` parameters.
        criterion: loss function for the TD error.
        gamma: discount factor.
        batch_size: mini-batch size for each optimisation step.
        target_update: number of environment steps between target-network syncs.
        start_epsilon: initial exploration threshold.
        stop_epsilon: final exploration threshold.
        decay_rate: time constant of the exploration decay.
        episodes: number of episodes to run.
        pretrain_length: minimum buffer size before optimisation starts.
        numActions: number of discrete actions.

    Returns:
        None.
    """
    # max_steps / step_count belong to the base environment.  Reading them
    # through a wrapper is deprecated in Gymnasium, so go via `unwrapped`
    # when it exists.
    base_env = getattr(env, "unwrapped", env)
    steps_done = 0

    for _episode in range(episodes):
        currentObs, _ = env.reset()
        currentState = preprocess(currentObs)

        for _step in range(0, base_env.max_steps):
            # Choose an action
            action = select_action(currentState, policy_net, numActions, steps_done, start_epsilon, stop_epsilon, decay_rate)
            steps_done += 1

            # Take the action, receive the reward and observe the next state.
            # 'done' signals termination, 'truncated' signals a cut-off episode.
            obs, reward, done, truncated, info = env.step(action.item())
            episode_over = done or truncated

            # NOTE(review): truncated episodes are stored as terminal (no
            # bootstrapping), as in the original code; confirm this is intended.
            nextState = None if episode_over else preprocess(obs)

            # Store the transition <s,a,r,s'> in the replay memory.  A fixed
            # float dtype keeps the batch uniform whether the environment
            # hands back an int or a float reward.
            reward_tensor = torch.tensor([reward], device=device, dtype=torch.float32)
            memory.push(currentState, action, nextState, reward_tensor)

            # Move to the next state
            currentState = nextState

            # One optimisation step on the policy network from a sampled mini-batch.
            optimize_model(memory, policy_net, target_net, optimizer, criterion, gamma, batch_size, pretrain_length)

            # Every `target_update` steps, copy all weights and biases of the
            # policy network into the target network.
            if steps_done % target_update == 0:
                target_net.load_state_dict(policy_net.state_dict())

            if episode_over:
                if done:
                    print('Finished episode successfully taking %d steps and receiving reward %f' % (base_env.step_count, reward))
                else:
                    print('Truncated episode taking %d steps and receiving reward %f' % (base_env.step_count, reward))
                break

In [None]:
### MODEL HYPERPARAMETERS 
numActions = 3               # Number of discrete actions the agent chooses from

### TRAINING HYPERPARAMETERS
alpha = 0.0002               # learning_rate
episodes = 5000              # Total episodes for training
batch_size = 128             # Neural network batch size
target_update = 5000         # Number of environment steps between target network updates
gamma = 0.90                 # Discount factor

start_epsilon = 1.0          # exploration probability at start
stop_epsilon = 0.01          # minimum exploration probability 
decay_rate = 20000           # exponential decay rate for exploration prob

### MEMORY HYPERPARAMETERS
pretrain_length = batch_size # Number of experiences stored in the Memory when initialized for the first time
memorySize = 500000          # Number of experiences the Memory can keep - 500000

### TESTING HYPERPARAMETERS
evalEpisodes = 1000
# Train/evaluate switch.  It must NOT be named `train`: that name is the
# training function defined earlier, and rebinding it to a bool makes the
# later `train(...)` call fail with "'bool' object is not callable".
trainModel = True

In [None]:
# Make the gym environment
env = gym.make('MiniGrid-Empty-8x8-v0')
env = ImgObsWrapper(env)

# preprocess() keeps a single channel of the (rows, cols, channels) image, so
# the network input size is rows * cols (previously hard-coded as 49).
obsRows, obsCols = env.observation_space.shape[:2]
inputSize = obsRows * obsCols

# Both networks must live on `device`: inputs, actions and rewards are created
# there, and leaving the parameters on the CPU fails on CUDA machines.
policy_net = DQN(inputSize, numActions, (128,128)).to(device)
target_net = DQN(inputSize, numActions, (128,128)).to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()  # the target network is only evaluated, never optimised directly

criterion = nn.MSELoss()
optimizer = optim.Adam(policy_net.parameters(), lr=alpha)

memory = ReplayMemory(memorySize)

train(env, policy_net, target_net, memory, optimizer, criterion, gamma, batch_size, target_update, start_epsilon, stop_epsilon, decay_rate, episodes, pretrain_length, numActions)
