# MiniGrid Environment

The minigrid environments provide a number of "simple" environments available as both gridworlds and pixelated images. They enable us to avoid doing CNN feature visualisation analysis while still studying problems that might involve search-like notions.

Try out the environment by running the following command:


```bash
python -m minigrid.manual_control
```

Later we can benchmark against torch-rl 

In [290]:
import gymnasium as gym
from minigrid.wrappers import RGBImgPartialObsWrapper, ImgObsWrapper

env = gym.make('MiniGrid-Empty-8x8-v0')
env = RGBImgPartialObsWrapper(env) # Get pixel observations
env = ImgObsWrapper(env) # Get rid of the 'mission' field
obs, _ = env.reset() # This now produces an RGB tensor only
# obs

In [291]:
import torch as t 
import plotly.express as px
obs = t.tensor(obs)
obs.shape
px.imshow(obs)


distutils Version classes are deprecated. Use packaging.version instead.


distutils Version classes are deprecated. Use packaging.version instead.



In [292]:
env = gym.make('MiniGrid-Empty-8x8-v0')
env = RGBImgPartialObsWrapper(env)  # Get pixel observations
env = ImgObsWrapper(env)  # Get rid of the 'mission' field
obs, _ = env.reset()  # This now produces an RGB tensor only

# Take several random actions, storing one observation, action, reward and
# timestep per step.
n_steps = 10
all_obs = []
all_actions = []
all_returns = []
all_timesteps = []

for i in range(n_steps):
    action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(action)
    all_obs.append(obs)
    all_actions.append(action)
    all_returns.append(reward)
    all_timesteps.append(i)

# Convert to tensors. Each observation is wrapped individually and then
# stacked, which avoids the very slow "tensor from a list of numpy.ndarrays"
# path that t.tensor(all_obs) warns about.
all_obs = t.stack([t.as_tensor(step_obs) for step_obs in all_obs])
all_actions = t.tensor(all_actions).reshape(-1, 1)
# NOTE(review): the recorded rewards are replaced by random placeholder
# values here, so the returns-to-go below are synthetic. Confirm this is
# intended before using them for anything but shape checks.
all_returns = t.randn((n_steps, 1))
# Return-to-go at step k = sum of returns from step k to the end.
all_returns_to_go = all_returns.flip(0).cumsum(0).flip(0).reshape(-1, 1)
all_timesteps = t.tensor(all_timesteps).reshape(-1, 1)


Creating a tensor from a list of numpy.ndarrays is extremely slow. Please consider converting the list to a single numpy.ndarray with numpy.array() before converting to a tensor. (Triggered internally at /Users/runner/work/pytorch/pytorch/pytorch/torch/csrc/utils/tensor_new.cpp:233.)



# CNN 

In [294]:
# for the grid world environment we will use a small CNN to extract features from the image
# we will use the same CNN as in the original paper

import torch as t
import torch.nn as nn
import torch.nn.functional as F
from einops import rearrange


class StateEncoder(nn.Module):
    '''Small convolutional encoder mapping an image batch to `n_embed` features.

    The input is a channels-first batch of shape (batch, 3, H, W). For a
    56 x 56 image the three convolutions shrink the spatial size
    56 -> 13 -> 5 -> 3, leaving 64 * 3 * 3 = 576 values per image, which is
    the input width the final linear layer is built for. The output has shape
    (batch, n_embed) and is non-negative because a ReLU is applied last.
    '''

    def __init__(self, n_embed):
        super().__init__()
        self.n_embed = n_embed
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=8, stride=4, padding=0)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=4, stride=2, padding=0)
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1, padding=0)
        self.flatten = nn.Flatten()
        # 576 = 64 channels * 3 * 3 spatial positions left after conv3.
        self.fc = nn.Linear(576, n_embed)

    def forward(self, x):
        '''Encode `x` (batch, 3, H, W) into a (batch, n_embed) feature tensor.'''
        for conv in (self.conv1, self.conv2, self.conv3):
            x = F.relu(conv(x))
        return F.relu(self.fc(self.flatten(x)))

# we will use the same CNN as in the original paper
cnn = StateEncoder(64).to("cpu")
x = obs.unsqueeze(0).to(t.float32)
x = rearrange(x, 'b h w c-> b c h w')
cnn(x)

