In [None]:
!pip install gym gym-retro
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
!pip install matplotlib

# Setup basic enviroment

Sample game

In [None]:
import time
# Reset game to starting state
env = StreetFighterEnv(style='balanced')
obs = env.reset()

done = False
for game in range(1):
    while not done:
        if done:
            obs=env.reset()
        env.render()
        obs, reward, done, info = env.step(env.action_space.sample())
        time.sleep(0.01)
        if reward  > 0:
            print("Reward:", reward)

## Setup custom enviroment

In [None]:
from gym import Env
from gym.spaces import MultiBinary, Box
import numpy as np
import cv2
from matplotlib import pyplot as plt

In [1]:
import retro
import numpy as np
import cv2
from gym import Env
from gym.spaces import Box, MultiBinary
import math

class StreetFighterEnv(Env):
    def __init__(self, style='balanced'):
        super().__init__()
        self.style = style
        self.observation_space = Box(low=0, high=255, shape=(84, 84, 1), dtype=np.uint8)
        self.action_space = MultiBinary(12)
        self.game = retro.make(game='StreetFighterIISpecialChampionEdition-Genesis', use_restricted_actions=retro.Actions.FILTERED)
        self.previous_frame = None
        self.score = 0
        self.previous_health = None  # Valor inicial de salud
        self.opponent_previous_health = None
        self.last_hit_time = None

    def reset(self):
        obs = self.game.reset()
        obs = self.preprocess(obs)
        self.previous_frame = obs
        self.score = 0
        self.previous_health = 176
        self.opponent_previous_health = 176
        self.last_hit_time = 39208  # Inicializamos al valor máximo del round timer
        return obs

    def preprocess(self, observation):
        gray = cv2.cvtColor(observation, cv2.COLOR_BGR2GRAY)
        resize = cv2.resize(gray, (84, 84), interpolation=cv2.INTER_CUBIC)
        channels = np.reshape(resize, (84, 84, 1))
        return channels

    def step(self, action):
        obs, reward, done, info = self.game.step(action)
        obs = self.preprocess(obs)
        frame_delta = obs - self.previous_frame
        self.previous_frame = obs

        current_health = info.get('health', self.previous_health)
        opponent_current_health = info.get('enemy_health', self.opponent_previous_health)

        # Penalizaciones y recompensas según el estilo
        distance_x = info['enemy_x_position'] - info['x_position']
        distance_y = info['enemy_y_position'] - info['y_position']
        distance = math.sqrt(distance_x**2 + distance_y**2)
        round_timer = info['round_timer']

        # Verificamos si se hizo daño al oponente
        if opponent_current_health < self.opponent_previous_health:
            self.last_hit_time = round_timer

        time_since_last_hit = (39208 - self.last_hit_time) / 1000  # Convertimos a segundos

        if self.style == 'defensive':
            # Penalización por baja salud
            health_penalty = -0.5 * (176 - current_health)
            reward += health_penalty
            
        elif self.style == 'balanced':
            # Recompensa por mantener una distancia óptima del enemigo
            optimal_distance = 100  # Supongamos que la distancia óptima es 100
            distance_reward = -0.01 * abs(distance - optimal_distance)
            reward += distance_reward
            
            # Penalización por tiempo sin pegar un golpe que hizo daño
            time_penalty = -0.005 * time_since_last_hit
            reward += time_penalty
            
        elif self.style == 'aggressive':
            # Penalización por tiempo sin atacar
            time_penalty = -0.05 * time_since_last_hit
            reward += time_penalty
            
            # Recompensa por estar cerca del enemigo
            distance_reward = -0.1 * distance
            reward += distance_reward
            
            # Recompensa por reducir la salud del enemigo
            enemy_health_reward = 0.5 * (176 - opponent_current_health)
            reward += enemy_health_reward
            
            # Penalización por baja salud
            health_penalty = -0.2 * (176 - current_health)
            reward += health_penalty

        self.previous_health = current_health
        self.opponent_previous_health = opponent_current_health
        return frame_delta, reward, done, info

    def render(self, *args, **kwargs):
        self.game.render()

    def close(self):
        self.game.close()




In [2]:
try:
    env.close()
except:
    print("Environment closed")

Environment closed


In [2]:
import optuna 
from stable_baselines3 import PPO
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack
from stable_baselines3.common.callbacks import CheckpointCallback
import os 

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
LOG_DIR = './logs/'
OPT_DIR = './opt/'
os.makedirs(LOG_DIR, exist_ok=True)
os.makedirs(OPT_DIR, exist_ok=True)

In [4]:
def optimize_ppo(trial): 
    return {
        'n_steps':trial.suggest_int('n_steps', 2048, 8192),
        'gamma':trial.suggest_loguniform('gamma', 0.8, 0.9999),
        'learning_rate':trial.suggest_loguniform('learning_rate', 1e-5, 1e-4),
        'clip_range':trial.suggest_uniform('clip_range', 0.1, 0.4),
        'gae_lambda':trial.suggest_uniform('gae_lambda', 0.8, 0.99)
    }

In [5]:

