In [1]:
# %pip install tensorflow
# %pip install gym
# %pip install keras
# %pip install keras-rl2
# %pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu117

In [1]:
from gym import Env
from gym.spaces import Discrete, Box
from matplotlib import pyplot as plt
from mss import mss
import numpy as np 
import time
import pydirectinput
import pygetwindow
import cv2 as cv2
import win32gui
import gym as gym
from image_analysis.take_screenshot import Screenshot
from input_sending.input_sending import SendInput 
from pattern_recognition.pattern_recog import RecognizePattern 

In [10]:
class StepManiaEnv(Env):
    """Gym environment that plays StepMania through screenshots and simulated key presses.

    Every step optionally toggles one of the keys 'a', 's', 'w', 'd', captures the
    game window and derives the observation, the reward and the done flag from
    fixed regions of that capture.
    """

    # Setup
    def __init__(self):
        """Create the spaces, helpers and screen regions and position the StepMania window."""
        super().__init__()

        # Defined action space: 0 is a no-op, 1-4 toggle one key each (see action_map)
        self.action_space = Discrete(5)

        # Observation Array
        self.observation_space = Box(
            low=0,
            high=255,
            shape=(1, 135, 100),
            dtype=np.uint8
        )

        # Define extraction parameters for the game
        self.screenshot_helper = Screenshot()
        self.input_sending_helper = SendInput()
        self.pattern_recog_helper = RecognizePattern()
        self.capture = mss()
        self.steps = 0
        self.previous_reward = 0
        self.previous_observation = None

        # window_location is the screen region that gets captured; the remaining
        # regions are used to crop images derived from that capture.
        self.window_location = {'top': 35, 'left': 10, 'width': 410, 'height': 230}
        self.game_location = {'top': 15, 'left': 20, 'width': 100, 'height': 185}
        self.score_location = {'top': 215, 'left': 280, 'width': 100, 'height': 25}
        self.done_location = {'top': 0, 'left': 0, 'width': 90, 'height': 25}
        self.past_arrows_location = {'top': 45, 'left': 50, 'width': 140, 'height': 2}

        # Tracks which keys the environment currently holds down
        self.cur_held_buttons = {'a': False, 's': False, 'w': False, 'd': False}
        self.action_map = {
            0: 'no_op',
            1: 'a',
            2: 's',
            3: 'w',
            4: 'd',
        }

        # Adjust window position and size
        # NOTE(review): index 1 selects the second window whose title matches
        # 'StepMania' - presumably the first match is a different window; confirm.
        win = pygetwindow.getWindowsWithTitle('StepMania')[1]
        win.size = (450, 290)
        win.moveTo(0, 0)

    @staticmethod
    def _region_slices(region):
        """Return the (rows, columns) slices described by a region dictionary."""
        rows = slice(region['top'], region['top'] + region['height'])
        cols = slice(region['left'], region['left'] + region['width'])
        return rows, cols

    # One iteration of the environment
    def step(self, action):
        """Apply an action, capture the game and evaluate the new state.

        Args:
            action: Value from the action space. 0 sends no input; 1-4 toggle the
                held state of the key that action_map assigns to the action.

        Returns:
            Tuple (observation, reward, done, info); info is always an empty dict.
        """
        # Manage and send input based on action parameter
        if action != 0:
            # Look the key up by name instead of relying on the dictionary order
            key = self.action_map[action]
            if self.cur_held_buttons[key]:
                self.cur_held_buttons[key] = False
                self.input_sending_helper.releaseKey(key)
            else:
                self.cur_held_buttons[key] = True
                self.input_sending_helper.holdKey(key)

        # Take screenshot for done, observation and reward functions
        # (the last channel of the capture is dropped)
        screenshot = np.array(self.capture.grab(self.window_location))[:, :, :-1].astype(np.uint8)
        downscaled_screenshot = self.screenshot_helper.downscaleImageBinary(screenshot, (225, 150), (1, 150, 225))
        self.steps += 1

        # Checking if the game is over
        done = self.get_over(screenshot)

        # Get the next observation and save it in the environment for the next step
        # NOTE(review): the stored value is whatever get_observation returned, i.e.
        # a difference image from the second step of an episode onwards - confirm
        # that diffing against it (rather than against the raw frame) is intended.
        new_observation = self.get_observation(downscaled_screenshot)
        self.previous_observation = new_observation

        # Use score as reward
        reward = self.get_reward(screenshot, action)
        info = {}

        return new_observation, reward, done, info

    # Quits result screen and selects new song
    def reset(self):
        """Navigate to a new song, reset the episode state and return the first observation."""
        # Release every key that is still held so that the real key state matches
        # the tracking dictionary, which is cleared further below.
        for key, held in self.cur_held_buttons.items():
            if held:
                self.input_sending_helper.releaseKey(key)

        # Exit to menu, select new song and start
        time.sleep(5)
        pydirectinput.press('enter')
        time.sleep(6)
        pydirectinput.press('d')
        time.sleep(2)
        pydirectinput.press('enter')

        # Edge Case - 'Roulette' is selected
        time.sleep(1.5)
        pydirectinput.press('enter')
        time.sleep(3)
        pydirectinput.press('enter')

        # Reset variables
        self.previous_reward = 0
        self.previous_observation = None
        self.steps = 0
        self.cur_held_buttons = {'a': False, 's': False, 'w': False, 'd': False}

        # Take screenshot to pass to observation
        screenshot = np.array(self.capture.grab(self.window_location))[:, :, :-1].astype(np.uint8)
        downscaled_screenshot = self.screenshot_helper.downscaleImageBinary(screenshot, (225, 150), (1, 150, 225))

        return self.get_observation(downscaled_screenshot)

    # Returns image of gameplay
    def get_observation(self, img):
        """Crop the gameplay area and, if available, diff it against the stored observation.

        Args:
            img: Image indexed as (channel, row, column).

        Returns:
            The cropped image when previous_observation is None, otherwise the
            absolute difference between previous_observation and the cropped image.
        """
        # Crop gameplay part of the window screenshot
        rows, cols = self._region_slices(self.game_location)
        obs = img[:, rows, cols]

        # If no previous observation exists, return the initial observation
        if self.previous_observation is None:
            return obs

        # Create an image containing the differences between this and the previous observation
        return cv2.absdiff(self.previous_observation, obs)

    # Returns the current score as a reward
    def get_reward(self, img, action):
        """Derive the reward for the current step from the window screenshot.

        Args:
            img: Window screenshot indexed as (row, column, channel).
            action: Action that was taken in this step.

        Returns:
            -1 if the pattern helper reports that the input was not expected,
            5 if the recognised score is higher than the stored one, otherwise 0.
        """
        # Crop score part of the window screenshot and top of the gameplay section.
        # The regions are read from this instance, not from a notebook-level variable.
        score_rows, score_cols = self._region_slices(self.score_location)
        arrow_rows, arrow_cols = self._region_slices(self.past_arrows_location)
        score_img = img[score_rows, score_cols]
        past_arrows_img = img[arrow_rows, arrow_cols]

        # If no input should have occurred, give negative reward.
        # The explicit comparison is kept so that only a result equal to False is penalised.
        if self.pattern_recog_helper.input_expected(past_arrows_img, action) == False:
            return -1

        # If the score increased, give positive reward
        new_reward = self.pattern_recog_helper.analyze_score(score_img)
        if new_reward > self.previous_reward:
            # Set the current reward as the previous reward for the next iteration
            self.previous_reward = new_reward
            return 5

        # If the score didn't change, give a neutral reward
        return 0

    # Checks if the game is over
    def get_over(self, img):
        """Return the pattern helper's verdict on whether the done region shows the end of the game.

        Args:
            img: Window screenshot indexed as (row, column, channel).
        """
        # Crop done part of the window screenshot
        rows, cols = self._region_slices(self.done_location)
        obs = img[rows, cols]

        return self.pattern_recog_helper.analyze_results(obs)


