In [1]:
import torch
from mss import mss #capture
import pyautogui
import cv2
import numpy as np
import pytesseract #OCR
from matplotlib import pyplot as plt
import time
from gymnasium import Env
from gymnasium.spaces import Box, Discrete

Environment

In [3]:
class WebGame(Env):
    """Gymnasium environment that plays an on-screen game through screen capture.

    Observations are grayscale, down-scaled screenshots of a fixed screen
    region; actions are key presses sent with ``pyautogui``; the end of an
    episode is detected by running OCR over a second, smaller screen region.

    All screen coordinates are absolute pixel positions, so the game window
    must sit at the expected place on screen for the environment to work.
    """

    # Observation geometry (channels-first), shared by the observation space
    # and get_observation() so the two cannot drift apart.
    OBS_HEIGHT = 83
    OBS_WIDTH = 100

    # Default screen regions (mss "monitor" dicts) and restart click position.
    DEFAULT_GAME_LOCATION = {'top':225, 'left':130, 'width':600, 'height':160}
    DEFAULT_DONE_LOCATION = {'top':263, 'left':329, 'width':90, 'height':20}
    DEFAULT_RESTART_CLICK = (430, 320)

    # Action index -> key to press; None means "do nothing this step".
    # 0 -> Space, 1 -> Duck, 2 -> None action
    ACTION_KEYS = {0: 'space', 1: 'down', 2: None}

    # Leading OCR text that marks the end of an episode.
    # NOTE(review): 'GaAW' looks like a frequent OCR misread of 'GAME' — confirm.
    DONE_STRINGS = ('GAME', 'GaAW')

    def __init__(self, game_location=None, done_location=None, restart_click=None):
        """Set up the spaces, the screen grabber and the capture regions.

        Args:
            game_location: Optional mss region dict (``top``, ``left``,
                ``width``, ``height``) used for observations. Defaults to
                ``DEFAULT_GAME_LOCATION``.
            done_location: Optional mss region dict scanned by OCR to detect
                the end of an episode. Defaults to ``DEFAULT_DONE_LOCATION``.
            restart_click: Optional ``(x, y)`` screen position clicked by
                ``reset``. Defaults to ``DEFAULT_RESTART_CLICK``.
        """
        super().__init__()
        self.observation_space = Box(
            low=0, high=255,
            shape=(1, self.OBS_HEIGHT, self.OBS_WIDTH),
            dtype=np.uint8,
        )
        self.action_space = Discrete(len(self.ACTION_KEYS))
        self.cap = mss()
        # Copy the dicts so per-instance tweaks never leak into the class defaults.
        self.game_location = dict(self.DEFAULT_GAME_LOCATION if game_location is None else game_location)
        self.done_location = dict(self.DEFAULT_DONE_LOCATION if done_location is None else done_location)
        self.restart_click = tuple(self.DEFAULT_RESTART_CLICK if restart_click is None else restart_click)

    def step(self, action):
        """Send one action to the game and observe the result.

        Args:
            action: Index into ``ACTION_KEYS`` (0, 1 or 2).

        Returns:
            ``(observation, reward, terminated, truncated, info)``. The reward
            is a constant 1 per step, ``truncated`` is always ``False`` and
            ``info`` is an empty dict.

        Raises:
            KeyError: If ``action`` is not a key of ``ACTION_KEYS``.
        """
        # int() lets numpy integer scalars index the mapping reliably.
        key = self.ACTION_KEYS[int(action)]
        if key is not None:
            pyautogui.press(key)

        # Same order as before: check for game over first, then grab the frame.
        terminated, _ = self.get_done()
        new_observation = self.get_observation()
        reward = 1
        truncated = False
        info = {}

        return new_observation, reward, terminated, truncated, info

    def render(self):
        """Show the current game region in an OpenCV window; 'q' closes it."""
        cv2.imshow('Game', np.array(self.cap.grab(self.game_location))[:,:,:3])
        if cv2.waitKey(1) & 0xFF == ord('q'):
            self.close()

    def close(self):
        """Destroy any OpenCV windows opened by ``render``."""
        cv2.destroyAllWindows()
        # One more waitKey call after destroying the windows, as in the original.
        cv2.waitKey(1)

    def reset(self, seed=None, options=None):
        """Restart the game and return the first observation.

        Args:
            seed: Optional seed forwarded to ``Env.reset`` so ``self.np_random``
                is seeded as the Gymnasium API expects.
            options: Accepted for Gymnasium API compatibility; unused.

        Returns:
            ``(observation, info)`` where ``info`` is an empty dict.
        """
        super().reset(seed=seed)
        click_x, click_y = self.restart_click
        time.sleep(1)
        # Two clicks 0.1 s apart at the restart position.
        pyautogui.click(x=click_x, y=click_y)
        time.sleep(0.1)
        pyautogui.click(x=click_x, y=click_y)
        info = {}
        return self.get_observation(), info

    def get_observation(self):
        """Capture the game region as a ``(1, OBS_HEIGHT, OBS_WIDTH)`` grayscale array."""
        # Keep the first three channels of the grab; they are treated as BGR below.
        raw = np.array(self.cap.grab(self.game_location))[:,:,:3]
        gray = cv2.cvtColor(raw, cv2.COLOR_BGR2GRAY)
        resized = cv2.resize(gray, (self.OBS_WIDTH, self.OBS_HEIGHT)) #width first, then height
        channel = np.reshape(resized, (1, self.OBS_HEIGHT, self.OBS_WIDTH)) #need channels first
        return channel

    def get_done(self):
        """Detect the end of an episode by OCR on the ``done_location`` region.

        Returns:
            ``(done, done_cap)``: ``done`` is ``True`` when the first four
            characters recognised by Tesseract are one of ``DONE_STRINGS``;
            ``done_cap`` is the captured image that was scanned.
        """
        done_cap = np.array(self.cap.grab(self.done_location))[:,:,:3]
        res = pytesseract.image_to_string(done_cap)[:4]
        done = res in self.DONE_STRINGS
        return done, done_cap
    

