# **Setup Game Environment:**

- pip install gym==0.21.0
- pip install gym gym-retro
- pip install importlib-metadata==4.13.0
- python -m retro.import .

In [1]:
#import retro: to run the game disk
import retro
#import time: to slow down game render time
import time

In [None]:
retro.data.list_games()

In [None]:
#Setup game environment
env = retro.make(game='StreetFighterIISpecialChampionEdition-Genesis')

In [None]:
env.action_space.sample()

In [None]:
#Try to run an instance of the game and observe the entity random actions:
obs = env.reset()
done = False
for game in range(1):
    while not done:
        if done:
            obs = env.reset()
        env.render()
        obs, reward, done, info = env.step(env.action_space.sample())
        time.sleep(0.01)
        print(reward)

In [None]:
#closes any running game environment
env.close()

In [None]:
info

# **Setup Learning environment for Training:**

Data Preprocessing the observed environment:
- Colour to grayscale
- Resize the frame for effeciency
- Calculate delta frame: change in subsequent frames
- Tweak the reward function

In [45]:
# Import environment base class for a wrapper
from gym import Env
# Import the space shapes for the environment
from gym.spaces import MultiBinary, Box
# Import numpy to calculate frame delta
import numpy as np
# Import opencv for grayscaling
import cv2
# Import matplotlib for plotting the image
from matplotlib import pyplot as plt

In [None]:
plt.imshow(obs)

In [30]:
# Create custom environment
class StreetFighter(Env):
    def __init__(self):
        super().__init__()
        # Specify action and observation space  
        self.observation_space = Box(low=0, high=255, shape=(84, 84, 1), dtype=np.uint8)
        self.action_space = MultiBinary(12)
        # Startup an instance of the game
        self.game = retro.make(game='StreetFighterIISpecialChampionEdition-Genesis', use_restricted_actions = retro.Actions.FILTERED)
        # Define render_mode attribute
        self.render_mode = 'human' 
    
    def reset(self):
        # Returning the first frame
        obs = self.game.reset()
        # Pass the observation to preprocess method
        obs = self.preprocess(obs)
        self.previous_frame = obs
        # Create a attributr to hold the score delta
        self.score = 0
        return obs
    
    def preprocess(self, observation):
        # Convert observation to Grayscale
        gray = cv2.cvtColor(observation, cv2.COLOR_BGR2GRAY)
        # Resize the observation Space
        resize = cv2.resize(gray, (84, 84), interpolation = cv2.INTER_CUBIC)
        # Add the channels value
        channels = np.reshape(resize, (84,84,1))
        return channels
        
    def step(self, action):
        # Take a step
        obs, reward, done, info = self.game.step(action)
        obs = self.preprocess(obs)

        # Frame delta
        frame_delta = obs - self.previous_frame
        self.previous_frame = obs

        # Reshape the reward function
        reward = info['score'] - self.score
        self.score = info['score']

        return frame_delta, reward, done, info

    def render(self, *args, **kwargs):
        self.game.render()

    def close(self):
        self.game.close()

In [None]:
# Create a new instance of the environment
env = StreetFighter()

In [None]:
# Cross checking the shape of obserbation_space
env.observation_space.shape

In [None]:
# Cross checking the shape of the action_space
env.action_space.shape

In [None]:
#Try to run an instance of the game and observe the entity random actions:
obs = env.reset()
done = False
for game in range(1):
    while not done:
        if done:
            obs = env.reset()
        env.render()
        obs, reward, done, info = env.step(env.action_space.sample())
        time.sleep(0.01)
        if reward > 0:
            print(reward)

In [None]:
# Validating frame delta
obs = env.reset()
obs, reward, done, info = env.step(env.action_space.sample())
plt.imshow(cv2.cvtColor(obs, cv2.COLOR_BGR2RGB))

# **Hyperparameter tuning:**

