In [3]:
# FLAPPY BIRD GRAPHICS FILES FROM https://github.com/yenchenlin/DeepLearningFlappyBird

import pygame
import sys
def load():
    # path of player with different states
    PLAYER_PATH = (
            'assets/sprites/redbird-upflap.png',
            'assets/sprites/redbird-midflap.png',
            'assets/sprites/redbird-downflap.png'
    )

    # path of background
    BACKGROUND_PATH = 'assets/sprites/background-black.png'

    # path of pipe
    PIPE_PATH = 'assets/sprites/pipe-green.png'

    IMAGES, SOUNDS, HITMASKS = {}, {}, {}

    # numbers sprites for score display
    IMAGES['numbers'] = (
        pygame.image.load('assets/sprites/0.png').convert_alpha(),
        pygame.image.load('assets/sprites/1.png').convert_alpha(),
        pygame.image.load('assets/sprites/2.png').convert_alpha(),
        pygame.image.load('assets/sprites/3.png').convert_alpha(),
        pygame.image.load('assets/sprites/4.png').convert_alpha(),
        pygame.image.load('assets/sprites/5.png').convert_alpha(),
        pygame.image.load('assets/sprites/6.png').convert_alpha(),
        pygame.image.load('assets/sprites/7.png').convert_alpha(),
        pygame.image.load('assets/sprites/8.png').convert_alpha(),
        pygame.image.load('assets/sprites/9.png').convert_alpha()
    )

    # base (ground) sprite
    IMAGES['base'] = pygame.image.load('assets/sprites/base.png').convert_alpha()

    # sounds
    if 'win' in sys.platform:
        soundExt = '.wav'
    else:
        soundExt = '.ogg'

    SOUNDS['die']    = pygame.mixer.Sound('assets/audio/die' + soundExt)
    SOUNDS['hit']    = pygame.mixer.Sound('assets/audio/hit' + soundExt)
    SOUNDS['point']  = pygame.mixer.Sound('assets/audio/point' + soundExt)
    SOUNDS['swoosh'] = pygame.mixer.Sound('assets/audio/swoosh' + soundExt)
    SOUNDS['wing']   = pygame.mixer.Sound('assets/audio/wing' + soundExt)

    # select random background sprites
    IMAGES['background'] = pygame.image.load(BACKGROUND_PATH).convert()

    # select random player sprites
    IMAGES['player'] = (
        pygame.image.load(PLAYER_PATH[0]).convert_alpha(),
        pygame.image.load(PLAYER_PATH[1]).convert_alpha(),
        pygame.image.load(PLAYER_PATH[2]).convert_alpha(),
    )

    # select random pipe sprites
    IMAGES['pipe'] = (
        pygame.transform.rotate(
            pygame.image.load(PIPE_PATH).convert_alpha(), 180),
        pygame.image.load(PIPE_PATH).convert_alpha(),
    )

    # hismask for pipes
    HITMASKS['pipe'] = (
        getHitmask(IMAGES['pipe'][0]),
        getHitmask(IMAGES['pipe'][1]),
    )

    # hitmask for player
    HITMASKS['player'] = (
        getHitmask(IMAGES['player'][0]),
        getHitmask(IMAGES['player'][1]),
        getHitmask(IMAGES['player'][2]),
    )

    return IMAGES, SOUNDS, HITMASKS

def getHitmask(image):
    """returns a hitmask using an image's alpha."""
    mask = []
    for x in range(image.get_width()):
        mask.append([])
        for y in range(image.get_height()):
            mask[x].append(bool(image.get_at((x,y))[3]))
    return mask


In [4]:
# FLAPPY BIRD CODE FROM https://github.com/yenchenlin/DeepLearningFlappyBird

import numpy as np
import sys
import random
import pygame
import pygame.surfarray as surfarray
from pygame.locals import *
from itertools import cycle

FPS = 30
SCREENWIDTH  = 288
SCREENHEIGHT = 512

pygame.init()
FPSCLOCK = pygame.time.Clock()
SCREEN = pygame.display.set_mode((SCREENWIDTH, SCREENHEIGHT))
pygame.display.set_caption('Flappy Bird')

IMAGES, SOUNDS, HITMASKS = load()
PIPEGAPSIZE = 100 # gap between upper and lower part of pipe
BASEY = SCREENHEIGHT * 0.79

PLAYER_WIDTH = IMAGES['player'][0].get_width()
PLAYER_HEIGHT = IMAGES['player'][0].get_height()
PIPE_WIDTH = IMAGES['pipe'][0].get_width()
PIPE_HEIGHT = IMAGES['pipe'][0].get_height()
BACKGROUND_WIDTH = IMAGES['background'].get_width()

PLAYER_INDEX_GEN = cycle([0, 1, 2, 1])


