In [None]:
from collections import defaultdict


def load_h5(dataset_path, chunk_size=50, camera_names=('cam_high', 'cam_left_wrist', 'cam_right_wrist'), img_size=(480, 640)):
    """Load one episode from an HDF5 file into in-memory arrays.

    Args:
        dataset_path: Path of the episode ``.hdf5`` file.
        chunk_size: Not used by this function; kept so existing callers that
            pass it keep working.
        camera_names: Names of the image datasets to read from
            ``/observations/images/<name>``.
        img_size: Passed unchanged to ``cv2.resize`` as ``dsize``, which
            OpenCV interprets as ``(width, height)``.
            NOTE(review): with the default ``(480, 640)`` the frames come out
            640 rows by 480 columns -- confirm this is intended and not a
            ``(height, width)`` mix-up.

    Returns:
        A dict with the keys

        * ``'qpos'``: contents of ``/observations/qpos``.
        * ``'ee_state'``: contents of ``/action_eef``.
        * ``'image'``: dict mapping each camera name to its decoded, resized
          frames stacked along a new first axis (one frame per row of
          ``qpos``). Channel order is whatever ``cv2.imdecode`` produced; no
          colour conversion is applied.
        * ``'raw_lang'``: first entry of ``language_instruction`` decoded as
          UTF-8, or ``''`` when the file has no such dataset.

    Raises:
        ValueError: If a stored frame cannot be decoded by ``cv2.imdecode``.
    """
    with h5py.File(dataset_path, 'r') as root:
        # Language instruction (optional in the file).
        if 'language_instruction' in root:
            raw_lang = root['language_instruction'][0].decode('utf-8')
        else:
            raw_lang = ''

        # Actions & states. ``[()]`` copies the data into memory; a bare
        # dataset handle would become unusable once the file is closed.
        qpos = root['/observations/qpos'][()]
        ee = root['/action_eef'][()]

        # Images: one encoded buffer per time step and camera.
        num_steps = qpos.shape[0]
        images = {}
        for cam_name in camera_names:
            cam_dataset = root[f'/observations/images/{cam_name}']
            frames = []
            for step in range(num_steps):
                encoded = np.frombuffer(cam_dataset[step], np.uint8)
                frame = cv2.imdecode(encoded, cv2.IMREAD_COLOR)
                if frame is None:
                    # imdecode signals failure by returning None; fail here
                    # with context instead of inside cv2.resize.
                    raise ValueError(
                        f"could not decode frame {step} of camera "
                        f"{cam_name!r} in {dataset_path}"
                    )
                frames.append(cv2.resize(frame, img_size))
            images[cam_name] = np.stack(frames)

    return {'qpos': qpos, 'ee_state': ee, 'image': images, 'raw_lang': raw_lang}

In [None]:
# Load a single episode file into memory.
h5data = load_h5("/inspire/hdd/project/robot-action/public/data/red_cube_and_blue_cup/success_3/episode_0.hdf5")
def get_images_from_data(data):
    """Convert the per-camera frame stacks in ``data['image']`` to PIL images.

    Each stack may be a NumPy array or an object exposing ``.numpy()``
    (e.g. a CPU tensor). Both channel-last ``(N, H, W, C)`` stacks (the
    layout ``np.stack`` of decoded OpenCV frames gives) and channel-first
    ``(N, C, H, W)`` stacks are accepted; channel-first input is moved to
    channel-last before conversion.

    No channel reordering is performed.
    NOTE(review): PIL treats 3-channel arrays as RGB, while frames decoded
    with ``cv2.imdecode`` are BGR unless they were encoded from RGB data --
    confirm the colours of the resulting images look right.

    Args:
        data: Mapping with an ``'image'`` entry that maps camera name to a
            stack of frames.

    Returns:
        Dict mapping each camera name to a list of ``PIL.Image`` frames.
    """
    res = {}
    for cam_name, frames in data['image'].items():
        print(frames.shape)
        if hasattr(frames, 'numpy'):
            # Tensor-like input: work on its NumPy view from here on.
            frames = frames.numpy()
        # Treat the stack as channel-first only when axis 1 looks like a
        # channel axis and the last axis does not.
        channel_first = (
            frames.ndim == 4
            and frames.shape[1] in (3, 4)
            and frames.shape[-1] not in (3, 4)
        )
        if channel_first:
            frames = frames.transpose(0, 2, 3, 1)
        res[cam_name] = [Image.fromarray(frame) for frame in frames]
    return res
