In [1]:
import numpy as np

In [145]:
class LineWorld:
    def __init__(self):
        self.done = None
        self.current_state = 3  # État actuel
        self.end_good_state = 5 # État final
        self.end_bad_state = 0
        self.num_states = 5  # Nombre total d'états
        self.states = [i for i in range(6)]
        self.actions = [0, 1]
        self.num_actions = 2  # Nombre total d'actions possibles
        self.reward = 0  # Récompense actuelle
        self.line_world = ["_"] * (self.num_states - 1)
        self.line_world.insert(self.current_state-1,"X")
        self.P = {}
        self.initP()
        
    def setState(self, st):
        self.current_state = st

        
        
    def initP(self):
        for state in self.states:
            for action in self.actions:
                self.current_state = state
                state_, reward_, done_ = self.step(action)
                self.P[(state_, reward_, state, action)] = 1
        

    def isTerminateState(self):
        if (self.current_state == self.end_good_state) or (self.current_state == self.end_bad_state):
            return True
        else:
            return False
        
    def step(self, action):
        # Si l'action est 1, on avance à droite
        if (action == 1) and (self.isTerminateState() == False) :
            self.current_state += 1
            self.reward = 0  # Pas de récompense pour avancer
            self.line_world.remove("X")
            self.line_world.insert(self.current_state-1,"X")
            print(self.line_world)
        # Si l'action est 0, on avance à gauche
        elif (action == 0)and (self.isTerminateState() == False):
            self.current_state-= 1
            self.reward = 0  # Pas de récompense pour avancer
            self.line_world.remove("X")
            self.line_world.insert(self.current_state - 1, "X")
            print(self.line_world)
        # Si l'on atteint l'état final, la partie est terminée
        if self.current_state == self.end_good_state:
            self.reward = 1  # Récompense de 1 pour atteindre l'état final
            #print(self.line_world)
            self.done = True
        elif self.current_state == self.end_bad_state:
            self.reward = -3
            #print(self.line_world)
            self.done = True
        return self.current_state, self.reward, self.done

In [42]:
def evaluate_policy(line, V, policy, discount_factor=0.9):
    theta=1e-6

    
    while True:
        DELTA = 0
        for s in line.states:
            old_V = V[s]
            weight = 1 / len(policy[s])
            for action in policy[s]:
                total = 0
                for key in line.P:
                    (newState, reward, oldState, act) = key
                    if oldState == s and act == action:
                        total += weight*line.P[key]*(reward+discount_factor*V[newState])
            V[s] = total
            DELTA = max(DELTA, np.abs(old_V-V[s]))
            
        if DELTA < theta:
            return V            


In [149]:
def policy_improvement(line, V, policy, discount_factor=0.9):
    policy_stable = True
    newPolicy = {}
    for s in line.states:
        old_actions = policy[s]
        value = []
        newAction = []
        for action in old_actions:
            total = 0
            weight = 1 / len(policy[s])
            for key in line.P:
                (newState, reward, oldState, act) = key
                if oldState == s and act == action:
                    total += weight*line.P[key]*(reward+discount_factor*V[newState])
                    value.append(np.round(total, 2))
                    newAction.append(action)  
                    
        value = np.array(value) #Get the list of gotten value actions in the current state
        print(value)
        best = np.where(value == value.max())[0] #Get the position of the best gotten value action
        bestActions = [newAction[item] for item in best]
        newPolicy[s] = bestActions
        
        if old_actions != bestActions:
            policy_stable = False
    return policy_stable, newPolicy

In [151]:
def policy_iteration(line, discount_factor=0.9):
    
    #Initialize the policy
    V = np.random.random((len(line.states),))
    V[0] = 0
    V[line.num_states-1] = 0
    
    policy = {}
    for state in line.states:
        # equiprobable random strategy
        policy[state] = line.actions
    print("Policy : ")
    print(policy)
    
    policy_stable = False
    while not policy_stable:
        old_policy = policy.copy()
        
        print("Start of policy evaluation")
        #Evaluate the policy
        V = evaluate_policy(line, V, policy, discount_factor=0.9)
        print("V = ")
        print(V)
        
        print("Start of policy Improvement")
        #Improve the policy
        policy_stable, policy =  policy_improvement(line, V, policy, discount_factor=0.9)
        print("Policy : ")
        print(policy)

            
    return policy
 

In [152]:
if __name__ == '__main__':
    world = LineWorld()
    policy_iteration(world, discount_factor=0.9)

['_', '_', '_', 'X', '_']
['_', 'X', '_', '_', '_']
['X', '_', '_', '_', '_']
['_', '_', 'X', '_', '_']
['_', 'X', '_', '_', '_']
['_', '_', '_', 'X', '_']
['_', '_', 'X', '_', '_']
['_', '_', '_', '_', 'X']
Policy : 
{0: [0, 1], 1: [0, 1], 2: [0, 1], 3: [0, 1], 4: [0, 1], 5: [0, 1]}
Start of policy evaluation
V = 
[-2.72727202  0.08284091  0.18409091  0.40909091  0.90909091  0.90909091]
Start of policy Improvement
[-2.73 -2.73]
[-2.73  0.08]
[0.04 0.18]
[0.08 0.41]
[0.18 0.91]
[0.91 0.91]
Policy : 
{0: [0, 1], 1: [1], 2: [1], 3: [1], 4: [1], 5: [0, 1]}
Start of policy evaluation
V = 
[-2.72727271  1.32545455  1.47272727  1.63636364  1.81818182  0.90909091]
Start of policy Improvement
[-2.73 -2.73]
[1.33]
[1.47]
[1.64]
[1.82]
[0.91 0.91]
Policy : 
{0: [0, 1], 1: [1], 2: [1], 3: [1], 4: [1], 5: [0, 1]}