def optimize_agent(trial):
    try:
        model_params = optimize_ppo(trial) 

        # Create environment 
        env = StreetFighterEnv(style="balanced")
        env = Monitor(env, LOG_DIR)
        env = DummyVecEnv([lambda: env])
        env = VecFrameStack(env, 4, channels_order='last')

        # Create algo 
        model = PPO('CnnPolicy', env, tensorboard_log=LOG_DIR, verbose=0, **model_params)
        #model.learn(total_timesteps=300)
        model.learn(total_timesteps=100000)

        # Evaluate model 
        mean_reward, _ = evaluate_policy(model, env, n_eval_episodes=5)
        env.close()

        SAVE_PATH = os.path.join(OPT_DIR, 'trial_{}_best_model'.format(trial.number))
        model.save(SAVE_PATH)

        return mean_reward

    except Exception as e:
        print(e)
        return -1000

## Crear estudio


In [6]:
study_name = 'ppo_street_fighter'  # Nombre del estudio
storage_name = f'sqlite:///{OPT_DIR}/{study_name}.db'
study = optuna.create_study(study_name=study_name, storage=storage_name, load_if_exists=True, direction='maximize')
study.optimize(optimize_agent, n_trials=100)

[I 2024-06-26 20:09:59,043] Using an existing study with name 'ppo_street_fighter' instead of creating a new one.
  'gamma':trial.suggest_loguniform('gamma', 0.8, 0.9999),
  'learning_rate':trial.suggest_loguniform('learning_rate', 1e-5, 1e-4),
  'clip_range':trial.suggest_uniform('clip_range', 0.1, 0.4),
  'gae_lambda':trial.suggest_uniform('gae_lambda', 0.8, 0.99)
We recommend using a `batch_size` that is a factor of `n_steps * n_envs`.
Info: (n_steps=3470 and n_envs=1)
[W 2024-06-26 20:11:18,830] Trial 16 failed with parameters: {'n_steps': 3470, 'gamma': 0.8029436424143827, 'learning_rate': 2.9743883779635303e-05, 'clip_range': 0.17126243763028792, 'gae_lambda': 0.8833886681534371} because of the following error: KeyboardInterrupt().
Traceback (most recent call last):
  File "c:\Users\nicol\OneDrive\Documentos\Aprendizaje\StreetFighter2Reinforced\.conda\lib\site-packages\optuna\study\_optimize.py", line 196, in _run_trial
    value_or_values = func(trial)
  File "C:\Users\nicol\App

KeyboardInterrupt: 

# Guardar el mejor modelo

In [None]:
# Guardar el mejor modelo
model_params = study.best_params
model_params['n_steps'] = 7488  # Ajustar n_steps a un factor de 64
model = PPO('CnnPolicy', env, tensorboard_log=LOG_DIR, verbose=1, **model_params)
model.save(os.path.join(OPT_DIR, 'best_model'))

In [None]:
# Configurar callback para guardar checkpoints
checkpoint_callback = CheckpointCallback(save_freq=1000, save_path=OPT_DIR, name_prefix='rl_model')

# Entrenamiento del modelo
model.learn(total_timesteps=1000000, callback=checkpoint_callback)


In [None]:
# Guardar el modelo final
model.save(os.path.join(OPT_DIR, 'final_model'))

In [None]:

# Para reanudar el entrenamiento
model = PPO.load(os.path.join(OPT_DIR, 'final_model'))
model.set_env(env)
model.learn(total_timesteps=1000000, callback=checkpoint_callback)

In [None]:
from stable_baselines3.common.callbacks import BaseCallback

In [None]:
class TrainAndLoggingCallback(BaseCallback):

    def __init__(self, check_freq, save_path, verbose=1):
        super(TrainAndLoggingCallback, self).__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self):
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        if self.n_calls % self.check_freq == 0:
            model_path = os.path.join(self.save_path, 'best_model_{}'.format(self.n_calls))
            self.model.save(model_path)

        return True

In [None]:
CHECKPOINT_DIR = './train/'

In [None]:
callback = TrainAndLoggingCallback(check_freq=10000, save_path=CHECKPOINT_DIR)

In [None]:
# Create environment 
env = StreetFighterEnv(style="balanced")
env = Monitor(env, LOG_DIR)
env = DummyVecEnv([lambda: env])
env = VecFrameStack(env, 4, channels_order='last')

In [None]:
model_params = model.best_params
model_params['n_steps'] = 7488  # set n_steps to 7488 or a factor of 64
# model_params['learning_rate'] = 5e-7
model_params

In [None]:
model = PPO('CnnPolicy', env, tensorboard_log=LOG_DIR, verbose=1, **model_params)

In [None]:
model.load(os.path.join(OPT_DIR, 'trial_6_best_model.zip'))

In [None]:
#model.learn(total_timesteps=1000, callback=callback)
model.learn(total_timesteps=1000000, callback=callback)

In [None]:
model = PPO.load('./opt/trial_6_best_model.zip')
mean_reward, _ = evaluate_policy(model, env, render=True, n_eval_episodes=1)
mean_reward

In [None]:
obs = env.reset()
obs.shape
env.step(model.predict(obs)[0])

In [None]:
import time
# Reset game to starting state
obs = env.reset()
# Set flag to flase
done = False
for game in range(1): 
    while not done: 
        if done: 
            obs = env.reset()
        env.render()
        action = model.predict(obs)[0]
        obs, reward, done, info = env.step(action)
        time.sleep(0.01)
        print(reward)