<a href="https://colab.research.google.com/github/Prithvij2004/grid_world_continuous/blob/main/CartPoleDQN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Notebook shell commands (leading `!`) that install packages before the import cell runs.
# pygame is not imported directly anywhere in this notebook — presumably gym's renderer needs it; confirm.
!pip install pygame
# xvfb / x11-utils: system packages, presumably the virtual X server that pyvirtualdisplay drives — confirm.
!apt-get install -y xvfb x11-utils
# pyvirtualdisplay supplies the `Display` class imported below; pinned to the 0.2.x series.
!pip install pyvirtualdisplay==0.2.*

In [None]:
import gym
import random
from collections import deque
import tensorflow as tf
import numpy as np
import math
from tensorflow import keras
from pyvirtualdisplay import Display
from gym.wrappers.monitoring.video_recorder import VideoRecorder

In [None]:
# Virtual display start-up is disabled in this cell; a Display is started later, in the recording cell.
# display = Display(visible=False, size=(1400, 900))
# _ = display.start()
# Module-level environment shared by the training cell and the recording cell.
env = gym.make('CartPole-v1')

In [None]:
class Agent:
    """Deep Q-Network agent with a replay buffer and a separate target network.

    Two identically shaped Keras models are kept: ``main_nn`` is fitted on
    sampled transitions, while ``target_nn`` supplies the bootstrap values and
    only changes when ``update_nn_weights`` copies the weights across.
    """

    def __init__(self, stateSize, actionSize):
        """Create the agent and both networks.

        Args:
            stateSize: Number of features in one observation (network input width).
            actionSize: Number of discrete actions (network output width).
        """
        self.stateSize = stateSize
        self.actionSize = actionSize

        # Hyperparameters
        self.alpha = 0.001  # learning rate handed to Adam
        self.gamma = 0.9  # discount applied to the bootstrapped next-state value
        self.epsilon = 1  # current probability of taking a random action
        self.buffer = deque(maxlen=2_000)  # oldest transitions fall off automatically
        self.epsilon_decay = 0.01  # rate of the exponential schedule in update_epsilon
        self.epsilon_min = 0.01
        self.epsilon_max = 1

        # Neural networks: the target starts as an exact copy of the online network.
        self.main_nn = self.DQNetwork()
        self.target_nn = self.DQNetwork()
        self.update_nn_weights()

    def DQNetwork(self):
        """Build and compile one Q-network.

        Returns:
            A compiled Sequential model mapping a ``stateSize`` vector to
            ``actionSize`` linear outputs (two hidden ReLU layers of 24 units),
            trained with Huber loss and Adam at learning rate ``alpha``.
        """
        # An explicit Input layer replaces the deprecated `input_dim` argument
        # and still builds the model immediately, so weights can be copied
        # before any data has passed through it.
        model = keras.models.Sequential([
            keras.Input(shape=(self.stateSize,)),
            keras.layers.Dense(
                24,
                activation='relu',
                kernel_initializer=tf.keras.initializers.HeUniform(),
            ),
            keras.layers.Dense(
                24,
                activation='relu',
                kernel_initializer=tf.keras.initializers.HeUniform(),
            ),
            keras.layers.Dense(
                self.actionSize,
                activation='linear',
                kernel_initializer=tf.keras.initializers.HeUniform(),
                dtype=tf.float32,
            ),
        ])
        model.compile(
            loss=tf.keras.losses.Huber(),
            optimizer=keras.optimizers.Adam(self.alpha),
        )
        return model

    def addBuffer(self, state, action, reward, done, newState):
        """Append one transition to the replay buffer.

        The tuple order ``(state, action, reward, done, newState)`` is relied on
        by ``replayBuffer`` and ``train_nn``.
        """
        self.buffer.append((state, action, reward, done, newState))

    def replayBuffer(self, num_samples):
        """Draw ``num_samples`` transitions and return them as stacked arrays.

        Indices are drawn with ``np.random.choice`` using its default
        sampling with replacement, so the same transition may appear more than
        once in a batch.

        Returns:
            Tuple ``(states, actions, rewards, dones, newStates)`` of NumPy
            arrays; ``rewards`` and ``dones`` are float32.
        """
        picks = np.random.choice(len(self.buffer), num_samples)
        batch = [self.buffer[i] for i in picks]
        # Build each column with np.array directly: the previous
        # `np.array(x, copy=False)` raises under NumPy 2 whenever a copy is
        # unavoidable (e.g. for a plain Python int action).
        states = np.array([transition[0] for transition in batch])
        actions = np.array([transition[1] for transition in batch])
        rewards = np.array([transition[2] for transition in batch], dtype=np.float32)
        dones = np.array([transition[3] for transition in batch], dtype=np.float32)
        newStates = np.array([transition[4] for transition in batch])
        return states, actions, rewards, dones, newStates

    def update_epsilon(self, x):
        """Set ``epsilon`` from an exponential decay schedule.

        ``epsilon = epsilon_min + (epsilon_max - epsilon_min) * exp(-epsilon_decay * x)``

        Args:
            x: Progress counter driving the decay; ``x = 0`` gives
                ``epsilon_max`` and large ``x`` approaches ``epsilon_min``.
        """
        span = self.epsilon_max - self.epsilon_min
        self.epsilon = self.epsilon_min + span * np.exp(-self.epsilon_decay * x)

    def chooseAction(self, env, state):
        """Pick an action epsilon-greedily.

        Args:
            env: Environment whose ``action_space.sample()`` supplies random actions.
            state: A batch holding a single observation; row 0 of the
                prediction is used.

        Returns:
            A random action with probability ``epsilon``, otherwise the index
            of the largest predicted Q-value as a plain ``int``.
        """
        if np.random.random() < self.epsilon:
            return env.action_space.sample()
        q_values = self.main_nn.predict(state, verbose=0)[0]
        return int(np.argmax(q_values))

    def train_nn(self, numSamples):
        """Fit the online network on one minibatch drawn from the buffer.

        For each sampled transition the target for the taken action is
        ``reward`` when the transition ended the episode, otherwise
        ``reward + gamma * max_a Q_target(newState, a)``. Outputs for the
        other actions keep the online network's own predictions, so they
        contribute no error.

        Args:
            numSamples: Minibatch size; ``random.sample`` raises ``ValueError``
                if the buffer holds fewer transitions than this.
        """
        minibatch = random.sample(self.buffer, numSamples)
        states = np.array([transition[0] for transition in minibatch])
        actions = np.array([transition[1] for transition in minibatch], dtype=np.int64)
        rewards = np.array([transition[2] for transition in minibatch], dtype=np.float32)
        dones = np.array([transition[3] for transition in minibatch], dtype=np.float32)
        next_states = np.array([transition[4] for transition in minibatch])

        next_q = self.target_nn.predict(next_states, verbose=0)
        targets = self.main_nn.predict(states, verbose=0)

        # (1 - done) zeroes the bootstrap term for terminal transitions.
        bellman = rewards + (1.0 - dones) * self.gamma * next_q.max(axis=1)
        targets[np.arange(len(minibatch)), actions] = bellman

        self.main_nn.fit(states, targets, batch_size=numSamples, verbose=0)

    def update_nn_weights(self):
        """Copy the online network's weights into the target network."""
        self.target_nn.set_weights(self.main_nn.get_weights())
    
    

