In [2]:
import docopt

In [3]:
import torch

In [4]:
import gymnasium as gym

In [5]:
import pybullet_envs_gymnasium 

In [6]:
from gymnasium import spaces
import numpy as np


######################################################################################
class POMDPWrapper(gym.Wrapper):
    """Turn a fully-observed environment into a partially-observed one.

    Only the entries of the wrapped environment's observation vector listed
    in ``partially_obs_dims`` are returned to the agent. The most recent full
    observation is kept in ``true_state`` so the hidden part can be queried
    with :meth:`get_unobservable`.

    For ``Box`` action spaces the wrapper expects actions in ``[-1, 1]`` and
    rescales them linearly to the wrapped environment's ``[low, high]``.
    """

    def __init__(self, env, partially_obs_dims: list):
        """
        Args:
            env: Environment to wrap; its observation space must expose
                ``shape``, ``low`` and ``high``.
            partially_obs_dims: Indices of the observation vector that stay
                visible. May cover the whole observation.
        """
        super().__init__(env)
        self.partially_obs_dims = partially_obs_dims
        # The visible subset may be as large as the full observation.
        assert 0 < len(self.partially_obs_dims) <= self.observation_space.shape[0]

        self.observation_space = spaces.Box(
            low=self.observation_space.low[self.partially_obs_dims],
            high=self.observation_space.high[self.partially_obs_dims],
            dtype=np.float32,
        )

        # NOTE: for continuous actions the policy is expected to emit values
        # in [-1, 1] and to ignore action_space.low/high; step() does the
        # rescaling. (Flagged by the original author as bad practice.)
        self.act_continuous = self.env.action_space.__class__.__name__ == "Box"
        # Last full observation returned by the wrapped env (None before reset).
        self.true_state = None

    def get_obs(self, state):
        """Return a copy of the observable entries of the full ``state``."""
        return state[self.partially_obs_dims].copy()

    def get_unobservable(self):
        """Return a copy of the entries of ``true_state`` hidden from the agent."""
        visible = set(self.partially_obs_dims)
        unobserved_dims = [i for i in range(self.true_state.shape[0]) if i not in visible]
        return self.true_state[unobserved_dims].copy()

    def reset(self, seed=None, **kwargs):
        """Reset the wrapped env and return ``(partial_observation, {})``.

        ``seed`` is optional so that a bare ``reset()`` works; callers that
        have no seed (including gymnasium's own wrappers and vector envs)
        simply do not pass the keyword. Any other keyword arguments are
        accepted but intentionally not forwarded, and the wrapped
        environment's info dict is replaced by an empty one.
        """
        state, _ = self.env.reset(seed=seed)  # no kwargs
        self.true_state = state
        return self.get_obs(state), {}

    def step(self, action):
        """Step the wrapped env and return the partial observation.

        Returns:
            ``(observation, reward, terminated, truncated, info)`` in the
            order produced by the wrapped environment.
        """
        if self.act_continuous:
            # Map the policy's [-1, 1] action onto the env's real bounds.
            action = np.clip(action, -1, 1)  # first clip into [-1, 1]
            lb = self.env.action_space.low
            ub = self.env.action_space.high
            action = lb + (action + 1.0) * 0.5 * (ub - lb)
            action = np.clip(action, lb, ub)

        state, reward, terminated, truncated, info = self.env.step(action)
        self.true_state = state
        return self.get_obs(state), reward, terminated, truncated, info
    

# Public env id -> (underlying Bullet env id, observable observation indices).
# NOTE(review): the "-V" / "-P" suffixes presumably stand for velocity-only and
# position-only observations; confirm against the benchmark definition.
_POMDP_ENV_SPECS = {
    "HalfCheetahBLT-V-v0": ("HalfCheetahBulletEnv-v0", (3, 4, 5, 9, 11, 13, 15, 17, 19)),
    "HalfCheetahBLT-P-v0": ("HalfCheetahBulletEnv-v0", (0, 1, 2, 6, 7, 8, 10, 12, 14, 16, 18, 20, 21, 22, 23, 24, 25)),
    "AntBLT-V-v0": ("AntBulletEnv-v0", (3, 4, 5, 9, 11, 13, 15, 17, 19, 21, 23)),
    "AntBLT-P-v0": ("AntBulletEnv-v0", (0, 1, 2, 6, 7, 8, 10, 12, 14, 16, 18, 20, 22, 24, 25, 26, 27)),
    "WalkerBLT-V-v0": ("Walker2DBulletEnv-v0", (3, 4, 5, 9, 11, 13, 15, 17, 19)),
    "WalkerBLT-P-v0": ("Walker2DBulletEnv-v0", (0, 1, 2, 6, 7, 8, 10, 12, 14, 16, 18, 20, 21)),
    "HopperBLT-V-v0": ("HopperBulletEnv-v0", (3, 4, 5, 9, 11, 13)),
    "HopperBLT-P-v0": ("HopperBulletEnv-v0", (0, 1, 2, 6, 7, 8, 10, 12, 14)),
}