class GameState:
    def __init__(self):
        self.score = self.playerIndex = self.loopIter = 0
        self.playerx = int(SCREENWIDTH * 0.2)
        self.playery = int((SCREENHEIGHT - PLAYER_HEIGHT) / 2)
        self.basex = 0
        self.baseShift = IMAGES['base'].get_width() - BACKGROUND_WIDTH

        newPipe1 = getRandomPipe()
        newPipe2 = getRandomPipe()
        self.upperPipes = [
            {'x': SCREENWIDTH, 'y': newPipe1[0]['y']},
            {'x': SCREENWIDTH + (SCREENWIDTH / 2), 'y': newPipe2[0]['y']},
        ]
        self.lowerPipes = [
            {'x': SCREENWIDTH, 'y': newPipe1[1]['y']},
            {'x': SCREENWIDTH + (SCREENWIDTH / 2), 'y': newPipe2[1]['y']},
        ]

        # player velocity, max velocity, downward accleration, accleration on flap
        self.pipeVelX = -4
        self.playerVelY    =  0    # player's velocity along Y, default same as playerFlapped
        self.playerMaxVelY =  10   # max vel along Y, max descend speed
        self.playerMinVelY =  -8   # min vel along Y, max ascend speed
        self.playerAccY    =   1   # players downward accleration
        self.playerFlapAcc =  -9   # players speed on flapping
        self.playerFlapped = False # True when player flaps

    def frame_step(self, flap):
        pygame.event.pump()

        reward = 0.1
        terminal = False

        if flap:
            if self.playery > -2 * PLAYER_HEIGHT:
                self.playerVelY = self.playerFlapAcc
                self.playerFlapped = True
                #SOUNDS['wing'].play()

        # check for score
        playerMidPos = self.playerx + PLAYER_WIDTH / 2
        for pipe in self.upperPipes:
            pipeMidPos = pipe['x'] + PIPE_WIDTH / 2
            if pipeMidPos <= playerMidPos < pipeMidPos + 4:
                self.score += 1
                #SOUNDS['point'].play()
                reward = 1

        # playerIndex basex change
        if (self.loopIter + 1) % 3 == 0:
            self.playerIndex = next(PLAYER_INDEX_GEN)
        self.loopIter = (self.loopIter + 1) % 30
        self.basex = -((-self.basex + 100) % self.baseShift)

        # player's movement
        if self.playerVelY < self.playerMaxVelY and not self.playerFlapped:
            self.playerVelY += self.playerAccY
        if self.playerFlapped:
            self.playerFlapped = False
        self.playery += min(self.playerVelY, BASEY - self.playery - PLAYER_HEIGHT)
        if self.playery < 0:
            self.playery = 0

        # move pipes to left
        for uPipe, lPipe in zip(self.upperPipes, self.lowerPipes):
            uPipe['x'] += self.pipeVelX
            lPipe['x'] += self.pipeVelX

        # add new pipe when first pipe is about to touch left of screen
        if 0 < self.upperPipes[0]['x'] < 5:
            newPipe = getRandomPipe()
            self.upperPipes.append(newPipe[0])
            self.lowerPipes.append(newPipe[1])

        # remove first pipe if its out of the screen
        if self.upperPipes[0]['x'] < -PIPE_WIDTH:
            self.upperPipes.pop(0)
            self.lowerPipes.pop(0)

        # check if crash here
        isCrash= checkCrash({'x': self.playerx, 'y': self.playery,
                             'index': self.playerIndex},
                            self.upperPipes, self.lowerPipes)
        if isCrash:
            #SOUNDS['hit'].play()
            #SOUNDS['die'].play()
            terminal = True
            self.__init__()
            reward = -1

        # draw sprites
        SCREEN.blit(IMAGES['background'], (0,0))

        for uPipe, lPipe in zip(self.upperPipes, self.lowerPipes):
            SCREEN.blit(IMAGES['pipe'][0], (uPipe['x'], uPipe['y']))
            SCREEN.blit(IMAGES['pipe'][1], (lPipe['x'], lPipe['y']))

        SCREEN.blit(IMAGES['base'], (self.basex, BASEY))
        # print score so player overlaps the score
        # showScore(self.score)
        SCREEN.blit(IMAGES['player'][self.playerIndex],
                    (self.playerx, self.playery))

        image_data = pygame.surfarray.array3d(pygame.display.get_surface())
        pygame.display.update()
        FPSCLOCK.tick(FPS)
        #print self.upperPipes[0]['y'] + PIPE_HEIGHT - int(BASEY * 0.2)
        return image_data, reward, terminal

def getRandomPipe():
    """returns a randomly generated pipe"""
    # y of gap between upper and lower pipe
    gapYs = [20, 30, 40, 50, 60, 70, 80, 90]
    index = random.randint(0, len(gapYs)-1)
    gapY = gapYs[index]

    gapY += int(BASEY * 0.2)
    pipeX = SCREENWIDTH + 10

    return [
        {'x': pipeX, 'y': gapY - PIPE_HEIGHT},  # upper pipe
        {'x': pipeX, 'y': gapY + PIPEGAPSIZE},  # lower pipe
    ]


def showScore(score):
    """displays score in center of screen"""
    scoreDigits = [int(x) for x in list(str(score))]
    totalWidth = 0 # total width of all numbers to be printed

    for digit in scoreDigits:
        totalWidth += IMAGES['numbers'][digit].get_width()

    Xoffset = (SCREENWIDTH - totalWidth) / 2

    for digit in scoreDigits:
        SCREEN.blit(IMAGES['numbers'][digit], (Xoffset, SCREENHEIGHT * 0.1))
        Xoffset += IMAGES['numbers'][digit].get_width()


def checkCrash(player, upperPipes, lowerPipes):
    """returns True if player collders with base or pipes."""
    pi = player['index']
    player['w'] = IMAGES['player'][0].get_width()
    player['h'] = IMAGES['player'][0].get_height()

    # if player crashes into ground
    if player['y'] + player['h'] >= BASEY - 1:
        return True
    else:

        playerRect = pygame.Rect(player['x'], player['y'],
                      player['w'], player['h'])

        for uPipe, lPipe in zip(upperPipes, lowerPipes):
            # upper and lower pipe rects
            uPipeRect = pygame.Rect(uPipe['x'], uPipe['y'], PIPE_WIDTH, PIPE_HEIGHT)
            lPipeRect = pygame.Rect(lPipe['x'], lPipe['y'], PIPE_WIDTH, PIPE_HEIGHT)

            # player and upper/lower pipe hitmasks
            pHitMask = HITMASKS['player'][pi]
            uHitmask = HITMASKS['pipe'][0]
            lHitmask = HITMASKS['pipe'][1]

            # if bird collided with upipe or lpipe
            uCollide = pixelCollision(playerRect, uPipeRect, pHitMask, uHitmask)
            lCollide = pixelCollision(playerRect, lPipeRect, pHitMask, lHitmask)

            if uCollide or lCollide:
                return True

    return False

def pixelCollision(rect1, rect2, hitmask1, hitmask2):
    """Checks if two objects collide and not just their rects"""
    rect = rect1.clip(rect2)

    if rect.width == 0 or rect.height == 0:
        return False

    x1, y1 = rect.x - rect1.x, rect.y - rect1.y
    x2, y2 = rect.x - rect2.x, rect.y - rect2.y

    for x in range(rect.width):
        for y in range(rect.height):
            if hitmask1[x1+x][y1+y] and hitmask2[x2+x][y2+y]:
                return True
    return False

In [5]:
#flappy no graphics for DQL

