# Intelligence Artificielle : Travaux Pratiques sur les Processus de Décision Markoviens

**Intelligence Artificielle : Travaux Pratiques sur les Processus de Décision Markoviens**

L'objectif est d'implémenter et d'évaluer les algorithmes d'itération de valeur et d'itération de politique pour les **Processus de Décision Markoviens** (MDP).

Nous étudions un petit exemple tiré du livre *Artificial Intelligence: A Modern Approach* de manière aussi générique que possible.

Nous modélisons l'environnement comme une zone rectangulaire carrelée de longueur **L** et de hauteur **H**. Vous pouvez réutiliser et adapter les structures de données et les fonctions du TP sur la recherche de chemin (*Path-finding lab*). Nous nous concentrons en particulier sur le petit environnement de **6×5** suivant :

![Environnement 6x5](env.png)

Le nombre dans chaque case représente la récompense immédiate obtenue en s'y déplaçant.  
Les cases noires sont des murs infranchissables. Notez que, comme dans le TP sur la recherche de chemin, nous supposons que l'environnement est entouré de murs.  
Les cases avec des récompenses **+1** et **-1** sont des **nœuds terminaux** : lorsque le robot les atteint, il ne peut plus se déplacer. Par conséquent, l'utilité de ces cases (à partir de l'itération 1) est simplement la récompense immédiate.

Nous supposons qu'à chaque fois que le robot essaie de se déplacer dans une direction, il y a **10 % de chances** qu'il aille à gauche (par rapport à la direction choisie) au lieu d'aller tout droit, et **10 % de chances** qu'il aille à droite (toujours par rapport à la direction choisie). Si cela le fait se heurter à un mur, il reste sur place (ne bouge pas).

In [69]:
### Imports
import numpy as np

## Value Iteration Algorithme

![Algo Value Iter](value_iter_algo.png)

---

### **Questions :**

**Q1.** Implémentez l'algorithme d'**itération de valeur** pour le MDP correspondant. Affichez la politique calculée à chaque itération en utilisant, par exemple, les caractères `'v'`, `'<'`, `'>'`, `'^'`. Affichez également le **nombre d'itérations nécessaires pour converger**. Utilisez les valeurs **γ = 0.99** et **ϵ = 0.01**.  

À la fin du calcul, affichez également les utilités calculées.

In [70]:
# Définir les actions principales et adjacentes
actions = ['UP', 'DOWN', 'LEFT', 'RIGHT']
arrows_actions = ['↑', '↓', '←', '→']

# Définir les probabilités des actions principales et secondaires
action_probs = {
    'UP': (0.8, 0.1, 0.1),       # (Proba UP, Proba LEFT, Proba RIGHT)
    'DOWN': (0.8, 0.1, 0.1),     # (Proba DOWN, Proba LEFT, Proba RIGHT)
    'LEFT': (0.8, 0.1, 0.1),     # (Proba LEFT, Proba UP, Proba DOWN)
    'RIGHT': (0.8, 0.1, 0.1)     # (Proba RIGHT, Proba UP, Proba DOWN)
}

# Association des actions adjacentes
adjacent_actions = {
    'UP': ['LEFT', 'RIGHT'],     # Mouvement principal UP et erreurs LEFT/RIGHT
    'DOWN': ['LEFT', 'RIGHT'],   # Mouvement principal DOWN et erreurs LEFT/RIGHT
    'LEFT': ['UP', 'DOWN'],      # Mouvement principal LEFT et erreurs UP/DOWN
    'RIGHT': ['UP', 'DOWN']      # Mouvement principal RIGHT et erreurs UP/DOWN
}

# Fonction pour appliquer une action, avec gestion des murs et des bords
def get_next_state(state, action, rewards, H, L):
    x, y = state

    if action == 'UP':
        next_state = (max(0, x - 1), y)
    elif action == 'DOWN':
        next_state = (min(H - 1, x + 1), y)
    elif action == 'LEFT':
        next_state = (x, max(0, y - 1))
    elif action == 'RIGHT':
        next_state = (x, min(L - 1, y + 1))
    else:
        next_state = state  # Sécurité

    # Si on frappe un mur, rester sur place
    if np.isnan(rewards[next_state[0], next_state[1]]):
        return state

    return next_state

