In [1]:
from casadi import *
%matplotlib widget
import matplotlib.pyplot as plt
#%matplotlib inline

t = SX.sym('t')
l = SX.sym('l',2)
m = SX.sym('m',3)
phi = SX.sym('phi',3)
dphi = SX.sym('dphi',3)

G = SX.sym('g')
g = vcat([0,G])

u = SX.sym('u')
f = vcat([u,0,0])

P1 = vcat([phi[0],0])
P2 = vcat([l[0] * cos(phi[1]), l[0] * sin(phi[1])]) + P1
P3 = vcat([l[1] * cos(phi[2]), l[1] * sin(phi[2])]) + P2
P = horzcat(P1,P2,P3)

J1 = jacobian(P1,phi)
J2 = jacobian(P2,phi)
J3 = jacobian(P3,phi)
J = [J1,J2,J3]

T = SX([0])
U = SX([0])
for i in range(3):
    T += m[i]/2 *(J[i] @ dphi).T @ J[i] @ dphi 
    U += m[i] * g.T @ P[:,i]

def diff_time(J,x,dx):
    J_dot = []
    for vect in horzsplit(J):
        J_dot.append(jacobian(vect,x) @ dx)
    J_dot = hcat(J_dot)
    return J_dot

J1_dot = diff_time(J1,phi,dphi)
J2_dot = diff_time(J2,phi,dphi)
J3_dot = diff_time(J3,phi,dphi)
J_dot = [J1_dot,J2_dot,J3_dot]

A = SX(3,3)
B = SX(3,3)
C = SX(3,3)
D = SX(3,2)
for i in range(3):
    A += m[i] * (J_dot[i].T @ J[i] + J[i].T @ J_dot[i])
    B += m[i] * (J[i].T @ J[i])
    C += m[i] * (J_dot[i].T @ J[i])
    D += m[i] * (J[i].T)
rhs = solve(B,(f + (C - A) @ dphi - D @ g))
rhs = vertcat(dphi,rhs)

m_real = SX([1,1,1])
l_real = SX([1,1])
G_real = SX([10])

my_rhs = substitute([rhs],[m,l,G],[m_real,l_real,G_real])[0]
my_rhs = Function('rhs',[phi,dphi,u],[my_rhs])

energy = substitute([T + U],[m,l,G],[m_real,l_real,G_real])[0]
energy = Function('energy',[phi,dphi],[energy])

my_P = substitute([P],[m,l],[m_real,l_real])
my_P = Function('P',[phi],my_P)

In [2]:
P1 = vcat([phi[0],0])
P2 = vcat([l[0] * cos(phi[1]), l[0] * sin(phi[1])]) + P1
P3 = vcat([l[1] * cos(phi[2]), l[1] * sin(phi[2])]) + P2
P = horzcat(P1,P2,P3)

J1 = jacobian(P1,phi)
J2 = jacobian(P2,phi)
J3 = jacobian(P3,phi)
J = [J1,J2,J3]

T = SX([0])
U = SX([0])
for i in range(3):
    T += m[i]/2 *(J[i] @ dphi).T @ J[i] @ dphi 
    U += m[i] * g.T @ P[:,i]

def diff_time(J,x,dx):
    J_dot = []
    for vect in horzsplit(J):
        J_dot.append(jacobian(vect,x) @ dx)
    J_dot = hcat(J_dot)
    return J_dot

J1_dot = diff_time(J1,phi,dphi)
J2_dot = diff_time(J2,phi,dphi)
J3_dot = diff_time(J3,phi,dphi)
J_dot = [J1_dot,J2_dot,J3_dot]

A = SX(3,3)
B = SX(3,3)
C = SX(3,3)
D = SX(3,2)
for i in range(3):
    A += m[i] * (J_dot[i].T @ J[i] + J[i].T @ J_dot[i])
    B += m[i] * (J[i].T @ J[i])
    C += m[i] * (J_dot[i].T @ J[i])
    D += m[i] * (J[i].T)
rhs = solve(B,(f + (C - A) @ dphi - D @ g))
rhs = vertcat(dphi,rhs)

