# Libero 策略轨迹可视化分析

本 notebook 用于分析和可视化在 Libero 环境中记录的策略行为数据。

In [None]:
import pathlib
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from IPython.display import display, HTML
from PIL import Image
import ipywidgets as widgets
from ipywidgets import interact, IntSlider, Dropdown

# 设置matplotlib参数
plt.rcParams['figure.figsize'] = [12, 8]
plt.rcParams['axes.grid'] = True
plt.style.use('seaborn-v0_8')

In [None]:
# 加载策略记录数据
def load_policy_records(records_path="policy_records"):
    """加载所有记录的策略数据"""
    record_path = pathlib.Path(records_path)
    
    if not record_path.exists():
        raise FileNotFoundError(f"Records path {records_path} does not exist")
    
    # 获取所有step文件
    step_files = sorted(record_path.glob("step_*.npy"), 
                       key=lambda x: int(x.stem.split('_')[1]))
    
    records = []
    for step_file in step_files:
        try:
            record = np.load(step_file, allow_pickle=True).item()
            records.append(record)
        except Exception as e:
            print(f"Warning: Could not load {step_file}: {e}")
            continue
    
    print(f"Loaded {len(records)} policy records")
    return records

# 加载数据
records = load_policy_records()

# 数据结构分析
print("\n=== Data Structure ===")
print(f"Total steps: {len(records)}")
print(f"Keys: {list(records[0].keys())}")

for key, value in records[0].items():
    if isinstance(value, np.ndarray):
        print(f"{key}: shape {value.shape}, dtype {value.dtype}")
    else:
        print(f"{key}: type {type(value)}, value: {value}")

In [None]:
# 提取关键数据
positions = np.array([record['inputs/observation/state'][:3] for record in records])
orientations = np.array([record['inputs/observation/state'][3:6] for record in records])
gripper_states = np.array([record['inputs/observation/state'][6:8] for record in records])

# 提取动作（只取第一个预测步骤）
actions = np.array([record['outputs/actions'][0] for record in records])
action_positions = actions[:, :3]
action_orientations = actions[:, 3:6]
action_grippers = actions[:, 6]

# 提取图像
main_images = [record['inputs/observation/image'] for record in records]
wrist_images = [record['inputs/observation/wrist_image'] for record in records]

# 提取推理时间
inference_times = [record['outputs/policy_timing/infer_ms'] for record in records]

# 任务描述
task_prompt = records[0]['inputs/prompt']
print(f"\nTask: {task_prompt}")
print(f"Total trajectory length: {len(records)} steps")

## 1. 轨迹分析

In [None]:
# 3D轨迹可视化
fig = plt.figure(figsize=(15, 5))

# 3D轨迹
ax1 = fig.add_subplot(131, projection='3d')
ax1.plot(positions[:, 0], positions[:, 1], positions[:, 2], 'b-', linewidth=2, alpha=0.7)
ax1.scatter(positions[0, 0], positions[0, 1], positions[0, 2], color='green', s=100, label='Start')
ax1.scatter(positions[-1, 0], positions[-1, 1], positions[-1, 2], color='red', s=100, label='End')
ax1.set_xlabel('X (m)')
ax1.set_ylabel('Y (m)')
ax1.set_title('3D End-Effector Trajectory')
ax1.legend()

# XY平面投影
ax2 = fig.add_subplot(132)
ax2.plot(positions[:, 0], positions[:, 1], 'b-', linewidth=2, alpha=0.7)
ax2.scatter(positions[0, 0], positions[0, 1], color='green', s=100, label='Start')
ax2.scatter(positions[-1, 0], positions[-1, 1], color='red', s=100, label='End')
ax2.set_xlabel('X (m)')
ax2.set_ylabel('Y (m)')
ax2.set_title('XY Plane Projection')
ax2.legend()
ax2.axis('equal')

# Z轴高度变化
ax3 = fig.add_subplot(133)
ax3.plot(range(len(positions)), positions[:, 2], 'b-', linewidth=2)
ax3.set_xlabel('Time Step')
ax3.set_ylabel('Z Position (m)')
ax3.set_title('Height Over Time')

