In [None]:
from dataclasses import dataclass
import tyro

from pathlib import Path

import gymnasium as gym
import torch
from agent import Agent
from mani_skill.utils.wrappers.flatten import FlattenActionSpaceWrapper
from mani_skill.utils.wrappers.record import RecordEpisode
from mani_skill.vector.wrappers.gymnasium import ManiSkillVectorEnv
from mani_skill.utils import gym_utils

from twsim.envs import plane  # noqa: F401
from twsim.robots import transwheel  # noqa: F401

import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

from tqdm.notebook import tqdm

# Global seaborn theme for every figure in this notebook: "paper" context, "ticks" style.
sns.set_context("paper")
sns.set_style("ticks")

In [None]:
@dataclass
class Args:
    """Evaluate a trained policy."""

    # --- required ---------------------------------------------------------
    # Path to the checkpoint file
    checkpoint: str
    # Environment ID
    env_id: str

    # --- optional ---------------------------------------------------------
    # Control mode
    control_mode: str = "wheel_vel_ext_pos"
    # Whether to record videos of the evaluation run
    capture_video: bool = True
    # Number of parallel evaluation environments
    num_eval_envs: int = 1
    # Number of steps to run in each evaluation environment
    num_eval_steps: int = 500
    # Reconfigure the environment each reset to ensure objects are randomized
    eval_reconfiguration_freq: int = 1
    # Let parallel evaluation environments reset upon termination instead of truncation
    eval_partial_reset: bool = False
    # Use GPU for evaluation
    cuda: bool = True

# Checkpoint to evaluate. Alternatives used previously (swap in as needed):
# checkpoint = 'runs/Stiffer/final_ckpt.pt'
# checkpoint = 'runs/PlaneVel-v1__train-ppo__1__1747949506/final_ckpt.pt'
# checkpoint = 'runs/Step/final_ckpt.pt'
# checkpoint = 'runs/StepEnv/final_ckpt.pt'
checkpoint = 'runs/SensorEnvLongLong/ckpt_576.pt'

# Environment to evaluate in.
# env_id = 'StepVel-v1'
env_id = 'StepVelSensor-v1'

args = Args(checkpoint, env_id)

# Fall back to the CPU when CUDA is unavailable or explicitly disabled via `args.cuda`.
device = torch.device("cuda" if torch.cuda.is_available() and args.cuda else "cpu")

# Keyword arguments forwarded to `gym.make` when the evaluation environment is created.
# NOTE(review): `sim_backend` is hard-coded to "physx_cuda" and does not follow `args.cuda`
# or the `device` chosen above -- confirm this is intended for CPU-only runs.
env_kwargs = dict(
    obs_mode="state",
    render_mode="rgb_array",
    sim_backend="physx_cuda",
    control_mode=args.control_mode,
    human_render_camera_configs=dict(shader_pack="rt"),
)

# Output goes next to the checkpoint, in a sub-directory named after the environment.
# Derived from `args` (not the loose `env_id` global) so `args` stays the single source of truth.
eval_output_dir = Path(args.checkpoint).parent / args.env_id

# Set to False to refuse to write into an output directory that already exists.
overwrite = True
print('Checking', eval_output_dir)
if (not overwrite) and eval_output_dir.exists():
    # FileExistsError describes the situation; SystemError is reserved for interpreter-internal errors.
    raise FileExistsError("Make sure that you are not overwriting existing output.")


eval_output_dir = str(eval_output_dir)
print(f"Saving eval videos to {eval_output_dir}")

In [None]:
# Create the evaluation environment
eval_envs = gym.make(
    args.env_id,
    num_envs=args.num_eval_envs,
    reconfiguration_freq=args.eval_reconfiguration_freq,
    **env_kwargs,  # type: ignore
)

# Flatten action spaces if needed
if isinstance(eval_envs.action_space, gym.spaces.Dict):
    eval_envs = FlattenActionSpaceWrapper(eval_envs)

# Attach the video recorder only when requested; previously `args.capture_video`
# was ignored and the recorder was always added.
if args.capture_video:
    eval_envs = RecordEpisode(
        eval_envs,  # type: ignore
        output_dir=eval_output_dir,
        save_trajectory=False,
        max_steps_per_video=args.num_eval_steps,
        video_fps=30,
    )

# Vectorized wrapper; terminations are ignored unless partial resets are enabled.
eval_envs = ManiSkillVectorEnv(
    eval_envs,  # type: ignore
    args.num_eval_envs,
    ignore_terminations=not args.eval_partial_reset,
    record_metrics=True,
)

In [None]:
# Show the simulator details and the reward mode of the underlying (unwrapped) environment.
eval_envs.unwrapped.print_sim_details()  # type: ignore
print(f"{eval_envs.unwrapped.reward_mode=}")  # type: ignore

In [None]:
print("Evaluating")

