In [None]:
!pip3 install torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cu113

In [None]:
!pip install stable-baselines3[extra] protobuf==3.20.*


In [None]:
!pip install mss pydirectinput pytesseract

In [None]:
!pip list


In [None]:
#Stable-Baselines3 provides the DQN algorithm used to train the agent for the game.
#MSS is used for screen capture.
#PyDirectInput is used for sending keyboard and mouse commands to the game.
#OpenCV is used for processing the captured frames.
#NumPy is used for array and mathematical operations.
#Tesseract (via pytesseract) performs OCR to identify when "Game Over" is displayed at the end of each round.
#Matplotlib is used for visualizing the captured frames.
#Time is used for pausing before the game is restarted.
#Gym is the OpenAI framework that defines the environment interface used to train agents on games.

In [None]:
from mss import mss
import pydirectinput
import cv2
import numpy as np
import pytesseract
from matplotlib import pyplot as plt
import time
from gym import Env
from gym.spaces import Box, Discrete

In [None]:
#Creating the game environment from scratch

In [None]:
class DinoGame(Env):
    """Gym environment that drives the Dino game through screen capture.

    Observations are grayscale screenshots of ``game_location`` resized to
    83 rows x 100 columns with a leading channel axis (shape ``(1, 83, 100)``,
    dtype ``uint8``). Actions are sent as key presses with pydirectinput, and
    the end of an episode is detected by running OCR on ``done_location``.
    """

    def __init__(self):
        super().__init__()
        #Creating the gaming space
        self.observation_space = Box(low=0, high=255, shape=(1,83,100), dtype=np.uint8)
        self.action_space = Discrete(3)  #Defines the only three actions allowed for the Dino i.e. up, down and no action

        #Capturing the extraction parameters of the game
        self.cap = mss()
        self.game_location = {'top': 300, 'left': 0, 'width': 600, 'height': 500}
        self.done_location = {'top': 405, 'left': 630, 'width': 660, 'height': 70}

        #Most recent raw game frame; filled in by get_observation() and shown by render()
        self.current_frame = None

    #Defining how the actions will be played by the agent
    def step(self,action):
        """Apply ``action``, then return ``(observation, reward, done, info)``.

        Action 0 presses 'space', action 1 presses 'down', and action 2 sends
        no key at all. The reward is a constant 1 for every step taken.
        """
        action_map = {
            0:'space',
            1: 'down',
            2: 'no_op'
        }
        if action !=2:
            pydirectinput.press(action_map[action])
        #Check for game over first, then capture the frame returned to the agent
        done, _ = self.get_done()
        observation = self.get_observation()
        reward = 1
        info = {}
        return observation, reward, done, info

    def reset(self):
        """Restart the game and return the first observation."""
        time.sleep(1)
        #Click inside the screen (presumably to focus the game window), then press space to start
        pydirectinput.click(x=150, y=150)
        pydirectinput.press('space')
        return self.get_observation()

    #Visualizing the game
    def render(self):
        """Show the latest captured game frame in an OpenCV window.

        Pressing 'q' while the window has focus closes it.
        """
        frame = self.current_frame
        if frame is None:
            #No observation has been captured yet, so grab a frame directly
            frame = np.array(self.cap.grab(self.game_location))[:,:,:3]
        cv2.imshow('Game', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            self.close()

    def close(self):
        """Close any OpenCV windows opened by render()."""
        cv2.destroyAllWindows()

    #Preparing the array representation of the frame where the game will be captured
    def get_observation(self):
        """Capture ``game_location`` and return it as a ``(1, 83, 100)`` array."""
        #Keep only the first three channels of the capture
        raw = np.array(self.cap.grab(self.game_location))[:,:,:3].astype(np.uint8)
        #Remember the raw frame so render() has something to display
        self.current_frame = raw
        gray = cv2.cvtColor(raw, cv2.COLOR_BGR2GRAY)
        #cv2.resize takes (width, height), giving an 83 x 100 image
        resized = cv2.resize(gray, (100,83))
        channel = np.reshape(resized, (1,83,100))
        return channel

    #For the agent to understand when the Game is considered to be over
    def get_done(self):
        """Return ``(done, done_cap)`` for the current screen.

        ``done`` is True when the first four characters OCR reads from
        ``done_location`` are 'GAME' or 'GAHE' ('GAHE' is presumably a common
        OCR misread of 'GAME'). ``done_cap`` is the captured region as an array.
        """
        done_cap = np.array(self.cap.grab(self.done_location))
        done_strings = ['GAME', 'GAHE']
        res = pytesseract.image_to_string(done_cap)[:4]
        done = res in done_strings
        return done, done_cap
    

In [None]:
#Testing the environment

In [None]:
# Instantiate the environment so its capture and OCR helpers can be tried by hand.
env = DinoGame()

In [None]:
# Grab a single preprocessed frame; get_observation() returns shape (1, 83, 100).
obs=env.get_observation()

In [None]:
# obs[0] drops the leading channel axis so matplotlib receives a 2-D image to convert and show.
plt.imshow(cv2.cvtColor(obs[0], cv2.COLOR_GRAY2BGR))

In [None]:
# Capture the game-over region and run the OCR-based done check on it.
done, done_cap = env.get_done()

In [None]:
# Visual check that done_location frames the game-over text.
# NOTE(review): done_cap is the unconverted capture; channel order may not be RGB(A) — confirm if colours matter.
plt.imshow(done_cap)

In [None]:
# First four OCR characters; get_done() compares these against 'GAME' / 'GAHE'.
pytesseract.image_to_string(done_cap)[:4]

In [None]:
# Play ten episodes with uniformly sampled actions to sanity-check the
# environment loop, printing the accumulated reward of each episode.
for episode in range(10):
    obs, done, total_reward = env.reset(), False, 0
    while not done:
        random_action = env.action_space.sample()
        obs, reward, done, info = env.step(random_action)
        total_reward = total_reward + reward
    print('Total Reward in the episode {} is {}'.format(episode, total_reward))

In [None]:
 #Model Training

In [1]:
#Importing the callback base class and the environment checker used before model training

import os 
from stable_baselines3.common.callbacks import BaseCallback  
from stable_baselines3.common import env_checker

In [None]:
# Run Stable-Baselines3's environment checker on the DinoGame instance before training.
env_checker.check_env(env)

In [None]:
#Callback that saves a model checkpoint every `check_freq` training steps
class TrainAndLogCallback(BaseCallback):
    """Save the model being trained every ``check_freq`` callback calls.

    Args:
        check_freq: Number of calls between two checkpoints.
        save_path: Directory that receives the checkpoints, or ``None`` to
            disable saving.
        verbose: Verbosity level forwarded to ``BaseCallback``.
    """

    def __init__(self, check_freq, save_path, verbose=1):
        super().__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self):
        """Create the checkpoint directory if one was configured."""
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        """Write ``best_model_<n_calls>`` on every ``check_freq``-th call.

        Always returns True so training is never interrupted by this callback.
        """
        # Mirror _init_callback: without a save_path there is nowhere to write,
        # and os.path.join(None, ...) would raise TypeError.
        if self.save_path is not None and self.n_calls % self.check_freq == 0:
            model_path = os.path.join(self.save_path, 'best_model_{}'.format(self.n_calls))
            self.model.save(model_path)

        return True