def make_env(env_id, seed):
    """Return a zero-argument factory for one partially-observed Bullet env.

    The factory form is what ``gym.vector.SyncVectorEnv`` expects. Nothing is
    constructed (and ``env_id`` is not validated) until the factory is called.

    Args:
        env_id: One of the keys of ``_POMDP_ENV_SPECS``.
        seed: Seed applied to the environment's action-space sampler only;
            the environment dynamics are seeded through ``reset(seed=...)``.

    Returns:
        A callable that builds the env wrapped in ``POMDPWrapper`` and
        ``RecordEpisodeStatistics``.

    Raises:
        AssertionError: when the returned factory is called with an unknown
            ``env_id``.
    """

    def thunk():
        spec = _POMDP_ENV_SPECS.get(env_id)
        if spec is None:
            # Explicit raise (same exception type as the former `assert 0`)
            # so the check survives `python -O` and names the offending id.
            raise AssertionError(
                f"Unknown env_id {env_id!r}; expected one of {sorted(_POMDP_ENV_SPECS)}"
            )

        base_id, obs_dims = spec
        # Hand each wrapper its own list so instances never share state.
        env = POMDPWrapper(gym.make(base_id), partially_obs_dims=list(obs_dims))

        env = gym.wrappers.RecordEpisodeStatistics(env)
        env.action_space.seed(seed)

        return env

    return thunk
############################################################################    


class GPUObservationWrapper(gym.ObservationWrapper):
    """Convert an environment's NumPy outputs to torch tensors on ``device``.

    The wrapper deliberately deviates from the gymnasium signatures:

    * ``reset`` returns only the observation tensor (no info dict);
    * ``step`` returns ``(obs, reward, done, info)`` where ``done`` is
      ``terminated OR truncated``.

    Observations get an extra leading axis of size 1 (``unsqueeze(0)``).
    """

    def __init__(self, env: gym.Env, device: torch.device):
        super().__init__(env)
        self.device = device  # target device (GPU or CPU)
        # Seed handed to seed() that still has to be applied on the next reset.
        self._pending_seed = None

    def reset(self, **kwargs):
        """Reset the wrapped env and return the initial observation tensor."""
        if self._pending_seed is not None:
            # An explicit reset(seed=...) wins over an earlier seed() call.
            kwargs.setdefault("seed", self._pending_seed)
            self._pending_seed = None
        obs = self.env.reset(**kwargs)
        # The wrapped reset returns (observation, info); only obs is kept.
        obs_tensor = self._to_tensor(obs[0]).unsqueeze(0)
        return obs_tensor

    def step(self, action):
        """Advance the wrapped env by one step.

        Returns:
            ``(obs_tensor, reward_tensor, done_tensor, info)``; ``info`` is
            the wrapped environment's info object, passed through unchanged.
        """
        # gymnasium order: observation, reward, terminated, truncated, info.
        obs, reward, terminated, truncated, info = self.env.step(action)
        # An episode is over when it terminates or hits the time limit.
        done = np.logical_or(terminated, truncated)

        # Convert observation, reward and done flag to tensors.
        obs_tensor = self._to_tensor(obs).unsqueeze(0)
        reward_tensor = self._to_tensor(np.array(reward, dtype=np.float32))
        done_tensor = self._to_tensor(np.array(done, dtype=np.bool_))

        return obs_tensor, reward_tensor, done_tensor, info

    def _to_tensor(self, obs: np.ndarray):
        """Convert an ndarray to a tensor on ``self.device``.

        bool stays bool, floating types become float32, integer types become
        int64, anything else is coerced to float32. Non-ndarray inputs are
        returned unchanged.
        """
        if isinstance(obs, np.ndarray):
            if np.issubdtype(obs.dtype, np.bool_):
                tensor = torch.from_numpy(obs).to(torch.bool)
            elif np.issubdtype(obs.dtype, np.floating):
                tensor = torch.from_numpy(obs).float()
            elif np.issubdtype(obs.dtype, np.integer):
                tensor = torch.from_numpy(obs).long()
            else:
                # Fall back to float32 for any other dtype.
                tensor = torch.tensor(obs, dtype=torch.float32)
            return tensor.to(self.device)
        return obs

    def seed(self, seed: int = None):
        """Seed the wrapped environment.

        Uses the wrapped env's own ``seed`` method when it has one. Otherwise
        the seed is remembered and passed as ``reset(seed=...)`` on the next
        reset, which is how gymnasium environments are seeded.
        """
        inner_seed = getattr(self.env, "seed", None)
        if callable(inner_seed):
            return inner_seed(seed)
        self._pending_seed = seed
        return None


