In [1]:
import gym
from typing import Optional


class DynaBasedEnv(gym.Env):
    """Gym wrapper that re-scores a wrapped tracking env against a reference trajectory.

    The wrapped ``main_env`` is stepped as-is and its own reward is discarded.
    The observation is the 149-dim concatenation of the walker's flattened
    ``body_positions`` feature and its mocap joint ``qpos``. The reward is
    ``1 - max|state - target|`` over a fixed subset of body-position coordinates.

    Args:
        main_env: Parent environment. It must expose ``step`` (4-tuple return),
            ``reset``, ``render``, ``close`` and ``dm_env`` (with ``physics`` and
            ``_task``).
        expert_traj: Mapping with ``'body_positions'`` and ``'joints'`` arrays,
            reshaped here to ``(-1, 93)`` and ``(-1, 56)`` respectively.
        reward_estimator: Optional learned reward model. It is stored but not
            used by the current reward computation.
        render_mode: Stored on the instance; ``render`` always asks the parent
            env for an ``"rgb_array"`` frame.
    """

    metadata = {"render_modes": ["human"], "render_fps": 30}

    # Body ids whose x/y/z coordinates enter the tracking error.
    _TRACKED_BODY_IDS = (2, 7, 19, 26, 10, 16)
    # An episode that ends at or before this time step gets a zero reward.
    # NOTE(review): confirm this cutoff matches the length of the clip in use.
    _EARLY_TERMINATION_STEP = 204 - 1

    def __init__(
            self,
            main_env,
            expert_traj,
            reward_estimator=None,
            render_mode: Optional[str] = None,
            ):
        super().__init__()
        # parent environment
        self.main_env = main_env
        # flat indices (3 consecutive coordinates per tracked body) into the state vector
        self._tracked_indices = np.array(
            [3 * body_id + axis for body_id in self._TRACKED_BODY_IDS for axis in range(3)]
        )
        # reference trajectory: one row per time step, bodies first, then joints
        self.expert_traj = expert_traj
        target_bodies = self.expert_traj['body_positions'].reshape(-1, 93)
        target_joints = self.expert_traj['joints'].reshape(-1, 56)
        # np.concatenate already returns a fresh ndarray; no torch round-trip needed
        self.target_state = np.concatenate((target_bodies, target_joints), axis=-1)
        # modified spaces
        self.action_space = spaces.Box(low=float('-1'), high=float('1'), shape=(56,), dtype=np.float32)
        self.observation_space = spaces.Box(low=float('-inf'), high=float('inf'), shape=(149,), dtype=np.float32)
        self.render_mode = render_mode
        self.reward_estimator = reward_estimator

    def _get_state(self):
        """Return the current state: flattened body positions followed by mocap joint qpos."""
        dm_env = self.main_env.dm_env
        walker = dm_env._task._walker
        # NOTE(review): `_utils` is not imported in the visible cells; presumably
        # dm_control's reference_pose `utils` module -- confirm and import it explicitly.
        body_positions = _utils.get_features(
            physics=dm_env.physics, walker=walker, props=[]
        )['body_positions'].reshape(-1)
        joint_qpos = np.array(dm_env.physics.bind(walker.mocap_joints).qpos)
        return np.concatenate((body_positions, joint_qpos), axis=-1)

    def step(self, action):
        """Advance the parent env and score the new state against the reference.

        Returns:
            ``(next_state, reward, done, False, info)``. ``truncated`` is always
            ``False`` and ``info`` is ``{"TimeLimit.truncated": None}``.
        """
        # The parent's own reward is ignored. The pre-step state is no longer
        # computed here because nothing consumed it; a reward-estimator term would
        # need to call self._get_state() before this line.
        _, _, done, _ = self.main_env.step(action)

        # time index used to look up the reference row
        current_time = self.main_env.dm_env._task._time_step
        next_state = self._get_state()

        # trajectory tracking reward: worst-case absolute deviation over tracked coordinates
        tracked = self._tracked_indices
        error_bodies = np.max(np.abs(next_state[tracked] - self.target_state[current_time, tracked]))
        rew = 1 - error_bodies

        # Truthiness instead of `is True`, so numpy booleans are honoured as well.
        if done and current_time <= self._EARLY_TERMINATION_STEP:
            rew = np.zeros_like(rew)

        return next_state, rew, done, False, {"TimeLimit.truncated": None}

    def reset(self, seed=None, options=None):
        """Reset the parent env and return ``(observation, info)``.

        ``seed`` and ``options`` are accepted for API compatibility but are not
        forwarded to the parent env. ``info`` is an empty dict.
        """
        self.main_env.reset()
        return self._get_state(), {}

    def render(self, mode='human', close=False):
        """Return the parent env's ``"rgb_array"`` render; ``mode`` and ``close`` are ignored."""
        return self.main_env.render("rgb_array")

    def close(self):
        """Close the parent environment."""
        self.main_env.close()


