In [32]:
# Pip Installs
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu
!pip install stable-baselines3[extra] protobuf==3.20.*
!pip install keyboard

Looking in indexes: https://download.pytorch.org/whl/cpu




In [11]:
# Imports
import numpy as np
from matplotlib import pyplot as plt
import time
import keyboard
import pygame
import sys
import random
import os

# Gym Imports

import gymnasium
from gymnasium import Env
from gymnasium.spaces import Box, Discrete

# Pygame Inits()
# Initialise pygame and its font module once, up front: later cells build the
# FONT object (pygame.font.Font) and open the display window.

pygame.init()
pygame.font.init()


In [12]:
# Creating and Initializing Variables and Images

# Window size in pixels.
WIN_WIDTH, WIN_HEIGHT = 800, 800

# Folder that holds every sprite used by the game.
_IMAGE_DIR = 'ProjectImages/FlappyBird/Images'


def _load_scaled(file_name, factor):
    """Load ``file_name`` from the image folder and scale it by ``factor``."""
    surface = pygame.image.load(os.path.join(_IMAGE_DIR, file_name))
    return pygame.transform.scale_by(surface, factor)


# The three wing positions of the bird, in animation order.
BIRD_IMGS = [_load_scaled(name, 0.1) for name in ("Bird1.png", "Bird2.png", "Bird3.png")]

PIPE_IMG = _load_scaled("Pipe.png", 0.5)
GROUND_IMG = _load_scaled("Ground.png", 0.5)
SKY_IMG = _load_scaled("Sky.png", 0.5)

# Font used for the score / reward overlay.
FONT = pygame.font.Font('freesansbold.ttf', 20)

In [13]:
# Classes for the WebGame

# Bird Class --------------------------------------------------------------------------------------------------------
    
class Bird:
    """The bird sprite: vertical movement, tilt and wing animation.

    ``done`` and ``get_reward`` are plain flags; this class only initialises
    them, other parts of the notebook set and read them.
    """

    IMGS = BIRD_IMGS
    MAX_ROTATION = 25
    ROT_VEL = 20
    ANIMATION_TIME = 5

    def __init__(self, x, y):
        """Create a bird at ``(x, y)`` with zero velocity and the first sprite."""
        self.x, self.y = x, y
        self.height = self.y
        self.vel = 0
        self.tilt = 0
        self.tick_count = 0
        self.img_count = 0
        self.img = self.IMGS[0]
        self.done = False
        self.get_reward = False

    def jump(self):
        """Start a jump: upward velocity, tick counter restarted, jump height stored."""
        self.height = self.y
        self.tick_count = 0
        self.vel = -25

    def move(self):
        """Advance the bird by one frame and update its tilt."""
        self.tick_count += 1

        # Displacement grows linearly with the ticks since the last jump.
        #d = self.vel*self.tick_count + 1.5*self.tick_count**2
        raw = self.tick_count * 5.6 + self.vel
        displacement = 16 if raw >= 16 else raw  # cap the per-frame displacement at 16

        if displacement < 0:
            displacement -= 2  # extra push while moving upwards

        self.y = self.y + displacement

        tilt_up = displacement < 12 or self.y < self.height + 50
        if tilt_up:
            self.tilt = max(self.tilt, self.MAX_ROTATION)
        elif self.tilt > -90:
            self.tilt -= self.ROT_VEL

    def draw(self, win):
        """Pick the current animation frame, rotate it by the tilt and blit it on ``win``."""
        self.img_count += 1

        # Wing cycle 0 -> 1 -> 2 -> 1, each frame shown for ANIMATION_TIME draws,
        # then back to frame 0 and restart the counter.
        for multiple, frame in enumerate((0, 1, 2, 1), start=1):
            if self.img_count < self.ANIMATION_TIME * multiple:
                self.img = self.IMGS[frame]
                break
        else:
            if self.img_count < self.ANIMATION_TIME * 4 + 1:
                self.img = self.IMGS[0]
                self.img_count = 0

        # No flapping while the bird is pointing straight down.
        if self.tilt <= -90:
            self.img = self.IMGS[1]
            self.img_count = self.ANIMATION_TIME * 2

        # Rotate around the sprite's centre instead of its top-left corner.
        unrotated_rect = self.img.get_rect(topleft=(self.x, self.y))
        rotated = pygame.transform.rotate(self.img, self.tilt)
        rotated_rect = rotated.get_rect(center=unrotated_rect.center)
        win.blit(rotated, rotated_rect.topleft)

    def get_mask(self):
        """Return a collision mask built from the current (unrotated) sprite."""
        return pygame.mask.from_surface(self.img)

