# Dependencies

In [None]:
import os # for file paths
import retro # framework to interact with the game
from gym import Env # environment base class
from gym.spaces import MultiBinary, Box # space shapes for the game environment
import numpy as np # for calculating frame changes
import cv2 as cv # for image preprocessing
import time # to slow down each frame so we can clearly see the game as it's being played
import math # to calculate the reward on each step
#from matplotlib import pyplot as plt
import optuna # for optimizing the agent's hyperparameters easily

# For building the DL model
from tensorflow import keras
from keras.models import Sequential
from keras.layers import Dense, Flatten, InputLayer
from keras.optimizers import Adam
from keras.losses import MeanSquaredError

# For building the DQN agent
from rl.agents import DQNAgent
from rl.memory import SequentialMemory
from rl.policy import EpsGreedyQPolicy, LinearAnnealedPolicy

# Game Environment

In [None]:
# Output locations: training logs, per-trial weights from the hyperparameter
# search, and weights of the final trained agent
LOG_DIR, OPT_DIR, TRAIN_DIR = "./logs/", "./opt/", "./train/"

In [None]:
# Every possible action for a given step in Street Fighter II.
# Each action is a 12-slot button array laid out as:
#   slot: 0  1  2  3  4   5     6     7      8  9  10 11
#         B, A, _, _, UP, DOWN, LEFT, RIGHT, C, Y, X, Z
_NUM_BUTTON_SLOTS = 12

# For each action id (its position in this tuple), the slots that are pressed
_PRESSED_SLOTS_BY_ACTION = (
    (),       # 0: Idle
    (6,),     # 1: Left
    (7,),     # 2: Right
    (4,),     # 3: Up
    (5,),     # 4: Down
    (1,),     # 5: Light Kick
    (0,),     # 6: Medium Kick
    (8,),     # 7: Hard Kick
    (10,),    # 8: Light Punch
    (9,),     # 9: Medium Punch
    (11,),    # 10: Hard Punch
    (5, 6),   # 11: Down Left
    (5, 7),   # 12: Down Right
    (4, 6),   # 13: Up Left
    (4, 7),   # 14: Up Right
)

# Maps action id -> list of 12 ints (1 = pressed, 0 = released)
possible_actions = {
    action_id: [1 if slot in pressed else 0 for slot in range(_NUM_BUTTON_SLOTS)]
    for action_id, pressed in enumerate(_PRESSED_SLOTS_BY_ACTION)
}

In [None]:
# Creating a wrapper to interact with the game environment