m_real = SX([1,1,1])
l_real = SX([1,1])
G_real = SX([10])

my_rhs = substitute([rhs],[m,l,G],[m_real,l_real,G_real])[0]
my_rhs = Function('rhs',[phi,dphi,u],[my_rhs])

energy = substitute([T + U],[m,l,G],[m_real,l_real,G_real])[0]
energy = Function('energy',[phi,dphi],[energy])

my_P = substitute([P],[m,l],[m_real,l_real])
my_P = Function('P',[phi],my_P)

In [3]:
from scipy import integrate
def get_next_state(state,u,dt):
    return integrate.odeint(lambda x,t: my_rhs.call([x[:3],x[3:],[u]])[0].T.full()[0] , state, [0,dt])[1]
def state_to_coords(state):
    return my_P.call([state[:3]])[0].full()
def get_energy(state):
     return energy.call([state[:3],state[3:]])[0].full()[0]

In [10]:
from torch import nn
import torch
from torch.utils.tensorboard import SummaryWriter
import gym
from torch.distributions import Normal
from gym.spaces import Box
import random
from IPython.display import clear_output
from torch.distributions import MultivariateNormal

In [11]:
state0 = np.array([0,np.pi/2,np.pi/2,
                0,0,0])
state = state0
state

array([0.        , 1.57079633, 1.57079633, 0.        , 0.        ,
       0.        ])

# Environment

In [12]:
class DoublePendulumEnv(gym.Env):
    
    def __init__(self, init_state, dt = 0.1):
        self.action_space = Box(low = -2, high = 2)
        self.observation_space = 6
        self.state = init_state
        self.init_state = init_state
        self.dt = dt
        print('Environment initialized')
        self.init_coords = state_to_coords(init_state)

    def _take_action(self,action):
        self.state = get_next_state(self.state,action,self.dt)

    def _reward_function(self, done):
        """
        if Condition 1: Check if the absolute angle of pole 1 is less than 10 degrees.
            ->> agent will get reward between [0, 0.5]
            if Condition 2: Check if the absolute angle of pole 2 is less than 10 degrees.
                ->> agent will get reward from previous condition plus another reward in range [0,0.5]
                ->> therefore, at equilibrium, reward = 1.0
        else:
            absolute angle of pole 1 is greater than 10 degrees and the state space associated with this state of environment
            is not worth exploring. Therefore, it makes sense to terminate the environment and reset/restart.
            reward = -1

        This means that agent will collect more reward trying to stay in arc of +10, -10 degrees. Every time, agent's
        action leads to a state outside the desired area, agent will get negative reward and a direct termination.

        Condition 3: If cart position is far away from the initial state then there will be a penalty

        Condition 4: If cart pole is not in the same line with cart then it will give additional penalty
        
        Condition 5: velocity penalty (halves the reward if spinning too fast)
    

        """
        state = self.state
        reward = 0
        # degree reward
        normalized_angle_1 = np.degrees(normalize_angle(state[1]))
        normalized_angle_2 = np.degrees(normalize_angle(state[2]))

#         if normalized_angle_2 > 80 and normalized_angle_2 < 110 :
#             reward = 9 -  (90 - normalized_angle_2)*0.1
# #             if np.abs(np.degrees(state[2])) < 100:
# #                 reward = reward + 9 - (90 - np.degrees(state[2]))*0.1
# #                 reward *= 1
#         else:
#             reward += -10
#             done = True
            
        #another degree reward system
        cost = 10*(normalized_angle_1 - np.pi) + \
               10*(normalized_angle_2 - np.pi)

        reward = -cost

# degree_reward for staying upright
        
        
#         deg_reward = ((np.sin(state[1]))*10 + (np.sin(state[2]))*10)/2
#         #if np.sin(state[1]
#         reward += deg_reward
#         print(state[1])
                  
        
        # distance penalty
        if state[0] < 2  and state[0] > -2:
            reward += 5
        else:
            reward -= -50
            done = True
            
        # distance2 rew
