# Recherche directe de politique

Dans ce TP, l'objectif est d'implémenter un algorithme de recherche directe de politique pour trouver une politique optimale pour l'environnement [CartPole](https://gymnasium.farama.org/environments/classic_control/cart_pole/).

> Vous devez  <span style="color:red"> commenter votre code ! </span> 
> 
> Vous devez aussi répondre à des questions (dans les cellules <span style="color:blue">Votre réponse: </span> )


> Le code doit être fonctionnel avec l'environnement virtuel du TP. Si d'autres packages que ceux présents dans l'environnement virtuel créé au départ sont nécessaires, vous devez ajouter à votre dépôt un fichier `environnement.yaml` qui est un export de votre environnement virtuel. Ce fichier est obtenu avec la commande suivante:  ```conda env export > environnement.yaml```

In [2]:
import gymnasium as gym
import random
import numpy as np
from collections import deque
import matplotlib.pyplot as plt
import torch
from torch.distributions import Categorical
%matplotlib inline
%load_ext autoreload

def init_seed(seedval):
    torch.manual_seed(seedval)
    np.random.seed(seedval)
    random.seed(seedval)

# 1 Environnement CartPole

Lire la documentation et afficher les caractéristiques (états, action) de l'environnement *CartPole*.

In [3]:
# Initialiser l'environnement CartPole
env = gym.make('CartPole-v1')

# Obtenir les caractéristiques de l'environnement
# Espace d'état (observation)
state_space = env.observation_space
# Espace d'actions
action_space = env.action_space

# Afficher les informations sur l'environnement
print("=== Caractéristiques de l'environnement CartPole ===")
print(f"Espace d'états (observations): {state_space}")
print(f" - Borne inférieure des états: {state_space.low}")
print(f" - Borne supérieure des états: {state_space.high}")

print(f"\nEspace d'actions: {action_space}")
print(f" - Nombre d'actions possibles: {action_space.n}")
print(f" - Actions possibles: {list(range(action_space.n))} (0: Gauche, 1: Droite)")

print("\nCritères de fin d'épisode:")
print(" - Le poteau dépasse un angle de ±12° (environ 0.418 radians).")
print(" - Le chariot dépasse la limite de ±2.4 mètres.")
print(" - Le nombre maximum de pas de temps atteint (généralement 500).")

print("\nRécompense:")
print(" - Récompense de +1 à chaque pas tant que le poteau reste en équilibre.")

# Fermer l'environnement
env.close()



=== Caractéristiques de l'environnement CartPole ===
Espace d'états (observations): Box([-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38], [4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38], (4,), float32)
 - Borne inférieure des états: [-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38]
 - Borne supérieure des états: [4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38]

Espace d'actions: Discrete(2)
 - Nombre d'actions possibles: 2
 - Actions possibles: [0, 1] (0: Gauche, 1: Droite)

Critères de fin d'épisode:
 - Le poteau dépasse un angle de ±12° (environ 0.418 radians).
 - Le chariot dépasse la limite de ±2.4 mètres.
 - Le nombre maximum de pas de temps atteint (généralement 500).

Récompense:
 - Récompense de +1 à chaque pas tant que le poteau reste en équilibre.


# 2 Définition de la Politique


> <span style="color:green">- Quelle est la différence entre une politique stochastique et déterministe ?  </span>



> <span style="color:blue">Votre réponse: </span>

Une politique déterministe associe chaque état à une action unique. Cela signifie que pour chaque état donné, l'agent choisit toujours la même action.

Une politique stochastique associe chaque état à une distribution de probabilité sur les actions possibles. Cela signifie que l'agent peut choisir différentes actions à partir d'un même état, avec des probabilités spécifiques.

Dans le cas déterministe l'agent va toujours faire le meme choix alors que dans le cas stochastique l'agent va parfois changer son action.

> <span style="color:green">- Dans le cas d'une politique déterministe dans le CartPole, qu'est-ce qui sera en entrée de la politique ? En sortie ?  </span>

> <span style="color:blue">Votre réponse: </span>

l'entrée est le vecteur d'état qui décrit la situation actuelle du système, et la sortie est l'action unique à entreprendre pour maintenir l'équilibre du poteau.

> <span style="color:green">- Dans le cas d'une politique stochastique dans le CartPole, qu'est-ce qui sera en entrée de la politique ? En sortie ?  </span>

> <span style="color:blue">Votre réponse: </span>

l'entrée est le vecteur d'état qui décrit la situation actuelle du système, tandis que la sortie est une distribution de probabilités sur les actions possibles.

