In [44]:
import gym
import numpy as np

In [45]:
#import warnings
#warnings.filterwarnings("ignore", category=UserWarning)

In [46]:
import random
import gym
import numpy as np
from collections import deque
import tensorflow.keras as keras
from keras.utils import Sequence

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Activation, Flatten, Conv2D, MaxPooling2D
from tensorflow.keras.optimizers import Adam

In [47]:
# Base environment: Pong without built-in frame skipping, rendered in 'human' mode.
env = gym.make('PongNoFrameskip-v4', render_mode='human')

# Atari preprocessing wrapper: up to 30 no-op actions, frame skip of 4,
# 84-pixel grayscale screens without an extra channel axis, unscaled values,
# and no episode termination on life loss.
# NOTE(review): state_size and preprocess_state elsewhere in this notebook look
# like they expect a single raw RGB frame rather than these 84x84 stacked
# grayscale frames - confirm which pipeline is intended.
env = gym.wrappers.AtariPreprocessing(
    env,
    noop_max=30,
    frame_skip=4,
    screen_size=84,
    terminal_on_life_loss=False,
    grayscale_obs=True,
    grayscale_newaxis=False,
    scale_obs=False,
)

# Frame stacking: each observation holds 4 frames.
env = gym.wrappers.FrameStack(env, 4)

In [48]:
# Shape of one observation fed to the Q-networks. It is handed to DQN, which
# uses it as the input_shape of the first Conv2D layer.
state_size = (88, 80, 1)

In [49]:
# Number of discrete actions in the environment's action space; the DQN output
# layer has one unit per action.
action_size = env.action_space.n

In [50]:
#preprocessing the game screen

def preprocess_state(state):
    """Crop, downsample, grey-scale and rescale one game screen.

    Parameters
    ----------
    state : tuple or array-like
        Either a tuple whose first element is the game screen (the form the
        training loop gets from ``env.reset()``) or the game screen itself
        (the form it gets from ``env.step()``). The screen must be indexable
        as ``[row, column, channel]``.

    Returns
    -------
    numpy.ndarray
        2-D float array holding every second row of rows 1..175 and every
        second column, averaged over the channel axis and mapped through
        ``(x - 128) / 128 - 1``. A 210x160x3 frame yields shape ``(88, 80)``.
    """
    # Only unwrap genuine tuples. Indexing a bare observation with [0] would
    # silently drop everything except its first row/frame.
    game_screen = state[0] if isinstance(state, tuple) else state
    game_screen = np.array(game_screen)

    # Crop and downsample: rows 1, 3, ..., 175 and every second column.
    image = game_screen[1:176:2, ::2]

    # Convert the image to greyscale by averaging the channel axis.
    # mean() returns a new array, so the caller's frame is never modified.
    image = image.mean(axis=2)

    # Zero out pixels whose value equals the mean of the image.
    color = image.mean()
    image[image == color] = 0

    # Rescale pixel values.
    # NOTE(review): this maps 0..255 to roughly -2..0 rather than -1..1;
    # confirm whether the trailing "- 1" is intended.
    image = (image - 128) / 128 - 1

    return image


## Building the Deep Q Networks  

To play Atari games we use a convolutional neural network (CNN) as the DQN: it takes the image of the game screen as input and returns the Q values.

We define the DQN with three convolutional layers:

1) The convolutional layers extract the features from the image and output the feature maps

2) We flatten the feature maps produced by the convolutional layers.

3) We feed the flattened feature maps to a feedforward network (fully connected layers), which returns the Q values.