In [None]:
# Scratch cell: capture two screenshots of the game window one after the other and
# write intermediate images to disk to inspect the preprocessing visually.
# Requires `env` to exist already (window_location and game_location are read from it).
img = np.array(mss().grab(env.window_location))
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
# NOTE(review): the resize target here is (255, 150), while StepManiaEnv passes (225, 150) to its downscale helper - confirm which is intended
image1 = cv2.resize(gray, (255, 150))[env.game_location['top']:(env.game_location['top'] + env.game_location['height']), env.game_location['left']:(env.game_location['left'] + env.game_location['width'])]
cv2.imwrite('grayscale.png', image1)
# cv2.threshold returns a (threshold value, image) pair, hence the [1] when writing the file
black_white = cv2.threshold(image1, 60, 255, cv2.THRESH_BINARY)
cv2.imwrite('blackwhite.png', black_white[1])


img2 = np.array(mss().grab(env.window_location))
gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY)
image2 = cv2.resize(gray2, (255, 150))[env.game_location['top']:(env.game_location['top'] + env.game_location['height']), env.game_location['left']:(env.game_location['left'] + env.game_location['width'])]
# black_white2 is not used further in this cell
black_white2 = cv2.threshold(image2, 60, 255, cv2.THRESH_BINARY)
# The difference is taken between the full grayscale frames (gray / gray2),
# not between the cropped or thresholded images prepared above
diff = cv2.absdiff(gray, gray2)
cv2.imwrite('diff.png', diff)

