In [1]:
import minigrid
from minigrid.wrappers import ImgObsWrapper
from stable_baselines3 import PPO
import torch.nn as nn

import torch
import gymnasium as gym
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor
from typing import Dict
import multigrid


pygame 2.5.1 (SDL 2.28.2, Python 3.11.0)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
def get_obss_preprocessor(obs_space):
    # Check if obs_space is an image space
    if isinstance(obs_space, gym.spaces.Box):
        obs_space = {"image": obs_space.shape}

        def preprocess_obss(obss, device=None):
            return torch_ac.DictList({
                "image": preprocess_images(obss, device=device)
            })

    # Check if it is a MiniGrid observation space
    elif isinstance(obs_space, gym.spaces.Dict) and "image" in obs_space.spaces.keys():
        obs_space = {"image": obs_space.spaces["image"].shape, "text": 100}

        vocab = Vocabulary(obs_space["text"])

        def preprocess_obss(obss, device=None):
            return torch_ac.DictList({
                "image": preprocess_images([obs["image"] for obs in obss], device=device),
                "text": preprocess_texts([obs["mission"] for obs in obss], vocab, device=device)
            })

        preprocess_obss.vocab = vocab

    else:
        raise ValueError("Unknown observation space: " + str(obs_space))

    return obs_space, preprocess_obss


def preprocess_images(images, device=None):
    # Bug of Pytorch: very slow if not first converted to numpy array
    images = numpy.array(images)
    return torch.tensor(images, device=device, dtype=torch.float)