import numpy as np
import sys
import random
import pygame
import game.flappy_bird_utils as flappy_bird_utils
import pygame.surfarray as surfarray
from pygame.locals import *
from itertools import cycle

FPS = 30
SCREENWIDTH  = 288
SCREENHEIGHT = 512

pygame.init()
FPSCLOCK = pygame.time.Clock()
SCREEN = pygame.display.set_mode((SCREENWIDTH, SCREENHEIGHT))
pygame.display.set_caption('Flappy Bird')

IMAGES, SOUNDS, HITMASKS = flappy_bird_utils.load()
PIPEGAPSIZE = 100 # gap between upper and lower part of pipe
BASEY = SCREENHEIGHT * 0.79

PLAYER_WIDTH = IMAGES['player'][0].get_width()
PLAYER_HEIGHT = IMAGES['player'][0].get_height()
PIPE_WIDTH = IMAGES['pipe'][0].get_width()
PIPE_HEIGHT = IMAGES['pipe'][0].get_height()
BACKGROUND_WIDTH = IMAGES['background'].get_width()

PLAYER_INDEX_GEN = cycle([0, 1, 2, 1])


class GameNoGraphics:
    def __init__(self):
        self.score = self.playerIndex = self.loopIter = 0
        self.playerx = int(SCREENWIDTH * 0.2)
        self.playery = int((SCREENHEIGHT - PLAYER_HEIGHT) / 2)
        self.basex = 0
        self.baseShift = IMAGES['base'].get_width() - BACKGROUND_WIDTH

        newPipe1 = getRandomPipe()
        newPipe2 = getRandomPipe()
        self.upperPipes = [
            {'x': SCREENWIDTH, 'y': newPipe1[0]['y']},
            {'x': SCREENWIDTH + (SCREENWIDTH / 2), 'y': newPipe2[0]['y']},
        ]
        self.lowerPipes = [
            {'x': SCREENWIDTH, 'y': newPipe1[1]['y']},
            {'x': SCREENWIDTH + (SCREENWIDTH / 2), 'y': newPipe2[1]['y']},
        ]

        # player velocity, max velocity, downward accleration, accleration on flap
        self.pipeVelX = -4
        self.playerVelY    =  0    # player's velocity along Y, default same as playerFlapped
        self.playerMaxVelY =  10   # max vel along Y, max descend speed
        self.playerMinVelY =  -8   # min vel along Y, max ascend speed
        self.playerAccY    =   1   # players downward accleration
        self.playerFlapAcc =  -9   # players speed on flapping
        self.playerFlapped = False # True when player flaps

    def get_next_pipe_index(self):
        playerMidPos = self.playerx + PLAYER_WIDTH / 2
        distance = [0, 0]
        idx = 0
        for pipe in self.upperPipes:
            if (idx == 2):
                break
            pipeMidPos = pipe['x'] + PIPE_WIDTH / 2
            if pipeMidPos >= playerMidPos:
                distance[idx] = pipeMidPos - playerMidPos
            else:
                distance[idx] = 999
            idx += 1
        return distance.index(min(distance))


    def frame_step(self, flap):
        pygame.event.pump()

        reward = 0.4
        terminal = False

        if flap:
            if self.playery > -2 * PLAYER_HEIGHT:
                self.playerVelY = self.playerFlapAcc
                self.playerFlapped = True
                #SOUNDS['wing'].play()

        next_pipe_idx = self.get_next_pipe_index()
        playerMidPos = self.playery + PLAYER_HEIGHT/2
        pipeUpperMidPos = self.upperPipes[next_pipe_idx]['y']
        pipeLowerMidPos = self.lowerPipes[next_pipe_idx]['y']
        # if playerMidPos < pipeUpperMidPos + 1 and playerMidPos > pipeLowerMidPos - 1:
        #     reward = 0.2
        # if playerMidPos > pipeLowerMidPos - 100 and playerMidPos < pipeLowerMidPos:
        #     # print("within")
        #     reward = 0.2
        if playerMidPos <= 0.2*SCREENHEIGHT:
            # print("too high")
            reward = -0.2
        if playerMidPos <= 0.1*SCREENHEIGHT:
            # print("too high")
            reward = -0.4


        # check for score
        playerMidPos = self.playerx + PLAYER_WIDTH / 2
        for pipe in self.upperPipes:
            pipeMidPos = pipe['x'] + PIPE_WIDTH / 2
            if pipeMidPos <= playerMidPos < pipeMidPos + 4:
                self.score += 1
                #SOUNDS['point'].play()
                reward = 10

        # playerIndex basex change
        if (self.loopIter + 1) % 3 == 0:
            self.playerIndex = next(PLAYER_INDEX_GEN)
        self.loopIter = (self.loopIter + 1) % 30
        self.basex = -((-self.basex + 100) % self.baseShift)

        # player's movement
        if self.playerVelY < self.playerMaxVelY and not self.playerFlapped:
            self.playerVelY += self.playerAccY
        if self.playerFlapped:
            self.playerFlapped = False
        self.playery += min(self.playerVelY, BASEY - self.playery - PLAYER_HEIGHT)
        if self.playery < 0:
            self.playery = 0

        # move pipes to left
        for uPipe, lPipe in zip(self.upperPipes, self.lowerPipes):
            uPipe['x'] += self.pipeVelX
            lPipe['x'] += self.pipeVelX

        # add new pipe when first pipe is about to touch left of screen
        if 0 < self.upperPipes[0]['x'] < 5:
            newPipe = getRandomPipe()
            self.upperPipes.append(newPipe[0])
            self.lowerPipes.append(newPipe[1])

        # remove first pipe if its out of the screen
        if self.upperPipes[0]['x'] < -PIPE_WIDTH:
            self.upperPipes.pop(0)
            self.lowerPipes.pop(0)

        # check if crash here
        isCrash= checkCrash({'x': self.playerx, 'y': self.playery,
                             'index': self.playerIndex},
                            self.upperPipes, self.lowerPipes)
        if isCrash:
            #SOUNDS['hit'].play()
            #SOUNDS['die'].play()
            terminal = True
            self.__init__()
            reward = -5

        # # draw sprites
        # SCREEN.blit(IMAGES['background'], (0,0))

        # for uPipe, lPipe in zip(self.upperPipes, self.lowerPipes):
        #     SCREEN.blit(IMAGES['pipe'][0], (uPipe['x'], uPipe['y']))
        #     SCREEN.blit(IMAGES['pipe'][1], (lPipe['x'], lPipe['y']))

        # SCREEN.blit(IMAGES['base'], (self.basex, BASEY))
        # # print score so player overlaps the score
        # # showScore(self.score)
        # SCREEN.blit(IMAGES['player'][self.playerIndex],
        #             (self.playerx, self.playery))

        # image_data = pygame.surfarray.array3d(pygame.display.get_surface())
        # pygame.display.update()
        # FPSCLOCK.tick(FPS)
        #print self.upperPipes[0]['y'] + PIPE_HEIGHT - int(BASEY * 0.2)
        return 0, reward, terminal

