In [1]:
import tkinter as tk
import time
import math
import random
import os
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
from collections import deque
import progressbar

from texttable import Texttable
from IPython.display import clear_output
import tensorflow as tf
tf_config=tf.compat.v1.ConfigProto()
tf_config.gpu_options.allow_growth=True
sess = tf.compat.v1.Session(config=tf_config)

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Activation, Flatten, Conv2D, MaxPooling2D, Dropout
from tensorflow.keras.optimizers import Adam
import keras

In [2]:
def get_2center(x0, y0, x1, y1):
    width = x1 - x0
    height = y1 - y0
    x = x0 + width / 2
    y = y0 + height / 2
    return x, y

def get_4points(x_center, y_center, width, height):
    x0 = x_center - width / 2
    y0 = y_center - height / 2
    x1 = x_center + width / 2
    y1 = y_center - height / 2
    x2 = x_center + width / 2
    y2 = y_center + height / 2
    x3 = x_center - width / 2
    y3 = y_center + height / 2
    return x0, y0, x1, y1, x2, y2, x3, y3

# Dessin des rails
def init_rails(height, thickness, canvas):
    canvas.create_line(0, height-thickness, 500, height-thickness, width=5)
    canvas.create_line(0, height+thickness, 500, height+thickness, width=5)

# Dessin de la route
def init_roads(width, thickness, canvas):
    canvas.create_line(width-thickness, 0, width-thickness, 500, width=2)
    canvas.create_line(width+thickness, 0, width+thickness, 500, width=2)

def rotate_point(x, y, center_x, center_y, angle_degre):
    radians = angle_degre * math.pi / 180
    x_rotated = (x - center_x) * math.cos(radians) - (y - center_y) * math.sin(radians) + center_x
    y_rotated = (x - center_x) * math.sin(radians) + (y - center_y) * math.cos(radians) + center_y
    return (x_rotated, y_rotated)

def rotate(polygon_id, angle, canvas):
    x0, y0, x1, y1, x2, y2, x3, y3 = canvas.coords(polygon_id)
    pt0, pt1, pt2, pt3 = [x0, y0], [x1, y1], [x2, y2], [x3, y3]
    x_center, y_center = get_2center(*pt0, *pt2)
    new_pt0 = rotate_point(*pt0, x_center, y_center, angle)
    new_pt1 = rotate_point(*pt1, x_center, y_center, angle)
    new_pt2 = rotate_point(*pt2, x_center, y_center, angle)
    new_pt3 = rotate_point(*pt3, x_center, y_center, angle)
    canvas.coords(polygon_id, *new_pt0, *new_pt1, *new_pt2, *new_pt3)

def move_forward(polygon, distance, angle):
    """Move the given polygon forward by the given distance (in pixels) along the path it faces.

  Args:
      polygon: The polygon object.
      distance: The distance to move the polygon (in pixels).
      angle: The angle at which the polygon is facing (in degrees).

  Returns:
      A tuple representing the amount of pixels to move the polygon in the x and y directions.
  """
    # Convert the angle to radians. 
    angle -= 90
    angle_rad = angle * math.pi / 180

    # Calculate the movement in the x and y directions.
    dx = distance * math.cos(angle_rad)
    dy = distance * math.sin(angle_rad)

    return (dx, dy)

In [3]:
WIDTH = 500        #si 15---8, 120 step----1
HEIGHT = 500

ROTATION = [-2, 0, +2] 
MOTION = [-1, 0, +1]
SPEED = 2

