In [48]:
import gymnasium as gym
from gymnasium.envs.registration import register
from abc import ABC
import renderlab
import copy
import numpy as np
import json
import cv2

from policies import LinearGaussianPolicy, LinearPolicy
from envs.utils import ActionBoundsIdx

In [49]:
def evaluate(env, policy, max_steps=1000, stop_on_done=False):
    """
    Roll out a policy in an environment and collect one rendered frame per step.

    Per-step reward statistics (mean and standard deviation over the executed
    steps) are printed; they are not returned.

    :param env: environment exposing ``reset()`` -> ``(obs, info)``,
        ``step(action)`` -> ``(obs, reward, terminated, truncated, info)``
        and ``render()``
    :param policy: object exposing ``draw_action(obs)``
    :param max_steps: (int) maximum number of environment steps to execute
    :param stop_on_done: (bool) when True, stop as soon as the environment
        reports ``terminated`` or ``truncated``; when False (default) keep
        stepping for ``max_steps`` steps regardless of those flags
    :return: (list) the values returned by ``env.render()``, one per executed step
    """
    step_rewards = []
    frames = []
    obs, _ = env.reset()

    for _ in range(max_steps):
        action = policy.draw_action(obs)
        obs, reward, terminated, truncated, _ = env.step(action)
        step_rewards.append(reward)
        frames.append(env.render())

        # By default the end-of-episode flags are ignored so that the rollout
        # (and therefore the video) always has `max_steps` frames.
        if stop_on_done and (terminated or truncated):
            break

    # Avoid numpy's "mean of empty slice" warning when no step was executed.
    if step_rewards:
        mean_episode_reward = np.mean(step_rewards)
        std_episode_reward = np.std(step_rewards)
    else:
        mean_episode_reward = float("nan")
        std_episode_reward = float("nan")
    print("Mean reward:", mean_episode_reward,
          "Std reward:", std_episode_reward)

    return frames

In [50]:
def create_video(source, fps=60, output_name='/Users/leonardo/Desktop/Thesis/MagicRL/videos', rgb_input=True):
    """
    Encode a sequence of frames into an MP4 file using OpenCV.

    :param source: sized, indexable sequence of image arrays; the first
        frame's ``shape[0]`` / ``shape[1]`` are used as video height / width
    :param fps: (int) frame rate of the output video
    :param output_name: (str) output path without extension; ``'.mp4'`` is appended
    :param rgb_input: (bool) when True (default) each frame is converted from
        RGB to BGR before being written, because OpenCV's writer expects BGR
        while Gymnasium's ``rgb_array`` renderer produces RGB. Pass False if
        the frames are already BGR.
    :raises ValueError: if ``source`` contains no frames
    :raises IOError: if the video writer could not be opened for the target path
    """
    if len(source) == 0:
        raise ValueError("create_video needs at least one frame.")

    height, width = source[0].shape[0], source[0].shape[1]
    out = cv2.VideoWriter(output_name + '.mp4', cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))
    try:
        # A writer that failed to open silently drops every frame, so fail loudly.
        if not out.isOpened():
            raise IOError("Could not open video writer for " + output_name + '.mp4')
        for frame in source:
            if rgb_input:
                frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            out.write(frame)
    finally:
        # Always release so the container is finalised / the handle is freed.
        out.release()