In [2]:
from sac_modified import SAC
from mocapact.envs import tracking
from dm_control.locomotion.tasks.reference_pose import types
import numpy as np
import torch, pickle
from gym import spaces

# region file path
# path information
# Every backslash is escaped explicitly: sequences such as "\M", "\d" and "\l"
# are invalid escapes that Python only tolerates with a deprecation warning.
# The resulting string values are unchanged.
root_folder = "E:\\MoCAP\\MCDH\\root_1"
policy_model_path = f"{root_folder}\\sac_model_3 - Copy.zip"
dynamic_model_path = f"{root_folder}\\d2_060_035.pt"
reward_model_path = f"{root_folder}\\r1_512.pt"
dynamic_model_backup_path = f"{root_folder}\\d2_060_035_backup.pt"
reward_model_backup_path = f"{root_folder}\\r1_64_backup.pt"
replay_buffer_path = f"{root_folder}\\replay_buffer.pt"
dynamic_model_replay_buffer = f"{root_folder}\\dyna_replay_buffer.pt"
logger_path = f"{root_folder}\\logs"
# logger2_path = f"{root_folder}\\logs2"
logger2_path = logger_path
reference_trajectory_path = f"{root_folder}/traj_info.np"
# endregion

# expert info: a single clip, steps 0..194
dataset = types.ClipCollection(ids=['CMU_075_09'], start_steps=[0], end_steps=[194])

# reference trajectory information
# NOTE: pickle.load can execute arbitrary code; only load files you produced yourself.
with open(reference_trajectory_path, "rb") as f:
    reference_info = pickle.load(f)

# environment: tracking env wrapped by the trajectory-reward env (no reward estimator)
env = tracking.MocapTrackingGymEnv(
    dataset,
    task_kwargs=dict(ghost_offset=np.array([0., 0., 0.])),
)
denv = DynaBasedEnv(env, reference_info, None)

# policy model
# policy_kwargs, lr_schedule and format_strings are defined here but not passed
# to SAC.load below.
policy_kwargs = dict(
    net_arch=dict(pi=3*[1024], qf=3*[1024]),
    activation_fn=torch.nn.ReLU,
)
lr_schedule = 3e-5
format_strings = ['csv', 'tensorboard', 'stdout']
model = SAC.load(policy_model_path, env=denv)
model.load_replay_buffer(replay_buffer_path)


pygame 2.1.3 (SDL 2.26.5, Python 3.10.10)
Hello from the pygame community. https://www.pygame.org/contribute.html


  if not hasattr(tensorboard, "__version__") or LooseVersion(
  ) < LooseVersion("1.15"):


[INFO] : meta-information updated successfully.




Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


In [3]:
# Draw a batch of 80 transitions from the loaded replay buffer, with the
# buffer's `reward_training` option enabled.
x = model.replay_buffer.sample(
    80,
    reward_training=True,
)