In [None]:
import os

import gym
import imageio
import numpy as np
import retro
from gym import Env

# Custom Environment

In [None]:
class Discretizer(gym.ActionWrapper):
    """
    Action wrapper that exposes a fixed menu of button combinations as a
    ``gym.spaces.Discrete`` action space.

    Args:
        env: environment to wrap; its ``action_space`` must be a
            ``gym.spaces.MultiBinary`` and ``env.unwrapped.buttons`` must list
            the button names in action-vector order.
        combos: ordered list of lists of valid button combinations; a combo's
            position in this list is its discrete action id.
    """

    def __init__(self, env, combos):
        super().__init__(env)
        assert isinstance(env.action_space, gym.spaces.MultiBinary)
        button_names = env.unwrapped.buttons
        n_buttons = env.action_space.n

        # Pre-compute one boolean mask per combo so action() is a plain lookup.
        self._decode_discrete_action = []
        for combo in combos:
            mask = np.zeros(n_buttons, dtype=bool)
            for name in combo:
                mask[button_names.index(name)] = True
            self._decode_discrete_action.append(mask)

        n_actions = len(self._decode_discrete_action)
        self.action_space = gym.spaces.Discrete(n_actions)

    def action(self, act):
        """Translate discrete action id ``act`` into a copy of its button mask."""
        mask = self._decode_discrete_action[act]
        # Return a copy so callers cannot mutate the stored lookup table.
        return mask.copy()

In [None]:
class SF2Discretizer(Discretizer):
    """
    Discrete action set used for Street Fighter II: a no-op, eight
    directional combos, and each of the buttons A, B, C, X, Y, Z on its own.

    Structure based on https://github.com/openai/retro-baselines/blob/master/agents/sonic_util.py
    """
    def __init__(self, env):
        directions = [
            ['DOWN'], ['DOWN', 'LEFT'], ['DOWN', 'RIGHT'],
            ['LEFT'], ['RIGHT'],
            ['UP'], ['UP', 'LEFT'], ['UP', 'RIGHT'],
        ]
        single_buttons = [[button] for button in ('A', 'B', 'C', 'X', 'Y', 'Z')]
        # Index 0 is the empty combo (no button pressed); the order of the
        # remaining entries defines the discrete action ids.
        super().__init__(env=env, combos=[[]] + directions + single_buttons)

In [None]:
class StreetFighterEnv(Env):
    """
    Gym environment around the Genesis Street Fighter II retro game that
    replaces the game's reward with a health-difference reward.

    The wrapped game is exposed through ``SF2Discretizer``, so the action
    space is discrete; the observation space is taken unchanged from the
    wrapped game.
    """

    def __init__(self):
        super().__init__()
        self.game = SF2Discretizer(retro.make(game='StreetFighterIISpecialChampionEdition-Genesis', use_restricted_actions=retro.Actions.FILTERED))
        self.observation_space = self.game.observation_space
        self.action_space = self.game.action_space

        # Create a dedicated random number generator for the environment
        self.np_random = np.random.RandomState()

        # Health values seen on the previous step. Initialised here (and
        # cleared again in reset()) so that step() never hits a missing
        # attribute, even if it is called before the first reset().
        self.previous_enemy_health = None
        self.previous_health = None

    def step(self, action):
        """
        Advance the game by one step with the given discrete action.

        Returns:
            ``(obs, reward, done, info)`` where ``reward`` is the enemy's
            health loss minus the player's health loss since the previous
            step (0 on the first step after a reset), and ``done`` is forced
            to True once either health value is <= 0.
        """
        obs, reward, done, info = self.game.step(action)

        # Custom hp-based reward to encourage defense and attack
        if self.previous_enemy_health is None or self.previous_health is None:
            # No previous reading to compare against yet
            reward = 0
        else:
            # Reward is the hp changes
            reward = (self.previous_enemy_health - info['enemy_health']) - (self.previous_health - info['health'])
        self.previous_enemy_health = info['enemy_health']
        self.previous_health = info['health']

        # Stop as soon as either fighter has no health left
        if info['enemy_health'] <= 0 or info['health'] <= 0:
            done = True

        return obs, reward, done, info

    def reset(self):
        """Reset the wrapped game, clear the stored health readings, and return the first observation."""
        obs = self.game.reset()
        self.previous_enemy_health = None
        self.previous_health = None
        return obs

    def render(self, mode="human"):
        """Delegate rendering to the wrapped game."""
        return self.game.render(mode=mode)

    def close(self):
        """Close the wrapped game."""
        self.game.close()

    # The functions below are required for AtariWrapper

    def get_action_meaning(self, act):
        """Return 'NOOP' for action 0, otherwise the wrapped game's meaning of the decoded button array."""
        return 'NOOP' if act == 0 else self.game.get_action_meaning(self.game.action(act))

    def get_action_meanings(self):
        """Return the meaning of every discrete action, in action-id order."""
        return [self.get_action_meaning(act) for act in range(self.action_space.n)]

    def seed(self, seed=None):
        """
        Seed the environment's own random number generator.

        Returns:
            ``[seed]`` after seeding, or an empty list (and no reseeding)
            when ``seed`` is None.
        """
        # if there is no seed, return an empty list
        if seed is None:
            return []
        # set the random number seed for the NumPy random number generator
        self.np_random.seed(seed)
        # return the list of seeds used by RNG(s) in the environment
        return [seed]

