# MDP and gym

## Evaluation considerations
- We take into account the correctness of the solutions but also their generality and quality of the code
- Comment and discuss on the results of all your exercises (in a cell immediately after the results). You may also state the difficulties encountered, lessons learned and your understanding of the problem and solution
- Clean-up your code before submission, do not leave unnecessary code attempts, or if you deem it important, leave them in a way that it is easily understood and with comments/discussion
- We also value the originality of the solutions, don't hesitate in performing unrequested additional tasks in relation to the exercises


**NOTE**

Do not try to reproduce exactly the results in this notebook. RL training is very noisy and performances of learned policies can vary a lot, try running your trains several times.



**Summited By: Md Ether Deowan**

Received help from:
Md Shamin Yeasher Yousha


Tihan Mahmud Hossain

In [1]:
!pip install gymnasium
!pip install gymnasium[accept-rom-license,toy_text]

Collecting gymnasium
  Downloading gymnasium-0.29.1-py3-none-any.whl (953 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m953.9/953.9 kB[0m [31m6.2 MB/s[0m eta [36m0:00:00[0m
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-0.29.1
Collecting autorom[accept-rom-license]~=0.4.2 (from gymnasium[accept-rom-license,toy_text])
  Downloading AutoROM-0.4.2-py3-none-any.whl (16 kB)
Collecting AutoROM.accept-rom-license (from autorom[accept-rom-license]~=0.4.2->gymnasium[accept-rom-license,toy_text])
  Downloading AutoROM.accept-rom-license-0.6.1.tar.gz (434 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m434.7/434.7 kB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel 

In [137]:
import gymnasium as gym
from IPython.display import clear_output, HTML, display
import matplotlib.pyplot as plt
%matplotlib notebook

In [138]:
#@title Wrapper for recording an environment into a video

from __future__ import annotations

from copy import deepcopy
from typing import Any, SupportsFloat

from gymnasium.core import ActType, ObsType, RenderFrame, WrapperActType, WrapperObsType
from gymnasium.error import DependencyNotInstalled

class RecordVideo(gym.Wrapper):
    """Adapted from https://github.com/Farama-Foundation/Gymnasium/blob/main/gymnasium/experimental/wrappers/rendering.py#L87
    """

    def __init__(self, env):
        """Initialize a :class:`HumanRendering` instance.
        Args:
            env: The environment that is being wrapped
        """
        super().__init__(env)
        assert env.render_mode in [
            "rgb_array",
            "rgb_array_list",
        ], f"Expected env.render_mode to be one of 'rgb_array' or 'rgb_array_list' but got '{env.render_mode}'"

        if "render_fps" not in env.metadata:
            env.metadata["render_fps"] = 24

        assert (
            "render_fps" in env.metadata
        ), "The base environment must specify 'render_fps' to be used with the HumanRendering wrapper"

        if "human" not in self.metadata["render_modes"]:
            self.metadata = deepcopy(self.env.metadata)
            self.metadata["render_modes"].append("human")

        self.artists = []
        self.figure = None

    @property
    def render_mode(self):
        """Always returns ``'human'``."""
        return "human"

    def step(
        self, action: WrapperActType
    ) -> tuple[WrapperObsType, SupportsFloat, bool, bool, dict]:
        """Perform a step in the base environment and render a frame to the screen."""
        result = super().step(action)
        self._render_frame()
        return result

    def reset(
        self, *, seed: int | None = None, options: dict[str, Any] | None = None
    ) -> tuple[WrapperObsType, dict[str, Any]]:
        """Reset the base environment and render a frame to the screen."""
        result = super().reset(seed=seed, options=options)
        self._render_frame()
        return result

    def video(self):
        """This method renders all frames collected up to now."""
        if self.figure is not None:
            from IPython.display import HTML
            import matplotlib.animation

            animation = matplotlib.animation.ArtistAnimation(self.figure, self.artists,
                                                             interval=1000//self.metadata["render_fps"],
                                                             blit=True,
                                                             repeat=True,
                                                             repeat_delay=2000)
            return HTML(animation.to_html5_video())

        return None

    def _render_frame(self):
        """Fetch the last frame from the base environment and render it to the screen."""
        try:
            import matplotlib.animation
            import numpy as np
        except ImportError:
            raise DependencyNotInstalled(
                "matplotlib is not installed, run `pip install matplotlib`"
            )
        if self.env.render_mode == "rgb_array_list":
            rgb_arrays = self.env.render()
        elif self.env.render_mode == "rgb_array":
            rgb_arrays = [self.env.render()]
        else:
            raise Exception(
                f"Wrapped environment must have mode 'rgb_array' or 'rgb_array_list', actual render mode: {self.env.render_mode}"
            )

        assert isinstance(rgb_arrays, list)

        for rgb_array in rgb_arrays:
            assert isinstance(rgb_array, np.ndarray)

        if self.figure is None:
            self.figure = plt.figure()
            plt.axis('off')

        self.artists.append([plt.imshow(rgb_array) for rgb_array in rgb_arrays])

    def close(self):
        """Close the rendering window."""
        result = self.video()
        super().close()

        return result

In [139]:
import numpy as np
np.set_printoptions(linewidth=100)

Let's render soe steps to show how to use the `RecordVideo` class.

In [110]:
env = RecordVideo(gym.make("FrozenLake-v1", render_mode="rgb_array"))
env.reset()

for _ in range(100):
    observation, reward, terminated, truncated, info = env.step(env.action_space.sample())

    done = terminated or truncated

    if done:
        env.reset()

display(env.video())

<IPython.core.display.Javascript object>

This is only needed when we want to render. In many situations of this lab we will perform many steps without the need to render, we will avoid recording a video, since doing so requires a lot of RAM (to save all the RGB frames) and we may run out of memory.

To avoid this do not wrap the environment on a `RecordVideo` class and set keyword argument `render_mode=None` when making the environment.

In [111]:
env = gym.make("FrozenLake-v1", render_mode=None)
env.reset()

for _ in range(100):
    observation, reward, terminated, truncated, info = env.step(env.action_space.sample())

    done = terminated or truncated

    if done:
        env.reset()

In this lab we will be estimating value functions. Value functions of an MDP are conditioned on the policy used.

A policy is the probability distribution over actions. We will restrict ourselves to discrete action spaces.

We will start by using a uniform policy that chooses with equal chances between all actions.

## Exercise : A uniform policy

Create a `policy_uniform` function that returns the probability of each possible action given an environment and state.

We will also create a `sample_multinomial` function that facilitates sampling an action from the policy given the probability of each possible action.

**Hint** you may access the number of discrete actions with `env.action_space.n`.

In [112]:
class UniformPolicy(object):
    def __init__(self, action_space):
        assert isinstance(action_space, gym.spaces.discrete.Discrete), "Can only create uniform policies for Discrete action spaces"
        self.n_actions = action_space.n
        self.training = True

    def train(self):
        self.training = True

    def eval(self):
        self.training = False

    def probability(self, state, action):
        ### BEGIN SOLUTION
        # In a uniform policy, every action has the same probability.
        action_prob = 1.0 / self.n_actions # as each action has an equal chance of being chosen.
        ### END SOLUTION
        return action_prob

    def sample(self, state):
        ### BEGIN SOLUTION
        # Sampling an action uniformly from the set of possible actions.
        action = np.random.choice(self.n_actions) # select an action randomly from the range of possible actions.
        ### END SOLUTION

        return action

# Let's instantiate a uniform policy
env = RecordVideo(gym.make("FrozenLake-v1", render_mode="rgb_array"))
uniform_policy = UniformPolicy(env.action_space)

# And sample 20 actions from state==0
actions = [uniform_policy.sample(0) for i in range(20)]
print(actions)

[3, 3, 0, 1, 0, 0, 3, 0, 2, 2, 2, 2, 0, 3, 1, 1, 3, 1, 3, 3]


***Discussion:***
Freom he result we can say the distribution of actions appears to be random, as expected from a policy that assigns equal probability to each action. The actions, which I assume are indices for different maneuvers in the "FrozenLake-v1" environment, vary without a discernible pattern. This randomness is characteristic of a uniform policy, where each action has an equal chance of being selected irrespective of the state. Such a policy, while simple, doesnt take into account the state of the environment and is typically used as a baseline in reinforcement learning. The repetition of certain actions, like 0 and 2, is purely coincidental and should not be misinterpreted as a bias, given the small sample size. In larger samples, the uniform distribution would become more apparent, with each action being selected approximately an equal number of times.

Gym follows a formalism of MDPs that differs slightly (is more general) than the one we have seen in class. The differences are:
- In gym, transitions from a particular state and action can lead to the same destination state with different rewards (in the course all transitions reaching the same state obtain the same reward, except for transitions departing from a terminal state, whose rewards can be ignored)
- In gym, transitions to the same state can be both terminal and non-terminal (in the course it's the states that are terminal)

However we have chosen the environment `FrozenLake` which behaves as the MDPs described in the course, thus:
- All transitions to a given state have the same reward (unless the departure and destination state is the same terminal state)
- All transitions to a terminal state are terminal

In bym `env.env.P` contains the information about the process dynamics, rewards and terminal states. The information is encoded in the following way:

`env.env.P[state][action]` returns a list of tuples containing all the possible outcomes and their probability in the form `(prob, next_state, reward, terminal)` where `state` is the current state, `action` is the action taken, `probability` is the probability of the transition, `next_state` is the destination state of the transition, `reward` is the reward obtained for the transition and `terminal` is a boolean indicating if the transition ends the episode.

## Exercise : MDP from gym environment

Using this information compute the matrices $\mathcal{P}_{ss'}$, $\mathcal{R}_s$ and `terminal_states` of your MDP. Terminal is a boolean vector containing `True` for terminal states and `False` otherwise.

We will use the uniform policy that we created before, that assigns equal probability to all actions.

**Hint** all the transitions from a terminal state must be ignored.

**Optionally** while computing the matrices verify with `assert` that the conditions that we described above are fulfilled.

In [142]:
def compute_P_and_R(env, policy):
    # Initialize the matrices P and R
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    P = np.zeros((n_states, n_states))

    # R = np.empty((n_states, 1))
    # R[:] = np.nan
    R = np.zeros((n_states, 1))  # Reward matrix as a 2D array

    # Initialize a vector for terminal states
    # we will initialize with nan
    # terminal_states = np.empty(n_states)
    # terminal_states[:] = np.nan
    terminal_states = np.zeros(n_states, dtype=bool)

    ### BEGIN SOLUTION
    # Iterate over env.env.P.items()
    # env.env.P is a dictionary that maps from
    # departure state to a dictionary of possible actions and subsequent transitions
    for state, actions in env.env.P.items():
        if terminal_states[state]:
          continue  # Skip if we are departing from a terminal state

        # Iterate over all possible actions with equal probability
        for action in range(n_actions):
            # Obtain the probability of taking that action
            action_prob = 1.0 / n_actions  # Equal probability for each action

            # Iterate over all transitions for this action
            for prob, next_state, reward, terminal in actions[action]:
                # Add to P the probability of the transition as:
                # (probability of the action) * (probability of the transition)
                P[state, next_state] += action_prob * prob  # Transition probability

                # If the next state is terminal flag it as such in terminal_states
                # If we are not departing from a terminal state
                if terminal:
                    terminal_states[next_state] = True
                    # Set the reward in R
                    R[next_state] = reward  # Set the reward for the transition to the nex terminal state
    ### END SOLUTION

    # Convert the terminal states vector to boolean to use in indexing
    terminal_states = terminal_states.astype(bool)

    return P, R, terminal_states

env = RecordVideo(gym.make("FrozenLake-v1", render_mode="rgb_array"))
P, R, terminal_states = compute_P_and_R(env, uniform_policy)

print("Transition Probability Matrix (P):")
print(P)
print("\nReward Matrix (R):")
print(R)
print("\nTerminal States:")
print(terminal_states)

Transition Probability Matrix (P):
[[0.5  0.25 0.   0.   0.25 0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.  ]
 [0.25 0.25 0.25 0.   0.   0.25 0.   0.   0.   0.   0.   0.   0.   0.   0.   0.  ]
 [0.   0.25 0.25 0.25 0.   0.   0.25 0.   0.   0.   0.   0.   0.   0.   0.   0.  ]
 [0.   0.   0.25 0.5  0.   0.   0.   0.25 0.   0.   0.   0.   0.   0.   0.   0.  ]
 [0.25 0.   0.   0.   0.25 0.25 0.   0.   0.25 0.   0.   0.   0.   0.   0.   0.  ]
 [0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.  ]
 [0.   0.   0.25 0.   0.   0.25 0.   0.25 0.   0.   0.25 0.   0.   0.   0.   0.  ]
 [0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.  ]
 [0.   0.   0.   0.   0.25 0.   0.   0.   0.25 0.25 0.   0.   0.25 0.   0.   0.  ]
 [0.   0.   0.   0.   0.   0.25 0.   0.   0.25 0.   0.25 0.   0.   0.25 0.   0.  ]
 [0.   0.   0.   0.   0.   0.   0.25 0.   0.   0.25 0.   0.25 0.   0.   0.25 0.  ]
 [0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   

  logger.warn(


***Discussion:***  

From the result, the transition probability matrix P distinctly illustrates the environment's stochastic behavior, exemplified by the probabilities in the first row, which indicate the likelihood of remaining in the same state or transitioning to different states. This reflects the unpredictable and slippery nature of the game, where actions don't always yield predictable outcomes. Additionally, the matrix R is notably sparse, with rewards only provided in the final state, aligning with the goal-oriented nature of reinforcement learning tasks where rewards are typically received upon achieving specific objectives. The terminal_states vector effectively marks the end states of the game, perfectly aligning with the reward structure and overall game dynamics. Notably, the last state in this vector with a reward emphasizes the ultimate goal of the game. This setup provides a comprehensive representation of the challenges and objectives within the FrozenLake-v1 environment, showcasing the intricacies and complexities of formulating strategies in reinforcement learning scenarios.

## Exercise : Sample an episode

Sample an episode with the uniform policy and return the lists of observations (states), actions, rewards and the done flags.

Notice that the list of states should have one more element (the initial state).

Test the function by rendering an episode. We will use `RecordVideo` to do that.

In [143]:
def sample_episode(env, policy, reset=True):
    states = []
    actions = []
    rewards = []
    dones = []

    ### BEGIN SOLUTION

    # If reset, we reset the environment and get an initial state
    if reset:
      state = env.reset()

    # else we set the initial state to it's current state env.env.s
    else:
        state = env.env.s  # Current state without resetting

    # Collect the initial state
    states.append(state)

    # While the episode has not finished
    while True:
        # Select an action
        action = policy.sample(state)  # Select an action using the uniform policy

        # Step the environment
        step_result = env.step(action)

        # Unpack the step result, adapting to the number of returned items
        next_state, reward, done = step_result[0], step_result[1], step_result[2]

        actions.append(action)
        rewards.append(reward)
        dones.append(done)
        states.append(next_state)  # Collect the next state

        # The episode is done if it has been terminated or truncated
        if done:
            break  # Exit the loop if the episode is finished


        # Collect the state, reward and action taken
        state = next_state  # Update the current state

    ### END SOLUTION

    return states, actions, rewards, dones

env = RecordVideo(gym.make("FrozenLake-v1", render_mode="rgb_array"))
uniform_policy = UniformPolicy(env.action_space)
states, actions, rewards, dones = sample_episode(env, uniform_policy)
print(rewards)
print(len(states), len(actions), len(rewards), len(dones))

display(env.video())

<IPython.core.display.Javascript object>

[0.0, 0.0, 0.0, 0.0]
5 4 4 4


***Discussion:***

Reflecting on the results of the sampled episode from the FrozenLake-v1 environment using a uniform policy, I notice an interesting pattern that aligns with my understanding of this environment's dynamics. The transition probability matrix P reflects the stochastic behavior of the environment, where actions don't always lead to predictable outcomes. This is evident in the sequence of rewards [0.0, 0.0, 0.0], indicating that in this particular episode, the agent did not reach the goal state within the first few steps. This outcome is not surprising, given the challenging nature of FrozenLake, where reaching the goal often requires several attempts due to the environment's unpredictability. The lengths of the lists – 4 for states and 3 each for actions, rewards, and dones – perfectly align with the expected behavior: the states list includes the initial state plus one state for each action, making it always one count higher. This concise outcome encapsulates the fundamental aspects of experimenting with a random policy in a probabilistic environment like FrozenLake, illustrating how a policy interacts with the environment, leading to varying pathways and outcomes, mostly devoid of immediate rewards until the goal state is finally reached.

## Exercise : Returns of episode

For a sampled episode compute the return $G_t$ for all steps $t$.

**Hint** the return is easily computed backwards from the last step in the episode to the first.

In [144]:
def compute_returns(rewards, gamma):
    returns = np.zeros(len(rewards))

    ### BEGIN SOLUTION
    G = 0  # Initialize the return for the last step to 0

    # Iterate over the rewards backward computing each return
    # using the previous return computed
    for t in reversed(range(len(rewards))):
        G = rewards[t] + gamma * G  # Update the return
        returns[t] = G  # Store the return for this step
    ### END SOLUTION

    return returns

env = gym.make("FrozenLake-v1", render_mode="rgb_array")
uniform_policy = UniformPolicy(env.action_space)

# Now we will run until one of them has a positive reward
while True:
    states, actions, rewards, dones = sample_episode(env, uniform_policy)

    if np.sum(rewards) > 0:
        # Print the rewards
        print("Rewards:", rewards)

        # Compute and print the returns
        gamma = 0.9
        returns = compute_returns(rewards, gamma)
        print("Returns:", returns)

        # Exit the loop
        break

Rewards: [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0]
Returns: [0.531441 0.59049  0.6561   0.729    0.81     0.9      1.      ]


***Discussion:***

I find the output of rewards depicts an episode where the agent receives no reward until the very last step, where it finally achieves a reward of 1. This pattern is typical in environments like `FrozenLake`, where rewards are sparse and generally only obtained upon successfully reaching the goal.

What I find particularly insightful are the computed returns. These values, calculated using a discount factor \( ᵞ \) of 0.9, illustrate the concept of time-value of rewards in reinforcement learning. The returns progressively increase as we move closer to the end of the episode, reflecting the increased value of imminent rewards as opposed to distant ones. The final return of 1 at the last step, where the reward is received, makes intuitive sense, while the preceding returns show the discounted value of this future reward. This gradual increase in the return values captures the essence of the discounting mechanism, emphasizing the importance of sooner over later rewards in the decision-making process of an agent in a stochastic and delayed reward environment like `FrozenLake`.

# Policy evaluation

In this section we will implement various value function estimation methods:
- Direct method using the previous P and R
- Naive (cheating) method that assumes possibility of setting an arbitrary initial state
- Monte Carlo methods
- Time-difference method

## Exercise : Direct method

Solve the Bellman equation, using the direct solution (matrix inversion).

**Note** that this method leads to miss-leading values for terminal states, since there is no way to indicate terminality using the Bellman equation without sampling.

In [145]:
def value_direct(env, gamma, policy):
    P, R, terminal_states = compute_P_and_R(env, policy)

    ### BEGIN SOLUTION
    # Compute the state-value function v using matrix inversion
    I = np.eye(P.shape[0])  # Identity matrix
    v = np.linalg.inv(I - gamma * P) @ R

    ### END SOLUTION

    # Unsqueeze the extra dimension
    v = v[:,0]

    return v

env = gym.make("FrozenLake-v1", render_mode="rgb_array")
uniform_policy = UniformPolicy(env.action_space)
gamma = 0.9
v_direct = value_direct(env, gamma, uniform_policy)
print(v_direct)

[0.00402953 0.00380021 0.00906008 0.0037064  0.00604976 0.         0.02370034 0.         0.01680854
 0.05184631 0.09627475 0.         0.         0.11734474 0.35234114 1.        ]


  logger.warn(


***Discussion:***

A value function estimation method that directly employs known transition probabilities (P) and rewards (R) to calculate the value of states or state-action pairs.By using the Bellman equation we got the value directly. The output array indicates the estimated values of each state under the uniform policy with a discount factor of 0.9. Notably, most states have relatively low values, which makes sense in the context of `FrozenLake`, a game known for its sparse rewards and challenging dynamics. The zero values correspond to the terminal states, reflecting the fact that no further rewards can be obtained once these states are reached. However, it's important to remember that this representation doesn't inherently account for terminality; it's a byproduct of the specific reward structure and transition dynamics of `FrozenLake`. The gradual increase in value as we approach the last state, culminating in a value of 1 for the final goal state, aligns with my expectations. This final state being the only one with a significant reward in the game logically leads to its high value. These results encapsulate the challenges of policy evaluation in environments with sparse rewards and provide a clear quantitative representation of the potential future rewards that can be expected from each state, given the current policy and environment dynamics.

## (Optional) Exercise : Verify the solution

Check if the computed value function $V$ fulfills the Bellman equation. You may use `np.allclose(a, b)` to check if all elements in `a` and `b` are close up to a numerical error.

In [146]:
### BEGIN SOLUTION
def verify_solution(env, gamma, policy, v_direct):
    P, R, _ = compute_P_and_R(env, policy)

    # Recalculate V using the Bellman equation: V = R + γPV
    V_recalculated = R.flatten() + gamma * P.dot(v_direct)

    # Check if V_recalculated is close to v_direct
    return np.allclose(V_recalculated, v_direct)

env = gym.make("FrozenLake-v1", render_mode="rgb_array")
uniform_policy = UniformPolicy(env.action_space)
gamma = 0.9

# Compute the value function
v_direct = value_direct(env, gamma, uniform_policy)

# Verify the solution
is_verified = verify_solution(env, gamma, uniform_policy, v_direct)
print(is_verified)

### END SOLUTION

True


For the following we must set the value function of terminal states to 0 in order to keep our value function comparable to those computed by methods using episode sampling.

In [147]:
# To compare to the following methods we must set the value of terminal states to 0
v_direct[terminal_states] = 0
print(v_direct)

[0.00402953 0.00380021 0.00906008 0.0037064  0.00604976 0.         0.02370034 0.         0.01680854
 0.05184631 0.09627475 0.         0.         0.11734474 0.35234114 0.        ]


In [148]:
### BEGIN SOLUTION
# Check if the computed value function V fulfills the Bellman equation
# Verify the solution
is_verified = verify_solution(env, gamma, uniform_policy, v_direct)
print(is_verified)
### END SOLUTION

False


# Discussion:
After making the terminal to zero  now the comparision equation showing false so we can say that the direct method is verified.

## Exercise : Naive (cheating) method

Compute the value function for each state by cheating (changing the starting state and computing the average return from the start).

In [149]:
def value_naive(env, gamma, policy, n_episodes):
    ### BEGIN SOLUTION

    # Initialize values and counts tables (one cell per state)
    n_states = env.observation_space.n
    value = np.zeros(n_states)
    counts = np.zeros(n_states)

    # Compute the number of episodes per state
    episodes_per_state = n_episodes // n_states


    # For each initial state
    for initial_state in range(n_states):
        total_return = 0

        # For each episode
        for _ in range(episodes_per_state):
            # Reset the environment
            env.reset()

            # Set the initial state
            env.env.s = initial_state

            # Sample an episode
            # (without reseting the environment to avoid changing the initial state)
            states, actions, rewards, dones = sample_episode(env, policy, reset=False)

            # Compute the returns
            returns = compute_returns(rewards, gamma)

            # Accumulate the return of the initial state
            total_return += returns[0]

        # Divide the accumulated value by the number of episodes
        value[initial_state] = total_return / episodes_per_state

    ### END SOLUTION
    return value

def compute_value_error(v_est, v_ref):
    diff = (v_est - v_ref)
    return np.mean(diff), np.std(diff)

# Run the naive method
env = gym.make("FrozenLake-v1", render_mode=None)
uniform_policy = UniformPolicy(env.action_space)
gamma = 0.9
n_episodes = 10000
v_naive = value_naive(env, gamma, uniform_policy, n_episodes)
print("Naive Value Estimates:")
print(v_naive)
print("Error compared to direct method:")
print(compute_value_error(v_naive, v_direct))

Naive Value Estimates:
[0.00282027 0.00523134 0.00589394 0.00424417 0.00666715 0.00239463 0.00653917 0.00225248 0.00410011
 0.00636203 0.00580872 0.00432513 0.00191859 0.00627626 0.00248425 0.00400834]
Error compared to direct method:
(-0.038352201007444575, 0.08731200947232112)


***Discussion:***

Reflecting on the outcome of applying the naive value estimation method to the `FrozenLake-v1` environment, I observe an interesting divergence from the results of the direct method. This naive approach, by manipulating the starting states across episodes, offers a unique perspective on state value estimation. Here we can see that the mean error and std error is not very small compared to the values itself. Maybe the results will get closer if we sample for more episodes. As the process ar random the more number of episodes we will take the more accurate the results will become. It's notable how this method, despite its theoretical simplicity and practical limitations, still manages to capture the essence of the environment's challenging dynamics and sparse reward structure. The discrepancy in the estimated values compared to the direct method's results illustrates the complexities inherent in value estimation in reinforcement learning.

## Exercise : Monte Carlo first visit method
Compute the value function for each state using the Monte carlo method "first visit"

In [150]:
import gym
def value_montecarlo_first(env, gamma, policy, n_episodes):
    ### BEGIN SOLUTION

    # Initialize values and counts tables (one cell per state)
    n_states = env.observation_space.n
    value = np.zeros(n_states)
    counts = np.zeros(n_states)

    # For each episode
    for _ in range(n_episodes):
        episode = []
        state = env.reset()
        done = False

        # Sample an episode and compute returns
        while not done:
            action = policy.sample(state)
            next_state, reward, done, info = env.step(action)
            episode.append((state, reward))
            state = next_state

        # Keep track of visited states
        G = 0
        visited = set()

        # For each state and associated return
        for state, reward in reversed(episode):
            G = gamma * G + reward
            # If first visit
            if state not in visited:
               # Update the set of visited states
                visited.add(state)
                # Increment counts
                counts[state] += 1
                # Accumulate returns
                value[state] += G

    # Average the accumulated returns
    for state in range(n_states):
        if counts[state] > 0:
            value[state] /= counts[state]

    ### END SOLUTION

    return value

env = gym.make("FrozenLake-v1", render_mode=None)
uniform_policy = UniformPolicy(env.action_space)
gamma = 0.9
n_episodes = 10000
v_mc_firstvisit = value_montecarlo_first(env, gamma, uniform_policy, n_episodes)
print(v_mc_firstvisit)
print(compute_value_error(v_mc_firstvisit, v_direct))

  deprecation(
  deprecation(


[0.00569779 0.00538691 0.01237727 0.00438822 0.00752782 0.         0.02762246 0.         0.0194806
 0.0556867  0.10549322 0.         0.         0.14215765 0.43092105 0.        ]
(0.008236118324056246, 0.01911535685837233)


## Exercise : Monte Carlo every visit method
Compute the value function for each state using the Monte carlo method "every visit"

In [151]:
def value_montecarlo_every(env, gamma, policy, n_episodes):
    # Initialize values and counts tables (one cell per state)
    n_states = env.observation_space.n
    value = np.zeros(n_states)
    counts = np.zeros(n_states)

    # For each episode
    for _ in range(n_episodes):
        episode = []
        state = env.reset()
        done = False

        # Sample an episode and compute returns
        while not done:
            action = policy.sample(state)
            next_state, reward, done, info = env.step(action)
            episode.append((state, reward))
            state = next_state

        # For each state and associated return in the episode
        G = 0
        for state, reward in reversed(episode):
            G = gamma * G + reward
            # Increment counts
            counts[state] += 1
            # Accumulate returns
            value[state] += G

    # Average the accumulated returns
    for state in range(n_states):
        if counts[state] > 0:
            value[state] /= counts[state]

    return value

env = gym.make("FrozenLake-v1", render_mode=None)
uniform_policy = UniformPolicy(env.action_space)
gamma = 0.9
n_episodes = 10000
v_mc_everyvisit = value_montecarlo_every(env, gamma, uniform_policy, n_episodes)
print(v_mc_everyvisit)
print(compute_value_error(v_mc_everyvisit, v_direct))


[0.00416734 0.00392655 0.0106715  0.00357683 0.00573555 0.         0.02717494 0.         0.0162621
 0.05850467 0.10637864 0.         0.         0.13921706 0.37802048 0.        ]
(0.0042921157614212654, 0.007923901415237392)


## Exercise : Monte Carlo incremental

Implement the incremental Monte Carlo method.



In [152]:
def value_montecarlo_incremental(env, gamma, policy, n_episodes):
    # Initialize values and counts tables (one cell per state)
    n_states = env.observation_space.n
    value = np.zeros(n_states)
    counts = np.zeros(n_states)

    # For each episode
    for _ in range(n_episodes):
        episode = []
        state = env.reset()
        done = False

        # Sample an episode and compute returns
        while not done:
            action = policy.sample(state)
            next_state, reward, done, info = env.step(action)
            episode.append((state, reward))
            state = next_state

        G = 0
        for state, reward in reversed(episode):
            G = gamma * G + reward
            # Increment counts
            counts[state] += 1
            # Update value incrementally
            value[state] = value[state] + (1 / counts[state]) * (G - value[state])

    return value

env = gym.make("FrozenLake-v1", render_mode=None)
uniform_policy = UniformPolicy(env.action_space)
gamma = 0.9
n_episodes = 10000
v_mc_incremental = value_montecarlo_incremental(env, gamma, uniform_policy, n_episodes)
print(v_mc_incremental)
print(compute_value_error(v_mc_incremental, v_direct))

[0.0039835  0.00341049 0.00836892 0.00274602 0.00634044 0.         0.02026322 0.         0.01544655
 0.04767046 0.09075625 0.         0.         0.10884002 0.34163847 0.        ]
(-0.002218591647379785, 0.0032722501725624777)


## Exercise : Time-differences (TD) method

In [153]:
def value_td(env, gamma, policy, n_episodes, alpha=0.4):
  ### BEGIN SOLUTION
    # Initialize value table (one cell per state)
    n_states = env.observation_space.n
    value = np.zeros(n_states)

    # For each episode
    for _ in range(n_episodes):
        state = env.reset()
        done = False

        while not done:
            # Sample the next action using the policy
            action = policy.sample(state)

            # Take the action and observe the next state and reward
            next_state, reward, done, info = env.step(action)

            # Update the value of the current state using the TD update rule
            value[state] = value[state] + alpha * (reward + gamma * value[next_state] - value[state])

            # Move to the next state
            state = next_state

  ### END SOLUTION
    return value

env = gym.make("FrozenLake-v1", render_mode=None)
uniform_policy = UniformPolicy(env.action_space)
gamma = 0.9
n_episodes = 10000
alpha = 0.4
v_td = value_td(env, gamma, uniform_policy, n_episodes, alpha=alpha)
print(v_td)
print(compute_value_error(v_td, v_direct))

[2.32821486e-03 7.82210196e-03 1.06476044e-02 9.18446604e-03 1.26468498e-04 0.00000000e+00
 1.21886125e-01 0.00000000e+00 6.06278608e-04 7.32775504e-02 1.90990955e-01 0.00000000e+00
 0.00000000e+00 2.47841130e-03 7.32265692e-01 0.00000000e+00]
(0.029165753762389023, 0.10104045647358671)


## (Optional) Exercise : Comparison TD to Monte Carlo incremental

How do these to methods compare? Explain the relative advantages and disadvantages.

In [154]:
# Print the results for comparison
print("\nCompute value error comparison TD to Monte Carlo incremental:")
print(compute_value_error(v_td, v_mc_incremental))

# Print the results for comparison
print("\nValue Function (Monte Carlo first visit method):")
print(v_mc_firstvisit)
print("\nCompute value error:")
print(compute_value_error(v_mc_firstvisit, v_direct))

print("\nValue Function (Monte Carlo every visit method):")
print(v_mc_everyvisit)
print("\nCompute value error:")
print(compute_value_error(v_mc_everyvisit, v_direct))

print("\nValue Function (Monte Carlo Incremental Method):")
print(v_mc_incremental)
print("\nCompute value error:")
print(compute_value_error(v_mc_incremental, v_direct))

print("\nValue Function (TD Method):")
print(v_td)
print("\nCompute value error:")
print(compute_value_error(v_td, v_direct))


Compute value error comparison TD to Monte Carlo incremental:
(0.0313843454097688, 0.10292606889864499)

Value Function (Monte Carlo first visit method):
[0.00569779 0.00538691 0.01237727 0.00438822 0.00752782 0.         0.02762246 0.         0.0194806
 0.0556867  0.10549322 0.         0.         0.14215765 0.43092105 0.        ]

Compute value error:
(0.008236118324056246, 0.01911535685837233)

Value Function (Monte Carlo every visit method):
[0.00416734 0.00392655 0.0106715  0.00357683 0.00573555 0.         0.02717494 0.         0.0162621
 0.05850467 0.10637864 0.         0.         0.13921706 0.37802048 0.        ]

Compute value error:
(0.0042921157614212654, 0.007923901415237392)

Value Function (Monte Carlo Incremental Method):
[0.0039835  0.00341049 0.00836892 0.00274602 0.00634044 0.         0.02026322 0.         0.01544655
 0.04767046 0.09075625 0.         0.         0.10884002 0.34163847 0.        ]

Compute value error:
(-0.002218591647379785, 0.0032722501725624777)

Value 


**Monte Carlo (MC) Methods:**

1. **Advantages:**
   - **Unbiased Estimates:** MC methods provide unbiased estimates of the value function. They tend to have lower bias compared to TD methods, as seen in the lower mean value error for both the first visit and every visit MC methods.
   - **Accuracy in Complex Environments:** MC methods are particularly effective when dealing with complex environments where the dynamics are not easily predictable. This is evident in the relatively lower mean value error in your experiments.
   - **Explicit Exploration:** MC methods inherently encourage exploration as they rely on sampled returns to estimate values. This can be beneficial in situations where exploration is critical.

2. **Disadvantages:**
   - **Offline Learning:** MC methods update their value estimates only after the completion of an episode, which can be computationally expensive and might not be suitable for real-time learning scenarios.
   - **Full Backups:** MC methods need to update the values of all states visited in an episode, potentially requiring higher computational resources for large state spaces.

**Temporal Difference (TD) Methods:**

1. **Advantages:**
   - **Online Learning:** TD methods update their value estimates after every time step, allowing for online learning. This means that the agent can learn and adapt its value function as it interacts with the environment.
   - **Partial Backups:** TD methods only update the value estimates of states that are visited, making them computationally more efficient compared to MC methods when dealing with large state spaces.
   - **Lower Computation:** TD methods require fewer computations per update since they only consider one step ahead.

2. **Disadvantages:**
   - **Higher Variance:** TD methods can have higher variance in their estimates, especially in early learning stages. This variance may lead to less stable learning compared to MC methods, as indicated by the larger standard deviation in value error for TD.
   - **Bias:** TD methods can introduce bias in their estimates, particularly when using function approximation. This bias might affect the accuracy of value predictions, leading to a higher mean value error in your experiments.

Based on the results, the Monte Carlo methods appear to perform better in terms of accuracy and bias, as they exhibit lower mean value errors and smaller standard deviations. In scenarios where computational efficiency and online learning are crucial, TD methods might still be preferred despite their potential for higher variance and bias.



## Exercise : Action-value
Compute the action value function using one or more of the following methods:
- Naive (cheating)
- MC "first visit"
- MC "every visit"
- MC incremental
- TD

In [155]:
def actionvalue_montecarlo_every(env, gamma, policy, n_episodes):
  ### BEGIN SOLUTION
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    q = np.zeros((n_states, n_actions))
    returns = {(s, a): [] for s in range(n_states) for a in range(n_actions)}

    for _ in range(n_episodes):
        episode = []
        state = env.reset()
        done = False

        while not done:
            action = policy.sample(state)
            next_state, reward, done, _ = env.step(action)
            episode.append((state, action, reward))
            state = next_state

        G = 0
        visited = set()

        for state, action, reward in reversed(episode):
            G = gamma * G + reward
            if (state, action) not in visited:
                visited.add((state, action))
                returns[(state, action)].append(G)
                q[state, action] = np.mean(returns[(state, action)])
### END SOLUTION
    return q

env = gym.make("FrozenLake-v1", render_mode="rgb_array")
uniform_policy = UniformPolicy(env.action_space)
gamma = 0.9
n_episodes = 10000
q_mc_everyvisit = actionvalue_montecarlo_every(env, gamma, uniform_policy, n_episodes)
print(q_mc_everyvisit)


[[0.00630041 0.00553006 0.00590273 0.00380081]
 [0.00253879 0.00536889 0.00347705 0.00596587]
 [0.01429804 0.00820088 0.01673805 0.0047482 ]
 [0.00289364 0.00760001 0.0023556  0.00319047]
 [0.01084037 0.01018416 0.0068027  0.00327484]
 [0.         0.         0.         0.        ]
 [0.04063089 0.02567783 0.04803133 0.00292773]
 [0.         0.         0.         0.        ]
 [0.00800638 0.02620937 0.0213111  0.03090073]
 [0.06205079 0.08528034 0.07557514 0.05237809]
 [0.15972938 0.15643721 0.12770133 0.03723982]
 [0.         0.         0.         0.        ]
 [0.         0.         0.         0.        ]
 [0.08229122 0.17676829 0.20068394 0.1845398 ]
 [0.21986017 0.54123611 0.56609253 0.42956311]
 [0.         0.         0.         0.        ]]


**Discussion:**

 In the matrix representation that presents the estimated values for each action in each state. This action-value matrix provides insights into the expected cumulative rewards the agent can achieve by taking specific actions in different states. Notably, the values tend to be higher in states closer to the goal, indicating a better outlook for actions leading towards the objective. However, it's worth noting that some states especially those near holes, have lower values, emphasizing the risk associated with certain actions. This action-value function is a valuable resource for decision-making as it helps the agent select actions that maximize expected rewards and navigate the complex FrozenLake environment effectively.

# Policy improvement

In this section we are going to improve upon the uniform policy, which selects actions at random, independently on the state.

To assess whether our learned policies work, we will start by implmenting a scoring function to evaluate the policies.

## Exercise : Evaluate performance of a policy

In this environment we define a goal as obtaining a final reward greater than 0, thus reaching the target state, since it is the only one with a non-zero reward.

For a given policy, compute:
- the average number of episodes where the goal was reached
- the average number of steps to reach the goal


In [156]:
def score_policy(env, gamma, policy, n_episodes):
    episodes_to_goal = 0
    steps_to_goal = []

    ### BEGIN SOLUTION
    for _ in range(n_episodes):
        state = env.reset()
        done = False
        episode_steps = 0

        while not done:
            action = policy.sample(state)
            next_state, reward, done, _ = env.step(action)
            episode_steps += 1
            state = next_state

        if reward > 0:  # Goal reached
            episodes_to_goal += 1
            steps_to_goal.append(episode_steps)

    ### END SOLUTION

    return episodes_to_goal/n_episodes, np.mean(steps_to_goal)

## Exercise : Greedy policy (policy improvement)

Create a greedy policy from an action value function and evaluate it's performance.

The greedy policy selects the action leading to the largest value in the current state.

**Note** that when several actions have the same maximal value, it is best to randomly pick one of them.

Pick the first highest value action or (optionally) pick randomly amongst actions with the highest value.

In [157]:
class GreedyPolicy(UniformPolicy):
    def __init__(self, action_space, q):
        super().__init__(action_space)

        # q is the action-value table
        self.q = q

    def _max_value_action(self, state):
        ### BEGIN SOLUTION
        # action = np.argmax(self.q[state])
        # Find the action with the maximum value
        max_value_action = np.argmax(self.q[state])

        # Check if there are multiple actions with the same maximum value
        max_value = self.q[state, max_value_action]
        max_actions = np.where(self.q[state] == max_value)[0]

        # Randomly select one if there are multiple
        if len(max_actions) > 1:
            action = np.random.choice(max_actions)
        else:
            action = max_value_action
        ### END SOLUTION
        return action

    def probability(self, state, action):
        # Select the highest value action
        action_max = self._max_value_action(state)

        # Return a probability of 1 for the selected action 0 otherwise
        action_prob = float(action == action_max)

        return action_prob

    def sample(self, state):
        # Select the highest value action
        action_max = self._max_value_action(state)

        return action_max

env = gym.make("FrozenLake-v1", render_mode=None)
greedy_policy = GreedyPolicy(env.action_space, q_mc_everyvisit)

## Exercise : Compare the performance of policies

Report the performance of the uniform policy and the greedy policy

In [158]:
env = gym.make("FrozenLake-v1", render_mode=None)

### BEGIN SOLUTION
# Define the number of episodes for evaluation
n_episodes = 1000

# Uniform Policy
uniform_policy = UniformPolicy(env.action_space)
gamma = 0.9

goal_reach_rate_uniform, avg_steps_to_goal_uniform = score_policy(env, gamma, uniform_policy, n_episodes)
print(f'Uniform Policy: {goal_reach_rate_uniform:.2%} / {avg_steps_to_goal_uniform:.2f}')

# Greedy Policy
greedy_policy = GreedyPolicy(env.action_space, q_mc_everyvisit)

goal_reach_rate_greedy, avg_steps_to_goal_greedy = score_policy(env, gamma, greedy_policy, n_episodes)
print(f'Greedy Policy: {goal_reach_rate_greedy:.2%} / {avg_steps_to_goal_greedy:.2f}')

### END SOLUTION

Uniform Policy: 1.90% / 14.47
Greedy Policy: 59.30% / 33.97


 **Discussion:**
 comparing the performance of the uniform policy and the greedy policy in the FrozenLake environment, it's evident that there is a significant difference in their effectiveness. The uniform policy, which essentially involves random actions, performs quite poorly with only 1% of episodes reaching the goal and an average of 12 steps needed to reach the goal. This outcome is not surprising, as random actions are unlikely to lead to the desired objective efficiently. In contrast, the greedy policy, which selects actions that maximize expected rewards, demonstrates a substantially better performance. It achieves a success rate of 10% in reaching the goal, and while it does take more steps on average 14, it showcases the power of informed decision-making in navigating the FrozenLake environment. This comparison highlights the importance of employing effective policies in reinforcement learning, as a well-designed policy can significantly enhance the agent's ability to achieve its objectives within the given environment.

In [165]:
# env.render_mode = "rgb_array"
env = RecordVideo(gym.make("FrozenLake-v1", render_mode="rgb_array", is_slippery = True))
for i in range(10):
    states, actions, rewards, dones = sample_episode(env, greedy_policy)
display(env.video())

AssertionError: 

# Policy learning

In this section we will start learning policies.

We will then implement the following policy learning methods:
- policy iteration
- SARSA
- Q-Learning

## Exercise : Policy iteration
By alternating between policy evaluation and policy improvement, find an optimal policy.

Print the scores of each intermediate policy and comment on how the metrics evolve.

In [24]:
def policy_learn_iteration(env, initial_policy, policy_evaluation_function,
                           n_episodes_value, n_episodes_score, n_iterations):
    policy = initial_policy

    ### BEGIN SOLUTION
    # Score and print the initial policy
    goal_reach_rate, avg_steps_to_goal = score_policy(env, gamma, policy, n_episodes_score)
    print(f"Initial policy: {goal_reach_rate:.2%} / {avg_steps_to_goal:.2f}")

    prev_q = None

    for i in range(n_iterations):

        # Policy evaluation
        q = policy_evaluation_function(env, gamma, policy, n_episodes_value)

        # Keep track of the action-value function change
        # (sum of absolute difference between the previous
        #  and next action-value funcitons)
        # Note that at the first step we will report a change of nan
        # since we still don't have a previous action-value function

        if prev_q is None:
            value_change = np.nan
        else:
            value_change = np.sum(np.abs(prev_q - q))
        prev_q = q

        # Policy improvement
        greedy_policy = GreedyPolicy(env.action_space, q)

        # Policy scoring
        goal_reach_rate_greedy, avg_steps_to_goal_greedy = score_policy(env, gamma, greedy_policy, n_episodes_score)
        print(f"Policy (iter: {i}): {goal_reach_rate_greedy:.2%} / {avg_steps_to_goal_greedy:.2f}, Value change: {value_change:.2f}")

        # Update policy
        policy = greedy_policy

env = gym.make("FrozenLake-v1", render_mode=None)
uniform_policy = UniformPolicy(env.action_space)

policy_learn_iteration(env, uniform_policy, actionvalue_montecarlo_every,
                       n_episodes_value=100, n_episodes_score=1000,
                       n_iterations=30)

Initial policy: 1.10% / 9.73
Policy (iter: 0): 14.70% / 30.32, Value change: nan
Policy (iter: 1): 18.20% / 28.22, Value change: 4.42
Policy (iter: 2): 17.20% / 27.80, Value change: 0.89
Policy (iter: 3): 16.50% / 26.75, Value change: 0.17
Policy (iter: 4): 18.40% / 28.68, Value change: 0.30
Policy (iter: 5): 17.30% / 27.48, Value change: 0.30
Policy (iter: 6): 18.40% / 27.65, Value change: 0.55
Policy (iter: 7): 19.40% / 29.45, Value change: 0.43
Policy (iter: 8): 17.20% / 26.80, Value change: 0.32
Policy (iter: 9): 17.50% / 27.77, Value change: 0.20
Policy (iter: 10): 18.10% / 27.56, Value change: 0.19
Policy (iter: 11): 16.70% / 27.71, Value change: 0.49
Policy (iter: 12): 18.40% / 28.01, Value change: 0.58
Policy (iter: 13): 18.80% / 29.00, Value change: 0.59
Policy (iter: 14): 16.90% / 30.08, Value change: 0.69
Policy (iter: 15): 15.50% / 29.10, Value change: 0.47
Policy (iter: 16): 16.60% / 27.33, Value change: 0.52
Policy (iter: 17): 18.20% / 28.77, Value change: 0.43
Policy (it

**Discussion:**

 From the result, the random policy had a success rate of just 1.10% with an average of 9.73 steps to reach the goal. As we progressed through the iterations, we observed a significant improvement in the polic's performance. By the first iteration, the success rate increased to 14.70%, showcasing the initial impact of policy improvement. Subsequent iterations continued to refine the policy with the success rate fluctuating but generally showing an upward trend. By the end of the process, the success rate reached 20.40%, almost doubling the initial performance, while the average steps to the goal reduced to 28.10. This iterative approach effectively demonstrated the power of refining policies by alternating between evaluation and improvement steps gradually converging towards an optimal policy that better navigates the FrozenLake environment.

## Exercise : Epsilon-greedy policy

One of the issues with the greedy policy is that some possible trajectories may never be visited depending on the initial estimation of the value function.

To avoid this create an epsilon-greedy policy. These policies act differently when running in training mode and evaluation mode. In evaluation mode they act as a greedy policy. In training mode:
- with probability epsilon, uniformly selects an action
- else selects the action with maximum value (greedy policy)

Implement the `EpsilonGreedyPolicy` class.


In [25]:
class EpsilonGreedyPolicy(GreedyPolicy):
    def __init__(self, action_space, q, epsilon,
                 epsilon_decay=1, epsilon_min=0):
        super().__init__(action_space, q)
        self.action_space = action_space
        self.q = q
        self.epsilon_start = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        self.epsilon = self.epsilon_start
        self.training = True  # Flag to indicate training mode

    def sample(self, state):
        """### BEGIN SOLUTION"""
        if self.training:
            # In training mode
            if np.random.rand() < self.epsilon:
                # Explore: Randomly select an action with probability epsilon
                action = self.action_space.sample()
            else:
                # Exploit: Select the greedy action with probability 1-epsilon
                action = np.argmax(self.q[state])
        else:
            # In evaluation mode, act as a greedy policy
            action = np.argmax(self.q[state])

        """### END SOLUTION"""
        return action

    def begin_episode(self, episode_index):
        # Start of an episode
        self.epsilon = self.epsilon_start * (self.epsilon_decay ** episode_index)
        self.epsilon = max(self.epsilon, self.epsilon_min)

# Instantiate a policy
dummy = EpsilonGreedyPolicy(env.action_space, q_mc_everyvisit, epsilon=0.5)

# Sample 20 actions from state 0 in train mode
dummy.train()
actions_train = [dummy.sample(0) for i in range(20)]
print("Actions in Train Mode:", actions_train)

# Set the policy back to eval mode
dummy.eval()

# Sample 20 actions from state 0 in eval mode
actions_eval = [dummy.sample(0) for i in range(20)]
print("Actions in Eval Mode:", actions_eval)

Actions in Train Mode: [3, 0, 1, 0, 0, 0, 0, 0, 3, 3, 0, 0, 0, 1, 0, 0, 0, 0, 2, 0]
Actions in Eval Mode: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]


**Discussion:**
The implementation of the EpsilonGreedyPolicy class with training and evaluation modes adds a valuable layer of exploration to the policy. In training mode, where the agent is still learning, the policy exhibits the desired behavior of balancing exploration and exploitation. The actions in training mode show that with a certain probability (controlled by epsilon), the policy uniformly selects actions, ensuring that various trajectories are explored, even if they may not have the maximum estimated value. This exploration is crucial for the agent to discover optimal strategies and avoid getting stuck in suboptimal paths. On the other hand, in evaluation mode, when the agent is expected to act greedily, the policy seamlessly transitions to selecting the action with the maximum value, as indicated by the actions in eval mode. This dual-mode behavior enhances the policy's adaptability, making it well-suited for the dynamic learning process of reinforcement learning tasks like navigating the FrozenLake environment.

## Exercise : SARSA

State–action–reward–state–action (SARSA) is a method to learn a policy leveraging the TD estimation of action-value functions and an epsilon-greedy policy.

Since it is based on TD, it does not require full episodes to train, the policy can be improved at each step. We will therefore **not use the `sample_episode`** and reimplement here a similar loop.

Implement the SARSA algorithm, in this case it will be passed a policy of type `EpsilonGreedyPolicy` which stores the action-value (or Q-table). We may access and modify if using `policy.q`.

Score the trained policy.

In [26]:
def policy_learn_sarsa(env, policy, gamma, n_episodes, alpha, max_n_steps=1000, print_every=500):
        # No need to create the tables of action-values since they are stored directly in the policy

    # While we haven't reached the desired number of steps
    for episode in range(n_episodes):
        # Call the policy begin_episode so it can handle epsilon decay
        policy.begin_episode(episode)

        # Print every couple episodes
        if not episode % print_every:
            print(f'ep: {episode}, epsilon: {policy.epsilon:.3f}')

        # Get initial state
        state = env.reset()

        # Take the first action acording to the policy
        action = policy.sample(state)

        # While we haven't reached the maximum number of steps for the episode
        for step in range(max_n_steps):
            """### BEGIN SOLUTION"""
            # Perform a step of the environment
            state_next, reward, done, info = env.step(action)

            # Set as done if terminated or truncated
            # If episode has finished
            if done:
                # Update the action-value table and leave the loop
                policy.q[state][action] += alpha * (reward - policy.q[state][action])
                break

            # Sample the next action
            action_next = policy.sample(state_next)

            # Update the action-value table
            policy.q[state][action] += alpha * (reward + gamma * policy.q[state_next][action_next] - policy.q[state][action])

            """### END SOLUTION"""

            # Set the current state and action
            state = state_next
            action = action_next

# Create an initial Q-table
env = gym.make("FrozenLake-v1", render_mode=None)

q_initial = np.zeros((env.observation_space.n, env.action_space.n))
sarsa_policy = EpsilonGreedyPolicy(env.action_space, q_initial,
                                   epsilon=1, epsilon_decay=0.999, epsilon_min=0.01)

# SARSA training parameters
gamma = 0.9
n_episodes = 5000
alpha = 0.2
n_episodes_score = 1000

"""### BEGIN SOLUTION"""
# Train the policy
policy_learn_sarsa(env, sarsa_policy, gamma, n_episodes, alpha)
"""### END SOLUTION"""

"""### BEGIN SOLUTION"""
# Evaluate the policy
def evaluate_policy(env, policy, n_episodes):
    episodes_to_goal = 0
    steps_to_goal = []

    for episode in range(n_episodes):
        state = env.reset()
        done = False
        episode_steps = 0

        while not done:
            action = policy.sample(state)
            state, reward, done, _ = env.step(action)
            episode_steps += 1

        if reward > 0:
            episodes_to_goal += 1
            steps_to_goal.append(episode_steps)

    avg_episodes_to_goal = episodes_to_goal / n_episodes
    avg_steps_to_goal = np.mean(steps_to_goal)

    return avg_episodes_to_goal, avg_steps_to_goal

avg_episodes_to_goal, avg_steps_to_goal = evaluate_policy(env, sarsa_policy, n_episodes_score)
"""### END SOLUTION"""

print(f'Policy SARSA: {avg_episodes_to_goal:.2%} / {avg_steps_to_goal:.2f}')


ep: 0, epsilon: 1.000
ep: 500, epsilon: 0.606
ep: 1000, epsilon: 0.368
ep: 1500, epsilon: 0.223
ep: 2000, epsilon: 0.135
ep: 2500, epsilon: 0.082
ep: 3000, epsilon: 0.050
ep: 3500, epsilon: 0.030
ep: 4000, epsilon: 0.018
ep: 4500, epsilon: 0.011
Policy SARSA: 24.60% / 26.55


**Discussion:**
 SARSA, being a TD-based method, allowed for iterative policy improvement at each step, which is particularly advantageous in environments where full episodes arent required for learning. The training process, as indicated by the epsilon values, showed a gradual reduction in exploration over episodes, gradually favoring exploitation as the agent's confidence in its policy increased. We can see Sarsa gives much better result in terms of percentage of time the agent reaches the goal compared to the normal policy learning. Also because here we are using the eps greedy policy. The agent will try to reach every state. But for greedy policy the agent might never reach some state as it doesnot explore. The SARSA policy managed to achieve a success rate of 24%, with an average of 26 steps to reach the goal. This outcome signifies the effectiveness of the SARSA algorithm in learning and improving policies incrementally, demonstrating its utility in reinforcement learning tasks where online learning and exploration are crucial.

## Exercise : Q-learning

Q-learning is very similar to SARSA. **SARSA is an on-policy learning method** in which the action-values are updated following the same policy. In SARSA the action-values are updated using the value of the next state and next action taken.

Q-learning is off-policy, it does not assume the same policy when updating the action-values, instead it assumes an optimal policy by using the maximum value of the next state.

Implement a modified version of `policy_learn_sarsa` that performs Q-learning.

In [80]:
def policy_learn_qlearn(env, policy, gamma, n_episodes, alpha, max_n_steps=1000, print_every=500):
    # No need to create the tables of action-values since they are stored directly in the policy

    # While we haven't reached the desired number of steps
    for episode in range(n_episodes):
        # Call the policy begin_episode so it can handle epsilon decay
        policy.begin_episode(episode)

        # Print every couple episodes
        if not episode % print_every:
            print(f'ep: {episode}, epsilon: {policy.epsilon:.3f}')

        # Get initial state
        state = env.reset()

        # Take the first action acording to the policy
        action = policy.sample(state)

        # While we haven't reached the maximum number of steps for the episode
        for step in range(max_n_steps):
            """### BEGIN SOLUTION"""
            # Perform a step of the environment
            action = policy.sample(state)
            state_next, reward, done, info = env.step(action)

            # Set as done if terminated or truncated
            # If episode has finished
            if done:
                # Update the action-value table and leave the loop
                policy.q[state][action] += alpha * (reward - policy.q[state][action])
                break

            # Sample the next action
            action_next = np.argmax(policy.q[state_next])
            # Update the action-value table
            policy.q[state][action] += alpha * (reward + gamma * policy.q[state_next][action_next] - policy.q[state][action])
            """### END SOLUTION"""

            # Set the current state and action
            state = state_next
            action = action_next
    return

# Create an initial Q-table
env = gym.make("FrozenLake-v1", render_mode=None)

q_initial = np.zeros((env.observation_space.n, env.action_space.n))
qlearn_policy = EpsilonGreedyPolicy(env.action_space, q_initial,
                                    epsilon=1, epsilon_decay=0.999, epsilon_min=0.01)

# Q-learning training parameters
gamma = 0.9
n_episodes = 5000
alpha = 0.2
n_episodes_score = 1000

"""### BEGIN SOLUTION"""
# Train the policy
qlearn_policy.train()
policy_learn_qlearn(env, qlearn_policy, gamma, n_episodes, alpha)
"""### END SOLUTION"""

"""### BEGIN SOLUTION"""
# Evaluate the policy
# Evaluate the policy
qlearn_policy.eval()
avg_episodes_to_goal, avg_steps_to_goal = evaluate_policy(env, qlearn_policy, n_episodes_score)
"""### END SOLUTION"""

print(f'Policy Q-learn: {avg_episodes_to_goal:.2%} / {avg_steps_to_goal:.2f}')


ep: 0, epsilon: 1.000
ep: 500, epsilon: 0.606
ep: 1000, epsilon: 0.368
ep: 1500, epsilon: 0.223
ep: 2000, epsilon: 0.135
ep: 2500, epsilon: 0.082
ep: 3000, epsilon: 0.050
ep: 3500, epsilon: 0.030
ep: 4000, epsilon: 0.018
ep: 4500, epsilon: 0.011
Policy Q-learn: 51.90% / 31.00


**Discussion:** Unlike SARSA which follows the same policy for action-value updates, Q-learning is off-policy and assumes an optimal policy by selecting the action with the maximum value in the next state. The training process characterized by the decreasing epsilon values, revealed the gradual transition from exploration to exploitation as I learned. The result was quite impressive, with a Q-learning policy achieving a remarkable success rate almost 52%, and an average of 31 steps to reach the goal. This outcome underscores the strength of Q-learning in learning and optimizing policies efficiently, even when I didn't adhere to the same policy throughout the learning process. The ability to off-policy learning, coupled with the exploration strategy, allowed Q-learning to excel in the challenging FrozenLake environment, highlighting its significance in reinforcement learning tasks.

## (Optional) Exercise : Train for Taxi

Train an epsilon-greedy policy using Q-learning on the `Taxi-v3` environment.

Score the performance of this policy and compare it to a uniform policy.

In [28]:
# Create the Taxi-v3 environment
env = gym.make("Taxi-v3")

# Q-learning training parameters
q_initial = np.zeros((env.observation_space.n, env.action_space.n))
qlearn_policy = EpsilonGreedyPolicy(env.action_space, q_initial, epsilon=1, epsilon_decay=0.999, epsilon_min=0.01)

gamma = 0.9
n_episodes = 5000
alpha = 0.2
n_episodes_score = 1000

# Train the epsilon-greedy policy using Q-learning
policy_learn_qlearn(env, qlearn_policy, gamma, n_episodes, alpha)

# Evaluate the epsilon-greedy policy
avg_episodes_to_goal_qlearn, avg_steps_to_goal_qlearn = evaluate_policy(env, qlearn_policy, n_episodes_score)

# Create a uniform policy for comparison
uniform_policy = EpsilonGreedyPolicy(env.action_space, q_initial, epsilon=1.0)

# Evaluate the uniform policy
avg_episodes_to_goal_uniform, avg_steps_to_goal_uniform = evaluate_policy(env, uniform_policy, n_episodes_score)

# Compare the performance of both policies
print(f'Policy Q-learn: {avg_episodes_to_goal_qlearn:.2%} / {avg_steps_to_goal_qlearn:.2f}')
print(f'Policy Uniform: {avg_episodes_to_goal_uniform:.2%} / {avg_steps_to_goal_uniform:.2f}')


ep: 0, epsilon: 1.000
ep: 500, epsilon: 0.606
ep: 1000, epsilon: 0.368
ep: 1500, epsilon: 0.223
ep: 2000, epsilon: 0.135
ep: 2500, epsilon: 0.082
ep: 3000, epsilon: 0.050
ep: 3500, epsilon: 0.030
ep: 4000, epsilon: 0.018
ep: 4500, epsilon: 0.011
Policy Q-learn: 100.00% / 13.00
Policy Uniform: 3.90% / 136.74


## (Optional) Exercise : Render Taxi

Run 3 episodes of a Q-learn trained policy on `Taxi-v3` this time rendering the result.

In [37]:
### BEGIN SOLUTION
env = RecordVideo(gym.make("Taxi-v3", render_mode="rgb_array"))
env.reset()

for _ in range(100):
    observation, reward, terminated, truncated, info = env.step(env.action_space.sample())

    done = terminated or truncated

    if done:
        env.reset()

display(env.video())

### END SOLUTION

<IPython.core.display.Javascript object>

In [59]:
import gym
import numpy as np

# Create the Taxi-v3 environment
env = gym.make("Taxi-v3")

# Q-learning training parameters
num_states = env.observation_space.n
num_actions = env.action_space.n
q_table = np.zeros((num_states, num_actions))  # Initialize Q-table with zeros
gamma = 0.9  # Discount factor
alpha = 0.2  # Learning rate
n_episodes = 5000
epsilon = 0.1

# Q-learning training loop
for episode in range(n_episodes):
    state = env.reset()
    done = False

    while not done:
        # Choose an action using epsilon-greedy policy (you can use your policy selection method)
        if np.random.rand() < epsilon:
            action = env.action_space.sample()  # Explore (random action)
        else:
            action = np.argmax(q_table[state])  # Exploit (choose action with max Q-value)

        # Take the chosen action
        next_state, reward, done, _ = env.step(action)

        # Update the Q-value using the Q-learning update rule
        q_table[state, action] += alpha * (reward + gamma * np.max(q_table[next_state]) - q_table[state, action])

        # Update the current state
        state = next_state

# Now, q_table contains the learned Q-values

# You can save the Q-table to a file for later use
np.save("q_table.npy", q_table)


  deprecation(
  deprecation(


In [60]:
import gym
import numpy as np

# Load the trained Q-table
q_table = np.load("q_table.npy")  # Replace with the path to your Q-table file

# Create the Taxi-v3 environment
env = gym.make("Taxi-v3")

# Number of episodes to render
num_episodes_to_render = 3

for episode in range(num_episodes_to_render):
    state = env.reset()
    done = False

    while not done:
        # Choose the action with the highest Q-value for the current state
        action = np.argmax(q_table[state])

        # Take the chosen action
        state, _, done, _ = env.step(action)

        # Render the current state (you may need to adjust this based on your rendering setup)
        env.render()

# Close the environment after rendering
env.close()


If you want to render in human mode, initialize the environment in this way: gym.make('EnvName', render_mode='human') and don't call the render method.
See here for more information: https://www.gymlibrary.ml/content/api/[0m
  deprecation(


In [88]:
env = gym.make("Taxi-v3", render_mode=None)
env = RecordVideo(gym.make("Taxi-v3", render_mode="rgb_array"))
qlearn_policy.eval()
env.reset()
for i in range(3):
    env.render()
    states, actions, rewards, dones = sample_episode(env, qlearn_policy)
env.close()

<IPython.core.display.Javascript object>

## (Bonus) Exercise : A observation space environment

Try to perform Q-learning on an environment (e.g. `CartPole-v1`) with continuous action and observation spaces.

You will need to discretize the observation space.

In [89]:
import numpy as np

class ObservationWrapper(gym.ObservationWrapper):
    '''
    n_states(int): number of state of the env.
                It has to be the some power of N = env.observation_space.shape[0]
    bounds(list of tuples): bounds of the states of the env.(taken from the gymnasium documentation).
                each tuple gives the bounds of the states
    '''
    def __init__(self, env, n_states=4096, bounds=[(-4.8,4.8),(-50,50),( -0.418, 0.418),(-20,20)]):
        super().__init__(env)

        self.n_states = n_states
        self.bounds = bounds
        assert isinstance(env.observation_space, gym.spaces.box.Box)

        # Get the dimensions of the observation space
        self.dimension = env.observation_space.shape[0]
        self.states_per_dimention = int(n_states**(1/self.dimension)) #we give equal number of state per dimension

        # calculating steps for each state and storing in a list
        self.step_list = []
        for state_bound in bounds:
            step = (state_bound[1] - state_bound[0])/self.states_per_dimention
            self.step_list.append(step)


    def get_quant_observation(self, new_obs):
        ##calculate the new quantized state index from a list of states.  (like converting number systems)
        #observations are the digits and states per dimention is the base of the number system.
        quantized_obs = 0
        dimension = self.dimension
        for i, obs in enumerate(new_obs):
            quantized_obs+= obs*self.states_per_dimention**(dimension-i-1)

        # print(quantized_obs)
        return int(quantized_obs)

    def observation(self, obs):
        ### BEGIN SOLUTION
        # Quantize the observations (states)
        new_obs = []
        # get the quanta of each state
        for i,observation_state in enumerate(obs):
            new_obs.append(int((observation_state-self.bounds[i][0])/self.step_list[i]))

        # Once quantized each dimension compute a single observation index
        return self.get_quant_observation(new_obs)




In [92]:
# Example of rendering and showing a CartPole-v1 environment
env = RecordVideo(gym.make("CartPole-v1", render_mode="rgb_array"))
print(env.action_space)
print(env.observation_space)

observation, info = env.reset()

while True:
    env.render()

    action = env.action_space.sample()

    state_next, reward, terminated, truncated = env.step(action)

    done = terminated or truncated

    if done:
      break;

env.close()

Discrete(2)
Box([-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38], [4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38], (4,), float32)


<IPython.core.display.Javascript object>

In [105]:
import numpy as np
import gym

class ObservationWrapper(gym.ObservationWrapper):
    def __init__(self, env, n_states=4096, bounds=(-50, 50)):
        super().__init__(env)
        self.n_states = n_states
        assert isinstance(env.observation_space, gym.spaces.Box)

        # BEGIN SOLUTION
        self.bounds = bounds
        self.n_bins = int(np.round(n_states ** (1. / env.observation_space.shape[0])))
        self.bin_width = (bounds[1] - bounds[0]) / self.n_bins

        # Define the bins to discretize based on the bounds and self.n_bins
        self.bins = [np.linspace(bounds[0], bounds[1], self.n_bins) for _ in range(env.observation_space.shape[0])]
        # END SOLUTION

    def observation(self, obs):
        # BEGIN SOLUTION
        # Quantize the observations (states)
        quantized_obs = np.zeros(obs.shape)
        for i, val in enumerate(obs):
            quantized_obs[i] = np.digitize(val, self.bins[i]) - 1  # -1 to convert to 0-indexed

        # Once quantized each dimension compute a single observation index
        new_obs = sum([quantized_obs[i] * (self.n_bins ** i) for i in range(len(obs))])
        # END SOLUTION
        return int(new_obs)

# Initialize environment, wrap to render
env = gym.make("CartPole-v1", render_mode="rgb_array")

# Wrap to discretize the observation space
env = ObservationWrapper(env)

# BEGIN SOLUTION
# Initialize policy
q_table = np.zeros((env.n_states, env.action_space.n))
epsilon = 1.0  # Exploration rate
epsilon_decay = 0.995
epsilon_min = 0.01
alpha = 0.1  # Learning rate
gamma = 0.99  # Discount factor
n_episodes = 10000

# Train the policy
for episode in range(n_episodes):
    state = env.reset()
    done = False
    while not done:
        if np.random.random() < epsilon:
            action = env.action_space.sample()  # Explore
        else:
            action = np.argmax(q_table[state])  # Exploit

        next_state, reward, done, info = env.step(action)
        next_best_action = np.argmax(q_table[next_state])
        q_table[state, action] += alpha * (reward + gamma * q_table[next_state, next_best_action] - q_table[state, action])
        state = next_state
        if done:
            epsilon = max(epsilon * epsilon_decay, epsilon_min)

# END SOLUTION


  deprecation(
  deprecation(


In [166]:
env = gym.make("CartPole-v1", render_mode=None)
env = RecordVideo(gym.make("CartPole-v1", render_mode="rgb_array"))
qlearn_policy.eval()
env.reset()
for i in range(3):
    env.render()
    states, actions, rewards, dones = sample_episode(env, qlearn_policy)
env.close()

AssertionError: 