# Setup StreetFighter Console and ROM

In [None]:
# Import retro to play the game
import retro
import imageio

In [None]:
# See the different retro games
retro.data.list_games()

**Importing the ROM**

In [None]:
# python -m retro.import .

In [None]:
# Creating the game environment
env = retro.make(game='StreetFighterIISpecialChampionEdition-Genesis')

In [None]:
# Close the game environment
# Utility cell: gym-retro allows only one emulator per process, so run the
# line below by hand when `env` has to be recreated. It is left disabled for a
# top-to-bottom run because the following cells still inspect and step `env`;
# the environment is closed after the random-policy run further down.
# env.close()

In [None]:
# Get the Observation Space
env.observation_space

In [None]:
# Get the Action Space
env.action_space

In [None]:
# Sample the actions space
env.action_space.sample()

**Running env with Random Policy**

In [None]:
# Play the game with a random policy, printing the reward of every step
for game in range(1):
    # Reset the environment and the episode flag at the start of each game,
    # so a finished game never leaks its `done` state into the next one
    obs = env.reset()
    done = False
    while not done:
        # Sample a random action and advance the emulator by one step
        obs, reward, done, info = env.step(env.action_space.sample())
        print(reward)

In [None]:
env.close()

In [None]:
info # Get the info of game running on random policy

# Setup Custom Environment

In [None]:
# Importing the necessary libraries
from gym import Env 
from gym.spaces import MultiBinary, Box
import numpy as np
import cv2
from matplotlib import pyplot as plt

# Create a custom environment class
class StreetFighter(Env):
    """Gym environment wrapping the Street Fighter II retro game.

    Observations are 84x84 single-channel frame deltas (current preprocessed
    frame minus the previous one) and the reward is the change in the in-game
    score since the previous step.
    """

    def __init__(self):
        # Initialize the parent class
        super().__init__()

        # Define the action space and observation space
        # The action space is a 12-dimensional binary vector
        self.action_space = MultiBinary(12)
        # The observation space is an 84x84 grayscale image with values ranging from 0 to 255
        self.observation_space = Box(low=0, high=255, shape=(84, 84, 1), dtype=np.uint8)

        # Initialize an instance of the game
        self.game = retro.make(game='StreetFighterIISpecialChampionEdition-Genesis', use_restricted_actions=retro.Actions.FILTERED)

        # Per-episode state; reset() fills these in before the first step()
        self.previous_frame = None
        self.score = 0

    def reset(self):
        """Reset the game and return the first preprocessed frame."""
        obs = self.game.reset()
        obs = self.preprocess(obs)
        # Remember the frame so the first step() can compute a delta against it
        self.previous_frame = obs

        # Score seen at the previous step, used to compute the reward delta
        self.score = 0

        return obs

    def preprocess(self, observation):
        """Grayscale the frame, resize it to 84x84 and add a channel axis.

        Returns an array of shape (84, 84, 1).
        """
        # NOTE(review): retro frames are presumably RGB rather than BGR, which
        # would swap the red/blue weights here; left unchanged so observations
        # stay identical to those the saved models were trained on -- confirm.
        gray = cv2.cvtColor(observation, cv2.COLOR_BGR2GRAY)
        resize = cv2.resize(gray, (84,84), interpolation=cv2.INTER_CUBIC)
        channels = np.reshape(resize, (84,84,1))

        return channels

    def step(self, action):
        """Advance the game by one step.

        Returns (frame_delta, reward, done, info), where frame_delta is the
        current preprocessed frame minus the previous one and reward is the
        increase of info['score'] since the previous step.
        """
        # Take a step in the game and preprocess the resulting observation
        obs, reward, done, info = self.game.step(action)
        obs = self.preprocess(obs)

        # Calculate the frame delta
        # NOTE(review): both frames appear to be uint8, so negative differences
        # would wrap around modulo 256 -- confirm this is intended.
        frame_delta = obs - self.previous_frame
        self.previous_frame = obs

        # Reshape the reward function: reward is the score gained in this step
        reward = info['score'] - self.score
        self.score = info['score']

        return frame_delta, reward, done, info

    def render(self, *args, **kwargs):
        """Render the game, forwarding all arguments to the underlying game.

        The result of the underlying render call is returned, so callers that
        request an image (e.g. mode='rgb_array') actually receive the frame.
        """
        return self.game.render(*args, **kwargs)

    def close(self):
        """Close the underlying game."""
        self.game.close()

In [None]:
env = StreetFighter()

In [None]:
env.observation_space.shape

In [None]:
env.action_space.shape

In [None]:
obs = env.reset()

In [None]:
obs, reward, done, info = env.step(env.action_space.sample())

In [None]:
# `obs` is a single-channel (84, 84, 1) frame delta, so expand it from
# grayscale to RGB for display (a BGR->RGB conversion needs 3 or 4 channels)
plt.imshow(cv2.cvtColor(obs, cv2.COLOR_GRAY2RGB))