tensor([[ 2.2918,  2.3031,  0.0000,  0.0000,  0.0000,  0.0000,  4.8692,  0.0000,
          0.0000,  0.0000,  1.3046,  3.2425,  0.0000,  2.4428,  0.0000,  5.8740,
          0.9391,  5.1203,  2.1983,  0.0000,  0.0000,  0.0000,  1.6969,  7.1444,
          0.0000,  1.0069,  1.9155,  0.0000,  1.5764,  2.6883,  0.0000,  5.2671,
          0.0000,  0.0000,  3.8132,  0.0000,  0.0000,  4.7259,  0.5788,  0.0000,
          0.8931,  0.0000,  0.0000,  3.6706,  0.0000,  0.7813,  3.8906,  0.0000,
          0.9702,  2.0933,  0.0000, 12.0165,  2.1961,  0.0000,  4.0366,  2.6068,
          7.6517,  0.0000,  0.7258,  5.9120,  0.0000,  0.0000,  0.0000,  0.0000]],
       grad_fn=<ReluBackward0>)

For reference: https://github.com/kzl/decision-transformer/blob/master/atari/mingpt/model_atari.py

In [296]:
import torch as t 
import gymnasium as gym
from minigrid.wrappers import RGBImgPartialObsWrapper, ImgObsWrapper

env = gym.make('MiniGrid-Empty-8x8-v0')
env = RGBImgPartialObsWrapper(env)  # Get pixel observations
env = ImgObsWrapper(env)  # Get rid of the 'mission' field
obs, _ = env.reset()  # This now produces an RGB tensor only

# Take several random actions, storing one observation, action, reward and
# timestep per step.
n_steps = 10
all_obs = []
all_actions = []
all_returns = []
all_timesteps = []

for i in range(n_steps):
    action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(action)
    all_obs.append(obs)
    all_actions.append(action)
    all_returns.append(reward)
    all_timesteps.append(i)

# Convert to tensors and add a leading batch dimension with unsqueeze(0).
# Observations are wrapped one by one and stacked to avoid the very slow
# "tensor from a list of numpy.ndarrays" conversion path.
all_obs = t.stack([t.as_tensor(step_obs) for step_obs in all_obs]).to(t.float32).unsqueeze(0)
all_actions = t.tensor(all_actions).reshape(-1, 1).unsqueeze(0)
# NOTE(review): random placeholder values are used instead of the recorded
# rewards, so the returns-to-go below are synthetic.
all_returns = t.randn((n_steps, 1))
# Return-to-go at step k = sum of returns from step k to the end.
all_returns_to_go = all_returns.flip(0).cumsum(0).flip(0).reshape(-1, 1).unsqueeze(0)
all_timesteps = t.tensor(all_timesteps).reshape(-1, 1).unsqueeze(0)


# Train a decision transformer on minigrid

I thought it might be easy to sample trajectories from a random agent on minigrid and train on these. 

The problem with this approach is that it's just massively too slow. We need code that runs parallelized environments, and agents that sample from "solution trajectories" more often than random agents do.

In [307]:
import numpy as np 
from typing import Union
ActType = Union[int, np.ndarray]

class Agent:
    '''Minimal agent interface: choose an action, observe the outcome, reset.

    Subclasses are expected to override `get_action`; `observe` is a no-op
    hook and `reset` re-creates the agent's random number generator.
    '''

    # Random generator owned by the agent; (re)created by `reset`.
    rng: np.random.Generator

    def __init__(self, num_arms: int, seed: int):
        '''Store the number of available actions and seed the generator.'''
        self.num_arms = num_arms
        self.reset(seed)

    def reset(self, seed: int) -> None:
        '''Replace `self.rng` with a fresh generator seeded with `seed`.'''
        self.rng = np.random.default_rng(seed)

    def observe(self, action: ActType, reward: float, info: dict) -> None:
        '''Hook called after each environment step; does nothing by default.'''
        return None

    def get_action(self) -> ActType:
        '''Return the next action. Must be implemented by subclasses.'''
        raise NotImplementedError()

class RandomAgent(Agent):
    '''Agent that picks every action by sampling the environment's action space.

    NOTE(review): `Agent.__init__` is not called, so `num_arms` is never set
    and `rng` only exists after `reset` has been called. Sampling goes through
    `env.action_space`, not `self.rng`, so `reset(seed)` presumably does not
    make the chosen actions reproducible - TODO confirm.
    '''
    def __init__(self, env):
        # Keep a handle on the environment whose action space is sampled.
        self.env = env
    def get_action(self):
        '''Return one action drawn from `env.action_space.sample()`.'''
        return self.env.action_space.sample()