# Pipe Class --------------------------------------------------------------------------------------------------------

class Pipe:
    """A pair of pipe images that scrolls left and can collide with the bird.

    ``PIPE_TOP_IMG`` is drawn at ``(x, toplc)`` and ``PIPE_BOTTOM_IMG`` (the
    vertically flipped image) at ``(x, bottomlc)``.  ``centerx`` / ``centery``
    describe the marker point between the two pipes that is fed to the model.
    """

    GAP = 150
    VEL = 10

    # Offsets added to ``toplc`` / ``bottomlc`` to get the two marker points
    # whose midpoint is ``centery`` (the commented-out debug drawing in
    # ``draw`` plots the same points).
    TOP_MARKER_OFFSET = 600
    BOTTOM_MARKER_OFFSET = 200

    def __init__(self, x):
        """Create a pipe pair at horizontal position ``x`` with a random height."""
        self.x = x
        self.height = 0

        self.toplc = 0
        self.bottomlc = 0
        self.PIPE_TOP_IMG = PIPE_IMG
        self.PIPE_BOTTOM_IMG = pygame.transform.flip(PIPE_IMG, False, True)

        # The images never change after construction, so the collision masks
        # are built once here instead of on every collide() call.
        self._top_mask = pygame.mask.from_surface(self.PIPE_TOP_IMG)
        self._bottom_mask = pygame.mask.from_surface(self.PIPE_BOTTOM_IMG)

        self.size = 1

        self.passed = False
        self.already_checked = False
        self.thing = -400  # horizontal offset of the marker point: centerx = x - thing
        self.set_height()

        # Use the same formula as move() so a freshly spawned pipe already
        # reports the midpoint (previously centery was toplc + 600 until the
        # first move()).
        self._update_center()

    def _update_center(self):
        """Recompute ``centerx`` / ``centery`` from the current position."""
        self.centerx = self.x - self.thing
        self.centery = (
            self.toplc + self.TOP_MARKER_OFFSET
            + self.bottomlc + self.BOTTOM_MARKER_OFFSET
        ) / 2

    def set_height(self):
        """Pick a random height and derive the draw positions of both images."""
        self.height = random.randrange(-120, 10)
        self.toplc = self.height - self.PIPE_TOP_IMG.get_height() + 750
        self.bottomlc = self.height + self.GAP

    def move(self):
        """Scroll the pipe left by ``VEL`` pixels and refresh the marker point."""
        self.x -= self.VEL
        self._update_center()

    def draw(self, win):
        """Blit both pipe images onto ``win``."""
        win.blit(self.PIPE_TOP_IMG, (self.x, self.toplc))
        win.blit(self.PIPE_BOTTOM_IMG, (self.x, self.bottomlc))

        # This was for Testing the Pipe Locations, for example (Center Gap, Upper Pipe, Lower Pipe)
        # I used it for the Inputs for the Model

        #pygame.draw.line(win, (0, 255, 255), (self.x - self.thing, self.toplc + 600), (self.x - self.thing, self.bottomlc + 200), 5)
        #pygame.draw.circle(win, (0, 255, 0), (self.x - self.thing, self.toplc + 600), 15)
        #pygame.draw.circle(win, (0, 255, 0), (self.x - self.thing, self.bottomlc + 200), 15)
        #pygame.draw.circle(win, (0, 255, 0), (self.centerx, self.centery), 15)

    def collide(self, bird):
        """Return True (and set ``bird.done``) if the bird's mask overlaps either pipe."""
        bird_mask = bird.get_mask()

        # Offsets of each pipe image relative to the bird's top-left corner.
        top_offset = (self.x - bird.x, self.toplc - round(bird.y))
        bottom_offset = (self.x - bird.x, self.bottomlc - round(bird.y))

        hit_bottom = bird_mask.overlap(self._bottom_mask, bottom_offset)
        hit_top = bird_mask.overlap(self._top_mask, top_offset)

        if hit_top is not None or hit_bottom is not None:
            bird.done = True
            return True

        return False
    
