## RLDMUU 2025
#### Policy Iteration
jakub.tluczek@unine.ch

In [6]:
import numpy as np

In [31]:
class DiscreteMDP:
    def __init__(self, n_states, n_actions, P = None, R = None):
        self.n_states = n_states 
        self.n_actions = n_actions 
        if (P is None):
            self.P = np.zeros([n_states, n_actions, n_states]) 
            for s in range(self.n_states):
                for a in range(self.n_actions):
                    self.P[s,a] = np.random.dirichlet(np.ones(n_states))
        else:
            self.P = P
        if (R is None):
            self.R = np.zeros([n_states, n_actions])
            for s in range(self.n_states):
                for a in range(self.n_actions):
                    self.R[s,a] = np.round(np.random.uniform(), decimals=1)
        else:
            self.R = R
        
        for s in range(self.n_states):
            for a in range(self.n_actions):
                assert(abs(np.sum(self.P[s,a,:])-1) <= 1e-3)
                assert((self.P[s,a,:] <= 1).all())
                assert((self.P[s,a,:] >= 0).all())
                
    def get_transition_probability(self, state, action, next_state):
        return self.P[state, action, next_state]
    
    def get_transition_probabilities(self, state, action):
        return self.P[state, action]
    
    def get_reward(self, state, action):
        return self.R[state, action]


In [32]:

class ChainMDP(DiscreteMDP):
    """
    Problem where we need to take the same action n_states-1 time in a row to get a highly rewarding state
    The optimal policy greatly depends on the discount factor we choose.
    """

    def __init__(self, n_states=20):
        assert  n_states > 1

        n_actions = 2
        super().__init__(n_states=n_states, n_actions=n_actions)

        self.R[:] = 0.
        self.P[:] = 0.

        self.R[:, 1] = -1 / (n_states-1)
        self.R[n_states-1, 1] = 1.
        self.R[:, 0] = 1/n_states

        for i in range(self.n_states-1):
            if i > 0:
                self.P[i, 0, i-1] = 1.
            else:
                self.P[i, 0, i] = 1.

            self.P[i, 1, i+1] = 1.

        self.P[self.n_states-1, :, self.n_states-1] = 1.

### Policy iteration

Your first task will be to implement the policy iteration algorithm. Let's start with policy evaluation. Given a policy $\pi$, you have to evaluate this policy on all states. 

