In [1]:
!python --version


Python 3.7.16


In [2]:
!pip install gym-retro



In [None]:
import gym
import retro
import numpy

In [None]:
# List the game identifiers known to gym-retro (use one of them with retro.make)
retro.data.list_games()

In [None]:
# Create the Street Fighter II environment with retro.make's default settings
env = retro.make(game='StreetFighterIISpecialChampionEdition-Genesis')

In [None]:
# Close the emulator to avoid multiple emulator instances.
# NOTE(review): the cells below still use `env`; when the notebook is run
# top to bottom this closes it before they execute. Presumably this cell is
# only meant to be run by hand to clean up a stale instance -- confirm.
env.close()

In [None]:
# Draw one random action to inspect what the action space produces
env.action_space.sample()

In [None]:
# Smoke test: play the game from its default start state with random actions.
# The environment is reset and `done` is cleared at the start of every game so
# that raising the range() count actually plays that many episodes.
for game in range(1):
    obs = env.reset()
    done = False
    while not done:
        env.render()
        obs, reward, done, info = env.step(env.action_space.sample())
        print(reward)

In [None]:
# Release the emulator used by the random-play test above
env.close()

In [None]:
from gym import Env


In [None]:
#reward function
class StreetFighter(Env):
    """Gym environment wrapping the Street Fighter II gym-retro game.

    The reward returned by ``step`` is the change in ``info['score']`` since
    the previous step, not the reward reported by the underlying emulator.
    """

    def __init__(self):
        super().__init__()
        self.game = retro.make(
            game='StreetFighterIISpecialChampionEdition-Genesis',
            use_restricted_actions=retro.Actions.FILTERED,
        )
        # Expose the wrapped environment's spaces on this Env so that wrappers
        # and agents that read them from the outer environment can find them.
        self.observation_space = self.game.observation_space
        self.action_space = self.game.action_space
        # Initialised here so step() cannot hit an AttributeError when it is
        # called before the first reset().
        self.previous_frame = None
        self.score = 0

    def reset(self):
        """Restart the game and return the first observation."""
        obs = self.game.reset()
        self.previous_frame = obs

        self.score = 0  # baseline for the next score delta
        return obs

    def step(self, action):
        """Advance the game by one step.

        Args:
            action: action forwarded unchanged to the wrapped environment.

        Returns:
            ``(obs, reward, done, info)`` where ``reward`` is the increase of
            ``info['score']`` since the previous step.
        """
        # The emulator's own reward is discarded in favour of the score delta.
        obs, _, done, info = self.game.step(action)
        reward = info['score'] - self.score
        self.score = info['score']

        return obs, reward, done, info

    def render(self, *args, **kwargs):
        """Render the wrapped game, forwarding any arguments to it."""
        return self.game.render(*args, **kwargs)

    def close(self):
        """Close the wrapped game so a new emulator can be created later."""
        self.game.close()

In [None]:
# Instantiate the custom environment to check that it can be constructed.
# NOTE(review): no visible cell closes this instance before the later cells
# build their own StreetFighter(); gym-retro reportedly allows only one
# emulator per process, so this may need an env.close() -- confirm.
env = StreetFighter() 

In [None]:
#install pytorch
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu117
#install Stable-baseline3
!pip install stable-baselines3[extra] optuna

In [None]:
import optuna #optimization frame
import os
from stable_baselines3 import PPO #PPO algorithm for RL
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack


In [None]:
# Output locations: LOG_DIR is passed to Monitor and to PPO's tensorboard_log;
# OPT_DIR receives the model saved at the end of each Optuna trial.
LOG_DIR = './logs/'
OPT_DIR = './opt/'

In [None]:
#using optuna for optimization
def optimize_ppo(trial):
    """Sample one set of PPO hyperparameters from an Optuna trial.

    Args:
        trial: object exposing Optuna's ``suggest_int`` / ``suggest_float``.

    Returns:
        dict with the keys ``n_steps``, ``gamma``, ``learning_rate``,
        ``clip_range`` and ``gae_lambda``, ready to be unpacked into ``PPO``.
    """
    return {
        # Rollout length collected before each update. Sampled in multiples
        # of 64 so it divides evenly by PPO's default mini-batch size of 64.
        'n_steps': trial.suggest_int('n_steps', 2048, 8192, step=64),
        # Discount factor applied to future rewards (log-scaled search).
        'gamma': trial.suggest_float('gamma', 0.8, 0.9999, log=True),
        'learning_rate': trial.suggest_float('learning_rate', 1e-5, 1e-4, log=True),
        'clip_range': trial.suggest_float('clip_range', 0.1, 0.4),
        'gae_lambda': trial.suggest_float('gae_lambda', 0.8, 0.99),
    }

In [None]:
#training loop and returning mean reward
 