# Preprocess Environment

In [None]:
from stable_baselines3.common.vec_env import VecFrameStack, VecMonitor, DummyVecEnv, SubprocVecEnv
from stable_baselines3.common.atari_wrappers import AtariWrapper
from stable_baselines3.common.utils import set_random_seed

In [None]:

def make_env(rank: int, seed: int = 0):
    """
    Build a zero-argument factory that creates one preprocessed
    ``StreetFighterEnv``.

    Args:
        rank: index of this environment; added to ``seed`` to seed the env.
        seed: base seed, also passed to ``set_random_seed`` when the factory
            is built.

    Returns:
        A callable that constructs and returns the wrapped environment.
    """
    env_seed = seed + rank
    set_random_seed(seed)

    def _init():
        # Create and seed the base environment
        base_env = StreetFighterEnv()
        base_env.seed(env_seed)

        # AtariWrapper preprocessing:
        #   - random NOOPs at the fight start
        #   - rewards accumulated every 4 frames
        #   - resolution reduced to 84 pixels grayscale
        return AtariWrapper(
            base_env,
            noop_max=10,
            frame_skip=4,
            screen_size=84,
            terminal_on_life_loss=False,
            clip_reward=False,
        )

    return _init

def make_vec_env(num_cpu: int):
    """
    Create the vectorized, frame-stacked and monitored environment.

    Args:
        num_cpu: number of environment copies; more than one uses
            ``SubprocVecEnv``, otherwise ``DummyVecEnv``.

    Returns:
        The ``VecMonitor``-wrapped vectorized environment.
    """
    env_fns = [make_env(rank) for rank in range(num_cpu)]
    vec_env_cls = SubprocVecEnv if num_cpu > 1 else DummyVecEnv

    # Stack 4 frames so movement can be detected from a single observation
    stacked_env = VecFrameStack(vec_env_cls(env_fns), 4, channels_order="last")

    # Monitor episode reward and length
    return VecMonitor(stacked_env)

# Train RL Model

In [None]:
# Import PPO for algos
from stable_baselines3 import PPO
# Import Base Callback for saving models
from stable_baselines3.common.callbacks import CheckpointCallback

In [None]:
# Directory where training checkpoints and the final model are written
CHECKPOINT_DIR = './checkpoints/'
# Directory handed to PPO as its TensorBoard log location
LOG_DIR = './logs/'

In [None]:
# Setup model saving callback: periodically writes checkpoints under CHECKPOINT_DIR.
# NOTE(review): save_freq is presumably counted in callback calls (one per
# vectorized env.step()), so with several parallel envs the interval in total
# timesteps would be save_freq * n_envs — confirm against the SB3 docs.
callback = CheckpointCallback(save_freq=10000, save_path=CHECKPOINT_DIR, verbose=1)

In [None]:
# PPO hyperparameters, unpacked as keyword arguments when the model is built
ppo_params = dict(
    learning_rate=3e-5,
    batch_size=128,
    # Discount factor for future reward, reduced from 0.99 because the agent
    # only needs to see a few steps ahead
    gamma=0.9,
    # Steps to run per update, enough to pass an episode
    n_steps=1024,
)

In [None]:
# Number of processes to use
num_cpu = 8
print(f'Loading {num_cpu} environments for training')
env = make_vec_env(num_cpu)

# Create a fresh PPO model with a CNN policy on the vectorized environment
model = PPO('CnnPolicy', env, verbose=1, tensorboard_log=LOG_DIR, **ppo_params)
# To resume from a checkpoint instead, load it like this:
# model = PPO.load('./checkpoints/rl_model_13050624_steps', env, verbose=1, tensorboard_log=LOG_DIR, **ppo_params)

# Train the AI model, this is where the AI model starts to learn
try:
    # total_timesteps is given as an int (the original passed the float 1e7)
    model.learn(total_timesteps=10_000_000, callback=callback, reset_num_timesteps=False)
finally:
    # Save first, even when training was interrupted, so that a failure while
    # shutting down the worker environments cannot lose the trained model;
    # the environments are still closed if saving itself fails.
    try:
        model.save(CHECKPOINT_DIR + 'latest_model')
    finally:
        env.close()

# Test It Out

In [None]:
# Load the trained model from the location the training cell saves it to
# (CHECKPOINT_DIR + 'latest_model'); the previous './latest_model' path did
# not match where the model is written.
model = PPO.load(CHECKPOINT_DIR + 'latest_model')

In [None]:
# Create the test environment
env = make_vec_env(1)

# Make sure the output folder exists; writing the GIFs fails if it is missing
REPLAY_DIR = './replays'
os.makedirs(REPLAY_DIR, exist_ok=True)

# Loop through the games
games = 5
try:
    for i in range(games):
        # Reset game to starting state
        images = []
        obs = env.reset()
        done = False
        # With a single wrapped environment `done` comes back as a
        # one-element array, so `not done` is still a valid truth test.
        while not done:
            action, _ = model.predict(obs)
            obs, reward, done, info = env.step(action)
            img = env.render(mode='rgb_array')
            images.append(img)

        # Make a GIF for each game
        imageio.mimsave(f'{REPLAY_DIR}/game_{i + 1}.gif', images, fps=24)
finally:
    # Always release the emulator, even if a game or the GIF export fails
    env.close()