### Deep Learning Final Project
#### Playing Modified Flappy Bird Using Reinforcement Learning
Leo Li(zl3493), Zhangnan Jiang(zj2028), Zichen Yang(zy2486) <br>
In this project, we would like to build our own reinforcement learning neural network to play the
Flappy Bird game. <br>
Please find all supporting files in the same folder. 

#### Importing necessary libraries:

In [1]:
import os
import cv2
import shutil
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchsummary
from random import random, randint, sample
from tensorboardX import SummaryWriter

#### Defining the function used to preprocess game frames (resize, convert to grayscale, and binarize):

In [2]:
def pre_processing(image, width, height):
    """Turn a raw game frame into a binarized single-channel float32 array.

    The frame is resized with ``cv2.resize`` (``dsize=(width, height)``),
    converted from BGR to grayscale, and thresholded so that every pixel
    value above 1 becomes 255 and everything else becomes 0.

    Returns:
        The thresholded 2-D image with one leading axis of size 1 prepended,
        cast to ``np.float32``.
    """
    resized = cv2.resize(image, (width, height))
    gray = cv2.cvtColor(resized, cv2.COLOR_BGR2GRAY)
    # cv2.threshold returns (threshold_used, image); only the image is needed.
    binary = cv2.threshold(gray, 1, 255, cv2.THRESH_BINARY)[1]
    return binary[np.newaxis, :, :].astype(np.float32)


#### Defining Model Structure:

Deep Q Network:

In [3]:
class DeepQNetwork(nn.Module):
    """Convolutional Q-network mapping 4 stacked frames to 2 action values.

    Three conv+ReLU stages feed a 512-unit hidden layer and a 2-unit output
    layer. The first linear layer expects ``7 * 7 * 64`` features, which is
    what the conv stack produces for 84x84 inputs.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Sequential(
            nn.Conv2d(4, 32, kernel_size=8, stride=4),
            nn.ReLU(inplace=True),
        )
        self.conv2 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(inplace=True),
        )
        self.conv3 = nn.Sequential(
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(inplace=True),
        )

        self.fc1 = nn.Sequential(
            nn.Linear(7 * 7 * 64, 512),
            nn.ReLU(inplace=True),
        )
        self.fc2 = nn.Linear(512, 2)

        self.init_weights()

    def init_weights(self):
        """Set conv/linear weights to U(-0.01, 0.01) and their biases to 0."""
        for layer in self.modules():
            if not isinstance(layer, (nn.Conv2d, nn.Linear)):
                continue
            nn.init.uniform_(layer.weight, -0.01, 0.01)
            nn.init.constant_(layer.bias, 0)

    def forward(self, x):
        """Return one row of 2 Q-values per sample in the batch ``x``."""
        features = self.conv3(self.conv2(self.conv1(x)))
        # Collapse everything except the batch axis before the linear layers.
        flat = features.view(features.size(0), -1)
        return self.fc2(self.fc1(flat))


Dueling DQN:

In [4]:
class DDQN(nn.Module):
    """Dueling Q-network: shared conv trunk with value and advantage heads.

    The trunk is three conv + batch-norm + ReLU stages over 4 input
    channels. Its flattened output (``7 * 7 * 64`` features) feeds two
    separate 512-unit hidden layers, one producing a single state value and
    the other producing 2 advantages. The heads are combined as
    ``value + advantage - mean(advantage)``.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(4, 32, kernel_size=8, stride=4)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)
        self.bn3 = nn.BatchNorm2d(64)

        trunk_features = 7 * 7 * 64
        hidden_units = 512
        self.fc_val = nn.Linear(trunk_features, hidden_units)
        self.fc_adv = nn.Linear(trunk_features, hidden_units)
        self.val = nn.Linear(hidden_units, 1)
        self.adv = nn.Linear(hidden_units, 2)

    def forward(self, x):
        """Return one row of 2 Q-values per sample in the batch ``x``."""
        features = x.float()
        stages = (
            (self.conv1, self.bn1),
            (self.conv2, self.bn2),
            (self.conv3, self.bn3),
        )
        for conv, norm in stages:
            features = F.relu(norm(conv(features)))

        # Both heads read the same flattened trunk output.
        flat = features.view(features.size(0), -1)
        value = self.val(F.relu(self.fc_val(flat)))
        advantage = self.adv(F.relu(self.fc_adv(flat)))

        # Subtract the per-sample mean advantage before adding the value.
        return value + advantage - advantage.mean(1, keepdim=True)

In [5]:
# Check that PyTorch can see a CUDA device; the training cells below call .cuda() unconditionally.
torch.cuda.is_available()

True

#### The training procedure:

Defining hyperparameters:

