In [1]:
# Import retro to play Street Fighter using a ROM
import retro
# Import time to slow down game
import time

In [2]:
# %%python -m retro.import . # Run this from the roms folder, or where you have your game roms 

In [3]:
# Import environment base class for a wrapper 
from gym import Env 
# Import the space shapes for the environment
from gym.spaces import MultiBinary, Box, Discrete
# Import numpy to calculate frame delta 
import numpy as np
# Import opencv for grayscaling
import cv2
# Import matplotlib for plotting the image
from matplotlib import pyplot as plt
# Import deque for the frame stack
from collections import deque

In [4]:
# Claude version 2

class StreetFighter(Env):
    def __init__(self):
        super().__init__()
        self.observation_space = Box(low=0, high=255, shape=(84, 84, 1), dtype=np.uint8)
        self.action_space = MultiBinary(12)
        
        # Initialize state tracking variables
        self.health = 144
        self.enemy_health = 144
        self.score = 0
        self.matches_won = 0
        self.enemy_matches_won = 0
        self.continue_timer = 100
        
        # Combo tracking variables
        self.damage_window = deque(maxlen=30)  # Track damage over 30 frames
        self.score_window = deque(maxlen=30)   # Track score changes over 30 frames
        self.current_combo = 0
        self.frames_since_last_hit = 0
        
        # Anti-spam tracking
        self.action_history = deque(maxlen=60)  # Track last 60 frames of actions
        self.last_hit_frame = 0                 # Track when we last dealt damage
        self.whiff_counter = 0                  # Count actions without score/health changes
        
        # Constants for reward shaping
        self.HEALTH_SCALE = 10.0
        self.ROUND_WIN_BONUS = 100.0
        self.MATCH_WIN_BONUS = 500.0
        self.DAMAGE_TRADE_SCALE = 1.5
        self.COMBO_SCALE = 2.0
        self.MAX_COMBO_BONUS = 5.0
        self.COMBO_TIMEOUT = 30
        
        # Spam prevention constants
        self.WHIFF_PENALTY = -2.0           # Penalty for missing attacks
        self.SPAM_THRESHOLD = 0.8           # Percentage of similar actions that triggers spam penalty
        self.SPAM_PENALTY = -5.0            # Penalty for move spamming
        
        self.game = retro.make(
            game='StreetFighterIISpecialChampionEdition-Genesis',
            use_restricted_actions=retro.Actions.FILTERED
        )
        
        # Initialize last state
        self.last_state = {
            'enemy_health': 144,
            'health': 144,
            'score': 0,
            'matches_won': 0,
            'enemy_matches_won': 0,
            'continuetimer': 100
        }

    def calculate_action_diversity(self):
        """
        Calculate how diverse the recent actions have been
        Returns a penalty if actions are too repetitive
        """
        if len(self.action_history) < 30:
            return 0
        
        # Convert binary actions to move types for easier analysis
        recent_moves = list(self.action_history)
        move_counts = {}
        
        for move in recent_moves:
            move_str = ''.join(map(str, move))
            move_counts[move_str] = move_counts.get(move_str, 0) + 1
        
        # Calculate the ratio of the most common move
        most_common_ratio = max(move_counts.values()) / len(recent_moves)
        
        # Apply penalty if the same move is being spammed
        if most_common_ratio > self.SPAM_THRESHOLD:
            return self.SPAM_PENALTY
        return 0

    def detect_combo(self, enemy_health_diff, score_diff):
        """
        Detect combos based on damage and score changes
        """
        # Update tracking windows
        self.damage_window.append(enemy_health_diff)
        self.score_window.append(score_diff)
        
        # If we dealt damage or got points this frame
        if enemy_health_diff > 0 or score_diff > 0:
            self.whiff_counter = 0  # Reset whiff counter on successful hit
            if self.frames_since_last_hit < self.COMBO_TIMEOUT:
                self.current_combo += 1
            else:
                self.current_combo = 1
            self.frames_since_last_hit = 0
            self.last_hit_frame = 0
        else:
            self.frames_since_last_hit += 1
            
            # If we're executing moves but not getting results
            if any(self.action_history[-1] if self.action_history else [0]):
                self.whiff_counter += 1
        
        # Reset combo if too much time has passed
        if self.frames_since_last_hit >= self.COMBO_TIMEOUT:
            self.current_combo = 0
            
        # Calculate combo multiplier
        combo_multiplier = min(1.0 + (self.current_combo * 0.5), self.MAX_COMBO_BONUS)
        
        # Detect if this seems to be a "true" combo
        recent_damage = sum(self.damage_window)
        recent_score = sum(self.score_window)
        
        is_true_combo = (
            self.current_combo > 1 and 
            (recent_damage > 10 or recent_score > 100)
        )
        
        return is_true_combo, combo_multiplier

    def reward_function(self, state):
        reward = 0
        
        # Extract current state
        enemy_health = state['enemy_health']
        health = state['health']
        score = state['score']
        matches_won = state['matches_won']
        enemy_matches_won = state['enemy_matches_won']
        
        # Calculate changes
        score_diff = score - self.last_state['score']
        enemy_health_diff = self.last_state['enemy_health'] - enemy_health
        health_diff = self.last_state['health'] - health
        
        # Check if round is active (either player has health)
        is_round_active = (enemy_health > 0 or health > 0)
        
        if is_round_active:
            # Detect combo state
            is_combo, combo_multiplier = self.detect_combo(enemy_health_diff, score_diff)
            
            # Reward for dealing damage, with combo scaling
            if enemy_health_diff > 0:
                base_damage_reward = enemy_health_diff * self.HEALTH_SCALE
                if is_combo:
                    reward += base_damage_reward * combo_multiplier
                    reward += self.COMBO_SCALE * self.current_combo
                else:
                    reward += base_damage_reward
                
                # Extra reward for trading damage favorably
                if health_diff > 0:
                    if enemy_health_diff > health_diff:
                        reward += (enemy_health_diff - health_diff) * self.DAMAGE_TRADE_SCALE
            
            # Apply anti-spam mechanics
            spam_penalty = self.calculate_action_diversity()
            whiff_penalty = self.WHIFF_PENALTY * min(self.whiff_counter, 5) if self.whiff_counter > 2 else 0
            
            reward += spam_penalty
            reward += whiff_penalty
            
            # Penalty for taking damage
            if health_diff > 0:
                reward -= health_diff * self.HEALTH_SCALE
                self.current_combo = 0
                self.frames_since_last_hit = self.COMBO_TIMEOUT
            
            # Small reward for score increases
            if score_diff > 0:
                if is_combo:
                    reward += score_diff * 0.2
                else:
                    reward += score_diff * 0.1
        
        # Round end rewards/penalties
        if self.last_state['enemy_health'] > 0 and enemy_health == 0:  # Won the round
            reward += self.ROUND_WIN_BONUS
            if matches_won > self.last_state['matches_won']:  # Won the match
                reward += self.MATCH_WIN_BONUS
        
        if self.last_state['health'] > 0 and health == 0:  # Lost the round
            reward -= self.ROUND_WIN_BONUS / 2
            if enemy_matches_won > self.last_state['enemy_matches_won']:  # Lost the match
                reward -= self.MATCH_WIN_BONUS / 2
        
        # Update last state
        self.last_state = {
            'enemy_health': enemy_health,
            'health': health,
            'score': score,
            'matches_won': matches_won,
            'enemy_matches_won': enemy_matches_won,
            'continuetimer': state['continuetimer']
        }
        
        return reward

    def step(self, action):
        obs, _, done, info = self.game.step(action)
        self.action_history.append(action)  # Track action for spam detection
        obs = self.preprocess(obs)
        frame_delta = obs - self.previous_frame
        self.previous_frame = obs
        
        reward = self.reward_function(info)
        return frame_delta, reward, done, info

    def reset(self):
        obs = self.game.reset()
        obs = self.preprocess(obs)
        self.previous_frame = obs
        
        # Reset state variables
        self.last_state = {
            'enemy_health': 144,
            'health': 144,
            'score': 0,
            'matches_won': 0,
            'enemy_matches_won': 0,
            'continuetimer': 100
        }
        
        # Reset tracking variables
        self.damage_window.clear()
        self.score_window.clear()
        self.action_history.clear()
        self.current_combo = 0
        self.frames_since_last_hit = self.COMBO_TIMEOUT
        self.whiff_counter = 0
        
        return obs

    def preprocess(self, observation):
        gray = cv2.cvtColor(observation, cv2.COLOR_BGR2GRAY)
        resize = cv2.resize(gray, (84, 84), interpolation=cv2.INTER_CUBIC)
        channels = np.reshape(resize, (84, 84, 1))
        return channels

    def render(self, *args, **kwargs):
        self.game.render()

    def close(self):
        self.game.close()

