In [None]:
!pip install renderlab  # library used for rendering gym envs on colab

In [None]:
!pip install --upgrade sympy

In [2]:
import gymnasium as gym
import renderlab as rl

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical, Normal
from torch.optim.adam import Adam
import math
import numpy as np
import random
from torch.utils.tensorboard import SummaryWriter

# config = {

#         "learning_rate" : 0.0002,
#         "gamma" : 0.98,           # reward discount factor
#         "lmbda" : 0.95,           # for GAE in PPOtorch
#         "eps_clip" : 0.2,         # for ratio clipping in PPO
#         "K_epoch" : 10,            # how many times you repeatedly reuse your data
#         "T_horizon" : 20,         # number of transition in single minibatch
#         "n_states" : 4+1,         # state space. Additional one dimension for [time]
#         "n_actions": 2,           # action space
#         "n_skills" : 1,           # In DIAYN, dimension of discrete skills
#         "n_train_episode" : 1000, # total number of episodes for training
#         "print_interval" : 10,    # to see the training progress
#         "entropy_coeff": 0.1,
#     }

# Hyper-parameters consumed by PPO.__init__ (and, for rollout_len, by main()).
config = dict(
    learning_rate=0.0003,   # step size handed to the Adam optimizer
    gamma=0.9,              # discount factor used in the TD targets and in GAE
    lmbda=0.9,              # GAE decay factor
    eps_clip=0.2,           # PPO ratio clipping range: [1 - eps_clip, 1 + eps_clip]
    K_epoch=10,             # number of passes over the collected minibatches per update
    rollout_len=3,          # transitions stored per rollout
    buffer_size=10,         # minibatches assembled per update
    minibatch_size=32,      # rollouts per minibatch
    entropy_coeff=0.001,    # read by PPO.__init__; the entropy bonus itself is not currently applied in the loss
)

