In [8]:
%load_ext autoreload
%autoreload 2

import numpy as np
import torch
from torch import nn 

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


# Teaching a quadruped to walk  

Time to try out the learning algorithms that you just implemented on a more difficult problem. The ``WalkerEnv`` implements a simple quadruped-like robot; see for yourself. The goal is to move in the $x$ direction as fast and as far as possible. 

Your goal is to implement a class ``WalkerPolicy`` with function ``determine_actions()`` just like the StochasticPolicy we used earlier to control the pendulum. Below is a template of this class, but feel free to alter it however you want. The only important thing is the ``determine_actions()`` function! 

After you implement it, copy ``WalkerPolicy`` into a separate file ``WalkerPolicy.py`` that you will upload to BRUTE together with the (optional) learned weights in a zip file. How the policy is implemented is up to you! However, you are restricted to the libraries we have used so far, such as torch and numpy.

You will get some free points just for uploading a working policy (regardless of its performance). A further 2 points will be awarded for successfully traversing a small distance in the $x$ direction.

Note that the evaluation system might use ``torch==1.12.1``.

# Hints

There is no single easy way of doing this, but here are some suggestions on what you could try to improve your policy:

1) This problem is much more difficult than balancing a pendulum. It is a good idea to use a somewhat larger network than for the pendulum policy.

2) You can also try to use a different optimizer, such as Adam and play with the hyperparameters.

3) Using a neural network to compute the normal distribution scale $\sigma$ can lead to too much randomness in the actions (i.e. excessive exploration). You can use a fixed $\sigma$ instead, or replace it with a learnable ```torch.nn.Parameter``` initialized to some small constant. Make sure you run it through an exponential or softplus function to ensure $\sigma$ is positive.

4) The exploration can also be reduced by penalizing the variance of the action distribution in an additional loss term. 

5) If you see some undesirable behaviour, you can tweak the reward function to penalize it. Even though the $x$ distance is all we care about, adding extra terms to the reward can help guide the learning process (This is known as reward shaping). Simply define a reward function mapping the state $s_{t+1}$ and action $a_t$ to a scalar reward $r_t$ and put it in the config dictionary under the key ```'reward_fcn'```. See the ```WalkerEnv``` class for the implementation of the default reward.

6) Using the normal distribution on a bounded action space can lead to certain problems caused by action clipping. This can be mitigated by using a different distribution, such as the Beta distribution. See the ```torch.distributions.beta``` module for more information. (Note that Beta distribution is defined on the interval [0,1] and works better with parameters $\alpha,\beta \geq 1$.)

In [9]:
from environment.WalkerEnv import WalkerEnv
from solution import ppo_loss
from torch import nn
from torch.distributions.normal import Normal
from solution import discount_cum_sum
from solution import policy_gradient_loss_advantages, value_loss, policy_gradient_loss_discounted 

In [10]:
from environment.WalkerEnv import base_config
def test_policy(pi, T=512, config=base_config, deterministic=True):
    """Roll out a policy in a fresh ``WalkerEnv`` and return its mean per-step reward.

    Args:
        pi: policy object exposing ``determine_actions(states)`` and, when
            ``deterministic`` is False, ``sample_actions(states)``. Both are called
            with a float32 tensor of states and must return a tensor of actions.
        T: number of environment steps to simulate.
        config: environment configuration dictionary. It must contain the key
            ``'N'``, which is used here as the divisor when averaging over walkers.
        deterministic: if True the rollout uses ``pi.determine_actions``,
            otherwise ``pi.sample_actions``.

    Returns:
        The reward summed over all steps and walkers, divided by ``T * config['N']``.
    """
    test_env = WalkerEnv(config)
    mean_reward = 0.0
    try:
        num_samples = T * config['N']
        # Pick the action rule once; only the selected attribute is looked up.
        choose_actions = pi.determine_actions if deterministic else pi.sample_actions

        s = test_env.vector_reset()
        for _ in range(T):
            # Go through np.asarray first: if the env hands back a list of arrays,
            # calling torch.tensor on it directly is slow (and emits a warning).
            states = torch.tensor(np.asarray(s), dtype=torch.float32)
            with torch.no_grad():
                actions = choose_actions(states)
            s, r = test_env.vector_step(actions.numpy())
            mean_reward += float(np.sum(r)) / num_samples
    finally:
        # Always release the environment, even if the policy or the env raised.
        test_env.close()
    return mean_reward

