# LIBERO Policy Evaluation

This notebook evaluates a policy on LIBERO benchmark tasks.


## 1. Import Libraries


In [5]:
import collections
import dataclasses
import logging
import math, sys, os
import pathlib
from datetime import datetime
import cv2

sys.path.append("/hdd/zijianwang/openpi/third_party/LIBERO-PRO")

import imageio
from libero.libero import benchmark
from libero.libero import get_libero_path
from libero.libero.envs import OffScreenRenderEnv
import numpy as np
from openpi_client import image_tools
from openpi_client import websocket_client_policy as _websocket_client_policy
import tqdm
import matplotlib.pyplot as plt
from typing import Dict, Any
# Setup logging
logging.basicConfig(level=logging.INFO)


## 2. Define Constants and Helper Functions


In [6]:
# Seven-element no-op action (six zeros followed by -1.0) that is stepped while the
# simulation settles at the start of each episode.
# NOTE(review): the trailing -1.0 is presumably the gripper command -- confirm against the env's action spec.
LIBERO_DUMMY_ACTION = [0.0] * 6 + [-1.0]
# Height and width (in pixels) requested for the environment cameras.
LIBERO_ENV_RESOLUTION = 256  # resolution used to render training data


def _get_libero_env(task, resolution, seed):
    """Build a seeded off-screen LIBERO environment for ``task``.

    Args:
        task: Task object providing ``language``, ``problem_folder`` and ``bddl_file``.
        resolution: Value used for both the camera height and the camera width.
        seed: Seed forwarded to ``env.seed``.

    Returns:
        Tuple ``(env, task_description)`` where ``task_description`` is ``task.language``.
    """
    bddl_root = pathlib.Path(get_libero_path("bddl_files"))
    env_args = dict(
        bddl_file_name=bddl_root / task.problem_folder / task.bddl_file,
        camera_heights=resolution,
        camera_widths=resolution,
    )
    print(env_args)

    env = OffScreenRenderEnv(**env_args)
    # IMPORTANT: seed seems to affect object positions even when using fixed initial state
    env.seed(seed)
    return env, task.language


def _quat2axisangle(quat):
    """Convert an (x, y, z, w) quaternion to an axis-angle vector.

    Adapted from robosuite: https://github.com/ARISE-Initiative/robosuite/blob/eafb81f54ffc104f905ee48a16bb15f059176ad3/robosuite/utils/transform_utils.py#L490C1-L512C55

    Unlike the robosuite original, the input is never modified: clipping of the
    scalar component is done on a private copy, so observation arrays passed in
    by the caller keep their values.

    Args:
        quat: Sequence or array of four numbers; index 3 is the scalar part.

    Returns:
        np.ndarray of shape (3,): rotation axis scaled by the rotation angle in
        radians, or zeros when the rotation is (close to) the identity.
    """
    # Work on a float copy so the caller's array is left untouched and plain
    # lists/tuples are accepted as well.
    quat = np.array(quat, dtype=float)

    # Clip the scalar part into acos's domain to absorb numerical drift.
    w = min(max(float(quat[3]), -1.0), 1.0)

    den = math.sqrt(1.0 - w * w)
    if math.isclose(den, 0.0):
        # This is (close to) a zero degree rotation, immediately return
        return np.zeros(3)

    return (quat[:3] * 2.0 * math.acos(w)) / den