class VehicleSimulator:
    def __init__(self):
        self.window = tk.Tk()
        self.window.title("Simulator")
        self.canvas = tk.Canvas(self.window, width=WIDTH, height=HEIGHT)
        self.canvas.pack()
        self.steps = 0
        # Draw rails
        self.rails_thickness = 20
        self.init_rails(HEIGHT/2, self.rails_thickness)
        #self.init_rails(HEIGHT/3, self.rails_thickness)
       
        #horizontal road
        self.canvas.create_line(0, 130, 150, 130, width=2)
        self.canvas.create_line(0, 198, 150, 198, width=2)

        # vertical road
        #up
        self.canvas.create_line(210, 120, 210, 0, width=2)
        self.canvas.create_line(288, 120, 288, 0, width=2)

        #down
        self.canvas.create_line(210, 300, 210, 500, width=2)
        self.canvas.create_line(288, 300, 288, 500, width=2)

        #Draw transition zone
        self.transition_zone()

        #Draw vehicule
        self.x = 248
        self.y = HEIGHT-50
        self.aligned=False
        self.in_zone=False
        self.motion = MOTION[2]
        self.rotation = ROTATION[1]
        self.vehicle_angle = 0
        self.v_width, self.v_height = 40, 70
        self.vehicule(self.x,self.y,self.v_width, self.v_height)

        # Create buttons on grid(frame)
        self.frame = tk.Frame(self.window)
        self.frame.pack()

        label = tk.Label(self.frame, text="MOTION")
        label.grid(row=0, column=0)
        button1 = tk.Button(self.frame, text="backward", command=lambda: self.button_motion(0))
        button2 = tk.Button(self.frame, text="still", command=lambda: self.button_motion(1))
        self.button_f = tk.Button(self.frame, text="forward", command=lambda: self.button_motion(2))
        
        button1.grid(row=0, column=1)
        button2.grid(row=0, column=2)
        self.button_f.grid(row=0, column=3)

        label = tk.Label(self.frame, text="ROTATION")
        label.grid(row=1, column=0)
        self.button_l = tk.Button(self.frame, text="left", command=lambda: self.button_rotation(0))
        button5 = tk.Button(self.frame, text="still", command=lambda: self.button_rotation(1))
        self.button_r = tk.Button(self.frame, text="right", command=lambda: self.button_rotation(2))
        self.button_l.grid(row=1, column=1)
        button5.grid(row=1, column=2)
        self.button_r.grid(row=1, column=3)

        button_reset = tk.Button(self.frame, text="RESET", command=self.reset)
        button_reset.grid(row=0, column=4)
        button_start = tk.Button(self.frame, text="TRAIN", command=self.train)
        button_start.grid(row=1, column=4)

        # Text creation 
        self.text_x_y = self.canvas.create_text(400, 15, text="(x,y) = ({},{})".format(round(self.x,1),round(self.y,1)), font=('Arial', 10))
        self.text_vehicle_angle = self.canvas.create_text(400, 45, text="vehicle_angle = {}".format(round(self.vehicle_angle,1)), font=('Arial', 10))
        self.text_motion = self.canvas.create_text(400, 60, text="motion = {}".format(round(self.motion,1)), font=('Arial', 10))
        self.text_step = self.canvas.create_text(400, 80, text="step = {}".format(self.steps), font=('Arial', 10))

    # Buttons functions
    def button_motion(self, number):
        if number==0:
            self.motion = MOTION[0]
        elif number==1:
            self.motion = MOTION[1]
        elif number==2:
            self.motion = MOTION[2]  

        self.rotation = ROTATION[1]
        self.update_position()


    def button_rotation(self, number):
        if number==0:
            self.rotation = ROTATION[0]
        elif number==1:
            self.rotation = ROTATION[1]
        elif number==2:
            self.rotation = ROTATION[2]

        #self.motion = MOTION[1]
        self.update_position()
    
    def sample(self):
        l = [0,1,2]
        return np.random.choice(l)
        

    # Draw roads function
    def init_roads(self, width, thickness):
        self.canvas.create_line(width-thickness, 0, width-thickness, HEIGHT, width=2)
        self.canvas.create_line(width+thickness, 0, width+thickness, HEIGHT, width=2)

    # Draw transition zone function
    def transition_zone(self):
        self.canvas.create_line(150, 120, 340, 120, width=1)
        self.canvas.create_line(150, 300, 340, 300, width=1)
        self.canvas.create_line(150, 120, 150, 300, width=1)
        self.canvas.create_line(340, 120, 340, 300, width=1)
        self.canvas.create_line(150, 210, 340, 210, width=1)

    # Draw rails function   
    def init_rails(self,height, thickness):
        self.canvas.create_line(340, height-thickness, WIDTH, height-thickness, width=5)
        self.canvas.create_line(340, height+thickness, WIDTH, height+thickness, width=5)

    # Update position function
    def update_position(self):
        rotate(self.vehicle, self.rotation, self.canvas)
        self.vehicle_angle += self.rotation
        if self.rotation == 0: movement = move_forward(self.vehicle, self.motion*SPEED, self.vehicle_angle)
        else : movement = move_forward(self.vehicle, self.motion*(SPEED/2), self.vehicle_angle)
        self.canvas.move(self.vehicle, movement[0], movement[1])
        x0, y0, x1, y1, x2, y2, x3, y3 = self.canvas.coords(self.vehicle)
        self.x, self.y = get_2center(x0, y0, x2, y2)
        self.update_display()
        

    # Update text
    def update_display(self):
        self.canvas.itemconfig(self.text_x_y, text="(x,y) = ({},{})".format(round(self.x,1),round(self.y,1)))
        self.canvas.itemconfig(self.text_vehicle_angle, text="vehicle_angle = {}".format(round(self.vehicle_angle,1)))
        self.canvas.itemconfig(self.text_motion, text="motion = {}".format(round(self.motion,1)))
        self.canvas.itemconfig(self.text_step,text="step = {}".format(self.steps), font=('Arial', 10))
    
    #create vehicule
    def vehicule(self,x,y,v_width,v_height):
        self.vehicle = self.canvas.create_polygon(*get_4points(x,y,v_width,v_height), fill="green")
    
    # Reset the vehicle
    def reset(self):
        self.x = 248
        self.y = HEIGHT-50
        self.motion = MOTION[1]
        self.rotation = ROTATION[1]
        self.vehicle_angle = 0
        self.v_width, self.v_height = 40, 70
        self.canvas.delete(self.vehicle)
        self.vehicule(self.x,self.y,self.v_width, self.v_height)
        self.update_display()
     
        return [self.x,self.y]
    
    def step(self,rotation):
        self.steps += 1
        done = False
        
        reward=-0.1
        
        #mis à jour de position
        #self.button_rotation(rotation)
        if rotation == 0:
            self.button_l.invoke()
        elif rotation == 1:
            self.button_f.invoke()
        else:
            self.button_r.invoke()
       
        x0, y0, x1, y1, x2, y2, x3, y3 = self.canvas.coords(self.vehicle)
        
        print(x0, y0, x1, y1, x2, y2, x3, y3)
		
        if y0>300 and y1>300 and y2>300 and y3>300:
            self.in_zone=False
        
        if y0<300 and y1<300 and y2<300 and y3<300:
            self.in_zone=True
            
        # sortir de la route principale (amélioration possible)
        if x0<210 or x1>288 and not self.in_zone :
            reward=-100
            done = True

        #alignement avel les rails
        if  y0==230 and y1==270 and y0==y2 and y1==y3 and self.aligned==False:
            self.aligned = True
            reward=5

        # se désaligner
        if self.aligned and rotation!=0:
            reward=-10
            self.aligned=False
            
        
        #Punition à la sortie de la zone de transition
        if self.in_zone and ((y0<210 or y1<210 or y2<210 or y3<210) or (x0<150 or x1<150 or x2<150 or x3<150) or (y0>300 or y1>300 or y2>300 or y3>300) or ( self.aligned==False and (x0>340 or x1>340 or x2>340 or x3>340))):
            reward=-100
            done=True
            

        #success
        if self.aligned==True and self.x>=248:
            reward=100
            done=True
        # Angle  reset
        self.vehicle_angle %= 360
        #time.sleep(0.001)
        
       
        
        return [self.x,self.y],reward,done
    
    def train(self):
        for episode in range(episodes):
            self.steps = 0
             # On affiche l'épisode en cours
            print("\033[1m" + "\033[94m" + "###"*13 + " Episode {} / {} ".format(episode+1, episodes) + "###"*18 + "\033[00m" + "\033[0;0m")

            # A chaque épisode, on ré-initialise l'état de l'environnement
            state = env.reset()

            # La conséquence de l'initialisation est que le jeu ne peut pas être terminé, donc done=False    
            done = False

            # On initialise à 0 la variable steps qui est le compteur du nombre d'étapes (et donc d'actions) réalisés
            # au cours d'un épisode
            episode_steps = 0

            # On initialise à 0 la durée de l'épisode
            episode_duration = 0

            # On initialise la récompense cumulée sur un episode
            total_reward = 0

            # On initialise le nombre d'atterissages réussis
            sucessfull_landings = 0

            # On calcule le nombre de steps maximum à réaliser pour l'épisode en cours
            max_steps = get_max_steps(episode)


            # On initialise les bars de progression
            episode_bar = progressbar.ProgressBar()
            episode_bar.start(max_value=get_max_steps(episode+1))


            # On lance le chronomètre pour cet épisode
            t0 = time.time()

            # On commence par itérer tant que l'épisode n'est pas terminé
            #def store_transition(self, state, action, reward, next_state, done):
            while not done:
                action = agent.choose_action(state)
                print(action)
                next_state, reward, done = env.step(action)
                env.window.update()
                print(next_state,reward,done)
                agent.store_transition(state,action,reward,next_state,done)
                agent.train_on_batch()
                if overall_steps % agent.update_rate == 0:
                    agent.update_target_model()
                agent.perform_epsilon_decay()
                total_reward += reward
                state = next_state
                episode_steps += 1
                if done and reward == 100:
                    sucessfull_landings += 1

                if episode_steps >= max_steps or done:
                    episode_duration = time.time() - t0
                    break

            episode_duration_list.append(episode_duration)
            episodes_rewards_list.append(total_reward)
            sucessfull_landings_list.append(sucessfull_landings)
            if episode % 25: last25_episodes_rewards_list.append(np.mean(episodes_rewards_list[-25:]))
            if episode % 50: last25_episodes_rewards_list.append(np.mean(episodes_rewards_list[-50:]))
            if episode % 100: last25_episodes_rewards_list.append(np.mean(episodes_rewards_list[-100:]))
            test = display_metrics(total_reward = total_reward, sucessfull_landings = sucessfull_landings, 
                                   episode_duration = episode_duration,episode_bar = episode_bar,current_step = episode_steps,
                                   done = done,max_steps = max_steps)
            if test: agent.model_policy.save('Model.h5')
    

