In [None]:
# Environment setup: install the Selenium Python bindings and a system chromedriver
# through apt. Standard output of every command is redirected to /dev/null.
!pip install selenium > /dev/null
!apt-get update > /dev/null # to update ubuntu to correctly run apt install
!apt install chromium-chromedriver> /dev/null
# Copy the chromedriver binary shipped by the apt package into /usr/bin.
!cp /usr/lib/chromium-browser/chromedriver /usr/bin > /dev/null

In [2]:
import os
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import gym
import numpy as np
from collections import deque
from gym import spaces
from io import BytesIO
from PIL import Image
import base64
import cv2
import time
import matplotlib.pyplot as plt
from datetime import datetime
import random
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.optimizers import Adam, RMSprop
from tensorflow.keras.models import Model, load_model
import math



In [10]:
class Game():
	'''
	Thin Selenium wrapper around the Chrome Dino game page.

	It owns a headless Chrome driver and exposes the handful of operations
	the environment needs: send a key, read the score, grab the canvas
	image and query the crashed flag.
	'''
	def __init__(self):
		self.chrome_options = webdriver.ChromeOptions()
		self.chrome_options.add_argument("--mute-audio")
		self.chrome_options.add_argument("--headless")
		self.chrome_options.add_argument('--no-sandbox')
		self.chrome_options.add_argument('--disable-dev-shm-usage')
		self.chrome_options.add_argument('start-maximized')
		# Do not pass the driver path positionally: recent Selenium releases
		# dropped that parameter. Leaving it out keeps the default lookup of
		# "chromedriver", which is what the original call spelled out.
		self.driver = webdriver.Chrome(options=self.chrome_options)

	def Start(self):
		'''
		Open the Game Instance in Chrome
		'''
		self.driver.get('https://chromedino.com/')

	def Action(self, action):
		'''
		Perform action by sending the given key to the page body
		'''
		self.driver.find_element(By.TAG_NAME, 'body').send_keys(action)

	def Refresh(self):
		'''
		Refresh the Chrome Tab and wait (up to 10 s) for the game canvas
		'''
		self.driver.refresh()
		WebDriverWait(self.driver, 10).until(EC.presence_of_element_located((By.CLASS_NAME, "runner-canvas")))

	def Restart(self):
		'''
		Start the game again by sending SPACE to the page body.
		The tab itself is not reloaded; use Refresh() for that.
		'''
		self.driver.find_element(By.TAG_NAME, 'body').send_keys(Keys.SPACE)

	def Get_Score(self):
		'''
		Return the score of the game as an int.

		The page reports the score as a sequence of digits. An empty or
		missing sequence is reported as 0 instead of failing on int('').
		'''
		digits = self.driver.execute_script("return Runner.instance_.distanceMeter.digits")
		if not digits:
			return 0

		# str() keeps the join working whether the digits arrive as strings or numbers.
		return int(''.join(str(digit) for digit in digits))

	def Img_State(self):
		'''
		Return the image of the current state as the canvas data URL string
		'''
		img = self.driver.execute_script("return document.querySelector('canvas.runner-canvas').toDataURL()")
		return img

	def Done_State(self):
		'''
		Return whether the dino has crashed or not
		'''
		done = self.driver.execute_script("return Runner.instance_.crashed")

		return done

	def Close(self):
		'''
		Shut down the browser and the driver process
		'''
		self.driver.quit()

