In [None]:
import os
import functools

import cv2
import numpy as np
from mujoco_py import MujocoException

from tests.metaworld.envs.mujoco.sawyer_xyz.test_scripted_policies_visual import ALL_ENVS, test_cases


In [None]:
def trajectory_generator(env, res=(640, 480), camera='corner'):
    env.reset()
    env.reset_model()
    o = env.reset()

    for _ in range(env.max_path_length):
        try:
            o, r, done, info = env.step(np.hstack((o[:3], 0)))
            # Camera is one of ['corner', 'topview', 'behindGripper', 'gripperPOV']
            yield r, done, info, env.sim.render(*res, mode='offscreen', camera_name=camera)[:,:,::-1]
        except MujocoException:
            r = 0
            done = False
            info = {'success': True}
            return r, done, info, np.zeros((*res, 3))

def writer_for(tag, fps, res):
    if not os.path.exists('movies'):
        os.mkdir('movies')
    return cv2.VideoWriter(
        f'movies/{tag}.avi',
        cv2.VideoWriter_fourcc('M','J','P','G'),
        fps,
        res
    )

In [None]:
resolution = (1920, 1080)
camera = 'gripperPOV' # one of ['corner', 'topview', 'behindGripper', 'gripperPOV']
flip=True # if True, flips output image 180 degrees

config = [
    # env, action noise pct, cycles, quit on success
    ('visual-sandbox-small', 5, True),
]

for env, cycles, quit_on_success in config:
    tag = env

    policy = functools.reduce(lambda a,b : a if a[0] == env else b, test_cases)[1]
    env = ALL_ENVS[env]()
    env._partially_observable = False
    env._freeze_rand_vec = False
    env._set_task_called = True

    writer = writer_for(tag, env.metadata['video.frames_per_second'], resolution)
    for _ in range(cycles):
        for r, done, info, img in trajectory_generator(env, resolution, camera):
            if flip: img = cv2.rotate(img, cv2.ROTATE_180)
            writer.write(img)
            if quit_on_success and info['success']:
                break

    writer.release()