# Ground Class --------------------------------------------------------------------------------------------------------

class Ground:
    """Two copies of the ground image that scroll left and wrap around."""

    VEL = 5
    WIDTH = GROUND_IMG.get_width()
    IMG = GROUND_IMG

    def __init__(self, y):
        """Place the two tiles side by side at vertical position ``y``."""
        self.y = y
        self.x1 = 0
        # The second tile starts exactly one image width to the right of the
        # first, matching the spacing move() uses when it wraps a tile around.
        # (Starting it at WIN_WIDTH left a gap or overlap whenever the image
        # width differs from the window width.)
        self.x2 = self.WIDTH

    def move(self):
        """Scroll both tiles left; a tile that left the screen is moved behind the other."""
        self.x1 -= self.VEL
        self.x2 -= self.VEL

        if self.x1 + self.WIDTH < 0:
            self.x1 = self.x2 + self.WIDTH

        if self.x2 + self.WIDTH < 0:
            self.x2 = self.x1 + self.WIDTH

    def draw(self, win):
        """Blit both tiles onto ``win``."""
        win.blit(self.IMG, (self.x1, self.y))
        win.blit(self.IMG, (self.x2, self.y))

In [14]:
#  WebGame Functions 

def spawnPipes(pipes):
    """Append a newly created pipe (at x = 500) to ``pipes`` in place."""
    pipes.append(Pipe(500))
    
# Used in Render Function -

def draw_window(win, pipes, ground, score, reward, bird=None):
    """Draw one complete frame and flip the display.

    Args:
        win: pygame surface to draw on.
        pipes: iterable of objects with a ``draw(win)`` method.
        ground: object with a ``draw(win)`` method.
        score: value shown after "Score: ".
        reward: number shown (rounded to 5 decimals) after "Reward: ".
        bird: object with a ``draw(win)`` method.  When omitted, the bird of
            the module-level ``env`` is used, which is what this function
            always did; passing it explicitly removes the dependency on a
            global named ``env`` existing.
    """
    if bird is None:
        bird = env.bird

    win.blit(SKY_IMG, (0, 0))

    for pipe in pipes:
        pipe.draw(win)

    ground.draw(win)
    bird.draw(win)

    # Score and reward are right-aligned with a 20 px margin.
    score_text = FONT.render("Score: " + str(score), 1, (0, 0, 0))
    win.blit(score_text, (WIN_WIDTH - 20 - score_text.get_width(), 20))

    reward_text = FONT.render("Reward: " + str(round(reward, 5)), 1, (0, 0, 0))
    win.blit(reward_text, (WIN_WIDTH - 20 - reward_text.get_width(), 50))

    pygame.display.update()

In [15]:
# Creating the Environment for the Model

