In [17]:
# Window size passed to pygame.display.set_mode as (SCREEN_WIDTH, SCREEN_HEIGHT).
SCREEN_HEIGHT = 600
SCREEN_WIDTH = 1100

# Starting value of Game.game_speed; obstacles, the cloud and the track are
# shifted left by the current game speed on every frame.
INIT_GAME_SPEED = 14
# Initial x offset and fixed y position at which the track image is blitted.
X_POS_BG_INIT = 0
Y_POS_BG = 380

# maxlen of the first replay buffer; it is filled before anything goes into
# the second buffer.
INIT_REPLAY_MEM_SIZE = 5_000
# maxlen of the second replay buffer, used once the first one is full.
REPLAY_MEMORY_SIZE = 45_000
# Not referenced anywhere in the visible notebook code.
MODEL_NAME = "DINO"
# DQNAgent.train returns early until the first buffer holds this many transitions.
MIN_REPLAY_MEMORY_SIZE = 1_000
# Number of transitions sampled per training step (also the fit batch size).
MINIBATCH_SIZE = 64
# Discount factor applied to the target network's best next-state Q-value.
DISCOUNT = 0.95
# The target network is re-synced once the count of terminal training steps
# exceeds this value.
UPDATE_TARGET_THRESH = 5
# Exploration rate handed to Game(...) at start-up.
EPSILON_INIT = 0.45
# Multiplicative decay applied to epsilon after every episode.
EPSILON_DECAY = 0.997
# Number of episodes run by Game.play_auto.
NUM_EPISODES = 2_000
# Decay stops being applied once epsilon is no longer above this value.
MIN_EPSILON = 0.05

In [18]:
import pygame
import os

# Sprite sheets, loaded once at import time from the relative "Assets" folder.
# Each multi-frame sprite is a list of surfaces in file-number order.

# Two-frame running animation of the dino.
RUNNING = [
    pygame.image.load(os.path.join("Assets/Dino", filename))
    for filename in ("DinoRun1.png", "DinoRun2.png")
]

# Two-frame ducking animation of the dino.
DUCKING = [
    pygame.image.load(os.path.join("Assets/Dino", filename))
    for filename in ("DinoDuck1.png", "DinoDuck2.png")
]

# Single surface shown while the dino is jumping.
JUMPING = pygame.image.load(os.path.join("Assets/Dino", "DinoJump.png"))

# Three small-cactus variants.
SMALL_CACTUS = [
    pygame.image.load(os.path.join("Assets/Cactus", filename))
    for filename in ("SmallCactus1.png", "SmallCactus2.png", "SmallCactus3.png")
]

# Three large-cactus variants.
LARGE_CACTUS = [
    pygame.image.load(os.path.join("Assets/Cactus", filename))
    for filename in ("LargeCactus1.png", "LargeCactus2.png", "LargeCactus3.png")
]

# Two-frame bird animation.
BIRD = [
    pygame.image.load(os.path.join("Assets/Bird", filename))
    for filename in ("Bird1.png", "Bird2.png")
]

# Background decoration and the scrolling track.
CLOUD = pygame.image.load(os.path.join("Assets/Other", "Cloud.png"))

BACKGROUND = pygame.image.load(os.path.join("Assets/Other", "Track.png"))

In [19]:
import pygame
from typing import List

class Obstacle:
    """Base class for an obstacle that scrolls from the right edge to the left.

    Attributes:
        image: list of sprite surfaces for this obstacle.
        type: index into ``image`` of the sprite used for the bounding rect
            (and, in the default ``draw``, for rendering).
        rect: rect of the selected sprite; its x starts at SCREEN_WIDTH.
    """

    def __init__(self, image: "List[pygame.Surface]", type: int) -> None:
        self.image = image
        self.type = type
        self.rect = self.image[self.type].get_rect()
        self.rect.x = SCREEN_WIDTH

    def update(self, obstacles: list, game_speed: int):
        """Move left by ``game_speed`` and leave ``obstacles`` once off-screen.

        Args:
            obstacles: the list this obstacle is tracked in; mutated in place.
            game_speed: number of pixels to move left this frame.
        """
        self.rect.x -= game_speed
        if self.rect.x < -self.rect.width:
            # Remove *this* obstacle. A bare pop() would drop whichever
            # obstacle happens to be last and raise on an empty list.
            if self in obstacles:
                obstacles.remove(self)

    def draw(self, SCREEN: "pygame.Surface"):
        """Blit the selected sprite onto ``SCREEN`` at the current rect."""
        SCREEN.blit(self.image[self.type], self.rect)

