In [1]:
from mss import mss
import pydirectinput
import cv2 as cv
import numpy as np
import pytesseract
import matplotlib.pyplot as plt
import time
from gym import Env
from gym.spaces import Box, Discrete
from PIL import Image
import pickle

In [2]:
class WebGame(Env):
    def __init__(self):
        super().__init__()
        self.observation_space = Box(low=0, high=255, shape = (1,84, 84), dtype=np.uint8)
        self.action_space = Discrete(2)
        self.cap = mss()
        self.game_location = {'top':300, 'left':0, 'width':1400, "height":500}
        self.done_location = {'top':385, 'left':630, 'width':680, "height":100}

    def step(self, action):
        action_map = {
            0:'space',
            1:'no_op'      
        }
        if action!=1: 
            pydirectinput.FAILSAFE = False
            pydirectinput.press(action_map[action])
        done = self.get_done()
        observation  = self.get_observation()
        reward = 1
        info = {}
        if action!=1:
            time.sleep(0.2) 

        return observation, reward, done, info 

    def reset(self):
        time.sleep(1)
        pydirectinput.click(x=150, y=150)
        pydirectinput.press('space')
        return self.get_observation()


    def get_observation(self):
        raw = np.array(self.cap.grab(self.game_location))[:,:,:3]
        gray = cv.cvtColor(raw, cv.COLOR_BGR2GRAY)
        resized = cv.resize(gray, (84, 84))
        channel = np.reshape(resized, (1, 84, 84))
        
        return channel

    def get_done(self):
        done_i = np.array(self.cap.grab(self.done_location))[:,:,:3]


        done_strings = ['GAME', 'GAHE']

        done = False
        res = pytesseract.image_to_string(done_i)[:4]
        if res in done_strings:
            done = True

        return done

In [3]:
from torch import nn
import torch.nn.functional as F
import torch as T
from collections import deque
import itertools
import random


In [4]:
GAMMA=0.99
BATCH_SIZE=64
BUFFER_SIZE=50000
MIN_REPLAY_SIZE=1000
EPSILON_START=1.0
EPSILON_END=0.02
EPSILON_DECAY=10000
TARGET_UPDATE_FREQ = 1000
lr = 3e-4
episode_reward = 0.0

In [5]:
def nature_cnn(obs, depths=(32, 64, 64), final_layer = 512):
    n_input_channels = obs.shape[0]
    cnn = nn.Sequential(
        nn.Conv2d(n_input_channels, depths[0], kernel_size=8, stride=4),
        nn.ReLU(),
        nn.Conv2d(depths[0], depths[1], kernel_size=4, stride=2),
        nn.ReLU(),
        nn.Conv2d(depths[1], depths[2], kernel_size=2, stride=1),
        nn.ReLU(),
        nn.Flatten()
    )
    with T.no_grad():
        n_flatten = cnn(T.as_tensor(obs.sample()[None]).float()).shape[1]
    out = nn.Sequential(cnn, nn.Linear(n_flatten, final_layer), nn.ReLU())
    return out

In [6]:
class Network(nn.Module):
    def __init__(self, env):
        super().__init__()

        self.net = nn.Sequential(
            nature_cnn(env.observation_space),
            nn.Linear(512, env.action_space.n)
        )

    def forward(self, x):
        return self.net(x)

    def act(self, obs):
        obs_t = T.as_tensor(obs, dtype=T.float32).cuda()
        q_values = self(obs_t.unsqueeze(0))
        max_q_index = T.argmax(q_values, dim=1)[0]
        action = max_q_index.detach().item()

        return action

    def train_loss(self, transitions, target_net):
        obses = np.asarray([t[0] for t in transitions])
        actions = np.asarray([t[1] for t in transitions])
        rews = np.asarray([t[2] for t in transitions])
        dones = np.asarray([t[3] for t in transitions])
        new_obses = np.asarray([t[4] for t in transitions])

        obses_t = T.as_tensor(obses, dtype=T.float32).cuda()
        actions_t = T.as_tensor(actions, dtype=T.int64).unsqueeze(-1).cuda()
        rews_t = T.as_tensor(rews, dtype=T.float32).unsqueeze(-1).cuda()
        dones_t = T.as_tensor(dones, dtype=T.float32).unsqueeze(-1).cuda()
        new_obses_t = T.as_tensor(new_obses, dtype=T.float32).cuda()

        target_q_values = target_net(new_obses_t)
        max_target_q_values = target_q_values.max(dim=1, keepdim=True)[0]

        targets = rews_t + GAMMA*(1-dones_t)*max_target_q_values

        q_values = self(obses_t)
        action_q_values = T.gather(q_values, dim=1, index=actions_t)

        loss = F.smooth_l1_loss(action_q_values, targets)
        return loss