In [4]:
class DQNAgent:

    """
    Classe de l'agent DQN implémentant l'algorithme Deep Q-Network
    """
    
    def __init__(self, state_size, action_size):
        
        # L'attribut state_size représente la dimension de l'espace d'états, 
        # c'est-à-dire les dimensions de l'image de l'environnement de jeu
        self.state_size = state_size
        
        # L'attribut action_size représente la dimension de l'espace d'actions
        self.action_size = action_size
        
        # L'attribut render représente l'autorisation d'afficher le jeu pendant l'entrainement
        self.render = True
        
        # Hyperparametres
        
        # L'attribut gamma représente le facteur de dépréciation dans le calcul de la récompense cumulée
        self.gamma = 0.99
        
        # L'attribut epsilon représente le facteur d'exploration initial
        self.epsilon = 1 
        
        # L'attribut lr représente le taux d'apprentissage du réseau de neurones DQN
        self.lr = 0.0005 
        
        # L'attribut batch_size représente la taille de lots utilisée pendant l'entrainement
        self.batch_size = 64
        
        # L'attribut epsilon_min représente le facteur d'exploration minimal 
        # en deça duquel l'agent ne peut jamais descendre pendant son entrainement
        self.epsilon_min = 0.01
        
        # L'attribut epsilon_decay représente le facteur de décroissance qu'on applique à epsilon 
        # pour réduire le facteur d'exploration au cours de l'entrainement
        self.epsilon_decay = 0.9995
        
        # L'attribut update_rate représente le nombre d'étapes au terme duquel on transfère 
        # les poids du réseau "policy" vers le réseau "target"
        self.update_rate = 100
        
        # L'attribut replay_buffer_size représente la taille du buffer d'expériences utilisé pendant l'entrainement
        self.replay_buffer_size = 15000
        
        # L'attribut replay_buffer_ùin_size représente la taille minimale du buffer d'expériences 
        # avant de pouvoir commencer à l'utiliser pendant l'entrainement
        self.replay_buffer_min_size = 150
        
        # L'attribut mémory représente le buffer d'expérience. 
        # C'est un objet deque d'une profondeur de 5000 éléments maximum
        self.replay_buffer = deque(maxlen=self.replay_buffer_size)
        
        self.model_policy = self.build_model()
        self.model_target = self.build_model()
        
        self.update_target_model()
    
    #
    # Méthode pour réduire graduellement la valeur du facteur d'exploration
    #
    def perform_epsilon_decay(self):
        
        '''
        Méthode permettant de réduire la valeur du facteur d'exploration (epsilon)

                Parameters:
                        - self : l'instance de classe, permettant d'accéder à tous les attributs de la classe

                Returns:
                        Rien : la fonction réduit à chaque appel l'attribut epsilon d'un facteur de epsilon_decay
                               sans jamais descendre en dessous de epsilon_min
        '''
        
        if self.epsilon > self.epsilon_min:
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    #
    # Méthode pour construire un modèle de réseau neuronal
    #
    def build_model(self):
        
        '''
        Méthode permettant de définir et de compiler un modèle de réseau de neurones

                Parameters:
                        - self : l'instance de classe, permettant d'accéder à tous les attributs de la classe


                Returns:
                        model (Keras Model) : Modèle Keras représentant le modèle compilé défini 
                                              suivant une architecture
        '''
        model = tf.keras.Sequential()
        model.add(tf.keras.layers.Dense(128, activation='relu', input_shape=self.state_size))
        model.add(tf.keras.layers.Dense(128, activation='relu'))
        model.add(tf.keras.layers.Dense(3, activation='linear'))

        model.compile(optimizer=Adam(learning_rate=self.lr), loss='mean_squared_error', metrics=['accuracy'])
        
        return model
            
 

    #
    # Méthode pour stocker des explériences (transitions) dans un buffer
    #
    def store_transition(self, state, action, reward, next_state, done):
        
        '''
        Méthode permettant rajouter une transition dans le buffer d'expérience

                Parameters:
                        - self : l'instance de classe, permettant d'accéder à tous les attributs de la classe
                        - state (Numpy ndarray) : Matrice représentant l'état courant de l'environnement
                        - action (int) : Nombre entier représentant l'action entreprise par l'agent
                        - reward (float) : Nombre réel représentant la récompense immédiate obtenue de l'environnement
                        - next_state (Numpy ndarray) : Matrice représentant l'état suivant de l'environnement


                Returns:
                        Rien : modification de l'attribut memory pour y rajouter une transition
        '''
        self.replay_buffer.append((state,action,reward,next_state,done))
            

        
    #
    # Méthode pour choisir l'action à entreprendre suivant une politique epsilon-greedy
    #
    def choose_action(self, state):
        
        '''
        Méthode permettant de choisir l'action à entreprendre par l'action en suivant une politique epsilon-greedy

                Parameters:
                        - self : l'instance de classe, permettant d'accéder à tous les attributs de la classe
                        - state (Numpy ndarray) : Matrice représentant l'état courant de l'environnement


                Returns:
                        - action (int) : Nombre entier représentant l'action à entreprendre par l'agent
        '''
        policy = self.model_policy
        n = random.uniform(0, 1)
        if n < self.epsilon_decay:
            action = env.sample()
            return action
        else:
            state_array = tf.expand_dims(state, axis=0)
            print(state_array)
            temp = self.model_policy.predict(state_array)
            return np.argmax(temp[0])
      
    
    #
    # Méthode pour sélectionner des batchs de données dans le replay_buffer
    #    
    def sample_batches_from_buffer(self):
        
        '''
        Méthode permettant d'échantillonner des vecteurs listes de valeurs à partir du buffer d'expérience

                Parameters:
                        - self : l'instance de classe, permettant d'accéder à tous les attributs de la classe


                Returns:
                        - states_list (List) : Liste Python contenant l'ensemble des 'batch_size' états de l'environnement
                                          aléatoirement choisis dans le replay_buffer
                        - actions_list (List) : Liste Python contenant l'ensemble des 'batch_size' actions de 
                                           l'environnement aléatoirement choisis dans le replay_buffer
                        - rewards_list (List) : Liste Python contenant l'ensemble des 'batch_size' récompenses 
                                           de l'environnement aléatoirement choisis dans le replay_buffer
                        - next_states_list (List) : Liste Python contenant l'ensemble des 'batch_size' états suivants 
                                               de l'environnement aléatoirement choisis dans le replay_buffer
                        - dones_list (List) : Liste Python contenant l'ensemble des 'batch_size' done de 
                                              l'environnement aléatoirement choisis dans le replay_buffer
        '''
        
        # Par exemple, pour batch_size = 4, on choisit un batch aléatoire de transitions
        # batch = [(s1, a1, r1, s1', d1), (s2, a2, r2, s2', d2), (s3, a3, r3, s3', d3), (s4, a4, r4, s4', d4)]
        # La fonction doit créer des batchs pour chaque variable (states, actions, rewards, next_states, dones)
        # Par exemple :
        # states      = [s1, s2, s3, s4]
        # actions     = [a1, a2, a3, a4]
        # rewards     = [r1, r2, r3, r4]
        # next_states = [s1', s2', s3', s4']
        # dones       = [d1, d2, d3, d4]
        
        indices = np.random.randint(0,len(self.replay_buffer), size=self.batch_size)
   
        batch = [self.replay_buffer[i] for i in indices]
        list_index = []
        states_list = []
        actions_list = []
        rewards_list = []
        next_states_list = []
        dones_list = []
        
        
        # On boucle enfin sur la liste des transitions, et on stocke chaque élément dans la bonne liste
        for temp in batch:
            states_list.append(temp[0])
            actions_list.append(temp[1])
            rewards_list.append(temp[2])
            next_states_list.append(temp[3])
            dones_list.append(temp[4])

        return states_list, actions_list, rewards_list, next_states_list, dones_list

    

    #
    # Méthode pour calculer les valeur à cible (de la Q-value) à utiliser comme valeurs d'apprentissage des réseaux de neurones
    #    
    def compute_targets(self, states_list, actions_list, rewards_list, next_states_list, dones_list):
        
        '''
        Méthode permettant de calculer les valeurs cibles des valeurs d'actions pour les batchs d'états dans actions_list
        states_list

                Parameters:
                        - self : l'instance de classe, permettant d'accéder à tous les attributs de la classe
                        - states_list (List) : Liste Python contenant l'ensemble des 'batch_size' états de l'environnement
                                          aléatoirement choisis dans le replay_buffer
                        - actions_list (List) : Liste Python contenant l'ensemble des 'batch_size' actions de 
                                           l'environnement aléatoirement choisis dans le replay_buffer
                        - rewards_list (List) : Liste Python contenant l'ensemble des 'batch_size' récompenses 
                                           de l'environnement aléatoirement choisis dans le replay_buffer
                        - next_states_list (List) : Liste Python contenant l'ensemble des 'batch_size' états suivants 
                                               de l'environnement aléatoirement choisis dans le replay_buffer
                        - dones_list (List) : Liste Python contenant l'ensemble des 'batch_size' done de 
                                              l'environnement aléatoirement choisis dans le replay_buffer


                Returns:
                        - final_targets (List) : Liste Python contenant les 'batch_size' vecteurs contenant 
                                                       chacun les valeurs de chaque action
        '''
        
        # Par exemple, pour un batch_size = 6, la liste des vecteurs de valeurs cibles pour les 6 états 
        # states = [s1, s2, s3, s4, s5, s6] choisis aléatoirement dans le replay_buffer aura cette forme 
        # [[0.1, 0.4, 0.1, 1.1], ==> vecteur des valeurs-cibles d'action pour s1
        #  [4.1, 0.9, 0.3, 0.1], ==> vecteur des valeurs-cibles d'action pour s2
        #  [0.8, 0.3, 9.8, 9.0], ==> vecteur des valeurs-cibles d'action pour s3
        #  [0.1, 0.2, 7.3, 7.9], ==> vecteur des valeurs-cibles d'action pour s4
        #  [6.3, 0.1, 2.0, 5.4], ==> vecteur des valeurs-cibles d'action pour s5
        #  [3.9, 0.1, 0.3, 5.3]] ==> vecteur des valeurs-cibles d'action pour s6
        
        #np.squeeze est utilisé ici pour réduire la dimension du np.array
        # (batch_size, 8, 1) --> (batch_size, 8)
        # Cf https://numpy.org/doc/stable/reference/generated/numpy.squeeze.html   
        
        states_list = np.squeeze(np.array(states_list))
        next_states_list = np.squeeze(np.array(next_states_list))
        
        current_val = self.model_policy.predict_on_batch(states_list)
        final_val = current_val 
        
        future_val = self.model_policy.predict_on_batch(next_states_list)
        real_targets = np.array(rewards_list)+ self.gamma * (np.amax(np.array(future_val),axis=1)) * (1 - np.array(dones_list))
       
        final_val[ [i for i in range(self.batch_size)] , [np.array(actions_list)] ] = real_targets
        
        return final_val


    #
    # Méthode pour entrainer le modèle neuronal en utilisant des échantillons de données dans le buffer d'expériences
    #
    def train_on_batch(self):
        
        '''
        Méthode permettant d'entrainer le modèle neuronal policy 

                Parameters:
                        - self : l'instance de classe, permettant d'accéder à tous les attributs de la classe


                Returns:
                        - Rien : la fonction échantillonne les données du replay_buffer
                                 calcule les target et entraine le réseau "policy"
        '''
        
        # On vérifie la taille du replay_buffer : s'il n'y a pas assez d'éléments dans le replay_buffer,
        # on ne fait rien
        if len(self.replay_buffer) < self.replay_buffer_min_size:
            return 
        else:
            states_list, actions_list, rewards_list, next_states_list, dones_list = self.sample_batches_from_buffer();
            final_targets = self.compute_targets(states_list, actions_list, rewards_list, next_states_list, dones_list)
            self.model_policy.fit(np.squeeze(np.array(states_list)),np.array(final_targets))
            
        
        

    #
    # Méthode pour de mettre à jour les poids du réseau "target" à partir du réseau local
    #
    def update_target_model(self):
        
        
        '''
        Méthode permettant  de de mettre à jour les poids du réseau "target" à partir du réseau local

                Parameters:
                        - self : l'instance de classe, permettant d'accéder à tous les attributs de la classe


                Returns:
                        - Rien : transfère les poids de model dans target_model
        '''
        
        wt = self.model_policy.get_weights()
        self.model_target.set_weights(wt)
        

                        
                    
    #
    # Méthode pour charger un fichier de poids dans le modèle local
    #
    def load(self, name):
        
        '''
        Méthode permettant de charger un fichier de poids dans le modèle local

                Parameters:
                        - self : l'instance de classe, permettant d'accéder à tous les attributs de la classe


                Returns:
                        - Rien : Charge les poids d'un fichier de poids stocké au chemin d'accès "name" dans model 
        '''
        
        print("[INFO] : Loading model from disk at ", name, "\n")     
        self.policy_model.load_weights(name)

    #
    # Méthode pour enregistrer les poids de model dans un fichier de poids
    #
    def save(self, name):
        
        
        '''
        Méthode permettant d'enregistrer les poids de model dans un fichier de poids

                Parameters:
                        - self : l'instance de classe, permettant d'accéder à tous les attributs de la classe


                Returns:
                        - Rien : Enregistre les paramètres de model dans un fichier de poids stocké au chemin d'accès "name"
        '''
        
        print("[INFO] : Saving model to disk at ", name, "\n")
        self.policy_model.save_weights(name)

        