class StreetFighterWrapper(Env):
    """Gym environment around the Street Fighter II retro game.

    Observations are single greyscale frames of shape ``OBS_SHAPE``.
    ``step`` accepts either an integer id from ``possible_actions`` or a raw
    12-slot button array (which is what ``action_space`` describes).
    The emulator's own reward is discarded and replaced by a reward shaped
    from the change in both fighters' health between consecutive steps.
    """

    # Starting health in Street Fighter II is 176 HP
    START_HEALTH = 176
    # Multiplier applied when the agent comes out ahead (exchange or round win)
    REWARD_COEFF = 17
    # Multiplier applied when the agent takes at least as much damage as it deals
    PENALTY_COEFF = 1.75
    # Multiplier applied to the penalty for losing a round
    LOSS_PENALTY_COEFF = 0.35
    # (height, width, channels) of the preprocessed frame handed to the agent
    OBS_SHAPE = (100, 100, 1)

    def __init__(self):
        """Create the emulator and initialise the per-round bookkeeping."""
        super().__init__()

        # observations are downscaled, single-channel greyscale game frames
        self.observation_space = Box(low=0, high=255, shape=self.OBS_SHAPE, dtype=np.uint8)

        # one binary flag per button slot (same layout as the arrays in possible_actions)
        self.action_space = MultiBinary(12)

        # setting up the game environment
        self.game = retro.make(game='StreetFighterIISpecialChampionEdition-Genesis', use_restricted_actions=retro.Actions.FILTERED)

        # enemy and agent health values that get updated as the game goes along
        self.enemy_health = self.START_HEALTH
        self.agent_health = self.START_HEALTH

        # score reported by the game on the previous step
        self.score = 0

    def step(self, action):
        """Advance the game by one step.

        Args:
            action: an integer key of ``possible_actions``, or an unhashable
                button array (list / ndarray) that is forwarded unchanged.

        Returns:
            ``(obs, reward, done, info)`` where ``obs`` is the preprocessed
            frame, ``reward`` is the health-based shaped reward, and ``done``
            and ``info`` come straight from the emulator.

        Raises:
            KeyError: if ``action`` is hashable but not a known action id.
        """
        try:
            # Getting the array of button combinations that corresponds to the number generated by the agent
            action_arr = possible_actions[action]
        except TypeError:
            # Unhashable input (list / ndarray): already a raw button array
            action_arr = action

        # The emulator's own reward is ignored; it is replaced below
        obs, _, done, info = self.game.step(action_arr)
        obs = self.preprocess(obs)

        # absolute change in health for each player since the previous step
        enemy_damage_taken = abs(info['enemy_health'] - self.enemy_health)
        agent_damage_taken = abs(info['health'] - self.agent_health)

        reward = self._shaped_reward(info, enemy_damage_taken, agent_damage_taken)

        # remember the current values so the next step can compute its deltas
        self.enemy_health = info['enemy_health']
        self.agent_health = info['health']
        self.score = info['score']

        # only print the reward earned if it's non-zero
        if reward != 0:
            print("Agent Health: {} Enemy Health: {} Agent Damage Taken: {} Enemy Damage Taken: {} Reward: {}".format(info['health'], info['enemy_health'], agent_damage_taken, enemy_damage_taken, reward))
        return obs, reward, done, info

    def _shaped_reward(self, info, enemy_damage_taken, agent_damage_taken):
        """Compute the reward for one step from the health changes.

        Must be called before ``self.enemy_health`` / ``self.agent_health``
        are overwritten, because it compares them with the values in ``info``.
        """
        both_dropped_to_zero = (
            self.enemy_health != 0 and info['enemy_health'] == 0
            and self.agent_health != 0 and info['health'] == 0
        )
        nothing_changed = enemy_damage_taken == 0 and agent_damage_taken == 0
        both_were_zero = self.agent_health == 0 and self.enemy_health == 0

        # catching edge cases to make sure no reward is being earned outside of a fight (i.e. in between rounds)
        if both_dropped_to_zero or nothing_changed or both_were_zero:
            return 0

        # If the agent wins and enemy loses
        # (presumably the emulator reports a negative health on a knockout — TODO confirm)
        if info['enemy_health'] < 0:
            # Clamp to 1: math.log raises ValueError for values <= 0, which
            # would happen if the agent's own health is 0 or negative on the
            # winning frame (e.g. a double knockout). log(1) gives a reward of 0.
            remaining_health = max(info['health'], 1)
            return self.START_HEALTH * math.log(remaining_health, self.START_HEALTH) * self.REWARD_COEFF

        # if the enemy wins and agent loses
        if info['health'] < 0:
            return -math.pow(self.START_HEALTH, (info['enemy_health'] / self.START_HEALTH)) * self.LOSS_PENALTY_COEFF

        # the fight goes on: reward the damage differential, scaled harder
        # when the agent dealt more damage than it received
        damage_diff = enemy_damage_taken - agent_damage_taken
        if enemy_damage_taken > agent_damage_taken:
            return damage_diff * self.REWARD_COEFF
        return damage_diff * self.PENALTY_COEFF

    def render(self, mode='human'):
        """Render the current game frame, forwarding ``mode`` to the emulator."""
        return self.game.render(mode=mode)

    def reset(self):
        """Restart the game and return the first preprocessed observation."""
        obs = self.preprocess(self.game.reset())

        # Reset the game data
        self.score = 0
        self.enemy_health = self.START_HEALTH
        self.agent_health = self.START_HEALTH

        return obs

    def preprocess(self, obs):
        """Convert a colour frame to a greyscale array of shape ``OBS_SHAPE``."""
        height, width, _ = self.OBS_SHAPE

        # set image to greyscale
        # gym-retro frames are RGB (not OpenCV's default BGR), so convert from RGB
        greyed = cv.cvtColor(obs, cv.COLOR_RGB2GRAY)

        # and make it smaller (cv.resize expects the target size as (width, height))
        resized = cv.resize(greyed, (width, height), interpolation=cv.INTER_CUBIC)

        # add the trailing channel axis expected by the observation space
        return np.reshape(resized, self.OBS_SHAPE)

    def close(self):
        """Close the underlying game environment."""
        self.game.close()