In [12]:
# Importing the optimzation frame - HPO
import optuna
# PPO algo for RL
from stable_baselines3 import PPO, A2C, DQN
# Bring in the eval policy method for metric calculation
from stable_baselines3.common.evaluation import evaluate_policy
# Import the sb3 monitor for logging 
from stable_baselines3.common.monitor import Monitor
# Import the vec wrappers to vectorize and frame stack
from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack
# Import os to deal with filepaths
import os

In [7]:
LOG_DIR = './logs/'
OPT_DIR = './opt/'

In [8]:
# Function to return test hyperparameters - define the object function
def optimize_ppo(trial): 
    return {
        'n_steps':trial.suggest_int('n_steps', 2048, 8192), # number of frames used in one batch of training (must use a factor of 64) (maybe take a number and multiply it by 64? 😎🤝😈)
        'gamma':trial.suggest_loguniform('gamma', 0.8, 0.9999), # discount rate
        'learning_rate':trial.suggest_loguniform('learning_rate', 1e-5, 1e-4), # how fast we tune optimizer (Critic and Actor for PPO)
        'clip_range':trial.suggest_uniform('clip_range', 0.1, 0.4), # how far we want to clip for our advantage value in PPO
        'gae_lambda':trial.suggest_uniform('gae_lambda', 0.8, 0.99) # smoothing parameter (used when calculating advantage)
    }

