<a href="https://colab.research.google.com/github/AndreasKing-Goks/MIR_Reinforcement-Learning/blob/main/TP_02_FrozenLake_policy_evaluation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# MDP and gym

## Evaluation considerations
- We take into account the correctness of the solutions, but also their generality and the quality of the code
- Comment on and discuss the results of all your exercises (in a cell immediately after the results). You may also describe the difficulties encountered, the lessons learned, and your understanding of the problem and solution
- Clean up your code before submission. Do not leave unnecessary code attempts; if you deem them important, present them so that they are easily understood, with comments and discussion
- We also value the originality of the solutions: don't hesitate to perform unrequested additional tasks related to the exercises


**NOTE**

Do not try to reproduce exactly the results in this notebook. RL training is very noisy and the performance of learned policies can vary a lot, so try running your training several times.



In [None]:
!pip install gymnasium
!pip install gymnasium[accept-rom-license,toy_text]

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import gymnasium as gym
from IPython.display import clear_output, HTML, display
import matplotlib.pyplot as plt
%matplotlib notebook

In [None]:
#@title Wrapper for recording an environment into a video

from __future__ import annotations

from copy import deepcopy
from typing import Any, SupportsFloat

from gymnasium.core import ActType, ObsType, RenderFrame, WrapperActType, WrapperObsType
from gymnasium.error import DependencyNotInstalled

class RecordVideo(gym.Wrapper):
    """Adapted from https://github.com/Farama-Foundation/Gymnasium/blob/main/gymnasium/experimental/wrappers/rendering.py#L87
    """

    def __init__(self, env):
        """Initialize a :class:`RecordVideo` instance.
        Args:
            env: The environment that is being wrapped
        """
        super().__init__(env)
        assert env.render_mode in [
            "rgb_array",
            "rgb_array_list",
        ], f"Expected env.render_mode to be one of 'rgb_array' or 'rgb_array_list' but got '{env.render_mode}'"

        if "render_fps" not in env.metadata:
            env.metadata["render_fps"] = 24

        assert (
            "render_fps" in env.metadata
        ), "The base environment must specify 'render_fps' to be used with the RecordVideo wrapper"

        if "human" not in self.metadata["render_modes"]:
            self.metadata = deepcopy(self.env.metadata)
            self.metadata["render_modes"].append("human")

        self.artists = []
        self.figure = None

    @property
    def render_mode(self):
        """Always returns ``'human'``."""
        return "human"

    def step(
        self, action: WrapperActType
    ) -> tuple[WrapperObsType, SupportsFloat, bool, bool, dict]:
        """Perform a step in the base environment and render a frame to the screen."""
        result = super().step(action)
        self._render_frame()
        return result

    def reset(
        self, *, seed: int | None = None, options: dict[str, Any] | None = None
    ) -> tuple[WrapperObsType, dict[str, Any]]:
        """Reset the base environment and render a frame to the screen."""
        result = super().reset(seed=seed, options=options)
        self._render_frame()
        return result

    def video(self):
        """This method renders all frames collected up to now."""
        if self.figure is not None:
            from IPython.display import HTML
            import matplotlib.animation

            animation = matplotlib.animation.ArtistAnimation(self.figure, self.artists, 
                                                             interval=1000//self.metadata["render_fps"],
                                                             blit=True,
                                                             repeat=True,
                                                             repeat_delay=2000)
            return HTML(animation.to_html5_video())

        return None

    def _render_frame(self):
        """Fetch the last frame from the base environment and render it to the screen."""
        try:
            import matplotlib.animation
            import numpy as np
        except ImportError:
            raise DependencyNotInstalled(
                "matplotlib is not installed, run `pip install matplotlib`"
            )
        if self.env.render_mode == "rgb_array_list":
            rgb_arrays = self.env.render()
        elif self.env.render_mode == "rgb_array":
            rgb_arrays = [self.env.render()]
        else:
            raise Exception(
                f"Wrapped environment must have mode 'rgb_array' or 'rgb_array_list', actual render mode: {self.env.render_mode}"
            )

        assert isinstance(rgb_arrays, list)

        for rgb_array in rgb_arrays:
            assert isinstance(rgb_array, np.ndarray)

        if self.figure is None:
            self.figure = plt.figure()
            plt.axis('off')
        
        self.artists.append([plt.imshow(rgb_array) for rgb_array in rgb_arrays])

    def close(self):
        """Close the rendering window."""
        result = self.video()
        super().close()

        return result

In [None]:
import numpy as np
np.set_printoptions(linewidth=100)

Let's render some steps to show how to use the `RecordVideo` class.

In [None]:
env = RecordVideo(gym.make("FrozenLake-v1", render_mode="rgb_array"))
env.reset()                    

for _ in range(100):
    observation, reward, terminated, truncated, info = env.step(env.action_space.sample())

    done = terminated or truncated

    if done:
        env.reset()

display(env.video())


Recording is only needed when we want to render. In many parts of this lab we will perform many steps without needing to render, and in those cases we will avoid recording a video: doing so requires a lot of RAM (to store all the RGB frames) and we may run out of memory.

To avoid this, do not wrap the environment in a `RecordVideo` instance, and pass the keyword argument `render_mode=None` when making the environment.

In [None]:
env = gym.make("FrozenLake-v1", render_mode=None)
env.reset()                    

for _ in range(100):
    observation, reward, terminated, truncated, info = env.step(env.action_space.sample())

    done = terminated or truncated

    if done:
        env.reset()

In this lab we will be estimating value functions. Value functions of an MDP are conditioned on the policy used.

A policy is a probability distribution over actions, conditioned on the current state. We will restrict ourselves to discrete action spaces.

We will start by using a uniform policy that chooses with equal chances between all actions.

## Exercise : A uniform policy

Create a `policy_uniform` function that returns the probability of each possible action given an environment and state.

We will also create a `sample_multinomial` function that facilitates sampling an action from the policy given the probability of each possible action.

**Hint** you may access the number of discrete actions with `env.action_space.n`.
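As an illustration of the sampling step, here is a minimal sketch of what a `sample_multinomial` helper could look like. It uses only the standard library; the `rng` parameter and the example probabilities are assumptions made for this sketch, not part of the lab template.

```python
import random


def sample_multinomial(probs, rng=random):
    """Draw one action index according to the probabilities in `probs`."""
    assert abs(sum(probs) - 1.0) < 1e-9, "probabilities must sum to 1"
    # random.choices returns a list of k samples; we take the single draw
    return rng.choices(range(len(probs)), weights=probs, k=1)[0]


# Hypothetical uniform policy over 4 actions
uniform_probs = [0.25, 0.25, 0.25, 0.25]
rng = random.Random(0)  # seeded generator, only to make the sketch repeatable
actions = [sample_multinomial(uniform_probs, rng) for _ in range(20)]
print(actions)
```

The same helper works for non-uniform policies, since it only needs one probability per action.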

In [None]:
class UniformPolicy(object):
    def __init__(self, action_space):
        assert isinstance(action_space, gym.spaces.discrete.Discrete), "Can only create uniform policies for Discrete action spaces"
        self.n_actions = action_space.n
        self.training = True
        self.action_space = action_space

    def train(self):
        self.training = True

    def eval(self):
        self.training = False

    def probability(self, state, action):
        ### BEGIN SOLUTION
        action_prob = 1/self.n_actions
        ### END SOLUTION

        return action_prob

    def sample(self, state):
        ### BEGIN SOLUTION
        action = self.action_space.sample()
        ### END SOLUTION

        return action

# Let's instantiate a uniform policy
env = RecordVideo(gym.make("FrozenLake-v1", render_mode="rgb_array"))
uniform_policy = UniformPolicy(env.action_space)

# And sample 20 actions from state==0
actions = [uniform_policy.sample(0) for i in range(20)]
print(actions)

## Comment:
# The uniform policy selects every possible action with the same probability.

[0, 0, 1, 0, 2, 1, 1, 0, 2, 1, 2, 1, 0, 2, 3, 3, 2, 1, 0, 1]


Gym follows a formalism of MDPs that differs slightly from (and is more general than) the one we have seen in class. The differences are:
- In gym, transitions from a particular state and action can lead to the same destination state with different rewards (in the course all transitions reaching the same state obtain the same reward, except for transitions departing from a terminal state, whose rewards can be ignored)
- In gym, transitions to the same state can be both terminal and non-terminal (in the course it's the states that are terminal)

However we have chosen the environment `FrozenLake` which behaves as the MDPs described in the course, thus:
- All transitions to a given state have the same reward (unless the departure and destination states are the same terminal state)
- All transitions to a terminal state are terminal

In gym, `env.env.P` contains the information about the process dynamics, rewards and terminal states. The information is encoded in the following way:

`env.env.P[state][action]` returns a list of tuples containing all the possible outcomes and their probabilities, in the form `(probability, next_state, reward, terminal)`, where `state` is the current state, `action` is the action taken, `probability` is the probability of the transition, `next_state` is the destination state of the transition, `reward` is the reward obtained for the transition and `terminal` is a boolean indicating whether the transition ends the episode.
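To make the layout concrete, the sketch below walks over a small hand-written table with the same nesting. The table `toy_P` and the `expected_reward` dictionary are hypothetical and exist only for this illustration; they are not the FrozenLake dynamics.

```python
# Hypothetical two-state, two-action table laid out like `P`:
# toy_P[state][action] -> list of (probability, next_state, reward, terminal)
toy_P = {
    0: {
        0: [(1.0, 0, 0.0, False)],
        1: [(0.8, 1, 1.0, True), (0.2, 0, 0.0, False)],
    },
    1: {
        0: [(1.0, 1, 0.0, True)],
        1: [(1.0, 1, 0.0, True)],
    },
}

expected_reward = {}
for state, transitions_by_action in toy_P.items():
    for action, outcomes in transitions_by_action.items():
        # The outcome probabilities of one (state, action) pair sum to 1
        assert abs(sum(p for p, _, _, _ in outcomes) - 1.0) < 1e-9
        # Expected immediate reward of taking `action` in `state`
        expected_reward[(state, action)] = sum(p * r for p, _, r, _ in outcomes)

print(expected_reward)
```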

## Exercise : MDP from gym environment

Using this information, compute the matrices $\mathcal{P}_{ss'}$, $\mathcal{R}_s$ and `terminal_states` of your MDP. `terminal_states` is a boolean vector containing `True` for terminal states and `False` otherwise.

We will use the uniform policy that we created before, that assigns equal probability to all actions.

**Hint** all the transitions from a terminal state must be ignored.

**Optionally** while computing the matrices verify with `assert` that the conditions that we described above are fulfilled.

In [None]:
def compute_P_and_R(env, policy):
    # Initialize number of possible states and actions according to the gymnasium environment
    n_states = env.observation_space.n
    n_actions = env.action_space.n

    # Initialize Probability Matrix
    P = np.zeros((n_states, n_states))

    # Initialize Reward Matrix
    R = np.empty((n_states, 1))
    R[:] = np.nan

    # Initialize a vector for terminal states
    # we will initialize with nan
    terminal_states = np.empty(n_states)
    terminal_states[:] = np.nan

    ### BEGIN SOLUTION
    # env.env.P maps each departure state to a dictionary of actions, and each
    # action to a list of (probability, next_state, reward, terminal) tuples.
    # Note: terminal states keep the self-loop transitions that gym defines for them.
    for current_state, transitions_by_action in env.env.P.items():
        for action, transitions in transitions_by_action.items():
            # Probability that the policy selects this action in this state
            prob_of_action = policy.probability(current_state, action)

            # Iterate over all transitions for this action
            for prob_of_transition, next_state, reward, terminal in transitions:
                # Flag the destination state as terminal or non-terminal
                terminal_states[next_state] = terminal

                # Add to P the probability of the transition as:
                # (probability of the action) * (probability of the transition)
                P[current_state, next_state] += prob_of_action * prob_of_transition

                # Set the reward of the destination state, unless a non-zero reward
                # has already been recorded (this keeps the goal reward when the
                # goal's zero-reward self-loop is processed later)
                if np.isnan(R[next_state, 0]) or R[next_state, 0] == 0:
                    R[next_state, 0] = reward

    ### END SOLUTION

    # Convert the terminal states vector to boolean to use in indexing
    terminal_states = terminal_states.astype(bool)

    return P, R, terminal_states

# Set up the gymnasium environment
env = RecordVideo(gym.make("FrozenLake-v1", render_mode="rgb_array"))

# Compute the Probability Matrix, Reward Matrix, and terminal states
P, R, terminal_states = compute_P_and_R(env, uniform_policy)

print(P)
print(R)
print(terminal_states)

## Comment
# In the probability matrix, the elements of each row sum to 1, as expected for a transition distribution.
# In the reward matrix, only the last element contains a reward of 1, because that is where the goal (the treasure) is located.
# In the terminal states vector, every element with value True corresponds to a hole or to the goal.

[[0.5  0.25 0.   0.   0.25 0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.  ]
 [0.25 0.25 0.25 0.   0.   0.25 0.   0.   0.   0.   0.   0.   0.   0.   0.   0.  ]
 [0.   0.25 0.25 0.25 0.   0.   0.25 0.   0.   0.   0.   0.   0.   0.   0.   0.  ]
 [0.   0.   0.25 0.5  0.   0.   0.   0.25 0.   0.   0.   0.   0.   0.   0.   0.  ]
 [0.25 0.   0.   0.   0.25 0.25 0.   0.   0.25 0.   0.   0.   0.   0.   0.   0.  ]
 [0.   0.   0.   0.   0.   1.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.  ]
 [0.   0.   0.25 0.   0.   0.25 0.   0.25 0.   0.   0.25 0.   0.   0.   0.   0.  ]
 [0.   0.   0.   0.   0.   0.   0.   1.   0.   0.   0.   0.   0.   0.   0.   0.  ]
 [0.   0.   0.   0.   0.25 0.   0.   0.   0.25 0.25 0.   0.   0.25 0.   0.   0.  ]
 [0.   0.   0.   0.   0.   0.25 0.   0.   0.25 0.   0.25 0.   0.   0.25 0.   0.  ]
 [0.   0.   0.   0.   0.   0.   0.25 0.   0.   0.25 0.   0.25 0.   0.   0.25 0.  ]
 [0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   1.   0.   0.   0.   0.  ]
 [0.

## Exercise : Sample an episode

Sample an episode with the uniform policy and return the lists of observations (states), actions, rewards and the done flags.

Notice that the list of states should have one more element (the initial state).

Test the function by rendering an episode. We will use `RecordVideo` to do that.

In [None]:
def sample_episode(env, policy, reset=True):
    states = []
    actions = []
    rewards = []
    dones = []

    ### BEGIN SOLUTION

    # If reset, we reset the environment to get the initial observation;
    # otherwise we start from the current state of the underlying environment
    if reset:
      obs, info = env.reset()
    else:
      obs = env.unwrapped.s

    # Collect the initial state
    states.append(obs)

    # Step until the episode ends, for at most 100 steps
    for i in range(100):

        # Select an action
        action = policy.sample(obs)

        # Step the environment
        obs, reward, terminated, truncated, info = env.step(action)

        # The episode is done if it has been terminated or truncated
        done = terminated or truncated

        # Collect the state, reward and action taken
        states.append(obs)
        actions.append(action)
        rewards.append(reward)
        dones.append(done)

        # If the episode has terminated or been truncated, we stop sampling
        if done:
            break

    ### END SOLUTION

    return states, actions, rewards, dones

# Set up the gymnasium environment
env = RecordVideo(gym.make("FrozenLake-v1", render_mode="rgb_array"))

# Initiate the Uniform Policy
uniform_policy = UniformPolicy(env.action_space)

# Sample the episode from  Uniform Policy
states, actions, rewards, dones = sample_episode(env, uniform_policy)

# Print the output
print(states)
print(actions)
print(rewards)
print(dones)

print(len(states), len(actions), len(rewards), len(dones))

# Display the video
display(env.video())

## Comment
# This function records the sequence of states, actions, rewards and done flags until the agent reaches a terminal state or the episode is truncated.


[0, 0, 0, 1, 5]
[0, 3, 3, 1]
[0.0, 0.0, 0.0, 0.0]
[False, False, False, True]
5 4 4 4


## Exercise : Returns of episode

For a sampled episode compute the return $G_t$ for all steps $t$.

**Hint** the return is easily computed backwards from the last step in the episode to the first.
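The backward recursion behind this hint is $G_t = r_t + \gamma G_{t+1}$, where $r_t$ denotes the reward collected at step $t$ (the $t$-th entry of the rewards list). The sketch below applies it to a plain Python list; the function name `discounted_returns` and the sample rewards are assumptions chosen for illustration.

```python
def discounted_returns(rewards, gamma):
    """Return [G_0, ..., G_{T-1}] using G_t = r_t + gamma * G_{t+1}."""
    returns = [0.0] * len(rewards)
    running = 0.0  # return of the step after the last one is 0
    for t in reversed(range(len(rewards))):
        running = rewards[t] + gamma * running
        returns[t] = running
    return returns


# Hypothetical episode with a reward at every step
example_returns = discounted_returns([1.0, 1.0, 1.0], gamma=0.5)
print(example_returns)
```

Each return is built from the one after it, so a single backward pass is enough, and intermediate rewards are included in every earlier return.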

In [None]:
def compute_returns(rewards, gamma):
    returns = np.zeros(len(rewards))

    ### BEGIN SOLUTION
    # Set the return of the last step to its reward
    returns[-1] = rewards[-1]

    # Iterate backward over the remaining steps, computing each return
    # from its own reward and the return of the following step:
    # G_t = r_t + gamma * G_{t+1}
    for t in reversed(range(len(rewards) - 1)):
      returns[t] = rewards[t] + gamma * returns[t + 1]

    ### END SOLUTION

    return returns

env = gym.make("FrozenLake-v1", render_mode="rgb_array")
uniform_policy = UniformPolicy(env.action_space)

# Now we will run until one of them has a positive reward
while True:
    states, actions, rewards, dones = sample_episode(env, uniform_policy)

    if np.sum(rewards) > 0:
        # Print the rewards
        print(rewards)

        # Compute and print the returns
        gamma = 0.9
        returns = compute_returns(rewards, gamma)
        print(returns)

        # Exit the loop
        break

[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0]
[0.10941899 0.12157665 0.13508517 0.15009464 0.16677182 0.18530202 0.20589113 0.22876792 0.25418658
 0.28242954 0.3138106  0.34867844 0.38742049 0.43046721 0.4782969  0.531441   0.59049    0.6561
 0.729      0.81       0.9        1.        ]


# Policy evaluation

In this section we will implement various value function estimation methods:
- Direct method using the previous P and R
- Naive (cheating) method that assumes possibility of setting an arbitrary initial state
- Monte Carlo methods
- Temporal-difference method

## Exercise : Direct method

Solve the Bellman equation, using the direct solution (matrix inversion).

**Note** that this method leads to misleading values for terminal states, since there is no way to indicate terminality using the Bellman equation without sampling.
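As a reminder of where the direct solution comes from, the Bellman expectation equation for a fixed policy can be written in matrix form and rearranged as follows. Here $v$ denotes the vector of state values and $I$ the identity matrix; both symbols are introduced for this sketch, while $\mathcal{P}$ and $\mathcal{R}$ are the matrices computed earlier.

$$
v = \mathcal{R} + \gamma \mathcal{P} v
\;\Longrightarrow\;
(I - \gamma \mathcal{P})\, v = \mathcal{R}
\;\Longrightarrow\;
v = (I - \gamma \mathcal{P})^{-1} \mathcal{R}
$$

For an absorbing state $s$ with $\mathcal{P}_{ss} = 1$, the same equation reduces to $v_s = \mathcal{R}_s / (1 - \gamma)$. A terminal state with $\mathcal{R}_s = 1$ and $\gamma = 0.9$ therefore receives the value $10$, even though no further reward can be collected once the episode has ended.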

In [None]:
def value_direct(env, gamma, policy):
    P, R, terminal_states = compute_P_and_R(env, policy)
    
    ### BEGIN SOLUTION
    v = np.linalg.inv(np.identity(len(R)) - (gamma * P)) @ R

    ### END SOLUTION

    # Squeeze out the extra dimension
    v = v[:,0]

    return v

# Set up the gymnasium environment
env = gym.make("FrozenLake-v1", render_mode="rgb_array")

# Initiate the Uniform Policy
uniform_policy = UniformPolicy(env.action_space)

# Initiate the parameters
gamma = 0.9

# Calculate the state value and print it
v_direct = value_direct(env, gamma, uniform_policy)
print(v_direct)

[ 4.02953462e-02  3.80021094e-02  9.06008086e-02  3.70639671e-02  6.04976257e-02 -1.08110653e-15
  2.37003375e-01  0.00000000e+00  1.68085365e-01  5.18463074e-01  9.62747525e-01  0.00000000e+00
  4.55649547e-16  1.17344744e+00  3.52341144e+00  1.00000000e+01]


## (Optional) Exercise : Verify the solution

Check if the computed value function $V$ fulfills the Bellman equation. You may use `np.allclose(a, b)` to check if all elements in `a` and `b` are close up to a numerical error.
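One possible shape for such a check is sketched below on a small hand-made chain rather than on FrozenLake. The matrices `P_toy` and `R_toy` are hypothetical, and `np.linalg.solve` is used here in place of an explicit inverse.

```python
import numpy as np

# Hypothetical 3-state chain under a fixed policy; state 2 is absorbing
P_toy = np.array([[0.5, 0.5, 0.0],
                  [0.0, 0.5, 0.5],
                  [0.0, 0.0, 1.0]])
R_toy = np.array([[0.0], [0.0], [1.0]])
gamma = 0.9

# Solve (I - gamma * P) v = R for v
v_toy = np.linalg.solve(np.eye(3) - gamma * P_toy, R_toy)

# The solution must satisfy v = R + gamma * P v up to numerical error
bellman_ok = np.allclose(v_toy, R_toy + gamma * P_toy @ v_toy)
print(bellman_ok)
print(v_toy[:, 0])
```

The same comparison can be applied to the `P`, `R` and value vector of the exercise, provided the value vector is reshaped to match `R`.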

In [None]:
### BEGIN SOLUTION

### END SOLUTION

For the following we must set the value function of terminal states to 0 in order to keep our value function comparable to those computed by methods using episode sampling.

In [None]:
# To compare to the following methods we must set the value of terminal states to 0
v_direct[terminal_states] = 0
print(v_direct)

[0.04029535 0.03800211 0.09060081 0.03706397 0.06049763 0.         0.23700338 0.         0.16808536
 0.51846307 0.96274753 0.         0.         1.17344744 3.52341144 0.        ]


## Exercise : Naive (cheating) method

Compute the value function for each state by cheating (changing the starting state and computing the average return from the start).

In [None]:
def value_naive(env, gamma, policy, n_episodes):
    ### BEGIN SOLUTION

    # Initialize the table of accumulated returns (one cell per state)
    n_state = env.observation_space.n
    values = np.zeros((n_state, 1), dtype=float)

    # For each initial state: place the agent on it and run n_episodes episodes
    for i in range(n_state):

        # For each episode
        for episode in range(n_episodes):
            # Reset the environment (this also resets its step counter)
            env.reset()

            # Force the initial state on the underlying environment;
            # assigning to a wrapper attribute would not change its state
            env.unwrapped.s = i

            # Sample an episode without resetting the environment,
            # so that the forced initial state is kept
            states, actions, rewards, dones = sample_episode(env, policy, reset=False)

            # Compute the returns
            returns = compute_returns(rewards, gamma)

            # Accumulate the return of the initial state
            values[i] += returns[0]

    # Divide the accumulated returns by the number of episodes per state
    value = values / n_episodes
    ### END SOLUTION
    
    return value

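def compute_value_error(v_est, v_ref):
    # Flatten both inputs so that an (n, 1) estimate and an (n,) reference
    # are compared element-wise instead of being broadcast to (n, n)
    diff = np.ravel(v_est) - np.ravel(v_ref)
    return np.mean(diff), np.std(diff)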

# Set up the gymnasium environment
env = gym.make("FrozenLake-v1", render_mode=None)

# Initiate the Uniform Policy
uniform_policy = UniformPolicy(env.action_space)

# Initiate the parameters
gamma = 0.9
n_episodes = 10000

# Calculate the state value and print it
v_naive = value_naive(env, gamma, uniform_policy, n_episodes)
print(v_naive)

# Compare the state value of Cheating Method and Direct Method
print(compute_value_error(v_naive, v_direct))

[[0.0041885 ]
 [0.00403716]
 [0.00428444]
 [0.00487917]
 [0.00395951]
 [0.00424266]
 [0.00449242]
 [0.00551316]
 [0.00329758]
 [0.00410473]
 [0.00480844]
 [0.0044981 ]
 [0.00405399]
 [0.00450645]
 [0.00505935]
 [0.00413556]]
(-0.42372230321703735, 0.8716185363061932)


## Exercise : Monte Carlo first visit method
Compute the value function for each state using the Monte Carlo "first visit" method.

In [None]:
def value_montecarlo_first(env, gamma, policy, n_episodes):
    ### BEGIN SOLUTION

    # Initialize values and counts tables (one cell per state)
    n_state = env.observation_space.n
    N = np.zeros((n_state,1))
    values  = np.zeros((n_state, 1), dtype=float)

    # For each episode
    for n in range(n_episodes):
        # Sample an episode and compute returns
        states, actions, rewards, dones = sample_episode(env, policy)
        returns = compute_returns(rewards, gamma)

        # Keep track of the states already visited in this episode
        visited_states = set()

        # For each state and associated return
        # (zip drops the final state, which has no associated return)
        for state, ret in zip(states, returns):
            # If first visit
            if state not in visited_states:
                # Increment counts
                N[state] += 1

                # Accumulate returns
                values[state] += ret

                # Update the set of visited states
                visited_states.add(state)

    # Average the accumulated returns
    value = np.divide(values, N, out=np.zeros_like(N), where=N!=0)
    print(N)
    ### END SOLUTION
    
    return value

# Set up the gymnasium environment
env = gym.make("FrozenLake-v1", render_mode=None)

# Initiate the Uniform Policy
uniform_policy = UniformPolicy(env.action_space)

# Initiate the parameters
gamma = 0.9
n_episodes = 10000

# Calculate the state value and print it
v_mc_firstvisit = value_montecarlo_first(env, gamma, uniform_policy, n_episodes)
print(v_mc_firstvisit)

# Compare the state value of Monte Carlo First Visit Method and Direct Method
print(compute_value_error(v_mc_firstvisit, v_direct))

## Comment
# With first-visit Monte Carlo, N counts the number of episodes in which each state was visited at least once.

[[10000.]
 [ 6243.]
 [ 2634.]
 [ 1175.]
 [ 6223.]
 [    0.]
 [ 1371.]
 [    0.]
 [ 2673.]
 [ 1152.]
 [  702.]
 [    0.]
 [    0.]
 [  405.]
 [  323.]
 [    0.]]
[[0.0041581 ]
 [0.00395402]
 [0.00909533]
 [0.00375615]
 [0.0062396 ]
 [0.        ]
 [0.02508312]
 [0.        ]
 [0.01662599]
 [0.04907038]
 [0.09813634]
 [0.        ]
 [0.        ]
 [0.12421349]
 [0.35290559]
 [0.        ]]
(-0.3847737477684513, 0.8760189083216212)


## Exercise : Monte Carlo every visit method
Compute the value function for each state using the Monte Carlo "every visit" method.
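The difference between the "first visit" and "every visit" variants is easiest to see on an episode that visits a state twice. The sketch below uses a hand-written episode instead of the environment; the helper `mc_values` and the episode data are assumptions made for illustration.

```python
from collections import defaultdict


def mc_values(episodes, first_visit):
    """Average the returns per state over a list of (states, returns) episodes."""
    total = defaultdict(float)
    count = defaultdict(int)
    for states, returns in episodes:
        seen = set()
        for state, ret in zip(states, returns):
            # In first-visit mode, only the first occurrence in an episode counts
            if first_visit and state in seen:
                continue
            seen.add(state)
            total[state] += ret
            count[state] += 1
    return {state: total[state] / count[state] for state in total}


# Hypothetical episode: state 0 is visited twice before reaching state 1
episodes = [([0, 0, 1], [0.25, 0.5, 1.0])]
first = mc_values(episodes, first_visit=True)
every = mc_values(episodes, first_visit=False)
print(first)
print(every)
```

With first visit, state 0 only uses the return from its first occurrence, while every visit averages the returns of both occurrences.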

In [None]:
def value_montecarlo_every(env, gamma, policy, n_episodes):
    ### BEGIN SOLUTION
    # Initialize values and counts tables (one cell per state)
    n_state = env.observation_space.n
    N = np.zeros((n_state,1))
    values  = np.zeros((n_state, 1), dtype=float)

    # For each episode
    for n in range(n_episodes):
        # Sample an episode and compute returns
        states, actions, rewards, dones = sample_episode(env, policy)
        returns = compute_returns(rewards, gamma)

        # For each state and associated return
        for state, ret in zip(states, returns): # zip drops the final state, which has no associated return
            # Increment counts
            N[state] += 1

            # Accumulate returns
            values[state] += ret

    # Average the accumulated returns
    value = np.divide(values, N, out=np.zeros_like(N), where=N!=0)
    print(N)
    ### END SOLUTION
    
    return value

# Set up the gymnasium environment
env = gym.make("FrozenLake-v1", render_mode=None)

# Initiate the Uniform Policy
uniform_policy = UniformPolicy(env.action_space)

# Initiate the parameters
gamma = 0.9
n_episodes = 10000

# Calculate the state value and print it
v_mc_everyvisit = value_montecarlo_every(env, gamma, uniform_policy, n_episodes)
print(v_mc_everyvisit)

# Compare the state value of Monte Carlo Every Visit Method and Direct Method
print(compute_value_error(v_mc_everyvisit, v_direct))

## Comment
# With every-visit Monte Carlo, N counts every visit of the agent to each state.

[[32579.]
 [12599.]
 [ 5719.]
 [ 2922.]
 [12321.]
 [    0.]
 [ 1656.]
 [    0.]
 [ 4689.]
 [ 1549.]
 [  916.]
 [    0.]
 [    0.]
 [  701.]
 [  536.]
 [    0.]]
[[0.00403148]
 [0.00459596]
 [0.01240392]
 [0.00462177]
 [0.00606001]
 [0.        ]
 [0.03390158]
 [0.        ]
 [0.01603629]
 [0.05327598]
 [0.11303882]
 [0.        ]
 [0.        ]
 [0.14215287]
 [0.41327184]
 [0.        ]]
(-0.3778892220899994, 0.8776186227400664)


## Exercise : Monte Carlo incremental

Implement the incremental Monte Carlo method.
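The incremental method relies on the running-mean identity $\mu_n = \mu_{n-1} + (x_n - \mu_{n-1}) / n$, which gives the same result as averaging all samples at the end without storing them. A minimal sketch on plain numbers, where the function name `incremental_mean` and the sample values are assumptions:

```python
def incremental_mean(samples):
    """Compute the mean of `samples` one element at a time."""
    mean, n = 0.0, 0
    for x in samples:
        n += 1
        # Move the current mean toward the new sample by 1/n of the gap
        mean += (x - mean) / n
    return mean


# Hypothetical returns observed for one state
samples = [1.0, 0.0, 0.0, 1.0]
running = incremental_mean(samples)
batch = sum(samples) / len(samples)
print(running, batch)
```

In the value-function setting, `mean` plays the role of the value of a state and `x` the return observed on each visit.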

In [None]:
def value_montecarlo_incremental(env, gamma, policy, n_episodes):
    ### BEGIN SOLUTION
    # Initialize values and counts tables (one cell per state)
    n_state = env.observation_space.n
    N = np.zeros((n_state,1))
    value  = np.zeros((n_state, 1), dtype=float)

    # For each episode
    for n in range(n_episodes):
        # Sample an episode and compute returns
        states, actions, rewards, dones = sample_episode(env, policy)
        returns = compute_returns(rewards, gamma)

        # For each state and associated return
        for state, ret in zip(states, returns): # zip drops the final state, which has no associated return
            # Increment counts
            N[state] += 1 

            # Update value with return
            value[state] = value[state] + (ret-value[state])/N[state] # Incremental mean: move the value toward the new return by 1/N

    print(N)
    ### END SOLUTION
    return value

# Set up the gymnasium environment
env = gym.make("FrozenLake-v1", render_mode=None)

# Initiate the Uniform Policy
uniform_policy = UniformPolicy(env.action_space)

# Initiate the parameters
gamma = 0.9
n_episodes = 10000

# Calculate the state value and print it
v_mc_incremental = value_montecarlo_incremental(env, gamma, uniform_policy, n_episodes)
print(v_mc_incremental)

# Compare the state value of Monte Carlo Incremental Method and Direct Method
print(compute_value_error(v_mc_incremental, v_direct))

[[32918.]
 [12775.]
 [ 5554.]
 [ 2701.]
 [12507.]
 [    0.]
 [ 1628.]
 [    0.]
 [ 4584.]
 [ 1526.]
 [  934.]
 [    0.]
 [    0.]
 [  714.]
 [  560.]
 [    0.]]
[[0.0045778 ]
 [0.00506174]
 [0.01264461]
 [0.00636322]
 [0.0064693 ]
 [0.        ]
 [0.02990297]
 [0.        ]
 [0.02112469]
 [0.05958226]
 [0.10869087]
 [0.        ]
 [0.        ]
 [0.13635782]
 [0.38902537]
 [0.        ]]
(-0.37936358939960374, 0.8769414031665558)


## Exercise : Temporal-difference (TD) method
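The TD(0) update moves the value of the departure state toward the target $r + \gamma V(s')$ with step size $\alpha$. The sketch below applies it to a recorded three-state episode, outside of any environment; the function name `td0_update` and the episode data are hypothetical.

```python
def td0_update(value, states, rewards, gamma, alpha):
    """Apply one TD(0) sweep over a recorded episode, updating `value` in place."""
    for t, reward in enumerate(rewards):
        state, next_state = states[t], states[t + 1]
        # TD target: immediate reward plus discounted value of the next state
        td_target = reward + gamma * value[next_state]
        value[state] += alpha * (td_target - value[state])
    return value


# Hypothetical episode 0 -> 1 -> 2, where state 2 is terminal (its value stays 0)
value = [0.0, 0.0, 0.0]
states = [0, 1, 2]
rewards = [0.0, 1.0]

# Replay the same episode twice to show the reward propagating backward
for _ in range(2):
    td0_update(value, states, rewards, gamma=0.5, alpha=0.5)
print(value)
```

After the first sweep only state 1 has a non-zero value; state 0 picks it up on the second sweep, which illustrates how TD bootstraps from the current estimates instead of waiting for the full return.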

In [None]:
def value_td(env, gamma, policy, n_episodes, alpha=0.4):
    ### BEGIN SOLUTION
    # Initialize value table (one cell per state)
    n_state = env.observation_space.n
    value  = np.zeros((n_state, 1), dtype=float)

    # For each episode
    for n in range(n_episodes):
        # Sample an episode
        states, actions, rewards, dones = sample_episode(env, policy)

        # For each step in the episode (states has one more element than rewards)
        for i, reward in enumerate(rewards):
            state, next_state = states[i], states[i + 1]

            # Move the value of the departure state toward the TD target:
            # the reward plus the discounted value of the next state
            value[state] += alpha * (reward + gamma * value[next_state] - value[state])

    ### END SOLUTION
    return value

# Set up the gymnasium environment
env = gym.make("FrozenLake-v1", render_mode=None)

# Initiate the Uniform Policy
uniform_policy = UniformPolicy(env.action_space)

# Initiate the parameters
gamma = 0.9
n_episodes = 10000
alpha = 0.4

# Calculate the state value and print it
v_td = value_td(env, gamma, uniform_policy, n_episodes, alpha=alpha)
print(v_td)

# Compare the state value of Temporal-Difference Method and Direct Method
print(compute_value_error(v_td, v_direct))

[[0.00080418]
 [0.0010455 ]
 [0.00720718]
 [0.00180156]
 [0.00091557]
 [0.        ]
 [0.03405918]
 [0.        ]
 [0.00328069]
 [0.19695065]
 [0.29064123]
 [0.        ]
 [0.        ]
 [0.03843509]
 [0.63247242]
 [0.        ]]
(-0.35262530328670194, 0.8870717776952682)


## (Optional) Exercise : Comparison TD to Monte Carlo incremental

How do these two methods compare? Explain their relative advantages and disadvantages.

## Exercise : Action-value
Compute the action value function using one or more of the following methods:
- Naive (cheating)
- MC "first visit"
- MC "every visit"
- MC incremental
- TD

In [None]:
def actionvalue_montecarlo_every(env, gamma, policy, n_episodes):
    ### BEGIN SOLUTION
    # Initialize action-value table (one cell per state-action pair)
    n_state = env.observation_space.n
    n_action = env.action_space.n
    q = np.zeros((n_state,n_action))

    # Count how many times each action was taken in each state
    count_state_action = np.zeros((n_state,n_action)) 

    # For each episode
    for n in range(n_episodes):
        # Sample an episode and compute the returns
        states, actions, rewards, dones = sample_episode(env, policy)
        returns = compute_returns(rewards, gamma)

        # For each state and followed action
        for state, action, ret in zip(states, actions, returns):
          # Accumulate the return obtained after taking this action in this state
          q[state, action] += ret

          # Count this occurrence of the state-action pair
          count_state_action[state, action] += 1

    # Average the accumulated returns of each state-action pair
    q = np.divide(q, count_state_action, out=np.zeros_like(count_state_action), where=count_state_action!=0)

    ### END SOLUTION
    return q

# Set up the gymnasium environment
env = gym.make("FrozenLake-v1", render_mode="rgb_array")

# Initiate the Uniform Policy
uniform_policy = UniformPolicy(env.action_space)
     
# Initiate the parameters
gamma = 0.9
n_episodes = 10000

# Calculate the action value and print it
q_mc_everyvisit = actionvalue_montecarlo_every(env, gamma, uniform_policy, n_episodes)
print(q_mc_everyvisit)

## Comment
# The action value q(s, a) is estimated as the average return observed after taking action a in state s

[[0.00453397 0.00571012 0.00492443 0.00343301]
 [0.00278184 0.00445963 0.00406355 0.00635668]
 [0.01158339 0.010026   0.01448545 0.00320646]
 [0.00335211 0.00392951 0.00368025 0.00412306]
 [0.00984484 0.00806723 0.00582239 0.00220221]
 [0.         0.         0.         0.        ]
 [0.03833323 0.03670229 0.03439175 0.00264664]
 [0.         0.         0.         0.        ]
 [0.00734837 0.02338888 0.02106792 0.0232164 ]
 [0.0448026  0.07848848 0.07417061 0.04015739]
 [0.17930823 0.19015144 0.10271455 0.02595332]
 [0.         0.         0.         0.        ]
 [0.         0.         0.         0.        ]
 [0.03094601 0.11391333 0.20701039 0.13284311]
 [0.23645378 0.50366569 0.51358897 0.46892226]
 [0.         0.         0.         0.        ]]


# Policy improvement

In this section we are going to improve upon the uniform policy, which selects actions at random, independently of the state.

To assess whether our learned policies work, we will start by implementing a scoring function to evaluate the policies.

## Exercise : Evaluate performance of a policy

In this environment we define a goal as obtaining a final reward greater than 0, thus reaching the target state, since it is the only one with a non-zero reward.

For a given policy, compute:
- the fraction of episodes in which the goal was reached
- the average number of steps to reach the goal
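
The two metrics above can be sketched on a handful of made-up episodes. This is a minimal illustration, not the notebook's `score_policy`: the helper `summarize_episodes` and its inputs are hypothetical. It also shows one way to handle the case where the goal is never reached, in which the mean over an empty list would otherwise be undefined.

```python
import numpy as np

def summarize_episodes(final_rewards, episode_lengths):
    """Return (success rate, mean steps over successful episodes)."""
    final_rewards = np.asarray(final_rewards, dtype=float)
    episode_lengths = np.asarray(episode_lengths, dtype=float)

    # An episode counts as a success when its last reward is positive
    success = final_rewards > 0
    success_rate = success.mean()

    # Guard against the case where the goal was never reached
    mean_steps = episode_lengths[success].mean() if success.any() else float("nan")
    return success_rate, mean_steps

# Made-up data: four episodes, two of which reached the goal
rate, steps = summarize_episodes([0, 1, 0, 1], [5, 12, 7, 8])
print(f"{rate:.2%} / {steps:.2f}")  # 50.00% / 10.00
```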


In [None]:
def score_policy(env, gamma, policy, n_episodes):
    episodes_to_goal = 0
    steps_to_goal = []

    ### BEGIN SOLUTION
    for n in range(n_episodes):
        # Sample an episode
        states, actions, rewards, dones = sample_episode(env, policy)
        if rewards[-1] != 0:
          episodes_to_goal += 1
          steps_to_goal.append(len(states))
    ### END SOLUTION

    return episodes_to_goal/n_episodes, np.mean(steps_to_goal)

## Exercise : Greedy policy (policy improvement)

Create a greedy policy from an action value function and evaluate its performance.

The greedy policy selects the action leading to the largest value in the current state.

**Note** that when several actions have the same maximal value, it is best to randomly pick one of them.

Pick the first highest value action or (optionally) pick randomly amongst actions with the highest value.
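
The optional random tie-breaking can be sketched as follows. The function name `argmax_random_tie` is hypothetical, and the snippet works on a single row of a Q-table rather than on the policy classes of this notebook.

```python
import numpy as np

def argmax_random_tie(values, rng):
    """Index of the maximum, chosen uniformly among ties."""
    values = np.asarray(values)
    # Indices of all entries equal to the maximum
    best = np.flatnonzero(values == values.max())
    return int(rng.choice(best))

rng = np.random.default_rng(0)
q_row = [0.0, 0.3, 0.3, 0.1]

# Actions 1 and 2 are tied, so only those two can be selected
picks = {argmax_random_tie(q_row, rng) for _ in range(100)}
print(picks)
```

By contrast, `np.argmax` always returns the first index of the maximum, so on a row of identical values (for example an untrained table of zeros) it always selects action 0.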

In [None]:
class GreedyPolicy(UniformPolicy):
    def __init__(self, action_space, q):
        super().__init__(action_space)

        # q is the action-value table
        self.q = q

    def _max_value_action(self, state):
        ### BEGIN SOLUTION
        action = np.argmax(self.q[state])
        ### END SOLUTION
        return action

    def probability(self, state, action):
        # Select the highest value action
        action_max = self._max_value_action(state)

        # Return a probability of 1 for the selected action, 0 otherwise
        action_prob = float(action == action_max)
        
        return action_prob

    def sample(self, state):
        # Select the highest value action
        action_max = self._max_value_action(state)

        return action_max

# Set up the gymnasium environment
env = gym.make("FrozenLake-v1", render_mode=None)

# Initiate the greedy policy
greedy_policy = GreedyPolicy(env.action_space, q_mc_everyvisit)

print(q_mc_everyvisit)

# And sample 20 actions from state==0
actions = [greedy_policy.sample(0) for i in range(20)]
print(actions)

[[0.00453397 0.00571012 0.00492443 0.00343301]
 [0.00278184 0.00445963 0.00406355 0.00635668]
 [0.01158339 0.010026   0.01448545 0.00320646]
 [0.00335211 0.00392951 0.00368025 0.00412306]
 [0.00984484 0.00806723 0.00582239 0.00220221]
 [0.         0.         0.         0.        ]
 [0.03833323 0.03670229 0.03439175 0.00264664]
 [0.         0.         0.         0.        ]
 [0.00734837 0.02338888 0.02106792 0.0232164 ]
 [0.0448026  0.07848848 0.07417061 0.04015739]
 [0.17930823 0.19015144 0.10271455 0.02595332]
 [0.         0.         0.         0.        ]
 [0.         0.         0.         0.        ]
 [0.03094601 0.11391333 0.20701039 0.13284311]
 [0.23645378 0.50366569 0.51358897 0.46892226]
 [0.         0.         0.         0.        ]]
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]


## Exercise : Compare the performance of policies

Report the performance of the uniform policy and the greedy policy

In [None]:
env = gym.make("FrozenLake-v1", render_mode=None)

### BEGIN SOLUTION
# Initialize gamma and the number of episodes
gamma = 0.9
n_episodes = 10000

# Uniform Policy Score
avg_episodes_to_goal_UP, avg_steps_to_goal_UP = score_policy(env, gamma, uniform_policy, n_episodes)

# Greedy Policy Score
avg_episodes_to_goal_GP, avg_steps_to_goal_GP = score_policy(env, gamma, greedy_policy, n_episodes)

# Print the score and average steps to reach the goal for both uniform policy and greedy policy
print(f'Uniform Policy: {avg_episodes_to_goal_UP:.2%} / {avg_steps_to_goal_UP:.2f}')
print(f'Greedy  Policy: {avg_episodes_to_goal_GP:.2%} / {avg_steps_to_goal_GP:.2f}')
### END SOLUTION

Uniform Policy: 1.48% / 13.64
Greedy  Policy: 15.91% / 18.92


In [None]:
## Comment
# From this comparison we can see that the greedy policy performs much better than the uniform policy

# Policy learning

In this section we will start learning policies.

We will then implement the following policy learning methods:
- policy iteration
- SARSA
- Q-Learning

## Exercise : Policy iteration
By alternating between policy evaluation and policy improvement, find an optimal policy.

Print the scores of each intermediate policy and comment on how the metrics evolve.

In [None]:
def policy_learn_iteration(env, initial_policy, policy_evaluation_function, 
                           n_episodes_value, n_episodes_score, n_iterations):
    policy = initial_policy

    ### BEGIN SOLUTION

    # Score and print the initial policy
    gamma = 0.9
    init_policy_score = score_policy(env, gamma, policy, n_episodes_score)[0]
    av_init = np.nan
    print ('Iteration : 0 ,Score : {s:.2%}, Action-Value Function Change : {avc}'.format(s = init_policy_score, avc = av_init))

    for iteration in range(n_iterations):
        # Policy evaluation
        action_value = policy_evaluation_function(env, gamma, policy, n_episodes_value)
        # Keep track of the action-value function change 
        # (sum of absolute difference between the previous 
        #  and next action-value functions)
        # Note that at the first step we will report a change of nan
        # since we still don't have a previous action-value function

        # Tracking the action-value function change 
        if iteration == 0:
          action_value_prev = 0
        
        avc = np.sum(np.abs(action_value - action_value_prev))

        # Policy improvement
        greedy_policy = GreedyPolicy(env.action_space, action_value)

        # Policy scoring
        policy_score = score_policy(env, gamma, greedy_policy, n_episodes_score)[0]
        print ('Iteration : {i}, Score : {s:.2%}, Action-Value Function Change : {avc:.5}'.format(i = iteration+1, s = policy_score, avc = avc))

        # Set the current greedy policy as previous policy for next iteration
        policy = greedy_policy

        # Set the current action value as previous action value for next iteration
        action_value_prev = action_value

    ### END SOLUTION
# Set up the gymnasium environment
env = gym.make("FrozenLake-v1", render_mode="rgb_array")

# Initiate the Uniform Policy
uniform_policy = UniformPolicy(env.action_space)

# Train the policy
policy_learn_iteration(env, uniform_policy, actionvalue_montecarlo_every, 
                       n_episodes_value=10000, n_episodes_score=10000,
                       n_iterations=30)

# Create Video
env = RecordVideo(env)
states, actions, rewards, dones = sample_episode(env, uniform_policy)
display(env.video())

Iteration : 0 ,Score : 1.16%, Action-Value Function Change : nan
Iteration : 1, Score : 33.19%, Action-Value Function Change : 2.8772
Iteration : 2, Score : 33.13%, Action-Value Function Change : 1.3625
Iteration : 3, Score : 33.37%, Action-Value Function Change : 1.3219
Iteration : 4, Score : 33.33%, Action-Value Function Change : 1.3677
Iteration : 5, Score : 32.56%, Action-Value Function Change : 1.3598
Iteration : 6, Score : 33.36%, Action-Value Function Change : 1.3387
Iteration : 7, Score : 32.52%, Action-Value Function Change : 1.3208
Iteration : 8, Score : 32.92%, Action-Value Function Change : 1.3433
Iteration : 9, Score : 32.84%, Action-Value Function Change : 1.3527
Iteration : 10, Score : 32.99%, Action-Value Function Change : 1.3708
Iteration : 11, Score : 32.43%, Action-Value Function Change : 1.3366
Iteration : 12, Score : 32.69%, Action-Value Function Change : 1.3544
Iteration : 13, Score : 31.83%, Action-Value Function Change : 1.3268
Iteration : 14, Score : 32.15%, Ac


In [None]:
## Comment
# From the result above, the score stops improving after the first iteration, so a purely greedy policy does not guarantee that learning progresses
# The main reason is that a greedy policy never explores: state-action pairs it does not select are not visited again, so better paths cannot be found

## Exercise : Epsilon-greedy policy

One of the issues with the greedy policy is that some possible trajectories may never be visited depending on the initial estimation of the value function.

To avoid this, create an epsilon-greedy policy. These policies act differently in training mode and in evaluation mode. In evaluation mode they act as a greedy policy. In training mode:
- with probability epsilon, uniformly selects an action
- else selects the action with maximum value (greedy policy)

Implement the `EpsilonGreedyPolicy` class.
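
The selection rule and the per-episode decay can be sketched independently of the class hierarchy. The functions `epsilon_greedy_action` and `decayed_epsilon` below are hypothetical stand-ins that operate on a single Q-table row. The decay formula is assumed to be exponential in the episode index with a lower bound.

```python
import numpy as np

def epsilon_greedy_action(q_row, epsilon, rng, training=True):
    """Pick an action from one row of a Q-table."""
    n_actions = len(q_row)
    if training and rng.random() < epsilon:
        # Explore: any action, including the greedy one, with equal probability
        return int(rng.integers(n_actions))
    # Exploit: action with the largest estimated value
    return int(np.argmax(q_row))

def decayed_epsilon(epsilon_start, epsilon_decay, epsilon_min, episode_index):
    """Exponential decay per episode, clipped from below at epsilon_min."""
    return max(epsilon_start * epsilon_decay ** episode_index, epsilon_min)

rng = np.random.default_rng(0)
q_row = np.array([0.1, 0.7, 0.2, 0.0])
train_actions = [epsilon_greedy_action(q_row, 0.5, rng) for _ in range(1000)]
eval_actions = [epsilon_greedy_action(q_row, 0.5, rng, training=False) for _ in range(20)]

# With 4 actions and epsilon = 0.5 the greedy action is expected about
# 1 - 0.5 + 0.5/4 = 62.5% of the time in training mode
print(np.mean(np.array(train_actions) == 1))
print(set(eval_actions))  # {1}
print(round(decayed_epsilon(1.0, 0.999, 0.01, 500), 3))  # 0.606
```

Taking the number of actions from the Q-table row, rather than hard-coding it, keeps the rule usable on environments with a different number of actions.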


In [None]:
class EpsilonGreedyPolicy(GreedyPolicy):
    def __init__(self, action_space, q, epsilon, 
                 epsilon_decay=1, epsilon_min=0):
        super().__init__(action_space, q)
        self.epsilon_start = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        
        self.epsilon = self.epsilon_start

    def sample(self, state):
        ### BEGIN SOLUTION
        if self.training:
          # Draw a random number to decide between exploration and exploitation
          prob = np.random.random()
          if prob > self.epsilon: # Above the epsilon threshold: choose the action with maximum value
            action = self._max_value_action(state)
          else: # Otherwise explore: pick an action uniformly among all actions
            action = np.random.choice(self.q.shape[1])

        else: # In evaluation mode, the policy always takes the action with maximum value
          action = self._max_value_action(state)
        ### END SOLUTION
        return action

    def begin_episode(self, episode_index):
        # Start of an episode
        self.epsilon = self.epsilon_start * (self.epsilon_decay ** episode_index) # Apply the decay on the epsilon threshold
        self.epsilon = max(self.epsilon, self.epsilon_min)

# Instantiate a policy
dummy = EpsilonGreedyPolicy(env.action_space, q_mc_everyvisit, epsilon=0.5)

# Sample 20 actions from state 0 in train mode
dummy.train()
actions = [dummy.sample(0) for i in range(20)]
print(actions)

# Sample 20 actions from state 0 in eval mode
dummy.eval()
actions = [dummy.sample(0) for i in range(20)]
print(actions)

[1, 1, 1, 2, 1, 3, 1, 1, 0, 3, 1, 1, 1, 1, 2, 2, 1, 1, 1, 1]
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]


In [None]:
## Comment
# With train mode activated, we can see that the policy occasionally picks a random action to promote further exploration

## Exercise : SARSA

State–action–reward–state–action (SARSA) is a method to learn a policy leveraging the TD estimation of action-value functions and an epsilon-greedy policy.

Since it is based on TD, it does not require full episodes to train: the policy can be improved at each step. We will therefore **not use `sample_episode`** and instead reimplement a similar loop here.

Implement the SARSA algorithm. In this case it will be passed a policy of type `EpsilonGreedyPolicy`, which stores the action-value table (or Q-table). We may access and modify it using `policy.q`.

Score the trained policy.
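
As a reference for the update rule, here is a minimal sketch of a single SARSA step on a small hand-made table. The function `sarsa_update` is hypothetical. It assumes that nothing is bootstrapped from a terminal state, so the target there reduces to the reward.

```python
import numpy as np

def sarsa_update(q, state, action, reward, state_next, action_next,
                 terminated, gamma, alpha):
    """Apply one SARSA update to the table q in place and return the TD error."""
    # A terminal state has no future return, so nothing is bootstrapped from it
    q_next = 0.0 if terminated else q[state_next, action_next]
    td_error = reward + gamma * q_next - q[state, action]
    q[state, action] += alpha * td_error
    return td_error

# Made-up table: 3 states, 2 actions
q = np.zeros((3, 2))
q[1, 0] = 0.5

# Non-terminal transition: bootstrap from the action actually chosen next
sarsa_update(q, state=0, action=1, reward=0.0, state_next=1, action_next=0,
             terminated=False, gamma=0.9, alpha=0.2)
print(q[0, 1])  # 0.2 * (0 + 0.9 * 0.5 - 0) = 0.09

# Terminal transition: the target is just the reward
sarsa_update(q, state=1, action=0, reward=1.0, state_next=2, action_next=None,
             terminated=True, gamma=0.9, alpha=0.2)
print(q[1, 0])  # 0.5 + 0.2 * (1 - 0.5) = 0.6
```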

In [None]:
def policy_learn_sarsa(env, policy, gamma, n_episodes, alpha, max_n_steps=1000, print_every=500):
    # No need to create the tables of action-values since they are stored directly in the policy
    
    # While we haven't reached the desired number of episodes
    for episode in range(n_episodes):
        # Call the policy begin_episode so it can handle epsilon decay
        policy.begin_episode(episode)
        
        # Print every couple episodes
        if not episode % print_every:
            print(f'ep: {episode}, epsilon: {policy.epsilon:.3f}')

        # Get initial state
        state, info = env.reset()

        # Take the first action according to the policy
        action = policy.sample(state)

        # While we haven't reached the maximum number of steps for the episode
        for step in range(max_n_steps):     
            ### BEGIN SOLUTION
            # Perform a step of the environment
            state_next, reward, terminated, truncated, info = env.step(action)

            # It is done if terminated or truncated
            done = terminated or truncated

            # If episode has finished
            if done:

                # No next action is sampled at the end of an episode, so the target
                # reduces to the reward (the value after the final step is taken as zero)
                policy.q[state, action] = policy.q[state, action] + alpha*(reward - policy.q[state, action])
                break

            # Sample the next action
            action_next = policy.sample(state_next)

            # Update the action-value table
            policy.q[state, action] = policy.q[state, action] + alpha*(reward + gamma*policy.q[state_next, action_next] - policy.q[state, action])

            ### END SOLUTION

            # Set the current state and action
            state = state_next
            action = action_next
            
    return

# Set up the gymnasium environment
env = gym.make("FrozenLake-v1", render_mode="rgb_array")

# Initiate initial SARSA learning policy
q_initial = np.zeros((env.observation_space.n, env.action_space.n))
sarsa_policy = EpsilonGreedyPolicy(env.action_space, q_initial,
                                   epsilon=1, epsilon_decay=0.999, epsilon_min=0.01)

# Set up the parameter for SARSA Learning
gamma = 0.9
n_episodes = 5000
alpha = 0.2
n_episodes_score = 1000

### BEGIN SOLUTION
# Train the policy
sarsa_policy.train()
policy_learn_sarsa(env, sarsa_policy, gamma, n_episodes, alpha, max_n_steps=1000, print_every=500)

### END SOLUTION

### BEGIN SOLUTION
# Evaluate the policy
sarsa_policy.eval()
avg_episodes_to_goal, avg_steps_to_goal = score_policy(env, gamma, sarsa_policy, n_episodes_score)

### END SOLUTION

print(f'Policy SARSA: {avg_episodes_to_goal:.2%} / {avg_steps_to_goal:.2f}')

# Create Video
env = RecordVideo(env)
states, actions, rewards, dones = sample_episode(env, sarsa_policy)
display(env.video())

ep: 0, epsilon: 1.000
ep: 500, epsilon: 0.606
ep: 1000, epsilon: 0.368
ep: 1500, epsilon: 0.223
ep: 2000, epsilon: 0.135
ep: 2500, epsilon: 0.082
ep: 3000, epsilon: 0.050
ep: 3500, epsilon: 0.030
ep: 4000, epsilon: 0.018
ep: 4500, epsilon: 0.011
Policy SARSA: 51.56% / 33.16



In [None]:
## Comment
# With SARSA, the policy trained with an epsilon-greedy policy performs much better than the one obtained with the plain greedy policy

## Exercise : Q-learning

Q-learning is very similar to SARSA. **SARSA is an on-policy learning method** in which the action-values are updated following the same policy. In SARSA the action-values are updated using the value of the next state and next action taken.

Q-learning is off-policy: it does not assume the same policy when updating the action-values. Instead, it assumes an optimal policy by using the maximum value of the next state.

Implement a modified version of `policy_learn_sarsa` that performs Q-learning.
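
The difference between the two methods lies only in the target used for the update. The sketch below computes both targets for the same transition on a made-up table. The function names `q_learning_target` and `sarsa_target` are hypothetical.

```python
import numpy as np

def q_learning_target(q, reward, state_next, terminated, gamma):
    """Off-policy target: assumes the greedy action is taken in state_next."""
    return reward + (0.0 if terminated else gamma * np.max(q[state_next]))

def sarsa_target(q, reward, state_next, action_next, terminated, gamma):
    """On-policy target: uses the action the behaviour policy actually chose."""
    return reward + (0.0 if terminated else gamma * q[state_next, action_next])

# Made-up table: 2 states, 2 actions
q = np.array([[0.0, 0.0],
              [0.2, 0.8]])
gamma = 0.9

# Same transition (reward 0, next state 1), exploratory next action 0
print(q_learning_target(q, 0.0, 1, False, gamma))   # 0.9 * 0.8 = 0.72
print(sarsa_target(q, 0.0, 1, 0, False, gamma))     # 0.9 * 0.2 = 0.18
```

When the next action is exploratory, the SARSA target reflects the lower value of that action, whereas the Q-learning target ignores it and uses the best available value.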

In [None]:
def policy_learn_qlearn(env, policy, gamma, n_episodes, alpha, max_n_steps=1000, print_every=500):
    # No need to create the tables of action-values since they are stored directly in the policy
    
    # While we haven't reached the desired number of episodes
    for episode in range(n_episodes):
        # Call the policy begin_episode so it can handle epsilon decay
        policy.begin_episode(episode)
        
        # Print every couple episodes
        if not episode % print_every:
            print(f'ep: {episode}, epsilon: {policy.epsilon:.3f}')

        # Get initial state
        state, info = env.reset()

        # Take the first action according to the policy
        action = policy.sample(state)

        # While we haven't reached the maximum number of steps for the episode
        for step in range(max_n_steps):     
            ### BEGIN SOLUTION
            # Perform a step of the environment
            state_next, reward, terminated, truncated, info = env.step(action)

            # Set as done if terminated or truncated
            done = terminated or truncated

            # If episode has finished
            if done:
                # Update the action-value table and leave the loop
                policy.q[state, action] = policy.q[state, action] + alpha*(reward + gamma*np.amax(policy.q[state_next]) - policy.q[state, action])
                break

            # Sample the next action
            action_next = policy.sample(state_next)

            # Update the action-value table
            policy.q[state, action] = policy.q[state, action] + alpha*(reward + gamma*np.amax(policy.q[state_next]) - policy.q[state, action])
            ### END SOLUTION

            # Set the current state and action
            state = state_next
            action = action_next
            
    return

# Set up the gymnasium environment
env = gym.make("FrozenLake-v1", render_mode="rgb_array")

# Initiate initial q learning policy
q_initial = np.zeros((env.observation_space.n, env.action_space.n))
qlearn_policy = EpsilonGreedyPolicy(env.action_space, q_initial,
                                    epsilon=1, epsilon_decay=0.999, epsilon_min=0.01)

# Set up the parameter for Q learning Policy
gamma = 0.9
n_episodes = 5000
alpha = 0.2
n_episodes_score = 1000

### BEGIN SOLUTION
# Train the policy
qlearn_policy.train()
policy_learn_qlearn(env, qlearn_policy, gamma, n_episodes, alpha, max_n_steps=1000, print_every=500)

### END SOLUTION

### BEGIN SOLUTION
# Evaluate the policy
qlearn_policy.eval()
avg_episodes_to_goal, avg_steps_to_goal = score_policy(env, gamma, qlearn_policy, n_episodes_score)

### END SOLUTION

print(f'Policy Q-learn: {avg_episodes_to_goal:.2%} / {avg_steps_to_goal:.2f}')

# Create Video
env = RecordVideo(env)
states, actions, rewards, dones = sample_episode(env, qlearn_policy)
display(env.video())

ep: 0, epsilon: 1.000
ep: 500, epsilon: 0.606
ep: 1000, epsilon: 0.368
ep: 1500, epsilon: 0.223
ep: 2000, epsilon: 0.135
ep: 2500, epsilon: 0.082
ep: 3000, epsilon: 0.050
ep: 3500, epsilon: 0.030
ep: 4000, epsilon: 0.018
ep: 4500, epsilon: 0.011
Policy Q-learn: 45.34% / 28.82



In [None]:
## Comment
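# In this run, Q-learning reached the goal less often than SARSA (45.34% vs 51.56%) but needed fewer steps on average (28.82 vs 33.16)
# Q-learning updates the action value with the maximum value of the next state (optimal policy) rather than with the action chosen by the policy being followed
# Since training is noisy, the ranking of the two methods may change from one run to another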

## (Optional) Exercise : Train for Taxi

Train an epsilon-greedy policy using Q-learning on the `Taxi-v3` environment.

Score the performance of this policy and compare it to a uniform policy.

In [None]:
# Set up the gymnasium environment
env = gym.make("Taxi-v3", render_mode="rgb_array")

# Initiate initial q learning policy
q_initial = np.zeros((env.observation_space.n, env.action_space.n))
qlearn_policy = EpsilonGreedyPolicy(env.action_space, q_initial,
                                    epsilon=1, epsilon_decay=0.999, epsilon_min=0.01)

# Set up the parameter for Q learning Policy
gamma = 0.9
n_episodes = 5000
alpha = 0.2
n_episodes_score = 1000

### BEGIN SOLUTION
# Train the policy
qlearn_policy.train()
policy_learn_qlearn(env, qlearn_policy, gamma, n_episodes, alpha, max_n_steps=1000, print_every=500)

### END SOLUTION

### BEGIN SOLUTION
# Evaluate the policy
qlearn_policy.eval()
avg_episodes_to_goal, avg_steps_to_goal = score_policy(env, gamma, qlearn_policy, n_episodes_score)

### END SOLUTION

print(f'Policy Q-learn: {avg_episodes_to_goal:.2%} / {avg_steps_to_goal:.2f}')

# Create Video
env = RecordVideo(env)
states, actions, rewards, dones = sample_episode(env, qlearn_policy)
display(env.video())

ep: 0, epsilon: 1.000
ep: 500, epsilon: 0.606
ep: 1000, epsilon: 0.368
ep: 1500, epsilon: 0.223
ep: 2000, epsilon: 0.135
ep: 2500, epsilon: 0.082
ep: 3000, epsilon: 0.050
ep: 3500, epsilon: 0.030
ep: 4000, epsilon: 0.018
ep: 4500, epsilon: 0.011
Policy Q-learn: 100.00% / 14.03



## (Optional) Exercise : Render Taxi

Run 3 episodes of a Q-learn trained policy on `Taxi-v3` this time rendering the result.

## (Bonus) Exercise : A continuous observation space environment

Try to perform Q-learning on an environment (e.g. `CartPole-v1`) with a continuous observation space.

You will need to discretize the observation space.
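
One possible discretization scheme is sketched below: each dimension is cut into equal-width bins, and the per-dimension bin indices are combined into a single state index. The helpers `make_bins` and `discretize` are hypothetical, and the finite bounds are assumptions chosen for illustration, since the velocity dimensions of `CartPole-v1` are reported as practically unbounded.

```python
import numpy as np

def make_bins(low, high, n_bins):
    """Inner bin edges for each dimension (n_bins - 1 edges give n_bins bins)."""
    return [np.linspace(lo, hi, n_bins + 1)[1:-1] for lo, hi in zip(low, high)]

def discretize(obs, bins, n_bins):
    """Map a continuous observation to a single integer in [0, n_bins**d)."""
    # np.digitize returns values in 0..n_bins-1 given n_bins-1 inner edges;
    # out-of-range observations fall into the first or last bin
    idx = tuple(int(np.digitize(x, edges)) for x, edges in zip(obs, bins))
    # Combine the per-dimension indices into one flat state index
    return int(np.ravel_multi_index(idx, (n_bins,) * len(bins)))

# Hypothetical finite bounds for the four CartPole-v1 observation dimensions
low = np.array([-4.8, -3.0, -0.42, -3.5])
high = np.array([4.8, 3.0, 0.42, 3.5])
n_bins = 8  # 8**4 = 4096 discrete states

bins = make_bins(low, high, n_bins)
print(discretize(np.array([0.0, 0.0, 0.0, 0.0]), bins, n_bins))
print(discretize(np.array([-10.0, -10.0, -10.0, -10.0]), bins, n_bins))  # 0
print(discretize(np.array([10.0, 10.0, 10.0, 10.0]), bins, n_bins))      # 4095
```

The resulting index can be used directly as the row of a Q-table with `n_bins**4` rows. The choice of bounds and of the number of bins per dimension is a trade-off between resolution and the number of episodes needed to visit every state.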

In [None]:
# Example of rendering and showing a CartPole-v1 environment
env = RecordVideo(gym.make("CartPole-v1", render_mode="rgb_array"))
print(env.action_space)
print(env.observation_space)

observation, info = env.reset()

while True:
    env.render()
    
    action = env.action_space.sample() 
         
    state_next, reward, terminated, truncated, info = env.step(action)

    done = terminated or truncated
   
    if done: 
      break
            
env.close()

Discrete(2)
Box([-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38], [4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38], (4,), float32)



In [None]:
class ObservationWrapper(gym.ObservationWrapper):
    def __init__(self, env, n_states=4096, bounds=(-50, 50)):
        super().__init__(env)

        self.n_states = n_states

        assert isinstance(env.observation_space, gym.spaces.box.Box)

        ### BEGIN SOLUTION
        # Get the dimensions of the observation space


        # Decide how many bins per dimension based on the target n_states


        # Define the bins to discretize based on the bounds and self.n_bins

        ### END SOLUTION
        
    def observation(self, obs):
        ### BEGIN SOLUTION
        # Quantize the observations (states)


        # Once quantized each dimension compute a single observation index

        ### END SOLUTION
        return new_obs

# Initialize environment, wrap to render
env = gym.make("CartPole-v1", render_mode="rgb_array")

# Wrap to discretize the observation space
env = ObservationWrapper(env)

### BEGIN SOLUTION
# Initialize policy






# Train the policy








# Recreate the wrapped environment 



# Render and show an episode with the trained policy




### END SOLUTION