Importing required libraries


In [0]:
import os
import time
import random
import numpy as np
import gym
# import pybullet_envs
import matplotlib.pyplot as plt
import torch 
import torch.nn as nn
import torch.nn.functional as F
from gym import wrappers
from torch.autograd import Variable
from collections import deque


Experience replay memory

In [0]:
class ReplayMemory(object):
    """Fixed-capacity ring buffer of transitions for experience replay.

    Every stored transition is a 5-tuple
    ``(state, next_state, action, reward, done)``; ``sample`` unpacks the
    tuples in exactly that order.
    """

    def __init__(self, max_size = 1e6):
        """Create an empty buffer.

        Args:
            max_size: maximum number of transitions kept. Accepts a float such
                as the default ``1e6``; it is truncated to an int.

        Raises:
            ValueError: if ``max_size`` is smaller than 1.
        """
        # The capacity is compared against len() and used as the modulus of a
        # list index, so it has to be an integer.
        self.max_size = int(max_size)
        if self.max_size < 1:
            raise ValueError("max_size must be at least 1")
        self.storage = []
        # Index of the slot that will be overwritten next once the buffer is full.
        self.ptr = 0

    def add(self, transition):
        """Store ``transition``, overwriting the oldest entry once full."""
        if len(self.storage) >= self.max_size:
            self.storage[self.ptr] = transition
            self.ptr = (self.ptr + 1) % self.max_size
        else:
            self.storage.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` transitions uniformly at random, with replacement.

        Returns:
            A tuple ``(states, next_states, actions, rewards, dones)`` of NumPy
            arrays whose first axis has length ``batch_size``. ``rewards`` and
            ``dones`` are reshaped to column vectors of shape
            ``(batch_size, 1)``.

        Raises:
            ValueError: if the buffer is empty.
        """
        if not self.storage:
            raise ValueError("cannot sample from an empty ReplayMemory")
        index = np.random.randint(0, len(self.storage), batch_size)
        # Transpose the list of 5-tuples into five per-field columns.
        states, next_states, actions, rewards, dones = zip(
            *(self.storage[i] for i in index)
        )
        return (
            np.array(states),
            np.array(next_states),
            np.array(actions),
            np.array(rewards).reshape(-1, 1),
            np.array(dones).reshape(-1, 1),
        )

Neural network for the **Actor model** and the **Actor target**

In [0]:
class Actor(nn.Module):
  """Policy network that turns a state into an action.

  Two ReLU hidden layers (400 then 300 units) feed a tanh output layer; the
  tanh result is multiplied by ``max_action``, so every output component lies
  in ``[-max_action, max_action]``.
  """

  def __init__(self,state_dims, action_dims, max_action):
    super().__init__()
    # Multiplier applied to the tanh output in forward().
    self.max_action = max_action
    # Layers are created in this order so parameter order stays layer_1..layer_3.
    self.layer_1 = nn.Linear(state_dims, 400)
    self.layer_2 = nn.Linear(400,300)
    self.layer_3 = nn.Linear(300,action_dims)

  def forward(self,x):
    """Return the scaled action for the batch of states ``x``."""
    hidden = x
    for hidden_layer in (self.layer_1, self.layer_2):
      hidden = F.relu(hidden_layer(hidden))
    squashed = torch.tanh(self.layer_3(hidden))
    return self.max_action * squashed

DNN for the two **Critic models** and the two **Critic targets**

In [0]:
class Critic(nn.Module):
  """Twin Q-networks that score a (state, action) pair.

  Two independent stacks with the same architecture (400 and 300 ReLU hidden
  units) each map the concatenated state and action to a single Q-value.
  ``layer_1``-``layer_3`` form the first critic and ``layer_4``-``layer_6``
  the second.
  """

  def __init__(self,state_dims, action_dims):
    # Initialise the nn.Module parent. This must name Critic: passing another
    # class to super() raises TypeError because self is not an instance of it.
    super(Critic,self).__init__()
    # First Critic Network
    self.layer_1 = nn.Linear(state_dims + action_dims, 400)
    self.layer_2 = nn.Linear(400,300)
    # A Q-value is one scalar per (state, action) pair, hence one output unit.
    self.layer_3 = nn.Linear(300,1)

    # Second Critic Network
    self.layer_4 = nn.Linear(state_dims + action_dims, 400)
    self.layer_5 = nn.Linear(400,300)
    self.layer_6 = nn.Linear(300,1)

  def _first_critic(self, xu):
    """Run the first critic on an already concatenated state-action batch."""
    x1 = F.relu(self.layer_1(xu))
    x1 = F.relu(self.layer_2(x1))
    return self.layer_3(x1)

  def _second_critic(self, xu):
    """Run the second critic on an already concatenated state-action batch."""
    x2 = F.relu(self.layer_4(xu))
    x2 = F.relu(self.layer_5(x2))
    return self.layer_6(x2)

  def forward(self, x, u): # x = state, u = action
    """Return ``(Q1, Q2)``, each of shape ``(batch, 1)``."""
    xu = torch.cat([x,u], 1) # Concatenate states and actions along the feature axis
    return self._first_critic(xu), self._second_critic(xu)

  def Q1(self, x, u): # Used for the actor update
    """Return only the first critic's Q-value, shape ``(batch, 1)``."""
    return self._first_critic(torch.cat([x,u],1))

In [0]:
# Run on the GPU when PyTorch can see one; otherwise stay on the CPU.
device = torch.device('cpu' if not torch.cuda.is_available() else 'cuda')


**T3D**

In [0]:
class T3D(object):
    """Twin Delayed DDPG (TD3) agent.

    Holds an actor, a twin critic, a target copy of each, and one Adam
    optimizer for the actor and one for the critic. Relies on the ``Actor``
    and ``Critic`` classes and the module-level ``device`` defined elsewhere
    in this notebook.
    """

    def __init__(self,state_dims, action_dims, max_action):
        self.actor = Actor(state_dims, action_dims, max_action).to(device)
        self.actor_target = Actor(state_dims, action_dims, max_action).to(device)
        # Start the target as an exact copy of the online network.
        # state_dict is a method: it has to be called to obtain the weights.
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters())

        self.critic = Critic(state_dims, action_dims).to(device)
        self.critic_target = Critic(state_dims, action_dims).to(device)
        # Start the target as an exact copy of the online network.
        self.critic_target.load_state_dict(self.critic.state_dict())
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters())

        self.max_action = max_action

    @staticmethod
    def _to_tensor(array):
        """Convert a sampled NumPy batch to a float32 tensor on ``device``."""
        return torch.as_tensor(array, dtype=torch.float32, device=device)

    @staticmethod
    def _soft_update(online, target, tau):
        """Polyak-average ``online`` into ``target``: t <- tau*o + (1-tau)*t."""
        for param, target_param in zip(online.parameters(), target.parameters()):
            target_param.data.copy_(tau * param.data + (1-tau) * target_param.data)

    def select_action(self,state):
        """Return the actor's action for a single ``state`` as a flat NumPy array."""
        # reshape(1, -1) turns the single state into a batch of one.
        state = torch.Tensor(state.reshape(1, -1)).to(device)
        return self.actor(state).cpu().data.numpy().flatten()

    def train(self, replay_buffer, iterations, batch_size=100,
              discount=0.99, tau = 0.005, policy_noise_clip = 0.5,
              policy_freq = 2, policy_noise = 0.2):
        """Run ``iterations`` TD3 updates on batches drawn from ``replay_buffer``.

        Args:
            replay_buffer: object whose ``sample(batch_size)`` returns
                ``(states, next_states, actions, rewards, dones)``.
            iterations: number of critic updates to perform.
            batch_size: number of transitions per update.
            discount: discount factor applied to the target Q-value.
            tau: Polyak averaging coefficient for the target networks.
            policy_noise_clip: absolute bound the smoothing noise is clamped to.
            policy_freq: the actor and both targets are updated once every
                ``policy_freq`` iterations.
            policy_noise: standard deviation of the Gaussian noise added to
                the target action. Added after the existing parameters so
                current positional calls keep their meaning.
        """
        for it in range(iterations):
            # Step 4: sample a batch of transitions (s, s', a, r, done) from memory
            batch_states, batch_next_states, batch_actions, batch_rewards, \
            batch_done = replay_buffer.sample(batch_size)
            state = self._to_tensor(batch_states)
            next_state = self._to_tensor(batch_next_states)
            action = self._to_tensor(batch_actions)
            reward = self._to_tensor(batch_rewards)
            done = self._to_tensor(batch_done)

            # The target is a constant for the critic update, so no gradient
            # is tracked while it is built.
            with torch.no_grad():
                # Step 5: from the next state s', the actor target plays the next action a'
                next_action = self.actor_target(next_state)

                # Step 6: add clipped Gaussian noise to a' and clamp the result
                # to the range of actions supported by the environment
                noise = torch.randn_like(next_action) * policy_noise
                noise = noise.clamp(-policy_noise_clip, policy_noise_clip)
                next_action = (next_action + noise).clamp(-self.max_action, self.max_action)

                # Step 7: the two critic targets take (s', a') and return
                # Qt1(s', a') and Qt2(s', a')
                target_Q1, target_Q2 = self.critic_target(next_state, next_action)

                # Step 8: take the minimum of these two Q-values
                target_Q = torch.min(target_Q1, target_Q2)

                # Step 9: final target Qt = r + gamma * min(Qt1, Qt2), with the
                # bootstrap term dropped on terminal transitions
                target_Q = reward + (1-done) * discount * target_Q

            # Step 10: the two critic models take (s, a) and return two Q-values
            current_Q1, current_Q2 = self.critic(state, action)

            # Step 11: compute the loss coming from the two critic models
            critic_loss = F.mse_loss(current_Q1,target_Q) + F.mse_loss(current_Q2,target_Q)

            # Step 12: backpropagate the critic loss and update both critics
            # with the Adam optimizer
            self.critic_optimizer.zero_grad()
            critic_loss.backward()
            self.critic_optimizer.step()

            # Step 13: once every policy_freq iterations, update the actor by
            # gradient ascent on the output of the first critic
            if it % policy_freq == 0:
                # Deterministic policy gradient: maximise Q1(s, actor(s))
                actor_loss = -(self.critic.Q1(state, self.actor(state)).mean())
                self.actor_optimizer.zero_grad()
                actor_loss.backward()
                self.actor_optimizer.step()

                # Step 14: update the actor target by Polyak averaging
                self._soft_update(self.actor, self.actor_target, tau)

                # Step 15: update the critic target by Polyak averaging
                self._soft_update(self.critic, self.critic_target, tau)
