# Devoir 2

In [3]:
import numpy

## Environnements simples et solutions exactes

### [15 points] Implémentation d'un environnement

Considérez l'environnement en grille $5 \times 5$ suivant :
$$
\begin{array}{|c|c|c|c|c|}
\hline \\[-7pt]
\phantom{\text{A}} & \text{A} & \phantom{\text{A}} & \text{B} & \phantom{\text{A}} \\[2pt]
\hline \\[-7pt]
\phantom{\text{A}} & \phantom{\text{A}} & \phantom{\text{A}} & \phantom{\text{A}} & \phantom{\text{A}} \\[2pt]
\hline \\[-7pt]
\phantom{\text{A}} & \phantom{\text{A}} & \phantom{\text{A}} & \text{B'} & \phantom{\text{A}} \\[2pt]
\hline \\[-7pt]
\phantom{\text{A}} & \phantom{\text{A}} & \phantom{\text{A}} & \phantom{\text{A}} & \phantom{\text{A}} \\[2pt]
\hline \\[-7pt]
\phantom{\text{A}} & \text{A'} & \phantom{\text{A}} & \phantom{\text{A}} & \phantom{\text{A}} \\[2pt]
\hline
\end{array}
$$

L'environnement est caractérisé par l'ensemble d'actions $\mathcal A := \{ \uparrow, \downarrow, \rightarrow, \leftarrow \}$ visant à déplacer l'agent d'une case dans la direction indiquée par la flèche.

Considérez des déplacements déterministes :
- si $S_t = \text{A}$, alors $S_{t+1} = \text{A'}$ (peu importe l'action);
- si $S_t = \text{B}$, alors $S_{t+1} = \text{B'}$ (peu importe l'action);
- si l'action fait sortir l'agent de la grille, alors $S_{t+1} = S_t$;
- sinon appliquer l'action.

Considérez la fonction de récompense suivante :
- si $S_t = \text{A}$, alors $R_{t+1} = R_{\text{A}}$;
- si $S_t = \text{B}$, alors $R_{t+1} = R_{\text{B}}$;
- si l'action fait sortir l'agent de la grille, alors $R_{t+1} = -1$;
- sinon $R_{t+1} = 0$,

où $R_{\text{A}}$ et $R_{\text{B}}$ sont des valeurs spécifiées à l'initialisation de l'environnement.

Complétez le code suivant pour implémenter cet environnement. Votre classe doit contenir une fonction _expected\_reward_ retournant la récompense attendue en suivant la politique spécifiée dans un état donné. Votre classe doit également contenir une fonction _prob\_transition\_policy_ retournant la probabilité de transitionner vers un prochain état donné en suivant la politique spécifiée dans un état courant donné.

Vous pouvez implémenter d'autres fonctions utilitaires à votre guise. Les manières de représenter les états et la politique sont laissées à votre discrétion.

In [4]:
class Gridworld:
    
    def __init__(self, reward_a, reward_b):
        self.reward_a = reward_a
        self.reward_b = reward_b
        
        self.rows = 5
        self.cols = 5

        self.A = (0, 1)      # position de A
        self.A_prime = (4, 1)  # position de A'
        self.B = (0, 3)      # position de B
        self.B_prime = (2, 3)  # position de B'

        self.actions = {
            'up': (-1, 0),
            'down': (1, 0),
            'left': (0, -1),
            'right': (0, 1)
        }

    def step(self, state, action):

        # Si state est A ou B
        if state == self.A:
            return self.A_prime, self.reward_a
        if state == self.B:
            return self.B_prime, self.reward_b
        
        # Mouvement standard : calcul de la nouvelle position
        delta = self.actions[action]
        next_state = (state[0] + delta[0], state[1] + delta[1])
        
        # Vérifier si le nouvel état est hors grille
        if not (0 <= next_state[0] < self.rows and 0 <= next_state[1] < self.cols):
            # Si l'agent sort de la grille, il reste au même endroit et reçoit -1
            return state, -1
        
        # Sinon, le déplacement est normal et la récompense est nulle
        return next_state, 0
    
    def expected_reward(self, state, policy):
        # Pour les états A et B, la récompense est déterminée indépendamment de l'action
        if state == self.A:
            return self.reward_a
        if state == self.B:
            return self.reward_b
    
        exp_reward = 0
        # Parcourir toutes les actions possibles dans l'état selon la politique
        for action, prob in policy[state].items():
            _, reward = self.step(state, action)
            exp_reward += prob * reward
        return exp_reward
    
    def prob_transition_policy(self, next_state, state, policy):
        # Pour les états spéciaux A et B, le prochain état est fixé
        if state == self.A:
            return 1 if next_state == self.A_prime else 0
        if state == self.B:
            return 1 if next_state == self.B_prime else 0
        
        total_prob = 0
        # Somme des probabilités de toutes les actions conduisant à next_state
        for action, prob in policy[state].items():
            ns, _ = self.step(state, action)
            if ns == next_state:
                total_prob += prob
        return total_prob

Expliquez comment vous représentez une politique et comment vous représentez les états dans votre implémentation.

Les états sont représentés par un tuple (i, j) où i représente l'indice de la ligne et j celui de la colonne. Ils sont tous une position dans la grille 5 x 5. L'état A est représenté par le tuple (0, 1) et A' par (4, 1).

La politique est représentée par un dictionnaire où les clés sont les états possibles dans la grille et les valeurs sont un dictionnaire des 4 mouvements possibles avec leur probabilité associée dans cet état.

### [10] points] Résolution exacte du système linéaire

Résolvez le système linéaire résultant de cet environnement pour calculer la fonction de valeur d'état associée à la politique $\pi(a | s) = 1/4$ pour tout $a \in \mathcal A$, $s \in \mathcal S$. Considérez un taux d'actualisation $\gamma = 0.9$.

Affichez votre résultat sous la forme d'une matrice $5 \times 5$, où la cellule $(i, j)$ correspond à l'emplacement $(i, j)$ dans l'environnement en grille illustré plus haut.

In [5]:
gamma = 0.9
reward_A = 10
reward_B = 5

grid = Gridworld(reward_A, reward_B)

states = [(i, j) for i in range(grid.rows) for j in range(grid.cols)]
n_states = len(states)

def state_to_index(state):
    """Associe à un état (i,j) son indice dans [0, n_states - 1]."""
    i, j = state
    return i * grid.cols + j

# --- Construction du système linéaire A * V = b ---
A_mat = numpy.zeros((n_states, n_states))
b_vec = numpy.zeros(n_states)

for state in states:
    idx = state_to_index(state)
    A_mat[idx, idx] = 1
    # Cas particuliers des états A et B (transitions déterministes)
    if state == grid.A:
        A_mat[idx, state_to_index(grid.A_prime)] = -gamma
        b_vec[idx] = reward_A
    elif state == grid.B:
        A_mat[idx, state_to_index(grid.B_prime)] = -gamma
        b_vec[idx] = reward_B

    else:
        # Pour un état ordinaire, on part de l'équation : 
        for action in grid.actions.keys():
            next_state, r = grid.step(state, action)
            idx_ns = state_to_index(next_state)
            A_mat[idx, idx_ns] -= gamma/4
            b_vec[idx] += r/4

V = numpy.linalg.solve(A_mat, b_vec)
V_matrix = V.reshape(grid.rows, grid.cols)
print(numpy.round(V_matrix, 3))

[[ 3.309  8.789  4.428  5.322  1.492]
 [ 1.522  2.992  2.25   1.908  0.547]
 [ 0.051  0.738  0.673  0.358 -0.403]
 [-0.974 -0.435 -0.355 -0.586 -1.183]
 [-1.858 -1.345 -1.229 -1.423 -1.975]]


## Programmation dynamique

### [10 points] Évaluation de politique

Complétez le code suivant pour implémenter la stratégie d'évaluation de politique par programmation dynamique. Votre fonction doit recevoir en entrée l'environnement à résoudre, la politique à évaluer, le taux d'actualisation $\gamma$, ainsi qu'un seuil de tolérance à l'erreur $\theta$ en-dessous duquel l'algorithme peut terminer. Votre fonction doit retourner la fonction de valeur d'état sous la forme d'un vecteur de longueur $|\mathcal S|$.

In [6]:
def policy_evaluation(gridworld, pi, gamma, theta):
    rows, cols = gridworld.rows, gridworld.cols
    n_states = rows * cols
    V = numpy.zeros(n_states)

    delta = float('inf')
    while delta >= theta:
        delta = 0
        # Parcourir tous les états
        for i in range(rows):
            for j in range(cols):
                state = (i, j)
                idx = state_to_index(state)
                v = V[idx]
                new_v = 0
                # Calcul de l'espérance sur toutes les actions selon la politique pi
                for action, prob in pi[state].items():
                    next_state, reward = gridworld.step(state, action)
                    new_v += prob * (reward + gamma * V[state_to_index(next_state)])
                V[idx] = new_v
                delta = max(delta, abs(v - new_v))
    return V

Utilisez votre méthode pour évaluer la valeur des états sous la politique $\pi(a|s) = 1/4$ pour tout $a \in \mathcal A$, $s \in \mathcal S$, dans l'environnement en grille implémenté précédemment. Considérez un taux d'actualisation $\gamma = 0.9$ et un seuil de tolérance $\theta = 10^{-5}$.

Affichez votre résultat sous la forme d'une matrice $5 \times 5$, où la cellule $(i, j)$ correspond à l'emplacement $(i, j)$ dans l'environnement en grille illustré plus haut.

In [7]:
reward_A = 10
reward_B = 5
gamma = 0.9
theta = 1e-5

grid = Gridworld(reward_A, reward_B)

policy = {}
for i in range(grid.rows):
    for j in range(grid.cols):
        # Pour chaque état (i,j), la probabilité de prendre n'importe quelle action est 1/4
        policy[(i, j)] = {'up': 0.25, 'down': 0.25, 'left': 0.25, 'right': 0.25}

V = policy_evaluation(grid, policy, gamma, theta)

# Conversion du vecteur V en une matrice 5x5 (ordre row-major)
V_matrix = V.reshape((grid.rows, grid.cols))
print(numpy.round(V_matrix, 3))

[[ 3.309  8.789  4.428  5.322  1.492]
 [ 1.522  2.992  2.25   1.908  0.547]
 [ 0.051  0.738  0.673  0.358 -0.403]
 [-0.974 -0.435 -0.355 -0.586 -1.183]
 [-1.858 -1.345 -1.229 -1.423 -1.975]]


### [15 points] Itération de valeur

Complétez le code suivant pour implémenter la stratégie d'itération de valeurs permettant d'identifier la fonction de valeur de la politique optimale par programmation dynamique. Votre fonction doit recevoir en entrée l'environnement à résoudre, le taux d'actualisation $\gamma$, ainsi qu'un seuil de tolérance à l'erreur $\theta$ en-dessous duquel l'algorithme peut terminer. Votre fonction doit retourner la fonction de valeur d'état sous la forme d'un vecteur de longueur $|\mathcal S|$.

In [8]:
def value_iteration(gridworld, gamma, theta):
    rows, cols = gridworld.rows, gridworld.cols
    n_states = rows * cols
    V = numpy.zeros(n_states)

    delta = float('inf')
    while delta >= theta:
        delta = 0
        # Parcourir tous les états
        for i in range(rows):
            for j in range(cols):
                state = (i, j)
                idx = state_to_index(state)
                v = V[idx]
                
                # Estimation de la valeur optimale pour l'état courant
                v_actions = []
                for action in gridworld.actions.keys(): # Parcourir toutes les actions possibles
                    next_state, reward = gridworld.step(state, action)
                    v_actions.append(reward + gamma * V[state_to_index(next_state)]) # Estimation de la valeur pour l'action courante

                v_optim = max(v_actions) # Valeur optimale pour l'état courant
                V[idx] = v_optim # Mise à jour de la valeur de l'état courant
                delta = max(delta, abs(v - v_optim)) # Mise à jour de delta
    return V               


Utilisez votre méthode pour évaluer la valeur des états sous la politique optimale dans l'environnement en grille implémenté précédemment. Considérez un taux d'actualisation $\gamma = 0.9$ et un seuil de tolérance $\theta = 10^{-5}$.

Affichez votre résultat sous la forme d'une matrice $5 \times 5$, où la cellule $(i, j)$ correspond à l'emplacement $(i, j)$ dans l'environnement en grille illustré plus haut.

In [11]:
reward_A = 10
reward_B = 5
gamma = 0.9
theta = 1e-5

grid = Gridworld(reward_A, reward_B)

V = value_iteration(grid, gamma, theta)

# Conversion du vecteur V en une matrice 5x5 (ordre row-major)
V_matrix = V.reshape((grid.rows, grid.cols))
print(numpy.round(V_matrix, 3))

[[21.977 24.419 21.977 19.419 17.477]
 [19.78  21.977 19.78  17.802 16.022]
 [17.802 19.78  17.802 16.022 14.419]
 [16.022 17.802 16.022 14.419 12.977]
 [14.419 16.022 14.419 12.977 11.68 ]]


Énumérez toutes les situations dans lesquelles un agent suivant la politique optimale traverse l'état $\text{B}$. Justifiez votre réponse en référant aux valeurs d'états affichées précédemment.

Les deux seules situations qui mènent à traverser l’état B sont les suivantes : soit la position (0, 3), c’est-à-dire B directement, soit la position (0, 4).

Dans toutes les autres positions, la politique optimale estime qu’il est plus avantageux de se diriger vers l’état A, qui rapporte une récompense plus élevée. Ainsi, la valeur des états en se rapprochant de A est toujours supérieure à celle des états menant vers B.

__[IFT-7201]__ Évaluez maintenant la valeur des états sous la politique optimale en considérant un taux d'actualisation $\gamma = 0.2$ (conserver le même seuil de tolérance $\theta = 10^{-5}$).

Affichez votre résultat sous la forme d'une matrice $5 \times 5$, où la cellule $(i, j)$ correspond à l'emplacement $(i, j)$ dans l'environnement en grille illustré plus haut.

__[IFT-7201]__ Expliquez l'impact de cette réduction taux d'actualisation sur la politique optimale.

__[IFT-7201]__ Quelle est la valeur entière minimale de $R_{\text{A}}$ permettant de retrouver, avec $\gamma = 0.2$, la même politique optimale que celle calculée précédemment avec $\gamma = 0.9$?

Supportez votre réponse en affichant les valeurs d'états sous la forme d'une matrice $5 \times 5$, où la cellule $(i, j)$ correspond à l'emplacement $(i, j)$ dans l'environnement en grille illustré plus haut.

__[IFT-7201]__ L'expérience précédente permet d'exposer une relation entre l'amplitude des récompenses, la valeur du taux d'actualisation et la politique optimale résultante. Décrivez cette relation.

## Contrôle tabulaire

### [10 points] Environnement

Le code suivant permet d'instancier un environnement de type CartPole-v1 provenant de la librairie Gymnasium d'OpenAI :

In [12]:
import gymnasium as gym

environment = gym.make("CartPole-v1")

Dans cet environnement, le but de l'agent consiste à déplacer une plateforme de manière à faire tenir en équilibre un poteau sur la plateforme. Les états sont des vecteurs représentant la position de la plateforme et du poteau. La récompense est de 1 pour chaque état où le poteau est encore sur la plateforme et tombe à 0 lorsque le poteau tombe, ce qui sonne la fin de l'épisode.

L'environnement dispose de différents attributs permettant, par exemple, de retrouver la dimension d'un état $s \in \mathcal S$ et la dimension de l'espace d'actions $\mathcal A$ dans l'environnement :

In [13]:
# observation_space : environment attribute qualifying the state space
# continuous state space : shape attribute
observation_dim = environment.observation_space.shape

# action_space : environment attribute qualifying the action space
# discrete action space : n attribute indicating the number of actions
# actions : 0...(n-1)
n_actions = environment.action_space.n # only discrete action spaces

print(f"L'environnement {environment.spec.id} a des états de la forme {observation_dim} et contient {n_actions} actions.")

L'environnement CartPole-v1 a des états de la forme (4,) et contient 2 actions.


À quoi correspondent les éléments d'un vecteur d'état dans cet environnement?

Selon la documentation de https://gymnasium.farama.org/environments/classic_control/cart_pole, les éléments d'un vecteur d'état sont (par ordre croissant d'index) :

0. Position de la platforme $x \in [-4.8, 4.8]$
1. Vitesse de la platforme $v \in \ ]-\infty, \infty[$
2. Angle du poteau $\theta \in [-0.418, 0.418]$
3. Vitesse angulaire du poteau $\omega \in \ ]-\infty, \infty[$

Pour les besoins de l'implémentation tabulaire, il est nécessaire de discrétiser l'espace d'états continu de CartPole. Le code suivant permet de déterminer les limites de l'espace d'état :

In [14]:
from math import radians

# get upper bounds on state components
upper_bounds = environment.observation_space.high

# recall : [cart position, cart speed, pole angle, pole speed]
upper_bounds[1] = 0.5
upper_bounds[3] = radians(50)

# symmetric state space
lower_bounds = -upper_bounds # or environment.observation_space.low with similar trick as above

Comme les vitesses ont des bornes infinies qui ne se prètent pas bien à la discrétisation, on leur fixe les valeurs limites de 0.5 unité / $t$ et 50 radians / $t$ pour la vitesse du chariot et du poteau, respectivement.

Complétez le code suivant pour implémenter une fonction permettant de discrétiser un état. Votre fonction doit recevoir en entrée l'état à discrétiser, les bornes inférieures et supérieures à considérer pour chaque composante d'un état, ainsi  qu'une liste indiquant le nombre de zones (_buckets_ ou _bins_) à utiliser pour discrétiser chaque composante. Chaque zone est de taille identique : $(u_i - \ell_i) / N_i$, où $u_i$ et $\ell_i$ sont les bornes supérieure et inférieure pour la composante $i$ et $N_i$ est le nombre de zones qui divisent la composante $i$. Votre fonction doit retourner un tuple de taille 4, où chaque indice correspond à l'indice de la zone discrète représentant la composante continue Assurez-vous de gérer les cas limites où l'état en entrée contient des composantes en dehors de vos bornes (ramenez-les à la borne la plus près). Considérez la discrétisation par défaut en 1, 1, 6 et 12 zones par composante respectivement.

In [15]:
def discretize(state, lower_bounds, upper_bounds, n_buckets=[1, 1, 6, 12]):
    '''
    state : (4,) numpy array
    lower_bounds : (4,) numpy array
    upper_bounds : (4,) numpy array
    n_buckets : list of integers

    Return : Discrete state index (integer).
    '''
    discrete_state = ()

    for c_i in range(len(state)):
        if state[c_i] <= lower_bounds[c_i]:
            index = 0
        elif state[c_i] >= upper_bounds[c_i]:
            index = n_buckets[c_i] - 1
        else:
            # Créer des zones espacées uniformément
            zones = numpy.linspace(lower_bounds[c_i], upper_bounds[c_i], n_buckets[c_i] + 1)
            # Trouver la zone dans laquelle se trouve l'état
            index = numpy.digitize(state[c_i], zones) - 1  # Soustrait 1 pour démarrer à 0

        discrete_state += (int(index),)
    
    # Convertir le tuple en un index unique
    state_index = 0
    for i, val in enumerate(discrete_state):
        state_index = state_index * n_buckets[i] + val

    return state_index

__[IFT-7201]__ La discrétisation suggérée est seulement appliquée sur la position et la vitesse du poteau. Pourquoi est-ce que cette discrétisation est suffisante pour l'apprentissage d'une bonne politique?

### Visualisation

La fonction suivante permet de visualiser une politique _greedy_ définie sur des valeurs d'actions ($q$-_values_) données :

In [16]:
def display_tabular(q_values, max_steps=1000):
    '''
    q_values : (n, k) numpy array where n is the number of (discrete) states and k is the number of actions
    '''
    environment = gym.make("CartPole-v1", render_mode="human")

    time_step = 0
    terminated, truncated = False, False
    state, _ = environment.reset()
    s_idx = discretize(state, lower_bounds, upper_bounds)
    while not (terminated or truncated) and time_step < max_steps:
        environment.render()
        action = numpy.argmax(q_values[s_idx])
        next_state, _, terminated, truncated, _ = environment.step(action)            
        next_s_idx = discretize(next_state, lower_bounds, upper_bounds)
        s_idx = next_s_idx
        time_step += 1
    environment.close()

Elle sera utilisée dans les sections suivantes pour visualiser les politiques apprises.

### [20 points] Apprentissage Monte Carlo

Complétez le code suivant pour implémenter la stratégie First-visit Monte Carlo pour le contrôle. Votre fonction doit recevoir en entrée l'environnement à aborder, le taux d'actualisation $\gamma$ à considérer dans l'objectif, ainsi que le nombre maximal d'épisodes à effectuer. Votre fonction doit retourner la matrice des valeurs d'actions ($q$-_values_) sous la forme d'un _numpy.array_ de dimension $|\mathcal S| \times |\mathcal A|$, où $\mathcal S$ correspond à l'ensemble des états discrétisés.

Considérez une politique de collecte de données de type $\varepsilon$-greedy. Pour chaque épisode $i = 1...N$, considérez un taux d'exploration $\varepsilon = 0.99^{i-1}$ tant que $\varepsilon \geq 0.1$.

À tous les 10 épisodes, votre fonction doit afficher le taux d'exploration $\varepsilon$ utilisé ainsi que la somme des récompenses cumulées (sans actualisation) durant l'épisode.

In [None]:
def first_visit_mc_control(environment, gamma, max_episodes):
    n_buckets = [1, 1, 6, 12]
    n = numpy.prod(n_buckets)  # nombre total d'états discrets possibles
    k = environment.action_space.n # nombre total d'actions possibles

    q_values = numpy.random.rand(n, k)
    alpha = 0.1 # taux d'apprentissage

    for e in range(max_episodes):
        epsilon = 0.99 ** e if 0.99 ** e > 0.1 else 0.1 # taux d'exploration
        trajectoire = [] # séquence d'états-actions-récompenses durant l'épisode
        total_reward = 0
        fin_episode = False
        state, _ = environment.reset()
        s_idx = discretize(state, lower_bounds, upper_bounds)
        while not fin_episode:
            if numpy.random.rand() > epsilon:
                action = numpy.argmax(q_values[s_idx]) 
            else:
                action = numpy.random.randint(k)
    
            next_state, reward, terminated, truncated, _ = environment.step(action)
            trajectoire.append((s_idx, action, reward))
            total_reward += reward

            next_s_idx = discretize(next_state, lower_bounds, upper_bounds)
            s_idx = next_s_idx
            fin_episode = terminated or truncated
        
        # mise à jour des q_values
        s_a_vu = set() # états-actions rencontrés une fois
        for t in range(len(trajectoire)):
            s, a, _ = trajectoire[t]
            if (s, a) not in s_a_vu:
                s_a_vu.add((s, a))
                G_t = sum([gamma ** k * trajectoire[t + 1 + k][2] for k in range(len(trajectoire) - t - 1)]) # calcul du rendement actualisé
  
                # Mise à jour des q_values par taux d'apprentissage alpha (pas mentionné pourtant mais fonctionne super bien)
                q_values[s][a] = q_values[s][a] + alpha * (G_t - q_values[s][a])

        if e % 10 == 0:
            print(f"Episode {e} : epsilon = {epsilon}, total_reward = {total_reward}")
    environment.close()

    return q_values


Appliquez votre stratégie First-visit Monte Carlo au contrôle de l'agent dans CartPole-v1.

Générez 5000 épisodes sans utiliser d'actualisation (donc avec $\gamma = 1$). Votre agent devrait converger à une récompense cumulative autour de 500.

In [54]:
q_values = first_visit_mc_control(environment, gamma=1.0, max_episodes=5000)

Episode 0 : epsilon = 1.0, total_reward = 14.0
Episode 10 : epsilon = 0.9043820750088044, total_reward = 38.0
Episode 20 : epsilon = 0.8179069375972308, total_reward = 122.0
Episode 30 : epsilon = 0.7397003733882802, total_reward = 21.0
Episode 40 : epsilon = 0.6689717585696803, total_reward = 17.0
Episode 50 : epsilon = 0.6050060671375364, total_reward = 21.0
Episode 60 : epsilon = 0.5471566423907612, total_reward = 97.0
Episode 70 : epsilon = 0.49483865960020695, total_reward = 13.0
Episode 80 : epsilon = 0.4475232137638106, total_reward = 24.0
Episode 90 : epsilon = 0.4047319726783238, total_reward = 27.0
Episode 100 : epsilon = 0.3660323412732292, total_reward = 255.0
Episode 110 : epsilon = 0.33103308832101386, total_reward = 212.0
Episode 120 : epsilon = 0.2993803913123313, total_reward = 202.0
Episode 130 : epsilon = 0.27075425951199406, total_reward = 121.0
Episode 140 : epsilon = 0.24486529903492948, total_reward = 500.0
Episode 150 : epsilon = 0.22145178723886091, total_rewar

Vous pouvez maintenant appeler la fonction _display_ définie précédemment pour visualiser la politique _greedy_ définie à partir des valeurs d'actions ($q$-_values_) apprises avec la méthode Monte Carlo :

In [45]:
display_tabular(q_values)

Répétez votre expérience quelques fois. Qu'est-ce que vous observez?

Le contrôle par Monte Carlo First-Visit parvient à trouver une politique qui résout l’environnement, c’est-à-dire qui atteint régulièrement la récompense maximale (500). Les différences d’un entraînement à l’autre se situent essentiellement dans la vitesse de convergence de la politique. De manière générale, une fois que la politique a convergé à la performance maximale (500), les performances varient très peu, hormis quelques fluctuations dues à l’exploration. On remarque toutefois que certains épisodes se terminent par un échec.

__[IFT-7201]__ Pourquoi est-ce que les résultats sont différents à chaque répétition? D'où provient la variance d'une répétition à l'autre?

### [20 points] Q-learning

Complétez le code suivant pour implémenter la stratégie Q-learning. Votre fonction doit recevoir en entrée le taux d'actualisation $\gamma$ à considérer dans l'objectif, un taux d'apprentissage initial $\alpha_0$, ainsi que le nombre maximal d'épisodes à effectuer. Votre fonction doit retourner la matrice des valeurs d'actions ($q$-_values_) sous la forme d'un _numpy.array_ de dimension $|\bar{\mathcal S}| \times |\bar{\mathcal S}| \times |\mathcal A|$, où $\bar{\mathcal S}$ correspond à l'ensemble des états discrétisés.

Considérez un mécanisme de réduction du taux d'apprentissage au fil des épisodes tel que l'épisode $i$ est effectué avec un taux d'apprentissage $\alpha = \alpha_0 \times 0.99^{i-1}$ tant que $\alpha \geq 0.1$.

Considérez une politique de collecte de données de type $\varepsilon$-greedy telle que l'épisode $i$ est réalisé avec un taux d'exploration $\varepsilon = 0.99^{i-1}$ tant que $\varepsilon \geq 0.1$.

À tous les 10 épisodes, votre fonction doit afficher le taux d'exploration $\varepsilon$ utilisé ainsi que la somme des récompenses cumulées (sans actualisation) durant l'épisode.

In [52]:
def q_learning(environment, gamma, learning_rate, max_episodes):
    n_buckets = [1, 1, 6, 12]
    n = numpy.prod(n_buckets)  # nombre total d'états discrets possibles
    k = environment.action_space.n # nombre total d'actions possibles

    q_values = numpy.random.rand(n, k) # initialisation aléatoire des q_values

    for e in range(max_episodes):
        alpha = learning_rate * 0.99 ** e if learning_rate * 0.99 ** e > 0.1 else 0.1 # taux d'apprentissage
        epsilon = 0.99 ** e if 0.99 ** e > 0.1 else 0.1 # taux d'exploration

        total_reward = 0
        fin_episode = False
        state, _ = environment.reset()
        s_idx = discretize(state, lower_bounds, upper_bounds)
        while not fin_episode:
            if numpy.random.rand() > epsilon:
                action = numpy.argmax(q_values[s_idx]) 
            else:
                action = numpy.random.randint(k)
    
            next_state, reward, terminated, truncated, _ = environment.step(action)
            total_reward += reward

            next_s_idx = discretize(next_state, lower_bounds, upper_bounds)
        
            # Mise à jour des q_values par Q-learning
            q_values[s_idx][action] = q_values[s_idx][action] + alpha * (reward + gamma * numpy.max(q_values[next_s_idx]) - q_values[s_idx][action])

            s_idx = next_s_idx
            fin_episode = terminated or truncated

        if e % 10 == 0:
            print(f"Episode {e} : epsilon = {epsilon}, total_reward = {total_reward}")
    environment.close()

    return q_values

Appliquez votre stratégie Q-learning au contrôle de l'agent dans CartPole-v1.

Générez 500 épisodes sans utiliser d'actualisation (donc avec $\gamma = 1$) et avec un taux d'apprentissage initial $\alpha_0 = 1$. Votre agent devrait converger à une récompense cumulative autour de 500.

In [56]:
q_values = q_learning(environment, gamma=1.0, learning_rate=1.0, max_episodes=500)

Episode 0 : epsilon = 1.0, total_reward = 18.0
Episode 10 : epsilon = 0.9043820750088044, total_reward = 15.0
Episode 20 : epsilon = 0.8179069375972308, total_reward = 52.0
Episode 30 : epsilon = 0.7397003733882802, total_reward = 13.0
Episode 40 : epsilon = 0.6689717585696803, total_reward = 11.0
Episode 50 : epsilon = 0.6050060671375364, total_reward = 9.0
Episode 60 : epsilon = 0.5471566423907612, total_reward = 31.0
Episode 70 : epsilon = 0.49483865960020695, total_reward = 12.0
Episode 80 : epsilon = 0.4475232137638106, total_reward = 71.0
Episode 90 : epsilon = 0.4047319726783238, total_reward = 60.0
Episode 100 : epsilon = 0.3660323412732292, total_reward = 38.0
Episode 110 : epsilon = 0.33103308832101386, total_reward = 228.0
Episode 120 : epsilon = 0.2993803913123313, total_reward = 120.0
Episode 130 : epsilon = 0.27075425951199406, total_reward = 142.0
Episode 140 : epsilon = 0.24486529903492948, total_reward = 64.0
Episode 150 : epsilon = 0.22145178723886091, total_reward = 

Vous pouvez maintenant appeler la fonction _display_ définie précédemment pour visualiser la politique _greedy_ définie à partir des valeurs d'actions ($q$-_values_) apprisent avec la méthode Monte Carlo :

In [59]:
display_tabular(q_values)

Comparez ces résultats avec ceux obtenus précédemment en utilisant la stratégie First-visit Monte Carlo :

__[IFT-7201]__ Quel est l'intérêt de réduire le taux d'apprentissage au fil des épisodes?