## Installation

In [4]:
!pip install gym[box2d] --user
!pip install matplotlib
!pip install torch
!pip install tqdm

















## Imports

In [4]:
import gym
import numpy as np
import matplotlib.pyplot as plt
from collections import deque
%matplotlib inline

# Pytorch imports
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.distributions as D

# Use the first CUDA device when one is available, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device('cuda:0')
    # Release cached allocator blocks before anything is placed on the GPU
    torch.cuda.empty_cache()
    print("Device set to : " + str(torch.cuda.get_device_name(device)))
else:
    device = torch.device('cpu')
    print("Device set to : cpu")

Device set to : cpu


## Test The Environment

In [5]:
# Build the environment and start a fresh episode; with the gym API used in
# this notebook, reset() returns the initial observation directly.
env = gym.make('LunarLanderContinuous-v2')
state = env.reset()
print(state)

# Draw one random action, report the observation shape, then draw one random
# observation (same order as the prints so the random draws are unchanged).
random_action = env.action_space.sample()
print("Sample Action: ", random_action)
observation_shape = env.observation_space.shape
print("Obsevation Space Action: ", observation_shape)
random_observation = env.observation_space.sample()
print("Sample Observation: ", random_observation)

[-0.00338373  1.4144926  -0.3427514   0.15877305  0.00392769  0.07763831
  0.          0.        ]
Sample Action:  [-0.56501263 -0.9725253 ]
Obsevation Space Action:  (8,)
Sample Observation:  [ 0.62856275  1.1637546   0.57594156  1.7596751   1.9705803   1.3826134
 -0.99700797 -0.6544249 ]


## Test With Random Walk

In [6]:
# Baseline: play a few episodes with uniformly random actions and report the
# undiscounted score of each one.
episodes = 10
for ep in range(episodes):
    states = env.reset()
    score = 0
    done = False

    while True:
        action = env.action_space.sample()
        # Meaning of the values returned by step():
        # https://stackoverflow.com/questions/73195438/openai-gyms-env-step-what-are-the-values
        next_state, reward, done, info = env.step(action)
        score += reward
        if done:
            break

    # The second print shows the values from the final step of the episode
    print('Episodes: {} Score {}'.format(ep, score))
    print('Next state: {}, reward: {}, done: {}, info {}'.format(next_state, reward, done, info))
    print('\n')

env.close()

Episodes: 0 Score -349.5996386662349
Next state: [-1.0066665  0.7347164 -1.8781767 -0.9616197  1.2742034  0.4709881
  0.         0.       ], reward: -100, done: True, info {}


Episodes: 1 Score -305.0449154537938
Next state: [ 0.08544865  0.0081467   1.3655968   0.12027487 -2.038954   -1.4922372
  1.          0.        ], reward: -100, done: True, info {}


Episodes: 2 Score -188.0681845213545
Next state: [ 0.6013529  -0.14706254  1.3968407  -0.7038593  -0.365758    2.090886
  1.          1.        ], reward: -100, done: True, info {}


Episodes: 3 Score -241.20542876323051
Next state: [ 0.89951193  0.1705021   0.560556    0.32094297 -1.3890476  -1.1396773
  1.          0.        ], reward: -100, done: True, info {}


Episodes: 4 Score -379.90295297848735
Next state: [-1.0117997   0.3439705  -1.9118421  -1.2424201   1.2993275   0.26892695
  0.          0.        ], reward: -100, done: True, info {}


