In [26]:
import gymnasium as gym
from torch.distributions import Bernoulli
import torch
import numpy as np
from examples.offline.utils import load_buffer_d4rl

In [47]:
# Names exported by ``from <module> import *``: the reward wrappers below.
__all__ = [
    'RewardHighVelocity',
    'RewardUnhealthyPose',
    'RewardScale',
]


from typing import Any


class RewardHighVelocity(gym.RewardWrapper):
    """Wrapper to modify environment rewards of 'Cheetah', 'Walker' and
    'Hopper'.

    After every step the simulator velocity ``qvel[0]`` is compared with a
    predefined max velocity. When it is greater, the state is flagged as
    risky and, with a certain probability, the forward reward is removed
    from the reward and the penalization cost is added instead.

    Parameters
    ----------
    env: the environment to wrap; its id must contain 'Cheetah', 'Hopper'
        or 'Walker'.
    kwargs: dict
        with keys:
        'prob_vel_penal': prob of penalization
        'cost_vel': cost of penalization (added to the reward)
        'max_vel': max velocity
        'max_step' (optional): once the number of steps since the last
            reset exceeds this value, ``truncated`` is forced to True.
            Omitted or None means no step limit.

    Methods
    -------
    step(action): observation, reward, terminated, truncated, info
        execute a step in the environment.
    """

    def __init__(self, env, **kwargs):
        super().__init__(env)
        self.penal_v_distr = Bernoulli(kwargs['prob_vel_penal'])
        self.penal = kwargs['cost_vel']
        self.max_vel = kwargs['max_vel']
        # Optional so that callers that do not care about truncation can
        # omit it; None disables the step limit.
        self.max_step = kwargs.get('max_step')
        self.step_counter = 0
        allowed_envs = ['Cheetah', 'Hopper', 'Walker']
        env_id = self.env.unwrapped.spec.id
        assert any(e in env_id for e in allowed_envs), \
            f'Env {env_id} not allowed for RewardWrapper'

    def step(self, action):
        """Step the wrapped env and return the penalized reward.

        Adds ``info['risky_state']`` (velocity above ``max_vel``) and
        ``info['angle']`` (``qpos[2]``) to the info dict.
        """
        observation, reward, terminated, truncated, info = \
            self.env.step(action)
        # Read the simulator from the base env instead of relying on
        # attribute forwarding through intermediate wrappers.
        sim_data = self.env.unwrapped.sim.data
        info['risky_state'] = sim_data.qvel[0] > self.max_vel
        info['angle'] = sim_data.qpos[2]
        self.step_counter += 1

        if self.max_step is not None and self.step_counter > self.max_step:
            truncated = True

        # The reward is computed the same way for every allowed env, so a
        # single return replaces the per-env branches (which could fall
        # through and implicitly return None).
        return (observation, self.new_reward(reward, info),
                terminated, truncated, info)

    def new_reward(self, reward, info):
        """Return ``reward`` with the stochastic velocity penalty applied.

        ``info`` must contain 'risky_state' and the forward reward
        ('reward_run' for Cheetah, 'x_velocity' otherwise).
        """
        if 'Cheetah' in self.env.unwrapped.spec.id:
            forward_reward = info['reward_run']
        else:
            forward_reward = info['x_velocity']

        # Non-zero only when the state is risky AND the Bernoulli draw is 1.
        penal = info['risky_state'] * \
            self.penal_v_distr.sample().item() * self.penal

        # If penalty applied, subtract the forward_reward from total_reward
        # original_reward = rew_healthy + forward_reward - cntrl_cost
        new_reward = penal + reward + (penal != 0) * (-forward_reward)
        return new_reward

    def reset(
        self,
        *,
        seed: int | None = None,
        options: dict[str, Any] | None = None,
    ) -> tuple[Any, dict[str, Any]]:
        """Reset the wrapped env and restart the step counter."""
        self.step_counter = 0
        return super().reset(seed=seed, options=options)

    @property
    def name(self):
        """Wrapper class name followed by the string form of the env."""
        return f'{self.__class__.__name__}{self.env}'