class WebGame(Env):
    """Gymnasium environment around the pygame Flappy Bird game.

    Actions (``Discrete(2)``): any value other than 1 makes the bird jump,
    1 does nothing.

    Observation (5 float32 values, see ``get_observation``): the bird's y,
    its absolute y distance to the two pipe marker points, and the marker
    centre ``(centerx, centery)`` of the closest pipe not yet passed.

    Every ``step`` also renders a frame and sleeps for ``FRAME_SECONDS``;
    pipes are spawned on wall-clock time, so game pacing depends on that sleep.
    """

    REWARD_PER_FRAME_ALIVE = 0.006
    REWARD_IN_GAP = 0.1           # per frame while the bird's y is between a pipe's markers
    REWARD_PIPE_PASSED = 15
    DEATH_PENALTY = 15
    PIPE_SPAWN_SECONDS = 5        # wall-clock seconds between pipe spawns
    FRAME_SECONDS = 1 / 30
    BIRD_START = (100, 300)       # position the bird gets at the start of every episode

    # Initialize Function

    def __init__(self):
        """Create the spaces, the window and the state of a first episode."""
        # The values are not confined to [0, 1000]: bird.y may drop below 0
        # (the episode only ends below -200) and a pipe's centerx (x + 400)
        # turns negative before the pipe is removed at x <= -600.  Unbounded
        # limits keep every observation inside the declared space.
        self.observation_space = Box(low=-np.inf, high=np.inf, shape=(5,), dtype=np.float32)
        self.action_space = Discrete(2)

        self.pipe_cooldown = 0.1  # not read anywhere in this class
        self.ground = Ground(60)
        self.win = pygame.display.set_mode((WIN_WIDTH, WIN_HEIGHT))

        self._start_episode()

    def _start_episode(self):
        """Reset bird, score, pipes and the reward counter to their episode-start values."""
        self.bird = Bird(*self.BIRD_START)
        self.score = 0
        self.pipes = []
        self.last_pipe = time.time()
        self.total_reward_for_episode = 0
        spawnPipes(self.pipes)

    # Step Function

    def step(self, action):
        """Advance the game by one frame.

        Returns:
            ``(observation, reward, terminated, truncated, info)``.
            ``terminated`` is True once the bird hit a pipe or left the
            allowed vertical range; ``truncated`` is always False because the
            environment has no time limit.  The environment does not reset
            itself: the caller must call ``reset()`` after a terminal step.
        """
        # Action mapping: 1 is the no-op, anything else is a jump.
        if action != 1:
            self.bird.jump()

        reward = 0.0

        # Game Loop Logic

        if time.time() - self.last_pipe > self.PIPE_SPAWN_SECONDS:
            spawnPipes(self.pipes)
            self.last_pipe = time.time()

        for pipe in self.pipes:
            pipe.move()
            pipe.collide(self.bird)  # sets bird.done on a hit

            if not pipe.passed and (pipe.x + pipe.PIPE_TOP_IMG.get_width() / 3) <= self.bird.x:
                pipe.passed = True
                self.bird.get_reward = True
                self.score += 1

            # Small shaping reward while the bird is level with the gap of a
            # pipe it has not passed yet.
            if not pipe.passed and (pipe.bottomlc + 200) < self.bird.y < (pipe.toplc + 600):
                reward += self.REWARD_IN_GAP

        # Drop pipes that scrolled far off the left edge.
        self.pipes[:] = [pipe for pipe in self.pipes if pipe.x > -600]

        self.bird.move()
        self.ground.move()

        if self.bird.y + self.bird.img.get_height() >= 950 or self.bird.y < -200:
            self.bird.done = True

        # NOTE(review): manual override -- holding space makes the bird jump
        # regardless of the agent's action, which also affects training runs.
        if keyboard.is_pressed('space'):
            self.bird.jump()

        # Returning the Observations, Rewards, Done, Truncated, and Info

        new_observation = self.get_observation()

        reward += self.REWARD_PER_FRAME_ALIVE
        terminated = self.bird.done

        if terminated:
            reward -= self.DEATH_PENALTY

        if self.bird.get_reward:
            reward += self.REWARD_PIPE_PASSED
            self.bird.get_reward = False

        self.total_reward_for_episode += reward

        self.render()
        time.sleep(self.FRAME_SECONDS)

        return new_observation, reward, terminated, False, {}

    # The Scene Render Function

    def render(self, mode="human"):
        """Process window events and draw the current frame.

        Closing the window quits pygame and exits the interpreter.
        """
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                pygame.quit()
                sys.exit()
        draw_window(self.win, self.pipes, self.ground, self.score, round(self.total_reward_for_episode, 5))

    # Resetting After Each Episode

    def reset(self, seed=None, options=None):
        """Start a new episode.

        Args:
            seed: optional seed.  Besides Gymnasium's own RNG it seeds NumPy's
                global RNG and the ``random`` module, which is what the pipes
                use to pick their height.
            options: accepted for API compatibility, not used.

        Returns:
            ``(observation, info)`` as required by the Gymnasium API.
        """
        super().reset(seed=seed)
        if seed is not None:
            np.random.seed(seed)
            random.seed(seed)

        time.sleep(0.05)
        self._start_episode()

        return self.get_observation(), {}

    # Close Function

    def close(self):
        """Shut pygame down."""
        pygame.quit()

    # Getting Observations in the Game

    def get_observation(self):
        """Build the 5-value float32 observation.

        Values, in order: bird y, |bird y - (toplc + 600)|,
        |bird y - (bottomlc + 200)|, centerx, centery -- all taken from the
        closest (smallest x) pipe that has not been passed yet.  When there is
        no such pipe, fixed placeholder values are returned instead.
        """
        upcoming = [pipe for pipe in self.pipes if not pipe.passed]

        if upcoming:
            closest = min(upcoming, key=lambda pipe: pipe.x)
            values = [
                self.bird.y,
                abs(self.bird.y - (closest.toplc + 600)),
                abs(self.bird.y - (closest.bottomlc + 200)),
                closest.centerx,
                closest.centery,
            ]
        else:
            # If no un-passed Pipes are on Screen, Returns Basic Values
            values = [self.bird.y, 200, 200, 400, 400]

        return np.array(values, dtype=np.float32)

    def get_done(self):
        """Return whether the current episode has ended."""
        return self.bird.done