# Fonction pour vérifier si un état est terminal
def is_terminal(state, rewards):
    x, y = state
    return rewards[x][y] in [1, -1]

# Implémentation de l'algorithme Value Iteration
def value_iteration(gamma, epsilon, rewards, H, L):
    utilities = np.zeros((H, L))  # Initialisation des valeurs d'utilité

    # Terminaux
    utilities[0][3] = 1    # Récompense +1
    utilities[1][3] = -1   # Récompense -1

    policy = np.full((H, L), '')  # Initialisation de la politique
    
    iteration = 0
    while True:
        iteration += 1
        delta = 0
        new_utilities = np.copy(utilities)
        
        for x in range(H):
            for y in range(L):
                if is_terminal((x, y), rewards) or np.isnan(rewards[x][y]):  # Ignorer les états terminaux et les murs
                    continue

                # Calcul de l'utilité pour chaque action
                action_utilities = []
                for action in actions:
                    # Récupérer les probabilités et mouvements associés
                    prob_main, prob_adj1, prob_adj2 = action_probs[action]
                    adj1, adj2 = adjacent_actions[action]

                    # Calculer les états suivants pour cette action
                    next_main = get_next_state((x, y), action, rewards, H, L)
                    next_adj1 = get_next_state((x, y), adj1, rewards, H, L)
                    next_adj2 = get_next_state((x, y), adj2, rewards, H, L)

                    # Utilité escomptée pour cette action
                    expected_utility = (
                        prob_main * utilities[next_main[0], next_main[1]] +
                        prob_adj1 * utilities[next_adj1[0], next_adj1[1]] +
                        prob_adj2 * utilities[next_adj2[0], next_adj2[1]]
                    )
                    action_utilities.append(expected_utility)

                # Mise à jour de l'utilité avec la meilleure action
                best_action_utility = max(action_utilities)
                new_utilities[x][y] = rewards[x][y] + gamma * best_action_utility
                policy[x][y] = arrows_actions[np.argmax(action_utilities)]
                delta = max(delta, abs(new_utilities[x][y] - utilities[x][y]))
        
        utilities = new_utilities
        # Vérifier la condition de convergence
        if delta < epsilon * (1 - gamma) / gamma:
            break
    
    # Mettre à jour la politique pour les états terminaux et le mur
    policy[0][3] = '*'
    policy[1][3] = '*'
    policy[np.isnan(rewards)] = '#'

    return utilities, policy, iteration

In [71]:
# Définition de l'environnement
L, H = 4, 3

reward_default = -0.04
rewards = np.full((H, L), reward_default)
rewards[0][3] = 1    # Récompense +1
rewards[1][3] = -1   # Récompense -1

# Créer le mur de l'environnement au centre
rewards[1, 1] = np.nan

# Afficher les récompenses à travers l'environnement
print(rewards)

[[-0.04 -0.04 -0.04  1.  ]
 [-0.04   nan -0.04 -1.  ]
 [-0.04 -0.04 -0.04 -0.04]]


In [72]:
# Hyperparamètres de VIA (Value Iteration Algorithm)
gamma = 0.99
epsilon = 0.01

# Exécuter Value Iteration
utilities, policy, iterations = value_iteration(gamma, epsilon, rewards, H, L)

# Affichage des résultats
print("Valeurs d'utilité calculées :")
print(utilities)
print("\nPolitique optimale :")
for row in policy:
    print(' '.join(row))
print(f"\nNombre d'itérations nécessaires : {iterations}")

Valeurs d'utilité calculées :
[[ 0.7761849   0.8439351   0.9050959   1.        ]
 [ 0.71662996  0.          0.64132736 -1.        ]
 [ 0.65064215  0.59261419  0.56005111  0.33800045]]

Politique optimale :
→ → → *
↑ # ↑ *
↑ ← ↑ ←

Nombre d'itérations nécessaires : 20


---