class RewardUnhealthyPose(gym.RewardWrapper):
    """Wrapper to modify environment rewards of 'Walker' and 'Hopper'.

    Penalizes with certain probability if pose of the agent doesn't lie
    in a 'robust' state space, i.e. if the angle ``qpos[2]`` is outside the
    robust range while still inside the healthy range.

    Parameters
    ----------
    env: the environment to wrap; its id must contain 'Walker' or 'Hopper'.
    kwargs: dict
        with keys:
        'prob_pose_penal': prob of penalization
        'cost_pose': cost of penalization (added to the reward)

    Methods
    -------
    step(action): observation, reward, terminated, truncated, info
        execute a step in the environment.

    Raises
    ------
    ValueError
        if the environment is neither Walker nor Hopper.
    """

    def __init__(self, env, **kwargs):
        super().__init__(env)

        self.penal_distr = Bernoulli(kwargs['prob_pose_penal'])
        self.penal = kwargs['cost_pose']
        env_id = self.env.unwrapped.spec.id
        if 'Walker' in env_id:
            self.robust_angle_range = (-0.5, 0.5)
            self.healthy_angle_range = (-1, 1)  # default env

        elif 'Hopper' in env_id:
            self.robust_angle_range = (-0.1, 0.1)
            self.healthy_angle_range = (-0.2, 0.2)  # default env

        else:
            raise ValueError('Environment is not Walker neither Hopper '
                             f'for {self.__class__.__name__}')

    def _angle(self):
        # Read the simulator from the base env instead of relying on
        # attribute forwarding through intermediate wrappers.
        return self.env.unwrapped.sim.data.qpos[2]

    @property
    def is_robust_healthy(self):
        """True when the angle lies strictly inside the robust range."""
        min_angle, max_angle = self.robust_angle_range
        # bool() so that callers can negate with ``not`` regardless of
        # whether the comparison produced a numpy or a Python boolean.
        return bool(min_angle < self._angle() < max_angle)

    @property
    def is_healthy(self):
        """True when the angle lies strictly inside the healthy range."""
        h_min_angle, h_max_angle = self.healthy_angle_range
        # Return the value: assigning to ``self.is_healthy`` here would
        # raise AttributeError because the property has no setter.
        return bool(h_min_angle < self._angle() < h_max_angle)

    def step(self, action):
        """Step the wrapped env and return the penalized reward.

        Adds ``info['risky_state']`` (pose not robust) and
        ``info['angle']`` (``qpos[2]``) to the info dict.
        """
        observation, reward, terminated, truncated, info = \
            self.env.step(action)
        info['risky_state'] = not self.is_robust_healthy
        info['angle'] = self._angle()
        return (observation, self.new_reward(reward),
                terminated, truncated, info)

    def new_reward(self, reward):
        """Return ``reward`` with the stochastic pose penalty applied."""
        # Compute new reward according to penalty probability and agent state:

        # Penalty occurs if agent's pose is not robust with certain prob
        # If env.terminate when unhealthy=False (i.e. episode doesn't finish
        # when unhealthy pose), we do not add penalization when not in
        # healthy pose.

        penal = (not self.is_robust_healthy) * self.is_healthy * \
            self.penal_distr.sample().item() * self.penal

        new_reward = penal + reward
        return new_reward

    @property
    def name(self):
        """Wrapper class name followed by the string form of the env."""
        return f'{self.__class__.__name__}{self.env}'


class RewardScale(gym.RewardWrapper):
    """Reward wrapper that multiplies every reward by a constant factor.

    Parameters
    ----------
    env: the environment to wrap.
    scale: factor each reward is multiplied by.
    """

    def __init__(self, env, scale):
        super().__init__(env)
        self.scale = scale

    def reward(self, reward):
        """Return ``reward`` multiplied by ``self.scale``."""
        scaled_reward = reward * self.scale
        return scaled_reward


In [33]:
# Environment id passed to gym.make and dataset name passed to
# load_buffer_d4rl.
task = "HalfCheetah-v3"
task_data = "halfcheetah-medium-v0"
# Settings forwarded to RewardHighVelocity:
# probability of applying the penalty when the velocity is above max_vel,
prob_vel_penal = 0.05
# velocity threshold above which a state is flagged as risky,
max_vel = 4
# value added to the reward when the penalty is applied.
cost_vel = -70

In [34]:
# Load the offline dataset named by task_data.
# NOTE(review): presumably a Tianshou replay buffer, given the later
# save_hdf5 call — confirm against load_buffer_d4rl.
dataset = load_buffer_d4rl(task_data)

load datafile: 100%|██████████| 5/5 [00:00<00:00,  7.69it/s]


In [35]:
# Build the simulator environment used to recompute the rewards.
env = gym.make(task)

  logger.deprecation(
  logger.deprecation(


In [48]:
def create_stochastic_dataset_halfcheetah(env, dataset):
    """Relabel the rewards of ``dataset`` with the stochastic velocity penalty.

    Every stored transition is replayed in ``env`` wrapped with
    RewardHighVelocity: the simulator is set to the stored observation, the
    stored action is executed, and the resulting reward overwrites
    ``dataset.rew[i]``. The penalty settings come from the module-level
    ``prob_vel_penal``, ``max_vel`` and ``cost_vel``.

    Parameters
    ----------
    env: the environment to replay the transitions in.
    dataset: buffer exposing ``obs``, ``act``, ``done`` and ``rew`` indexed
        by transition, and supporting ``len()``.

    Returns
    -------
    The same ``dataset`` object; its rewards are modified in place.
    """
    env = RewardHighVelocity(
        env,
        prob_vel_penal=prob_vel_penal,
        max_vel=max_vel,
        cost_vel=cost_vel,
        # The wrapper's truncation flag is ignored below, so make sure its
        # step limit can never trigger (and that the key is always present).
        max_step=float('inf'),
    )
    # set_state lives on the base env; call it there instead of relying on
    # attribute forwarding through the wrappers.
    base_env = env.unwrapped

    done = True
    for i in range(len(dataset)):
        if done:
            env.reset()
        obs = dataset.obs[i]
        # The first 8 observation entries are used as qpos[1:] and the rest
        # as qvel; the index i is used as a placeholder for qpos[0].
        # NOTE(review): presumably because the observation omits the
        # x position — confirm against the environment definition.
        base_env.set_state(qpos=np.concatenate(([i], obs[:8])),
                           qvel=obs[8:])
        _, rew, _, _, _ = env.step(dataset.act[i])
        dataset.rew[i] = rew
        # Episode boundaries follow the stored data, not the simulator.
        done = dataset.done[i]

    return dataset
        

In [37]:
# Recompute the dataset rewards with the velocity penalty; the function
# returns the same dataset object it was given.
stochastic_dataset = create_stochastic_dataset_halfcheetah(env, dataset)

In [38]:
# Save the relabeled dataset to an HDF5 file whose name records the
# dataset name and the penalty settings used.
stochastic_dataset.save_hdf5(f"tianshou_buffer_{task_data}_prob{prob_vel_penal}_vel{max_vel}_cost{cost_vel}.hdf5")