In [None]:
import torch
import torch.nn as nn
import random
import os
import numpy as np

def set_seed(seed: int) -> None:
    """Seed every random number generator this notebook relies on.

    The same value is handed to Python's ``random`` module, NumPy's legacy
    global generator and PyTorch, and is also written to the
    ``PYTHONHASHSEED`` entry of ``os.environ`` as a string.

    Args:
        seed (int): value used for all generators.
    """
    os.environ["PYTHONHASHSEED"] = str(seed)
    seeders = (random.seed, np.random.seed, torch.manual_seed)  # type: ignore
    for apply_seed in seeders:
        apply_seed(seed)


# --- Active checkpoint set -------------------------------------------------
# Seed first, then point at the directory holding the three weight files.
# `file_names` is ordered: task encoder, VAE, actor (LEGION indexes it 0/1/2).
set_seed(2)
LOAD_PATH = './ckpt/'
file_names = [
    'task_encoder_0.95.pt',
    'vae_0.95.pt',
    'actor_0.95.pt',
]

# --- Alternative checkpoint sets (swap in for the three statements above) ---
# LOAD_PATH = './02/'
# set_seed(3)
# file_names = ['task_encoder_1.0.pt', 'vae_1.0.pt', 'actor_1.0.pt']

# LOAD_PATH = './03/'
# set_seed(3)
# file_names = ['task_encoder_0.99.pt', 'vae_0.99.pt', 'actor_0.99.pt']


class Actor(nn.Module):
    """Feed-forward network mapping a 22-feature input to 8 outputs.

    Layer layout of ``self.model`` (indices matter for ``state_dict`` keys):
    ``Linear(22, 300) -> LayerNorm(300) -> Tanh``, followed by three
    ``Linear(300, 300) -> LeakyReLU`` pairs and a final ``Linear(300, 8)``.
    """

    def __init__(self) -> None:
        super().__init__()
        in_dim, hidden_dim, out_dim = 22, 300, 8

        # Input stage: projection, normalisation, squashing.
        stack = [nn.Linear(in_dim, hidden_dim), nn.LayerNorm(hidden_dim), nn.Tanh()]
        # Three identical hidden stages.
        for _ in range(3):
            stack.extend((nn.Linear(hidden_dim, hidden_dim), nn.LeakyReLU()))
        # Output projection (no activation here).
        stack.append(nn.Linear(hidden_dim, out_dim))

        self.model = nn.Sequential(*stack)

    def forward(self, input):
        """Run ``input`` (last dimension 22) through the MLP and return the raw output."""
        network = self.model
        return network(input)

class VAE_Inference(nn.Module):
    """VAE whose ``forward`` runs only the encoder side.

    ``forward`` maps a 780-feature input through ``trunk`` to the mean and
    log-variance of a 10-dimensional latent Gaussian and draws one sample
    from it. ``decoder_context`` (10 -> 768) and ``decoder_state`` (26 -> 12)
    are constructed and therefore part of the module's ``state_dict``, but
    ``forward`` never calls them.
    """

    def __init__(self) -> None:
        super().__init__()
        # Shared encoder body feeding both latent heads.
        self.trunk = nn.Sequential(
            nn.Linear(in_features=780, out_features=400),
            nn.ReLU(),
            nn.Linear(in_features=400, out_features=400),
            nn.ReLU(),
            nn.Linear(in_features=400, out_features=400),
            nn.ReLU(),
            )
        # Latent heads: mean and log-variance of the 10-d Gaussian.
        self.mu_latent = nn.Linear(in_features=400, out_features=10)
        self.log_var_latent = nn.Linear(in_features=400, out_features=10)
        # Decoders (unused by forward()).
        self.decoder_context = nn.Sequential(
            nn.Linear(in_features=10, out_features=400),
            nn.ReLU(),
            nn.Linear(in_features=400, out_features=400),
            nn.ReLU(),
            nn.Linear(in_features=400, out_features=400),
            nn.ReLU(),
            nn.Linear(in_features=400, out_features=768),
        )
        self.decoder_state = nn.Sequential(
            nn.Linear(in_features=26, out_features=400),
            nn.ReLU(),
            nn.Linear(in_features=400, out_features=400),
            nn.ReLU(),
            nn.Linear(in_features=400, out_features=400),
            nn.ReLU(),
            nn.Linear(in_features=400, out_features=12),
        )

    def forward(self, input):
        """Encode ``input`` and sample a latent vector.

        :param input: (Tensor) encoder input whose last dimension is 780
        :return: tuple ``(z, mu, log_var)``, each with last dimension 10
        """
        # Run the trunk once and share its features between both heads;
        # the trunk is deterministic, so this yields the same values as
        # evaluating it separately for each head at half the cost.
        features = self.trunk(input)
        mu = self.mu_latent(features)
        log_var = self.log_var_latent(features)
        z = self.sample(mu, log_var)
        return z, mu, log_var

    def sample(self, mu, log_var):
        """
        Reparameterization trick to sample from N(mu, var) from
        N(0,1).
        :param mu: (Tensor) Mean of the latent Gaussian [B x D]
        :param log_var: (Tensor) log variance of the latent Gaussian [B x D]
        :return: z (Tensor) [B x D]
        """
        std = torch.exp(0.5 * log_var)
        eps = torch.randn_like(std)
        return eps * std + mu

