## StreetFighter Setup (Default)


In [None]:
!pip3 install gym-retro --use-pep517

# Python version: 3.7.9
# Libraries used:
# gym = 0.19
# gym-retro = 0.8.0
# pytorch
# optuna
# stable-baselines3 = 1.4.0

In [None]:
# import retro to be able to install streetfighter
import retro

import time # to slow down game

In [None]:
# see different retro games
retro.data.list_games()

In [None]:
# Import the ROMs first by running this from inside the roms folder:
#   python -m retro.import .


In [None]:
# Start the stock (unwrapped) Street Fighter II environment.
env = retro.make(game='StreetFighterIISpecialChampionEdition-Genesis') # needs the ROM to have been imported first (see the retro.import note above)


In [None]:
env.close() # run this to close a previous instance of the game environment

In [None]:

# actions, moves or decisions available for the character or agent
env.action_space.sample() # look into what the floop this means!, along with observation_space

In [None]:
# what the character or agent can view about the current situation 
env.observation_space

In [None]:
# Play the stock environment with random button presses to check that the
# emulator runs and to see what rewards it hands out.
for game in range(1):  # number of games to play
    # Reset at the start of every game (not inside the step loop, where the
    # check could never fire) so raising the range really plays more games.
    obs = env.reset()
    done = False  # becomes True once the game is over
    while not done:
        env.render()  # draw the current frame
        obs, reward, done, info = env.step(env.action_space.sample())
        time.sleep(0.01)  # slow things down so the moves are visible
        print(reward)

In [None]:
info

## Custom Environment Setup


 ### What needs to be done:
- Observation preprocessing - grayscale, frame delta, resize the frame so we have fewer pixels

- filter the action - parameter
- Reward function - set to score for now



- Grayscale:

Change the game's view to black and white.
Why it's necessary: Colors often have lots of extra information that may not be needed. By using grayscale, you reduce the data's complexity, making it faster and sometimes easier for the agent to process and learn from.

- Frame Delta:

Look at how things change from one moment to the next, rather than the entire scene.
Why it's necessary: Often, the change in the environment (like movement of objects) is more important than the entire scene. By focusing on changes, the agent can better understand and react to dynamic elements, like moving obstacles.

- Resize (to have less pixels):

Make the game's view smaller by reducing the number of pixels.
Why it's necessary: A smaller image is faster to process. By reducing the image's size, you make computations quicker and require less memory, making the learning process more efficient.



In essence, the StreetFighter class is a custom setup for an AI to play Street Fighter. It ensures the AI sees the game in a simpler way, gets feedback after every move, and learns from it.

In [None]:
!pip3 install opencv-python

In [None]:
!pip3 install matplotlib

In [None]:
# Import environment base class for a wrapper
from gym import Env
# import the space shapes for the environment
from gym.spaces import MultiBinary, Box
# import numpy to calculate frame delta
import numpy as np
# import opencv for grayscaling
import cv2 

#import matplotlib for plotting the image

from matplotlib import pyplot as plt

In [None]:
# create custom training environment 




