### Libraries

In [None]:
import math
import random
from collections import namedtuple, deque
from os.path import exists

import gymnasium as gym
import minigrid
# Wildcard kept right after `import minigrid` so the explicit imports below win on any name clash.
from minigrid.wrappers import *
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
class DQN(nn.Module):
    """Fully connected network with two hidden ReLU layers and a linear output.

    Args:
        inputSize: Number of features in one flattened input row.
        numActions: Number of output units.
        hiddenLayerSize: Indexable pair giving the widths of the first and
            second hidden layers.
    """

    def __init__(self, inputSize, numActions, hiddenLayerSize=(512, 256)):
        super().__init__()
        first_width = hiddenLayerSize[0]
        second_width = hiddenLayerSize[1]
        # The attribute names double as state_dict keys, so they stay fc1..fc3.
        self.fc1 = nn.Linear(inputSize, first_width)
        self.fc2 = nn.Linear(first_width, second_width)
        self.fc3 = nn.Linear(second_width, numActions)

    def forward(self, x):
        """Move ``x`` to the module-level ``device`` and return the output layer's activations."""
        hidden = self.fc1(x.to(device)).relu()
        hidden = self.fc2(hidden).relu()
        return self.fc3(hidden)

def extractObjectInformation2(observation):
    """Return a (rows, cols) grid built from the first channel of ``observation``.

    ``observation`` must be a 3-D array of shape (rows, cols, channels). The
    first channel is read in column-major order and the values are then laid
    out row-major, so for a square grid the result is the transpose of
    ``observation[:, :, 0]``.
    """
    rows, cols, _channels = observation.shape
    first_channel = observation[..., 0]
    column_major_values = np.ravel(first_channel, order='F')
    return column_major_values.reshape((rows, cols))

def normalize(observation, max_value):
    """Return ``observation`` as a NumPy array with every entry divided by ``max_value``."""
    values = np.asarray(observation)
    return np.divide(values, max_value)

def flatten(observation):
    """Return ``observation`` as a float32 tensor of shape (1, N).

    The values are copied into a fresh array and flattened in row-major
    order before the leading batch dimension is added.
    """
    flat_values = np.array(observation).ravel()
    row = torch.from_numpy(flat_values).to(torch.float32)
    return torch.unsqueeze(row, dim=0)

def preprocess(observation):
    """Convert a raw (rows, cols, channels) observation into a network input row.

    Pipeline: take the first-channel grid, divide it by 10.0, then flatten it
    into a float tensor of shape (1, rows * cols).
    """
    grid = extractObjectInformation2(observation)
    scaled_grid = normalize(grid, 10.0)
    return flatten(scaled_grid)

def select_action(state):
    """Pick an action for ``state`` with an epsilon-greedy policy.

    Reads the module-level ``start_epsilon``, ``stop_epsilon``, ``decay_rate``
    and ``steps_done`` to compute the current exploration probability, which
    decays exponentially from ``start_epsilon`` towards ``stop_epsilon`` as
    ``steps_done`` grows.

    Args:
        state: Input batch for ``policy_net``; callers in this file pass a
            single preprocessed observation of shape (1, N).

    Returns:
        A ``torch.long`` tensor of shape (1, 1) holding the chosen action
        index (for a single-row ``state``).
    """
    # `math` must be imported explicitly at the top of the file; it is not
    # guaranteed to be provided by the wildcard import from minigrid.wrappers.
    decay = math.exp(-1. * steps_done / decay_rate)
    eps_threshold = stop_epsilon + (start_epsilon - stop_epsilon) * decay

    if random.random() > eps_threshold:
        # Exploit: index of the largest network output; no gradient is needed here.
        with torch.no_grad():
            return policy_net(state).max(1)[1].unsqueeze(0)

    # Explore: uniformly random action index.
    return torch.tensor([[random.randrange(numActions)]], device=device, dtype=torch.long)

# One replay-memory record: the state acted in, the action taken, the state
# that followed (None is stored for episode-ending steps) and the reward.
Transition = namedtuple(
    'Transition',
    ['currentState', 'action', 'nextState', 'reward'],
)

