In [None]:
%load_ext autoreload
%autoreload 2

import sys
from pathlib import Path

sys.path.append(str(Path("..").resolve()))
import torch

from PIL import Image

from rl.agent import (
    HRMQNetTrainingConfig,
)
from rl.dataset import MiniHackFullObservationSimpleEnvironmentDataset
from rl.interfaces import evaluating_net_context
from rl.environment import MiniHackDynamicMaze, MiniHack4Room
from rl.dqn_train_loop import HRMAgentTrainingModule

In [None]:
# utilities to visualise simple minihack room environment
def tensor_to_string(chars_tensor):
    """Render a 2-D tensor of character codes as a list of text lines.

    The tensor is moved to the CPU, converted to NumPy and transposed, so
    each returned line corresponds to one column of ``chars_tensor``. Lines
    that contain only whitespace are dropped.

    Args:
        chars_tensor: 2-D tensor whose elements are integer code points
            accepted by ``chr``.

    Returns:
        list[str]: the non-blank lines, in order.
    """
    grid = chars_tensor.cpu().numpy().transpose()
    # Decode every row of the transposed grid into a string, lazily.
    decoded_lines = (
        "".join(chr(code) for code in grid[idx, :]) for idx in range(grid.shape[0])
    )
    # Keep only the lines that have at least one non-whitespace character.
    return [line for line in decoded_lines if line.strip() != ""]


def pixels_to_img(pixels_tensor):
    """Convert a rendered pixel tensor into a cropped PIL image.

    The crop window is hard-coded to rows 79..286 and columns 608..815
    (a 208x208 region) with all channels kept.
    NOTE(review): presumably this window is where the small room/maze is
    drawn inside the full rendered frame; confirm before reusing it with
    other environments.

    Args:
        pixels_tensor: tensor indexed as (row, column, channel).

    Returns:
        PIL.Image.Image: the cropped region.
    """
    # Move to host memory first (as the other helpers in this notebook do):
    # calling .numpy() directly on a non-CPU tensor raises.
    return Image.fromarray(pixels_tensor.cpu().numpy()[79:287, 608:816, :])


# for navigation only
def action_one_hot_to_string(action_one_hot_tensor):
    """Translate a one-hot (or score) action tensor into a compass direction.

    The index of the largest element selects the direction name.

    Args:
        action_one_hot_tensor: tensor whose argmax is the action index.

    Returns:
        str: one of the eight direction names below.

    Raises:
        KeyError: if the argmax index is not in 0..7.
    """
    direction_names = (
        "north",
        "east",
        "south",
        "west",
        "north east",
        "south east",
        "south west",
        "north west",
    )
    # Index -> name lookup table; kept as a dict so an unknown index is a KeyError.
    lookup = dict(enumerate(direction_names))

    chosen_idx = int(action_one_hot_tensor.cpu().numpy().argmax())
    return lookup[chosen_idx]

In [None]:
# Build the typed training config from the Hydra YAML, then shrink it so the
# notebook is quick to run interactively.
from hydra import compose, initialize_config_dir
from omegaconf import OmegaConf, SCMode

# Number of movement actions: 4 (cardinals only) or 8 (cardinals + diagonals).
# Single source of truth so the dataset setting and the env kwarg cannot drift.
ACTION_SPACE_SIZE = 4

with initialize_config_dir(
    version_base=None,
    config_dir=str(Path("../rl/config").resolve()),
    job_name="test_cfg",
):
    cfg = compose(config_name="cfg_dqn.yaml")

# Merge the YAML values onto the structured schema and instantiate the
# dataclass, so attribute access below is typed.
typed_cfg: HRMQNetTrainingConfig = OmegaConf.to_container(
    OmegaConf.merge(OmegaConf.structured(HRMQNetTrainingConfig), cfg),
    structured_config_mode=SCMode.INSTANTIATE,
)

# for speed, we will reduce batch size, number of frames, etc.
typed_cfg.dataset.env_kwargs["observation_keys"] = ["chars"]
# typed_cfg.dataset.env_name = "MiniHack-4-Rooms"
typed_cfg.dataset.env_name = "MiniHack-Corridor-Maze-4-Way-Dynamic"
typed_cfg.resume_from_run = None
# typed_cfg.dataset.seq_len = 121
typed_cfg.dataset.seq_len = 143
typed_cfg.dataset.data_collection_batch_size = 32
typed_cfg.dataset.frames_per_update = 320
typed_cfg.log_wandb = False

# set 4 or 8 way
typed_cfg.dataset.action_space_size = ACTION_SPACE_SIZE
# NOTE(review): the previous comment here read "if 8, else 131", which does not
# say what the 8-way value should be; 131 is what this 4-way setup uses.
# Confirm the vocabulary size before switching ACTION_SPACE_SIZE to 8.
typed_cfg.dataset.vocab_size = 131
typed_cfg.dataset.env_kwargs["action-space"] = ACTION_SPACE_SIZE

