In [1]:
import time
from copy import deepcopy

import gym_super_mario_bros
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT
from nes_py.wrappers import JoypadSpace
from gym.wrappers import FrameStack, GrayScaleObservation
from stable_baselines3.common.vec_env import VecFrameStack, DummyVecEnv, VecTransposeImage
from matplotlib import pyplot as plt

class CustomJoypadSpace(JoypadSpace):
    """JoypadSpace that tolerates ``seed``/``options`` keywords on reset.

    ``reset`` strips those two keywords before delegating to the parent
    class; ``step`` forwards to the parent and returns its 4-tuple.
    """

    def reset(self, **kwargs):
        """Reset the environment, discarding ``seed`` and ``options``.

        Any other keyword arguments are forwarded unchanged to
        ``JoypadSpace.reset``, whose return value is returned as-is.
        """
        forwarded = {
            key: value
            for key, value in kwargs.items()
            if key not in ('seed', 'options')
        }
        return super().reset(**forwarded)

    def step(self, action):
        """Apply ``action`` and return ``(observation, reward, done, info)``.

        The parent result is unpacked into exactly four values, so a parent
        returning a different number of items raises ``ValueError`` here.
        """
        obs, rew, finished, extra = super().step(action)
        return obs, rew, finished, extra

class CustomDummyVecEnv(DummyVecEnv):
    """DummyVecEnv variant for sub-environments using the 4-tuple step API.

    Each wrapped environment is expected to return a bare observation from
    ``reset()`` and ``(obs, reward, done, info)`` from ``step()``.
    """

    def reset(self):
        """Reset every sub-environment and return the batched observations."""
        for env_idx in range(self.num_envs):
            obs = self.envs[env_idx].reset()
            self._save_obs(env_idx, obs)
        return self._obs_from_buf()

    def step_wait(self):
        """Step every sub-environment with the actions stored in ``self.actions``.

        A sub-environment that reports ``done`` is reset immediately, so the
        observation returned for it is the first observation of the next
        episode. The final observation of the finished episode is kept in
        ``info["terminal_observation"]`` so wrappers and algorithms can still
        access it.

        Returns:
            Tuple ``(observations, rewards, dones, infos)``. Rewards, dones
            and infos are copies of the internal buffers: those buffers are
            overwritten in place on the next call, so handing out the buffers
            themselves would silently change values a caller kept from the
            previous step.
        """
        results = [
            self.envs[env_idx].step(self.actions[env_idx])
            for env_idx in range(self.num_envs)
        ]
        for env_idx, (obs, reward, done, info) in enumerate(results):
            self.buf_rews[env_idx] = reward
            self.buf_dones[env_idx] = done
            self.buf_infos[env_idx] = info
            if done:
                # Preserve the last frame of the episode before it is replaced
                # by the first frame of the next one.
                self.buf_infos[env_idx]["terminal_observation"] = obs
                obs = self.envs[env_idx].reset()
            self._save_obs(env_idx, obs)
        return (
            self._obs_from_buf(),
            self.buf_rews.copy(),
            self.buf_dones.copy(),
            deepcopy(self.buf_infos),
        )

In [None]:
# Create the "SuperMarioBros-v0" environment and wrap it in CustomJoypadSpace
# with the SIMPLE_MOVEMENT action list, in a single expression.
env = CustomJoypadSpace(
    gym_super_mario_bros.make("SuperMarioBros-v0"),
    SIMPLE_MOVEMENT,
)

# Set the render_mode attribute on the wrapper object.
env.render_mode = 'human'

In [32]:
# Smoke test: drive the environment with random actions for 1000 steps,
# rendering each frame.
done = True  # start "done" so the first iteration performs the initial reset

for step in range(1000):
    if done:
        env.reset()
    state, reward, done, info = env.step(env.action_space.sample())
    env.render()

# NOTE: the environment is intentionally NOT closed here. The following cell
# wraps this same `env` object for training; closing it at this point would
# hand a closed emulator to the training pipeline when the notebook is run
# top to bottom.

In [None]:
# Observation pipeline, one named stage per wrapper.