class Task_Encoder(nn.Module):
    """Embedding table with 10 rows of 768 features, looked up by task index."""

    def __init__(self) -> None:
        super().__init__()
        self.pretrained_embedding = nn.Embedding(num_embeddings=10, embedding_dim=768)

    def forward(self, task_idx):
        """Return the embedding row(s) selected by the integer tensor ``task_idx``."""
        lookup = self.pretrained_embedding
        return lookup(task_idx)

class LEGION(nn.Module):
    '''
    Legion for final evaluation.

    Chains three sub-modules whose weights are read from disk at construction
    time: a task encoder (``file_names[0]``), a VAE task-inference module
    (``file_names[1]``) and an actor (``file_names[2]``), all located under
    ``LOAD_PATH``. Every parameter is frozen (``requires_grad=False``).
    '''
    def __init__(self) -> None:
        super().__init__()

        self.pretrain_language_module = self._load_frozen(Task_Encoder(), file_names[0])
        self.task_inference_module = self._load_frozen(VAE_Inference(), file_names[1])
        self.policy_module = self._load_frozen(Actor(), file_names[2])

    @staticmethod
    def _load_frozen(module, file_name):
        """Load ``file_name`` from ``LOAD_PATH`` into ``module`` and freeze it.

        Returns the same ``module`` with all parameters set to
        ``requires_grad=False``.
        """
        # os.path.join works whether or not LOAD_PATH ends with a separator.
        checkpoint_path = os.path.join(LOAD_PATH, file_name)
        # map_location="cpu" lets checkpoints saved from a GPU load on a
        # CPU-only machine; load_state_dict copies the values into the
        # module's existing parameters, so the module's own device is kept.
        # NOTE(security): torch.load unpickles the file -- only load
        # checkpoints from a trusted source.
        state_dict = torch.load(checkpoint_path, map_location="cpu")
        module.load_state_dict(state_dict)
        return module.requires_grad_(False)

    def forward(self, mtobs:dict, return_z:bool=False):
        """Compute an action for a multi-task observation.

        Args:
            mtobs (dict): must contain ``'env_obs'`` (environment observation
                tensor) and ``'task_obs'`` (task index tensor fed to the
                embedding); both need matching leading dimensions so they can
                be concatenated along the last axis.
            return_z (bool): when True, also return the sampled latent.

        Returns:
            ``action`` as a NumPy array, or ``(action, latent)`` as NumPy
            arrays when ``return_z`` is True.
        """
        env_obs = mtobs['env_obs']
        task_idx = mtobs['task_obs']
        language_encoding = self.pretrain_language_module(task_idx)

        # VAE input = raw observation followed by the task embedding.
        vae_input = torch.cat([env_obs, language_encoding], dim=-1)
        # Only the sampled latent is used; mu and log_var are ignored.
        latent_obs, _, _ = self.task_inference_module(vae_input.float())

        # Actor input = raw observation followed by the sampled latent.
        actor_obs = torch.cat([env_obs, latent_obs], dim=-1)
        mu_and_log_std = self.policy_module(actor_obs.float())
        # Keep the first half of the policy output as the action mean; the
        # second half (presumably a log-std, going by the name) is discarded.
        action, _ = mu_and_log_std.chunk(2, dim=-1)
        action = torch.tanh(action)

        action_np = action.detach().cpu().numpy()
        if return_z:
            return action_np, latent_obs.detach().cpu().numpy()
        return action_np

# Build the evaluation agent; constructing LEGION reads the three checkpoint
# files listed in `file_names` from `LOAD_PATH`.
legion = LEGION()


### Evaluation

In [19]:
import metaworld
from mtenv.envs.metaworld.env import get_list_of_envs
# Benchmark selection.
benchmark_name = 'MT10_KUKA'
benchmark = metaworld.MT10_KUKA()
env_id_to_task_map = None