Episodes: 5 Score -53.45371918525028
Next state: [-0.3593564  -0.07387808 -0.4460289 

In [7]:
# Turn one randomly sampled observation into a CPU float tensor and add a
# leading axis with unsqueeze(0), as get_action() does before calling the policy.
sampled_observation = env.observation_space.sample()
state = torch.FloatTensor(sampled_observation).to("cpu").unsqueeze(0)
print(state)

tensor([[ 0.2850,  0.0501, -0.1043,  0.8141, -1.5025, -1.8337, -0.5464, -0.6183]])


### Trust Region Policy Optimization Algorithm

In [8]:
class ReplayBuffer():
    """Append-only store for the transitions collected during a rollout."""

    def __init__(self):
        super().__init__()
        # One (state, action, reward, next_state, done) tuple per recorded step
        self.memory = []

    def add(self, state, action, reward, next_state, done):
        """Append a single transition to the end of the buffer."""
        transition = (state, action, reward, next_state, done)
        self.memory.append(transition)

    def sample(self):
        """Return every stored transition, regrouped field by field.

        zip(*memory) turns the list of 5-tuples into five per-field sequences
        and np.stack joins each sequence along a new leading axis.

        Returns:
            (states, actions, rewards, next_states, dones) as stacked arrays,
            in insertion order.
        """
        per_field = zip(*self.memory)
        states, actions, rewards, next_states, dones = (
            np.stack(field) for field in per_field
        )
        return states, actions, rewards, next_states, dones

    def reset(self):
        """Forget all stored transitions by rebinding to a new empty list."""
        self.memory = []

In [9]:
class ContinousPolicyNet(nn.Module):
    """Gaussian policy network: maps a state to a per-dimension mean and std.

    Args:
        state_num: size of the state vector fed to the network.
        min_action: lower bound applied to every action dimension.
        max_action: upper bound applied to every action dimension.
        action_num: number of action dimensions (default 2, the width that
            used to be hard-coded).
        hidden_num: width of the single hidden layer (default 32).
        min_std: floor applied to the predicted standard deviation so a
            saturated sigmoid can never yield an exactly-zero scale.
    """

    def __init__(self, state_num, min_action, max_action,
                 action_num=2, hidden_num=32, min_std=1e-6):
        super().__init__()
        self.min_action = min_action
        self.max_action = max_action
        self.min_std = min_std

        # Layers are created in the order hidden -> mu -> std
        self.input = nn.Linear(state_num, hidden_num)
        self.mu = nn.Linear(hidden_num, action_num)
        self.std = nn.Linear(hidden_num, action_num)

    def forward(self, x):
        """Return ``(mu, std)`` for the given batch of states.

        ``mu`` is a sigmoid rescaled into [min_action, max_action]; ``std`` is
        a sigmoid rescaled into [0, (max_action - min_action) / 2] and then
        floored at ``min_std``.
        """
        x = F.relu(self.input(x))
        action_span = self.max_action - self.min_action
        mu = action_span * torch.sigmoid(self.mu(x)) + self.min_action
        std = action_span * torch.sigmoid(self.std(x)) / 2
        # A zero scale makes Normal(mu, std) invalid, so keep it strictly positive
        std = torch.clamp(std, min=self.min_std)
        return mu, std

In [10]:
class CriticNet(nn.Module):
    """State-value network: maps a state to an estimate of its return.

    Args:
        state_num: size of the state vector fed to the network.
        hidden_num: width of the single hidden layer (default 32).
        value_num: number of outputs per state. Defaults to 1 because a
            state-value baseline is a single scalar per state; pass 2 to get
            the two-column output this class used to produce.
    """

    def __init__(self, state_num, hidden_num=32, value_num=1):
        super().__init__()
        self.input = nn.Linear(state_num, hidden_num)
        self.output = nn.Linear(hidden_num, value_num)

    def forward(self, x):
        """Return the value estimate(s); the last axis has ``value_num`` entries."""
        x = F.relu(self.input(x))
        value = self.output(x)
        return value

In [11]:
class TRPO():
    """Trust Region Policy Optimization agent with a Gaussian policy and a value baseline.

    The policy is updated with a natural-gradient step (conjugate gradient on
    Hessian-vector products of the KL divergence) followed by a backtracking
    line search; the critic is fitted to discounted returns with Adam.
    """

    # Backtracking line search: number of candidate steps and the shrink factor between them
    LINE_SEARCH_STEPS = 10
    LINE_SEARCH_DECAY = 0.9
    # Added to the advantage standard deviation so normalisation never divides by zero
    ADVANTAGE_EPS = 1e-8

    def __init__(self, env, gamma=0.99, learning_rate=1e-3, delta=0.05):
        """Build the actor, the critic, the critic optimiser and the rollout buffer.

        Args:
            env: gym-style environment; its observation_space and action_space are read.
            gamma: discount factor.
            learning_rate: Adam step size for the critic.
            delta: trust-region size, i.e. the largest KL divergence allowed
                between the policy before and after an update.
        """
        self.env = env
        # For LunarLanderContinuous-v2 the observation has 8 entries:
        #   1&2: coordinates of the lander in x & y
        #   3&4: linear velocities in x & y
        #   5&6: angle, angular velocity
        #   7&8: two booleans, one per leg, set when the leg touches the ground
        # and actions lie between -1 and 1.
        self.state_num = self.env.observation_space.shape[0]
        # Only the first action dimension's bounds are read; they are applied to every dimension
        self.action_max = float(env.action_space.high[0])
        self.action_min = float(env.action_space.low[0])

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        self.gamma = gamma
        self.delta = delta

        # Policy (actor)
        self.actor_net = ContinousPolicyNet(self.state_num, self.action_min, self.action_max).to(self.device)

        # Critic, trained with a first-order optimiser
        # https://pytorch.org/docs/stable/generated/torch.optim.Adam.html
        self.critic_net = CriticNet(self.state_num).to(self.device)
        self.critic_opt = optim.Adam(self.critic_net.parameters(), lr=learning_rate)

        # Rollout storage, emptied after every learn() call
        self.memory = ReplayBuffer()

    def get_action(self, state):
        """Sample one action for a single state and return it as a NumPy array."""
        state = torch.FloatTensor(state).to(self.device).unsqueeze(0)
        # Acting never needs gradients, so skip building the autograd graph
        with torch.no_grad():
            mu, std = self.actor_net(state)
            action = D.Normal(mu, std).sample()
        # Drop the batch axis that unsqueeze(0) added
        return action.cpu().numpy()[0]

    # ------------------------------------------------------------------
    # Numerical helpers
    # ------------------------------------------------------------------

    def flat_grad(self, y, x, retain_graph=False, create_graph=False):
        """Return d(y)/d(x) for a list of tensors ``x`` as one flat vector.

        https://pytorch.org/docs/stable/generated/torch.autograd.grad.html
        https://pytorch.org/docs/stable/generated/torch.cat.html
        """
        # Building a differentiable gradient requires keeping the graph alive
        retain_graph = True if create_graph else retain_graph
        grad = torch.autograd.grad(y, x, retain_graph=retain_graph, create_graph=create_graph)
        return torch.cat([t.view(-1) for t in grad])

    def hvp(self, d_kl, v, params, retain_graph):
        """Hessian-vector product: differentiate ``d_kl @ v`` with respect to ``params``.

        ``d_kl`` must have been produced with ``create_graph=True``.
        """
        return self.flat_grad(d_kl @ v, params, retain_graph)

    def conjugate_gradient(self, A, d_kl, params, retain_graph, b, max_iterations=10, residual_tol=1e-10):
        """Approximately solve ``A x = b`` without forming ``A``.

        Args:
            A: callable ``A(d_kl, v, params, retain_graph)`` returning the product of the matrix with ``v``.
            d_kl, params, retain_graph: forwarded unchanged to ``A``.
            b: right-hand side vector.
            max_iterations: upper bound on the number of iterations.
            residual_tol: stop once the squared residual norm is at or below this value.

        Returns:
            The current iterate ``x`` (all zeros when ``b`` is zero).
        """
        x = torch.zeros_like(b)
        r = b.clone()  # residual b - A x, with x = 0
        v = r.clone()  # search direction
        rr = r @ r

        for _ in range(max_iterations):
            # A vanished residual would otherwise give 0/0 in the step size below
            if rr <= residual_tol:
                break
            Av = A(d_kl, v, params, retain_graph)
            vAv = v @ Av
            # Non-positive or non-finite curvature: keep the last good iterate
            if not torch.isfinite(vAv) or vAv <= 0:
                break
            alpha = rr / vAv
            x = x + alpha * v
            r = r - alpha * Av
            rr_new = r @ r
            v = r + (rr_new / rr) * v
            rr = rr_new
        return x

    def surrogate_objective(self, log_prob_old, log_prob_new, advantages):
        """Mean of advantage times the probability ratio exp(log_prob_new - log_prob_old)."""
        objective = advantages * torch.exp(log_prob_new - log_prob_old)
        return objective.mean()

    def kl_divergence(self, mu_old, std_old, logstd_old, mu_new, std_new, logstd_new):
        """KL(old || new) between diagonal Gaussians.

        The divergence is summed over the last (action) axis and averaged over
        the remaining entries, so its scale does not grow with the batch size.
        ``logstd_*`` must be the logarithm of the matching ``std_*``.
        """
        kl = (logstd_new - logstd_old) \
            + (std_old.pow(2) + (mu_old - mu_new).pow(2)) / (2.0 * std_new.pow(2)) \
            - 0.5
        return kl.sum(dim=-1).mean()

    def param_update(self, policy_net, flattened_grad):
        """Add a flat update vector to the parameters of ``policy_net`` in place.

        The vector is consumed in ``policy_net.parameters()`` order.
        """
        index = 0
        with torch.no_grad():
            for param in policy_net.parameters():
                param_length = param.numel()
                param.add_(flattened_grad[index:index + param_length].view(param.shape))
                index += param_length

    # ------------------------------------------------------------------
    # Learning
    # ------------------------------------------------------------------

    def _state_value(self, states):
        """Critic estimate with one scalar per state.

        Any extra critic output columns are averaged, so this works whatever
        the width of the critic's last layer is.
        """
        values = self.critic_net(states)
        return values.view(values.shape[0], -1).mean(dim=1)

    def _prepare_batch(self):
        """Turn the stored rollout into tensors plus discounted returns and advantages."""
        states, actions, rewards, next_states, dones = self.memory.sample()
        states = torch.as_tensor(states, dtype=torch.float32, device=self.device)
        actions = torch.as_tensor(actions, dtype=torch.float32, device=self.device)
        rewards = torch.as_tensor(rewards, dtype=torch.float32, device=self.device).view(-1)
        next_states = torch.as_tensor(next_states, dtype=torch.float32, device=self.device)
        not_done = 1.0 - torch.as_tensor(dones, dtype=torch.float32, device=self.device).view(-1)

        # Targets and baseline are constants for both updates, so no graph is built
        with torch.no_grad():
            # Bootstrap from the critic; the mask zeroes it when the last step was terminal
            running = self._state_value(next_states[-1:])[0]
            returns = torch.empty_like(rewards)
            for t in reversed(range(rewards.shape[0])):
                running = rewards[t] + self.gamma * not_done[t] * running
                returns[t] = running
            advantages = returns - self._state_value(states)

        # Normalise advantages to reduce skewness and improve convergence
        if advantages.numel() > 1:
            advantages = (advantages - advantages.mean()) / (advantages.std() + self.ADVANTAGE_EPS)
        return states, actions, returns, advantages

    def _update_critic(self, states, returns):
        """One Adam step on the squared error between critic outputs and returns."""
        values = self.critic_net(states)
        values = values.view(values.shape[0], -1)
        critic_loss = 0.5 * (returns.unsqueeze(1) - values).pow(2).mean()
        self.critic_opt.zero_grad()
        critic_loss.backward()
        self.critic_opt.step()

    def _update_policy(self, states, actions, advantages):
        """Natural-gradient step on the surrogate objective with a KL-constrained line search."""
        # Distribution under the current parameters (pi_theta_old)
        mu_old, std_old = self.actor_net(states)
        logstd_old = torch.log(std_old)
        # Joint log-probability of each action vector: sum over action dimensions
        log_probs_old = D.Normal(mu_old, std_old).log_prob(actions).sum(dim=-1)

        L_old = self.surrogate_objective(log_probs_old.detach(), log_probs_old, advantages)
        KL_old = self.kl_divergence(mu_old.detach(), std_old.detach(), logstd_old.detach(),
                                    mu_old, std_old, logstd_old)

        params = list(self.actor_net.parameters())

        # g: gradient of the objective; d_kl: differentiable gradient of the KL
        g = self.flat_grad(L_old, params, retain_graph=True)
        d_kl = self.flat_grad(KL_old, params, create_graph=True)

        # s is the search direction; its curvature fixes the maximal step length
        s = self.conjugate_gradient(self.hvp, d_kl, params, True, g)
        curvature = s @ self.hvp(d_kl, s, params, True)
        if not torch.isfinite(curvature) or curvature <= 0:
            # No usable direction (e.g. zero gradient): leave the policy unchanged
            return
        max_step = torch.sqrt(2 * self.delta / curvature) * s

        mu_ref, std_ref, logstd_ref = mu_old.detach(), std_old.detach(), logstd_old.detach()
        log_probs_ref = log_probs_old.detach()
        L_ref = L_old.detach()

        for i in range(self.LINE_SEARCH_STEPS):
            step = (self.LINE_SEARCH_DECAY ** i) * max_step
            self.param_update(self.actor_net, step)

            with torch.no_grad():
                mu_new, std_new = self.actor_net(states)
                log_probs_new = D.Normal(mu_new, std_new).log_prob(actions).sum(dim=-1)
                L_new = self.surrogate_objective(log_probs_ref, log_probs_new, advantages)
                KL_new = self.kl_divergence(mu_ref, std_ref, logstd_ref,
                                            mu_new, std_new, torch.log(std_new))

            # Keep the step when the objective improved and the policy stayed in the trust region
            if L_new - L_ref > 0 and KL_new <= self.delta:
                break

            # Otherwise undo it and try a shorter one
            self.param_update(self.actor_net, -step)

    def learn(self):
        """Update critic and policy from the stored rollout, then empty the buffer.

        Does nothing when no transition has been stored.
        """
        if len(self.memory.memory) == 0:
            return

        states, actions, returns, advantages = self._prepare_batch()
        self._update_critic(states, returns)
        self._update_policy(states, actions, advantages)

        # The rollout came from the pre-update policy and must not be reused
        self.memory.reset()
    

### Testing

In [12]:
def main(total_episode=1, log_every=20):
    """Train a TRPO agent on LunarLanderContinuous-v2.

    Args:
        total_episode: number of episodes to run; the policy is updated once
            at the end of each episode.
        log_every: print the running mean reward every this many episodes.

    The environment is always closed on exit, including when an episode raises.
    """
    env = gym.make('LunarLanderContinuous-v2')
    try:
        agent = TRPO(env, gamma=0.99, learning_rate=1e-6, delta=0.01)
        # Rewards of the most recent episodes, used for the running mean
        ep_rewards = deque(maxlen=20)

        # Sanity check: take a single step and show what the agent and the
        # environment return. Nothing from this step is stored for learning.
        state = env.reset()
        action = agent.get_action(state)
        next_state, reward, done, _ = env.step(action)
        print(action)
        print(next_state)
        print(reward, done)

        for i in range(total_episode):
            state = env.reset()
            rewards = []
            done = False

            # Collect one full episode with the current policy
            while not done:
                action = agent.get_action(state)
                next_state, reward, done, _ = env.step(action)
                agent.memory.add(state, action, reward, next_state, done)
                rewards.append(reward)
                state = next_state

            # One update per finished episode
            agent.learn()
            ep_rewards.append(sum(rewards))
            if i % log_every == 0:
                print("episode: {}\treward: {}".format(i, round(np.mean(ep_rewards), 3)))
    finally:
        env.close()

In [None]:
# Run the training entry point defined in the previous cell
main()