def getRandomPipe():
    """returns a randomly generated pipe"""
    # y of gap between upper and lower pipe
    gapYs = [20, 30, 40, 50, 60, 70, 80, 90]
    index = random.randint(0, len(gapYs)-1)
    gapY = gapYs[index]

    gapY += int(BASEY * 0.2)
    pipeX = SCREENWIDTH + 10

    return [
        {'x': pipeX, 'y': gapY - PIPE_HEIGHT},  # upper pipe
        {'x': pipeX, 'y': gapY + PIPEGAPSIZE},  # lower pipe
    ]


def showScore(score):
    """displays score in center of screen"""
    scoreDigits = [int(x) for x in list(str(score))]
    totalWidth = 0 # total width of all numbers to be printed

    for digit in scoreDigits:
        totalWidth += IMAGES['numbers'][digit].get_width()

    Xoffset = (SCREENWIDTH - totalWidth) / 2

    for digit in scoreDigits:
        SCREEN.blit(IMAGES['numbers'][digit], (Xoffset, SCREENHEIGHT * 0.1))
        Xoffset += IMAGES['numbers'][digit].get_width()


def checkCrash(player, upperPipes, lowerPipes):
    """returns True if player collders with base or pipes."""
    pi = player['index']
    player['w'] = IMAGES['player'][0].get_width()
    player['h'] = IMAGES['player'][0].get_height()

    # if player crashes into ground
    if player['y'] + player['h'] >= BASEY - 1:
        return True
    else:

        playerRect = pygame.Rect(player['x'], player['y'],
                      player['w'], player['h'])

        for uPipe, lPipe in zip(upperPipes, lowerPipes):
            # upper and lower pipe rects
            uPipeRect = pygame.Rect(uPipe['x'], uPipe['y'], PIPE_WIDTH, PIPE_HEIGHT)
            lPipeRect = pygame.Rect(lPipe['x'], lPipe['y'], PIPE_WIDTH, PIPE_HEIGHT)

            # player and upper/lower pipe hitmasks
            pHitMask = HITMASKS['player'][pi]
            uHitmask = HITMASKS['pipe'][0]
            lHitmask = HITMASKS['pipe'][1]

            # if bird collided with upipe or lpipe
            uCollide = pixelCollision(playerRect, uPipeRect, pHitMask, uHitmask)
            lCollide = pixelCollision(playerRect, lPipeRect, pHitMask, lHitmask)

            if uCollide or lCollide:
                return True

    return False

def pixelCollision(rect1, rect2, hitmask1, hitmask2):
    """Checks if two objects collide and not just their rects"""
    rect = rect1.clip(rect2)

    if rect.width == 0 or rect.height == 0:
        return False

    x1, y1 = rect.x - rect1.x, rect.y - rect1.y
    x2, y2 = rect.x - rect2.x, rect.y - rect2.y

    for x in range(rect.width):
        for y in range(rect.height):
            if hitmask1[x1+x][y1+y] and hitmask2[x2+x][y2+y]:
                return True
    return False


In [6]:
# FLAPPY BIRD CODE WITH GRAPHICS REMOVED

import random
import pygame
from itertools import cycle

SCREENWIDTH  = 288
SCREENHEIGHT = 512

IMAGES, SOUNDS, HITMASKS = load()
PIPEGAPSIZE = 100 # gap between upper and lower part of pipe
BASEY = SCREENHEIGHT * 0.79

PLAYER_WIDTH = IMAGES['player'][0].get_width()
PLAYER_HEIGHT = IMAGES['player'][0].get_height()
PIPE_WIDTH = IMAGES['pipe'][0].get_width()
PIPE_HEIGHT = IMAGES['pipe'][0].get_height()
BACKGROUND_WIDTH = IMAGES['background'].get_width()

PLAYER_INDEX_GEN = cycle([0, 1, 2, 1])


class GameStateNoGraphics:
    def __init__(self):
        self.score = self.playerIndex = self.loopIter = 0
        self.playerx = int(SCREENWIDTH * 0.2)
        self.playery = int((SCREENHEIGHT - PLAYER_HEIGHT) / 2)
        self.basex = 0
        self.baseShift = IMAGES['base'].get_width() - BACKGROUND_WIDTH

        newPipe1 = getRandomPipe()
        newPipe2 = getRandomPipe()
        self.upperPipes = [
            {'x': SCREENWIDTH, 'y': newPipe1[0]['y']},
            {'x': SCREENWIDTH + (SCREENWIDTH / 2), 'y': newPipe2[0]['y']},
        ]
        self.lowerPipes = [
            {'x': SCREENWIDTH, 'y': newPipe1[1]['y']},
            {'x': SCREENWIDTH + (SCREENWIDTH / 2), 'y': newPipe2[1]['y']},
        ]

        # player velocity, max velocity, downward accleration, accleration on flap
        self.pipeVelX = -4
        self.playerVelY    =  0    # player's velocity along Y, default same as playerFlapped
        self.playerMaxVelY =  10   # max vel along Y, max descend speed
        self.playerMinVelY =  -8   # min vel along Y, max ascend speed
        self.playerAccY    =   1   # players downward accleration
        self.playerFlapAcc =  -9   # players speed on flapping
        self.playerFlapped = False # True when player flaps

    def frame_step(self, flap):
        if flap:
            if self.playery > -2 * PLAYER_HEIGHT:
                self.playerVelY = self.playerFlapAcc
                self.playerFlapped = True

        # playerIndex basex change
        if (self.loopIter + 1) % 3 == 0:
            self.playerIndex = next(PLAYER_INDEX_GEN)
        self.loopIter = (self.loopIter + 1) % 30
        self.basex = -((-self.basex + 100) % self.baseShift)

        # player's movement
        if self.playerVelY < self.playerMaxVelY and not self.playerFlapped:
            self.playerVelY += self.playerAccY
        if self.playerFlapped:
            self.playerFlapped = False
        self.playery += min(self.playerVelY, BASEY - self.playery - PLAYER_HEIGHT)
        if self.playery < 0:
            self.playery = 0

        # move pipes to left
        for uPipe, lPipe in zip(self.upperPipes, self.lowerPipes):
            uPipe['x'] += self.pipeVelX
            lPipe['x'] += self.pipeVelX

        # add new pipe when first pipe is about to touch left of screen
        if 0 < self.upperPipes[0]['x'] < 5:
            newPipe = getRandomPipe()
            self.upperPipes.append(newPipe[0])
            self.lowerPipes.append(newPipe[1])

        # remove first pipe if it's out of the screen
        if self.upperPipes[0]['x'] < -PIPE_WIDTH:
            self.upperPipes.pop(0)
            self.lowerPipes.pop(0)

        # check if crash here
        isCrash = checkCrash({'x': self.playerx, 'y': self.playery, 'index': self.playerIndex}, self.upperPipes, self.lowerPipes)
        if isCrash:
            self.__init__()

        return isCrash


