In [1]:
import warnings
from collections import deque

import gym
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from skimage import transform
from skimage.color import rgb2gray
warnings.filterwarnings('ignore')

In [2]:
class GameEnv:
    """
      Creates an Atari game environment and provides frame preprocessing
      and frame-stacking helpers.

      Attributes:
          env: environment returned by ``gym.make(game)``.
          n_actions: number of discrete actions (``env.action_space.n``).
          frame_size: raw observation shape (``env.observation_space.shape``).
          hot_enc_actions: ``n_actions x n_actions`` identity matrix; row ``i``
              is the one-hot encoding of action ``i``.
          stack_size: number of preprocessed frames stacked into one state.
          stacked_frames: deque holding the last ``stack_size`` preprocessed
              frames (initialised with all-zero frames).
          stacked_state: set by ``stack_frame``; the most recent stacked state.
          hyperparameters: dict of training settings read by the networks.
          training, render: flags initialised to ``False``; not read anywhere
              in this class.
    """

    # (height, width) of a preprocessed frame; shared by the zero-initialised
    # frame stack, the resize target and hyperparameters['state_size'].
    FRAME_SHAPE = (110, 84)

    def __init__(self, game = 'SpaceInvaders-v0'):
        """Build the gym environment and the default hyperparameters.

        Args:
            game: gym environment id passed to ``gym.make``.
        """
        self.env = gym.make(game)
        self.n_actions = self.env.action_space.n
        self.frame_size = self.env.observation_space.shape
        # Row i is the one-hot vector for action i.
        self.hot_enc_actions = np.identity(self.n_actions)
        self.stack_size = 4
        # `np.int` was removed in NumPy 1.24; a float dtype is used because the
        # preprocessed frames appended later are floating point.
        self.stacked_frames = deque(
            [np.zeros(self.FRAME_SHAPE, dtype=np.float64) for _ in range(self.stack_size)],
            maxlen=self.stack_size,
        )
        self.hyperparameters = {
                               'learning_rate' : 0.00025,
                               'total_episodes' : 50,
                               'max_steps' : 50000,
                               # Misspelled key kept so existing lookups keep working;
                               # 'batch_size' below is the correctly spelled alias.
                               'btach_size': 64,
                               'batch_size': 64,
                               'explore_start' : 1,
                               'explore_end' : 0.01,
                               'decay_rate' : 0.00001,
                               'gamma' : 0.9,
                               'pretrain_length' : 64,
                               'memory_size' : 1000000,
                               'state_size' : [*self.FRAME_SHAPE, self.stack_size]
                               }
        self.training = False
        self.render = False

    def _preprocess_frame(self, frame):
        """Convert a raw RGB frame to a cropped, resized grayscale frame.

        Args:
            frame: RGB image array; assumed to be the integer-typed observation
                returned by the gym environment (TODO confirm for other
                environments).

        Returns:
            2-D float array of shape ``FRAME_SHAPE`` (110 x 84).
        """
        # rgb2gray rescales integer images to floats in [0, 1], so no further
        # division by 255 is applied (doing so would shrink every pixel into
        # [0, 1/255]).
        gray_frame = rgb2gray(frame)

        # Drop 8 rows from the top, 12 from the bottom, 4 columns from the
        # left and 12 from the right.
        cropped_frame = gray_frame[8:-12, 4:-12]

        # Resize
        # Thanks to Mikołaj Walkowiak
        preprocessed_frame = transform.resize(cropped_frame, self.FRAME_SHAPE)

        return preprocessed_frame # 110x84 frame

    def stack_frame(self, state, new_epis = False):
        """Preprocess ``state`` and push it onto the frame stack.

        Args:
            state: raw frame, passed to ``_preprocess_frame``.
            new_epis: when True the stack is rebuilt from ``stack_size``
                references to this one frame; otherwise the frame is appended
                and the oldest frame falls out of the bounded deque.

        Returns:
            Array of shape ``(height, width, stack_size)`` with the frames
            stacked along the last axis, oldest first. Also stored on
            ``self.stacked_state``.
        """
        frame = self._preprocess_frame(state)

        if new_epis:
            self.stacked_frames = deque([frame] * self.stack_size, maxlen=self.stack_size)
        else:
            self.stacked_frames.append(frame)

        self.stacked_state = np.stack(self.stacked_frames, axis=2)
        return self.stacked_state



