In [1]:
!pip install pygame opencv-python numpy torch torchvision

[0m

In [2]:
RUNNING_ON_COLAB = False # Use True for colab. It will ask for a google drive permission where you have your assets and weights.
                        # Although the visual assets are not needed on colab since the screen is not shown, this was made for convenience when developing this project.
                        # Maybe in further updates this will be changed so no assets are needed on google drive.

EVALUATION_MODE = True # Use False for training the model, True for testing it

PROJECT_ROOT = "./" # If you downloaded the repo locally (and running it using Jupyter Lab for example) and your working directory is FlapFlapDQN, you can leave this variable as is.
                    # If you downloaded the repo locally (and running it using Jupyter Lab for example) and your working directory is NOT FlapFlapDQN, use an absolute path (or relative paths using ../ if you're that type of guy)
                    # If you are running on colab and the project is at the root of My Drive, you should add "./FlapFlapDQN/"
                    # If you are running on colab and the project is in a subfolder of My Drive, add the part after 'My Drive' as a relative path.
                    # Example: if you have your files in My Drive/github_projects/reinforcement_learning/FlapFlapDQN add "./github_projects/reinforcement_learning/FlapFlapDQN"

PRETRAINED_MODEL_WEIGHTS = "FlapFlap_725000.pth" # Change this to None if you want to train it from scratch
SAVE_MODEL_EVERY = 25000 # a new .pth file will be created every SAVE_MODEL_EVERY iterations

### Environment Code


In [3]:
import random
import pygame
import os
import random
import sys
import time
import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from itertools import cycle

if RUNNING_ON_COLAB:
    from google.colab import drive
    drive.mount('/content/drive')
    PROJECT_ROOT = os.path.join('/content/drive/My Drive/', PROJECT_ROOT)

def load():
    # path of player with different states
    PLAYER_PATH = (
        os.path.join(PROJECT_ROOT, './assets/sprites/redbird-upflap.png'),
        os.path.join(PROJECT_ROOT, './assets/sprites/redbird-midflap.png'),
        os.path.join(PROJECT_ROOT, './assets/sprites/redbird-downflap.png')
    )

    # path of background
    BACKGROUND_PATH = os.path.join(PROJECT_ROOT, './assets/sprites/background-black.png')

    # path of pipe
    PIPE_PATH = os.path.join(PROJECT_ROOT, './assets/sprites/pipe-green.png')

    IMAGES, HITMASKS = {}, {}

    # numbers sprites for score display
    IMAGES['numbers'] = [
        pygame.image.load(os.path.join(PROJECT_ROOT, './assets/sprites/{}.png'.format(idx))).convert_alpha()
        for idx in range(10)
    ]

    # base (ground) sprite
    IMAGES['base'] = pygame.image.load(os.path.join(PROJECT_ROOT, './assets/sprites/base.png')).convert_alpha()

    # select random background sprites
    IMAGES['background'] = pygame.image.load(BACKGROUND_PATH).convert()

    # select random player sprites
    IMAGES['player'] = [
        pygame.image.load(PLAYER_PATH[idx]).convert_alpha()
        for idx in range(3)
    ]

    # select random pipe sprites
    IMAGES['pipe'] = (
        pygame.transform.rotate(
            pygame.image.load(PIPE_PATH).convert_alpha(), 180),
        pygame.image.load(PIPE_PATH).convert_alpha(),
    )

    # hismask for pipes
    HITMASKS['pipe'] = [
        getHitmask(IMAGES['pipe'][idx])
        for idx in range(2)
    ]

    # hitmask for player
    HITMASKS['player'] = [
        getHitmask(IMAGES['player'][idx])
        for idx in range(3)
    ]

    return IMAGES, HITMASKS


def getHitmask(image):
    """returns a hitmask using an image's alpha."""
    mask = []
    for x in range(image.get_width()):
        mask.append([])
        for y in range(image.get_height()):
            mask[x].append(bool(image.get_at((x, y))[3]))
    return mask


FPS = 240
SCREENWIDTH = 288
SCREENHEIGHT = 512

# pygame.init()
FPSCLOCK = pygame.time.Clock()
SCREEN = pygame.display.set_mode((SCREENWIDTH, SCREENHEIGHT))
pygame.display.set_caption('FlapFlapDQN')

IMAGES, HITMASKS = load()
PIPEGAPSIZE = 100  # gap between upper and lower part of pipe
BASEY = SCREENHEIGHT * 0.79

PLAYER_WIDTH = IMAGES['player'][0].get_width()
PLAYER_HEIGHT = IMAGES['player'][0].get_height()
PIPE_WIDTH = IMAGES['pipe'][0].get_width()
PIPE_HEIGHT = IMAGES['pipe'][0].get_height()
BACKGROUND_WIDTH = IMAGES['background'].get_width()

PLAYER_INDEX_GEN = cycle([0, 1, 2, 1])


class GameState:
    def __init__(self, show_game):
        self.show_game = show_game

        if self.show_game:
            pygame.init()
            self.SCREEN = pygame.display.set_mode((SCREENWIDTH, SCREENHEIGHT))
            pygame.display.set_caption('FlapFlapDQN')

        self.score = self.playerIndex = self.loopIter = 0
        self.playerx = int(SCREENWIDTH * 0.2)
        self.playery = int((SCREENHEIGHT - PLAYER_HEIGHT) / 2)
        self.basex = 0
        self.baseShift = IMAGES['base'].get_width() - BACKGROUND_WIDTH

        newPipe1 = getRandomPipe()
        newPipe2 = getRandomPipe()
        self.upperPipes = [
            {'x': SCREENWIDTH, 'y': newPipe1[0]['y']},
            {'x': SCREENWIDTH + (SCREENWIDTH / 2), 'y': newPipe2[0]['y']},
        ]
        self.lowerPipes = [
            {'x': SCREENWIDTH, 'y': newPipe1[1]['y']},
            {'x': SCREENWIDTH + (SCREENWIDTH / 2), 'y': newPipe2[1]['y']},
        ]

        # player velocity, max velocity, downward accleration, accleration on flap
        self.pipeVelX = -4
        self.playerVelY = 0  # player's velocity along Y, default same as playerFlapped
        self.playerMaxVelY = 10  # max vel along Y, max descend speed
        self.playerMinVelY = -8  # min vel along Y, max ascend speed
        self.playerAccY = 1  # players downward accleration
        self.playerFlapAcc = -9  # players speed on flapping
        self.playerFlapped = False  # True when player flaps
        self.top_score = 0

    def frame_step(self, input_actions):
        pygame.event.pump()

        reward = 0.1
        terminal = False

        if sum(input_actions) != 1:
            raise ValueError('Multiple input actions!')

        # input_actions[0] == 1: do nothing
        # input_actions[1] == 1: flap the bird
        if input_actions[1] == 1:
            if self.playery > -2 * PLAYER_HEIGHT:
                self.playerVelY = self.playerFlapAcc
                self.playerFlapped = True

        # check for score
        playerMidPos = self.playerx + PLAYER_WIDTH / 2
        for pipe in self.upperPipes:
            pipeMidPos = pipe['x'] + PIPE_WIDTH / 2
            if pipeMidPos <= playerMidPos < pipeMidPos + 4:
                self.top_score += 1
                self.score += 1
                reward = 1

        # playerIndex basex change
        if (self.loopIter + 1) % 3 == 0:
            self.playerIndex = next(PLAYER_INDEX_GEN)
        self.loopIter = (self.loopIter + 1) % 30
        self.basex = -((-self.basex + 100) % self.baseShift)

        # player's movement
        if self.playerVelY < self.playerMaxVelY and not self.playerFlapped:
            self.playerVelY += self.playerAccY
        if self.playerFlapped:
            self.playerFlapped = False
        self.playery += min(self.playerVelY, BASEY - self.playery - PLAYER_HEIGHT)
        if self.playery < 0:
            self.playery = 0

        # move pipes to left
        for uPipe, lPipe in zip(self.upperPipes, self.lowerPipes):
            uPipe['x'] += self.pipeVelX
            lPipe['x'] += self.pipeVelX

        # add new pipe when first pipe is about to touch left of screen
        if 0 < self.upperPipes[0]['x'] < 5:
            newPipe = getRandomPipe()
            self.upperPipes.append(newPipe[0])
            self.lowerPipes.append(newPipe[1])

        # remove first pipe if its out of the screen
        if self.upperPipes[0]['x'] < -PIPE_WIDTH:
            self.upperPipes.pop(0)
            self.lowerPipes.pop(0)

        # check if crash here
        isCrash = checkCrash({'x': self.playerx, 'y': self.playery,
                              'index': self.playerIndex},
                             self.upperPipes, self.lowerPipes)
        if isCrash:
            terminal = True
            self.__init__(self.show_game)
            reward = -1


        self.SCREEN.blit(IMAGES['background'], (0, 0))

        for uPipe, lPipe in zip(self.upperPipes, self.lowerPipes):
            self.SCREEN.blit(IMAGES['pipe'][0], (uPipe['x'], uPipe['y']))
            self.SCREEN.blit(IMAGES['pipe'][1], (lPipe['x'], lPipe['y']))

        self.SCREEN.blit(IMAGES['base'], (self.basex, BASEY))
        self.SCREEN.blit(IMAGES['player'][self.playerIndex],
                            (self.playerx, self.playery))

        image_data = pygame.surfarray.array3d(pygame.display.get_surface())

        if self.show_game:
            showScore(self.top_score)
            pygame.display.update()

        FPSCLOCK.tick(FPS)
        return image_data, reward, terminal


def getRandomPipe():
    """returns a randomly generated pipe"""
    # y of gap between upper and lower pipe
    gapYs = [20, 30, 40, 50, 60, 70, 80, 90]
    index = random.randint(0, len(gapYs) - 1)
    gapY = gapYs[index]

    gapY += int(BASEY * 0.2)
    pipeX = SCREENWIDTH + 10

    return [
        {'x': pipeX, 'y': gapY - PIPE_HEIGHT},  # upper pipe
        {'x': pipeX, 'y': gapY + PIPEGAPSIZE},  # lower pipe
    ]


def showScore(score):
    """displays score in center of screen"""
    scoreDigits = [int(x) for x in list(str(score))]
    totalWidth = 0  # total width of all numbers to be printed

    for digit in scoreDigits:
        totalWidth += IMAGES['numbers'][digit].get_width()

    Xoffset = (SCREENWIDTH - totalWidth) / 2

    for digit in scoreDigits:
        SCREEN.blit(IMAGES['numbers'][digit], (Xoffset, SCREENHEIGHT * 0.1))
        Xoffset += IMAGES['numbers'][digit].get_width()


def checkCrash(player, upperPipes, lowerPipes):
    """returns True if player collders with base or pipes."""
    pi = player['index']
    player['w'] = IMAGES['player'][0].get_width()
    player['h'] = IMAGES['player'][0].get_height()

    # if player crashes into ground
    if player['y'] + player['h'] >= BASEY - 1:
        return True
    else:

        playerRect = pygame.Rect(player['x'], player['y'],
                                 player['w'], player['h'])

        for uPipe, lPipe in zip(upperPipes, lowerPipes):
            # upper and lower pipe rects
            uPipeRect = pygame.Rect(uPipe['x'], uPipe['y'], PIPE_WIDTH, PIPE_HEIGHT)
            lPipeRect = pygame.Rect(lPipe['x'], lPipe['y'], PIPE_WIDTH, PIPE_HEIGHT)

            # player and upper/lower pipe hitmasks
            pHitMask = HITMASKS['player'][pi]
            uHitmask = HITMASKS['pipe'][0]
            lHitmask = HITMASKS['pipe'][1]

            # if bird collided with upipe or lpipe
            uCollide = pixelCollision(playerRect, uPipeRect, pHitMask, uHitmask)
            lCollide = pixelCollision(playerRect, lPipeRect, pHitMask, lHitmask)

            if uCollide or lCollide:
                return True

    return False


def pixelCollision(rect1, rect2, hitmask1, hitmask2):
    """Checks if two objects collide and not just their rects"""
    rect = rect1.clip(rect2)

    if rect.width == 0 or rect.height == 0:
        return False

    x1, y1 = rect.x - rect1.x, rect.y - rect1.y
    x2, y2 = rect.x - rect2.x, rect.y - rect2.y

    for x in range(rect.width):
        for y in range(rect.height):
            if hitmask1[x1 + x][y1 + y] and hitmask2[x2 + x][y2 + y]:
                return True
    return False

pygame 2.6.1 (SDL 2.28.4, Python 3.10.12)
Hello from the pygame community. https://www.pygame.org/contribute.html


### DQN

In [None]:
if RUNNING_ON_COLAB:
    os.environ["SDL_VIDEODRIVER"] = "dummy"


class Hyperparameters:
    MODEL_NAME = "FlapFlap"
    LEARNING_RATE = 1e-5
    FRAME_SKIP = 1
    FRAME_SKIP_JUMP = 0
    SHOW_GAME = True
    NUMBER_OF_ACTIONS = 2
    GAMMA = 0.99
    INITIAL_EPSILON = 0.2
    FINAL_EPSILON = 0.00001
    NUMBER_OF_ITERATIONS = 2_000_000
    REPLAY_MEMORY_SIZE = 50_000 # Decrease this if your machine crashes due to lack of RAM. In this configuration, google colab uses ~20 GB of RAM
    MINIBATCH_SIZE = 32
    TARGET_UPDATE_FREQUENCY = 1000
    CUTOFF = 1_000_000 # The iteration when the epsilon will reach FINAL_EPSILON.
                       # At this point the network can continue to train forever with a fixed epsilon FINAL_EPSILON


class NeuralNetwork(nn.Module):

    def __init__(self):
        super(NeuralNetwork, self).__init__()

        self.conv1 = nn.Conv2d(4, 32, 8, 4)
        self.conv2 = nn.Conv2d(32, 64, 4, 2)
        self.conv3 = nn.Conv2d(64, 64, 3, 1)
        self.fc4 = nn.Linear(3136, 512)
        self.fc5 = nn.Linear(512, Hyperparameters.NUMBER_OF_ACTIONS)

    def forward(self, x):
        output = self.conv1(x)
        output = torch.nn.functional.relu(output)
        output = self.conv2(output)
        output = torch.nn.functional.relu(output)
        output = self.conv3(output)
        output = torch.nn.functional.relu(output)
        output = output.view(output.size()[0], -1)
        output = self.fc4(output)
        output = torch.nn.functional.relu(output)
        output = self.fc5(output)

        return output


def init_weights(m):
    if type(m) == nn.Conv2d or type(m) == nn.Linear:
        torch.nn.init.uniform_(m.weight, -0.01, 0.01)
        m.bias.data.fill_(0.01)

i = 0

def image_processing(image):
    image = image[:, 40:300]
    image_data = cv2.cvtColor(cv2.resize(image, (84, 84)), cv2.COLOR_BGR2GRAY)
    image_data[image_data > 0] = 255
    image_data = np.reshape(image_data, (84, 84, 1))
    image_tensor = image_data.transpose(2, 0, 1)
    image_tensor = image_tensor.astype(np.float32)
    image_tensor = torch.from_numpy(image_tensor)
    if torch.cuda.is_available():  # put on GPU if CUDA is available
        image_tensor = image_tensor.cuda()
    return image_tensor


def optimize_model(replay_memory, model, target_model, optimizer, loss_function):
    minibatch = random.sample(replay_memory, min(len(replay_memory), Hyperparameters.MINIBATCH_SIZE))

    state_batch = torch.cat(tuple(d[0] for d in minibatch))
    action_batch = torch.cat(tuple(d[1] for d in minibatch))
    reward_batch = torch.cat(tuple(d[2] for d in minibatch))
    new_state_batch = torch.cat(tuple(d[3] for d in minibatch))

    if torch.cuda.is_available():
        state_batch = state_batch.cuda()
        action_batch = action_batch.cuda()
        reward_batch = reward_batch.cuda()
        new_state_batch = new_state_batch.cuda()

    output_1_batch = target_model(new_state_batch)

    y_batch = torch.cat(tuple(reward_batch[i] if minibatch[i][4]
                              else reward_batch[i] + Hyperparameters.GAMMA * torch.max(output_1_batch[i])
                              for i in range(len(minibatch))))

    q_value = torch.sum(model(state_batch) * action_batch, dim=1)

    optimizer.zero_grad()

    y_batch = y_batch.detach()

    loss = loss_function(q_value, y_batch)

    loss.backward()
    optimizer.step()


def train(model=None, iteration=0):
    if not model:
      model = NeuralNetwork()
      model.apply(init_weights)

    target_model = NeuralNetwork()

    if torch.cuda.is_available():
        model = model.cuda()
        target_model = target_model.cuda()

    target_model.load_state_dict(model.state_dict())
    start = time.time()

    frames_to_skip = 0

    optimizer = optim.Adam(model.parameters(), lr=Hyperparameters.LEARNING_RATE)
    loss_function = nn.MSELoss()

    game_state = GameState(Hyperparameters.SHOW_GAME)
    replay_memory = []

    action = torch.zeros([Hyperparameters.NUMBER_OF_ACTIONS], dtype=torch.float32)
    action[0] = 1
    image_data, reward, finished = game_state.frame_step(action)
    image_data = image_processing(image_data)
    state = torch.cat((image_data, image_data, image_data, image_data)).unsqueeze(0)

    epsilon = Hyperparameters.INITIAL_EPSILON

    epsilon_decrements = np.concatenate((
        np.linspace(Hyperparameters.INITIAL_EPSILON, Hyperparameters.FINAL_EPSILON, min(Hyperparameters.NUMBER_OF_ITERATIONS, Hyperparameters.CUTOFF)),
        np.full(max(0, Hyperparameters.NUMBER_OF_ITERATIONS - Hyperparameters.CUTOFF), Hyperparameters.FINAL_EPSILON)
    ))

    # Metrics for tracking performance
    total_reward_per_episode = 0
    rewards = []
    avg_q_values = []  # To track average Q-value per episode
    min_q_values = []  # To track minimum Q-value per episode
    max_q_values = []  # To track maximum Q-value per episode
    episode_q_values = []  # Temporary list to store Q-values for current episode

    while iteration < Hyperparameters.NUMBER_OF_ITERATIONS:
        output = model(state)[0]

        action = torch.zeros([Hyperparameters.NUMBER_OF_ACTIONS], dtype=torch.float32)
        if torch.cuda.is_available():
            action = action.cuda()

        skipped_frame = False
        if frames_to_skip > 0:
            skipped_frame = True
            frames_to_skip -= 1
            action_index = [torch.tensor(0)][0]
        else:
            if random.random() <= epsilon:
                action_index = random.randint(0, Hyperparameters.NUMBER_OF_ACTIONS - 1)
            else:
                action_index = torch.argmax(output).item()

        action[action_index] = 1

        if action_index == 1:
            frames_to_skip += Hyperparameters.FRAME_SKIP_JUMP
        frames_to_skip += Hyperparameters.FRAME_SKIP if not skipped_frame else 0

        image_data_1, reward, finished = game_state.frame_step(action)
        image_data_1 = image_processing(image_data_1)
        new_state = torch.cat((state.squeeze(0)[1:, :, :], image_data_1)).unsqueeze(0)

        action = action.unsqueeze(0)
        reward = torch.from_numpy(np.array([reward], dtype=np.float32)).unsqueeze(0)

        reward_float = reward.numpy()[0][0]
        if reward_float == -1:
            # End of episode: compute Q-value metrics
            avg_q_values.append(np.mean(episode_q_values))
            min_q_values.append(np.min(episode_q_values))
            max_q_values.append(np.max(episode_q_values))
            episode_q_values = []  # Reset for the next episode

            # Track total rewards per episode
            rewards.append(total_reward_per_episode - 1)
            total_reward_per_episode = 0
        else:
            total_reward_per_episode += reward_float

        # Append the Q-value of the chosen action to the episode Q-values
        episode_q_values.append(output[action_index].item())

        # Add transition to replay memory
        replay_memory.append((state.cpu(), action.cpu(), reward.cpu(), new_state.cpu(), finished))
        if len(replay_memory) > Hyperparameters.REPLAY_MEMORY_SIZE:
            replay_memory.pop(0)

        epsilon = epsilon_decrements[iteration]

        optimize_model(replay_memory, model, target_model, optimizer, loss_function)

        state = new_state
        iteration += 1

        # Update target network periodically
        if iteration % Hyperparameters.TARGET_UPDATE_FREQUENCY == 0:
            target_model.load_state_dict(model.state_dict())

        # Save the model periodically
        if iteration % SAVE_MODEL_EVERY == 0:
            torch.save(model, os.path.join(PROJECT_ROOT, "./models/", f"./{Hyperparameters.MODEL_NAME}_" + str(iteration) + ".pth"))

        # Print metrics every 1000 iterations
        if iteration % 1000 == 0:
            avg_reward = np.mean(rewards) if rewards else 0
            avg_q = np.mean(avg_q_values) if avg_q_values else 0
            min_q = np.min(min_q_values) if min_q_values else 0
            max_q = np.max(max_q_values) if max_q_values else 0
            print(f"Iteration: {iteration}, Time elapsed: {time.time() - start:.4f}s, epsilon: {epsilon:.4f}, "
                  f"Avg Rewards: {avg_reward:.4f}, Avg Q: {avg_q:.4f}, Min Q: {min_q:.4f}, Max Q: {max_q:.4f}")

def test(model):
    game_state = GameState(Hyperparameters.SHOW_GAME)

    action = torch.zeros([Hyperparameters.NUMBER_OF_ACTIONS], dtype=torch.float32)
    action[0] = 1
    image_data, reward, terminal = game_state.frame_step(action)
    image_data = image_processing(image_data)
    state = torch.cat((image_data, image_data, image_data, image_data)).unsqueeze(0)

    total_reward = 0
    points = 0
    while True:
        output = model(state)[0]

        action = torch.zeros([Hyperparameters.NUMBER_OF_ACTIONS], dtype=torch.float32)
        if torch.cuda.is_available():
            action = action.cuda()

        action_index = torch.argmax(output)
        if torch.cuda.is_available():
            action_index = action_index.cuda()
        action[action_index] = 1

        image_data_1, reward, terminal = game_state.frame_step(action)
        if reward == -1:
          print(f"\n ------- Finished -------\nPoints: {points} Total reward: {total_reward}\n------------------------")
          total_reward = points = 0
        elif reward == 1:
          points += 1
          if points % 10 == 0:
              print(f"Current points: {points}")
        else:
          total_reward += reward

        image_data_1 = image_processing(image_data_1)
        new_state = torch.cat((state.squeeze(0)[1:, :, :], image_data_1)).unsqueeze(0)

        state = new_state


def main(model_path=None, continue_training=False):
    cuda_is_available = torch.cuda.is_available()

    if model_path:
        model = torch.load(model_path,  map_location=torch.device('cpu'), weights_only=False).eval()

        if cuda_is_available:
            model = model.cuda()

        if continue_training:
          last_saved_iteration = int(model_path.split("_")[-1].split(".")[0])
          print(f"Continue training starting with weights {PRETRAINED_MODEL_WEIGHTS} and iteration {last_saved_iteration}")
          train(model, iteration=last_saved_iteration)
        else:
          print(f"Testing model {PRETRAINED_MODEL_WEIGHTS}")
          test(model)

    else:
        print(f"Starting to train {Hyperparameters.MODEL_NAME} from scratch. If this is not intended, check your root project path and the .pth file you defined at the start of the script")
        models_folder = os.path.join(PROJECT_ROOT, './models/')
        if not os.path.exists(models_folder):
            os.mkdir(models_folder)
        train()

pretrained_model = None if not PRETRAINED_MODEL_WEIGHTS else os.path.join(PROJECT_ROOT, './models/', PRETRAINED_MODEL_WEIGHTS)
main(pretrained_model, continue_training=not EVALUATION_MODE)

Testing model FlapFlap_725000.pth
Current points: 10
Current points: 20
Current points: 30
Current points: 40
Current points: 50
Current points: 60
Current points: 70
Current points: 80
Current points: 90
Current points: 100
Current points: 110

 ------- Finished -------
Points: 113 Total reward: 411.40000000002505
------------------------
Current points: 10
Current points: 20
Current points: 30
Current points: 40
Current points: 50
Current points: 60
Current points: 70
Current points: 80
Current points: 90
Current points: 100
Current points: 110
Current points: 120
Current points: 130

 ------- Finished -------
Points: 131 Total reward: 476.3000000000398
------------------------
Current points: 10
Current points: 20
Current points: 30
Current points: 40
Current points: 50
Current points: 60
Current points: 70
Current points: 80
Current points: 90
Current points: 100
Current points: 110
Current points: 120
Current points: 130
Current points: 140
Current points: 150
Current points: 160