class StreetFighter(Env):
    """Gym environment that simplifies Street Fighter II for an RL agent.

    Observations are 84x84 single-channel ``uint8`` images: ``reset``
    returns the preprocessed first frame, ``step`` returns the difference
    between the current and the previous preprocessed frame. The reward of
    a step is the change in the game's ``info['score']`` since the last step.
    """

    def __init__(self):
        """Declare the spaces and start the emulator."""
        super().__init__()
        # What the agent sees (84x84 grayscale) and what it can press
        # (12 on/off buttons).
        self.observation_space = Box(low=0, high=255, shape=(84, 84, 1), dtype=np.uint8)
        self.action_space = MultiBinary(12)
        # Start an instance of the game, using the FILTERED action set.
        self.game = retro.make(game='StreetFighterIISpecialChampionEdition-Genesis',
                               use_restricted_actions=retro.Actions.FILTERED)
        # Give the per-episode state a defined value so that calling step()
        # before reset() does not fail with AttributeError.
        self.previous_frame = np.zeros(self.observation_space.shape, dtype=np.uint8)
        self.score = 0

    def reset(self):
        """Restart the game and return the first preprocessed frame.

        Also stores that frame as the reference for the next frame delta and
        sets the running score back to zero.
        """
        obs = self.preprocess(self.game.reset())
        self.previous_frame = obs
        # Last score seen, used by step() to turn the score into a delta.
        self.score = 0
        return obs

    def preprocess(self, observation):
        """Convert a raw colour frame into an 84x84x1 grayscale image.

        Args:
            observation: colour frame from the emulator. The original author
                notes these are 200x256x3; gym-retro delivers them in RGB
                channel order.

        Returns:
            Array of shape (84, 84, 1).
        """
        # Grayscale: a single channel is cheaper to process. The frame is
        # RGB, so use the RGB conversion (the BGR one would swap the red and
        # blue luminance weights).
        gray = cv2.cvtColor(observation, cv2.COLOR_RGB2GRAY)
        # Resize: fewer pixels means faster processing.
        resized = cv2.resize(gray, (84, 84), interpolation=cv2.INTER_CUBIC)
        # Put the channel axis back so the shape matches observation_space.
        return np.reshape(resized, (84, 84, 1))

    def step(self, action):
        """Apply one action and report what changed.

        Args:
            action: button presses to send to the game.

        Returns:
            Tuple ``(frame_delta, reward, done, info)`` where ``frame_delta``
            is the current preprocessed frame minus the previous one,
            ``reward`` is the score gained during this step, and ``done`` and
            ``info`` are passed through from the game.
        """
        # The reward returned by the game itself is replaced further down.
        obs, _, done, info = self.game.step(action)
        obs = self.preprocess(obs)

        # Frame delta: change in pixels since the last step.
        # NOTE(review): assuming both frames are uint8, this subtraction
        # wraps modulo 256 instead of going negative. Kept as-is because it
        # stays inside the declared observation space and changing it would
        # alter what existing checkpoints were trained on.
        frame_delta = obs - self.previous_frame
        self.previous_frame = obs

        # Reward shaping: pay out only the score gained in this step.
        reward = info['score'] - self.score
        self.score = info['score']

        return frame_delta, reward, done, info

    def render(self, *args, **kwargs):
        """Render the game, forwarding any arguments to the emulator.

        Returns whatever the underlying game's ``render`` returns, so render
        modes that produce a value are not silently dropped.
        """
        return self.game.render(*args, **kwargs)

    def close(self):
        """Shut the emulator down."""
        self.game.close()
    
    

In [None]:
# Test the new StreetFighter class by initializing the custom environment

env = StreetFighter()

In [None]:
env.action_space.shape

In [None]:
# Same random-play loop as before, now running the custom environment.
for game in range(1):  # number of games to play
    # Reset at the start of every game (not inside the step loop, where the
    # check could never fire) so raising the range really plays more games.
    obs = env.reset()
    done = False  # becomes True once the game is over
    while not done:
        env.render()  # draw the current frame
        obs, reward, done, info = env.step(env.action_space.sample())
        time.sleep(0.01)  # slow things down so the moves are visible

        # Print only when the score went up, i.e. when Ryu lands a hit.
        # Possible TODO: should blocking be rewarded?
        if reward > 0:
            print(reward)

In [None]:
obs = env.reset()

In [None]:
# the following will show the frames being processed in gray scale
# with the reduced size. 

obs, reward, done, info = env.step(env.action_space.sample())

In [None]:
# obs is a single-channel (84, 84, 1) image, so expand it from gray to RGB
# for plotting; the BGR->RGB conversion only accepts 3- or 4-channel input.
plt.imshow(cv2.cvtColor(obs, cv2.COLOR_GRAY2RGB))

## Hyperparameter Tuning

PPO Hyperparameters to tune

n_steps: number of steps (frames) collected in the buffer before each update
gamma: discount rate for calculating returns
learning_rate: learning coefficient for the optimizer
clip_range: clipping range applied to the policy update
gae_lambda: advantage smoothing parameter


In [None]:
# need to install pytorch, optuna
!pip3 install torch torchvision torchaudio



In [None]:
#install stable baseline 
!pip3 install "stable-baselines3[extra] == 1.4.0"