def run_episode(env: gym.Env, agent: Agent, seed: int):
    '''Run one episode and return its rewards, states and actions.

    The environment and the agent are both reset with `seed`, then the agent
    acts until the environment reports termination or truncation.

    The trajectory is aligned as (state, action, reward) triples: `states[k]`
    is the observation the agent saw when it chose `actions[k]`, and
    `rewards[k]` is the reward that action produced. The first state is the
    observation returned by `env.reset`; the observation returned by the final
    step is not stored. (Previously the reset observation was discarded and
    each state was the result of the action at the same index.)

    Returns:
        (rewards, states, actions): three numpy arrays of equal length, with
        rewards as float and actions as int.
    '''
    obs, _ = env.reset(seed=seed)
    agent.reset(seed=seed)

    rewards, states, actions = [], [], []
    terminated = False
    truncated = False
    while not (terminated or truncated):
        action = agent.get_action()
        # Record the observation the action is conditioned on *before*
        # stepping, so states and actions share the same index.
        states.append(obs)
        actions.append(action)
        obs, reward, terminated, truncated, info = env.step(action)
        agent.observe(action, reward, info)
        rewards.append(reward)

    return (
        np.array(rewards, dtype=float),
        np.array(states),
        np.array(actions, dtype=int),
    )

env = gym.make('MiniGrid-Empty-5x5-v0')
env = RGBImgPartialObsWrapper(env)  # Get pixel observations
env = ImgObsWrapper(env)  # Get rid of the 'mission' field
# TODO: add a truncation wrapper
agent = RandomAgent(env)


def _as_object_array(items):
    '''Pack `items` into a 1-D object array, one element per item.

    Episodes have different lengths, so the per-episode arrays are ragged.
    Filling a pre-allocated object array element by element avoids numpy's
    ragged-sequence deprecation warning (an error on newer numpy versions).
    '''
    packed = np.empty(len(items), dtype=object)
    for position, item in enumerate(items):
        packed[position] = item
    return packed


reward_trajs = []
states_trajs = []
actions_trajs = []
for event in range(100):
    # Seed each episode with its own index. The previous `seed=i` picked up a
    # stale loop variable from an earlier cell, giving every episode the same
    # seed.
    reward_traj, states_traj, actions_traj = run_episode(env, agent, seed=event)
    reward_trajs.append(reward_traj)
    states_trajs.append(states_traj)
    actions_trajs.append(actions_traj)

# gym.vector.SyncVectorEnv(
#     env_fns=[lambda: gym.make('MiniGrid-Empty-5x5-v0') for _ in range(10)],
# )

reward_trajs = _as_object_array(reward_trajs)
states_trajs = _as_object_array(states_trajs)
actions_trajs = _as_object_array(actions_trajs)



Creating an ndarray from ragged nested sequences (which is a list-or-tuple of lists-or-tuples-or ndarrays with different lengths or shapes) is deprecated. If you meant to do this, you must specify 'dtype=object' when creating the ndarray.


Creating an ndarray from ragged nested sequences (which is a list-or-tuple of lists-or-tuples-or ndarrays with different lengths or shapes) is deprecated. If you meant to do this, you must specify 'dtype=object' when creating the ndarray.


Creating an ndarray from ragged nested sequences (which is a list-or-tuple of lists-or-tuples-or ndarrays with different lengths or shapes) is deprecated. If you meant to do this, you must specify 'dtype=object' when creating the ndarray.



In [308]:
import plotly.express as px 
lengths = [len(traj) for traj in reward_trajs]
px.histogram(lengths)

In [311]:
# import decision transformer
from src.decision_transformer import DecisionTransformer

# let's try an example with a single trajectory
reward_traj = reward_trajs[0]
states_traj = states_trajs[0]
actions_traj = actions_trajs[0]

# Return-to-go at step k is the sum of rewards from step k to the end of the
# episode: reverse, accumulate, then reverse back. The previous version
# skipped the second reversal, which left a reversed prefix sum instead.
# The final .copy() makes the array contiguous again; np.flip leaves negative
# strides, which torch tensors do not support.
rtg = np.flip(np.flip(reward_traj).cumsum(0)).copy()

# NOTE(review): the recorded run of this cell failed with
# "IndexError: index out of range in self". Presumably an embedding lookup
# inside the model received an index beyond its table (for example an episode
# longer than max_game_length) - TODO confirm against DecisionTransformer.
decision_transformer = DecisionTransformer(env, max_game_length= 10*5)