#         state_coords = state_to_coords(state)
#        # dist_pen = (state_coords[0][1] - state_coords[0][0])**2 +  (state_coords[0][2] - state_coords[0][0])**2
#         dist_rew =  self.init_coords[1][1] + self.init_coords[1][2] + ( state_coords[1][1] - self.init_coords[1][1])*5 +  ( state_coords[1][2] - self.init_coords[1][2])*5
#         reward += dist_pen
        
        
     
    
        #velocity penalty
        vel_pen = ((1 + np.exp(-0.5 * state[-3:] ** 2)) / 2).sum()/10
        reward -= vel_pen
       
       
        return reward, done
        
    def step(self, action):
        """
        observation -  [x,phi,theta,dx,dphi,dtheta]
        Num     Observation               Min                     Max
        0       Cart Position             -4 m                  4 m
        1       Pole1 Angle               -pi                     +pi
        2       Pole2 Angle               -pi                     +pi
        3       Cart Velocity             -Inf                    Inf
        4       Pole1 Angular Velocity    -Inf                    Inf
        5       Pole1 Angular Velocity    -Inf                    Inf
        
        """
        done = False
        info = {}
        self._take_action(action)
        
        #alive_rew = 10
        reward, done = self._reward_function(done)
        return np.array(self.state), reward, done, info

    def render(self):
        """
        Compute the render frames as specified by render_mode attribute during initialization of the environment.

        """
        state = self.state
        ani = animation.FuncAnimation(fig, animate, frames=300,
                              interval=20, blit=True, init_func=init)
        plt.show()
    def reset(self):
        """
        Resets the environment to an initial state and returns the initial observation.
        """
        self.rew_sum = 0
        self.state = self.init_state
        
        return  np.array(self.state)

In [13]:
class ReplayBuffer:
    def __init__(self, capacity):
        self.capacity = capacity
        self.buffer = []
        self.position = 0
    
    def push(self, state, action, reward, next_state, done):
        if len(self.buffer) < self.capacity:
            self.buffer.append(None)
        self.buffer[self.position] = (state, action, reward, next_state, done)
        self.position = (self.position + 1) % self.capacity
    
    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        state, action, reward, next_state, done = map(np.stack, zip(*batch))
        return state, action, reward, next_state, done
    
    def __len__(self):
        return len(self.buffer)

