# Chrome Dinosaur game reinforcement learning

## 1. Install dependencies

In [1]:
# for super fast screen capture
from mss import mss
# for fast button presses
import pydirectinput
import cv2 as cv
import numpy as np
# for ocr (optical character recognition)
import pytesseract
from matplotlib import pyplot as plt
import time
# base env class
from gym import Env
from gym.spaces import Box, Discrete
# imports for frame stacking
from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack

  from .autonotebook import tqdm as notebook_tqdm


## 2. Build custom environment

### 2.1 Build env

In [2]:
class WebGame(Env):
    def __init__(self) -> None:
        super().__init__()
        
        # === Define spaces
        self.observation_space = Box(low=0, high=255, shape=(83,100,1), dtype=np.uint8)
        # can be 0, 1 or 2
        # 0 = up, 1 = down, 2 = no op
        self.action_space = Discrete(3)

        # === Define extraction parameters
        self.cap = mss()
        self.game_location = {'top': 300, 'left': 0, 'width': 800, 'height': 500}
        self.done_location = {'top': 420, 'left': 630, 'width': 260, 'height': 40}
        
        self.dark_done = np.load('dino/dark_done.npy')
        self.light_done = np.load('dino/light_done.npy')
        
    def step(self, action):
        # take action
        action_map = {
            0: 'up',
            1: 'down',
            2: 'f',
        }
        
        if action != 2:
            pydirectinput.press(action_map[action])
        
        # get infos
        done = self.get_done()
        obs = self.get_observation()
        
        # reward for every frame alive
        if done:
            reward = 0
        else:
            reward = 1
        
        info = {}
        
        return obs, reward, done, info
    
    def render(self):
        # omiting function
        pass
    
    def reset(self):
        time.sleep(2)
        
        # this is really to make sure we are tabbed into the game
        pydirectinput.click(150, 150)
        
        pydirectinput.press('space')
        
        return self.get_observation()
    
    # non standart functions
    def get_observation(self):
        # get screenshot
        obs = np.array(self.cap.grab(self.game_location))
        
        # channels are BGRA
        obs = obs[..., :3]
        
        obs = cv.cvtColor(obs, cv.COLOR_BGR2GRAY)
        
        obs = cv.resize(obs, (100, 83))
        obs = np.expand_dims(obs, -1)
        
        return obs
    
    def close(self) -> None:
        return super().close()
    
    def get_done(self, debug=False, save_name=None):
        done_cap = np.array(self.cap.grab(self.done_location))
        done_cap = done_cap[..., :3]
        
        done_cap = cv.cvtColor(done_cap, cv.COLOR_BGR2GRAY)
        factor = .25
        new_size = (int(self.done_location['width']*factor), int(self.done_location['height']*.25))
        done_cap = cv.resize(done_cap, new_size)
        
        if save_name != None:
            np.save(f'dino/{save_name}.npy', done_cap)
        
        dark_similarity = np.sum((self.dark_done - done_cap)**2)
        light_similarity = np.sum((self.light_done - done_cap)**2)
                
        done = False
        if dark_similarity < 2000 or light_similarity < 2000:
            done = True
        
        if debug:
            plt.imshow(done_cap, cmap='gray')
            print(dark_similarity, light_similarity)
                
        return done

In [None]:
env = WebGame()

In [None]:
env.reset().shape

In [None]:
obs = env.get_observation()
plt.imshow(obs, cmap='gray')

In [None]:
env.get_done(True)

### 2.2 Test env

In [3]:
env = WebGame()
# Wrap in dummy environment
env = DummyVecEnv([lambda: env])
# stack frames
env = VecFrameStack(env, 3, channels_order='last')

In [None]:
games = 2

# time.sleep(5)
for i in range(games):
    done = False
    env.reset()
    total_reward = 0
    while not done:
        obs, reward, done, info = env.step([env.action_space.sample()])
        total_reward += reward
    print(f'Total reward of episode {i} was {total_reward}')

In [None]:
from stable_baselines3.common import env_checker

In [None]:
env_checker.check_env(env)

## 3. Train model

### 3.1 create call back

In [4]:
import os
from stable_baselines3.common.callbacks import BaseCallback

