In [1]:
import sys
from keras.models import Sequential
from keras.models import load_model
from keras.layers.core import Dense, Activation, Reshape
from keras.layers import Conv2D, LSTM
from keras import optimizers
from keras import initializers
import tensorflow as tf
import numpy as np
import random
import cv2

Using TensorFlow backend.


In [2]:
sys.path.append("game/")
import wrapped_flappy_bird as game

In [3]:
game_state = game.GameState()

In [None]:
img_rows , img_cols = 80, 80
#Convert image into Black and white
img_channels = 4 #We stack 4 frames
from collections import deque

class DQNAgent:
    def __init__(self):
        self.action_size = 2

        # These are hyper parameters for the DQN
        self.discount_factor = 0.99
        self.learning_rate = 0.0001
        self.epsilon = 0.1
        self.epsilon_min = 0.0001
        self.batch_size = 32
        self.train_start = 10000
        # create replay memory using deque
        self.memory = deque(maxlen=200000)
        self.train_buffer = deque(maxlen=10240)
        self.model = self.build_model()


    # approximate Q function using Neural Network
    # state is input and Q Value of each action is output of network
    def build_model(self):
        model = Sequential()
        model.add(Conv2D(32, kernel_size=(8,8), strides=(4, 4), activation='relu',input_shape=(img_rows,img_cols,img_channels),kernel_initializer=initializers.random_normal(stddev=0.01)))  #80*80*4
        model.add(Conv2D(64, kernel_size=(4,4), strides=(2, 2), activation='relu',kernel_initializer=initializers.random_normal(stddev=0.01)))
        model.add(Conv2D(64, kernel_size=(3,3), strides=(1, 1), activation='relu',kernel_initializer=initializers.random_normal(stddev=0.01)))
        model.add(Flatten())
        model.add(Dense(512,activation='relu',kernel_initializer=initializers.random_normal(stddev=0.01)))
        model.add(Dense(self.action_size,activation='linear',kernel_initializer=initializers.random_normal(stddev=0.01)))
        model.summary()
        model.compile(loss='mse', optimizer=optimizers.Adam(lr=self.learning_rate))
        return model

    # get action from model using epsilon-greedy policy
    def get_action(self, state):
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        else:
            state = state.reshape(1, state.shape[0], state.shape[1], state.shape[2])
            q_value = self.model.predict(state)
            return np.argmax(q_value[0])

    # save sample <s,a,r,s'> to the replay memory
    def append_sample(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    # pick samples randomly from replay memory (with batch_size)
    def train_model(self):

        #do training if buffer is full
        if len(self.train_buffer) == self.train_buffer.maxlen:
            batch_size = self.train_buffer.maxlen
            states = np.zeros((batch_size, img_rows, img_cols, img_channels))
            next_states = np.zeros((batch_size, img_rows, img_cols, img_channels))
            target = np.zeros((batch_size, self.action_size)) 
            action, reward, done = [], [], []
            
            for i in range(batch_size):
                states[i] = self.train_buffer[i][0]
                action.append(self.train_buffer[i][1])
                reward.append(self.train_buffer[i][2])
                next_states[i] = self.train_buffer[i][3]
                done.append(self.train_buffer[i][4])

            target = self.model.predict(states)            
            target_next = self.model.predict(next_states)
            
            for i in range(batch_size):
                # Q Learning: get maximum Q value at s' from target model
                if done[i]:
                    target[i][action[i]] = reward[i]
                else:
                    a = np.argmax(target_next[i])
                    target[i][action[i]] = reward[i] + self.discount_factor * (
                        target_next[i][a])
                    
            self.model.fit(states, target, 100, epochs=1, verbose=0)
            self.train_buffer.clear()

In [4]:
def reset_state():
    do_nothing = np.zeros(2)
    do_nothing[random.randrange(2)] = 1
    x_t, r_0, terminal = game_state.frame_step(do_nothing)
    x_t = cv2.cvtColor(cv2.resize(x_t, (80, 80)), cv2.COLOR_BGR2GRAY)
    ret, x_t = cv2.threshold(x_t,1,255,cv2.THRESH_BINARY)
    s_t = np.stack((x_t, x_t, x_t, x_t), axis=2)
    return s_t

In [None]:
agent = DQNAgent()
scores, episodes = [], []
action_count = 0
ACTIONS = 2
EXPLORE = 300000
num_steps = 0

EPISODES = 500000
for e in range(EPISODES):
    done = False
    score = 0
    state = reset_state() 
        
    while not done:
        action = np.zeros([ACTIONS])
        action_idx = agent.get_action(state)
        action[action_idx] = 1
        next_img, reward, done = game_state.frame_step(action)
        
        #process next image
        next_img = cv2.cvtColor(cv2.resize(next_img, (80, 80)), cv2.COLOR_BGR2GRAY)
        ret, next_img = cv2.threshold(next_img, 1, 255, cv2.THRESH_BINARY)
        next_img = next_img.reshape(img_rows, img_cols, 1) #80x80x1
        next_state = np.append(next_img,state[:, :, :3],axis=2)
        num_steps += 1
        agent.append_sample(state, action_idx, reward, next_state, done)
        
        if len(agent.memory) > agent.train_start: 
            if agent.epsilon > agent.epsilon_min:
                agent.epsilon -= (agent.epsilon - agent.epsilon_min) / EXPLORE
            
            action_count += 1
        
            if action_count == 4:
                mini_batch = random.sample(agent.memory, agent.batch_size)
                agent.train_buffer.extend(mini_batch)
                action_count = 0
                          
        agent.train_model()        
        score += reward
        state = next_state

        
        if done:
            scores.append(score)
            episodes.append(e)
            print("episode:", e, " score:", score, " epsilon:", agent.epsilon," memory size: ", len(agent.memory),"number of steps: ", num_steps)

    if e % 1000:
        agent.model.save('flappy_bird_model.h5')

In [10]:
#test model
from keras.models import load_model
model = load_model('flappy_bird__lstm_model.h5')
ACTIONS = 2
img_rows , img_cols = 80, 80
img_channels = 4

EPISODES = 100
for e in range(EPISODES):
    done = False
    score = 0
    state = reset_state() 
        
    while not done:
        action = np.zeros([ACTIONS])
        state_tmp = state.reshape(1, state.shape[0], state.shape[1], state.shape[2])
        q_value = model.predict(state_tmp)
        action_idx = np.argmax(q_value[0])
        
        action[action_idx] = 1
        next_img, reward, done = game_state.frame_step(action)
        
        #process next image
        next_img = cv2.cvtColor(cv2.resize(next_img, (80, 80)), cv2.COLOR_BGR2GRAY)
        ret, next_img = cv2.threshold(next_img, 1, 255, cv2.THRESH_BINARY)
        next_img = next_img.reshape(img_rows, img_cols, 1) #80x80x1
        next_state = np.append(next_img,state[:, :, :3],axis=2)
        score += reward
        state = next_state

        
        if done:
            print( " score:", score)


 score: 63.50000000000047
 score: 1254.899999999963
 score: 896.1000000001061
 score: 35.900000000000205
 score: 1397.3999999998616
 score: 468.20000000003
 score: 445.30000000002593
 score: 569.500000000048


KeyboardInterrupt: 