# Interact with RoboCasa Environments (Notebook)

This notebook lets you interact with RoboCasa Gym environments using the same setup as `scripts/eval_policy_robocasa.py`.

- Builds the environment with `RoboCasaWrapper`, `TimeLimit`, `MultiStepWrapper`, and optional `RecordVideo`.
- Uses a local `Gr00tPolicy` (set `model_path`). `RobotInferenceClient` is imported for connecting to a remote policy server (`host`/`port`) instead, but the cells shown here only build the local policy.
- Runs episodes and reports success metrics. Optionally renders saved videos inline.

In [None]:
# Install optional dependencies for video if needed (uncomment if missing)
# %pip install moviepy gymnasium[other] --quiet

import os

# Select the GPU by restricting the visible CUDA devices to index 0.
# This assignment is placed above the robocasa/robosuite/gr00t imports below,
# presumably so the device mask is in place before any of them initializes CUDA.
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

# Optional workaround, disabled by default: avoid importing TensorFlow from
# transformers to sidestep protobuf conflicts. Uncomment the lines below if needed.
# os.environ.setdefault("TRANSFORMERS_NO_TF", "1")
# os.environ.setdefault("TRANSFORMERS_NO_FLAX", "1")
# os.environ.setdefault("TF_CPP_MIN_LOG_LEVEL", "3")

import numpy as np
from pathlib import Path

import gymnasium as gym
import robocasa  # noqa: F401
import robosuite  # noqa: F401

from gymnasium.wrappers import TimeLimit
from gr00t.eval.wrappers.robocasa_wrapper import RoboCasaWrapper, load_robocasa_gym_env
from gr00t.eval.wrappers.record_video import RecordVideo
from gr00t.eval.wrappers.multistep_wrapper import MultiStepWrapper

from gr00t.experiment.data_config import DATA_CONFIG_MAP
from gr00t.model.policy import Gr00tPolicy, BasePolicy
from gr00t.eval.robot import RobotInferenceClient

# Inline video utilities
from IPython.display import HTML
from base64 import b64encode

def show_video_inline(mp4_path: str):
    """Return an IPython ``HTML`` object that plays an MP4 file inline.

    The whole file is read into memory and embedded in the page as a
    base64 ``data:`` URI inside a fixed 640x480 ``<video>`` element.

    Args:
        mp4_path: Path of the MP4 file to embed.

    Returns:
        The ``HTML`` object wrapping the ``<video>`` markup.
    """
    with open(mp4_path, "rb") as video_file:
        encoded = b64encode(video_file.read())
    payload = encoded.decode("utf-8")
    markup = f"""
    <video width="640" height="480" controls>
      <source src="data:video/mp4;base64,{payload}" type="video/mp4">
      Your browser does not support the video tag.
    </video>
    """
    return HTML(markup)

In [None]:
# Configuration
#
# Every value in this cell is read by the cells below; edit here, then re-run them.

# Checkpoint passed to Gr00tPolicy(model_path=...).
# NOTE(review): hard-coded absolute path — point this at your own checkpoint.
model_path = "/fsx/kimin/policy-v3/output/robocasa/gr00t_15_150h_30K_0801_robocasa_multitask_n1_g8_b32_lr0.0001_si-pg-noctx/checkpoint-30000"
# Passed to Gr00tPolicy(embodiment_tag=...).
embodiment_tag = "new_embodiment"

# Data config for modality/transform (if using local model)
data_config_name = "single_panda_gripper"  # must be a key in DATA_CONFIG_MAP

# action_horizon is used as MultiStepWrapper's n_action_steps and as the
# progress-bar increment per env.step call in the evaluation loop.
action_horizon = 16
# Passed to Gr00tPolicy(denoising_steps=...).
denoising_steps = 4

# Environment
env_name = "TurnOffSinkFaucet"        # RoboCasa env name
# Alternative env names: CloseDoubleDoor, OpenDoubleDoor, CoffeeServeMug, CoffeeSetupMug,
# PnPCabToCounter, PnPCounterToCab, TurnOffSinkFaucet, TurnOnSinkFaucet
seed = 42  # passed to load_robocasa_gym_env(seed=...)
max_episode_steps = 1000  # TimeLimit cap; also the progress-bar total
num_episodes = 1  # number of rollouts in the evaluation loop

# Video
save_video = False  # when True, the env is wrapped in RecordVideo and videos are collected
video_folder = str(Path.cwd() / "robocasa_videos")  # output directory, relative to the current working dir
video_fps = 20  # passed to RecordVideo(fps=...)

In [None]:
# Build policy
#
# Looks up the data config named by `data_config_name`, picks a device, and
# constructs a local Gr00tPolicy from `model_path`. Leaves `policy`,
# `data_config`, `modality_config`, `modality_transform` and `device` defined
# for the cells below.

# Fail with the list of valid names instead of a bare KeyError.
if data_config_name not in DATA_CONFIG_MAP:
    raise KeyError(
        f"Unknown data_config_name {data_config_name!r}; "
        f"expected one of {list(DATA_CONFIG_MAP)}"
    )
