In [None]:
#import dependencies

import retro
import time
import os
from gym import Env
from gym.spaces import MultiBinary, Box
import numpy as np
import cv2
from matplotlib import pyplot as plt
import optuna
from stable_baselines3 import PPO, A2C, DQN
from stable_baselines3.common.evaluation import evaluate_policy
import tensorboard
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack
from stable_baselines3.common.callbacks import BaseCallback

#define the environment: observation space, action space


In [None]:
class StreetFighter(Env):
    """Gym environment wrapping the gym-retro Street Fighter II game.

    Observations are 84x84 single-channel uint8 images. ``reset`` returns the
    first preprocessed frame, while ``step`` returns the difference between the
    current and the previous preprocessed frame. Actions are 12-element binary
    vectors. The reward is whatever the underlying game returns for the
    'custom_scenario' scenario.
    """

    def __init__(self):
        super().__init__()
        #the observation space is an 84x84 box with each value corresponding to a colour intensity
        self.observation_space = Box(low=0, high=255,
                                     shape=(84,84,1), dtype=np.uint8)

        #12-long vector where each action corresponds to a 0 or a 1
        self.action_space = MultiBinary(12)

        #last preprocessed frame; set by reset() and updated by every step()
        self.previous_frame = None

        #start up an instance of the game
        #use restricted actions ensures that only valid button combinations are chosen
        #scenario argument is set to 'custom_scenario' which is where the custom reward function is defined
        self.game = retro.make(game="StreetFighterIISpecialChampionEdition-Genesis",
                               use_restricted_actions = retro.Actions.FILTERED, scenario = "custom_scenario")

    def preprocess(self, observation):
        """Convert a raw game frame to an (84, 84, 1) grayscale image."""
        # NOTE(review): COLOR_BGR2GRAY assumes BGR channel order; confirm the
        # emulator's frame layout (left unchanged so that observations stay
        # comparable with already-trained checkpoints).
        gray = cv2.cvtColor(observation, cv2.COLOR_BGR2GRAY)
        resize = cv2.resize(gray, (84,84), interpolation= cv2.INTER_CUBIC)
        channels = np.reshape(resize, (84,84,1))
        return channels

    def step(self,action):
        """Advance the game by one step.

        Returns ``(frame_delta, reward, done, info)`` where ``frame_delta`` is
        the current preprocessed frame minus the previous one.

        Raises:
            RuntimeError: if called before ``reset()``, since there is no
                previous frame to diff against.
        """
        if self.previous_frame is None:
            raise RuntimeError("StreetFighter.step() called before reset()")

        obs, reward, done, info = self.game.step(action)
        obs = self.preprocess(obs)

        # NOTE(review): if both frames are uint8 (as the declared observation
        # space implies), this subtraction wraps modulo 256 instead of going
        # negative. Kept as-is so observations match existing checkpoints.
        frame_delta = obs - self.previous_frame
        self.previous_frame = obs

        #reward is calculated automatically from custom_scenario.json
        return frame_delta, reward, done, info

    def render(self, *args, **kwargs):
        """Render the game, forwarding any arguments to the underlying emulator."""
        return self.game.render(*args, **kwargs)

    def reset(self):
        """Restart the game and return the first preprocessed frame."""
        obs = self.game.reset()
        obs = self.preprocess(obs)
        #remember the frame so the first step() has something to diff against
        self.previous_frame = obs
        return obs

    def close(self):
        """Shut down the underlying emulator."""
        self.game.close()

In [None]:
#check that the raw game can be created, then close it straight away:
#gym-retro allows only one emulator instance per process, so leaving this one
#open would make every later StreetFighter() construction fail
env = retro.make(game='StreetFighterIISpecialChampionEdition-Genesis')
env.close()

Hyperparameter tuning

In [None]:
#directory where the model of each hyperparameter trial is saved
OPT_DIR = './A2Copt2/'
#directory for the Monitor and tensorboard logs
LOG_DIR = './A2Clogs2/'

In [None]:
def objectiveA2C(trial):
    """Sample one candidate set of A2C hyperparameters from an Optuna trial.

    Args:
        trial: object exposing ``suggest_int`` and ``suggest_float``.

    Returns:
        dict with the keys 'n_steps', 'gamma', 'learning_rate' and
        'gae_lambda', in that order.
    """
    #sample in the same order as the returned keys
    n_steps = trial.suggest_int('n_steps', 2048, 8192)
    gamma = trial.suggest_float('gamma', 0.8, 0.9999, log=True)
    learning_rate = trial.suggest_float('learning_rate', 1e-6, 1e-5, log=True)
    gae_lambda = trial.suggest_float('gae_lambda', 0.8, 0.99)

    return dict(
        n_steps=n_steps,
        gamma=gamma,
        learning_rate=learning_rate,
        gae_lambda=gae_lambda,
    )