# Convert the loaded episode's frames to PIL images, keyed by camera name.
res = get_images_from_data(h5data)
import imageio

def save_img_list_to_video(
    img_list,
    video_path: str,
    fps: int = 20,
    codec: str = "libx264",
    quality: int = 8,
    pix_fmt: str = "yuv420p"
) -> None:
    """Encode a sequence of PIL images into a video file.

    The video is first written to a temporary sibling file and only moved to
    ``video_path`` once encoding has finished, so an interrupted run never
    leaves a truncated file at the final location.

    Args:
        img_list: Non-empty sequence of PIL images, one per video frame.
        video_path: Destination file; its extension is kept on the temporary
            file so the writer selects the same container format.
        fps: Frame rate forwarded to ``imageio.get_writer``.
        codec: Codec name forwarded to ``imageio.get_writer``.
        quality: Quality setting forwarded to ``imageio.get_writer``.
        pix_fmt: Forwarded to ``imageio.get_writer`` as ``pixelformat``.

    Raises:
        ValueError: If ``img_list`` is empty.
    """
    import os

    if not img_list:
        raise ValueError("img_list 不能为空")

    stem, ext = os.path.splitext(video_path)
    tmp_path = f"{stem}.partial{ext}"
    try:
        # Write the video through imageio's ffmpeg-backed writer.
        with imageio.get_writer(
            tmp_path,
            fps=fps,
            codec=codec,
            quality=quality,
            pixelformat=pix_fmt,
        ) as writer:
            for img in img_list:
                # Normalise to RGB so RGBA or grayscale frames do not break
                # the encoder.
                frame = img if img.mode == "RGB" else img.convert("RGB")
                writer.append_data(imageio.core.util.asarray(frame))
        os.replace(tmp_path, video_path)
    finally:
        # After a successful move the temporary file no longer exists; on
        # failure this removes the partial output.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)

def save_h5_to_video(file_path):
    """Export every camera stream of one HDF5 episode to its own ``.mp4``.

    Videos are written to ``<episode dir>/video/<episode name>/<camera>.mp4``,
    where the episode name is the file name without its final extension.
    Cameras whose video file already exists are skipped.

    Args:
        file_path: Path of the episode ``.hdf5`` file.
    """
    # splitext strips only the final extension, so dotted episode names
    # (e.g. "ep.v2.hdf5") keep their full stem and do not collide.
    episode_name = os.path.splitext(os.path.basename(file_path))[0]
    episode_dir = os.path.join(os.path.dirname(file_path), 'video', episode_name)
    os.makedirs(episode_dir, exist_ok=True)

    h5data = load_h5(file_path)
    images = get_images_from_data(h5data)
    for cam_name, frames in images.items():
        video_path = os.path.join(episode_dir, f'{cam_name}.mp4')
        if os.path.exists(video_path):
            continue
        save_img_list_to_video(frames, video_path)

# Export every .hdf5 episode found in data_dir to .mp4 videos
data_dir = "/inspire/hdd/project/robot-action/public/data/red_cube_and_blue_cup/success_3"
all_episodes = list(
    map(
        lambda name: os.path.join(data_dir, name),
        filter(lambda name: name.endswith('.hdf5'), os.listdir(data_dir)),
    )
)
for fi in all_episodes:
    save_h5_to_video(fi)
