In [None]:
%load_ext autoreload
%autoreload 2

from stable_baselines3 import TD3
from stable_baselines3.td3.policies import MlpPolicy
from stable_baselines3.common.noise import NormalActionNoise
from stable_baselines3.common.evaluation import evaluate_policy
import numpy as np
import gym
from tqdm.auto import tqdm
from envs.escape_room_continuous_space_env import EscapeRoomEnv
from stable_baselines3.common.callbacks import BaseCallback
import imageio
from IPython.display import Image, display

In [None]:
class CustomEnvWrapper(gym.Wrapper):
    """Adapt an env using the 5-tuple ``step`` / ``(obs, info)`` ``reset`` API to the 4-tuple API.

    * ``reset`` returns only the (processed) observation.
    * ``step`` returns ``(observation, reward, done, info)`` where
      ``done = terminated or truncated``.
    * Tuple observations are flattened into a single array with
      ``np.concatenate``; any other observation is passed through unchanged.
    """

    def __init__(self, env):
        super().__init__(env)

    def reset(self, **kwargs):
        """Reset the wrapped env and return the processed observation (``info`` is dropped)."""
        observation, _info = self.env.reset(**kwargs)
        return self._process_observation(observation)

    def step(self, action):
        """Step the wrapped env and return ``(observation, reward, done, info)``.

        Because ``terminated`` and ``truncated`` are merged into ``done``, the
        distinction would otherwise be lost.  When ``info`` is a dict, a copy is
        returned with the conventional ``"TimeLimit.truncated"`` key set (unless
        the env already provided it) so consumers that check that key -- e.g.
        Stable-Baselines3's replay buffer -- can tell a time-limit cut-off from
        a true terminal state.
        """
        observation, reward, terminated, truncated, info = self.env.step(action)
        done = terminated or truncated
        if isinstance(info, dict):
            # Copy so the wrapped env's own dict is not mutated.
            info = dict(info)
            info.setdefault("TimeLimit.truncated", bool(truncated and not terminated))
        return self._process_observation(observation), reward, done, info

    def _process_observation(self, observation):
        """Concatenate tuple observations into one array; return others as-is."""
        if isinstance(observation, tuple):
            return np.concatenate(observation)
        return observation


In [None]:
class TD3LoggingCallback(BaseCallback):
    """Callback that periodically evaluates the model and records the mean reward.

    Evaluation runs at the end of a rollout, but only once at least
    ``eval_freq`` callback steps (``n_calls``) have elapsed since the previous
    evaluation.  Each evaluation plays one deterministic episode on
    ``eval_env`` and appends its mean reward to ``evaluation_rewards``.

    Args:
        eval_env: Environment passed to ``evaluate_policy``.
        eval_freq: Minimum number of callback steps between two evaluations.
            A value <= 0 disables throttling (evaluate at every rollout end).
        verbose: When > 0, print the reward of every evaluation.
    """

    def __init__(self, eval_env, eval_freq=1000, verbose=1):
        super().__init__(verbose)
        self.eval_env = eval_env
        self.eval_freq = eval_freq
        self.evaluation_rewards = []
        # Value of ``n_calls`` at the most recent evaluation.
        self._last_eval_call = 0

    def _on_step(self):
        # Never request early stopping.
        return True

    def _on_rollout_end(self):
        # ``n_calls`` keeps growing across successive ``learn()`` calls, so it
        # is a stable clock even when the model's own timestep counter resets.
        steps_since_eval = self.n_calls - self._last_eval_call
        if self.eval_freq > 0 and steps_since_eval < self.eval_freq:
            return
        self._last_eval_call = self.n_calls

        mean_reward, _ = evaluate_policy(self.model, self.eval_env, n_eval_episodes=1, deterministic=True)
        self.evaluation_rewards.append(mean_reward)
        if self.verbose > 0:
            print(f"Episode {len(self.evaluation_rewards)}: Mean reward = {mean_reward}")


In [None]:
# Settings shared by the training and the evaluation environment
env_kwargs = dict(max_steps_per_episode=3000, goal=(550, 450), delta=15)

# Initialize the training environment and wrap it
env = CustomEnvWrapper(EscapeRoomEnv(**env_kwargs))

# Separate environment for the callback's evaluations, so that evaluation
# episodes never reset or step the environment the model is training on
eval_env = CustomEnvWrapper(EscapeRoomEnv(**env_kwargs))

# Set up action noise for exploration
n_actions = env.action_space.shape[-1]
action_noise = NormalActionNoise(mean=np.zeros(n_actions), sigma=0.1 * np.ones(n_actions))

# Initialize the TD3 model
model = TD3("MlpPolicy", env, action_noise=action_noise, verbose=0)

