# TP Reinforcement Learning : Programmation Dynamique et MDP

## Objectifs du TP
Ce TP vous permettra de :
- Comprendre et implémenter les concepts fondamentaux des MDP (Processus de Décision Markoviens)
- Mettre en pratique les algorithmes de programmation dynamique (Policy Evaluation, Policy Iteration, Value Iteration)
- Analyser l'impact des hyperparamètres (facteur d'actualisation γ, seuil de convergence θ)
- Visualiser et interpréter les résultats

## Environnement : FrozenLake 8x8
Vous allez travailler avec l'environnement FrozenLake de Gymnasium :
- **S** : Position de départ (Start)
- **F** : Surface gelée - sûre (Frozen)
- **H** : Trou - fin du jeu (Hole)
- **G** : Objectif - récompense +1 (Goal)

Actions possibles : 0=Gauche, 1=Bas, 2=Droite, 3=Haut

## Rappels du cours :
1. MDP défini par (S, A, P, R, γ)
2. Équation de Bellman pour V^π(s) et V*(s)
3. Policy Evaluation : calculer V^π pour une politique π donnée
4. Policy Improvement : améliorer π en choisissant l'action qui maximise Q(s,a)
5. Policy Iteration : alterner évaluation et amélioration
6. Value Iteration : combiner évaluation et amélioration en une seule mise à jour

## Installation et configuration de l'environnement

In [None]:
import numpy as np
import gymnasium as gym
import matplotlib.pyplot as plt


print("CRÉATION DE L'ENVIRONNEMENT FROZENLAKE 8x8")

# Créer l'environnement
env = gym.make('FrozenLake-v1', map_name='8x8', is_slippery=False)

# Afficher les informations
lake_map = [
    "SFFFFFFF",
    "FFFFFFFF",
    "FFFHFFFF",
    "FFFFFHFF",
    "FFFHFFFF",
    "FHHFFFHF",
    "FHFFHFHF",
    "FFFHFFFG"
]

print("\nCarte du FrozenLake :")
for row in lake_map:
    print(row)

print(f"\nNombre d'états : {env.observation_space.n}")
print(f"Nombre d'actions : {env.action_space.n}")
print("\nActions : 0=Gauche, 1=Bas, 2=Droite, 3=Haut")


## Partie 1 : Policy Evaluation

In [None]:
def policy_evaluation(env, policy, discount_factor=0.9, theta=1e-5):
    """
    Évalue une politique donnée en calculant sa fonction de valeur V^π(s).

    Rappel du cours :
    V^π(s) = Σ_a π(a|s) * Σ_{s',r} p(s',r|s,a) * [r + γ * V^π(s')]

    Args:
        env: L'environnement Gym
        policy: Politique à évaluer (matrice |S| x |A|)
        discount_factor: Facteur d'actualisation γ
        theta: Seuil de convergence

    Returns:
        V: Fonction de valeur (vecteur de taille |S|)
    """
    # TODO : Initialiser V(s) à zéro pour tous les états
    V = np.zeros(env.observation_space.n)

    iteration = 0
    while True:
        delta = 0

        # TODO 2: Pour chaque état s
        for s in range(len(V)):
            v = 0

            # TODO 3: Pour chaque action a possible dans l'état s
            for action, action_prob in enumerate(policy[s]):

                # TODO 4: Pour chaque transition possible (s' | s, a)
                # Indice : utiliser env.unwrapped.P[s][action]
                for transition_prob, next_state, reward, done in env.unwrapped.P[s][action]:

                    # TODO 5: Calculer la contribution de cette transition
                    # Formule : π(a|s) * p(s',r|s,a) * [r + γ * V(s')]
                    v += action_prob * transition_prob * (reward + discount_factor * V[next_state])

            # TODO 6: Calculer delta = max(delta, |v_ancien - v_nouveau|)
            delta = max(delta, abs(V[s] - v))

            # TODO 7: Mettre à jour V(s)
            V[s] = v

        iteration += 1

        # TODO 8: Condition d'arrêt
        if delta < theta:
            print(f"Convergence après {iteration} itérations")
            break

    return np.array(V)

In [None]:
# Test avec une politique aléatoire uniforme
print("\nTest : Évaluation d'une politique aléatoire")
random_policy = np.ones([env.observation_space.n, env.action_space.n]) / env.action_space.n
V_random = policy_evaluation(env, random_policy, discount_factor=0.9)

