### 1. REINFORCE

In [1]:
import gymnasium as gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

#Hyperparameters
learning_rate = 0.0002
gamma         = 0.98

class Policy(nn.Module):
    def __init__(self):
        super(Policy, self).__init__()
        self.data = [] # stores rewards nd action probabilities for policy gradient updates
        
        self.fc1 = nn.Linear(4, 128) # 4 input neurons CartPole state size, 128 neurons
        self.fc2 = nn.Linear(128, 2) # 128 neurons and 2 output actions: left or right
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate) # optimizer for training the policy network
        
    def forward(self, x):
        x = F.relu(self.fc1(x)) # introduces non-linearity
        x = F.softmax(self.fc2(x), dim=0) # converts outputs into probability distributions over actions
        return x
      
    def put_data(self, item):
        self.data.append(item)
        
    def train_net(self):
        R = 0 # cumulative reward
        self.optimizer.zero_grad() # reset gradients
        for r, prob in self.data[::-1]:
            R = r + gamma * R # discounted reward
            loss = -torch.log(prob) * R # maximizes probability of good actions (higher rewards); minimizes p of bad actions
            loss.backward()   # computes gradients
        self.optimizer.step() # updates policy network
        self.data = [] 

def main():
    env = gym.make('CartPole-v1')
    pi = Policy()
    score = 0.0
    print_interval = 20
    
    
    for n_epi in range(500):
        s, _ = env.reset()
        done = False
        
        while not done: # CartPole-v1 forced to terminates at 500 step.
            prob = pi(torch.from_numpy(s).float()) # compute action probabilities
            m = Categorical(prob) # create a probability distribution
            a = m.sample() # sample an action
            s_prime, r, done, truncated, info = env.step(a.item()) # take action in env
            pi.put_data((r,prob[a])) # store reward and action probability
            s = s_prime # move to the next state
            score += r # accumulate score
            
        pi.train_net()
        
        if n_epi%print_interval==0 and n_epi!=0:
            print("# of episode :{}, avg score : {}".format(n_epi, score/print_interval))
            score = 0.0
    env.close()
    
if __name__ == '__main__':
    main()

# of episode :20, avg score : 25.65
# of episode :40, avg score : 20.45
# of episode :60, avg score : 22.05
# of episode :80, avg score : 19.55
# of episode :100, avg score : 23.2
# of episode :120, avg score : 23.35
# of episode :140, avg score : 26.05
# of episode :160, avg score : 30.65
# of episode :180, avg score : 26.95
# of episode :200, avg score : 29.3
# of episode :220, avg score : 32.7
# of episode :240, avg score : 41.45
# of episode :260, avg score : 36.8
# of episode :280, avg score : 40.1
# of episode :300, avg score : 42.0
# of episode :320, avg score : 36.9
# of episode :340, avg score : 30.95
# of episode :360, avg score : 40.25
# of episode :380, avg score : 35.7
# of episode :400, avg score : 40.05
# of episode :420, avg score : 54.0
# of episode :440, avg score : 40.2
# of episode :460, avg score : 43.65
# of episode :480, avg score : 51.9


### 2. Vallina Actor-Critic

In [2]:
import gymnasium as gym 
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

#Hyperparameters
learning_rate = 0.0002
gamma         = 0.98
n_rollout     = 10 # number of steps to collect before updating the model

class ActorCritic(nn.Module):
    def __init__(self):
        super(ActorCritic, self).__init__()
        self.data = []
        
        self.fc1 = nn.Linear(4,256) # shared feature extraction layer 
        self.fc_pi = nn.Linear(256,2) # actor network, outputs action probabilities
        self.fc_v = nn.Linear(256,1) # critic network, outputs single state-value (V(s))
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)
        
    def pi(self, x, softmax_dim = 0):
        x = F.relu(self.fc1(x)) # pass through shared network
        x = self.fc_pi(x) # get action logits
        prob = F.softmax(x, dim=softmax_dim) # convert to probabilities
        return prob
    
    def v(self, x): # critic network (value function)
        x = F.relu(self.fc1(x)) # pass through shared network 
        v = self.fc_v(x) # output value estimate
        return v
    
    def put_data(self, transition): 
        self.data.append(transition) # store (state, action, reward, next state, done)
        
    def make_batch(self): # converts stored data into mini-batches for training
        s_lst, a_lst, r_lst, s_prime_lst, done_lst = [], [], [], [], []
        for transition in self.data:
            s,a,r,s_prime,done = transition
            s_lst.append(s)
            a_lst.append([a])
            r_lst.append([r/100.0]) # normalize reward
            s_prime_lst.append(s_prime) 
            done_mask = 0.0 if done else 1.0 # Done flag; helps bootstrapping
            done_lst.append([done_mask])

        # convert list to PyTorch tensors
        s_batch, a_batch, r_batch, s_prime_batch, done_batch = torch.tensor(s_lst, dtype=torch.float), torch.tensor(a_lst), \
                                                               torch.tensor(r_lst, dtype=torch.float), torch.tensor(s_prime_lst, dtype=torch.float), \
                                                               torch.tensor(done_lst, dtype=torch.float)
        self.data = []
        return s_batch, a_batch, r_batch, s_prime_batch, done_batch
  
    def train_net(self):
        s, a, r, s_prime, done = self.make_batch() 

        # compute TD target : R + γ * V(s')
        td_target = r + gamma * self.v(s_prime) * done  # bootstraps future rewards
        delta = td_target - self.v(s) # compute TD error (target-value)

        # compute policy loss
        pi = self.pi(s, softmax_dim=1) # Compute action probabilities
        pi_a = pi.gather(1,a) # Select probabilities corresponding to actions taken
        loss = -torch.log(pi_a) * delta.detach() # updates policy based on TD error
        loss += F.smooth_l1_loss(self.v(s), td_target.detach()) # critic loss: trains value function

        self.optimizer.zero_grad()
        loss.mean().backward()
        self.optimizer.step()         
      
def main():  
    env = gym.make('CartPole-v1')
    model = ActorCritic()    
    print_interval = 20
    score = 0.0

    for n_epi in range(500):
        done = False
        s, _ = env.reset()
        while not done:
            for t in range(n_rollout): # collect experiences
                prob = model.pi(torch.from_numpy(s).float()) # get action probabilities
                m = Categorical(prob) 
                a = m.sample().item() # sample an action

                # take action
                s_prime, r, done, truncated, info = env.step(a)

                # store transition
                model.put_data((s,a,r,s_prime,done))
                
                s = s_prime
                score += r
                
                if done:
                    break                     
            
            model.train_net() # train after "n_rollout" steps
            
        if n_epi%print_interval==0 and n_epi!=0:
            print("# of episode :{}, avg score : {:.1f}".format(n_epi, score/print_interval))
            score = 0.0
    env.close()

if __name__ == '__main__':
    main()

  s_batch, a_batch, r_batch, s_prime_batch, done_batch = torch.tensor(s_lst, dtype=torch.float), torch.tensor(a_lst), \


