In [1]:
import torch
import torch.optim as optim
from torch.distributions import Normal
import torch.nn as nn
import numpy as np
from gym.wrappers.monitoring.video_recorder import VideoRecorder
import warnings
from typing import Union
from utils import ReplayBuffer, get_env, run_episode

warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=UserWarning)

Neural network

In [2]:
class NeuralNetwork(nn.Module):
    '''
    This class implements a neural network with a variable number of hidden layers and hidden units.
    You may use this function to parametrize your policy and critic networks.
    '''
    def __init__(self, input_dim: int, output_dim: int, hidden_size: int, 
                                hidden_layers: int, activation: str):
        super(NeuralNetwork, self).__init__()

        # TODO: Implement this function which should define a neural network 
        # with a variable number of hidden layers and hidden units.
        # Here you should define layers which your network will use.
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.hidden_size = hidden_size
        self.hidden_layers = hidden_layers
        self.activations = {
            'relu': nn.ReLU(),
            'sigmoid': nn.Sigmoid(),
            'tanh': nn.Tanh()
            }
        self.activation = self.activations[activation]
        self.input = nn.Linear(self.input_dim, self.hidden_size)
        self.linears = nn.ModuleList([nn.Linear(self.hidden_size,self.hidden_size) for i in range(self.hidden_layers)])
        self.putput = nn.Linear(self.hidden_size, self.output_dim)

    def forward(self, s: torch.Tensor) -> torch.Tensor:
        # TODO: Implement the forward pass for the neural network you have defined.
        #pass
        s = self.input(s)
        s = self.activation(s)
        for i in range(0,self.hidden_layers):
            s = self.linears[i](s)
            s = self.activation(s)
        s = self.putput(s)
        s = self.activation(s)
        return s


Actor and Critic

In [24]:
class Actor:
    def __init__(self,hidden_size: int, hidden_layers: int, actor_lr: float,
                state_dim: int = 3, action_dim: int = 1, device: torch.device = torch.device('cpu')):
        super(Actor, self).__init__()

        self.hidden_size = hidden_size
        self.hidden_layers = hidden_layers
        self.actor_lr = actor_lr
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.device = device
        self.LOG_STD_MIN = -20
        self.LOG_STD_MAX = 2
        self.setup_actor()

    def setup_actor(self):
        '''
        This function sets up the actor network in the Actor class.
        '''
        # TODO: Implement this function which sets up the actor network. 
        # Take a look at the NeuralNetwork class in utils.py. 
        
        self.NN_actor = NeuralNetwork(self.state_dim, 
                                      self.action_dim + 1, 
                                      self.hidden_size, 
                                      self.hidden_layers, "relu")
        
        # ---- other parameters for training ------

        self.optimizer = optim.Adam(self.NN_actor.parameters(), lr=self.actor_lr)

    def clamp_log_std(self, log_std: torch.Tensor) -> torch.Tensor:
        '''
        :param log_std: torch.Tensor, log_std of the policy.
        Returns:
        :param log_std: torch.Tensor, log_std of the policy clamped between LOG_STD_MIN and LOG_STD_MAX.
        '''
        return torch.clamp(log_std, self.LOG_STD_MIN, self.LOG_STD_MAX)

    def get_action_and_log_prob(self, state: torch.Tensor, 
                                deterministic: bool) -> (torch.Tensor, torch.Tensor):
        '''
        :param state: torch.Tensor, state of the agent
        :param deterministic: boolean, if true return a deterministic action 
                                otherwise sample from the policy distribution.
        Returns:
        :param action: torch.Tensor, action the policy returns for the state.
        :param log_prob: log_probability of the the action.
        '''
        assert state.shape == (3,) or state.shape[1] == self.state_dim, 'State passed to this method has a wrong shape'
        action , log_prob = torch.zeros(state.shape[0], 1), torch.ones(state.shape[0], 1)
        # TODO: Implement this function which returns an action and its log probability.
        # If working with stochastic policies, make sure that its log_std are clamped 
        # using the clamp_log_std function.
        #if deterministic == False:
        #log_std = self.clamp_log_std(log_std)

        print("shape", action.shape)

        assert action.shape == (state.shape[0], self.action_dim) and \
            log_prob.shape == (state.shape[0], self.action_dim), 'Incorrect shape for action or log_prob.'
        return action, log_prob