In [11]:
# def walker_reward(state, action):
#     """reward function for the walker environment, state is [29] vector, action is [8] vector"""
#     pos = state[:15]  # first 15 elements of state vector are generalized coordinates [xyz, quat, joint_angles]
#     vel = state[15:]  # last 14 elements of state vector are generalized velocities [xyz_vel, omega, joint_velocities]
    
#     joint_angles = state[16:24]  # Assuming joint angles are at indices 16 to 23
#     # print(joint_angles)
#     inactive_legs = [i for i, angle in enumerate(joint_angles) if angle < 0.2]
    
    
    
#     return pos[0] + vel[0]# - 0.1 * len(inactive_legs)  # return the x velocity as the reward by default


# def sample_trajectories(env, pi, T):    
#     """given an environment env, a stochastic policy pi and number of timesteps T, interact with the environment for T steps 
#     using actions sampled from policy. Return torch tensors of collected states, actions and rewards"""
#     states = np.zeros((T + 1, N, env.num_states), dtype=float)  # states from s(0) to s(T+1)
#     actions = np.zeros((T, N, env.num_actions), dtype=float)  # actions from a(0) to a(T)
#     rewards = np.zeros((T, N), dtype=float)  # rewards from r(0) to r(T)
    
#     s = env.vector_reset()
#     states[0] = s
#     for t in range(T):
#         a = pi.sample_actions(torch.tensor(states[t]).float())  # policy needs float torch tensor (N, state_dim)
#         s_next, r = env.vector_step(np.array(a))  # env needs numpy array of (Nx1)
#         states[t + 1], actions[t], rewards[t] = s_next, a, r    
        
#     tensor_s = torch.tensor(states).float()  # (T+1, N, state_dim)  care for the extra timestep at the end!
#     tensor_a = torch.tensor(actions).float()  # (T, N, 1)
#     tensor_r = torch.tensor(rewards).float()  # (T, N)
    
#     return tensor_s, tensor_a, tensor_r 

# def compute_gae(tensor_r, values, gamma, lambda_):
#     """generalized advantage estimation (GAE) implementation"""
#     delta_t = tensor_r + gamma * values[1:] - values[:-1]
#     advantages = discount_cum_sum(delta_t, gamma * lambda_)
#     value_targets = advantages + values[:-1]
#     return value_targets, advantages

# class WalkerPolicy(nn.Module):
#     def __init__(self, state_dim=29, action_dim=8, fixed_sigma=None):
#         super().__init__()
#         # self.load_weights()  # load learned stored network weights after initialization

#         if fixed_sigma is not None:
#             self.sigma = torch.nn.Parameter(torch.tensor(fixed_sigma), requires_grad=False)
#         else:
#             self.sigma = torch.nn.Parameter(torch.tensor(0.01), requires_grad=True)  # Small constant as an exampl

#         # shared layers between the action and value heads
#         self.shared_layers = nn.Sequential(
#             nn.Linear(state_dim, 128),
#             nn.Tanh(),
#             nn.Linear(128, 64),
#             nn.ReLU(),
#             nn.Linear(64, 32),
#             nn.Tanh(),
#         )
        
#         # action head
#         self.action_layers = nn.Sequential(
#             nn.Linear(32, 128),
#             nn.Tanh(),
#             nn.Linear(128,64),
#             nn.ReLU(),
#             nn.Linear(64, 2*action_dim),
#         )

#         # value head
#         self.value_layers = nn.Sequential(
#             nn.Linear(32, 128),
#             nn.Tanh(),
#             nn.Linear(128,64),      
#             nn.ReLU(),
#             nn.Linear(64, 1),
#         )
        
#     def determine_actions(self, states):
#         """states is (N, state_dim) tensor, returns (N, action_dim) actions tensor. 
#         This function returns deterministic actions based on the input states. This would be used for control. """
#         params = self.action_layers(self.shared_layers(states))  # map states to distribution parameters
#         mu, _ = torch.chunk(params, 2, -1)  # split the parameters into mean and std, return mean
#         return mu
    
#     # TODO: implement a determine_actions() function mapping from (N, state_dim) states into (N, action_dim) actions

#     def save_weights(self, path='walker_weights.pt'):
#         # helper function to save your network weights
#         torch.save(self.state_dict(), path)

#     def load_weights(self, path='walker_weights.pt'):
#         # helper function to load your network weights
#         self.load_state_dict(torch.load(path))
    
#     def sample_actions(self, states):
#         """states is (T, N, state_dim) tensor, returns (T, N, action_dim) actions tensor. 
#         This function returns probabilistically sampled actions. This would be used for training the policy."""
#         params = self.action_layers(self.shared_layers(states))  # map states to distribution parameters
#         mu, sigma = torch.chunk(params, 2, -1)  # split the parameters into mean and std
#         # sigma = torch.nn.functional.softplus(self.sigma) + 0.001  # make sure std is positive
#         sigma = torch.nn.functional.softplus(sigma)
#         distribution = Normal(mu, sigma)  # create distribution of size (T, N, action_dim)
#         actions = distribution.sample()  # sample actions
#         return actions
    