plt.tight_layout()
plt.show()

## 2. 状态演化分析

In [None]:
# 状态演化可视化
fig, axes = plt.subplots(2, 4, figsize=(16, 8))
axes = axes.flatten()

# 状态名称
state_names = ['X pos', 'Y pos', 'Z pos', 'Roll', 'Pitch', 'Yaw', 'Gripper Width', 'Gripper State']
states = np.array([record['inputs/observation/state'] for record in records])

for i, (ax, name) in enumerate(zip(axes, state_names)):
    ax.plot(states[:, i], linewidth=2, color=f'C{i}')
    ax.set_title(f'{name}')
    ax.set_xlabel('Time Step')
    ax.set_ylabel('Value')
    ax.grid(True, alpha=0.3)

plt.suptitle('Robot State Evolution Over Time', fontsize=16)
plt.tight_layout()
plt.show()

## 3. 动作分析

In [None]:
# 动作分析
fig, axes = plt.subplots(2, 4, figsize=(16, 8))
axes = axes.flatten()

action_names = ['X action', 'Y action', 'Z action', 'Roll action', 'Pitch action', 'Yaw action', 'Gripper action']

for i, (ax, name) in enumerate(zip(axes[:7], action_names)):
    ax.plot(actions[:, i], linewidth=2, color=f'C{i}')
    ax.set_title(f'{name}')
    ax.set_xlabel('Time Step')
    ax.set_ylabel('Action Value')
    ax.grid(True, alpha=0.3)

# 动作幅度
ax = axes[7]
action_magnitude = np.linalg.norm(actions[:, :6], axis=1)
ax.plot(action_magnitude, linewidth=2, color='black')
ax.set_title('Action Magnitude')
ax.set_xlabel('Time Step')
ax.set_ylabel('Magnitude')
ax.grid(True, alpha=0.3)

plt.suptitle('Action Sequence Over Time', fontsize=16)
plt.tight_layout()
plt.show()

## 4. 交互式可视化

In [None]:
# 交互式步骤查看器
@interact(step=IntSlider(min=0, max=len(records)-1, step=1, value=0, description='Step:'))
def view_step(step):
    """查看特定步骤的详细信息"""
    record = records[step]
    
    fig, axes = plt.subplots(2, 3, figsize=(15, 10))
    
    # 主相机图像
    axes[0, 0].imshow(record['inputs/observation/image'])
    axes[0, 0].set_title(f'Main Camera - Step {step}')
    axes[0, 0].axis('off')
    
    # 手腕相机图像
    axes[0, 1].imshow(record['inputs/observation/wrist_image'])
    axes[0, 1].set_title(f'Wrist Camera - Step {step}')
    axes[0, 1].axis('off')
    
    # 轨迹到当前步骤
    axes[0, 2].plot(positions[:step+1, 0], positions[:step+1, 1], 'b-', linewidth=2, alpha=0.7)
    if step > 0:
        axes[0, 2].scatter(positions[step, 0], positions[step, 1], color='red', s=100, zorder=5)
    axes[0, 2].set_xlabel('X Position')
    axes[0, 2].set_ylabel('Y Position')
    axes[0, 2].set_title(f'Trajectory up to Step {step}')
    axes[0, 2].axis('equal')
    axes[0, 2].grid(True, alpha=0.3)
    
    # 当前状态
    state = record['inputs/observation/state']
    state_labels = ['X', 'Y', 'Z', 'Roll', 'Pitch', 'Yaw', 'Grip_W', 'Grip_S']
    axes[1, 0].bar(state_labels, state)
    axes[1, 0].set_title('Current State')
    axes[1, 0].tick_params(axis='x', rotation=45)
    
    # 当前动作
    action = record['outputs/actions'][0]  # 第一个预测步骤
    action_labels = ['X', 'Y', 'Z', 'Roll', 'Pitch', 'Yaw', 'Gripper']
    axes[1, 1].bar(action_labels, action)
    axes[1, 1].set_title('Current Action')
    axes[1, 1].tick_params(axis='x', rotation=45)
    
    # 推理时间和其他信息
    info_text = f"""Step: {step}/{len(records)-1}
Inference Time: {record['outputs/policy_timing/infer_ms']:.2f} ms
Position: ({state[0]:.3f}, {state[1]:.3f}, {state[2]:.3f})
Gripper: {state[6]:.4f}
Task: {record['inputs/prompt'][:50]}..."""
    
    axes[1, 2].text(0.1, 0.5, info_text, transform=axes[1, 2].transAxes, 
                    fontsize=10, verticalalignment='center', 
                    bbox=dict(boxstyle='round', facecolor='lightgray', alpha=0.8))
    axes[1, 2].set_xlim(0, 1)
    axes[1, 2].set_ylim(0, 1)
    axes[1, 2].axis('off')
    axes[1, 2].set_title('Step Information')
    
    plt.tight_layout()
    plt.show()

