In [None]:
# Permet de jouer au jeu
import retro
# Permet de ralentir la vitesse 
import time

## Preprocess

In [None]:
from gym import Env
from gym.spaces import MultiBinary, Box
import numpy as np
import cv2
import matplotlib.pyplot as plt

In [None]:
# Environnement Customisé
class StreetFighter(Env) : 
    def __init__(self) :
        super().__init__()
        # A fixer inférieure à 200
        self.taille_reduite = 84
        # Specification de l'espace des actions et de l'espace d'observation
        # low = 0, high = 255 : couleur pixels par défaut
        # shape : shape de la sortie par défaut (hauteur, largeur, Gris)
        self.observation_space = Box(low=0,high=255,shape=(self.taille_reduite,self.taille_reduite,1),dtype=np.uint8)
        # action_space = MultiBinary(12) : 12 touches possibles et combinables pour faire des coups spéciaux
        self.action_space = MultiBinary(12)
        # Lancer une instance du jeu et ne permet que les combinaisons valides de boutons
        self.game = retro.make(game='StreetFighterIISpecialChampionEdition-Genesis', use_restricted_actions = retro.Actions.FILTERED)
    
    def step(self,action):
        # Faire une étape 
        observation, reward, done, info = self.game.step(action)
        observation = self.preprocess(observation)
        
        # Fonction de récompense
        reward = info['score'] - self.score
        self.score = info['score']
        
        return observation, reward, done, info
    
    def render(self,*args,**kwargs):
        self.game.render()
    
    def reset(self):
        # Remet le jeu à zéro
        observation = self.game.reset()
        # Preprocess l'image obtenue
        observation = self.preprocess(observation)
        # Cette variable va permettre de stocker la récompense obtenue pour la partie
        self.score = 0
        return observation
    
    def preprocess(self,observation):
        # Transformation de l'image RGB en nuance de gris => Entraînement plus rapide
        image_gris = cv2.cvtColor(observation,cv2.COLOR_RGB2GRAY)
        # Modifier la taille de l'image => Entraînement plus rapide
        image_retaillee = cv2.resize(image_gris,(self.taille_reduite,self.taille_reduite), interpolation = cv2.INTER_CUBIC)
        # Specificité pour stable_baselines
        image_retaillee_gris_finale = np.reshape(image_retaillee,(self.taille_reduite,self.taille_reduite,1))
        return image_retaillee_gris_finale
    
    def close(self):
        self.game.close()

## HyperParamètres

In [None]:
import optuna
from stable_baselines3 import PPO
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack
import os

In [None]:
LOG_DIR = './logs/'
OPT_DIR = './opt/'

In [None]:
def optimize_ppo(trial):
    return {
        'n_steps':trial.suggest_int('n_steps',2048,8192),
        'gamma':trial.suggest_loguniform('gamma',0.8,0.9999),
        'learning_rate':trial.suggest_loguniform('learning_rate',1e-9,1e-4),
        'clip_range':trial.suggest_uniform('clip_range',0.1,0.4),
        'gae_lambda':trial.suggest_uniform('gae_lambda',0.8,0.99)
    }

def optimize_agent(trial):
    try:
        # Récupère les hyperparamètres pour le modèle
        model_params = optimize_ppo(trial)

        # Crée un environnement
        env = StreetFighter()
        # Permet d'extraire la recompense moyenne et la longueur moyenne d'un episode
        env = Monitor(env,LOG_DIR)
        #Nécessaire pour Stable Baselines
        env = DummyVecEnv([lambda: env])
        #Empile 4 images consécutives pour donner la perception de mouvement
        env = VecFrameStack(env, 4, channels_order='last')

        # Déclaration d'une nouvelle instance de l'algorithme RL avec les paramètres donnés par Optuna
        model = PPO('CnnPolicy', env, tensorboard_log=LOG_DIR, verbose=0, **model_params)
        #Entraînement du modèle
        model.learn(total_timesteps=50000)
        #model.learn(total_timesteps=100000)

        # Evaluation du modèle sur 5 partie différentes
        mean_reward, _ = evaluate_policy(model,env,n_eval_episodes=1)
        env.close()

        # Sauvegarde du modèle
        SAVE_PATH = os.path.join(OPT_DIR,'trial_{}_best_model_ppo'.format(trial.number))
        model.save(SAVE_PATH)

        return mean_reward
        
    except Exception as e:
        return -1000

In [None]:
# On veut maximiser la récompense
study = optuna.create_study(direction='maximize')

# On va chercher le meilleur set d'hyperparamètres
study.optimize(optimize_agent,n_trials=10,n_jobs=1)
#study.optimize(optimize_agent,n_trials=100,n_jobs=1)

## Définition du CallBack

In [None]:
from stable_baselines3.common.callbacks import BaseCallback

In [None]:
class TrainAndLoggingCallback(BaseCallback):
    def __init__(self,check_freq,save_path, verbose=1):
        super(TrainAndLoggingCallback,self).__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self):
        if self.save_path is not None:
            os.makedirs(self.save_path,exist_ok=True)

    def _on_step(self):
        if self.n_calls % self.check_freq == 0:
            model_path = os.path.join(self.save_path,'best_model_ppo_{}'.format(self.n_calls))
            self.model.save(model_path)

        return True

In [None]:
CHECKPOINT_DIR = './train/'

In [None]:
callback = TrainAndLoggingCallback(check_freq=50000, save_path = CHECKPOINT_DIR)

## Préparation de l'environnement

In [None]:
# Crée un environnement
env = StreetFighter()
# Permet d'extraire la recompense moyenne et la longueur moyenne d'un episode
env = Monitor(env,LOG_DIR)
#Nécessaire pour Stable Baselines
env = DummyVecEnv([lambda: env])
#Empile 4 images consécutives pour donner la perception de mouvement
env = VecFrameStack(env, 4, channels_order='last')

## Entrainement du modèle avec les paramètres sélectionné via Optuna

In [None]:
# Récupération des meilleurs paramètres
model_params = study.best_params
# Pour avoir des batch de bonnes tailles
model_params['n_steps']= int(model_params['n_steps']/64)*64

In [None]:
# Construction du modèle avec les meilleurs hyperparam
model = PPO('CnnPolicy',env,tensorboard_log=LOG_DIR,verbose=1,**model_params)

model.learn(total_timesteps = 5000000, callback = callback)
env.close()