# IF U WANT TO USE OTHER ALGOS THE HYPERPARAMS MUST BE SWITCHED AS WELL (DQN, SAC, etc.)

In [9]:
SAVE_PATH = os.path.join(OPT_DIR, 'trial_{}_best_model'.format(1))

In [14]:
ALGORITHMS = {
    'PPO': (PPO, optimize_ppo),
    # 'A2C': (A2C, optimize_a2c),
    # 'DQN': (DQN, optimize_dqn),
}

def optimize_agent(trial, algo_name='PPO'):
    try:
        # Select algorithm and get hyperparameters
        ModelClass, optimize_fn = ALGORITHMS[algo_name]
        model_params = optimize_fn(trial)

        # Create environment
        env = StreetFighter()
        env = Monitor(env, LOG_DIR)
        env = DummyVecEnv([lambda: env])
        env = VecFrameStack(env, 4, channels_order='last')

        # Initialize and train model
        model = ModelClass('CnnPolicy', env, tensorboard_log=LOG_DIR, verbose=0, **model_params) # would recommend looking into other cnnpolicy's if they are compatible
        model.learn(total_timesteps=100000)

        # Evaluate model
        mean_reward, _ = evaluate_policy(model, env, n_eval_episodes=10)
        env.close()

        SAVE_PATH = os.path.join(OPT_DIR, 'trial_{}_best_model'.format(trial.number))
        model.save(SAVE_PATH)

        return mean_reward

    except Exception as e:
        return -1000

In [15]:
study = optuna.create_study(direction='maximize') # since mean reward is positive we maximize, otherwise minimize
study.optimize(lambda trial: optimize_agent(trial, algo_name='PPO'), n_trials=5) # for prod used n_trials=100 (change algo name to change algos)

[I 2024-11-13 00:44:46,421] A new study created in memory with name: no-name-b35fe3fa-c665-4884-9439-19ca9f1b3b72
  'gamma':trial.suggest_loguniform('gamma', 0.8, 0.9999), # discount rate
  'learning_rate':trial.suggest_loguniform('learning_rate', 1e-5, 1e-4), # how fast we tune optimizer (Critic and Actor for PPO)
  'clip_range':trial.suggest_uniform('clip_range', 0.1, 0.4), # how far we want to clip for our advantage value in PPO
  'gae_lambda':trial.suggest_uniform('gae_lambda', 0.8, 0.99) # smoothing parameter (used when calculating advantage)
