In [1]:
# Import libraries
import gym, retro # For the environment
import time  # For slowing down fights
import os
import sys

# Import the libraries necessary for data preprocessing.

from gym import Env  # Base environment class for a wrapper
from gym.spaces import MultiBinary, Box  # Ensure we pick the correct action space type. (Space shapes for the environment)
import numpy as np  # To calculate frame delta
import cv2  # For grayscaling
from matplotlib import pyplot as plt  # For plotting observation images

import optuna  # Importing the optimization framework that allows to both train and tune at the same time
import torch
import os  # For exporting the model
from stable_baselines3 import PPO  # PPO algorithm for RL
from stable_baselines3.common.evaluation import evaluate_policy  # Metric calculation of agent performance
from stable_baselines3.common.monitor import Monitor  # SB3 Monitor for logging
from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack  # Vec wrappers to vectorize and frame stack
import tensorboard as tb

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Class definition
class StreetFighter(Env):
    def __init__(self):

        # Inherit from our base environment
        super().__init__()

        # Specify action and observation spaces
        self.observation_space = Box(low=0, high=255, shape=(84, 84, 1), dtype=np.uint8)  # We create our observation space based on the new size and colors
        self.action_space = MultiBinary(12)  # We replicate the base action environment

        # Startup and instance the game
        # The second parameter will limit actions to only valid ones.
        try:
            self.game = retro.make(
                game='StreetFighterIISpecialChampionEdition-Genesis', 
                use_restricted_actions=retro.Actions.FILTERED
            )
        except Exception as e:
            print(f"Error creating retro environment: {e}")
            sys.exit(1)


    def reset(self):
        # Return first frame, preprocess the frame, and define score back to 0.

        self.previous_frame = np.zeros(self.game.observation_space.shape)

        obs = self.game.reset()  # Will return our observation
        obs = self.preprocess(obs)  # We preprocess the observation

        self.health = 176  # Initial health
        self.enemy_health = 176

        self.matches_won = 0
        self.enemy_matches_won = 0
        
        
        # Game delta = Current_frame - Previous_frame
        # Preprocess
        self.previous_frame = obs

        # Attribute to hold delta score.
        self.score = 0

        return obs
    
    def preprocess(self, observation):
        # Grayscale, and resize frame
        
        # Grayscaling
        gray = cv2.cvtColor(observation, cv2.COLOR_BGR2GRAY)

        # Resizing
        resize = cv2.resize(gray, (84, 84), interpolation=cv2.INTER_CUBIC)
        
        channel = np.reshape(resize, (84, 84, 1))  # We add the grayscale layer since its what gym expects

        return channel

    def step(self, action):
        # We take a step, preprocess the observation, calculate frame delta and reshape the reward function

        # Take a step
        obs, reward, done, info = self.game.step(action)  # New step based on an action

        obs = self.preprocess(obs)  # We preprocess the observation

        # Frame delta

        # We subtract the current one from the previous one and then we set the current as the last one.
        frame_delta = obs  # - self.previous_frame
        # self.previous_frame = obs

        # Reshape the reward function based on relative score
        # reward = info['score'] - self.score  # Current reward minus the previous score
        # self.score = info['score']  # We set our score to the current score.
        
        delta_enemy = (self.enemy_health - info['enemy_health']) / 176
        delta_self = (info['health'] - self.health) / 176
        reward = delta_enemy * 10 - delta_self * 5

        if abs(delta_enemy) < 1e-6 and abs(delta_self) < 1e-6:
            reward -= 0.001 # small penalty for idling

        if done:
            reward += (info['matches_won'] - info['enemy_matches_won']) * 20  # match win/loss bonus
            if info["enemy_health"] <= 0:
                reward += 50  # big win bonus
            elif info["health"] <= 0:
                reward -= 60  # big loss penalty
            


                
        delta_score = info.get('score', 0) - getattr(self, 'score', 0)
        self.score = info.get('score', self.score)
        reward += delta_score * 0.001


       # Update values
        self.health = info['health']
        self.enemy_health = info['enemy_health'] 

        return frame_delta, reward, done, info


    def render(self, *args, **kwargs):
        # We render the game
        self.game.render()

    def close(self):
        # We close the game
        self.game.close()


In [3]:
# Directories where saved optimization models are going to be saved

LOG_DIR = './logs/'  # SB3 has the ability to log out to a support log
OPT_DIR = './opt/'  # Location to save every single model after every try

# Model definition
# env.close()
env = StreetFighter()
env = Monitor(env, LOG_DIR)
env = DummyVecEnv([lambda: env])
env = VecFrameStack(env, 4, channels_order='last')

In [4]:
model = PPO.load('./opt/trial_0_colab-conda.zip')

# mean_reward, _ = evaluate_policy(model, env, render=False, n_eval_episodes=5)
# mean_reward

In [6]:
# Test to see everything working

# Reset game to starting state
obs = env.reset()

# Flag to false
done = False

tick = 0

max_ticks = 10000

for game in range(2):

    # If game is not over.
    while not done: # and tick < max_ticks
        if done:
            # We reset the game
            obs = env.reset()

        tick += 1

        # Render environment
        env.render()

        action = model.predict(obs)[0]
        obs, reward, done, info = env.step(action)

        # # We slow down the renders so they are watchable
        # time.sleep(0.01)

        # # We print the reward
        # if reward != 0:
        #     print(reward)
    # obs = env.reset()

In [7]:
env.close()

In [2]:
env2 = retro.make(
    game='StreetFighterIISpecialChampionEdition-Genesis', 
    use_restricted_actions=retro.Actions.FILTERED
)

In [4]:
# Reset game to starting state
obs = env2.reset()

# Flag to false
done = False

# We only play one game
for game in range(1):

    # If game is not over.
    while not done:
        if done:
            # We reset the game
            obs = env2.reset()

        # Render environment
        env2.render()

        # We take random actions inside the environment
        obs, reward, done, info = env2.step(env2.action_space.sample())

        # We slow down the renders so they are watchable
        # time.sleep(0.01)

        # We print the reward
        print(reward)

0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0


In [11]:
env2.close()