# Programmation dynamique à horizon infini

In [1]:

# Question 1

graph = {
    0: [(1, 1, 1), (2, 5, 1), (3, -3, 1)],
    1: [(4, -2, 1), (5, 5, 1)],
    2: [(4, 2, 1), (5, 7, 1), (6, 4, 1)],
    3: [(5, 7, 1), (6, 3, 1)],
    4: [(7, 3, 1), (8, 5, 1)],
    5: [(7, -2, 1), (8, 1, 1), (9, 2, 1)],
    6: [(8, 0, 1), (9, 4, 1)],
    7: [(0, 0, 0), (0, 0, 0)],
    8: [(0, 0, 0), (0, 0, 0)],
    9: [(0, 0, 0), (0, 0, 0)]
}
graph


{0: [(1, 1, 1), (2, 5, 1), (3, -3, 1)],
 1: [(4, -2, 1), (5, 5, 1)],
 2: [(4, 2, 1), (5, 7, 1), (6, 4, 1)],
 3: [(5, 7, 1), (6, 3, 1)],
 4: [(7, 3, 1), (8, 5, 1)],
 5: [(7, -2, 1), (8, 1, 1), (9, 2, 1)],
 6: [(8, 0, 1), (9, 4, 1)],
 7: [(0, 0, 0), (0, 0, 0)],
 8: [(0, 0, 0), (0, 0, 0)],
 9: [(0, 0, 0), (0, 0, 0)]}

In [2]:
# Détermination de la valeur des états par récursivité inverse

max_t = 3
def mdp_value_function(graph, max_t):
    # Initialisation de la fonction de valeur pour chaque état du graphe
    value_function = {state: 0 for state in graph.keys()}
    #print(value_function)
    # Parcours des niveaux de l'arbre de décision de bas en haut
    for t in range(max_t, -1, -1):
        # Calcul de la fonction de valeur pour chaque état du graphe
        for state in graph.keys():
            if t == max_t:
                # La fonction de valeur pour les états terminaux est nulle
                value_function[state] = 0
            else:
                # Calcul de la valeur pour chaque action possible
                action_values = []
                for next_state, cost, probability in graph[state]:
                    next_value = value_function[next_state]
                    action_values.append(probability * (cost + next_value))
                # La valeur pour l'état courant est la meilleure valeur parmi les actions possibles
                
                value_function[state] = max(action_values)
    
    # Retourne la fonction de valeur pour chaque état du graphe
    return value_function

In [3]:
print(mdp_value_function(graph,max_t))
value_function = mdp_value_function(graph,max_t)

{0: 14, 1: 7, 2: 9, 3: 9, 4: 5, 5: 2, 6: 4, 7: 0, 8: 0, 9: 0}


In [4]:
# Calcul de la politique optimale, le chemin le plus long

# Notre fonction prend en parametre la value function de chaque etat
def find_optimal_path(graph, value_function):
  #  initialisation de l'état courant à l'état initial
    current_state = 0  # on commence à l'état initial
    path = [current_state]
    for k in range(0,max_t):
        best_action = None
        best_action_value = -float('inf')
        for next_state, cost, probability in graph[current_state]:
            next_value = value_function[next_state]
            action_value = probability * (cost + next_value)
            if action_value > best_action_value:
                best_action = next_state
                best_action_value = action_value
        # vérification si l'état courant est l'état terminal
        if best_action is None:  # on est arrivé à l'état terminal
            break
        path.append(best_action)

        #affectation de la meilleure action à l'état courant
        current_state = best_action  # on passe à l'état suivant
    return path
print(find_optimal_path(graph, value_function))

[0, 2, 5, 9]


# Programmation dynamique à horizon infini

In [5]:
# 1) Iteration de la valeur 

import numpy as np

# Définir les paramètres du MDP
n = 3
m = 4
# etat initiale
start_state = (0, 0)

# Matrices de Recompenses
rewards = np.zeros((n, m))

rewards[1, 3] = -1
rewards[0, 3] = 1

gamma = 0.9


# pour verifier la condition d'arrete
epsilon = 0.01

# Définir les probabilités de transition pour chaque action et couple d'états

def get_transition_probs(state, action):
    transitions = []
    row, col = state

    if action == 'up':
        next_state = (row - 1, col)
        if next_state[0] >= 0:
            transitions.append((0.8, next_state))
        if col > 0:
            transitions.append((0.1, (row, col - 1)))
        if col < m - 1:
            transitions.append((0.1, (row, col + 1)))
    elif action == 'down':
        next_state = (row + 1, col)
        if next_state[0] < n:
            transitions.append((0.8, next_state))
        if col > 0:
            transitions.append((0.1, (row, col - 1)))
        if col < m - 1:
            transitions.append((0.1, (row, col + 1)))
    elif action == 'left':
        next_state = (row, col - 1)
        if next_state[1] >= 0:
            transitions.append((0.8, next_state))
        if row > 0:
            transitions.append((0.1, (row - 1, col)))
        if row < n - 1:
            transitions.append((0.1, (row + 1, col)))
    elif action == 'right':
        next_state = (row, col + 1)
        if next_state[1] < m:
            transitions.append((0.8, next_state))
        if row > 0:
            transitions.append((0.1, (row - 1, col)))
        if row < n - 1:
            transitions.append((0.1, (row + 1, col)))
    else:
        raise ValueError('Invalid action')

    return transitions

