### 1. Modified Policy Iteration

#### Rappel

L'algorithme de Modified Policy Iteration reprendre exactement la même structure que celui de Policy Iteration. La seule différence entre ses deux algorithmes est la taĉhe de <b>prédiction</b>, qui, dans le cas du MPI, n'est pas faite complètement. En effet, on part du principe que $k$ updates de $V$ suffisent à déterminer les meilleurs actions pour chaque state dans la phase de <b>contrôle</b>. Autrement dit, on n'a pas besoin d'attendre une convergence parfaite de $V$ à $v_\pi$ afin d'enchaîner sur la phase de <b>contrôle</b>.

#### Environnement

In [1]:
class GridworldRow():
    def __init__(self, size=5):
        self.size = size

        self.nS = size
        self.nA = 2

        self.MAX_X = size-1

        P = {}
        
        for s in range(self.nS):                
            dynamics_s = {}
            
            for a in range(self.nA):
                s_prime_list = []

                p = 1 if s != 0 and s != self.nS-1 else 0

                if(a == 0):
                    s_prime = max(0, s-1)
                else:
                    s_prime = min(self.MAX_X, s+1)

                if (s_prime == 0):
                    reward = -100
                    done = True
                elif(s_prime == self.MAX_X):
                    reward = 10
                    done = True
                else:
                    reward = 0
                    done = False

                s_prime_list.append((p, s_prime, reward, done))
                dynamics_s.update({a:s_prime_list})

            P.update({s: dynamics_s})
            
        self.P = P 

#### Algorithme

In [2]:
import numpy as np

In [3]:
env = GridworldRow()

In [4]:
def compute_q_value_for_s_a(env, V, s, a, gamma):
    q = 0
    
    for (p_sPrime, sPrime, r_ss_a, done) in env.P[s][a]:
        q += p_sPrime * (r_ss_a + gamma * V[sPrime])
        
    return q

In [5]:
def evaluate_policy(env, pi, V, gamma, k):    
    V_updated = np.copy(V)
    improved = True
    
    for i in range(k):      
        for s in range(env.nS):
            V_new = 0
        
            for a in range(env.nA):
                prob_a = pi[s][a]
                q_s_a = compute_q_value_for_s_a(env, V_updated, s, a, gamma)
            
                V_new += prob_a * q_s_a
        
            V_updated[s] = V_new
    
    if(np.array_equal(V, V_updated)):
        improved = False
    
    return V_updated, improved

def improve_policy(env, pi, V, gamma):    
    for s in range(env.nS):
        q_s = np.zeros([env.nA, 1])
    
        for a in range(env.nA):
            q_s[a] = compute_q_value_for_s_a(env, V, s, a, gamma)
        
        best_a = np.argmax(q_s)
        pi[s] = np.eye(env.nA)[best_a]
    
    return pi

In [6]:
pi = np.ones([env.nS, env.nA]) * 0.5
V = np.zeros([env.nS, 1])

k = 5 #paramètre du MPI
gamma = 0.99 #facteur de remise du return

In [7]:
i = 0
while True:
    i+=1
    
    V, improved = evaluate_policy(env, pi, V, gamma, k)
    pi = improve_policy(env, pi, V, gamma)
    
    if(improved == False):
        print("Terminé après " + str(i) + " iterations.")
        break

Terminé après 3 iterations.


In [8]:
pi

array([[1., 0.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [1., 0.]])