- Notebook réalisé par Aristide LALOUX, Hugo QUENIAT et Mohamed Ali SRIR. 

# Sujet :

Ce notebook contient le code nécessaire pour créer et entraîner un agent à jouer à Flappy Bird. Ici nous implémentons un algorithme de type Q-Learning.

Un état est caractérisé par :
- La distance horizontal entre l'oiseau et le prochain tuyau.
- La distance vertical entre l'oiseau et le milieu de la prochaine ouverture. 

A la fin de l'entraînement, l'agent devrait jouer l'action optimale à chaque instant dans le jeu. 

# Bibliographie



Environnement : https://github.com/Talendar/flappy-bird-gym

Code Q Learning : On s'est inspiré de celui du CartPole. 

# Les imports

In [None]:
#Nous installons le jeu.
#Sur windows, écrire !pip plutôt que !{sys.executable}

#!{sys.executable} -m pip install flappy-bird-gym
#!{sys.executable} -m pip install torch

!pip install flappy-bird-gym
!pip install torch

In [3]:
#Nous importons les librairies utiles.
#La bibliothèque GYM contient l'environnement de flappy bird. C'est à dire les états, les actions, les rewards
import gym 
import flappy_bird_gym

#On utilise numpy pour les matrices 
import numpy as np

#Random pour les politiques epsilon-greedy.
import random

#Maths pour faire des fonctions avancées
import math

#Pour l'affichage des graphiques
import matplotlib.pyplot as plt

#Time pour faire des sleep(n) et ainsi gérer le nombre de FPS du jeu. 
import time

# Création et inspection de l'environnement


Un état : 
- La distance horizontale entre l'oiseau et le prochain tuyau.
- La distance verticale entre l'oiseau et le milieu de la prochaine ouverture. 

Les actions :
- NOOP : Ne rien faire.
- JUMP : Faire un saut.

Rewards : 
- Distance horizontale parcourue entre deux états
- Nous modifions l'environnement gym pour mettre une récompense négative lorsque l'agent meurt. Ainsi, on espère que l'agent apprendra qu'il ne faut pas mourir dans le jeu.

In [4]:
#Création de l'environnement Gym du jeu Flappy Bird
env = flappy_bird_gym.make("FlappyBird-v0")

In [4]:
#Ici nous enquêtons sur la nature de l'environnement Flappy Bird de Gym en affichant les intervalles d'évolution de DeltaX et DeltaY qui caractérisent nos états
state_value_bounds = list(zip(env.observation_space.low, 
                              env.observation_space.high)) 

print(state_value_bounds)
print(len(state_value_bounds))
print(np.shape(state_value_bounds))
print(state_value_bounds[0][0])
#On remarque que DeltaX et DeltaY évolue dans R tout entier. 
#Nous allons donc discrétiser l'espace de jeu.

[(-inf, inf), (-inf, inf)]
2
(2, 2)
-inf


In [None]:
#Code pour tester que l'environnement fonctionne correctement.
# L'agent prend ici des actions totalement aléatoires.
#On remarque qu'il finit au sommet de la zone de jeu, le saut a beaucoup plus "d'impact" que le "ne rien faire".

#Boucle infinie : le jeu se répète tant que la cellule n'est pas interrompue.
while True :
    #Mise en place d'une séquence de jeu
	obs = env.reset()
    #Boucle infinie qui peut seulement être brisée si l'agent meurt.
	while True:
		# On sélectionne la prochaine action.
        # La commande "env.action_space.sample" permet de tirer au sort une action parmi l'ensemble des actions possibles de l'environnement
		action = env.action_space.sample() 
		
		# L'environnement fait éxécuter l'action par l'agent.
		obs, reward, done, info = env.step(action)
		
		# print(obs) permet d'afficher les états que rencontre l'agent à chaque étape.
        # Affichage du jeu
		env.render()

		time.sleep(1 / 60)  # FPS
		
		# Vérification de si l'agent est toujours en vie
		if done:
			break


On remarque (en utilisant la ligne "print(obs)") que les états parcourus par l'agent sont en réalité des états (x,y) tels que x est dans [0,2] et y dans ]-1,1[]. Ceci va guider la création de la fonction suivante.

In [None]:
# Combien de décimales souhaitez-vous conserver pour les états (x,y) ?

Precision_DeltaX = 2
Precision_DeltaY = 1

# Avec les valeurs ci-dessus, nous avons pu observer un compromis correct entre convergence rapide et performances de l'agent.
#Cependant, il faut faire attention à ne pas discrétiser trop précisément l'espace de jeu car cela engendrerait trop d'états et ainsi la matrice Q convergerait
#trop lentement vers la matrice Q*. Mais d'un autre côté, si on ne discrétise pas suffisament, il n'y aura pas assez d'état, et donc l'agent ne sera pas précis
#au pixel près. Il faut trouver empiriquement un juste milieu.

In [5]:
#Fonction de discrétisation
#Bucketize signifie que nous discrétisons l'espace de jeu qui est à l'origine continu. 
def bucketize_state_value(state):
	x,y = state
	return (math.floor(((10**(Precision_DeltaX-1))*x)),math.floor((10**(Precision_DeltaY))*y))

#La fonction ci-dessus renvoie les valeurs discrétisées de DeltaX et DeltaY. 




In [6]:
#Création de la matrice Q, ses dimensions sont en adéquation avec les précisions choisies ci-dessus.
#Plus la précision est élevée, plus Q est de grande dimension.
#env.action_space.n donne le nombre d'actions possibles.
#Ainsi on a bien une valeur pour chaque couple (s,a) avec s=(DeltaX,DeltaY)
q_value_table = np.zeros((2*(Precision_DeltaX-1),2*(Precision_DeltaY)) + (env.action_space.n,))

