In [17]:
import os
from PIL import Image
import pytesseract #used for ocr
from mss import mss #used for recording screen app
import pydirectinput # used for sending command to game regarding actions
import cv2 #allow to do image processing
from matplotlib import pyplot as plt
import time
import gymnasium as gym
from gymnasium.spaces import Box, Discrete
import numpy as np
from IPython.display import display, Image

# For DQN
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common import env_checker
from stable_baselines3 import DQN

pydirectinput.FAILSAFE = False

### Build Environment

In [18]:
class WebGame(gym.Env):
    def __init__(self):
        # Subclass model
        super().__init__()
        # Setup Spaces
        self.observation_space = Box(low=0, high=255, shape=(1,83,100), dtype=np.uint8)
        self.action_space = Discrete(3)
        # Define extraction parameters for the game
        self.cap = mss()
        self.game_location = {'top': 400, 'left': 0, 'width': 600, 'height': 500}
        self.done_location = {'top': 480, 'left': 550, 'width': 800, 'height': 100}

    def step(self, action):
        action_map = {
            0: 'space',
            1: 'down',
            2: 'no_op'
        }
        if action != 2:
            pydirectinput.press(action_map[action])
        
        terminated = self.get_done()
        truncated = False  # You can set this to True if you implement truncation logic
        new_observations = self.get_observation()
        reward = 1
        info = {}
        
        return new_observations, reward, terminated, truncated, info
    
    def render(self):
        frame = np.array(self.cap.grab(self.game_location))[:, :, :3]
        cv2.imwrite('frame.jpg', frame)
        display(Image(filename='frame.jpg'))
    
    def reset(self, seed=None, options=None):
        # Optionally set the seed
        if seed is not None:
            np.random.seed(seed)
        
        time.sleep(1)
        pydirectinput.click(x=150, y=150)
        pydirectinput.press('space')
        observation = self.get_observation()
        info = {}  # You can return additional info if needed
        return observation, info
    
    def close(self):
        cv2.destroyAllWindows()
    
    def get_observation(self):
        raw = np.array(self.cap.grab(self.game_location))[:, :, :3]
        gray = cv2.cvtColor(raw, cv2.COLOR_BGR2GRAY)
        resized = cv2.resize(gray, (100, 83))
        channel = np.reshape(resized, (1, 83, 100))
        return channel
    
    def get_done(self):
        done_cap = np.array(self.cap.grab(self.done_location))[:, :, :3]
        res = pytesseract.image_to_string(done_cap)
        done_strings = ['OVE', 'GAME', 'chrome', 'OVER', 'enterprise', 'GAME', 'OVER\n']
        res = res.split(" ")
        if any(word in res for word in done_strings):
            return True
        return False

In [19]:
env = WebGame()

In [4]:
# Testing our environment Logics

for episodes in range(10):
    obs = env.reset()
    done = False
    total_reward = 0
    
    while not done:
        obs, reward, done, trunc, info = env.step(env.action_space.sample())
        total_reward += reward
    print(f'Total Reward for ep isodes {episodes} is {total_reward}')

Total Reward for ep isodes 0 is 11
Total Reward for ep isodes 1 is 15
Total Reward for ep isodes 2 is 12
Total Reward for ep isodes 3 is 10
Total Reward for ep isodes 4 is 10
Total Reward for ep isodes 5 is 17
Total Reward for ep isodes 6 is 31
Total Reward for ep isodes 7 is 11
Total Reward for ep isodes 8 is 13
Total Reward for ep isodes 9 is 10


### Using Deep Reinforcement Algorithm (DQN)

In [11]:
# This help us to check if our created environment is workig perfectly or not
# env_checker.check_env(env)

In [12]:
class TrainAndLoggingCallback(BaseCallback):
    
    def __init__(self, check_freq, save_path, verbose=1):
        super(TrainAndLoggingCallback, self).__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path
        
    def _init_callback(self):
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)
            
    def _on_step(self):
        if self.n_calls % self.check_freq == 0:
            model_path = os.path.join(self.save_path, 'best_model_{}'.format(self.n_calls))
            self.model.save(model_path)
        return True

In [13]:
CHECKPOINT_DIR = './train/'
LOG_DIR = './logs'

In [14]:
callback = TrainAndLoggingCallback(check_freq=1000, save_path=CHECKPOINT_DIR)

In [15]:
model = DQN(
        'CnnPolicy', 
        env, 
        tensorboard_log=LOG_DIR, 
        verbose=1, 
        buffer_size=1000000, 
        learning_starts=1000
    )

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.




In [16]:
model.learn(total_timesteps=2000, callback=callback)

Logging to ./logs\DQN_5


KeyboardInterrupt: 

### Test Best Model

In [30]:
model = DQN('CnnPolicy', env, buffer_size=10000)
model.set_parameters(os.path.join('train', 'best_model_50000'))

for episodes in range(15):
    obs, info = env.reset()
    done = False
    total_reward = 0
    
    while not done:
        action, _ = model.predict(obs)
        obs, reward, done, trunc, info = env.step(int(action))
        total_reward += reward
    print(f'Total Reward for episodes {episodes} is {total_reward}')

Total Reward for episodes 0 is 26
Total Reward for episodes 1 is 33
Total Reward for episodes 2 is 20
Total Reward for episodes 3 is 19
Total Reward for episodes 4 is 21
Total Reward for episodes 5 is 40
Total Reward for episodes 6 is 34
Total Reward for episodes 7 is 23
Total Reward for episodes 8 is 31
Total Reward for episodes 9 is 23
Total Reward for episodes 10 is 22
Total Reward for episodes 11 is 47
Total Reward for episodes 12 is 189
Total Reward for episodes 13 is 25
Total Reward for episodes 14 is 33


### Training DQN for further timestamps

In [9]:
model = DQN.load(os.path.join(CHECKPOINT_DIR, 'best_model_3000.zip'), env=env)

Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.




In [10]:
additional_timesteps = 60000  # Adjust as needed
model.learn(total_timesteps=additional_timesteps, callback=callback)

Logging to ./logs\DQN_4
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 18.2     |
|    ep_rew_mean      | 18.2     |
|    exploration_rate | 0.988    |
| time/               |          |
|    episodes         | 4        |
|    fps              | 1        |
|    time_elapsed     | 37       |
|    total_timesteps  | 73       |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 15.1     |
|    ep_rew_mean      | 15.1     |
|    exploration_rate | 0.981    |
| time/               |          |
|    episodes         | 8        |
|    fps              | 1        |
|    time_elapsed     | 63       |
|    total_timesteps  | 121      |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 13.8     |
|    ep_rew_mean      | 13.8     |
|    exploration_rate | 0.974    |
| time/               |        

<stable_baselines3.dqn.dqn.DQN at 0x2448f9b6f90>