In [None]:
import ast
import math # to calculate the reward on each step
import os # for file paths
import time # to slow down each frame so we can clearly see the game as it's being played

import cv2 as cv # for image preprocessing
import numpy as np # for calculating frame changes
import pandas as pd
import retro # framework to interact with the game
from gym import Env # environment base class
from gym.spaces import MultiBinary, Box # space shapes for the game environment

# Importing the optimization framework - HPO
import optuna

# PPO algo for RL
from stable_baselines3 import PPO
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack

In [None]:
# Create custom environment 
class StreetFighter(Env):
    """Gym environment around the retro Street Fighter II game.

    ``reset`` returns the preprocessed first frame; ``step`` returns the
    difference between the current and the previous preprocessed frame,
    together with a reward derived from the health values reported in ``info``.
    """

    def __init__(self):
        """Declare the spaces, start the emulator and initialise the trackers."""
        super().__init__()
        # Observation: one 84x84 single-channel image; action: 12 binary buttons
        self.observation_space = Box(low=0, high=255, shape=(84, 84, 1), dtype=np.uint8)
        self.action_space = MultiBinary(12)
        # Start an instance of the game
        self.game = retro.make(game='StreetFighterIISpecialChampionEdition-Genesis')

        # Starting health in Street Fighter II is 176 HP
        self.START_HEALTH = 176
        # Multipliers applied to the shaped reward in step()
        self.REWARD_COEFF = 17
        self.PENALTY_COEFF = 1.75
        self.LOSS_PENALTY_COEFF = 0.35

        # Health values remembered from the previous step
        self.enemy_health = self.START_HEALTH
        self.player_health = self.START_HEALTH

        # Last score reported by the game (stored on every step)
        self.score = 0

    def reset(self):
        """Restart the game and return the preprocessed first frame."""
        first_frame = self.preprocess(self.game.reset())
        self.previous_frame = first_frame

        # Put the per-episode trackers back to their initial values
        self.score = 0
        self.enemy_health = self.START_HEALTH
        self.player_health = self.START_HEALTH

        return first_frame

    def preprocess(self, observation):
        """Convert a colour frame into an (84, 84, 1) grayscale array."""
        # colour -> gray
        grayscale = cv.cvtColor(observation, cv.COLOR_BGR2GRAY)
        # shrink to 84x84 for faster processing
        shrunk = cv.resize(grayscale, (84, 84), interpolation=cv.INTER_CUBIC)
        # append the channel axis
        return np.reshape(shrunk, (84, 84, 1))

    def step(self, action):
        """Advance the game one step.

        Returns:
            (frame_delta, reward, done, info) where ``frame_delta`` is the
            current preprocessed frame minus the previous one and ``reward``
            replaces the emulator's reward with the shaped value computed below.
        """
        raw_frame, reward, done, info = self.game.step(action)
        frame = self.preprocess(raw_frame)

        # NOTE(review): if the frames are uint8 this subtraction wraps modulo 256
        # instead of producing negative values - confirm this is intended.
        frame_delta = frame - self.previous_frame
        self.previous_frame = frame

        enemy_now = info['enemy_health']
        player_now = info['health']

        # Absolute change in health since the previous step, per fighter
        enemy_damage_taken = abs(enemy_now - self.enemy_health)
        player_damage_taken = abs(player_now - self.player_health)

        # Situations that must yield no reward
        both_dropped_to_zero = (
            self.enemy_health != 0 and enemy_now == 0
            and self.player_health != 0 and player_now == 0
        )
        nothing_changed = enemy_damage_taken == 0 and player_damage_taken == 0
        both_were_zero = self.player_health == 0 and self.enemy_health == 0

        if both_dropped_to_zero or nothing_changed or both_were_zero:
            reward = 0
        elif enemy_now < 0 and player_now > 0:
            # Player wins and enemy loses: bonus grows with the player's remaining health
            reward = self.START_HEALTH * math.log(player_now, self.START_HEALTH) * self.REWARD_COEFF
        elif player_now < 0 and enemy_now > 0:
            # Enemy wins and player loses: penalty grows with the enemy's remaining health
            reward = -math.pow(self.START_HEALTH, (enemy_now / self.START_HEALTH)) * self.LOSS_PENALTY_COEFF
        else:
            # Ordinary exchange: a positive margin is scaled by REWARD_COEFF,
            # a zero or negative margin by PENALTY_COEFF
            damage_margin = enemy_damage_taken - player_damage_taken
            coeff = self.REWARD_COEFF if damage_margin > 0 else self.PENALTY_COEFF
            reward = damage_margin * coeff

        # Remember the current values for the next step's comparison
        self.enemy_health = enemy_now
        self.player_health = player_now
        self.score = info['score']

        return frame_delta, reward, done, info

    def render(self, *args, **kwargs):
        """Render the game; any arguments are accepted and ignored."""
        self.game.render()

    def close(self):
        """Shut down the underlying game instance."""
        self.game.close()

In [None]:
# function to return hyperparameters with their range to test 
def optimize_ppo(trial):
    """Sample one set of PPO hyperparameters from an Optuna trial.

    Uses ``trial.suggest_float`` (with ``log=True`` for the log-scaled ranges)
    rather than ``suggest_uniform`` / ``suggest_loguniform``, which Optuna
    deprecated in v3.0. Parameter names and ranges are unchanged.

    Args:
        trial: object exposing Optuna's ``suggest_int`` / ``suggest_float`` API.

    Returns:
        dict mapping PPO keyword-argument names to the sampled values.
    """
    return {
        'n_steps': trial.suggest_int('n_steps', 2048, 8192),
        'gamma': trial.suggest_float('gamma', 0.8, 0.9999, log=True),
        'learning_rate': trial.suggest_float('learning_rate', 1e-5, 1e-4, log=True),
        'clip_range': trial.suggest_float('clip_range', 0.1, 0.4),
        'gae_lambda': trial.suggest_float('gae_lambda', 0.8, 0.99),
    }