#     def log_prob(self, actions, states):
#         """states is (T, N, state_dim) tensor. actions is (T, N, action_dim) tensor.
#         This function returns the log-probabilities of the actions given the states. $\log \pi_\theta(a_t | s_t)$"""
#         params = self.action_layers(self.shared_layers(states))  # map states to distribution parameters
#         mu, sigma = torch.chunk(params, 2, -1)  # split the parameters into mean and std
#         # sigma = torch.nn.functional.softplus(self.sigma) + 0.001  # make sure std is positive
#         sigma = torch.nn.functional.softplus(sigma)
#         distribution = Normal(mu, sigma)  # create distribution of size (T, N, action_dim)
#         logp = distribution.log_prob(actions)
#         if len(logp.shape) == 3 and logp.shape[2] > 1:  # this allows generalization to multi-dim action spaces
#             logp = logp.sum(dim=2, keepdim=True)  # sum over the action dimension
#         return logp
    
#     def value_estimates(self, states):
#         """states is (T, N, state_dim) tensor, returns (T, N) values tensor. Useful for value estimation during training."""
#         return self.value_layers(self.shared_layers(states)).squeeze()
    
# # training parameters
# N = 32
# T = 128
# config = {'N': N, 'vis': 1,'reward_fcn':walker_reward}
# epochs = 500
# lr = 0.003
# gamma=0.99
# epsilon = 0.2

# sgd_iters = 5

# # policy, environment and optimizer
# pi = WalkerPolicy()#fixed_sigma=0.05)
# train_env = WalkerEnv(config)

# # optim = torch.optim.SGD(pi.parameters(), lr=lr)
# optim = torch.optim.Adam(pi.parameters(),lr=lr)

# mean_rewards, p_losses, v_losses = np.zeros(epochs), np.zeros(epochs), np.zeros(epochs)  # for logging mean rewards over epochs
# for epoch in range(epochs):
#     tensor_s, tensor_a, tensor_r = sample_trajectories(train_env, pi, T)  # collect trajectories using current policy
        
#     with torch.no_grad():  # compute the old probabilities
#         logp_old = pi.log_prob(tensor_a, tensor_s[:T]).squeeze(2)  # compute log(pi(a_t | s_t))
    
#     for i in range(sgd_iters):  # we can even do multiple gradient steps
#         values = pi.value_estimates(tensor_s)  # estimate value function for all states 
#         logp = pi.log_prob(tensor_a, tensor_s[:T]).squeeze(2)  # compute log(pi(a_t | s_t))

#         with torch.no_grad():  # no need for gradients when computing the advantages and value targets
#             # value_targets, advantage_estimates = compute_advantage_estimates(tensor_r, values, gamma, bootstrap=True)
#             value_targets, advantage_estimates = compute_gae(tensor_r, values, gamma, lambda_=0.97)
#             advantage_estimates = (advantage_estimates - advantage_estimates.mean()) / advantage_estimates.std()  # normalize advantages
            
#         L_v = value_loss(values[:T], value_targets)  # add the value loss
        
#         p_ratios = torch.exp(logp - logp_old)  # compute the ratios r_\theta(a_t | s_t)
#         L_ppo = ppo_loss(p_ratios, advantage_estimates, epsilon=epsilon)  # compute the policy gradient loss
#         total_loss = L_v + L_ppo
        
#         optim.zero_grad()
#         total_loss.backward()  # backprop and gradient step
#         optim.step()
    
#     if epoch % 10 == 0:
#         print('Epoch %d, mean reward: %.3f, value loss: %.3f' % (epoch, tensor_r.mean(), L_v.item()))
#     mean_rewards[epoch] = tensor_r.mean()
#     v_losses[epoch] = L_v.item()
#     p_losses[epoch] = L_ppo.item()
    
# train_env.close()


### 5.30

In [12]:
from solution import discount_cum_sum
from solution import policy_gradient_loss_advantages, value_loss