In [None]:
# Instantiating our environment. No frame-stacking wrapper is applied here;
# each observation is a single preprocessed frame.
env = StreetFighterWrapper()

In [None]:
# Number of consecutive frames fed to the network at once
frames = 4
# Size of the discrete action set the agent chooses from
actions = len(possible_actions)
# Dimensions of a single preprocessed observation
height, width, channels = env.observation_space.shape

In [None]:
# Game Loop

# Play one match with random actions
for game in range(1):
    # Start every match from a fresh state; resetting here (rather than inside
    # the inner loop) is what allows more than one match to be played
    obs = env.reset()
    done = False

    while not done:
        # renders the game frame
        env.render()
        # Pick a random action id; env.step() looks up the button array itself
        action = np.random.randint(len(possible_actions))
        # Info contains: continueTimer, enemy_health, enemy_matches_won, health, matches_won, score
        obs, reward, done, info = env.step(action)
        # To make the game slower so it can be watched more closely
        time.sleep(0.01)


In [None]:
# Close the game environment used by the random-action demo above
env.close()

# Hyperparameter Optimization
The Optuna library is used to optimize the hyperparameters of a Deep Q-Network (DQN).
The tuned hyperparameters are:
- DL model: number of Dense layers and number of units in each Dense layer
- DQN agent memory (SequentialMemory): `limit`
- Exploration policy (LinearAnnealedPolicy): `value_test`
- Other agent parameters: `target_model_update` and the learning rate


In [None]:
def optimize_model(trial):
    """Build a fully-connected Q-network whose depth and widths are sampled from ``trial``.

    Samples ``n_layers`` (3 or 4) and one ``n_units_l{i}`` per hidden layer,
    and returns the resulting (uncompiled) Sequential model.
    """
    unit_choices = [64, 96, 128, 256, 512]

    # Testing either 3 or 4 Dense layers
    depth = trial.suggest_int("n_layers", 3, 4)

    network = Sequential()

    # Input: a window of `frames` stacked observations, flattened to one vector
    network.add(Flatten(input_shape=(frames, height, width, channels)))

    # One ReLU Dense layer per sampled depth step, each with its own sampled width
    for layer_idx in range(depth):
        n_units = trial.suggest_categorical(f'n_units_l{layer_idx}', unit_choices)
        network.add(Dense(units=n_units, activation='relu'))

    # Output layer: one linear unit per available action
    network.add(Dense(actions, activation='linear'))
    # Keras RL needs a Flatten layer at the end for the agent to work
    network.add(Flatten())

    return network