class ReplayMemory:
    """Bounded buffer of ``Transition`` records used for experience replay."""

    def __init__(self, capacity):
        # A deque with maxlen discards its oldest entry when a new one is
        # appended to a full buffer.
        self.memory = deque(maxlen=capacity)

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.memory)

    def push(self, *args):
        """Build a ``Transition`` from ``args`` and append it to the buffer."""
        transition = Transition(*args)
        self.memory.append(transition)

    def sample(self, batch_size):
        """Return ``batch_size`` stored entries drawn at random without replacement."""
        return random.sample(self.memory, k=batch_size)

# Shared replay buffer used by the training loop and optimize_model().
# NOTE(review): memorySize is assigned in the hyperparameter cell further
# down, so that cell has to be executed before this line — confirm cell order.
memory = ReplayMemory(capacity=memorySize)

def optimize_model():
    """Run one gradient step of DQN on a mini-batch from the replay memory.

    Uses the module-level ``memory``, ``batch_size``, ``policy_net``,
    ``target_net``, ``gamma``, ``optimizer`` and ``device``. Does nothing
    until ``memory`` holds at least ``batch_size`` transitions.

    Returns:
        None. ``policy_net`` is updated in place through ``optimizer``.
    """
    # Wait until the replay memory has stored enough experience.
    if len(memory) < batch_size:
        return

    # Sample a mini-batch and turn the list of Transitions into one
    # Transition whose fields are tuples over the whole batch.
    experience = memory.sample(batch_size)
    batch = Transition(*zip(*experience))

    state_batch = torch.cat(batch.currentState)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Q(s, a) from the policy network for the action actually taken.
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # max_a' Q_target(s', a') for non-terminal next states; terminal states
    # (stored as None) keep a value of zero.
    non_final_mask = torch.tensor(
        [s is not None for s in batch.nextState], device=device, dtype=torch.bool
    )
    non_final_next_states = [s for s in batch.nextState if s is not None]
    next_state_values = torch.zeros(batch_size, device=device)
    # torch.cat raises on an empty list, which happens when every sampled
    # transition ended its episode; skip the target-network pass in that case.
    if non_final_next_states:
        with torch.no_grad():
            next_state_values[non_final_mask] = target_net(
                torch.cat(non_final_next_states)
            ).max(1)[0]

    # TD target: r + gamma * max_a' Q_target(s', a'), shaped (batch, 1) and
    # cast to the prediction dtype so the loss never sees mixed dtypes.
    td_targets = (next_state_values * gamma) + reward_batch
    td_targets = td_targets.unsqueeze(1).to(state_action_values.dtype)

    # Mean squared error between predicted Q-values and TD targets.
    criterion = nn.MSELoss()
    loss = criterion(state_action_values, td_targets)

    # Gradient descent step on the policy network, with each gradient element
    # clipped to [-1, 1]; clip_grad_value_ skips parameters without a gradient.
    optimizer.zero_grad()
    loss.backward()
    nn.utils.clip_grad_value_(policy_net.parameters(), 1.0)
    optimizer.step()

In [None]:
### MODEL HYPERPARAMETERS
numActions = 3    # number of action indices the agent chooses between
inputSize = 49    # length of one flattened network input row

### TRAINING HYPERPARAMETERS
alpha = 2e-4              # learning rate
episodes = 5000           # number of training episodes
batch_size = 128          # transitions sampled per optimization step
target_update = 20_000    # environment steps between target-network syncs

# Q-learning hyperparameters
gamma = 0.9               # discount factor applied to the next-state value

# Exploration parameters for the epsilon-greedy strategy
start_epsilon = 1.0       # exploration probability at step 0
stop_epsilon = 0.01       # value epsilon decays towards
decay_rate = 20_000       # step scale of the exponential decay

### MEMORY HYPERPARAMETERS
pretrain_length = batch_size
memorySize = 500_000      # capacity of the replay memory

### TESTING HYPERPARAMETERS
evalEpisodes = 1000

# Change this to 'False' if you only want to evaluate a previously trained agent
train = True