# Arguments forwarded verbatim to get_list_of_envs.
make_kwargs = dict(
    benchmark=benchmark,
    benchmark_name=benchmark_name,  # MT10, MT50
    env_id_to_task_map=env_id_to_task_map,
    num_copies_per_env=1,
    should_perform_reward_normalization=True,
)
list_envs, env_id_to_task_map = get_list_of_envs(**make_kwargs)

# Rollout length used by the evaluation loop.
max_episode_steps = 150

# Put the loaded model in inference mode (eval() is train(False)).
agent = legion.eval()

# One task index per environment, plus the environment names in map order.
task_obs = torch.arange(len(list_envs))
env_names = [name for name in env_id_to_task_map.keys()]

# Hand-picked subset of environments (currently all ten).
select_env = [
    'reach-v2',
    'push-v2',
    'pick-place-v2',
    'door-open-v2',
    'faucet-open-v2',
    'drawer-close-v2',
    'button-press-topdown-v2',
    'peg-unplug-side-v2',
    'window-open-v2',
    'window-close-v2',
]

print(env_names)

  "Box bound precision lowered by casting to {}".format(self.dtype)


['reach-v2', 'push-v2', 'pick-place-v2', 'door-open-v2', 'faucet-open-v2', 'drawer-close-v2', 'button-press-topdown-v2', 'peg-unplug-side-v2', 'window-open-v2', 'window-close-v2']


In [20]:
import imageio
import numpy as np
# --- Rollout options ---------------------------------------------------------
show_latent_z = False  # also collect the sampled latent per step -> latent_<env>.npy
record = False         # write the rollout to <env>.mp4
render = False         # call the env's default render() each step (only if not recording)
width = 1024           # frame size used when recording
height = 1024


def _make_mtobs(obs, env_idx):
    """Wrap one observation into the batched dict that `agent` consumes."""
    # Stack through NumPy first: calling torch.tensor directly on a list of
    # ndarrays is slow and emits a warning in recent PyTorch releases. The
    # resulting tensor has the same shape (1, obs_dim) and dtype as before.
    return {
        "env_obs": torch.tensor(np.asarray([obs])),
        "task_obs": torch.tensor([env_idx]),
    }


# for env_idx in range(len(env_names)):
# for env in select_env:
for env in [
    'reach-v2',
    'push-v2',
    'pick-place-v2',
    'door-open-v2',
    'faucet-open-v2',
    'drawer-close-v2',
    'button-press-topdown-v2',
    'peg-unplug-side-v2',
    'window-open-v2',
    'window-close-v2'
    ]:
    env_idx = env_names.index(env)
    task_env = list_envs[env_idx]

    # Per-episode state.
    latent_traj = []
    success = 0.0

    obs = task_env.reset()
    multitask_obs = _make_mtobs(obs, env_idx)

    writer = imageio.get_writer(f'{env}.mp4', fps=30) if record else None
    try:
        if record:
            writer.append_data(task_env.render('rgb_array', width=width, height=height))
        elif render:
            task_env.render()

        for episode_step in range(max_episode_steps):
            # Agent selects an action.
            with torch.no_grad():
                if show_latent_z:
                    action, z = agent(multitask_obs, return_z=True)
                    latent_traj.append(z)
                else:
                    action = agent(multitask_obs)

            # Step the environment; the agent output is batched, so unwrap it.
            obs, reward, done, info = task_env.step(action[0])
            if record:
                writer.append_data(task_env.render('rgb_array', width=width, height=height))
            elif render:
                task_env.render()

            # After the final step, reset so the env is left in a fresh state.
            if (episode_step + 1) % max_episode_steps == 0:
                obs = task_env.reset()

            # Accumulate per-step success flags; any positive value counts.
            success += info['success']
            multitask_obs = _make_mtobs(obs, env_idx)

        success = float(success > 0)
        print(f'env {env_names[env_idx]}, success {success}')
        if show_latent_z:
            np.save(f'latent_{env}.npy', latent_traj)
    finally:
        # Always finalise the video file, even if the rollout raised.
        if writer is not None:
            writer.close()

    # As before, the environment is only closed when not recording.
    if not record:
        task_env.close()

env reach-v2, success 1.0
env push-v2, success 1.0
env pick-place-v2, success 1.0
env door-open-v2, success 1.0
env faucet-open-v2, success 1.0
env drawer-close-v2, success 1.0
env button-press-topdown-v2, success 1.0
env peg-unplug-side-v2, success 1.0
env window-open-v2, success 1.0
env window-close-v2, success 1.0