# of episode :20, avg score : 22.6
# of episode :40, avg score : 19.6
# of episode :60, avg score : 30.0
# of episode :80, avg score : 28.9
# of episode :100, avg score : 23.8
# of episode :120, avg score : 32.1
# of episode :140, avg score : 43.5
# of episode :160, avg score : 50.4
# of episode :180, avg score : 52.4
# of episode :200, avg score : 52.6
# of episode :220, avg score : 69.5
# of episode :240, avg score : 75.3
# of episode :260, avg score : 113.3
# of episode :280, avg score : 79.4
# of episode :300, avg score : 121.5
# of episode :320, avg score : 148.8
# of episode :340, avg score : 183.7
# of episode :360, avg score : 197.5
# of episode :380, avg score : 138.8
# of episode :400, avg score : 198.3
# of episode :420, avg score : 277.3
# of episode :440, avg score : 320.6
# of episode :460, avg score : 232.3
# of episode :480, avg score : 273.6


### 3. DQN

In [3]:
import gymnasium as gym
import collections
import random

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

#Hyperparameters
learning_rate = 0.0005
gamma         = 0.98
buffer_limit  = 5000 # maximum size of the replay buffer
batch_size    = 32 # number of samples per training iteration

# stores past experiences (s,a,r,s'done_mask) in a fixed-sized buffer
# breaks correlation between consecutive experiences
# allows mini-batch training, improving sample efficiency
class ReplayBuffer():
    def __init__(self):
        self.buffer = collections.deque(maxlen=buffer_limit)
    # put a new experience to the buffer
    def put(self, transition):
        self.buffer.append(transition)
    # randomly samples n transitions from the buffer
    def sample(self, n):
        mini_batch = random.sample(self.buffer, n)
        s_lst, a_lst, r_lst, s_prime_lst, done_mask_lst = [], [], [], [], []
        
        for transition in mini_batch:
            s, a, r, s_prime, done_mask = transition
            s_lst.append(s)
            a_lst.append([a])
            r_lst.append([r])
            s_prime_lst.append(s_prime)
            done_mask_lst.append([done_mask])

        return torch.tensor(s_lst, dtype=torch.float), torch.tensor(a_lst), \
               torch.tensor(r_lst), torch.tensor(s_prime_lst, dtype=torch.float), \
               torch.tensor(done_mask_lst)
    # returns the current buffer size
    def size(self):
        return len(self.buffer)

# Deep Q-Network DQN
class Qnet(nn.Module):
    def __init__(self):
        super(Qnet, self).__init__()
        self.fc1 = nn.Linear(4, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, 2)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x
    # uses epsilon greedy Policy
    # with probability epsilon, selects a random action (exploration)
    # otherwise, selects the best action using argmax(Q(s,a)) (exploitation)
    def sample_action(self, obs, epsilon):
        out = self.forward(obs)
        coin = random.random()
        if coin < epsilon:
            return random.randint(0,1)
        else : 
            return out.argmax().item()
            
def train(q, q_target, memory, optimizer):
    for i in range(10):
        s,a,r,s_prime,done_mask = memory.sample(batch_size)

        q_out = q(s) # compute Q-values for all actions in state s
        q_a = q_out.gather(1,a) # select Q-values corresponding to actions taken 
        max_q_prime = q_target(s_prime).max(1)[0].unsqueeze(1) # computes the target Q-value (max Q(s', a'))
        target = r + gamma * max_q_prime * done_mask #Bellman equation: Computes the expected Q-value.
        loss = F.smooth_l1_loss(q_a, target) # Uses Huber loss to stabilize training.
         
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

def main():
    env = gym.make('CartPole-v1')
    q = Qnet() # online Q-network
    q_target = Qnet() # target Q-network
    q_target.load_state_dict(q.state_dict()) # initilize target network
    memory = ReplayBuffer()

    print_interval = 20
    score = 0.0  
    optimizer = optim.Adam(q.parameters(), lr=learning_rate)

    for n_epi in range(500):
        epsilon = max(0.01, 0.08 - 0.01*(n_epi/200)) #Linear annealing from 8% to 1%
        s, _ = env.reset()
        done = False

        while not done:
            a = q.sample_action(torch.from_numpy(s).float(), epsilon) # randomly selects action  
            s_prime, r, done, truncated, info = env.step(a) # take action
            done_mask = 0.0 if done else 1.0
            memory.put((s,a,r/100.0,s_prime, done_mask)) # store info
            s = s_prime

            score += r
            if done:
                break
            
        if memory.size()>2000:
            train(q, q_target, memory, optimizer)

        if n_epi%print_interval==0 and n_epi!=0:
            q_target.load_state_dict(q.state_dict())
            print("n_episode :{}, score : {:.1f}, n_buffer : {}, eps : {:.1f}%".format(
                                                            n_epi, score/print_interval, memory.size(), epsilon*100))
            score = 0.0
    env.close()

if __name__ == '__main__':
    main()

n_episode :20, score : 14.8, n_buffer : 296, eps : 7.9%
n_episode :40, score : 12.1, n_buffer : 537, eps : 7.8%
n_episode :60, score : 13.4, n_buffer : 806, eps : 7.7%
n_episode :80, score : 13.1, n_buffer : 1068, eps : 7.6%
n_episode :100, score : 17.1, n_buffer : 1409, eps : 7.5%
n_episode :120, score : 16.1, n_buffer : 1731, eps : 7.4%
n_episode :140, score : 13.2, n_buffer : 1995, eps : 7.3%
n_episode :160, score : 14.3, n_buffer : 2282, eps : 7.2%
n_episode :180, score : 26.2, n_buffer : 2807, eps : 7.1%
n_episode :200, score : 21.3, n_buffer : 3233, eps : 7.0%
n_episode :220, score : 24.1, n_buffer : 3715, eps : 6.9%
n_episode :240, score : 14.6, n_buffer : 4006, eps : 6.8%
n_episode :260, score : 15.0, n_buffer : 4306, eps : 6.7%
n_episode :280, score : 20.2, n_buffer : 4710, eps : 6.6%
n_episode :300, score : 15.4, n_buffer : 5000, eps : 6.5%
n_episode :320, score : 28.4, n_buffer : 5000, eps : 6.4%
n_episode :340, score : 23.8, n_buffer : 5000, eps : 6.3%
n_episode :360, score

### 4. PPO (Proximal Policy Gradient)

In [5]:
import gymnasium as gym 
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

#Hyperparameters
learning_rate = 0.0005
gamma         = 0.98
lmbda         = 0.95 # GAE (generalized advantage estimation) decay factor
eps_clip      = 0.1 # clipping parameter for PPO (prevents large updates)
K_epoch       = 3 # number of PPO updates per batch
T_horizon     = 10 # number of steps collected before training