class DinoEnv(gym.Env):
	'''
	Gym-style environment for the Chrome Dino game driven through Game.

	Observations are the last four grayscale frames, cropped, resized and
	stacked on the last axis. Actions are indices into action_list.
	'''
	def __init__(self, width=120, height=120, chrome_path=None):
		'''
		width, height: size of each processed frame in pixels.
		chrome_path: accepted for compatibility; currently unused.
		'''
		self.screen_width = width
		self.screen_height = height

		self.action_space = spaces.Discrete(3) # Do nothing, jump, crouch
		# cv2.resize(img, (width, height)) produces arrays of shape (height, width),
		# so the declared observation shape lists height first.
		self.observation_space = spaces.Box(low=0, high=255, shape=(self.screen_height, self.screen_width, 4), dtype=np.uint8)

		self.state_queue = deque(maxlen=4)

		self.game = Game()

		# Index 0 sends ARROW_LEFT as the "do nothing" action.
		self.action_list = [Keys.ARROW_LEFT, Keys.ARROW_UP, Keys.ARROW_DOWN]

	def Env_Start(self):
		'''
		Start the Dino Game Instance
		'''
		self.game.Start()

	def step(self, action):
		'''
		Send the chosen key, then return (observation, reward, done, score).

		The reward is 1 while the dino is alive and -100 on the crashing
		step. The fourth element is the current game score, not an info dict.
		'''
		self.game.Action(self.action_list[action])

		next_state = self.next_state()

		done = self.done_state()

		reward = 1 if not done else -100

		score = self.game.Get_Score()

		# Pace the agent: wait before the caller can issue the next action.
		time.sleep(0.15)

		return next_state, reward, done, score

	def reset(self):
		'''
		Reset the Dino Game Instance and return the first observation
		'''
		# Drop frames left over from the previous episode so they cannot
		# leak into the first stacked observations of the new one.
		self.state_queue.clear()

		self.game.Restart()

		return self.next_state()

	def get_state_img(self):
		'''
		Returns an image of the current state of the game as a numpy array
		'''
		LEADING_TEXT = "data:image/png;base64,"
		img = self.game.Img_State()
		img = img[len(LEADING_TEXT):]

		return np.array(Image.open(BytesIO(base64.b64decode(img))))

	def next_state(self):
		'''
		Grab the current frame, preprocess it and return the 4-frame stack
		'''
		frame = self.get_state_img()
		if frame.ndim == 3:
			# PIL decodes in RGB(A) channel order, so use the RGB conversion codes.
			code = cv2.COLOR_RGBA2GRAY if frame.shape[2] == 4 else cv2.COLOR_RGB2GRAY
			frame = cv2.cvtColor(frame, code)
		frame = frame[:, :150] # Cropping: keep the leftmost 150 columns
		frame = cv2.resize(frame, (self.screen_width, self.screen_height)) # Resize

		self.state_queue.append(frame)

		# Until four frames have been collected, repeat the current frame.
		if len(self.state_queue) < 4:
			return np.stack([frame] * 4, axis=-1)
		return np.stack(self.state_queue, axis=-1)

	def Score(self):
		'''
		Obtain and return score from the Game Instance
		'''
		score = self.game.Get_Score()
		return score

	def done_state(self):
		'''
		Check and return whether the Dino has crashed or not
		'''
		return self.game.Done_State()

In [None]:
# policy network
def OurModel(input_shape, action_space):
    """Build and compile the convolutional network used by the agent.

    Args:
        input_shape: shape of one observation, passed to the Input layer.
        action_space: number of output units (one linear value per action).

    Returns:
        A compiled Keras model (MSE loss, Adam with learning rate 0.001).
    """
    layers = tf.keras.layers
    conv_args = dict(kernel_size=(3,3), padding='same', activation='relu')

    # One row per convolutional stage:
    # (filters, number of conv layers, whether a 2x2 max-pool follows).
    stages = [
        (64, 2, True),
        (128, 2, True),
        (256, 3, True),
        (512, 3, True),
        (512, 3, False),
    ]

    net_in = layers.Input(input_shape)
    x = net_in

    for filters, n_convs, pooled in stages:
        # Only the first convolution of each stage carries the activity regularizer.
        x = layers.Conv2D(filters=filters, activity_regularizer='L1L2', **conv_args)(x)
        for _ in range(n_convs - 1):
            x = layers.Conv2D(filters=filters, **conv_args)(x)
        x = layers.Dropout(0.15)(x)
        if pooled:
            x = layers.MaxPool2D(pool_size=(2,2), strides=(2,2))(x)

    x = layers.Flatten()(x)
    for units in (4096, 1024):
        x = layers.Dense(units=units, activation='relu')(x)

    q_out = layers.Dense(units=action_space, activation='linear')(x)

    network = tf.keras.models.Model(inputs=[net_in], outputs=[q_out])
    network.compile(loss="mse", optimizer=Adam(learning_rate=0.001), metrics=["accuracy"])

    return network