In [20]:
import tensorflow as tf
import pandas as pd
import numpy as np
from collections import deque
import random

class DQNAgent:
    """Deep Q-learning agent with an online network, a target network and
    a two-stage replay memory.

    Transitions are tuples ``(state, action, reward, next_state, done)``;
    ``train`` unpacks them in exactly that order.
    """

    def __init__(self) -> None:
        # Online network: fitted on every training step.
        self.model = self.create_model()

        # Target network: only queried for next-state Q-values; starts as a
        # copy of the online network.
        self.target_model = self.create_model()
        self.target_model.set_weights(self.model.get_weights())

        # Replay memory: the first buffer is filled first, later transitions
        # go to the second one (see update_replay_memory).
        self.init_replay_memory = deque(maxlen=INIT_REPLAY_MEM_SIZE)
        self.late_replay_memory = deque(maxlen=REPLAY_MEMORY_SIZE)

        # Counts terminal training steps since the last target sync.
        self.target_update_counter = 0

    def create_model(self):
        """Build and compile the Q-network: 7 inputs -> Dense(4, relu) -> 3 outputs."""
        network = tf.keras.models.Sequential()
        for layer in (
            tf.keras.layers.Input(shape=(7,)),
            tf.keras.layers.Dense(4, activation='relu'),
            tf.keras.layers.Dense(3),
        ):
            network.add(layer)

        network.compile(
            optimizer='adam',
            loss=tf.keras.losses.MeanSquaredError(),
            metrics=['accuracy'],
        )
        return network

    def update_replay_memory(self, transition):
        """Store ``transition`` in the first buffer until it is full, then in the second."""
        first_buffer_has_room = len(self.init_replay_memory) < INIT_REPLAY_MEM_SIZE
        buffer = self.init_replay_memory if first_buffer_has_room else self.late_replay_memory
        buffer.append(transition)

    def get_qs(self, state):
        """Return the online network's Q-values for a single ``state``."""
        batch_of_one = np.array([state])
        return self.model(batch_of_one, training=False)[0]

    def train(self, terminal_state, step):
        """Run one fitting step on a random minibatch drawn from both buffers.

        Args:
            terminal_state: truthy when the step that triggered this call
                ended an episode; used to advance the target-sync counter.
            step: step number within the episode (not used by this method).
        """
        # Do nothing until enough experience has been collected.
        if len(self.init_replay_memory) < MIN_REPLAY_MEMORY_SIZE:
            return

        # Sample uniformly over the concatenation of both buffers.
        pool = list(self.init_replay_memory)
        pool.extend(self.late_replay_memory)
        minibatch = random.sample(pool, MINIBATCH_SIZE)

        # Online-network Q-values for the sampled states.
        states = np.array([sample[0] for sample in minibatch])
        predicted_qs = self.model(states, training=False).numpy()

        # Target-network Q-values for the sampled successor states.
        next_states = np.array([sample[3] for sample in minibatch])
        next_qs = self.target_model(next_states, training=False).numpy()

        features = []
        targets = []
        for row, (state, action, reward, _next_state, done) in enumerate(minibatch):
            if done:
                # Terminal transition: no future value to bootstrap from.
                updated_q = reward
            else:
                updated_q = reward + DISCOUNT * np.max(next_qs[row])

            # Only the taken action's Q-value is moved towards the target.
            q_row = predicted_qs[row]
            q_row[action] = updated_q

            features.append(state)
            targets.append(q_row)

        # Fit on the whole minibatch as a single batch.
        self.model.fit(
            np.array(features),
            np.array(targets),
            batch_size=MINIBATCH_SIZE,
            shuffle=False,
            verbose=0,
        )

        # The counter advances once per terminal training step ...
        if terminal_state:
            self.target_update_counter += 1

        # ... and the target network is synced once it passes the threshold.
        if self.target_update_counter > UPDATE_TARGET_THRESH:
            self.target_model.set_weights(self.model.get_weights())
            self.target_update_counter = 0