class PPO(nn.Module):
    def __init__(self):
        super(PPO, self).__init__()
        self.data = []
        
        self.fc1   = nn.Linear(4,256) 
        self.fc_pi = nn.Linear(256,2) # actor network
        self.fc_v  = nn.Linear(256,1) # critic network
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate) 

    # policy (actor)
    def pi(self, x, softmax_dim = 0):
        x = F.relu(self.fc1(x))
        x = self.fc_pi(x)
        prob = F.softmax(x, dim=softmax_dim)
        return prob

    # value (critic)
    def v(self, x):
        x = F.relu(self.fc1(x))
        v = self.fc_v(x)
        return v

    # stores tuples (state, action, reward, next_state, action_prob, done_flag)
    def put_data(self, transition):
        self.data.append(transition)
        
    def make_batch(self):
        s_lst, a_lst, r_lst, s_prime_lst, prob_a_lst, done_lst = [], [], [], [], [], []
        for transition in self.data:
            s, a, r, s_prime, prob_a, done = transition
            # convert tuple into PyTorch Tensors
            s_lst.append(s)
            a_lst.append([a])
            r_lst.append([r])
            s_prime_lst.append(s_prime)
            prob_a_lst.append([prob_a])
            done_mask = 0 if done else 1
            done_lst.append([done_mask])
            
        s,a,r,s_prime,done_mask, prob_a = torch.tensor(s_lst, dtype=torch.float), torch.tensor(a_lst), \
                                          torch.tensor(r_lst), torch.tensor(s_prime_lst, dtype=torch.float), \
                                          torch.tensor(done_lst, dtype=torch.float), torch.tensor(prob_a_lst)
        self.data = []
        return s, a, r, s_prime, done_mask, prob_a
        
    def train_net(self):
        s, a, r, s_prime, done_mask, prob_a = self.make_batch()

        for i in range(K_epoch): # PPO update steps
            td_target = r + gamma * self.v(s_prime) * done_mask # estimates expected return
            delta = td_target - self.v(s) # TD error
            delta = delta.detach().numpy()

            # Generalized Advantage Estimation
            advantage_lst = []
            advantage = 0.0
            for delta_t in delta[::-1]: # reverse iterate through batch
                advantage = gamma * lmbda * advantage + delta_t[0] # computes advantage function using GAE
                advantage_lst.append([advantage])
            advantage_lst.reverse()
            advantage = torch.tensor(advantage_lst, dtype=torch.float)

            # computing the PPO loss
            pi = self.pi(s, softmax_dim=1)
            pi_a = pi.gather(1,a)
            # pi_a / prob_a : measures how much the new policy differs from the old policy
            ratio = torch.exp(torch.log(pi_a) - torch.log(prob_a))  # a/b == exp(log(a)-log(b))
            
            surr1 = ratio * advantage
            # clipping prevents large policy updates (stablizes learning)
            surr2 = torch.clamp(ratio, 1-eps_clip, 1+eps_clip) * advantage
            # implements PPO clipped surrogate objective
            loss = -torch.min(surr1, surr2) + F.smooth_l1_loss(self.v(s) , td_target.detach())

            # computates gradients and updates parameters
            self.optimizer.zero_grad()
            loss.mean().backward()
            self.optimizer.step()
        
def main():
    env = gym.make('CartPole-v1')
    model = PPO()
    score = 0.0
    print_interval = 20

    for n_epi in range(500):
        s, _ = env.reset()
        done = False
        while not done:
            for t in range(T_horizon):
                prob = model.pi(torch.from_numpy(s).float())
                m = Categorical(prob) # selects actions using a stochastic policy
                a = m.sample().item()
                s_prime, r, done, truncated, info = env.step(a)

                model.put_data((s, a, r/100.0, s_prime, prob[a].item(), done))
                s = s_prime

                score += r
                if done:
                    break

            model.train_net()

        if n_epi%print_interval==0 and n_epi!=0:
            print("# of episode :{}, avg score : {:.1f}".format(n_epi, score/print_interval))
            score = 0.0

    env.close()

if __name__ == '__main__':
    main()

# of episode :20, avg score : 44.7
# of episode :40, avg score : 53.0
# of episode :60, avg score : 63.8
# of episode :80, avg score : 96.5
# of episode :100, avg score : 55.7
# of episode :120, avg score : 75.8
# of episode :140, avg score : 191.4
# of episode :160, avg score : 186.7
# of episode :180, avg score : 288.0
# of episode :200, avg score : 232.9
# of episode :220, avg score : 129.1
# of episode :240, avg score : 109.8
# of episode :260, avg score : 349.1
# of episode :280, avg score : 233.1
# of episode :300, avg score : 297.1
# of episode :320, avg score : 366.1
# of episode :340, avg score : 814.8
# of episode :360, avg score : 389.9
# of episode :380, avg score : 105.7
# of episode :400, avg score : 1718.3
# of episode :420, avg score : 58.4
# of episode :440, avg score : 60.4
# of episode :460, avg score : 83.4
# of episode :480, avg score : 86.2


### 5. DDPG (Deep Deterministic Policy Gradient)

In [4]:
import gymnasium as gym
import random
import collections
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

#Hyperparameters
lr_mu        = 0.0005
lr_q         = 0.001
gamma        = 0.99
batch_size   = 32
buffer_limit = 5000
tau          = 0.005 # for target network soft update

class ReplayBuffer():
    def __init__(self):
        self.buffer = collections.deque(maxlen=buffer_limit)

    def put(self, transition):
        self.buffer.append(transition)
    
    def sample(self, n):
        mini_batch = random.sample(self.buffer, n)
        s_lst, a_lst, r_lst, s_prime_lst, done_mask_lst = [], [], [], [], []

        for transition in mini_batch:
            s, a, r, s_prime, done = transition
            s_lst.append(s)
            a_lst.append([a])
            r_lst.append([r])
            s_prime_lst.append(s_prime)
            done_mask = 0.0 if done else 1.0 
            done_mask_lst.append([done_mask])
        
        return torch.tensor(s_lst, dtype=torch.float), torch.tensor(a_lst, dtype=torch.float), \
                torch.tensor(r_lst, dtype=torch.float), torch.tensor(s_prime_lst, dtype=torch.float), \
                torch.tensor(done_mask_lst, dtype=torch.float)
    
    def size(self):
        return len(self.buffer)