def compute_advantage_estimates(tensor_r, values, gamma, bootstrap=False):
    """Compute discounted value targets and the matching advantage estimates.

    Args:
        tensor_r: reward tensor, documented by the caller as (T, N).
        values: value estimates with one extra leading entry, i.e. (T+1, N).
        gamma: scalar discount factor passed to ``discount_cum_sum``.
        bootstrap: when True, the value estimate of the final state is appended
            to the rewards before discounting, so it acts as the return estimate
            for everything beyond the horizon; the appended row is dropped again
            from the result.

    Returns:
        ``(value_targets, advantages)`` where
        ``advantages = value_targets - values[:-1]``.
    """
    if not bootstrap:
        value_targets = discount_cum_sum(tensor_r, gamma)
    else:
        tail_values = values[-1].unsqueeze(0)  # (1, N) row holding V(s_T)
        rewards_with_tail = torch.cat((tensor_r, tail_values), dim=0)
        value_targets = discount_cum_sum(rewards_with_tail, gamma)[:-1]
    return value_targets, value_targets - values[:-1]

def compute_gae(tensor_r, values, gamma, lambda_):
    """Generalized advantage estimation (GAE).

    Args:
        tensor_r: reward tensor; must line up with ``values[:-1]``.
        values: value estimates with one more leading entry than ``tensor_r``.
        gamma: scalar discount factor.
        lambda_: GAE mixing coefficient; the TD residuals are accumulated with
            the combined factor ``gamma * lambda_``.

    Returns:
        ``(value_targets, advantages)`` where
        ``value_targets = advantages + values[:-1]``.
    """
    current_values, next_values = values[:-1], values[1:]
    td_residuals = tensor_r + gamma * next_values - current_values
    advantages = discount_cum_sum(td_residuals, gamma * lambda_)
    return advantages + current_values, advantages

def sample_trajectories(env, pi, T):
    """Interact with ``env`` for ``T`` steps using actions sampled from ``pi``.

    The number of parallel walkers is read from the first axis of the state
    returned by ``env.vector_reset()``, so the function does not depend on any
    notebook-level variable.

    Args:
        env: environment exposing ``num_states``, ``num_actions``,
            ``vector_reset()`` and ``vector_step(actions)``; ``vector_step`` must
            return a ``(next_states, rewards)`` pair.
        pi: stochastic policy exposing ``sample_actions(states)``, called with a
            float tensor of shape (N, state_dim).
        T: number of timesteps to simulate.

    Returns:
        tensor_s: float tensor (T+1, N, env.num_states), states s(0) .. s(T).
            Mind the extra timestep at the end!
        tensor_a: float tensor (T, N, env.num_actions), actions a(0) .. a(T-1).
        tensor_r: float tensor (T, N), rewards r(0) .. r(T-1).
    """
    s = np.asarray(env.vector_reset())
    num_envs = s.shape[0]  # batch size comes from the environment itself

    states = np.zeros((T + 1, num_envs, env.num_states), dtype=float)
    actions = np.zeros((T, num_envs, env.num_actions), dtype=float)
    rewards = np.zeros((T, num_envs), dtype=float)

    states[0] = s
    for t in range(T):
        # Data collection needs no autograd graph; log-probs are recomputed later.
        with torch.no_grad():
            a = pi.sample_actions(torch.tensor(states[t]).float())
        # The environment is fed a numpy array.
        a = a.detach().cpu().numpy() if torch.is_tensor(a) else np.asarray(a)
        s_next, r = env.vector_step(a)
        states[t + 1], actions[t], rewards[t] = s_next, a, r

    tensor_s = torch.tensor(states).float()
    tensor_a = torch.tensor(actions).float()
    tensor_r = torch.tensor(rewards).float()

    return tensor_s, tensor_a, tensor_r