The frame-delta image is converted to RGB so that the difference between consecutive frames is clearly visible; it would be hard to see in plain grayscale.

# Hyperparameter Tuning

In [None]:
!pip install torch==1.10.2+cu113 torchvision==0.11.3+cu113 torchaudio===0.10.2+cu113 -f https://download.pytorch.org/whl/cu113/torch_stable.html

In [None]:
!pip3 install torch==1.11.0 torchvision==0.12.0 torchaudio==0.11.0 --extra-index-url https://download.pytorch.org/whl/cu113

In [None]:
!pip install stable-baselines3 optuna

In [None]:
# Importing the optimization library
import optuna

# PPO algo from stable baselines
from stable_baselines3 import PPO

# Bring in the eval policy function
from stable_baselines3.common.evaluation import evaluate_policy

# Import the sb3 monitor to log the training
from stable_baselines3.common.monitor import Monitor

# Import the vec wrappers to stack frames and vectorize the environment
from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack

import os

In [None]:
LOG_DIR = './logs/' # Define the log directory
OPT_DIR = './opt/' # Define the optimization directory

In [None]:
# Function to optimize the PPO hyperparameters
def optimize_ppo(trial):
    """Sample one set of PPO hyperparameters from an Optuna trial.

    Args:
        trial: Optuna trial used to draw the suggestions.

    Returns:
        dict with the keys 'n_steps', 'gamma', 'learning_rate', 'clip_range'
        and 'gae_lambda', ready to be passed to PPO as keyword arguments.
    """
    return {
        'n_steps': trial.suggest_int('n_steps', 2112, 8192),
        # Log-scale sampling for parameters that span orders of magnitude
        'gamma': trial.suggest_float('gamma', 0.7, 0.9999, log=True),
        # Upper bound was 1e-45, which is below the lower bound (inverted
        # range); the intended search interval is 1e-5 .. 1e-4
        'learning_rate': trial.suggest_float('learning_rate', 1e-5, 1e-4, log=True),
        'clip_range': trial.suggest_float('clip_range', 0.1, 0.3),
        'gae_lambda': trial.suggest_float('gae_lambda', 0.8, 0.99),
    }

In [None]:
# Function to optimize the agent 
def optimize_agent(trial):
    """Optuna objective: train PPO with sampled hyperparameters and score it.

    Trains for 100000 timesteps, evaluates over 5 episodes, saves the model
    under OPT_DIR and returns the mean evaluation reward. If anything fails
    the error is reported and -1000 is returned, so a single bad trial does
    not abort the whole study.
    """
    env = None
    try:
        model_params = optimize_ppo(trial)

        # Create environment
        env = StreetFighter()
        env = Monitor(env, LOG_DIR)
        # Bind the monitored env as a default argument so the factory does not
        # depend on the later rebinding of `env`
        env = DummyVecEnv([lambda monitored_env=env: monitored_env])
        env = VecFrameStack(env, 4, channels_order='last')

        # Create algo
        model = PPO('CnnPolicy', env, tensorboard_log=LOG_DIR, verbose=0, **model_params)
        model.learn(total_timesteps=100000)

        # Evaluate model
        mean_reward, _ = evaluate_policy(model, env, n_eval_episodes=5)

        # Make sure the output directory exists before saving the trial model
        os.makedirs(OPT_DIR, exist_ok=True)
        SAVE_PATH = os.path.join(OPT_DIR, 'trial_{}_best_model'.format(trial.number))
        model.save(SAVE_PATH)

        return mean_reward

    except Exception as e:
        # Keep the study running, but surface the reason instead of hiding it
        print('Trial {} failed: {!r}'.format(trial.number, e))
        return -1000

    finally:
        # Always release the emulator, including on failure; gym-retro refuses
        # to create a second emulator in the same process, so a leaked one
        # would make every following trial fail as well
        if env is not None:
            env.close()

In [None]:
# Creating the optimizer
# Release the emulator held by the `env` created earlier in the notebook:
# gym-retro allows only one emulator instance per process, and every trial
# builds its own environment.
env.close()
optimizer = optuna.create_study(direction='maximize')
# n_jobs > 1 runs trials as threads of this same process, which would try to
# create several emulators at once; run the trials sequentially instead.
optimizer.optimize(optimize_agent, n_trials=100, n_jobs=1)

In [None]:
optimizer.best_params # Get the best parameters

In [None]:
# NOTE(review): only the last expression of a cell is displayed, so the
# best-trial lookup on the next line produces no visible output here.
optimizer.best_trial # Get the best trial
# NOTE(review): the trial number 24 is hard-coded; presumably it was the best
# trial of an earlier study run -- confirm against the best trial before reuse.
model = PPO.load(os.path.join(OPT_DIR, 'trial_24_best_model.zip')) # Load the best model