We recommend using a `batch_size` that is a factor of `n_steps * n_envs`.
Info: (n_steps=3560 and n_envs=1)
[I 2024-11-13 00:58:58,372] Trial 0 finished with value: -105164.0 and parameters: {'n_steps': 3560, 'gamma': 0.9041179200591378, 'learning_rate': 6.15978899282089e-05, 'clip_range': 0.19876558678820333, 'gae_lambda': 0.9758469364237689}. Best is trial 0 with value: -105164.0.
  'gamma':trial.suggest_loguniform('gamma', 0.8, 0.9999), # 

In [16]:
model = PPO.load(os.path.join(OPT_DIR, 'trial_2_best_model.zip'))

  th_object = th.load(file_content, map_location=device)


In [18]:
# env.close()

NameError: name 'env' is not defined

In [17]:
# Import base callback 
from stable_baselines3.common.callbacks import BaseCallback

In [18]:
class TrainAndLoggingCallback(BaseCallback): # continuously learn by starting from best parameters done above

    def __init__(self, check_freq, save_path, verbose=1):
        super(TrainAndLoggingCallback, self).__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self):
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        if self.n_calls % self.check_freq == 0:
            model_path = os.path.join(self.save_path, 'best_model_{}'.format(self.n_calls))
            self.model.save(model_path)

        return True

In [19]:
CHECKPOINT_DIR = './train/'

In [20]:
callback = TrainAndLoggingCallback(check_freq=10000, save_path=CHECKPOINT_DIR)

# Train Model

In [42]:
# Create environment 
env = StreetFighter()
env = Monitor(env, LOG_DIR)
env = DummyVecEnv([lambda: env])
env = VecFrameStack(env, 4, channels_order='last')

In [22]:
model_params = study.best_params
model_params['n_steps'] = 7488  # set n_steps to 7488 or a factor of 64
# model_params['learning_rate'] = 5e-7 -> if really slow at training
model_params

{'n_steps': 7488,
 'gamma': 0.8482634234145207,
 'learning_rate': 7.056142209381235e-05,
 'clip_range': 0.10576391391756874,
 'gae_lambda': 0.8043934599923289}

In [23]:
model = PPO('CnnPolicy', env, tensorboard_log=LOG_DIR, verbose=1, **model_params) # verbose 1 shows results as training

Using cpu device
Wrapping the env in a VecTransposeImage.


In [24]:
# Reload previous weights from HPO
model.load(os.path.join(OPT_DIR, 'trial_2_best_model.zip'))

<stable_baselines3.ppo.ppo.PPO at 0x2728a67e430>

In [26]:
# Kick off training 
model.learn(total_timesteps=500000, callback=callback) # timestep 5000000 recommended


Logging to ./logs/PPO_1
----------------------------------
| rollout/           |           |
|    ep_len_mean     | 5.29e+03  |
|    ep_rew_mean     | -5.51e+04 |
| time/              |           |
|    fps             | 420       |
|    iterations      | 1         |
|    time_elapsed    | 17        |
|    total_timesteps | 7488      |
----------------------------------
---------------------------------------
| rollout/                |           |
|    ep_len_mean          | 4.91e+03  |
|    ep_rew_mean          | -5.18e+04 |
| time/                   |           |
|    fps                  | 217       |
|    iterations           | 2         |
|    time_elapsed         | 68        |
|    total_timesteps      | 14976     |
| train/                  |           |
|    approx_kl            | 0.2107653 |
|    clip_fraction        | 0.4       |
|    clip_range           | 0.106     |
|    entropy_loss         | -0.891    |
|    explained_variance   | 0.447     |
|    learning_rate        

<stable_baselines3.ppo.ppo.PPO at 0x272e1dfe0a0>

In [22]:
# tensorboard --logdir=. 
# cd to logs
# ^ use to visually see learning progress

# Evaluate Model

In [43]:
model = PPO.load('./train/best_model_570000.zip')

In [38]:
# mean_reward, _ = evaluate_policy(model, env, render=True, n_eval_episodes=1)

# Testing Model

In [39]:
obs = env.reset()

In [44]:
# Reset game to starting state
obs = env.reset()
# Set flag to flase
done = False
for game in range(1): 
    while not done: 
        if done: 
            obs = env.reset()
        env.render()
        action = model.predict(obs)[0]
        obs, reward, done, info = env.step(np.array(action))
        # time.sleep(0.01)
        print(reward)

ArgumentError: argument 1: <class 'OverflowError'>: int too long to convert

In [41]:
env.close()