# Assignment 5, Problem 3

This is the starter code for Assignment 5, Problem 3.

In this assignment, you will solve increasingly challenging tasks from the [Minigrid benchmark](https://minigrid.farama.org/).

In [None]:
!pip install torch
!pip install gymnasium==0.29.1
!pip install minigrid==2.3.1

In [2]:
import gymnasium as gym
import minigrid
import numpy as np

# Show every float inside printed NumPy arrays with exactly two decimals.
# The bound str.format method is itself the one-argument formatter callable.
np.set_printoptions(formatter={'float': "{0:0.2f}".format})

pygame 2.6.1 (SDL 2.28.4, Python 3.9.20)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [3]:
def compute_score(task, policy):
  """Roll out `policy` on `task` for ten fixed-seed episodes and report the returns.

  Args:
    task: environment object exposing `reset(seed=...)` -> (observation, info)
      and `step(action)` -> (observation, reward, terminated, truncated, info).
    policy: callable that maps an observation to an action.

  Returns:
    NumPy array of length 10 with the summed (undiscounted) reward of each episode.

  Side effects:
    Prints the best episode score and a `(mean, std)` tuple, each rounded to 3 decimals.
  """
  num_episodes = 10
  seed_by_episode = [42, 34, 50, 1, 9, 7, 43, 56, 90, 11]
  score_by_episode = np.zeros(num_episodes)

  for episode_idx in range(num_episodes):
    # Each episode starts from its own fixed seed so scores are comparable across runs.
    observation, info = task.reset(seed=seed_by_episode[episode_idx])
    episode_return = 0

    # Keep acting until the environment reports termination or truncation.
    while True:
      action = policy(observation)
      observation, reward, terminated, truncated, info = task.step(action)
      episode_return += reward
      if terminated or truncated:
        break

    score_by_episode[episode_idx] = episode_return

  score_best = round(score_by_episode.max(), 3)
  score_mean = round(score_by_episode.mean(), 3)
  score_std  = round(score_by_episode.std(), 3)

  print(f"Best score: {score_best}")
  # Note: the braces hold a tuple, so this line prints "(mean, std)".
  print(f"Average score: {score_mean, score_std}")

  return score_by_episode

## Problem 3
Solve the [Minigrid Blocked, Unlock and Pickup](https://minigrid.farama.org/environments/minigrid/BlockedUnlockPickupEnv/) task.

This problem is optional for COMP 442 students.
This problem is mandatory for COMP 552 students.

This problem is worth 05 points for COMP 552 students.

![](https://minigrid.farama.org/_images/BlockedUnlockPickupEnv.gif)

In [4]:
# Keep the environment id in a named constant so the task being solved is easy to spot.
THIRD_TASK_ENV_ID = "MiniGrid-BlockedUnlockPickup-v0"
third_task = gym.make(THIRD_TASK_ENV_ID)

In [5]:
# Reset once (no seed is passed here) and summarise what the task exposes.
obs, _ = third_task.reset()

# (label, value) pairs, printed one per line in this order.
obs_summary = (
    ("Observation keys:", obs.keys()),
    ("Image shape:", obs['image'].shape),
    ("Direction:", obs["direction"]),
    ("Mission:", obs["mission"]),
    ("Action space:", third_task.action_space),
)
for label, value in obs_summary:
    print(label, value)

Observation keys: dict_keys(['image', 'direction', 'mission'])
Image shape: (7, 7, 3)
Direction: 2
Mission: pick up the yellow box
Action space: Discrete(7)


In [8]:
# Quick look at the observation obtained from the earlier reset. Only the final
# expression of a cell is displayed, so the shape on the next line is evaluated
# but not shown; the mission string is this cell's output.
obs["image"].shape
obs["mission"]

'pick up the yellow box'

In [None]:
######## PUT YOUR CODE HERE ########
# Train an agent to solve the task
import torch
import torch.nn as nn
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor

class MinigridFeaturesExtractor(BaseFeaturesExtractor):
    """Configurable CNN + MLP feature extractor built on `BaseFeaturesExtractor`.

    The network is a stack of 3x3 convolutions (stride 1, padding 1, so the
    spatial size is preserved), each followed by ReLU, then a flatten and a
    small fully connected head whose last layer outputs `features_dim` values.

    NOTE(review): the first axis of `observation_space.shape` is used as the
    channel count, so the space must be an image-only, channel-first space.
    The raw task observation shown earlier in this notebook is a dict whose
    image is (7, 7, 3); it presumably has to be wrapped/transposed before it
    reaches this class -- confirm against the training code.
    """

    def __init__(self, observation_space: gym.Space,
                 features_dim: int = 512,
                 hidden_channel_size: tuple[int, ...] = (16, 32, 64),
                 num_layers: int = 3,
                 num_linear_layers: int = 2,
                 linear_layer_size: int = 64) -> None:
        """Build the convolutional trunk and the linear head.

        Args:
            observation_space: space whose `shape[0]` is the number of input
                channels and whose samples can be fed to `nn.Conv2d`.
            features_dim: size of the feature vector produced by `forward`.
            hidden_channel_size: output channels of each convolution, one
                entry per layer (a list or a tuple is accepted).
            num_layers: number of convolutions; must equal
                `len(hidden_channel_size)` and be at least 1.
            num_linear_layers: number of `linear_layer_size`-wide layers in
                the head. The first one (fed by the flattened CNN output) is
                always created, so values below 1 behave like 1.
            linear_layer_size: width of the hidden linear layers.

        Raises:
            AssertionError: if `hidden_channel_size` and `num_layers` disagree
                or no convolutional layer is requested.
        """
        super().__init__(observation_space, features_dim)

        assert len(hidden_channel_size) == num_layers, "hidden_channel_size should be a list of length num_layers"
        # Without this guard an empty configuration fails later with an opaque IndexError.
        assert num_layers >= 1, "num_layers must be at least 1"

        n_input_channels = observation_space.shape[0]

        # Convolutional trunk: consecutive (in, out) channel pairs, each Conv2d followed by ReLU.
        channel_sizes = [n_input_channels, *hidden_channel_size]
        conv_layers = []
        for in_channels, out_channels in zip(channel_sizes[:-1], channel_sizes[1:]):
            conv_layers.append(nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=1))
            conv_layers.append(nn.ReLU())
        conv_layers.append(nn.Flatten())

        self.cnn = nn.Sequential(*conv_layers)

        # Infer the flattened size by pushing one sampled observation (with a
        # leading batch axis) through the trunk; no gradients are needed for this.
        with torch.no_grad():
            sample_batch = torch.as_tensor(observation_space.sample()[None]).float()
            n_flatten = self.cnn(sample_batch).shape[1]

        # Linear head: input layer, optional extra hidden layers, then the
        # projection to features_dim (no activation after the last layer).
        linear_layers = [nn.Linear(n_flatten, linear_layer_size), nn.ReLU()]
        for _ in range(1, num_linear_layers):
            linear_layers.append(nn.Linear(linear_layer_size, linear_layer_size))
            linear_layers.append(nn.ReLU())
        linear_layers.append(nn.Linear(linear_layer_size, features_dim))

        self.linear = nn.Sequential(*linear_layers)

    def forward(self, observations: torch.Tensor) -> torch.Tensor:
        """Return the linear head applied to the flattened CNN output of `observations`."""
        return self.linear(self.cnn(observations))
######## PUT YOUR CODE HERE ########

In [None]:
def third_policy(observation):
  """Choose the action to take in the third task for one observation.

  Args:
    observation: observation returned by the task's `reset` / `step`.

  Returns:
    The action assigned by the solution code between the markers below.

  Raises:
    NotImplementedError: if the solution code has not assigned `action`.
  """
  # Sentinel: stays None until the solution below assigns a real action, so an
  # unfinished policy fails with a clear message instead of a NameError.
  action = None
  ######## PUT YOUR CODE HERE ########

  ######## PUT YOUR CODE HERE ########
  if action is None:
    raise NotImplementedError(
      "third_policy is not implemented: assign `action` between the PUT YOUR CODE HERE markers"
    )
  return action

In [None]:
# Evaluate third_policy on the third task; compute_score prints the summary and
# its returned per-episode score array is displayed as this cell's output.
compute_score(task=third_task, policy=third_policy)