In [1]:
import gym
from gym_minigrid.wrappers import ImgObsWrapper
from mini_behavior.utils.wrappers import MiniBHFullyObsWrapper
from mini_behavior.register import register
import mini_behavior
from stable_baselines3 import PPO
import numpy as np
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor
from stable_baselines3.common.callbacks import StopTrainingOnMaxEpisodes
import torch.nn as nn
import torch
import argparse

# --- Experiment configuration -------------------------------------------
# Task name; interpolated into the registered env id and the
# `mini_behavior.envs:{TASK}Env` entry point.
TASK = 'PlayAlligator'
# When False, the env is additionally wrapped in MiniBHFullyObsWrapper.
PARTIAL_OBS = True
# Passed to the env as `room_size` and embedded in the env id.
ROOM_SIZE = 10
# Passed to the env as `max_steps`.
MAX_STEPS = 1000
# Training budget handed to `model.learn`. Kept as an int (not the float
# `5e6`) because Stable-Baselines3 documents `total_timesteps` as an int.
TOTAL_TIMESTEPS = 5_000_000
# When True, `dense_reward=True` is added to the env kwargs.
DENSE_REWARD = False
# Policy identifier given to PPO.
POLICY_TYPE = 'CnnPolicy'

In [2]:
class MinigridFeaturesExtractor(BaseFeaturesExtractor):
    """Small convolutional feature extractor for image observations.

    Three 2x2 convolutions (each followed by ReLU) are flattened and then
    projected to ``features_dim`` by a linear layer with a ReLU.

    Args:
        observation_space: Space describing the observations. Its
            ``shape[0]`` is used as the number of input channels, so a
            channel-first layout is assumed.
        features_dim: Size of the feature vector produced by ``forward``.
        normalized_image: Accepted for signature compatibility; this class
            does not read it.
    """

    def __init__(self, observation_space: gym.Space, features_dim: int = 512, normalized_image: bool = False) -> None:
        super().__init__(observation_space, features_dim)

        in_channels = observation_space.shape[0]
        kernel = (2, 2)
        conv_stack = [
            nn.Conv2d(in_channels, 32, kernel),
            nn.ReLU(),
            nn.Conv2d(32, 32, kernel),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel),
            nn.ReLU(),
            nn.Flatten(),
        ]
        self.cnn = nn.Sequential(*conv_stack)

        # Push one sampled observation (with a leading batch axis) through
        # the conv stack to discover the flattened width.
        with torch.no_grad():
            probe = torch.as_tensor(observation_space.sample()[None]).float()
            flat_width = self.cnn(probe).shape[1]

        projection = nn.Linear(flat_width, features_dim)
        self.linear = nn.Sequential(projection, nn.ReLU())

    def forward(self, observations: torch.Tensor) -> torch.Tensor:
        """Run the conv stack, then the linear projection, on ``observations``."""
        conv_features = self.cnn(observations)
        return self.linear(conv_features)

In [3]:
# Feature-extractor settings forwarded to the PPO policy.
policy_kwargs = dict(
    features_extractor_class=MinigridFeaturesExtractor,
    features_extractor_kwargs=dict(features_dim=128),
)

# Env wrapping
env_name = f"MiniGrid-{TASK}-{ROOM_SIZE}x{ROOM_SIZE}-N2-v0"

print(f'register env {TASK}')

kwargs = {"room_size": ROOM_SIZE, "max_steps": MAX_STEPS}

if DENSE_REWARD:
    # Validate explicitly rather than with `assert`: assertions are removed
    # when Python runs with -O, and a bare assert gives no explanation.
    dense_reward_tasks = ["PuttingAwayDishesAfterCleaning", "WashingPotsAndPans"]
    if TASK not in dense_reward_tasks:
        raise ValueError(
            f"DENSE_REWARD is only supported for tasks {dense_reward_tasks}, got {TASK!r}"
        )
    kwargs["dense_reward"] = True

# NOTE(review): re-running this cell in the same kernel registers the same
# id again; whether `register` tolerates that is not visible here — confirm.
register(
    id=env_name,
    entry_point=f'mini_behavior.envs:{TASK}Env',
    kwargs=kwargs
)


# Run settings read by the training cell.
config = {
    "policy_type": POLICY_TYPE,
    "total_timesteps": TOTAL_TIMESTEPS,
    "env_name": env_name,
}

env = gym.make(env_name)
if not PARTIAL_OBS:
    # Full observability is opt-in; the wrapper is skipped for partial obs.
    env = MiniBHFullyObsWrapper(env)
env = ImgObsWrapper(env)


register env PlayAlligator
atsamelocation
infovofrobot
True
inhandofrobot
False
inreachofrobot
False
insameroomasrobot
True
inside
nextto
onfloor
True
onTop
under
atsamelocation
infovofrobot


TypeError: in_view() argument after * must be an iterable, not NoneType

In [4]:
print('begin training')
# Policy training: build the PPO model from the run config, then train it
# with a callback capped at 3000 episodes.
model = PPO(
    policy=config["policy_type"],
    env=env,
    n_steps=8000,
    policy_kwargs=policy_kwargs,
    verbose=1,
)
model.learn(
    total_timesteps=config["total_timesteps"],
    callback=StopTrainingOnMaxEpisodes(max_episodes=3000),
)




begin training
Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


In [5]:

# Save under a directory that reflects the observation mode used in training.
model.save(
    f"models/ppo_cnn_partial/{env_name}"
    if PARTIAL_OBS
    else f"models/ppo_cnn/{env_name}"
)

In [7]:
# Two small sample mappings (category -> list of names) used to try out
# objdiff; they differ only in the second 'mallet' entry.
test = dict(mallet=['m1', 'm2'], toy=['t1'])
test2 = dict(mallet=['m1', 'm3'], toy=['t1'])
def objdiff(objs1, objs2):
    """Print the values of two mappings side by side, pair by pair.

    Values are paired by iteration order (not by key); pairing stops at the
    shorter mapping. For each pair the value from ``objs1`` is printed first,
    then the value from ``objs2``, each on its own line.
    """
    paired_values = zip(objs1.values(), objs2.values())
    for left, right in paired_values:
        print(left, right, sep="\n")
# Print the paired values of the two sample mappings.
objdiff(objs1=test, objs2=test2)

['m1', 'm2']
['m1', 'm3']
['t1']
['t1']