In [None]:
## Initialize the weights of the policy network and target networks
hiddenLayerSize = (128,128)
# DQN.forward moves its input to `device`, so the parameters must live there
# too; otherwise a CUDA machine hits a device-mismatch error on the first pass.
policy_net = DQN(inputSize, numActions, hiddenLayerSize).to(device)
target_net = DQN(inputSize, numActions, hiddenLayerSize).to(device)
# The target network starts as an exact copy of the policy network.
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

## Initialize the optimizer used by optimize_model()
# NOTE(review): no optimizer was defined anywhere in this file although
# optimize_model() calls optimizer.zero_grad()/step(); Adam with learning rate
# `alpha` is assumed here — swap it if a different optimizer was intended.
optimizer = optim.Adam(policy_net.parameters(), lr=alpha)

## Initialize the environment
env = gym.make('MiniGrid-Empty-8x8-v0')
# ImgObsWrapper makes reset()/step() return only the image array as the observation.
env = ImgObsWrapper(env)

In [None]:
# Training loop: act epsilon-greedily, store each transition in the replay
# memory and take one optimization step per environment step.
episodes = 50  # overrides the value set in the hyperparameter cell
# NOTE(review): max_steps / step_count are read through the wrapper; newer
# Gymnasium releases may require env.unwrapped.<attr> — confirm for the
# installed version.
max_steps = env.max_steps
steps_done = 0

print('Start training...')
for e in range(episodes):
    obs, _ = env.reset()
    state = preprocess(obs)

    for i in range(0, max_steps):
        action = select_action(state)
        a = action.item()
        steps_done += 1

        obs, reward, done, truncated, info = env.step(a)
        # Explicit float dtype keeps every stored reward tensor the same type.
        reward = torch.tensor([reward], device=device, dtype=torch.float32)

        # Episode-ending transitions store None as their next state, which
        # optimize_model() treats as a next-state value of zero.
        nextState = None if (done or truncated) else preprocess(obs)

        memory.push(state, action, nextState, reward)

        # Advance the state the agent acts on. The loop reads `state`, so this
        # is the variable that has to move forward; otherwise every action and
        # every stored transition would be based on the reset observation.
        state = nextState

        optimize_model()

        # Periodically copy the policy weights into the target network.
        if steps_done % target_update == 0:
            print("updating network")
            target_net.load_state_dict(policy_net.state_dict())

        # Episode finished when done or truncated is true
        if (done or truncated):
            if (done):
                # the environment reported termination
                print('Finished episode successfully taking %d steps and receiving reward %f' % (env.step_count, reward.item()))
            else:
                # the episode was cut off before termination
                print('Truncated episode taking %d steps and receiving reward %f' % (env.step_count, reward.item()))
            break


print('Done training...')

### Evaluation Provided

In [None]:
# Evaluation: run the current policy for a few episodes and report the
# completion rate, average final reward and average episode length.
finishCounter = 0.0
totalSteps = 0.0
totalReward = 0.0

# A very large step count together with stop_epsilon = 0 drives the
# exploration threshold in select_action() to practically zero.
steps_done = 1000000
stop_epsilon = 0.0
evalEpisodes = 2

for e in range(evalEpisodes):
    # Start a fresh episode.
    currentObs, _ = env.reset()
    currentState = preprocess(currentObs)

    for i in range(env.max_steps):
        # Choose an action for the current state and apply it.
        action = select_action(currentState)
        a = action.item()
        obs, reward, done, truncated, info = env.step(a)

        if not (done or truncated):
            # Episode continues: move on to the newly observed state.
            nextState = preprocess(obs)
            currentState = nextState
            continue

        # Episode ended: record its outcome and leave the step loop.
        nextState = None
        totalReward += reward
        totalSteps += env.step_count
        if done:
            print('Finished evaluation episode %d with reward %f,  %d steps, reaching goal ' % (e, reward, env.step_count))
            finishCounter += 1
        if truncated:
            print('Failed evaluation episode %d with reward %f, %d steps' % (e,reward, env.step_count))
        break

# Print a summary of the evaluation results
print('Completion rate %.2f with average reward %0.4f and average steps %0.2f' % (finishCounter/evalEpisodes, totalReward/evalEpisodes,  totalSteps/evalEpisodes))