class WalkerPolicy(nn.Module):
    """Actor-critic network: a Gaussian action head and a state-value head on a shared trunk.

    The action head outputs ``2 * action_dim`` numbers per state. The first half
    is used as the mean of a Normal distribution; the second half is passed
    through a softplus to obtain the standard deviation.

    Submodule names (``shared_layers``, ``action_layers``, ``value_layers``) are
    part of the saved ``state_dict`` layout and must not be renamed.
    """

    # Tiny floor added to the softplus output. softplus underflows to exactly 0
    # for very negative inputs, and Normal rejects a non-positive scale.
    MIN_SIGMA = 1e-6

    def __init__(self, state_dim=29, action_dim=8):
        super().__init__()

        # shared layers between the action and value heads
        self.shared_layers = nn.Sequential(
            nn.Linear(state_dim, 32),
            nn.Tanh(),
        )

        # action head: outputs [mu, pre-softplus sigma] concatenated on the last axis
        self.action_layers = nn.Sequential(
            nn.Linear(32, 64),
            nn.Tanh(),
            nn.Linear(64, 2 * action_dim),
        )

        # value head: one scalar per state
        self.value_layers = nn.Sequential(
            nn.Linear(32, 64),
            nn.Tanh(),
            nn.Linear(64, 1),
        )
        # self.load_weights()  # load learned stored network weights after initialization

    def _action_distribution(self, states):
        """Build the Normal action distribution for ``states`` (any leading batch shape)."""
        params = self.action_layers(self.shared_layers(states))
        mu, raw_sigma = torch.chunk(params, 2, -1)  # split into mean and pre-activation std
        sigma = torch.nn.functional.softplus(raw_sigma) + self.MIN_SIGMA  # strictly positive
        return Normal(mu, sigma)

    def determine_actions(self, states):
        """states is (N, state_dim) tensor, returns (N, action_dim) actions tensor.
        This function returns deterministic actions (the distribution mean) based on
        the input states. This would be used for control."""
        params = self.action_layers(self.shared_layers(states))
        mu, _ = torch.chunk(params, 2, -1)  # the std half is ignored for control
        return mu

    def save_weights(self, path='walker_weights.pt'):
        """Save the network's ``state_dict`` to ``path``."""
        torch.save(self.state_dict(), path)

    def load_weights(self, path='walker_weights.pt'):
        """Load a ``state_dict`` previously written by ``save_weights`` from ``path``.

        Tensors are mapped to the CPU while loading so that weights saved on
        another device can still be read.
        NOTE: ``torch.load`` unpickles the file; only load weight files you trust.
        """
        self.load_state_dict(torch.load(path, map_location='cpu'))

    def sample_actions(self, states):
        """states is (T, N, state_dim) tensor, returns (T, N, action_dim) actions tensor.
        This function returns probabilistically sampled actions. This would be used
        for training the policy."""
        return self._action_distribution(states).sample()

    def log_prob(self, actions, states):
        """states is (T, N, state_dim) tensor. actions is (T, N, action_dim) tensor.
        This function returns the log-probabilities of the actions given the states,
        $\\log \\pi_\\theta(a_t | s_t)$, as a (T, N, 1) tensor.

        Only 3-dimensional results are summed over the action axis; inputs of any
        other rank are returned per action dimension, exactly as ``Normal.log_prob``
        produced them."""
        logp = self._action_distribution(states).log_prob(actions)
        if len(logp.shape) == 3 and logp.shape[2] > 1:  # multi-dim action spaces
            logp = logp.sum(dim=2, keepdim=True)  # independent dims: joint log-prob is the sum
        return logp

    def value_estimates(self, states):
        """states is (T, N, state_dim) tensor, returns (T, N) values tensor.
        Useful for value estimation during training.

        Only the trailing singleton axis of the value head is removed, so the
        (T, N) layout is kept even when T or N equals 1."""
        return self.value_layers(self.shared_layers(states)).squeeze(-1)

# training parameters
N = 32  # passed to the environment as config['N']; sample_trajectories builds (T, N, ...) batches
T = 128  # timesteps collected per epoch
config = {'N': N, 'vis': 1}
epochs = 200
lr = 0.01
gamma = 0.95  # discount factor for the value targets

# policy, environment and optimizer
pi = WalkerPolicy()
train_env = WalkerEnv(config)
# optim = torch.optim.SGD(pi.parameters(), lr=lr)
optim = torch.optim.Adam(pi.parameters(), lr=lr)

mean_rewards, p_losses, v_losses = np.zeros(epochs), np.zeros(epochs), np.zeros(epochs)  # logging
try:
    for epoch in range(epochs):
        # collect trajectories using the current policy
        tensor_s, tensor_a, tensor_r = sample_trajectories(train_env, pi, T)

        values = pi.value_estimates(tensor_s)  # value estimates for all T+1 states
        logp = pi.log_prob(tensor_a, tensor_s[:T]).squeeze(2)  # log(pi(a_t | s_t)), (T, N)

        with torch.no_grad():  # targets and advantages are treated as constants by the losses
            value_targets, advantage_estimates = compute_advantage_estimates(tensor_r, values, gamma, bootstrap=True)
            # value_targets, advantage_estimates = compute_gae(tensor_r, values, gamma, lambda_=0.99)

        L_pg = policy_gradient_loss_advantages(logp, advantage_estimates)  # policy gradient loss
        L_v = value_loss(values[:T], value_targets)  # value loss
        total_loss = L_pg + L_v

        optim.zero_grad()
        total_loss.backward()  # backprop and gradient step
        optim.step()

        epoch_mean_reward = tensor_r.mean().item()
        if epoch % 10 == 0:
            print('Epoch %d, mean reward: %.3f, value loss: %.3f' % (epoch, epoch_mean_reward, L_v.item()))
        mean_rewards[epoch] = epoch_mean_reward
        v_losses[epoch] = L_v.item()
        p_losses[epoch] = L_pg.item()
