# Setup

In [1]:
# installs
!pip3 install torch torchvision torchaudio
!pip3 install 'stable-baselines3[extra]' protobuf==3.20.\*
!pip3 install mss pyautogui pytesseract

You should consider upgrading via the '/Library/Frameworks/Python.framework/Versions/3.9/bin/python3 -m pip install --upgrade pip' command.
You should consider upgrading via the '/Library/Frameworks/Python.framework/Versions/3.9/bin/python3 -m pip install --upgrade pip' command.
You should consider upgrading via the '/Library/Frameworks/Python.framework/Versions/3.9/bin/python3 -m pip install --upgrade pip' command.


In [3]:
# imports
from mss import mss
import pyautogui
import cv2
import numpy as np
import pytesseract
from matplotlib import pyplot as plt
import time
from gym import Env
from gym.spaces import Box, Discrete

# Game Environment

In [70]:
class WebGame(Env):
    """Gym environment that plays an on-screen game via screen capture and key presses.

    Observations are grayscale screen grabs of ``game_location`` resized to
    150x60 and shaped ``(1, 60, 150)``. Actions are ``0`` (press space),
    ``1`` (press down) and ``2`` (do nothing). An episode ends when the
    ``done_location`` region matches the stored ``gameover.png`` template.
    """

    def __init__(self):
        """Set up the spaces, the screen grabber and the game-over template."""
        super().__init__()
        # Setup spaces
        self.observation_space = Box(low=0, high=255, shape=(1,60,150), dtype=np.uint8)
        self.action_space = Discrete(3)
        # Capture game frames
        self.cap = mss()
        # match to your window size
        self.game_location = {'top': 400, 'left': 0, 'width': 1400, 'height': 220}
        self.done_location = {'top': 405, 'left': 430, 'width': 660, 'height': 70}
        # Grayscale (flag 0) reference image of the game-over screen. cv2.imread
        # returns None instead of raising when the file cannot be read, so the
        # missing-template case is reported in get_done() where it is first needed.
        self.template = cv2.imread('gameover.png',0)
        # Most recent raw frame grabbed by get_observation(); shown by render().
        self.current_frame = None

    def step(self, action):
        """Apply ``action``, then return ``(observation, reward, done, info)``.

        The reward is ``1`` for every step survived and ``-20`` on the step
        where game over is detected. ``info`` is always an empty dict.
        """
        action_map = {
            0:'space',
            1: 'down',
            2: 'no_op'
        }
        # Action 2 is the no-op: nothing is sent to the game.
        if action != 2:
            pyautogui.press(action_map[action])

        done, _ = self.get_done()
        observation = self.get_observation()
        reward = -20 if done else 1
        info = {}
        return observation, reward, done, info

    def reset(self):
        """Restart the game and return the first observation."""
        # Wait, click at (150, 150) and press space to start a new run.
        time.sleep(1)
        pyautogui.click(150, 150)
        pyautogui.press('space')
        return self.get_observation()

    def render(self, mode='human'):
        """Show the latest captured frame in an OpenCV window; 'q' closes it.

        ``mode`` is accepted for compatibility with the Gym ``Env.render``
        signature and is otherwise unused.
        """
        # Previously current_frame was never assigned, so render() raised
        # AttributeError. Grab a frame first if none has been captured yet.
        if self.current_frame is None:
            self.get_observation()
        cv2.imshow('Game', self.current_frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            self.close()

    def close(self):
        """Close any OpenCV windows opened by render()."""
        cv2.destroyAllWindows()

    def get_observation(self):
        """Grab the game region and return it as a ``(1, 60, 150)`` uint8 array."""
        # Drop the 4th channel of the grab and keep the first three.
        raw = np.array(self.cap.grab(self.game_location))[:,:,:3].astype(np.uint8)
        # Remember the raw frame so render() has something to display.
        self.current_frame = raw
        gray = cv2.cvtColor(raw, cv2.COLOR_BGR2GRAY)
        # cv2.resize takes (width, height), hence (150, 60) for a 60x150 array.
        resized = cv2.resize(gray, (150,60))
        channel = np.reshape(resized, (1,60,150))
        return channel

    def get_done(self):
        """Return ``(done, done_cap)`` for the current screen.

        ``done`` is True when the grayscale ``done_location`` capture matches
        ``self.template`` with a normalised correlation above 0.99;
        ``done_cap`` is the raw capture array.

        Raises:
            FileNotFoundError: if the ``gameover.png`` template was not loaded.
        """
        done_cap = np.array(self.cap.grab(self.done_location))
        # NOTE(review): get_observation() converts with COLOR_BGR2GRAY while this
        # uses COLOR_RGB2GRAY. It is left unchanged because the saved template is
        # produced through this same conversion (see the imwrite line below), and
        # switching it would stop existing templates from matching.
        img = cv2.cvtColor(done_cap[:,:,:3], cv2.COLOR_RGB2GRAY)
        #plt.imshow(img)
        #plt.imshow(self.template)
        # on first run, uncomment so it saves 'game over' image, then comment out again
        # adjust path to your machine
        #cv2.imwrite('/Users/hunterstew/gameover.png', img)

        if self.template is None:
            # Checked after the optional imwrite above so the first-run capture
            # workflow still saves its image before this error is raised.
            raise FileNotFoundError(
                "Game-over template 'gameover.png' could not be loaded; "
                "save a game-over capture first and re-create the environment."
            )

        # Only the top-left entry of the match map is used, i.e. the template
        # aligned with the capture's top-left corner.
        res = cv2.matchTemplate(img,self.template,cv2.TM_CCOEFF_NORMED)[0][0]
        done = bool(res > .99)
        return done, done_cap


In [118]:
# Smoke test: build the environment and grab one frame to confirm capture works.
env = WebGame()
obs = env.get_observation()
# UNCOMMENT AND ADJUST self.game_location/done_location to your screen size
# plt.imshow(cv2.cvtColor(obs[0], cv2.COLOR_GRAY2BGR))

# Query the game-over detector once before playing.
done, done_cap = env.get_done()
print(done)

# Play a single episode with randomly sampled actions and report its reward.
for episode in range(1):
    obs = env.reset()
    total_reward = 0
    done = False
    while not done:
        random_action = env.action_space.sample()
        obs, reward, done, info = env.step(random_action)
        total_reward += reward
    print('Total Reward for episode {} is {}'.format(episode, total_reward))

False
Total Reward for episode 0 is 32


# Helpers for saving trained models 

In [32]:
# Import os for file path management
import os 
# Import Base Callback for saving models
from stable_baselines3.common.callbacks import BaseCallback
# Check Environment    
from stable_baselines3.common import env_checker

In [33]:
# Run Stable-Baselines3's environment checker on the WebGame instance
# created in the test cell above.
env_checker.check_env(env)

In [34]:
class TrainAndLoggingCallback(BaseCallback):
    """Callback that saves the model to ``save_path`` every ``check_freq`` calls.

    Args:
        check_freq: Save a checkpoint whenever ``n_calls`` is a multiple of this.
        save_path: Directory for checkpoints, or ``None`` to disable saving.
        verbose: Verbosity level forwarded to ``BaseCallback``.
    """

    def __init__(self, check_freq, save_path, verbose=1):
        super().__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self):
        """Create the checkpoint directory if a save path was given."""
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        """Save ``best_model_<n_calls>`` on every ``check_freq``-th call.

        Returns:
            True, unconditionally.
        """
        # _init_callback allows save_path to be None; skip saving in that case
        # instead of failing inside os.path.join.
        if self.save_path is not None and self.n_calls % self.check_freq == 0:
            model_path = os.path.join(self.save_path, 'best_model_{}'.format(self.n_calls))
            self.model.save(model_path)

        return True