# actor network
class MuNet(nn.Module):
    def __init__(self):
        super(MuNet, self).__init__()
        self.fc1 = nn.Linear(3, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc_mu = nn.Linear(64, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        mu = torch.tanh(self.fc_mu(x))*2 # Multipled by 2 because the action space of the Pendulum-v0 is [-2,2]
        return mu

# critic network (Q-network)
class QNet(nn.Module):
    def __init__(self):
        super(QNet, self).__init__()
        self.fc_s = nn.Linear(3, 64)
        self.fc_a = nn.Linear(1,64)
        self.fc_q = nn.Linear(128, 32)
        self.fc_out = nn.Linear(32,1)

    def forward(self, x, a):
        h1 = F.relu(self.fc_s(x))
        h2 = F.relu(self.fc_a(a))
        cat = torch.cat([h1,h2], dim=1)
        q = F.relu(self.fc_q(cat))
        q = self.fc_out(q)
        return q

# exploration strategy: Ornstein-Uhlenbeck Noise
# Handles exploration in continuous action spaces.
# Ornstein-Uhlenbeck Noise introduces correlated noise, useful for smooth actions.

class OrnsteinUhlenbeckNoise:
    def __init__(self, mu):
        self.theta, self.dt, self.sigma = 0.1, 0.01, 0.1
        self.mu = mu
        self.x_prev = np.zeros_like(self.mu)

    def __call__(self):
        x = self.x_prev + self.theta * (self.mu - self.x_prev) * self.dt + \
                self.sigma * np.sqrt(self.dt) * np.random.normal(size=self.mu.shape)
        self.x_prev = x
        return x

# updating networks
def train(mu, mu_target, q, q_target, memory, q_optimizer, mu_optimizer):
    s,a,r,s_prime,done_mask  = memory.sample(batch_size)

    # critic loss
    target = r + gamma * q_target(s_prime, mu_target(s_prime)) * done_mask
    q_loss = F.smooth_l1_loss(q(s,a), target.detach())
    q_optimizer.zero_grad()
    q_loss.backward()
    q_optimizer.step()

    # actor loss
    mu_loss = -q(s,mu(s)).mean() # maximize expected Q-value
    mu_optimizer.zero_grad()
    mu_loss.backward()
    mu_optimizer.step()

# target network soft update
# prevents dramatic Q-value changes
def soft_update(net, net_target):
    for param_target, param in zip(net_target.parameters(), net.parameters()):
        param_target.data.copy_(param_target.data * (1.0 - tau) + param.data * tau)
    
def main():
    # a continuous action env
    env = gym.make('Pendulum-v1', max_episode_steps=50, autoreset=True)
    memory = ReplayBuffer()

    q, q_target = QNet(), QNet()
    q_target.load_state_dict(q.state_dict())
    mu, mu_target = MuNet(), MuNet()
    mu_target.load_state_dict(mu.state_dict())

    score = 0.0
    print_interval = 10

    mu_optimizer = optim.Adam(mu.parameters(), lr=lr_mu)
    q_optimizer  = optim.Adam(q.parameters(), lr=lr_q)
    ou_noise = OrnsteinUhlenbeckNoise(mu=np.zeros(1))

    for n_epi in range(500):
        s, _ = env.reset()
        done = False

        count = 0
        while count < 200 and not done:
            a = mu(torch.from_numpy(s).float())  # take an action using mu, adds noise for exploration
            a = a.item() + ou_noise()[0] # add exploration noise 
            s_prime, r, done, truncated, info = env.step([a]) # replay buffer
            memory.put((s,a,r/100.0,s_prime,done))
            score +=r
            s = s_prime
            count += 1
        # training and soft update
        # trains on a batch of samples from the buffer
        # performs soft updates on both actor and critic target networks
        if memory.size()>1000:
            for i in range(10):
                train(mu, mu_target, q, q_target, memory, q_optimizer, mu_optimizer)
                soft_update(mu, mu_target)
                soft_update(q,  q_target)
        
        if n_epi%print_interval==0 and n_epi!=0:
            print("# of episode :{}, avg score : {:.1f}".format(n_epi, score/print_interval))
            score = 0.0

    env.close()

if __name__ == '__main__':
    main()

# Quick summary
# replay buffer -> stores past experiences for training
# actor network (mu) -> maps states to continuous actions
# critic network (Q) -> evaluates state-action pairs
# Ornstein-Uhlenbeck Noise -> smooths exploration in continuous space 
# target networks (mu_target, Q_target) -> improves stability, prevents divergence
# soft target updates -> gradually updates target networks (\tau-weighted average)
# policy gradient update -> maximizes expected Q-values
# critic update -> uses bellman equation to estimate q-values

TypeError: PendulumEnv.__init__() got an unexpected keyword argument 'autoreset' was raised from the environment creator for Pendulum-v1 with kwargs ({'autoreset': True})

### 6. A3C 

In [5]:
import gymnasium as gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
import torch.multiprocessing as mp
import time

# Hyperparameters
n_train_processes = 3
learning_rate = 0.0002
update_interval = 5
gamma = 0.98
max_train_ep = 100
max_test_ep = 150


class ActorCritic(nn.Module):
    def __init__(self):
        super(ActorCritic, self).__init__()
        self.fc1 = nn.Linear(4, 256)
        self.fc_pi = nn.Linear(256, 2)
        self.fc_v = nn.Linear(256, 1)

    def pi(self, x, softmax_dim=0):
        x = F.relu(self.fc1(x))
        x = self.fc_pi(x)
        prob = F.softmax(x, dim=softmax_dim)
        return prob

    def v(self, x):
        x = F.relu(self.fc1(x))
        v = self.fc_v(x)
        return v

# training multiple processes in parallel
def train(global_model, rank):
    local_model = ActorCritic() # copies global model into its local model
    local_model.load_state_dict(global_model.state_dict())

    optimizer = optim.Adam(global_model.parameters(), lr=learning_rate)

    env = gym.make('CartPole-v1')

    # collecting trajectories (experience replay)
    for n_epi in range(max_train_ep):
        done = False
        s = env.reset()
        while not done:
            s_lst, a_lst, r_lst = [], [], []
            for t in range(update_interval):
                prob = local_model.pi(torch.from_numpy(s).float())
                m = Categorical(prob) # selects an action stochastically
                a = m.sample().item()
                s_prime, r, done, info = env.step(a)

                s_lst.append(s)
                a_lst.append([a])
                r_lst.append(r/100.0)

                s = s_prime
                if done:
                    break

            # computes the advantage function
            s_final = torch.tensor(s_prime, dtype=torch.float)
            R = 0.0 if done else local_model.v(s_final).item()
            td_target_lst = []
            for reward in r_lst[::-1]:
                R = gamma * R + reward
                td_target_lst.append([R])
            td_target_lst.reverse()

            # advantage function: 
            # measures how much better an action is compared to the baseline (state value estimate)
            s_batch, a_batch, td_target = torch.tensor(s_lst, dtype=torch.float), torch.tensor(a_lst), \
                torch.tensor(td_target_lst)
            advantage = td_target - local_model.v(s_batch) # TD target - V(s)

            # policy gradient loss
            pi = local_model.pi(s_batch, softmax_dim=1)
            pi_a = pi.gather(1, a_batch)
            loss = -torch.log(pi_a) * advantage.detach() + \
                F.smooth_l1_loss(local_model.v(s_batch), td_target.detach()) # stablize training

            # updating global model
            optimizer.zero_grad()
            loss.mean().backward()
            for global_param, local_param in zip(global_model.parameters(), local_model.parameters()):
                global_param._grad = local_param.grad
            optimizer.step()
            local_model.load_state_dict(global_model.state_dict())

    env.close()
    print("Training process {} reached maximum episode.".format(rank))


def test(global_model):
    env = gym.make('CartPole-v1')
    score = 0.0
    print_interval = 20

    for n_epi in range(max_test_ep):
        done = False
        s = env.reset()
        while not done:
            prob = global_model.pi(torch.from_numpy(s).float())
            a = Categorical(prob).sample().item()
            s_prime, r, done, info = env.step(a)
            s = s_prime
            score += r

        if n_epi % print_interval == 0 and n_epi != 0:
            print("# of episode :{}, avg score : {:.1f}".format(
                n_epi, score/print_interval))
            score = 0.0
            time.sleep(1)
    env.close()


if __name__ == '__main__':
    # the global model is shared across multiple training processes 
    # each process runs independently, collecting experience and updating the global model
    global_model = ActorCritic()
    global_model.share_memory()

    processes = []
    # spawns multiple processes, n_train_processes for training, 1 for testing
    for rank in range(n_train_processes + 1):  # + 1 for test process
        if rank == 0:
            p = mp.Process(target=test, args=(global_model,))
        else:
            p = mp.Process(target=train, args=(global_model, rank,))
        p.start()
        processes.append(p)
    # each process operates asynchronously and updates the global model
    for p in processes:
        p.join()

### 7. ACER

In [8]:
import gymnasium as gym
import random
import collections
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

# Characteristics
# combines value-based learning and policy optimization with off-policy correction. 
# It is designed for discrete action spaces and operates in a single-threaded env without trust-region updates

# 1. Discrete action space, single thread version.
# 2. Does not support trust-region updates.

#Hyperparameters
learning_rate = 0.0002
gamma         = 0.98
buffer_limit  = 6000  
rollout_len   = 10   
batch_size    = 4     # Indicates 4 sequences per mini-batch (4*rollout_len = 40 samples total)
c             = 1.0   # For truncating importance sampling ratio


# stores trajectories rather than single transitions
class ReplayBuffer():
    def __init__(self):
        self.buffer = collections.deque(maxlen=buffer_limit)

    def put(self, seq_data):
        self.buffer.append(seq_data)
    
    def sample(self, on_policy=False):
        if on_policy: # use the most recent trajectory
            mini_batch = [self.buffer[-1]]
        else: # samples randomly from the replay buffer
            mini_batch = random.sample(self.buffer, batch_size)

        s_lst, a_lst, r_lst, prob_lst, done_lst, is_first_lst = [], [], [], [], [], []
        for seq in mini_batch:
            is_first = True  # Flag for indicating whether the transition is the first item from a sequence
            for transition in seq:
                s, a, r, prob, done = transition

                s_lst.append(s)
                a_lst.append([a])
                r_lst.append(r)
                prob_lst.append(prob)
                done_mask = 0.0 if done else 1.0
                done_lst.append(done_mask)
                is_first_lst.append(is_first)
                is_first = False

        s,a,r,prob,done_mask,is_first = torch.tensor(s_lst, dtype=torch.float), torch.tensor(a_lst), \
                                        r_lst, torch.tensor(prob_lst, dtype=torch.float), done_lst, \
                                        is_first_lst
        return s,a,r,prob,done_mask,is_first
    
    def size(self):
        return len(self.buffer)
      
class ActorCritic(nn.Module):
    def __init__(self):
        super(ActorCritic, self).__init__()
        self.fc1 = nn.Linear(4,256)
        self.fc_pi = nn.Linear(256,2)
        self.fc_q = nn.Linear(256,2)
        
    def pi(self, x, softmax_dim = 0):
        x = F.relu(self.fc1(x))
        x = self.fc_pi(x)
        pi = F.softmax(x, dim=softmax_dim) # probabilities of actions
        return pi
    
    def q(self, x):
        x = F.relu(self.fc1(x))
        q = self.fc_q(x) # q-values for each action
        return q
      
def train(model, optimizer, memory, on_policy=False):
    s,a,r,prob,done_mask,is_first = memory.sample(on_policy)
    
    q = model.q(s)
    q_a = q.gather(1,a)
    pi = model.pi(s, softmax_dim = 1)
    pi_a = pi.gather(1,a) # computes current policy probabilities for chosen actions
    v = (q * pi).sum(1).unsqueeze(1).detach() # approximates V(s)
    
    rho = pi.detach()/prob # measure how much the new policy differs from behavior policy
    rho_a = rho.gather(1,a)
    rho_bar = rho_a.clamp(max=c)
    # clipped importance sampling: limits extreme updates for stability
    correction_coeff = (1-c/rho).clamp(min=0) 

    q_ret = v[-1] * done_mask[-1]
    q_ret_lst = []
    for i in reversed(range(len(r))):
        q_ret = r[i] + gamma * q_ret
        q_ret_lst.append(q_ret.item())
        q_ret = rho_bar[i] * (q_ret - q_a[i]) + v[i] # retrace (lambda) equation
        
        if is_first[i] and i!=0:
            q_ret = v[i-1] * done_mask[i-1] # When a new sequence begins, q_ret is initialized  

    # loss function
    q_ret_lst.reverse()
    q_ret = torch.tensor(q_ret_lst, dtype=torch.float).unsqueeze(1)
    # converts retraced q-value into a tensor for loss computation
    loss1 = -rho_bar * torch.log(pi_a) * (q_ret - v) 
    loss2 = -correction_coeff * pi.detach() * torch.log(pi) * (q.detach()-v) # bias correction term
    loss = loss1 + loss2.sum(1) + F.smooth_l1_loss(q_a, q_ret)

    # loss breakdown
    # policy loss loss 1 ; uses truncated importance sampling
    # bias correction term loss 2: adjusts for off-policy bias 
    # critic loss : ensures q-value match retrace estimates
    
    # update model
    optimizer.zero_grad()
    loss.mean().backward()
    optimizer.step()
        
def main():
    env = gym.make('CartPole-v1')
    memory = ReplayBuffer()
    model = ActorCritic()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    score = 0.0
    print_interval = 20    

    for n_epi in range(500):
        s, _ = env.reset()
        done = False
        
        while not done:
            seq_data = []
            for t in range(rollout_len): 
                prob = model.pi(torch.from_numpy(s).float())
                a = Categorical(prob).sample().item()
                s_prime, r, done, truncated, info = env.step(a)
                seq_data.append((s, a, r/100.0, prob.detach().numpy(), done))

                score +=r
                s = s_prime
                if done:
                    break
                    
            memory.put(seq_data)
            if memory.size()>100:
                train(model, optimizer, memory, on_policy=True)
                train(model, optimizer, memory)
        
        if n_epi%print_interval==0 and n_epi!=0:
            print("# of episode :{}, avg score : {:.1f}, buffer size : {}".format(n_epi, score/print_interval, memory.size()))
            score = 0.0

    env.close()

if __name__ == '__main__':
    main()

# of episode :20, avg score : 23.3, buffer size : 56
# of episode :40, avg score : 27.2, buffer size : 120
# of episode :60, avg score : 34.0, buffer size : 195
# of episode :80, avg score : 33.0, buffer size : 270
# of episode :100, avg score : 42.4, buffer size : 363
# of episode :120, avg score : 60.8, buffer size : 494
# of episode :140, avg score : 76.0, buffer size : 656
# of episode :160, avg score : 129.3, buffer size : 925
# of episode :180, avg score : 184.9, buffer size : 1304
# of episode :200, avg score : 270.9, buffer size : 1854
# of episode :220, avg score : 263.9, buffer size : 2389
# of episode :240, avg score : 294.1, buffer size : 2987
# of episode :260, avg score : 280.1, buffer size : 3556
# of episode :280, avg score : 360.4, buffer size : 4286
# of episode :300, avg score : 292.3, buffer size : 4878
# of episode :320, avg score : 305.2, buffer size : 5497
# of episode :340, avg score : 436.3, buffer size : 6000
# of episode :360, avg score : 404.2, buffer size :

### 8. A2C

In [None]:
import gymnasium as gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
import torch.multiprocessing as mp
import numpy as np

# Hyperparameters
n_train_processes = 3
learning_rate = 0.0002
update_interval = 5
gamma = 0.98
max_train_steps = 500
PRINT_INTERVAL = update_interval * 10

class ActorCritic(nn.Module):
    def __init__(self):
        super(ActorCritic, self).__init__()
        self.fc1 = nn.Linear(4, 256)
        self.fc_pi = nn.Linear(256, 2)
        self.fc_v = nn.Linear(256, 1)

    def pi(self, x, softmax_dim=1):
        x = F.relu(self.fc1(x))
        x = self.fc_pi(x)
        prob = F.softmax(x, dim=softmax_dim)
        return prob

    def v(self, x):
        x = F.relu(self.fc1(x))
        v = self.fc_v(x)
        return v

# parallel environment handling
# to speed up training, use multiple environments running in parallel
def worker(worker_id, master_end, worker_end):
    master_end.close()  # close master-end in worker processes
    env = gym.make('CartPole-v1')
    env.seed(worker_id)

    while True:
        cmd, data = worker_end.recv()
        if cmd == 'step':
            ob, reward, done, info = env.step(data)
            if done:
                ob = env.reset()
            worker_end.send((ob, reward, done, info))
        elif cmd == 'reset':
            ob = env.reset()
            worker_end.send(ob)
        elif cmd == 'reset_task':
            ob = env.reset_task()
            worker_end.send(ob)
        elif cmd == 'close':
            worker_end.close()
            break
        elif cmd == 'get_spaces':
            worker_end.send((env.observation_space, env.action_space))
        else:
            raise NotImplementedError

# master process managing multiple workers
class ParallelEnv:
    def __init__(self, n_train_processes):
        self.nenvs = n_train_processes
        self.waiting = False
        self.closed = False
        self.workers = list()
        # use pipe() communication to send/receive data
        master_ends, worker_ends = zip(*[mp.Pipe() for _ in range(self.nenvs)])
        self.master_ends, self.worker_ends = master_ends, worker_ends

        for worker_id, (master_end, worker_end) in enumerate(zip(master_ends, worker_ends)):
            p = mp.Process(target=worker,
                           args=(worker_id, master_end, worker_end))
            p.daemon = True
            p.start()
            self.workers.append(p)

        # Forbid master to use the worker end for messaging
        for worker_end in worker_ends:
            worker_end.close()

    def step_async(self, actions):
        for master_end, action in zip(self.master_ends, actions):
            master_end.send(('step', action))
        self.waiting = True

    def step_wait(self):
        results = [master_end.recv() for master_end in self.master_ends]
        self.waiting = False
        obs, rews, dones, infos = zip(*results)
        return np.stack(obs), np.stack(rews), np.stack(dones), infos

    def reset(self):
        for master_end in self.master_ends:
            master_end.send(('reset', None))
        return np.stack([master_end.recv() for master_end in self.master_ends])

    def step(self, actions):
        self.step_async(actions)
        return self.step_wait()

    def close(self):  # For clean up resources
        if self.closed:
            return
        if self.waiting:
            [master_end.recv() for master_end in self.master_ends]
        for master_end in self.master_ends:
            master_end.send(('close', None))
        for worker in self.workers:
            worker.join()
            self.closed = True

def test(step_idx, model):
    env = gym.make('CartPole-v1')
    score = 0.0
    done = False
    num_test = 10

    for _ in range(num_test):
        s = env.reset()
        while not done:
            prob = model.pi(torch.from_numpy(s).float(), softmax_dim=0)
            a = Categorical(prob).sample().numpy()
            s_prime, r, done, info = env.step(a)
            s = s_prime
            score += r
        done = False
    print(f"Step # :{step_idx}, avg score : {score/num_test:.1f}")

    env.close()

# computing TD target
def compute_target(v_final, r_lst, mask_lst):
    G = v_final.reshape(-1)
    td_target = list()

    for r, mask in zip(r_lst[::-1], mask_lst[::-1]):
        G = r + gamma * G * mask
        td_target.append(G)

    return torch.tensor(td_target[::-1]).float()

if __name__ == '__main__':
    envs = ParallelEnv(n_train_processes)

    model = ActorCritic()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    step_idx = 0
    # experience collection
    s = envs.reset()
    while step_idx < max_train_steps:
        s_lst, a_lst, r_lst, mask_lst = list(), list(), list(), list()
        for _ in range(update_interval):
            prob = model.pi(torch.from_numpy(s).float())
            a = Categorical(prob).sample().numpy()
            s_prime, r, done, info = envs.step(a)

            s_lst.append(s)
            a_lst.append(a)
            r_lst.append(r/100.0)
            mask_lst.append(1 - done)

            s = s_prime
            step_idx += 1

        s_final = torch.from_numpy(s_prime).float()
        v_final = model.v(s_final).detach().clone().numpy()
        td_target = compute_target(v_final, r_lst, mask_lst)

        td_target_vec = td_target.reshape(-1)
        s_vec = torch.tensor(s_lst).float().reshape(-1, 4)  # 4 == Dimension of state
        a_vec = torch.tensor(a_lst).reshape(-1).unsqueeze(1)
        advantage = td_target_vec - model.v(s_vec).reshape(-1) # TD target - estimated state-value

        pi = model.pi(s_vec, softmax_dim=1)
        pi_a = pi.gather(1, a_vec).reshape(-1)
        loss = -(torch.log(pi_a) * advantage.detach()).mean() +\
            F.smooth_l1_loss(model.v(s_vec).reshape(-1), td_target_vec)

        # performs gradient update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if step_idx % PRINT_INTERVAL == 0:
            test(step_idx, model)

    envs.close()

### 9. SAC 

In [None]:
import gymnasium as gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Normal
import numpy as np
import collections, random

# a model-free RL algorithm designed for 
# continuous action spaces
# Key features

# 1. entropy regularization: encourages exploration by maximizing entropy in the policy
# 2. twin q-networks; addresses overestimation bias by training two q-networks and using the minimum Q-value for updates
# 3. target networks; stabilizes training using soft target updates
# 4. automatic temperature tuning : adjusts entropy weight dynamically
#Hyperparameters
lr_pi           = 0.0005
lr_q            = 0.001
init_alpha      = 0.01
gamma           = 0.98
batch_size      = 32
buffer_limit    = 5000
tau             = 0.01 # for target network soft update
target_entropy  = -1.0 # for automated alpha update
lr_alpha        = 0.001  # for automated alpha update

class ReplayBuffer():
    def __init__(self):
        self.buffer = collections.deque(maxlen=buffer_limit)

    def put(self, transition):
        self.buffer.append(transition)
    
    def sample(self, n):
        mini_batch = random.sample(self.buffer, n)
        s_lst, a_lst, r_lst, s_prime_lst, done_mask_lst = [], [], [], [], []

        for transition in mini_batch:
            s, a, r, s_prime, done = transition
            s_lst.append(s)
            a_lst.append([a])
            r_lst.append([r])
            s_prime_lst.append(s_prime)
            done_mask = 0.0 if done else 1.0 
            done_mask_lst.append([done_mask])
        
        return torch.tensor(s_lst, dtype=torch.float), torch.tensor(a_lst, dtype=torch.float), \
                torch.tensor(r_lst, dtype=torch.float), torch.tensor(s_prime_lst, dtype=torch.float), \
                torch.tensor(done_mask_lst, dtype=torch.float)
    
    def size(self):
        return len(self.buffer)

class PolicyNet(nn.Module):
    def __init__(self, learning_rate):
        super(PolicyNet, self).__init__()
        self.fc1 = nn.Linear(3, 128)
        self.fc_mu = nn.Linear(128,1)
        self.fc_std  = nn.Linear(128,1)
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)

        # automatic temperature tuning
        self.log_alpha = torch.tensor(np.log(init_alpha))
        self.log_alpha.requires_grad = True # entropy coeff automatically adjusted using gradient descent
        self.log_alpha_optimizer = optim.Adam([self.log_alpha], lr=lr_alpha)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        mu = self.fc_mu(x)
        std = F.softplus(self.fc_std(x))
        dist = Normal(mu, std) # outputs a stochastic action; normal distribution
        action = dist.rsample() # reparameterization trick for differentiable sampling
        log_prob = dist.log_prob(action)
        real_action = torch.tanh(action)
        real_log_prob = log_prob - torch.log(1-torch.tanh(action).pow(2) + 1e-7) # corrects log prob
        return real_action, real_log_prob

    def train_net(self, q1, q2, mini_batch):
        s, _, _, _, _ = mini_batch
        a, log_prob = self.forward(s)
        entropy = -self.log_alpha.exp() * log_prob

        q1_val, q2_val = q1(s,a), q2(s,a)
        # uses twin Q-networks to prevent overestimations
        q1_q2 = torch.cat([q1_val, q2_val], dim=1)
        # uses min Q-value to update the policy
        min_q = torch.min(q1_q2, 1, keepdim=True)[0]

        # performs gradient ascent to maximize expected reward and entropy
        loss = -min_q - entropy # for gradient ascent
        self.optimizer.zero_grad()
        loss.mean().backward()
        self.optimizer.step()

        # updating alpha
        # encourages policy to explore more when entropy is too low
        self.log_alpha_optimizer.zero_grad()
        alpha_loss = -(self.log_alpha.exp() * (log_prob + target_entropy).detach()).mean()
        alpha_loss.backward()
        self.log_alpha_optimizer.step()

class QNet(nn.Module):
    def __init__(self, learning_rate):
        super(QNet, self).__init__()
        self.fc_s = nn.Linear(3, 64)
        self.fc_a = nn.Linear(1,64)
        self.fc_cat = nn.Linear(128,32)
        self.fc_out = nn.Linear(32,1)
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)

    def forward(self, x, a):
        h1 = F.relu(self.fc_s(x))
        h2 = F.relu(self.fc_a(a))
        cat = torch.cat([h1,h2], dim=1) # concat state&action embeddings
        q = F.relu(self.fc_cat(cat))
        q = self.fc_out(q)
        return q

    def train_net(self, target, mini_batch):
        s, a, r, s_prime, done = mini_batch
        loss = F.smooth_l1_loss(self.forward(s, a) , target)
        self.optimizer.zero_grad()
        loss.mean().backward()
        self.optimizer.step()

    # updates target q-networks smoothly
    def soft_update(self, net_target):
        for param_target, param in zip(net_target.parameters(), self.parameters()):
            param_target.data.copy_(param_target.data * (1.0 - tau) + param.data * tau)