In [7]:
env = WebGame()

In [8]:
online_net = T.load('best_model/online.pth')
target_net = T.load('best_model/target.pth')
optimizer = T.load('best_model/optim.pth')  

In [9]:
# replay_buffer = deque(maxlen=BUFFER_SIZE)
rew_buffer = deque([0.0], maxlen=100)

In [10]:
replay_buffer = pickle.load(open('replay_buffer', 'rb'))

In [11]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
import chromedriver_binary
import time

driver = webdriver.Chrome(executable_path='./chromedriver.exe')
driver.maximize_window()
try: 
    driver.get('chrome://dino')
except:   
    pass

  driver = webdriver.Chrome(executable_path='./chromedriver.exe')


In [13]:
# obs = env.reset()
# time.sleep(2)
# for i in range(MIN_REPLAY_SIZE):
#     action = env.action_space.sample() 
#     new_obs, rew, done, _ = env.step(action)
#     transition = (obs, action, rew, done, new_obs)
#     replay_buffer.append(transition)
#     obs = new_obs

#     if done:
#         obs = env.reset()

obs = env.reset()
time.sleep(2)
for step in itertools.count():
    # epsilon = EPSILON_END

    # rand = random.random()

    # if rand <= epsilon:
    #     action = env.action_space.sample()
    # else:
    action = online_net.act(obs)

    new_obs, rew, done, _ = env.step(action)
    transition = (obs, action, rew, done, new_obs)
    replay_buffer.append(transition)
    obs = new_obs

    episode_reward += rew

    if done:
        obs = env.reset()
        rew_buffer.append(episode_reward)
        episode_reward = 0.0


    transitions = random.sample(replay_buffer, BATCH_SIZE)

    loss = online_net.train_loss(transitions, target_net)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if step%TARGET_UPDATE_FREQ == 0:
        target_net.load_state_dict(online_net.state_dict())

    if step%100 == 0:
        print()
        print('STEP', step)
        print('Loss', loss)
        print('Avg Reward', np.mean(rew_buffer))

    if step%1000 == 0:
        T.save(online_net, 'best_model/online.pth')
        T.save(target_net, 'best_model/target.pth')
        T.save(optimizer, 'best_model/optim.pth')


STEP 0
Loss tensor(1.9644, device='cuda:0', grad_fn=<SmoothL1LossBackward0>)
Avg Reward 23.0

STEP 100
Loss tensor(2.2248, device='cuda:0', grad_fn=<SmoothL1LossBackward0>)
Avg Reward 23.0625

STEP 200
Loss tensor(2.5536, device='cuda:0', grad_fn=<SmoothL1LossBackward0>)
Avg Reward 24.05263157894737

STEP 300
Loss tensor(2.5304, device='cuda:0', grad_fn=<SmoothL1LossBackward0>)
Avg Reward 25.043478260869566


KeyboardInterrupt: 

In [None]:
T.save(online_net, 'best_model/online.pth')
T.save(target_net, 'best_model/target.pth')
T.save(optimizer, 'best_model/optim.pth')

In [None]:
pickle.dump(replay_buffer, open('replay_buffer', 'wb'))

In [None]:
obs = env.reset()
time.sleep(2)
for step in itertools.count():
    epsilon = EPSILON_END

    rand = random.random()

    # if rand <= epsilon:
    #     action = env.action_space.sample()
    # else:
    action = online_net.act(obs)

    new_obs, rew, done, _ = env.step(action)
    obs = new_obs
    if done:
        obs = env.reset()

KeyboardInterrupt: 