finally:
    # release the environment even if an epoch fails part-way through
    train_env.close()
    
# plot_training(mean_rewards, p_losses, v_losses)

Environment ready





Epoch 0, mean reward: -0.032, value loss: 15.924
Epoch 10, mean reward: 0.034, value loss: 30.399
Epoch 20, mean reward: 0.058, value loss: 62.763
Epoch 30, mean reward: 0.044, value loss: 47.541
Epoch 40, mean reward: 0.115, value loss: 22.554
Epoch 50, mean reward: 0.070, value loss: 18.800
Epoch 60, mean reward: 0.042, value loss: 13.170
Epoch 70, mean reward: 0.042, value loss: 12.338
Epoch 80, mean reward: 0.094, value loss: 17.915
Epoch 90, mean reward: 0.088, value loss: 25.348
Epoch 100, mean reward: 0.044, value loss: 28.715
Epoch 110, mean reward: 0.150, value loss: 28.737
Epoch 120, mean reward: 0.079, value loss: 50.370
Epoch 130, mean reward: -0.029, value loss: 39.567
Epoch 140, mean reward: -0.009, value loss: 52.795
Epoch 150, mean reward: -0.046, value loss: 42.176
Epoch 160, mean reward: -0.120, value loss: 35.987
Epoch 170, mean reward: 0.106, value loss: 19.057
Epoch 180, mean reward: -0.032, value loss: 42.672
Epoch 190, mean reward: 0.045, value loss: 61.032


### 5.08