In [None]:
#Directories for storing training checkpoints and logs
check_dir = './train/'  # passed to TrainAndLogCallback as save_path (model checkpoints)
log_dir = './logs/'  # passed to DQN as tensorboard_log

In [None]:
# Checkpoint the model into check_dir every 1000 callback calls.
callback = TrainAndLogCallback(check_freq=1000, save_path=check_dir)

In [None]:
#Developing the DQN

In [None]:
#Importing DQN algorithm for model training
from stable_baselines3 import DQN
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack

In [None]:
# Fresh environment instance used for training.
env = DinoGame()

In [None]:
# DQN agent with a CNN policy on the screen-capture observations; TensorBoard logs go to log_dir.
# NOTE(review): buffer_size (1,000,000) is far larger than the 85,000 timesteps trained below,
# so a smaller replay buffer would likely save memory — confirm before changing.
model = DQN('CnnPolicy', env, tensorboard_log=log_dir, verbose=1, buffer_size=1000000, learning_starts=1000)

In [None]:
# Train for 85,000 timesteps; the callback saves periodic checkpoints along the way.
model.learn(total_timesteps=85000, callback=callback)

In [None]:
#DQN.load is a classmethod that builds and returns a new model instead of
#updating the existing one in place, so the result must be bound to `model`.
#The checkpoint name follows the 'best_model_<n_calls>' pattern written by
#TrainAndLogCallback (the original path contained a stray space).
model = DQN.load('train_first/best_model_50000', env=env)

In [None]:
#Testing of the model

In [None]:
# Let the trained model play ten episodes and report the reward of each one.
for episode in range(10):
    obs, done, total_reward = env.reset(), False, 0
    while not done:
        prediction = model.predict(obs)
        action = prediction[0]
        # The environment expects a plain int action so it can look up the key to press
        obs, reward, done, info = env.step(int(action))
        total_reward = total_reward + reward
    print('Total Reward for episode {} is {}'.format(episode, total_reward))