In [1]:
import tensorflow as tf
# Turn eager execution off so the Keras model below runs in graph mode.
# Kept directly after the TensorFlow import so it runs before anything else uses TensorFlow.
tf.compat.v1.disable_eager_execution()
import numpy as np
from collections import deque
import gym
import random
# Last expression of the cell: displays the installed TensorFlow version.
tf.__version__

'1.15.0'

In [2]:
# Display whether TensorFlow reports an available GPU device.
tf.test.is_gpu_available()

True

In [3]:
#tf.keras.backend.clear_session()
#tf.compat.v1.enable_control_flow_v2()

In [4]:
#Experience replay is used because neural networks don't perform well on correlated data, and reinforcement learning data
#is highly correlated. Imagine a car at x = 0.6 and y = 0.8 (our current state) whose next state
#is x = 0.65 and y = 0.85: the two states are almost identical, i.e. strongly correlated.
#So instead of throwing each transition away, we store it in an experience replay buffer,
#then train on a random sample from that buffer every time, which breaks the correlation between consecutive states.

class ExperienceReplay:
    """Fixed-capacity memory of past transitions with uniform random sampling.

    Once the capacity is reached, appending a new entry discards the oldest
    one (this is the behaviour of ``deque(maxlen=...)``).
    """

    def __init__(self, max_batch_size):
        """Create an empty buffer that keeps at most ``max_batch_size`` entries."""
        self.max_batch_size = max_batch_size
        self.experience_buffer = deque(maxlen=max_batch_size)

    def store(self, sample):
        """Append one entry (the callers in this file pass a tuple) to the buffer."""
        self.experience_buffer.append(sample)

    def sample(self, batch_size):
        """Return ``batch_size`` distinct entries drawn at random from the buffer.

        Uses the standard-library ``random`` module, not numpy's generator.
        ``random.sample`` raises ``ValueError`` when ``batch_size`` exceeds the
        number of stored entries.
        """
        stored = self.experience_buffer
        return random.sample(stored, batch_size)

In [5]:
class DQNAgent:
    """Deep Q-Network agent with epsilon-greedy exploration and experience replay.

    A small fully connected Keras network maps a state vector to one estimated
    action value (Q-value) per action. Transitions are kept in an
    ``ExperienceReplay`` buffer and the network is fitted on random mini-batches
    drawn from it.
    """

    def __init__(self, epsilon=0.1, env_shape=None, action_shape=None, batch_size=32, gamma = 0.9, maxlen=10000):
        """Build the network and an empty replay buffer.

        Args:
            epsilon: probability of taking a uniformly random action in
                ``e_greedy_step`` (exploration rate).
            env_shape: number of values in one state vector.
            action_shape: number of discrete actions.
            batch_size: number of transitions sampled per ``replay`` call.
            gamma: discount factor applied to the bootstrapped next-state value.
            maxlen: capacity of the replay buffer.
        """
        self.epsilon = epsilon
        self.env_shape = env_shape
        self.action_shape = action_shape
        self.model = self.create_model()  # creates and compiles the Q-network
        self.experience = ExperienceReplay(maxlen)
        self.batch_size = batch_size
        self.gamma = gamma

    def e_greedy_step(self, state):
        """Pick an action for ``state`` with an epsilon-greedy policy.

        With probability ``self.epsilon`` (0.1 by default, i.e. roughly one step
        in ten) a uniformly random action is returned; otherwise the action with
        the highest predicted Q-value is returned.
        """
        if np.random.random() < self.epsilon:
            # np.random.randint excludes its upper bound, so the bound must be
            # action_shape (not action_shape - 1) for the last action to be reachable.
            return np.random.randint(self.action_shape)
        # Greedy choice: index of the largest predicted action value.
        return np.argmax(self.predict(state, is_batch=False))

    def create_model(self):
        """Create and compile the Q-network with the Keras functional API.

        Architecture: input of size ``env_shape`` -> Dense(64, relu) ->
        Dense(``action_shape``, linear). The linear outputs are unbounded
        Q-value estimates, one per action (they are not probabilities).
        The model is compiled with mean-squared-error loss and a default Adam
        optimizer.
        """
        input_shape = tf.keras.layers.Input(self.env_shape)
        # Single hidden layer with 64 ReLU units.
        dense1 = tf.keras.layers.Dense(64, activation='relu')(input_shape)
        #dense2 = tf.keras.layers.Dense(32, activation='relu')(dense1)
        # Output layer: one linear unit per action.
        dense3 = tf.keras.layers.Dense(self.action_shape, activation='linear')(dense1)
        model = tf.keras.models.Model(input_shape, dense3)
        model.compile(loss=tf.keras.losses.MSE, optimizer=tf.keras.optimizers.Adam())
        return model

    def predict(self, state, is_batch=False):
        """Return predicted Q-values for one state or for a batch of states.

        Args:
            state: a single state vector, or an array of stacked states when
                ``is_batch`` is true.
            is_batch: when true, ``state`` is passed to the model as is (with
                ``self.batch_size`` as the prediction batch size) and the
                model's output array is returned unchanged. When false,
                ``state`` is reshaped to ``(1, env_shape)`` and the flattened
                prediction is returned.
        """
        if is_batch:
            return self.model.predict(state, self.batch_size)
        return self.model.predict(state.reshape(1, self.env_shape)).flatten()

    def optimise(self, states_x, states_y, epochs=1):
        """Fit the network on inputs ``states_x`` and targets ``states_y`` silently."""
        self.model.fit(states_x, states_y, epochs=epochs, verbose = 0)

    def replay(self):
        """Train on one random mini-batch from the replay buffer.

        Does nothing until the buffer holds at least ``batch_size`` entries.
        Each stored entry is indexed as (state, action, reward, next_state),
        where ``next_state`` is ``None`` for a terminal transition.
        """
        # Not enough data collected yet to form a full batch.
        if len(self.experience.experience_buffer) < self.batch_size:
            return
        current_batch = self.experience.sample(self.batch_size)
        # Placeholder fed to the network in place of a missing (terminal) next
        # state; its prediction is never used in the target below.
        terminal_state = np.zeros(self.env_shape)
        current_states = np.array([s[0] for s in current_batch])
        next_states = np.array([(terminal_state if s[3] is None else s[3]) for s in current_batch])
        # Q-values of the current states and of the next states.
        q = self.predict(current_states, is_batch=True)
        q_next = self.predict(next_states, is_batch=True)

        # Training inputs and targets, filled row by row.
        x = np.zeros((self.batch_size, self.env_shape))
        y = np.zeros((self.batch_size, self.action_shape))
        for i in range(self.batch_size):
            current_state = current_batch[i][0]
            current_action = current_batch[i][1]
            reward = current_batch[i][2]
            next_state = current_batch[i][3]

            # Start from the current prediction so that only the taken action's
            # value contributes to the loss.
            values = q[i]
            if next_state is None:
                # Terminal transition: there is no future value to bootstrap from.
                values[current_action] = reward
            else:
                # Q-learning target: reward + gamma * max_a Q(next_state, a).
                values[current_action] = reward + self.gamma * np.amax(q_next[i])
            x[i] = current_state
            y[i] = values
        self.optimise(x, y, epochs=2)

    def store_experience(self, sample):
        """Add one transition to the replay buffer."""
        self.experience.store(sample)

