# **GridWorld** est une grille de 5x5 cases (5 lignes de 5 colonnes) sur laquelle un agent peut se déplacer. L'agent commence généralement dans la première case de la grille (première ligne, première colonne). Il a quatre actions possibles à sa disposition :
- **Gauche** : se déplacer vers la gauche.
- **Droite** : se déplacer vers la droite.
- **Haut** : se déplacer vers le haut.
- **Bas** : se déplacer vers le bas.

**Conditions terminales et récompenses :**
- Atteindre la dernière case de la première ligne (position [0, 4]) est un état terminal et donne une récompense de -3.
- Atteindre la dernière case de la dernière ligne (position [4, 4]) est également un état terminal, mais avec une récompense de 1.
- Essayer de se déplacer en dehors des bords de la grille entraîne un état terminal avec une récompense de -1.

In [47]:
import numpy as np
from typing import List, Tuple
from tqdm import tqdm

# Classe GridWorld

In [48]:
from grid import GridWorld

# Exemple de partie manuelle

In [49]:
grid = GridWorld()

grid.display()

X _ _ _ _ 
_ _ _ _ _ 
_ _ _ _ _ 
_ _ _ _ _ 
_ _ _ _ _ 


In [50]:
grid.step(1)
grid.display()

_ X _ _ _ 
_ _ _ _ _ 
_ _ _ _ _ 
_ _ _ _ _ 
_ _ _ _ _ 


In [51]:
grid.step(3)
grid.display()

_ _ _ _ _ 
_ X _ _ _ 
_ _ _ _ _ 
_ _ _ _ _ 
_ _ _ _ _ 


In [52]:
grid.step(3)
grid.display()

_ _ _ _ _ 
_ _ _ _ _ 
_ X _ _ _ 
_ _ _ _ _ 
_ _ _ _ _ 


In [53]:
grid.step(1)
grid.step(1)
grid.display()

_ _ _ _ _ 
_ _ _ _ _ 
_ _ _ X _ 
_ _ _ _ _ 
_ _ _ _ _ 


In [54]:
grid.step(3)
grid.step(3)
grid.display()

_ _ _ _ _ 
_ _ _ _ _ 
_ _ _ _ _ 
_ _ _ _ _ 
_ _ _ X _ 


In [55]:
grid.step(1)
grid.display()
grid.score()

_ _ _ _ _ 
_ _ _ _ _ 
_ _ _ _ _ 
_ _ _ _ _ 
_ _ _ _ X 


1.0

# Algorithme Naïf Q-learning

L'algorithme naïf de Q-learning est une méthode d'apprentissage par renforcement sans modèle qui cherche à apprendre la politique optimale pour un agent en explorant et en exploitant un environnement. Cet algorithme utilise une table de valeurs Q pour stocker et mettre à jour les estimations des récompenses futures attendues pour chaque paire état-action. Voici les étapes clés de cet algorithme :

1. **Initialisation** : La table Q est initialement remplie de valeurs arbitraires, et ces valeurs sont progressivement ajustées à travers les épisodes d'apprentissage.

2. **Choix des actions** : À chaque étape, l'agent choisit une action basée soit sur l'exploration (choix aléatoire) pour découvrir de nouvelles stratégies, soit sur l'exploitation (choisir l'action avec la valeur Q la plus élevée pour l'état actuel) pour maximiser les récompenses.