class DQNAgent:
    """DQN agent for DinoEnv with an online network and a target network.

    Train_model is fitted on replayed transitions; Target_model supplies the
    bootstrap values and is synchronised from Train_model during training.
    """

    def __init__(self):
        self.env = DinoEnv()
        self.state_size = self.env.observation_space.shape
        self.action_size = self.env.action_space.n
        self.EPISODES = 100
        self.memory = deque(maxlen=2000)

        self.gamma = 0.95    # discount rate
        self.epsilon = 1.0  # exploration rate
        self.epsilon_min = 0.001
        self.epsilon_decay = 0.999
        self.batch_size = 32
        self.train_start = 500

        # create main model
        self.Target_model = OurModel(input_shape=self.state_size, action_space = self.action_size)
        self.Train_model = OurModel(input_shape=self.state_size, action_space = self.action_size)
        # Both networks are initialised independently; start the target as an
        # exact copy so the first bootstrap targets come from the same weights.
        self.Target_model.set_weights(self.Train_model.get_weights())

    def remember(self, state, action, reward, next_state, done):
        """Store one transition and decay epsilon once enough samples exist."""
        self.memory.append((state, action, reward, next_state, done))
        if len(self.memory) > self.train_start:
            if self.epsilon > self.epsilon_min:
                self.epsilon *= self.epsilon_decay

    def act(self, state):
        """Epsilon-greedy policy: random action with probability epsilon,
        otherwise the argmax of the online network's output for `state`."""
        if np.random.uniform() < self.epsilon:
            return self.env.action_space.sample()
        q_values = self.Train_model.predict(state[np.newaxis, ...], verbose=0)
        return np.argmax(q_values)

    def replay(self):
        """Run one Q-learning update on a random minibatch from memory.

        Does nothing until the memory holds at least `train_start` samples.
        """
        if len(self.memory) < self.train_start:
            return
        # Randomly sample minibatch from the memory
        minibatch = random.sample(self.memory, min(len(self.memory), self.batch_size))

        # Batch shapes come from the stored frames themselves, so nothing here
        # is tied to one particular frame size or to batch_size.
        states = np.array([sample[0] for sample in minibatch], dtype=np.float32)
        next_states = np.array([sample[3] for sample in minibatch], dtype=np.float32)

        # One forward pass per network for the whole batch. np.array() makes a
        # writable copy of the predictions that is then edited in place.
        targets = np.array(self.Train_model.predict(states, verbose=0), dtype=np.float32)
        next_q = np.asarray(self.Target_model.predict(next_states, verbose=0))

        for i, (_, action, reward, _, done) in enumerate(minibatch):
            if done:
                # Terminal transition: the target is just the final reward.
                targets[i, action] = reward
            else:
                # Standard DQN: r + gamma * max_a' Q_target(s', a')
                targets[i, action] = reward + self.gamma * np.max(next_q[i])

        # Train the Neural Network with batches where target is the value function
        self.Train_model.fit(states, targets, batch_size=self.batch_size, verbose=0)

    def load(self, name):
        """Load the online network from `name` and copy it into the target."""
        self.Train_model = load_model(name)
        self.Target_model.set_weights(self.Train_model.get_weights())

    def save(self, name):
        """Save the online network to `name`."""
        self.Train_model.save(name)

    def training(self):
        """Play EPISODES episodes, learning after each one, then plot the
        number of steps survived per episode."""
        best_score = 0
        total_r = []
        count = 25
        start = time.time()
        self.env.Env_Start()

        for e in range(self.EPISODES):
            time.sleep(1.5)
            state = self.env.reset()
            done = False
            i = 0  # steps survived in this episode, reported as the score

            while not done:
                action = self.act(state)
                next_state, reward, done, _ = self.env.step(action)

                self.remember(state, action, reward, next_state, done)
                state = next_state
                i += 1

            # Episode finished: report progress.
            best_score = max(best_score, i)
            timestampStr = datetime.now().strftime("%H:%M:%S")

            elapsed = int(time.time() - start)
            hours, remainder = divmod(elapsed, 3600)
            minutes, seconds = divmod(remainder, 60)
            print(f"\repisode: {e+1}/{self.EPISODES}, score: {i}, max score: {best_score}, e: {round(self.epsilon, 4)}, time: {timestampStr}, elapsed time: {hours} hours, {minutes} minutes, {seconds} seconds", end='', flush=True)
            total_r.append(i)

            # One gradient update per episode.
            self.replay()

            # NOTE(review): the threshold grows by `e` after every sync, so the
            # gap between target updates keeps widening. Confirm this is
            # intended rather than a fixed interval.
            if e > count:
                count += e
                self.Target_model.set_weights(self.Train_model.get_weights())

        # The x axis is the 1-based episode number, one point per finished episode.
        epi = np.arange(1, len(total_r) + 1)
        plt.plot(epi, total_r)
        plt.xlabel('Episodes')
        plt.ylabel('Total Reward')
        plt.show()

# Entry point: constructing the agent builds the DinoEnv (and with it the
# Selenium-driven browser); training() then runs the full training loop and
# shows the per-episode plot.
if __name__ == "__main__":
	agent = DQNAgent()
	agent.training()

episode: 25/100, score: 22, max score: 30, e: 0.8913, time: 22:08:00, elapsed time: 0 hours, 5 minutes, 5 seconds