# Grayscale frames; keep_dim=True is passed so the wrapper keeps the channel axis.
gray_env = GrayScaleObservation(env, keep_dim=True)

# Single-environment vectorized wrapper (the factory is called on construction).
vec_env = CustomDummyVecEnv([lambda: gray_env])

# Stack 4 frames along the last (channel) axis.
stacked_env = VecFrameStack(vec_env, 4, channels_order='last')

# Final wrapper; `env` now refers to the fully wrapped vectorized environment.
env = VecTransposeImage(stacked_env)

In [4]:
import os

from stable_baselines3 import PPO

from stable_baselines3.common.callbacks import BaseCallback

In [5]:
class TrainAndLoggingCallback(BaseCallback):
    """Training callback that periodically saves the model to disk.

    Args:
        check_freq: A checkpoint is written whenever ``n_calls`` is a
            multiple of this value.
        save_path: Directory that receives the checkpoints. ``None`` disables
            both directory creation and saving.
        verbose: Verbosity level forwarded to ``BaseCallback``.
    """

    def __init__(self, check_freq, save_path, verbose=1):
        super().__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self):
        """Create the checkpoint directory if a save path was given."""
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        """Save ``best_model_<n_calls>`` every ``check_freq`` calls.

        Returns:
            Always ``True``, so this callback never stops training.
        """
        # Guard against save_path=None, which _init_callback explicitly
        # allows; os.path.join(None, ...) would raise TypeError.
        if self.save_path is not None and self.n_calls % self.check_freq == 0:
            model_path = os.path.join(self.save_path, 'best_model_{}'.format(self.n_calls))
            self.model.save(model_path)

        return True

In [6]:
# Output locations (relative to the working directory).
CHECKPOINT_DIR = './train/'  # used as save_path for the checkpoint callback
log_dir = './logs/'  # used as tensorboard_log for the PPO model

In [7]:
# Checkpoint callback: saves into CHECKPOINT_DIR every 50000 callback calls.
callback = TrainAndLoggingCallback(
    save_path=CHECKPOINT_DIR,
    check_freq=50000,
)

In [None]:
# PPO agent with the "CnnPolicy" policy on the wrapped environment;
# TensorBoard logs go to log_dir.
model = PPO(
    "CnnPolicy",
    env,
    learning_rate=0.000001,
    n_steps=512,
    tensorboard_log=log_dir,
    verbose=1,
    device="cuda",
)

In [9]:
def callback(locals_, globals_):
    """Functional training callback that always lets training continue.

    NOTE(review): this definition rebinds the name ``callback``, replacing the
    ``TrainAndLoggingCallback`` instance created in an earlier cell, so no
    checkpoints are written while this function is the one passed to
    ``model.learn``. Confirm that this is intended.

    Args:
        locals_: Local variables passed in by the training loop (unused).
        globals_: Global variables passed in by the training loop (unused).

    Returns:
        ``True``. A functional callback is expected to return a boolean, with
        a falsy value stopping training; the previous 4-tuple return value
        only worked because a non-empty tuple is truthy.
    """
    return True

In [None]:
# Train for one million timesteps, invoking `callback` during training.
model.learn(
    total_timesteps=1_000_000,
    callback=callback,
)

In [11]:
# Persist the trained model under the relative path 'thisisatestmodel'
# (resolved against the current working directory).
model.save('thisisatestmodel')

In [12]:
# Reload the model from the same relative path it was saved to above, rather
# than from a machine-specific absolute path, so the notebook runs unchanged
# on any machine and from any checkout location.
model = PPO.load('thisisatestmodel')

In [None]:
# Watch the agent play. The loop has no exit condition: interrupt the kernel
# to stop it.
state = env.reset()

while True:
    action, _ = model.predict(state)
    # No explicit reset when an episode ends: CustomDummyVecEnv.step_wait
    # already resets a finished sub-environment and returns the first
    # observation of the new episode, so resetting again here would restart
    # the level a second time for no benefit.
    state, reward, done, info = env.step(action)
    env.render()
    time.sleep(0.01)  # slow playback down so it can be followed by eye