def optimize_agent(trial):
    """Optuna objective: train a short PPO run and return its mean reward.

    Args:
        trial: Optuna trial used to sample hyperparameters and to name the
            saved model.

    Returns:
        Mean reward over 5 evaluation episodes, or the sentinel ``-1000`` when
        anything in the trial raised, so one failed trial does not abort the
        whole study.
    """
    try:
        env = None
        try:
            model_param = optimize_ppo(trial)
            os.makedirs(OPT_DIR, exist_ok=True)

            # Build the environment stack. `env` always points at the
            # outermost wrapper created so far, so the finally-block can
            # release the emulator no matter where a failure happens.
            env = StreetFighter()
            monitored_env = Monitor(env, LOG_DIR)
            env = monitored_env
            # The factory closes over a name that is never rebound.
            env = DummyVecEnv([lambda: monitored_env])
            env = VecFrameStack(env, 4, channels_order='last')

            model = PPO('CnnPolicy', env, tensorboard_log=LOG_DIR, verbose=0, **model_param)
            model.learn(total_timesteps=3000)

            # Evaluating model on 5 different games
            mean_reward, _ = evaluate_policy(model, env, n_eval_episodes=5)

            save_path = os.path.join(OPT_DIR, 'trial_{}_best_model'.format(trial.number))
            model.save(save_path)

            return mean_reward
        finally:
            # Always close: a leaked emulator would make every later trial
            # fail when it tries to create its own environment.
            if env is not None:
                env.close()

    except Exception as e:
        # Keep the best-effort behaviour but make the failure visible.
        print('Trial {} failed: {!r}'.format(trial.number, e))
        return -1000

In [None]:
# Create the Optuna study and run 20 trials sequentially (n_jobs=1),
# maximizing the value returned by optimize_agent.
study = optuna.create_study(direction='maximize')
study.optimize(optimize_agent, n_trials=20, n_jobs=1)


In [None]:
# Hyperparameters of the best-scoring trial (the Study attribute is `best_params`)
study.best_params

In [None]:
best_trial = study.best_trial
# Load the model that optimize_agent saved for the winning trial, looked up by
# its trial number rather than a number copied by hand from an earlier run.
model = PPO.load(os.path.join(OPT_DIR, 'trial_{}_best_model.zip'.format(best_trial.number)))
# Last expression of the cell, so the best trial is actually displayed.
best_trial

In [None]:
#base callback for autosaving #(Stackoverflow)
from stable_baselines3.common.callbacks import BaseCallback


class TrainAndLoggingCallback(BaseCallback):
    """Callback that saves the model every ``check_freq`` callback calls.

    Checkpoints are written to ``save_path`` as ``best_model_<n_calls>``.
    """

    def __init__(self, check_freq, save_path, verbose=1):
        """Store the checkpoint interval and the target directory."""
        super().__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self):
        """Create the checkpoint directory when a path was given."""
        if self.save_path is None:
            return
        os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        """Save a checkpoint on every ``check_freq``-th call; always return True."""
        if self.n_calls % self.check_freq != 0:
            return True

        checkpoint_name = 'best_model_{}'.format(self.n_calls)
        self.model.save(os.path.join(self.save_path, checkpoint_name))
        return True

In [None]:
# Directory passed to TrainAndLoggingCallback as save_path for its checkpoints
CHECKPOINT_DIR = './train/'

In [None]:
# Save a checkpoint into CHECKPOINT_DIR every 8000 callback calls
# (the callback counts calls via n_calls)
callback = TrainAndLoggingCallback(check_freq=8000, save_path=CHECKPOINT_DIR)

In [None]:
# Build the environment used for the final training run:
#   StreetFighter -> Monitor (given LOG_DIR) -> DummyVecEnv -> VecFrameStack
#   (4 stacked frames, channels last).
# NOTE(review): the lambda reads `env` at call time; this relies on
# DummyVecEnv invoking it before `env` is rebound on the same line -- confirm.
env = StreetFighter()
env = Monitor(env, LOG_DIR)
env = DummyVecEnv([lambda: env])
env = VecFrameStack(env, 4, channels_order='last')

In [None]:
# Best parameters from hyperparameter tuning using optuna.
# Copied into a fresh dict so the override below cannot touch the study's data.
model_param = dict(study.best_params)
model_param['n_steps'] = 7360  # force n_steps to a multiple of 64 (64 * 115)



In [None]:
# Display the hyperparameters that will be unpacked into PPO below
model_param

In [None]:
# New PPO agent with the 'CnnPolicy' policy on the training env,
# configured with the tuned hyperparameters in model_param
model = PPO('CnnPolicy', env, tensorboard_log=LOG_DIR, verbose=1, **model_param)

In [None]:
# `load` is a class method that returns a new model; calling it on the instance
# and discarding the result leaves `model` untouched. set_parameters copies the
# saved weights into the existing model, which keeps the hyperparameters it was
# just constructed with. The checkpoint is the one saved for the best trial.
model.set_parameters(
    os.path.join(OPT_DIR, 'trial_{}_best_model.zip'.format(study.best_trial.number))
)

In [None]:

# Train for 20000 timesteps; the callback saves periodic checkpoints
model.learn(total_timesteps=20000, callback=callback)
 