In [None]:
# Visualiser l'effet de gamma
def visualize_value_functions_by_gamma_pe(env, policy, gamma_list):
    fig, axes = plt.subplots(1, len(gamma_list), figsize=(5 * len(gamma_list), 6))

    for idx, gamma in enumerate(gamma_list):
        v_gamma = policy_evaluation(env, policy, discount_factor=gamma)
        value_grid = v_gamma.reshape(8, 8)

        ax = axes[idx]
        im = ax.imshow(value_grid, cmap='YlOrRd')
        ax.set_title(f'Value Function\nγ = {gamma}', fontsize=12)
        for i in range(8):
            for j in range(8):
                ax.text(j, i, f'{value_grid[i, j]:.3f}', ha='center', va='center',
                        fontsize=8, color='black')

    plt.tight_layout()
    plt.show()

In [None]:
discount_factors = [0.8, 0.9, 0.95, 1.0]
visualize_value_functions_by_gamma_pe(env, random_policy, discount_factors)

## Partie 2 : Policy Improvement

In [None]:
def policy_improvement(env, V, discount_factor=0.9):
    """
    Améliore une politique en la rendant greedy par rapport à V.

    Rappel du cours :
    π'(s) = argmax_a Σ_{s',r} p(s',r|s,a) * [r + γ * V(s')]

    Args:
        env: L'environnement Gym
        V: Fonction de valeur
        discount_factor: Facteur d'actualisation γ

    Returns:
        policy: Nouvelle politique améliorée (matrice |S| x |A|)
    """
    # TODO 9: Initialiser une nouvelle politique (déterministe)
    policy = np.zeros([env.observation_space.n, env.action_space.n])

    # TODO 10: Pour chaque état
    for s in range(env.observation_space.n):

        # TODO 11: Calculer Q(s, a) pour chaque action
        action_values = np.zeros(env.action_space.n)

        for a in range(env.action_space.n):

            # TODO 12: Pour chaque transition (s' | s, a)
            for transition_prob, next_state, reward, done in env.unwrapped.P[s][a]:

                # TODO 13: Calculer Q(s,a) = Σ p(s'|s,a) * [r + γ * V(s')]
                action_values[a] += transition_prob * (reward + discount_factor * V[next_state])

        # TODO 14: Choisir la meilleure action (politique greedy)
        best_action = np.argmax(action_values)
        policy[s][best_action] = 1.0

    return policy

## Partie 3 : Policy Iteration

In [None]:
def policy_iteration(env, discount_factor=0.9, max_iterations=200):
    """
    Algorithme de Policy Iteration complet.

    Principe :
    1. Initialiser une politique aléatoire
    2. Répéter :
       a. Policy Evaluation : calculer V^π
       b. Policy Improvement : obtenir π' greedy par rapport à V^π
       c. Si π' = π, arrêter (politique optimale trouvée)

    Args:
        env: L'environnement Gym
        discount_factor: Facteur d'actualisation γ
        max_iterations: Nombre max d'itérations

    Returns:
        policy: Politique optimale
        V: Fonction de valeur optimale
    """
    # TODO 15: Initialiser une politique aléatoire
    policy = np.ones([env.observation_space.n, env.action_space.n]) / env.action_space.n
    V = np.zeros(env.observation_space.n)

    for i in range(max_iterations):
        # TODO 16: Évaluer la politique actuelle
        V = policy_evaluation(env, policy, discount_factor)

        # TODO 17: Améliorer la politique
        new_policy = policy_improvement(env, V, discount_factor)

        # TODO 18: Vérifier la convergence (si la politique n'a pas changé)
        if np.array_equal(new_policy, policy):
            print(f"Politique optimale trouvée après {i+1} itérations")
            return policy, V

        policy = new_policy

    print(f"Nombre maximum d'itérations atteint ({max_iterations})")
    return policy, V

In [None]:
# Test de Policy Iteration
print("\nExécution de Policy Iteration...")
optimal_policy_pi, V_pi = policy_iteration(env, discount_factor=0.9)

In [None]:
# Visualiser l'effet de gamma
def visualize_value_functions_by_gamma_pi(env, policy, gamma_list):
    fig, axes = plt.subplots(1, len(gamma_list), figsize=(5 * len(gamma_list), 6))

    for idx, gamma in enumerate(gamma_list):
        print(f"γ = {gamma}")
        v_gamma = policy_evaluation(env, policy, discount_factor=gamma)
        value_grid = v_gamma.reshape(8, 8)

        ax = axes[idx]
        im = ax.imshow(value_grid, cmap='YlOrRd')
        ax.set_title(f'Value Function\nγ = {gamma}', fontsize=12)
        for i in range(8):
            for j in range(8):
                ax.text(j, i, f'{value_grid[i, j]:.3f}', ha='center', va='center',
                        fontsize=8, color='black')

    plt.tight_layout()
    plt.show()

In [None]:
discount_factors = [0.8, 0.9, 0.95, 1.0]
visualize_value_functions_by_gamma_pi(env, optimal_policy_pi, discount_factors)

## Partie 4 : Value Iteration