**Q2.** Expérimentez avec d'autres récompenses. Essayez en particulier de faire varier la récompense des cases "normales".  
- Que se passe-t-il lorsqu'elle est **positive** ?  
- Et lorsqu'elle est beaucoup plus négative que **-0.04** (par exemple **-2**) ?  
- Pouvez-vous trouver des situations intermédiaires intéressantes ?

In [73]:
# Définition de l'environnement
L, H = 4, 3

reward_default = 0.1
rewards = np.full((H, L), reward_default)
rewards[0][3] = 1    # Récompense +1
rewards[1][3] = -1   # Récompense -1

# Créer le mur de l'environnement au centre
rewards[1, 1] = np.nan

# Afficher les récompenses à travers l'environnement
print(rewards)

[[ 0.1  0.1  0.1  1. ]
 [ 0.1  nan  0.1 -1. ]
 [ 0.1  0.1  0.1  0.1]]


In [74]:
# Hyperparamètres de VIA (Value Iteration Algorithm)
gamma = 0.99
epsilon = 0.01

# Exécuter Value Iteration
utilities, policy, iterations = value_iteration(gamma, epsilon, rewards, H, L)

# Affichage des résultats
print("Valeurs d'utilité calculées :")
print(utilities)
print("\nPolitique optimale :")
for row in policy:
    print(' '.join(row))
print(f"\nNombre d'itérations nécessaires : {iterations}")

Valeurs d'utilité calculées :
[[ 9.99006829  9.99006829  9.99006829  1.        ]
 [ 9.99006829  0.          9.99006829 -1.        ]
 [ 9.99006829  9.99006829  9.99006829  9.99006829]]

Politique optimale :
↑ ↑ ← *
↑ # ← *
↑ ↑ ↑ ↓

Nombre d'itérations nécessaires : 680


- cas avec rewards > 0 : cherche à ne pas terminer pour maximiser son gain (ARGENT)
- cas avec rewards << 0 : cherche à se suicider, donc terminer au plus vite

## Policy Iteration Algorithme

![Algo Policy Iteration](policy_iter_algo.png)

---

**Q3.** Implémentez l'algorithme d'**itération de politique** pour le MDP correspondant. Utilisez une version simplifiée de la fonction écrite pour la question Q1 afin de calculer les utilités des politiques jusqu'à **ϵ**.  

Affichez la politique calculée à chaque itération et le **nombre d'itérations nécessaires pour converger**. Comparez avec l'itération de valeur.

In [75]:
# Évaluation de la politique (Policy Evaluation)
def policy_evaluation(policy, gamma, epsilon, rewards, H, L):
    utilities = np.zeros((H, L))  # Initialisation des valeurs d'utilité

    utilities[0][3] = 1    # Récompense +1
    utilities[1][3] = -1   # Récompense -1

    while True:
        delta = 0
        new_utilities = np.copy(utilities)
        for x in range(H):
            for y in range(L):
                if is_terminal((x, y), rewards) or np.isnan(rewards[x][y]):  # Ignorer les états terminaux et les murs
                    continue
                
                action = policy[x][y]  # Action choisie par la politique actuelle
                if action == '':
                    continue  # Si aucune action n'est définie, passez

                # Récupérer les probabilités et mouvements associés
                prob_main, prob_adj1, prob_adj2 = action_probs[action]
                adj1, adj2 = adjacent_actions[action]

                # Calculer les états suivants pour cette action
                next_main = get_next_state((x, y), action, rewards, H, L)
                next_adj1 = get_next_state((x, y), adj1, rewards, H, L)
                next_adj2 = get_next_state((x, y), adj2, rewards, H, L)

                # Mettre à jour l'utilité de l'état actuel pour cette politique
                expected_utility = (
                    prob_main * utilities[next_main[0], next_main[1]] +
                    prob_adj1 * utilities[next_adj1[0], next_adj1[1]] +
                    prob_adj2 * utilities[next_adj2[0], next_adj2[1]]
                )
                new_utilities[x][y] = rewards[x][y] + gamma * expected_utility
                delta = max(delta, abs(new_utilities[x][y] - utilities[x][y]))

        utilities = new_utilities
        if delta < epsilon:  # Convergence des utilités
            break

    return utilities

