# Reinforcement Learning
Prof. Milica Gašić

### Policy iteration

Policy iteration is an algorithm for finding an optimal policy of a Markov decision process (MDP) whose transition probabilities $\mathcal{P}$ and rewards $\mathcal{R}$ are known. In pseudocode:

Repeat two steps until the policy converges:
- **Policy evaluation**, i.e. compute the value function $v_\pi$ of the policy $\pi$:
   1. Initialize value function $v_0(s)$ for all states $s \in \mathcal{S}$ arbitrarily.
   2. Repeat until the value function converges:
      $$\begin{aligned}
      & v_{k+1}(s) = \sum_a \pi(a|s) \left( \mathcal{R}(s,a) + \gamma \sum_{s'} \mathcal{P}(s'|s,a) v_k(s') \right)\\
      & \text{for all } s \in \mathcal{S}
      \end{aligned}$$
      The value function has converged when $|v_{k+1}(s) - v_k(s)| < \epsilon$ for all $s \in \mathcal{S}$. The converged $v_{k+1}$ then serves as our approximation of $v_\pi$.
- **Policy improvement**, i.e. find a better policy $\pi'$ using the value function:
  $$\begin{aligned}
  & \pi'(s) = \arg\max_a \left( \mathcal{R}(s,a) + \gamma \sum_{s'} \mathcal{P}(s'|s,a) v_\pi(s') \right) \\
  & \text{for all } s \in \mathcal{S}
  \end{aligned}$$
  The policy has converged when $\pi'(s) = \pi(s)$ for all $s \in \mathcal{S}$.
- (Set $\pi = \pi'$)
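
The two alternating steps above can be sketched on a toy problem. The following is a minimal illustration, not the reference solution of this exercise: the two-state MDP (arrays `P` and `R`), the function name `policy_iteration` and the tolerance are all made up for this example. It assumes that `P[s, a, s']` stores $\mathcal{P}(s'|s,a)$ and `R[s, a]` stores $\mathcal{R}(s,a)$.

```python
import numpy as np

def policy_iteration(P, R, gamma, epsilon=1e-8):
    """Sketch of policy iteration for a small tabular MDP (hypothetical helper)."""
    num_states, num_actions = R.shape
    # Start with a uniform stochastic policy and a zero value function
    pi = np.full((num_states, num_actions), 1.0 / num_actions)
    v = np.zeros(num_states)
    while True:
        # Policy evaluation: iterate the update rule until the values settle
        while True:
            q = R + gamma * P @ v            # shape (num_states, num_actions)
            new_v = np.sum(pi * q, axis=1)   # expectation over actions
            converged = np.max(np.abs(new_v - v)) < epsilon
            v = new_v
            if converged:
                break
        # Policy improvement: act greedily with respect to the q-values
        q = R + gamma * P @ v
        greedy_actions = np.argmax(q, axis=1)
        new_pi = np.eye(num_actions)[greedy_actions]
        if np.array_equal(new_pi, pi):
            return pi, v
        pi = new_pi

# Toy MDP (an assumption for illustration): action 0 leads to state 0,
# action 1 leads to state 1, regardless of the current state.
P = np.array([
    [[1.0, 0.0], [0.0, 1.0]],  # transitions from state 0
    [[1.0, 0.0], [0.0, 1.0]],  # transitions from state 1
])
R = np.array([
    [0.0, 1.0],  # rewards in state 0
    [0.0, 2.0],  # rewards in state 1
])
gamma = 0.9

pi, v = policy_iteration(P, R, gamma)
print(np.argmax(pi, axis=1))  # greedy action per state: [1 1]
print(np.round(v, 3))         # approximately [19. 20.]
```

In this toy problem, always choosing action 1 is optimal, so $v(1) = 2 / (1 - 0.9) = 20$ and $v(0) = 1 + 0.9 \cdot 20 = 19$, which matches the printed values.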

Both update rules above make use of the same expression on the right-hand side, which is known as the Q-function or action-value function:
$$q_\pi(s,a) = \mathcal{R}(s,a) + \gamma \sum_{s'} \mathcal{P}(s'|s,a) v_\pi(s')$$
We will talk more about it in the next lecture. For now, we use it to simplify the update rules, where $q_k$ denotes the same expression with $v_k$ in place of $v_\pi$:
$$\begin{aligned}
v_{k+1}(s) & = \sum_a \pi(a|s) \, q_k(s,a) \\
\pi'(s) & = \arg\max_a q_\pi(s,a)
\end{aligned}$$
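
As a small worked example of these two rules, the sketch below computes $q_k$ from its definition with explicit loops and then applies one evaluation step and one greedy improvement. The two-state MDP, the current values `v_k` and the discount factor are invented for illustration, and `P[s, a, s']` is assumed to hold $\mathcal{P}(s'|s,a)$.

```python
import numpy as np

# Hypothetical toy MDP: action 0 leads to state 0, action 1 leads to state 1
P = np.array([
    [[1.0, 0.0], [0.0, 1.0]],  # transitions from state 0
    [[1.0, 0.0], [0.0, 1.0]],  # transitions from state 1
])
R = np.array([
    [0.0, 1.0],
    [0.0, 2.0],
])
gamma = 0.5
v_k = np.array([1.0, 2.0])          # current value estimate
pi = np.full((2, 2), 0.5)           # uniform stochastic policy pi(a|s)

# q_k(s,a) = R(s,a) + gamma * sum_s' P(s'|s,a) v_k(s')
num_states, num_actions = R.shape
q = np.zeros((num_states, num_actions))
for s in range(num_states):
    for a in range(num_actions):
        expected_next_value = sum(P[s, a, s_next] * v_k[s_next]
                                  for s_next in range(num_states))
        q[s, a] = R[s, a] + gamma * expected_next_value

# v_{k+1}(s) = sum_a pi(a|s) q_k(s,a)
v_next = np.sum(pi * q, axis=1)
# pi'(s) = argmax_a q(s,a)
greedy = np.argmax(q, axis=1)

print(q)       # [[0.5 2. ] [0.5 3. ]]
print(v_next)  # [1.25 1.75]
print(greedy)  # [1 1]
```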
Note that we need to convert the deterministic policy into a stochastic one to be able to use it for policy evaluation:
$$\pi(a|s) = \begin{cases}
  1 & \text{ if } a = \pi(s) \\
  0 & \text{ otherwise}
\end{cases}$$
<details>
<summary>Minor note (click)</summary>
Alternatively, we could make use of the fact that the policy is deterministic and change the update rule of policy evaluation to:

$$v_{k+1}(s) = q_k(s,\pi(s))$$

However, in this exercise we use the more general stochastic version above.
</details>
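
The conversion can be written compactly by indexing an identity matrix, which is the same trick the provided `policy_improvement()` code uses. The sketch below uses made-up numbers (four states, three actions, an arbitrary q-table) and also checks that the stochastic update rule and the deterministic shortcut from the note give the same values.

```python
import numpy as np

num_states, num_actions = 4, 3
# Deterministic policy pi(s): one action index per state (arbitrary example)
det_policy = np.array([2, 0, 1, 0])

# One-hot stochastic policy pi(a|s): row s has a 1 at column pi(s)
pi = np.eye(num_actions)[det_policy]

# Arbitrary q-table, only used to compare the two update rules
q = np.arange(num_states * num_actions, dtype=float).reshape(num_states, num_actions)

# Stochastic rule: v(s) = sum_a pi(a|s) q(s,a)
v_stochastic = np.sum(pi * q, axis=1)
# Deterministic shortcut: v(s) = q(s, pi(s))
v_deterministic = q[np.arange(num_states), det_policy]

print(pi[0])            # [0. 0. 1.]
print(v_stochastic)     # [2. 3. 7. 9.]
print(v_deterministic)  # [2. 3. 7. 9.]
```

Because every row of `pi` contains a single 1, the weighted sum simply picks out the q-value of the chosen action.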

#### Implementation

Make sure that the files `rl_agent.py`, `rl_env.py`, `rl_gui.py` and `rl_tests.py` are in the same folder as the notebook.

In [1]:
%load_ext autoreload
%autoreload 2

import numpy as np

import rl_agent
import rl_env
import rl_gui
import rl_tests

Your task is to implement policy iteration in the class `PolicyIterationAgent` below.  
Follow the instructions in the methods `compute_q()`, `update_v()` and `policy_improvement()`.

In [None]:
class PolicyIterationAgent(rl_agent.TabularAgent):

    def __init__(self, env, gamma):
        super().__init__(env)
        self.gamma = gamma
        self.reset()

    def reset(self):
        # Resets the agent
        # Initialize the stochastic policy with uniform probabilities
        self.pi = np.full((self.num_states, self.num_actions), 1 / self.num_actions)
        # The policy did not converge yet
        self.pi_converged = False
        # Initialize the value function as an array of zeros
        self.v = np.zeros(self.num_states, dtype=float)
        # The value function did not converge yet
        self.v_converged = False

    def policy(self, state):
        # Sample an action from the stochastic policy
        action = np.random.choice(self.num_actions, p=self.pi[state])
        return action
    
    def value(self, state):
        # Lookup in the value function array
        return self.v[state]

    def compute_q(self):
        gamma = self.gamma
        v = self.v
        # Get the transition probabilities and reward function
        # from the environment
        P = self.env.P
        R = self.env.R
        #######################################################################
        # TODO: This method computes the q-function using the value function  #
        # with the formula:                                                   #
        #     q(s,a) = R(s,a) + gamma * sum_s' P(s'|s,a) v(s')                #
        # Note that the next states are stored in the last dimension of P,    #
        # i.e. the probability of s' given s, a is stored in P[s, a, s'].     #
        # You can implement this using for-loops. Alternatively, a vectorized #
        # implementation can be written in a single line of code.             #
        #######################################################################
        
    
        
        #######################################################################
        # End of your code.                                                   #
        #######################################################################
        return q

    def update_v(self, epsilon=1e-4):
        # Store the old values to check for convergence
        old_v = np.copy(self.v)
        # Compute the action value function
        q = self.compute_q()
        pi = self.pi
        #######################################################################
        # TODO: This method applies one step of policy evaluation and checks  #
        # if the value function converged. We already implemented the         #
        # convergence check below. Remember to store the result in self.v.    #
        # You can implement this using for-loops. Alternatively, a vectorized #
        # implementation can be written in a single line of code.             #
        #######################################################################
        
        #######################################################################
        # End of your code.                                                   #
        #######################################################################

        # Check if the value function converged
        self.v_converged = np.allclose(self.v, old_v, atol=epsilon)

    def policy_evaluation(self, epsilon=1e-4):
        # Run update_v() until the value function converges
        while not self.v_converged:
            self.update_v(epsilon)

    def policy_improvement(self):
        # Improve the policy by taking the maximum over the q-function

        old_pi = np.copy(self.pi)
        q = self.compute_q()
        #######################################################################
        # TODO: This method improves the policy by taking the maximum over    #
        # the q-function. We already implemented the conversion of the        #
        # deterministic policy into a stochastic one. Your task is to create  #
        # a NumPy array of type integer called "max_actions" that contains    #
        # the best action for each state.                                     #
        # You can implement this using for-loops. Alternatively, a vectorized #
        # implementation can be written in a single line of code.             #
        #######################################################################
        
        #######################################################################
        # End of your code.                                                   #
        #######################################################################

        # Convert to stochastic policy with one-hot probabilities
        self.pi = np.eye(self.num_actions)[max_actions]
        # Check if the policy converged
        self.pi_converged = np.all(self.pi == old_pi)

        if not self.pi_converged:
            # The policy changed, so the value function must be re-evaluated
            self.v_converged = False

    def policy_iteration(self, epsilon=1e-4):
        # Repeat policy evaluation and improvement until the policy converges
        while not self.pi_converged:
            self.policy_evaluation(epsilon)
            self.policy_improvement()

    # This method is used for the GUI
    # You don't have to understand the code
    def interactive_optimization(self):
        from rl_gui import RLCmd, RLParamsResult

        def update_params(params):
            if params['gamma'] != self.gamma:
                self.gamma = params['gamma']
                if self.v_converged or self.pi_converged:
                    self.v_converged = False
                    self.pi_converged = False
                    # Get out of the 'Policy evaluation converged'/'Policy is optimal' state
                    return RLParamsResult.RESET_GENERATOR

        yield RLCmd.Init(options={'eval_step': 'Evaluate 1 step',
                                  'eval': 'Evaluate policy',
                                  'improve': 'Improve policy',
                                  'complete': 'Finish optimization',
                                  'reset': 'Reset agent'},
                         params={'gamma': ('Discount factor', 'float', self.gamma, 0.0, 1.0 - 1e-4)},
                         params_callback=update_params)

        while not self.pi_converged:
            option = None
            while not self.v_converged and not self.pi_converged:
                if option is None or option == 'eval_step':
                    option = yield RLCmd.WaitForOption(active=['eval', 'eval_step', 'complete', 'reset'],
                                                       step='eval_step', message='Policy evaluation not converged')

                if option == 'complete':
                    self.policy_iteration()
                elif option == 'reset':
                    self.reset()
                    option = None
                else:
                    self.update_v()

            if self.pi_converged:
                break

            option = yield RLCmd.WaitForOption(active=['complete', 'improve', 'reset'],
                                               step='improve', message='Policy evaluation converged')
            if option == 'complete':
                self.policy_iteration()
            elif option == 'reset':
                self.reset()
            else:
                self.policy_improvement()

        option = yield RLCmd.WaitForOption(active=['reset'], message='Policy is optimal')
        assert option == 'reset', option
        self.reset()


You can use the following code cell to test your implementation.  
**Important**: After changing your code, re-run the code cell above before running the tests.

In [None]:
def test_pi_agent():
    env = rl_env.default_5x5_maze(model_based=True)
    rng = None

    def seed():
        nonlocal rng
        rng = np.random.Generator(np.random.PCG64(seed=42))

    def create_agent(gamma):
        agent = PolicyIterationAgent(env, gamma)
        pi = rng.uniform(0, 1, (agent.num_states, agent.num_actions))
        pi /= np.sum(pi, axis=1, keepdims=True)
        agent.pi = pi
        agent.v = rng.standard_normal(agent.num_states)
        return agent

    yield 'compute_q()'
    seed()
    for expected_sum in [-18.462667, -4.839807, 14.589116]:
        agent = create_agent(gamma=0.8)
        q = agent.compute_q()
        if (yield from rl_tests.check_numpy_array(q, name='The result of compute_q()', shape=(agent.num_states, agent.num_actions), dtype=np.floating)):
            q_sum = np.sum(q)
            yield np.isclose(q_sum, expected_sum, atol=1e-5), f'The computed q-values are incorrect (error = {abs(expected_sum - q_sum):.5f})'
        yield None
    
    yield 'update_v()'
    seed()
    for expected_sum in [-4.457114, -1.007184, 4.208458]:
        agent = create_agent(gamma=0.8)
        agent.update_v()
        if (yield from rl_tests.check_numpy_array(agent.v, name='self.v', shape=(agent.num_states,), dtype=np.floating)):
            v_sum = np.sum(agent.v)
            yield np.isclose(v_sum, expected_sum, atol=1e-5), f'The updated values are incorrect (error = {abs(expected_sum - v_sum):.5f})'
        yield None
    
    yield 'policy_improvement()'
    seed()
    for expected_indices in [
        np.array([3, 1, 0, 0, 0, 0, 1, 2, 2, 2, 2, 3, 1, 3, 1, 0, 0, 0, 0, 2, 2, 0]),
        np.array([0, 0, 3, 0, 2, 1, 1, 2, 0, 0, 0, 2, 3, 2, 3, 1, 3, 1, 3, 0, 1, 0]),
        np.array([3, 1, 0, 0, 3, 0, 2, 3, 3, 0, 3, 0, 0, 1, 1, 2, 2, 2, 2, 1, 3, 0])
    ]:
        agent = create_agent(gamma=0.8)
        agent.policy_improvement()
        if (yield from rl_tests.check_numpy_array(agent.pi, name='self.pi', shape=(agent.num_states, agent.num_actions), dtype=np.floating)):
            indices = np.argmax(agent.pi, axis=1)
            yield np.all(indices == expected_indices), 'The improved policy is incorrect'
        yield None

rl_tests.run_tests(test_pi_agent())

If all of your tests passed, you can see your agent in action in the following code cell.

Occasionally a rendering bug causes the environment to be displayed multiple times. If that happens, restart the notebook and reopen the browser tab, or restart your editor (e.g. VS Code).

In [None]:
# start a GUI for the maze environment
env = rl_env.default_5x5_maze(model_based=True)
# you can try the bigger maze by uncommenting the next line
#env = rl_env.default_8x8_maze(model_based=True)

gamma = 0.9  # discount factor
agents = {'Random': rl_agent.RandomAgent(env),
          'Policy Iteration': PolicyIterationAgent(env, gamma)}

rl_gui.RLGui(env, agents=agents, max_steps=1000, render_mode='rgb_array')