In [None]:
def optimize_dqn(trial):
    """Sample the DQN agent hyperparameters for one Optuna trial.

    Returns a dict with the keys ``limit``, ``policy``,
    ``target_model_update`` and ``learning_rate``.
    """
    # Replay-memory size
    memory_limit = trial.suggest_int('limit', 1000, 3000)

    # Epsilon-greedy policy annealed from 1.0 to 0.1 over 30000 steps;
    # only the epsilon used at test time is tuned
    test_epsilon = trial.suggest_float('value_test', 0.05, 0.2)
    annealed_policy = LinearAnnealedPolicy(
        EpsGreedyQPolicy(),
        attr='eps',
        value_max=1.0,
        value_min=0.1,
        value_test=test_epsilon,
        nb_steps=30000,
    )

    # Both of these are sampled on a log scale
    target_update = trial.suggest_float('target_model_update', 0.01, 1.0, log=True)
    step_size = trial.suggest_float('learning_rate', 1e-7, 1e-3, log=True)

    return {
        'limit': memory_limit,
        'policy': annealed_policy,
        'target_model_update': target_update,
        'learning_rate': step_size,
    }

In [None]:
def optimize_agent(trial):
    """Optuna objective: train and evaluate one DQN agent.

    Builds a model and agent from hyperparameters sampled from ``trial``,
    trains for 30000 steps, evaluates over 10 episodes and saves the weights
    under ``OPT_DIR``.

    Returns:
        The mean episode reward over the evaluation episodes, or -500 if
        anything in the trial raised (so one bad trial does not abort the study).
    """
    env = None
    try:
        # Getting the DQN and model parameters
        model = optimize_model(trial)
        dqn_params = optimize_dqn(trial)

        # Create environment
        env = StreetFighterWrapper()

        # Creating the DQN
        agent = DQNAgent(
            model=model,
            # window length has to align with input shape in the NN
            memory=SequentialMemory(limit=dqn_params['limit'], window_length=frames),
            policy=dqn_params['policy'],
            enable_dueling_network=True, dueling_type='avg',
            nb_actions=actions,
            nb_steps_warmup=5000,
            target_model_update=dqn_params['target_model_update'],
        )

        # Compile and fit the agent to the environment
        agent.compile(Adam(learning_rate=dqn_params['learning_rate']), metrics=['mae'])
        agent.fit(env, 30000, action_repetition=1, callbacks=None, verbose=2, visualize=False)

        # Evaluate the model
        scores = agent.test(env, nb_episodes=10, visualize=False)
        mean_reward = np.mean(scores.history['episode_reward'])
        print(f"Mean reward for trial {trial.number} is {mean_reward}")

        # The output directory is never created elsewhere, so make sure it exists
        os.makedirs(OPT_DIR, exist_ok=True)
        SAVE_PATH = os.path.join(OPT_DIR, f"trial_{trial.number}_best_agent_weights.h5f")
        agent.save_weights(SAVE_PATH)

        return mean_reward

    except Exception as e:
        # Report why the trial failed instead of hiding it behind the sentinel score
        print(f"Trial {trial.number} failed: {e!r}")
        return -500

    finally:
        # Always release the emulator, including after a failure. A leaked
        # emulator would presumably make the next trial's environment creation
        # fail as well (gym-retro allows one emulator per process — TODO confirm).
        if env is not None:
            env.close()

In [None]:
# Close the module-level environment; optimize_agent() creates its own
# StreetFighterWrapper for every trial
env.close()

In [None]:
# Hyperparameter search: 150 sequential trials (a single job), keeping the
# trial with the highest mean evaluation reward
study = optuna.create_study(direction="maximize")
study.optimize(func=optimize_agent, n_trials=150, n_jobs=1)

# Building the Agent

In [None]:
# Number of the trial that produced the best result (used to locate its saved weights)
best_trial_number = study.best_trial.number
# Hyperparameter values sampled in that best trial
params = study.best_params

In [None]:
# Fresh environment for the final training and evaluation runs below
env = StreetFighterWrapper()