def calc_target(pi, q1, q2, mini_batch):
    s, a, r, s_prime, done = mini_batch

    with torch.no_grad():
        # computes SAC target value
        a_prime, log_prob= pi(s_prime)
        entropy = -pi.log_alpha.exp() * log_prob
        q1_val, q2_val = q1(s_prime,a_prime), q2(s_prime,a_prime)
        q1_q2 = torch.cat([q1_val, q2_val], dim=1)
        min_q = torch.min(q1_q2, 1, keepdim=True)[0]
        target = r + gamma * done * (min_q + entropy)

    return target
    
def main():
    env = gym.make('Pendulum-v1')
    memory = ReplayBuffer()
    q1, q2, q1_target, q2_target = QNet(lr_q), QNet(lr_q), QNet(lr_q), QNet(lr_q)
    pi = PolicyNet(lr_pi)

    q1_target.load_state_dict(q1.state_dict())
    q2_target.load_state_dict(q2.state_dict())

    score = 0.0
    print_interval = 20

    for n_epi in range(200):
        s, _ = env.reset()
        done = False
        count = 0

        while count < 200 and not done:
            a, log_prob= pi(torch.from_numpy(s).float())
            s_prime, r, done, truncated, info = env.step([2.0*a.item()])
            memory.put((s, a.item(), r/10.0, s_prime, done))
            score +=r
            s = s_prime
            count += 1
                
        if memory.size()>200:
            for i in range(20):
                mini_batch = memory.sample(batch_size)
                td_target = calc_target(pi, q1_target, q2_target, mini_batch)
                q1.train_net(td_target, mini_batch)
                q2.train_net(td_target, mini_batch)
                entropy = pi.train_net(q1, q2, mini_batch)
                q1.soft_update(q1_target)
                q2.soft_update(q2_target)
                
        if n_epi%print_interval==0 and n_epi!=0:
            print("# of episode :{}, avg score : {:.1f} alpha:{:.4f}".format(n_epi, score/print_interval, pi.log_alpha.exp()))
            score = 0.0

    env.close()