def getRandomPipe():
    """returns a randomly generated pipe"""
    # y of gap between upper and lower pipe
    gapYs = [20, 30, 40, 50, 60, 70, 80, 90]
    index = random.randint(0, len(gapYs)-1)
    gapY = gapYs[index]

    gapY += int(BASEY * 0.2)
    pipeX = SCREENWIDTH + 10

    return [
        {'x': pipeX, 'y': gapY - PIPE_HEIGHT},  # upper pipe
        {'x': pipeX, 'y': gapY + PIPEGAPSIZE},  # lower pipe
    ]


def checkCrash(player, upperPipes, lowerPipes):
    """returns True if player collders with base or pipes."""
    pi = player['index']
    player['w'] = IMAGES['player'][0].get_width()
    player['h'] = IMAGES['player'][0].get_height()

    # if player crashes into ground
    if player['y'] + player['h'] >= BASEY - 1:
        return True
    else:

        playerRect = pygame.Rect(player['x'], player['y'],
                      player['w'], player['h'])

        for uPipe, lPipe in zip(upperPipes, lowerPipes):
            # upper and lower pipe rects
            uPipeRect = pygame.Rect(uPipe['x'], uPipe['y'], PIPE_WIDTH, PIPE_HEIGHT)
            lPipeRect = pygame.Rect(lPipe['x'], lPipe['y'], PIPE_WIDTH, PIPE_HEIGHT)

            # player and upper/lower pipe hitmasks
            pHitMask = HITMASKS['player'][pi]
            uHitmask = HITMASKS['pipe'][0]
            lHitmask = HITMASKS['pipe'][1]

            # if bird collided with upipe or lpipe
            uCollide = pixelCollision(playerRect, uPipeRect, pHitMask, uHitmask)
            lCollide = pixelCollision(playerRect, lPipeRect, pHitMask, lHitmask)

            if uCollide or lCollide:
                return True

    return False

def pixelCollision(rect1, rect2, hitmask1, hitmask2):
    """Checks if two objects collide and not just their rects"""
    rect = rect1.clip(rect2)

    if rect.width == 0 or rect.height == 0:
        return False

    x1, y1 = rect.x - rect1.x, rect.y - rect1.y
    x2, y2 = rect.x - rect2.x, rect.y - rect2.y

    for x in range(rect.width):
        for y in range(rect.height):
            if hitmask1[x1+x][y1+y] and hitmask2[x2+x][y2+y]:
                return True
    return False


In [7]:
# HELPER FUNCTION TO GET INPUT FROM FLAPPY BIRD GAME

import torch

def get_gamestate_info(game_state):
    """
    gets coordinates of the two pipes
    usage:          pipe_info = get_pipes_info(game_state)
                    pipe_info["pipe0"]["upper"]["x"] = x coordinate of the upper pipe of the first pipe
    @args:          game_state
    @returns:         
        "pipe0": {
            "upper": {
                "x":
                "y": 
            },
            "lower": {
                "x": 
                "y": 
            }
        },
        "pipe1": {
            "upper": {
                "x": ,
                "y": 
            },
            "lower": {
                "x": 
                "y": 
            }
        }, 
        "player": {
            "x": 
            "y": 
            "VelY":
            "AccY": 
            "Flapped": 
        }
    """
    return {
        "pipe0": {
            "upper": {
                "x": game_state.upperPipes[0]['x'],
                "y": game_state.upperPipes[0]['y']
            },
            "lower": {
                "x": game_state.lowerPipes[0]['x'],
                "y": game_state.lowerPipes[0]['y']
            }
        },
        "pipe1": {
            "upper": {
                "x": game_state.upperPipes[1]['x'],
                "y": game_state.upperPipes[1]['y']
            },
            "lower": {
                "x": game_state.lowerPipes[1]['x'],
                "y": game_state.lowerPipes[1]['y']
            }
        },
        "player": {
            "x": game_state.playerx,
            "y": game_state.playery,
            "VelY": game_state.playerVelY,
            "AccY": game_state.playerAccY,
            "Flapped": game_state.playerFlapped,
        }
    }

def get_input_layer(game_state):
    """
    gets gamestate but returns it as a tensor. Use when feeding into ML algorithm
    Arguments: game_state
    Returns: tensor of shape (6, 1) containing same information as get_gamestate_info, but without the dictionary.
    """
    return torch.tensor([game_state.lowerPipes[0]['x'], game_state.lowerPipes[0]['y'], 
                         game_state.lowerPipes[1]['x'], game_state.lowerPipes[1]['y'], 
                        game_state.playery, game_state.playerVelY, game_state.playerFlapped])