dataset = MiniHackFullObservationSimpleEnvironmentDataset(config=typed_cfg.dataset)

# Test playing with 1 env

In [None]:
from IPython.display import clear_output
from copy import deepcopy

# Grid shape used to un-flatten the "inputs" observation for display.
dg_shape = (11, 13)

# Keyboard key -> action index (same index order as action_one_hot_to_string).
input_action_map = {
    # Cardinals
    "w": 0,  # move agent '@' north
    "a": 3,  # west
    "s": 2,  # south
    "d": 1,  # east
    # diagonals
    "k": 4,  # NE
    "m": 5,  # SE
    "n": 6,  # SW
    "h": 7,  # NW
}
# Only keys whose index fits the configured action space are usable; with a
# 4-way action space the diagonal keys would otherwise index past the end of
# the one-hot action tensor and raise IndexError.
available_actions = {
    key: idx
    for key, idx in input_action_map.items()
    if idx < typed_cfg.dataset.action_space_size
}

env = dataset.create_env()
inner_current_state = env.reset()
# Result of the most recent step (holds the action taken and the "next"
# entries); None until the first step has been made.
transitions = None

while True:
    clear_output(wait=True)

    # print last reward if any
    # The state carried forward by step_and_maybe_reset is the already-advanced
    # one, so the action/reward of the last step are read from `transitions`.
    if transitions is not None:
        print("Last action:", action_one_hot_to_string(transitions["action"]))
        print("Last action reward:", transitions["next"]["reward"].item())

    # visualise the current environment
    t = "\n".join(tensor_to_string(inner_current_state["inputs"].reshape(dg_shape).T))
    print(t)

    # Keyboard control: any key that is not a usable movement key ends the session.
    x = input()
    if x not in available_actions:
        print("Breaking out of loop")
        break
    a = available_actions[x]

    # Build the one-hot action for the chosen direction and attach it to a
    # copy of the current state.
    one_hot_action = torch.zeros(typed_cfg.dataset.action_space_size, dtype=torch.long)
    one_hot_action[a] = 1
    policy_decision = inner_current_state.clone()
    policy_decision["action"] = one_hot_action

    # step the environment
    transitions, inner_current_state = env.step_and_maybe_reset(policy_decision)

# Test trained model

In [None]:
# deepcopy is imported here as well so this cell does not depend on the
# interactive-play cell having been executed first.
from copy import deepcopy

from IPython.display import clear_output

# Grid shape used to un-flatten the "inputs" observation for display.
dg_shape = (11, 13)

# Work on a private copy so the shared typed_cfg is left untouched.
test_cfg = deepcopy(typed_cfg)

# interesting keys
test_cfg.dataset.env_kwargs[MiniHackDynamicMaze.KEY_P_CHANGE_DOORS] = 0.25
# test_cfg.dataset.env_kwargs[MiniHack4Room.KEY_MIN_ROOM_DISTANCE_BETWEEN_START_END] = 1  # does not apply for maze

test_cfg.use_last_hidden_state_to_seed_next_environment_step = (
    True  # could be true to test how things change
)
# Keep this flag in lock-step with the hidden-state seeding flag above.
test_cfg.dataset.do_not_skip_running_model_if_random_action = (
    test_cfg.use_last_hidden_state_to_seed_next_environment_step
)

if test_cfg.use_last_hidden_state_to_seed_next_environment_step:
    test_cfg.dataset.training_batch_size = test_cfg.dataset.data_collection_batch_size

test_dataset = MiniHackFullObservationSimpleEnvironmentDataset(config=test_cfg.dataset)
env = test_dataset.create_env()
with torch.device(
    "cuda"
):  # make sure that the buffers used in HRM are initialised on CUDA for backprop
    hrm_agent_training_module = HRMAgentTrainingModule(test_cfg, test_dataset)

# reconfigure the out keys of the modules so that things are kept
if test_cfg.use_last_hidden_state_to_seed_next_environment_step:
    hrm_agent_training_module.qvalue_net.config.use_last_hidden_state_to_seed_next_environment_step = test_cfg.use_last_hidden_state_to_seed_next_environment_step
    hrm_agent_training_module.actor.out_keys += ["seed_h_init", "seed_l_init"]
    hrm_agent_training_module.qvalue_net.out_keys += ["seed_h_init", "seed_l_init"]

test_dataset.initialise_policy_and_collector(
    hrm_agent_training_module.actor, hrm_agent_training_module.egreedy_module
)
# NOTE(review): the run name passed to pre_training_setup ("exnjvjjo") differs
# from the run id loaded below ("o2kstgjy"); confirm this is intentional.
hrm_agent_training_module.pre_training_setup(checkpoint_dir="s3", run_name="exnjvjjo")
hrm_agent_training_module.load_from_checkpoint(
    "o2kstgjy", ckpt_path_name="last.ckpt", restore_config=False
)