In [5]:
class TrainAndLoggingCallback(BaseCallback):

    def __init__(self, check_freq, save_path, verbose=1):
        super(TrainAndLoggingCallback, self).__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self):
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        if self.n_calls % self.check_freq == 0:
            model_path = os.path.join(self.save_path, 'best_model_{}'.format(self.n_calls))
            self.model.save(model_path)

        return True

In [14]:
CHECKPOINT_DIR = './dino/train/second' # for model weights
LOG_DIR = './dino/log' # for tf logs

In [15]:
callback = TrainAndLoggingCallback(check_freq=2000, save_path=CHECKPOINT_DIR)

### Build and drain DQN

In [8]:
from stable_baselines3 import DQN

In [12]:
model = DQN('CnnPolicy', env, tensorboard_log=LOG_DIR, verbose=1, buffer_size=220000, learning_starts=1000)

Using cuda device
Wrapping the env in a VecTransposeImage.


In [None]:
model = DQN('CnnPolicy', env, tensorboard_log=LOG_DIR, verbose=1, buffer_size=500000, learning_starts=1000, learning_rate=0.00001)
model.set_parameters('./dino/train/first/best_model_3000.zip')

In [16]:
model.learn(total_timesteps=15000, callback=callback)

Logging to ./dino/log\DQN_3
----------------------------------
| rollout/            |          |
|    exploration_rate | 0.954    |
| time/               |          |
|    episodes         | 4        |
|    fps              | 2        |
|    time_elapsed     | 29       |
|    total_timesteps  | 73       |
----------------------------------
----------------------------------
| rollout/            |          |
|    exploration_rate | 0.909    |
| time/               |          |
|    episodes         | 8        |
|    fps              | 2        |
|    time_elapsed     | 57       |
|    total_timesteps  | 143      |
----------------------------------
----------------------------------
| rollout/            |          |
|    exploration_rate | 0.863    |
| time/               |          |
|    episodes         | 12       |
|    fps              | 2        |
|    time_elapsed     | 86       |
|    total_timesteps  | 216      |
----------------------------------
---------------------------

<stable_baselines3.dqn.dqn.DQN at 0x2b3042fe0c8>

## 4. Test model

In [None]:
model = DQN.load('./dino/train/first/best_model_3000.zip')

In [None]:
env = WebGame()
games = 3

# time.sleep(5)
for i in range(games):
    done = False
    obs = env.reset()
    total_reward = 0
    while not done:
        action, _ = model.predict(obs)
        obs, reward, done, info = env.step(int(action))
        total_reward += reward
        time.sleep(.01)
    print(f'Total reward of episode {i} was {total_reward}')

### 4.1 check execution times

In [None]:
prediction_times = []
step_times = []

env = WebGame()
games = 3

# time.sleep(5)
for i in range(games):
    done = False
    obs = env.reset()
    total_reward = 0
    while not done:
        tic = time.time()
        action, _ = model.predict(obs)
        prediction_times.append(time.time()-tic)
        
        tic = time.time()
        obs, reward, done, info = env.step(int(action))
        step_times.append(time.time()-tic)
        
        total_reward += reward
        time.sleep(.01)
    print(f'Total reward of episode {i} was {total_reward}')

In [None]:
print(sum(prediction_times)/len(prediction_times))
print(sum(step_times)/len(step_times))

step takes very long (0.4s). The value is either ~.47 or ~.14

Investigating further

In [None]:
tic = time.time()
for i in range(100):
    pydirectinput.press('down')
print((time.time()-tic)/100)

In [None]:
# can't get it to work

import win32com.client as comclt
wsh = comclt.Dispatch("WScript.Shell")
# wsh.AppActivate("Steam") # select another application
wsh.AppActivate()
wsh.SendKeys("space")
wsh.SendKeys("space")
wsh.SendKeys("down")
wsh.SendKeys("down")
wsh.SendKeys("down")

In [None]:
import pyautogui

tic = time.time()
for i in range(100):
    pyautogui.press('down')
print((time.time()-tic)/100)

In [None]:
tic = time.time()
for i in range(100):
    env.get_done()
print((time.time()-tic)/100)

- 0.13391414880752564 original
- 0.01684134006500244 new method

In [None]:
tic = time.time()
for i in range(100):
    env.get_observation()
print((time.time()-tic)/100)