Install required libraries for training:
- pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118pip 
- pip install stable-baselines3[extra] optuna

In [31]:
import os
# Importing optimization framework - HPO
import optuna
# PPO algorithm for Reinforcement Learning
from stable_baselines3 import PPO
# Eval policy method for metric calculation
from stable_baselines3.common.evaluation import evaluate_policy
# Import the sb3 monitor for logging
from stable_baselines3.common.monitor import Monitor
# Import the vec wrappers to vectorize and frame stack
from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack

In [32]:
# Setup directories for logs and models
LOG_DIR = './logs/'
OPT_DIR = './opt/'

In [39]:
# Function to return test hyperparamenters - define the objective function
"""def optimize_ppo(trial):
    return{
        'n_steps':trial.suggest_int('n_steps', 2048, 8192),
        'gamma': trial.suggest_loguniform('gamma', 0.8, 0.9999),
        'learning_rate': trial.suggest_loguniform('learning_rate', 1e-5, 1e-4),
        'clip_range': trial.suggest_uniform('clip_range', 0.1, 0.4),
        'gae_lambda': trial.suggest_uniform('gae_lambda', 0.8, 0.99)
    }"""

def optimize_ppo(trial):
    try:
        hyperparameters = {
            'n_steps': trial.suggest_int('n_steps', 2048, 8192),
            'gamma': trial.suggest_float('gamma', 0.8, 0.9999, log=True),
            'learning_rate': trial.suggest_float('learning_rate', 1e-5, 1e-4, log=True),
            'clip_range': trial.suggest_float('clip_range', 0.1, 0.4),
            'gae_lambda': trial.suggest_float('gae_lambda', 0.8, 0.99)
        }
        print("Hyperparameters suggested:", hyperparameters)
        return hyperparameters
    except Exception as e:
        print("Error in hyperparameter suggestion:", e)
        return None


In [42]:
# Run a training loop and return mean reward
def optimize_agent(trial):
    try:
        model_params = optimize_ppo(trial)
        if model_params is None:
            raise ValueError("Model parameters cannot be None.")
        
        # Create environment
        env = StreetFighter()
        env = Monitor(env, LOG_DIR)
        env = DummyVecEnv([lambda: env])
        env = VecFrameStack(env, 4, channels_order='last')  # Make sure observation space is compatible
        
        # Create algorithm
        model = PPO('CnnPolicy', env, tensorboard_log=LOG_DIR, verbose=0, **model_params)
        model.learn(total_timesteps=30000)
        
        # Evaluate model
        mean_reward, _ = evaluate_policy(model, env, n_eval_episodes=1)
        env.close()

        SAVE_PATH = os.path.join(OPT_DIR, 'trial_{}_best_model'.format(trial.number))
        model.save(SAVE_PATH)
        return mean_reward
    
    except Exception as e:
        print("Error during optimization:", e)
        return None


In [44]:
# Create the experiment
study = optuna.create_study(direction='maximize')
study.optimize(optimize_agent, n_trials=1, n_jobs=1)

[I 2024-04-01 19:06:37,711] A new study created in memory with name: no-name-943e2a36-e90c-43f1-8ef3-04ae9101b06c


Hyperparameters suggested: {'n_steps': 5655, 'gamma': 0.9859996190543577, 'learning_rate': 7.96711746543737e-05, 'clip_range': 0.1206064656113395, 'gae_lambda': 0.9210014495830235}


[W 2024-04-01 19:06:37,954] Trial 0 failed with parameters: {'n_steps': 5655, 'gamma': 0.9859996190543577, 'learning_rate': 7.96711746543737e-05, 'clip_range': 0.1206064656113395, 'gae_lambda': 0.9210014495830235} because of the following error: The value None could not be cast to float..
[W 2024-04-01 19:06:37,954] Trial 0 failed with value None.


Error during optimization: VecFrameStack only works with gym.spaces.Box and gym.spaces.Dict observation spaces
