# MDP and gym

## Evaluation considerations
- We take into account the correctness of the solutions but also their generality and quality of the code
- Comment and discuss on the results of all your exercises (in a cell immediately after the results). You may also state the difficulties encountered, lessons learned and your understanding of the problem and solution
- Clean-up your code before submission, do not leave unnecessary code attempts, or if you deem it important, leave them in a way that it is easily understood and with comments/discussion
- We also value the originality of the solutions, don't hesitate in performing unrequested additional tasks in relation to the exercises


**NOTE**

Do not try to reproduce exactly the results in this notebook. RL training is very noisy and performances of learned policies can vary a lot, try running your trains several times.



In [99]:
!pip install gymnasium
!pip install gymnasium[accept-rom-license,toy_text]



In [100]:
import gymnasium as gym
from IPython.display import clear_output, HTML, display
import matplotlib.pyplot as plt
%matplotlib notebook

In [101]:
#@title Wrapper for recording an environment into a video

from __future__ import annotations

from copy import deepcopy
from typing import Any, SupportsFloat

from gymnasium.core import ActType, ObsType, RenderFrame, WrapperActType, WrapperObsType
from gymnasium.error import DependencyNotInstalled

class RecordVideo(gym.Wrapper):
    """Adapted from https://github.com/Farama-Foundation/Gymnasium/blob/main/gymnasium/experimental/wrappers/rendering.py#L87
    """

    def __init__(self, env):
        """Initialize a :class:`HumanRendering` instance.
        Args:
            env: The environment that is being wrapped
        """
        super().__init__(env)
        assert env.render_mode in [
            "rgb_array",
            "rgb_array_list",
        ], f"Expected env.render_mode to be one of 'rgb_array' or 'rgb_array_list' but got '{env.render_mode}'"

        if "render_fps" not in env.metadata:
            env.metadata["render_fps"] = 24

        assert (
            "render_fps" in env.metadata
        ), "The base environment must specify 'render_fps' to be used with the HumanRendering wrapper"

        if "human" not in self.metadata["render_modes"]:
            self.metadata = deepcopy(self.env.metadata)
            self.metadata["render_modes"].append("human")

        self.artists = []
        self.figure = None

    @property
    def render_mode(self):
        """Always returns ``'human'``."""
        return "human"

    def step(
        self, action: WrapperActType
    ) -> tuple[WrapperObsType, SupportsFloat, bool, bool, dict]:
        """Perform a step in the base environment and render a frame to the screen."""
        result = super().step(action)
        self._render_frame()
        return result

    def reset(
        self, *, seed: int | None = None, options: dict[str, Any] | None = None
    ) -> tuple[WrapperObsType, dict[str, Any]]:
        """Reset the base environment and render a frame to the screen."""
        result = super().reset(seed=seed, options=options)
        self._render_frame()
        return result

    def video(self):
        """This method renders all frames collected up to now."""
        if self.figure is not None:
            from IPython.display import HTML
            import matplotlib.animation

            animation = matplotlib.animation.ArtistAnimation(self.figure, self.artists,
                                                             interval=1000//self.metadata["render_fps"],
                                                             blit=True,
                                                             repeat=True,
                                                             repeat_delay=2000)
            return HTML(animation.to_html5_video())

        return None

    def _render_frame(self):
        """Fetch the last frame from the base environment and render it to the screen."""
        try:
            import matplotlib.animation
            import numpy as np
        except ImportError:
            raise DependencyNotInstalled(
                "matplotlib is not installed, run `pip install matplotlib`"
            )
        if self.env.render_mode == "rgb_array_list":
            rgb_arrays = self.env.render()
        elif self.env.render_mode == "rgb_array":
            rgb_arrays = [self.env.render()]
        else:
            raise Exception(
                f"Wrapped environment must have mode 'rgb_array' or 'rgb_array_list', actual render mode: {self.env.render_mode}"
            )

        assert isinstance(rgb_arrays, list)

        for rgb_array in rgb_arrays:
            assert isinstance(rgb_array, np.ndarray)

        if self.figure is None:
            self.figure = plt.figure()
            plt.axis('off')

        self.artists.append([plt.imshow(rgb_array) for rgb_array in rgb_arrays])

    def close(self):
        """Close the rendering window."""
        result = self.video()
        super().close()

        return result

In [102]:
import numpy as np
np.set_printoptions(linewidth=100)
from gym.wrappers import RecordVideo

Let's render soe steps to show how to use the `RecordVideo` class.

In [103]:
#@title Wrapper for recording an environment into a video

from __future__ import annotations

from copy import deepcopy
from typing import Any, SupportsFloat

from gymnasium.core import ActType, ObsType, RenderFrame, WrapperActType, WrapperObsType
from gymnasium.error import DependencyNotInstalled

class RecordVideo(gym.Wrapper):
    """Adapted from https://github.com/Farama-Foundation/Gymnasium/blob/main/gymnasium/experimental/wrappers/rendering.py#L87
    """

    def __init__(self, env):
        """Initialize a :class:`HumanRendering` instance.
        Args:
            env: The environment that is being wrapped
        """
        super().__init__(env)
        assert env.render_mode in [
            "rgb_array",
            "rgb_array_list",
        ], f"Expected env.render_mode to be one of 'rgb_array' or 'rgb_array_list' but got '{env.render_mode}'"

        if "render_fps" not in env.metadata:
            env.metadata["render_fps"] = 24

        assert (
            "render_fps" in env.metadata
        ), "The base environment must specify 'render_fps' to be used with the HumanRendering wrapper"

        if "human" not in self.metadata["render_modes"]:
            self.metadata = deepcopy(self.env.metadata)
            self.metadata["render_modes"].append("human")

        self.artists = []
        self.figure = None

    @property
    def render_mode(self):
        """Always returns ``'human'``."""
        return "human"

    def step(
        self, action: WrapperActType
    ) -> tuple[WrapperObsType, SupportsFloat, bool, bool, dict]:
        """Perform a step in the base environment and render a frame to the screen."""
        result = super().step(action)
        self._render_frame()
        return result

    def reset(
        self, *, seed: int | None = None, options: dict[str, Any] | None = None
    ) -> tuple[WrapperObsType, dict[str, Any]]:
        """Reset the base environment and render a frame to the screen."""
        result = super().reset(seed=seed, options=options)
        self._render_frame()
        return result

    def video(self):
        """This method renders all frames collected up to now."""
        if self.figure is not None:
            from IPython.display import HTML
            import matplotlib.animation

            animation = matplotlib.animation.ArtistAnimation(self.figure, self.artists,
                                                             interval=1000//self.metadata["render_fps"],
                                                             blit=True,
                                                             repeat=True,
                                                             repeat_delay=2000)
            return HTML(animation.to_html5_video())

        return None

    def _render_frame(self):
        """Fetch the last frame from the base environment and render it to the screen."""
        try:
            import matplotlib.animation
            import numpy as np
        except ImportError:
            raise DependencyNotInstalled(
                "matplotlib is not installed, run `pip install matplotlib`"
            )
        if self.env.render_mode == "rgb_array_list":
            rgb_arrays = self.env.render()
        elif self.env.render_mode == "rgb_array":
            rgb_arrays = [self.env.render()]
        else:
            raise Exception(
                f"Wrapped environment must have mode 'rgb_array' or 'rgb_array_list', actual render mode: {self.env.render_mode}"
            )

        assert isinstance(rgb_arrays, list)

        for rgb_array in rgb_arrays:
            assert isinstance(rgb_array, np.ndarray)

        if self.figure is None:
            self.figure = plt.figure()
            plt.axis('off')

        self.artists.append([plt.imshow(rgb_array) for rgb_array in rgb_arrays])

    def close(self):
        """Close the rendering window."""
        result = self.video()
        super().close()

        return result

In [104]:
env = RecordVideo(gym.make("FrozenLake-v1", render_mode="rgb_array"))
env.reset()

for _ in range(100):
    observation, reward, terminated, truncated, info = env.step(env.action_space.sample())

    done = terminated or truncated

    if done:
        env.reset()

display(env.video())

<IPython.core.display.Javascript object>

This is only needed when we want to render. In many situations of this lab we will perform many steps without the need to render, we will avoid recording a video, since doing so requires a lot of RAM (to save all the RGB frames) and we may run out of memory.

To avoid this do not wrap the environment on a `RecordVideo` class and set keyword argument `render_mode=None` when making the environment.

In [105]:
env = gym.make("FrozenLake-v1", render_mode=None)
env.reset()

for _ in range(100):
    observation, reward, terminated, truncated, info = env.step(env.action_space.sample())

    done = terminated or truncated

    if done:
        env.reset()

  and should_run_async(code)


In this lab we will be estimating value functions. Value functions of an MDP are conditioned on the policy used.

A policy is the probability distribution over actions. We will restrict ourselves to discrete action spaces.

We will start by using a uniform policy that chooses with equal chances between all actions.

## Exercise : A uniform policy

Create a `policy_uniform` function that returns the probability of each possible action given an environment and state.

We will also create a `sample_multinomial` function that facilitates sampling an action from the policy given the probability of each possible action.

**Hint** you may access the number of discrete actions with `env.action_space.n`.

In [106]:
n_actions = env.action_space.n
print(n_actions)

4


1. Result analysis :
I checked how many actions the character can take in Frozen Lake. The character can take 4 different actions: up, down, right, and left.