def _plot_velocity_trajectory(velocity_trajectory, output_path):
    """Plot a per-step trajectory, one coloured line per dimension, and save it.

    The axis labels, title and legend entries are fixed ("Joint N",
    "Velocity (rad/s)", "Joint Velocity Trajectory") regardless of what kind of
    series is passed in.

    Args:
        velocity_trajectory: Sequence of length T holding either scalars
            (plotted as a single line) or N-dimensional vectors (N lines).
        output_path: Path the figure is written to.

    Returns:
        None. When the trajectory is empty a warning is logged and nothing is
        written.
    """
    if len(velocity_trajectory) == 0:
        logging.warning("No velocity data to plot")
        return

    velocity_array = np.array(velocity_trajectory)  # Shape: (T, N) or (T,)
    time_steps = np.arange(len(velocity_trajectory))

    # Handle 1D case (T,) by reshaping to (T, 1)
    if velocity_array.ndim == 1:
        velocity_array = velocity_array.reshape(-1, 1)

    num_dims = velocity_array.shape[1]

    # Use an explicit figure handle so exactly this figure is closed, even when
    # plotting or saving raises; otherwise failed saves would leave figures
    # open in pyplot's global registry.
    fig, ax = plt.subplots(figsize=(12, 6))
    try:
        # Sample the colormap so an arbitrary number of dimensions gets colours.
        colors = plt.cm.tab10(np.linspace(0, 1, max(num_dims, 1)))

        for dim in range(num_dims):
            ax.plot(
                time_steps,
                velocity_array[:, dim],
                color=colors[dim],
                label=f'Joint {dim+1}',
                linewidth=1.5,
                alpha=0.8,
            )

        ax.set_xlabel('Time Step', fontsize=12)
        ax.set_ylabel('Velocity (rad/s)', fontsize=12)
        ax.set_title('Joint Velocity Trajectory', fontsize=14, fontweight='bold')
        ax.legend(loc='best', fontsize=10)
        ax.grid(True, alpha=0.3)
        fig.tight_layout()

        fig.savefig(output_path, dpi=100)
    finally:
        plt.close(fig)

    logging.info(f"Velocity plot saved to: {output_path}")


def add_text_to_image(temp_img, CoA_step):
    """Return a copy of an image with a "step: N" label in its upper-right corner.

    Args:
        temp_img (np.ndarray): Image to annotate; it is copied, not modified.
        CoA_step (int): Step number shown in the label.

    Returns:
        np.ndarray: Annotated copy of ``temp_img``.
    """
    label = f"step: {CoA_step}"
    face = cv2.FONT_HERSHEY_SIMPLEX
    scale = 0.5
    margin = 10  # pixels kept free between the label and the right/top edges

    annotated = temp_img.copy()

    # Measure the label so it can be right-aligned against the image edge.
    (label_w, label_h), _baseline = cv2.getTextSize(label, face, scale, 1)
    origin = (annotated.shape[1] - label_w - margin, label_h + margin)

    # Thick black pass first, thin white pass on top: white text with a dark outline.
    for colour, thickness in (((0, 0, 0), 2), ((255, 255, 255), 1)):
        cv2.putText(annotated, label, origin, face, scale, colour, thickness)

    return annotated




def is_gripper_closed(obs: Dict[str, Any], tolerance: float = 0.005) -> bool:
    """Decide whether the gripper is closed from the 'robot0_gripper_qpos' observation.

    Args:
        obs (Dict[str, Any]): Observation dictionary, e.g. as returned by env.step().
        tolerance (float): Threshold below which the gripper counts as closed.

    Returns:
        bool: True when the summed absolute joint positions are below
        ``tolerance``, False otherwise.

    Raises:
        KeyError: If ``obs`` has no 'robot0_gripper_qpos' entry.
    """
    key = 'robot0_gripper_qpos'
    if key not in obs:
        raise KeyError("Observation dictionary does not contain 'robot0_gripper_qpos'.")

    # Assumes a two-finger gripper whose joint positions are equal in magnitude
    # and opposite in sign, and that they approach 0 as the gripper closes.
    # The sum of absolute joint positions is a simplified measure of how far
    # the gripper is from that closed configuration.
    closure = np.abs(obs[key]).sum()

    # Below the tolerance the gripper is treated as closed.
    return closure < tolerance

## 3. Set Hyperparameters


In [3]:
@dataclasses.dataclass
class Args:
    """Settings for one evaluation run: policy server, LIBERO suite and output location."""

    # ---- Model server -------------------------------------------------------------------------------------------
    # Address and port handed to the websocket policy client.
    host: str = "0.0.0.0"
    port: int = 8001
    # Side length the camera images are resized (with padding) to before being sent to the policy.
    resize_size: int = 224
    # Number of actions taken from each predicted chunk before the policy is queried again.
    replan_steps: int = 5

    # ---- LIBERO environment -------------------------------------------------------------------------------------
    # Task suite. Options: libero_spatial, libero_object, libero_goal, libero_10, libero_90
    task_suite_name: str = "libero_10"
    # Number of steps to wait for objects to stabilize in sim
    num_steps_wait: int = 10
    # Number of rollouts per task
    num_trials_per_task: int = 50

    # ---- Utils --------------------------------------------------------------------------------------------------
    # Path to save videos
    video_out_path: str = "data/libero/videos"

    # Random Seed (for reproducibility)
    seed: int = 7

