In [None]:
import os 
import numpy as np
import cv2
import IPython
# Debugging shortcut: calling e() starts an embedded IPython shell at the call site.
# It is not referenced by any of the cells visible here.
e = IPython.embed
from pathlib import Path
import imageio
import h5py
import json
import tempfile
import lmdb
import pickle
# --- Notebook configuration --------------------------------------------------
# Root directory that is searched recursively for episode ``.json`` files.
dataset_root_path = "/path/to/lmdb_data"
# Name of the camera whose streams are read when building the preview video.
camera_name = "camera_0"

# Scratch directory that receives the rendered videos; it is deleted when this
# object is cleaned up, so the handle must stay alive for the whole session.
tmp_dir = tempfile.TemporaryDirectory()

def get_data_path(dataset_root_path):
    """Yield the path of every ``.json`` file found under ``dataset_root_path``.

    The tree is walked top-down with ``os.walk``. Directory and file names are
    visited in sorted order so the sequence of yielded paths is identical on
    every run and every filesystem (``os.walk`` itself gives no ordering
    guarantee), which keeps ``next(...)`` on the generator reproducible.

    Args:
        dataset_root_path: Root directory to search (``str`` or path-like).
            A missing directory simply yields nothing.

    Yields:
        pathlib.Path: ``Path(root) / name`` for each file whose name ends with
        ``.json``.
    """
    for root, dirs, files in os.walk(dataset_root_path):
        # Sorting in place controls the order in which os.walk descends into
        # the sub-directories of ``root``.
        dirs.sort()
        root_path = Path(root)
        for file in sorted(files):
            if file.endswith(".json"):
                yield root_path / file

def _init_name_to_id(camera_names, idToLabels, background_names):
    background_id = {}
    for camera_name in camera_names:
        background_id[camera_name] = []
        for id, data in idToLabels[camera_name].items():
            name = data["class"]
            if name in background_names:
                background_id[camera_name].append(int(id))
    return background_id

def mix_data_to_video(data_path: Path, camera_name: str):
    """Composite render images over simulator frames for one recorded episode.

    For every step of the episode stored next to ``data_path``, pixels whose
    mask id belongs to a background class (``ground``, ``table``,
    ``BACKGROUND``) are taken from the render image and every other pixel is
    taken from the simulator image.

    Keys read from the LMDB environment:
        * ``action`` - its first dimension gives the number of steps.
        * ``observations/sim_images/<camera>/<i>`` - encoded simulator frame.
        * ``observations/fix_render_images/<camera>`` - optional; when present
          its first element is used as the render image for every step.
        * ``observations/render_images/<camera>/<i>`` - encoded per-step render
          image, used only when the fixed entry is absent.
        * ``observations/mask/<camera>/<i>`` - encoded mask, decoded as grayscale.

    Args:
        data_path: Path of a file inside the episode directory. Only its parent
            is used; it must contain ``info.json`` and an ``lmdb`` environment.
        camera_name: Camera whose streams are read.

    Returns:
        np.ndarray: The composited ``uint8`` frames stacked along axis 0, one
        per step.
    """
    background_names = {"ground", "table", "BACKGROUND"}
    episode_dir = data_path.parent

    with open(episode_dir / "info.json", "r") as f:
        idToLabels = json.load(f)["mask_idToLabels"]
    background_ids = _init_name_to_id([camera_name], idToLabels, background_names)[camera_name]

    images = []
    # readonly=True: this function only reads, and a read-only open never
    # creates an empty environment when the path is wrong.
    with lmdb.open(str(episode_dir / "lmdb"), readonly=True) as lmdb_env:
        with lmdb_env.begin() as txn:
            length = np.array(_load_pickled(txn, "action")).shape[0]

            # The fixed render image does not depend on the step index, so it
            # is looked up and unpickled once instead of on every iteration.
            fixed_key = f"observations/fix_render_images/{camera_name}"
            fixed_raw = txn.get(fixed_key.encode("utf-8"))
            has_fixed_render = fixed_raw is not None
            fixed_render = pickle.loads(fixed_raw)[0] if has_fixed_render else None

            for i in range(length):
                sim_image = cv2.imdecode(
                    _load_pickled(txn, f"observations/sim_images/{camera_name}/{i}"),
                    cv2.IMREAD_COLOR,
                )
                if has_fixed_render:
                    render_image = fixed_render
                else:
                    render_image = cv2.imdecode(
                        _load_pickled(txn, f"observations/render_images/{camera_name}/{i}"),
                        cv2.IMREAD_COLOR,
                    )
                mask = cv2.imdecode(
                    _load_pickled(txn, f"observations/mask/{camera_name}/{i}"),
                    cv2.IMREAD_GRAYSCALE,
                )
                images.append(_composite_background(mask, background_ids, render_image, sim_image))
    return np.array(images)


def _load_pickled(txn, key):
    """Fetch ``key`` (UTF-8 encoded) from the LMDB transaction and unpickle it."""
    # SECURITY: pickle.loads can execute arbitrary code; only open datasets
    # that come from a trusted source.
    return pickle.loads(txn.get(key.encode("utf-8")))


def _composite_background(mask, background_ids, render_image, sim_image):
    """Return a uint8 frame using ``render_image`` where ``mask`` holds a background id."""
    # np.isin also copes with an empty id list (nothing is background), where
    # OR-reducing an empty list of comparisons would collapse to a scalar.
    is_background = np.isin(mask, background_ids)[:, :, None]
    return np.where(is_background, render_image, sim_image).astype(np.uint8)

                
# Lazy generator over the episode ``.json`` files below ``dataset_root_path``.
# The following cell pulls one path from it each time it is executed.
iterator = get_data_path(dataset_root_path)


In [None]:

from IPython.display import Video
from loguru import logger

# Pull the next episode from the shared generator; re-running this cell
# advances to the following episode.
path = next(iterator, None)
if path is None:
    # A bare next() would surface as an unexplained StopIteration traceback.
    raise RuntimeError(
        f"No more episodes: no further .json file found under {dataset_root_path!r}. "
        "Check dataset_root_path or re-run the setup cell to restart the iterator."
    )
logger.info(f"processing {path}")
images = mix_data_to_video(path, camera_name)

# A random suffix keeps successive previews from sharing a file name.
video_path = os.path.join(tmp_dir.name, f"{camera_name}_{np.random.randint(100000)}.mp4")
# NOTE(review): the frames are decoded with cv2.imdecode, which uses BGR channel
# order, while imageio treats arrays as RGB - confirm how the dataset was
# encoded before adding a colour conversion here.
# mix_data_to_video already returns an ndarray, so it is passed on unchanged.
imageio.mimsave(video_path, images, fps=30)
Video(video_path, embed=True)