In [16]:
# Creating the Training and Logging Callback
# Not my Code, this Callback / Logging Code was Thanks to Nicholas Renotte
# https://www.youtube.com/@NicholasRenotte

from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common import env_checker
from stable_baselines3 import PPO
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv

class TrainingandLoggingCallback(BaseCallback):
    """Callback that saves the model every ``check_freq`` calls.

    Args:
        check_freq: save a checkpoint whenever ``n_calls`` is a multiple of
            this value.
        save_path: directory for the checkpoints (created if missing).  With
            ``None`` no directory is created and nothing is saved.
        verbose: passed on to ``BaseCallback``.
    """

    def __init__(self, check_freq, save_path, verbose=1):
        super().__init__(verbose)
        self.check = check_freq
        self.path = save_path

    def _init_callback(self):
        """Create the checkpoint directory when a path was given."""
        if self.path is not None:
            os.makedirs(self.path, exist_ok=True)

    def _on_step(self):
        """Save ``Model<n_calls>`` at every ``check_freq``-th call; never stops training."""
        # Same None guard as _init_callback: os.path.join(None, ...) would raise.
        if self.path is not None and self.n_calls % self.check == 0:
            model_path = os.path.join(self.path, f'Model{self.n_calls}')
            self.model.save(model_path)

        return True

# Directory the callback writes its `Model<n_calls>` checkpoints to.
CHECKPOINT_DIR = './train/FlappyBirdPPO/'
# Directory passed to PPO as `tensorboard_log` in the training cell.
LOG_DIR = './logs/Flappy-Bird-PPO/'

# Save a checkpoint every 9000 callback calls.
callback = TrainingandLoggingCallback(check_freq=9000, save_path=CHECKPOINT_DIR)


In [None]:
    
# Creating and Training the Model    
    
env = WebGame()
env_checker.check_env(env)
pygame.font.init()

model = PPO(
    'MlpPolicy',
    env,
    tensorboard_log=LOG_DIR,
    learning_rate=0.0005,
    verbose=1,
    batch_size=32,
)

model.learn(total_timesteps=90000, callback=callback)

# Watch the trained policy play for 1000 frames.
needs_reset = True
for i in range(1000):
    if needs_reset:
        # Gymnasium's reset() returns (observation, info); predict() needs
        # only the observation.  A bare observation is accepted as well.
        reset_result = env.reset()
        obs = reset_result[0] if isinstance(reset_result, tuple) else reset_result
    action, _states = model.predict(obs, deterministic=True)
    # step() already renders the frame, so no separate env.render() call.
    obs, reward, done, truncated, info = env.step(action)
    needs_reset = done or truncated


In [None]:

# Testing the model

# Load the checkpoint written by the callback at 90000 calls.
model = PPO.load(os.path.join(CHECKPOINT_DIR, 'Model90000'))

env = WebGame()

needs_reset = True
for i in range(1000):
    if needs_reset:
        # Gymnasium's reset() returns (observation, info); predict() needs
        # only the observation.  A bare observation is accepted as well.
        reset_result = env.reset()
        obs = reset_result[0] if isinstance(reset_result, tuple) else reset_result
    action, _ = model.predict(obs)
    # step() already renders the frame, so no separate env.render() call.
    obs, reward, done, truncated, info = env.step(action)
    needs_reset = done or truncated