def get_input_layer_2(game_state):
    """
    gets gamestate but returns it as a np array. Use when feeding into ML algorithm
    Arguments: game_state
    Returns: tensor of shape (7, 1) containing same information as get_gamestate_info, but without the dictionary.
    """
    # print("pipe height: ", game_state.lowerPipes[0]['y'])
    return np.array([game_state.lowerPipes[0]['x'], game_state.lowerPipes[0]['y'], game_state.lowerPipes[0]['y'] - 100,
                         game_state.lowerPipes[1]['x'], game_state.lowerPipes[1]['y'], game_state.lowerPipes[0]['y'] - 100,
                        game_state.playery])

In [8]:
# NEAT MODEL

import os.path
import pickle
import neat

class NEATModel:
    def __init__(self):
        configFile = os.path.join(os.path.abspath(''), 'NEATConfig')

        self.config = neat.Config(neat.DefaultGenome, neat.DefaultReproduction,
                                  neat.DefaultSpeciesSet, neat.DefaultStagnation,
                                  configFile)

        self.population = neat.Population(self.config)

        self.population.add_reporter(neat.StdOutReporter(True))
        stats = neat.StatisticsReporter()
        self.population.add_reporter(stats)
        self.population.add_reporter(neat.Checkpointer(20))
        self.bestGenome = None
        self.gameState = None

    def run(self, generations, checkpointFileName=""):
        if checkpointFileName != "":
            self.population = neat.Checkpointer.restore_checkpoint(checkpointFileName)
        self.bestGenome = self.population.run(self.evaluateGenomes, generations)
        with open("NEATBestGenome.pkl", "wb") as f:
            pickle.dump(self.bestGenome, f)
            f.close()

    def loadBest(self):
        with open("NEATBestGenome.pkl", "rb") as f:
            self.bestGenome = pickle.load(f)

    def playGame(self):
        self.gameState = GameState()
        network = neat.nn.FeedForwardNetwork.create(self.bestGenome, self.config)
        go = True
        while go:
            networkInput = get_input_layer(self.gameState)
            networkOutput = network.activate(networkInput)[0]
            flap = networkOutput > 0.5  # sigmoid activation, output should be between 0 and 1
            _, _, terminal = self.gameState.frame_step(flap)
            if terminal:
                go = False

    def testBest(self, runs):
        self.gameState = GameStateNoGraphics()
        network = neat.nn.FeedForwardNetwork.create(self.bestGenome, self.config)
        fitnesses = []
        for i in range(runs):
            thisRunFitness = 0
            go = True
            while go:
                thisRunFitness += 1
                networkInput = get_input_layer(self.gameState)
                networkOutput = network.activate(networkInput)[0]
                flap = networkOutput > 0.5  # sigmoid activation, output should be between 0 and 1
                if self.gameState.frame_step(flap) or thisRunFitness > 10000:
                    go = False
                    fitnesses.append(thisRunFitness)
                    if (i+1) % 100 == 0:
                        print("Finished run: " + str(i+1) + "/" + str(runs))
        print(fitnesses)

    @staticmethod
    def evaluateGenomes(genomes, config):
        gameState = GameStateNoGraphics()
        for genome_id, genome in genomes:
            network = neat.nn.FeedForwardNetwork.create(genome, config)
            runs = 10
            averageFitness = 0
            for i in range(runs):
                thisRunFitness = 0
                go = True
                while go:
                    thisRunFitness += 1
                    networkInput = get_input_layer(gameState)
                    networkOutput = network.activate(networkInput)[0]
                    flap = networkOutput > 0.5  # sigmoid activation, output should be between 0 and 1
                    if gameState.frame_step(flap) or thisRunFitness > 10000:
                        go = False
                        averageFitness += thisRunFitness / runs
            genome.fitness = averageFitness


ModuleNotFoundError: No module named 'neat'

In [7]:
# TRAINING AND TESTING THE NEAT MODEL

neatModel = NEATModel()
# neatModel.run(300)  # Train the model, will take quite a long time!
neatModel.loadBest()  # Load the best model from training
neatModel.playGame()  # Watch the best model play the game (look at your taskbar it won't popup automatically)
neatModel.testBest(1000)  # Test performance of the best model

Finished run: 100/1000
Finished run: 200/1000


KeyboardInterrupt: 

DQL MODEL

In [9]:
# imports
import numpy as np
import random
import torch
import torch.optim as optim
# from utils import get_input_layer_2 as input
# import game.flappyNoGraphics as Game
# import game.wrapped_flappy_bird as GameVisual
from collections import deque
import pickle

In [10]:
# define the neural net of the agent

import torch
import torch.nn as nn
import torch.optim as optim

class Network(nn.Module):
    def __init__(self, lr):
        super(Network, self).__init__()

        self.n_actions = 2
        self.hid_1 = 128
        self.hid_2 = 128
        self.hid_3 = 128
        self.inputs = 7 * 4
        self.model = nn.Sequential(
            nn.Linear(self.inputs, self.hid_1),
            nn.ReLU(),
            nn.Linear(self.hid_1, self.hid_2),
            nn.ReLU(),
            nn.Linear(self.hid_2, self.hid_3),
            nn.ReLU(),
            nn.Linear(self.hid_3, self.n_actions),
        )

        self.optimizer = optim.Adam(self.parameters(), lr=lr)
        self.loss = nn.SmoothL1Loss()
        self.device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
        self.to(self.device)

    def forward(self, x):
        output = self.model(x.type(torch.FloatTensor).to(self.device))
        return output

In [11]:
# create a class to manage the state. Each state is a stack of 4 "frame" of the game
# which provides the agent on information of the bird's movement.

from collections import deque
import torch
# from utils import get_input_layer_2 as input
import numpy as np

class StateManager(object):

    def __init__(self, capacity):
        self.memory = deque([], maxlen=capacity)
        starting_state = [0, 0, 0, 0, 0, 0, 0]
        for _ in range(capacity):
            self.memory.append(starting_state)

    def push(self, game):
        """Save a frame, 
            returns tensor of flattened state frames
        """
        state_frame = get_input_layer_2(game)
        self.memory.popleft()
        self.memory.append(state_frame)
        tensor_list = []
        for i in range(4):
            tensor_list.append(self.memory[i])
        return np.array(tensor_list).flatten()
    
    def get(self):
        # return np array of state frames
        tensor_list = []
        for i in range(4):
            tensor_list.append(self.memory[i])
        return np.array(tensor_list).flatten()