logits, _ = decision_transformer(
    states = t.tensor(states_traj).to(t.float32).unsqueeze(0),
    actions = t.tensor(actions_traj).unsqueeze(0).unsqueeze(-1),
    rtgs = t.tensor(rtg).unsqueeze(0).unsqueeze(-1),
    timesteps = t.tensor(np.arange(len(reward_traj))).unsqueeze(0).unsqueeze(-1)
)

IndexError: index out of range in self

# See if I can load a replay buffer from D4RL

The original paper appears to use loaded trajectories from d4rl. We can look at these trajectories, their format and structure for reference when storing our own trajectories

In [312]:
import gymnasium as gym
import d4rl
import minigrid
from minigrid.wrappers import RGBImgPartialObsWrapper, ImgObsWrapper
from warnings import simplefilter
simplefilter(action='ignore', category=DeprecationWarning)
env = gym.make('maze2d-eval-medium-v1')
# _ = env.reset() # This now produces an RGB tensor only
env.get_dataset()

NameNotFound: Environment maze2d-eval-medium doesn't exist. 

After a huge amount of work this seems not good. I will need to train my own agent.

# Training on PPO and Storing Trajectories


### Trying Torch-RL

```bash
python3 -m scripts.train --algo ppo --env MiniGrid-DoorKey-5x5-v0 --model DoorKey --save-interval 10 --frames 80000
python3 -m scripts.visualize --env MiniGrid-DoorKey-5x5-v0 --model DoorKey
python3 -m scripts.evaluate --env MiniGrid-DoorKey-5x5-v0 --model DoorKey
```

Unfortunately, it doesn't appear super simple to use these models because we need to actually load all their classes and stuff. Let's use Callum's implementation instead.

## CartPole

In [1]:
from src.ppo.train import train_ppo
from src.ppo.utils import PPOArgs
from src.utils import TrajectoryWriter
import warnings 
with warnings.catch_warnings():
    warnings.filterwarnings("ignore", category= DeprecationWarning)

    args = PPOArgs(
        exp_name = 'CartPole-v1',
        env_id = 'CartPole-v1',
        num_envs = 10,
        track = False,
        wandb_project_name="PPO-MiniGrid-test with cartpole",
        capture_video=True,
        cuda = False,
        total_timesteps=100000,
        max_steps=None)

    trajectory_writer = TrajectoryWriter(args.trajectory_path, args)

    ppo = train_ppo(args, trajectory_writer)

  value = getattr(cls, name)
  value = getattr(cls, name)
  value = getattr(cls, name)
  value = getattr(cls, name)
  value = getattr(cls, name)
  value = getattr(cls, name)
  value = getattr(cls, name)
  value = getattr(cls, name)
  value = getattr(cls, name)
  value = getattr(cls, name)
  value = getattr(cls, name)
  value = getattr(cls, name)
  value = getattr(cls, name)
  value = getattr(cls, name)
  value = getattr(cls, name)
  value = getattr(cls, name)
  value = getattr(cls, name)
  value = getattr(cls, name)
  value = getattr(cls, name)
  value = getattr(cls, name)
  value = getattr(cls, name)
  value = getattr(cls, name)
  value = getattr(cls, name)
  value = getattr(cls, name)
  value = getattr(cls, name)
  value = getattr(cls, name)
  value = getattr(cls, name)
  value = getattr(cls, name)
  value = getattr(cls, name)
  value = getattr(cls, name)
  value = getattr(cls, name)
  value = getattr(cls, name)
  value = getattr(cls, name)
  value = getattr(cls, name)
  value = geta

Output(layout=Layout(padding='15px'))

100%|██████████| 78/78 [00:08<00:00,  8.83it/s]

Trajectory written to trajectories/CartPole-v1.pkl





## Minigrid Env

In [21]:
from src.ppo.train import train_ppo
from src.ppo.utils import PPOArgs
from src.utils import TrajectoryWriter
import warnings 

with warnings.catch_warnings():
    warnings.filterwarnings("ignore", category= DeprecationWarning)

    args = PPOArgs(
        exp_name = 'MiniGrid-DistShift1-v0',
        env_id = 'MiniGrid-DistShift1-v0',
        num_envs = 4,
        num_steps=128,
        track = True,
        wandb_project_name="PPO-MiniGrid",
        capture_video=True,
        cuda = False,
        total_timesteps=80*10000,
        max_steps=200)

    trajectory_writer = TrajectoryWriter(args.trajectory_path, args)

    ppo = train_ppo(args, trajectory_writer=trajectory_writer)

Output(layout=Layout(padding='15px'))

100%|██████████| 1562/1562 [05:11<00:00,  5.01it/s]