# Setup Callback

In [None]:
# Importing callback 
from stable_baselines3.common.callbacks import BaseCallback

In [None]:
class TrainAndLoggingCallback(BaseCallback):
    """Callback that saves the model every `check_freq` calls.

    Args:
        check_freq: number of callback calls between two saves.
        save_path: directory the model files are written to.
        verbose: verbosity level forwarded to BaseCallback.
    """

    def __init__(self, check_freq, save_path, verbose=1):
        super().__init__(verbose)
        self.save_path = save_path
        self.check_freq = check_freq

    def _init_callback(self):
        # Nothing to prepare when no save directory was given
        if self.save_path is None:
            return
        os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        save_due = self.n_calls % self.check_freq == 0
        if save_due:
            # File name carries the call count at which the save happened
            file_name = 'best_model_{}'.format(self.n_calls)
            self.model.save(os.path.join(self.save_path, file_name))

        # Returning True keeps training going
        return True

In [None]:
train_dir = './train/'
callback = TrainAndLoggingCallback(check_freq=10000, save_path=train_dir)

# Train Model

In [None]:
# Create environment
# Release the emulator held by the previous `env` first: gym-retro allows only
# one emulator instance per process.
# NOTE(review): assumes closing an already-closed env is harmless -- confirm.
env.close()
env = StreetFighter()
env = Monitor(env, LOG_DIR)
# Bind the monitored env as a default argument so the factory does not depend
# on the later rebinding of `env`
env = DummyVecEnv([lambda monitored_env=env: monitored_env])
env = VecFrameStack(env, 4, channels_order='last')

In [None]:
# The study object is named `optimizer` in this notebook (`study` is undefined).
# Copy the dict so the tweak below does not alter the study's stored parameters.
model_params = dict(optimizer.best_params)
model_params['n_steps'] = 7488  # set n_steps to a multiple of 64
model_params

In [None]:
model = PPO('CnnPolicy', env, tensorboard_log=LOG_DIR, verbose=1, **model_params)

In [None]:
# Reload last weights
# `load` is a classmethod that returns a new model, so calling it on `model`
# and discarding the result reloads nothing. `set_parameters` copies the saved
# weights into the existing model and keeps the hyperparameters chosen above.
model.set_parameters(os.path.join(OPT_DIR, 'trial_24_best_model.zip'))

# Train the model
model.learn(total_timesteps=5000000, callback=callback)

# Evaluate the Model

In [None]:
# Rebind `model` to the saved trial-24 model from the optimization directory.
# NOTE(review): this replaces the model trained in the previous section, so
# the evaluation below scores the trial model, not a checkpoint written to
# ./train/ by the callback -- confirm that this is intended.
model = PPO.load('./opt/trial_24_best_model.zip')

# Evaluate the model for a single rendered episode and keep the mean reward
mean_reward, _ = evaluate_policy(model, env, render=True, n_eval_episodes=1)

In [None]:
mean_reward

# Test out the Model

In [None]:
obs = env.reset()

In [None]:
obs.shape

In [None]:
env.step(model.predict(obs)[0])

In [None]:
# Create the test environment
# `make_vec_env` is not defined in this notebook, so build the same single-env
# frame-stacked wrapper used for training. No Monitor wrapper is added here, so
# the training logs in LOG_DIR are left untouched.
env.close()  # gym-retro allows only one emulator per process; release the old one
game_env = StreetFighter()
env = DummyVecEnv([lambda: game_env])
env = VecFrameStack(env, 4, channels_order='last')

# Make sure the output directory for the GIFs exists
os.makedirs('./replays/', exist_ok=True)

# Loop through the games
games = 5
for i in range(games):
    # Reset game to starting state
    images = []
    obs = env.reset()
    done = False
    while not done:
        action, _ = model.predict(obs)
        obs, reward, done, info = env.step(action)
        # Ask the underlying retro game for the frame directly: the custom
        # environment's own render() does not return the image
        img = game_env.game.render(mode='rgb_array')
        images.append(img)

    # Make a GIF for each game
    imageio.mimsave(f'./replays/game_{i + 1}.gif', images, fps=24)
# `env` is intentionally left open: the next cell keeps playing with it

In [None]:
# Play one game with the loaded model, rendering every frame
for game in range(1):
    # Reset game to starting state
    obs = env.reset()
    # Set flag to false at the start of each game
    done = False
    while not done:
        env.render()
        action = model.predict(obs)[0]
        obs, reward, done, info = env.step(action)
        print(reward)
# Release the emulator once playback is finished
env.close()

**Code referenced from the PPO Stable Baselines documentation, https://github.com/thuongmhh/Street-Fighter-AI and https://github.com/linyiLYi/street-fighter-ai/blob/master/main/train.py**