In [8]:
# Dimensions d'un état environnement après redimensionnement
state_size = (2,)

# Nombre d'actions possibles dans l'environnement
action_size = 3

# On instancie notre agent DQN
agent = DQNAgent(state_size, action_size)

# Nombre d'épisodes pendant lequel il faut entrainer
episodes = 500

# Batch size pour échantillonner les transitions
batch_size = 32 

# Compteur du nombre de timestep
overall_steps = 0  

# Liste d'enregistrement des récompenses cumulées à chaque épisode
episodes_rewards_list = list()

# Liste d'enregistrement des moyennes de récompenses cumulées sur les 25 derniers épisodes
last25_episodes_rewards_list = list()

# Liste d'enregistrement des moyennes de récompenses cumulées sur les 50 derniers épisodes
last50_episodes_rewards_list = list()

# Liste d'enregistrement des moyennes de récompenses cumulées sur les 100 derniers épisodes
last100_episodes_rewards_list = list()

# Liste d'enregistrement des atterissages réussis
sucessfull_landings_list = list()

# Liste d'enregistrement des durées de chaque épisode
episode_duration_list = list()

# Intervalles d'épisodes avec le nombre de steps maximum. Cela définit une stratégie d'entrainement
# Ici STRATEGY_DICT permet d'appliquer la stratégie suivante
# Pour les 150 premiers épisodes, l'agent ne doit pas éffectuer plus de 300 steps
# Pour les épisodes entre 150 et 400, l'agent ne doit pas effectuer plus de 500 steps
# Au delà de 500 épisodes, l'agent ne doit pas effectuer plus de 700 épisodes
# Cela permet d'avoir un entrainement plus rapide, et un apprentissage par pallier.
STRATEGY_DICT = {"0-150":300, "150-400":500, "400-{}".format(episodes+1):700}