In [6]:
saved_path = "results"  # directory the training loop writes checkpoints into
log_path = "ddqnlog"  # TensorBoard log directory (wiped and recreated before training)
image_size = 84  # frames are resized to image_size x image_size before entering the network
lr = 1e-6  # learning rate passed to the Adam optimizer
num_iters = 2000000  # total number of training iterations (one game frame each)
initial_epsilon = 0.1  # exploration rate at iteration 0
final_epsilon = 1e-4  # exploration rate reached at num_iters (linear decay in between)
memory_buffer_size = 46000  # maximum number of transitions kept in the replay buffer
gamma = 0.99  # discount factor applied to the next state's max Q-value
batch_size = 32  # maximum number of transitions sampled per optimizer step

Model selection (choose Deep Q Network or Dueling DQN, either freshly initialized or loaded from a pretrained checkpoint):

In [7]:
# torch.manual_seed seeds the CPU generator as well as the CUDA ones. The
# model is constructed on the CPU before being moved to the GPU, so seeding
# only CUDA (torch.cuda.manual_seed) would leave the weight initialisation
# unseeded.
torch.manual_seed(123)
model = DDQN()
# Alternative: resume from a model pickled as a whole object.
# model = torch.load("results/flappy_bird")
# Optional architecture summary; the networks expect 4 stacked 84x84 frames.
# torchsummary.summary(model, (4, 84, 84))

Handling tensorboard log path:

In [8]:
# Start every run from an empty log directory so that curves from earlier
# runs do not show up in TensorBoard next to the new ones.
if os.path.isdir(log_path):
    shutil.rmtree(log_path)
os.makedirs(log_path)

# Scalars logged during training are written under log_path.
writer = SummaryWriter(log_path)

We can monitor the training progress by checking the embedded tensorboard: 

In [9]:
# IPython magics: load the TensorBoard notebook extension and display the logs in ddqnlog.
%load_ext tensorboard
%tensorboard --logdir ddqnlog

Using Adam Optimizer and MSE loss between y_batch and q_value:

In [10]:
# Adam over all model parameters with the learning rate set in the hyperparameter cell.
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
# Mean squared error between the predicted Q-value of the taken action and its target.
criterion = nn.MSELoss()

Start the game and take the first frame as the initial input:

In [11]:
from src.flappy_bird import FlappyBird
game_state = FlappyBird() # start the game
# next_frame(action) advances the game one step and returns (frame, reward, terminal); action 0 is used for the first step.
image, reward, terminal = game_state.next_frame(0) # get the frame from the started game
# Crop axis 0 to screen_width and axis 1 to base_y before resizing.
# NOTE(review): presumably this removes the ground strip below base_y — confirm against src.flappy_bird.
image = pre_processing(image[:game_state.screen_width, :int(game_state.base_y)], image_size, image_size)
# pre_processing returns a float32 NumPy array; wrap it as a tensor for the model.
image = torch.from_numpy(image)

pygame 2.1.2 (SDL 2.0.16, Python 3.6.15)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [12]:
# Move the network and the first frame to the GPU (requires CUDA to be available).
model.cuda()
image = image.cuda()
# The network takes 4 stacked frames; at the start the first frame is repeated 4 times,
# then a leading batch axis is added, giving shape (1, 4, image_size, image_size).
state = torch.cat(tuple(image for _ in range(4)))[None, :, :, :]

Initialize the replay buffer:

In [13]:
# Replay buffer: the training loop appends [state, action, reward, next_state, terminal] entries.
memory_buffer = []
# Training iteration counter used by the loop below (note: this name shadows the builtin `iter`).
iter = 0

The training iterations:

In [None]:
# Training loop: play epsilon-greedily, store every transition in the replay
# buffer, and take one optimizer step per game frame on a sampled minibatch.

# torch.save does not create missing directories, so make sure the checkpoint
# directory exists before the first (very late) save attempt.
os.makedirs(saved_path, exist_ok=True)

# Every tensor created in the loop goes to the device the current state is on.
device = state.device