In [12]:
# create the class for the agent.

class Agent(object):
    def __init__(self):
        """
        Porperties:
            gamma (float): Future reward discount rate.
            epsilon (float): Probability for choosing random policy.
            epsilon_decay (float): Rate at which epsilon decays toward zero.
            learning_rate (float): Learning rate for Adam optimizer.

        Returns:
            Agent
        """
        # constant parameters
        self.gamma = 0.95
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.996
        self.lr = 0.00005
        self.batch_size = 64
        self.max_mem_size = 10000
        # self.input_dims = 7 * 4

        #variable parameters
        self.epsilon = 0.01
        self.mem_cntr = 0
        self.mem_cntr_successful = 0

        # initializing memory
        self.memory = deque(maxlen=self.max_mem_size)
        self.memory_successful = deque(maxlen=1000)
        self.episodic_memory = []

        #initialize networks
        self.network = Network(self.lr)

    def save_experience(self):
        with open('Models/DQL/experience.pickle', 'wb') as handle:
            pickle.dump(self.memory, handle)
        with open('Models/DQL/experience_successful.pickle', 'wb') as handle:
            pickle.dump(self.memory_successful, handle)

    def load_experience(self):
        with open('Models/DQL/experience.pickle', 'rb') as handle:
            self.memory = pickle.load(handle)
        with open('Models/DQL/experience_successful.pickle', 'rb') as handle:
            self.memory_successful = pickle.load(handle)

    def getMemory(self):
        return self.memory

    def nextEpisode(self):
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    def getepsilon(self):
        return self.epsilon

    def remember(self, state, action, reward, next_state, game_over, score, next_reward):
        if (self.mem_cntr >= self.max_mem_size - 2):
            for i in range(self.max_mem_size - 3000):
                self.memory.popleft()
            self.mem_cntr = len(self.memory) - 1

        memory = [state, action, reward, next_state, game_over, score, next_reward]
        self.memory.append(memory)

        self.mem_cntr += 1

    def remember_successful(self, state, action, reward, next_state, game_over, score, next_reward):
        if (self.mem_cntr_successful >= 1000 - 20):
            for i in range(1000 - 500):
                self.memory_successful.popleft()
            self.mem_cntr_successful = len(self.memory_successful) - 1

        memory = [state, action, reward, next_state, game_over, score, next_reward]
        self.memory_successful.append(memory)

        self.mem_cntr_successful += 1

    def select_action(self, state):
        if np.random.rand() <= self.epsilon:
            # exploration

            # 2 in 30 = averages about 1 press every 0.5 seconds which is in the ballpark of whats required to play the game. 
            # Gives bot best start possible (as it actually has a chance of making it through the first block!)
            # in flappy bird a flap changes the gamestate a lot more than a no-flap.
            determiner = np.random.randint(0, 30);
            if (determiner <= 2):
                return 1
            return 0
        else:
            # exploitation, select epsilon-greedy action.
                state_tensor = torch.tensor([state]).to(self.network.device, dtype=torch.int32)
                action = torch.argmax(self.network.forward(state_tensor)).item()
                
        return action
    
    def updateEpsilon(self):
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    def learn(self):
        """
        learn from a random batch of experiences
        """
        if self.mem_cntr < self.batch_size:
            return
        
        self.network.optimizer.zero_grad()
        max_mem = min(self.mem_cntr, self.max_mem_size)
        batch = np.random.choice(max_mem, self.batch_size, replace=False)

        batch_index = np.arange(self.batch_size, dtype=np.int32)

        # memory = [state, action, reward, next_state, game_over, score, next_reward]
        state_batch = torch.tensor([self.memory[i][0] for i in batch]).to(self.network.device, dtype=torch.float32)
        action_batch = torch.tensor([self.memory[i][1] for i in batch])
        reward_batch = torch.tensor([self.memory[i][2] for i in batch]).to(self.network.device, dtype=torch.float32)
        new_state_batch = torch.tensor([self.memory[i][3] for i in batch]).to(self.network.device, dtype=torch.float32)
        game_over_batch = torch.tensor([self.memory[i][4] for i in batch]).to(self.network.device, dtype=torch.bool)

        #estimate q(s,a) and q(s',a').
        q_current = self.network.forward(state_batch)[batch_index, action_batch]
        q_next = self.network.forward(new_state_batch)
        q_next[game_over_batch] = 0.0
        # q(st,at) = r + gamma * max(q(s',a')
        q_target = reward_batch + self.gamma * torch.max(q_next, dim=1)[0]

        # smoothl1 loss and back-propagation
        loss = self.network.loss(q_target, q_current).to(self.network.device)
        loss.backward()
        #prevent exploding gradient
        torch.nn.utils.clip_grad_value_(self.network.parameters(), 100)
        self.network.optimizer.step()

    def learn_successful(self):
        """
        learn from the set of experience that the agent was successful in. 
        Incentivises the agent to 
        """
        if self.mem_cntr_successful < self.batch_size:
            return
        
        # print("learning successful")
        self.network.optimizer.zero_grad()
        max_mem = min(self.mem_cntr_successful, self.max_mem_size)
        batch = np.random.choice(max_mem, self.batch_size, replace=False)

        batch_index = np.arange(self.batch_size, dtype=np.int32)

        # memory = [state, action, reward, next_state, game_over, score]
        state_batch = torch.tensor([self.memory[i][0] for i in batch]).to(self.network.device, dtype=torch.float32)
        action_batch = torch.tensor([self.memory[i][1] for i in batch])
        reward_batch = torch.tensor([self.memory[i][2] for i in batch]).to(self.network.device, dtype=torch.float32)
        new_state_batch = torch.tensor([self.memory[i][3] for i in batch]).to(self.network.device, dtype=torch.float32)
        game_over_batch = torch.tensor([self.memory[i][4] for i in batch]).to(self.network.device, dtype=torch.bool)
        #estimate q(s,a) and q(s',a').
        q_current = self.network.forward(state_batch)[batch_index, action_batch]
        q_next = self.network.forward(new_state_batch)
        q_next[game_over_batch] = 0.0
        # q(st,at) = r + gamma * max(q(s',a')
        q_target = reward_batch + self.gamma * torch.max(q_next, dim=1)[0]
        
        # smoothl1 loss and back-propagation
        loss = self.network.loss(q_target, q_current).to(self.network.device)
        loss.backward()
        self.network.optimizer.step()

    def update_episodic_memory(self, state, action, reward, next_state, done, score, current_step):
        """appends to a temporary memory. The temporary memory is uploaded to the main memory
        once a game is complete.
        """
        self.episodic_memory.append([state, action, reward, next_state, done, score, 0])