In [None]:
def value_iteration(env, discount_factor=0.9, theta=1e-6):
    """
    Algorithme de Value Iteration.

    Principe : Mise à jour combinant évaluation et amélioration
    V_{k+1}(s) = max_a Σ_{s',r} p(s',r|s,a) * [r + γ * V_k(s')]

    Args:
        env: L'environnement Gym
        discount_factor: Facteur d'actualisation γ
        theta: Seuil de convergence

    Returns:
        policy: Politique optimale extraite de V*
        V: Fonction de valeur optimale V*
    """
    V = np.zeros(env.observation_space.n)
    iteration = 0
    while True:
        delta = 0

        for s in range(len(V)):
            v = V[s]

            action_values = np.zeros(env.action_space.n)
            for a in range(env.action_space.n):
                for transition_prob, next_state, reward, done in env.unwrapped.P[s][a]:
                    action_values[a] += transition_prob * (reward + discount_factor * V[next_state])
            V[s] = np.max(action_values)
            delta = max(delta, abs(v - V[s]))
        iteration += 1
        if delta < theta:
            print(f"Convergence après {iteration} itérations")
            break
    optimal_policy = np.zeros([env.observation_space.n, env.action_space.n])

    return optimal_policy, V

In [None]:
# Test de Value Iteration
print("\nExécution de Value Iteration...")
optimal_policy_vi, V_vi = value_iteration(env, discount_factor=0.9)

In [None]:
# Experiment with different discount factors
print("\n=== Effect of Discount Factor ===")
discount_factors = [0.8, 0.9, 0.95, 1.0]
# TO DO

In [None]:
# Visualiser l'effet de gamma
def visualize_value_functions_by_gamma_vi(env, gamma_list):
    fig, axes = plt.subplots(1, len(gamma_list), figsize=(5 * len(gamma_list), 6))

    for idx, gamma in enumerate(gamma_list):
        _, v_gamma = value_iteration(env, discount_factor=gamma)
        value_grid = v_gamma.reshape(8, 8)

        ax = axes[idx]
        im = ax.imshow(value_grid, cmap='YlOrRd')
        ax.set_title(f'Value Function\nγ = {gamma}', fontsize=12)
        for i in range(8):
            for j in range(8):
                ax.text(j, i, f'{value_grid[i, j]:.3f}', ha='center', va='center',
                        fontsize=8, color='black')

    plt.tight_layout()
    plt.show()

In [None]:
discount_factors = [0.8, 0.9, 0.95, 1.0]
visualize_value_functions_by_gamma_vi(env, discount_factors)

## Partie 5 : Visualisation et analyse

In [None]:
def visualize_value_function(V, title="Fonction de Valeur"):
    """Affiche la fonction de valeur sous forme de grille."""
    value_grid = V.reshape(8, 8)

    plt.figure(figsize=(8, 8))
    plt.imshow(value_grid, cmap='YlOrRd')
    plt.colorbar(label='V(s)')
    plt.title(title, fontsize=14, fontweight='bold')

    for i in range(8):
        for j in range(8):
            state = i * 8 + j
            plt.text(j, i, f'{V[state]:.3f}',
                    ha='center', va='center', fontsize=9, color='black')

    plt.tight_layout()
    plt.show()


def visualize_policy(policy, title="Politique"):
    """Affiche la politique sous forme de flèches."""
    directions = ['←', '↓', '→', '↑']

    plt.figure(figsize=(8, 8))
    arrow_grid = np.full((8, 8), '', dtype='<U1')

    for s in range(env.observation_space.n):
        i, j = divmod(s, 8)
        best_action = np.argmax(policy[s])
        arrow_grid[i, j] = directions[best_action]

    table = plt.table(cellText=arrow_grid, loc='center', cellLoc='center',
                     bbox=[0, 0, 1, 1])
    table.scale(1, 1.5)
    table.auto_set_font_size(False)
    table.set_fontsize(16)

    plt.axis('off')
    plt.title(title, fontsize=14, fontweight='bold', pad=20)
    plt.tight_layout()
    plt.show()

In [None]:
# TODO 28:
## A changer selon ce qui doit être visualisé
visualize_value_function(V_vi, "Fonction de Valeur Optimale V*")
visualize_policy(optimal_policy_vi, "Politique Optimale π*")

## Partie 6 : Comparaison et analyse

#### Question 1
Comparez les résultats de Policy Iteration et Value Iteration.
Les politiques obtenues sont-elles identiques ?

#### Question 2
Comparez le nombre d'itérations nécessaires pour chaque algorithme.
   Lequel converge le plus rapidement ?

#### Question 3
Que se passe-t-il si on initialise policy_iteration avec la politique
   optimale au lieu d'une politique aléatoire ?

#### Question 4
Pourquoi la programmation dynamique nécessite-t-elle de connaître
   complètement le modèle de l'environnement (P et R) ?
   Quelles sont les limites de cette approche ?