In [51]:
class DQN:
    """Deep Q-Network agent: main network, target network and replay buffer.

    Parameters
    ----------
    state_size : tuple
        Shape of one observation; used as the first Conv2D layer's input_shape.
    action_size : int
        Number of actions; the output layer has one linear unit per action.
    gamma : float, optional
        Discount factor applied to the target network's best next-state value.
    epsilon : float, optional
        Probability of choosing a uniformly random action in epsilon_greedy.
    update_rate : int, optional
        Step interval at which the caller is expected to refresh the target
        network (this class only stores the value).
    buffer_size : int, optional
        Maximum number of transitions kept in the replay buffer.
    """

    def __init__(self, state_size, action_size, gamma=0.9, epsilon=0.8,
                 update_rate=1000, buffer_size=5000):

        # shape of a single observation and number of available actions
        self.state_size = state_size
        self.action_size = action_size

        # replay buffer; the oldest transitions are dropped once it is full
        self.replay_buffer = deque(maxlen=buffer_size)

        # discount factor
        self.gamma = gamma

        # exploration rate of the epsilon-greedy policy
        self.epsilon = epsilon

        # rate at which we want to update the target network
        self.update_rate = update_rate

        # main network (trained) and target network (used for the targets)
        self.main_network = self.build_network()
        self.target_network = self.build_network()

        # start both networks from identical weights
        self.target_network.set_weights(self.main_network.get_weights())

    def build_network(self):
        """Build and compile the convolutional Q-network.

        Three Conv2D + ReLU stages (32 filters 8x8 stride 4, 64 filters 4x4
        stride 2, 64 filters 3x3 stride 1, all 'same' padding), then Flatten,
        a 512-unit ReLU Dense layer and a linear Dense layer with
        ``action_size`` outputs. Compiled with MSE loss and a default Adam
        optimizer.
        """
        model = Sequential()
        model.add(Conv2D(32, (8, 8), strides=4, padding='same', input_shape=self.state_size))
        model.add(Activation('relu'))

        model.add(Conv2D(64, (4, 4), strides=2, padding='same'))
        model.add(Activation('relu'))

        model.add(Conv2D(64, (3, 3), strides=1, padding='same'))
        model.add(Activation('relu'))
        model.add(Flatten())

        model.add(Dense(512, activation='relu'))
        model.add(Dense(self.action_size, activation='linear'))

        model.compile(loss='mse', optimizer=Adam())

        return model

    def _as_batch(self, state):
        """Return ``state`` reshaped to ``(batch, *state_size)``.

        The networks are built with ``input_shape=state_size`` and therefore
        need a leading batch axis. A single unbatched observation (with or
        without its trailing channel axis) becomes a batch of one, and an
        already batched array is returned with the same shape. A state whose
        element count does not fit ``state_size`` raises ValueError here.
        """
        return np.asarray(state).reshape((-1,) + tuple(self.state_size))

    def store_transistion(self, state, action, reward, next_state, done):
        """Append one (state, action, reward, next_state, done) tuple to the replay buffer."""
        self.replay_buffer.append((state, action, reward, next_state, done))

    # Correctly spelled alias; the original name is kept for existing callers.
    store_transition = store_transistion

    def epsilon_greedy(self, state):
        """Pick an action with the epsilon-greedy policy.

        With probability ``epsilon`` a uniformly random action index is
        returned; otherwise the index of the largest Q value predicted by the
        main network for ``state``.
        """
        if random.uniform(0, 1) < self.epsilon:
            return np.random.randint(self.action_size)

        # verbose=0 keeps Keras from printing a progress bar on every step
        Q_values = self.main_network.predict(self._as_batch(state), verbose=0)

        return np.argmax(Q_values[0])

    def train(self, batch_size):
        """Fit the main network on ``batch_size`` transitions sampled from the replay buffer.

        Each sampled transition is fitted on its own, one after another. The
        target for the taken action is ``reward`` for terminal transitions and
        ``reward + gamma * max(target_network(next_state))`` otherwise; the
        other actions keep the main network's current predictions.

        Raises
        ------
        ValueError
            From ``random.sample`` when the buffer holds fewer than
            ``batch_size`` transitions.
        """
        # sample a mini batch of transitions from the replay buffer
        minibatch = random.sample(self.replay_buffer, batch_size)

        for state, action, reward, next_state, done in minibatch:
            state = self._as_batch(state)

            # compute the target Q value using the target network
            if not done:
                next_Q = self.target_network.predict(self._as_batch(next_state), verbose=0)
                target_Q = reward + self.gamma * np.amax(next_Q)
            else:
                target_Q = reward

            # current predictions of the main network; only the taken action's
            # entry is replaced by the target
            Q_values = self.main_network.predict(state, verbose=0)
            Q_values[0][action] = target_Q

            # train the main network
            self.main_network.fit(state, Q_values, epochs=1, verbose=0)

    def update_target_network(self):
        """Copy the main network's weights into the target network."""
        self.target_network.set_weights(self.main_network.get_weights())