def env_constructor(env_name: str, seed: int = 1, obs_indices: list = None):
    """Build a single-env vector environment whose outputs are torch tensors.

    Args:
        env_name: Environment id understood by ``make_env``.
        seed: Forwarded to ``make_env``.
        obs_indices: Currently unused; kept for interface compatibility.

    Returns:
        ``(env, obs_dim, act_dim)`` where the two dims are the last entries
        of the wrapped env's observation- and action-space shapes.
    """
    # Prefer the GPU when one is visible to torch.
    device_name = "cuda" if torch.cuda.is_available() else "cpu"
    target_device = torch.device(device_name)

    # A vector env holding exactly one sub-environment.
    env_factories = [make_env(env_name, seed)]
    vector_env = gym.vector.SyncVectorEnv(env_factories)

    wrapped_env = GPUObservationWrapper(vector_env, target_device)

    obs_dim = wrapped_env.observation_space.shape[-1]
    act_dim = wrapped_env.action_space.shape[-1]
    return wrapped_env, obs_dim, act_dim

In [7]:
# Build the tensor-returning vector env; o_s / a_s are the trailing
# dimensions of its observation and action spaces.
env, o_s, a_s = env_constructor("HalfCheetahBLT-V-v0", 1, None)

pybullet build time: Jan 29 2025 23:20:52


In [8]:
# Seeded reset; GPUObservationWrapper.reset returns only the observation tensor.
env.reset(seed=2)


argv[0]=
argv[0]=


tensor([[[0., 0., 0., 0., 0., 0., 0., 0., 0.]]], device='cuda:0')

In [9]:
# Rebind env to a plain single-env SyncVectorEnv (no GPUObservationWrapper).
env = gym.vector.SyncVectorEnv([make_env("HalfCheetahBLT-V-v0", 1)])

In [10]:
# Start an episode and initialise the bookkeeping used by the rollout loop.
d = env.reset()
t = 0  # number of steps taken so far
done = False  # loop flag for the rollout cell

argv[0]=
argv[0]=


In [13]:
# Roll out random actions until the (single) sub-environment's episode ends.
while not done:
    t += 1
    # gymnasium order: observation, reward, terminated, truncated, info.
    state, r, terminated, truncated, _ = env.step(env.action_space.sample())
    # The flags are per-env arrays; stop on termination or time-limit truncation.
    done = bool(np.logical_or(terminated, truncated).any())
    print(state.shape)

In [16]:
# Shape of the last batched observation returned by the rollout loop.
state.shape

(1, 9)

In [40]:
# NOTE(review): make_env returns a factory (thunk), not an environment.
envs = make_env("HalfCheetahBLT-V-v0", 1)

In [41]:
# Vector env built from a single make_env factory.
envs = gym.vector.SyncVectorEnv([make_env("HalfCheetahBLT-V-v0", 1)])

In [47]:
# Draw one random action from the vector env's (batched) action space.
envs.action_space.sample()

array([[-0.34053656,  0.5768574 , -0.39361033, -0.09300422, -0.7319166 ,
        -0.19377403]], dtype=float32)

In [17]:
import gymnasium

In [18]:
# Record the installed gymnasium version for reproducibility.
gymnasium.__version__

'0.29.1'

In [26]:
# Rebind env to the raw Bullet environment (no POMDPWrapper, not vectorised).
env = gym.make("HalfCheetahBulletEnv-v0")

