In [None]:
import gymnasium
import gym_aloha
import numpy as np
import imageio
import torch
from pathlib import Path

# 尝试导入 LeRobot 数据集加载器
try:
    from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
except ImportError:
    try:
        from lerobot.datasets.lerobot_dataset import LeRobotDataset
    except ImportError:
        print("错误: 无法导入 LeRobotDataset。请确保安装了 lerobot 包 (pip install -e .)")
        exit(1)

def verify_dataset_env_match(
    dataset_repo_id: str,
    env_task_name: str,
    episode_index: int = 0,
    output_filename: str = "replay_verification.mp4",
    apply_flip_fix: bool = False,
    max_steps: int = 400
):
    print(f"Loading dataset: {dataset_repo_id}...")
    # 关键修复1: 强制使用 pyav 后端以避免 torchcodec 错误
    dataset = LeRobotDataset(dataset_repo_id, video_backend="pyav")
    
    # 关键修复2: 正确获取指定 Episode 的帧范围
    # LeRobotDataset 是扁平的，我们需要查询元数据来找到第 N 个 Episode 在哪
    if dataset.meta.episodes is None:
        dataset.meta.load_metadata()
    
    episode_meta = dataset.meta.episodes[episode_index]
    
    # 获取起始和结束帧索引
    # 注意: 这些值可能是 Tensor 也可能是 int，视版本而定，这里做统一处理
    from_idx = episode_meta["dataset_from_index"]
    to_idx = episode_meta["dataset_to_index"]
    
    if isinstance(from_idx, torch.Tensor): from_idx = from_idx.item()
    if isinstance(to_idx, torch.Tensor): to_idx = to_idx.item()
        
    # 计算实际要播放的步数
    num_frames = to_idx - from_idx
    actual_steps = min(num_frames, max_steps)
    end_idx = from_idx + actual_steps
    
    print(f"Loaded Episode {episode_index}")
    print(f"  - Frame Range: {from_idx} to {to_idx}")
    print(f"  - Replaying: {actual_steps} steps")

    # 初始化环境
    print(f"Initializing environment: {env_task_name}...")
    env = gymnasium.make(env_task_name)
    observation, info = env.reset()

    frames = []
    
    # 定义 OpenPi 标准翻转掩码 (用于测试 adapt_to_pi=True 的情况)
    # [1, -1, -1, 1, 1, 1, 1, 1, -1, -1, 1, 1, 1, 1]
    flip_mask = np.array([1, -1, -1, 1, 1, 1, 1, 1, -1, -1, 1, 1, 1, 1])

    print("Starting replay...")
    
    # 逐帧遍历
    for i, frame_idx in enumerate(range(from_idx, end_idx)):
        # 读取单帧数据
        item = dataset[frame_idx]
        action = item["action"]
        
        # 确保 action 是 numpy 数组且为 1D
        if isinstance(action, torch.Tensor):
            action = action.numpy()
        
        # --- 关键验证逻辑 ---
        if apply_flip_fix:
            action = action * flip_mask
            
        # 执行动作
        try:
            observation, reward, terminated, truncated, info = env.step(action)
        except AssertionError as e:
            print(f"Error at step {i} (frame {frame_idx}): {e}")
            print(f"Action shape: {action.shape}, Action ndim: {action.ndim}")
            break
        
        # 渲染图像 (兼容不同的 gym_aloha 版本)
        frame = None
        try:
            # 尝试直接 render
            rendered = env.render()
            if rendered is not None and rendered.shape[2] == 3: # HWC
                frame = rendered
        except Exception:
            pass
        
        # 如果 render 失败，尝试从 observation 获取 'top' 相机
        if frame is None:
            if 'pixels' in observation and 'top' in observation['pixels']:
                cam_data = observation['pixels']['top']
                # 转换维度 [C, H, W] -> [H, W, C]
                if cam_data.shape[0] == 3:
                    frame = np.transpose(cam_data, (1, 2, 0))
                else:
                    frame = cam_data

        if frame is not None:
            frames.append(frame.astype(np.uint8))
            
        if terminated or truncated:
            print(f"Episode terminated early by env at step {i}")
            break

    env.close()

    # 保存视频
    output_path = Path(output_filename)
    if len(frames) > 0:
        imageio.mimsave(output_path, frames, fps=30)
        print(f"\n✅ 视频已保存至: {output_path.absolute()}")
    else:
        print("\n❌ 未能录制到任何帧。")

    print("-" * 50)
    if apply_flip_fix:
        print(f"模式: [已应用翻转修复 (Simulating adapt_to_pi=True)]")
        print(f"如果此视频中机器人动作正常，说明 OpenPi 的默认设置是正确的。")
    else:
        print(f"模式: [原始数据直通 (Raw Dataset -> Env)]")
        print(f"如果此视频中机器人动作正常，说明你的环境与数据集定义【完全一致】，不需要 OpenPi 的翻转。")
    print("-" * 50)

if __name__ == "__main__":
    # 配置
    DATASET = "lerobot/aloha_sim_insertion_human"
    ENV_TASK = "gym_aloha/AlohaInsertion-v0"
    
    # 选择你想查看的 Episode 索引 (例如 0, 1, 2...)
    MY_EPISODE_INDEX = 5
    
    # 运行 1: 原始数据直通
    verify_dataset_env_match(
        dataset_repo_id=DATASET, 
        env_task_name=ENV_TASK, 
        episode_index=MY_EPISODE_INDEX,
        output_filename=f"replay_ep{MY_EPISODE_INDEX}_raw.mp4",
        apply_flip_fix=False
    )
    
    # 运行 2: 应用翻转
    verify_dataset_env_match(
        dataset_repo_id=DATASET, 
        env_task_name=ENV_TASK, 
        episode_index=MY_EPISODE_INDEX,
        output_filename=f"replay_ep{MY_EPISODE_INDEX}_flip.mp4",
        apply_flip_fix=True
    )