In [6]:
class Environment:
    """Couples a gym environment with a DQNAgent and runs the training episodes."""

    def __init__(self, env_name=None, episods_num=100, render=False):
        """Create the gym environment and a matching agent.

        Args:
            env_name: id passed to ``gym.make``; must not be ``None``.
            episods_num: number of episodes that ``begin`` runs.
            render: when true, ``begin`` renders the environment on every step.

        Raises:
            Exception: if ``env_name`` is ``None``.
        """
        if env_name is None:
            raise Exception("env_name should not be None")
        self.env_name = env_name
        self.env = gym.make(self.env_name)
        # Number of discrete actions and length of one observation vector.
        self.n_actions = self.env.action_space.n
        self.env_shape = self.env.observation_space.shape[0]
        self.render = render
        # The agent is always built with gamma=1 here.
        self.agent = DQNAgent(env_shape=self.env_shape, action_shape = self.n_actions, gamma=1)
        self.episods_num = episods_num

    def begin(self):
        """Run ``episods_num`` training episodes, printing each episode's total reward."""
        for _episode in range(self.episods_num):
            # Every episode starts from a freshly reset environment.
            state = self.env.reset()
            total_reward = 0
            done = False
            while not done:
                if self.render:
                    self.env.render()
                # Choose an action with the agent's policy and apply it.
                action = self.agent.e_greedy_step(state)
                observation, reward, done, _info = self.env.step(action)
                # A finished episode is recorded with None as its next state.
                next_state = None if done else observation
                self.agent.store_experience((state, action, reward, next_state))
                # One training step on a replayed mini-batch after every environment step.
                self.agent.replay()
                state = next_state
                total_reward += reward
            print(f'Total Reward {total_reward}')

In [7]:
# Build the MountainCar-v0 training setup with rendering switched on.
environment = Environment(env_name='MountainCar-v0', render=True)

Instructions for updating:
If using Keras pass *_constraint arguments to layers.


In [None]:
# Run the training loop; it prints the total reward after each episode.
environment.begin()

Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -131.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0
Total Reward -200.0


In [None]:
#I got some help from this website: https://jaromiru.com/2016/10/03/lets-make-a-dqn-implementation/