# Training schedule: `total_episodes` calls to learn(), each adding
# `timesteps_per_iteration` timesteps
total_episodes = 10
timesteps_per_iteration = 1000

# Checkpoint roughly ten times per run; max() avoids a zero modulus
# (ZeroDivisionError) when total_episodes < 10
save_interval = max(1, total_episodes // 10)

# Set up tqdm progress bar
progress_bar = tqdm(total=total_episodes, desc="Training Progress", leave=False)

# Path for saving the model
model_path = "./tmp/td3/td3_escape_room_checkpoint.zip"

# Initialize the callback
callback = TD3LoggingCallback(eval_env=eval_env, eval_freq=1000)  # Evaluate every 1000 timesteps

try:
    # Train the model
    for episode in range(total_episodes):
        # reset_num_timesteps=False continues the same run: without it every
        # call resets the timestep counter (repeating the warm-up phase) and
        # resets the environment
        model.learn(
            total_timesteps=timesteps_per_iteration,
            callback=callback,
            reset_num_timesteps=False,
        )

        # Save the model periodically
        if (episode + 1) % save_interval == 0:
            model.save(model_path)

        # Update progress bar
        progress_bar.update(1)

    # Save the final model
    model.save("./tmp/td3/td3_escape_room_final.zip")
finally:
    # Release the progress bar and both environments even if training fails
    progress_bar.close()
    env.close()
    eval_env.close()

print("Training completed and model saved.")

In [None]:
import matplotlib.pyplot as plt

def plot_rewards(rewards):
    """Draw a line chart of mean reward against episode index.

    Args:
        rewards: Sequence of mean reward values, plotted in order.
    """
    _, ax = plt.subplots(figsize=(12, 6))
    ax.plot(rewards, marker='o', linestyle='-', color='blue')
    ax.set_title('Mean Rewards per Episode')
    ax.set_xlabel('Episode')
    ax.set_ylabel('Mean Reward')
    ax.grid(True)
    plt.show()

# Plot the mean reward of each evaluation recorded by the training callback
# (the callback appends one value to callback.evaluation_rewards per evaluation).
plot_rewards(callback.evaluation_rewards)


In [None]:
def test_render(env):
    env.reset()
    frame = env.render(mode='rgb_array')
    if frame is None:
        print("Render mode 'rgb_array' is not supported.")
    else:
        print("Render mode 'rgb_array' works correctly.")
        plt.imshow(frame)
        plt.show()

# Smoke-test 'rgb_array' rendering on the environment.
# NOTE(review): `env` was closed at the end of the training cell -- confirm it
# can still be reset/rendered after close(), or re-create it before this call.
test_render(env) 

In [None]:
def display_gif(path):
    """Show the image file at ``path`` inline in the notebook.

    Args:
        path: Location of the image file; it is read as raw bytes.
    """
    with open(path, 'rb') as gif_file:
        gif_bytes = gif_file.read()
    display(Image(gif_bytes))

In [None]:


# Number of episodes for the quantitative evaluation and for the recorded rollout
n_eval_episodes = 20
n_render_episodes = 5

# Initialize the environment and wrap it
env = CustomEnvWrapper(EscapeRoomEnv(max_steps_per_episode=3000, goal=(550, 450), delta=15))

frames = []  # List to store frames for GIF
try:
    # Load the trained model (inside the try so the env is closed even if
    # loading or evaluation fails)
    model_path = "./tmp/td3/td3_escape_room_final.zip"
    model = TD3.load(model_path, env=env)

    # Evaluate the model
    mean_reward, std_reward = evaluate_policy(
        model, env, n_eval_episodes=n_eval_episodes, deterministic=True
    )
    print(f"Evaluated model on {n_eval_episodes} episodes: Mean reward = {mean_reward}, Std reward = {std_reward}")

    # Optionally: Visualize the agent's performance
    for _ in range(n_render_episodes):
        obs = env.reset()
        done = False
        while not done:
            action, _states = model.predict(obs, deterministic=True)
            obs, reward, done, info = env.step(action)
            frame = env.render(mode='rgb_array')  # Capture the frame in RGB format
            if frame is not None:
                frames.append(frame)
finally:
    env.close()  # Ensure the environment is closed properly

# Save the captured frames as a GIF
gif_path = './td3_agent_performance.gif'
if frames:
    imageio.mimsave(gif_path, frames, fps=30)  # Save as GIF
    print(f"Evaluation and visualization completed. GIF saved at {gif_path}")
    display_gif(gif_path)
else:
    # render() returned None for every step, so there is nothing to write
    print("Evaluation completed, but no frames were rendered; GIF was not saved.")