# Implémentation de l'algorithme d'itération de politique (Policy Iteration)
def policy_iteration(gamma, epsilon, rewards, H, L):
    policy = np.full((H, L), 'UP', dtype=object)  # Initialiser une politique arbitraire
    policy[0][3] = '*'  # États terminaux
    policy[1][3] = '*'
    policy[np.isnan(rewards)] = '#'  # Murs

    iterations = 0
    while True:
        iterations += 1

        # Étape 1 : Évaluation de la politique actuelle
        utilities = policy_evaluation(policy, gamma, epsilon, rewards, H, L)

        # Étape 2 : Amélioration de la politique
        policy_stable = True
        new_policy = np.copy(policy)

        for x in range(H):
            for y in range(L):
                if is_terminal((x, y), rewards) or np.isnan(rewards[x][y]):  # Ignorer les états terminaux et les murs
                    continue

                # Calcul de l'utilité pour chaque action
                action_utilities = []
                for action in actions:
                    # Récupérer les probabilités et mouvements associés
                    prob_main, prob_adj1, prob_adj2 = action_probs[action]
                    adj1, adj2 = adjacent_actions[action]

                    # Calculer les états suivants pour cette action
                    next_main = get_next_state((x, y), action, rewards, H, L)
                    next_adj1 = get_next_state((x, y), adj1, rewards, H, L)
                    next_adj2 = get_next_state((x, y), adj2, rewards, H, L)

                    # Utilité escomptée pour cette action
                    expected_utility = (
                        prob_main * utilities[next_main[0], next_main[1]] +
                        prob_adj1 * utilities[next_adj1[0], next_adj1[1]] +
                        prob_adj2 * utilities[next_adj2[0], next_adj2[1]]
                    )
                    action_utilities.append(expected_utility)

                # Déterminer la meilleure action et comparer à la politique courante
                best_action_index = np.argmax(action_utilities)
                best_action = actions[best_action_index]

                # Mettre à jour la politique (changement de direction si nécessaire)
                if best_action != policy[x][y]:
                    policy_stable = False
                    new_policy[x][y] = best_action

        policy = new_policy

        # print(f"Politique après iteration {iterations}:")
        # for row in policy:
        #     print(" ".join(row))
        # print("\n")

        if policy_stable:  # Si la politique ne change plus, arrêt
            break

    return policy, utilities, iterations

In [76]:
# Définition de l'environnement
L, H = 4, 3

reward_default = -0.04
rewards = np.full((H, L), reward_default)
rewards[0][3] = 1    # Récompense +1
rewards[1][3] = -1   # Récompense -1

# Créer le mur de l'environnement au centre
rewards[1, 1] = np.nan

# Afficher les récompenses à travers l'environnement
print(rewards)

[[-0.04 -0.04 -0.04  1.  ]
 [-0.04   nan -0.04 -1.  ]
 [-0.04 -0.04 -0.04 -0.04]]


In [77]:
policy, utilities, iterations = policy_iteration(gamma, epsilon, rewards, H, L)

print("Politique optimale obtenue (itération de politique) :")

row_to_arrow = {"UP": "↑", "DOWN": "↓", "LEFT": "←", "RIGHT": "→", "*": "*", "#": "#"}
for row in policy:
    print(" ".join([row_to_arrow[action] for action in row]))

print("\nValeurs d'utilité correspondantes :")
print(utilities)

print(f"\nNombre d'itérations nécessaires : {iterations}")

Politique optimale obtenue (itération de politique) :
→ → → *
↑ # ↑ *
↑ ← ↑ ←

Valeurs d'utilité correspondantes :
[[ 0.77608967  0.84393198  0.90509519  1.        ]
 [ 0.71632498  0.          0.64132534 -1.        ]
 [ 0.6486454   0.58727827  0.55845133  0.33504264]]

Nombre d'itérations nécessaires : 4


Je n'ai pas pris une stratégie aléatoirement au début (j'ai mis tout à UP) en revanche on voit que le policy iteration à bien moins d'itérations que le Value Iteration.