# Import Dependencies

In [2]:
import gym 
from gym import Env
from gym.spaces import Discrete, Box, Dict, Tuple, MultiBinary, MultiDiscrete 

import numpy as np
import random
import math
import os


from stable_baselines3.common.env_checker import check_env
from stable_baselines3 import PPO
from stable_baselines3 import DQN
from stable_baselines3.common.vec_env import VecFrameStack
from stable_baselines3.common.evaluation import evaluate_policy

import pygame

import time

# Initialize Constants

In [3]:
FPS = 60
HEIGHT = 500
WIDTH = 500
START_X = 100
START_Y = int(HEIGHT / 2)
PIPE_GAP = 125
MIN_PIPE_HEIGHT = 100
GENERATE_PIPE_X = 600
PIPE_DISTANCE = 250
JUMP_HEIGHT = 25
FALL_HEIGHT = 7.5
# JUMP_HEIGHT = 15
# FALL_HEIGHT = 20
PIPE_WIDTH = 50
BIRD_WIDTH = 100
BIRD_HEIGHT = 100
TEXT_BUFFER = 25

# colours
BLUE = (164, 219, 232)
BLACK = (0, 0, 0)

# Create Environment 

In [4]:
class FlappyBirdEnv(gym.Env):
    def __init__(self, display=False):
        pygame.init()
        
        # if display is true render the GUI
        self.display = display
        if self.display:
            self.WINDOW = pygame.display.set_mode((WIDTH, HEIGHT))
            pygame.display.set_caption("Flappy Bird")
            self.clock = pygame.time.Clock()
        
        # 2 actions --> jump, don't jump
        self.action_space = Discrete(2)
        # y_coord of bird, distance from nearest pipe, height of nearest bottom pipe
        self.observation_space = Box(0, 600, shape=(3, 1), dtype=int)
        self.iterations = 0
        
        self.reset()
    
    def game_over(self):
        # set game over to true if bird goes out of bounds
        if self.bird_y >= HEIGHT or self.bird_y <= 0:
            return True
        # set game over to true if bird hits the pipe
        if self.next_pipes[self.curr_pipe][0] <= self.bird_x <= self.next_pipes[self.curr_pipe][0] + PIPE_WIDTH and \
        (not HEIGHT - self.next_pipes[self.curr_pipe][1] >= self.bird_y >= HEIGHT - self.next_pipes[self.curr_pipe][1] - PIPE_GAP):
            return True
        return False
        
    # generate a new pipe and append it to next_pipes
    def generate_new_pipe(self):
        new_pipe_height = random.randint(MIN_PIPE_HEIGHT, HEIGHT - PIPE_GAP - MIN_PIPE_HEIGHT)
        self.next_pipes.append([GENERATE_PIPE_X, new_pipe_height])
        return self.next_pipes
    
    def step(self, action):
        if self.display:
            self.render()
            
        new_pipes = []
        # decrement x_coord of pipes
        # delete pipes that are off the screen
        for i in range(len(self.next_pipes)):
            self.next_pipes[i][0] -= 5
            if self.next_pipes[i][0] >= -(PIPE_WIDTH):
                new_pipes.append(self.next_pipes[i].copy())
            else:
                self.curr_pipe -= 1
        self.next_pipes = new_pipes.copy()
        
        # generate new pipe
        if self.next_pipes[-1][0] <= GENERATE_PIPE_X - PIPE_DISTANCE:
            self.generate_new_pipe()
        
        # jumps
        if action == 0:
            self.bird_y -= JUMP_HEIGHT
        # does nothing
        else:
            self.bird_y += FALL_HEIGHT
        
        # calculate reward
        done = self.game_over()
        # reward is -1000 if the bird dies
        if done:
            reward = -1000
        # reward is 1 if bird survives
        else:
            reward = 1
        
        # increment curr_pipe and set reward to 15 if bird passes the pipe
        if self.bird_x > self.next_pipes[self.curr_pipe][0] + PIPE_WIDTH:
            reward = 15
            self.score += 10
            self.curr_pipe += 1
        
        # update state
        self.state = [[self.bird_y], [self.next_pipes[self.curr_pipe][0]], [HEIGHT - self.next_pipes[self.curr_pipe][1]]]
        self.prev_action = action
        info = {}
        
        return np.array(self.state), reward, done, info
    
    # renders the GUI
    def render(self):
        self.WINDOW.fill(BLUE)
        for event in pygame.event.get():
            self.clock.tick(FPS)
            # close window when exit button is clicked
            if event.type == pygame.QUIT:
                pygame.quit()
            
        # draw bird
        bird_img = pygame.image.load('flappy bird.png').convert_alpha()
        bird_img = pygame.transform.smoothscale(bird_img, (50, 50))
        self.WINDOW.blit(bird_img, (self.bird_x, self.bird_y))
        
        # draw pipes
        for pipe in self.next_pipes:
            pipe1 = pygame.image.load('pipe.jpg').convert_alpha()
            pipe1 =  pygame.transform.smoothscale(pipe1, (PIPE_WIDTH, pipe[1])) 
            pipe2 = pygame.image.load('pipe.jpg').convert_alpha()
            pipe2 = pygame.transform.rotate(pipe2, 180)
            pipe2 = pygame.transform.smoothscale(pipe2, (PIPE_WIDTH, HEIGHT - pipe[1] - PIPE_GAP)) 
            
            self.WINDOW.blit(pipe1, (pipe[0], HEIGHT - pipe[1]))
            self.WINDOW.blit(pipe2, (pipe[0], 0))
        
        # display score and number of iterations
        FONT = pygame.font.Font('freesansbold.ttf', 20)
        text = FONT.render(f"Score: {self.score}", True, BLACK)
        self.WINDOW.blit(text, (TEXT_BUFFER, TEXT_BUFFER))

        text = FONT.render(f"Iterations: {self.iterations}", True, BLACK)
        self.WINDOW.blit(text, (WIDTH - 200, TEXT_BUFFER))
        
        pygame.display.update()
    
    # reset all variables to their initial value and restart the game
    def reset(self):
        self.score = 0
        self.bird_x = START_X
        self.bird_y = START_Y
        self.next_pipes = []
        self.generate_new_pipe()
        self.prev_action = 0
        self.state = [[START_Y], [GENERATE_PIPE_X], [self.next_pipes[0][1]]]
        self.curr_pipe = 0
        self.iterations += 1
        
        return np.array(self.state)
    

