<a href="https://colab.research.google.com/github/Mctran1724/Games/blob/main/FlappyBirdDQN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [11]:
import random
import numpy as np
import flappy_bird_gym
from collections import deque
from keras.layers import Input, Dense
from keras.models import load_model, save_model, Sequential
from keras.optimizers import RMSprop
import tensorflow as tf
from IPython.display import clear_output


In [5]:
# Ask TensorFlow for the GPU device name; it is kept in `device_name` so the
# training cell can pin its work to that device with tf.device().
device_name = tf.test.gpu_device_name()
gpu_report = 'Found GPU at: {}'.format(device_name)
print(gpu_report)

Found GPU at: /device:GPU:0


In [19]:
# Create the DQN model and agent

def Model(input_shape, output_shape):
    """Build and compile the fully connected Q-network.

    Args:
        input_shape: Shape tuple of a single observation, e.g. ``(state_space,)``.
        output_shape: Number of output units (one Q-value per action).

    Returns:
        A compiled ``Sequential`` model: four ReLU hidden layers
        (512, 256, 128, 64 units, He-uniform initialised) followed by a
        linear output layer, trained with MSE loss and RMSprop.
    """
    model = Sequential()
    # The first layer is the only one that needs the input shape.
    model.add(Dense(512, input_shape = input_shape, activation = "relu", kernel_initializer = "he_uniform"))
    for units in (256, 128, 64):
        model.add(Dense(units, activation = "relu", kernel_initializer = "he_uniform"))
    # Linear output: Q-values are unbounded regression targets.
    model.add(Dense(output_shape, activation = "linear", kernel_initializer = "he_uniform"))
    # `learning_rate` replaces the deprecated `lr` alias (rejected by current Keras).
    # No 'accuracy' metric: it is a classification metric and carries no
    # meaning for Q-value regression.
    model.compile(loss = "mse", optimizer = RMSprop(learning_rate = 0.0001, rho = 0.95, epsilon = 0.01))
    model.summary()
    return model


    