data_config = DATA_CONFIG_MAP[data_config_name]

# Reject both None and an empty string; this notebook only builds a local policy,
# so a checkpoint path is always required.
if not model_path:
    raise ValueError("Set model_path to a Gr00tPolicy checkpoint path before building the policy")

# Use the GPU when torch reports one; fall back to CPU when torch cannot be loaded.
# Only the import is guarded, so unrelated errors are not silently swallowed.
try:
    import torch
except (ImportError, OSError):
    device = "cpu"
else:
    device = "cuda" if torch.cuda.is_available() else "cpu"

modality_config = data_config.modality_config()
modality_transform = data_config.transform()

policy: BasePolicy = Gr00tPolicy(
    model_path=model_path,
    modality_config=modality_config,
    modality_transform=modality_transform,
    embodiment_tag=embodiment_tag,
    denoising_steps=denoising_steps,
    device=device,
)

print("Policy modality config keys:", list(policy.get_modality_config().keys()))

In [None]:
# Build environment
#
# Wrapper stack, innermost first:
#   load_robocasa_gym_env -> RoboCasaWrapper -> TimeLimit -> [RecordVideo] -> MultiStepWrapper
# The evaluation loop relies on RecordVideo (when enabled) being the direct
# child of MultiStepWrapper, so keep this order.

# Scene-selection arguments forwarded to the RoboCasa loader.
# NOTE(review): the tuples are presumably (layout id, style id) pairs — confirm against the loader.
scene_kwargs = dict(
    generative_textures=None,
    layout_and_style_ids=[(1, 1), (2, 2), (4, 4), (6, 9), (7, 10)],
    layout_ids=None,
    style_ids=None,
)
rc_env = load_robocasa_gym_env(env_name, seed=seed, **scene_kwargs)

# Adapt the raw env, then cap the episode length.
env = TimeLimit(RoboCasaWrapper(rc_env), max_episode_steps=max_episode_steps)

# Optional video recording
if save_video:

    def trigger(ep):
        """Record every episode."""
        return True

    env = RecordVideo(
        env,
        video_folder=video_folder,
        episode_trigger=trigger,
        fps=video_fps,
        disable_logger=True,
    )

# Multi-step wrapper to match policy action horizon
env = MultiStepWrapper(
    env,
    video_delta_indices=np.arange(1),
    state_delta_indices=np.arange(1),
    n_action_steps=action_horizon,
)

for label, space in (("Observation", env.observation_space), ("Action", env.action_space)):
    print(f"{label} space keys:", list(space.spaces.keys()))

In [None]:
# Evaluation loop
#
# Rolls out `num_episodes` episodes with `policy` in `env`, records the
# per-episode success flag in `stats["is_success"]`, prints the success rate,
# and (when `save_video` is set) renders the most recent recording inline.
from collections import defaultdict

from IPython.display import display
from tqdm import tqdm

stats = defaultdict(list)
video_paths = []

for ep in range(num_episodes):
    obs, info = env.reset()
    done = False

    # Context manager so the bar is closed even if the policy or env raises.
    with tqdm(total=max_episode_steps, desc=f"Episode {ep+1}", leave=False) as pbar:
        while not done:
            action = policy.get_action(obs)
            # postprocess to ensure last dim exists when needed (matches eval_policy_robocasa.py)
            post_action = {
                k: v[..., None] if getattr(v, "ndim", 0) == 1 else v
                for k, v in action.items()
            }

            obs, reward, terminated, truncated, info = env.step(post_action)
            done = terminated or truncated
            # One env.step through MultiStepWrapper is counted as action_horizon steps.
            pbar.update(action_horizon)

    # np.any gives the same result as bool() for a scalar flag, and also copes with a
    # list/array of flags (bool() of a non-empty list is always True, and bool() of a
    # multi-element array raises).
    # NOTE(review): confirm how MultiStepWrapper aggregates `is_success` in `info`.
    is_success = bool(np.any(info.get("is_success", False)))
    stats["is_success"].append(is_success)

    # If saving video, explicitly stop the RecordVideo wrapper and then collect the file
    if save_video:
        # RecordVideo is the inner env inside MultiStepWrapper
        inner = getattr(env, "env", None)
        if hasattr(inner, "stop_recording") and getattr(inner, "recording", False):
            inner.stop_recording()
        folder = Path(video_folder)
        if folder.is_dir():
            mp4_files = list(folder.glob("*.mp4"))
            if mp4_files:
                # Pick the newest file by modification time: a lexicographic sort would
                # rank "...-10.mp4" before "...-2.mp4" and could return a stale video.
                newest = max(mp4_files, key=lambda p: p.stat().st_mtime)
                video_paths.append(str(newest))

success_rate = np.mean(stats["is_success"]) if stats["is_success"] else 0.0
print({"success_rate": float(success_rate)})

# Show the last video inline if available.
# display() is required: only a cell's final top-level expression is rendered
# automatically, and a call nested inside `if` has its result discarded.
if save_video and video_paths:
    display(show_video_inline(video_paths[-1]))