try:
    while iter < num_iters:
        # Action selection is pure inference; no autograd graph is needed.
        with torch.no_grad():
            prediction = model(state)[0]

        # Linear decay from initial_epsilon (iter == 0) to final_epsilon (iter == num_iters).
        epsilon = final_epsilon + ((num_iters - iter) * (initial_epsilon - final_epsilon) / num_iters)
        iter += 1

        if random() <= epsilon:
            action = randint(0, 1)  # make a random game action
        else:
            # Store a plain int so neither the game nor the replay buffer
            # receives a device tensor as the action.
            action = int(torch.argmax(prediction).item())

        # get the next frame from the started game applying a new game action
        next_image, reward, terminal = game_state.next_frame(action)
        next_image = pre_processing(
            next_image[:game_state.screen_width, :int(game_state.base_y)],
            image_size,
            image_size,
        )  # preprocess the game frame
        next_image = torch.from_numpy(next_image).to(device)
        # Slide the 4-frame window: drop the oldest frame, append the newest.
        next_state = torch.cat((state[0, 1:, :, :], next_image))[None, :, :, :]

        memory_buffer.append([state, action, reward, next_state, terminal])
        if len(memory_buffer) > memory_buffer_size:
            del memory_buffer[0]

        batch = sample(memory_buffer, min(len(memory_buffer), batch_size))
        state_batch, action_batch, reward_batch, next_state_batch, terminal_batch = zip(*batch)

        state_batch = torch.cat(state_batch).to(device)
        next_state_batch = torch.cat(next_state_batch).to(device)
        # Action 0 selects output column 0; any other action selects column 1.
        action_index = torch.tensor(
            [0 if int(taken) == 0 else 1 for taken in action_batch],
            dtype=torch.int64,
            device=device,
        )
        reward_batch = torch.from_numpy(np.array(reward_batch, dtype=np.float32)).to(device)
        terminal_mask = torch.tensor([bool(done) for done in terminal_batch], device=device)

        current_prediction_batch = model(state_batch)

        # The TD target must be treated as a constant: computing it under
        # no_grad stops gradients from flowing through the next-state values.
        with torch.no_grad():
            next_prediction_batch = model(next_state_batch)
            next_q_max = next_prediction_batch.max(dim=1)[0]
            # Terminal transitions have no future return, only the reward.
            y_batch = torch.where(terminal_mask, reward_batch, reward_batch + gamma * next_q_max)

        # Q-value the network assigned to the action that was actually taken.
        q_value = current_prediction_batch.gather(1, action_index[:, None]).squeeze(1)
        optimizer.zero_grad()

        loss = criterion(q_value, y_batch)  # MSE
        loss.backward()
        optimizer.step()

        state = next_state

        if iter % 1000 == 0:
            loss_value = loss.item()
            q_max = torch.max(prediction).item()
            # iter was already incremented above, so it is the number of completed iterations.
            print("Iteration: {}/{}, Action: {}, Loss: {}, Epsilon {}, Reward: {}, Q-value: {}".format(
                iter,
                num_iters,
                action,
                loss_value,
                epsilon, reward, q_max))
            writer.add_scalar('Train/Loss', loss_value, iter)
            writer.add_scalar('Train/Epsilon', epsilon, iter)
            writer.add_scalar('Train/Reward', reward, iter)
            writer.add_scalar('Train/Q-value', q_max, iter)

        # Checkpoint after exactly 1,000,000 and 2,000,000 completed iterations.
        if iter % 1000000 == 0:
            torch.save(model.state_dict(), "{}/fireball_flappy_bird_rewarded_ddqn{}.pth".format(saved_path, iter))
except KeyboardInterrupt:
    print("Saving model before quit")
    torch.save(model.state_dict(), "{}/fireball_flappy_bird_rewarded_ddqn{}.pth".format(saved_path, iter))

# Final weights, saved whether training finished or was interrupted.
torch.save(model.state_dict(), "{}/fireball_flappy_bird_rewarded_ddqn.pth".format(saved_path))

#### Test Procedure:

In [21]:
saved_path = "results"  # directory the test functions load checkpoints from
image_size = 84  # frames are resized to image_size x image_size before entering the network
testing_rounds = 5  # NOTE(review): the calls below pass their own round count (100); this value appears unused

In [22]:
# Seed the CUDA random number generator of the current GPU.
torch.cuda.manual_seed(123)

In [23]:
# test() loads a saved state dict into this module-level `model`, so the
# architecture chosen here must match the checkpoint being evaluated.
model = DeepQNetwork()
# model = DDQN()

In [24]:
# from src.deep_q_network import DeepQNetwork
from src.flappy_bird import FlappyBird
import copy

def test(model_name, testing_rounds):
    """Play ``testing_rounds`` games greedily with a saved state-dict checkpoint.

    The state dict stored at ``saved_path/model_name`` is loaded into the
    module-level ``model``, which is switched to eval mode and then always
    picks the action with the highest predicted Q-value.

    Args:
        model_name: File name of the checkpoint inside ``saved_path``.
        testing_rounds: Number of finished games (terminal frames) to play.
            A value of 0 or less plays no game.

    Returns:
        None. Pressing Ctrl-C stops the run early and prints "Quit".
    """
    use_cuda = torch.cuda.is_available()
    checkpoint_path = "{}/{}".format(saved_path, model_name)
    # Without a GPU, tensors saved from CUDA must be remapped to the CPU.
    map_location = None if use_cuda else torch.device('cpu')
    model.load_state_dict(torch.load(checkpoint_path, map_location=map_location))

    model.eval()

    try:
        game_state = FlappyBird()
        image, _, _ = game_state.next_frame(0)
        image = pre_processing(image[:game_state.screen_width, :int(game_state.base_y)], image_size, image_size)
        image = torch.from_numpy(image)
        if use_cuda:
            model.cuda()
            image = image.cuda()
        # The first frame is repeated to fill the 4-frame input stack.
        state = torch.cat(tuple(image for _ in range(4)))[None, :, :, :]

        # Evaluation never calls backward(), so skip building autograd graphs.
        with torch.no_grad():
            while testing_rounds > 0:
                prediction = model(state)[0]
                # Hand the game a plain int rather than a (possibly CUDA) tensor.
                action = int(torch.argmax(prediction).item())

                next_image, _, terminal = game_state.next_frame(action)
                if terminal:
                    testing_rounds -= 1
                next_image = pre_processing(
                    next_image[:game_state.screen_width, :int(game_state.base_y)],
                    image_size,
                    image_size,
                )
                next_image = torch.from_numpy(next_image)
                if use_cuda:
                    next_image = next_image.cuda()
                # Slide the 4-frame window: drop the oldest frame, append the newest.
                state = torch.cat((state[0, 1:, :, :], next_image))[None, :, :, :]
    except KeyboardInterrupt:
        print("Quit")

