# Lunar Lander !

## Policy gradient

Solution en 800 entrainements environ !

__Attention : utiliser tensorflow 2.12__



In [None]:
import time
import gymnasium as gym
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Dense,Input
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers.legacy import Adam

import matplotlib.pyplot as plt

import os

In [None]:
tf.compat.v1.disable_eager_execution()
tf.config.list_physical_devices()

In [None]:
class PolicyGradient():
    def __init__(self,nb_obs,nb_action,learning_rate=0.001,gamma=0.99, layer1=64,layer2=32):
        self.nb_obs=nb_obs
        self.nb_action=nb_action
        self.learning_rate=learning_rate
        self.gamma=gamma
        
        # création du réseau
        self.policy, self.predict = self.buildNN(layer1,layer2)
        
        # mémoire des transitions
        self.histo_states = []
        self.histo_actions =[]
        self.histo_rewards = []
        

        
    def buildNN(self,layer1,layer2):
        """
        Objet : création du réseau neural
        entrée : couches cachées 1 et 2
        sortie : réseau pour entrainement et prédiction
        """

        InputReward = Input(shape=[1])
        InputOBS    = Input(shape=(self.nb_obs,))
        dense1 = # votre code (InputOBS)
        dense2 = # votre code
        dense3 = # ajouter une derniere couche cachée de taille layer2 /2
        proba = # votre code

        
        def loss_function(y_true, y_pred):
            """
            Objet : fonction de perte
            Entrée : y réel et y prédit
            Sortie : loss
            """
            out = tf.keras.backend.clip(y_pred,1e-8,1-1e-8) # on élimine les valeurs extrèmes
            loss_brut = y_true * tf.keras.backend.log(out)
            loss_reward = -loss_brut * InputReward
            print("Loss reward ",loss_reward)
            return tf.keras.backend.sum(loss_reward)
    
        policy=Model(inputs=[InputOBS,InputReward], outputs=[proba])
        policy.compile(optimizer=Adam(learning_rate=self.learning_rate),loss=loss_function)
        predict=Model(inputs=[InputOBS], outputs=[proba])
        
        return policy,predict
                       
    def act(self,state):
        """
        Objet : choisi une action en fonction d'un état et de la distribution des prob
        Entrée : état
        Sortie : action
        """
        # Votre code
    
    def discount_reward(self,histo_reward):
        """
        Objet : renvoie le gain d'une trajectoire 
        Entrée : historique des récompenses pour chaque transition 
        Sortie : Gain avec dépréciation gamma : np.array pour tracer le gain à chaque step
        """
        # creation d un array a zero memes dimensions que rewards
        discounted = np.zeros_like(histo_reward)
        # votre code
        
        return discounted
    
    def memory(self,state,action,reward):
        """
        Objet : historisation des transitions
        Entrée : état, action, récompense
        Sortie : none
        """
        self.histo_states.append(state[0])
        self.histo_actions.append(action)
        self.histo_rewards.append(reward)
        
    def train(self):
        """
        Objet : entrainement du model en fonction de la dernière trajectoire
        """
        states_history = np.array(self.histo_states)
        actions_history = np.array(self.histo_actions)
        rewards_history = np.array(self.histo_rewards)

        # transformation de l'historique des action en matrice 0/1
        actions=np.zeros([len(actions_history),self.nb_action])
        actions[ np.arange(len(actions_history)), actions_history] = 1

        # calcul du Gain normalisé
        discounted_reward = self.discount_reward(rewards_history)
        discounted_reward -= np.mean(discounted_reward)
        discounted_reward /= np.std(discounted_reward)
        # print(discounted_reward.shape)
        
        # entrainement
        self.policy.train_on_batch([ states_history, discounted_reward ] , actions)
        self.histo_states = []
        self.histo_actions =[]
        self.histo_rewards = []
        
        

In [None]:
# transformation des données pour compatabilité avec l'alimentation du réseau de neurones (1,s)`
def trans_state(s):
    return  np.reshape(s, [1, nb_obs])

In [None]:
# graf=tools.Grafana("/var/www/html/files/train.csv")

In [None]:
env=gym.make('LunarLander-v3')
nb_obs=env.observation_space.shape[0]
nb_action=env.action_space.n
print(f"Nombre de carateristiques des états : {nb_obs}, nombre d actions possibles : {nb_action}")

In [None]:
agent=PolicyGradient(nb_obs,nb_action, learning_rate=0.001, gamma=0.99,layer1=64,layer2=32 ) # 0.98 l2 96

In [None]:
checkpoint_path=os.path.join("poids","pp_lunar_save_weights.hp5")

In [None]:
num_episodes = 20000
time_step = 0  # comptage du nombre total de mouvement
histoReturn=[] # pour graphique sur historique récompense

seuilWin=180
win=False

for i in range(num_episodes):
    # somme de la récompense total pour une cycle
    Return = 0
    
    # reset env et conversion state
    state = trans_state(env.reset()[0])
    done=False
    truncated=False
    startTime=time.time()
    moveCount=0
    while not(done or truncated):
        
        # décommenter pour affichage
        # env.render()
    
        time_step += 1
        moveCount += 1
                
        # sélection d'une action selon notre politique
        action = agent.act(state)
 
        # jouer l'action
        next_state, reward, done, truncated , _ = env.step(action)
        next_state=trans_state(next_state) # reformatage

        # on met en memoire
        agent.memory(state,action,reward)
        
        # et shift d'état
        state = trans_state(next_state)
        
        # cumul du retour G
        Return += reward

       
        # Done ?
        if done or truncated:
            # affichage du résultat du cyle
            duration=time.time()-startTime
            print('Episode: ',i, ',' 'Return', np.round(Return,2),', durée ',np.round(duration,2),' seconde en ',moveCount, ' mouvements, end=',truncated)
            # graf.add_record(i,Return,0,moveCount,duration)
            if np.mean(histoReturn[-5:]) > seuilWin:
                print("Yeah ! Gagné !")
                win=True
                
            histoReturn.append(Return)

            agent.train()
            
            # Sauvegarde des poids tous les 50 cycles
            #if i % 50 == 0:
            #    agent.policy.save_weights(checkpoint_path)
                
            break
    if win:
        break
        

In [None]:
plt.figure()
plt.plot(histoReturn)
plt.plot([np.mean(histoReturn[x:x+50]) for x in range(len(histoReturn)-50)])
plt.show()

# Visualisons le résultat !

Fonctionne uniquement si vous n'utilisez pas wsl/docker

In [10]:
env_test=gym.make('LunarLander-v3',render_mode="human")
nb_obs=env_test.observation_space.shape[0]
nb_action=env_test.action_space.n
print(f"Nombre de carateristiques des états : {nb_obs}, nombre d actions possibles : {nb_action}")

Nombre de carateristiques des états : 8, nombre d actions possibles : 4


In [None]:
state = trans_state(env_test.reset()[0])
done=False
truncated=False
while not(done or truncated):
        
        action = agent.act(state)
 
        # jouer l'action
        next_state, reward, done, truncated , _ = env_test.step(action)
        next_state=trans_state(next_state) # reformatage
    
        # et shift d'état
        state = trans_state(next_state)