In [None]:
def optimize_agent(trial):
    """Train and evaluate an A2C agent with one sampled set of hyperparameters.

    Builds a fresh wrapped StreetFighter environment, trains for 100,000
    timesteps, evaluates over 5 episodes and saves the model under OPT_DIR.

    Args:
        trial: the Optuna trial supplying the hyperparameters and trial number.

    Returns:
        The mean evaluation reward, or -1000 if anything in the trial raised
        (the error is printed rather than silently discarded).
    """
    import traceback

    #outermost environment wrapper built so far; closed in `finally`
    env = None
    try:
        model_params = objectiveA2C(trial)

        env = StreetFighter()
        env = Monitor(env, LOG_DIR)
        monitored_env = env
        env = DummyVecEnv([lambda: monitored_env])
        env = VecFrameStack(env, 4, channels_order='last')

        model = A2C('CnnPolicy',env, tensorboard_log=LOG_DIR, verbose=0, **model_params)
        print("model_made")

        model.learn(total_timesteps=100000)
        print("model learned")

        mean_reward, _ = evaluate_policy(model, env, n_eval_episodes=5)
        print(mean_reward)

        #make sure the output directory exists before saving into it
        os.makedirs(OPT_DIR, exist_ok=True)
        save_path = os.path.join(OPT_DIR, 'trial_{}_best_model'.format(trial.number))
        model.save(save_path)

        return mean_reward

    except Exception:
        #keep the study running, but show why this trial failed
        print("trial {} failed:".format(trial.number))
        traceback.print_exc()
        return -1000

    finally:
        #always release the emulator, even when the trial failed, so that
        #the next trial can create its own environment
        if env is not None:
            try:
                env.close()
            except Exception:
                traceback.print_exc()
    



In [None]:
#set up the Optuna study; optimize_agent returns the mean evaluation reward,
#so the study should maximise it
study = optuna.create_study(direction='maximize')

#run 10 trials one after another (a single job)
study.optimize(
    optimize_agent,
    n_jobs=1,
    n_trials=10,
)

In [None]:
#show the hyperparameters of the best trial (trial 2 in the recorded run)
#recorded output: {'n_steps': 7773, 'gamma': 0.8943725816637216, 'learning_rate': 7.83549809575667e-06, 'gae_lambda': 0.9202713109607763}
study.best_trial.params


In [None]:
#hyperparameters used for the final training run
model_params = dict(
    n_steps=7773,
    gamma=0.8943725816637216,
    learning_rate=7.83549809575667e-06,
    gae_lambda=0.9202713109607763,
)

In [None]:
#round n_steps down to the nearest multiple of 64 (7773 // 64 = 121 and 121 * 64 = 7744)
#derived from the current value, so re-running this cell leaves it unchanged
model_params['n_steps'] = (model_params['n_steps'] // 64) * 64
model_params

In [None]:
#callbacks are functions that are passed as an argument to another function or method

class TrainAndLoggingCallback(BaseCallback):
    """Callback that saves the model being trained every ``check_freq`` calls.

    Args:
        check_freq: save whenever the callback's call counter is a multiple
            of this value.
        save_path: directory to save into; created if missing. When None,
            nothing is created or saved.
        verbose: verbosity level passed to BaseCallback.
    """

    def __init__(self, check_freq, save_path, verbose=1):
        super().__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self):
        """Create the save directory if one was given."""
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        """Save a checkpoint when due; always returns True."""
        is_due = self.n_calls % self.check_freq == 0
        #same None guard as _init_callback, so a callback without a save
        #directory does not crash in os.path.join
        if is_due and self.save_path is not None:
            model_path = os.path.join(self.save_path, 'best_model_{}'.format(self.n_calls))
            self.model.save(model_path)

        return True

In [None]:
#directory where training checkpoints are written
CHECKPOINT_DIR = './A2Ctrain2/'

In [None]:
#checkpoint the model into CHECKPOINT_DIR once every 100,000 callback steps
callback = TrainAndLoggingCallback(
    save_path=CHECKPOINT_DIR,
    check_freq=100_000,
)

Training the Model

In [None]:
#build the training environment: the preprocessed game, wrapped for
#monitoring, vectorised, and stacked 4 frames deep along the last axis
env = VecFrameStack(
    DummyVecEnv([lambda: Monitor(StreetFighter(), LOG_DIR)]),
    4,
    channels_order='last',
)

In [None]:
#A2C agent with a CNN policy, configured with the chosen hyperparameters
model = A2C(
    'CnnPolicy',
    env,
    verbose=0,
    tensorboard_log=LOG_DIR,
    **model_params,
)

In [None]:
#train the model for 5M timesteps using the best set of hyperparameters
#model.load() is a classmethod that returns a NEW model, so calling it and
#discarding the result left `model` untouched; set_parameters() loads the
#saved weights into the existing model in place
model.set_parameters(os.path.join(OPT_DIR, 'trial_2_best_model.zip'))
model.learn(total_timesteps=5000000, callback= callback)

Evaluating the Model

In [None]:
#input the path of the model to evaluate, then evaluate for 30eps
#A2C.load() returns the loaded model, so it has to be assigned; otherwise the
#evaluation below would run on whatever `model` was already in memory
model = A2C.load(os.path.join(CHECKPOINT_DIR, 'best_model_5000000.zip'), env=env)

In [None]:
#evaluate_policy returns (mean, std); use a real name for the second value,
#because assigning to `_` at notebook level shadows IPython's last-output variable
mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=30)

In [None]:
#remember that this reward cannot be compared to the rewards from
#other models, which use other reward functions

mean_reward

Testing the Model

In [None]:
# Play the game with the trained model, rendering every frame.
for game in range(1):
    # Reset the game to its starting state at the start of each episode.
    obs = env.reset()
    # Set the flag to False, then predict using the model until the episode ends.
    # (env holds a single environment, so `done` has exactly one element.)
    done = False
    while not done:
        env.render()
        action = model.predict(obs)[0]
        obs, reward, done, info = env.step(action)
        print(reward)