In [51]:
class CostHopper(gym.Env, ABC):
    """Hopper-v4 environment with an added action-clipping cost.

    The class owns a ``Hopper-v4`` instance (``self.gym_env``) and delegates
    stepping, resetting, rendering and sampling to it. Each step additionally
    reports, under ``info["costs"]``, the Euclidean norm of the part of the
    action removed by clipping (always 0 when ``clip`` is False).
    """
    # Gymnasium looks up the "render_modes" / "render_fps" keys; the legacy
    # gym spelling ("render.modes" / "video.frames_per_second") is not read.
    metadata = {"render_modes": ["rgb_array"], "render_fps": 24}

    def __init__(
        self, horizon: int = 0, gamma: float = 0.99, verbose: bool = False, clip: bool = False, render_mode: str = None
    ) -> None:
        """Build the wrapped Hopper-v4 env and mirror its spaces.

        :param horizon: stored on the instance; not used by this class itself
        :param gamma: discount factor, must lie in [0, 1]; stored only
        :param verbose: stored on the instance; not used by this class itself
        :param clip: when True, actions are clipped to ``action_bounds`` before
            being sent to the wrapped environment
        :param render_mode: forwarded to ``gym.make`` for the wrapped environment
        """
        # Bookkeeping attributes
        self.state = None
        self.horizon = horizon
        assert 0 <= gamma <= 1, "[ERROR] Invalid Discount Factor value."
        self.gamma = gamma
        self.time = 0
        self.verbose = verbose
        self.clip = clip
        self.with_costs = True
        self.how_many_costs = 1
        self.continuous_env = True

        # Wrapped environment; dimensions and spaces are taken from it.
        self.render_mode = render_mode
        self.gym_env = gym.make('Hopper-v4', render_mode=render_mode)
        # Indexed with ActionBoundsIdx.lb / ActionBoundsIdx.ub in step().
        self.action_bounds = [-1, 1]
        self.state_dim = self.gym_env.observation_space.shape[0]    # 11
        self.action_dim = self.gym_env.action_space.shape[0]        # 3
        self.action_space = self.gym_env.action_space
        self.observation_space = self.gym_env.observation_space

    def step(self, action):
        """Apply an action and return the Gymnasium 5-tuple plus the clipping cost.

        :param action: array-like action
        :return: ``(obs, reward, terminated, truncated, info)`` where
            ``terminated`` / ``truncated`` come from the wrapped environment
            and ``info`` is ``{"costs": np.ndarray of shape (1,)}``
        """
        action = np.asarray(action)
        if self.clip:
            applied_action = np.clip(
                action,
                self.action_bounds[ActionBoundsIdx.lb],
                self.action_bounds[ActionBoundsIdx.ub],
                dtype=np.float64
            )
        else:
            applied_action = action

        # Norm of what clipping removed; exactly 0 when nothing was clipped.
        slack = action - applied_action
        cost = float(np.linalg.norm(slack))

        obs, reward, terminated, truncated, _ = self.gym_env.step(applied_action)
        self.state = copy.deepcopy(obs)
        info = {"costs": np.array([cost], dtype=np.float64)}

        return obs, reward, terminated, truncated, info

    def reset(self, seed = None, options = None):
        """Reset the wrapped environment, forwarding ``seed`` and ``options``.

        :return: the ``(obs, info)`` pair produced by the wrapped environment
        """
        obs, info = self.gym_env.reset(seed=seed, options=options)
        self.state = copy.deepcopy(obs)
        return obs, info

    def render(self):
        """Return whatever the wrapped environment's ``render()`` returns."""
        return self.gym_env.render()

    def close(self):
        """Close the wrapped environment."""
        self.gym_env.close()

    def sample_action(self):
        """Sample a random action from the wrapped action space."""
        return self.gym_env.action_space.sample()

    def sample_state(self, args: dict = None):
        """Sample a random point from the wrapped observation space (``args`` is unused)."""
        return self.gym_env.observation_space.sample()

    def set_state(self, state):
        """Overwrite the cached ``self.state``; the wrapped simulator is not modified."""
        self.state = state

# Make CostHopper available to gym.make under the id "CostHopper-v4".
# The entry point targets __main__ because the class is defined in this notebook.
register(
    id="CostHopper-v4",
    entry_point="__main__:CostHopper",
    max_episode_steps=100,
    kwargs=dict(render_mode="rgb_array"),
)

In [52]:
class CostSwimmer(gym.Env, ABC):
    """Swimmer-v4 environment with an added action-clipping cost.

    The class owns a ``Swimmer-v4`` instance (``self.gym_env``) and delegates
    stepping, resetting, rendering and sampling to it. Each step additionally
    reports, under ``info["costs"]``, the Euclidean norm of the part of the
    action removed by clipping (always 0 when ``clip`` is False).
    """
    # Gymnasium looks up the "render_modes" / "render_fps" keys; the legacy
    # gym spelling ("render.modes" / "video.frames_per_second") is not read.
    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 24}

    def __init__(
        self, horizon: int = 0, gamma: float = 0.99, verbose: bool = False, clip: bool = False, render_mode: str = None
    ) -> None:
        """Build the wrapped Swimmer-v4 env and mirror its spaces.

        :param horizon: stored on the instance; not used by this class itself
        :param gamma: discount factor, must lie in [0, 1]; stored only
        :param verbose: stored on the instance; not used by this class itself
        :param clip: when True, actions are clipped to ``action_bounds`` before
            being sent to the wrapped environment
        :param render_mode: forwarded to ``gym.make`` for the wrapped environment
        """
        # Bookkeeping attributes
        self.state = None
        self.horizon = horizon
        assert 0 <= gamma <= 1, "[ERROR] Invalid Discount Factor value."
        self.gamma = gamma
        self.time = 0
        self.verbose = verbose
        self.clip = clip
        self.with_costs = True
        self.how_many_costs = 1
        self.continuous_env = True

        # Wrapped environment; dimensions and spaces are taken from it.
        self.render_mode = render_mode
        self.gym_env = gym.make('Swimmer-v4', render_mode=render_mode)
        # Indexed with ActionBoundsIdx.lb / ActionBoundsIdx.ub in step().
        self.action_bounds = [-1, 1]
        self.state_dim = self.gym_env.observation_space.shape[0]
        self.action_dim = self.gym_env.action_space.shape[0]
        self.action_space = self.gym_env.action_space
        self.observation_space = self.gym_env.observation_space

    def step(self, action):
        """Apply an action and return the Gymnasium 5-tuple plus the clipping cost.

        :param action: array-like action
        :return: ``(obs, reward, terminated, truncated, info)`` where
            ``terminated`` / ``truncated`` come from the wrapped environment
            and ``info`` is ``{"costs": np.ndarray of shape (1,)}``
        """
        action = np.asarray(action)
        if self.clip:
            applied_action = np.clip(
                action,
                self.action_bounds[ActionBoundsIdx.lb],
                self.action_bounds[ActionBoundsIdx.ub],
                dtype=np.float64
            )
        else:
            applied_action = action

        # Norm of what clipping removed; exactly 0 when nothing was clipped.
        slack = action - applied_action
        cost = float(np.linalg.norm(slack))

        obs, reward, terminated, truncated, _ = self.gym_env.step(applied_action)
        self.state = copy.deepcopy(obs)
        info = {"costs": np.array([cost], dtype=np.float64)}

        return obs, reward, terminated, truncated, info

    def reset(self, seed = None, options = None):
        """Reset the wrapped environment, forwarding ``seed`` and ``options``.

        :return: the ``(obs, info)`` pair produced by the wrapped environment
        """
        obs, info = self.gym_env.reset(seed=seed, options=options)
        self.state = copy.deepcopy(obs)
        return obs, info

    def render(self):
        """Return whatever the wrapped environment's ``render()`` returns."""
        return self.gym_env.render()

    def close(self):
        """Close the wrapped environment."""
        self.gym_env.close()

    def sample_action(self):
        """Sample a random action from the wrapped action space."""
        return self.gym_env.action_space.sample()

    def sample_state(self, args: dict = None):
        """Sample a random point from the wrapped observation space (``args`` is unused)."""
        return self.gym_env.observation_space.sample()

    def set_state(self, state):
        """Overwrite the cached ``self.state``; the wrapped simulator is not modified."""
        self.state = state