In [21]:
# o holds the whole return value of reset — presumably gymnasium's
# (observation, info) tuple; confirm before indexing into it.
o = env.reset()

argv[0]=
argv[0]=


In [23]:
# o holds the whole return value of step for one random action — presumably
# gymnasium's (obs, reward, terminated, truncated, info) tuple; confirm.
o = env.step(env.action_space.sample())

In [None]:
# Show the wrapper stack of the raw environment.
env

<TimeLimit<OrderEnforcing<PassiveEnvChecker<HalfCheetahBulletEnv<HalfCheetahBulletEnv-v0>>>>>

In [18]:
def mean_padding(tensor, K):
    """Left-pad the context axis of ``tensor`` to at least ``K`` steps.

    The context axis is the second-to-last one, i.e. the tensor is read as
    ``(..., context, state_dim)``. This covers the ``(num_envs, context,
    state_dim)`` and ``(num_envs, batch_size, context, state_dim)`` layouts
    as well as any other rank >= 2.

    Despite the name, no mean is computed: the padding consists of copies of
    the first context step, prepended before the existing steps.

    Args:
        tensor: ``torch.Tensor`` with at least two dimensions.
        K: Required context length.

    Returns:
        ``tensor`` itself (never truncated) when its context is already
        >= ``K``; otherwise a new tensor of shape ``(..., K, state_dim)``.

    Raises:
        ValueError: if ``tensor`` has fewer than two dimensions.
        IndexError: if padding is needed but the context axis is empty.
    """
    if tensor.dim() < 2:
        raise ValueError(
            f"mean_padding expects a tensor with at least 2 dims, got shape {tuple(tensor.shape)}"
        )

    *leading_dims, context, state_dim = tensor.shape
    if context >= K:
        return tensor

    # First context step, kept as a length-1 axis so it can be broadcast.
    first_step = tensor[..., 0, :].unsqueeze(-2)
    # expand() creates a view; torch.cat below materialises the result.
    pad_tensor = first_step.expand(*leading_dims, K - context, state_dim)
    padded_tensor = torch.cat([pad_tensor, tensor], dim=-2)
    print(f"Padded to context {K}")
    return padded_tensor

In [32]:
# Random 3-D test input; mean_padding reads it as (num_envs, context, state_dim).
s = torch.rand(10,  9, 33)

In [34]:
 # Context is 9 < 10, so one copy of the first step is prepended.
 mean_padding(s, 10)

Padded to context 10


tensor([[[0.9324, 0.4218, 0.2059,  ..., 0.9033, 0.2874, 0.5513],
         [0.9324, 0.4218, 0.2059,  ..., 0.9033, 0.2874, 0.5513],
         [0.7634, 0.3416, 0.6589,  ..., 0.0961, 0.5652, 0.8297],
         ...,
         [0.5756, 0.1285, 0.7396,  ..., 0.1814, 0.8996, 0.6912],
         [0.7203, 0.5278, 0.6679,  ..., 0.2937, 0.5709, 0.2885],
         [0.1177, 0.5347, 0.3860,  ..., 0.5387, 0.9451, 0.3670]],

        [[0.4956, 0.9384, 0.8911,  ..., 0.3051, 0.2602, 0.6519],
         [0.4956, 0.9384, 0.8911,  ..., 0.3051, 0.2602, 0.6519],
         [0.3810, 0.4067, 0.3000,  ..., 0.2745, 0.2070, 0.6879],
         ...,
         [0.7535, 0.8238, 0.0327,  ..., 0.0889, 0.5228, 0.9270],
         [0.2375, 0.9383, 0.7587,  ..., 0.7395, 0.1385, 0.6115],
         [0.5802, 0.1373, 0.1429,  ..., 0.1372, 0.1281, 0.5293]],

        [[0.9900, 0.2496, 0.3874,  ..., 0.6468, 0.9998, 0.7530],
         [0.9900, 0.2496, 0.3874,  ..., 0.6468, 0.9998, 0.7530],
         [0.3298, 0.1811, 0.4103,  ..., 0.7697, 0.4648, 0.

In [31]:
 # Context is 9 < 11, so two copies of the first step are prepended.
 mean_padding(s, 11).shape

Padded to context 11


torch.Size([10, 11, 33])