Trajectory written to trajectories/MiniGrid-DistShift1-v0.pkl


VBox(children=(Label(value='1.391 MB of 1.600 MB uploaded (0.000 MB deduped)\r'), FloatProgress(value=0.869298…

0,1
approx_kl,▁▁▁▁▁▁▁▁▁▂▁▁▁▁▁▁▁▁▂▁▂▁▁▂▁▁▁▁▁▁▁█▁▁▁▁▁▁▁▁
avg_value,▁▁▂▃▄▅▆▇▇▇▇▇▇███████▇████████▆█▇███████▇
clipfrac,▁▂▁▁▂▂▁▂▁▆▁▂▄▁▂▂▃▂▃▁▃▁▁▂▁▁▂▁▁▂▁█▂▁▂▁▁▁▁▁
clipped_surrogate_objective,▃▄▃▃▄▄▄▄▄▁▄▄▅▃▅▄▄▄▂▃▆▄▄▃▂▃▆▃▅▄▃█▄▄▅▃▄▃▃▃
entropy,██▇▇▅▆▅▄▃▃▃▃▂▂▂▂▂▂▁▁▁▁▁▁▁▁▁▁▁▁▁▂▁▁▁▁▁▁▁▁
episode_length,█▇█▅▃▄▃▃▃▃▃▃▃▂▃▃▃▂▂▂▂▂▂▂▂▂▁▃▂▂▂▂▂▂▂▂▂▂▂▂
episode_return,▇█▇███████████████████████▁█████████████
learning_rate,███▇▇▇▇▇▇▆▆▆▆▆▅▅▅▅▅▅▄▄▄▄▄▄▃▃▃▃▃▂▂▂▂▂▂▁▁▁
value_loss,▁▁▂█▅▅▆▂▂▅▆▂▁▁▅▁▁▁▁▁▁▁▁▁▁▁▁▁▁▃▁▂▁▁▁▁▁▁▁▁

0,1
approx_kl,0.0
avg_value,0.92495
clipfrac,0.0
clipped_surrogate_objective,0.0
entropy,0.00011
episode_length,13.0
episode_return,0.9766
learning_rate,0.0
value_loss,2e-05


# Turning the stored trajectories into a dataset

In [258]:
from torch.utils.data import Dataset, DataLoader
from src.utils import TrajectoryReader
import numpy as np
import torch as t 
from einops import rearrange
import random

# not technically a data loader, rework later to work as one.
class TrajectoryLoader():
    '''Loads stored trajectories and samples padded training batches from them.

    The file at `trajectory_path` is read with `TrajectoryReader`, flattened
    across environments and split into per-episode tensors at the recorded
    `done` flags. `get_batch` then draws random sub-sequences from the
    highest-return episodes and left-pads them to a common length.

    Args:
        trajectory_path: path handed to `TrajectoryReader`.
        pct_traj: fraction of all timesteps that the selected top-return
            trajectories should cover (1.0 selects until everything is covered).
        rtg_scale: divisor applied to the returns-to-go in each batch.
        normalize_state: when True, batches are standardised with the
            per-element mean and std of all stored states.
        device: device the batch tensors are moved to.
    '''

    def __init__(self, trajectory_path, pct_traj=1.0, rtg_scale = 1, normalize_state = False, device = 'cpu'):
        self.trajectory_path = trajectory_path
        self.pct_traj = pct_traj
        self.rtg_scale = rtg_scale
        self.normalize_state = normalize_state
        self.device = device
        # Load last so every configuration attribute already exists while the
        # trajectories are being read.
        self.load_trajectories()

    def load_trajectories(self) -> None:
        '''Read the trajectory file and split it into per-episode tensors.'''
        data = TrajectoryReader(self.trajectory_path).read()

        # Keep the metadata around so callers can recover e.g. the env id.
        self.metadata = data['metadata']
        print(self.metadata)

        recorded = data['data']
        observations = np.array(recorded.get('observations'))
        actions = np.array(recorded.get('actions'))
        rewards = np.array(recorded.get('rewards'))
        dones = np.array(recorded.get('dones'))

        # Fold the environment axis into the time axis: all steps of env 0
        # first, then all steps of env 1, and so on.
        t_observations = rearrange(t.tensor(observations), "t b h w c -> (b t) h w c")
        t_actions = rearrange(t.tensor(actions), "t b -> (b t)")
        t_rewards = rearrange(t.tensor(rewards), "t b -> (b t)")
        t_dones = rearrange(t.tensor(dones), "t b -> (b t)")

        done_indices = t.where(t_dones)[0]

        # Actions, rewards and dones are cut just after each done flag, while
        # states are cut at the flag itself.
        # NOTE(review): presumably the stored observation at a done step is
        # already the first observation of the next episode, which would make
        # this one-step offset line states up with actions - TODO confirm
        # against the trajectory writer.
        self.actions = t.tensor_split(t_actions, done_indices+1)
        self.rewards = t.tensor_split(t_rewards, done_indices+1)
        self.dones = t.tensor_split(t_dones, done_indices+1)
        self.returns = [r.sum() for r in self.rewards]
        self.states = t.tensor_split(t_observations, done_indices)
        self.timesteps = [t.arange(len(i)) for i in self.states]
        self.traj_lens = np.array([len(i) for i in self.states])
        self.num_timesteps = sum(self.traj_lens)
        self.num_trajectories = len(self.states)

        self.state_dim = list(self.states[0][0].shape)
        self.act_dim = list(self.actions[0][0].shape)
        self.max_ep_len = max(len(i) for i in self.states)

    def get_indices_of_top_p_trajectories(self, pct_traj):
        '''Return indices of the highest-return trajectories, ascending by return.

        Trajectories are added from the best downwards until together they
        cover at least `pct_traj` of all stored timesteps. At least one
        trajectory is returned whenever any exist.

        The previous loop started from index -1 and therefore counted the best
        trajectory twice, which could stop the selection too early.
        '''
        target_timesteps = max(int(pct_traj * self.num_timesteps), 1)
        order = np.argsort(np.array([float(ret) for ret in self.returns]))

        # Cumulative timestep coverage when walking from best to worst.
        coverage = np.cumsum(self.traj_lens[order][::-1])
        num_selected = int(np.searchsorted(coverage, target_timesteps, side='left')) + 1
        num_selected = min(num_selected, len(order))

        return order[len(order) - num_selected:]

    def get_sampling_probabilities(self):
        '''Return sampling weights for the selected trajectories, proportional to length.'''
        sorted_inds = self.get_indices_of_top_p_trajectories(self.pct_traj)
        p_sample = self.traj_lens[sorted_inds] / sum(self.traj_lens[sorted_inds])
        return p_sample

    def discount_cumsum(self, x, gamma):
        '''Return the discounted suffix sums of `x`.

        Element k of the result is x[k] + gamma * x[k+1] + gamma**2 * x[k+2] + ...
        An empty input yields an empty result.
        '''
        x = np.asarray(x)
        out = np.zeros_like(x)
        if x.shape[0] == 0:
            return out
        out[-1] = x[-1]
        for step in reversed(range(x.shape[0] - 1)):
            out[step] = x[step] + gamma * out[step + 1]
        return out

    def get_state_mean_std(self):
        '''Return the per-element mean and std over all stored states.'''
        # used for input normalization
        all_states = np.concatenate(self.states, axis=0)
        state_mean, state_std = np.mean(all_states, axis=0), np.std(all_states, axis=0) + 1e-6
        return state_mean, state_std

    def get_batch(self, batch_size=256, max_len=100):
        '''Sample `batch_size` sub-sequences, each left-padded to `max_len`.

        Trajectories are drawn (with replacement, weighted by length) from the
        top-return subset, a random start index is chosen inside each one, and
        up to `max_len` steps from there are taken.

        Returns:
            (s, a, r, d, rtg, timesteps, mask). All share the leading shape
            (batch_size, max_len) except `rtg`, which carries one extra
            trailing step (max_len + 1). Padding uses 0 for states, rewards,
            returns-to-go and timesteps, -10 for actions and 2 for dones;
            `mask` is 1 on real steps and 0 on padding.
        '''
        rewards = self.rewards
        states = self.states
        actions = self.actions
        dones = self.dones

        # assert first dim is same for all inputs
        assert len(rewards) == len(states) == len(actions) == len(dones), f"shapes are not the same: {len(rewards)} {len(states)} {len(actions)} {len(dones)}"

        p_sample = self.get_sampling_probabilities()
        sorted_inds = self.get_indices_of_top_p_trajectories(self.pct_traj)

        # The statistics need a pass over every stored state, so only compute
        # them when they are actually going to be used.
        if self.normalize_state:
            state_mean, state_std = self.get_state_mean_std()

        # Sample positions within the selected subset. p_sample has one entry
        # per selected trajectory, so the candidate range must have the same
        # size (sampling from all trajectories breaks when pct_traj < 1).
        batch_inds = np.random.choice(
            np.arange(len(sorted_inds)),
            size=batch_size,
            replace=True,
            p=p_sample,  # reweights so we sample according to timesteps
        )

        # initialize lists
        s, a, r, d, rtg, timesteps, mask = [], [], [], [], [], [], []
        for batch_ind in batch_inds:

            # get the trajectory
            traj_ind = sorted_inds[batch_ind]
            traj_rewards = rewards[traj_ind]
            traj_states = states[traj_ind]
            traj_actions = actions[traj_ind]
            traj_dones = dones[traj_ind]

            # start index
            si = random.randint(0, traj_rewards.shape[0] - 1)

            # get sequences from dataset
            s.append(traj_states[si:si + max_len].reshape(1, -1, *self.state_dim))
            a.append(traj_actions[si:si + max_len].reshape(1, -1, *self.act_dim))
            r.append(traj_rewards[si:si + max_len].reshape(1, -1, 1))
            d.append(traj_dones[si:si + max_len].reshape(1, -1))
            tlen = s[-1].shape[1]

            # get timesteps
            timesteps.append(np.arange(si, si + tlen).reshape(1, -1))
            timesteps[-1][timesteps[-1] >= self.max_ep_len] = self.max_ep_len-1  # padding cutoff

            # get rewards to go (one step longer than the state sequence)
            rtg.append(self.discount_cumsum(traj_rewards[si:], gamma=1.)[:tlen + 1].reshape(1, -1, 1))

            # if the trajectory ends inside the window, append a trailing zero
            if rtg[-1].shape[1] <= tlen:
                rtg[-1] = np.concatenate([rtg[-1], np.zeros((1, 1, 1))], axis=1)

            # padding and state + reward normalization
            pad = max_len - tlen
            s[-1] = np.concatenate([np.zeros((1, pad, *self.state_dim)), s[-1]], axis=1)

            if self.normalize_state:
                s[-1] = (s[-1] - state_mean) / state_std

            a[-1] = np.concatenate([np.ones((1, pad, *self.act_dim)) * -10., a[-1]], axis=1)
            r[-1] = np.concatenate([np.zeros((1, pad, 1)), r[-1]], axis=1)
            d[-1] = np.concatenate([np.ones((1, pad)) * 2, d[-1]], axis=1)
            rtg[-1] = np.concatenate([np.zeros((1, pad, 1)), rtg[-1]], axis=1) / self.rtg_scale
            timesteps[-1] = np.concatenate([np.zeros((1, pad)), timesteps[-1]], axis=1)
            mask.append(np.concatenate([np.zeros((1, pad)), np.ones((1, tlen))], axis=1))

        s = t.from_numpy(np.concatenate(s, axis=0)).to(dtype=t.float32, device=self.device)
        a = t.from_numpy(np.concatenate(a, axis=0)).to(dtype=t.float32, device=self.device)
        r = t.from_numpy(np.concatenate(r, axis=0)).to(dtype=t.float32, device=self.device)
        d = t.from_numpy(np.concatenate(d, axis=0)).to(dtype=t.long,    device=self.device)
        rtg = t.from_numpy(np.concatenate(rtg, axis=0)).to(dtype=t.float32, device= self.device)
        timesteps = t.from_numpy(np.concatenate(timesteps, axis=0)).to(dtype=t.long, device=self.device)
        mask = t.from_numpy(np.concatenate(mask, axis=0)).to(device=self.device)

        return s, a, r, d, rtg, timesteps, mask


path = "/Users/josephbloom/GithubRepositories/DecisionTransformerInterpretability/trajectories/MiniGrid-DistShift1-v0.pkl"
trajectory_data_set = TrajectoryLoader(path, pct_traj=1.0, device="cpu")

{'args': {'exp_name': 'MiniGrid-DistShift1-v0', 'seed': 1, 'cuda': False, 'track': True, 'wandb_project_name': 'PPO-MiniGrid', 'wandb_entity': None, 'capture_video': True, 'env_id': 'MiniGrid-DistShift1-v0', 'total_timesteps': 800000, 'learning_rate': 0.00025, 'num_envs': 4, 'num_steps': 128, 'gamma': 0.99, 'gae_lambda': 0.95, 'num_minibatches': 4, 'update_epochs': 4, 'clip_coef': 0.2, 'ent_coef': 0.01, 'vf_coef': 0.5, 'max_grad_norm': 0.5, 'max_steps': 500, 'trajectory_path': 'trajectories/MiniGrid-DistShift1-v0.pkl'}, 'time': 1671723079.665403}


In [259]:
state_mean, state_std = trajectory_data_set.get_state_mean_std()
print(state_mean.max())
print(state_mean.min())
print(state_std.max())
print(state_std.min())

9.0
0.0
3.8235957623816814
1e-06


In [260]:
s, a, r, d, rtg, timesteps, mask = trajectory_data_set.get_batch()

## Visualizing a Trajectory

In [270]:
import gymnasium as gym
import plotly.express as px
from src.visualization import render_minigrid_observations, render_minigrid_observation

from minigrid.core.constants import IDX_TO_OBJECT
import numpy as np
import torch

def find_agent(observation):
    '''Locate the agent cell in an encoded grid observation.

    Each cell's first channel is looked up in `IDX_TO_OBJECT`; the indices
    (first axis, second axis) of the first cell that maps to 'agent' are
    returned. The scan walks the second axis in the outer loop and the first
    axis in the inner loop.

    Raises:
        Exception: if no cell decodes to 'agent'.
    '''
    first_axis_len = observation.shape[0]
    second_axis_len = observation.shape[1]
    for outer in range(second_axis_len):
        for inner in range(first_axis_len):
            cell_type = IDX_TO_OBJECT[int(observation[inner, outer][0])]
            if cell_type != 'agent':
                continue
            return inner, outer

    raise Exception("Agent not found")


def render_minigrid_observation(env, observation):
    '''Render one encoded grid observation via the environment's grid class.

    The agent cell is located with `find_agent`, its third channel is read as
    the agent direction, and the cell is blanked before the grid is decoded.
    The decoded grid is then rendered with a tile size of 32, with the agent
    drawn at its original position and direction.

    Numpy arrays and torch tensors are copied first so the caller's
    observation is not modified; any other type is edited in place.
    '''
    # Work on a private numpy copy so we don't edit the original object.
    if isinstance(observation, torch.Tensor):
        observation = observation.numpy().copy()
    elif isinstance(observation, np.ndarray):
        observation = observation.copy()

    agent_i, agent_j = find_agent(observation)
    agent_dir = observation[agent_i, agent_j][2]

    # Blank the agent cell; the agent is passed to render() separately.
    observation[agent_i, agent_j] = [0,0,0]

    decoded = env.grid.decode(observation.astype(np.uint8))
    grid = decoded[0]

    return grid.render(32, (agent_i, agent_j), agent_dir=agent_dir)

def render_minigrid_observations(env, observations):
    '''Render each observation in turn and collect the frames in one array.'''
    frames = []
    for single_observation in observations:
        frames.append(render_minigrid_observation(env, single_observation))
    return np.array(frames)


# Rebuild the environment the trajectories were recorded in, so its grid class
# can be used for rendering.
# NOTE(review): `data` is not defined in this cell; it is presumably left over
# in the notebook's global state from an earlier run - TODO confirm.
env = gym.make(data['metadata']['args']['env_id'], render_mode = 'rgb_array')
print(data['metadata']['args']['env_id'])
_, _ = env.reset()

from minigrid.core.actions import Actions

# Index of the batch element to inspect; `s`, `a`, `r` and `mask` come from the
# earlier get_batch() call.
event = 200
# The mask keeps only the real (unpadded) steps of the chosen sequence.
print([Actions(int(i)) for i in a[event][mask[event].to(t.bool)]])
print(r[event][mask[event].to(t.bool)])
# print(rtg[event][t.tensor([0])+mask[event].to(t.bool)])
# Render every unpadded state and show the frames as an animation.
imgs = render_minigrid_observations(env, s[event][mask[event].to(t.bool)])
fig = px.imshow(imgs, animation_frame=0)
fig.show()

MiniGrid-DistShift1-v0
[<Actions.forward: 2>, <Actions.forward: 2>, <Actions.left: 0>, <Actions.forward: 2>, <Actions.forward: 2>]
tensor([[0.0000],
        [0.0000],
        [0.0000],
        [0.0000],
        [0.9766]])



distutils Version classes are deprecated. Use packaging.version instead.


distutils Version classes are deprecated. Use packaging.version instead.



We can see that the actions/states/rewards are now indexed so that action[0] is taken after state[0] and generates reward[0]. ie: SAR, SAR, SAR

In [284]:
a.shape

torch.Size([256, 100])

In [285]:
rtg.shape

torch.Size([256, 101, 1])

In [277]:
px.line(rtg[event])


distutils Version classes are deprecated. Use packaging.version instead.


distutils Version classes are deprecated. Use packaging.version instead.