In [80]:
# Directory handed to TrainAndLoggingCallback as its save_path (model checkpoints).
CHECKPOINT_DIR = './trainv9/'
# Directory handed to DQN as tensorboard_log.
LOG_DIR = './logs/'

In [81]:
# Checkpoint the model into CHECKPOINT_DIR every 1000 callback calls.
callback = TrainAndLoggingCallback(check_freq=1000, save_path=CHECKPOINT_DIR)

# Train model

In [84]:
from stable_baselines3 import DQN
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack

In [87]:
# Fresh environment instance for training (replaces the one from the test cell).
env = WebGame()

In [75]:
# Build a DQN agent with a CNN policy on the WebGame environment, logging to
# LOG_DIR for TensorBoard.
# NOTE(review): a 1.2M-transition buffer of (1, 60, 150) uint8 frames is
# roughly 10.8 GB per stored observation array - confirm the machine has the RAM.
model = DQN(
    'CnnPolicy',
    env,
    tensorboard_log=LOG_DIR,
    verbose=1,
    buffer_size=1_200_000,
    learning_starts=1_000,
)

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.




In [76]:
# Train for one million timesteps; the callback writes periodic checkpoints.
model.learn(
    total_timesteps=1_000_000,
    callback=callback,
)

Logging to ./logs/DQN_9
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 37.2     |
|    ep_rew_mean      | 16.2     |
|    exploration_rate | 0.997    |
| time/               |          |
|    episodes         | 4        |
|    fps              | 5        |
|    time_elapsed     | 28       |
|    total_timesteps  | 149      |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 35.9     |
|    ep_rew_mean      | 14.9     |
|    exploration_rate | 0.995    |
| time/               |          |
|    episodes         | 8        |
|    fps              | 5        |
|    time_elapsed     | 53       |
|    total_timesteps  | 287      |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 36.8     |
|    ep_rew_mean      | 15.8     |
|    exploration_rate | 0.992    |
| time/               |        

