In [1]:
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import numpy as np
import random
import minigrid

In [55]:
env = gym.make("MiniGrid-Empty-5x5-v0", max_episode_steps=50)
env = minigrid.wrappers.FullyObsWrapper(env)
env = minigrid.wrappers.ImgObsWrapper(env)
env = gym.wrappers.NormalizeObservation(env)

obs, _ = env.reset()

In [61]:
action = env.action_space.sample()
next_obs, reward, terminated, truncated, info = env.step(action)
reward

0

In [3]:
# Autoencoder Model
class TransitionAutoencoder(nn.Module):
    def __init__(self, obs_shape, latent_dim):
        super().__init__()
        h, w, c = obs_shape
        self.encoder = nn.Sequential(
            nn.Conv2d(c*2, 32, kernel_size=2, stride=1, padding=0),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=2, stride=1, padding=0),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear((h - 2) * (w - 2) * 64, latent_dim),
            nn.ReLU()
        )
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, (h - 2) * (w - 2) * 64),
            nn.ReLU(),
            nn.Unflatten(1, (64, h - 2, w - 2)),
            nn.ConvTranspose2d(64, 32, kernel_size=2, stride=1, padding=0, output_padding=0),
            nn.ReLU(),
            nn.ConvTranspose2d(32, 2*c, kernel_size=2, stride=1, padding=0),
            nn.Sigmoid()  # Output normalized to [0, 1]
        )

    def forward(self, s_t, s_t1):
        x = torch.cat((s_t, s_t1), dim=3)  # Concatenate along the channel axis
        x = x.permute(0, 3, 1, 2)
        z = self.encoder(x)
        reconstructed = self.decoder(z)
        reconstructed = reconstructed.permute(0,3,2,1)
        reconstructed_s_t, reconstructed_s_t1 = reconstructed.split(int(reconstructed.shape[3] // 2), dim=3)
        return reconstructed_s_t, reconstructed_s_t1


In [4]:
# Environment and Training
env = gym.make("MiniGrid-BlockedUnlockPickup-v0")  # Example environment
env = minigrid.wrappers.ImgObsWrapper(env)
obs_shape = env.observation_space.shape # (7, 7, 3)
latent_dim = 64
model = TransitionAutoencoder(obs_shape, latent_dim)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
loss_fn = nn.MSELoss()

replay_buffer = deque(maxlen=10000)
env.observation_space.shape

(7, 7, 3)

In [5]:
import lib.human_player as human_player
from hydra import initialize, compose
from omegaconf import OmegaConf

CONFIG = {
    'domain': 'MiniGrid'
}

# Convert the dictionary into an OmegaConf object
cfg = OmegaConf.create(CONFIG)

agent = human_player.Agent(name=f'Human', action_space=env.action_space, cfg=cfg)


Discrete(7)


In [6]:
# Collect data with random actions
for episode in range(10):  # Adjust number of episodes as needed
    obs, _ = env.reset()
    for _ in range(100):  # Adjust steps per episode
        action = env.action_space.sample()
        next_obs, _, terminate, truncate, _ = env.step(action)
        if terminate or truncate:
            break
        
        replay_buffer.append((obs, next_obs))
        obs = next_obs
        

In [7]:
batch = random.sample(replay_buffer, 16)
s_t, s_t1 = zip(*batch)

In [8]:
# Training Loop
for epoch in range(100):  # Adjust number of epochs
    batch = random.sample(replay_buffer, 32)
    s_t, s_t1 = zip(*batch)
    
    s_t = torch.tensor(np.stack(s_t), dtype=torch.float32) / 255.0
    s_t1 = torch.tensor(np.stack(s_t1), dtype=torch.float32) / 255.0

    optimizer.zero_grad()
    reconstructed_s_t, reconstructed_s_t1 = model(s_t, s_t1)
    loss_s_t = loss_fn(reconstructed_s_t, s_t)
    loss_s_t1 = loss_fn(reconstructed_s_t1, s_t1)
    loss = loss_s_t + loss_s_t
    loss.backward()
    optimizer.step()

    if epoch % 10 == 0:
        print(f"Epoch {epoch}, Loss: {loss.item()}")
print(f"Epoch {epoch}, Loss: {loss.item()}")

Epoch 0, Loss: 0.5201374292373657
Epoch 10, Loss: 0.3469119369983673
Epoch 20, Loss: 0.026081355288624763
Epoch 30, Loss: 0.0011141853174194694
Epoch 40, Loss: 7.29621751816012e-05
Epoch 50, Loss: 6.785574805689976e-05
Epoch 60, Loss: 5.8198169426759705e-05
Epoch 70, Loss: 6.160450720926747e-05
Epoch 80, Loss: 6.420983845600858e-05
Epoch 90, Loss: 6.322442641248927e-05
Epoch 99, Loss: 6.585419760085642e-05


In [None]:
import time

eval_env = gym.make("MiniGrid-BlockedUnlockPickup-v0", render_mode='rgb_array')  # Example environment
eval_env = minigrid.wrappers.ImgObsWrapper(eval_env)

# Collect data with random actions
for episode in range(1):  # Adjust number of episodes as needed
    obs, _ = eval_env.reset()
    for _ in range(100):  # Adjust steps per episode
        action = agent.get_action()
        # action = eval_env.action_space.sample()
        next_obs, _, terminate, truncate, _ = env.step(action)
        frame = env.render()
        time.sleep(0.05)
        if terminate or truncate:
            break
                
        s_t = torch.tensor(np.stack(s_t), dtype=torch.float32) / 255.0
        s_t1 = torch.tensor(np.stack(s_t1), dtype=torch.float32) / 255.0
        
        with torch.no_grad():
            reconstructed_s_t, reconstructed_s_t1 = model(s_t, s_t1)
            loss_s_t = loss_fn(reconstructed_s_t, s_t)
            loss_s_t1 = loss_fn(reconstructed_s_t1, s_t1)
            loss = loss_s_t + loss_s_t
        print(loss.detach().cpu().numpy())

        obs = next_obs