# Make CostSwimmer available to gym.make under the id "CostSwimmer-v4".
# The entry point targets __main__ because the class is defined in this notebook.
register(
    id="CostSwimmer-v4",
    entry_point="__main__:CostSwimmer",
    max_episode_steps=100,
    kwargs=dict(render_mode="rgb_array"),
)


In [53]:
# Evaluation environment; rgb_array rendering makes env.render() return image frames.
env_eval = gym.make(
    "CostHopper-v4",
    render_mode="rgb_array",
)
# Alternative (disabled): record through renderlab instead of collecting frames by hand.
# env_eval = renderlab.RenderFrame(env_eval, "/Users/leonardo/Desktop/Thesis/MagicRL/videos")

In [54]:
# Load the JSON results file that holds the learned policy parameters.

# Swimmer (disabled; uncomment to evaluate the Swimmer run instead)
# with open("/Users/leonardo/Desktop/Thesis/Data/cpgpe_3000_swimmer_300_adam_p0001_d001_linear_batch_100_reg_00001_risk_tc_p16_var_001_a/cpgpe_3000_swimmer_300_adam_p0001_d001_linear_batch_100_reg_00001_risk_tc_p16_var_001_a_trial_0/cpgpe_results.json", "r") as read_file:
#     optimal_policy = json.load(read_file)

# Hopper
with open(
    "/Users/leonardo/Desktop/Thesis/Data/"
    "cpgpe_3000_hopper_300_adam_p001_d01_linear_batch_100_reg_00001_risk_tc_p33_var_01_a/"
    "cpgpe_3000_hopper_300_adam_p001_d01_linear_batch_100_reg_00001_risk_tc_p33_var_01_a_trial_0/"
    "cpgpe_results.json",
    "r",
) as read_file:
    optimal_policy = json.load(read_file)


In [56]:
# get the optimal policy
best_thetas = np.array(optimal_policy["best_rho"][0])

# state_dim / action_dim are custom attributes of the registered env class,
# not of the wrappers gym.make puts around it. Gymnasium >= 1.0 no longer
# forwards unknown attributes through wrappers, so read them via `.unwrapped`.
base_env = env_eval.unwrapped

# set the policy: the flat parameter vector is split into one chunk per action dimension
policy = LinearGaussianPolicy(
    parameters=np.array(np.split(best_thetas, base_env.action_dim)),
    dim_state=base_env.state_dim,
    dim_action=base_env.action_dim,
)
# make the policy deterministic
policy.std_dev = 0
policy.sigma_noise = 0

In [57]:
# Roll out the policy in the evaluation env; evaluate() prints per-step reward
# statistics and returns the rendered frames.
frames = evaluate(env_eval, policy)

Mean reward: -0.0003710601368786612 Std reward: 0.021104402599125775


In [58]:
# Encode the collected frames at 60 fps; create_video appends ".mp4" to the given path.
create_video(frames, 60, "/Users/leonardo/Desktop/Thesis/MagicRL/videos/hopper")