# Test Door Key Offline Training with d3rlpy and Decision Transformer

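We will use the DoorKey 16x16 environment from Minigrid (built on Gymnasium) to test the Decision Transformer algorithm from d3rlpy. Note that the training cell below currently instantiates `DiscreteSACConfig` and trains online with `fit_online`.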

In [None]:
# Test if we are running on CoLab or not.
# `get_ipython()` is supplied by the IPython/Jupyter runtime; the check looks
# for the substring 'google.colab' in its string representation.
if 'google.colab' in str(get_ipython()):
  print('Running on CoLab')
  # System packages (xvfb, ffmpeg); all output is discarded.
  !apt-get install -y xvfb ffmpeg > /dev/null 2>&1
  # Python packages installed quietly, followed by d3rlpy with visible output.
  %pip install pyvirtualdisplay pygame moviepy > /dev/null 2>&1
  %pip install d3rlpy
else:
  # Nothing is installed on this branch; presumably the dependencies are
  # expected to be present already in a local environment.
  print('Not running on CoLab')

In [None]:
# Directory creation
import os

# Create the output directories used by this notebook (the video directory is
# the one passed to the video recorder in a later cell).
# exist_ok=True makes the call idempotent and avoids the race between a
# separate os.path.exists() check and the subsequent os.makedirs().
for path in ("./models", "./videos/video-doorkey-d3rlpy"):
    os.makedirs(path, exist_ok=True)

In [None]:
import random, math
from typing import Any, SupportsFloat

import gymnasium as gym
import numpy as np
from gymnasium import spaces
from gymnasium.core import ActType, ObsType
from minigrid.envs import DoorKeyEnv

class CurriculumEnvWrapper(gym.Wrapper, gym.utils.RecordConstructorArgs):
    """Expose the whole grid as the ``image`` observation and randomize starts.

    Two things are changed relative to the wrapped environment:

    * ``obs["image"]`` is replaced, in both ``reset`` and ``step``, by an
      encoding of the entire grid with every cell marked visible. The base
      environment's ``image`` observation space is resized accordingly.
    * On ``reset``, with probability ``beta``, the agent is moved to a position
      returned by the base environment's ``place_obj``.

    Args:
        env: Environment to wrap. Its unwrapped environment must provide
            ``grid``, ``agent_pos``, ``place_obj`` and a dict-like
            ``observation_space`` with an ``"image"`` entry.
        beta: Probability of relocating the agent when the environment is
            reset.
    """

    def __init__(self, env: gym.Env, beta: float = 0.1):
        gym.utils.RecordConstructorArgs.__init__(self)
        gym.Wrapper.__init__(self, env)
        self.beta = beta

        # Observations are dictionaries containing an encoding of the grid and
        # a textual 'mission' string; only the image entry is redefined here.
        grid = self.unwrapped.grid
        self.env.unwrapped.observation_space["image"] = spaces.Box(
            low=0,
            high=255,
            shape=(grid.width, grid.height, 3),
            dtype="uint8",
        )

    def _full_grid_image(self):
        """Encode the complete grid with an all-true visibility mask."""
        # Always go through ``unwrapped``: plain ``self.grid`` depends on
        # wrappers forwarding unknown attributes, which is not guaranteed.
        grid = self.unwrapped.grid
        vis_mask = np.ones(shape=(grid.width, grid.height), dtype=bool)
        return grid.encode(vis_mask)

    def step(
        self, action: ActType
    ) -> tuple[ObsType, SupportsFloat, bool, bool, dict[str, Any]]:
        """Step the wrapped environment and swap in the full-grid image."""
        obs, reward, terminated, truncated, info = self.env.step(action)
        obs["image"] = self._full_grid_image()
        return obs, reward, terminated, truncated, info

    def reset(self, **kwargs):
        """Reset the wrapped environment, optionally relocating the agent.

        Args:
            **kwargs: Forwarded unchanged to the wrapped ``reset``.

        Returns:
            The ``(observation, info)`` pair from the wrapped environment with
            ``observation["image"]`` replaced by the full-grid encoding.
        """
        obs, info = self.env.reset(**kwargs)

        # NOTE(review): the draw uses the global ``random`` module, so it is
        # not controlled by a ``seed`` passed to ``reset``.
        if random.random() < self.beta:
            base_env: DoorKeyEnv = self.env.unwrapped
            # Presumably cleared so the agent's current cell is not excluded
            # from the candidate positions -- confirm against place_obj.
            base_env.agent_pos = (-1, -1)
            base_env.agent_pos = base_env.place_obj(
                None, top=(0, 0), size=None, max_tries=math.inf
            )

        # Encoded after the optional relocation so the image reflects the
        # final grid state.
        # NOTE(review): confirm whether the grid encoding marks the agent's
        # cell; if not, the agent position is absent from this observation.
        obs["image"] = self._full_grid_image()
        return obs, info


class NumpyStackWrapper(gym.Wrapper, gym.utils.RecordConstructorArgs):
    """Materialize observations by calling ``__array__()`` on them.

    Presumably meant to sit on top of a frame-stacking wrapper whose
    observations are lazy frame containers -- verify against the caller.

    Args:
        env: Environment to wrap.
        n: Stored on the instance as ``self.n``; not read by this class.
    """

    def __init__(self, env: gym.Env, n: int = 4):
        gym.utils.RecordConstructorArgs.__init__(self)
        gym.Wrapper.__init__(self, env)
        self.n = n

    def reset(self, **kwargs):
        """Reset the wrapped environment and return the observation array."""
        frames, info = self.env.reset(**kwargs)
        stacked = frames.__array__()
        return stacked, info

    def step(self, action):
        """Step the wrapped environment and return the observation array."""
        frames, reward, terminated, truncated, info = self.env.step(action)
        stacked = frames.__array__()
        return stacked, reward, terminated, truncated, info


In [None]:
import d3rlpy
import gymnasium as gym
import torch
import torch.nn as nn
from d3rlpy.models.encoders import EncoderFactory

env_key = "MiniGrid-DoorKey-16x16-v0"


def create_env(env_key, max_episode_steps=1000, is_video=False, curriculum_mode=False):
    """Build an environment for training or for video recording.

    Args:
        env_key: Environment id passed to ``gym.make``.
        max_episode_steps: Step limit forwarded to ``gym.make``.
        is_video: When truthy, the environment is created with
            ``render_mode='rgb_array'`` and its observations are left
            unfiltered. Otherwise no render mode is set and observations are
            reduced to ``image`` and ``direction`` and then flattened.
        curriculum_mode: When truthy, wrap the environment in
            ``CurriculumEnvWrapper``.

    Returns:
        The constructed (and possibly wrapped) environment.
    """
    render_mode = "rgb_array" if is_video else None

    env = gym.make(
        env_key,
        max_episode_steps=max_episode_steps,
        render_mode=render_mode,
        see_through_walls=True,
    )

    if curriculum_mode:
        env = CurriculumEnvWrapper(env)

    if not is_video:
        env = gym.wrappers.FilterObservation(env, filter_keys=['image', 'direction'])
        env = gym.wrappers.FlattenObservation(env)
        # Frame stacking (FrameStack(env, 20) + NumpyStackWrapper + a second
        # FlattenObservation) was tried here and is currently disabled.

    return env

# Training and evaluation run on two separate environments built with the
# same settings: 200-step episodes with the curriculum wrapper enabled.
_train_env_options = dict(max_episode_steps=200, curriculum_mode=True)
env = create_env(env_key, **_train_env_options)
eval_env = create_env(env_key, **_train_env_options)

class CustomEncoder(nn.Module):
    """Fully connected encoder: input -> 2048 -> 1024 -> 512 features.

    Dropout (p=0.5) is applied to the output of the first two linear layers
    before the ReLU; the last layer has only a ReLU.

    Args:
        observation_shape: Shape of a single observation. Only
            ``observation_shape[0]`` is used, as the input feature count, so a
            flat (1-D) observation is assumed.
    """

    def __init__(self, observation_shape):
        super().__init__()
        print(observation_shape)

        self.fc1 = nn.Linear(observation_shape[0], 2048)
        self.fc1dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(2048, 1024)
        self.fc2dropout = nn.Dropout(0.5)
        self.fc3 = nn.Linear(1024, 512)

    def forward(self, x):
        """Map a batch of flat observations to 512-dimensional features."""
        # Each linear layer is paired with its own dropout module; previously
        # fc2dropout was applied after fc1 and fc1dropout was never used.
        h = torch.relu(self.fc1dropout(self.fc1(x)))
        h = torch.relu(self.fc2dropout(self.fc2(h)))
        h = torch.relu(self.fc3(h))
        return h
    
class CustomEncoderFactory(EncoderFactory):
    """Encoder factory that produces ``CustomEncoder`` instances."""

    @staticmethod
    def get_type() -> str:
        """Return the identifier string of this factory."""
        return "custom"

    def create(self, observation_shape):
        """Build a ``CustomEncoder`` for the given observation shape."""
        encoder = CustomEncoder(observation_shape)
        return encoder

# Use the first GPU when CUDA is available and fall back to the CPU otherwise,
# so this cell also runs on machines without a GPU.
device = "cuda:0" if torch.cuda.is_available() else "cpu:0"

# NOTE: the variable is called ``dqn`` (later cells refer to it by that name)
# but it holds a Discrete SAC algorithm created with its default config.
dqn = d3rlpy.algos.DiscreteSACConfig(

).create(device=device)
#dqn = d3rlpy.algos.DQNConfig(
#    encoder_factory=CustomEncoderFactory(),
#    batch_size=100,
#    gamma=0.9,
#    target_update_interval=1000,
#    learning_rate=2.5e-4
#).create(device="cuda:0")

In [None]:
import numpy as np

from collections import deque
from typing import Deque, List, Sequence, Tuple

from typing_extensions import Protocol

from d3rlpy.dataset.components import EpisodeBase

from d3rlpy.dataset.buffers import BufferProtocol

from d3rlpy.dataset.writers import ExperienceWriter, _ActiveEpisode, WriterPreprocessProtocol
from d3rlpy.dataset.components import Signature


class CustomReplayBuffer(d3rlpy.dataset.ReplayBuffer):
    """Replay buffer that drops unterminated episodes without positive reward.

    When an episode is clipped with ``terminated=False`` and the mean of its
    rewards is ``<= 0``, the episode and all of its transitions are removed
    from the underlying buffer.

    NOTE(review): this relies on private attributes of the parent class and of
    the buffer (``_writer._active_episode``, ``_buffer._transitions``); confirm
    they exist in the installed d3rlpy version.
    """

    def clip_episode(self, terminated: bool) -> None:
        r"""Clips the current episode.

        Args:
            terminated: Flag to represent environment termination.
        """
        # Capture the episode before the writer clips it; the filter below
        # needs the object that was active during this episode.
        active_episode = self._writer._active_episode
        rewards = active_episode.rewards

        # An empty reward array would yield NaN (never <= 0) plus a runtime
        # warning, so it is excluded explicitly.
        discard = not terminated and rewards.size > 0 and rewards.mean() <= 0

        self._writer.clip_episode(terminated)

        if discard:
            self._drop_episode(active_episode)

    def _drop_episode(self, episode) -> None:
        """Remove ``episode`` and every transition referring to it."""
        transitions = self._buffer._transitions
        kept = [(ep, idx) for ep, idx in transitions if ep is not episode]
        if isinstance(transitions, deque):
            # Keep the container type and its maxlen; swapping in a plain list
            # would drop any deque behaviour the buffer relies on.
            self._buffer._transitions = deque(kept, maxlen=transitions.maxlen)
        else:
            self._buffer._transitions = kept

        # Identity-based removal; silently skips an episode that was never
        # registered instead of raising ValueError.
        episodes = self._buffer.episodes
        for position, stored in enumerate(episodes):
            if stored is episode:
                del episodes[position]
                break


class CustomWriterPreprocess(d3rlpy.dataset.WriterPreprocessProtocol):
    """Pass-through writer preprocessor: every hook returns its input as is."""

    def process_observation(self, observation: d3rlpy.types.Observation) -> d3rlpy.types.Observation:
        """Return the observation unchanged."""
        return observation

    def process_action(self, action: np.ndarray) -> np.ndarray:
        """Return the action unchanged."""
        return action

    def process_reward(self, reward: np.ndarray) -> np.ndarray:
        """Return the reward unchanged."""
        return reward
    
# Pass-through hooks applied to everything written into the replay buffer.
writer_preprocessor = CustomWriterPreprocess()

# A FIFO buffer created with a limit of 20000, wrapped by the replay buffer
# that filters out unrewarded, unterminated episodes.
buffer = CustomReplayBuffer(
    d3rlpy.dataset.FIFOBuffer(20000),
    env=env,
    writer_preprocessor=writer_preprocessor,
)

# Epsilon-greedy exploration; the two arguments are presumably the start and
# end epsilon (0.9 -> 0.3) -- confirm against the d3rlpy signature.
explorer = d3rlpy.algos.LinearDecayEpsilonGreedy(0.9, 0.3)

fit_options = dict(
    n_steps=10000000,          # 10M steps in total
    eval_env=eval_env,
    n_steps_per_epoch=100000,  # one epoch is 100K steps
    update_start_step=10000,   # parameter updates start after 10K steps
    update_interval=100,
)
dqn.fit_online(env, buffer, explorer, **fit_options)

In [None]:
import random

import gymnasium as gym
import numpy as np
# The environments are Gymnasium environments, so the recorder must come from
# gymnasium as well rather than from the legacy ``gym`` package.
from gymnasium.wrappers import RecordVideo

# start virtual display
d3rlpy.notebook_utils.start_virtual_display()

# Environment that feeds observations to the policy.
env = create_env(env_key, max_episode_steps=1000, curriculum_mode=True)
# Twin environment with rendering enabled; it is stepped with the same actions
# and wrapped with RecordVideo so the episode is written to disk.
env_video = create_env(env_key, max_episode_steps=1000, is_video=True, curriculum_mode=True)

env_video = RecordVideo(env_video, './videos/video-doorkey-d3rlpy')

seed = 2

# CurriculumEnvWrapper draws from the global ``random`` module during reset.
# Reseeding it before each reset makes both environments take the same draw,
# so the recorded episode matches the one the policy actually acts in.
random.seed(seed)
# reset() returns (observation, info); only the observation is needed.
observation, info = env.reset(seed=seed)

random.seed(seed)
env_video.reset(seed=seed)

explorer = d3rlpy.algos.ConstantEpsilonGreedy(0.3)

while True:
    # The algorithm expects a batch dimension in front of the observation.
    x = np.expand_dims(observation, axis=0)

    action = explorer.sample(dqn, x, 0)[0]

    observation, reward, done, truncated, _ = env.step(action)
    env_video.step(action)

    if done:
        print("reward:", reward)
        print("DONE!!!")
        break
    if truncated:
        print("Truncated")
        break

# Close both environments on either exit path. Closing the recorder (instead
# of resetting it again) avoids starting a new, unfinished recording.
env_video.close()
env.close()


d3rlpy.notebook_utils.render_video("./videos/video-doorkey-d3rlpy/rl-video-episode-0.mp4")