DQN

In [4]:
# Instantiate the screen-capture game environment defined above.
env = WebGame()

In [5]:
import os
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common import env_checker

In [6]:
class TrainAndLoggingCallback(BaseCallback):
    """Callback that saves the model to disk every ``check_freq`` calls.

    Checkpoints are written to ``save_path`` as ``best_model_<n_calls>``.
    When ``save_path`` is ``None`` no directory is created and no checkpoint
    is written.
    """

    def __init__(self, check_freq, save_path, verbose = 1):
        """Store the checkpoint settings.

        Args:
            check_freq: Number of callback calls between two checkpoints.
            save_path: Directory for the checkpoints, or ``None`` to disable
                saving.
            verbose: Verbosity level forwarded to ``BaseCallback``.
        """
        super().__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self) -> None:
        """Create the checkpoint directory if saving is enabled."""
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self) -> bool:
        """Save a checkpoint on every ``check_freq``-th call.

        Returns:
            Always ``True``, so this callback never stops training.
        """
        # Mirror the None guard of _init_callback: without it a None save_path
        # would raise TypeError inside os.path.join on the first checkpoint.
        if self.save_path is not None and self.n_calls % self.check_freq == 0:
            model_path = os.path.join(self.save_path, 'best_model_{}'.format(self.n_calls))
            self.model.save(model_path)

        return True


In [7]:
# Directory handed to TrainAndLoggingCallback as the location for model checkpoints.
CHECKPOINT_DIR = './train'
# Directory handed to DQN as its TensorBoard log location.
LOG_DIR = './logs'

In [8]:
# Checkpoint the model into CHECKPOINT_DIR every 1000 callback calls.
callback = TrainAndLoggingCallback(check_freq=1000, save_path=CHECKPOINT_DIR)

In [9]:
from stable_baselines3 import DQN

In [None]:
# DQN agent with a CNN policy acting on the WebGame environment; training
# metrics are logged for TensorBoard under LOG_DIR.
model = DQN(
    policy='CnnPolicy',
    env=env,
    buffer_size=300000,
    learning_starts=180,
    tensorboard_log=LOG_DIR,
    verbose=1,
)

In [10]:
# Pause for 20 seconds before training starts.
# NOTE(review): presumably time to bring the game window into focus — confirm.
time.sleep(20)

In [11]:
# Train for 200000 timesteps; `callback` writes the periodic checkpoints.
model.learn(total_timesteps=200000, callback=callback)

Logging to ./logs/DQN_1
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 26.2     |
|    ep_rew_mean      | 26.2     |
|    exploration_rate | 0.995    |
| time/               |          |
|    episodes         | 4        |
|    fps              | 4        |
|    time_elapsed     | 25       |
|    total_timesteps  | 105      |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 25.5     |
|    ep_rew_mean      | 25.5     |
|    exploration_rate | 0.99     |
| time/               |          |
|    episodes         | 8        |
|    fps              | 4        |
|    time_elapsed     | 48       |
|    total_timesteps  | 204      |
| train/              |          |
|    learning_rate    | 0.0001   |
|    loss             | 0.335    |
|    n_updates        | 5        |
----------------------------------
----------------------------------
| rollout/            |        

<stable_baselines3.dqn.dqn.DQN at 0x2aabbaf50>

Test

In [None]:
# `load` is a classmethod that RETURNS a new model rather than updating the
# existing one in place, so the result must be bound to `model`; otherwise the
# checkpoint is read and immediately thrown away.
model = DQN.load(os.path.join(CHECKPOINT_DIR, 'best_model_200000'), env=env)

In [14]:
# Play one evaluation episode with the current model and report its return.
for episode in range(1):
    obs, info = env.reset()
    terminated = truncated = False
    total_reward = 0

    # Under the Gymnasium API an episode is over when it is either terminated
    # or truncated, so both flags returned by step() are honoured.
    while not (terminated or truncated):
        action, _ = model.predict(obs)
        obs, reward, terminated, truncated, info = env.step(int(action))
        total_reward += reward
    print(f'Total Reward for episode {episode} is {total_reward}')

Total Reward for episode 0 is 21
