In [1]:
import os
import gym
import torch
import torch.nn as nn
import numpy as np
# from colabgymrender.recorder import Recorder

# Print versions of gym, torch, and numpy so the recorded run can be reproduced
print(f"Gym version: {gym.__version__}")
print(f"Torch version: {torch.__version__}")
print(f"Numpy version: {np.__version__}")


# Build the hardcore variant of BipedalWalker-v3 (hardcore=True); the plain
# variant is kept below as a commented-out alternative.
env = gym.make('BipedalWalker-v3', hardcore=True)
# env = gym.make('BipedalWalker-v3')
# Lengths of the observation and action vectors, read from the env's spaces.
s_dim = env.observation_space.shape[0]
a_dim = env.action_space.shape[0]
print(s_dim)
print(a_dim)

Gym version: 0.25.2
Torch version: 2.2.2+cpu
Numpy version: 1.26.3
24
4


  from pkg_resources import resource_stream, resource_exists
  deprecation(
  deprecation(


In [2]:
import torch.distributions as dist

In [3]:
import warnings
warnings.filterwarnings('ignore')

In [5]:
import gym
from collections import deque
import numpy as np

# https://alexandervandekleut.github.io/gym-wrappers/

class MyWalkerWrapper(gym.Wrapper):
    '''
    Custom wrapper for BipedalWalker-v3 and BipedalWalkerHardcore-v3.

    Two changes are applied on top of the wrapped environment:

    * The reward of any inner step on which the wrapped env reports
      ``game_over`` is replaced by -10.0, so failure is penalised less and the
      agent is not discouraged from exploring.
    * Every call to ``step`` repeats the same action for ``skip`` inner steps
      (frame skipping) and returns the summed reward.

    ``info["dead"]`` tells the caller whether the last inner step was a
    ``game_over`` step.
    '''
    def __init__(self, env, skip=2):
        """
        :param env: environment to wrap; must expose a ``game_over`` attribute
        :param skip: number of inner steps executed per ``step`` call (>= 1)
        :raises ValueError: if ``skip`` is smaller than 1
        """
        super().__init__(env)
        if skip < 1:
            # With skip < 1 the loop in step() would never run and the method
            # would fail later with an UnboundLocalError; fail early instead.
            raise ValueError(f"skip must be >= 1, got {skip}")
        # Holds the observations of the most recent inner steps.
        self._obs_buffer = deque(maxlen=skip)
        self._skip = skip
        # Stored on the wrapper only; nothing in this class reads it.
        self._max_episode_steps = 1200

    def step(self, action):
        """
        Repeat ``action`` for up to ``skip`` inner steps.

        :param action: action forwarded unchanged to the wrapped env
        :return: ``(obs, total_reward, done, info)`` where ``obs``, ``done`` and
                 ``info`` come from the last inner step that was executed and
                 ``total_reward`` is the sum of the (possibly replaced) rewards
        """
        total_reward = 0
        for _ in range(self._skip):
            obs, reward, done, info = self.env.step(action)
            if self.env.game_over:
                # Soften the failure penalty reported by the wrapped env.
                reward = -10.0
                info["dead"] = True
            else:
                info["dead"] = False

            self._obs_buffer.append(obs)
            total_reward += reward
            if done:
                # Never step a finished episode.
                break

        return obs, total_reward, done, info

    def reset(self, **kwargs):
        """
        Reset the wrapped env and drop observations buffered from the previous episode.

        :param kwargs: forwarded unchanged to the wrapped env's ``reset``
                       (e.g. ``seed``, ``options``)
        :return: whatever the wrapped env's ``reset`` returns
        """
        self._obs_buffer.clear()
        return self.env.reset(**kwargs)

    def render(self, mode="human"):
        """
        Render the wrapped env ``skip`` times and return the last result.

        :param mode: render mode forwarded to the wrapped env
        """
        for _ in range(self._skip):
            out = self.env.render(mode=mode)
        return out


In [6]:
# Wrap the environment with MyWalkerWrapper (skip=2). The name `env` is rebound,
# so re-running this cell wraps the already-wrapped env a second time.
env = MyWalkerWrapper(env, skip=2)

In [7]:
from collections import deque, namedtuple
import random
import torch
import numpy as np

class ReplayBuffer:
    """Fixed-capacity FIFO store of transitions with uniform random mini-batch sampling."""

    def __init__(self, action_size, buffer_size, batch_size, device):
        """
        :param action_size: stored as-is; not used by the buffer itself
        :param buffer_size: maximum number of transitions kept (oldest are dropped first)
        :param batch_size: number of transitions returned by ``sample``
        :param device: torch device the sampled tensors are moved to
        """
        self.action_size = action_size
        self.memory = deque(maxlen=buffer_size)
        self.device = device
        self.batch_size = batch_size
        self.experience = namedtuple("Experience", field_names=["state", "action", "reward", "next_state", "done"])

    def add(self, state, action, reward, next_state, done):
        """Append one transition to the buffer."""
        self.memory.append(self.experience(state, action, reward, next_state, done))

    def sample(self):
        """
        Draw ``batch_size`` distinct transitions uniformly at random.

        :return: tuple ``(states, actions, rewards, next_states, dones)`` of float
                 tensors on ``self.device``; ``rewards`` and ``dones`` get a
                 trailing dimension of size 1
        """
        drawn = random.sample(self.memory, k=self.batch_size)
        batch = [item for item in drawn if item is not None]

        def column(field):
            # Stack one field of every sampled transition along a new leading axis.
            return np.stack([getattr(item, field) for item in batch], axis=0)

        def to_tensor(array, trailing_dim=False):
            tensor = torch.from_numpy(array).float()
            if trailing_dim:
                tensor = tensor.unsqueeze(-1)
            return tensor.to(self.device)

        states = to_tensor(column("state"))
        actions = to_tensor(column("action"))
        rewards = to_tensor(column("reward"), trailing_dim=True)
        next_states = to_tensor(column("next_state"))
        # Done flags go through uint8 before the float conversion.
        dones = to_tensor(column("done").astype(np.uint8), trailing_dim=True)

        return (states, actions, rewards, next_states, dones)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)

In [10]:
import torch
from torch import optim
import numpy as np
import os

from itertools import chain

class SACAgent():
    """
    Soft Actor-Critic agent with two critics, a target copy of each critic and
    a fixed entropy weight ``alpha``.

    The agent owns its replay buffer: ``learn_with_batches`` stores one
    transition and immediately performs one gradient update once the buffer
    holds more than ``batch_size`` transitions.
    """
    rl_type = 'sac'
    def __init__(self, Actor, Critic, clip_low, clip_high, state_size=24, action_size=4, update_freq=int(1),
            lr=4e-4, weight_decay=0, gamma=0.98, alpha=0.01, tau=0.01, batch_size=64, buffer_size=int(500000), device=None):
        """
        :param Actor: actor class; instantiated as ``Actor(stochastic=True)``
        :param Critic: critic class; instantiated four times as ``Critic()``
        :param clip_low: lower action bound, stored as a tensor (not used by this class)
        :param clip_high: upper action bound, stored as a tensor (not used by this class)
        :param state_size: stored as-is
        :param action_size: stored and passed to the replay buffer
        :param update_freq: target critics are soft-updated every ``update_freq`` learn calls
        :param lr: learning rate shared by the three AdamW optimizers
        :param weight_decay: weight decay shared by the three AdamW optimizers
        :param gamma: discount factor
        :param alpha: weight of the entropy term in critic target and actor loss
        :param tau: interpolation factor of the soft target update
        :param batch_size: mini-batch size drawn from the replay buffer
        :param buffer_size: replay buffer capacity
        :param device: torch device (or device string); defaults to ``cuda:0`` when available, else ``cpu``
        """
        self.state_size = state_size
        self.action_size = action_size
        self.update_freq = update_freq

        # Counts learn() calls since the last target update.
        self.learn_call = int(0)

        self.alpha = alpha
        self.gamma = gamma
        self.tau = tau
        self.batch_size = batch_size

        if device is None:
            self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        else:
            self.device = torch.device(device)

        self.clip_low = torch.tensor(clip_low)
        self.clip_high = torch.tensor(clip_high)

        self.train_actor = Actor(stochastic=True).to(self.device)
        self.actor_optim = torch.optim.AdamW(self.train_actor.parameters(), lr=lr, weight_decay=weight_decay, amsgrad=True)
        print(f'Number of paramters of Actor Net: {sum(p.numel() for p in self.train_actor.parameters())}')

        self.train_critic_1 = Critic().to(self.device)
        self.target_critic_1 = Critic().to(self.device).eval()
        self.hard_update(self.train_critic_1, self.target_critic_1) # hard update at the beginning
        self.critic_1_optim = torch.optim.AdamW(self.train_critic_1.parameters(), lr=lr, weight_decay=weight_decay, amsgrad=True)

        self.train_critic_2 = Critic().to(self.device)
        self.target_critic_2 = Critic().to(self.device).eval()
        self.hard_update(self.train_critic_2, self.target_critic_2) # hard update at the beginning
        self.critic_2_optim = torch.optim.AdamW(self.train_critic_2.parameters(), lr=lr, weight_decay=weight_decay, amsgrad=True)
        print(f'Number of paramters of Single Critic Net: {sum(p.numel() for p in self.train_critic_2.parameters())}')

        self.memory= ReplayBuffer(action_size= action_size, buffer_size= buffer_size, \
            batch_size= self.batch_size, device=self.device)

        self.mse_loss = torch.nn.MSELoss()

    def learn_with_batches(self, state, action, reward, next_state, done):
        """Store one transition in the replay buffer, then attempt one learning step."""
        self.memory.add(state, action, reward, next_state, done)
        self.learn_one_step()

    def learn_one_step(self):
        """Sample a mini-batch and learn from it once the buffer holds more than ``batch_size`` items."""
        if(len(self.memory)>self.batch_size):
            exp=self.memory.sample()
            self.learn(exp)

    def learn(self, exp):
        """
        Run one update of both critics and the actor on a mini-batch.

        :param exp: tuple ``(states, actions, rewards, next_states, done)`` of tensors
        """
        self.learn_call+=1
        states, actions, rewards, next_states, done = exp

        #update critic
        # The bootstrap target is built without gradients: the smaller of the two
        # target-critic estimates for the actor's next action plus the weighted
        # entropy term, zeroed where `done` is 1.
        with torch.no_grad():
            next_actions, next_entropies = self.train_actor(next_states)
            Q_targets_next_1 = self.target_critic_1(next_states, next_actions)
            Q_targets_next_2 = self.target_critic_2(next_states, next_actions)
            Q_targets_next = torch.min(Q_targets_next_1, Q_targets_next_2) + self.alpha * next_entropies
            Q_targets = rewards + (self.gamma * Q_targets_next * (1-done))

        Q_expected_1 = self.train_critic_1(states, actions)
        critic_1_loss = self.mse_loss(Q_expected_1, Q_targets)

        self.critic_1_optim.zero_grad(set_to_none=True)
        critic_1_loss.backward()
        self.critic_1_optim.step()

        Q_expected_2 = self.train_critic_2(states, actions)
        critic_2_loss = self.mse_loss(Q_expected_2, Q_targets)

        self.critic_2_optim.zero_grad(set_to_none=True)
        critic_2_loss.backward()
        self.critic_2_optim.step()

        #update actor
        # Critic gradients produced by this backward pass are discarded by the
        # zero_grad calls at the start of the next critic update.
        actions_pred, entropies_pred = self.train_actor(states)
        Q_pi = torch.min(self.train_critic_1(states, actions_pred), self.train_critic_2(states, actions_pred))
        actor_loss = -(Q_pi + self.alpha * entropies_pred).mean()

        self.actor_optim.zero_grad(set_to_none=True)
        actor_loss.backward()
        self.actor_optim.step()

        if self.learn_call % self.update_freq == 0:
            self.learn_call = 0
            #using soft upates
            self.soft_update(self.train_critic_1, self.target_critic_1)
            self.soft_update(self.train_critic_2, self.target_critic_2)

    @torch.no_grad()
    def get_action(self, state, explore=True):
        """
        Compute the actor's action for a single state.

        :param state: numpy array holding one observation (no batch dimension)
        :param explore: forwarded to the actor's ``explore`` argument
        :return: numpy array with the action for that state
        """
        state = torch.from_numpy(state).unsqueeze(0).float().to(self.device)
        action, entropy = self.train_actor(state, explore=explore)
        action = action.cpu().data.numpy()[0]
        return action

    def soft_update(self, local_model, target_model):
        """In-place update ``target = tau * local + (1 - tau) * target`` for every parameter."""
        for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
            target_param.data.copy_(self.tau*local_param.data + (1.0-self.tau)*target_param.data)

    def hard_update(self, local_model, target_model):
        """Copy every parameter of ``local_model`` into ``target_model``."""
        for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
            target_param.data.copy_(local_param.data)

    def save_ckpt(self,save_path="Models/sac/sac_bipedal_4.pt"):
        """
        Save actor and both trained critics into one checkpoint file.

        :param save_path: destination path (or anything ``torch.save`` accepts);
                          for a path, a missing parent directory is created first
        """
        if isinstance(save_path, (str, os.PathLike)):
            parent_dir = os.path.dirname(os.fspath(save_path))
            if parent_dir:
                # torch.save does not create directories.
                os.makedirs(parent_dir, exist_ok=True)

        # Create a dictionary containing the state_dict of the actor, critic_1, and critic_2
        checkpoint = {
            'actor': self.train_actor.state_dict(),
            'critic_1': self.train_critic_1.state_dict(),
            'critic_2': self.train_critic_2.state_dict()
        }

        # Save the dictionary to a single file
        torch.save(checkpoint, save_path)

    def load_ckpt(self,load_path="Models/sac/sac_bipedal_4.pt"):
        """
        Load actor and critic weights written by ``save_ckpt``.

        Loading is best-effort: on failure the reason is printed and the method
        returns instead of raising. On success the target critics are re-synced
        to the loaded critics; the checkpoint does not contain them, so they
        would otherwise keep their previous weights.

        :param load_path: checkpoint file to read
        :return: ``True`` when the checkpoint was loaded, ``False`` otherwise
        """
        try:
            # map_location lets a checkpoint saved on another device load here.
            checkpoint = torch.load(load_path, map_location=self.device)

            # Load the state_dicts into the models
            self.train_actor.load_state_dict(checkpoint['actor'])
            self.train_critic_1.load_state_dict(checkpoint['critic_1'])
            self.train_critic_2.load_state_dict(checkpoint['critic_2'])
        except Exception as err:
            # `Exception` (not a bare except) so KeyboardInterrupt still propagates.
            print(f"Failed to load {load_path}: {err!r}")
            return False

        self.hard_update(self.train_critic_1, self.target_critic_1)
        self.hard_update(self.train_critic_2, self.target_critic_2)
        print(f"Model loaded successfully: {load_path}")
        return True

    def train_bipedal_walker(self, env, num_episodes=500, max_timesteps=2000, render=False,
                             save_every=5, save_path="Models/sac/sac_bipedal_4.pt"):
        """
        Trains this Soft Actor-Critic agent on the given environment.

        :param env: environment with ``reset() -> state`` and
                    ``step(action) -> (next_state, reward, done, info)``
        :param num_episodes: Number of episodes to train for
        :param max_timesteps: Maximum number of agent steps per episode
        :param render: Whether to render the environment or not
        :param save_every: save a checkpoint every this many episodes; a falsy value disables saving
        :param save_path: checkpoint path passed to ``save_ckpt``
        :return: list with the (clipped) total reward of every episode

        The environment is closed when training ends, including when it ends
        because of an exception.
        """
        rewards_history = []
        try:
            for episode in range(1, num_episodes + 1):
                state = env.reset()
                episode_reward = 0

                for t in range(max_timesteps):
                    if render:
                        env.render()

                    # Get action from the agent
                    action = self.get_action(state, explore=True)

                    # Take the action in the environment
                    next_state, reward, done, _ = env.step(action)

                    # Clip reward to avoid large fluctuations
                    reward = np.clip(reward, -10, 10)

                    # Store the experience in replay buffer and learn from a sampled batch
                    self.learn_with_batches(state, action, reward, next_state, done)

                    # Move to the next state
                    state = next_state
                    episode_reward += reward

                    if done:
                        break

                rewards_history.append(episode_reward)
                avg_reward = np.mean(rewards_history[-100:])

                print(f"Episode {episode}/{num_episodes}, Reward: {episode_reward:.2f}, Avg Reward (Last 100): {avg_reward:.2f}")

                if save_every and episode % save_every == 0:
                    self.save_ckpt(save_path)
        finally:
            env.close()
        return rewards_history





In [11]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import gym
import random
from torch.distributions import Normal

# NOTE(review): EPS is not referenced by any code visible in this notebook; the
# Critic/Actor output layers below pass the literal 0.003 to nn.init.uniform_.
EPS = 0.003

class FeedForwardEncoder(nn.Module):
    """
    Linear embedding followed by one residual feed-forward block.

    The block is LayerNorm -> Linear(hidden, ff) -> GELU -> Linear(ff, hidden)
    and its output is added back onto the embedding.
    """

    def __init__(self, input_size, hidden_size, ff_size):
        """
        :param input_size: size of the last dimension of the input
        :param hidden_size: size of the embedding and of the output
        :param ff_size: inner width of the feed-forward block
        """
        super().__init__()
        embedding = nn.Linear(input_size, hidden_size)
        # Xavier-uniform weights and zero bias for the embedding layer only.
        nn.init.xavier_uniform_(embedding.weight)
        nn.init.zeros_(embedding.bias)
        self.embedding = embedding
        self.block = nn.Sequential(
            nn.LayerNorm(hidden_size),
            nn.Linear(hidden_size, ff_size),
            nn.GELU(),
            nn.Linear(ff_size, hidden_size),
        )

    def forward(self, x):
        """Return ``embedding(x) + block(embedding(x))``."""
        embedded = self.embedding(x)
        return embedded + self.block(embedded)

class Critic(nn.Module):
    """Q-network: encodes the state, concatenates the action and outputs one value per sample."""

    def __init__(self, state_dim=24, action_dim=4):
        """
        :param state_dim: Dimension of input state (int)
        :param action_dim: Dimension of input action (int)
        """
        super().__init__()

        self.state_dim = state_dim
        self.action_dim = action_dim

        # State features of width 96 (inner feed-forward width 192).
        self.state_encoder = FeedForwardEncoder(self.state_dim, 96, 192)

        # Hidden layer over [state features, action].
        self.fc2 = nn.Linear(96 + self.action_dim, 192)
        tanh_gain = nn.init.calculate_gain('tanh')
        nn.init.xavier_uniform_(self.fc2.weight, gain=tanh_gain)

        # Bias-free output head with small uniform initial weights.
        self.fc_out = nn.Linear(192, 1, bias=False)
        nn.init.uniform_(self.fc_out.weight, -0.003, +0.003)

        self.act = nn.Tanh()

    def forward(self, state, action):
        """
        Evaluate Q(s, a).

        :param state: Input state (tensor : [n, state_dim])
        :param action: Input action (tensor : [n, action_dim])
        :return: Q(s, a) (tensor : [n, 1])
        """
        encoded_state = self.state_encoder(state)
        joint = torch.cat((encoded_state, action), dim=1)
        hidden = self.act(self.fc2(joint))
        # The head output is multiplied by 10 before it is returned.
        return self.fc_out(hidden) * 10


class Actor(nn.Module):
    """Policy network that is either deterministic (tanh of a linear head) or a tanh-squashed Gaussian."""

    def __init__(self, state_dim=24, action_dim=4, stochastic=False):
        """
        :param state_dim: Dimension of input state (int)
        :param action_dim: Dimension of output action (int)
        :param stochastic: when True a log-std head is created and ``forward``
                           returns ``(actions, entropies)``; otherwise ``forward``
                           returns only the actions
        """
        super().__init__()

        self.state_dim = state_dim
        self.action_dim = action_dim
        self.stochastic = stochastic

        self.state_encoder = FeedForwardEncoder(self.state_dim, 96, 192)

        # Bias-free mean head with small uniform initial weights.
        self.fc = nn.Linear(96, action_dim, bias=False)
        nn.init.uniform_(self.fc.weight, -0.003, +0.003)

        if self.stochastic:
            # Bias-free log-std head, initialised like the mean head.
            self.log_std = nn.Linear(96, action_dim, bias=False)
            nn.init.uniform_(self.log_std.weight, -0.003, +0.003)

        self.tanh = nn.Tanh()

    def forward(self, state, explore=True):
        """
        Compute actions in (-1, 1) for a batch of states.

        :param state: Input state (tensor : [n, state_dim])
        :param explore: stochastic mode only; True draws a reparameterised sample
                        from the Gaussian, False uses its mean
        :return: deterministic mode: actions (tensor : [n, action_dim]);
                 stochastic mode: ``(actions, entropies)`` where ``entropies``
                 ([n, 1]) is the negated sum over action dimensions of the
                 tanh-corrected log-probability of the returned action
        """
        features = self.state_encoder(state)
        if not self.stochastic:
            return self.tanh(self.fc(features))

        means = self.fc(features)
        # Bound the log-std to [-10, 2] before exponentiating.
        log_stds = self.log_std(features).clamp(min=-10.0, max=2.0)
        dists = Normal(means, log_stds.exp())
        pre_tanh = dists.rsample() if explore else means
        actions = self.tanh(pre_tanh)
        # Gaussian log-density minus the log of the tanh derivative (1e-6 keeps the log finite).
        log_probs = dists.log_prob(pre_tanh) - torch.log(1 - actions.pow(2) + 1e-6)
        entropies = -log_probs.sum(dim=1, keepdim=True)
        return actions, entropies


In [18]:
# Size the agent from the environment's observation and action spaces and pass
# the action-space bounds as clip_low / clip_high.
state_size = env.observation_space.shape[0]
action_size = env.action_space.shape[0]
agent = SACAgent(Actor, Critic, clip_low=env.action_space.low, clip_high=env.action_space.high, state_size=state_size, action_size=action_size)
# Restore previously trained weights. load_ckpt reports a failure by printing
# rather than raising, so check the printed message. This path differs from
# save_ckpt's default path ("Models/sac/sac_bipedal_4.pt").
agent.load_ckpt(load_path="Models/sac/sac_bipedal.pt")

Number of paramters of Actor Net: 40512
Number of paramters of Single Critic Net: 59328
Model loaded successfully: Models/sac/sac_bipedal.pt


In [14]:
# Train for 10 episodes of at most 1600 agent steps each, without rendering.
# train_bipedal_walker closes `env` when it finishes, so later cells must build a new one.
rewards_history = agent.train_bipedal_walker(env, num_episodes=10, max_timesteps=1600, render=False)

Episode 1/10, Reward: 1.10, Avg Reward (Last 100): 1.10
Episode 2/10, Reward: 0.10, Avg Reward (Last 100): 0.60
Episode 3/10, Reward: -0.38, Avg Reward (Last 100): 0.28
Episode 4/10, Reward: 120.54, Avg Reward (Last 100): 30.34
Episode 5/10, Reward: 6.80, Avg Reward (Last 100): 25.63
Episode 6/10, Reward: 70.91, Avg Reward (Last 100): 33.18
Episode 7/10, Reward: -7.88, Avg Reward (Last 100): 27.31
Episode 8/10, Reward: 36.18, Avg Reward (Last 100): 28.42
Episode 9/10, Reward: -3.69, Avg Reward (Last 100): 24.85
Episode 10/10, Reward: 26.41, Avg Reward (Last 100): 25.01


In [15]:
def test_bipedal_walker(agent, env, num_episodes=5, max_timesteps=2000, render=True):
    """
    Trains a Soft Actor-Critic agent on the Bipedal Walker environment.
    
    :param agent: SACAgent object
    :param env: Bipedal Walker environment (gym.make('BipedalWalker-v3'))
    :param num_episodes: Number of episodes to train for
    :param max_timesteps: Maximum number of timesteps per episode
    :param render: Whether to render the environment or not
    """
    rewards_history = []
    for episode in range(1, num_episodes + 1):
        state = env.reset()
        
        episode_reward = 0

        for t in range(max_timesteps):
            if render:
                env.render()

            # Get action from the agent
            action = agent.get_action(state, explore=True)
            
            # Take the action in the environment
            next_state, reward, done, _ = env.step(action)
              # Move to the next state
            state = next_state
            episode_reward += reward

            if done:
                break
        
        rewards_history.append(episode_reward)
        avg_reward = np.mean(rewards_history[-100:])

        print(f"Episode {episode}/{num_episodes}, Reward: {episode_reward:.2f}, Avg Reward (Last 100): {avg_reward:.2f}")

       

    env.close()


In [19]:
# Build a fresh hardcore environment with render_mode='human' and wrap it with
# the same skip=2 wrapper as before.
env = gym.make('BipedalWalker-v3', hardcore=True,render_mode='human')
env = MyWalkerWrapper(env, skip=2)
# Run the evaluation loop with its defaults (5 episodes, render=True).
test_bipedal_walker(agent, env)

Episode 1/5, Reward: 2.60, Avg Reward (Last 100): 2.60
Episode 2/5, Reward: 12.97, Avg Reward (Last 100): 7.79
Episode 3/5, Reward: 45.67, Avg Reward (Last 100): 20.41
Episode 4/5, Reward: 2.16, Avg Reward (Last 100): 15.85
Episode 5/5, Reward: 280.30, Avg Reward (Last 100): 68.74