In [9]:
class DDQNN:
    """Dueling deep Q-network built with the TensorFlow 1.x graph API.

    The graph consists of three convolution + ELU layers, a flatten step and
    two dense streams (a scalar state value and per-action advantages) that
    are combined into Q-values as ``value + (advantage - mean(advantage))``.

    Attributes:
        gamenv: the game environment wrapper supplying ``hyperparameters``,
            ``n_actions``, ``hot_enc_actions`` and ``env``.
        decay_step: step counter used by the exploration schedule in
            ``predict_action``. It is only read here; presumably the training
            loop advances it (TODO confirm against the caller).
        _inputs, _ISWeights, _actions, target_Q: graph placeholders.
        output: Q-values for every action, shape ``[batch, n_actions]``.
        Q: Q-value of the action selected by the one-hot ``_actions``.
        absolute_errors: ``|target_Q - Q|`` per sample.
        loss: importance-weighted mean squared error.
        optimizer: RMSProp training op minimising ``loss``.
    """

    def __init__(self, gamenv, name):
        """Build the network graph inside ``tf.variable_scope(name)``.

        Args:
            gamenv: environment wrapper (see class docstring).
            name: variable scope name for this network.
        """
        self.gamenv = gamenv
        self.decay_step = 0
        hyperp = self.gamenv.hyperparameters

        with tf.variable_scope(name):
            self._inputs = tf.placeholder(tf.float32, [None, *hyperp['state_size']], name='inputs')
            self._ISWeights = tf.placeholder(tf.float32, [None, 1], name='IS_weights')
            self._actions = tf.placeholder(tf.float32, [None, self.gamenv.n_actions], name='actions')
            self.target_Q = tf.placeholder(tf.float32, [None], name="target")

            # --- Convolutional feature extractor -------------------------
            # Each convolution consumes the ELU output of the previous one;
            # feeding the raw convolution output instead would leave the
            # first two layers without any non-linearity.
            self.conv1 = tf.layers.conv2d(inputs = self._inputs,
                                          filters = 32,
                                          kernel_size = [8,8],
                                          strides = [4,4],
                                          padding = 'VALID',
                                          kernel_initializer=tf.contrib.layers.xavier_initializer_conv2d(),
                                          name = 'Conv1')
            self.actvf1 = tf.nn.elu(self.conv1, name='Elu1')

            self.conv2 = tf.layers.conv2d(inputs = self.actvf1,
                                          filters = 64,
                                          kernel_size = [4,4],
                                          strides = [2,2],
                                          padding = 'VALID',
                                          kernel_initializer=tf.contrib.layers.xavier_initializer_conv2d(),
                                          name = 'Conv2')
            self.actvf2 = tf.nn.elu(self.conv2, name='Elu2')

            self.conv3 = tf.layers.conv2d(inputs = self.actvf2,
                                          filters = 64,
                                          kernel_size = [3,3],
                                          strides = [2,2],
                                          padding = 'VALID',
                                          kernel_initializer=tf.contrib.layers.xavier_initializer_conv2d(),
                                          name = 'Conv3')
            self.actvf3 = tf.nn.elu(self.conv3, name='Elu3')

            self.flatten = tf.contrib.layers.flatten(self.actvf3)

            # --- Value stream: one scalar per state ----------------------
            self.value_fc = tf.layers.dense(inputs = self.flatten,
                                            units = 512,
                                            activation = tf.nn.elu,
                                            kernel_initializer = tf.contrib.layers.xavier_initializer(),
                                            name = 'Value_fc')
            self.value = tf.layers.dense(inputs = self.value_fc,
                                         units = 1,
                                         activation = None,
                                         kernel_initializer = tf.contrib.layers.xavier_initializer())

            # --- Advantage stream: one value per action ------------------
            self.advantage_fc = tf.layers.dense(inputs = self.flatten,
                                                units = 512,
                                                activation = tf.nn.elu,
                                                kernel_initializer=tf.contrib.layers.xavier_initializer(),
                                                name="advantage_fc")
            self.advantage = tf.layers.dense(inputs = self.advantage_fc,
                                             units = self.gamenv.n_actions,
                                             activation = None,
                                             kernel_initializer=tf.contrib.layers.xavier_initializer(),
                                             name="advantages")

            # NOTE(review): this layer is not connected to output or loss.
            # It is kept so the scope's variable names and the `fc` attribute
            # stay as they were; consider removing it.
            self.fc = tf.layers.dense(inputs = self.flatten,
                                      units = 512,
                                      activation = tf.nn.elu,
                                      kernel_initializer=tf.contrib.layers.xavier_initializer(),
                                      name="fc1")

            # Combine the streams: Q = V + (A - mean(A)).
            self.output = self.value + tf.subtract(self.advantage, tf.reduce_mean(self.advantage, axis=1, keepdims=True))
            # The one-hot action mask picks the Q-value of the action taken.
            self.Q = tf.reduce_sum(tf.multiply(self.output, self._actions), axis=1)

            self.absolute_errors = tf.abs(self.target_Q - self.Q)# for updating Sumtree

            # _ISWeights is [batch, 1] while the squared error is [batch];
            # multiplying them directly broadcasts to [batch, batch]. Squeeze
            # the weights so each sample's error is scaled by its own weight.
            is_weights = tf.squeeze(self._ISWeights, axis=1)
            self.loss = tf.reduce_mean(is_weights * tf.squared_difference(self.target_Q, self.Q))

            self.optimizer = tf.train.RMSPropOptimizer(hyperp['learning_rate']).minimize(self.loss)

    def predict_action(self, state, sess):
        """Choose an action with an exponentially decaying epsilon-greedy policy.

        The exploration probability is
        ``explore_end + (explore_start - explore_end) * exp(-decay_rate * decay_step)``.

        Args:
            state: a single stacked state; a batch axis is prepended before
                it is fed to the network.
            sess: session used to evaluate ``self.output`` (only used on the
                greedy branch).

        Returns:
            Tuple ``(action, explore_probability)`` where ``action`` is a row
            of ``gamenv.hot_enc_actions``.
        """
        hyperp = self.gamenv.hyperparameters
        explore_probability = hyperp['explore_end'] + (hyperp['explore_start'] - hyperp['explore_end']) * np.exp(-hyperp['decay_rate'] * self.decay_step)

        if explore_probability > np.random.rand():
            # Explore: random action from the environment's action space.
            action = self.gamenv.hot_enc_actions[self.gamenv.env.action_space.sample()]
        else:
            # Exploit: action with the highest predicted Q-value.
            Qs = sess.run(self.output, feed_dict = {self._inputs: state.reshape((1, *state.shape))})
            action = self.gamenv.hot_enc_actions[np.argmax(Qs)]

        return action, explore_probability


    
    
    
    
    
    

In [10]:
# Clear the default TensorFlow graph before (re)building the networks
tf.reset_default_graph()

# Environment wrapper shared by both networks (default game)
spaceinvaders = GameEnv()

# Online network, built under the "DQNetwork" variable scope
DQNetwork = DDQNN(gamenv=spaceinvaders, name="DQNetwork")

# Target network: same architecture, built under its own "TargetNetwork" scope
TargetNetwork = DDQNN(gamenv=spaceinvaders, name="TargetNetwork")