Compléter la classe *Politique* ci-dessous qui sera utilisée dans le CartPole. Cette politique sera déterministe ou stochastique (choix via l'argument *det*) et sera composée d'une seule couche de poids (il est conseillé d'utiliser une *ndarray* pour les poids car pas besoin de calcul de gradient). Vous devez utiliser une distribution catégorielle de *Torch* pour la politique stochastique. 


In [4]:
import torch.nn.functional as F

class Politique():

    def __init__(self, dim_entree:int, dim_sortie:int, det = True):
        
        #TO DO 

        self.dim_entree = dim_entree
        self.dim_sortie = dim_sortie
        self.det = det

        self.poids = np.random.randn(dim_entree, dim_sortie) * 0.01
 
    def output(self, etat : np.ndarray) -> int:

        #TO DO
        logits = etat @ self.poids  # Produit matriciel

        if self.det:
            return int(np.argmax(logits))
        else:
            probas = F.softmax(torch.tensor(logits), dim=0)  # Calcul des probabilités
            action = torch.multinomial(probas, num_samples=1)  # Échantillonnage d'une action
            return int(action.item())
 
 
    def set_poids(self, poids: np.ndarray):
        self.poids = poids
    
    def get_poids(self) -> np.ndarray:
        return self.poids

    def save(self,file):
        np.save(file, self.poids)

    def load(self,file):
        self.poids = np.load(file)

Compléter la méthode *eval_politique* qui évalue une politique sur plusieurs épisodes, et renvoie la **performance de chaque épisode**, qui est la somme des récompenses obtenues sur chaque épisode.

NB: L'attribut 'init_large' permet d'élargir les bornes définies pour le choix de l'état initial.

In [5]:
def eval_politique(politique, env, nb_episodes=100, max_t=1000, seed=random.randint(0,5000), init_large=False) -> list:


    somme_rec = []
    for epi in range(1, nb_episodes+1):
        rec_epi = []
        if init_large: 
            state, _ = env.reset(seed=seed,options={'low':-0.2,'high':0.2})
        else:
            state, _ = env.reset(seed=seed)
        #TO COMPLETE

        for t in range(max_t):
            action = politique.output(state)  # Obtenir l'action de la politique
            state, reward, done, info, x = env.step(action)  # Appliquer l'action et obtenir les nouveaux états et récompenses
            
            rec_epi.append(reward)  # Ajouter la récompense à la récompense de l'épisode
            
            if done:  # Vérifier si l'épisode est terminé
                break
        
        somme_rec.append(len(rec_epi))  # Ajouter la récompense de l'épisode à la liste des performances
    
    return somme_rec  # Retourner la liste des performances




On évalue ci-dessous une politique déterministe et une politique stochastique (non-optimisées) sur 5 épisodes, **à partir du même état initial**.

In [6]:
env = gym.make('CartPole-v1',render_mode="human")
init_seed(5)

politique = Politique(dim_entree=4, dim_sortie=2,det=True) 
perf = eval_politique(politique,env,seed=5,nb_episodes=5)
print(f"Perf  = {perf}")

politique = Politique(dim_entree=4, dim_sortie=2,det=False) 
perf = eval_politique(politique,env,seed=5,nb_episodes=5)
print(f"Perf  = {perf}")

env.close()

Perf  = [48, 48, 48, 48, 48]
Perf  = [29, 11, 11, 29, 22]


> <span style="color:green">- Analyser les résultats obtenus sur les 2 évaluations: quelle est la différence ? pourquoi ? que peut-on en déduire sur la stochasticité de l'environnement Cart-Pole ?  </span>


> <span style="color:blue">Votre réponse: </span>

La différence est que le déterministe est prévisible et obtient toujours le même résultat alors que le stochastique change.
Car le stochastique utilise des probabilités pour faire son choix alors que le déterministe prend le meilleure.

On peut déduire que la stochasticité de l'environnement n'est pas bonne car en déterministe on obtient de meilleure résultat.

# 3 Algorithme de recherche directe de politique

L'objectif est d'implément un algorithme de type *Adaptive Noise Scaling*, qui va à chaque itération, générer une nouvelle politique candidate en perturbant les paramètres de la meilleure politique trouvée. 

La perturbation des paramètres sera réalisée en ajoutant un *bruit* sur chaque paramètre, et en faisant **varier la variance** de ce bruit: variance réduite si la politique candidate trouvée est meilleure que l'ancienne, augmentée sinon.

L'algorithme est le suivant:
- Choix d'une politique initiale arbitraire Pi et évaluation de cette politique
- Tant que l'environnement n'est pas résolu (itération)
    - Génération d'une nouvelle politique candidate à partir de Pi
    - Evaluation de la candidate
    - Si la candidate a une meilleure perf que Pi: réduction de la variance du bruit et Pi = Candidate
    - Si la candidate a une moins bonne perf que Pi: augmentation de la variance du bruit 


> <span style="color:green">Vous devez  </span>
- compléter la méthode *rollout* qui se charge d'évaluer une politique sur un épisode
- compléter la méthode *recherche_directe* pour faire une exploration dans l'espace des paramètres de la politique, sur *nb_episode* de durée max *max_t* pas. Cette méthode renverra les performances obtenues à chaque itération et les poids de la meilleure politique trouvée
- faire en sorte de stopper la recherche lorsqu'une politique optimale est trouvée
- afficher les performances régulièrement pendant la recherche
- vous utiliserez ici des états initiaux différents à chaque épisode (le paramètre *seed* lors du *reset* de l'environnement doit être différent à chaque épisode, par ex., avec *random.randint(0,5000)* pour obtenir des expériences répétables ...)

In [7]:
def rollout(politique, env,max_t=1000)-> int : 
    """
        execute un episode sur l'environnement env avec la politique et renvoie la somme des recompenses obtenues sur l'épisode
    """
    #TODO

    state, _ = env.reset(seed=random.randint(0, 5000)) 
    total_reward = 0
    for t in range(max_t):
        action = politique.output(state) 
        state, reward, done, trunc, info = env.step(action) 
        total_reward += reward
        if done: 
            break
    return total_reward

def recherche_directe(politique, env,  nb_episodes=5000, max_t=1000) -> tuple[list,np.ndarray] :
    #TO COMPLETE
    bruit_std = 1e-2 
    performances = []
    meilleur_perf = -np.inf  
    meilleur_politique = politique.get_poids() 
    best_poli = politique
    count = 0

    for i_episode in range(1, nb_episodes + 1):
        
        politique_bruitee = best_poli
        #print(best_poli.get_poids())
        bruit = np.random.normal(0, bruit_std, size=politique_bruitee.get_poids().shape)
        politique_bruitee.set_poids(best_poli.get_poids() + bruit)
        
        # Évaluer la performance de la politique bruitée
        perf = rollout(politique_bruitee, env, max_t)
        performances.append(perf)

        if perf >= meilleur_perf:  
            best_poli = politique_bruitee
            meilleur_perf = perf
            meilleur_politique = politique_bruitee.get_poids()
            bruit_std = 1e-2 # reset de la variance quand une meilleur perf est trouvé pour chercher autour
            # Réduction de la variance du bruit
            bruit_std = max(1e-3, bruit_std / 2)
        else:
            # Augmentation de la variance du bruit
            bruit_std = max(2, bruit_std * 2) # avec max 2 en variance, pas assez élevé pour trouver la solution

        if i_episode % 1 == 0:
            print(f"Épisode {i_episode}/{nb_episodes} : Perf = {perf}, Meilleure Perf = {meilleur_perf}, Bruit Std = {bruit_std}")
            
        if meilleur_perf >= env.spec.reward_threshold:  #si il trouve 5 fois la meilleur perf on peut s'arrêter
            count+=1
            if count == 5:
                print(f"Politique optimale trouvée à l'épisode {i_episode} avec une performance de {meilleur_perf}")
                break


    return performances, meilleur_politique
            
         

   

Compléter la cellule ci-dessous pour sauvegarder votre meilleure politique déterministe dans un fichier "best_pi_det.npy" et votre meilleure politique stochastique dans un fichier "best_pi_stoc.npy".

<span style="color:red"> Vous ajouterez ces 2 fichiers dans votre dépôt github ! </span> 

In [17]:
env = gym.make('CartPole-v1')
init_seed(5)

#TODO

politiqueDet = Politique(dim_entree=4, dim_sortie=2,det=True) 
politiqueSto = Politique(dim_entree=4, dim_sortie=2,det=False) 

best_pi_det = recherche_directe(politiqueDet,env,5000,1000)  
best_pi_stoc = recherche_directe(politiqueSto,env,5000,1000)


# Sauvegarde des politiques
np.save("best_pi_det.npy", best_pi_det)  # Sauvegarder la politique déterministe
np.save("best_pi_stoc.npy", best_pi_stoc)  # Sauvegarder la politique stochastique


Épisode 1/5000 : Perf = 795.0, Meilleure Perf = 795.0, Bruit Std = 0.005
Épisode 2/5000 : Perf = 88.0, Meilleure Perf = 795.0, Bruit Std = 2
Épisode 3/5000 : Perf = 1000.0, Meilleure Perf = 1000.0, Bruit Std = 0.005
Épisode 4/5000 : Perf = 1000.0, Meilleure Perf = 1000.0, Bruit Std = 0.005
Épisode 5/5000 : Perf = 1000.0, Meilleure Perf = 1000.0, Bruit Std = 0.005
Politique optimale trouvée à l'épisode 5 avec une performance de 1000.0
Épisode 1/5000 : Perf = 20.0, Meilleure Perf = 20.0, Bruit Std = 0.005
Épisode 2/5000 : Perf = 30.0, Meilleure Perf = 30.0, Bruit Std = 0.005
Épisode 3/5000 : Perf = 22.0, Meilleure Perf = 30.0, Bruit Std = 2
Épisode 4/5000 : Perf = 47.0, Meilleure Perf = 47.0, Bruit Std = 0.005
Épisode 5/5000 : Perf = 34.0, Meilleure Perf = 47.0, Bruit Std = 2
Épisode 6/5000 : Perf = 69.0, Meilleure Perf = 69.0, Bruit Std = 0.005
Épisode 7/5000 : Perf = 76.0, Meilleure Perf = 76.0, Bruit Std = 0.005
Épisode 8/5000 : Perf = 73.0, Meilleure Perf = 76.0, Bruit Std = 2
Épisod

  arr = np.asanyarray(arr)


# 4 Evaluation des politiques trouvées

Compléter la cellule ci-dessous pour évaluer les meilleures politiques déterministes et stochastiques que vous avez trouvées.


In [21]:
import numpy as np
import gymnasium as gym
import random

# Charger les meilleures politiques sauvegardées
politique_det = Politique(dim_entree=4, dim_sortie=2, det=True)
politique_stoc = Politique(dim_entree=4, dim_sortie=2, det=False)

# Charger les poids des fichiers .npy pour chaque politique
poids_det = np.load("best_pi_det.npy", allow_pickle=True)
poids_stoc = np.load("best_pi_stoc.npy", allow_pickle=True)

# Convertir les poids en tableau NumPy de forme correcte (sinon problème de dimensions)
poids_det = np.array(poids_det[1])
poids_stoc = np.array(poids_stoc[1])

# Vérification des dimensions (on avait un problème de dimensions)
if poids_det.shape != (4, 2):
    raise ValueError(f"Dimensions des poids déterministes incorrectes : {poids_det.shape}, attendu (4, 2)")
if poids_stoc.shape != (4, 2):
    raise ValueError(f"Dimensions des poids stochastiques incorrectes : {poids_stoc.shape}, attendu (4, 2)")

# Mettre les poids dans les politiques
politique_det.set_poids(poids_det)
politique_stoc.set_poids(poids_stoc)

# Fonction d'évaluation d'une politique
def evaluer_politique(politique, env, nb_episodes=100, max_t=1000):

    total_rewards = []
    for episode in range(nb_episodes):
        state, _ = env.reset(seed=random.randint(0, 5000))
        episode_reward = 0
        for t in range(max_t):
            action = politique.output(state)
            state, reward, done, _, _ = env.step(action)
            episode_reward += reward
            if done:
                break
        total_rewards.append(episode_reward)
    moyenne_perf = np.mean(total_rewards)
    print(f"Performance moyenne sur {nb_episodes} épisodes: {moyenne_perf}")
    return moyenne_perf

# Initialiser l'environnement et la graine
env = gym.make('CartPole-v1')
init_seed(5)

# Évaluer et afficher les performances des meilleures politiques
print("Évaluation de la meilleure politique déterministe:")
moyenne_perf_det = evaluer_politique(politique_det, env)

print("\nÉvaluation de la meilleure politique stochastique:")
moyenne_perf_stoc = evaluer_politique(politique_stoc, env)

# Fermer l'environnement
env.close()


Évaluation de la meilleure politique déterministe:
Performance moyenne sur 100 épisodes: 888.47

Évaluation de la meilleure politique stochastique:
Performance moyenne sur 100 épisodes: 165.1



> <span style="color:green"> Analysez les résultats. Qu'en déduisez-vous sur le type de politique le plus adapté ici ?  </span>



> <span style="color:blue">Votre réponse: </span>  
La politique déterministe semble être le choix le plus adapté pour cet environnement, probablement en raison de sa capacité à fournir des actions plus cohérentes et prévisibles, ce qui est essentiel pour maintenir le poteau en équilibre. La politique stochastique, bien qu'elle puisse explorer plus de variations d'actions, n'a pas réussi à atteindre une performance compétitive.