In [13]:
# import torch
# from torch import nn
# from torch.distributions.normal import Normal
# from solution import policy_gradient_loss_simple
# 
# 
# def walker_reward(state, action):
#     """reward function for the walker environment, state is [29] vector, action is [8] vector"""
#     pos = state[:15]  # first 15 elements of state vector are generalized coordinates [xyz, quat, joint_angles]
#     vel = state[15:]  # last 14 elements of state vector are generalized velocities [xyz_vel, omega, joint_velocities]
#     
#     return pos[0] + vel[0] # return the x velocity as the reward by default
# 
# class WalkerPolicy(nn.Module):
#     def __init__(self, state_dim=29, action_dim=8):
#         super().__init__()
#         # self.load_weights()  # load learned stored network weights after initialization
# 
#         self.network = nn.Sequential(
#             nn.Linear(state_dim, 128),
#             nn.Tanh(),
#             nn.Linear(128, 64),
#             nn.Tanh(),
#             nn.Linear(64, 2*action_dim),
#         )
# 
#     def determine_actions(self, states):
#         """states is (N, state_dim) tensor, returns (N, action_dim) actions tensor. 
#         This function returns deterministic actions based on the input states. This would be used for control. """
#         params = self.network(states)  # map states to distribution parameters
#         mu, _ = torch.chunk(params, 2, -1)  # split the parameters into mean and std, return mean
#         return mu
#     
#     def save_weights(self, path='walker_weights.pt'):
#         # helper function to save your network weights
#         torch.save(self.state_dict(), path)
# 
#     def load_weights(self, path='walker_weights.pt'):
#         # helper function to load your network weights
#         self.load_state_dict(torch.load(path))
#         
#     def sample_actions(self, states):
#         """states is (T, N, state_dim) tensor, returns (T, N, action_dim) actions tensor. 
#         This function returns probabilistically sampled actions. This would be used for training the policy."""
#         params = self.network(states)  # map states to distribution parameters
#         mu, sigma = torch.chunk(params, 2, -1)  # split the parameters into mean and std
#         sigma = torch.nn.functional.softplus(sigma)  # make sure std is positive
#         distribution = Normal(mu, sigma)  # create distribution of size (T, N, action_dim)
#         actions = distribution.sample()  # sample actions
#         return actions
# 
#     def log_prob(self, actions, states):
#         """states is (T, N, state_dim) tensor. actions is (T, N, action_dim) tensor.
#         This function returns the log-probabilities of the actions given the states. $\log \pi_\theta(a_t | s_t)$"""
#         params = self.network(states)  # map states to distribution parameters
#         mu, sigma = torch.chunk(params, 2, -1)  # split the parameters into mean and std
#         sigma = torch.nn.functional.softplus(sigma)  # make sure std is positive
#         distribution = Normal(mu, sigma)  # create distribution of size (T, N, action_dim)
#         logp = distribution.log_prob(actions)
#         if len(logp.shape) == 3 and logp.shape[2] > 1:  # this allows generalization to multi-dim action spaces
#             logp = logp.sum(dim=2, keepdim=True)  # sum over the action dimension
#         return logp
#     
# def sample_trajectories(env, pi, T):    
#     """given an environment env, a stochastic policy pi and number of timesteps T, interact with the environment for T steps 
#     using actions sampled from policy. Return torch tensors of collected states, actions and rewards"""
#     states = np.zeros((T + 1, N, env.num_states), dtype=float)  # states from s(0) to s(T+1)
#     actions = np.zeros((T, N, env.num_actions), dtype=float)  # actions from a(0) to a(T)
#     rewards = np.zeros((T, N), dtype=float)  # rewards from r(0) to r(T)
#     
#     s = env.vector_reset()
#     states[0] = s
#     for t in range(T):
#         a = pi.sample_actions(torch.tensor(states[t]).float())  # policy needs float torch tensor (N, state_dim)
#         s_next, r = env.vector_step(np.array(a))  # env needs numpy array of (Nx1)
#         states[t + 1], actions[t], rewards[t] = s_next, a, r    
#         
#     tensor_s = torch.tensor(states).float()  # (T+1, N, state_dim)  care for the extra timestep at the end!
#     tensor_a = torch.tensor(actions).float()  # (T, N, 1)
#     tensor_r = torch.tensor(rewards).float()  # (T, N)
#     
#     return tensor_s, tensor_a, tensor_r
# 
# # training parameters
# N = 32
# T = 128
# config = {'N': N, 'vis': 1, "reward_fcn":walker_reward}
# epochs = 500
# lr = 0.01
# gamma=0.99
# 
# # policy, environment and optimizer
# pi = WalkerPolicy()
# train_env = WalkerEnv(config)
# # optim = torch.optim.SGD(pi.parameters(), lr=lr)
# optim = torch.optim.Adam(pi.parameters(), lr=lr)
# 
# mean_rewards, p_losses = np.zeros(epochs), np.zeros(epochs)
# for epoch in range(epochs):
#     tensor_s, tensor_a, tensor_r = sample_trajectories(train_env, pi, T)  # collect trajectories using current policy
# 
#     logp = pi.log_prob(tensor_a, tensor_s[:T]).squeeze(2)  # compute log(pi(a_t | s_t))
#     loss = policy_gradient_loss_discounted(logp, tensor_r, gamma)  # compute the policy gradient loss
#     
#     optim.zero_grad()
#     loss.backward()  # backprop and gradient step
#     optim.step()
#     
#     if epoch % 10 == 0:
#         print('Epoch %d, mean reward: %.3f' % (epoch, tensor_r.mean()))
#     mean_rewards[epoch] = tensor_r.mean()
#     p_losses[epoch] = loss.item()
#     
# train_env.close()
# 
# pi.save_weights()
# # plot_training(mean_rewards, p_losses)

### 3.42 m:

In [14]:
# import torch
# from torch import nn
# from torch.distributions.normal import Normal
# from solution import policy_gradient_loss_simple

# class WalkerPolicy(nn.Module):
#     def __init__(self, state_dim=29, action_dim=8):
#         super().__init__()
#         # self.load_weights()  # load learned stored network weights after initialization

#         self.network = nn.Sequential(
#             nn.Linear(state_dim, 128),
#             nn.Tanh(),
#             nn.Linear(128, 64),
#             nn.Tanh(),
#             nn.Linear(64, 2*action_dim),
#         )

#     def determine_actions(self, states):
#         """states is (N, state_dim) tensor, returns (N, action_dim) actions tensor. 
#         This function returns deterministic actions based on the input states. This would be used for control. """
#         params = self.network(states)  # map states to distribution parameters
#         mu, _ = torch.chunk(params, 2, -1)  # split the parameters into mean and std, return mean
#         return mu
    
#     def save_weights(self, path='walker_weights.pt'):
#         # helper function to save your network weights
#         torch.save(self.state_dict(), path)

#     def load_weights(self, path='walker_weights.pt'):
#         # helper function to load your network weights
#         self.load_state_dict(torch.load(path))
        
#     def sample_actions(self, states):
#         """states is (T, N, state_dim) tensor, returns (T, N, action_dim) actions tensor. 
#         This function returns probabilistically sampled actions. This would be used for training the policy."""
#         params = self.network(states)  # map states to distribution parameters
#         mu, sigma = torch.chunk(params, 2, -1)  # split the parameters into mean and std
#         sigma = torch.nn.functional.softplus(sigma)  # make sure std is positive
#         distribution = Normal(mu, sigma)  # create distribution of size (T, N, action_dim)
#         actions = distribution.sample()  # sample actions
#         return actions