3. **Mise à jour de Q** : Après avoir pris une action, l'agent observe la récompense obtenue et le nouvel état atteint. La valeur Q pour la paire état-action est ensuite mise à jour en utilisant la formule :
   \[
   Q(s, a) = Q(s, a) + \alpha \left[r + \gamma \max_{a'} Q(s', a') - Q(s, a)\right]
   \]
   où \( \alpha \) est le taux d'apprentissage, \( \gamma \) est le facteur de dépréciation, \( r \) est la récompense observée, \( s' \) est le nouvel état, et \( a' \) est la meilleure action possible dans le nouvel état.

4. **Répétition** : Ces étapes sont répétées pour de nombreux épisodes jusqu'à ce que la table Q converge vers une approximation de la fonction de valeur optimale, permettant à l'agent de suivre la politique optimale déduite de Q.

L'algorithme de Q-learning est particulièrement utile pour les problèmes avec un espace d'état et d'action discrets et peut être appliqué à une variété de tâches d'apprentissage et de décision.


In [56]:
def naive_q_learning_gridworld(env_type, alpha: float = 0.1, epsilon: float = 0.1, gamma: float = 0.999, nb_iter: int = 100000):
    Q = {}

    env = env_type()
    for it in tqdm(range(nb_iter)):
        env.reset()

        while not env.is_game_over():
            s = env.state_id()
            aa = env.available_actions()

            if s not in Q:
                Q[s] = {}
                for a in aa:
                    Q[s][a] = np.random.random()  # Initialize Q-values for each state-action pair

            # Decide whether to explore or exploit
            if np.random.random() < epsilon:
                a = np.random.choice(aa)
            else:
                # Choose the best action based on current Q-values
                q_s = [Q[s][a] for a in aa]
                best_a_index = np.argmax(q_s)
                a = aa[best_a_index]

            prev_score = env.score()
            env.step(a)
            r = env.score() - prev_score

            s_p = env.state_id()
            aa_p = env.available_actions()

            if env.is_game_over():
                target = r
            else:
                if s_p not in Q:
                    Q[s_p] = {}
                    for a_p in aa_p:
                        Q[s_p][a_p] = np.random.random()
                q_s_p = [Q[s_p][a_p] for a_p in aa_p]
                max_a_p = np.max(q_s_p)
                target = r + gamma * max_a_p

            Q[s][a] = (1 - alpha) * Q[s][a] + alpha * target

    # Extract policy from Q-values
    Pi = {}
    for s, actions in Q.items():
        best_a = max(actions, key=actions.get)
        Pi[s] = best_a

    return Pi, Q

# Example usage:
naive_q_learning_gridworld(GridWorld, nb_iter=10000)

100%|██████████| 10000/10000 [00:02<00:00, 3625.09it/s]


({(0, 0): 1,
  (0, 1): 1,
  (1, 1): 1,
  (1, 0): 1,
  (2, 0): 3,
  (3, 0): 3,
  (0, 2): 1,
  (1, 2): 2,
  (1, 3): 2,
  (2, 2): 2,
  (2, 3): 2,
  (2, 1): 1,
  (3, 1): 3,
  (3, 2): 1,
  (3, 3): 2,
  (4, 2): 3,
  (4, 1): 0,
  (0, 3): 2,
  (2, 4): 0,
  (1, 4): 2,
  (0, 4): 1,
  (3, 4): 0,
  (4, 3): 3},
 {(0, 0): {1: 0.9930209650349772, 3: 0.9930209650349772},
  (0, 1): {1: 0.994014980014992, 2: 0.9818677420978968, 3: 0.9176136655553827},
  (1, 1): {0: 0.993020965034559,
   1: 0.9950099900049977,
   2: 0.993020965034758,
   3: 0.9930209650341054},
  (1, 0): {0: 0.9920279440699412, 1: 0.994014980014992, 3: 0.994014980014992},
  (2, 0): {0: 0.9757763486007596,
   1: 0.9786948630077604,
   3: 0.9950099900049977},
  (3, 0): {0: 0.9183594647516775, 1: -2.98820091436854, 3: 0.9960059960009989},
  (0, 2): {1: 0.9734175304072404, 2: 0.7981644727735243, 3: 0.752418872742548},
  (1, 2): {0: 0.7839501082225314,
   1: 0.8651642261539003,
   2: 0.994014980014992,
   3: 0.8430096037557249},
  (1, 3): {0:

# Algorithme Monte Carlo avec départs exploratoires (ES)

L'algorithme de Monte Carlo avec départs exploratoires (ES) est une technique d'apprentissage par renforcement qui permet d'estimer la politique optimale en utilisant l'échantillonnage complet des retours (gains cumulés) de chaque épisode. Contrairement aux méthodes basées sur le temps différé comme Q-learning, les méthodes Monte Carlo ajustent les estimations de la politique uniquement à la fin de chaque épisode. Voici les étapes principales de cet algorithme :

1. **Initialisation** : La politique (Pi) et la table de valeurs Q sont initialisées. Pour chaque état, une action est choisie aléatoirement comme action par défaut. Un dictionnaire de retours est également initialisé pour stocker les retours accumulés pour chaque paire état-action.

2. **Départs exploratoires** : Chaque épisode commence dans un état initial choisi aléatoirement avec une action également choisie aléatoirement. Cela assure que toutes les paires état-action ont une chance d'être explorées suffisamment.

3. **Génération de l'épisode** : L'agent suit la politique actuelle sauf pour le premier choix qui est aléatoire. L'épisode est enregistré sous forme de séquence de triplets (état, action, récompense).

4. **Calcul du retour** : À la fin de l'épisode, le retour pour chaque étape est calculé en remontant depuis la fin de l'épisode jusqu'au début, en utilisant le facteur de dépréciation \( \gamma \). Le retour est le gain cumulé à partir de cette étape jusqu'à la fin de l'épisode.

5. **Mise à jour de Q et de la politique** :
   - **Mise à jour de Q** : Pour chaque paire état-action unique rencontrée dans l'épisode, le retour est ajouté à la liste des retours correspondants et la valeur Q est mise à jour comme la moyenne de ces retours.
   - **Mise à jour de la politique** : Pour chaque état visité, la politique est mise à jour pour choisir l'action qui maximise la valeur Q estimée pour cet état.

6. **Répétition** : Le processus est répété pour un grand nombre d'épisodes pour permettre à la politique de converger vers la politique optimale.

Cet algorithme est particulièrement utile dans les environnements avec des dynamiques simples où la méthode des départs exploratoires est faisable. Il est aussi bien adapté aux situations où il est possible de simuler l'environnement pour générer des épisodes avec des conditions de départ arbitraires.


In [57]:
def naive_monte_carlo_with_exploring_starts(env_type, gamma=0.999, nb_iter=10000, max_steps=10):
    Pi = {}
    Q = {}
    Returns = {}

    for it in tqdm(range(nb_iter)):
        env = env_type.from_random_state()  # Assurez-vous que cette méthode initialise correctement

        is_first_action = True
        trajectory = []
        steps_count = 0
        while not env.is_game_over() and steps_count < max_steps:
            s = env.state_id()
            aa = env.available_actions()

            if s not in Pi:
                Pi[s] = np.random.choice(aa)

            if is_first_action:
                a = np.random.choice(aa)
                is_first_action = False
            else:
                a = Pi[s]

            prev_score = env.score()
            env.step(a)
            r = env.score() - prev_score

            trajectory.append((s, a, r, aa))
            steps_count += 1

        G = 0
        for (t, (s, a, r, aa)) in reversed(list(enumerate(trajectory))):
            G = gamma * G + r

            if all(map(lambda triplet: triplet[0] != s or triplet[1] != a, trajectory[:t])):
                if (s, a) not in Returns:
                    Returns[(s, a)] = []
                Returns[(s, a)].append(G)
                Q[(s, a)] = np.mean(Returns[(s, a)])

                best_a = None
                best_a_score = 0.0
                for a in aa:
                    if (s, a) not in Q:
                        Q[(s, a)] = np.random.random()
                    if best_a is None or Q[(s, a)] > best_a_score:
                        best_a = a
                        best_a_score = Q[(s, a)]

                Pi[s] = best_a
    return Pi
  
naive_monte_carlo_with_exploring_starts(GridWorld, nb_iter=10000)

100%|██████████| 10000/10000 [00:04<00:00, 2057.65it/s]


{(1, 3): 1,
 (0, 3): 1,
 (0, 4): 1,
 (0, 2): 1,
 (0, 1): 3,
 (1, 1): 1,
 (1, 2): 1,
 (3, 1): 3,
 (4, 1): 0,
 (2, 3): 1,
 (2, 2): 3,
 (3, 3): 3,
 (4, 3): 3,
 (4, 2): 3,
 (3, 4): 1,
 (3, 2): 1,
 (2, 1): 1,
 (1, 0): 1,
 (2, 0): 3,
 (3, 0): 3,
 (0, 0): 3,
 (1, 4): 2,
 (2, 4): 1}

# Algorithme On-policy First visit Monte Carlo control

In [58]:
import random
from collections import defaultdict

def on_policy_first_visit_mc_control(env_type, gamma=0.999, epsilon=0.1, nb_iter=10000, max_steps=10):
    Pi = {}
    Q = defaultdict(lambda: 0.0)
    Returns = defaultdict(list)

    for it in tqdm(range(nb_iter)):
        env = env_type.from_random_state()
        trajectory = []
        steps_count = 0

        while not env.is_game_over() and steps_count < max_steps:
            s = env.state_id()
            aa = env.available_actions()

            if np.random.random() < epsilon or s not in Pi:
                a = np.random.choice(aa)
            else:
                a = Pi[s]

            prev_score = env.score()
            env.step(a)
            r = env.score() - prev_score

            trajectory.append((s, a, r, aa))
            steps_count += 1

        G = 0
        visited_state_action_pairs = set()
        for (s, a, r, aa) in reversed(trajectory):
            G = gamma * G + r

            if (s, a) not in visited_state_action_pairs:
                visited_state_action_pairs.add((s, a))
                Returns[(s, a)].append(G)
                Q[(s, a)] = np.mean(Returns[(s, a)])

                best_a = max(aa, key=lambda action: Q[(s, action)])
                Pi[s] = best_a

    return Pi

on_policy_first_visit_mc_control(GridWorld, nb_iter=10000)

100%|██████████| 10000/10000 [00:05<00:00, 1770.30it/s]


{(0, 1): 3,
 (0, 0): 1,
 (1, 1): 1,
 (1, 2): 3,
 (1, 0): 3,
 (2, 1): 3,
 (2, 2): 3,
 (3, 2): 3,
 (3, 1): 3,
 (3, 0): 3,
 (1, 4): 1,
 (0, 4): 1,
 (2, 4): 1,
 (2, 3): 3,
 (1, 3): 1,
 (0, 3): 3,
 (0, 2): 3,
 (4, 1): 0,
 (3, 3): 3,
 (4, 3): 3,
 (2, 0): 3,
 (4, 2): 0,
 (3, 4): 1}

# Off Policy First visit Monte Carlo Control

In [59]:
def off_policy_mc_control(env_type, gamma=0.999, nb_iter=10000, max_steps=10):
    Q = defaultdict(lambda: 0.0)
    C = defaultdict(lambda: 0.0)
    b = defaultdict(lambda: 1.0)  # behavior policy (random)

    Pi = {}

    for it in tqdm(range(nb_iter)):
        env = env_type.from_random_state()
        trajectory = []
        steps_count = 0

        while not env.is_game_over() and steps_count < max_steps:
            s = env.state_id()
            aa = env.available_actions()

            a = np.random.choice(aa)
            prev_score = env.score()
            env.step(a)
            r = env.score() - prev_score

            trajectory.append((s, a, r))
            steps_count += 1

        G = 0
        W = 1.0
        for (s, a, r) in reversed(trajectory):
            G = gamma * G + r
            C[(s, a)] += W
            Q[(s, a)] += (W / C[(s, a)]) * (G - Q[(s, a)])
            
            Pi[s] = max(env.available_actions(), key=lambda action: Q[(s, action)])
            
            if a != Pi[s]:
                break
            
            W *= 1.0 / b[(s, a)]

    return Pi

off_policy_mc_control(GridWorld, nb_iter=10000)


100%|██████████| 10000/10000 [00:01<00:00, 7674.38it/s]


{(1, 4): 0,
 (3, 0): 0,
 (0, 3): 1,
 (3, 1): 0,
 (0, 1): 1,
 (2, 3): 0,
 (2, 4): 0,
 (0, 2): 1,
 (4, 2): 0,
 (3, 2): 0,
 (0, 4): 1,
 (3, 4): 0,
 (3, 3): 0,
 (1, 2): 0,
 (4, 1): 0,
 (4, 3): 0,
 (2, 1): 0,
 (1, 3): 0,
 (2, 0): 0,
 (2, 2): 0,
 (1, 0): 0,
 (1, 1): 0,
 (0, 0): 0}

# Sarsa

In [60]:
def epsilon_greedy_policy(Q, state, epsilon, available_actions):
    if random.uniform(0, 1) < epsilon:
        return random.choice(available_actions)
    else:
        return max(available_actions, key=lambda x: Q[state][x])

def sarsa(env, num_episodes, alpha, gamma, epsilon):
    Q = {}
    for row in range(5):
        for col in range(5):
            Q[(col, row)] = [0.0, 0.0, 0.0, 0.0]

    for episode in range(num_episodes):
        env.reset()
        state = env.state_id()
        available_actions = env.available_actions()
        action = epsilon_greedy_policy(Q, state, epsilon, available_actions)

        while not env.is_game_over():
            env.step(action)
            reward = env.score()
            next_state = env.state_id()
            next_available_actions = env.available_actions()
            next_action = epsilon_greedy_policy(Q, next_state, epsilon, next_available_actions)

            Q[state][action] += alpha * (reward + gamma * Q[next_state][next_action] - Q[state][action])

            state = next_state
            action = next_action

    return Q

In [61]:
alpha = 0.1  # Taux d'apprentissage
gamma = 0.99  # Facteur de réduction
epsilon = 0.1  # Taux d'exploration
num_episodes = 1000  # Nombre d'épisodes

# Création de l'environnement et exécution de l'algorithme SARSA
env = GridWorld()
Q = sarsa(env, num_episodes, alpha, gamma, epsilon)

# Affichage de la politique optimale trouvée
for row in range(5):
    for col in range(5):
        state = (col, row)
        best_action = np.argmax(Q[state])
        action_symbols = ['←', '→', '↑', '↓']
        print(action_symbols[best_action], end=' ')
    print()

↓ ↓ ← ← ← 
→ ↓ ← ← ← 
→ ↓ ← ← ← 
→ ↓ ← ← ← 
→ → → → ← 


# Dyna Q

In [62]:
def epsilon_greedy_policy(Q, state, epsilon, available_actions):
    if not available_actions:
        return random.choice([0, 1, 2, 3])
    if random.uniform(0, 1) < epsilon:
        return random.choice(available_actions)
    else:
        if available_actions:
            return max(available_actions, key=lambda x: Q[state][x])
        else:
            return random.choice([0, 1, 2, 3])

def dyna_q(env, num_episodes, alpha, gamma, epsilon, planning_steps):
    Q = {}
    model = {}

    for row in range(5):
        for col in range(5):
            Q[(col, row)] = [0.0, 0.0, 0.0, 0.0]
            model[(col, row)] = {}

    for episode in range(num_episodes):
        env.reset()
        state = env.state_id()
        available_actions = env.available_actions()
        action = epsilon_greedy_policy(Q, state, epsilon, available_actions)

        while not env.is_game_over():
            env.step(action)
            reward = env.score()
            next_state = env.state_id()
            next_available_actions = env.available_actions()
            next_action = epsilon_greedy_policy(Q, next_state, epsilon, next_available_actions)

            # Mise à jour de Q
            Q[state][action] += alpha * (reward + gamma * Q[next_state][next_action] - Q[state][action])

            # Mise à jour du modèle
            if action not in model[state]:
                model[state][action] = (reward, next_state)
            else:
                model[state][action] = (reward, next_state)

            # Planification
            for _ in range(planning_steps):
                sampled_state = random.choice(list(model.keys()))
                if not model[sampled_state]:
                    continue
                sampled_action = random.choice(list(model[sampled_state].keys()))
                sampled_reward, sampled_next_state = model[sampled_state][sampled_action]

                Q[sampled_state][sampled_action] += alpha * (sampled_reward + gamma * max(Q[sampled_next_state]) - Q[sampled_state][sampled_action])

            state = next_state
            action = next_action

    return Q

# Paramètres
alpha = 0.1  # Taux d'apprentissage
gamma = 0.99  # Facteur de réduction
epsilon = 0.1  # Taux d'exploration
num_episodes = 1000  # Nombre d'épisodes
planning_steps = 5  # Nombre d'étapes de planification

# Création de l'environnement et exécution de l'algorithme Dyna-Q
env = GridWorld()
Q = dyna_q(env, num_episodes, alpha, gamma, epsilon, planning_steps)

# Affichage de la politique optimale trouvée
for row in range(5):
    for col in range(5):
        state = (col, row)
        best_action = np.argmax(Q[state])
        action_symbols = ['←', '→', '↑', '↓']
        print(action_symbols[best_action], end=' ')
    print()

→ ↓ ↓ ← ← 
↓ ↓ ↓ ← ← 
→ → ↓ ↓ ← 
→ → → ↓ ← 
→ → → → ← 