In [14]:
class ActorCritic(nn.Module):
    def __init__(self, state_dim, action_dim, action_std_init, hidden_size = 528):
        super(ActorCritic, self).__init__()

        self.action_dim = action_dim
        # Instead of directly using variane as input to normal distribution, standard deviation is set as hyperparameter 
        # and used to calculate the variance.
        self.action_var = torch.full((action_dim,), action_std_init * action_std_init)
        # Actor NN
        self.hidden_size = hidden_size
        self.actor = nn.Sequential(
            nn.Linear(state_dim, self.hidden_size),
            nn.Tanh(),
            nn.Linear(self.hidden_size, self.hidden_size),
            nn.Tanh(),
            nn.Linear(self.hidden_size, self.hidden_size),
            nn.Tanh(),
            nn.Dropout(),
            nn.Linear(self.hidden_size, self.hidden_size),
            nn.Tanh(),
            nn.Dropout(),
            nn.Linear(self.hidden_size, action_dim),
            nn.Tanh()
        )
        # Critic NN
        self.critic = nn.Sequential(
                        nn.Linear(state_dim, self.hidden_size),
                        nn.Tanh(),
                        nn.Linear(self.hidden_size, self.hidden_size),
                        nn.Tanh(),
                        nn.Dropout(),
                        nn.Linear(self.hidden_size, self.hidden_size),
                        nn.Tanh(),
                        nn.Dropout(),
                        nn.Linear(self.hidden_size, self.hidden_size),
                        nn.Tanh(),
                        nn.Dropout(),
                        nn.Linear(self.hidden_size, 1)
                    )

    def set_action_std(self, new_action_std):
        """
        Performance of PPO is sensitive to standard deviation. The standard deviation is decaying by 0.05 
        every 90000 timestep. This function sets new standard deviation to be used while creating normal distribution.
        """
        self.action_var = torch.full((self.action_dim,), new_action_std * new_action_std)

    def get_action(self, state):
        """
        Called during sampling phase.
        Passing a State through actor network to get the mean and plot normal distribution based on that mean to sample
        an action.
        """
        action_mean = self.actor(state)                            # output of actor network
        cov_mat = torch.diag(self.action_var).unsqueeze(dim=0)     # Variance of Normal distribution.
        policy = MultivariateNormal(action_mean, cov_mat)          # Generating a policy based on (mean, variance)

        action = policy.sample()                                   # Action sampeled from policy and to be applied.
        action_logprob = policy.log_prob(action)                   # log of prob. of that action given the distribution.

        # Since, we are only interested in the action and its prob., we do not perform SGD on them and can detach the 
        # computational graph associated with it.
        return action.detach(), action_logprob.detach()             
    

    def evaluate(self, state, action):
        """
        Called during update phase.
        In order to calculate the ratio of prob. of taking an action given a state under new policy, we need to
        pass the old sampled state and action taken old policy and get mean, value and logprob under new policy.
        New policy means updated model weights compared to the weights using which the action and value was approximated
        during sampling phase.
        """
        action_mean = self.actor(state)

        action_var = self.action_var.expand_as(action_mean)
        cov_mat = torch.diag_embed(action_var)
        policy = MultivariateNormal(action_mean, cov_mat)

        if self.action_dim == 1:
            action = action.reshape(-1, self.action_dim)
        action_logprobs = policy.log_prob(action)
        policy_entropy = policy.entropy()
        state_values = self.critic(state)
        # The 'action_logprobs' will be used to calculate the ratio for surrogate loss.
        # The 'state_values' will be used to calculate the MSE for critic loss.
        # The 'policy_entropy' will be used give bonus for exploration in final loss.
        return action_logprobs, state_values, policy_entropy