In [None]:
import cv2
import os
import pathlib

def extract_frames_to_images(video_path, output_dir=None):
    """
    将视频逐帧保存为图片文件
    
    参数:
    video_path: 视频文件路径
    output_dir: 输出目录，如果不指定则使用视频所在目录
    """
    # 检查视频文件是否存在
    if not os.path.exists(video_path):
        print(f"错误: 视频文件 {video_path} 不存在")
        return
    
    # 如果没有指定输出目录，则使用视频文件所在目录
    if output_dir is None:
        output_dir = os.path.dirname(video_path)
    
    # 创建输出目录
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    
    # 打开视频文件
    cap = cv2.VideoCapture(video_path)
    
    # 检查视频是否成功打开
    if not cap.isOpened():
        print(f"错误: 无法打开视频文件 {video_path}")
        return
    
    # 获取视频基本信息
    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    
    print(f"视频信息:")
    print(f"  - 路径: {video_path}")
    print(f"  - FPS: {fps}")
    print(f"  - 总帧数: {frame_count}")
    print(f"  - 输出目录: {output_dir}")
    
    frame_number = 0
    
    # 逐帧读取并保存
    while True:
        ret, frame = cap.read()
        
        if not ret:
            break
        
        # 生成帧文件名
        frame_filename = os.path.join(output_dir, f"frame_{frame_number:04d}.png")
        
        # 保存帧为PNG文件
        cv2.imwrite(frame_filename, frame)
        
        frame_number += 1
        
        # 显示进度
        if frame_number % 30 == 0 or frame_number == frame_count:
            print(f"已处理 {frame_number}/{frame_count} 帧")
    
    # 释放视频捕获对象
    cap.release()
    
    print(f"完成! 共保存 {frame_number} 帧到 {output_dir}")

if __name__ == "__main__":
    # 指定视频文件路径
    video_file = "data/aloha_sim/videos/out_0.mp4"
    
    # 提取帧并保存为图片
    extract_frames_to_images(video_file)

In [25]:
import gymnasium
import gym_aloha
import numpy as np
import imageio
import torch
from pathlib import Path

# 尝试导入 LeRobot 数据集加载器
try:
    from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
except ImportError:
    try:
        from lerobot.datasets.lerobot_dataset import LeRobotDataset
    except ImportError:
        print("错误: 无法导入 LeRobotDataset。请确保安装了 lerobot 包 (pip install -e .)")
        exit(1)

def print_first_frame_joint_positions(dataset_repo_id: str, num_episodes: int = 5):
    """
    打印每个episode中第一帧的关节姿态
    
    参数:
    dataset_repo_id: 数据集ID
    num_episodes: 要检查的episode数量
    """
    print(f"Loading dataset: {dataset_repo_id}...")
    # 强制使用 pyav 后端以避免 torchcodec 错误
    dataset = LeRobotDataset(dataset_repo_id, video_backend="pyav")
    
    # 加载元数据
    if dataset.meta.episodes is None:
        dataset.meta.load_metadata()
    
    # 遍历每个episode
    for episode_index in range(min(num_episodes, len(dataset.meta.episodes))):
        episode_meta = dataset.meta.episodes[episode_index]
        
        # 获取起始帧索引
        from_idx = episode_meta["dataset_from_index"]
        if isinstance(from_idx, torch.Tensor): 
            from_idx = from_idx.item()
        
        # 读取第一帧数据
        item = dataset[from_idx]
        
        # 打印关节位置信息
        print(f"Episode {episode_index}:")
        joint_positions = item["observation.state"]
        if isinstance(joint_positions, torch.Tensor):
            joint_positions = joint_positions.numpy()
        print(f"  Joint positions: {joint_positions}")
        print()

if __name__ == "__main__":
    # 配置
    DATASET = "lerobot/aloha_sim_insertion_human"
    
    # 打印前5个episode的第一帧关节姿态
    print_first_frame_joint_positions(DATASET, num_episodes=5)

Loading dataset: lerobot/aloha_sim_insertion_human...
Episode 0:
  Joint positions: [ 0.   -0.96  1.16  0.   -0.3   0.    0.    0.   -0.96  1.16  0.   -0.3
  0.    0.  ]

Episode 1:
  Joint positions: [ 0.   -0.96  1.16  0.   -0.3   0.    0.    0.   -0.96  1.16  0.   -0.3
  0.    0.  ]

Episode 2:
  Joint positions: [ 0.   -0.96  1.16  0.   -0.3   0.    0.    0.   -0.96  1.16  0.   -0.3
  0.    0.  ]

Episode 3:
  Joint positions: [ 0.   -0.96  1.16  0.   -0.3   0.    0.    0.   -0.96  1.16  0.   -0.3
  0.    0.  ]

Episode 4:
  Joint positions: [ 0.   -0.96  1.16  0.   -0.3   0.    0.    0.   -0.96  1.16  0.   -0.3
  0.    0.  ]





In [26]:
import gym_aloha
import os

# 获取 gym_aloha 的安装路径
package_path = os.path.dirname(gym_aloha.__file__)
constants_path = os.path.join(package_path, 'constants.py')

print(f"你需要修改的文件路径是: {constants_path}")

你需要修改的文件路径是: /home/user/anaconda3/envs/openpi/lib/python3.11/site-packages/gym_aloha/constants.py