## 5. 统计分析

In [None]:
# 统计分析
print("=== State Statistics ===")
state_names = ['X pos', 'Y pos', 'Z pos', 'Roll', 'Pitch', 'Yaw', 'Gripper Width', 'Gripper State']
states = np.array([record['inputs/observation/state'] for record in records])

for i, name in enumerate(state_names):
    print(f"{name:15s}: mean={states[:, i].mean():.4f}, std={states[:, i].std():.4f}, "
          f"min={states[:, i].min():.4f}, max={states[:, i].max():.4f}")

print("\n=== Action Statistics ===")
action_names = ['X action', 'Y action', 'Z action', 'Roll action', 'Pitch action', 'Yaw action', 'Gripper action']

for i, name in enumerate(action_names):
    print(f"{name:15s}: mean={actions[:, i].mean():.4f}, std={actions[:, i].std():.4f}, "
          f"min={actions[:, i].min():.4f}, max={actions[:, i].max():.4f}")

print(f"\n=== Inference Time Statistics ===")
print(f"Mean: {np.mean(inference_times):.2f} ms")
print(f"Std:  {np.std(inference_times):.2f} ms")
print(f"Min:  {np.min(inference_times):.2f} ms")
print(f"Max:  {np.max(inference_times):.2f} ms")

# 可视化统计分布
fig, axes = plt.subplots(2, 2, figsize=(12, 8))

# 位置分布
axes[0, 0].hist(positions[:, 0], bins=20, alpha=0.7, label='X', color='C0')
axes[0, 0].hist(positions[:, 1], bins=20, alpha=0.7, label='Y', color='C1')
axes[0, 0].hist(positions[:, 2], bins=20, alpha=0.7, label='Z', color='C2')
axes[0, 0].set_xlabel('Position (m)')
axes[0, 0].set_ylabel('Frequency')
axes[0, 0].set_title('Position Distribution')
axes[0, 0].legend()

# 动作幅度分布
action_magnitude = np.linalg.norm(actions[:, :6], axis=1)
axes[0, 1].hist(action_magnitude, bins=20, alpha=0.7, color='purple')
axes[0, 1].set_xlabel('Action Magnitude')
axes[0, 1].set_ylabel('Frequency')
axes[0, 1].set_title('Action Magnitude Distribution')

# 夹爪状态分布
axes[1, 0].hist(gripper_states[:, 0], bins=20, alpha=0.7, color='green')
axes[1, 0].set_xlabel('Gripper Width')
axes[1, 0].set_ylabel('Frequency')
axes[1, 0].set_title('Gripper Width Distribution')

# 推理时间分布
axes[1, 1].hist(inference_times, bins=20, alpha=0.7, color='orange')
axes[1, 1].set_xlabel('Inference Time (ms)')
axes[1, 1].set_ylabel('Frequency')
axes[1, 1].set_title('Inference Time Distribution')

plt.tight_layout()
plt.show()

## 6. 性能分析