# Clase del Dinosaurio

In [21]:
import pygame
import numpy as np

class Dino(DQNAgent):
    """The player sprite: run / duck / jump state machine on top of DQNAgent.

    Exactly one of ``dino_run``, ``dino_duck`` and ``dino_jump`` is set by
    the input handling at the end of each update.
    """

    X_POS = 80          # fixed x of the sprite
    Y_POS = 310         # y while running
    Y_DUCK_POS = 340    # y while ducking
    JUMP_VEL = 8.5      # initial jump velocity; also the landing threshold

    def __init__(self) -> None:
        # Sprite sets for each pose.
        self.duck_img, self.run_img, self.jump_img = DUCKING, RUNNING, JUMPING

        # The dino starts out running.
        self.dino_duck, self.dino_run, self.dino_jump = False, True, False

        self.step_index = 0
        self.jump_vel = self.JUMP_VEL
        self.image = self.run_img[0]
        self.dino_rect = self.image.get_rect()

        self.dino_rect.x = self.X_POS
        self.dino_rect.y = self.Y_POS

        self.score = 0

        # Builds the networks and replay memory.
        super().__init__()

    def _advance_pose(self) -> None:
        """Animate whichever poses are active, then wrap the frame counter."""
        # Checked one after another on purpose: jump() can switch the dino
        # back to running, in which case run() fires in the same frame.
        if self.dino_duck:
            self.duck()
        if self.dino_jump:
            self.jump()
        if self.dino_run:
            self.run()

        if self.step_index >= 20:
            self.step_index = 0

    def _set_pose(self, run: bool, duck: bool, jump: bool) -> None:
        """Overwrite the three pose flags."""
        self.dino_run = run
        self.dino_duck = duck
        self.dino_jump = jump

    def _apply_intent(self, wants_jump, wants_duck) -> None:
        """Pick the next pose; a jump in progress cannot be interrupted."""
        if wants_jump and not self.dino_jump:
            self._set_pose(run=False, duck=False, jump=True)
        elif wants_duck and not self.dino_jump:
            self._set_pose(run=False, duck=True, jump=False)
        elif not (self.dino_jump or wants_duck):
            self._set_pose(run=True, duck=False, jump=False)

    def update(self, move: pygame.key.ScancodeWrapper):
        """Advance one frame using keyboard state (UP jumps, DOWN ducks)."""
        self._advance_pose()
        self._apply_intent(move[pygame.K_UP], move[pygame.K_DOWN])

    def update_auto(self, move):
        """Advance one frame using an action id: 0 jumps, 1 ducks, anything else runs."""
        self._advance_pose()
        self._apply_intent(move == 0, move == 1)

    def _ground_frame(self, frames, y_pos) -> None:
        """Show the current frame of a two-image ground animation at ``y_pos``."""
        self.image = frames[self.step_index // 10]
        self.dino_rect = self.image.get_rect()
        self.dino_rect.x = self.X_POS
        self.dino_rect.y = y_pos
        self.step_index += 1

    def duck(self) -> None:
        """Render one frame of the ducking animation."""
        self._ground_frame(self.duck_img, self.Y_DUCK_POS)

    def run(self) -> None:
        """Render one frame of the running animation."""
        self._ground_frame(self.run_img, self.Y_POS)

    def jump(self) -> None:
        """Advance the jump arc by one frame and land when it completes."""
        self.image = self.jump_img
        if self.dino_jump:
            self.dino_rect.y -= self.jump_vel * 3
            self.jump_vel -= 0.6

        # Once the velocity has dropped below -JUMP_VEL the jump is over.
        jump_finished = self.jump_vel < -self.JUMP_VEL
        if jump_finished:
            self.dino_jump = False
            self.dino_run = True
            self.jump_vel = self.JUMP_VEL

    def draw(self, SCREEN: pygame.Surface):
        """Blit the current sprite at the rect's top-left corner."""
        top_left = (self.dino_rect.x, self.dino_rect.y)
        SCREEN.blit(self.image, top_left)

# Clases de Assets

In [22]:
import pygame
import random
from typing import List

class LargeCactus(Obstacle):
    """Large cactus obstacle using a randomly chosen variant, placed at y=300."""

    def __init__(self, image: List[pygame.Surface]) -> None:
        # Obstacle.__init__ stores the variant index as self.type.
        variant = random.randint(0, 2)
        super().__init__(image, variant)
        self.rect.y = 300


class SmallCactus(Obstacle):
    """Small cactus obstacle using a randomly chosen variant, placed at y=325."""

    def __init__(self, image: List[pygame.Surface]) -> None:
        # Obstacle.__init__ stores the variant index as self.type.
        variant = random.randint(0, 2)
        super().__init__(image, variant)
        self.rect.y = 325

In [23]:
from random import random
import pygame
from typing import List
import random

class Bird(Obstacle):
    """Flying obstacle at y=120 whose two sprites alternate while it is drawn."""

    def __init__(self, image: List[pygame.Surface]) -> None:
        self.type = 0
        super().__init__(image, self.type)
        self.rect.y = 120
        # Animation tick counter; index // 10 selects the sprite.
        self.index = 0

    def draw(self, SCREEN: pygame.Surface):
        """Blit the current wing frame and advance the animation by one tick."""
        # Wrap after a full 20-tick cycle so both frames get 10 ticks each.
        # Wrapping at 19 showed the second frame for only 9 ticks.
        if self.index >= 20:
            self.index = 0

        SCREEN.blit(self.image[self.index // 10], self.rect)
        self.index += 1

In [24]:
import pygame
import random

class Cloud:
    """Decorative cloud that drifts left and re-enters from beyond the right edge."""

    def __init__(self) -> None:
        self._place_off_screen()
        self.image = CLOUD
        self.width = self.image.get_width()

    def _place_off_screen(self) -> None:
        """Pick a new random spot 800-1000 px past the right edge, at y in 50-100."""
        # x is drawn before y so the random sequence is consumed in a fixed order.
        self.x = SCREEN_WIDTH + random.randint(800, 1000)
        self.y = random.randint(50, 100)

    def update(self, game_speed: int):
        """Move left by ``game_speed``; respawn once fully past the left edge."""
        self.x -= game_speed
        fully_off_screen = self.x < -self.width
        if fully_off_screen:
            self._place_off_screen()

    def draw(self, SCREEN: pygame.Surface):
        """Blit the cloud at its current position."""
        position = (self.x, self.y)
        SCREEN.blit(self.image, position)

#  Clase Game

In [16]:
# !pip install sqlalchemy

ERROR: Exception:
Traceback (most recent call last):
  File "C:\Users\Ruben\.conda\envs\newenv\lib\site-packages\pip\_vendor\urllib3\response.py", line 437, in _error_catcher
    yield
  File "C:\Users\Ruben\.conda\envs\newenv\lib\site-packages\pip\_vendor\urllib3\response.py", line 560, in read
    data = self._fp_read(amt) if not fp_closed else b""
  File "C:\Users\Ruben\.conda\envs\newenv\lib\site-packages\pip\_vendor\urllib3\response.py", line 526, in _fp_read
    return self._fp.read(amt) if amt is not None else self._fp.read()
  File "C:\Users\Ruben\.conda\envs\newenv\lib\site-packages\pip\_vendor\cachecontrol\filewrapper.py", line 90, in read
    data = self.__fp.read(amt)
  File "C:\Users\Ruben\.conda\envs\newenv\lib\http\client.py", line 465, in read
    n = self.readinto(b)
  File "C:\Users\Ruben\.conda\envs\newenv\lib\http\client.py", line 509, in readinto
    n = self.fp.readinto(b)
  File "C:\Users\Ruben\.conda\envs\newenv\lib\socket.py", line 589, in readinto
    return s

Collecting sqlalchemy
  Downloading SQLAlchemy-2.0.23-cp37-cp37m-win_amd64.whl (2.1 MB)
     ---------------------------------------- 2.1/2.1 MB 9.0 kB/s eta 0:00:00
Collecting greenlet!=0.4.17
  Downloading greenlet-3.0.1-cp37-cp37m-win_amd64.whl (287 kB)
     ---------------------------            204.8/287.7 kB 6.8 kB/s eta 0:00:13
Collecting sqlalchemy
  Downloading SQLAlchemy-2.0.23-cp37-cp37m-win_amd64.whl (2.1 MB)
     ---------------------------------------- 2.1/2.1 MB 23.9 kB/s eta 0:00:00
Collecting greenlet!=0.4.17
  Downloading greenlet-3.0.1-cp37-cp37m-win_amd64.whl (287 kB)
     ------------------------------------- 287.7/287.7 kB 18.2 kB/s eta 0:00:00
Installing collected packages: greenlet, sqlalchemy
Successfully installed greenlet-3.0.1 sqlalchemy-2.0.23


In [25]:
from argparse import Action
import random
import sys
import pygame
import numpy as np
from sqlalchemy import asc
import math
import time
from tqdm import tqdm

class Game:
    """Pygame Dino runner that can be played by hand or used to train the
    Dino's Q-network.

    Attributes:
        SCREEN: the display surface.
        obstacles: list of obstacles currently in play.
        run: True while the current game / episode is still going.
        game_speed: pixels scrolled per frame; grows with the score.
        dino: the player sprite, which is also the DQN agent.
        points: score of the current game.
        epsilon: probability of taking an exploratory action in play_auto.
        ep_rewards: summed reward of each finished episode (seeded with -200).
    """

    def __init__(self, epsilon) -> None:
        pygame.init()
        self.SCREEN = pygame.display.set_mode((SCREEN_WIDTH, SCREEN_HEIGHT))

        self.obstacles = []

        self.run = True

        self.clock = pygame.time.Clock()

        self.cloud = Cloud()

        self.game_speed = INIT_GAME_SPEED

        self.font = pygame.font.Font("freesansbold.ttf", 20)

        self.dino = Dino()

        self.x_pos_bg = X_POS_BG_INIT

        self.points = 0

        self.epsilon = epsilon

        self.ep_rewards = [-200]

    def reset(self):
        """Start a fresh game while keeping everything the agent has learned.

        A new Dino is created and the previous one's replay buffers, target
        counter and both networks' weights are carried over.
        """
        self.game_speed = INIT_GAME_SPEED
        old_dino = self.dino
        # NOTE(review): Dino() builds two new Keras models on every reset only
        # to have their weights overwritten; consider resetting the sprite
        # state in place instead.
        self.dino = Dino()
        self.dino.init_replay_memory = old_dino.init_replay_memory
        self.dino.late_replay_memory = old_dino.late_replay_memory
        self.dino.target_update_counter = old_dino.target_update_counter
        self.dino.model.set_weights(old_dino.model.get_weights())
        self.dino.target_model.set_weights(old_dino.target_model.get_weights())

        self.x_pos_bg = X_POS_BG_INIT
        self.points = 0
        self.SCREEN = pygame.display.set_mode((SCREEN_WIDTH, SCREEN_HEIGHT))
        self.clock = pygame.time.Clock()

    def get_dist(self, pos_a: tuple, pos_b: tuple):
        """Return the Euclidean distance between two (x, y) points."""
        dx = pos_a[0] - pos_b[0]
        dy = pos_a[1] - pos_b[1]

        return math.sqrt(dx**2 + dy**2)

    def update_background(self):
        """Draw the two track tiles, scroll them, and return the new x offset."""
        image_width = BACKGROUND.get_width()

        self.SCREEN.blit(BACKGROUND, (self.x_pos_bg, Y_POS_BG))
        self.SCREEN.blit(BACKGROUND, (self.x_pos_bg + image_width, Y_POS_BG))

        # Once the first tile has scrolled fully off-screen, start over.
        if self.x_pos_bg <= -image_width:
            self.SCREEN.blit(BACKGROUND, (self.x_pos_bg + image_width, Y_POS_BG))
            self.x_pos_bg = 0

        self.x_pos_bg -= self.game_speed
        return self.x_pos_bg

    def get_state(self):
        """Build the 7-element feature list fed to the Q-network.

        Returns:
            ``[dino_y, dist, obs_height, speed, obj_width, cactus, bird]``:
            dino_y is the dino's y over Y_DUCK_POS plus 10; dist is the
            distance to the first obstacle's midtop (or to a point just past
            the right edge when there is none) over the screen diagonal;
            obs_height is the obstacle's top y over Y_DUCK_POS; speed is
            game_speed / 24; obj_width is the obstacle width relative to
            SMALL_CACTUS[2]; cactus / bird are 0/1 flags for the obstacle kind.
        """
        dino_rect = self.dino.dino_rect
        # NOTE(review): operator precedence makes this (y / Y_DUCK_POS) + 10,
        # so the feature sits around 10-11 while the others are near [0, 1].
        # Probably a normalisation slip, but the intended formula is not
        # clear from here, so the value is left unchanged.
        dino_y = dino_rect.y / self.dino.Y_DUCK_POS + 10
        pos_a = (dino_rect.x, dino_rect.y)
        diagonal = math.sqrt(SCREEN_HEIGHT**2 + SCREEN_WIDTH**2)

        bird = 0
        cactus = 0
        if len(self.obstacles) == 0:
            dist = self.get_dist(pos_a, (SCREEN_WIDTH + 10, self.dino.Y_POS)) / diagonal
            obs_height = 0
            obj_width = 0
        else:
            nearest = self.obstacles[0]
            dist = self.get_dist(pos_a, nearest.rect.midtop) / diagonal
            obs_height = nearest.rect.midtop[1] / self.dino.Y_DUCK_POS
            obj_width = nearest.rect.width / SMALL_CACTUS[2].get_rect().width
            # Type check without constructing throw-away obstacles (which also
            # consumed random numbers on every call).
            if isinstance(nearest, (SmallCactus, LargeCactus)):
                cactus = 1
            else:
                bird = 1

        return [dino_y, dist, obs_height, self.game_speed / 24, obj_width, cactus, bird]

    def update_score(self):
        """Add a point, speed up every 200 points, and draw the score label."""
        self.points += 1
        if self.points % 200 == 0:
            self.game_speed += 1

        text = self.font.render("Points: " + str(self.points), True, (0, 0, 0))
        textRect = text.get_rect()
        textRect.center = (1000, 40)
        self.SCREEN.blit(text, textRect)

    def create_obstacle(self):
        """Maybe spawn one obstacle: each kind has a 1-in-51 chance per call.

        Birds are only spawned once the score is above 300.
        """
        obstacle_prob = random.randint(0, 50)
        if obstacle_prob == 0:
            self.obstacles.append(SmallCactus(SMALL_CACTUS))
        elif obstacle_prob == 1:
            self.obstacles.append(LargeCactus(LARGE_CACTUS))
        elif obstacle_prob == 2 and self.points > 300:
            self.obstacles.append(Bird(BIRD))

    def update_game(self, moves, user_input=None):
        """Draw and advance the dino, background, cloud and score for one frame.

        Args:
            moves: action id passed to Dino.update_auto when there is no
                keyboard input.
            user_input: pressed-key state; when given, it drives the dino
                instead of ``moves``.
        """
        self.dino.draw(self.SCREEN)
        if user_input is not None:
            self.dino.update(user_input)
        else:
            self.dino.update_auto(moves)

        self.update_background()

        self.cloud.draw(self.SCREEN)

        self.cloud.update(self.game_speed)

        self.update_score()

        self.clock.tick(30)

    def _discard_obstacle(self, obstacle):
        """Remove ``obstacle`` from play if it is still tracked."""
        if obstacle in self.obstacles:
            self.obstacles.remove(obstacle)

    def _choose_action(self, state):
        """Epsilon-greedy action: 0 = jump, 1 = duck, 2 = keep running."""
        if np.random.random() > self.epsilon:
            return np.argmax(self.dino.get_qs(state))

        # Exploration is biased: 10% jump, 30% duck, 60% run.
        num = np.random.randint(0, 10)
        if num == 0:
            return num
        if num <= 3:
            return 1
        return 2

    def _learn_from_step(self, current_state, action, reward, next_state, step):
        """Store the transition and run one training step.

        The stored flag is ``done`` (True when the episode just ended), which
        is what DQNAgent.train expects as the fifth element. Storing
        ``self.run`` directly would invert it.
        """
        done = not self.run
        self.dino.update_replay_memory((current_state, action, reward, next_state, done))
        self.dino.train(done, step=step)

    def play_manual(self):
        """Run a keyboard-controlled game until the dino hits an obstacle."""
        while self.run is True:
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    sys.exit()

            self.SCREEN.fill((255, 255, 255))
            user_input = pygame.key.get_pressed()

            if len(self.obstacles) == 0:
                self.create_obstacle()

            # Iterate over a snapshot: update() may remove the obstacle.
            for obstacle in list(self.obstacles):
                obstacle.draw(SCREEN=self.SCREEN)
                obstacle.update(self.obstacles, self.game_speed)
                if self.dino.dino_rect.colliderect(obstacle.rect):
                    self.dino.score = self.points
                    pygame.quit()
                    self._discard_obstacle(obstacle)
                    print("Game over!")
                    return

            self.update_game(user_input=user_input, moves=2)
            pygame.display.update()

    def play_auto(self):
        """Train the agent for NUM_EPISODES episodes of self-play.

        Rewards per frame: +3 while an obstacle is behind the dino, -1 for
        jumping while the obstacle is still in the right half of the screen,
        -10 on collision (which also ends the episode). The model is saved
        every 50 episodes and epsilon is decayed after each episode.
        """
        points_label = 0
        for episode in tqdm(range(1, NUM_EPISODES + 1), ascii=True, unit='episodes'):
            episode_reward = 0
            step = 1
            current_state = self.get_state()
            self.run = True
            while self.run is True:
                for event in pygame.event.get():
                    if event.type == pygame.QUIT:
                        sys.exit()

                self.SCREEN.fill((255, 255, 255))

                if len(self.obstacles) == 0:
                    self.create_obstacle()

                action = self._choose_action(current_state)

                self.update_game(moves=action)
                next_state = self.get_state()
                reward = 0

                # Iterate over a snapshot: update() may remove the obstacle.
                for obstacle in list(self.obstacles):
                    obstacle.draw(SCREEN=self.SCREEN)
                    obstacle.update(self.obstacles, self.game_speed)
                    next_state = self.get_state()
                    if self.dino.dino_rect.x > obstacle.rect.x + obstacle.rect.width:
                        reward = 3

                    if action == 0 and obstacle.rect.x > SCREEN_WIDTH // 2:
                        reward = -1

                    if self.dino.dino_rect.colliderect(obstacle.rect):
                        self.dino.score = self.points
                        self._discard_obstacle(obstacle)
                        points_label = self.points
                        self.reset()
                        reward = -10
                        self.run = False
                        break

                episode_reward += reward

                self._learn_from_step(current_state, action, reward, next_state, step)

                current_state = next_state

                step += 1

                pygame.display.update()

            self.ep_rewards.append(episode_reward)
            if episode % 50 == 0:
                self.dino.model.save(f'models/Episode_{episode}_Points_{points_label}_model.model')

            if self.epsilon > MIN_EPSILON:
                self.epsilon *= EPSILON_DECAY
                # NOTE(review): once the decayed value drops below MIN_EPSILON
                # it is set to 0 (fully greedy), not clamped to the floor;
                # confirm this is intended.
                if self.epsilon < MIN_EPSILON:
                    self.epsilon = 0

In [None]:
# Create the game (opens the pygame window and builds the agent) with the
# initial exploration rate.
game = Game(EPSILON_INIT)

# Train the agent by self-play for NUM_EPISODES episodes.
game.play_auto()

  1%|4                                                                       | 13/2000 [01:38<3:37:35,  6.57s/episodes]