In [None]:
class PPO:
    def __init__(self, state_dim, action_dim, lr_actor, lr_critic, gamma, K_epochs, eps_clip, action_std_init=0.6):
        self.action_std = action_std_init

        self.gamma = gamma                                  # Discount factor
        self.eps_clip = eps_clip                            # Clipping range for Clipped Surrogate Function.
        self.K_epochs = K_epochs                            # Total number of epochs.

        self.buffer = ReplayBuffer()
        
        self.policy = ActorCritic(state_dim, action_dim, action_std_init)
        # Adam Optimizer to perform optimization given the loss on NN parameters.
        self.optimizer = torch.optim.Adam([
            {'params': self.policy.actor.parameters(), 'lr': lr_actor},
            {'params': self.policy.critic.parameters(), 'lr': lr_critic}
        ])
        
        # Sampling is always done under old policy(aka old weights) under the evaluation for update step is always done
        # under new policy(aka update/new weights). After 800 timestep * Number of Epochs,
        # old_policy parameters set same as new_policy parameters. 
        self.policy_old = ActorCritic(state_dim, action_dim, action_std_init)
        self.policy_old.load_state_dict(self.policy.state_dict())
        
        # Mean Squared Error 
        self.MseLoss = nn.MSELoss()

    def set_action_std(self, new_action_std):
        # Setting new standard deviation.
        self.action_std = new_action_std
        self.policy.set_action_std(new_action_std)
        self.policy_old.set_action_std(new_action_std)

    def decay_action_std(self, action_std_decay_rate, min_action_std):
        print("--------------------------------------------------------------------------------------------")
        # Calculate the new standard deviation corresponding to decay rate.
        self.action_std = self.action_std - action_std_decay_rate
        self.action_std = round(self.action_std, 2)
        # In case, standard deviation decays below threshold, set it to minimum.
        if self.action_std <= min_action_std:
            self.action_std = min_action_std
            print("setting actor output action_std to min_action_std : ", self.action_std)
        else:
            print("setting actor output action_std to : ", self.action_std)
        self.set_action_std(self.action_std)

    def select_action(self, state):
        """
        Selecting action under old policy during sampling phase. At same time, save the state and reward into 
        rollout buffer to be used during evaluation.
        """
        with torch.no_grad():
            state = torch.FloatTensor(state)
            action, action_logprob = self.policy_old.get_action(state)

        self.buffer.states.append(state)
        self.buffer.actions.append(action)
        self.buffer.logprobs.append(action_logprob)

        return action.detach().numpy().flatten()

    def update(self):
        """
        Note: Adam is Gradient descent algorithm. Therefore, it tries to find global minimum. For surrogate loss, 
        gradient ascent is requirred and therefore a negative surrogate loss is used during optimization using
        Adam. Maximization of loss function is equal to minimizing a negative loss function.
        """
        # Using Monte Carlo estimates of return we can calcuate the advantage function.
        rewards = []
        discounted_reward = 0
        
        # reward for terminal state is zero. Starting from terminal state, add the discounted reward collected
        # till the initial state and store them into rewards list. So, no bootstrapping.
        
        for reward, done in zip(reversed(self.buffer.rewards), reversed(self.buffer.dones)):
            if done:
                discounted_reward = 0
            discounted_reward = reward + (self.gamma * discounted_reward)
            rewards.insert(0, discounted_reward)

        # By normalizing the rewards, variance in advantage is reduced further.
        rewards = torch.tensor(rewards, dtype=torch.float32)
        rewards = (rewards - rewards.mean()) / (rewards.std() + 1e-7) # to avoid division by zero.

        # The buffer has multiple list. The list as it is cannot be passed to the NN made using pytorch.
        # Pytorch works with tensors and therefore, a conversion is done.
        
        old_states = torch.squeeze(torch.stack(self.buffer.states, dim=0)).detach()
        old_actions = torch.squeeze(torch.stack(self.buffer.actions, dim=0)).detach()
        old_logprobs = torch.squeeze(torch.stack(self.buffer.logprobs, dim=0)).detach()
        
        # Using the sampled date from old policy, calculate loss over multiple trajectories and perform optimization.
        # Data/ state tuple corresponding to 800 time steps is processed 'K_epochs' times before updating old_policy
        # parameters to same as new updated policy parameters.
        
        for _ in range(self.K_epochs):
            logprobs, state_values, policy_entropy = self.policy.evaluate(old_states, old_actions)
            state_values = torch.squeeze(state_values)

            # Finding the ratio (pi_theta / pi_theta__old)
            ratios = torch.exp(logprobs - old_logprobs.detach())

            # Finding surrogate loss and then maximizing it. Since, maximization of surrogate loss corresponds to 
            # increase in prob. of action which gives higher reward given the state. 
            advantages = rewards - state_values.detach()
            surr1 = ratios * advantages
            surr2 = torch.clamp(ratios, 1 - self.eps_clip, 1 + self.eps_clip) * advantages
            policy_loss = torch.min(surr1, surr2)

            # Combined loss of PPO
            value_loss = self.MseLoss(state_values, rewards)
            
            loss = -policy_loss + 0.5 * value_loss - 0.01 * policy_entropy

            # Performing Optimization step.
            self.optimizer.zero_grad()
            loss.mean().backward()
            self.optimizer.step()

        # Finally, old_policy is same as new _policy until next update phase.
        self.policy_old.load_state_dict(self.policy.state_dict())
        # Prepare buffer for next round of sampling by clearing all previous entries.
        self.buffer.clear()

    def save(self, checkpoint_path):
        """
        Save the weights and biases of old policy to be used later for evaluating PPO performance via rendering the 
        Environment.
        """
        torch.save(self.policy_old.state_dict(), checkpoint_path)

    def load(self, checkpoint_path):
        """
        Load previously trained model parameters for testing.
        """
        self.policy_old.load_state_dict(torch.load(checkpoint_path, map_location=lambda storage, loc: storage))
        self.policy.load_state_dict(torch.load(checkpoint_path, map_location=lambda storage, loc: storage))