In [None]:
# installing optuna
!pip3 install optuna 

In [None]:
#import dependencies. 


# the optimization frame work - HPO
import optuna 

#PPO algo for RL
from stable_baselines3 import PPO

# bringin in the eval policy method for metric calculation
from stable_baselines3.common.evaluation import evaluate_policy

# Import the sb3 monitor for logging
from stable_baselines3.common.monitor import Monitor

# import the vec wrappers to vectorize and frame stack
from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack

import os


In [None]:
LOG_DIR = './logs/' # saves logs here
OPT_DIR = './opt/' # saves each model here

In [None]:
# function to return test hyperparameters -define the object function

# Hyperparameters:

# 'n_steps': The number of steps to run for each environment per update.
# Optuna will try different values between 2048 and 8192.

# 'gamma': Discount factor. It determines how much importance to give to future rewards.
# Optuna will try different values between 0.8 and 0.9999 in a logarithmic manner.

# 'learning_rate': The rate at which the algorithm learns.
# Optuna will try different logarithmic values between 1e-5 and 1e-4.

# 'clip_range': Used in the PPO algorithm to clip the new policy probability to prevent large updates.
# Optuna will try different values between 0.1 and 0.4.

# 'gae_lambda': Lambda value used in Generalized Advantage Estimation (a technique in reinforcement learning).
# Optuna will try different values between 0.8 and 0.99.



def optimize_ppo(trial):
    """Sample one set of PPO hyperparameters from an Optuna trial.

    Args:
        trial: Optuna trial used to draw the values.

    Returns:
        dict with the keys ``n_steps``, ``gamma``, ``learning_rate``,
        ``clip_range`` and ``gae_lambda``, ready to be passed to ``PPO`` as
        keyword arguments.
    """
    return {
        # Only multiples of 64 are sampled, in line with the notebook's own
        # note further down that n_steps should be a multiple of 64.
        'n_steps': trial.suggest_int('n_steps', 2048, 8192, step=64),
        # suggest_float(log=True) / suggest_float() are the current
        # spellings of the deprecated suggest_loguniform / suggest_uniform.
        'gamma': trial.suggest_float('gamma', 0.8, 0.9999, log=True),
        'learning_rate': trial.suggest_float('learning_rate', 1e-5, 1e-4, log=True),
        'clip_range': trial.suggest_float('clip_range', 0.1, 0.4),
        'gae_lambda': trial.suggest_float('gae_lambda', 0.8, 0.99),
    }


In [None]:
SAVE_PATH = os.path.join(OPT_DIR, 'trial_{}_best_model'.format(1))


In [None]:
# run a training loop and return mean reward
# Run a training loop and return mean reward 
def optimize_agent(trial):
    """Train and evaluate PPO for one Optuna trial.

    Builds the environment, trains a model with the hyperparameters sampled
    for ``trial``, evaluates it over 5 episodes and saves it under
    ``OPT_DIR`` as ``trial_<number>_best_model``.

    Args:
        trial: Optuna trial supplying the hyperparameters and trial number.

    Returns:
        Mean evaluation reward, or -1000 if anything in the trial failed.
    """
    import logging

    env = None
    try:
        model_params = optimize_ppo(trial)

        # Create environment. `env` always names the outermost wrapper built
        # so far, so the cleanup below closes whatever exists even if a later
        # wrapper fails to construct.
        env = StreetFighter()
        env = Monitor(env, LOG_DIR)
        env = DummyVecEnv([lambda: env])
        env = VecFrameStack(env, 4, channels_order='last')

        # Create algo
        model = PPO('CnnPolicy', env, tensorboard_log=LOG_DIR, verbose=0, **model_params)
        model.learn(total_timesteps=30000)

        # Evaluate model
        mean_reward, _ = evaluate_policy(model, env, n_eval_episodes=5)

        save_path = os.path.join(OPT_DIR, 'trial_{}_best_model'.format(trial.number))
        model.save(save_path)

        return mean_reward

    except Exception:
        # Deliberate best-effort: a failing trial scores very badly instead
        # of aborting the whole study. Log it so failures stay visible.
        logging.getLogger(__name__).exception(
            'Trial %s failed', getattr(trial, 'number', '?'))
        return -1000

    finally:
        # Always release the emulator; a leaked instance can make every
        # following trial fail as well.
        if env is not None:
            env.close()