In [None]:
# run optimisation and return mean_reward as the indicator
def optimize_agent(trial):
    """Optuna objective: train PPO with sampled hyperparameters and score it.

    Builds a monitored, vectorised, 4-frame-stacked StreetFighter environment,
    trains a PPO model for 30000 timesteps, evaluates it over 5 episodes and
    saves the model under OPT_DIR as ``trial_<number>_best_model``.

    Args:
        trial: the Optuna trial supplying hyperparameters and the trial number.

    Returns:
        The mean evaluation reward, or -1000 if anything in the trial raised.
        The failure is printed so that a systematic error (for example a
        missing name) does not silently turn every trial into -1000.
    """
    env = None
    try:
        # declare hyperparameters to test
        model_params = optimize_ppo(trial)

        # create environment with additional wrappers for vectorisation and frame stacking
        env = StreetFighter()
        env = Monitor(env, LOG_DIR)
        monitored_env = env  # fixed reference for the factory below
        env = DummyVecEnv([lambda: monitored_env])
        env = VecFrameStack(env, 4, channels_order='last')

        # create PPO agent model
        model = PPO('CnnPolicy', env, tensorboard_log=LOG_DIR, verbose=0, **model_params)
        model.learn(total_timesteps=30000)

        # evaluate model with 5 episodes
        mean_reward, _ = evaluate_policy(model, env, n_eval_episodes=5)

        # save this trial's model
        SAVE_PATH = os.path.join(OPT_DIR, 'trial_{}_best_model'.format(trial.number))
        model.save(SAVE_PATH)

        return mean_reward

    except Exception as e:
        # Keep the study running, but make the failure visible
        print('Trial {} failed: {!r}'.format(trial.number, e))
        return -1000

    finally:
        # Always release the environment, also when the trial failed part-way.
        # NOTE(review): retro is understood to allow only one emulator per
        # process, so a leaked env would break later trials - confirm.
        if env is not None:
            env.close()

In [None]:
# Load the previously saved PPO model that will drive the playback below
MODEL_PATH = './best_model_30000.zip'
model = PPO.load(MODEL_PATH)

In [None]:
# Directory used for Monitor / tensorboard logs
LOG_DIR = './logs/'

# Playback environment: a single StreetFighter instance, vectorised and
# stacked 4 frames deep along the last (channel) axis
fighter_env = StreetFighter()
env = VecFrameStack(
    DummyVecEnv([lambda: fighter_env]),
    4,
    channels_order='last',
)

In [None]:
# Play the loaded model in the environment, recording every reward and action
# and counting how often each side's win counter changes.
#
# The reset and the `done` flag are initialised at the top of each game. The
# original checked `if done:` inside `while not done:`, which could never be
# true, and never cleared `done`, so only the first game would ever have run.
N_GAMES = 1  # number of games to play back

prev_player_win = 0
player_wins = 0

prev_enemy_win = 0
enemy_wins = 0

rewards = []
actions = []

for game in range(N_GAMES):
    # Reset game to starting state and clear the termination flag
    obs = env.reset()
    done = False

    while not done:
        env.render()

        action = model.predict(obs)[0]
        obs, reward, done, info = env.step(action)
        time.sleep(0.01)  # slow the playback down so it can be watched

        # The env is vectorised with a single sub-environment, hence index 0
        rewards.append(reward[0])
        actions.append(action[0])

        # presumably 'matches_won' / 'enemy_matches_won' drop back to 0 when
        # the game starts a new match - verify against the retro game data.
        # When that happens, restart the change tracking without counting a win.
        if prev_player_win > 0 and info[0]['matches_won'] == 0:
            prev_player_win = 0
            prev_enemy_win = 0
        elif prev_enemy_win > 0 and info[0]['enemy_matches_won'] == 0:
            prev_player_win = 0
            prev_enemy_win = 0
        else:
            # A changed counter is counted as one more win for that side
            if prev_player_win != info[0]['matches_won']:
                player_wins += 1
                prev_player_win = info[0]['matches_won']

            if prev_enemy_win != info[0]['enemy_matches_won']:
                enemy_wins += 1
                prev_enemy_win = info[0]['enemy_matches_won']

        print(reward[0], action[0], player_wins, enemy_wins)

In [None]:
# Convert each recorded action array into a plain list of rounded Python numbers
actions_copy = actions  # alias of the recorded actions, not an independent copy
actions_list = [
    [round(button) for button in step_action.tolist()]
    for step_action in actions_copy
]

In [None]:
# Count how many recorded steps had a positive and how many a negative reward
# (steps with a reward of exactly zero are counted in neither)
pos_rewards = sum(1 for step_reward in rewards if step_reward > 0)
neg_rewards = sum(1 for step_reward in rewards if step_reward < 0)

print(pos_rewards, neg_rewards)

In [None]:
# Per-step record of the playback: one row per step with its reward and action
record_columns = {'rewards': rewards, 'actions': actions_list}
df_record = pd.DataFrame(record_columns)

# Write the record to disk without the index column
df_record.to_csv('ppo_record.csv', index=False)

In [None]:
# One-row summary of the playback: reward sign counts and win counters
compare_columns = ['pos_rewards', 'neg_rewards', 'player_wins', 'enemy_wins']
df_compare = pd.DataFrame(columns=compare_columns)

# Append the summary as the next row of the (initially empty) frame
summary_row = [pos_rewards, neg_rewards, player_wins, enemy_wins]
df_compare.loc[len(df_compare.index)] = summary_row

df_compare.to_csv('ppo_compare.csv', index=False)