# Measuring Performance of a random agent
This activity will guide you through an important phase of every RL experiment: measuring performance and designing agents. You have to design a random agent using a Python class, so that the agent is modular and independent from the main loop. After that, you have to measure the mean and the variance of the discounted return using a batch of 100 episodes. You can use any environment you want, provided that the agent's actions are compatible with the environment. You can design two different types of agent: one for discrete action spaces and one for continuous action spaces.

## Agents

In [1]:
import abc

import numpy as np
import gym

In [2]:
"""
Abstract class representing the agent
Init with the action space and the function pi returning the action
"""


class Agent(abc.ABC):
    """
    Abstract base class of every agent.

    An agent is built from the environment action space and exposes the
    policy ``pi``, which maps a state to an action.

    Deriving from ``abc.ABC`` is what makes ``@abc.abstractmethod`` effective:
    neither this class nor a subclass that does not override ``pi`` can be
    instantiated (Python raises ``TypeError``).
    """

    def __init__(self, action_space: "gym.spaces.Space"):
        """
        Constructor of the agent class.

        The base constructor always raises, so every concrete agent has to
        define its own constructor.

        Args:
            action_space (gym.spaces.Space): environment action space

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError("This class cannot be instantiated.")

    @abc.abstractmethod
    def pi(self, state: np.ndarray) -> np.ndarray:
        """
        Agent's policy.

        Args:
            state (np.ndarray): environment state

        Returns:
            The selected action
        """

## Continuous Agent

In [3]:
class ContinuousAgent(Agent):
    """
    Random agent for continuous (``gym.spaces.Box``) action spaces.

    Every component of the action is sampled independently, with a
    distribution chosen from the bounds of that component:

    * unbounded on both sides: standard normal
    * bounded on both sides: uniform in ``[low, high)``
    * bounded above only: ``high`` minus an exponential sample
    * bounded below only: ``low`` plus an exponential sample
    """

    def __init__(self, action_space: gym.spaces.Space, seed=46):
        """
        Build the sampler for the given action space.

        Args:
            action_space (gym.spaces.Space): environment action space,
                must be a ``gym.spaces.Box``
            seed: seed for NumPy's global random generator

        Raises:
            ValueError: if ``action_space`` is not a ``gym.spaces.Box``
        """
        # setup seed (this reseeds NumPy's global random generator)
        np.random.seed(seed)
        # check the action space type
        if not isinstance(action_space, gym.spaces.Box):
            raise ValueError("This is a Continuous Agent pass as input a Box Space.")

        shape = action_space.shape
        low = np.broadcast_to(np.asarray(action_space.low, dtype=float), shape)
        high = np.broadcast_to(np.asarray(action_space.high, dtype=float), shape)

        # The bounds are arrays, so the kind of bound is decided per component
        # with boolean masks. Using the element-wise comparison directly as an
        # `if` condition fails for spaces with more than one dimension.
        has_low = low > -np.inf
        has_high = high < np.inf
        unbounded = ~has_low & ~has_high
        bounded = has_low & has_high
        upper_only = ~has_low & has_high
        lower_only = has_low & ~has_high

        def sample() -> np.ndarray:
            action = np.empty(shape, dtype=float)
            # the distribution is a normal distribution
            action[unbounded] = np.random.normal(
                loc=0, scale=1, size=int(unbounded.sum())
            )
            # the distribution is a uniform distribution
            action[bounded] = np.random.uniform(low=low[bounded], high=high[bounded])
            # negative exponential distribution
            action[upper_only] = (
                -np.random.exponential(size=int(upper_only.sum())) + high[upper_only]
            )
            # exponential distribution
            action[lower_only] = (
                np.random.exponential(size=int(lower_only.sum())) + low[lower_only]
            )
            return action

        self._pi = sample

    def pi(self, observation: np.ndarray) -> np.ndarray:
        """
        Policy: simply call the internal _pi().

        This is a random agent so the action is independent from the observation.
        For real agents the action depends on the observation.

        Args:
            observation (np.ndarray): environment observation (ignored)

        Returns:
            A random action with the shape of the action space
        """
        return self._pi()

## Discrete Agent

In [4]:
class DiscreteAgent(Agent):
    """
    Random agent for discrete (``gym.spaces.Discrete``) action spaces.

    Actions are drawn uniformly among the ``n`` valid actions of the space.
    """

    def __init__(self, action_space: gym.spaces.Space, seed=46):
        """
        Build the sampler for the given action space.

        Args:
            action_space (gym.spaces.Space): environment action space,
                must be a ``gym.spaces.Discrete``
            seed: seed for NumPy's global random generator

        Raises:
            ValueError: if ``action_space`` is not a ``gym.spaces.Discrete``
        """
        # setup seed (this reseeds NumPy's global random generator)
        np.random.seed(seed)
        # check the action space type
        if not isinstance(action_space, gym.spaces.Discrete):
            raise ValueError("This is a Discrete Agent pass as input a Discrete Space.")

        # Newer gym releases let a Discrete space begin at a non-zero `start`;
        # older releases have no such attribute and always begin at 0.
        first_action = int(getattr(action_space, "start", 0))
        end_action = first_action + int(action_space.n)  # exclusive upper bound

        # the distribution is a uniform distribution over the valid actions
        self._pi = lambda: np.random.randint(low=first_action, high=end_action)

    def pi(self, observation: np.ndarray) -> int:
        """
        Policy: simply call the internal _pi().

        This is a random agent so the action is independent from the observation.
        For real agents the action depends on the observation.

        Args:
            observation (np.ndarray): environment observation (ignored)

        Returns:
            A random valid action, as an integer
        """
        return self._pi()

## Utility function
Utility function to initialize the correct agent based on the action space

In [5]:
def make_agent(action_space: gym.spaces.Space, seed=46):
    """
    Build the random agent that matches the type of the action space.

    Args:
        action_space (gym.spaces.Space): environment action space
        seed: seed forwarded to the agent constructor

    Returns:
        A DiscreteAgent for Discrete spaces, a ContinuousAgent for Box spaces

    Raises:
        ValueError: if the action space is neither Discrete nor Box
    """
    # supported space types, checked in order, with the agent class for each
    agent_by_space = (
        (gym.spaces.Discrete, DiscreteAgent),
        (gym.spaces.Box, ContinuousAgent),
    )
    for space_type, agent_class in agent_by_space:
        if isinstance(action_space, space_type):
            return agent_class(action_space, seed)

    message = "Only Box spaces or Discrete Spaces are allowed, check the action space of the environment"
    raise ValueError(message)

## RL Loop

Define the parameters

In [6]:
# Name of the gym environment to create
env_name = "CartPole-v0"
# Number of episodes to run
episodes = 10
# Maximum number of timesteps per episode (an episode can terminate earlier)
timesteps = 100
# Discount factor used to weight the rewards in the return
gamma = 1.0
# Seed given to both the environment and the agent
seed = 46

In [7]:
# Needed to show the environment in a notebook
from gym import wrappers

In [8]:
env = gym.make(env_name)
env.seed(seed)
# the last argument is needed to record all episodes
# otherwise gym would record only some of them
# The monitor saves the episodes inside the folder ./gym-results
env = wrappers.Monitor(
    env, "./gym-results", force=True, video_callable=lambda episode_id: True
)

agent = make_agent(env.action_space, seed)

# list of the discounted returns, one entry per episode
episode_returns = []

# try/finally guarantees that the environment (and the monitor recording the
# videos) is closed even if an episode raises
try:
    # loop for the episodes
    for episode_number in range(episodes):
        # here we are inside an episode

        # reset cumulated gamma
        gamma_cum = 1

        # return of the current episode
        episode_return = 0

        # the reset function resets the environment and returns
        # the first environment observation
        observation = env.reset()

        # loop for the given number of timesteps or
        # until the episode is terminated
        for timestep_number in range(timesteps):

            # render the environment
            # env.render()

            # select the action
            action = agent.pi(observation)

            # apply the selected action by calling env.step
            observation, reward, done, info = env.step(action)

            # increment the return
            episode_return += reward * gamma_cum

            # update the value of cumulated discount factor
            gamma_cum = gamma_cum * gamma

            # if done the episode is terminated, we have to reset
            # the environment
            if done:
                # NOTE: timestep_number is the zero-based index of the last
                # step, so the episode lasted timestep_number + 1 steps
                print(
                    f"Episode Number: {episode_number}, Timesteps: {timestep_number}, Return: {episode_return}"
                )
                # break from the timestep loop
                break

        episode_returns.append(episode_return)
finally:
    # close the environment
    env.close()

# Calculate return statistics
avg_return = np.mean(episode_returns)
std_return = np.std(episode_returns)
var_return = std_return ** 2  # variance is std^2

print(f"Statistics on Return: Average: {avg_return}, Variance: {var_return}")

Episode Number: 0, Timesteps: 27, Return: 28.0
Episode Number: 1, Timesteps: 9, Return: 10.0
Episode Number: 2, Timesteps: 13, Return: 14.0
Episode Number: 3, Timesteps: 16, Return: 17.0
Episode Number: 4, Timesteps: 31, Return: 32.0
Episode Number: 5, Timesteps: 10, Return: 11.0
Episode Number: 6, Timesteps: 14, Return: 15.0
Episode Number: 7, Timesteps: 11, Return: 12.0
Episode Number: 8, Timesteps: 10, Return: 11.0
Episode Number: 9, Timesteps: 30, Return: 31.0
Statistics on Return: Average: 18.1, Variance: 68.89000000000001


### Rendering

Let's render the episodes inside a notebook

In [9]:
# Render the episodes
import io
import base64
from IPython.display import HTML, display

# number of recorded episodes to embed in the notebook
episodes_to_watch = 1
for episode in range(episodes_to_watch):
    video_path = (
        f"./gym-results/openaigym.video.{env.file_infix}.video{episode:06d}.mp4"
    )
    # The file is only read, so read-only binary mode is enough; the context
    # manager closes the file handle as soon as the content is loaded.
    with io.open(video_path, "rb") as video_file:
        video = video_file.read()
    # embed the video in the page as a base64 data URI
    encoded = base64.b64encode(video)
    display(
        HTML(
            data="""
        <video width="360" height="auto" alt="test" controls><source src="data:video/mp4;base64,{0}" type="video/mp4" /></video>""".format(
                encoded.decode("ascii")
            )
        )
    )

As you can see, the episodes are short: the actions are taken at random, so the pole falls after only a few timesteps.