In [None]:
# Creating the experiment 
study = optuna.create_study(direction='maximize')
study.optimize(optimize_agent, n_trials=100, n_jobs=1)

In [None]:
study.best_params

In [None]:
# optimize_agent saves each model as 'trial_<number>_best_model', so load the
# file that belongs to the best trial of the study.
model = PPO.load(os.path.join(OPT_DIR, 'trial_{}_best_model'.format(study.best_trial.number)))

## Setup Callback

In [None]:
# import base callback

from stable_baselines3.common.callbacks import BaseCallback

In [None]:
class TrainAndLoggingCallback(BaseCallback):
    """Callback that saves the model at a fixed interval during training.

    Args:
        check_freq: a checkpoint is written whenever ``n_calls`` is a
            multiple of this value.
        save_path: directory the checkpoints are written to.
        verbose: verbosity level passed on to ``BaseCallback``.
    """

    def __init__(self, check_freq, save_path, verbose=1):
        super().__init__(verbose)
        self.save_path = save_path
        self.check_freq = check_freq

    def _init_callback(self):
        """Create the checkpoint directory if a path was given."""
        if self.save_path is None:
            return
        os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        """Save a checkpoint when one is due; always let training continue."""
        checkpoint_due = self.n_calls % self.check_freq == 0
        if checkpoint_due:
            # The file name carries the call count, so earlier checkpoints
            # are kept rather than overwritten.
            checkpoint = os.path.join(self.save_path, f'best_model_{self.n_calls}')
            self.model.save(checkpoint)

        return True

In [None]:
CHECKPOINT_DIR = './train' 

In [None]:
callback = TrainAndLoggingCallback(check_freq=10000, save_path=CHECKPOINT_DIR)


## Train Model

In [None]:
# Build the training environment: the custom game wrapper, then logging,
# vectorization and frame stacking on top of it.
env = StreetFighter()
env = Monitor(env, LOG_DIR)  # SB3 monitor for logging, pointed at LOG_DIR
env = DummyVecEnv([lambda: env])  # vectorize the single environment
env = VecFrameStack(env, 4, channels_order='last')  # stack 4 frames, channels last

In [None]:
# Start from the best hyperparameters found by the Optuna study.
model_params = study.best_params
model_params['n_steps'] = 7488  # override n_steps with 7488 (= 117 * 64); any replacement should also be a multiple of 64
# model_params['learning_rate'] = 5e-7
model_params

In [None]:
model = PPO('CnnPolicy', env, tensorboard_log=LOG_DIR, verbose=1, **model_params)


In [None]:
# Reload previous weights from HPO.
# `load` is a classmethod that returns a NEW model, so calling it on the
# instance and ignoring the result changes nothing. `set_parameters` loads
# the saved weights into this model in place and keeps the hyperparameters
# chosen above.
model.set_parameters(os.path.join(OPT_DIR, 'trial_5_best_model.zip'))

In [None]:
# Kick off training 
model.learn(total_timesteps=100000, callback=callback)
# model.learn(total_timestep=5000000) 

## Evaluate Model

In [None]:
model = PPO.load('./train/best_model_5460000.zip')


In [None]:
mean_reward, _ = evaluate_policy(model, env, render=True, n_eval_episodes=1)


In [None]:
mean_reward

## Test out the Model


In [None]:
obs = env.reset()


In [None]:
obs.shape


In [None]:
env.step(model.predict(obs)[0])


In [None]:
# Watch the trained agent play.
for game in range(1):  # number of games to play
    # Reset at the start of every game (not inside the step loop, where the
    # check could never fire) so raising the range really plays more games.
    obs = env.reset()
    done = False  # becomes true once the game is over
    while not done:
        env.render()
        action = model.predict(obs)[0]  # predict() returns (action, state)
        obs, reward, done, info = env.step(action)
        time.sleep(0.001)  # brief pause so the moves are visible
        print(reward)