In [7]:
# Paramètres définis par l'utilisateur pour l'algo de Q learning
max_episodes = 5000000 #Nombre de parties que l'agent va jouer pendant l'entrainement 
streak_to_end = 120 #Pour gagner, il faut que l'agent gagne 120 parties d'affilée. 
solved_time = 500  #Nombre de décisions que l'agent peut prendre pour une partie. Au delà, on considère que l'agent a résolu le jeu.
discount = 0.9 #Facteur de discount dans le Q Learning
no_streaks = 0 #Au début, l'agent n'a pas encore résolu le jeu, donc notre streak / nombre de parties gagnées d'affilée vaut 0. 

# Epsilon Greedy Policy

In [8]:
# Sélection d'une action : Exploration ou Exploitation
# Méthode epsilon-greedy

min_explore_rate = 0.1 
min_learning_rate = 0.07
explore_rate_decay = 0.999998 #Raison de la suite géométrique que suit l'exploration_rate

def select_action(state_value, explore_rate):
	if random.random() < explore_rate:
		#Exploration, on explore l'environnement.
		if random.random() < 0.95: #Pour éviter que l'oiseau se retrouve haut dans le ciel comme on l'a observé dans le test aléatoire. Dans 95% des cas, on ne fait rien et dans 5% des
		#cas on saute. En moyenne l'oiseau reste à peu prêt à la même altitude. 
			action = 0 #Action NOOP
		else:
			action = 1 #Action JUMP
	else:
		action = np.argmax(q_value_table[state_value])  # Exploitation, on prend l'action qui maximise le gain. 
	return action

def select_explore_rate(x):
    # Actualisation de l'exploration rate au fur et à mesure.
    return max(min_explore_rate, explore_rate_decay*x)

def select_learning_rate(x):
    # Actualisation du learning_rate au fur et à mesure.
    return max(min_learning_rate, min(1.0, 1.0 - math.log10((x+1)/25)))

# Entrainement de l'agent

In [None]:
#Code à remplir, modification de la table Q selon l'algorithme de Q-Learning

def update_q_table(.....): 

In [None]:
#Correction du code ci-dessus
def update_q_table(previous_state, current_state, action, learning_rate, reward):
        best_q_value = np.max(q_value_table[current_state])
        q_value_table[previous_state][action] += learning_rate * (
            reward + discount * best_q_value - 
            q_value_table[previous_state][action])



In [None]:
#On commence avec un grand Exploration Rate puis il va diminuer avec le temps, selon une décroissance géométrique de raison "explore_rate_decay"

explore_rate = 1


# Lancement de l'entrainement


for episode_no in range(max_episodes):

    #Pour chaque partie, on calcule les nouveaux exploration et learning rates.  
    explore_rate = select_explore_rate(explore_rate)
    learning_rate = select_learning_rate(episode_no)



    # On reset l' environment avant le début de la partie.
    observation = env.reset()
    
    #On discrétise l'état (X,Y) avec notre fonction au dessus. 
    start_state_value = bucketize_state_value(observation)
    previous_state_value = start_state_value



    done = False 
    time_step = 0  #nombre de décisions effectuées par l'agent

    #Tant que la partie n'est pas terminée, l'agent joue et essaye de faire le meilleur score possible à l'aide de l'algo du Q Learning. 
    while not done: 
        #Pour afficher l'agent en train de jouer.
        #env.render()  

        #On choisit l'action selon notre politique epsilon greedy. 
        action = select_action(previous_state_value, explore_rate)

        #L'agent joue l'action et il reçoit le nouvel état la récompense associée, s'il est mort ou non et son score.
        observation, reward_gain, done, info = env.step(action)

        #On discrétise le nouvel état
        state_value = bucketize_state_value(observation)

        
        reward_gain= 100*max(0,previous_state_value[0]-state_value[0]) - 10000 * done
        #reward_gain= - 10000 * done
        #Ainsi le Gain est très mauvais si l'agent meurt car done passe à 1
        #Le gain vaut 0 si l'agent lorsque le pipe de référence change (on passe au suivant)

        #Appliquer l'algorithme de Q-Learning en mettant à jour la table Q
        update_q_table(...)

        previous_state_value = state_value
  
        time_step += 1
        
    
    if time_step >= solved_time: #Si la partie dure plus de solved_time actions, alors on considère que la partie est gagnée
        #Etant donné que le premier tuyau et le n-ième tuyau est identique, on peut s'arrêter à 199 actions
        no_streaks += 1
    else:
        no_streaks = 0
    
    if no_streaks > streak_to_end: #puis si l'agent gagne plus de streak_to_win parties d'affilé, alors l'entrainement est terminé. 
        print('Après {} épisodes, le fappy bird est résolu.'.format(episode_no))
        break

    # Données à afficher en console, avec un certain pas (de 100 épisodes ici)
    if episode_no % 100 == 0:  
        print('Episode {} terminé après {} décisions, un exploration_rate de {}'.format(episode_no, time_step, explore_rate))
    
env.close()

# Save Q Tensor


Nous pouvons sauvegarder dans le fichier "Qtensor.pt" la matrice Q pour l'utiliser ultérieurement.

In [None]:

import torch
print(q_value_table)
torch.save(q_value_table, 'Qtensor.pt')

# Load Q Tensor


Nous pouvons load une matrice Q Tensor déjà entrainée pour la tester.

In [None]:
import torch
q_value_table = torch.load("Qtensor.pt")
print(q_value_table)