In [13]:
#the trainer allows a human to play the game then upload the relevant data to the agent.
# increases the rate at which the agent initially learns.
import keyboard
import pickle

class Trainer(object):
    def __init__(self, agent):
        self.runs = 10
        self.agent = agent
        self.game = GameState()
        self.state_manager = StateManager(4)

    def play(self, runs=10):
        self.runs = runs
        self.agent.episodic_memory = []
        current_step = 0
        # for runs amount of games
        for i in range(self.runs):
            #initialize game
            self.game = GameState()
            self.state_manager = StateManager(4)
            state = self.state_manager.get()
            done = False
            score = 0
            # manually play the game
            while not done:
                if keyboard.is_pressed(" "):
                    action = 1
                    _, reward, _ = self.game.frame_step(True)
                else:
                    action = 0
                    _, reward, _ = self.game.frame_step(False)
                if (reward == -5):
                    done = True
                    final_score = score
                    reward = -5
                score += reward

                self.state_manager.push(self.game)
                
                #upload experience and train the agent on the human gameplay
                next_state = self.state_manager.get()
                self.agent.update_episodic_memory(state, action, reward, next_state, done, score, current_step)
                self.agent.learn()
                self.agent.learn_successful()
                current_step += 1
                state = next_state
            for frame in self.agent.episodic_memory:
                self.agent.remember(frame[0], frame[1], frame[2], frame[3], frame[4], frame[5], frame[6])
                self.agent.remember_successful(frame[0], frame[1], frame[2], frame[3], frame[4], frame[5], frame[6])


In [14]:
#main training loop
import matplotlib.pyplot as plt


def train():

    agent = Agent()
    scores, median_scores, eps_history, time_history, time_median = [], [], [], [], []
    n_games = 100000
    success_threshold = 15
    trainer = Trainer(agent)

    # trainer.play(10)
    # agent.save_experience()
    agent.load_experience()
    #learn from human experiences for a headstart
    #without this, the agent typically just default to a policy of only flapping or only doing nothing
    for i in range(100):
        agent.learn()

    # for n_games amount of games
    for i in range(n_games):
        #initialize game
        game = GameNoGraphics()
        if (keyboard.is_pressed("p")):
            game = GameState()
        score = 0
        game_over = False
        state_manager = StateManager(4)
        state = state_manager.get()
        done = False
        # state, action, reward, next_state, done, score
        agent.episodic_memory = []
        current_step = 0
        #while the game is not complete
        while not done:
            #select an action
            action = agent.select_action(state)
            _, reward, _ = game.frame_step(action)
            #calculate the state
            state_manager.push(game)
            next_state = state_manager.get()
            if (reward == -5):
                done = True
                final_score = score
                reward = -5
            score += reward
            #remember the action taken
            # agent.remember(state, action, reward, next_state, done, score)
            agent.update_episodic_memory(state, action, reward, next_state, done, score, current_step)
            

            state = next_state
            current_step += 1
        #after each game, learn a random batch of experiences from memory
        #and also lean a batch of experiences that the bird was successful in.
        agent.learn()
        agent.learn_successful()

        agent.updateEpsilon()
        #upload memory to main memory
        eps_history.append(agent.epsilon)
        for frame in agent.episodic_memory:
            agent.remember(frame[0], frame[1], frame[2], frame[3], frame[4], frame[5], frame[6])
        # success_threshold is typically 10 greater than the median score.
        if (score > success_threshold):
            agent.remember_successful(frame[0], frame[1], frame[2], frame[3], frame[4], frame[5], frame[6])
        # agent.remember(state, action, reward, next_state, done, score)

        #calculate some statistics for evaluation
        median_score = np.median(scores[-100:])
        success_threshold = max(success_threshold, median_score + 10)
        scores.append(score)
        median_scores.append(median_score)
        time_history.append(current_step/30)
        median_t = np.median(time_history[-100:])
        time_median.append(median_t)
        if ((i % 100) == 0):
            print('episode: ', i,'score: %.2f' % score,
                    ' median score %.2f' % median_score, 'time %.2f' % (current_step/30),'median time %.2f' % (median_t) ,'epsilon %.2f' % agent.epsilon)
        if (keyboard.is_pressed("`")):
            break
    #save the final model
    torch.save(agent.network.state_dict(), 'Models/DQL/dqlmodel.pth')
    #plot.
    plt.plot(time_median)
    plt.show()

In [17]:
#run the DQL model training.

train()

episode:  0 score: 2.20  median score nan time 0.63 median time 0.63 epsilon 0.01
episode:  100 score: 91.40  median score 14.60 time 5.67 median time 1.67 epsilon 0.01
episode:  200 score: 31.00  median score 14.60 time 2.23 median time 1.67 epsilon 0.01
episode:  300 score: 14.60  median score 14.60 time 1.67 median time 1.67 epsilon 0.01
episode:  400 score: 14.60  median score 14.60 time 1.67 median time 1.67 epsilon 0.01
episode:  500 score: 29.40  median score 14.60 time 2.10 median time 1.67 epsilon 0.01
episode:  600 score: 14.60  median score 14.60 time 1.67 median time 1.67 epsilon 0.01
episode:  700 score: -5.20  median score 14.60 time 1.67 median time 1.67 epsilon 0.01
episode:  800 score: 38.60  median score 14.60 time 2.87 median time 1.67 epsilon 0.01
episode:  900 score: 11.40  median score 14.60 time 1.80 median time 1.82 epsilon 0.01
episode:  1000 score: 8.60  median score 14.20 time 1.87 median time 1.73 epsilon 0.01
episode:  1100 score: 2.20  median score 8.40 ti