In [None]:
# Evaluate the DQN checkpoint for 100 games.
test("dqn_rewarded_flappy_bird_2000000.pth",100)
# Score output below is saved in a txt file named score.txt

In [None]:
def test_ddqn(model_name, testing_rounds):
    """Play ``testing_rounds`` games greedily with a saved dueling-DQN checkpoint.

    The checkpoint at ``saved_path/model_name`` may be either a whole pickled
    module or a state dict (which is what the training cell writes with
    ``model.state_dict()``). A state dict is loaded into a fresh ``DDQN``.

    Args:
        model_name: File name of the checkpoint inside ``saved_path``.
        testing_rounds: Number of finished games (terminal frames) to play.
            A value of 0 or less plays no game.

    Returns:
        None. Pressing Ctrl-C stops the run early and prints "Quit".
    """
    use_cuda = torch.cuda.is_available()
    checkpoint_path = "{}/{}".format(saved_path, model_name)
    # Without a GPU, tensors saved from CUDA must be remapped to the CPU.
    map_location = None if use_cuda else torch.device('cpu')
    loaded = torch.load(checkpoint_path, map_location=map_location)

    if isinstance(loaded, nn.Module):
        model = loaded
    else:
        # A state dict has no eval()/forward(); rebuild the network around it.
        model = DDQN()
        model.load_state_dict(loaded)

    model.eval()

    try:
        game_state = FlappyBird()
        image, _, _ = game_state.next_frame(0)
        image = pre_processing(image[:game_state.screen_width, :int(game_state.base_y)], image_size, image_size)
        image = torch.from_numpy(image)
        if use_cuda:
            model.cuda()
            image = image.cuda()
        # The first frame is repeated to fill the 4-frame input stack.
        state = torch.cat(tuple(image for _ in range(4)))[None, :, :, :]

        # Evaluation never calls backward(), so skip building autograd graphs.
        with torch.no_grad():
            while testing_rounds > 0:
                prediction = model(state)[0]
                # Hand the game a plain int rather than a (possibly CUDA) tensor.
                action = int(torch.argmax(prediction).item())

                next_image, _, terminal = game_state.next_frame(action)
                if terminal:
                    testing_rounds -= 1
                next_image = pre_processing(
                    next_image[:game_state.screen_width, :int(game_state.base_y)],
                    image_size,
                    image_size,
                )
                next_image = torch.from_numpy(next_image)
                if use_cuda:
                    next_image = next_image.cuda()
                # Slide the 4-frame window: drop the oldest frame, append the newest.
                state = torch.cat((state[0, 1:, :, :], next_image))[None, :, :, :]
    except KeyboardInterrupt:
        print("Quit")
        

In [None]:
# NOTE(review): DDQN4 is never referenced by name; presumably it is imported so that
# torch.load can unpickle a checkpoint saved as a whole DDQN4 module — confirm.
from src.ddqn4 import DDQN4
# Evaluate the dueling-DQN checkpoint for 100 games.
test_ddqn("fireball_flappy_bird_rewarded_ddqn2000000.pth",100)

In [19]:
# Read the recorded scores, one integer per line. The context manager closes
# the file even if reading fails.
with open('score/fireball_flappy_bird_rewarded_ddqn1000000.txt', 'r') as file1:
    Lines = file1.readlines()

# Blank lines (for example a trailing empty line at the end of the file)
# carry no score and would make int() raise, so they are skipped.
score = [int(line.strip()) for line in Lines if line.strip()]
count = len(score)

In [20]:
import numpy

# Summarise the recorded scores: best game and mean over all games.
best_score = numpy.max(score)
mean_score = numpy.average(score)
print("Max: " + str(best_score))
print("Average: " + str(mean_score))

Max: 8
Average: 3.19