# Settings used by this notebook run -- edit the values here as needed.
args = Args(
    host="0.0.0.0",
    port=8001,
    resize_size=224,
    replan_steps=5,
    task_suite_name="libero_spatial",
    num_steps_wait=10,
    num_trials_per_task=1,
    video_out_path="data/libero/videos",
    seed=7,
)

# Echo the effective configuration so it is recorded with the cell output.
print("Hyperparameters:")
for _label, _value in (
    ("Host", args.host),
    ("Port", args.port),
    ("Resize size", args.resize_size),
    ("Replan steps", args.replan_steps),
    ("Task suite", args.task_suite_name),
    ("Num steps wait", args.num_steps_wait),
    ("Num trials per task", args.num_trials_per_task),
    ("Video output path", args.video_out_path),
    ("Seed", args.seed),
):
    print(f"  {_label}: {_value}")


Hyperparameters:
  Host: 0.0.0.0
  Port: 8001
  Resize size: 224
  Replan steps: 5
  Task suite: libero_spatial
  Num steps wait: 10
  Num trials per task: 1
  Video output path: data/libero/videos
  Seed: 7


## 4. Run Policy Evaluation

**Note:** Make sure your model server is running before executing this cell!


In [8]:
# Evaluate the policy served at args.host:args.port on the selected LIBERO task suite.
# For every episode this cell writes a replay video plus two plots (end-effector linear
# speed and gripper-closed flag) and logs running success statistics.

# Set random seed
np.random.seed(args.seed)

# Only these task ids are evaluated (debug filter). Set to None to run every task in the suite.
eval_task_ids = {5}

# Get current timestamp for this run
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

# Create timestamped video output directory
video_out_dir = pathlib.Path(args.video_out_path) / timestamp / args.task_suite_name
video_out_dir.mkdir(parents=True, exist_ok=True)
logging.info(f"Videos will be saved to: {video_out_dir}")

# Initialize LIBERO task suite
benchmark_dict = benchmark.get_benchmark_dict()
task_suite = benchmark_dict[args.task_suite_name]()
num_tasks_in_suite = task_suite.n_tasks
logging.info(f"Task suite: {args.task_suite_name}")

# Per-suite step budget, chosen slightly above the longest training demonstration.
if "libero_spatial" in args.task_suite_name:
    max_steps = 220  # longest training demo has 193 steps
elif "libero_object" in args.task_suite_name:
    max_steps = 280  # longest training demo has 254 steps
elif "libero_goal" in args.task_suite_name:
    max_steps = 300  # longest training demo has 270 steps
elif "libero_10" in args.task_suite_name:
    max_steps = 520  # longest training demo has 505 steps
elif "libero_90" in args.task_suite_name:
    max_steps = 400  # longest training demo has 373 steps
else:
    raise ValueError(f"Unknown task suite: {args.task_suite_name}")

client = _websocket_client_policy.WebsocketClientPolicy(args.host, args.port)