In [3]:
class MinigridFeaturesExtractor(BaseFeaturesExtractor):
    def __init__(self, observation_space: gym.Space, features_dim: int = 64, normalized_image: bool = False) -> None:
        super().__init__(observation_space, features_dim)
        self.cnn = nn.Sequential(
            nn.Conv2d(3, 16, (2, 2)),
            nn.ReLU(),
            nn.Conv2d(16, 32, (2, 2)),
            nn.ReLU(),
            nn.Conv2d(32, 64, (2, 2)),
            nn.ReLU(),
            nn.Flatten(),
        )

        print(observation_space)
        
        n = observation_space.shape[0]
        m = observation_space.shape[1]
        print(n,m)
        self.image_embedding_size = ((n-1)//2-2)*((m-1)//2-2)*64


        # Compute shape by doing one forward pass
        tens = torch.as_tensor(observation_space.sample()[None]).to(torch.uint8).float().permute(0,3,1,2)
        with torch.no_grad():
            n_flatten = self.cnn(tens).shape[1]
        print(features_dim, self.image_embedding_size)
        lin = nn.Linear(n_flatten, features_dim)
        self.linear = nn.Sequential(lin, nn.ReLU())

    def forward(self, observations: torch.Tensor) -> torch.Tensor:
        
        print(f"from_forward{observations.shape}")
        observations = torch.Tensor(observations)
        return self.linear(self.cnn(observations))

In [4]:
test_cnn = nn.Sequential(
            nn.Conv2d(3, 16, (2, 2)),
            nn.ReLU(),
            nn.Conv2d(16, 32, (2, 2)),
            nn.ReLU(),
            nn.Conv2d(32, 64, (2, 2)),
            nn.ReLU(),
            nn.Flatten(),
        )

In [4]:
env = gym.make("MiniGrid-Empty-6x6-v0", render_mode="rgb_array")
env = ImgObsWrapper(env)
obs = env.reset()
obs = obs[0]

In [6]:
from multigrid.envs import EmptyEnv
from multigrid.wrappers import MultiAgentImgObsWrapper

env2 = EmptyEnv(render_mode="rgb_array", agents=2)
env2 = MultiAgentImgObsWrapper(env2)

In [7]:


tens = torch.as_tensor(env.observation_space.sample()[None]).to(torch.uint8).float().permute(0,3,1,2)


# tens2 = torch.as_tensor(env2.observation_space[0].sample()[None]).to(torch.uint8).float().permute(0,3,1,2)




In [8]:
tens

tensor([[[[107., 170., 117., 146.,  40., 116., 166.],
          [ 54., 163., 225.,  73.,  52.,   9.,  64.],
          [188.,   3.,  21., 248.,  68., 166., 120.],
          [178., 152.,  85.,  33.,  47., 134., 125.],
          [ 39., 151.,  33.,   3.,  94., 187., 249.],
          [143., 177.,  21., 173.,  95.,  33., 205.],
          [ 33.,  27., 234.,  18., 148.,  40.,   4.]],

         [[ 30., 160.,  59.,  11.,  21., 212.,  65.],
          [253., 129.,  61., 187.,  14., 210.,  84.],
          [169.,  52.,   0.,  49., 252.,   9.,   4.],
          [228.,  84., 103., 251., 203., 253., 165.],
          [159.,  50.,  43., 225., 250.,  50., 127.],
          [ 99., 110.,  46.,  58., 219., 175., 152.],
          [ 78., 199.,  15., 129., 103., 214.,  90.]],

         [[ 11., 114.,  35.,  80.,  25., 171.,  37.],
          [ 12.,  72., 177., 207.,  94., 145., 175.],
          [199., 197., 186.,  42., 191., 213., 105.],
          [166.,  77., 208.,  38.,  97.,  43., 216.],
          [ 78., 129., 1

In [9]:
tens2

tensor([[[[229., 207., 107.,  20., 112., 114.,  58.],
          [236., 221., 208., 161., 209., 139.,  22.],
          [ 91., 109., 214., 255., 226., 169., 160.],
          [121., 147., 150., 214.,   3., 254., 162.],
          [ 27.,  76.,  81., 112., 164., 227.,  23.],
          [116.,  10.,  30., 241., 201., 195., 153.],
          [ 82.,  19.,  35., 228.,  80., 207., 135.]],

         [[ 29., 130.,  92., 241., 146., 221.,  81.],
          [ 93., 201., 159., 229., 214.,  77., 116.],
          [ 61.,  42.,  12.,  34., 249.,  87., 147.],
          [207., 238., 200., 180., 169., 212.,  45.],
          [ 64., 238., 137., 114., 203.,  61., 203.],
          [ 68.,  84.,  16.,  36., 162.,  70.,  54.],
          [ 79., 163., 161.,   5.,  29., 189., 180.]],

         [[187., 218., 115.,  66., 153.,   6.,  83.],
          [ 93.,  42., 212., 147., 140., 168.,  94.],
          [152., 147.,  91., 105., 209., 127., 116.],
          [103.,  35., 157.,  50., 198., 104., 240.],
          [ 98.,  26., 2

In [17]:
test_cnn(tens2)

tensor([[0.0000, 0.0000, 0.0000,  ..., 5.5412, 0.0000, 0.0000]],
       grad_fn=<ReshapeAliasBackward0>)

In [18]:
tens2

tensor([[[[ 72.,  68.,  48., 215., 167.,  59., 180.],
          [168., 183., 216., 245., 227.,  24., 229.],
          [213.,  72., 175., 236.,  96., 140., 174.],
          [ 40., 212., 101.,  10.,  50., 154., 103.],
          [189.,  97.,  31., 155.,  40., 194.,  88.],
          [100., 203., 187., 247., 199.,  16.,  19.],
          [153.,  27.,  93.,   1.,   7., 184., 135.]],

         [[253., 249.,  36., 190., 205.,  88., 162.],
          [ 12., 215., 169.,  15., 221., 236.,  39.],
          [202., 124., 230., 211.,  41., 156., 166.],
          [166., 217., 144., 212., 224.,  99., 200.],
          [107., 108., 208.,  13., 153.,  34., 220.],
          [ 73., 230., 180., 224., 111., 186., 196.],
          [138.,   5.,  70.,  82., 213., 135., 214.]],

         [[157.,  61., 181.,  23., 107., 173., 143.],
          [233., 186., 243., 136.,  53., 148., 146.],
          [213., 119., 144., 184., 124., 202.,  29.],
          [161., 169., 179., 112., 115., 162., 198.],
          [ 25., 105., 1

In [7]:
from multigrid.envs import EmptyEnv
from multigrid.wrappers import MultiAgentImgObsWrapper

env2 = EmptyEnv(render_mode="rgb_array", agents=2)
env2 = MultiAgentImgObsWrapper(env2)

obs2 = env2.reset()



: 

In [None]:
feature_extractor  = MinigridFeaturesExtractor(env2.observation_space[0], features_dim=64)


NameError: name 'env2' is not defined

In [None]:
class MinigridFeaturesExtractor(BaseFeaturesExtractor):
    def __init__(self, observation_space: gym.Space, features_dim: int = 64, normalized_image: bool = False) -> None:
        super().__init__(observation_space, features_dim)
        self.cnn = nn.Sequential(
            nn.Conv2d(3, 16, (2, 2)),
            nn.ReLU(),
            nn.Conv2d(16, 32, (2, 2)),
            nn.ReLU(),
            nn.Conv2d(32, 64, (2, 2)),
            nn.ReLU(),
            nn.Flatten(),
        )

        print(observation_space)
        
        n = observation_space.shape[0]
        m = observation_space.shape[1]
        print(n,m)
        self.image_embedding_size = ((n-1)//2-2)*((m-1)//2-2)*64


        # Compute shape by doing one forward pass
        tens = torch.as_tensor(observation_space.sample()[None]).to(torch.uint8).float().permute(0,3,1,2)
        with torch.no_grad():
            n_flatten = self.cnn(tens).shape[1]
        print(features_dim, self.image_embedding_size)
        lin = nn.Linear(n_flatten, features_dim)
        self.linear = nn.Sequential(lin, nn.ReLU())

    def forward(self, observations: torch.Tensor) -> torch.Tensor:
        
        print(f"from_forward{observations.shape}")
        observations = torch.Tensor(observations)
        return self.linear(self.cnn(observations))

In [None]:
class MinigridFeaturesExtractor(BaseFeaturesExtractor):
    def __init__(self, observation_space: gym.Space, features_dim: int = 64, normalized_image: bool = False) -> None:
        super().__init__(observation_space, features_dim)
        self.cnn = nn.Sequential(
            nn.Conv2d(3, 16, (2, 2)),
            nn.ReLU(),
            nn.Conv2d(16, 32, (2, 2)),
            nn.ReLU(),
            nn.Conv2d(32, 64, (2, 2)),
            nn.ReLU(),
            nn.Flatten(),
        )

        print(observation_space)
        
        n = observation_space.shape[0]
        m = observation_space.shape[1]
        print(n,m)
        self.image_embedding_size = ((n-1)//2-2)*((m-1)//2-2)*64


        # Compute shape by doing one forward pass
        tens = torch.as_tensor(observation_space.sample()[None]).to(torch.uint8).float().permute(0,3,1,2)
        with torch.no_grad():
            n_flatten = self.cnn(tens).shape[1]
        print(features_dim, self.image_embedding_size)
        lin = nn.Linear(n_flatten, features_dim)
        self.linear = nn.Sequential(lin, nn.ReLU())

    def forward(self, observations: torch.Tensor) -> torch.Tensor:
        
        print(f"from_forward{observations.shape}")
        observations = torch.Tensor(observations)
        return self.linear(self.cnn(observations))

In [7]:
env2.observation_space[0]

Box(0, 255, (7, 7, 3), int64)

In [4]:
env = gym.make("MiniGrid-Empty-6x6-v0", render_mode="rgb_array")
env = ImgObsWrapper(env)
obs = env.reset()
obs = obs[0]

feature_extractor  = MinigridFeaturesExtractor(env.observation_space, features_dim=128)

while not done:
    action, _states = model.predict(obs)
    obs, rewards, done, info, _ = env.step(action)
    obs_copy = obs
    tensor_obs = torch.tensor(obs).float().permute(2,0,1).unsqueeze(0)
    features = feature_extractor(tensor_obs)
    print("done")


Box(0, 255, (7, 7, 3), uint8)
7 7
128 64


NameError: name 'done' is not defined

In [79]:
tensor_obs.shape

torch.Size([1, 3, 7, 7])

In [96]:
obs = env.reset()
obs[0].shape

(7, 7, 3)