# Flappy Bird Machine Learning Using Reinforcement Algorithm

### Deep Q-learning using Flappy Bird Game
Flappy Bird was a popular mobile game originally developed by Vietnamese video game artist and programmer Dong Nguyen. In it, the player controls a bird and tries to fly between green pipes without hitting them.

<img scr="assets/screenshots/flappy_screenshot.png">

In [None]:
# import libraries
import os
import sys
import time

import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

In [None]:
# import game class
from game.flappy import GameState

In [None]:
# Neural Network class
class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()

        self.number_of_actions = 2
        self.gamma = 0.99
        self.final_epsilon = 0.0001
        self.initial_epsilon = 0.1
        self.number_of_iterations = 1000000
        self.replay_memory_size = 10000
        self.minibatch_size = 32

        self.conv1 = nn.Conv2d(4, 32, 8, 4)
        self.relu1 = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(32, 64, 4, 2)
        self.relu2 = nn.ReLU(inplace=True)
        self.conv3 = nn.Conv2d(64, 64, 3, 1)
        self.relu3 = nn.ReLU(inplace=True)
        self.fc4 = nn.Linear(3136, 512)
        self.relu4 = nn.ReLU(inplace=True)
        self.fc5 = nn.Linear(512, self.number_of_actions)

    def forward(self, x):
        x = self.conv1(x)
        x = self.relu1(x)
        x = self.conv2(x)
        x = self.relu2(x)
        x = self.conv3(x)
        x = self.relu3(x)
        x = x.view(x.size(0), -1)
        x = self.fc4(x)
        x = self.relu4(x)
        x = self.fc5(x)
        return x


In [None]:
# weights initialization
def init_weights(m):
    if type(m) == nn.Conv2d or type(m) == nn.Linear:
        torch.nn.init.uniform(m.weight, -0.01, 0.01)
        m.bias.data.fill_(0.01)
        

In [None]:
# image to tensor
def image_to_tensor(self, image):
    image = cv2.resize(image, (84, 84))
    image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    image = image.reshape(1, 4, 84, 84)
    image = torch.from_numpy(image)
    image = image.type(torch.FloatTensor)
    return image

In [None]:
# resize bgr image to grayscale
def resize_and_bgr2gray(self, image):
    image = image[0:288, 0:404]
    image_resize = cv2.resize(image, (84, 84))
    image_data = cv2.cvtColor(image_resize, cv2.COLOR_BGR2GRAY)
    image_data[image_data > 0] = 255
    image_data = np.reshape(image_data, (84, 84, 1))
    return image_data

