In [1]:
import os
import h5py
import cv2
import numpy as np
from pathlib import Path
import matplotlib.pyplot as plt


def load_hdf5(dataset_path):
    """Load the joint actions, camera images and point cloud of one episode.

    Args:
        dataset_path: Path (``str`` or ``os.PathLike``) of the episode HDF5 file.

    Returns:
        A 7-tuple ``(left_gripper, left_arm, right_gripper, right_arm,
        image_dict, third_view_rgb, pointcloud)``. ``image_dict`` maps every
        group name found under ``/observation/`` to the contents of that
        group's ``rgb`` dataset. All datasets are read fully into memory
        before the file is closed.

    Raises:
        FileNotFoundError: If ``dataset_path`` is not an existing regular file.
    """
    if not os.path.isfile(dataset_path):
        # Raise instead of calling exit(): inside a notebook exit() stops the
        # kernel, and callers have no way to handle the failure.
        raise FileNotFoundError(f"Dataset does not exist at \n{dataset_path}\n")

    with h5py.File(dataset_path, "r") as root:
        left_gripper = root["/joint_action/left_gripper"][()]
        left_arm = root["/joint_action/left_arm"][()]
        right_gripper = root["/joint_action/right_gripper"][()]
        right_arm = root["/joint_action/right_arm"][()]

        # One entry per camera group stored under /observation/.
        image_dict = {
            cam_name: root[f"/observation/{cam_name}/rgb"][()]
            for cam_name in root["/observation/"].keys()
        }

        third_view_rgb = root["/third_view_rgb"][()]
        pointcloud = root["/pointcloud"][()]

    return left_gripper, left_arm, right_gripper, right_arm, image_dict, third_view_rgb, pointcloud

In [2]:
# Episode file to inspect. The camera streams returned below hold encoded
# image buffers; byte_list_to_video decodes them with cv2.imdecode.
dataset_path = Path("../data/adjust_bottle/test1/data/episode0.hdf5")

(
    lgripper,
    larm,
    rgripper,
    rarm,
    image_dict,
    third_view_rgb,
    pointcloud,
) = load_hdf5(dataset_path)


In [None]:
# print(image_dict.keys())
# print(np.shape(image_dict["front_camera"]))
# print(np.shape(image_dict["head_camera"]))
# print(np.shape(image_dict["left_camera"]))
# print(np.shape(image_dict["right_camera"]))
# print(type(image_dict["front_camera"][0]))
# print(np.shape(third_view_rgb))

def byte_list_to_video(input_byte_list, output_video_path, encoder_code="avc1", fps=30, overwrite=True):
    """Decode a sequence of encoded image buffers and write them as a video.

    Args:
        input_byte_list: Iterable of encoded image buffers; each element is
            decoded with ``cv2.imdecode`` as a colour image.
        output_video_path: Destination file (``str`` or ``pathlib.Path``).
        encoder_code: Four-character codec code given to
            ``cv2.VideoWriter_fourcc``.
        fps: Frame rate passed to ``cv2.VideoWriter``.
        overwrite: When True an existing output file is deleted and rewritten;
            when False an existing file causes ``FileExistsError``.

    Raises:
        ValueError: If there are no frames, a frame cannot be decoded, or the
            decoded frames do not all share the same height and width.
        FileExistsError: If the output file exists and ``overwrite`` is False.
    """
    encoded_frames = list(input_byte_list)
    if len(encoded_frames) == 0:
        raise ValueError("input_byte_list is empty; there are no frames to write")

    output_file = Path(output_video_path)
    # Refuse before decoding anything; raising (instead of exit()) keeps the
    # notebook kernel alive and leaves the existing file untouched.
    if output_file.exists() and not overwrite:
        raise FileExistsError(f"file {output_video_path} already exists and overwrite is False")

    frames = []
    for index, image_bit in enumerate(encoded_frames):
        frame = cv2.imdecode(np.frombuffer(image_bit, np.uint8), cv2.IMREAD_COLOR)
        if frame is None:
            raise ValueError(f"frame {index} could not be decoded as an image")
        frames.append(frame)

    # Read the size from the first frame rather than stacking every frame into
    # a single array just to query its shape.
    h, w = frames[0].shape[:2]
    for index, frame in enumerate(frames):
        if frame.shape[:2] != (h, w):
            raise ValueError(
                f"frame {index} has size {frame.shape[1]}x{frame.shape[0]}, expected {w}x{h}"
            )

    if output_file.exists():
        print(f"Warning: file {output_video_path} already exists, overwriting...")
        output_file.unlink()

    fourcc = cv2.VideoWriter_fourcc(*encoder_code)
    # str() so that a pathlib.Path argument is accepted as well.
    video = cv2.VideoWriter(str(output_video_path), fourcc, fps, (w, h))
    try:
        if not video.isOpened():
            print(f"Error: failed to create video file {output_video_path}")
            return
        for frame in frames:
            video.write(frame)
    finally:
        # Release the writer on every path, including failure to open.
        video.release()
    print(f"Video saved to: {output_video_path}")

In [21]:
# Export each on-robot camera stream, then the third-person view, as an .mp4
# stored next to the episode file.
for camera_name in ("front_camera", "head_camera", "left_camera", "right_camera"):
    byte_list_to_video(
        image_dict[camera_name],
        str(dataset_path.parent.joinpath(f"{camera_name}.mp4")),
    )
byte_list_to_video(third_view_rgb, str(dataset_path.parent.joinpath("third_view_rgb.mp4")))


Video saved to: ../data/adjust_bottle/test1/data/front_camera.mp4
Video saved to: ../data/adjust_bottle/test1/data/head_camera.mp4
Video saved to: ../data/adjust_bottle/test1/data/left_camera.mp4
Video saved to: ../data/adjust_bottle/test1/data/right_camera.mp4
File "../data/adjust_bottle/test1/data/third_view_rgb.mp4" exists, overwriting...
Video saved to: ../data/adjust_bottle/test1/data/third_view_rgb.mp4