if __name__ == '__main__':
    main()

### 10. PPO continuous

In [None]:
import gymnasium as gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Normal

#Hyperparameters
learning_rate  = 0.0003
gamma           = 0.9
lmbda           = 0.9
eps_clip        = 0.2
K_epoch         = 10
rollout_len    = 3
buffer_size    = 10
minibatch_size = 32

class PPO(nn.Module):
    def __init__(self):
        super(PPO, self).__init__()
        self.data = []
        
        self.fc1   = nn.Linear(3,128)
        self.fc_mu = nn.Linear(128,1)
        self.fc_std  = nn.Linear(128,1)
        self.fc_v = nn.Linear(128,1)
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)
        self.optimization_step = 0

    def pi(self, x, softmax_dim = 0):
        x = F.relu(self.fc1(x))
        mu = 2.0*torch.tanh(self.fc_mu(x))
        std = F.softplus(self.fc_std(x))
        return mu, std
    
    def v(self, x):
        x = F.relu(self.fc1(x))
        v = self.fc_v(x)
        return v
      
    def put_data(self, transition):
        self.data.append(transition)
        
    def make_batch(self):
        s_batch, a_batch, r_batch, s_prime_batch, prob_a_batch, done_batch = [], [], [], [], [], []
        data = []

        for j in range(buffer_size):
            for i in range(minibatch_size):
                rollout = self.data.pop()
                s_lst, a_lst, r_lst, s_prime_lst, prob_a_lst, done_lst = [], [], [], [], [], []

                for transition in rollout:
                    s, a, r, s_prime, prob_a, done = transition
                    
                    s_lst.append(s)
                    a_lst.append([a])
                    r_lst.append([r])
                    s_prime_lst.append(s_prime)
                    prob_a_lst.append([prob_a])
                    done_mask = 0 if done else 1
                    done_lst.append([done_mask])

                s_batch.append(s_lst)
                a_batch.append(a_lst)
                r_batch.append(r_lst)
                s_prime_batch.append(s_prime_lst)
                prob_a_batch.append(prob_a_lst)
                done_batch.append(done_lst)
                    
            mini_batch = torch.tensor(s_batch, dtype=torch.float), torch.tensor(a_batch, dtype=torch.float), \
                          torch.tensor(r_batch, dtype=torch.float), torch.tensor(s_prime_batch, dtype=torch.float), \
                          torch.tensor(done_batch, dtype=torch.float), torch.tensor(prob_a_batch, dtype=torch.float)
            data.append(mini_batch)

        return data

    def calc_advantage(self, data):
        data_with_adv = []
        for mini_batch in data:
            s, a, r, s_prime, done_mask, old_log_prob = mini_batch
            with torch.no_grad():
                td_target = r + gamma * self.v(s_prime) * done_mask
                delta = td_target - self.v(s)
            delta = delta.numpy()

            advantage_lst = []
            advantage = 0.0
            for delta_t in delta[::-1]:
                advantage = gamma * lmbda * advantage + delta_t[0]
                advantage_lst.append([advantage])
            advantage_lst.reverse()
            advantage = torch.tensor(advantage_lst, dtype=torch.float)
            data_with_adv.append((s, a, r, s_prime, done_mask, old_log_prob, td_target, advantage))

        return data_with_adv

        
    def train_net(self):
        if len(self.data) == minibatch_size * buffer_size:
            data = self.make_batch()
            data = self.calc_advantage(data)

            for i in range(K_epoch):
                for mini_batch in data:
                    s, a, r, s_prime, done_mask, old_log_prob, td_target, advantage = mini_batch

                    mu, std = self.pi(s, softmax_dim=1)
                    dist = Normal(mu, std)
                    log_prob = dist.log_prob(a)
                    ratio = torch.exp(log_prob - old_log_prob)  # a/b == exp(log(a)-log(b))

                    # clipped PPO objective
                    surr1 = ratio * advantage
                    surr2 = torch.clamp(ratio, 1-eps_clip, 1+eps_clip) * advantage
                    loss = -torch.min(surr1, surr2) + F.smooth_l1_loss(self.v(s) , td_target)

                    self.optimizer.zero_grad()
                    loss.mean().backward()
                    nn.utils.clip_grad_norm_(self.parameters(), 1.0)
                    self.optimizer.step()
                    self.optimization_step += 1
        