In [None]:
# model training
def train(model, start):
    # define Adam optimizer
    optmizer = optim.Adam(model.parameters(), lr=1e-6)

    # initialize mean squared error loss
    criterion = nn.MSELoss()

    # instantiate game
    game_state = GameState()

    # initialize replay memory
    replay_memory = []

    # initial action is do nothing
    action = torch.zeros([model.number_of_actions], dtype=torch.float32)
    action[0] = 1
    image_data, reward, terminal = game_state.frame_step(action)
    image_data = resize_and_bgr2gray(image_data)
    image_data = image_to_tensor(image_data)
    state = torch.cat((image_data, image_data, image_data, image_data)).unsqueeze(0)

    # initialize epsilon value
    epsilon = model.initial_epsilon
    iteration = 0

    epsilon_decrements = np.linspace(model.initial_epsilon, model.final_epsilon, model.number_of_iterations)

    # main infinite loop
    while iteration < model.number_of_iterations:
        # get output from the neural network
        output = model(state)[0]

        # initialize action
        action = torch.zeros([model.number_of_actions], dtype=torch.float32)
        if torch.cuda.is_available():
            action = action.cuda()

        # epsilon greedy exploration
        random_action = random.random() <= epsilon
        if random_action:
            print('Performed random action')
            action_index = [torch.randint(model.number_of_actions, torch.Size([]), dtype=torch.int) if random_action else torch.argmax(output)][0]


        if torch.cuda.is_available():
            action_index = action_index.cuda()

        action[action_index] = 1

        # get next state and reward
        image_data_1, reward, terminal = game_state.frame_step(action)
        image_data_1 = resize_and_bgr2gray(image_data_1)
        image_data_1 = image_to_tensor(image_data_1)
        state_1 = torch.cat((state.squeeze(0)[1:, :, :], image_data_1)).unsqueeze(0)

        action = action.unsqueeze(0)
        reward = torch.from_numpy(np.array([reward], dtype=np.float32)).unsqueeze(0)

        # save transition to replay memory
        replay_memory.append((state, action, reward, state_1, terminal))

        # if replay memory is full, remove the oldest transition
        if len(replay_memory) > model.replay_memory_size:
            replay_memory.pop(0)

        # epsilon annealing
        epsilon = epsilon_decrements[iteration]

        # sample random minibatch
        minibatch = random.sample(replay_memory, min(len(replay_memory), model.minibatch_size))

        # unpack minibatch
        state_batch = torch.cat(tuple(d[0] for d in minibatch))
        action_batch = torch.cat(tuple(d[1] for d in minibatch))
        reward_batch = torch.cat(tuple(d[2] for d in minibatch))
        state_1_batch = torch.cat(tuple(d[3] for d in minibatch))


        if torch.cuda.is_available(): # use GPU if cuda is available
              state_batch = state_batch.cuda()
              action_batch = action_batch.cuda()
              reward_batch = reward_batch.cuda()
              state_1_batch = state_1_batch.cuda()

        # get output for the next state
        output_1_batch = model(state_1_batch)

        # set y_j to r_j for terminal state, otherwise to to r_j + gamma*max(Q)
        y_batch = torch.cat(tuple(reward_batch[i] if minibatch[i][4]
                                    else reward_batch[i] + model.gamma * torch.max(output_1_batch[i])
                                    for i in range(len(minibatch))))

        # extract Q-value
        q_value = torch.sum(model(state_batch) * action_batch, dim=1)

        # PyTorch accumulates gradients by default, so they need to be reset in each pass
        optmizer.zero_grad()

        # returns a new Tensor, detached from the current graph, the result will never require gradient
        y_batch = y_batch.detach()

        # calculate loss
        loss = criterion(q_value, y_batch)

        # do backward pass
        loss.backward()
        optmizer.step()

        # set state to be state_1
        state = state_1
        iteration += 1

        if iteration % 25000 == 0:
            torch.save(model, 'pretrained_models/current_model_' + str(iteration) + '.pth')

        print('Iteration: {} | Loss: {:.4f} | epsilon: {:.2f}'.format(iteration, loss.item(), epsilon))
        print('Iteration:', iteration, 'elapsed time:', time.time() - start, 'seconds', 'epsilon:', epsilon, 
                'action:', action_index.cpu().detach().numpy(), 'reward: ', reward.numpy()[0][0],
                'Q max: ', np.max(output.cpu().detach().numpy()))

In [None]:
# model testing
def test(model):
    game_state = GameState()

    # initial action is do nothing
    action = torch.zeros([model.number_of_actions], dtype=torch.float32)
    action[0] = 1
    image_data, reward, terminal = game_state.frame_step(action)
    image_data = resize_and_bgr2gray(image_data)
    image_data = image_to_tensor(image_data)
    state = torch.cat((image_data, image_data, image_data, image_data)).unsqueeze(0)


    while True:
        # get output from the neural network
        output = model(state)[0]

        action = torch.zeros([model.number_of_actions], dtype=torch.float32)
        if torch.cuda.is_available(): # use GPU if cuda is available
            action = action.cuda()

        # get action
        action_index = action[torch.argmax(output)]
        if torch.cuda.is_available(): # use GPU if cuda is available
            action_index = action_index.cuda()
        action[action_index] = 1

        # get next state
        image_data_1, reward, terminal = game_state.frame_step(action)
        image_data_1 = resize_and_bgr2gray(image_data_1)
        image_data_1 = image_to_tensor(image_data_1)
        state_1 = torch.cat((state.squeeze(0)[1:, :, :], image_data_1)).unsqueeze(0)
        
        
        # set state to be state_1
        state = state_1

In [None]:
# main function
def main(mode):
    cuda_is_available = torch.cuda.is_available()

    if mode == 'test':
        model = torch.load(
            'pretrained_model/current_model_2000000.pth',
            map_location='cpu' if not cuda_is_available else None
        ).eval()

        if cuda_is_available: # use GPU if cuda is available
            model = model.cuda()

        test(model)
    elif mode == 'train':
        if not os.path.exists('pretrained_model/'):
            os.makedirs('pretrained_model/')

        model = NeuralNetwork()

        if cuda_is_available: # use GPU if cuda is available
            model = model.cuda()

        model.apply(init_weights)
        start = time.time()

        train(model, start)

In [None]:
if __name__ == '__main__':
    main(sys.argv[1])