## Training the network

In [55]:
# Number of episodes the training loop runs.
num_episodes = 500

In [56]:
# Upper bound on the number of steps taken within a single episode.
num_timesteps = 20000

In [57]:
# Number of transitions sampled from the replay buffer per training call;
# training only starts once the buffer holds more than this many transitions.
batch_size = 8

In [58]:
# NOTE(review): not referenced anywhere else in the visible notebook;
# presumably the number of stacked frames - confirm or remove.
num_screens = 4

In [59]:
# Build the agent; this constructs both the main and the target network.
dqn = DQN(state_size, action_size)

In [60]:
# Silence only the DeprecationWarning whose message starts with the np.bool
# alias notice; all other warnings stay visible.
from warnings import filterwarnings
filterwarnings(action='ignore', category=DeprecationWarning, message='`np.bool` is a deprecated alias')

In [62]:
done = False
time_step = 0

# The Q-networks are built with input_shape=state_size, so every state handed
# to them needs one leading batch axis.
network_input_shape = (1,) + tuple(state_size)


def observe(raw_observation):
    """Preprocess one raw observation and add the batch axis the networks need."""
    # preprocess_state reads the game screen from element 0 of a tuple, so the
    # bare observation is wrapped in an (observation, info)-style tuple.
    return np.reshape(preprocess_state((raw_observation, {})), network_input_shape)


#for each episode
for i in range(num_episodes):

    #set return to 0
    Return = 0

    #reset ONCE per episode; accept both reset() styles:
    #a bare observation or an (observation, info) tuple
    reset_result = env.reset()
    first_observation = reset_result[0] if isinstance(reset_result, tuple) else reset_result

    #preprocess the game screen
    state = observe(first_observation)

    #for each step in the episode
    for t in range(num_timesteps):

        # No env.reset() / env.render() here: resetting inside the step loop
        # restarted the episode on every step, and assigning render()'s return
        # value to `state` replaced the preprocessed observation.
        # NOTE(review): with render_mode='human' the window is presumably
        # drawn by the environment itself on each step - confirm.

        #update the time step
        time_step += 1

        #update the target network
        if time_step % dqn.update_rate == 0:
            dqn.update_target_network()

        #select the action
        action = dqn.epsilon_greedy(state)

        #perform the selected action; support both the 5-tuple
        #(obs, reward, terminated, truncated, info) and the older 4-tuple
        #(obs, reward, done, info) step results
        step_result = env.step(action)
        if len(step_result) == 5:
            next_state, reward, terminated, truncated, _ = step_result
            done = terminated or truncated
        else:
            next_state, reward, done, _ = step_result

        #preprocess the next state
        next_state = observe(next_state)

        #store the transition information
        dqn.store_transistion(state, action, reward, next_state, done)

        #update current state to next state
        state = next_state

        #update the return
        Return += reward

        #if the episode is done then print the return
        if done:
            print('Episode: ',i, ',Return', Return)
            break

        #if the number of transitions in the replay buffer is greater than batch size
        #then train the network
        if len(dqn.replay_buffer) > batch_size:
            dqn.train(batch_size)