def main():
    env = gym.make('Pendulum-v1')
    model = PPO()
    score = 0.0
    print_interval = 20
    rollout = []

    for n_epi in range(10000):
        s, _ = env.reset()
        done = False
        count = 0
        while count < 200 and not done:
            for t in range(rollout_len):
                mu, std = model.pi(torch.from_numpy(s).float())
                dist = Normal(mu, std)  # uses stochastic policy for action selection
                a = dist.sample()
                log_prob = dist.log_prob(a) # stores log-probabilities for PPO updates
                s_prime, r, done, truncated, info = env.step([a.item()])

                # stores rollouts steps at a time
                rollout.append((s, a, r/10.0, s_prime, log_prob.item(), done))
                if len(rollout) == rollout_len:
                    model.put_data(rollout)
                    rollout = []

                s = s_prime
                score += r
                count += 1

            model.train_net()

        if n_epi%print_interval==0 and n_epi!=0:
            print("# of episode :{}, avg score : {:.1f}, optmization step: {}".format(n_epi, score/print_interval, model.optimization_step))
            score = 0.0

    env.close()

if __name__ == '__main__':
    main()


# Key features
# 1. uses clipped objective function to ensure stable policy updates
# 2. advantage estimation
# 3. stochastic policy (uses gaussian policies for continuous control)
# 4. gradient clipping 
# 5. batch updates