plt.imshow(diff)

In [24]:
# Experiment: a MultiDiscrete space with four components of two values each
# NOTE(review): such a space only yields 0 or 1 per component, so the recorded output of this cell (which contains a 2) presumably stems from an earlier version - re-run to refresh it
space = gym.spaces.MultiDiscrete([2] * 4)

print(space.sample())

[2 2 0 1]


In [11]:
# Create the environment; the constructor also resizes and moves the StepMania window
env = StepManiaEnv()

In [None]:
# Scratch cell: grab a screen region and display the output of the binary downscale helper
game_location = {'top': 15, 'left': 20, 'width': 100, 'height': 185}
# Drop the last channel of the capture and force uint8, as StepManiaEnv does
screenshot = np.array(mss().grab(game_location))[:,:,:-1].astype(np.uint8)

# NOTE(review): here game_location is grabbed from the screen directly, whereas StepManiaEnv grabs window_location and applies game_location as a crop afterwards - confirm this preview shows the intended area
plt.imshow(env.screenshot_helper.downscaleImageBinary(screenshot, (225, 150), (1, 150, 225)))
# Earlier variants kept for reference:
# downscaled_screenshot = self.screenshot_helper.downscaleImage(screenshot, (225, 150), (1, 150, 225))
# gray = cv2.cvtColor(screenshot, cv2.COLOR_BGR2GRAY)
# resized = cv2.resize(gray, (225, 150))
# channel = np.reshape(resized, (1, 150, 225))

In [None]:
# Baseline run: play ten episodes with uniformly sampled actions and report
# reward, step count and throughput for each of them.
print('Environment started - Using random inputs')
time.sleep(2)
for episode in range(10):
  total_reward = 0
  done = False
  start = time.perf_counter()
  # Keep stepping until the environment reports the end of the song
  while True:
    obs, reward, done, info = env.step(env.action_space.sample())
    total_reward += reward
    if done:
      break

  stop = time.perf_counter()
  final_time = stop - start
  print(f'Total Reward for episode {episode} is {total_reward}')
  print(f'Total steps during this episode: {env.steps}')
  print(f'Total duration of this episode is {final_time:0.4f} seconds')
  print(f'This equals an average of {env.steps / final_time} steps per second')
  # reset() clears env.steps, so it is called only after the statistics were printed
  env.reset()

In [12]:
import os
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common import env_checker

In [8]:
# Check if the environment is valid, i.e. whether it follows the Gym interface
# that Stable-Baselines3 expects
env_checker.check_env(env)

In [13]:
# Define callback function, that is called after every step
# This is used to save the model in regular intervals
class TrainAndLoggingCallback(BaseCallback):
  """Callback that dumps the logger and saves model checkpoints at fixed intervals.

  Args:
    checking_freq: Number of callback invocations between two checkpoints.
    save_path: Directory for the checkpoints. With None no checkpoints are written.
    verbose: Verbosity level handed to BaseCallback.
    log_freq: The logger is dumped whenever num_timesteps is a multiple of this value.
  """

  def __init__(self, checking_freq, save_path, verbose=1, log_freq=500):
    super().__init__(verbose)
    self.checking_freq = checking_freq
    self.save_path = save_path
    self.log_freq = log_freq

  def _init_callback(self):
    """Create the checkpoint directory if one was configured."""
    if self.save_path is not None:
      os.makedirs(self.save_path, exist_ok=True)

  def _on_step(self):
    """Dump logs and save a checkpoint when the respective interval is reached.

    Returns:
      Always True, so training is never aborted by this callback.
    """
    if self.num_timesteps % self.log_freq == 0:
      self.logger.dump(self.num_timesteps)

    # Skip saving without a target directory, mirroring the check in _init_callback
    if self.save_path is not None and self.n_calls % self.checking_freq == 0:
      model_path = os.path.join(self.save_path, f'best_model_{self.n_calls}')
      self.model.save(model_path)

    return True

In [14]:
# Directory that is handed to the callback as save_path for model checkpoints
CHECKPOINT_DIR = './training/'
# Directory that is handed to the model as tensorboard_log
LOG_DIR = './logs/'

In [19]:
# Save a checkpoint into CHECKPOINT_DIR every 50,000 callback invocations
callback = TrainAndLoggingCallback(checking_freq=50_000, save_path=CHECKPOINT_DIR)

In [16]:
from stable_baselines3 import DQN
from stable_baselines3.common.monitor import Monitor