In [5]:
def get_max_steps(episode_number):
    
    '''
        Fonction permettant de retourner le nombre maximum de steps à effectuer sur l'épisode en cours

                Parameters:
                       
                        - episode_number (Int) : Nombre entier représentant le numéro de l'épisode actuel
                        
                Returns:
                        - steps_max (Int) : Nombre entier représentant le nombre maximum de steps 
                                            à effectuer sur l'épisode en cours
    '''
    
    for interval_str, steps_max in STRATEGY_DICT.items():
        low_interval = int(interval_str.split("-")[0])
        high_interval = int(interval_str.split("-")[1])
        
        if episode_number < high_interval and episode_number >= low_interval:
            
            return steps_max

In [6]:
def display_metrics(total_reward, 
                    sucessfull_landings, 
                    episode_duration, 
                    episode_bar, 
                    current_step, 
                    done, 
                    max_steps,
                    display_frequency = 10):
    
    '''
        Fonction permettant d'afficher la progression de l'entrainement à chaque action réalisée, d'afficher 
        un récapitulatif des performances à chaque fin d'épisode, et d'afficher des graphiques d'entrainement
        à intervalle régulier

                Parameters:
                       
                        - total_reward (Float) : Nombre réel représentant la récompense cumulée obtenue à la fin 
                                                 d'un épisode.
                        - sucessfull_landings (Int) : Nombre entier représentant, à la fin d'un épisode, le nombre
                                                      d'atterrissage réussis par l'agent depuis le début de 
                                                      l'entrainement 
                        - episode_duration (Float) : Nombre réel représentant la durée en secondes de l'épisode
                        - episode_bar (ProgressBar()) : Objet ProgressBar() représentant un widget destiné à 
                                                        afficher la progression de l'entrainement
                        - current_step (Int) : Nombre entier représentant le nombre de steps réalisés à 
                                               un instant donné d'un épisode.
                        - done (Booléen) : Booléen représentant si l'épisode est terminé ou non
                        - episode_number (Int) : Nombre entier représentant le numéro de l'épisode actuel
                        - max_steps (Int) : Nombre entier représentant le nombre maximum de steps autorisés
                                            pour l'épisode en cours
                        - display_frequency (Int) : Nombre entier représentant la fréquence (en nombre d'épisodes)
                                                    à laquelle afficher les courbes d'apprentissages de l'agent

                        
                Returns:
                        - is_better (Booléen) : Booléen représentant l'agent a progressé ou non entre les derniers
                                                épisodes et l'épisode en cours. L'agent a progressé s'il a augmenté
                                                sa récompense cumulée.
    '''
    
    # On commence par initialiser is_better à False. 
    # On le passera dans la suite à True si les conditions sont réunies
    is_better = False
    
    # Premier cas de traitement : l'épisode est terminé ou l'agent à excéder le nombre maximum de steps autorisés
    if done or current_step >= max_steps:
        
        # On initialise toutes les variables ... Sinon Python ne sera pas content !
        best_id_before = best_reward_before = best_id_after = best_reward_after = 0
        
        # On commence par déterminer la valeur maximum de récompense cumulée des derniers épisodes ainsi que l'épisode
        # à laquelle cette valeur maximum a été obtenue. Il faut s'assurer que la liste des récompenses cumulées
        # n'est pas vide. Sinon Python ne sera pas content !
        if len(episodes_rewards_list) > 0:
            # La fonction argsort() permet de trouver une liste ordonnée des indices des valeurs les plus
            # élevées d'une liste
            # Par exemple, [0.65, -9.6, 808.12, 11.1, 0.0078] ==> [2, 3, 0, 4, 1]
            best_id_before = np.array(episodes_rewards_list).argsort()[-1]
            best_reward_before = np.array(episodes_rewards_list)[best_id_before]
        
        # On rajoute la dernière récompense cumulée à la liste d'enregistrement des récompenses cumulées
        episodes_rewards_list.append(total_reward)
        
        # On rajoute la dernière valeur des atterrissages réussis à la liste d'enregistrement 
        # des atterrissages réussis
        sucessfull_landings_list.append(sucessfull_landings)
        
        #  On rajoute la durée du dernier épisode à la liste d'enregistrement des durées
        episode_duration_list.append(episode_duration)
        
        # On calcule ensuite la moyenne glissante des récompenses cumulées sur les 25, 50 et 100 derniers épisodes    
        last25_episodes_rewards_list.append(np.mean(episodes_rewards_list[-25:]))
        last50_episodes_rewards_list.append(np.mean(episodes_rewards_list[-50:]))
        last100_episodes_rewards_list.append(np.mean(episodes_rewards_list[-100:]))
        
        # On détermine la valeur maximum de récompense cumulée après avoir rajouté la dernière récompense cumulée
        # ainsi que l'épisode à laquelle cette valeur maximum a été obtenue. 
        # Il faut s'assurer que la liste des récompenses cumulées n'est pas vide.  Sinon Python ne sera pas content !    
        if len(episodes_rewards_list) > 0:
            # On réutilise la fonction argsort()
            best_id_after = np.array(episodes_rewards_list).argsort()[-1]
            best_reward_after = np.array(episodes_rewards_list)[best_id_after]
        
        # Si la récompense cumulée a augmenté après le rajout de la valeur du dernier épisode, alors 
        # l'agent s'est amélioré. 
        if best_reward_after > best_reward_before :
            # On affiche cette amélioration
            print("\033[1m" + '\033[92m' + "[INFO] : Récompense obtenue améliorée de {:.1f} (épisode {}) à {:.1f}\n".
                  format(best_reward_before, best_id_before+1, best_reward_after) + "\033[00m"+ "\033[0;0m")
            
            # Et on passe la variable is_better à True
            is_better = True
        
                
        # Si la récompense cumulée n'a pas augmenté, on rappelle juste la valeur maximale obtenue jusqu'ici
        # et l'épisode auquel cette valeur maximale a été obtenue
        else:
            print("[INFO] : Récompense maximale obtenue jusqu'ici : {:.1f} (épisode {})\n".
                  format(best_reward_before, best_id_before+1))
        
        # Dans cette partie, on va afficher à la fin d'un épisode des statistiques sur la performance de l'agent 
        # On utilise un objet Texttable de largeur maximum 100 pixels
        t = Texttable(max_width=100)
        
        # On centre l'ensemble des colonnes avec l'argument "c"
        t.set_cols_align(["c", "c", "c", "c", "c", "c", "c"])
        
        # On rajoute et on affiche les statistiques
        t.add_rows([['Récompense', 'Steps', "Durée de l'épisode",'Récompense moy. [100]', 'Récompense moy. [50]', 'Récompense moy. [25]', 'Atterissage'], 
                    ["{:.1f}".format(total_reward), current_step, 
                     "{:.1f} secondes".format(episode_duration_list[-1]),
                     "{:.1f}".format(last100_episodes_rewards_list[-1]), 
                     "{:.1f}".format(last50_episodes_rewards_list[-1]), 
                     "{:.1f}".format(last25_episodes_rewards_list[-1]), 
                     np.sum(sucessfull_landings_list)]])
        print(t.draw(), "\n")

        
        # A la fin de l'épisode, on mets la barre de progression à 100% (max_value)
        episode_bar.update(int(episode_bar.max_value))
        episode_bar.finish()
        
        # Si on atteint la fréquence d'affichage des courbes d'apprentissage
        if len(episodes_rewards_list)%display_frequency == 0:
            
            # On divise le graphique en 4 sous-graphiques pour afficher 4 variables        
            fig, axs = plt.subplots(4, figsize=(12, 12))
            
            # On donne un nom au graphique principal
            fig.suptitle("Evolution de performances à l'épisode {}".format(len(episodes_rewards_list)))
            
            ######## On s'occupe ici du 1er sous-graphique représenté par la variable axs[0]: 
            ######## afficher l'évolution de la récompense cumulée au cours de l'entrainement
            ######## La méthode plot() va afficher un graphe "ligne" par défaut
            axs[0].plot(np.array(range(len(episodes_rewards_list)))+1, np.array(episodes_rewards_list))
            
            # On va tracer une ligne verticale sur le sous-graphique ax[0] qui va représenté la valeur maximale
            # obtenue jusqu'ici pour la récompense cumulée
            
            # Pour ce faire, on va déterminer la limite haute et la limite basse de cette ligne
            ymin,ymax = axs[0].get_ylim()
            
            # On détermine aussi la position en x, c'est-à-dire l'épisode auquel cette valeur a été obtenue
            x_line = np.max([best_id_before, best_id_after])+1
            
            # On dessine enfin la ligne verticale avec les éléments calculés précédemment
            axs[0].vlines(x=x_line, ymin=ymin, ymax=ymax, colors='red', label="Meilleure valeur")
            
            # On affiche une légende pour une meilleure interprétabilité du graphique
            axs[0].legend(loc='best')
            
            # On affiche le titre du sous-graphique pour une meilleure interprétabilité également
            axs[0].title.set_text("Récompense cumulée")
            
            # Enfin, on affiche une étiquette pour indiquer à l'écrit la valeur maximale et l'épisode
            axs[0].text(x_line-1, 
                        ymin+1, 
                        "Valeur max = {:.1f} | Episode = {}".format(episodes_rewards_list[x_line-1], x_line),
                        fontsize=10,
                        bbox = dict(facecolor="wheat", alpha=0.7))
            
            
            ######## On s'occupe ici du 1er sous-graphique représenté par la variable axs[1]: 
            ######## afficher l'évolution de la moyenne des 25 dernières récompenses cumulées 
            ######## La méthode plot() va afficher un graphe "ligne" par défaut
            axs[1].plot(np.array(range(len(last25_episodes_rewards_list)))+1, np.array(last25_episodes_rewards_list), 'o')
            
            # On va tracer une ligne verticale sur le sous-graphique ax[1] qui va représenté la valeur maximale
            # obtenue jusqu'ici pour la moyenne des 25 dernières récompenses cumulées
            
            # Pour ce faire, on va déterminer la limite haute et la limite basse de cette ligne
            ymin,ymax = axs[1].get_ylim()
            
            # On détermine aussi la position en x, c'est-à-dire l'épisode auquel cette valeur a été obtenue
            x_line = np.argmax(np.array(last25_episodes_rewards_list))+1
            
            # On dessine enfin la ligne verticale avec les éléments calculés précédemment
            axs[1].vlines(x=x_line, ymin=ymin, ymax=ymax, colors='red', label="Meilleure valeur")
            
            # On affiche une légende pour une meilleure interprétabilité du graphique
            axs[1].legend(loc='best')
            
            # On affiche le titre du sous-graphique pour une meilleure interprétabilité également
            axs[1].title.set_text("Récompense moyenne des 25 derniers épisodes")
            
            # Enfin, on affiche une étiquette pour indiquer à l'écrit la valeur maximale et l'épisode
            axs[1].text(x_line-1, 
                        ymin+1, 
                        "Valeur max = {:.1f} | Episode = {}".format(last25_episodes_rewards_list[x_line-1], x_line),
                        fontsize=10,
                        bbox = dict(facecolor="wheat", alpha=0.7))

            ######## On s'occupe ici du 1er sous-graphique représenté par la variable axs[2]: 
            ######## afficher l'évolution de la moyenne des 100 dernières récompenses cumulées 
            ######## La méthode plot() va afficher un graphe "ligne" par défaut
            axs[2].plot(np.array(range(len(last100_episodes_rewards_list)))+1, np.array(last100_episodes_rewards_list), '+')
            
            # On va tracer une ligne verticale sur le sous-graphique ax[2] qui va représenté la valeur maximale
            # obtenue jusqu'ici pour la moyenne des 100 dernières récompenses cumulées
            
            # Pour ce faire, on va déterminer la limite haute et la limite basse de cette ligne
            ymin,ymax = axs[2].get_ylim()
            
            # On détermine aussi la position en x, c'est-à-dire l'épisode auquel cette valeur a été obtenue
            x_line = np.argmax(np.array(last100_episodes_rewards_list))+1
            
            # On dessine enfin la ligne verticale avec les éléments calculés précédemment
            axs[2].vlines(x=x_line, ymin=ymin, ymax=ymax, colors='red', label="Meilleure valeur")
            
            # On affiche une légende pour une meilleure interprétabilité du graphique
            axs[2].legend(loc='best')
            
            # On affiche le titre du sous-graphique pour une meilleure interprétabilité également
            axs[2].title.set_text("Récompense moyenne des 100 derniers épisodes")
            
            # Enfin, on affiche une étiquette pour indiquer à l'écrit la valeur maximale et l'épisode
            axs[2].text(x_line-1, 
                        ymin+1, 
                        "Valeur max = {:.1f} | Episode = {}".format(last100_episodes_rewards_list[x_line-1], x_line),
                        fontsize=10,
                        bbox = dict(facecolor="wheat", alpha=0.7))


            ######## On s'occupe ici du 1er sous-graphique représenté par la variable axs[2]: 
            ######## afficher l'évolution du nombre d'atterrissages réussis 
            ######## La méthode plot() va afficher un graphe "ligne" par défaut
            cumsum_landings = np.cumsum(sucessfull_landings_list) 
            axs[3].plot(np.array(range(len(cumsum_landings)))+1, cumsum_landings)
            
            # On va tracer une ligne verticale sur le sous-graphique ax[3] qui va représenté la valeur maximale
            # obtenue jusqu'ici pour la moyenne des 100 dernières récompenses cumulées
            
            # Pour ce faire, on va déterminer la limite haute et la limite basse de cette ligne
            ymin,ymax = axs[3].get_ylim()
            
            # On détermine aussi la position en x, c'est-à-dire l'épisode auquel cette valeur a été obtenue
            x_line = np.argmax(cumsum_landings)+1
            
            # On dessine enfin la ligne verticale avec les éléments calculés précédemment
            axs[3].vlines(x=x_line, ymin=ymin, ymax=ymax, colors='red', label="Meilleure valeur")
            
            # On affiche une légende pour une meilleure interprétabilité du graphique
            axs[3].legend(loc='best')
            
            # On affiche le titre du sous-graphique pour une meilleure interprétabilité également
            axs[3].title.set_text("Nombre d'atterrissages réussis depuis le début")
            
            # Enfin, on affiche une étiquette pour indiquer à l'écrit la valeur maximale et l'épisode
            axs[3].text(x_line-1, 
                        0, 
                        "Valeur max = {:.1f} | Episode = {}".format(cumsum_landings.tolist()[x_line-1], x_line),
                        fontsize=10,
                        bbox = dict(facecolor="wheat", alpha=0.7))
            
            # On appelle enfin la méthode plot qui afficher afficher à l'ecran le graphique
            plt.show()
    
    # Dans le cas où l'épisode n'est pas terminé
    else:
        # On incrémente juste la progressbar pour l'affichage
        episode_bar.update(current_step)
    
    return is_better