### 11. V traces

In [None]:
import gymnasium as gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

#Hyperparameters
learning_rate      = 0.0005
gamma              = 0.98
T_horizon          = 20
clip_rho_threshold = 1.0
clip_c_threshold   = 1.0
print_interval     = 20

class Vtrace(nn.Module):
    def __init__(self):
        super(Vtrace, self).__init__()
        self.data = []
        
        self.fc1   = nn.Linear(4,256)
        self.fc_pi = nn.Linear(256,2)
        self.fc_v  = nn.Linear(256,1)
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)

        self.clip_rho_threshold = torch.tensor(clip_rho_threshold, dtype=torch.float)
        self.clip_c_threshold = torch.tensor(clip_c_threshold, dtype=torch.float)

    def pi(self, x, softmax_dim = 0):
        x = F.relu(self.fc1(x))
        x = self.fc_pi(x)
        prob = F.softmax(x, dim=softmax_dim)
        return prob
    
    def v(self, x):
        x = F.relu(self.fc1(x))
        v = self.fc_v(x)
        return v
      
    def put_data(self, transition):
        self.data.append(transition)
        
    def make_batch(self):
        s_lst, a_lst, r_lst, s_prime_lst, mu_a_lst, done_lst = [], [], [], [], [], []
        for transition in self.data:
            s, a, r, s_prime, mu_a, done = transition
            
            s_lst.append(s)
            a_lst.append([a])
            r_lst.append([r])
            s_prime_lst.append(s_prime)
            mu_a_lst.append([mu_a])
            done_mask = 0 if done else 1
            done_lst.append([done_mask])
            
        s,a,r,s_prime,done_mask, mu_a = torch.tensor(s_lst, dtype=torch.float), torch.tensor(a_lst), \
                                        torch.tensor(r_lst), torch.tensor(s_prime_lst, dtype=torch.float), \
                                        torch.tensor(done_lst, dtype=torch.float), torch.tensor(mu_a_lst)
        self.data = []
        return s, a, r, s_prime, done_mask, mu_a

    def vtrace(self, s, a, r, s_prime, done_mask, mu_a):
        with torch.no_grad():
            pi = self.pi(s, softmax_dim=1)
            pi_a = pi.gather(1,a)
            v, v_prime = self.v(s), self.v(s_prime)
            ratio = torch.exp(torch.log(pi_a) - torch.log(mu_a))  # a/b == exp(log(a)-log(b))
            
            rhos = torch.min(self.clip_rho_threshold, ratio)
            cs = torch.min(self.clip_c_threshold, ratio).numpy()
            td_target = r + gamma * v_prime * done_mask
            delta = rhos*(td_target - v).numpy()
            
            vs_minus_v_xs_lst = []
            vs_minus_v_xs = 0.0
            vs_minus_v_xs_lst.append([vs_minus_v_xs])
            
            for i in range(len(delta)-1, -1, -1):
                vs_minus_v_xs = gamma * cs[i][0] * vs_minus_v_xs + delta[i][0]
                vs_minus_v_xs_lst.append([vs_minus_v_xs])
            vs_minus_v_xs_lst.reverse()
            
            vs_minus_v_xs = torch.tensor(vs_minus_v_xs_lst, dtype=torch.float)
            vs = vs_minus_v_xs[:-1] + v.numpy()
            vs_prime = vs_minus_v_xs[1:] + v_prime.numpy()
            advantage = r + gamma * vs_prime - v.numpy()
            
        return vs, advantage, rhos

    def train_net(self):
        s, a, r, s_prime, done_mask, mu_a = self.make_batch()
        vs, advantage, rhos = self.vtrace(s, a, r, s_prime, done_mask, mu_a)

        pi = self.pi(s, softmax_dim=1)
        pi_a = pi.gather(1,a)
       
        val_loss = F.smooth_l1_loss(self.v(s) , vs)
        pi_loss = -rhos * torch.log(pi_a) * advantage
        loss =  pi_loss + val_loss

        self.optimizer.zero_grad()
        loss.mean().backward()
        self.optimizer.step()
        
def main():
    env = gym.make('CartPole-v1')
    model = Vtrace()
    score = 0.0
    
    for n_epi in range(10000):
        s, _ = env.reset()
        done = False
        while not done:
            for t in range(T_horizon):
                prob = model.pi(torch.from_numpy(s).float())
                m = Categorical(prob)
                a = m.sample().item()
                s_prime, r, done, truncated, info = env.step(a)

                model.put_data((s, a, r/100.0, s_prime, prob[a].item(), done))
                s = s_prime

                score += r
                if done:
                    break

            model.train_net()

        if n_epi%print_interval==0 and n_epi!=0:
            print("# of episode :{}, avg score : {:.1f}".format(n_epi, score/print_interval))
            score = 0.0

    env.close()

if __name__ == '__main__':
    main()