In [None]:
# 性能分析
# 计算轨迹的平滑度
def calculate_smoothness(trajectory):
    """计算轨迹的平滑度（二阶导数的方差）"""
    if len(trajectory) < 3:
        return 0
    
    # 计算二阶导数（加速度）
    acceleration = np.diff(trajectory, n=2, axis=0)
    # 返回加速度的RMS值
    return np.sqrt(np.mean(acceleration**2))

# 计算各轴的平滑度
position_smoothness = [calculate_smoothness(positions[:, i]) for i in range(3)]
action_smoothness = [calculate_smoothness(actions[:, i]) for i in range(3)]

print("=== Trajectory Smoothness Analysis ===")
print("Position Smoothness (lower is smoother):")
for i, axis in enumerate(['X', 'Y', 'Z']):
    print(f"  {axis}: {position_smoothness[i]:.6f}")

print("\nAction Smoothness (lower is smoother):")
for i, axis in enumerate(['X', 'Y', 'Z']):
    print(f"  {axis}: {action_smoothness[i]:.6f}")

# 计算任务完成情况（基于轨迹分析）
# 假设任务是从一个位置移动到另一个位置
start_pos = positions[0]
end_pos = positions[-1]
total_distance = np.linalg.norm(end_pos - start_pos)
path_length = np.sum(np.linalg.norm(np.diff(positions, axis=0), axis=1))
path_efficiency = total_distance / path_length if path_length > 0 else 0

print(f"\n=== Path Analysis ===")
print(f"Start position: ({start_pos[0]:.3f}, {start_pos[1]:.3f}, {start_pos[2]:.3f})")
print(f"End position: ({end_pos[0]:.3f}, {end_pos[1]:.3f}, {end_pos[2]:.3f})")
print(f"Straight-line distance: {total_distance:.3f} m")
print(f"Actual path length: {path_length:.3f} m")
print(f"Path efficiency: {path_efficiency:.3f} (1.0 = straight line)")

# 夹爪使用分析
gripper_changes = np.abs(np.diff(gripper_states[:, 0]))
significant_gripper_changes = np.sum(gripper_changes > 0.005)  # 阈值可调整

print(f"\n=== Gripper Analysis ===")
print(f"Number of significant gripper state changes: {significant_gripper_changes}")
print(f"Average gripper width: {np.mean(gripper_states[:, 0]):.4f}")
print(f"Final gripper width: {gripper_states[-1, 0]:.4f}")

## 7. 保存分析结果

In [None]:
# 保存分析结果
import json

analysis_results = {
    "task_info": {
        "prompt": task_prompt,
        "total_steps": len(records),
        "trajectory_length": path_length,
        "straight_line_distance": total_distance,
        "path_efficiency": path_efficiency
    },
    "performance_metrics": {
        "position_smoothness": {
            "x": position_smoothness[0],
            "y": position_smoothness[1],
            "z": position_smoothness[2]
        },
        "action_smoothness": {
            "x": action_smoothness[0],
            "y": action_smoothness[1],
            "z": action_smoothness[2]
        },
        "gripper_changes": int(significant_gripper_changes),
        "inference_time": {
            "mean": float(np.mean(inference_times)),
            "std": float(np.std(inference_times)),
            "min": float(np.min(inference_times)),
            "max": float(np.max(inference_times))
        }
    },
    "state_statistics": {
        name: {
            "mean": float(states[:, i].mean()),
            "std": float(states[:, i].std()),
            "min": float(states[:, i].min()),
            "max": float(states[:, i].max())
        } for i, name in enumerate(state_names)
    }
}

# 保存到文件
output_dir = pathlib.Path("visualizations")
output_dir.mkdir(exist_ok=True)

with open(output_dir / "analysis_results.json", "w") as f:
    json.dump(analysis_results, f, indent=2)

print(f"Analysis results saved to {output_dir / 'analysis_results.json'}")
print("\n=== Analysis Complete ===")
print(f"Total visualization files created in {output_dir}")