# Start evaluation
total_episodes, total_successes = 0, 0
for task_id in tqdm.tqdm(range(num_tasks_in_suite)):
    if eval_task_ids is not None and task_id not in eval_task_ids:
        continue
    # Get task
    task = task_suite.get_task(task_id)

    # Get default LIBERO initial states
    initial_states = task_suite.get_task_init_states(task_id)

    # Initialize LIBERO environment and task description
    env, task_description = _get_libero_env(task, LIBERO_ENV_RESOLUTION, args.seed)

    # Start episodes
    task_episodes, task_successes = 0, 0
    try:
        for episode_idx in tqdm.tqdm(range(args.num_trials_per_task)):
            # Reset environment
            env.reset()
            action_plan = collections.deque()

            # Set initial states
            obs = env.set_init_state(initial_states[episode_idx])
            robot_instance = env.robots[0]

            # Setup
            t = 0
            # Reset per episode: if the very first step fails, the episode must not
            # inherit the previous episode's result (or hit an undefined name).
            done = False
            replay_images = []
            linear_speed_trajectory = []
            gripper_state_trajectory = []

            logging.info(f"Starting episode {task_episodes+1}...")
            while t < max_steps + args.num_steps_wait:
                try:
                    # IMPORTANT: Do nothing for the first few timesteps because the simulator drops objects
                    # and we need to wait for them to fall
                    if t < args.num_steps_wait:
                        obs, reward, done, info = env.step(LIBERO_DUMMY_ACTION)
                        # Record the wait steps in both series so the two plots share
                        # the same time axis as the step counter drawn on the video.
                        linear_speed_trajectory.append(0)
                        gripper_state_trajectory.append(is_gripper_closed(obs))
                        t += 1
                        continue

                    # Get preprocessed image
                    # IMPORTANT: rotate 180 degrees to match train preprocessing
                    img = np.ascontiguousarray(obs["agentview_image"][::-1, ::-1])
                    wrist_img = np.ascontiguousarray(obs["robot0_eye_in_hand_image"][::-1, ::-1])
                    img = image_tools.convert_to_uint8(
                        image_tools.resize_with_pad(img, args.resize_size, args.resize_size)
                    )
                    wrist_img = image_tools.convert_to_uint8(
                        image_tools.resize_with_pad(wrist_img, args.resize_size, args.resize_size)
                    )

                    # Save preprocessed image (with the step counter drawn on it) for the replay video
                    replay_images.append(add_text_to_image(img, t))

                    if not action_plan:
                        # Finished executing previous action chunk -- compute new chunk
                        # Prepare observations dict
                        element = {
                            "observation/image": img,
                            "observation/wrist_image": wrist_img,
                            "observation/state": np.concatenate(
                                (
                                    obs["robot0_eef_pos"],
                                    _quat2axisangle(obs["robot0_eef_quat"]),
                                    obs["robot0_gripper_qpos"],
                                )
                            ),
                            "prompt": str(task_description),
                        }

                        # Query model to get action
                        action_chunk = client.infer(element)["actions"]
                        # Explicit check instead of `assert`, which is stripped under `python -O`.
                        if action_chunk.shape[-2] < args.replan_steps:
                            raise ValueError(
                                f"We want to replan every {args.replan_steps} steps, but policy only predicts {action_chunk.shape[-2]} steps."
                            )
                        # The response carries a leading axis; take its first entry.
                        action_chunk = action_chunk[0]
                        action_plan.extend(action_chunk[: args.replan_steps])

                    action = action_plan.popleft()

                    # Execute action in environment
                    obs, reward, done, info = env.step(action.tolist())

                    # End-effector velocity: vx, vy, vz, rx, ry, rz (3 linear + 3 angular components).
                    eef_total_velocity = robot_instance._hand_total_velocity
                    # The magnitude of the first three (linear) components is the linear speed.
                    linear_speed_trajectory.append(np.linalg.norm(eef_total_velocity[:3]))
                    gripper_state_trajectory.append(is_gripper_closed(obs))

                    if done:
                        task_successes += 1
                        total_successes += 1
                        break
                    t += 1

                except Exception as e:
                    # Best-effort evaluation: log the failure with its traceback and
                    # count the episode as unsuccessful instead of aborting the run.
                    logging.exception(f"Caught exception: {e}")
                    break

            task_episodes += 1
            total_episodes += 1

            # Save a replay video of the episode with unique filename
            # Include task_id, episode_idx, and success/failure status
            suffix = "success" if done else "failure"
            task_segment = task_description.replace(" ", "_")
            episode_stem = f"task{task_id:02d}_ep{episode_idx:03d}_{task_segment}_{suffix}"
            video_path = video_out_dir / f"{episode_stem}.mp4"

            if replay_images:
                imageio.mimwrite(
                    video_path,
                    [np.asarray(x) for x in replay_images],
                    fps=24,
                )
                logging.info(f"Video saved to: {video_path}")
            else:
                # The episode failed before any policy step, so there are no frames to encode.
                logging.warning(f"No frames recorded; skipping video: {video_path}")

            # Save linear-speed trajectory plot
            velocity_plot_path = video_out_dir / f"{episode_stem}_velocity.png"
            _plot_velocity_trajectory(linear_speed_trajectory, str(velocity_plot_path))

            # Save gripper state trajectory plot
            gripper_state_plot_path = video_out_dir / f"{episode_stem}_gripper_state.png"
            _plot_velocity_trajectory(gripper_state_trajectory, str(gripper_state_plot_path))

            # Log current results
            logging.info(f"Success: {done}")
            logging.info(f"# episodes completed so far: {total_episodes}")
            logging.info(f"# successes: {total_successes} ({total_successes / total_episodes * 100:.1f}%)")
    finally:
        # Release the simulator/renderer for this task before the next one is created.
        env.close()

    # Log final results (skipped when no episode ran, to avoid dividing by zero)
    if task_episodes:
        logging.info(f"Current task success rate: {float(task_successes) / float(task_episodes)}")
        logging.info(f"Current total success rate: {float(total_successes) / float(total_episodes)}")