model = hrm_agent_training_module.actor
inner_current_state = env.reset()

while True:
    clear_output(wait=True)

    # print last reward if any
    if "next" in inner_current_state:
        print("Last action:", action_one_hot_to_string(inner_current_state["action"]))
        print("Last action reward:", inner_current_state["next"]["reward"].item())

    # visualise the current environment
    t = "\n".join(tensor_to_string(inner_current_state["inputs"].reshape(dg_shape).T))
    print(t)

    # Ask whether to take another step: empty input or anything starting with
    # "y" continues, everything else stops the rollout.
    x = input("Continue?")
    if (not x.lower().strip().startswith("y")) and (not x == ""):
        break

    # pass the current state through to our policy and get the actions to take
    policy_decision = inner_current_state.clone()

    # The model runs on the GPU with a leading batch dimension of 1; the
    # result is brought back to the CPU and un-batched before stepping.
    with torch.no_grad(), evaluating_net_context(model):
        policy_decision = model(policy_decision.unsqueeze(0).cuda()).cpu()[0]

    # step the environment
    inner_current_state = env.step(policy_decision)
    next_step = inner_current_state["next"]
    # Promote the new observation to the root so the next iteration displays
    # and feeds the post-step state.
    inner_current_state["inputs"] = next_step["inputs"]

    # Check "done" rather than only "terminated": per the TorchRL convention
    # "done" is also set when an episode is truncated, and a finished episode
    # must not be stepped again.
    if next_step["done"].item():
        print("Episode ended. Resetting new environment.")
        env = test_dataset.create_env()
        inner_current_state = env.reset()

# Test validation loop

In [None]:
# deepcopy is imported here as well so this cell does not depend on an
# earlier cell having been executed first.
from copy import deepcopy

# Fresh private copy of the base config. (This was previously assigned to a
# misspelled name, so the settings below silently mutated the config object
# left over from the previous cell.)
test_cfg = deepcopy(typed_cfg)

# interesting keys
test_cfg.dataset.env_kwargs[MiniHackDynamicMaze.KEY_P_CHANGE_DOORS] = 0.05
# test_cfg.dataset.env_kwargs[MiniHack4Room.KEY_MIN_ROOM_DISTANCE_BETWEEN_START_END] = (
#     None
# )

# Larger collection settings than the interactive cells use.
test_cfg.dataset.data_collection_batch_size = 1024
test_cfg.dataset.frames_per_update = 1024
test_cfg.use_last_hidden_state_to_seed_next_environment_step = (
    True  # could be true to test how things change
)
# Keep this flag in lock-step with the hidden-state seeding flag above.
test_cfg.dataset.do_not_skip_running_model_if_random_action = (
    test_cfg.use_last_hidden_state_to_seed_next_environment_step
)

if test_cfg.use_last_hidden_state_to_seed_next_environment_step:
    test_cfg.dataset.training_batch_size = test_cfg.dataset.data_collection_batch_size

test_dataset = MiniHackFullObservationSimpleEnvironmentDataset(config=test_cfg.dataset)
env = test_dataset.create_env()
with torch.device(
    "cuda"
):  # make sure that the buffers used in HRM are initialised on CUDA for backprop
    hrm_agent_training_module = HRMAgentTrainingModule(test_cfg, test_dataset)

# reconfigure the out keys of the modules so that things are kept
if test_cfg.use_last_hidden_state_to_seed_next_environment_step:
    hrm_agent_training_module.qvalue_net.config.use_last_hidden_state_to_seed_next_environment_step = test_cfg.use_last_hidden_state_to_seed_next_environment_step
    hrm_agent_training_module.actor.out_keys += ["seed_h_init", "seed_l_init"]
    hrm_agent_training_module.qvalue_net.out_keys += ["seed_h_init", "seed_l_init"]

test_dataset.initialise_policy_and_collector(
    hrm_agent_training_module.actor, hrm_agent_training_module.egreedy_module
)
# NOTE(review): the run name passed to pre_training_setup ("exnjvjjo") differs
# from the run id loaded below ("o2kstgjy"); confirm this is intentional.
hrm_agent_training_module.pre_training_setup(checkpoint_dir="s3", run_name="exnjvjjo")
hrm_agent_training_module.load_from_checkpoint(
    "o2kstgjy",
    ckpt_path_name="last.ckpt",
    restore_config=False,
)

validation_trajectories = test_dataset.validation_rollout()

# Mean of the reward entry at the last index of the second axis, shown as a
# percentage.
# NOTE(review): this reads as a completion rate only if that reward is 1 for a
# solved episode and 0 otherwise; confirm against the environment.
print(f"% of episodes completed: {validation_trajectories['next']['reward'][:, -1, 0].mean().item():.2%}")