In [None]:
# Train the DQN agent on the shared `env`.
# Problem dimensions are read from the environment's observation/action spaces.
stateSize = env.observation_space.shape[0]
actionSize = env.action_space.n

agent = Agent(stateSize, actionSize)
episodes = 1000
batch_size = 64
train_every = 4  # fit the online network on every 4th step of an episode
target_sync_every = 2000  # copy online -> target weights every 2000 environment steps
curFrame = 0  # environment steps taken across all episodes
last_100_epi_reward = []  # returns of the most recent (at most 100) episodes

for epi in range(episodes):
    # The schedule depends only on the episode index, so set epsilon once per
    # episode instead of recomputing the same value after every step.
    agent.update_epsilon(epi)

    state = env.reset()
    ep_reward, done = 0, False
    step = 0
    while not done:
        # The network expects a batch, so add a leading batch axis of size 1.
        state_in = tf.expand_dims(state, axis=0)
        action = agent.chooseAction(env, state_in)
        next_state, reward, done, _ = env.step(action)
        ep_reward += reward

        agent.addBuffer(state, action, reward, done, next_state)
        state = next_state
        curFrame += 1

        # Only train once a full minibatch is available.
        if len(agent.buffer) >= batch_size and step % train_every == 0:
            agent.train_nn(batch_size)
        if curFrame % target_sync_every == 0:
            agent.update_nn_weights()
        step += 1

    last_100_epi_reward.append(ep_reward)
    del last_100_epi_reward[:-100]  # keep only the newest 100 returns

    if epi % 50 == 0:
        print(f'Episode {epi}/{episodes}. Epsilon: {agent.epsilon:.3f}. '
              f'Reward in last 100 episodes: {np.mean(last_100_epi_reward):.3f}. '
              f'Step: {step}')
env.close()

In [None]:
from base64 import b64encode
def render_mp4(videopath: str) -> str:
    """Return an HTML ``<video>`` tag embedding the MP4 at *videopath*.

    The file is read in binary mode and inlined as a base64 ``data:`` URI, so
    the returned markup is self-contained.

    Args:
        videopath: Path of the MP4 file to embed.

    Returns:
        HTML string with a 400-pixel-wide ``<video>`` element.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    # The context manager closes the handle even if reading fails.
    with open(videopath, 'rb') as video_file:
        mp4 = video_file.read()
    base64_encoded_mp4 = b64encode(mp4).decode()
    return f'<video width=400 controls><source src="data:video/mp4;' \
           f'base64,{base64_encoded_mp4}" type="video/mp4"></video>'

In [None]:
# Record three episodes played by `agent` into an MP4 file.
# A virtual display is started first, presumably so rendering works without a physical screen — confirm.
display = Display(visible=False, size=(1400, 900))
_ = display.start()
# NOTE(review): this cell follows the training cell in file order, so the name
# "before_training" may not describe what is recorded — confirm the intended run order.
before_training = "before_training.mp4"
video = VideoRecorder(env, before_training)
for i in range(3):
    state = env.reset()
    done = False
    while not done:
        env.render()
        video.capture_frame()
        # Add a leading batch axis of size 1 before handing the observation to the agent.
        state_in = tf.expand_dims(state, axis = 0)
        # chooseAction is epsilon-greedy, so with probability agent.epsilon the action is random.
        action = agent.chooseAction(env, state_in)
        next_state, reward, done, _ = env.step(action)
        state = next_state
# The recorder itself is closed in the next cell, after the environment.
env.close()

In [None]:
# Close the recorder before reading the file back, then embed the clip in the notebook output.
video.close()
from IPython.display import HTML
# render_mp4 returns a <video> tag with the file inlined as base64.
html = render_mp4(before_training)
# Being the last expression in the cell, this HTML object is what the notebook displays.
HTML(html)
