In [1]:
import retro 
# Import environment base class for the wrapper
from gym import Env
# import the space shapes for the environment
from gym.spaces import MultiBinary, Box
# import numpy for calculations of frame delta
import numpy as np 
#import open cv for grayscaling 
import cv2
# import matplotlib - image plotting 
from matplotlib import pyplot as plt
import time

In [2]:
# create custom environment 
class gameEnviron(Env):
    def __init__(self):
        super().__init__()
        self.observation_space = Box(low=0, high=255, shape=(84,84,1), dtype=np.uint8)
        self.action_space = MultiBinary(12)
        # start game retro.Actions.FILTERED filters it to only valid button combinations for input
        self.game = retro.make(game ='AddamsFamily-Snes', 
                               use_restricted_actions = retro.Actions.FILTERED)
    
    def step(self, action):
        obs, reward, done, info = self.game.step(action)
        obs = self.preprocess(obs)
        # frame delta 
        frame_delta = obs - self.previous_frame
        self.previous_frame = obs 
        # reshape the reward function 
        reward = info['score'] - self.score
        self.score = info['score']

        return frame_delta, reward, done, info
    
    def reset(self):
        # return first frame
        obs = self.game.reset()
        obs = self.preprocess(obs)
        # pre-processing ToDo
        self.previous_frame = obs
        # create score delta variable 
        self.score = 0
        return obs
    
    def preprocess(self, observation):
        gray = cv2.cvtColor(observation, cv2.COLOR_BGR2GRAY)
        # resize 
        resize = cv2.resize(gray, (84,84), cv2.INTER_CUBIC)
        resize = np.reshape(resize,(84,84,1))
        return resize

    def render(self, *args, **kwargs):
        self.game.render()

    def close(self): 
        self.game.close()

# Testing

In [None]:
env = gameEnviron()
# rest to starting state 
obs = env.reset()
# set flag to false 
done = False 
# play game once 
for game in range(1): 
    # play until not dead
    while not done: 
        if done:
            obs = env.reset()
        env.render()
        obs, reward, done, info = env.step(env.action_space.sample())
        plt.imshow(cv2.cvtColor(obs, cv2.COLOR_BGR2RGBA)) # show changes in frames 
        time.sleep(0.01)
        if reward > 0:
            print(obs)
            print(reward)
            print(info)

In [None]:
env.close()

# Testing End

In [3]:
import optuna
# eval policy for metric calculation
from stable_baselines3 import PPO
from stable_baselines3.common.evaluation import evaluate_policy
# import the sb3 monitor for logging 
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack
import os 

  from .autonotebook import tqdm as notebook_tqdm


In [4]:
LOG_DIR = './logs2/'
OPT_DIR = './opt2/'

In [5]:
#Function to return test hyperparameters - define the object function 
def optimize_ppo(trial):
    return{
        'n_steps': trial.suggest_int('n_steps', 2048, 8192),
        'gamma': trial.suggest_float('gamma', 0.8, 0.9999, log=True),
        'learning_rate': trial.suggest_float('learning_rate', 1e-5, 1e-4, log=True),
        'clip_range': trial.suggest_float('clip_range', 0.1, 0.4),
        'gae_lambda': trial.suggest_float('gae_lambda', 0.8, 0.99)
    }

In [6]:
# run a training loop and return a mean reward 
def optimize_agent(trial):
    try:
        model_params = optimize_ppo(trial)

        # create environment 
        env = gameEnviron()
        env = Monitor(env, LOG_DIR)
        env = DummyVecEnv([lambda: env])
        env = VecFrameStack(env, 4, channels_order='last')

        # create algo 
        model = PPO('CnnPolicy', env, tensorboard_log=LOG_DIR, verbose=0, **model_params)
        # 30k is under shooting - just for speed and example - timesteps are frames in game
        model.learn(total_timesteps=500)

        # evaluate model n_eval should be higher, is low for testing 
        mean_reward, _ = evaluate_policy(model, env, n_eval_episodes=25)
        env.close()

        SAVE_PATH = os.path.join(OPT_DIR, 'trial_{}_best_model'.format(trial.number))
        model.save(SAVE_PATH)

        return mean_reward
    except Exception as e: 
        return -1000