# The loop below indexes the velocity after `.squeeze()` and converts every recorded
# value with `.item()`, which only works for a single environment. Fail early with a
# clear message instead of an obscure tensor-conversion error inside the loop.
if args.num_eval_envs != 1:
    raise ValueError(
        f"This evaluation records scalar metrics and requires num_eval_envs == 1, got {args.num_eval_envs}."
    )

# Metrics copied from the step info into the results table (those missing from the info are skipped).
labels = ["elapsed_steps", "velx", "vely", "velz", "vel", "velocity_error", "reward_velocity", "extension", "reward_extension", "reward", "contact", "distance"]
df_rows = []

# A distance below this threshold is recorded as contact.
CONTACT_DISTANCE_THRESHOLD = 0.01

eval_obs, _ = eval_envs.reset()
num_episodes = 0
num_steps_run = 0

observation_shape = eval_envs.single_observation_space.shape
action_shape = eval_envs.single_action_space.shape

agent = Agent(observation_shape, action_shape).to(device)
# `map_location` lets a checkpoint saved on another device (e.g. GPU) load onto `device`.
agent.load_state_dict(torch.load(args.checkpoint, map_location=device))
agent.eval()

# Never run past the environment's own episode limit.
# NOTE(review): guarded in case no limit is found and None is returned -- confirm against gym_utils.
max_episode_steps = gym_utils.find_max_episode_steps_value(eval_envs._env)
if max_episode_steps is None:
    num_steps = args.num_eval_steps
else:
    num_steps = min(max_episode_steps, args.num_eval_steps)

with torch.no_grad():
    for step in tqdm(range(num_steps)):
        eval_action = agent.get_action(eval_obs, deterministic=True)
        eval_obs, eval_reward, _, _, eval_infos = eval_envs.step(eval_action)
        num_steps_run += 1

        # Stop at the first finished episode; the info of this last step is not recorded.
        if "final_info" in eval_infos:
            mask = eval_infos["_final_info"]
            num_episodes += int(mask.sum())
            break

        # Split the velocity vector into components and its norm.
        velocity = eval_infos["velocity"].squeeze()
        eval_infos["velx"] = velocity[0]
        eval_infos["vely"] = velocity[1]
        eval_infos["velz"] = velocity[2]
        eval_infos["vel"] = velocity.norm()

        if "distance" in eval_infos:
            eval_infos["contact"] = eval_infos["distance"] < CONTACT_DISTANCE_THRESHOLD

        eval_infos["reward"] = eval_reward

        df_rows.append({label: eval_infos[label].item() for label in labels if label in eval_infos})

# Requested step budget across all environments (the run may stop earlier, see the break above).
total_eval_steps = args.num_eval_steps * args.num_eval_envs
# Report the number of steps actually executed, not the last loop index.
print(f"Evaluated {num_steps_run} steps resulting in {num_episodes} episodes")

eval_envs.close()

In [None]:
# Turn the per-step records collected during evaluation into a DataFrame and preview the first rows.
df = pd.DataFrame(df_rows)
df.head()

In [None]:
# Summary statistics of the recorded metrics.
df.describe()

In [None]:
# Figure layout: each inner list is one row of panels, read left to right.
to_plot = [["vel", "velx"], ["vely", "velz"], ["velocity_error", "reward_velocity"], ["extension", "reward_extension"], ["reward", "contact"], ["distance"]]
# y-axis limits, one entry per metric in the flattened order of `to_plot`.
limits = [[0, 0.4]] * 5 + [[0, 1]] + [[-10, 10]] + [[0, 1]] * 4

labels = [label for row in to_plot for label in row]

# zip() would silently drop panels on a length mismatch, so check explicitly.
if len(labels) != len(limits):
    raise ValueError(f"Expected one y-limit per metric: {len(labels)} metrics but {len(limits)} limits.")
limits_by_label = dict(zip(labels, limits))

num_rows = len(to_plot)
# Use the widest row so that rows of different lengths all fit on the grid.
num_cols = max(len(row) for row in to_plot)

# squeeze=False guarantees a 2-D array of axes regardless of the grid shape.
fig, axes = plt.subplots(num_rows, num_cols, figsize=(10, 10), squeeze=False)

# Address the axes by (row, column) so that a short row cannot shift later panels.
for row_idx, row_labels in enumerate(to_plot):
    for col_idx in range(num_cols):
        ax = axes[row_idx, col_idx]

        # Grid cell without a metric: hide it instead of leaving an empty frame.
        if col_idx >= len(row_labels):
            ax.set_visible(False)
            continue

        label = row_labels[col_idx]
        if label not in df.columns:
            print('Skipping', label)
            ax.set_visible(False)
            continue

        sns.lineplot(ax=ax, data=df, x="elapsed_steps", y=label)
        ax.set_ylim(limits_by_label[label])

sns.despine()