In [107]:
class UniformPolicy(object):
    def __init__(self, action_space):
        assert isinstance(action_space, gym.spaces.discrete.Discrete), "Can only create uniform policies for Discrete action spaces"
        self.n_actions = action_space.n
        self.training = True

    def train(self):
        self.training = True

    def eval(self):
        self.training = False

    def probability(self, state, action):
        ### BEGIN SOLUTION
        action_prob = 1.0/self.n_actions

        ### END SOLUTION

        return action_prob

    def sample(self, state):
        ### BEGIN SOLUTION
        action = np.random.choice(self.n_actions)
        ### END SOLUTION

        return action

# Let's instantiate a uniform policy
env = RecordVideo(gym.make("FrozenLake-v1", render_mode="rgb_array"))
uniform_policy = UniformPolicy(env.action_space)

# And sample 20 actions from state==0
actions = [uniform_policy.sample(0) for i in range(20)]
print(actions)

[3, 3, 1, 1, 1, 3, 1, 1, 3, 0, 1, 1, 1, 3, 3, 0, 1, 3, 1, 2]


1. Result Analysis:
A list of 20 randomly selected actions has been returned.

2.  How the result was generated:
First, call the the total number of actions to caclutate an equal probability. It is calculated as 1 divided by the total number of actions. Using np.random.choice, one action was randomly selected. This process was repeated for the given number of samples, The result is a list of randomly chosen action.