In [7]:
study = optuna.create_study(direction='maximize')
# n_trails is 1 for example, 100 is better for actual training 
study.optimize(optimize_agent, n_trials=1, n_jobs=1)

[I 2023-10-03 16:31:44,136] A new study created in memory with name: no-name-eada1401-7ba1-4caf-87eb-3b6e75de9eb7
We recommend using a `batch_size` that is a factor of `n_steps * n_envs`.
Info: (n_steps=6052 and n_envs=1)
[W 2023-10-03 16:48:00,749] Trial 0 failed with parameters: {'n_steps': 6052, 'gamma': 0.8558849113886039, 'learning_rate': 8.949955600726266e-05, 'clip_range': 0.20083568853862244, 'gae_lambda': 0.9008993810474053} because of the following error: KeyboardInterrupt().
Traceback (most recent call last):
  File "e:\School\Independant\.conda\lib\site-packages\optuna\study\_optimize.py", line 200, in _run_trial
    value_or_values = func(trial)
  File "C:\Users\KYLE\AppData\Local\Temp\ipykernel_14240\1561660290.py", line 18, in optimize_agent
    mean_reward, _ = evaluate_policy(model, env, n_eval_episodes=25)
  File "e:\School\Independant\.conda\lib\site-packages\stable_baselines3\common\evaluation.py", line 89, in evaluate_policy
    observations, rewards, dones, infos 

KeyboardInterrupt: 

In [None]:
study = PPO.load(os.path.join(OPT_DIR, 'trial_7_best_model.zip'))

In [None]:
from stable_baselines3.common.callbacks import BaseCallback

In [None]:
class TrainAndLoggingCallback(BaseCallback): 

    def __init__(self, check_freq, save_path, verbose=1):
        super(TrainAndLoggingCallback, self).__init__(verbose)
        self.check_freq = check_freq 
        self.save_path = save_path 

    def _init_callback(self):
        if self.save_path is not None: 
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        if self.n_calls % self.check_freq == 0: 
            model_path = os.path.join(self.save_path, 'best_model_{}'.format(self.n_calls))
            self.model.save(model_path)
        return True

In [None]:
CHECKPOINT_DIR = './train2/'
callback = TrainAndLoggingCallback(check_freq=10000, save_path=CHECKPOINT_DIR)

In [None]:
# Create environment 
env = gameEnviron()
env = Monitor(env, LOG_DIR)
env = DummyVecEnv([lambda: env])
env = VecFrameStack(env, 4, channels_order='last')

In [None]:
# get this from study.bestparams 
model_params = {
    'n_steps': 7516,
    'gamma': 0.9085173842732223,
    'learning_rate': 5.02771591344835e-05,
    'clip_range': 0.39105070719865653
}
# make sure to overide the params step values in the params to the nearest num divisible by 64
model_params['n_steps'] = 7488

In [None]:
model = PPO('CnnPolicy', env, tensorboard_log=LOG_DIR, verbose=0, **model_params)
model.load(os.path.join(OPT_DIR, 'trial_7_best_model.zip'))
model.learn(total_timesteps=30000, callback=callback)
# again 30k is pretty big undershoot 
# if poor perfomance maybe put model_params['learning_rate'] = 5e-7

In [None]:
model = PPO.load('./train/best_model_30000')
mean_reward, _ = evaluate_policy(model, env, render=True, n_eval_episodes=5)

In [None]:
obs = env.reset()
obs.shape

In [None]:
# rest to starting state 
obs = env.reset()
# set flag to false 
done = False 
# play game once 
for game in range(1): 
    # play until not dead
    while not done: 
        if done:
            obs = env.reset()
        env.render()
        action = model.predict(obs)[0] # get an action prediction from model
        obs, reward, done, info = env.step(action) # pass into the game
        time.sleep(0.01)
        if reward > 0:
            print(reward)