In [1]:
import numpy as np
import matplotlib.pyplot as plt
from mss import mss
import cv2
import pydirectinput
import pytesseract
from gym import Env
from gym.spaces import Box, Discrete
import time

import random
from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env

pytesseract.pytesseract.tesseract_cmd = r'C:\Program Files\Tesseract-OCR\tesseract.exe'




In [2]:
class MazeEnv(Env):
    def __init__(self):
        super().__init__()

        self.observation_space = Box(low=0, high=255, shape=(1,500,500), dtype=np.uint8)
        self.action_space = Discrete(4)  # 0: up, 1: right, 2: down, 3: left

        self.cap = mss()
        self.game_location = {'top':0, 'left':0, 'width':500, 'height':500}
        self.done_location = {'top':250, 'left':175, 'width':150, 'height':50}
        
        self.start_pos = (1, 1)
        self.goal_pos = (8, 8)
        self.current_pos = self.start_pos

    def step(self, action):
        action_map = {
            0: 'up',
            1: 'right',
            2: 'down',
            3: 'left'
        }

        pydirectinput.press(action_map[action])

        done, done_cap = self.get_done()
        new_observation = self.get_observation()

        reward = 500 if done else -1

        info = {}

        return new_observation, reward, done, False, info

    def render(self):
        cv2.imshow('Game', np.array(self.cap.grab(self.game_location))[..., :3])
        if cv2.waitKey(1) & 0xFF == ord('q'):
            self.close()

    def close(self):
        cv2.destroyAllWindows()

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        time.sleep(1)
        pydirectinput.click(x=225, y=275)
        time.sleep(1)
        
        observation = self.get_observation()
        info = {}
        return observation, info

    def get_observation(self):
        raw = np.array(self.cap.grab(self.game_location))[..., :3]
        gray = cv2.cvtColor(raw, cv2.COLOR_BGR2GRAY)
        channel = np.reshape(gray, (1,500,500))
        
        return channel
    
    def get_done(self):
        done_cap = np.array(self.cap.grab(self.done_location))

        done_strings = ["Play"]

        done = False
        res = pytesseract.image_to_string(done_cap)[:4]
        if res in done_strings:
            done = True

        return done, done_cap

In [3]:
import os
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common import env_checker

In [4]:
class TrainAndLoggingCallback(BaseCallback):

    def __init__(self, check_freq, save_path, verbose=1):
        super(TrainAndLoggingCallback, self).__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self):
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        if self.n_calls % self.check_freq == 0:
            model_path = os.path.join(self.save_path, 'best_model_{}'.format(self.n_calls))
            self.model.save(model_path)

        return True

In [5]:
CHECKPOINT_DIR = './train/'
LOG_DIR = './logs/'

In [6]:
callback = TrainAndLoggingCallback(check_freq=100, save_path=CHECKPOINT_DIR)

In [7]:
from stable_baselines3 import DQN
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack

In [8]:
env = MazeEnv()

In [10]:
model = DQN('CnnPolicy', env, tensorboard_log=LOG_DIR, verbose=1, buffer_size=1000, learning_starts=0)

Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.




In [10]:
model.learn(total_timesteps=10000, callback=callback)

Logging to ./logs/DQN_7


RuntimeError: [enforce fail at inline_container.cc:595] . unexpected pos 456852352 vs 456852304

In [11]:
model.load(os.path.join('train', 'best_model_4700')) 

<stable_baselines3.dqn.dqn.DQN at 0x1ec7e0d12d0>

In [13]:
for episode in range(1): 
    obs = env.reset()
    obs = obs[0]
    done = False
    total_reward = 0
    while not done: 
        action, _ = model.predict(obs)
        print(action)
        obs, reward, done, _, _ = env.step(int(action))
        time.sleep(0.01)
        total_reward += reward
    print('Total Reward for episode {} is {}'.format(episode, total_reward))
    time.sleep(2)

0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0


KeyboardInterrupt: 