class DQNAgent:
    """Deep Q-Network agent for the ``FlappyBird-v0`` environment.

    The agent keeps a bounded replay memory of
    ``(state, action, reward, next_state, done)`` transitions, picks actions
    epsilon-greedily, and fits a single Q-network on random minibatches drawn
    from that memory.
    """

    # File that train() saves the network to and play() loads it from.
    WEIGHTS_PATH = "weights.h5"

    def __init__(self, episodes = 500, gamma = 0.95, epsilon = 1, epsilon_decay = 0.9999, epsilon_min = 0.01, batch_number = 32):
        """Create the environment, the replay memory and the Q-network.

        Args:
            episodes: Number of training episodes run by ``train()``.
            gamma: Discount factor applied to the bootstrapped future value.
            epsilon: Initial exploration probability.
            epsilon_decay: Multiplicative decay applied to epsilon once per episode.
            epsilon_min: Lower bound for epsilon.
            batch_number: Minibatch size used by ``learn()``.
        """
        # environment variables
        self.env = flappy_bird_gym.make("FlappyBird-v0")
        self.episodes = episodes
        self.state_space = self.env.observation_space.shape[0]
        self.action_space = self.env.action_space.n
        self.memory = deque(maxlen = 2000)

        # hyperparameters
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        self.batch_number = batch_number

        # learn() is a no-op until the memory holds this many transitions.
        self.train_start = 1000
        # Probability of choosing action 1 while exploring.
        self.jump_prob = 0.01

        self.model = Model(input_shape = (self.state_space,), output_shape = self.action_space)

    def act(self, state):
        """Choose an action for ``state`` using an epsilon-greedy policy.

        Args:
            state: Observation shaped ``(1, state_space)``.

        Returns:
            With probability ``1 - epsilon`` the index of the largest predicted
            Q-value; otherwise an exploratory action that is 1 with probability
            ``jump_prob`` and 0 otherwise (action 1 is presumably "flap" --
            confirm against the environment).
        """
        if np.random.random() > self.epsilon:
            # As epsilon decays, the network decides more of the actions.
            return np.argmax(self.model.predict(state, verbose = 0))
        return 1 if np.random.random() < self.jump_prob else 0

    def learn(self):
        """Fit the Q-network on one random minibatch from the replay memory.

        Does nothing until the memory holds at least ``train_start``
        transitions. Targets follow the DQN update (Mnih et al. 2015):
        ``reward`` for terminal transitions, otherwise
        ``reward + gamma * max_a Q(next_state, a)``.
        """
        if len(self.memory) < self.train_start:
            return

        # Size everything by what was actually sampled, so a memory smaller
        # than batch_number cannot cause an out-of-range index.
        batch_size = min(len(self.memory), self.batch_number)
        if batch_size == 0:
            return
        minibatch = random.sample(self.memory, batch_size)

        states = np.vstack([transition[0] for transition in minibatch])
        actions = np.array([transition[1] for transition in minibatch], dtype = int)
        rewards = np.array([transition[2] for transition in minibatch], dtype = float)
        next_states = np.vstack([transition[3] for transition in minibatch])
        dones = np.array([transition[4] for transition in minibatch], dtype = bool)

        # Current predictions are the baseline target; only the entry of the
        # action actually taken is overwritten below.
        target = self.model.predict(states, verbose = 0)
        target_next = self.model.predict(next_states, verbose = 0)

        future_value = self.gamma * np.amax(target_next, axis = 1)
        # Terminal transitions have no future value.
        target[np.arange(batch_size), actions] = rewards + np.where(dones, 0.0, future_value)

        self.model.fit(states, target, batch_size = batch_size, verbose = 0)

    def train(self):
        """Run ``episodes`` training episodes, learning after every step.

        Epsilon is decayed once at the start of each episode and clamped at
        ``epsilon_min``. The network is saved to ``WEIGHTS_PATH`` whenever an
        episode lasts more than 100 steps.

        NOTE(review): with the defaults (decay 0.9999 per episode, 500
        episodes) epsilon only falls to about 0.95, so the agent acts almost
        entirely at random throughout training.
        """
        for episode in range(self.episodes):
            state = np.reshape(self.env.reset(), [1, self.state_space])
            done = False
            score = 0

            # Decay epsilon but never let it drop below the configured floor.
            self.epsilon = max(self.epsilon * self.epsilon_decay, self.epsilon_min)

            while not done:
                #self.env.render() #render environment
                action = self.act(state)
                next_state, reward, done, _ = self.env.step(action)
                next_state = np.reshape(next_state, [1, self.state_space])
                score += 1

                if done:
                    # The episode ended, so penalise the final transition heavily.
                    reward -= 100

                self.memory.append((state, action, reward, next_state, done))
                state = next_state

                if done:
                    if score > 100:
                        self.model.save(self.WEIGHTS_PATH)
                    # Replace the previous episode's report instead of piling up output.
                    clear_output(wait = True)
                    print("Episode: {}\n Score: {}\n Epsilon: {}".format(episode, score, self.epsilon))

                self.learn()

    def play(self):
        """Load the saved network and play one rendered episode greedily.

        Replaces ``self.model`` with the model stored at ``WEIGHTS_PATH`` and
        prints the running score after each step.
        """
        self.model = load_model(self.WEIGHTS_PATH)
        state = np.reshape(self.env.reset(), [1, self.state_space])
        done = False
        score = 0

        while not done:
            self.env.render()
            # Always take the action with the largest predicted Q-value.
            action = np.argmax(self.model.predict(state, verbose = 0))
            next_state, _, done, _ = self.env.step(action)
            state = np.reshape(next_state, [1, self.state_space])
            score += 1
            print("Current score: {}".format(score))

        print("You died.")
                
                
                

In [20]:
# Build a fresh agent with default hyperparameters, then run its training
# loop with TensorFlow ops placed on the device detected earlier.
agent = DQNAgent()
with tf.device(device_name):
    agent.train()


Episode: 499
 Score: 70
 Epsilon: 0.9512270462715811


In [21]:
# Load the weights saved during training ("weights.h5") and render one
# episode played greedily by the network.
agent.play()

'/content'