## HW 4: Policy gradient
_Reference: based on Practical RL course by YSDA_

In this notebook you have to master Policy gradient Q-learning and apply it to familiar (and not so familiar) RL problems once again.

To get used to `gymnasium` package, please, refer to the [documentation](https://gymnasium.farama.org/introduction/basic_usage/).


In the end of the notebook, please, copy the functions you have implemented to the template file and submit it to the Contest.

In [3]:
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt

In [5]:
env = gym.make("CartPole-v1")

env.reset()
n_actions = env.action_space.n
state_dim = env.observation_space.shape



# Building the network for Policy Gradient (REINFORCE)

For REINFORCE algorithm, we'll need a model that predicts action probabilities given states.

For numerical stability, please __do not include the softmax layer into your network architecture__.
We'll use softmax or log-softmax where appropriate.

In [6]:
import torch
import torch.nn as nn

In [7]:
# Build a simple neural network that predicts policy logits.
# Keep it simple: CartPole isn't worth deep architectures.
model = nn.Sequential(
    nn.Linear(state_dim[0],256),
    nn.ReLU(),
    nn.Linear(256,64),
    nn.ReLU(),
    nn.Linear(64,n_actions)
)


#### Predicting the action probas

Note: **output value of this function is not a torch tensor, it's a numpy array.**

So, here gradient calculation is not needed.

Use [no_grad](https://pytorch.org/docs/stable/autograd.html#torch.autograd.no_grad)
to suppress gradient calculation.

Also, `.detach()` can be used instead, but there is a difference:

* With `.detach()` computational graph is built but then disconnected from a particular tensor, so `.detach()` should be used if that graph is needed for backprop via some other (not detached) tensor;
* In contrast, no graph is built by any operation in `no_grad()` context, thus it's preferable here.

In [8]:
def predict_probs(states):
    """
    Predict action probabilities given states.
    :param states: numpy array of shape [batch, state_shape]
    :returns: numpy array of shape [batch, n_actions]
    """
    # Use no_grad to suppress gradient calculation.
    with torch.no_grad():
        #перевод в tensor для model
        nn_states=torch.tensor(states,dtype=torch.float32)
        logi=model(nn_states)
        probs = torch.softmax(logi, dim=-1)
        #обратно
        return probs.numpy()

In [9]:

test_states = np.array([env.reset()[0] for _ in range(5)])
test_probas = predict_probs(test_states)
assert isinstance(test_probas, np.ndarray), \
    "you must return np array and not %s" % type(test_probas)
assert tuple(test_probas.shape) == (test_states.shape[0], env.action_space.n), \
    "wrong output shape: %s" % np.shape(test_probas)
assert np.allclose(np.sum(test_probas, axis=1), 1), "probabilities do not sum to 1"
print('s')

s


### Play the game

We can now use our newly built agent to play the game.

In [10]:
def generate_session(env, t_max=1000):
    states, actions, rewards = [], [], []
    s, info = env.reset()

    for t in range(t_max):
        action_probs = predict_probs(np.array([s]))[0]
        a = np.random.choice(n_actions, p=action_probs)
        new_s, r, done, truncated, info = env.step(a)

        states.append(s)
        actions.append(a)
        rewards.append(r)

        s = new_s
        if done:
            break

    return states, actions, rewards

In [11]:
# test it
states, actions, rewards = generate_session(env)

### Computing cumulative rewards

To work with sequential environments we need the cumulative discounted reward for known for every state. To compute it we can **roll back** from the end of the session to the beginning and compute the discounted cumulative reward as following:

$$
\begin{align*}
G_t &= r_t + \gamma r_{t + 1} + \gamma^2 r_{t + 2} + \ldots \\
&= \sum_{i = t}^T \gamma^{i - t} r_i \\
&= r_t + \gamma * G_{t + 1}
\end{align*}
$$

In [12]:
def get_cumulative_rewards(rewards,  # rewards at each step
                           gamma=0.99  # discount for reward
                           ):
    """
    Take a list of immediate rewards r(s,a) for the whole session
    and compute cumulative returns (a.k.a. G(s,a) in Sutton '16).

    G_t = r_t + gamma*r_{t+1} + gamma^2*r_{t+2} + ...

    A simple way to compute cumulative rewards is to iterate from the last
    to the first timestep and compute G_t = r_t + gamma*G_{t+1} recurrently

    You must return an array/list of cumulative rewards with as many elements as in the initial rewards.
    """
    #G_t = r_t + gamma*G_{t+1}
    #то есть для Gt+1=0 так как нет будущ наград
    cumulative_rewards=[]
    Gt=0
    #считаем с конца по формулке
    for i in reversed(rewards):
        Gt=i+gamma*Gt
        #cumulative_rewards.insert(0,Gt) за O(n), так что возможно развернув будет даже быстрее
        cumulative_rewards.append(Gt)
    return cumulative_rewards[::-1]

In [13]:
get_cumulative_rewards(rewards)
assert len(get_cumulative_rewards(list(range(100)))) == 100
assert np.allclose(
    get_cumulative_rewards([0, 0, 1, 0, 0, 1, 0], gamma=0.9),
    [1.40049, 1.5561, 1.729, 0.81, 0.9, 1.0, 0.0])
assert np.allclose(
    get_cumulative_rewards([0, 0, 1, -2, 3, -4, 0], gamma=0.5),
    [0.0625, 0.125, 0.25, -1.5, 1.0, -4.0, 0.0])
assert np.allclose(
    get_cumulative_rewards([0, 0, 1, 2, 3, 4, 0], gamma=0),
    [0, 0, 1, 2, 3, 4, 0])
print("looks good!")

looks good!


### Loss function and updates

We now need to define objective and update over policy gradient.

Our objective function is

$$ J \approx  { 1 \over N } \sum_{s_i,a_i} G(s_i,a_i) $$

REINFORCE defines a way to compute the gradient of the expected reward with respect to policy parameters. The formula is as follows:

$$ \nabla_\theta \hat J(\theta) \approx { 1 \over N } \sum_{s_i, a_i} \nabla_\theta \log \pi_\theta (a_i \mid s_i) \cdot G_t(s_i, a_i) $$

We can abuse PyTorch's capabilities for automatic differentiation by defining our objective function as follows:

$$ \hat J(\theta) \approx { 1 \over N } \sum_{s_i, a_i} \log \pi_\theta (a_i \mid s_i) \cdot G_t(s_i, a_i) $$

When you compute the gradient of that function with respect to network weights $\theta$, it will become exactly the policy gradient.

Final loss should also include the entropy regularization term $H(\pi_\theta (a_i \mid s_i))$ to enforce the exploration:

$$
L = -\hat J(\theta) - \lambda H(\pi_\theta (a_i \mid s_i)),
$$
where $\lambda$ is the `entropy_coef`. 

This function might be useful:

In [14]:
def to_one_hot(y_tensor, ndims):
    """ helper: take an integer vector and convert it to 1-hot matrix. """
    y_tensor = y_tensor.type(torch.LongTensor).view(-1, 1)
    y_one_hot = torch.zeros(
        y_tensor.size()[0], ndims).scatter_(1, y_tensor, 1)
    return y_one_hot

In [15]:
def get_loss(states, actions, rewards, gamma=0.99, entropy_coef=1e-2):
    """
    Compute the loss for the REINFORCE algorithm.
    """
    states = torch.tensor(states, dtype=torch.float32)
    actions = torch.tensor(actions, dtype=torch.int64)  # тут похоже опечатка так как int32 не работает, он наоборот в минимум стремится почему то
    cumulative_returns = np.array(get_cumulative_rewards(rewards, gamma))
    cumulative_returns = (cumulative_returns - cumulative_returns.mean()) / (cumulative_returns.std() + 1e-8)
    cumulative_returns = torch.tensor(cumulative_returns, dtype=torch.float32)

    # predict logits, probas and log-probas using an agent.
    logits = model(states)
    probs = torch.softmax(logits, dim=-1)
    log_probs = torch.log_softmax(logits, dim=-1)

    # как по лекции через one-hot, но он медленный немножко, но работают оба
    # one_hot= to_one_hot(actions, n_actions)
    # log_probs_for_actions=torch.sum(log_probs*one_hot, dim=1)

    # select log-probabilities for chosen actions, log pi(a_i|s_i)
    # прочитал, что gather будет быстрее правда чтобы он работал, нужно сделать столько же размерностей сколько и у входа
    log_probs_for_actions = log_probs.gather(1, actions.unsqueeze(1)).squeeze(1)
    
    J_hat = torch.mean(log_probs_for_actions * cumulative_returns)
    p_loss = -J_hat

    entropy = -torch.sum(probs * log_probs, dim=1).mean()
    loss = p_loss - entropy_coef * entropy

    return loss

In [16]:
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

def train_on_session(states, actions, rewards, optimizer=optimizer, gamma=0.99, entropy_coef=1e-2):
    loss = get_loss(states, actions, rewards, gamma, entropy_coef)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    return np.sum(rewards)

### The actual training

In [17]:
for i in range(500):
    rewards = [train_on_session(*generate_session(env), entropy_coef=1e-2) for _ in range(100)]  # generate new sessions

    print("mean reward:%.3f" % (np.mean(rewards)))

    if np.mean(rewards) > 500:
        print("You Win!")  # but you can train even further
        break

  states = torch.tensor(states, dtype=torch.float32)


mean reward:30.160
mean reward:56.050
mean reward:161.480
mean reward:344.590
mean reward:815.640
You Win!


### Watch the video of your results:

In [1]:
import os
import numpy as np
import gymnasium as gym
from gymnasium.utils.save_video import save_video

env_for_video = gym.make("CartPole-v1", render_mode="rgb_array_list")
n_actions = env_for_video.action_space.n

episode_index = 0
step_starting_index = 0

obs, info = env_for_video.reset()

for step_index in range(800):
    probs = predict_probs(np.array([obs]))[0]
    action = np.random.choice(n_actions, p=probs)

    obs, reward, terminated, truncated, info = env_for_video.step(action)
    done = terminated or truncated

    if done or step_index == 799:
        # env_for_video.render() now returns the LIST of frames accumulated so far
        frames = env_for_video.render()
        os.makedirs("videos", exist_ok=True)
        save_video(
            frames, "videos",
            fps=env_for_video.metadata.get("render_fps", 30),
            step_starting_index=step_starting_index,
            episode_index=episode_index,
        )
        episode_index += 1
        step_starting_index = step_index + 1
        obs, info = env_for_video.reset()

env_for_video.close()


DependencyNotInstalled: moviepy is not installed, run `pip install "gymnasium[other]"`

Congratulations! Finally, copy the `predict_probs`, `get_cumulative_rewards` and `get_loss` to the template and submit them to the Contest.

Good luck!

## Bonus part (no points, just for the interested ones)

Try solving the `Acrobot-v1` environment. It is more complex than regular `CartPole-v1`, so the default Policy Gradient (REINFORCE) algorithm might not work. Maybe the baseline idea could help...

![Acrobot](https://gymnasium.farama.org/_images/acrobot.gif)


In [6]:
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim

# Определяем устройство (GPU/CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Создаём среду
env = gym.make("Acrobot-v1")
env.reset()
n_actions = env.action_space.n
state_dim = env.observation_space.shape[0]  # scalar, not tuple

# Модель с двумя выходами: policy logits и value estimate
class PolicyValueNetwork(nn.Module):
    def __init__(self, state_dim, n_actions, hidden1=256, hidden2=64):
        super().__init__()
        self.common = nn.Sequential(
            nn.Linear(state_dim, hidden1),
            nn.ReLU(),
            nn.Linear(hidden1, hidden2),
            nn.ReLU(),
        )
        self.policy_head = nn.Linear(hidden2, n_actions)   # policy logits
        self.value_head = nn.Linear(hidden2, 1)            # state value V(s)

    def forward(self, x):
        features = self.common(x)
        policy_logits = self.policy_head(features)
        value = self.value_head(features).squeeze(-1)      # [batch] instead of [batch, 1]
        return policy_logits, value

# model = PolicyValueNetwork(state_dim, n_actions).to(device)

try:
    model = torch.load("Acrobo.pth", map_location=device)
except Exception as e:
    print(f"Ошибка загрузки: {e}")

Using device: cuda


  model = torch.load("Acrobo.pth", map_location=device)


In [8]:

def predict_probs(states):
    """
    Predict action probabilities given states.
    :param states: numpy array of shape [batch, state_dim]
    :returns: numpy array of shape [batch, n_actions]
    """
    with torch.no_grad():
        states_tensor = torch.tensor(states, dtype=torch.float32).to(device)
        logits, _ = model(states_tensor)
        probs = torch.softmax(logits, dim=-1)
        return probs.cpu().numpy()

def generate_session(env, t_max=500):
    states, actions, rewards = [], [], []
    s, info = env.reset()

    for t in range(t_max):
        action_probs = predict_probs(np.array([s]))[0]
        #a = np.argmax(action_probs)
        a = np.random.choice(n_actions, p=action_probs)
        new_s, r, done, truncated, info = env.step(a)

        states.append(s)
        actions.append(a)
        rewards.append(r)

        s = new_s
        if done or truncated:
            break

    return states, actions, rewards

def get_cumulative_rewards(rewards, gamma=0.99):
    cumulative_rewards = []
    Gt = 0
    for r in reversed(rewards):
        Gt = r + gamma * Gt
        cumulative_rewards.append(Gt)
    return cumulative_rewards[::-1]

def get_loss(states, actions, rewards, gamma=0.99, entropy_coef=1e-2, critic_coef=0.5):
    states = torch.tensor(np.array(states), dtype=torch.float32).to(device)
    actions = torch.tensor(actions, dtype=torch.int64).to(device)

    # Получаем G_t (Monte-Carlo returns)
    G_t = torch.tensor(get_cumulative_rewards(rewards, gamma), dtype=torch.float32).to(device)

    # Прогоняем через модель: получаем logits и V(s)
    logits, values = model(states)  # values: [T], G_t: [T]

    # === Actor loss (policy) ===
    log_probs = torch.log_softmax(logits, dim=-1)
    probs = torch.softmax(logits, dim=-1)
    log_probs_for_actions = log_probs.gather(1, actions.unsqueeze(1)).squeeze(1)

    # Advantage = G_t - V(s_t)
    advantages = G_t - values
    advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

    actor_loss = -(log_probs_for_actions * advantages).mean()

    # === Critic loss (value function) ===
    critic_loss = nn.functional.mse_loss(values, G_t)

    # === Entropy bonus ===
    entropy = -(probs * log_probs).sum(dim=-1).mean()

    # === Total loss ===
    total_loss = actor_loss + critic_coef * critic_loss - entropy_coef * entropy

    return total_loss

optimizer = optim.Adam(model.parameters(), lr=1e-3)


In [None]:
def train_on_session(states, actions, rewards, optimizer, gamma=0.99, entropy_coef=1e-2):
    loss = get_loss(states, actions, rewards, gamma, entropy_coef)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    return float(np.sum(rewards))

# Обучение
for i in range(500):
    session_rewards = []
    for _ in range(100):
        states, actions, rewards = generate_session(env)
        total_reward = train_on_session(states, actions, rewards, optimizer, entropy_coef=1e-4)
        session_rewards.append(total_reward)

    mean_reward = np.mean(session_rewards)
    print(f"Episode {i+1}: mean reward = {mean_reward:.3f}")

    if mean_reward > -80:
        print("You Win!")
        break

env.close()

Episode 1: mean reward = -106.490
Episode 2: mean reward = -108.100
Episode 3: mean reward = -99.470
Episode 4: mean reward = -107.910
Episode 5: mean reward = -119.870
Episode 6: mean reward = -103.190
Episode 7: mean reward = -114.910
Episode 8: mean reward = -100.360
Episode 9: mean reward = -102.340
Episode 10: mean reward = -98.800
Episode 11: mean reward = -109.630
Episode 12: mean reward = -105.530


In [19]:
torch.save(model, "Acrobo.pth")

In [20]:
import os
import numpy as np
import gymnasium as gym
from gymnasium.utils.save_video import save_video

env_for_video = gym.make("Acrobot-v1", render_mode="rgb_array_list")
n_actions = env_for_video.action_space.n

episode_index = 0
step_starting_index = 0

obs, info = env_for_video.reset()

for step_index in range(800):
    probs = predict_probs(np.array([obs]))[0]
    action = np.random.choice(n_actions, p=probs)

    obs, reward, terminated, truncated, info = env_for_video.step(action)
    done = terminated or truncated

    if done or step_index == 799:
        # env_for_video.render() now returns the LIST of frames accumulated so far
        frames = env_for_video.render()
        os.makedirs("videos", exist_ok=True)
        save_video(
            frames, "videos",
            fps=env_for_video.metadata.get("render_fps", 30),
            step_starting_index=step_starting_index,
            episode_index=episode_index,
        )
        episode_index += 1
        step_starting_index = step_index + 1
        obs, info = env_for_video.reset()

env_for_video.close()