if total_episodes:
    logging.info(f"Total success rate: {float(total_successes) / float(total_episodes)}")
else:
    logging.warning("No episodes were run; check eval_task_ids and num_trials_per_task.")
logging.info(f"Total episodes: {total_episodes}")
logging.info(f"All videos saved to: {video_out_dir}")


INFO:root:Videos will be saved to: data/libero/videos/20251030_162902/libero_spatial
INFO:root:Task suite: libero_spatial
INFO:root:Waiting for server at ws://0.0.0.0:8001...


[info] Applying task order index 0 (permutation: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) for benchmark 'libero_spatial' (10 tasks).


  0%|          | 0/10 [00:00<?, ?it/s]

{'bddl_file_name': PosixPath('/hdd/zijianwang/openpi/third_party/LIBERO-PRO/libero/libero/bddl_files/libero_spatial/pick_up_the_black_bowl_on_the_ramekin_and_place_it_on_the_plate.bddl'), 'camera_heights': 256, 'camera_widths': 256}


INFO:root:Starting episode 1...


linear_speed: 0.04449995057193282
linear_speed: 0.07264213814136948
linear_speed: 0.09830925587567169
linear_speed: 0.13172485385010774
linear_speed: 0.16437926501377167
linear_speed: 0.19019679541492715
linear_speed: 0.20694882588643382
linear_speed: 0.21605528401734894
linear_speed: 0.2229828144728291
linear_speed: 0.22785469638847794
linear_speed: 0.24025529367097276
linear_speed: 0.24882314935189873
linear_speed: 0.25431446665935337
linear_speed: 0.25799803536876964
linear_speed: 0.2577473296914864
linear_speed: 0.23907238962141397
linear_speed: 0.22788622933931962
linear_speed: 0.22265017225745354
linear_speed: 0.21361860060103613
linear_speed: 0.20436677458727112
linear_speed: 0.21173650306783418
linear_speed: 0.2156857811201936
linear_speed: 0.22502108799748943
linear_speed: 0.2325232305882428
linear_speed: 0.24142677069592525
linear_speed: 0.23323281833934983
linear_speed: 0.222234780404894
linear_speed: 0.19745954276199115
linear_speed: 0.17248769926195215
linear_speed: 0.1429

INFO:root:Video saved to: data/libero/videos/20251030_162902/libero_spatial/task05_ep000_pick_up_the_black_bowl_on_the_ramekin_and_place_it_on_the_plate_success.mp4


linear_speed: 0.2643686212530977
linear_speed: 0.23821784189193584


INFO:root:Velocity plot saved to: data/libero/videos/20251030_162902/libero_spatial/task05_ep000_pick_up_the_black_bowl_on_the_ramekin_and_place_it_on_the_plate_success_velocity.png
INFO:root:Velocity plot saved to: data/libero/videos/20251030_162902/libero_spatial/task05_ep000_pick_up_the_black_bowl_on_the_ramekin_and_place_it_on_the_plate_success_gripper_state.png
INFO:root:Success: True
INFO:root:# episodes completed so far: 1
INFO:root:# successes: 1 (100.0%)
100%|██████████| 1/1 [00:06<00:00,  6.37s/it]
INFO:root:Current task success rate: 1.0
INFO:root:Current total success rate: 1.0
100%|██████████| 10/10 [00:07<00:00,  1.41it/s]
INFO:root:Total success rate: 1.0
INFO:root:Total episodes: 1
INFO:root:All videos saved to: data/libero/videos/20251030_162902/libero_spatial