$$ V^{\pi}(s) = \sum_{s'} P(s' | s, \pi(s)) [R(s, \pi(s)) + \gamma V^{\pi}(s')] $$ 

You can either program the policy evaluation using dynamic programming, or by using the equation:

$$ V^{\pi} = \left[\mathbb{I} - \gamma \mathbb{P} \right]^{-1} r $$ 

where $\mathbb{I}$ is an identity matrix, $\mathbb{P}$ a probability matrix and $r$ a reward vector.

In [33]:
def policy_evaluation_dynamic_programming(mdp: DiscreteMDP, gamma: float, policy: list[int], n_iters: int):
    # TODO: Implement policy evaluation using dynamic programming
    V = np.zeros(mdp.n_states)
    newV = np.zeros(mdp.n_states)
    for _ in range(n_iters):
        for s in range(mdp.n_states):
            a = policy[s]
            transition_probailities = mdp.get_transition_probabilities(s,a)
            reward = mdp.get_reward(s,a)
            summe = np.sum([transition_probailities[next_s] for next_s in range(mdp.n_states)]) # Equals 1
            newV[s] = reward* summe + gamma * np.sum([transition_probailities[next_s] * V[next_s] for next_s in range(mdp.n_states)])
        V = newV.copy()
    return V

def policy_evaluation_matrix_multiplication(mdp: DiscreteMDP, gamma: float, policy: list[int]):
    # TODO: Implement policy evaluation using matrix operations
    n_states = mdp.n_states
    P_pi = np.zeros((n_states, n_states))
    r_pi = np.zeros(n_states)
    for s in range(n_states):
        a = policy[s]
        P_pi[s, :] = mdp.P[s, a, :]   # Transition probabilities for state s under action a
        r_pi[s] = mdp.R[s, a]
    I = np.eye(n_states)
    V = np.linalg.inv(I - gamma * P_pi).dot(r_pi)
    return V


Now implement policy iteration by evaluating policy at each state and set the policy to:

$$ \pi(s) = \arg\max_{a \in A} Q^\pi (s,a) $$

Iterate until you reach maximal number of iterations or until newly computed values don't differ by more than some $\epsilon$ or until $\pi$ doesn't change.

In [34]:
def policy_iteration(mdp: DiscreteMDP, gamma: float, n_iters: int, n_eval_iters: int, use_dp: bool = False):
    # TODO: Implement policy iteration
    epsilon = 1e-3
    n_states = mdp.n_states
    n_actions = mdp.n_actions
    policy = [0] * n_states
    for _ in range(n_iters):
        if use_dp:
            V = policy_evaluation_dynamic_programming(mdp, gamma, policy, n_eval_iters)
        else:
            V = policy_evaluation_matrix_multiplication(mdp, gamma, policy)
        new_policy = [0] * n_states
        for s in range(n_states):
            Q = np.zeros(n_actions)
            for a in range(n_actions):
                transition_probailities = mdp.get_transition_probabilities(s,a)
                reward = mdp.get_reward(s,a)
                Q[a] = reward + gamma * np.sum([transition_probailities[next_s] * V[next_s] for next_s in range(n_states)])
            new_policy[s] = np.argmax(Q)
        if np.all(np.array(policy) - np.array(new_policy) < epsilon):
            break
        policy = new_policy
    return policy, V

### Value iteration

Your second task will be to implement Value Iteration algorithm. At each timestep $t$, you have to compute a new value function by maximizing the Bellman equation.

$$ V_t (s) = \max_a \left( \sum_{s'} P(s'|s, a) [R(s,a) + \gamma V_{t-1}(s')] \right) $$

Once $V$ converges to some $V^{*}$ (or once you reach the limit of iterations), you can extract the policy for each state:

$$ \pi^{*}(s) = \arg\max_a \sum_{s'} P(s' | s, a) [R(s,a) + \gamma V^{*}(s')] $$ 

In [35]:
def value_iteration(mdp: DiscreteMDP, gamma: float, n_iters: int):
    # TODO: Implement value iteration
    epsilon = 1e-3
    n_states = mdp.n_states
    n_actions = mdp.n_actions
    V = np.zeros(n_states)
    for _ in range(n_iters):
        newV = np.zeros(n_states)
        for s in range(n_states):
            Q = np.zeros(n_actions)
            for a in range(n_actions):
                transition_probailities = mdp.get_transition_probabilities(s,a)
                reward = mdp.get_reward(s,a)
                Q[a] = reward + gamma * np.sum([transition_probailities[next_s] * V[next_s] for next_s in range(n_states)])
            newV[s] = np.max(Q)
        if np.all(np.abs(V - newV) < epsilon):
            break
        V = newV
    policy = np.zeros(n_states, dtype=int)
    for s in range(n_states):
        Q = np.zeros(n_actions)
        for a in range(n_actions):
            transition_probailities = mdp.get_transition_probabilities(s,a)
            reward = mdp.get_reward(s,a)
            Q[a] = reward + gamma * np.sum([transition_probailities[next_s] * V[next_s] for next_s in range(n_states)])
        policy[s] = np.argmax(Q)
    return policy, V

### Results

Run both methods on the provided MDP for a given Chain MDP instance and compare the results 

In [36]:
mdp = ChainMDP()

N_ITERS = 10_000
N_EVAL_ITERS = 100
GAMMA = .9

policy, V = policy_iteration(mdp, GAMMA, N_ITERS, N_EVAL_ITERS)
print("POLICY ITERATION")
print(f"POLICY:\n{policy}")
print(f"V\n{V}")
policy, V = value_iteration(mdp, GAMMA, N_ITERS)
print("VALUE ITERATION")
print(f"POLICY:\n{policy}")
print(f"V\n{V}")

POLICY ITERATION
POLICY:
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
V
[0.5 0.5 0.5 0.5 0.5 0.5 0.5 0.5 0.5 0.5 0.5 0.5 0.5 0.5 0.5 0.5 0.5 0.5
 0.5 0.5]
VALUE ITERATION
POLICY:
[1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1]
V
[0.88608334 1.04407769 1.21962697 1.41468173 1.63140924 1.87221758
 2.1397824  2.43707665 2.76740359 3.13443353 3.54224457 3.99536795
 4.49883837 5.05824995 5.67981837 6.37044995 7.13781837 7.99044995
 8.93781837 9.99044995]