# Initialiser les valeurs de tous les états à 0
values = np.zeros((n, m))

# Appliquer l'algorithme de programmation dynamique par itération de la valeur
while True:
  # Cette variable est utilisée pour suivre la différence maximale entre les nouvelles valeurs et les anciennes valeurs dans la matrice de valeurs à chaque itération de la boucle.
    delta = 0
    for i in range(n):
        for j in range(m):
            state = (i, j)
            max_value = float('-inf')
            # Etant dans un etat i,j on a un ensemble d'acion qu'on peut effectuer
            for action in ['up', 'down', 'left', 'right']:

                
                transitions = get_transition_probs(state, action)
                action_value = 0
                for prob, next_state in transitions:
                  # on accède aux recommpense dans la grille a partir de la valeur de l'etat suivant
                    next_value = values[next_state[0], next_state[1]]
                    action_value += prob * (rewards[next_state[0], next_state[1]] + gamma * next_value)
                if action_value > max_value:
                    max_value = action_value
            delta = max(delta, abs(values[i, j] - max_value))
            values[i, j] = max_value
    if delta < epsilon:
        break

# Afficher les valeurs de tous les états
print(values)


[[1.30447669 1.65641187 2.116166   1.61409446]
 [1.30628174 1.52539982 1.75138045 2.11977225]
 [1.06900002 1.32768543 1.4845216  1.15963505]]


In [11]:
# 2) iteration de la politique

# policy = np.random.choice(['up', 'down', 'left', 'right'], size=(n, m))
# policy

# Définir la fonction d'itération de politique ( Amelioration de la politique)
def improve_policy(policy, values):
    policy_stable = True
    for i in range(n):
        for j in range(m):
            state = (i, j)
            old_action = policy[state]
            best_action = None
            best_value = float('-inf')
            for action in ['up', 'down', 'left', 'right']:
                transitions = get_transition_probs(state, action)
                action_value = 0
                for prob, next_state in transitions:
                    next_value = values[next_state[0], next_state[1]]
                    action_value += prob * (rewards[next_state[0], next_state[1]] + gamma * next_value)
                if action_value > best_value:
                    best_value = action_value
                    best_action = action
            policy[state] = best_action
            if old_action != best_action:
                policy_stable = False
    return policy_stable


#algorithme d'itération de la valeur pour obtenir les valeurs de chaque état pour une politique donnée
def compute_state_values(policy,values, rewards, gamma, epsilon):
    n, m = rewards.shape

    # Initialiser les valeurs de tous les états à 0
  

    while True:
        delta = 0

        # Pour chaque état, calculer sa nouvelle valeur
        for i in range(n):
            for j in range(m):
                state = (i, j)
                old_value = values[state]

                # Calculer la nouvelle valeur en appliquant la politique courante
                new_value = 0
                for prob, next_state in get_transition_probs(state, policy[state]):
                    new_value += prob * (rewards[next_state[0], next_state[1]] + gamma * values[next_state[0], next_state[1]])

                # Mettre à jour la valeur de l'état courant
                values[state] = new_value

                # Calculer la différence entre les anciennes et nouvelles valeurs
                delta = max(delta, abs(old_value - new_value))

        # Si la convergence est atteinte, sortir de la boucle
        if delta < epsilon:
            break

    return values

# Initialiser les valeurs de tous les états à 0
values = np.zeros((n, m))

# Initialiser la politique avec une action aléatoire pour chaque état
policy = np.random.choice(['up', 'down', 'left', 'right'], size=(n, m)
)
# Appliquer l'algorithme d'itération de politique
policy_stable = False
i = 0
while not policy_stable:
    i = i + 1
    # Appliquer l'algorithme d'itération de la valeur pour obtenir les valeurs de chaque état
    values = compute_state_values(policy,values,rewards,gamma, epsilon)

    # Améliorer la politique
    print("iteration N° ", i)
    print(policy)
    policy_stable = improve_policy(policy, values)

# Afficher la politique finale



iteration N°  1
[['down' 'down' 'down' 'down']
 ['up' 'left' 'down' 'up']
 ['up' 'up' 'up' 'down']]
iteration N°  2
[['left' 'left' 'right' 'up']
 ['left' 'left' 'left' 'up']
 ['left' 'left' 'down' 'down']]
iteration N°  3
[['left' 'right' 'right' 'left']
 ['left' 'right' 'up' 'up']
 ['left' 'down' 'up' 'down']]
iteration N°  4
[['right' 'right' 'right' 'left']
 ['right' 'right' 'up' 'up']
 ['right' 'up' 'up' 'left']]