KeyboardInterrupt: 

# Test model

In [119]:
# Load a previously trained checkpoint and attach the current environment.
# NOTE(review): this is a machine-specific absolute path, and it points at
# trainv8 while CHECKPOINT_DIR above is './trainv9/' - adjust to wherever
# your checkpoint actually lives.
model = DQN.load('/Users/hunterstew/trainv8/best_model_325000.zip', env=env, print_system_info=True)

== CURRENT SYSTEM INFO ==
OS: macOS-10.16-x86_64-i386-64bit Darwin Kernel Version 20.6.0: Mon Aug 30 06:12:20 PDT 2021; root:xnu-7195.141.6~3/RELEASE_ARM64_T8101
Python: 3.9.7
Stable-Baselines3: 1.6.2
PyTorch: 1.13.0
GPU Enabled: False
Numpy: 1.21.2
Gym: 0.21.0

== SAVED MODEL SYSTEM INFO ==
OS: macOS-10.16-x86_64-i386-64bit Darwin Kernel Version 20.6.0: Mon Aug 30 06:12:20 PDT 2021; root:xnu-7195.141.6~3/RELEASE_ARM64_T8101
Python: 3.9.7
Stable-Baselines3: 1.6.2
PyTorch: 1.13.0
GPU Enabled: False
Numpy: 1.21.2
Gym: 0.21.0

Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.




In [120]:
# Play five episodes with the loaded model choosing every action, printing
# the total reward collected in each one.
for episode in range(5):
    obs = env.reset()
    total_reward = 0
    done = False
    while not done:
        # predict() returns (action, state); only the action is needed here.
        prediction = model.predict(obs)
        action = prediction[0]
        obs, reward, done, info = env.step(int(action))
        # plt.imshow(cv2.cvtColor(obs[0], cv2.COLOR_GRAY2BGR))
        total_reward += reward
    print('Total Reward for episode {} is {}'.format(episode, total_reward))

Total Reward for episode 0 is 60
Total Reward for episode 1 is 88
Total Reward for episode 2 is 322


KeyboardInterrupt: 