In [1]:
!pip install flappy-bird-gymnasium

[0m

In [2]:
import random
from itertools import cycle

import pygame
import os
import random
import sys
import time
import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import flappy_bird_gymnasium
import gymnasium
from matplotlib import pyplot as plt


def load():
    # path of player with different states
    PLAYER_PATH = (
        './assets/sprites/redbird-upflap.png',
        './assets/sprites/redbird-midflap.png',
        './assets/sprites/redbird-downflap.png'
    )

    # path of background
    BACKGROUND_PATH = './assets/sprites/background-black.png'

    # path of pipe
    PIPE_PATH = './assets/sprites/pipe-green.png'

    IMAGES, HITMASKS = {}, {}

    # numbers sprites for score display
    IMAGES['numbers'] = [
        pygame.image.load('./assets/sprites/{}.png'.format(idx)).convert_alpha()
        for idx in range(10)
    ]

    # base (ground) sprite
    IMAGES['base'] = pygame.image.load('./assets/sprites/base.png').convert_alpha()

    # select random background sprites
    IMAGES['background'] = pygame.image.load(BACKGROUND_PATH).convert()

    # select random player sprites
    IMAGES['player'] = [
        pygame.image.load(PLAYER_PATH[idx]).convert_alpha()
        for idx in range(3)
    ]

    # select random pipe sprites
    IMAGES['pipe'] = (
        pygame.transform.rotate(
            pygame.image.load(PIPE_PATH).convert_alpha(), 180),
        pygame.image.load(PIPE_PATH).convert_alpha(),
    )

    # hismask for pipes
    HITMASKS['pipe'] = [
        getHitmask(IMAGES['pipe'][idx])
        for idx in range(2)
    ]

    # hitmask for player
    HITMASKS['player'] = [
        getHitmask(IMAGES['player'][idx])
        for idx in range(3)
    ]

    return IMAGES, HITMASKS


def getHitmask(image):
    """returns a hitmask using an image's alpha."""
    mask = []
    for x in range(image.get_width()):
        mask.append([])
        for y in range(image.get_height()):
            mask[x].append(bool(image.get_at((x, y))[3]))
    return mask


FPS = 60000 # 30000
SCREENWIDTH = 288
SCREENHEIGHT = 512

# pygame.init()
FPSCLOCK = pygame.time.Clock()
SCREEN = pygame.display.set_mode((SCREENWIDTH, SCREENHEIGHT))
pygame.display.set_caption('Flappy Bird')

IMAGES, HITMASKS = load()
PIPEGAPSIZE = 100  # gap between upper and lower part of pipe
BASEY = SCREENHEIGHT * 0.79

PLAYER_WIDTH = IMAGES['player'][0].get_width()
PLAYER_HEIGHT = IMAGES['player'][0].get_height()
PIPE_WIDTH = IMAGES['pipe'][0].get_width()
PIPE_HEIGHT = IMAGES['pipe'][0].get_height()
BACKGROUND_WIDTH = IMAGES['background'].get_width()

PLAYER_INDEX_GEN = cycle([0, 1, 2, 1])


class GameState:
    def __init__(self, show_game):
        self.show_game = show_game

        if self.show_game:
            pygame.init()
            self.SCREEN = pygame.display.set_mode((SCREENWIDTH, SCREENHEIGHT))
            pygame.display.set_caption('Flappy Bird')

        self.score = self.playerIndex = self.loopIter = 0
        self.playerx = int(SCREENWIDTH * 0.2)
        self.playery = int((SCREENHEIGHT - PLAYER_HEIGHT) / 2)
        self.basex = 0
        self.baseShift = IMAGES['base'].get_width() - BACKGROUND_WIDTH

        newPipe1 = getRandomPipe()
        newPipe2 = getRandomPipe()
        self.upperPipes = [
            {'x': SCREENWIDTH, 'y': newPipe1[0]['y']},
            {'x': SCREENWIDTH + (SCREENWIDTH / 2), 'y': newPipe2[0]['y']},
        ]
        self.lowerPipes = [
            {'x': SCREENWIDTH, 'y': newPipe1[1]['y']},
            {'x': SCREENWIDTH + (SCREENWIDTH / 2), 'y': newPipe2[1]['y']},
        ]

        # player velocity, max velocity, downward accleration, accleration on flap
        self.pipeVelX = -4
        self.playerVelY = 0  # player's velocity along Y, default same as playerFlapped
        self.playerMaxVelY = 10  # max vel along Y, max descend speed
        self.playerMinVelY = -8  # min vel along Y, max ascend speed
        self.playerAccY = 1  # players downward accleration
        self.playerFlapAcc = -9  # players speed on flapping
        self.playerFlapped = False  # True when player flaps
        self.top_score = 0

    def frame_step(self, input_actions):
        pygame.event.pump()

        reward = 0.1
        terminal = False

        if sum(input_actions) != 1:
            raise ValueError('Multiple input actions!')

        # input_actions[0] == 1: do nothing
        # input_actions[1] == 1: flap the bird
        if input_actions[1] == 1:
            if self.playery > -2 * PLAYER_HEIGHT:
                self.playerVelY = self.playerFlapAcc
                self.playerFlapped = True

        # check for score
        playerMidPos = self.playerx + PLAYER_WIDTH / 2
        for pipe in self.upperPipes:
            pipeMidPos = pipe['x'] + PIPE_WIDTH / 2
            if pipeMidPos <= playerMidPos < pipeMidPos + 4:
                self.top_score += 1
                self.score += 1
                reward = 1

        # playerIndex basex change
        if (self.loopIter + 1) % 3 == 0:
            self.playerIndex = next(PLAYER_INDEX_GEN)
        self.loopIter = (self.loopIter + 1) % 30
        self.basex = -((-self.basex + 100) % self.baseShift)

        # player's movement
        if self.playerVelY < self.playerMaxVelY and not self.playerFlapped:
            self.playerVelY += self.playerAccY
        if self.playerFlapped:
            self.playerFlapped = False
        self.playery += min(self.playerVelY, BASEY - self.playery - PLAYER_HEIGHT)
        if self.playery < 0:
            self.playery = 0

        # move pipes to left
        for uPipe, lPipe in zip(self.upperPipes, self.lowerPipes):
            uPipe['x'] += self.pipeVelX
            lPipe['x'] += self.pipeVelX

        # add new pipe when first pipe is about to touch left of screen
        if 0 < self.upperPipes[0]['x'] < 5:
            newPipe = getRandomPipe()
            self.upperPipes.append(newPipe[0])
            self.lowerPipes.append(newPipe[1])

        # remove first pipe if its out of the screen
        if self.upperPipes[0]['x'] < -PIPE_WIDTH:
            self.upperPipes.pop(0)
            self.lowerPipes.pop(0)

        # check if crash here
        isCrash = checkCrash({'x': self.playerx, 'y': self.playery,
                              'index': self.playerIndex},
                             self.upperPipes, self.lowerPipes)
        if isCrash:
            terminal = True
            self.__init__(self.show_game)
            reward = -1


        self.SCREEN.blit(IMAGES['background'], (0, 0))

        for uPipe, lPipe in zip(self.upperPipes, self.lowerPipes):
            self.SCREEN.blit(IMAGES['pipe'][0], (uPipe['x'], uPipe['y']))
            self.SCREEN.blit(IMAGES['pipe'][1], (lPipe['x'], lPipe['y']))

        self.SCREEN.blit(IMAGES['base'], (self.basex, BASEY))
        self.SCREEN.blit(IMAGES['player'][self.playerIndex],
                            (self.playerx, self.playery))

        image_data = pygame.surfarray.array3d(pygame.display.get_surface())

        if self.show_game:
            showScore(self.top_score)
            pygame.display.update()

        FPSCLOCK.tick(FPS)
        return image_data, reward, terminal


def getRandomPipe():
    """returns a randomly generated pipe"""
    # y of gap between upper and lower pipe
    gapYs = [20, 30, 40, 50, 60, 70, 80, 90]
    index = random.randint(0, len(gapYs) - 1)
    gapY = gapYs[index]

    gapY += int(BASEY * 0.2)
    pipeX = SCREENWIDTH + 10

    return [
        {'x': pipeX, 'y': gapY - PIPE_HEIGHT},  # upper pipe
        {'x': pipeX, 'y': gapY + PIPEGAPSIZE},  # lower pipe
    ]


def showScore(score):
    """displays score in center of screen"""
    scoreDigits = [int(x) for x in list(str(score))]
    totalWidth = 0  # total width of all numbers to be printed

    for digit in scoreDigits:
        totalWidth += IMAGES['numbers'][digit].get_width()

    Xoffset = (SCREENWIDTH - totalWidth) / 2

    for digit in scoreDigits:
        SCREEN.blit(IMAGES['numbers'][digit], (Xoffset, SCREENHEIGHT * 0.1))
        Xoffset += IMAGES['numbers'][digit].get_width()


def checkCrash(player, upperPipes, lowerPipes):
    """returns True if player collders with base or pipes."""
    pi = player['index']
    player['w'] = IMAGES['player'][0].get_width()
    player['h'] = IMAGES['player'][0].get_height()

    # if player crashes into ground
    if player['y'] + player['h'] >= BASEY - 1:
        return True
    else:

        playerRect = pygame.Rect(player['x'], player['y'],
                                 player['w'], player['h'])

        for uPipe, lPipe in zip(upperPipes, lowerPipes):
            # upper and lower pipe rects
            uPipeRect = pygame.Rect(uPipe['x'], uPipe['y'], PIPE_WIDTH, PIPE_HEIGHT)
            lPipeRect = pygame.Rect(lPipe['x'], lPipe['y'], PIPE_WIDTH, PIPE_HEIGHT)

            # player and upper/lower pipe hitmasks
            pHitMask = HITMASKS['player'][pi]
            uHitmask = HITMASKS['pipe'][0]
            lHitmask = HITMASKS['pipe'][1]

            # if bird collided with upipe or lpipe
            uCollide = pixelCollision(playerRect, uPipeRect, pHitMask, uHitmask)
            lCollide = pixelCollision(playerRect, lPipeRect, pHitMask, lHitmask)

            if uCollide or lCollide:
                return True

    return False


def pixelCollision(rect1, rect2, hitmask1, hitmask2):
    """Checks if two objects collide and not just their rects"""
    rect = rect1.clip(rect2)

    if rect.width == 0 or rect.height == 0:
        return False

    x1, y1 = rect.x - rect1.x, rect.y - rect1.y
    x2, y2 = rect.x - rect2.x, rect.y - rect2.y

    for x in range(rect.width):
        for y in range(rect.height):
            if hitmask1[x1 + x][y1 + y] and hitmask2[x2 + x][y2 + y]:
                return True
    return False

pygame 2.6.1 (SDL 2.28.4, Python 3.10.12)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [None]:
class Hyperparameters:
    MODEL_NAME = "Flapflaptry26"
    LEARNING_RATE = 1e-5
    FRAME_SKIP = 1
    FRAME_SKIP_JUMP = 0
    SHOW_GAME = True
    NUMBER_OF_ACTIONS = 2
    GAMMA = 0.99
    INITIAL_EPSILON = 0.2 
    FINAL_EPSILON = 0.00001
    NUMBER_OF_ITERATIONS = 2000000
    REPLAY_MEMORY_SIZE = 50000
    MINIBATCH_SIZE = 32
    TARGET_UPDATE_FREQUENCY = 1000


class NeuralNetwork(nn.Module):

    def __init__(self):
        super(NeuralNetwork, self).__init__()

        self.conv1 = nn.Conv2d(4, 32, 8, 4)
        self.conv2 = nn.Conv2d(32, 64, 4, 2)
        self.conv3 = nn.Conv2d(64, 64, 3, 1)
        self.fc4 = nn.Linear(3136, 512)
        self.fc5 = nn.Linear(512, Hyperparameters.NUMBER_OF_ACTIONS)

    def forward(self, x):
        output = self.conv1(x)
        output = torch.nn.functional.relu(output)
        output = self.conv2(output)
        output = torch.nn.functional.relu(output)
        output = self.conv3(output)
        output = torch.nn.functional.relu(output)
        output = output.view(output.size()[0], -1)
        output = self.fc4(output)
        output = torch.nn.functional.relu(output)
        output = self.fc5(output)

        return output


def init_weights(m):
    if type(m) == nn.Conv2d or type(m) == nn.Linear:
        torch.nn.init.uniform_(m.weight, -0.01, 0.01)
        m.bias.data.fill_(0.01)

i = 0

def image_processing(image):
    image = image[:, 40:300]
    image_data = cv2.cvtColor(cv2.resize(image, (84, 84)), cv2.COLOR_BGR2GRAY)
    image_data[image_data > 0] = 255
    image_data = np.reshape(image_data, (84, 84, 1))
    image_tensor = image_data.transpose(2, 0, 1)
    image_tensor = image_tensor.astype(np.float32)
    image_tensor = torch.from_numpy(image_tensor)
    if torch.cuda.is_available():  # put on GPU if CUDA is available
        image_tensor = image_tensor.cuda()
    return image_tensor


def optimize_model(replay_memory, model, target_model, optimizer, loss_function):
    minibatch = random.sample(replay_memory, min(len(replay_memory), Hyperparameters.MINIBATCH_SIZE))

    state_batch = torch.cat(tuple(d[0] for d in minibatch))
    action_batch = torch.cat(tuple(d[1] for d in minibatch))
    reward_batch = torch.cat(tuple(d[2] for d in minibatch))
    new_state_batch = torch.cat(tuple(d[3] for d in minibatch))

    if torch.cuda.is_available():
        state_batch = state_batch.cuda()
        action_batch = action_batch.cuda()
        reward_batch = reward_batch.cuda()
        new_state_batch = new_state_batch.cuda()

    output_1_batch = target_model(new_state_batch)

    y_batch = torch.cat(tuple(reward_batch[i] if minibatch[i][4]
                              else reward_batch[i] + Hyperparameters.GAMMA * torch.max(output_1_batch[i])
                              for i in range(len(minibatch))))

    q_value = torch.sum(model(state_batch) * action_batch, dim=1)

    optimizer.zero_grad()

    y_batch = y_batch.detach()

    loss = loss_function(q_value, y_batch)

    loss.backward()
    optimizer.step()


def train():
    model = NeuralNetwork()
    target_model = NeuralNetwork()

    if torch.cuda.is_available():
        model = model.cuda()
        target_model = target_model.cuda()

    model.apply(init_weights)
    target_model.load_state_dict(model.state_dict())
    start = time.time()

    frames_to_skip = 0

    optimizer = optim.Adam(model.parameters(), lr=Hyperparameters.LEARNING_RATE)
    loss_function = nn.MSELoss()

    game_state = GameState(Hyperparameters.SHOW_GAME)
    replay_memory = []

    action = torch.zeros([Hyperparameters.NUMBER_OF_ACTIONS], dtype=torch.float32)
    action[0] = 1
    image_data, reward, finished = game_state.frame_step(action)
    image_data = image_processing(image_data)
    state = torch.cat((image_data, image_data, image_data, image_data)).unsqueeze(0)

    epsilon = Hyperparameters.INITIAL_EPSILON
    iteration = 0

    epsilon_decrements = np.linspace(Hyperparameters.INITIAL_EPSILON, Hyperparameters.FINAL_EPSILON, Hyperparameters.NUMBER_OF_ITERATIONS)
    total_reward_per_episode = 0
    rewards = []
    while iteration < Hyperparameters.NUMBER_OF_ITERATIONS:
        output = model(state)[0]

        action = torch.zeros([Hyperparameters.NUMBER_OF_ACTIONS], dtype=torch.float32)
        if torch.cuda.is_available():
            action = action.cuda()

        skipped_frame = False
        if frames_to_skip > 0:
            skipped_frame = True
            frames_to_skip -= 1
            action_index = [torch.tensor(0)][0]
        else:
            if random.random() <= epsilon:
                action_index = random.randint(0, Hyperparameters.NUMBER_OF_ACTIONS - 1)
            else:
                action_index = torch.argmax(output).item()

        action[action_index] = 1

        if action_index == 1:
            frames_to_skip += Hyperparameters.FRAME_SKIP_JUMP
        frames_to_skip += Hyperparameters.FRAME_SKIP if not skipped_frame else 0

        image_data_1, reward, finished = game_state.frame_step(action)
        image_data_1 = image_processing(image_data_1)
        new_state = torch.cat((state.squeeze(0)[1:, :, :], image_data_1)).unsqueeze(0)

        action = action.unsqueeze(0)
        reward = torch.from_numpy(np.array([reward], dtype=np.float32)).unsqueeze(0)

        reward_float = reward.numpy()[0][0]
        if reward_float == -1:
            rewards.append(total_reward_per_episode - 1)
            total_reward_per_episode = 0
        else:
            total_reward_per_episode += reward_float
        replay_memory.append((state.cpu(), action.cpu(), reward.cpu(), new_state.cpu(), finished))

        if len(replay_memory) > Hyperparameters.REPLAY_MEMORY_SIZE:
            replay_memory.pop(0)

        epsilon = epsilon_decrements[iteration]

        optimize_model(replay_memory, model, target_model, optimizer, loss_function)

        state = new_state
        iteration += 1

        if iteration % Hyperparameters.TARGET_UPDATE_FREQUENCY == 0:
            target_model.load_state_dict(model.state_dict())

        if iteration % 25000 == 0:
            torch.save(model, f"/content/drive/My Drive/{Hyperparameters.MODEL_NAME}_" + str(iteration) + ".pth")

        if iteration % 1000 == 0:
            print(f"Iteration: {iteration}, Time elapsed: {time.time() - start:.4f}, epsilon: {epsilon:.4f}, "
                  f"Avg Rewards: {np.mean(total_reward_per_episode):.4f}, Avg Q: {np.mean(output.cpu().detach().numpy()):.4f}")


def test(model):
    game_state = GameState(Hyperparameters.SHOW_GAME)

    action = torch.zeros([Hyperparameters.NUMBER_OF_ACTIONS], dtype=torch.float32)
    action[0] = 1
    image_data, reward, terminal = game_state.frame_step(action)
    image_data = image_processing(image_data)
    state = torch.cat((image_data, image_data, image_data, image_data)).unsqueeze(0)

    
    while True:
        output = model(state)[0]

        action = torch.zeros([Hyperparameters.NUMBER_OF_ACTIONS], dtype=torch.float32)
        if torch.cuda.is_available():
            action = action.cuda()

        action_index = torch.argmax(output)
        if torch.cuda.is_available():
            action_index = action_index.cuda()
        action[action_index] = 1

        image_data_1, reward, terminal = game_state.frame_step(action)
        image_data_1 = image_processing(image_data_1)
        new_state = torch.cat((state.squeeze(0)[1:, :, :], image_data_1)).unsqueeze(0)

        state = new_state


def main(model_path=None):
    cuda_is_available = torch.cuda.is_available()

    if model_path:
        model = torch.load(model_path,  map_location=torch.device('cpu'), weights_only=False).eval()

        if cuda_is_available:
            model = model.cuda()

        test(model)

    else:
        if not os.path.exists('models/'):
            os.mkdir('models/')

        train()


# main("./Flapflaptry26_675000.pth")
main()

Iteration: 1000, Time elapsed: 28.3878, epsilon: 0.1999, Avg Rewards: 3.3000, Avg Q: 0.1159
Iteration: 2000, Time elapsed: 56.5465, epsilon: 0.1998, Avg Rewards: 4.7000, Avg Q: 0.2412
Iteration: 3000, Time elapsed: 86.8865, epsilon: 0.1997, Avg Rewards: 2.3000, Avg Q: 0.3084