# Test Environment

In [25]:
env = FlappyBirdEnv()

In [26]:
# sample of observation space
env.observation_space.sample()

array([[460],
       [ 22],
       [575]])

In [27]:
# sample of action space
env.action_space.sample()

1

In [28]:
# sample of current state
env.state

[[250], [600], [229]]

In [29]:
# return value of step function
env.step(1)

(array([[257.5],
        [595. ],
        [271. ]]),
 1,
 False,
 {})

In [78]:
# performance if random steps are taken
env = FlappyBirdEnv(display=True)
episodes = 5
env.iterations = 0
for episode in range(1, episodes+1):
    state = env.reset()
    done = False
    env.score = 0
    reward_score = 0
    
    while not done:
        time.sleep(0.001)
        action = env.action_space.sample()
        n_state, reward, done, info = env.step(action)
        reward_score += reward
    print(f"Episode:{episode} Score:{env.score} Reward Score:{reward_score}")

env.close()
pygame.quit()

Episode:1 Score:0 Reward Score:-911
Episode:2 Score:0 Reward Score:-911
Episode:3 Score:0 Reward Score:-911
Episode:4 Score:0 Reward Score:-911
Episode:5 Score:0 Reward Score:-911


# Train Model

In [None]:
env = FlappyBirdEnv(display=False)
model = PPO("MlpPolicy", env, verbose=1, learning_rate=0.000005)
model.learn(total_timesteps=4000000)
model.save('PPO FLAPPY BIRD 4MIL')

In [26]:
pygame.quit()

# Test Model

In [135]:
# save the model
model.save('PPO FLAPPY BIRD 4MIL')

In [None]:
# test the ai
env = FlappyBirdEnv(display=True)
model = PPO.load('PPO FLAPPY BIRD 4MIL', env=env)
episodes = 5
env.iterations = 0
for episode in range(1, episodes+1):
    state = env.reset()
    done = False
    env.score = 0
    reward_score = 0 
    
    start_time = time.time()
    while not done:
        time.sleep(0.001)
        action, _ = model.predict(state)
        state, reward, done, info = env.step(action)
        reward_score += reward
    print(f"Episode:{episode} Score:{env.score} Reward Score:{reward_score}")
    print(time.time() - start_time)

pygame.quit()

In [None]:
'''
Performance: Generally performs well and can reach scores of over 2000
Areas of Imrovement: On odd ocassions the AI will die before reaching the first pipe(rare cases) not exactly sure why this
is but I believe that further training would stabilize the model
'''