In [3]:
%load_ext autoreload
%autoreload 2

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn

import sys

sys.path.append('..')

from algorithms.random_policy import RandomPolicy
from algorithms.sequence_models.decision_sequence_policy import DTPolicy
from algorithms.sequence_models.decision_transformer.decision_transformer import DecisionTransformer
from algorithms.sequence_models.evaluate import evaluate_on_env
from data.door_key_dataset import DoorKeyDataset
from data.random_walk_dataset import RandomWalkDataset
from envs.door_key import DoorKeyEnv, DoorKeyEnvSmall
from data.trajectory import LimitedContextWrapper
from algorithms.sequence_models.config import TrainConfig
from algorithms.sequence_models.decision_transformer.trainer import TrainerDT
from envs.random_walk import RandomWalkEnv
from experiment import Experiment
import gymnasium as gym
from functools import partial

# Run on the GPU when torch reports one as available; otherwise use the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print("device is ", device)

# Random Walk Env

# Door Key Env

In [22]:
from minigrid.wrappers import ImgObsWrapper

# Two instances of the same DoorKey task: a default one and one created with
# render_mode='human'. Both are then wrapped in ImgObsWrapper.
inner_env, inner_env_human = (
    gym.make('MiniGrid-DoorKey-5x5-v0'),
    gym.make('MiniGrid-DoorKey-5x5-v0', render_mode='human'),
)
env, env_human = ImgObsWrapper(inner_env), ImgObsWrapper(inner_env_human)


# Cap evaluation episodes at the environment's own step limit.
# The limit is read from the unwrapped base environment: `env` is a stack of
# gymnasium wrappers, and implicit attribute forwarding through wrappers
# (`env.max_steps`) is deprecated in gymnasium 0.29 and unavailable from 1.0.
config = TrainConfig(max_eval_ep_len=env.unwrapped.max_steps, context_len=32)

# TODO: save the dataset for the experiment
# Build the trajectory dataset from the wrapped (image-observation) env.
traj_dataset = DoorKeyDataset(env, n_trajectories=100, reward_scale=20)

# Decision Transformer whose input/output sizes come from the dataset and whose
# architecture hyperparameters come from `config`.
model = DecisionTransformer(
    state_dim=traj_dataset.state_dim(),
    act_dim=traj_dataset.action_dim(),
    n_blocks=config.n_blocks,
    h_dim=config.embed_dim,
    context_len=config.context_len,
    n_heads=config.n_heads,
    drop_p=config.dropout_p,
)
# Move the freshly built model to the selected device.
model = model.to(device)

# Policy factory: binds everything DTPolicy needs except `rtg`, which callers
# pass per policy (presumably the target return-to-go; confirm against DTPolicy).
DoorKeyDTPolicy = partial(
    DTPolicy,
    model=model,
    traj_dataset=traj_dataset,
    device=device,
    max_test_ep_len=config.max_eval_ep_len,
    context_length=config.context_len,
)

# 10 evenly spaced `rtg` values in [0, 1.5]; one evaluation policy is built per value.
rtg_grid = np.linspace(0, 1.5, 10)

# Bundle model, environment, dataset, config and evaluation policies into one
# Experiment object.
experiment = Experiment(
    model_name='dt',
    model=model,
    env_name='MiniGrid-DoorKey-5x5-v0',
    env=env,
    experiment_name='starter',
    traj_dataset=traj_dataset,
    dataset_name=f'size={len(traj_dataset)}',
    config=config,
    device=device,
    eval_policies_and_names=[
        (DoorKeyDTPolicy(rtg=rtg), f'dt,rtg={rtg}') for rtg in rtg_grid
    ],
)

In [23]:
# Run Experiment.train_for(10) and keep the returned report; the next cell
# passes it to plot_loss. The unit of the argument (epochs vs. iterations) is
# defined by Experiment — TODO confirm.
report = experiment.train_for(10)

In [24]:
# Plot the loss from the report returned by the train_for(10) call.
experiment.plot_loss(report)

In [11]:
# NOTE(review): this cell's execution count (11) is lower than that of the
# setup cell (22), so its last run was against an earlier `experiment` object.
# On a fresh top-to-bottom run it calls train_for again on the current
# experiment before the evaluation cells below. The return value is discarded
# here, unlike the train_for(10) call whose report is plotted.
experiment.train_for(100)

### How to evaluate a policy

In [25]:
from algorithms.evaluate_policy import evaluate_policy

# Policy factory with the same bindings as the one created next to the model
# earlier in the notebook; only `rtg` is left for the caller to supply.
DoorKeyDTPolicy = partial(
    DTPolicy,
    model=model,
    traj_dataset=traj_dataset,
    device=device,
    max_test_ep_len=config.max_eval_ep_len,
    context_length=config.context_len,
)

# Option 1: build a DTPolicy and hand it to evaluate_policy.
policy = DoorKeyDTPolicy(rtg=1)
evaluate_policy(
    policy,
    env,
    num_eval_ep=config.num_eval_ep,
    max_test_ep_len=config.max_eval_ep_len,
)


In [26]:
# Option 2: roll out a single episode by hand.

obs, _ = env.reset()
policy.reset()  # important (presumably clears the policy's per-episode state; confirm)

while True:
    action = policy.predict(obs)
    obs, reward, terminated, truncated, _ = env.step(action)
    # The episode ends when the env reports either termination or truncation.
    done = terminated or truncated
    # NOTE(review): `obs` is already the post-step observation at this point;
    # confirm that is what DTPolicy.add_to_history expects.
    policy.add_to_history(obs, action, reward, done)  # important
    print(obs, reward, action)
    if done:
        break