# Import libraries

In [None]:
import pyautogui
import gi
gi.require_version("Wnck", "3.0")
from gi.repository import Wnck

from IPython.display import clear_output
from time import sleep

import matplotlib.pyplot as plt

from collections import deque
import tensorflow.compat.v1 as tf
tf.compat.v1.disable_eager_execution()
import cv2

import numpy as np

import time
import random

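# First of all: run the game

The game window must already be open before the cells below are run, because `FlappyBirdGame` finds it by its window title.

```bash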
cd ./FlappyBirdGame
python ./flappy.py
```

# Game Class

In [None]:
class FlappyBirdGame:
    """Screen-scraping controller for a running Flappy Bird window.

    The window is located by its title through Wnck, observed with
    ``pyautogui.screenshot`` and controlled with a mouse click (focus) and
    the space key (flap).  All pixel offsets and colours used below are
    hard-coded for the game window this notebook was written against.

    Attributes (only set when the window is found):
        flappyBird_window: dict with the window's "x", "y", "width", "height".
        flappyBird_click: (x, y) screen point clicked to focus the window.
        img: last raw screenshot of the window (set by ``get_img``).
        processed_img: last preprocessed frame, shape (80, 80, 1).
        score: last reward value.
    """

    # Colour of the probe pixel that ``is_end`` treats as "game over".
    _GAME_OVER_RGB = (252, 160, 72)
    # Colour searched for in the probe strip that grants the reward of 1.
    _SCORE_RGB = (220, 245, 133)

    def __init__(self, windowName):
        """Find the window titled ``windowName`` and take a first screenshot.

        If no window carries that title, "Window not found" is printed and
        the instance is left without a click point, screenshot or score, so
        the other methods cannot be used on it.
        """
        scr = Wnck.Screen.get_default()
        scr.force_update()
        self.flappyBird_window = None
        # No early exit: if several windows share the title, the last one wins.
        for window in scr.get_windows():
            if window.get_name() == windowName:
                x, y, width, height = window.get_geometry()
                self.flappyBird_window = {"x": x, "y": y, "width": width, "height": height}
                print("Game Positions ↓\n - x :", x , "\n - y :", y , "\n - width :", width , "\n - height :", height)
        if self.flappyBird_window is None:
            print("Window not found")
            return

        geometry = self.flappyBird_window
        # Click target: horizontally centred, 30 px above the window's bottom.
        self.flappyBird_click = (
            geometry["x"] + geometry["width"] / 2,
            geometry["y"] + geometry["height"] - 30,
        )
        # focus window
        self.focus()
        # take a first screenshot so that ``img`` / ``processed_img`` exist
        self.get_processed_img()
        # init score
        self.score = 0.1

    def focus(self):
        """Move the mouse to the click point and click to focus the window."""
        pyautogui.moveTo(self.flappyBird_click)
        pyautogui.click()

    def up(self):
        """Press the space key (flap / start / restart)."""
        pyautogui.press('space')

    def get_img(self):
        """Screenshot the game window, store it in ``self.img`` and return it."""
        geometry = self.flappyBird_window
        self.img = pyautogui.screenshot(
            region=(geometry["x"], geometry["y"], geometry["width"], geometry["height"])
        )
        return self.img

    @staticmethod
    def _red_plus_green(img_rgb):
        """Return the red + green channel sum of ``img_rgb``, saturated at 255.

        The addition is done in a wider integer type: adding two uint8
        arrays directly wraps around modulo 256, which would make bright
        pixels dark instead of clamping them.
        """
        total = img_rgb[:, :, 0].astype(np.uint16) + img_rgb[:, :, 1]
        return np.minimum(total, 255).astype(np.uint8)

    def get_processed_img(self):
        """Take a new screenshot and return it as a binary (80, 80, 1) array.

        The result is also stored in ``self.processed_img``.
        """
        # make a screenshot of the window
        img_rgb = self.get_img()
        # crop top and bottom of the image (offsets measured from the bottom)
        img_rgb = img_rgb.crop((0, img_rgb.height - 512, img_rgb.width, img_rgb.height - 108))
        # convert to a single channel using only the red and green channels
        img = self._red_plus_green(np.array(img_rgb))
        # threshold image; maxValue=1 yields pixel values 0 / 1
        img = cv2.adaptiveThreshold(img, 1, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY, 21, 2)
        # resize image
        img = cv2.resize(img, (80, 80))
        # reshape image to 80x80x1
        img = np.reshape(img, (80, 80, 1))
        # save processed image
        self.processed_img = img
        return img

    def is_end(self):
        """Return True if the last screenshot shows the game-over colour.

        Only ``self.img`` is inspected; no new screenshot is taken.
        """
        probe = self.img.getpixel((100, self.img.height - 305))
        return probe == self._GAME_OVER_RGB

    def startRound(self):
        """Focus the window, restart the game if it is over, and start a round."""
        # focus window
        self.focus()
        # restart the game if it's over (judged on the last screenshot)
        if self.is_end():
            sleep(1)
            self.up()
            sleep(0.1)
        # init score
        self.score = 0
        # start game
        self.up()
        # refresh the stored screenshot
        self.get_processed_img()

    def get_reward(self):
        """Compute, store and return the reward for the last screenshot.

        Returns -1 when the game is over, 1 when the score colour appears in
        the one-pixel-high probe strip (the bird is between the pipes), and
        0.1 otherwise.
        """
        if self.is_end():
            self.score = -1
            return self.score

        strip = np.array(self.img.crop((50, self.img.height - 510, 80, self.img.height - 509)))
        scored = (strip == self._SCORE_RGB).all(axis=2).any()
        self.score = 1 if scored else 0.1
        return self.score

    def play(self, action):
        """Apply ``action`` (1 = flap, anything else = do nothing) and observe.

        Returns:
            (processed frame, reward, game-over flag) for the new screenshot.
        """
        if action == 1:
            self.up()
        # wait 0.01 second before looking at the screen
        sleep(0.01)
        # Order matters: the screenshot must be refreshed before the reward
        # and the game-over flag are read from it.
        frame = self.get_processed_img()
        reward = self.get_reward()
        return frame, reward, self.is_end()
        

# Some Tests

### Create a game instance for test

In [None]:
# Look up the window titled "Flappy Bird"; prints its geometry, or "Window not found".
game = FlappyBirdGame("Flappy Bird") 

### Test end of game

In [None]:
# Prints True when the probe pixel of the last screenshot has the game-over colour.
print(game.is_end())

### Test screen capture

In [None]:
# Same test as get_reward(): does any pixel of the one-pixel-high probe strip
# have exactly the colour (220, 245, 133)?
(np.array(game.img.crop((50, game.img.height - 510, 80, game.img.height - 509))) == (220, 245, 133)).all(axis=2).any()

In [None]:
# Display the probe strip (30 px wide, 1 px high) that the reward check inspects.
plt.imshow(game.img.crop((50, game.img.height - 510, 80, game.img.height - 509)))

In [None]:
# Display the last raw screenshot of the game window.
plt.imshow(game.img)

In [None]:
# Display the last preprocessed (thresholded, 80x80x1) frame in grayscale.

plt.imshow(game.processed_img, cmap='gray')

# Parameters

In [None]:
GAME = 'FlappyBird' # name of the game; used as prefix of the checkpoint file name
ACTIONS = 2 # number of valid actions (width of the network's readout layer)
GAMMA = 0.99 # discount factor applied to the best next-state Q-value in the training target
OBSERVE = 20000. # timesteps to observe (fill replay memory) before training starts
EXPLORE = 40000. # frames over which to anneal epsilon
FINAL_EPSILON = 0.0001 # final value of epsilon
INITIAL_EPSILON = 0.0001 # starting value of epsilon; equal to FINAL_EPSILON, so epsilon is never annealed
REPLAY_MEMORY = 10000 # number of previous transitions to remember
BATCH = 32 # size of minibatch
FRAME_PER_ACTION = 1 # NOTE(review): not referenced by the code visible in this notebook
SAVE_ITERATIONS = 1000 # save model progress every X iterations

# Build model

In [None]:
def weight_variable(shape):
    """Return a tf.Variable of ``shape`` initialised from a truncated normal (stddev 0.01)."""
    return tf.Variable(tf.truncated_normal(shape, stddev=0.01))

def bias_variable(shape):
    """Return a tf.Variable of ``shape`` with every entry initialised to 0.01."""
    return tf.Variable(tf.constant(0.01, shape=shape))

def conv2d(x, W, stride):
    """Convolve ``x`` with filter ``W`` using the same stride on both spatial axes and SAME padding."""
    strides = [1, stride, stride, 1]
    return tf.nn.conv2d(x, W, strides=strides, padding="SAME")

def max_pool_2x2(x):
    """Apply 2x2 max pooling with stride 2 and SAME padding to ``x``."""
    window = [1, 2, 2, 1]
    return tf.nn.max_pool(x, ksize=window, strides=window, padding="SAME")

def createNetwork():
    """Build the Q-network graph.

    Three conv + max-pool stages feed a 256-unit fully connected layer and a
    linear readout with one output per action.

    Returns:
        (s, readout, h_fc1): the input placeholder of shape
        [None, 80, 80, 4], the readout tensor of shape [None, ACTIONS], and
        the hidden fully connected activations.

    Note:
        The first fully connected weight matrix is sized from the real
        per-sample feature count, so checkpoints written with the former
        [64, 256] matrix cannot be restored into this graph.
    """
    # convolutional weights
    W_conv1 = weight_variable([8, 8, 4, 32])
    b_conv1 = bias_variable([32])

    W_conv2 = weight_variable([4, 4, 32, 64])
    b_conv2 = bias_variable([64])

    W_conv3 = weight_variable([3, 3, 64, 64])
    b_conv3 = bias_variable([64])

    # input layer
    s = tf.placeholder("float", [None, 80, 80, 4])

    # hidden layers
    h_conv1 = tf.nn.relu(conv2d(s, W_conv1, 4) + b_conv1)
    h_pool1 = max_pool_2x2(h_conv1)

    h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2, 2) + b_conv2)
    h_pool2 = max_pool_2x2(h_conv2)

    h_conv3 = tf.nn.relu(conv2d(h_pool2, W_conv3, 1) + b_conv3)
    h_pool3 = max_pool_2x2(h_conv3)

    # Flatten each sample to ONE row.  With SAME padding the last pooling
    # output is 2x2x64 = 256 values per sample; reshaping to 64 columns would
    # turn every sample into 4 rows and break the batch dimension.
    flat_size = int(np.prod(h_pool3.get_shape().as_list()[1:]))

    # fully connected weights (created after the conv variables and before
    # the readout variables, i.e. in the same order as before)
    W_fc1 = weight_variable([flat_size, 256])
    b_fc1 = bias_variable([256])

    W_fc2 = weight_variable([256, ACTIONS])
    b_fc2 = bias_variable([ACTIONS])

    h_pool3_flat = tf.reshape(h_pool3, [-1, flat_size])

    h_fc1 = tf.nn.relu(tf.matmul(h_pool3_flat, W_fc1) + b_fc1)

    # readout layer
    readout = tf.matmul(h_fc1, W_fc2) + b_fc2

    return s, readout, h_fc1

# Run the training session

In [None]:
def trainNetwork(s, readout, h_fc1, sess):
    """Run the endless Q-learning loop against the live game window.

    Args:
        s: input placeholder of the network (a stack of four 80x80 frames).
        readout: tensor with one Q-value per action.
        h_fc1: hidden layer tensor; not used here, kept for the call signature.
        sess: TensorFlow session used for initialisation, restore and save.
            ``readout.eval`` and ``train_step.run`` are called without an
            explicit session, so ``sess`` must also be the default session.

    The function never returns; it loops until it is interrupted.  Weights
    are restored from and saved to the ``saved_networks`` directory.
    """
    import os  # local import: only needed to create the checkpoint directory

    # define the cost function: squared error between the target y and the
    # Q-value of the action that was taken (selected by the one-hot mask a)
    a = tf.placeholder("float", [None, ACTIONS])
    y = tf.placeholder("float", [None])
    readout_action = tf.reduce_sum(tf.multiply(readout, a), axis=1)
    cost = tf.reduce_mean(tf.square(y - readout_action))
    train_step = tf.train.AdamOptimizer(1e-6).minimize(cost)

    # open up a game state to communicate with emulator
    game = FlappyBirdGame("Flappy Bird")

    # replay memory; maxlen discards the oldest transition automatically
    D = deque(maxlen=REPLAY_MEMORY)

    # get the first state by doing nothing and repeat the frame to 80x80x4.
    # The frames are stacked as-is (no flip) so that the initial state has the
    # same orientation as every frame appended later in the loop.
    game.startRound()
    image, _, _ = game.play(0) # do nothing
    images_last_t = np.concatenate((image, image, image, image), axis=2)

    # saving and loading networks
    saver = tf.train.Saver()
    sess.run(tf.global_variables_initializer())
    # make sure the checkpoint directory exists before the first save
    os.makedirs("saved_networks", exist_ok=True)
    checkpoint = tf.train.get_checkpoint_state("saved_networks")
    if checkpoint and checkpoint.model_checkpoint_path:
        saver.restore(sess, checkpoint.model_checkpoint_path)
        print("Successfully loaded:", checkpoint.model_checkpoint_path)
    else:
        print("Could not find old network weights")

    # start training
    epsilon = INITIAL_EPSILON
    t = 0

    while True:
        print("t = ", t)
        # run model and get the action
        readout_t = readout.eval(feed_dict={s: [images_last_t]})[0]

        # sometimes do a random action (exploration), otherwise act greedily
        if random.random() <= epsilon:
            print("Do a random action !")
            action_t_index = random.randrange(ACTIONS)
        else:
            action_t_index = np.argmax(readout_t)
        action_t = np.zeros([ACTIONS])
        action_t[action_t_index] = 1

        # scale down epsilon
        if epsilon > FINAL_EPSILON and t > OBSERVE:
            epsilon -= (INITIAL_EPSILON - FINAL_EPSILON) / EXPLORE

        # run the selected action and observe next state and reward;
        # the newest frame goes first, the oldest of the four is dropped
        image_t, reward_t, terminal = game.play(action_t_index)
        images_t = np.append(image_t, images_last_t[:, :, :3], axis=2)

        # store the transition in D
        D.append((images_last_t, action_t, reward_t, images_t, terminal))

        # only train if done observing
        if t > OBSERVE:
            # sample a minibatch to train on
            minibatch = random.sample(D, BATCH)

            # get the batch variables
            images_last_t_batch = [d[0] for d in minibatch]
            action_batch = [d[1] for d in minibatch]
            reward_batch = [d[2] for d in minibatch]
            images_t_batch = [d[3] for d in minibatch]
            terminal_batch = [d[4] for d in minibatch]

            # target: the reward alone for terminal transitions, otherwise
            # reward plus the discounted best Q-value of the next state
            readout_batch = readout.eval(feed_dict={s: images_t_batch})
            y_batch = [
                r if done else r + GAMMA * np.max(q)
                for r, done, q in zip(reward_batch, terminal_batch, readout_batch)
            ]

            # perform gradient step
            train_step.run(feed_dict={
                y: y_batch,
                a: action_batch,
                s: images_last_t_batch,
            })

        # update the old values
        images_last_t = images_t
        t += 1

        # save progress every SAVE_ITERATIONS iterations
        if t % SAVE_ITERATIONS == 0:
            saver.save(sess, 'saved_networks/' + GAME + '-dqn', global_step=t)

        # print info
        if t <= OBSERVE:
            state = "observe"
        elif t <= OBSERVE + EXPLORE:
            state = "explore"
        else:
            state = "train"

        print("TIMESTEP", t, "/ STATE", state,
              "/ EPSILON", epsilon, "/ ACTION", action_t_index, "/ REWARD", reward_t,
              "/ Q_MAX %e" % np.max(readout_t))

        # the round is over: restart the game before the next step
        if terminal:
            game.startRound()

In [None]:
def runTrain():
    """Create a session, build the network and run the training loop.

    The training loop only stops when it is interrupted or raises, so the
    session is closed in a ``finally`` block to release its resources in
    that case.
    """
    sess = tf.InteractiveSession()
    try:
        s, readout, h_fc1 = createNetwork()
        trainNetwork(s, readout, h_fc1, sess)
    finally:
        sess.close()

In [None]:
# Start training; the loop inside trainNetwork has no exit condition, so this
# cell runs until the kernel is interrupted.
runTrain()

# Tests and verifications

In [None]:
# pyautogui.confirm('Start AI?', buttons=['Go!'])