In [None]:
class PPO(nn.Module):
    """PPO actor-critic with an additional RND-style intrinsic-reward stream.

    The network takes 3-dimensional observations and outputs the mean and
    standard deviation of a 1-dimensional Gaussian policy. Two value heads
    share the policy trunk: ``v`` is regressed on the extrinsic reward and
    ``dv`` on the intrinsic reward. The intrinsic signal is the prediction
    error between a fixed, randomly initialised ``target_network`` and a
    trained ``prediction_network``.

    Args:
        config: mapping with the keys ``entropy_coeff``, ``learning_rate``,
            ``buffer_size``, ``minibatch_size``, ``rollout_len``,
            ``eps_clip``, ``gamma``, ``lmbda`` and ``K_epoch``.
    """

    def __init__(self, config):
        super(PPO, self).__init__()
        self.data = []  # rollouts waiting to be consumed by train_net()
        self.entropy_coeff = config['entropy_coeff']
        self.learning_rate = config['learning_rate']
        self.fc1   = nn.Linear(3,128)
        self.fc_mu = nn.Linear(128,1)
        self.fc_std  = nn.Linear(128,1)
        self.fc_v = nn.Linear(128,1)
        self.fc_dv = nn.Linear(128,1)
        self.target_network = nn.Sequential(nn.Linear(3,128), nn.ReLU(), nn.Linear(128,1))
        self.prediction_network = nn.Sequential(nn.Linear(3,128), nn.ReLU(), nn.Linear(128,1))
        # The target network is a fixed random function: freeze it explicitly
        # so it can never be updated, rather than relying on every forward
        # pass through it being wrapped in no_grad().
        for param in self.target_network.parameters():
            param.requires_grad_(False)
        self.optimizer = optim.Adam(
            [param for param in self.parameters() if param.requires_grad],
            lr=self.learning_rate,
        )
        self.optimization_step = 0
        self.buffer_size = config['buffer_size']
        self.minibatch_size = config['minibatch_size']
        self.rollout_len = config['rollout_len']
        self.eps_clip = config['eps_clip']
        self.gamma = config['gamma']
        self.lmbda = config['lmbda']
        self.K_epoch = config['K_epoch']

    def pi(self, x, softmax_dim = 0):
        """Return ``(mu, std)`` of the Gaussian policy for observation(s) ``x``.

        ``mu`` is squashed into [-2, 2] with tanh and ``std`` is made positive
        with softplus. ``softmax_dim`` is unused and kept only so existing
        callers that pass it keep working.
        """
        x = F.relu(self.fc1(x))
        mu = 2.0*torch.tanh(self.fc_mu(x))
        std = F.softplus(self.fc_std(x))
        return mu, std

    def v(self, x):
        """Value estimate trained against the extrinsic-reward TD target."""
        x = F.relu(self.fc1(x))
        return self.fc_v(x)

    def dv(self, x):
        """Value estimate trained against the intrinsic-reward TD target."""
        x = F.relu(self.fc1(x))
        return self.fc_dv(x)

    def put_data(self, transition):
        """Store one rollout (a list of transitions) for the next update."""
        self.data.append(transition)

    @staticmethod
    def _to_float_tensor(nested):
        """Convert a nested list / array structure to a float32 tensor."""
        # Going through one contiguous ndarray avoids the slow element-wise
        # conversion torch performs on lists of ndarrays.
        return torch.tensor(np.asarray(nested), dtype=torch.float)

    def make_batch(self):
        """Pop ``buffer_size * minibatch_size`` rollouts and pack them into minibatches.

        Each stored transition is ``(s, a, (r_ext, r_int), s_prime, log_prob, done)``.
        Rollouts are popped from the end of ``self.data``.

        Returns:
            A list of ``buffer_size`` tuples
            ``(s, a, r, rnd_r, s_prime, done_mask, old_log_prob)`` of float
            tensors whose leading dimensions are (minibatch_size, steps per
            rollout). ``done_mask`` is 0 where the transition was terminal
            and 1 otherwise.
        """
        data = []

        for _ in range(self.buffer_size):
            # Fresh accumulators for every minibatch; sharing them across
            # minibatches would make each one contain all previous rollouts.
            s_batch, a_batch, r_batch, rnd_batch = [], [], [], []
            s_prime_batch, prob_a_batch, done_batch = [], [], []

            for _ in range(self.minibatch_size):
                rollout = self.data.pop()
                s_lst, a_lst, r_lst, rnd_lst = [], [], [], []
                s_prime_lst, prob_a_lst, done_lst = [], [], []

                for transition in rollout:
                    s, a, r, s_prime, prob_a, done = transition

                    s_lst.append(s)
                    a_lst.append([float(a)])  # accepts a Python float or a 1-element tensor
                    r_lst.append([r[0]])
                    rnd_lst.append([r[1]])
                    s_prime_lst.append(s_prime)
                    prob_a_lst.append([prob_a])
                    done_lst.append([0 if done else 1])

                s_batch.append(s_lst)
                a_batch.append(a_lst)
                r_batch.append(r_lst)
                rnd_batch.append(rnd_lst)
                s_prime_batch.append(s_prime_lst)
                prob_a_batch.append(prob_a_lst)
                done_batch.append(done_lst)

            mini_batch = (
                self._to_float_tensor(s_batch),
                self._to_float_tensor(a_batch),
                self._to_float_tensor(r_batch),
                self._to_float_tensor(rnd_batch),
                self._to_float_tensor(s_prime_batch),
                self._to_float_tensor(done_batch),
                self._to_float_tensor(prob_a_batch),
            )
            data.append(mini_batch)

        return data

    def _gae(self, delta, done_mask):
        """Generalised advantage estimate along the time axis (dim 1).

        ``delta`` and ``done_mask`` have shape (batch, time, 1). Every rollout
        in the batch is processed independently, and the recursion is cut
        after a terminal transition (``done_mask == 0``).
        """
        advantage = torch.zeros_like(delta)
        running = torch.zeros_like(delta[:, 0])
        for t in reversed(range(delta.size(1))):
            running = delta[:, t] + self.gamma * self.lmbda * done_mask[:, t] * running
            advantage[:, t] = running
        return advantage

    def calc_advantage(self, data):
        """Append TD targets and GAE advantages to every minibatch.

        Args:
            data: list of minibatches as produced by ``make_batch``.

        Returns:
            A list of tuples ``(s, a, r, rnd_r, s_prime, done_mask,
            old_log_prob, td_target, div_td_target, advantage,
            div_advantage)``; the last four entries have the same shape as
            ``r``.
        """
        data_with_adv = []
        for mini_batch in data:
            s, a, r, rnd_r, s_prime, done_mask, old_log_prob = mini_batch
            with torch.no_grad():
                td_target = r + self.gamma * self.v(s_prime) * done_mask
                delta = td_target - self.v(s)
                div_td_target = rnd_r + self.gamma * self.dv(s_prime) * done_mask
                div_delta = div_td_target - self.dv(s)
                # Accumulate over time steps within each rollout, not over
                # the rollouts of the batch.
                advantage = self._gae(delta, done_mask)
                div_advantage = self._gae(div_delta, done_mask)
            data_with_adv.append((s, a, r, rnd_r, s_prime, done_mask, old_log_prob,
                                  td_target, div_td_target, advantage, div_advantage))

        return data_with_adv

    def train_net(self):
        """Run one PPO update if enough rollouts have been collected.

        Does nothing until ``minibatch_size * buffer_size`` rollouts are
        stored. Otherwise consumes them, then performs ``K_epoch`` passes over
        the resulting minibatches, optimising the clipped surrogate objective
        on the summed extrinsic and intrinsic advantages, both value heads,
        and the RND prediction loss. ``optimization_step`` is incremented once
        per gradient step.
        """
        if len(self.data) < self.minibatch_size * self.buffer_size:
            return

        data = self.calc_advantage(self.make_batch())

        for _ in range(self.K_epoch):
            for mini_batch in data:
                (s, a, r, rnd_r, s_prime, done_mask, old_log_prob,
                 td_target, div_td_target, advantage, div_advantage) = mini_batch

                mu, std = self.pi(s, softmax_dim=1)
                dist = Normal(mu, std)
                log_prob = dist.log_prob(a)
                ratio = torch.exp(log_prob - old_log_prob)  # a/b == exp(log(a)-log(b))

                total_advantage = advantage + div_advantage
                surr1 = ratio * total_advantage
                surr2 = torch.clamp(ratio, 1-self.eps_clip, 1+self.eps_clip) * total_advantage
                loss = (-torch.min(surr1, surr2)
                        + (self.v(s) - td_target).pow(2).mean()
                        + (self.dv(s) - div_td_target).pow(2).mean())

                # The predictor is trained on a random eighth of the batch.
                # Always keep at least one sample: the mean over an empty
                # selection is NaN and would corrupt every parameter.
                n_rnd = max(1, s.size(0) // 8)
                idx = torch.randint(0, s.size(0), size=(n_rnd,))

                with torch.no_grad():
                    target_output = self.target_network(s_prime[idx])
                predicted_output = self.prediction_network(s_prime[idx])
                rnd_loss = (target_output - predicted_output).pow(2).mean()
                # Entropy bonus is disabled; entropy_coeff is currently unused.
                # loss += self.entropy_coeff * dist.entropy()
                loss = loss + rnd_loss

                self.optimizer.zero_grad()
                loss.mean().backward()
                nn.utils.clip_grad_norm_(self.parameters(), 1.0)
                self.optimizer.step()
                self.optimization_step += 1

def main():
    """Train the PPO agent on Pendulum-v1.

    Collects fixed-length rollouts, feeds them to the model, logs per-episode
    mean extrinsic and intrinsic reward to TensorBoard under ``runs/PPO``,
    and writes checkpoints (``best_model.pt`` and ``<episode>_model.pt``) to
    the working directory.
    """
    env = gym.make('Pendulum-v1')
    model = PPO(config)
    score, div_score, best_ext_score = 0.0, 0.0, -10000
    cur_ext_score = None  # first computed at the first print interval

    print_interval = 20
    writer = SummaryWriter(log_dir='runs/PPO')

    try:
        for n_epi in range(10000):
            s, _ = env.reset()
            episode_over = False
            count = 0
            # Rollouts never span episodes: an incomplete rollout left over at
            # the end of an episode is discarded.
            rollout = []
            episodic_score, episodic_int_score = 0.0, 0.0
            while count < 200 and not episode_over:
                for t in range(model.rollout_len):
                    # Acting needs no gradients; the stored log-prob is a plain float.
                    with torch.no_grad():
                        mu, std = model.pi(torch.from_numpy(s).float())
                        dist = Normal(mu, std)
                        a = dist.sample()
                        log_prob = dist.log_prob(a)
                    s_prime, r, terminated, truncated, info = env.step([a.item()])
                    with torch.no_grad():
                        s_prime_tensor = torch.from_numpy(s_prime).float()
                        int_r = (model.target_network(s_prime_tensor)
                                 - model.prediction_network(s_prime_tensor)).pow(2).mean()
                    # Only true termination is stored as `done`, so the value
                    # target still bootstraps through a time-limit truncation.
                    rollout.append((s, a, (r/100.0, int_r.item()*10), s_prime, log_prob.item(), terminated))
                    if len(rollout) == model.rollout_len:
                        model.put_data(rollout)
                        rollout = []

                    s = s_prime
                    score += r
                    episodic_score += r
                    div_score += int_r.item()
                    episodic_int_score += int_r.item()
                    count += 1

                    # The environment must not be stepped again once the
                    # episode has ended, whether by termination or truncation.
                    if terminated or truncated:
                        episode_over = True
                        break

                model.train_net()
            writer.add_scalar("Extrinsic Reward", episodic_score/count, n_epi)
            writer.add_scalar("Intrinsic_Reward", episodic_int_score/count, n_epi)

            if n_epi%print_interval==0 and n_epi!=0:
                cur_ext_score = score/print_interval
                cur_div_score = div_score/print_interval
                print("# of episode :{}, avg score : {:.1f}, , adv_div_score: {:.4f}, optmization step: {}".format(n_epi, cur_ext_score, cur_div_score, model.optimization_step))
                score = 0.0
                div_score = 0.0
            if cur_ext_score is not None and n_epi > 100 and cur_ext_score > best_ext_score:
                torch.save(model, "./best_model.pt")
                best_ext_score = cur_ext_score
                print(f"new model saved. current best score: {best_ext_score} ")
            if n_epi%100==0:
                torch.save(model, "./"+ str(n_epi)+"_model.pt")
    finally:
        # Flush pending TensorBoard events and release the environment even
        # if training is interrupted.
        writer.close()
        env.close()

# Start training when this code is executed directly rather than imported.
if __name__ == '__main__':
    main()

In [41]:
def play(model):
    """Run one Pendulum-v1 episode with ``model`` acting deterministically.

    The policy mean is used as the action (no sampling). The environment is
    wrapped in ``rl.RenderFrame`` writing to ``./output``; the accumulated
    episode reward is printed when the episode terminates or is truncated,
    after which ``env.play()`` is called (presumably to display the recorded
    frames -- see the renderlab documentation).

    Args:
        model: object exposing ``pi(obs_tensor) -> (mu, std)``.
    """
    env = gym.make('Pendulum-v1', render_mode = "rgb_array")
    env = rl.RenderFrame(env, "./output")

    obs, info = env.reset(seed=42)
    total_reward = 0.0

    while True:
        # Inference only: skip building an autograd graph on every step.
        with torch.no_grad():
            mu, _ = model.pi(torch.from_numpy(obs).float())

        obs, reward, terminated, truncated, info = env.step([mu.item()])
        total_reward += reward
        if terminated or truncated:
            print(total_reward)
            break

    env.play()

In [None]:
# Load the best checkpoint written during training and replay one episode.
# The checkpoint is a fully pickled nn.Module (saved with torch.save(model, ...)),
# so loading must opt out of weights_only mode, which is the default from
# PyTorch 2.6 onwards and rejects arbitrary pickled classes.
# NOTE: unpickling can execute arbitrary code -- only load files you created yourself.
model = torch.load("best_model.pt", weights_only=False)
play(model)