## Environment Settings (do not change)

Please do **not** change this part.

In [1]:
!pip install gymnasium
!pip install gymnasium[other]
!pip install gymnasium[toy-text]



In [2]:
import numpy as np
from typing import List, Tuple
import gymnasium as gym

from gymnasium.wrappers import RecordVideo
from base64 import b64encode
from IPython.display import HTML

def render_mp4(videopath: str) -> str:
  """
  Returns an HTML video tag embedding a base64-encoded version
  of the MP4 video at the specified path.
  """
  mp4 = open(videopath, 'rb').read()
  base64_encoded_mp4 = b64encode(mp4).decode()
  return f'<video width=400 controls><source src="data:video/mp4;' \
         f'base64,{base64_encoded_mp4}" type="video/mp4"></video>'

## Basics

Setup environment:

In [3]:
# Discount factor (in [0,1))
gamma = 0.95

# Simulation
n_episodes = 200
max_length_episode = 100

# Decide whether to generate a video or not
generate_video = True # leave it to True (or delete the rendering)
video_dir = 'vid'
video_name = 'FrozenLake'
n_episodes_video = 2 # number of episodes that end up in the video

# Environment
env = gym.make('FrozenLake-v1', map_name="4x4", is_slippery=True, render_mode='rgb_array')
env = env.unwrapped

# The tutorial will be on FrozenLake, but feel free to play with other environments too
#env = gym.make('Taxi-v3', render_mode='rgb_array')
#env = gym.make('CliffWalking-v0', render_mode='rgb_array')

States and actions (for the environment FrozenLake, 4x4):

*   The state is the position on the 4x4 grid (i.e., between 0 and 15);
*   The action is left (0), down (1), right (2), up (3).




In [4]:
env.action_space

Discrete(4)

In [5]:
env.action_space.n

np.int64(4)

In [6]:
env.observation_space

Discrete(16)

In [7]:
env.observation_space.n

np.int64(16)

Transition probabilities: for instance, the possible transitions when action 1 is played at state 0.

In [8]:
env.unwrapped.P[0][1] # (probability, next state, reward of that transition, done); ignore the last entry

[(0.3333333333333333, 0, 0.0, False),
 (0.3333333333333333, 4, 0.0, False),
 (0.3333333333333333, 1, 0.0, False)]
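
The three equally likely outcomes reflect the slippery dynamics of this configuration: the agent moves in the intended direction or in one of the two perpendicular directions, each with probability 1/3, and bumping into the border leaves it in place. A minimal sketch that reproduces the next states listed above by hand; the helper `slippery_next_states` is hypothetical (it is not part of gymnasium) and deliberately ignores rewards, holes and the goal:

```python
from typing import List, Tuple

# Action encoding used by FrozenLake: left (0), down (1), right (2), up (3)
MOVES = {0: (0, -1), 1: (1, 0), 2: (0, 1), 3: (-1, 0)}

def slippery_next_states(state: int, action: int,
                         n_rows: int = 4, n_cols: int = 4) -> List[Tuple[float, int]]:
    """Hypothetical helper: (probability, next_state) pairs on a slippery grid."""
    row, col = divmod(state, n_cols)
    outcomes = []
    # Intended direction plus the two perpendicular ones, each with probability 1/3
    for a in [(action - 1) % 4, action, (action + 1) % 4]:
        d_row, d_col = MOVES[a]
        # Moving into the border leaves the agent where it is
        new_row = min(max(row + d_row, 0), n_rows - 1)
        new_col = min(max(col + d_col, 0), n_cols - 1)
        outcomes.append((1 / 3, new_row * n_cols + new_col))
    return outcomes

print(slippery_next_states(state=0, action=1))
```

For state 0 and action 1 (down), the call returns the next states 0, 4 and 1, in the same order as the entries of `env.unwrapped.P[0][1]`.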

## Our First Policy: A Random Policy

We start with the semantics: for us, a policy is a Python list of length $n_\mathrm{states}$, and each entry of this list is a vector of size $n_\mathrm{actions}$. To sample an action at a given state, which will be helpful later, the following function can be used.

*Note*: this is not the most efficient way, just for illustration purposes.

In [22]:
def sample_action(policy: List[np.ndarray], state: int) -> int:
  if isinstance(state, tuple):
    state = state[0]
  return np.random.choice(np.arange(start=0, stop=env.action_space.n), size=1, p=policy[state])[0]
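
As noted, this sampling routine is not the most efficient one. One possible alternative, sketched here under the assumption that a dedicated NumPy `Generator` is acceptable, samples directly from the action indices. The names `sample_action_rng`, `rng` and `pi_example` are illustrative and not part of the notebook:

```python
import numpy as np
from typing import List

rng = np.random.default_rng(seed=0)  # assumed seed, only for reproducibility

def sample_action_rng(policy: List[np.ndarray], state: int) -> int:
    """Sample an action index from the distribution policy[state]."""
    probabilities = policy[state]
    # Generator.choice with an integer n samples from range(n)
    return int(rng.choice(len(probabilities), p=probabilities))

# A deterministic policy always returns the action that has probability one
pi_example = [np.array([0.0, 0.0, 1.0, 0.0])]
print(sample_action_rng(pi_example, state=0))
```

Returning a plain `int` avoids carrying `np.int64` values into `env.step`, which is a design preference rather than a requirement.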

We start with a simple random policy. We will later experiment with other policies.

For us, a policy is a list of arrays. In particular:
- `pi` is a list whose length is the number of states;
- `pi[s]` is a numpy array whose length is the number of actions and which represents a probability distribution over the action space;
- `pi[s][a]` is the probability of playing action `a` when at state `s`.




In [23]:
pi_random = []
for x in range(env.observation_space.n):
    probability_actions = np.ones(env.action_space.n) / env.action_space.n
    pi_random.append(probability_actions)

Check your result by inspecting the probability distribution at the first state.  

In [24]:
pi_random[0]

array([0.25, 0.25, 0.25, 0.25])
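
The policy representation described above (one probability vector per state) can be checked mechanically. The following is a small sketch of such a check; `is_valid_policy` is a hypothetical helper introduced here, and the sizes 16 and 4 are those of the 4x4 FrozenLake:

```python
import numpy as np
from typing import List

def is_valid_policy(policy: List[np.ndarray], n_states: int, n_actions: int) -> bool:
    """Hypothetical check: one probability vector of length n_actions per state."""
    if len(policy) != n_states:
        return False
    for distribution in policy:
        distribution = np.asarray(distribution, dtype=float)
        if distribution.shape != (n_actions,):
            return False
        # Entries must be non-negative and sum to one
        if np.any(distribution < 0) or not np.isclose(distribution.sum(), 1.0):
            return False
    return True

uniform = [np.ones(4) / 4 for _ in range(16)]
print(is_valid_policy(uniform, n_states=16, n_actions=4))
```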

Finally, we test our sampling method.

In [25]:
sample_action(policy=pi_random,
              state=1)

np.int64(0)

## Simulation environment
We now write a function that simulates a policy.

In [26]:
def simulate_environment(env, policy:List[np.ndarray], sim_video_name: str) -> float:
  # Setup video
  if generate_video:
    env = RecordVideo(env, video_dir, name_prefix=sim_video_name, episode_trigger=lambda e: e < n_episodes_video)

  total_reward = 0.0

  for e in range(n_episodes):
    reward_episode = 0.0

    # Reset
    observation = env.reset()

    # Simulate an episode
    for t in range(max_length_episode):
      action = sample_action(policy=policy,
                             state=observation)
      observation, reward, terminated, _, _ = env.step(action)

      # Compute reward
      reward_episode += gamma**t * reward

      if terminated:
        break

    # Increase reward
    total_reward += reward_episode

  env.close()

  return total_reward/n_episodes
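
The quantity accumulated in `reward_episode` is the discounted return $\sum_t \gamma^t r_t$ of one episode. A small stand-alone sketch of that sum; the function name `discounted_return` and the reward sequence are made up for illustration:

```python
from typing import Sequence

def discounted_return(rewards: Sequence[float], gamma: float) -> float:
    """Sum of gamma**t * rewards[t] over the steps of one episode."""
    total = 0.0
    for t, reward in enumerate(rewards):
        total += gamma**t * reward
    return total

# Illustrative episode: the only non-zero reward (1.0) arrives at step t = 5
episode_rewards = [0.0, 0.0, 0.0, 0.0, 0.0, 1.0]
print(discounted_return(episode_rewards, gamma=0.95))  # equals 0.95**5
```

Because the reward is discounted by $\gamma^t$, an episode that reaches the goal late contributes less than one that reaches it early, so the average discounted reward is smaller than the fraction of successful episodes.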

We can now simulate our random policy. The simulation parameters are listed at the top of the file.

In [27]:
average_reward_random = simulate_environment(
    env,
    policy=pi_random,
    sim_video_name=video_name + '_random_policy')
print('Average reward: ' + str(average_reward_random))

Average reward: 0.0046213641854315765




Display video:

In [28]:
for episode in range(n_episodes_video):
    vid = HTML(render_mp4(f'{video_dir}/{video_name}_random_policy-episode-{episode}.mp4'))
    display(vid)

## Value function of a policy

Now, we can write a function that computes the value/cost-to-go of a given policy. We will do it in three ways:

1.   Solving the (linear) Bellman equation:
$$V^\pi(x)=\sum_{a}\pi(x,a)\left(R_x^a+\gamma\sum_{x'}P_{xx'}^aV^\pi(x')\right);$$

2.   Using the contractivity of the Bellman operator: Update $V_t^\pi$ to $V_{t+1}^\pi$ via
$$V_{t+1}^\pi(x)=\sum_a\pi(x,a)\left(R_x^a+\gamma\sum_{x'}P_{xx'}^aV_t^\pi(x')\right);$$
note that you also need to define an initial condition and an adequate stopping criterion;

3.   Via numerical simulations.

*Note*: This is not the optimal value/cost-to-go, but just the reward/cost incurred when using the policy $\pi$. After that, we will look into methods to compute the optimal policy $\pi^\ast$.
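
To see that methods 1 and 2 agree, it can help to try both on an example small enough to solve by hand. The sketch below uses a hypothetical two-state chain under a fixed policy (the arrays `r_pi` and `P_pi`, the discount 0.9 and the tolerance are assumptions made for this illustration, not values from the notebook):

```python
import numpy as np

gamma = 0.9
# Hypothetical two-state chain under a fixed policy:
# state 0 pays reward 1 and moves to the absorbing state 1 with probability 0.5
r_pi = np.array([1.0, 0.0])
P_pi = np.array([[0.5, 0.5],
                 [0.0, 1.0]])

# Method 1: solve the linear system (I - gamma * P_pi) V = r_pi
V_solve = np.linalg.solve(np.eye(2) - gamma * P_pi, r_pi)

# Method 2: iterate V <- r_pi + gamma * P_pi V from a zero initial condition
V_iter = np.zeros(2)
for _ in range(10_000):
    V_next = r_pi + gamma * P_pi @ V_iter
    converged = np.max(np.abs(V_next - V_iter)) < 1e-10
    V_iter = V_next
    if converged:
        break

print(V_solve, V_iter)
```

By hand, $V(1)=0$ and $V(0)=1+0.9\cdot 0.5\,V(0)$, so $V(0)=1/0.55\approx 1.818$; both methods should reproduce this value up to the chosen tolerance.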

First, let's write a function that computes the expected reward $R_x^a$ and the probability vector $P_{xx'}^a$ when playing action $a$ at state $x$.






In [29]:
def get_reward_probability_vector_state_action(state: int, action: int) -> Tuple[float, np.ndarray]:
    expected_reward = None
    probability_vector = None
    # IMPLEMENT HERE
    transitions = env.P[state][action]  # List of (prob, next_state, reward, done)
    
    n_states = env.observation_space.n
    probability_vector = np.zeros(n_states)
    expected_reward = 0.0

    for prob, next_state, reward, done in transitions:
        probability_vector[next_state] += prob
        expected_reward += prob * reward

        
    return expected_reward, probability_vector

We can now start with 1. We split the task in two pieces:


1.   Implement a function `get_reward_probability_matrix` that outputs the reward vector (numpy array whose length is the number of states) whose entry $x$ is
$$\sum_{u}\pi(x,u)R_x^u$$
and the probability matrix (two-dimensional numpy array of shape $n_\mathrm{states}\times n_\mathrm{states}$) whose entry $(x,x')$ is
$$\sum_{u}\pi(x,u)P_{xx'}^u.$$
2.   Use these two quantities to solve the linear equation $(I-\gamma P)V=R$, where $R$ and $P$ are the reward vector and the probability matrix from step 1. The output should be $V(x)$ as a numpy array (whose length is the number of states).

*Hint:* Use the function `get_reward_probability_vector_state_action` you wrote above.



In [30]:
def get_reward_probability_matrix(policy: List[np.ndarray]) -> Tuple[np.ndarray, np.ndarray]:
    expected_reward = None
    probability_matrix = None
    # IMPLEMENT HERE
    n_states = env.observation_space.n
    r_pi = np.zeros(n_states)
    P_pi = np.zeros((n_states, n_states))

    for s in range(n_states):
        for a in range(env.action_space.n):
            pi_s_a = policy[s][a]  # probability of taking action a in state s
            r_sa, p_sa = get_reward_probability_vector_state_action(s, a)
            if r_sa != 0:
                print(f"State {s}, Action {a} -> Reward: {r_sa}, Transition Prob Sum: {np.sum(p_sa)}")

            r_pi[s] += pi_s_a * r_sa
            P_pi[s] += pi_s_a * p_sa  # weighted sum of transition probs
    expected_reward = r_pi
    probability_matrix = P_pi

    return expected_reward, probability_matrix

def compute_value_policy(policy: List[np.ndarray]) -> np.ndarray:
    r, p = get_reward_probability_matrix(policy=policy)
    I = np.eye(len(r))
    V = np.linalg.solve(I - gamma * p, r)
    return V

value_random_policy = compute_value_policy(pi_random)
print(value_random_policy)

State 14, Action 1 -> Reward: 0.3333333333333333, Transition Prob Sum: 1.0
State 14, Action 2 -> Reward: 0.3333333333333333, Transition Prob Sum: 1.0
State 14, Action 3 -> Reward: 0.3333333333333333, Transition Prob Sum: 1.0
[ 7.76738424e-03  6.86813641e-03  1.42829484e-02  6.46133382e-03
  1.03018709e-02 -3.33746163e-16  3.25263116e-02 -1.42577884e-16
  2.53070433e-02  7.09470575e-02  1.22669943e-01  5.72937812e-16
  1.52886650e-16  1.50747467e-01  4.13031652e-01  0.00000000e+00]


We now use method 2 (the iterative update).

*Hint:* You can start with a zero initial condition.

In [41]:
def compute_value_policy_iterative(
    policy: List[np.ndarray] | np.ndarray,
    gamma: float = 0.99,
    theta: float = 1e-6
) -> np.ndarray:
    """
    Robust iterative policy evaluation:
      V(s) ← Σ_a π(a|s)[ R(s,a) + γ Σ_{s'} P(s'|s,a) V(s') ]
    """

    # 1) Build a plain (n_states × n_actions) float array
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    policy_arr = np.zeros((n_states, n_actions), dtype=float)
    for s in range(n_states):
        row = np.asarray(policy[s], dtype=float).flatten()
        if row.size != n_actions:
            raise ValueError(f"policy[{s}] has length {row.size}, expected {n_actions}")
        policy_arr[s] = row

    # 2) Now do the usual iterative sweep
    value_pi = np.zeros(n_states, dtype=float)
    while True:
        delta = 0.0
        for s in range(n_states):
            v_s = 0.0
            for a in range(n_actions):
                pi_s_a = policy_arr[s, a]                 # guaranteed scalar float
                r_sa, p_sa = get_reward_probability_vector_state_action(s, a)
                # collapse the transition×value back to a scalar
                v_s += pi_s_a * (r_sa + gamma * np.dot(p_sa, value_pi))

            delta = max(delta, abs(v_s - value_pi[s]))
            value_pi[s] = v_s

        if delta < theta:
            break

    return value_pi

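# Note: no gamma is passed here, so the function default gamma=0.99 is used instead of
# the global gamma = 0.95; this is why the values below differ from the linear solve above.
# Pass gamma=gamma to compare the two methods like for like.
value_iterative_random_policy = compute_value_policy_iterative(pi_random)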
print(value_iterative_random_policy)

[0.01235356 0.01042298 0.01933735 0.00947697 0.01478582 0.
 0.03889412 0.         0.0326019  0.08433739 0.13781067 0.
 0.         0.17034467 0.43357932 0.        ]


Finally, we check the result numerically: the cell below prints the average reward from the simulation done above, to be compared with the value at the first cell (the first entry of the value function).

In [42]:
# Print value at the starting cell (note: for other environments it might not be the first cell)
print(average_reward_random)

0.0046213641854315765
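
A simulated average based on 200 episodes is a noisy estimate, so a visible gap to the exact value is plausible rather than alarming. The sketch below illustrates how the standard error shrinks with the number of episodes. It uses a hypothetical chain (not FrozenLake) whose exact value is known in closed form; the seed, the discount 0.9 and the episode counts are assumptions for the illustration:

```python
import random
import statistics

def sample_return(gamma: float, rng: random.Random, max_steps: int = 1000) -> float:
    """One discounted return of a hypothetical chain started in state 0.

    State 0 pays reward 1 per step and moves to an absorbing,
    zero-reward state with probability 0.5.
    """
    total, discount = 0.0, 1.0
    for _ in range(max_steps):
        total += discount * 1.0
        discount *= gamma
        if rng.random() < 0.5:  # absorbed: no further reward
            break
    return total

rng = random.Random(0)  # assumed seed, for reproducibility only
gamma = 0.9
exact_value = 1.0 / (1.0 - 0.5 * gamma)  # solves V = 1 + gamma * 0.5 * V

for n in (200, 20_000):
    returns = [sample_return(gamma, rng) for _ in range(n)]
    estimate = statistics.fmean(returns)
    standard_error = statistics.stdev(returns) / n**0.5
    print(n, estimate, standard_error, exact_value)
```

The standard error scales like $1/\sqrt{n}$, so going from 200 to 20,000 episodes reduces it by a factor of ten.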


## Optimal policies

Now, we look into methods to find the optimal policy. In the lecture, you learned about the two main algorithms: Value iteration and policy iteration.

First, we need a function that computes the greedy policy. It consists of two ingredients:

*   A function that evaluates
$$x\mapsto\max_{u\in U}\left(R_x^u + \gamma\sum_{x'} P_{xx'}^u V(x')\right)$$

*   A second function that computes
$$\arg\max_{\pi}\sum_{u}\pi(x,u)\left(R_x^u + \gamma\sum_{x'} P_{xx'}^u V(x')\right)$$
Here, note that the maximum is always attained by a deterministic policy.
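
The optimum over policies is attained by a deterministic policy, and the reason fits in one line. Writing $Q(x,u)=R_x^u+\gamma\sum_{x'}P_{xx'}^uV(x')$ (a shorthand introduced here for convenience), the objective is a weighted average of the values $Q(x,u)$ with weights $\pi(x,u)\ge 0$ that sum to one, hence
$$\sum_{u}\pi(x,u)\,Q(x,u)\;\le\;\max_{u\in U}Q(x,u)\sum_{u}\pi(x,u)\;=\;\max_{u\in U}Q(x,u),$$
with equality when $\pi(x,\cdot)$ puts all its mass on a maximizing action $u^\ast\in\arg\max_{u\in U}Q(x,u)$. This is why the greedy policy can be stored as a one-hot vector per state.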


In [45]:
def compute_bellman_operator(state: int, value_function: np.ndarray) -> Tuple[float, int]:
    best_value = -np.inf
    best_action = 0

    # loop over all actions to find the maximal Q
    for a in range(env.action_space.n):
        r_sa, p_sa = get_reward_probability_vector_state_action(state, a)
        q_sa = r_sa + gamma * np.dot(p_sa, value_function)
        if q_sa > best_value:
            best_value = q_sa
            best_action = a

    return best_value, best_action

def compute_greedy_policy(value_function: np.ndarray) -> List[np.ndarray]:
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    pi_greedy: List[np.ndarray] = []

    for s in range(n_states):
        _, best_a = compute_bellman_operator(s, value_function)
        # build one-hot distribution
        dist = np.zeros(n_actions, dtype=float)
        dist[best_a] = 1.0
        pi_greedy.append(dist)

    return pi_greedy

### Value Iteration

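We now perform value iteration. The algorithm runs for at most a maximum number of iterations; a natural stopping rule is to end once the difference between the value functions of consecutive steps (measured via $\|\cdot\|_\infty$) is smaller than some given tolerance. The cell below does not implement this early stop and always runs all `max_number_iterations` sweeps, which with $\gamma=0.95$ is far more than convergence requires.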

In [49]:
# Maximum number of iterations
max_number_iterations = 1000
n_states = env.observation_space.n

# 1) Initialize V arbitrarily (e.g. zeros)
value_value_iteration = np.zeros(n_states, dtype=float)

# 2) Value-iteration sweeps
for i in range(max_number_iterations):
    # compute a fresh copy so we don’t mix updates in one sweep
    new_V = np.zeros_like(value_value_iteration)

    for s in range(n_states):
        # uses your Bellman operator to get the optimal value at state s
        best_value, _ = compute_bellman_operator(s, value_value_iteration)
        new_V[s] = best_value
    value_value_iteration = new_V


# 3) Extract the greedy policy from the converged V*
pi_value_iteration = compute_greedy_policy(value_value_iteration)

# 4) Print value at the starting cell (state 0)
print(value_value_iteration[0])

0.1804715783972009
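
A stopping rule based on the sup-norm difference between consecutive value functions can be sketched as follows. To keep the example self-contained it runs on a small hypothetical MDP rather than on FrozenLake; the arrays `R` and `P`, the discount 0.9 and the tolerance `tol` are assumptions made for this illustration:

```python
import numpy as np

gamma = 0.9
tol = 1e-8
# Hypothetical MDP with 2 states and 2 actions.
# R[s, a]: expected one-step reward; P[a, s, s']: transition probabilities
R = np.array([[0.0, 0.5],
              [0.0, 0.0]])
P = np.array([
    [[1.0, 0.0], [0.0, 1.0]],   # action 0: stay
    [[0.5, 0.5], [0.0, 1.0]],   # action 1: try to move to the absorbing state 1
])

V = np.zeros(2)
for sweep in range(1000):
    # Q[s, a] = R[s, a] + gamma * sum_{s'} P[a, s, s'] * V[s']
    Q = R + gamma * (P @ V).T
    new_V = Q.max(axis=1)
    gap = np.max(np.abs(new_V - V))   # sup-norm distance between sweeps
    V = new_V
    if gap < tol:
        break

greedy_actions = Q.argmax(axis=1)
print(sweep + 1, V, greedy_actions)
```

In this toy problem the optimal value at state 0 solves $V(0)=0.5+0.9\cdot 0.5\,V(0)$, that is $V(0)=0.5/0.55$, and the loop stops long before the iteration cap is reached.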


Simulate policy

In [50]:
average_reward_value_iteration = simulate_environment(env,
                                                      policy=pi_value_iteration,
                                                      sim_video_name=video_name + '_value_iteration')
print('Average reward: ' + str(average_reward_value_iteration))

Average reward: 0.18401481590605873


Display video

In [51]:
for episode in range(n_episodes_video):
    vid = HTML(render_mp4(f'{video_dir}/{video_name}_value_iteration-episode-{episode}.mp4'))
    display(vid)

### BONUS: Policy Iteration

If you have time left, try to implement policy iteration to obtain an optimal policy.

We can now implement policy iteration. We initialize the algorithm with the random policy and stop at convergence (or when a given number of iterations is reached).

In [None]:
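# Maximum number of iterations
max_number_iterations = 100 # will converge in finitely many steps anyway
tol = 1e-5

# Policy iteration: start from the random policy, then alternate
# greedy policy improvement and policy evaluation (linear solve)
pi_policy_iteration = pi_random
value_policy_iteration = compute_value_policy(pi_policy_iteration)

for i in range(max_number_iterations):
    # Policy improvement: act greedily with respect to the current value
    pi_policy_iteration = compute_greedy_policy(value_policy_iteration)
    # Policy evaluation: value of the improved policy
    new_value = compute_value_policy(pi_policy_iteration)
    # Stop once the value function no longer changes (sup-norm)
    converged = np.max(np.abs(new_value - value_policy_iteration)) < tol
    value_policy_iteration = new_value
    if converged:
        break

# Print value at the starting cell
print(value_policy_iteration[0])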

Simulate policy

In [None]:
average_reward_policy_iteration = simulate_environment(env,
                                                       policy=pi_policy_iteration,
                                                       sim_video_name=video_name + '_policy_iteration')
print('Average reward: ' + str(average_reward_policy_iteration))

Display video

In [None]:
for episode in range(n_episodes_video):
    vid = HTML(render_mp4(f'{video_dir}/{video_name}_policy_iteration-episode-{episode}.mp4'))
    display(vid)