In [24]:
# step 1, generate trajectories
'''
As input, we will need:
    - the gym environment
    - a policy "pi" which takes in the state and outputs an action
As output, we return:
    - the trajectory of (s, a, r, a_prob) tuples
'''
import numpy
def generate_trajectory(env, pi, render_last_step: bool=False ):
    episode = []
    s = env.reset()
    a, a_prob = pi(torch.tensor(s))
    done = False
    
    while not done:
        s_p, r, done, info = env.step(torch.argmax(a)[0])
        episode.append((s, a, r, a_prob))
        s = s_p
        a, a_prob = pi(torch.tensor(s_p))
        if done:
            env.render()

    return episode

In [25]:
# step 2, estimate the advantage of the trajectory
'''
Note: Advantage = A(a, s) = sum(r_t + gamma^(t+1) * V(s_t+1) - V(s_t)) where t = 0,1,2,...,T-1, and T = len(trajectory)
As input, we will need:
    - the trajectory, list of (s, a, r) tuples
    - gamma (discount)
    - value function V which takes in the state and outputs a scalar value
As output, we return:
    - advantage estimate per time step
'''
from typing import List
def advantage_estimates(trajectory, gamma : float, V) -> List[float]:
    advantages = []
    for t in range(0, len(trajectory)-1):
        s_t, a_t, r_t, _ = trajectory[t]
        s_t_1, _, _, _ = trajectory[t+1]
        advantages.append(r_t + gamma * V(s_t_1) - V(s_t))
    return advantages

In [26]:
# step 3, sample estimates for objective L_theta_k and KL-Divergence constraint H using advantage estimates
'''
Single path sample estimates for L_theta_k and H
Equations:
    - g_k = gradient_theta(L_theta_k(theta))
    - L_theta_k = sum(discounted future rewards -> just use advantage), where s_0~p_0, a~q, q(a|s) = policy_theta_k(a|s)
    - H = FIM = sum(KL_divergence(pi_theta_old(*|s_n) || pi_theta(*|s_n)))
Inputs:
    - policy_theta_k
    - DONT need action distribution (for single path, this will be equivalent to our policy)
    - number of samples
    - to pass along
        - env (calling generate trajectory)
        - V (calling advantage estimate)
        - gamma (calling advantage estimate)
Outputs:
    - estimated objective L_theta_k(theta)
    - estimated constraint KL divergence H 
'''
import torch
def single_path_sample_estimator(pi, env, V, gamma, num_samples: int):
    # set of trajectories D_k
    D_k = []
    # set of Advantages
    A_k = []
    for i in range(num_samples):
        trajectory = generate_trajectory(env, pi)
        D_k.append(trajectory)
        A_k.append(advantage_estimates(trajectory, gamma))
    # estimate policy gradient
    g_k = None
    for traj_idx in range(len(D_k)):
        episode = D_k[traj_idx]
        adv = A_k[traj_idx]
        for t in range(len(episode)):
            s, a, r, a_prob = episode[t]
            if g_k ==None:
                g_k = torch.zeros(a_prob.grad.shape)
            g_k += a_prob.grad * torch.tensor(adv[t])
           
    return g_k

In [27]:
'''
Runner
'''
# first, let us create our inputs
import gym 
from nn import PolicyNN, ValueNN

# lets get our environment
env = gym.make("CartPole-v1")

# Policy parameters (NN) = "theta". 2 hidden layers w/ 32 nodes. softmax output for each action
theta = PolicyNN(
    env.observation_space.shape[0],
    2,
    32,
    env.action_space.n
    )

# Value function parameters (NN) = "V". outputs value of state (scalar)
V = ValueNN(
    env.observation_space.shape[0],
    2,
    32,
    1
)
GAMMA = 1
NUM_SAMPLES = 1
for _ in range(1):
    print(single_path_sample_estimator(theta, env, V, GAMMA, NUM_SAMPLES))


  action_probs = F.softmax(actions)    # linear output


AssertionError: tensor(0) (<class 'torch.Tensor'>) invalid