In [None]:
def build_model(params):
    """Build the Q-network described by the tuned hyperparameters in ``params``.

    Reads ``params['n_layers']`` and one ``params['n_units_l{i}']`` per hidden
    layer; returns the (uncompiled) Sequential model.
    """
    hidden_widths = [params[f"n_units_l{i}"] for i in range(params['n_layers'])]

    # Layers are created in the same order in which they are stacked
    stack = [Flatten(input_shape=(frames, height, width, channels))]
    stack.extend(Dense(units=n_units, activation='relu') for n_units in hidden_widths)
    stack.append(Dense(actions, activation='linear'))
    stack.append(Flatten())

    net = Sequential()
    for layer in stack:
        net.add(layer)

    return net

In [None]:
def build_dqn_agent(params):
    """Create a DQN agent from tuned hyperparameters, warm-started from the best trial.

    Args:
        params: dict with the keys ``n_layers``, ``n_units_l{i}``, ``limit``,
            ``value_test`` and ``target_model_update``.

    Returns:
        An uncompiled ``DQNAgent`` whose network holds the weights saved by
        trial ``best_trial_number``. The caller must call ``compile()``.
    """
    # Creating the agent and setting hyperparameters to the tuned values
    agent = DQNAgent(
        model=build_model(params),
        # window length has to align with input shape in the NN
        memory=SequentialMemory(limit=params['limit'], window_length=frames),
        policy=LinearAnnealedPolicy(EpsGreedyQPolicy(), attr='eps', value_max=1.0, value_min=0.1, value_test=params['value_test'], nb_steps=30000),
        enable_dueling_network=True, dueling_type='avg',
        nb_actions=actions,
        nb_steps_warmup=1000,
        target_model_update=params['target_model_update'],
    )

    # Loading the weights from the best trial so it gets a headstart when training.
    # Same path construction as the one used when the trial saved its weights.
    weights_path = os.path.join(OPT_DIR, f"trial_{best_trial_number}_best_agent_weights.h5f")

    # Load into the online network directly: agent.load_weights() also syncs
    # the target network, which keras-rl only creates in compile(), so it
    # cannot be called on an uncompiled agent. compile() then clones the
    # target network from these weights.
    # NOTE(review): relies on keras-rl's DQNAgent internals — confirm against
    # the installed version.
    agent.model.load_weights(weights_path)

    return agent

In [None]:
# Creating the DQN agent with optimized hyperparameters
agent = build_dqn_agent(params)
# Compiling the agent. DQNAgent.compile() only takes an optimizer and metrics;
# it sets up its own loss internally, so no `loss` argument is passed.
agent.compile(Adam(learning_rate=params['learning_rate']), metrics=['mae'])
num_steps = 100000
# Training the agent
agent.fit(env, num_steps, action_repetition=1, callbacks=None, verbose=2, visualize=False)

In [None]:
# The training output directory is never created elsewhere, so make sure it
# exists before writing the weights into it
os.makedirs(TRAIN_DIR, exist_ok=True)
agent.save_weights(os.path.join(TRAIN_DIR, f"agent_{num_steps}_steps.h5f"))

# Testing out the agent

In [None]:
# Restore the weights written at the end of training
trained_weights_path = f"{TRAIN_DIR}/agent_{num_steps}_steps.h5f"
agent.load_weights(trained_weights_path)

In [None]:
# Evaluate with the test policy rather than the exploratory training policy
# NOTE(review): relies on keras-rl's Agent.training flag — confirm against the
# installed version.
agent.training = False

# Play one match
for game in range(1):
    # Start every match from a fresh state; resetting here (rather than inside
    # the inner loop) is what allows more than one match to be played
    obs = env.reset()
    agent.reset_states()
    done = False

    while not done:
        # renders the game frame
        env.render()
        action = agent.forward(obs)
        # Info contains: continueTimer, enemy_health, enemy_matches_won, health, matches_won, score
        obs, reward, done, info = env.step(action)
        # Outside training mode backward() does not update the network; it
        # records the observation so the next forward() sees the recent frames
        # (same forward/backward pairing that agent.test() uses).
        agent.backward(reward, terminal=done)
        time.sleep(0.01)