#     def log_prob(self, actions, states):
#         """states is (T, N, state_dim) tensor. actions is (T, N, action_dim) tensor.
#         This function returns the log-probabilities of the actions given the states. $\log \pi_\theta(a_t | s_t)$"""
#         params = self.network(states)  # map states to distribution parameters
#         mu, sigma = torch.chunk(params, 2, -1)  # split the parameters into mean and std
#         sigma = torch.nn.functional.softplus(sigma)  # make sure std is positive
#         distribution = Normal(mu, sigma)  # create distribution of size (T, N, action_dim)
#         logp = distribution.log_prob(actions)
#         if len(logp.shape) == 3 and logp.shape[2] > 1:  # this allows generalization to multi-dim action spaces
#             logp = logp.sum(dim=2, keepdim=True)  # sum over the action dimension
#         return logp
    
# def sample_trajectories(env, pi, T):    
#     """given an environment env, a stochastic policy pi and number of timesteps T, interact with the environment for T steps 
#     using actions sampled from policy. Return torch tensors of collected states, actions and rewards"""
#     states = np.zeros((T + 1, N, env.num_states), dtype=float)  # states from s(0) to s(T+1)
#     actions = np.zeros((T, N, env.num_actions), dtype=float)  # actions from a(0) to a(T)
#     rewards = np.zeros((T, N), dtype=float)  # rewards from r(0) to r(T)
    
#     s = env.vector_reset()
#     states[0] = s
#     for t in range(T):
#         a = pi.sample_actions(torch.tensor(states[t]).float())  # policy needs float torch tensor (N, state_dim)
#         s_next, r = env.vector_step(np.array(a))  # env needs numpy array of (Nx1)
#         states[t + 1], actions[t], rewards[t] = s_next, a, r    
        
#     tensor_s = torch.tensor(states).float()  # (T+1, N, state_dim)  care for the extra timestep at the end!
#     tensor_a = torch.tensor(actions).float()  # (T, N, 1)
#     tensor_r = torch.tensor(rewards).float()  # (T, N)
    
#     return tensor_s, tensor_a, tensor_r

# # training parameters
# N = 32
# T = 128
# config = {'N': N, 'vis': 1}
# epochs = 500
# lr = 0.01

# # policy, environment and optimizer
# pi = WalkerPolicy()
# train_env = WalkerEnv(config)
# optim = torch.optim.SGD(pi.parameters(), lr=lr)

# mean_rewards, p_losses = np.zeros(epochs), np.zeros(epochs)  # for logging mean rewards over epochs
# for epoch in range(epochs):
#     tensor_s, tensor_a, tensor_r = sample_trajectories(train_env, pi, T)  # collect trajectories using current policy
    
#     logp = pi.log_prob(tensor_a, tensor_s[:T]).squeeze(2)  # compute log(pi(a_t | s_t))
#     loss = policy_gradient_loss_simple(logp, tensor_r)  # compute the policy gradient loss
    
#     optim.zero_grad()
#     loss.backward()  # backprop and gradient step
#     optim.step()
    
#     if epoch % 10 == 0:
#         print('Epoch %d, mean reward: %.3f' % (epoch, tensor_r.mean()))
#     mean_rewards[epoch] = tensor_r.mean()
#     p_losses[epoch] = loss.item()
    
# train_env.close()

# # pi.save_weights()
# # plot_training(mean_rewards, p_losses)

### Testing

In [15]:
# config = {'N': 1, 'vis': 1}
# env = WalkerEnv(config)

In [16]:
# obs = env.vector_reset()
# env.render()

In [17]:
# obs[0, 0]  # this is the x coordinate of the robot, we want to maximize this

In [18]:
# T = 512
# for i in range(512):
    # tensor_s, tensor_a, tensor_r = sample_trajectories(train_env, pi, T)  # collect trajectories using
    # values = pi.value_estimates(tensor_s)  # estimate value function for all states
    # logp = pi.log_prob(tensor_a, tensor_s[:T]).squeeze(2)  # compute log(pi(a_t | s_t))
    
#     a = np.random.randn(1, 8)
#     obs, reward = env.vector_step(a)
# env.close()

In [20]:
# Evaluate the trained policy with the default settings (deterministic actions,
# T=512 steps, base_config); the cell output is the mean reward per step and walker.
test_policy(pi)

Environment ready


  actions = pi.determine_actions(torch.tensor(s).float())  # use deterministic actions based on the states



0.15915431915709632