Gym follows a formalism of MDPs that differs slightly (is more general) than the one we have seen in class. The differences are:
- In gym, transitions from a particular state and action can lead to the same destination state with different rewards (in the course all transitions reaching the same state obtain the same reward, except for transitions departing from a terminal state, whose rewards can be ignored)
- In gym, transitions to the same state can be both terminal and non-terminal (in the course it's the states that are terminal)

However we have chosen the environment `FrozenLake` which behaves as the MDPs described in the course, thus:
- All transitions to a given state have the same reward (unless the departure and destination state is the same terminal state)
- All transitions to a terminal state are terminal

In bym `env.env.P` contains the information about the process dynamics, rewards and terminal states. The information is encoded in the following way:

`env.env.P[state][action]` returns a list of tuples containing all the possible outcomes and their probability in the form `(prob, next_state, reward, terminal)` where `state` is the current state, `action` is the action taken, `probability` is the probability of the transition, `next_state` is the destination state of the transition, `reward` is the reward obtained for the transition and `terminal` is a boolean indicating if the transition ends the episode.

## Exercise : MDP from gym environment

Using this information compute the matrices $\mathcal{P}_{ss'}$, $\mathcal{R}_s$ and `terminal_states` of your MDP. Terminal is a boolean vector containing `True` for terminal states and `False` otherwise.

We will use the uniform policy that we created before, that assigns equal probability to all actions.

**Hint** all the transitions from a terminal state must be ignored.

**Optionally** while computing the matrices verify with `assert` that the conditions that we described above are fulfilled.

In [108]:
P = env.unwrapped.P

##Trial 1

In [109]:
def compute_P_and_R(env, policy):
    # Initialize the matrices P and R
    n_states = env.observation_space.n
    P = np.zeros((n_states, n_states))

    R = np.empty((n_states, 1))
    R[:] = np.nan

    # Initialize a vector for terminal states
    # we will initialize with nan
    terminal_states = np.empty(n_states)
    terminal_states[:] = np.nan
    #print(terminal_states)
    #print(R)
    #print(P)

    ### BEGIN SOLUTION
    # Iterate over env.env.P.items()
    # env.env.P is a dictionary that maps from
    # departure state to a dictionary of possible actions and subsequent transitions
    for state, actions in env.unwrapped.P.items():

      # Iterate over all possible actions with equal probability
      for action, transitions in actions.items():

            # Obtain the probability of taking that action
        action_prob = policy.probability(state,action)
             # Iterate over all transitions for this action
        for prob, next_state, reward, terminal in transitions:
          # If the next state is terminal flag it as such in terminal_states
            if terminal == True :
              terminal_states[next_state] = 1
              if next_state == state:
                P[state, next_state] += prob * action_prob
                pass
              else :
                R[next_state] = reward
                P[state, next_state] += prob * action_prob
     # If we are not departing from a terminal state

      # Add to P the probability of the transition as:
      # (probability of the action) * (probability of the transition)
      # Set the reward in R
            else:
              R[next_state] = reward
              P[state, next_state] += prob * action_prob
              terminal_states[next_state] = 0















    ### END SOLUTION

    # Convert the terminal states vector to boolean to use in indexing
    terminal_states = terminal_states.astype(bool)

    return P, R, terminal_states

env = RecordVideo(gym.make("FrozenLake-v1", render_mode="rgb_array"))
P, R, terminal_states = compute_P_and_R(env, uniform_policy)

print(P)
print(R)
print(terminal_states)

[[0.5  0.25 0.   0.   0.25 0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.  ]
 [0.25 0.25 0.25 0.   0.   0.25 0.   0.   0.   0.   0.   0.   0.   0.   0.   0.  ]
 [0.   0.25 0.25 0.25 0.   0.   0.25 0.   0.   0.   0.   0.   0.   0.   0.   0.  ]
 [0.   0.   0.25 0.5  0.   0.   0.   0.25 0.   0.   0.   0.   0.   0.   0.   0.  ]
 [0.25 0.   0.   0.   0.25 0.25 0.   0.   0.25 0.   0.   0.   0.   0.   0.   0.  ]
 [0.   0.   0.   0.   0.   1.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.  ]
 [0.   0.   0.25 0.   0.   0.25 0.   0.25 0.   0.   0.25 0.   0.   0.   0.   0.  ]
 [0.   0.   0.   0.   0.   0.   0.   1.   0.   0.   0.   0.   0.   0.   0.   0.  ]
 [0.   0.   0.   0.   0.25 0.   0.   0.   0.25 0.25 0.   0.   0.25 0.   0.   0.  ]
 [0.   0.   0.   0.   0.   0.25 0.   0.   0.25 0.   0.25 0.   0.   0.25 0.   0.  ]
 [0.   0.   0.   0.   0.   0.   0.25 0.   0.   0.25 0.   0.25 0.   0.   0.25 0.  ]
 [0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   1.   0.   0.   0.   0.  ]
 [0.

## Trial 2

In [110]:
def compute_P_and_R(env, policy):
    # Initialize the matrices P and R
    n_states = env.observation_space.n
    P = np.zeros((n_states, n_states))

    R = np.zeros((n_states, n_states))
    #print(R)
    R[:] = np.nan

    # Initialize a vector for terminal states
    # we will initialize with nan
    terminal_states = np.zeros(n_states)
    #terminal_states[:] = np.nan
    #print(terminal_states)
    #print(R)
    #print(P)

    ### BEGIN SOLUTION
    # Iterate over env.env.P.items()
    # env.env.P is a dictionary that maps from
    # departure state to a dictionary of possible actions and subsequent transitions
    for state, actions in env.unwrapped.P.items():

      # Iterate over all possible actions with equal probability
      for action, transitions in actions.items():

            # Obtain the probability of taking that action
        action_prob = policy.probability(state,action)
             # Iterate over all transitions for this action
        for prob, next_state, reward, terminal in transitions:
          # If the next state is terminal flag it as such in terminal_states
            if terminal == True and reward == 1:
              terminal_states[next_state] = 1
              R[state, next_state] = reward
              P[state, next_state] += prob * action_prob
              print( print(f"state : {state}, next_state : {next_state},Reward : {reward}, terminal = {terminal}"))
            else:
              R[state, next_state] = reward
              P[state, next_state] += prob * action_prob
              #terminal_states[next_state] = 0



     # If we are not departing from a terminal state

      # Add to P the probability of the transition as:
      # (probability of the action) * (probability of the transition)
      # Set the reward in R

      R[np.isnan(R)] = 0


    ### END SOLUTION

    # Convert the terminal states vector to boolean to use in indexing
    terminal_states = terminal_states.astype(bool)

    return P, R, terminal_states

env = RecordVideo(gym.make("FrozenLake-v1", render_mode="rgb_array"))
P, R, terminal_states = compute_P_and_R(env, uniform_policy)

print(P)
print(R)
print(terminal_states)

state : 14, next_state : 15,Reward : 1.0, terminal = True
None
state : 14, next_state : 15,Reward : 1.0, terminal = True
None
state : 14, next_state : 15,Reward : 1.0, terminal = True
None
[[0.5  0.25 0.   0.   0.25 0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.  ]
 [0.25 0.25 0.25 0.   0.   0.25 0.   0.   0.   0.   0.   0.   0.   0.   0.   0.  ]
 [0.   0.25 0.25 0.25 0.   0.   0.25 0.   0.   0.   0.   0.   0.   0.   0.   0.  ]
 [0.   0.   0.25 0.5  0.   0.   0.   0.25 0.   0.   0.   0.   0.   0.   0.   0.  ]
 [0.25 0.   0.   0.   0.25 0.25 0.   0.   0.25 0.   0.   0.   0.   0.   0.   0.  ]
 [0.   0.   0.   0.   0.   1.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.  ]
 [0.   0.   0.25 0.   0.   0.25 0.   0.25 0.   0.   0.25 0.   0.   0.   0.   0.  ]
 [0.   0.   0.   0.   0.   0.   0.   1.   0.   0.   0.   0.   0.   0.   0.   0.  ]
 [0.   0.   0.   0.   0.25 0.   0.   0.   0.25 0.25 0.   0.   0.25 0.   0.   0.  ]
 [0.   0.   0.   0.   0.   0.25 0.   0.   0.25 0.   0.25 0.   0.

## Trial 3

In [111]:
def compute_P_and_R(env, policy):
    # Initialize the matrices P and R
    n_states = env.observation_space.n
    P = np.zeros((n_states, n_states))

    R = np.zeros((n_states, n_states))

    R[:] = np.nan

    # Initialize a vector for terminal states
    # we will initialize with nan
    terminal_states = np.empty(n_states)
    terminal_states[:] = np.nan


    ### BEGIN SOLUTION
    # Iterate over env.env.P.items()
    # env.env.P is a dictionary that maps from
    # departure state to a dictionary of possible actions and subsequent transitions
    for state, actions in env.unwrapped.P.items():

      # Iterate over all possible actions with equal probability
      for action, transitions in actions.items():

            # Obtain the probability of taking that action
        action_prob = policy.probability(state,action)
             # Iterate over all transitions for this action
        for prob, next_state, reward, terminal in transitions:
          # If the next state is terminal flag it as such in terminal_states
            if terminal == True :
              terminal_states[next_state] = 1
              if next_state == state:
                P[state, next_state] += prob * action_prob
                pass
              else :
                R[state, next_state] = reward
                P[state, next_state] += prob * action_prob
                #print( print(f"state : {state}, next_state : {next_state},Reward : {reward}, terminal = {terminal}"))

     # If we are not departing from a terminal state

      # Add to P the probability of the transition as:
      # (probability of the action) * (probability of the transition)
      # Set the reward in R
            else:
              R[state, next_state] = reward
              P[state, next_state] += prob * action_prob
              terminal_states[next_state] = 0
      R[np.isnan(R)] = 0















    ### END SOLUTION

    # Convert the terminal states vector to boolean to use in indexing
    terminal_states = terminal_states.astype(bool)

    return P, R, terminal_states

env = RecordVideo(gym.make("FrozenLake-v1", render_mode="rgb_array"))
P, R, terminal_states = compute_P_and_R(env, uniform_policy)

print(P)
print(R)
print(terminal_states)

[[0.5  0.25 0.   0.   0.25 0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.  ]
 [0.25 0.25 0.25 0.   0.   0.25 0.   0.   0.   0.   0.   0.   0.   0.   0.   0.  ]
 [0.   0.25 0.25 0.25 0.   0.   0.25 0.   0.   0.   0.   0.   0.   0.   0.   0.  ]
 [0.   0.   0.25 0.5  0.   0.   0.   0.25 0.   0.   0.   0.   0.   0.   0.   0.  ]
 [0.25 0.   0.   0.   0.25 0.25 0.   0.   0.25 0.   0.   0.   0.   0.   0.   0.  ]
 [0.   0.   0.   0.   0.   1.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.  ]
 [0.   0.   0.25 0.   0.   0.25 0.   0.25 0.   0.   0.25 0.   0.   0.   0.   0.  ]
 [0.   0.   0.   0.   0.   0.   0.   1.   0.   0.   0.   0.   0.   0.   0.   0.  ]
 [0.   0.   0.   0.   0.25 0.   0.   0.   0.25 0.25 0.   0.   0.25 0.   0.   0.  ]
 [0.   0.   0.   0.   0.   0.25 0.   0.   0.25 0.   0.25 0.   0.   0.25 0.   0.  ]
 [0.   0.   0.   0.   0.   0.   0.25 0.   0.   0.25 0.   0.25 0.   0.   0.25 0.  ]
 [0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   1.   0.   0.   0.   0.  ]
 [0.

The Compute R and P function went through three revisions:

1. First Code
Result: Printed the R and P matrices and the terminal states.
Process: When I first wrote this function, I spent a lot of time thinking about the conditions for terminal states. I decided that a terminal state must be True, and I also needed to exclude cases where the next state is the same as the current state. For example, when reaching the goal state (state 15), the reward becomes 1, and the terminal state is set to True. However I have to take into account the state where 15 state again from 15 state with a reward of 0 would not update the reward matrix. I wrote a condition to handle this, but then I realized that for Bellman equations to work, the reward must be a matrix, not a vector. So, I decided to expand the size of the R matrix and adjust the condition.
2. Second Code
Result: Returned the R and P matrices, but the terminal states were only updated for the final destination.
Process: I expanded the size of the R matrix and simplified the condition. I thought it would be enough to only check for states with a reward of 1. So, I updated the terminal states and the R and P matrices only when the terminal state was True and the reward was 1. However, this caused a problem: using and meant both conditions had to be satisfied at the same time, leading to errors in updating the terminal states. I realized the condition needed to be like the first version to handle this correctly.
3. Third Code
Result: The R and P matrices were calculated correctly, and the terminal states were updated every time the game ended.
Conclusion: I found that the third version is the best option because it resolves the issues from the previous versions.

## Exercise : Sample an episode

Sample an episode with the uniform policy and return the lists of observations (states), actions, rewards and the done flags.

Notice that the list of states should have one more element (the initial state).

Test the function by rendering an episode. We will use `RecordVideo` to do that.

In [112]:
def sample_episode(env, policy, reset=True):
    states = []
    actions = []
    rewards = []
    dones = []


    ### BEGIN SOLUTION

    # If reset, we reset the environment and get an initial state
    # else we set the initial state to it's current state env.env.s
    if reset == True :
      state,_ = env.reset()
    else :
      state = env.env.s

    # Collect the initial state
    states.append(state)
    done = False


    # While the episode has not finished
     # Select an action
    while not done :
      action = policy.sample(state)
      ##result = env.step(action)
      ##print(result)
      # Step the environment
      next_state, reward, done, info, prob = env.step(action)
      states.append(next_state)
      actions.append(action)
      rewards.append(reward)
      dones.append(done)

      # The episode is done if it has been terminated or truncated
      if done:
        break
      else :
        # Collect the state, reward and action taken
        state = next_state







    ### END SOLUTION

    return states, actions, rewards, dones

env = RecordVideo(gym.make("FrozenLake-v1", render_mode="rgb_array"))
uniform_policy = UniformPolicy(env.action_space)
states, actions, rewards, dones = sample_episode(env, uniform_policy)
print(len(states), len(actions), len(rewards), len(dones))
print(f'states : {states} \n actions : {actions},\n rewards : {rewards},\n dones : {dones}')

display(env.video())

<IPython.core.display.Javascript object>

9 8 8 8
states : [0, 4, 8, 9, 13, 13, 14, 14, 15] 
 actions : [1, 0, 1, 1, 1, 1, 1, 2],
 rewards : [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0],
 dones : [False, False, False, False, False, False, False, True]


1. Result Analysis:
In this code, the character takes random actions until it either reaches the goal or falls into a hole. The code records the next states, actions taken, rewards received, and whether the state is terminal (done). It then returns these as a list. From the result, you can see all the actions, rewards, terminal states, and states that occurred before done became True.

2. Difficulty:
The challenging part was understanding what env.step(action) returns and figuring out how to use that information in the code.

## Exercise : Returns of episode

For a sampled episode compute the return $G_t$ for all steps $t$.

**Hint** the return is easily computed backwards from the last step in the episode to the first.

In [113]:
def compute_returns(rewards, gamma):
    returns = np.zeros(len(rewards))
    ### BEGIN SOLUTION
    Gt = 0
    # Iterate over the rewards backward computing each return
    # using the previous return computed
    for i in reversed(range(len(rewards))):
        Gt = gamma * Gt+rewards[i]
        returns[i]=Gt


        #gt = gamma * Gt+rewards[i]
        #returns[i]=gt
        #Gt = gt




    ### END SOLUTION

    return returns

env = gym.make("FrozenLake-v1", render_mode="rgb_array")
uniform_policy = UniformPolicy(env.action_space)

# Now we will run until one of them has a positive reward
while True:
    states, actions, rewards, dones = sample_episode(env, uniform_policy)
    #print(len(states))
    if np.sum(rewards) > 0:
        # Print the rewards
        print(rewards)

        # Compute and print the returns
        gamma = 0.9
        returns = compute_returns(rewards, gamma)
        print(returns)
        print(len(returns))

        # Exit the loop
        break

[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0]
[0.3138106  0.34867844 0.38742049 0.43046721 0.4782969  0.531441   0.59049    0.6561     0.729
 0.81       0.9        1.        ]
12


1. Result Analysis:
The code accumulates and stores GT based on the size of the rewards, then returns it. I found that the maximum value of the accumulated GT is 1, and the size of GT matches the size of the return values.

2. Difficulty:
The main challenge was figuring out how to accumulate GT. At first, I tried to follow the formula GT = RT + γRT+1 + γ^2RT+2 + … = ∑ γ^k RT+k from the slides. However, when the rewards list became long, it caused issues. To fix this, I changed the method to use the previous GT value and update it step by step. This made the code much simpler and easier to understand.

# Policy evaluation

In this section we will implement various value function estimation methods:
- Direct method using the previous P and R
- Naive (cheating) method that assumes possibility of setting an arbitrary initial state
- Monte Carlo methods
- Time-difference method

## Exercise : Direct method

Solve the Bellman equation, using the direct solution (matrix inversion).

**Note** that this method leads to miss-leading values for terminal states, since there is no way to indicate terminality using the Bellman equation without sampling.

In [114]:
def value_direct(env, gamma, policy):
    P, R, terminal_states = compute_P_and_R(env, policy)

    ### BEGIN SOLUTION



    I = np.eye(P.shape[0])
    ones = np.ones((P.shape[0],1))
    # Following 3 codes are important for me to understand the process of caculation
    #print(f'R*P = {(R*P)}')
    #print(f'inv = {(np.linalg.inv(I-gamma*P)).shape}')
    #print(f'R*P*ones = {R*P@ones}')
    v = (np.linalg.inv(I-gamma*P))@((R*P)@ones)


    ### END SOLUTION

    # Unsqueeze the extra dimension
    v = v[:,0]

    return v

env = gym.make("FrozenLake-v1", render_mode="rgb_array")
uniform_policy = UniformPolicy(env.action_space)
gamma = 0.9
v_direct = value_direct(env, gamma, uniform_policy)
print(v_direct)

[ 4.47726069e-03  4.22245661e-03  1.00667565e-02  4.11821857e-03  6.72195841e-03  1.48652148e-16
  2.63337084e-02  0.00000000e+00  1.86761516e-02  5.76070083e-02  1.06971947e-01  0.00000000e+00
 -5.69561934e-17  1.30383049e-01  3.91490160e-01  0.00000000e+00]


1. Result Analysis:
The updated Bellman equation returns 16 calculated V values. V shows the expected future rewards for each state under the current policy. One important value is at index 14, which is 3.91e-01. This means that this state is a good one for getting rewards with the current policy.

2. Difficulty:
I had a hard time understanding the formula in the updated Bellman equation. While doing the element-wise and matrix multiplications, I found out that my P matrix was wrong. I had to rewrite the code to compute the P matrix correctly. Also, to avoid errors in the sizes of matrices and vectors during multiplication, I kept printing the values to check everything was correct.

## (Optional) Exercise : Verify the solution

Check if the computed value function $V$ fulfills the Bellman equation. You may use `np.allclose(a, b)` to check if all elements in `a` and `b` are close up to a numerical error.

In [115]:
### BEGIN SOLUTION
def verify_solution(env, gamma, policy, v_direct):
    P, R, terminal_states = compute_P_and_R(env, policy)


    rhs = R + gamma * P @ v_direct
    return np.allclose(v_direct, rhs,atol=1e-6)

# Verify the solution
valid = verify_solution(env, gamma, uniform_policy, v_direct)
print(valid)
### END SOLUTION

False


1. Result Analysis:
The result being False means that the left side and the right side are not numerically the same. In other words, the calculated v_direct does not satisfy the Bellman equation.

2. A step I've taken to Solve the Problem:
Ithought that this issue might be due to numerical errors. So I found some parameter which is used for the np.allclose. I used this parameter to adjust the default tolerance to fix this issue. However I couldn't fix this problem. I think the main reason of this issue is that I got the wrong P, R.

For the following we must set the value function of terminal states to 0 in order to keep our value function comparable to those computed by methods using episode sampling.

In [116]:
# To compare to the following methods we must set the value of terminal states to 0
v_direct[terminal_states] = 0
print(v_direct)

[0.00447726 0.00422246 0.01006676 0.00411822 0.00672196 0.         0.02633371 0.         0.01867615
 0.05760701 0.10697195 0.         0.         0.13038305 0.39149016 0.        ]


## Exercise : Naive (cheating) method

Compute the value function for each state by cheating (changing the starting state and computing the average return from the start).

In [117]:
def value_naive(env, gamma, policy, n_episodes):
    ### BEGIN SOLUTION

    # Initialize values and counts tables (one cell per state)
    n_states = env.observation_space.n
    value = np.zeros(env.observation_space.n)
    counts = np.zeros(env.observation_space.n)



    # Compute the number of episodes per state
    episode_state = n_episodes // n_states



    # For each initial state
    for state in range(n_states):
      total_return = 0
        # For each episode
      for episode in range(episode_state):
            # Reset the environment
            env.reset()

            # Set the initial state
            env.env.s = state


            # Sample an episode
            states, actions, rewards, dones = sample_episode(env, policy, reset = False)
            # (without reseting the environment to avoid changing the initial state)
            # Compute the returns
            returns = compute_returns(rewards, gamma)

            # Accumulate the return of the initial state
            total_return += returns[0]



        # Divide the accumulated value by the number of episodes
      value[state] = total_return / episode_state

    ### END SOLUTION

    return value

def compute_value_error(v_est, v_ref):
    diff = (v_est - v_ref)
    return np.mean(diff), np.std(diff)

env = gym.make("FrozenLake-v1", render_mode=None)
uniform_policy = UniformPolicy(env.action_space)
gamma = 0.9
n_episodes = 10000
v_naive = value_naive(env, gamma, uniform_policy, n_episodes)
print(v_naive)
print(compute_value_error(v_naive, v_direct))

[0.0040618  0.00322118 0.00558166 0.00251499 0.00408223 0.00493281 0.00428878 0.00791307 0.00279624
 0.0050108  0.00188698 0.0028747  0.00482614 0.00468057 0.00309714 0.00479684]
(-0.04340642164795916, 0.09721885125246314)


1. Result Analysis:
These values show the expected future rewards for each state under a uniform policy. I found that relatively high values (0.00719582, 0.00718748) are more likely to lead to rewards. On the other hand, low values (0.00208833, 0.00233913) are hard to get in those states. During the analysis, I found that v_naive was lower on average than v_direct. Also, the standard deviation was relatively large. It suggests that the differences between the two values vary a lot across states.

2. Difficulty:
I spent a lot of time understanding the definition of the value. The hardest part was realizing that the value must be calculated independently for each state. Because of this, I had to think carefully about where to initialize value[state] = total_return / episode_state and where to reset total_return = 0. I also thought a lot about why I needed to use returns[0] in total_return += returns[0].

## Exercise : Monte Carlo first visit method
Compute the value function for each state using the Monte carlo method "first visit"

In [118]:
def value_montecarlo_first(env, gamma, policy, n_episodes):
    ### BEGIN SOLUTION

    # Initialize values and counts tables (one cell per state)
    n_states = env.observation_space.n
    values = np.zeros(n_states)
    counts = np.zeros(n_states)
    print(values)

    # For each episode
    for episode in range(n_episodes):

        # Sample an episode and compute returns
        states, actions, rewards, dones = sample_episode(env, policy)
        returns = compute_returns(rewards, gamma)

        # Keep track of visited states
        visited_states = set()
        #print(states)


        # For each state and associated return
        for t,state in enumerate(states[:-1]):
          #print(state)
          #print(t)
          #print(f'returns : {returns}')



            # If first visit

          if state not in visited_states:


                # Increment counts
              counts[state] += 1


                # Accumulate returns
              values[state] += returns[t]


                # Update the set of visited states
              visited_states.add(state)


    # Average the accumulated returns
    print(values)
    print(counts)
    value = values / counts
    value = np.nan_to_num(value)


    ### END SOLUTION

    return value

env = gym.make("FrozenLake-v1", render_mode=None)
uniform_policy = UniformPolicy(env.action_space)
gamma = 0.9
n_episodes = 10000
v_mc_firstvisit = value_montecarlo_first(env, gamma, uniform_policy, n_episodes)
print(v_mc_firstvisit)
print(compute_value_error(v_mc_firstvisit, v_direct))

[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
[ 48.39268326  30.12202279  32.77146891   4.84369261  40.9400205    0.          42.37364907
   0.          46.65006961  62.10322698  81.67711331   0.           0.          52.21204534
 128.26928634   0.        ]
[10000.  6240.  2628.  1170.  6185.     0.  1324.     0.  2590.  1135.   722.     0.     0.   418.
   324.     0.]
[0.00483927 0.00482725 0.01247012 0.00413991 0.00661924 0.         0.03200427 0.         0.01801161
 0.0547165  0.1131262  0.         0.         0.1249092  0.39589286 0.        ]
(0.0006554837293308187, 0.002823814924117027)


  value = values / counts


1. Data Analysis : The highest value shows the highest expected reward. The relatively low values mean that it is hard to get rewards or future rewards are small in these states. 0 means either not visited or had no chance to gain rewards.

2. Difficulty : I spent a lot of time figuring out how to check if a state was visited for the first time. I used the set() function to track visited states. If a state was not already in the visited states, I added it to the set, increased the count by 1, and added its value to the total value. Aside from coding the process to check for duplicate states, everything else went smoothly.

## Exercise : Monte Carlo every visit method
Compute the value function for each state using the Monte carlo method "every visit"

In [119]:
def value_montecarlo_every(env, gamma, policy, n_episodes):
    ### BEGIN SOLUTION
    # Initialize values and counts tables (one cell per state)
    n_states = env.observation_space.n
    values = np.zeros(n_states)
    counts = np.zeros(n_states)



  # For each episode
    for episode in range(n_episodes):

        # Sample an episode and compute returns
        states, actions, rewards, dones = sample_episode(env, policy)
        returns = compute_returns(rewards, gamma)



        # For each state and associated return
        for t,state in enumerate(states[:-1]):


            # Increment counts
            counts[state] += 1


            # Accumulate returns
            values[state] += returns[t]


    # Average the accumulated returns
    value = values / counts
    value = np.nan_to_num(value)


    ### END SOLUTION

    return value

env = gym.make("FrozenLake-v1", render_mode=None)
uniform_policy = UniformPolicy(env.action_space)
gamma = 0.9
n_episodes = 10000
v_mc_everyvisit = value_montecarlo_every(env, gamma, uniform_policy, n_episodes)
print(v_mc_everyvisit)
print(compute_value_error(v_mc_everyvisit, v_direct))

[0.00561793 0.0056431  0.01313683 0.00570777 0.00840486 0.         0.02991603 0.         0.02015839
 0.05946855 0.10277144 0.         0.         0.16786486 0.42272276 0.        ]
(0.005021490917846719, 0.011267095942420155)


  value = values / counts


1. Result Analysis : All state visits were recorded. The average value for each state was calculated. The Monte Carlo Every-Visit method gave results that were mostly similar to the reference value (v_direct). This makes it a suitable approach for policy evaluation.
2. The method was relatively easy to implement since it only required storing the return values for all states and dividing them by the total count.

## Exercise : Monte Carlo incremental

Implement the incremental Monte Carlo method.

In [120]:
def value_montecarlo_incremental(env, gamma, policy, n_episodes):
    ### BEGIN SOLUTION
    # Initialize values and counts tables (one cell per state)
    n_states = env.observation_space.n
    values = np.zeros(n_states)
    counts = np.zeros(n_states)



    # For each episode
    for episode in range(n_episodes):

        # Sample an episode and compute returns
        states, actions, rewards, dones = sample_episode(env, policy)
        returns = compute_returns(rewards, gamma)

        # For each state and associated return
        for t,state in enumerate(states[:-1]):

            # Accumulate returns
            values[state] += returns[t]

            # Increment counts
            counts[state] += 1


            # Update value with return
            values[state] += returns[t]-values[state]/counts[state]


    print(values)
    print(counts)
    value = values / counts
    value = np.nan_to_num(value)









    ### END SOLUTION
    return value

env = gym.make("FrozenLake-v1", render_mode=None)
uniform_policy = UniformPolicy(env.action_space)
gamma = 0.9
n_episodes = 10000
v_mc_incremental = value_montecarlo_incremental(env, gamma, uniform_policy, n_episodes)
print(v_mc_incremental)
print(compute_value_error(v_mc_incremental, v_direct))

[157.15392923  69.68303715  62.01526283   7.97767038  82.35613243   0.          46.51352036
   0.          92.26551329 102.14011042 120.4063282    0.           0.          89.52118862
 216.49093782   0.        ]
[32549. 12861.  5944.  2901. 12508.     0.  1677.     0.  4624.  1595.   974.     0.     0.   695.
   514.     0.]
[0.00482823 0.00541817 0.01043325 0.00274997 0.00658428 0.         0.02773615 0.         0.01995361
 0.06403769 0.12362046 0.         0.         0.12880747 0.42118859 0.        ]
(0.0033930744071072392, 0.008007714473321053)


  value = values / counts


1. Result Analysis  : The values of Monte Carlo Incremental method were lower on average compared to the reference values (v_direct). The standard deviation has some differences between states, but overall, the values were quite close to the v_direct. An interesting point was that v_mc_incremental represents the average future reward expected from each state.

## Exercise : Time-differences (TD) method

In [121]:
def value_td(env, gamma, policy, n_episodes, alpha=0.4):
    ### BEGIN SOLUTION
    # Initialize value table (one cell per state)
    n_states = env.observation_space.n
    value = np.zeros(n_states)



    # For each episode
    for episode in range(n_episodes):

        # Sample an episode
        states, actions, rewards, dones = sample_episode(env, policy)

        # For each step in the episode
        for t in range(len(states)-1):


            # Update the value of the depart state with the current value, the value of the next state and the reward
            value[states[t]] += alpha*(rewards[t]+gamma*value[states[t+1]]-value[states[t]])



    ### END SOLUTION
    return value

env = gym.make("FrozenLake-v1", render_mode=None)
uniform_policy = UniformPolicy(env.action_space)
gamma = 0.9
n_episodes = 10000
alpha = 0.4
v_td = value_td(env, gamma, uniform_policy, n_episodes, alpha=alpha)
print(v_td)
print(compute_value_error(v_td, v_direct))

[5.86457414e-03 8.19835860e-04 9.34636427e-03 4.66109129e-05 9.35457416e-03 0.00000000e+00
 3.20444354e-02 0.00000000e+00 2.51268559e-02 2.84959846e-02 4.84899753e-02 0.00000000e+00
 0.00000000e+00 5.05433252e-02 6.18126170e-01 0.00000000e+00]
(0.004199376911564226, 0.06216683857827214)


1. Result Analysis : The mean error is 0.0006117, which is very low, but the standard deviation is 0.024436, which is relatively high. This shows that TD learns quickly, but it may not always provide stable results.

## (Optional) Exercise : Comparison TD to Monte Carlo incremental

How do these to methods compare? Explain the relative advantages and disadvantages.

- Temporal Difference (TD) and Monte Carlo Incremental have different way to estimate the state values.  TD updates values after each step. In my opinion, It is faster and works well for continuous tasks. Monte Carlo Incremental updates values only after the episode ends. Therefore it’s more accurate, but it takes longer. In the results, TD is faster and has high-reward states like state 14 better. Monte Carlo Incremental gave more steady results with less variation.

## Exercise : Action-value
Compute the action value function using one or more of the following methods:
- Naive (cheating)
- MC "first visit"
- MC "every visit"
- MC incremental
- TD

## Trial 1

In [122]:
def actionvalue_montecarlo_every_1(env, gamma, policy, n_episodes):
    ### BEGIN SOLUTION
    # Initialize values and counts tables (one cell per state)
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    probability = 1/n_actions
    q = np.zeros((n_states,n_actions))

    counts = np.zeros((n_states,n_actions))


  # For each episode



        # Sample an episode and compute return
        # For each state and associated return

    for iteration in range(n_episodes):
        new_q = np.zeros_like(q)
        for state in range(n_states):
            for action in range(n_actions):
                for prob, next_state, reward, done in env.unwrapped.P[state][action]:
                    value = 0
                    for next_action in range(n_actions):
                        action_prob = policy.probability(next_state, next_action)
                        value += action_prob * q[next_state, next_action]
                    new_q[state, action] += prob * (reward + gamma * value if not done else reward)
        q = new_q

    return q





              # Accumulate returns



    # Average the accumulated returns
    q = q / counts
    q = np.nan_to_num(q)


    ### END SOLUTION

    return q

env = gym.make("FrozenLake-v1", render_mode="rgb_array")
uniform_policy = UniformPolicy(env.action_space)
gamma = 0.9
n_episodes = 10000

q_mc_everyvisit_1 = actionvalue_montecarlo_every_1(env, gamma, uniform_policy, n_episodes)
print(q_mc_everyvisit_1)

[[0.00470294 0.0046265  0.0046265  0.00395309]
 [0.00260992 0.00436321 0.00428676 0.00562994]
 [0.01218688 0.01040232 0.01215561 0.00552223]
 [0.00425549 0.00425549 0.00247093 0.00549096]
 [0.00896261 0.00761943 0.00694602 0.00335977]
 [0.         0.         0.         0.        ]
 [0.03511161 0.03209158 0.03511161 0.00302003]
 [0.         0.         0.         0.        ]
 [0.00761943 0.02288495 0.01929869 0.02490154]
 [0.04471776 0.07680934 0.0712065  0.03769443]
 [0.14262926 0.13472915 0.12534716 0.02518221]
 [0.         0.         0.         0.        ]
 [0.         0.         0.         0.        ]
 [0.05639702 0.15656196 0.17384407 0.13472915]
 [0.18865355 0.4898953  0.48287197 0.40453983]
 [0.         0.         0.         0.        ]]


## Trial 2

In [123]:
def actionvalue_montecarlo_every(env, gamma, policy, n_episodes):

    # Initialize Q-values and counts tables (one cell per state-action pair)
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    q_values = np.zeros((n_states, n_actions))
    counts = np.zeros((n_states, n_actions))

    # For each episode
    for episode in range(n_episodes):
        states, actions, rewards, dones = sample_episode(env, policy)
        returns = compute_returns(rewards, gamma)
        for state, action, G in zip(states, actions, returns):
            q_values[state, action] += G
            counts[state, action] += 1


    q_values = q_values / counts
    q_values = np.nan_to_num(q_values)

    return q_values

env = gym.make("FrozenLake-v1", render_mode=None)
uniform_policy = UniformPolicy(env.action_space)
gamma = 0.9
n_episodes = 10000


q_mc_everyvisit = actionvalue_montecarlo_every(env, gamma, uniform_policy, n_episodes)
print(q_mc_everyvisit)


[[0.00480302 0.00469783 0.00470671 0.00379043]
 [0.00128767 0.00316022 0.0034541  0.00477825]
 [0.00938819 0.01174018 0.00994388 0.00385717]
 [0.00571447 0.00809424 0.00410284 0.00701745]
 [0.00942243 0.00656343 0.00738106 0.00340263]
 [0.         0.         0.         0.        ]
 [0.03456634 0.03673459 0.02262222 0.00230464]
 [0.         0.         0.         0.        ]
 [0.00805583 0.02058496 0.01794482 0.02439485]
 [0.04478593 0.07105317 0.0607203  0.04657233]
 [0.1350751  0.12101926 0.10873685 0.02089071]
 [0.         0.         0.         0.        ]
 [0.         0.         0.         0.        ]
 [0.05904663 0.14158307 0.15072348 0.17046012]
 [0.19018694 0.43687725 0.42287579 0.45370744]
 [0.         0.         0.         0.        ]]


  q_values = q_values / counts


1. Result Analysis : High Q-values (0.15 or above) show actions that are likely to lead to rewards. For example, in state 14, moving down or right gives the best chances for rewards. In state 13, moving right or down also offers good opportunities. Low Q-values (below 0.005) indicate less useful actions. For instance, in state 0, up or left is unlikely to gain rewards. Some states, like 5, 7, 11, 12, and 15, have Q-values of 0. It means they were not visited or do not lead to rewards.

2. Difficulty : I spent a lot of time trying to figure out how to calculate action values. At first, I didn’t fully understand how to include actions in value calculations, so I tried different approaches. Eventually, I came up with two methods.

- The first method followed the action Bellman equation directly. I used prob, next_state, reward, and done to update new_q with the formula: new_q[state, action] += prob * (reward + gamma * value).
- The second method was developed because the first method perform so well in policy evaluation that I was unsure about the result. For this method, I adapted the approach from the previous montecarlo_every function but added logic to account for actions. In conclusion, second method showed a large variation between values that are likely to lead to rewards and those that are not.

# Policy improvement

In this section we are going to improve upon the uniform policy, which selects actions at random, independently on the state.

To assess whether our learned policies work, we will start by implmenting a scoring function to evaluate the policies.

## Exercise : Evaluate performance of a policy

In this environment we define a goal as obtaining a final reward greater than 0, thus reaching the target state, since it is the only one with a non-zero reward.

For a given policy, compute:
- the average number of episodes where the goal was reached
- the average number of steps to reach the goal


In [124]:
def score_policy(env, gamma, policy, n_episodes):
    episodes_to_goal = 0
    steps_to_goal = []

    ### BEGIN SOLUTION
    for episode in range(n_episodes):

        states, actions, rewards, dones  = sample_episode(env, policy, reset=True)

        if 1 in rewards :
          episodes_to_goal += 1
          steps_to_goal.append(rewards.index(1)+1)



    ### END SOLUTION

    return episodes_to_goal/n_episodes, np.mean(steps_to_goal)
gamma = 0.9
n_episodes = 1000
env = gym.make("FrozenLake-v1", render_mode=None)
s = score_policy(env, gamma, uniform_policy, n_episodes)



Result Analysis : If the reward is 1, it means the goal was reached. In this case, the function increases episodes_to_goal and adds the number of steps it took to reach the goal to steps_to_goal. Once all episodes are completed, the function returns two results: the ratio of episodes where the goal was reached (episodes_to_goal/n_episodes) and the average number of steps it took to reach the goal (np.mean(steps_to_goal)).

## Exercise : Greedy policy (policy improvement)

Create a greedy policy from an action value function and evaluate it's performance.

The greedy policy selects the action leading to the largest value in the current state.

**Note** that when several actions have the same maximal value, it is best to randomly pick one of them.

Pick the first highest value action or (optionally) pick randomly amongst actions with the highest value.

In [125]:
class GreedyPolicy(UniformPolicy):
    def __init__(self, action_space, q):
        super().__init__(action_space)

        # q is the action-value table
        self.q = q

    def _max_value_action(self, state):
        ### BEGIN SOLUTION

        action_1 = np.max(self.q[state])
        action_2 = np.where(self.q[state] == action_1)
        action = np.random.choice(action_2[0])

        ### END SOLUTION
        return action

    def probability(self, state, action):
        # Select the highest value action
        action_max = self._max_value_action(state)

        # Return a probability of 1 for the selected action 0 otherwise
        action_prob = float(action == action_max)

        return action_prob

    def sample(self, state):
        # Select the highest value action
        action_max = self._max_value_action(state)

        return action_max

env = gym.make("FrozenLake-v1", render_mode=None)
greedy_policy = GreedyPolicy(env.action_space, q_mc_everyvisit)


## Exercise : Compare the performance of policies

Report the performance of the uniform policy and the greedy policy

In [126]:
env = gym.make("FrozenLake-v1", render_mode=None)

### BEGIN SOLUTION


gamma = 0.9
n_episodes = 10000
uniform_policy = UniformPolicy(env.action_space)
q_action_value = actionvalue_montecarlo_every(env, gamma, uniform_policy, n_episodes)
greedy_policy = GreedyPolicy(env.action_space,q_action_value)

ave_e_1, ave_s_1 = score_policy(env, gamma, uniform_policy, n_episodes)
ave_e_2, ave_s_2 = score_policy(env, gamma, greedy_policy, n_episodes)
print(f'Uniform Policy: {ave_e_1*100:.2f}%/{ave_s_1:.2f}')
print(f'Greedy Policy: {ave_e_2*100:.2f}%/{ave_s_2:.2f}')





### END SOLUTION

  q_values = q_values / counts


Uniform Policy: 1.20%/13.57
Greedy Policy: 7.65%/13.87


Result Analysis :
-  Uniform Policy and Greedy Policy show very different results. Uniform Policy picks actions randomly, so the goal is reached only 1.25% of the time. On average, the episode ends in 12.18 steps, often because it fails to reach the goal.

- Greedy Policy chooses actions based on Q-values, focusing on the ones that can lead to rewards. This improves the goal reach rate to 21.00%. It takes an average of 20.19 steps. It shows that the policy works harder to find rewarding paths.

In summary, Greedy Policy is much better than Uniform Policy. It is more effective at reaching the goal by using Q-values to guide its choices.

In [127]:
env = gym.make("FrozenLake-v1", render_mode=None)

### BEGIN SOLUTION


gamma = 0.9
n_episodes = 10000
uniform_policy = UniformPolicy(env.action_space)
q_action_value = actionvalue_montecarlo_every_1(env, gamma, uniform_policy, n_episodes)
greedy_policy = GreedyPolicy(env.action_space,q_action_value)

ave_e_1, ave_s_1 = score_policy(env, gamma, uniform_policy, n_episodes)
ave_e_2, ave_s_2 = score_policy(env, gamma, greedy_policy, n_episodes)
print(f'Uniform Policy: {ave_e_1*100:.2f}%/{ave_s_1:.2f}')
print(f'Greedy Policy: {ave_e_2*100:.2f}%/{ave_s_2:.2f}')





### END SOLUTION

Uniform Policy: 1.50%/13.23
Greedy Policy: 77.73%/43.35


1. Result Analysis: This was the first action-value method I developed. As you can see, the policy score is much higher than that of the Uniform Policy. While comparing my results with the given results in the original lab, I noticed a large difference between them. This led me to develop another method to estimate Q.

# Policy learning

In this section we will start learning policies.

We will then implement the following policy learning methods:
- policy iteration
- SARSA
- Q-Learning

## Exercise : Policy iteration
By alternating between policy evaluation and policy improvement, find an optimal policy.

Print the scores of each intermediate policy and comment on how the metrics evolve.

In [128]:
def policy_learn_iteration(env, initial_policy, policy_evaluation_function,
                           n_episodes_value, n_episodes_score, n_iterations):

    gamma = 0.9
    initial_q = None


    ### BEGIN SOLUTION

    # Score and print the initial policy
    ave_e_1, ave_s_1 = score_policy(env, gamma, initial_policy, n_episodes_score)
    print(f'initial policy: {ave_e_1*100:.2f}%/{ave_s_1:.2f}')

        # Policy evaluation

        # Keep track of the action-value function change
        # (sum of absolute difference between the previous
        #  and next action-value funcitons)
        # Note that at the first step we will report a change of nan
        # since we still don't have a previous action-value function
    for n in range(n_iterations) :
        q_1 = policy_evaluation_function(env, gamma, initial_policy, n_episodes_value)
        greedy_policy = GreedyPolicy(env.action_space, q_1)
        initial_policy = greedy_policy

        if initial_q is not None :
          v_c = np.sum(np.abs(q_1-initial_q))
        else :
          v_c = float('nan')
        initial_q = q_1
        ave_e_2, ave_s_2 = score_policy(env, gamma, initial_policy, n_episodes_score)
        print(f'Policy (iter : {n}) : {ave_e_2*100:.2f}%/{ave_s_2:.2f}, Value change : {v_c:.2f} ')










        # Policy improvement
        # Score and print the new policy
        # Policy scoring





    ### END SOLUTION

env = gym.make("FrozenLake-v1", render_mode=None)
uniform_policy = UniformPolicy(env.action_space)

policy_learn_iteration(env, uniform_policy, actionvalue_montecarlo_every,
                       n_episodes_value=100, n_episodes_score=1000,
                       n_iterations=30)

initial policy: 1.50%/13.13


  q_values = q_values / counts


Policy (iter : 0) : 1.50%/11.27, Value change : nan 
Policy (iter : 1) : 10.30%/14.71, Value change : 4.50 
Policy (iter : 2) : 11.10%/16.15, Value change : 3.19 
Policy (iter : 3) : 10.90%/15.06, Value change : 0.15 
Policy (iter : 4) : 9.90%/16.45, Value change : 0.20 
Policy (iter : 5) : 10.00%/14.41, Value change : 0.32 
Policy (iter : 6) : 10.40%/16.38, Value change : 0.32 
Policy (iter : 7) : 11.40%/15.99, Value change : 0.19 
Policy (iter : 8) : 11.70%/16.64, Value change : 0.37 
Policy (iter : 9) : 8.90%/15.46, Value change : 0.66 
Policy (iter : 10) : 8.80%/15.28, Value change : 0.65 
Policy (iter : 11) : 11.30%/15.38, Value change : 0.60 
Policy (iter : 12) : 10.00%/14.65, Value change : 0.53 
Policy (iter : 13) : 9.00%/15.84, Value change : 0.21 
Policy (iter : 14) : 7.70%/16.91, Value change : 0.90 
Policy (iter : 15) : 7.80%/13.58, Value change : 1.50 
Policy (iter : 16) : 7.50%/13.12, Value change : 2.32 
Policy (iter : 17) : 10.40%/14.97, Value change : 1.28 
Policy (ite

1. Result Analysis : Through repeated policy learning, the performance improved greatly compared to the initial Uniform Policy. For example, the goal of the beginning reach rate was only 1.40%, but after 29 iterations, it increased to 15.70%. The average steps also went from 13.21 to 33.81. It shows that the policy started choosing better paths to reach the goal. In the early iterations, there were large changes in the values, but at the end, the policy became more stable. This result shows that policy learn iteration is effective for improving policy performance.

## Exercise : Epsilon-greedy policy

One of the issues with the greedy policy is that some possible trajectories may never be visited depending on the initial estimation of the value function.

To avoid this create an epsilon-greedy policy. These policies act differently when running in training mode and evaluation mode. In evaluation mode they act as a greedy policy. In training mode:
- with probability epsilon, uniformly selects an action
- else selects the action with maximum value (greedy policy)

Implement the `EpsilonGreedyPolicy` class.


In [129]:
class EpsilonGreedyPolicy(GreedyPolicy):
    def __init__(self, action_space, q, epsilon,
                 epsilon_decay=1, epsilon_min=0):
        super().__init__(action_space, q)
        self.epsilon_start = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min

        self.epsilon = self.epsilon_start
        self.mode = "train"

    def sample(self, state):
        ### BEGIN SOLUTION
        if self.mode == "train":
          if np.random.random() < self.epsilon :
            action = np.random.choice(self.n_actions)
          else :
            action = self._max_value_action(state)
        elif self.mode == "eval":

           action = self._max_value_action(state)
        return action

    def train(self):
        self.mode = "train"

    def eval(self):
        self.mode = "eval"




        ### END SOLUTION


    def begin_episode(self, episode_index):
        # Start of an episode
        self.epsilon = self.epsilon_start * (self.epsilon_decay ** episode_index)
        self.epsilon = max(self.epsilon, self.epsilon_min)

# Instantiate a policy
q_mc_everyvisit = actionvalue_montecarlo_every(env, gamma, uniform_policy, n_episodes)
dummy = EpsilonGreedyPolicy(env.action_space, q_mc_everyvisit, epsilon=0.5)


# Sample 20 actions from state 0 in train mode
dummy.train()
actions = [dummy.sample(0) for i in range(20)]
print(actions)


# Sample 20 actions from state 0 in eval mode
dummy.eval()
actions = [dummy.sample(0) for i in range(20)]
print(actions)

[3, 2, 2, 3, 2, 1, 1, 1, 2, 1, 2, 2, 2, 2, 3, 1, 0, 1, 2, 2]
[2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2]


  q_values = q_values / counts


## Trial 2 - modified epsillon

In [130]:
# Instantiate a policy
q_mc_everyvisit = actionvalue_montecarlo_every(env, gamma, uniform_policy, n_episodes)
dummy = EpsilonGreedyPolicy(env.action_space, q_mc_everyvisit, epsilon=0.7)


# Sample 20 actions from state 0 in train mode
dummy.train()
actions = [dummy.sample(0) for i in range(20)]
print(actions)


# Sample 20 actions from state 0 in eval mode
dummy.eval()
actions = [dummy.sample(0) for i in range(20)]
print(actions)

[0, 2, 2, 0, 3, 2, 2, 2, 0, 3, 3, 3, 0, 0, 3, 3, 1, 0, 3, 0]
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]


  q_values = q_values / counts


## Trial 3 - modified epsillon

In [131]:
# Instantiate a policy
q_mc_everyvisit = actionvalue_montecarlo_every(env, gamma, uniform_policy, n_episodes)
dummy = EpsilonGreedyPolicy(env.action_space, q_mc_everyvisit, epsilon=0.3)


# Sample 20 actions from state 0 in train mode
dummy.train()
actions = [dummy.sample(0) for i in range(20)]
print(actions)


# Sample 20 actions from state 0 in eval mode
dummy.eval()
actions = [dummy.sample(0) for i in range(20)]
print(actions)

[2, 2, 2, 2, 2, 2, 0, 3, 2, 2, 3, 3, 2, 2, 3, 2, 2, 2, 2, 2]
[2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2]


  q_values = q_values / counts


1. Result Analysis :
- In Train mode, 𝜖 =0.5 is set, so about 50% of actions are chosen randomly, and the other 50% are the best actions. As a result, action 2, the best action, is chosen most often, but other actions are also selected.

- In Eval mode, there is no exploration, and only the best action is chosen. Therefore, action 2 is repeatedly selected.

2. New Experiment:
I was curious to see what happens when the value of
𝜖 changes. I modified ϵ to 0.3 and 0.7 and ran the code again to observe the results.When
- 𝜖 = 0.3 the model picked the best action more often because the chance of random actions was lower. This made the results more consistent, with the model focusing on the best action.

- When ϵ=0.7, the model picked random actions more often because the chance of exploration was higher. This caused more variation in the actions.

## Exercise : SARSA

State–action–reward–state–action (SARSA) is a method to learn a policy leveraging the TD estimation of action-value functions and an epsilon-greedy policy.

Since it is based on TD, it does not require full episodes to train, the policy can be improved at each step. We will therefore **not use the `sample_episode`** and reimplement here a similar loop.

Implement the SARSA algorithm, in this case it will be passed a policy of type `EpsilonGreedyPolicy` which stores the action-value (or Q-table). We may access and modify if using `policy.q`.

Score the trained policy.

In [132]:
def policy_learn_sarsa(env, policy, gamma, n_episodes, alpha, max_n_steps=1000, print_every=500):
    # No need to create the tables of action-values since they are stored directly in the policy

    # While we haven't reached the desired number of steps
    for episode in range(n_episodes):
        # Call the policy begin_episode so it can handle epsilon decay
        policy.begin_episode(episode)

        # Print every couple episodes
        if not episode % print_every:
            print(f'ep: {episode}, epsilon: {policy.epsilon:.3f}')

        # Get initial state
        state, info = env.reset()

        # Take the first action acording to the policy
        action = policy.sample(state)

        # While we haven't reached the maximum number of steps for the episode
        for step in range(max_n_steps):
            ### BEGIN SOLUTION
            # Perform a step of the environment
            next_state, reward, terminated, truncated, info = env.step(action)




            # It is done if terminated or truncated
            done = terminated or truncated


            # If episode has finished
            if done :

                # Update the action-value table and leave the loop
                policy.q[state, action] += alpha * (reward - policy.q[state, action])
                break



            # Sample the next action
            next_action = policy.sample(next_state)


            # Update the action-value table
            policy.q[state, action] += alpha * (reward + gamma * policy.q[next_state,  next_action ] - policy.q[state, action])


            ### END SOLUTION


            # Set the current state and action
            state =  next_action
            action = next_action

    return

env = gym.make("FrozenLake-v1", render_mode=None)
q_initial = np.zeros((env.observation_space.n, env.action_space.n))
sarsa_policy = EpsilonGreedyPolicy(env.action_space, q_initial,
                                   epsilon=1, epsilon_decay=0.999, epsilon_min=0.01)

gamma = 0.9
n_episodes = 5000
alpha = 0.2
n_episodes_score = 1000

### BEGIN SOLUTION
# Train the policy
policy_learn_sarsa(env, sarsa_policy, gamma, n_episodes, alpha)

### END SOLUTION

### BEGIN SOLUTION
# Evaluate the policy
avg_episodes_to_goal, avg_steps_to_goal = score_policy(env, gamma, sarsa_policy, n_episodes_score)


### END SOLUTION

print(f'Policy SARSA: {avg_episodes_to_goal:.2%} / {avg_steps_to_goal:.2f}')

ep: 0, epsilon: 1.000
ep: 500, epsilon: 0.606
ep: 1000, epsilon: 0.368
ep: 1500, epsilon: 0.223
ep: 2000, epsilon: 0.135
ep: 2500, epsilon: 0.082
ep: 3000, epsilon: 0.050
ep: 3500, epsilon: 0.030
ep: 4000, epsilon: 0.018
ep: 4500, epsilon: 0.011
Policy SARSA: 2.60% / 20.96


1. Result Analysis : At the start of training, I found that  the actions with ϵ=1.0 were chosen completely randomly. Then ϵ slowly getting smaller, and the policy started choosing the best actions more often.
When I used the SARSA algorithm, the results showed a low goal success rate of 1.00% with an average of 27.40 steps per episode. I discovered that this was likely because the FrozenLake environment is hard to explore, and the SARSA training didn’t fully learn the best policy.

2. Difficulty : I think there is a problem with how I wrote the SARSA method in my code. The results show that something is not working right, but I haven’t found where the mistake is yet. Finding and fixing this will be an important task for me in the future.

## Exercise : Q-learning

Q-learning is very similar to SARSA. **SARSA is an on-policy learning method** in which the action-values are updated following the same policy. In SARSA the action-values are updated using the value of the next state and next action taken.

Q-learning is off-policy, it does not assume the same policy when updating the action-values, instead it assumes an optimal policy by using the maximum value of the next state.

Implement a modified version of `policy_learn_sarsa` that performs Q-learning.

In [133]:
def policy_learn_qlearn(env, policy, gamma, n_episodes, alpha, max_n_steps=1000, print_every=500):
    # No need to create the tables of action-values since they are stored directly in the policy

    # While we haven't reached the desired number of steps
    for episode in range(n_episodes):
        # Call the policy begin_episode so it can handle epsilon decay
        policy.begin_episode(episode)

        # Print every couple episodes
        if not episode % print_every:
            print(f'ep: {episode}, epsilon: {policy.epsilon:.3f}')

        # Get initial state
        state, info = env.reset()

        # Take the first action acording to the policy
        action = policy.sample(state)

        # While we haven't reached the maximum number of steps for the episode
        for step in range(max_n_steps):
            ### BEGIN SOLUTION
            # Perform a step of the environment
            state_next, reward, terminated, truncated, info = env.step(action)



            # Set as done if terminated or truncated
            done = terminated or truncated


            if done :

                # Update the action-value table and leave the loop
                policy.q[state, action] += alpha * (reward - policy.q[state, action])
                break



            # Sample the next action
            action_next = policy.sample(state_next)


            # Update the action-value table
            policy.q[state, action] += alpha * (reward + gamma * np.max(policy.q[state_next]) - policy.q[state, action])

            ### END SOLUTION

            # Set the current state and action
            state = state_next
            action = action_next

    return

env = gym.make("FrozenLake-v1", render_mode=None)

q_initial = np.zeros((env.observation_space.n, env.action_space.n))
qlearn_policy = EpsilonGreedyPolicy(env.action_space, q_initial,
                                    epsilon=1, epsilon_decay=0.999, epsilon_min=0.01)

gamma = 0.9
n_episodes = 5000
alpha = 0.2
n_episodes_score = 1000

### BEGIN SOLUTION
# Train the policy

policy_learn_qlearn(env, qlearn_policy, gamma, n_episodes, alpha)
### END SOLUTION

### BEGIN SOLUTION
# Evaluate the policy
avg_episodes_to_goal, avg_steps_to_goal = score_policy(env, gamma, qlearn_policy, n_episodes_score)

### END SOLUTION

print(f'Policy Q-learn: {avg_episodes_to_goal:.2%} / {avg_steps_to_goal:.2f}')




ep: 0, epsilon: 1.000
ep: 500, epsilon: 0.606
ep: 1000, epsilon: 0.368
ep: 1500, epsilon: 0.223
ep: 2000, epsilon: 0.135
ep: 2500, epsilon: 0.082
ep: 3000, epsilon: 0.050
ep: 3500, epsilon: 0.030
ep: 4000, epsilon: 0.018
ep: 4500, epsilon: 0.011
Policy Q-learn: 31.40% / 33.35


1. Result Analysis : The epsilon values in Q-Learning were adjusted in the same way as in SARSA. I found that Q-Learning had a much better success rate, with 42.20%, compared to SARSA’s 1.00%. The average steps to reach the goal were 29.01, which is close to SARSA, but Q-Learning was much more successful. I learned that using the maximum Q-value of the next state makes Q-Learning more effective. I think Q-Learning is a stronger and better method for reinforcement learning.

## (Optional) Exercise : Train for Taxi

Train an epsilon-greedy policy using Q-learning on the `Taxi-v3` environment.

Score the performance of this policy and compare it to a uniform policy.

In [None]:
!pip install gym[toy_text]

In [None]:
!pip install gymnasium[toy_text] --upgrade

In [None]:
import gym
print(gym.__version__)

## modified score_policy and policy_learn_qlearn for Taxi environment

In [None]:
def score_policy(env, policy, n_episodes):
    total_rewards = 0
    total_steps = 0

    for _ in range(n_episodes):
        state = env.reset()
        done = False
        steps = 0

        while not done:
            action = policy.sample(state)
            state, reward, done, _ = env.step(action)
            total_rewards += reward
            steps += 1

        total_steps += steps

    avg_rewards = total_rewards / n_episodes
    avg_steps = total_steps / n_episodes
    return avg_rewards, avg_steps


In [None]:
def policy_learn_qlearn(env, policy, gamma, n_episodes, alpha, max_n_steps=1000, print_every=500):
    # No need to create the tables of action-values since they are stored directly in the policy

    # While we haven't reached the desired number of steps
    for episode in range(n_episodes):
        # Call the policy begin_episode so it can handle epsilon decay
        policy.begin_episode(episode)

        # Print every couple episodes
        if not episode % print_every:
            print(f'ep: {episode}, epsilon: {policy.epsilon:.3f}')

        # Get initial state
        state = env.reset()

        # Take the first action acording to the policy
        action = policy.sample(state)

        # While we haven't reached the maximum number of steps for the episode
        for step in range(max_n_steps):
            ### BEGIN SOLUTION
            # Perform a step of the environment
            state_next, reward, done,info = env.step(action)



            # Set as done if terminated or truncated
            done = terminated or truncated


            if done :

                # Update the action-value table and leave the loop
                policy.q[state, action] += alpha * (reward - policy.q[state, action])
                break



            # Sample the next action
            action_next = policy.sample(state_next)


            # Update the action-value table
            policy.q[state, action] += alpha * (reward + gamma * np.max(policy.q[state_next]) - policy.q[state, action])

            ### END SOLUTION

            # Set the current state and action
            state = state_next
            action = action_next

    return

env = gym.make("FrozenLake-v1", render_mode=None)





In [93]:
### BEGIN SOLUTION
import gym
env = gym.make("Taxi-v3")

q_initial = np.zeros((env.observation_space.n, env.action_space.n))
qlearn_policy = EpsilonGreedyPolicy(env.action_space, q_initial,
                                    epsilon=1.0, epsilon_decay=0.999, epsilon_min=0.01)

gamma = 0.9
n_episodes = 5000
alpha = 0.2
n_episodes_score = 1000

# Train the policy
policy_learn_qlearn(env, qlearn_policy, gamma, n_episodes, alpha)

# Evaluate the policy
avg_rewards, avg_steps = score_policy(env, qlearn_policy, n_episodes_score)

print(f'Policy Q-learn: {avg_rewards:.2f}% / {avg_steps:.2f}')










### END SOLUTION

ep: 0, epsilon: 1.000


  if not isinstance(terminated, (bool, np.bool8)):


ep: 500, epsilon: 0.606
ep: 1000, epsilon: 0.368
ep: 1500, epsilon: 0.223
ep: 2000, epsilon: 0.135
ep: 2500, epsilon: 0.082
ep: 3000, epsilon: 0.050
ep: 3500, epsilon: 0.030
ep: 4000, epsilon: 0.018
ep: 4500, epsilon: 0.011
Policy Q-learn: 7.33% / 13.35


- Result Analysis : I tried to modify the policy evaluation code for Taxi. However I couldn't understand how the game works. The most difficult things to understand is the way they gain rewards. For example, I couldn't understand when they earn rewards and when they get the minus rewards. I'll figure it out later. Moreover I faced some issue regarding a version of gym. I got many troubles when I reset the environment.

## (Optional) Exercise : Render Taxi

Run 3 episodes of a Q-learn trained policy on `Taxi-v3` this time rendering the result.

In [96]:
### BEGIN SOLUTION
import gym
from gym.wrappers import RecordVideo
import os
from IPython.display import Video


video_folder = "./videos"
if not os.path.exists(video_folder):
    os.makedirs(video_folder)


env = gym.make("Taxi-v3", render_mode="rgb_array")
env = RecordVideo(env, video_folder=video_folder)


observation = env.reset()


for _ in range(3):
    action = env.action_space.sample()
    state_next, reward, done, info = env.step(action)

    if done:
        observation= env.reset()


env.close()


video_files = [f for f in os.listdir(video_folder) if f.endswith(".mp4")]
if video_files:

    display(Video(os.path.join(video_folder, video_files[0]), embed=True))
else:
    print("No video File.")





### END SOLUTION

  and should_run_async(code)
  deprecation(
  deprecation(
  logger.warn(
  if not isinstance(terminated, (bool, np.bool8)):


1. Result Analysis : I took 3 samples and record the video. Since the way to display the video between frozen lake and taxi, I modified codes fitted in the Taxi environment.

## (Bonus) Exercise : A observation space environment

Try to perform Q-learning on an environment (e.g. `CartPole-v1`) with continuous action and observation spaces.

You will need to discretize the observation space.

In [None]:
# Example of rendering and showing a CartPole-v1 environment
env = RecordVideo(gym.make("CartPole-v1", render_mode="rgb_array"))
print(env.action_space)
print(env.observation_space)

observation, info = env.reset()

while True:
    env.render()

    action = env.action_space.sample()

    state_next, reward, terminated, truncated, info = env.step(action)

    done = terminated or truncated

    if done:
      break;

env.close()

In [None]:
class ObservationWrapper(gym.ObservationWrapper):
    def __init__(self, env, n_states=4096, bounds=(-50, 50)):
        super().__init__(env)

        self.n_states = n_states

        assert isinstance(env.observation_space, gym.spaces.box.Box)

        ### BEGIN SOLUTION
        # Get the dimensions of the observation space



        # Decide how many bins per dimension based on the target n_states


        # Define the bins to discretize based on the bounds and self.n_bins

        ### END SOLUTION

    def observation(self, obs):
        ### BEGIN SOLUTION
        # Quantize the observations (states)


        # Once quantized each dimension compute a single observation index

        ### END SOLUTION
        return new_obs

# Initialize environment, wrap to render
env = gym.make("CartPole-v1", render_mode="rgb_array")

# Wrap to discretize the observation space
env = ObservationWrapper(env)

### BEGIN SOLUTION
# Initialize policy






# Train the policy








# Recreate the wrapped environment



# Render and show an episode with the trained policy




### END SOLUTION