# Actor Critic

In [None]:
%matplotlib inline
!pip install "git+https://github.com/NoneSince/actor_critic_tutorial"
!pip install numpy
!pip install matplotlib
!pip install scipy
!pip install torch

## The environment for this tutorial
In this tutorial, *like the policy gradient tutorial*, we are going to work on the gymnasium Pendulum environment: https://gymnasium.farama.org/environments/classic_control/pendulum/

<img src="https://raw.githubusercontent.com/NoneSince/actor_critic_tutorial/master/assets/pendulum.gif" style="height:200px">


In this environment, the agent's goal is to swing the pendulum up so that it faces upward, and keep it there.

The agent observes a 3-dimensional vector: the position (x, y) of the pendulum's free end and its angular velocity:

<img src="https://raw.githubusercontent.com/NoneSince/actor_critic_tutorial/master/assets/pendulum_state.png" style="height:150px">

Its action is a single element: the torque to apply to the pendulum.

<img src="https://raw.githubusercontent.com/NoneSince/actor_critic_tutorial/master/assets/pendulum_action.png" style="height:70px">

The reward is negative (a penalty) and grows in magnitude with how far the pendulum is from upright, with its angular velocity, and with the torque the agent applies.
The reward at each time step is between -16.27 and 0.

<img src="https://raw.githubusercontent.com/NoneSince/actor_critic_tutorial/master/assets/pendulum_reward.png" style="height:35px">
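The lower bound quoted above can be checked by hand. The sketch below assumes the reward formula from the Gymnasium Pendulum documentation, $r = -(\theta^2 + 0.1\,\dot\theta^2 + 0.001\,\tau^2)$, with $|\theta| \le \pi$, $|\dot\theta| \le 8$ and $|\tau| \le 2$. The function name `pendulum_reward` is ours and is used for illustration only.

```python
import math

def pendulum_reward(theta, theta_dot, torque):
    """Per-step reward, assuming the formula documented for Pendulum-v1."""
    return -(theta ** 2 + 0.1 * theta_dot ** 2 + 0.001 * torque ** 2)

# Worst case: pendulum hanging straight down, at maximum speed, with maximum torque.
worst = pendulum_reward(math.pi, 8.0, 2.0)
# Best case: upright, at rest, no torque applied.
best = pendulum_reward(0.0, 0.0, 0.0)

print("worst per-step reward:", round(worst, 2))
print("best per-step reward is zero:", best == 0)
```

Under these assumptions the worst case rounds to the -16.27 quoted above, and over a 200-step episode the accumulated reward cannot go below roughly 200 times that value.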

At each episode, the pendulum starts at a random state, and the agent plays for a fixed number of steps $T=200$.


---
# Reminder from policy gradient tutorial:

## Policy Gradient Theorem
It turns out (and it's not that obvious) that the gradient of the return with respect to the policy parameter can be computed.
The proof is not trivial, but the result is

<img src="https://raw.githubusercontent.com/NoneSince/actor_critic_tutorial/master/assets/pg_theorem.png" style="height:80px">

## Parameterized Policy
A parameterized policy is any function with learnable parameters that takes a state as input and outputs a probability distribution over actions.
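To make the definition concrete, here is a minimal sketch of one possible parameterized policy: a Gaussian whose mean is linear in the state. This is an assumption chosen for illustration, not the parametrization used in this tutorial; the class name `LinearGaussianPolicy`, the fixed `sigma`, and the numbers are all hypothetical.

```python
import math
import random

class LinearGaussianPolicy:
    """Hypothetical policy: action ~ Normal(mean = theta . state, std = sigma)."""

    def __init__(self, theta, sigma=0.5):
        self.theta = list(theta)   # learnable parameters
        self.sigma = sigma         # fixed standard deviation (an assumption)

    def mean(self, state):
        return sum(t * s for t, s in zip(self.theta, state))

    def sample_action(self, state, rng=random):
        # the policy outputs a distribution; acting means sampling from it
        return rng.gauss(self.mean(state), self.sigma)

    def log_prob(self, state, action):
        z = (action - self.mean(state)) / self.sigma
        return -0.5 * z * z - math.log(self.sigma * math.sqrt(2.0 * math.pi))

    def grad_log_prob(self, state, action):
        # d/dtheta log N(a; theta . s, sigma) = (a - mean) / sigma**2 * s
        scale = (action - self.mean(state)) / self.sigma ** 2
        return [scale * s for s in state]

random.seed(0)
policy = LinearGaussianPolicy(theta=[0.1, -0.2, 0.05])
state = [1.0, 0.0, 0.5]
action = policy.sample_action(state)
print(action, policy.log_prob(state, action), policy.grad_log_prob(state, action))
```

The three ingredients (state in, distribution out, parameters that can be nudged along the gradient of the log-probability) are what policy gradient methods rely on.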

We will use this linear policy parametrization:

<img src="https://raw.githubusercontent.com/NoneSince/actor_critic_tutorial/master/assets/policy_func.jpg" style="height:100px">

<img src="https://raw.githubusercontent.com/NoneSince/actor_critic_tutorial/master/assets/REINFORCE.png" style="height:500px">

---
# Actor–Critic

<img src="https://raw.githubusercontent.com/NoneSince/actor_critic_tutorial/master/assets/general_policy_gradient_algorithm.jpg" style="height:400px">

This template lets us learn and use other estimates besides the policy $\pi$!

These estimated functions can help us choose the step size along the gradient of the log-likelihood, normalize the values, or subtract a baseline from our returns.

As an example:

<img src="https://raw.githubusercontent.com/NoneSince/actor_critic_tutorial/master/assets/reinforce_with_baseline.jpg" style="height:400px">

Here we introduce a TD-error-like quantity. To compute it we maintain a state-value function, and since we already update the policy in a Monte Carlo fashion, we update the state-value function in a Monte Carlo fashion as well.
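One reason a baseline can be subtracted safely is that the expected gradient of the log-likelihood is zero, so any term that does not depend on the action drops out of the expected update. The sketch below checks this on a toy two-action policy with $\pi(a=1) = \mathrm{sigmoid}(\theta)$. The toy policy and the names `sigmoid` and `expected_score` are assumptions made for illustration; they are not part of the tutorial's code.

```python
import math

def sigmoid(x):
    return 1.0 / (1.0 + math.exp(-x))

def expected_score(theta):
    """E_a[ d/dtheta log pi(a) ] for a two-action policy with pi(a=1) = sigmoid(theta)."""
    p = sigmoid(theta)
    grad_log_pi_a1 = 1.0 - p   # d/dtheta log(sigmoid(theta))
    grad_log_pi_a0 = -p        # d/dtheta log(1 - sigmoid(theta))
    return p * grad_log_pi_a1 + (1.0 - p) * grad_log_pi_a0

baseline = 7.3  # any value that does not depend on the action
for theta in (-2.0, 0.0, 1.5):
    # contribution of the baseline to the expected gradient: baseline * E[score]
    print(theta, baseline * expected_score(theta))
```

The baseline therefore leaves the expected update unchanged while it can reduce the variance of individual updates.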

REINFORCE with baseline is not considered an actor-critic method because its state-value function is only used as a baseline, not as a critic. In other words, a state-value estimate is called a "critic" only if it uses the estimates of subsequent states to update the estimate of the current state (i.e., bootstrapping).

With bootstrapping we introduce a bias and an asymptotic dependence on the quality of the function approximation, but this is often beneficial because it reduces the variance.
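The difference between the two kinds of targets can be seen on a toy episode. In this sketch the rewards and the critic's value estimates are made-up numbers; only the two formulas matter.

```python
gamma = 0.97
rewards = [-1.0, -0.5, -0.2, 0.0]          # hypothetical rewards r_0..r_3 until the episode ends
value_estimates = [-1.5, -0.8, -0.3, 0.0]  # hypothetical critic outputs V(s_0)..V(s_3)

# Monte Carlo target for s_0: the full discounted return, no estimates involved.
mc_target = sum(gamma ** k * r for k, r in enumerate(rewards))

# Bootstrapped (one-step TD) target for s_0: one real reward plus the critic's guess for s_1.
td_target = rewards[0] + gamma * value_estimates[1]

print("Monte Carlo target:", mc_target)
print("one-step TD target:", td_target)
```

The Monte Carlo target depends on every random reward in the episode (high variance, no bias), while the TD target depends on a single reward plus an estimate that may be wrong (lower variance, some bias).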

Actor–Critic reinforcement learning methods are policy search algorithms, where the agent learns to map the state to two outputs:

1. Recommended action: A probability value for each action in the action space.
In short, this is "Actor": the policy

2. Estimated rewards in the future: Sum of all rewards it expects to receive in the future.
In short, this is "Critic": the state-value function V, or action-value function Q

More formally:
* The actor corresponds to a conventional action-selection policy, mapping states to actions in a probabilistic manner.
* The critic corresponds to a conventional state-value function, mapping states to expected cumulative future reward.

Thus, the critic addresses a problem of prediction, whereas the actor is concerned with control.

These problems are separable, but are solved simultaneously to find an optimal policy.


The main concept:

<img src="https://raw.githubusercontent.com/NoneSince/actor_critic_tutorial/master/assets/actor_critic_scheme.jpg" style="height:500px">


The actor-critic family:

<img src="https://raw.githubusercontent.com/NoneSince/actor_critic_tutorial/master/assets/actor_critic_template.jpg" style="height:500px">

Note the difference between REINFORCE-with-baseline and the actor-critic family:
the policy and the value function are updated during the episode. We don't need to wait for a full episode to end before we start learning.

*   model based/free?
*   on/off policy?
*   online/offline?

Limitations of Actor-Critic Methods
* High Variance: The actor-critic algorithm uses the observed reward signal to update the policy and value function. This approach can lead to high variance in the estimates, especially when the reward signal is sparse or noisy.
* Slow Convergence: The actor-critic algorithm is a model-free reinforcement learning algorithm, which means that it does not use a model of the environment. This makes it slower to converge compared to model-based methods.
* Function Approximation Error: The actor and critic networks in the actor-critic algorithm are typically implemented as neural networks that approximate the policy and value function, respectively. The approximation error in these networks can affect the quality of the learned policy and value function.
* Sensitivity to Hyperparameters: The actor-critic algorithm is sensitive to the choice of hyperparameters such as the learning rates for the actor and critic, the discount factor, and the architecture of the neural networks. Choosing the right hyperparameters is important for the success of the algorithm, but it can be difficult in practice.
* Non-stationarity: The environment in reinforcement learning can be non-stationary, meaning that the transition probabilities and rewards change over time. This can make it difficult for the actor-critic algorithm to learn the optimal policy, especially if the changes are sudden or large.

The algorithm we have chosen:

<img src="https://raw.githubusercontent.com/NoneSince/actor_critic_tutorial/master/assets/one_step_actor_critic.jpg" style="height:500px">

Note: *I* is simply $\gamma^t$ (gamma to the power of t), not an additional parameter.
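A quick way to convince yourself: start *I* at 1 and multiply it by gamma once per step, as the algorithm does. The loop below is a minimal sketch with an arbitrary number of steps.

```python
gamma = 0.97
I = 1.0
for t in range(5):
    # at the start of step t, I equals gamma ** t
    print(t, I, gamma ** t)
    I = I * gamma
```

So *I* only discounts updates made later in the episode; it is never learned or tuned.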

As seen above, we need to approximate the policy and the state-value.

We will use this linear policy parametrization:

<img src="https://raw.githubusercontent.com/NoneSince/actor_critic_tutorial/master/assets/policy_func.jpg" style="height:300px">

Why choose this form for $\pi$? So that we can easily compute the gradient using the chain rule and the gradient of a linear function:

<img src="https://raw.githubusercontent.com/NoneSince/actor_critic_tutorial/master/assets/log_grad.png" style="height:300px">
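The chain-rule step can be checked numerically. The sketch below assumes the form used in the code section of this notebook, where the gradient of the log of the linear output $f(s) = \theta \cdot s$ is $(1/f(s)) \cdot s$. The parameter and state values are hypothetical and chosen so that $f(s) > 0$, since the logarithm is undefined otherwise.

```python
import math

def log_f(theta, state):
    """log of the linear output f(s) = theta . s (only defined when f(s) > 0)."""
    return math.log(sum(t * s for t, s in zip(theta, state)))

def grad_log_f(theta, state):
    """Chain rule: d/dtheta log f(s) = (1 / f(s)) * s."""
    f = sum(t * s for t, s in zip(theta, state))
    return [s / f for s in state]

theta = [0.5, 0.2, 0.1]   # hypothetical parameters
state = [1.0, 0.5, 2.0]   # hypothetical state, so that f(s) = 0.8

# finite-difference approximation of the same gradient, one parameter at a time
eps = 1e-6
numeric = []
for i in range(len(theta)):
    bumped = list(theta)
    bumped[i] += eps
    numeric.append((log_f(bumped, state) - log_f(theta, state)) / eps)

analytic = grad_log_f(theta, state)
print("analytic:", analytic)
print("numeric: ", numeric)
```

The two vectors agree up to the finite-difference error. Note that the expression blows up as $f(s)$ approaches zero, which is worth keeping in mind when choosing learning rates.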


And we will use this linear value parametrization:

<img src="https://raw.githubusercontent.com/NoneSince/actor_critic_tutorial/master/assets/value_func.jpg" style="height:200px">

Why choose this form for V? Just to showcase the simplest case. Note that no gradient of log V is required, only the gradient of V itself.
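For a linear value function, the gradient with respect to the weights is just the state vector, so a single critic update is easy to trace by hand. The numbers below are hypothetical, and the helper `value` is ours; the update rule is the one-step rule from the algorithm above with $I = 1$.

```python
def value(w, state):
    """Linear state-value estimate V(s) = w . s."""
    return sum(wi * si for wi, si in zip(w, state))

w = [0.0, 0.0, 0.0]            # critic weights before the update
state = [1.0, 0.0, 0.5]        # hypothetical current state
next_state = [0.9, 0.1, 0.4]   # hypothetical next state
reward, gamma, alpha = -1.0, 0.97, 0.1

# TD error: how much better or worse the step went than the critic expected
td_error = reward + gamma * value(w, next_state) - value(w, state)

# for linear V, grad_w V(s) = s, so the update is w + alpha * td_error * s
w_new = [wi + alpha * td_error * si for wi, si in zip(w, state)]

print("TD error:", td_error)
print("V(s) before:", value(w, state), "after:", value(w_new, state))
```

After the update, V(s) has moved from 0 toward the target of -1, by an amount controlled by the learning rate.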


## Code Section

In [None]:
import gymnasium as gym
from pg_tutorial.utils import evaluate_agent, visualize_policy
from pg_tutorial.utils import ActivePlotter
from pg_tutorial.PendulumNNPolicy import PendulumNNPolicy

%matplotlib inline

# for simplicity and time saving, we will make our environment easier by reducing gravity
game = "Pendulum-v1"
gravity = 1.0
env = gym.make(game, render_mode="rgb_array", g=gravity)

In [None]:
import numpy as np

class PendulumVanillaPolicy:
    def __init__(self):
        self.theta_1 = np.random.uniform(-1, 1)
        self.theta_2 = np.random.uniform(-1, 1)
        self.theta_3 = np.random.uniform(-1, 1)

        self.omega_1 = np.random.uniform(-1, 1)
        self.omega_2 = np.random.uniform(-1, 1)
        self.omega_3 = np.random.uniform(-1, 1)

    def get_action(self, state):
        # action is deterministic for this policy:
        return np.array([self.theta_1 * state[0] + self.theta_2 * state[1] + self.theta_3*state[2]])
    def get_value(self, state):
        return np.array([self.omega_1 * state[0] + self.omega_2 * state[1] + self.omega_3*state[2]])

    def gradient(self, state):
        return np.array([state[0], state[1], state[2]])

    def grad_log_likelihood(self, state):
        # compute grad (log(pi(s)) according to the chain rule:
        return (1/self.get_action(state)) * self.gradient(state)
    def grad_value(self, state):
        return np.array([state[0], state[1], state[2]])

    def sample_action(self, state):
        # used by our algorithm, returns action and the log likelihood gradient of this action
        return self.get_action(state), self.grad_log_likelihood(state)

    def get_parameters_vector(self):
        return np.array([self.theta_1, self.theta_2, self.theta_3])
    def get_value_parameters_vector(self):
        return np.array([self.omega_1, self.omega_2, self.omega_3])

    def set_parameters_vector(self, parameters_vector):
        self.theta_1, self.theta_2, self.theta_3 = parameters_vector
    def set_value_parameters_vector(self, value_parameters_vector):
        self.omega_1, self.omega_2, self.omega_3 = value_parameters_vector

In [None]:
def one_step_actor_critic(env, policy, learning_rate=0.0002, value_learning_rate=0.0002, gamma=0.97, n_episodes=10000, plotter=None):
    mean_eval_rewards = []
    for episode in range(n_episodes):
        state, _ = env.reset()
        I = 1
        value_parameters = policy.get_value_parameters_vector()
        policy_parameters = policy.get_parameters_vector()
        terminated, truncated = False, False
        while (not terminated) and (not truncated):
            action, log_likelihood_grad = policy.sample_action(state)
            next_state, reward, terminated, truncated, info = env.step(action)

            if terminated:
                TD_error = reward - policy.get_value(state)
            else:
                TD_error = reward + gamma*policy.get_value(next_state) - policy.get_value(state)

            value_parameters = value_parameters + value_learning_rate * I * TD_error * policy.grad_value(state)
            policy.set_value_parameters_vector(value_parameters)

            policy_parameters = policy_parameters + learning_rate * I * TD_error * log_likelihood_grad
            policy.set_parameters_vector(policy_parameters)

            I = I*gamma
            state = next_state

        # for visualization only (not part of the algorithm, and doesn't affect the parameters):
        if episode % 5 == 0 or episode==n_episodes-1:
            # add the agent's stats to a graph or log them
            mean_reward = evaluate_agent(policy, env, n_episodes=5)
            mean_eval_rewards.append(mean_reward)
            if plotter is None:
                print(f"Episode {episode}: Evaluation mean accumulated reward = {mean_reward}")
            else:
                plotter.update_plot(episode, mean_reward)

    return mean_eval_rewards

In [None]:
import random
random.seed(42)
np.random.seed(42)
n_episodes=1000

env = gym.make(game, render_mode="rgb_array", g=gravity)
plotter = ActivePlotter(max_iteration=n_episodes,reward_range=(-1900, 0))
policy = PendulumVanillaPolicy()
vanilla_mean_eval_rewards = one_step_actor_critic(env, policy, gamma=0.97, learning_rate=0.0005, value_learning_rate=0.0005, n_episodes=n_episodes, plotter=plotter)

In [None]:
agent_evaluation = evaluate_agent(policy, env, n_episodes=1000) # way better than after only 100 episodes
print(f"agent evaluation mean reward:", agent_evaluation)

theta_params = policy.get_parameters_vector()
print(f"there are {len(theta_params)} parameters in this policy function")
print(theta_params)
omega_params = policy.get_value_parameters_vector()
print(f"there are {len(omega_params)} parameters in this state-value function")
print(omega_params)
visualize_policy(policy, env, n_episodes=2)

# TODO 1:

We provide you with another agent, named *PendulumNNPolicy*, that approximates the policy and the state-value function with a neural network.

Run the new agent for 1000 episodes with a discount factor of 0.97 and learning rates of 0.0005, and compare it with the simple linear agent.

The new agent needs the state dimension, the action dimension, and the maximum action from you. (It takes a single maximum rather than a minimum and a maximum because it assumes a symmetric action range, from -m to +m.)

In [None]:
import random
random.seed(42)
np.random.seed(42)
plotter = ActivePlotter(max_iteration=n_episodes,reward_range=(-1900, 0))

### TODO: run ac with NN policy for 1000 episodes ###
# create "Pendulum-v1" env, and  NN policy:
# state_dim= ... # length of the state vector/scalar
# action_dim= ... # length of the action vector/scalar
# action_range= ... # the action values are from (0 to +range) or (0 to -range)
# NN_mean_eval_rewards = ... one_step_actor_critic() # uncomment and complete this

In [None]:
agent_evaluation = evaluate_agent(policy, env, n_episodes=1000)
print(f"agent evaluation mean reward:", agent_evaluation)

theta_params = policy.get_parameters_vector()
print(f"there are {len(theta_params)} parameters in this policy function")
print(theta_params)
omega_params = policy.get_value_parameters_vector()
print(f"there are {len(omega_params)} parameters in this state-value function")
print(omega_params)
visualize_policy(policy, env, n_episodes=2)

In [None]:
import matplotlib.pyplot as plt
plt.plot(NN_mean_eval_rewards, label="NN")
plt.plot(vanilla_mean_eval_rewards, label="Vanilla")
plt.legend()
plt.show()

# TODO 2:
As we know, there are two paradigms of updating the agent:

Online Learning/TD Learning: Updating the agent after every action taken

Monte Carlo: Updating the agent after an episode ends

It turns out that N-step learning is the general case of these two: the agent takes N steps forward, then updates its estimate of the value of the state it was in N steps earlier, based on the N rewards it has collected.

<img src="https://raw.githubusercontent.com/NoneSince/actor_critic_tutorial/master/assets/n_step_idea.jpg" style="height:500px">


<img src="https://raw.githubusercontent.com/NoneSince/actor_critic_tutorial/master/assets/n_step_totals.jpg" style="height:500px">

For some natural number N, the reward update looks like this:

<img src="https://raw.githubusercontent.com/NoneSince/actor_critic_tutorial/master/assets/n_step_general_formula.jpg" style="height:200px">
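As a small numeric illustration of the idea, the sketch below computes an n-step target on made-up numbers: the discounted sum of the n collected rewards plus the discounted value estimate of the state reached afterwards. The helper name `n_step_target` and all values are hypothetical, and this is a standalone illustration rather than the missing part of the exercise.

```python
def n_step_target(rewards, bootstrap_value, gamma):
    """Discounted sum of the n collected rewards, plus gamma**n times
    the critic's estimate for the state reached after those n steps."""
    n = len(rewards)
    discounted_rewards = sum(gamma ** k * r for k, r in enumerate(rewards))
    return discounted_rewards + gamma ** n * bootstrap_value

gamma = 0.97
rewards = [-1.0, -0.5, -0.2]   # hypothetical rewards r_t, r_{t+1}, r_{t+2}

# n = 1: the ordinary one-step TD target (critic estimate of the next state is -0.8 here)
one_step = n_step_target(rewards[:1], bootstrap_value=-0.8, gamma=gamma)
# n = 3: three real rewards, then bootstrap from the state reached afterwards
three_step = n_step_target(rewards, bootstrap_value=-0.3, gamma=gamma)
# episode ended after these rewards: nothing left to bootstrap, i.e. the Monte Carlo return
monte_carlo = n_step_target(rewards, bootstrap_value=0.0, gamma=gamma)

print(one_step, three_step, monte_carlo)
```

With n = 1 the target is the TD target, and when the window covers the rest of the episode it becomes the Monte Carlo return, which is the sense in which n-step learning generalizes both.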

N-step learning is very important in actor-critic, because it lets us trade off the bias of one-step bootstrapping against the variance of Monte Carlo returns, which often makes the algorithm more stable.

Now you are going to implement n-step learning yourself. Write the missing parts of this function:

In [None]:
from collections import deque
def n_step_actor_critic(env, policy, n=5, learning_rate=0.0002, value_learning_rate=0.0002, gamma=0.97, n_episodes=10000, plotter=None):
    mean_eval_rewards = []
    states=deque(maxlen=n)
    actions=deque(maxlen=n)
    rewards=deque(maxlen=n)
    next_states=deque(maxlen=n)
    dones=deque(maxlen=n)
    for episode in range(n_episodes):
        state, _ = env.reset()
        I = 1
        count=0
        value_parameters = policy.get_value_parameters_vector()
        policy_parameters = policy.get_parameters_vector()
        terminated, truncated = False, False
        while (not terminated) and (not truncated):
            count+=1
            action, log_likelihood_grad = policy.sample_action(state)
            next_state, reward, terminated, truncated, info = env.step(action)
            #store the current state,action, and reward
            states.append(state)
            actions.append(action)
            rewards.append(reward)
            next_states.append(next_state)
            dones.append(1 if terminated else 0)

            if terminated:
                count-=1
                TD_error=0

                # hint: exactly like the main n-step loop, but only up to count, not up to n.


                #
                value_parameters = value_parameters + value_learning_rate * I * TD_error * policy.grad_value(state)
                policy.set_value_parameters_vector(value_parameters)
                policy_parameters = policy_parameters + learning_rate * I * TD_error * log_likelihood_grad
                policy.set_parameters_vector(policy_parameters)
                states.popleft()
                actions.popleft()
                rewards.popleft()
                next_states.popleft()
            #n step loop
            #update just the first state!
            if(count==n):
              count-=1
              #compute TD error by the formula
              TD_error=0


              #calculate the parameters like in the original function


              #delete the oldest state
              states.popleft()
              actions.popleft()
              rewards.popleft()
              next_states.popleft()

            I = I*gamma
            state = next_state

        # for visualization only (not part of the algorithm, and doesn't affect the parameters):
        if episode % 5 == 0 or episode==n_episodes-1:
            mean_reward = evaluate_agent(policy, env, n_episodes=5)
            mean_eval_rewards.append(mean_reward)
            if plotter is None:
                print(f"Episode {episode}: Evaluation mean accumulated reward = {mean_reward}")
            else:
                plotter.update_plot(episode, mean_reward)

    return mean_eval_rewards

Run the algorithm for n=5 with the simple vanilla agent and plot the results

In [None]:
import random
random.seed(42)
np.random.seed(42)
n_episodes=1000
plotter = ActivePlotter(max_iteration=n_episodes,reward_range=(-1900, 0))

# env = ...
# policy = ...
n=5
# vanilla_mean_eval_rewards = n_step_actor_critic(...)

In [None]:
agent_evaluation = evaluate_agent(policy, env, n_episodes=100)
print(f"agent evaluation mean reward:", agent_evaluation)

theta_params = policy.get_parameters_vector()
print(f"there are {len(theta_params)} parameters in this policy function")
print(theta_params)
omega_params = policy.get_value_parameters_vector()
print(f"there are {len(omega_params)} parameters in this state-value function")
print(omega_params)
visualize_policy(policy, env, n_episodes=2)

# TODO 3 - optional:
To remind you, there was an algorithm in-between:

<img src="https://raw.githubusercontent.com/NoneSince/actor_critic_tutorial/master/assets/reinforce_with_baseline.jpg" style="height:400px">

We want you to implement the algorithm, so we give you the REINFORCE to start from, along with some helper functions.

In [None]:
def collect_episode(env, policy):
    """
    collect one episode with the required data for reinforce, for each step return:
     - the reward
     - the gradient of the log likelihood for the action taken at the state
     - the state in which the action was taken (needed for the baseline)
    each in a list with the length of the episode
    """
    episode_rewards, actions_log_likelihood_grads, states = [], [], []  # to be returned
    terminated, truncated = False, False
    state, _ = env.reset()
    while (not terminated) and (not truncated):
        action, log_likelihood_grad = policy.sample_action(state)
        # record the state the action was taken in *before* stepping the environment
        states.append(state)
        state, reward, terminated, truncated, info = env.step(action)
        episode_rewards.append(reward)
        actions_log_likelihood_grads.append(log_likelihood_grad)

    return episode_rewards, actions_log_likelihood_grads, states

def compute_returns(episode_rewards, gamma):
    """ given the reward collected at each step and a discount factor, compute the return for each step in a single backward pass. """
    episode_returns = []
    current_return = 0
    for reward in reversed(episode_rewards):
        current_return = reward + gamma * current_return
        episode_returns.append(current_return)  # O(1), unlike insert(0, ...)
    episode_returns.reverse()  # restore chronological order

    return episode_returns

def normalize_returns(episode_returns):
    """ normalize the returns for an episode for numerical stability """
    episode_returns = np.array(episode_returns)
    episode_returns = (episode_returns - episode_returns.mean()) / (episode_returns.std() + 1e-9)
    return episode_returns

In [None]:
def reinforce(env, policy, gamma=0.97, learning_rate=0.0005, n_episodes=100, plotter=None):
    mean_eval_rewards = []
    for episode in range(n_episodes):
        # collect one episode:
        episode_rewards, actions_log_likelihood_grads, _ = collect_episode(env, policy)

        # compute returns:
        episode_returns = compute_returns(episode_rewards, gamma)
        episode_returns = normalize_returns(episode_returns)

        # retrieve parameters, and change them in the direction of the gradient:
        policy_parameters = policy.get_parameters_vector()
        for i in range(min(len(episode_returns), 150)):  # ignore the last 50 steps. Think why?
            ret = episode_returns[i]
            log_likelihood_grad = actions_log_likelihood_grads[i]

            policy_parameters = policy_parameters + learning_rate * ret * log_likelihood_grad
        # update the policy with the new parameters:
        policy.set_parameters_vector(policy_parameters)

        # for visualization only (not part of the algorithm, and doesn't affect the parameters):
        if episode % 5 == 0 or episode==n_episodes-1:
            mean_reward = evaluate_agent(policy, env, n_episodes=5)
            mean_eval_rewards.append(mean_reward)
            if plotter is None:
                print(f"Episode {episode}: Evaluation mean accumulated reward = {mean_reward}")
            else:
                plotter.update_plot(episode, mean_reward)

    return mean_eval_rewards

In [None]:
def reinforce_with_baseline(env, policy, gamma=0.97, learning_rate=0.0005, value_learning_rate=0.0005, n_episodes=100, plotter=None):
    pass

Run the algorithm with the simple vanilla agent and plot the results

In [None]:
import random
random.seed(42)
np.random.seed(42)
n_episodes=1000
plotter = ActivePlotter(max_iteration=n_episodes,reward_range=(-1900, 0))

# env = ...
# policy = ...
# reinforce_with_baseline_vanilla_mean_eval_rewards = reinforce_with_baseline(...)

In [None]:
agent_evaluation = evaluate_agent(policy, env, n_episodes=100)
print(f"agent evaluation mean reward:", agent_evaluation)

theta_params = policy.get_parameters_vector()
print(f"there are {len(theta_params)} parameters in this policy function")
print(theta_params)
omega_params = policy.get_value_parameters_vector()
print(f"there are {len(omega_params)} parameters in this state-value function")
print(omega_params)
visualize_policy(policy, env, n_episodes=2)