In [9]:
env = VehicleSimulator()
env.window.mainloop()

  0% (0 of 300) |                        | Elapsed Time: 0:00:00 ETA:  --:--:--

[1m[94m####################################### Episode 1 / 500 ######################################################[00m[0;0m
0
226.79070107503054 415.71931098838166 266.76633415579437 414.32333112028164 269.20929892496946 484.28068901161834 229.23366584420563 485.67666887971836
[248.0, 450.0] -0.1 False
0
225.6072424137591 416.48038771578865 265.5098044241521 413.69012876602363 270.39275758624086 483.51961228421135 230.4901955758479 486.30987123397637
[248.0, 450.0] -0.1 False
0
224.45106587826663 417.2823029274635 264.23194169299757 413.1011643967573 271.54893412173334 482.7176970725365 231.7680583070024 486.8988356032427
[248.0, 450.0] -0.1 False
0
223.32358009156627 418.12407961324635 262.9343028412291 412.5571555748437 272.6764199084337 481.87592038675365 233.0656971587709 487.4428444251563
[247.99999999999997, 450.0] -0.1 False
1
223.04523388964614 416.14354347576324 262.65595663930895 410.5766194373606 272.3980737065135 479.89538424927053 232.78735095685076 485.462308287673

100% (300 of 300) |######################| Elapsed Time: 0:00:00 Time:  0:00:00
  0% (0 of 300) |                        | Elapsed Time: 0:00:00 ETA:  --:--:--

212.4606175474217 376.8135066582768 251.58652157677417 368.49703902556655 266.1403399340174 436.96737107693303 227.01443590466494 445.2838387096434
[239.30047874071954, 406.8904388676049] -0.1 False
0
211.18537597361143 376.79823063366234 249.99720502465155 367.1213548096757 266.93173771662833 435.04205564899553 228.11990866558827 444.71893147298226
[239.05855684511988, 405.92014314132894] -0.1 False
2
212.01078396100428 374.86506333126704 251.13668799035676 366.54859569855677 265.6905063476 435.01892774992325 226.56460231824752 443.33539538263364
[238.85064515430213, 404.94199554059514] -0.1 False
0
210.73554238719402 374.84978730665256 249.54737143823414 365.17291148266594 266.4819041302109 433.09361232198574 227.67007507917086 442.7704881459725
[238.60872325870247, 403.97169981431915] -0.1 False
0
209.46054453052398 374.87902587687955 247.911012368057 363.8535316441997 267.20562727524697 431.1418503598821 228.755159437714 442.1673445925621
[238.33308590288547, 403.0104381183808] -10

100% (300 of 300) |######################| Elapsed Time: 0:00:00 Time:  0:00:00
  0% (0 of 300) |                        | Elapsed Time: 0:00:00 ETA:  --:--:--

209.97902915391228 397.8651462216492 246.52084745961636 381.5956804986171 274.9924124749224 445.5438625335993 238.4505941692183 461.81332825663134
[242.48572081441733, 421.70450437762423] -100 True
[INFO] : Récompense maximale obtenue jusqu'ici : -102.3 (épisode 3)

+------------+-------+---------------+---------------+---------------+---------------+-------------+
| Récompense | Steps |   Durée de    |  Récompense   |  Récompense   |  Récompense   | Atterissage |
|            |       |   l'épisode   |  moy. [100]   |   moy. [50]   |   moy. [25]   |             |
|  -102.300  |  24   | 0.3 secondes  |   -103.200    |   -103.200    |   -103.200    |      0      |
+------------+-------+---------------+---------------+---------------+---------------+-------------+ 

[1m[94m####################################### Episode 3 / 500 ######################################################[00m[0;0m
1
228.0 413.0 268.0 413.0 268.0 483.0 228.0 483.0
[248.0, 448.0] -0.1 False
0
226.7558015783280

100% (300 of 300) |######################| Elapsed Time: 0:00:00 Time:  0:00:00
  0% (0 of 300) |                        | Elapsed Time: 0:00:00 ETA:  --:--:--

210.23028465006402 342.5368798750073 248.68075248759703 331.5113856423273 267.97536739478704 398.7997043580097 229.524899557254 409.8251985906897
[239.10282602242552, 370.6682921165085] -0.1 False
0
208.95708389902418 342.61059741738194 246.99934455083059 330.249917642384 268.630534157077 396.8238737830448 230.58827350527054 409.18455355804275
[238.7938090280506, 369.71723560021337] -100 True
[INFO] : Récompense maximale obtenue jusqu'ici : -102.3 (épisode 4)

+------------+-------+---------------+---------------+---------------+---------------+-------------+
| Récompense | Steps |   Durée de    |  Récompense   |  Récompense   |  Récompense   | Atterissage |
|            |       |   l'épisode   |  moy. [100]   |   moy. [50]   |   moy. [25]   |             |
|    -106    |  61   | 0.7 secondes  |   -104.200    |   -104.200    |   -104.200    |      0      |
+------------+-------+---------------+---------------+---------------+---------------+-------------+ 

[1m[94m###################

Exception in Tkinter callback
Traceback (most recent call last):
  File "C:\Users\ngong\anaconda3\lib\tkinter\__init__.py", line 1892, in __call__
    return self.func(*args)
  File "C:\Users\ngong\AppData\Local\Temp\ipykernel_6204\3750952494.py", line 266, in train
    next_state, reward, done = env.step(action)
  File "C:\Users\ngong\AppData\Local\Temp\ipykernel_6204\3750952494.py", line 176, in step
    self.button_f.invoke()
  File "C:\Users\ngong\anaconda3\lib\tkinter\__init__.py", line 2672, in invoke
    return self.tk.call(self._w, 'invoke')
_tkinter.TclError: invalid command name ".!frame.!button3"