class Critic:
    def __init__(self, hidden_size: int, 
                 hidden_layers: int, critic_lr: int, state_dim: int = 3, 
                    action_dim: int = 1,device: torch.device = torch.device('cpu')):
        super(Critic, self).__init__()
        self.hidden_size = hidden_size
        self.hidden_layers = hidden_layers
        self.critic_lr = critic_lr
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.device = device
        self.setup_critic()

    def setup_critic(self):
        # TODO: Implement this function which sets up the critic(s). Take a look at the NeuralNetwork 
        # class in utils.py. Note that you can have MULTIPLE critic networks in this class.
        #pass
        #We set the output to 1, but are not sure if the expected value returns a vector
        self.NN_critic_V = NeuralNetwork(self.state_dim, 1, self.hidden_size, self.hidden_layers, "relu")    #---------> Still unsure what they mean by multiple critic networks

        # ---- other parameters for training ------

        self.optimizer = optim.Adam(self.NN_critic_V.parameters(), lr=self.critic_lr)

        # alpha parameter in the SAC paper
        self.temperature = TrainableParameter(init_param = 0.005, 
                                              lr_param = 0.1,
                                              train_param = True)

In [25]:
C = Critic(hidden_size = 25, 
            hidden_layers = 3,
            critic_lr = 0.1)

print("Neural network", C.NN_critic_V.parameters()) # network parameters
print("Temperature", C.temperature)

print("input dim is", C.NN_critic_V.input)

input = torch.tensor([1.,2.,2.]) # convert inputs to float (important)

C.temperature.get_param() * C.NN_critic_V.forward(input)

print(C.temperature.get_param()) # part of a computation graph

Neural network <generator object Module.parameters at 0x123d47b50>
Temperature <__main__.TrainableParameter object at 0x123de1c90>
input dim is Linear(in_features=3, out_features=25, bias=True)
tensor(0.0050, dtype=torch.float64, grad_fn=<ExpBackward0>)


Trainable Parameter:

In [26]:
class TrainableParameter:
    '''
    This class could be used to define a trainable parameter in your method. You could find it 
    useful if you try to implement the entropy temerature parameter for SAC algorithm.
    '''
    def __init__(self, init_param: float, lr_param: float, 
                 train_param: bool, device: torch.device = torch.device('cpu')):
        
        self.log_param = torch.tensor(np.log(init_param), requires_grad=train_param, device=device)
        self.optimizer = optim.Adam([self.log_param], lr=lr_param)

    def get_param(self) -> torch.Tensor:
        return torch.exp(self.log_param)

    def get_log_param(self) -> torch.Tensor:
        return self.log_param

Agent:

In [28]:
class Agent:
    def __init__(self):
        # Environment variables. You don't need to change this.
        self.state_dim = 3  # [cos(theta), sin(theta), theta_dot]
        self.action_dim = 1  # [torque] in[-1,1]
        self.batch_size = 200 # each batch is an episode
        self.min_buffer_size = 1000
        self.max_buffer_size = 100000
        # If your PC possesses a GPU, you should be able to use it for training, 
        # as self.device should be 'cuda' in that case.
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print("Using device: {}".format(self.device))
        self.memory = ReplayBuffer(self.min_buffer_size, self.max_buffer_size, self.device)
        
        self.setup_agent()

    def setup_agent(self):
        # TODO: Setup off-policy agent with policy and critic classes. 
        # Feel free to instantiate any other parameters you feel you might need.   

        self.actor = Actor(hidden_size = 25, 
                             hidden_layers = 3,
                             actor_lr = 0.1)
        
        self.critic = Critic(hidden_size = 25, 
                             hidden_layers = 3,
                             critic_lr = 0.1)

        #Name parameters from the paper

        self.Tau = 0.005

    def get_action(self, s: np.ndarray, train: bool) -> np.ndarray:
        """
        :param s: np.ndarray, state of the pendulum. shape (3, )
        :param train: boolean to indicate if you are in eval or train mode. 
                    You can find it useful if you want to sample from deterministic policy.
        :return: np.ndarray,, action to apply on the environment, shape (1,)
        """
        # TODO: Implement a function that returns an action from the policy for the state s.
        #action = np.random.uniform(-1, 1, (1,))
        # Convert the state to a torch tensor, which is the required input for the actor
        s = torch.tensor(s)
        #Import action from the actor and discard the log probability here, possibly used elsewhere
        action, _ = self.actor.get_action_and_log_prob(s, not(train))
        # only get one action -> we have to sample in get_action_and_log_prob
        #Convert the returned tensor action to an nd.array
        action = action[0].numpy()
        #Need log probability for something -------> ?

        assert action.shape == (1,), 'Incorrect action shape.'
        assert isinstance(action, np.ndarray ), 'Action dtype must be np.ndarray' 
        return action

    @staticmethod
    # Union[Actor, Critic] means that object is either Actor or Critic
    def run_gradient_update_step(object: Union[Actor, Critic], loss: torch.Tensor):
        '''
        This function takes in a object containing trainable parameters and an optimizer, 
        and using a given loss, runs one step of gradient update. If you set up trainable parameters 
        and optimizer inside the object, you could find this function useful while training.
        :param object: object containing trainable parameters and an optimizer
        '''
        object.optimizer.zero_grad() 
        loss.mean().backward() 
        object.optimizer.step() 

        # what does this return? nothing of interest
        # performs update of network inside of the object
        # also updates trainable parameters (temperature for critic network)

    def critic_target_update(self, base_net: NeuralNetwork, target_net: NeuralNetwork, 
                             tau: float, soft_update: bool):
        '''
        This method updates the target network parameters using the source network parameters.
        If soft_update is True, then perform a soft update, otherwise a hard update (copy).
        :param base_net: source network
        :param target_net: target network
        :param tau: soft update parameter
        :param soft_update: boolean to indicate whether to perform a soft update or not
        '''
        for param_target, param in zip(target_net.parameters(), base_net.parameters()):
            if soft_update:
                param_target.data.copy_(param_target.data * (1.0 - tau) + param.data * tau)
            else:
                param_target.data.copy_(param.data)

    def train_agent(self):  #------------> ? Christoph is a bit confused, but we need to implement phi, psi and theta gradient updates
        '''
        This function represents one training iteration for the agent. It samples a batch 
        from the replay buffer,and then updates the policy and critic networks 
        using the sampled batch.
        '''
        # TODO: Implement one step of training for the agent.
        # Hint: You can use the run_gradient_update_step for each policy and critic.
        # Example: self.run_gradient_update_step(self.policy, policy_loss)

        # Batch sampling
        batch = self.memory.sample(self.batch_size)
        s_batch, a_batch, r_batch, s_prime_batch = batch

        print("size of batch", batch.size)
        
        # ---- TODO: Implement Critic(s) update here.

        # compute critic loss given the batch (is this a float?) => J_Q value
        temp = self.critic.temperature.get_param()
        print("temperature is", temp)
        value = 0. + temp * 0. # probs with a loop
        critic_loss = torch.tensor(value)

        # store current network (used for bootstrap estimate
        base_net = self.critic.NN_critic_V

        # perform update step  
        self.run_gradient_update_step(self.critic, critic_loss)

        print("updated parameter temperature is", self.self.critic.temperature.get_param())

        self.critic_target_update(base_net = base_net, 
                                 target_net = self.critic.NN_critic_V, 
                                 tau = self.Tau, soft_update =  True)


        # ----- TODO: Implement Policy update here

        # compute policy loss given the batch, J_pi value
        policy_loss = 0.123

        # perform update step  
        self.run_gradient_update_step(self.actor, policy_loss)








In [29]:
a = Agent()

s = np.array([1., 2., 3.])

a.get_action(s, True)

Using device: cpu
shape torch.Size([3, 1])


array([0.], dtype=float32)

Main routine:

In [None]:
TRAIN_EPISODES = 50
TEST_EPISODES = 300

# You may set the save_video param to output the video of one of the evalution episodes, or 
# you can disable console printing during training and testing by setting verbose to False.
save_video = False
verbose = True

agent = Agent()
env = get_env(g=10.0, train=True)

for EP in range(TRAIN_EPISODES):
    run_episode(env, agent, None, verbose, train=True)

if verbose:
    print('\n')

test_returns = []
env = get_env(g=10.0, train=False)

if save_video:
    video_rec = VideoRecorder(env, "pendulum_episode.mp4")

for EP in range(TEST_EPISODES):
    rec = video_rec if (save_video and EP == TEST_EPISODES - 1) else None
    with torch.no_grad():
        episode_return = run_episode(env, agent, rec, verbose, train=False)
    test_returns.append(episode_return)

avg_test_return = np.mean(np.array(test_returns))

print("\n AVG_TEST_RETURN:{:.1f} \n".format(avg_test_return))

if save_video:
    video_rec.close()