In [20]:
# Create a new environment instance; this replaces any earlier `env` in the kernel
env = StepManiaEnv()

In [None]:
# Wrap environment to monitor performance and training process
# NOTE(review): no registration of the id "StepManiaEnv-v1" is visible in this notebook - gym.make presumably needs the id to be registered elsewhere first; confirm
env = gym.make("StepManiaEnv-v1")
env = Monitor(env, filename="./logs/stepmania-env-v1")

In [21]:
# Build a DQN agent with a convolutional policy for the StepMania environment.
# Pass device='cpu' in addition to force training on the cpu instead of the gpu.
model = DQN(
  policy='CnnPolicy',
  env=env,
  learning_rate=0.0005,     # step size of the optimizer used in training
  buffer_size=120_000,      # buffer size, limited by the available ram
  learning_starts=1_000,    # learning starts after 1000 steps
  tensorboard_log=LOG_DIR,  # log directory
  verbose=1,                # enables logging
)

Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


In [22]:
# Start training for one million timesteps; the callback takes care of the checkpoints
model.learn(total_timesteps=1_000_000, callback=callback, log_interval=1)

Logging to ./logs/DQN_13
----------------------------------
| rollout/            |          |
|    exploration_rate | 0.995    |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 976      |
|    ep_rew_mean      | -339     |
|    exploration_rate | 0.991    |
| time/               |          |
|    episodes         | 1        |
|    fps              | 7        |
|    time_elapsed     | 139      |
|    total_timesteps  | 976      |
----------------------------------
----------------------------------
| rollout/            |          |
|    exploration_rate | 0.991    |
----------------------------------
----------------------------------
| rollout/            |          |
|    exploration_rate | 0.986    |
| train/              |          |
|    learning_rate    | 0.0005   |
|    loss             | 0.361    |
|    n_updates        | 124      |
----------------------------------
------------------------------

<stable_baselines3.dqn.dqn.DQN at 0x23733a619a0>

In [23]:
# Load past model 
# NOTE(review): the callback in this notebook writes files named 'best_model_<n_calls>'; the 'DQN13_' prefix presumably comes from renaming a checkpoint by hand - confirm the file exists
model = DQN.load(r'training\DQN13_best_model_1000000', env=env)

Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


In [None]:
# Continue training: restore a stored model and give the run its own TensorBoard directory
model = DQN.load(
  path=r'training\DQN11_comparison_binary_long',
  tensorboard_log="./logs/DQN11_continued_training",
  env=env,
)
model.set_env(env)

# reset_num_timesteps=False makes the run continue from the loaded timestep count
model.learn(
  total_timesteps=1_000_000,
  reset_num_timesteps=False,
  tb_log_name="second_run",
  log_interval=1,
  callback=callback,
)

model.save(r"training\DQN11_continued_training")

In [24]:
# Let the loaded model play ten episodes and report reward, step count and
# throughput for each of them.
print('Environment started - Using Model')
for episode in range(10):
  obs = env.reset()
  done = False
  start = time.perf_counter()
  total_reward = 0
  while not done:
    # predict returns the action wrapped in an array; the environment gets a plain int
    action, _ = model.predict(obs)
    obs, reward, done, info = env.step(int(action))
    total_reward += reward

  stop = time.perf_counter()
  final_time = stop - start
  # The episode return lives in the local total_reward; the environment defines no such attribute
  print(f'Total Reward for episode {episode} is {total_reward}')
  print(f'Total steps during this episode: {env.steps}')
  print(f'Total duration of this episode is {final_time:0.4f} seconds')
  print(f'This equals an average of {env.steps / final_time} steps per second')

Environment started - Using Model


KeyboardInterrupt: 

In [129]:
# Scratch cell: build a single observation by hand and ask the model for an action
screenshot = np.array(env.capture.grab(env.window_location))[:,:,:-1].astype(np.uint8)
# NOTE(review): this uses downscaleImage, while StepManiaEnv.step and reset use downscaleImageBinary - confirm the model receives the preprocessing it was trained with
downscaled_screenshot = env.screenshot_helper.downscaleImage(screenshot, (225, 150), (1, 150, 225))
# get_observation reads env.previous_observation, so the result depends on the environment's current state
obs = env.get_observation(downscaled_screenshot)
print(model.predict(obs))

(array(0, dtype=int64), None)


In [None]:
from stable_baselines3.common import results_plotter

# Plot the training results found in LOG_DIR against the number of timesteps
# NOTE(review): presumably this needs monitor files written by a Monitor wrapper into LOG_DIR - confirm they exist before running
results_plotter.plot_results(
  [LOG_DIR], 10000, results_plotter.X_TIMESTEPS, "StepManiaEnv"
)