# Reinforcement Learning


Имя, Фамилия:
Лидия Карпович


---


In [5]:
!pip install gymnasium[all]

Collecting box2d-py==2.3.5 (from gymnasium[all])
  Using cached box2d-py-2.3.5.tar.gz (374 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting swig==4.* (from gymnasium[all])
  Using cached swig-4.3.1-py3-none-manylinux_2_12_x86_64.manylinux2010_x86_64.whl.metadata (3.5 kB)
Collecting mujoco-py<2.2,>=2.1 (from gymnasium[all])
  Using cached mujoco_py-2.1.2.14-py3-none-any.whl.metadata (669 bytes)
Collecting cython<3 (from gymnasium[all])
  Using cached Cython-0.29.37-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_24_x86_64.whl.metadata (3.1 kB)
Collecting mujoco>=2.1.5 (from gymnasium[all])
  Using cached mujoco-3.3.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (44 kB)
Collecting glfw (from mujoco>=2.1.5->gymnasium[all])
  Using cached glfw-2.9.0-py2.py27.py3.py30.py31.py32.py33.py34.py35.py36.py37.py38.p39.p310.p311.p312.p313-none-manylinux_2_28_x86_64.whl.metadata (5.4 kB)
Collecting fasteners~=0.15 (from mujoco-py<2.2,>=2.1

In [6]:
import random

from tqdm.auto import trange, tqdm

import matplotlib.pyplot as plt
import numpy as np

In [7]:
# utils, нужно чтобы рендерить видео траекторий в colab
from IPython.display import HTML
from base64 import b64encode

# source: https://stackoverflow.com/a/69990457
def show_video(video_path, video_width = 600):
    video_file = open(video_path, "r+b").read()
    video_url = f"data:video/mp4;base64,{b64encode(video_file).decode()}"
    return HTML(f"""<video width={video_width} controls><source src="{video_url}"></video>""")

## Введение в Gymnasium


Документация: https://gymnasium.farama.org

Изначально появился под именем [Gym](https://github.com/openai/gym) (и так и остался), стал де-факто стандартом взаимодействия с различными средами и симуляторами для RL исследователей и инженеров. С тех пор все новые среды оформляют похожим образом. Удобно, т.к. уже написанные для Gym алгоритмы могут работать на любых новых средах, достаточно лишь, чтобы они соответствовали API.

Тем не менее, спустя время OpenAI перестали его поддерживать и обновлять, накопилось багов, поэтому появился [Gymnasium](https://github.com/Farama-Foundation/Gymnasium) - форк который поддерживается [комьюнити](https://farama.org) и постоянно обновляется. По сравнению с замороженным Gym было добавлено несколько больших изменений, ломающих обратную совместимость, поэтому сейчас Gym считается `deprecated` и весь новый код нужно писать на Gymnasium

In [8]:
# по привычке импортирут под старым именем
import gymnasium as gym

from gymnasium.wrappers import RecordVideo

Gymnasium предоставляет некоторое количество стандартных сред (полный список можно посмотреть в документации). Сторонние библиотеки как правило регистрируют свои среды в Gym, поэтому в идеале для них ничего не менятся (если разрабы все сделали правильно).

Рассмотрим какую-нибудь поближе, например классическую [CartPole-v1](https://gymnasium.farama.org/environments/classic_control/cart_pole/):

<img src="https://gymnasium.farama.org/_images/cart_pole.gif" width="300">

**NB**! Как правило одна и та же среда разных версий может различаться, поэтому на это стоит обращать внимание (особенно если воспроизводите чужой метод, результаты на разных версиях могут отличаться).

In [9]:
env = gym.make("CartPole-v1")
env

<TimeLimit<OrderEnforcing<PassiveEnvChecker<CartPoleEnv<CartPole-v1>>>>>

Глобально, среда определяется двумя вещами:

1. **observation space** - пространство возможных состояний;
2. **action space** - пространство возможных действий.

В зависимости от пространства состояний может меняться архитектура агента и сложность задачи. С действиями еще сложнее, т.к. некоторые методы работают только с дискретными действиями (например, классический DQN), некоторые только с вещественными (например, DDPG, SAC). К счастью, бывают алгоритмы которые умеют и так и так (например, PPO).

Задача же определяется с помощью награды (в целом очевидно).

In [10]:
print("Observation space:", env.observation_space)
print("Acton space:", env.action_space)

Observation space: Box([-4.8               -inf -0.41887903        -inf], [4.8               inf 0.41887903        inf], (4,), float32)
Acton space: Discrete(2)


In [11]:
print("Obs shape", env.observation_space.shape)
print("Obs min values", env.observation_space.low)
print("Obs max values", env.observation_space.high)

Obs shape (4,)
Obs min values [-4.8               -inf -0.41887903        -inf]
Obs max values [4.8               inf 0.41887903        inf]


Взаимодействие с средой происходит через два основных метода:
1. env**.reset()**
2. env**.step()**

In [12]:
# возвращает среду к начальному состоянию -> начинается новый эпизод
reset_obs, reset_info = env.reset()

print(reset_obs)
print(reset_info)

[0.00114413 0.0005388  0.00693469 0.04288638]
{}


In [13]:
next_obs, reward, terminated, truncated, info = env.step(action=0)

print(next_obs)     # следующее состояние, в которое перешла среда после совершения действия
print(reward)       # полученная награда
print(terminated)   # считается ли задача выполненной
print(truncated)    # нужно ли закончить эпизод по любой другой причине (например лимит кол-во шагов в одном эпизоде)
print(info)         # дополнителные логи, если есть

[ 0.0011549  -0.1946819   0.00779242  0.33774918]
1.0
False
False
{}


В целом, весь эпизод обычно выглядит так:

In [14]:
!rm -rf videos

In [15]:
env = gym.make("CartPole-v1", render_mode="rgb_array")

# это wrapper, с помощью них можно писать обертки, которые добавляют какие-то новые фичи поверх
# в данном случае сохраняет видео эпизода, полезно для дебага и просто ради любопытства, посмотреть, что агент там делает
env = RecordVideo(env, "videos", episode_trigger=lambda t: True, disable_logger=True)

done = False # если True, то эпизод закончился
obs, info = env.reset()

while not done:
    # пока что случайное действие
    action = env.action_space.sample()

    obs, reward, terminated, truncated, info = env.step(action)
    done = terminated or truncated

# не всегда нужно, но лучше так делать
env.close()

In [16]:
show_video("videos/rl-video-episode-0.mp4")

Краткий экскурс закончен, погнали уже что-то делать ;)



---



## Задание 1: REINFORCE (3 балла)



Попробуем написать разобранный на лекции простой RL алгоритм - REINFORCE. Он умеет и в дискретные и вещественные действия (достаточно поменять Categorical распределение на Normal в акторе).

Более современные on-policy методы, такие как A2C, PPO, PPG строятся по похожему принципу (меняются только лоссы, и то как собираются данные), поэтому важно понять общую идею.

In [17]:
import torch
import torch.nn.functional as F
import torch.nn as nn

from torch.distributions import Categorical

device = "cuda" if torch.cuda.is_available() else "cpu"
print("Device: ", device)

Device:  cuda


Начнем с актора $\pi(a | s)$. В данном случае нам хватит маленькой трехслойной сети. На выходе мы хотим получать категориальное распределение - вероятности действий.

Удобно также отдельно написать метод, который будет возвращать конкретное действие для стейта. Обычно такой метод не предполагает батчей, а работает сразу с np.ndarray, чтобы скрыть детали реализации (т.к. они бывают разные у разных методов).

In [18]:
class Actor(nn.Module):
    def __init__(self, obs_dim: int, action_dim: int, hidden_dim: int = 32):
        super().__init__()
        """
        Политика для агента.
        Args:
            obs_dim (int): Размерность пространства наблюдений.
            action_dim (int): Количество возможных дискретных действий.
            hidden_dim (int): Количество нейронов в скрытом слое.
        """
        self.net = nn.Sequential(
            nn.Linear(obs_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim)
        )

    def forward(self, obs: torch.Tensor) -> Categorical:
        """
        Прямой проход: возвращает категориальное распределение над действиями.

        Args:
            obs (torch.Tensor): Наблюдение (размер [batch_size, obs_dim]).

        Returns:
            Categorical: Распределение над действиями.
        """
        logits = self.net(obs)
        return Categorical(logits=logits)

    @torch.no_grad()
    def get_action(self, obs: np.ndarray, greedy: bool = False) -> int:
        """
        Возвращает действие на основе текущей политики.

        Args:
            obs (np.ndarray): Наблюдение размером (obs_dim,)
            greedy (bool): Если True — вернуть наиболее вероятное действие.

        Returns:
            int: Индекс выбранного действия.
        """
        obs_tensor = torch.from_numpy(obs).float().unsqueeze(0)  # добавляем batch размерность
        dist = self.forward(obs_tensor)
        if greedy:
            action = torch.argmax(dist.probs, dim=-1)
        else:
            action = dist.sample()
        return action.item()



Дальше займемся сбором данных для обучения. Последовательно (в наивной имплементации, как делать лучше обсудим чуть позже) соберем $N$ эпизодов, сохраним все что нужно для обучения - состояния, действия, суммарные награды за эпизоды.

In [21]:
from typing import Tuple, Dict

def collect_experience(env: gym.Env,
                       actor: Actor,
                       num_episodes: int) -> Tuple[torch.Tensor,
                                                   torch.Tensor,
                                                   torch.Tensor,
                                                   Dict]:
    """
    Собирает опыт (траектории) от взаимодействия актора с окружением.

    Args:
        env (gym.Env): Среда Gym, с которой взаимодействует агент.
        actor (Actor): Политика (актор), которая выбирает действия на основе наблюдений.
        num_episodes (int): Количество эпизодов, которые нужно собрать.

    Returns:
        Tuple[torch.Tensor, torch.Tensor, torch.Tensor, Dict]:
            - batch_obs: Батч наблюдений.
            - batch_actions: Батч действий.
            - batch_scores: Веса для градиента политики — суммарная награда эпизода,
                            повторенная для каждого шага эпизода.
            - info (dict): Дополнительная информация, например:
                - 'mean_return': средняя суммарная награда на эпизод,
                - 'mean_len': средняя длина эпизода.

    Notes:
        Для каждого шага в эпизоде сохраняются наблюдение и действие.
        В конце эпизода суммарная награда эпизода (episode_reward) повторяется episode_len раз
        и добавляется как вес для policy gradient.
    """

    batch_obs = []          # for observations
    batch_actions = []      # for actions
    batch_scores = []       # for R(tau) weighting in policy gradient

    returns, lengths = [], []
    for _ in range(num_episodes):
        obs, done = env.reset()[0], False

        episode_reward, episode_len = 0, 0
        while not done:
            action = actor.get_action(obs)
            next_obs, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            batch_obs.append(obs)
            batch_actions.append(action)
            episode_reward += reward
            episode_len += 1

            obs = next_obs

        # the weight for each logprob(a|s) is R(tau)
        batch_scores += [episode_reward] * episode_len
        returns.append(episode_reward)
        lengths.append(episode_len)

    info = {
        "mean_return": np.mean(returns),
        "mean_len": np.mean(lengths)
    }
    return (
        torch.tensor(np.array(batch_obs), dtype=torch.float, device=device),
        torch.tensor(np.array(batch_actions), dtype=torch.float, device=device),
        torch.tensor(np.array(batch_scores), dtype=torch.float, device=device),
        info
    )

Определим наш лосс. В отличие от лоссов привычных в DL, данный **лосс сам по себе не имеет интерпретации** и не стоит пытаться делать каких-то выводов на основе его значений. Для нас важно то, что градиент от этого лосса совпадает с policy gradient - то что нам нужно для обновления весов.

In [22]:
def policy_gradient_loss(log_prob: torch.Tensor,
                         score: torch.Tensor) -> torch.Tensor:
    """
    Вычисляет loss.

    Args:
        log_probs (torch.Tensor): Логарифмированные вероятности действий.
                                  shape: [batch_size]
        score (torch.Tensor): Оценки, на основе которых происходит взвешивание
                              лог-вероятностей.
                              shape: [batch_size]

    Returns:
        torch.Tensor: Значение loss.
    """
    return -(log_prob * score).mean()

Осталось собрать все вместе. На каждой эпохе будем собирать `batch_episodes` эпизодов, пересчитывать $\log \pi(a | s)$, обновлять веса обычным для PyTorch способом.

In [27]:
def train(actor: Actor,
          env_name: str,
          lr: float = 1e-3,
          epochs: int = 1000,
          batch_episodes: int = 10):
    """
    Обучение агента.

    Args:
        actor (Actor): Политика агента.
        env_name (str): Название среды Gym для обучения.
        lr (float, optional): Скорость обучения для оптимизатора.
        epochs (int, optional): Количество эпох обучения.
        batch_episodes (int, optional): Количество эпизодов на одну итерацию обучения.
    """

    env = gym.make(env_name)
    assert isinstance(env.action_space, gym.spaces.Discrete), "This example only works for discrete action spaces."
    optim = torch.optim.Adam(actor.parameters(), lr=lr)

    for i in trange(epochs, desc="Epochs"):
        log_probs = []
        rewards = []
        lengths = []

        for _ in range(batch_episodes):
            obs, _ = env.reset()
            done = False
            ep_rewards = []
            ep_log_probs = []

            while not done:
                obs_tensor = torch.tensor(obs, dtype=torch.float32).unsqueeze(0)
                dist = actor(obs_tensor)
                action = dist.sample()
                log_prob = dist.log_prob(action)

                next_obs, reward, terminated, truncated, _ = env.step(action.item())
                done = terminated or truncated
                obs = next_obs


                ep_rewards.append(reward)
                ep_log_probs.append(log_prob)

            total_reward = sum(ep_rewards)
            episode_len = len(ep_rewards)
            rewards.extend([total_reward] * episode_len)
            log_probs.extend(ep_log_probs)
            lengths.append(episode_len)

        log_probs = torch.stack(log_probs)
        rewards = torch.tensor(rewards, dtype=torch.float32)
        loss = policy_gradient_loss(log_probs, rewards)

        optim.zero_grad()
        loss.backward()
        optim.step()

        if i % 10 == 0:
            mean_return = sum(rewards.tolist()) / batch_episodes
            mean_len = sum(lengths) / batch_episodes
            tqdm.write(f"Epoch {i}. Mean return: {mean_return:.2f}, Mean length: {mean_len:.1f}")


In [28]:
actor = Actor(env.observation_space.shape[-1], env.action_space.n)

train(actor, "CartPole-v1", epochs=200, batch_episodes=25, lr=1e-2)

Epochs:   0%|          | 0/200 [00:00<?, ?it/s]

Epoch 0. Mean return: 413.80, Mean length: 18.8
Epoch 10. Mean return: 554.80, Mean length: 21.8
Epoch 20. Mean return: 259.60, Mean length: 15.4
Epoch 30. Mean return: 233.20, Mean length: 14.7
Epoch 40. Mean return: 161.04, Mean length: 12.5
Epoch 50. Mean return: 140.36, Mean length: 11.7
Epoch 60. Mean return: 144.12, Mean length: 11.9
Epoch 70. Mean return: 101.04, Mean length: 10.0
Epoch 80. Mean return: 88.32, Mean length: 9.4
Epoch 90. Mean return: 87.24, Mean length: 9.3
Epoch 100. Mean return: 90.36, Mean length: 9.5
Epoch 110. Mean return: 88.32, Mean length: 9.4
Epoch 120. Mean return: 84.28, Mean length: 9.2
Epoch 130. Mean return: 88.92, Mean length: 9.4
Epoch 140. Mean return: 91.04, Mean length: 9.5
Epoch 150. Mean return: 91.36, Mean length: 9.5
Epoch 160. Mean return: 86.96, Mean length: 9.3
Epoch 170. Mean return: 83.08, Mean length: 9.1
Epoch 180. Mean return: 90.36, Mean length: 9.5
Epoch 190. Mean return: 93.56, Mean length: 9.6


Попробуем визуализировать обученного агента

In [29]:
!rm -rf videos

In [35]:
import uuid

def rollout(env_name: str, agent: Actor) -> float:
    """
    Выполняет один эпизод взаимодействия агента со средой.

    Args:
        env_name (str): Название среды Gym, которую нужно использовать.
        agent (Actor): Объект агента, реализующий метод `get_action`.

    Returns:
        float: Общая награда, полученная за эпизод.
    """

    env = gym.make(env_name, render_mode="rgb_array")
    video_dir = f"videos/{uuid.uuid4()}"
    env = RecordVideo(env, video_dir, episode_trigger=lambda t: True, disable_logger=True)

    done = False
    obs, info = env.reset()

    total_reward = 0.0
    while not done:
        action = agent.get_action(obs, greedy=True)

        obs, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated
        total_reward += reward

    env.close()
    return total_reward

In [36]:
rollout("CartPole-v1", actor)

9.0

In [37]:
show_video("videos/rl-video-episode-0.mp4")



---



### Бонусная часть: StableBaselines3



[StableBaselines3](https://stable-baselines3.readthedocs.io/en/master/) - наверное, самая популярная библиотека для RL. Подходит тем, кто хочет применять популярные RL бейзлайны к своим задачам, но не хочет лезть внутрь. В целом кастомизация сильно ограничена, но поменять архитектуру агента в некоторых случаях возможно.



Часто лучший способ проверить насколько сложная у вас задача - попробовать несколько разных методов от туда, потюнить параметры. Потом уже решать, стоит ли коммититься, на то, чтобы написать свою имплементацию более сложного (или кастомизируемого) метода.

In [38]:
!pip install gym pyglet rich tqdm
!pip install stable-baselines3

Collecting pyglet
  Downloading pyglet-2.1.6-py3-none-any.whl.metadata (7.7 kB)
Downloading pyglet-2.1.6-py3-none-any.whl (983 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m984.0/984.0 kB[0m [31m19.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pyglet
Successfully installed pyglet-2.1.6
Collecting stable-baselines3
  Downloading stable_baselines3-2.6.0-py3-none-any.whl.metadata (4.8 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch<3.0,>=2.3->stable-baselines3)
  Using cached nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch<3.0,>=2.3->stable-baselines3)
  Using cached nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch<3.0,>=2.3->stable-baselines3)
  Using cached nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collect

In [39]:
import gym as oldgym

from stable_baselines3 import DQN
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.env_util import make_vec_env

В отличие от нашей наивной реализации REINFORCE, где данные собираются последовательно, никакой настоящей нужды в таком подходе нет. Мы можем собирать эпизоды параллельно сразу с нескольких сред (в реальных задачах может доходить до десяток тысяч и больше)


В stable-baselines для этого уже написаны готовые обертки (как и в обычном [gymnasium](https://gymnasium.farama.org/api/vector/)).В остальном примерение алгоритмов максимально простое и заключается в основном в чтении документации, чтобы понять какие параметры там есть, и какие нужны и полезны под конкретную задачу.



Попробуем решить ту же среду с помощью DQN:

In [40]:
env = make_vec_env("CartPole-v1", n_envs=8, seed=0, vec_env_cls=DummyVecEnv)

model = DQN("MlpPolicy", env)
model.learn(total_timesteps=100_000, progress_bar=True)

Output()

<stable_baselines3.dqn.dqn.DQN at 0x79472000bdd0>

Работает это все шустрее и лучше, чем базовый REINFORCE *(и надо сильно меньше кода)*

In [None]:
eval_env = oldgym.make("CartPole-v1")

obs, done = eval_env.reset(), False
total_reward = 0.0
while not done:
    action = model.predict(obs)[0].item()
    obs, reward, done, _ = eval_env.step(action)
    total_reward += reward

print("Reward:", total_reward)



---



### Дополнительные задания



*Каждый пункт дает 1 дополнительный балл.*

1. Попробовать discounted return вместо суммарной награды за эпизод. Для этого придется суммировать дисконтированную награду (домноженную на $\gamma$ в нужной степени)

    P.S. По честному, для дисконтированной награды policy gradient [чуть чуть меняется](https://stanford.edu/~ashlearn/RLForFinanceBook/PolicyGradient.pdf), но на практике на это обычно закрывают глаза и используют формулу, которую мы вывели без учета дисконтирования. Это в целом интересная история (См. [Why there is a problem with the Policy Gradient theorem in Deep Reinforcement Learning](https://towardsdatascience.com/why-there-is-a-problem-with-the-policy-gradient-theorem-in-deep-reinforcement-learning-958d845218f1))

2. Попробовать вместо награды за всю траекторию использовать discounted return-to-go, то есть награду начиная с этого шага и дальше (См. [тут](https://spinningup.openai.com/en/latest/spinningup/rl_intro3.html) секцию **Don’t Let the Past Distract You**)

    Понимаете ли вы почему так правильнее? Как это отразится на скорости сходимости? Для сравнения имеет смысл логгировать награды, чтобы можно было построить график обучения (кол-во эпизодов по оси х, награда по y).

3. Попробовать учить value function $V(s)$ для baseline (См. [тут](https://spinningup.openai.com/en/latest/spinningup/rl_intro3.html) секцию **Baselines in Policy Gradients**).





In [41]:
from stable_baselines3 import DQN
from stable_baselines3.common.env_util import make_vec_env

env = make_vec_env("CartPole-v1", n_envs=8, seed=0)
model = DQN("MlpPolicy", env)
model.learn(total_timesteps=100_000, progress_bar=True)

Output()

<stable_baselines3.dqn.dqn.DQN at 0x794729b36910>

In [43]:
from typing import List

def compute_returns(rewards: List[float], gamma: float = 0.99) -> List[float]:
    """
    Вычисляет return-to-go для каждого шага в эпизоде.

    Args:
        rewards (List[float]): список наград за эпизод.
        gamma (float): коэффициент дисконтирования.

    Returns:
        List[float]: список return-to-go, по одному на каждый шаг.
    """
    returns = []
    G = 0.0
    for r in reversed(rewards):
        G = r + gamma * G
        returns.insert(0, G)
    return returns

In [46]:
import torch.optim as optim

def train(actor: Actor, env_name: str, lr=1e-2, epochs=200, batch_episodes=25, gamma=0.99):
    optimizer = optim.Adam(actor.parameters(), lr=lr)
    all_mean_rewards = []

    for epoch in range(epochs):
        batch_obs = []
        batch_actions = []
        batch_returns = []
        total_rewards = []

        for episode in range(batch_episodes):
            env = gym.make(env_name)
            obs, _ = env.reset()
            done = False

            rewards = []
            log_probs = []
            while not done:
                action, log_prob = actor.get_action(obs, return_log_prob=True)
                new_obs, reward, terminated, truncated, _ = env.step(action)

                batch_obs.append(obs)
                batch_actions.append(action)
                rewards.append(reward)
                log_probs.append(log_prob)

                obs = new_obs
                done = terminated or truncated

            total_rewards.append(sum(rewards))
            returns = compute_returns(rewards, gamma=gamma)
            batch_returns.extend(returns)

        returns_tensor = torch.tensor(batch_returns, dtype=torch.float32)
        returns_tensor = (returns_tensor - returns_tensor.mean()) / (returns_tensor.std() + 1e-8)

        log_probs_tensor = torch.stack(log_probs)
        loss = -torch.sum(log_probs_tensor * returns_tensor) / len(batch_returns)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        mean_reward = np.mean(total_rewards)
        all_mean_rewards.append(mean_reward)
        print(f"Epoch {epoch+1}: mean reward = {mean_reward:.2f}")

    return all_mean_rewards



---



## Задание 2: Q-learning (2 балла)

Обучите алгоритм Q-learning для [FrozenLake-v1](https://gymnasium.farama.org/environments/toy_text/frozen_lake/) (1 балл) и [Blackjack-v1](https://gymnasium.farama.org/environments/toy_text/blackjack/)(1 балл), в частности подберите оптимальную **alpha** для каждой среды.

*Задание для "бесплатных" баллов :)))*


In [7]:
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt

def q_learning(env_id, alpha_values, episodes=1000, gamma=0.99, epsilon=0.1):
    best_alpha = None
    best_reward = -float('inf')
    all_rewards = []

    for alpha in alpha_values:
        env = gym.make(env_id)
        state_space = env.observation_space.n
        action_space = env.action_space.n

        Q = np.zeros((state_space, action_space))
        rewards = []

        for ep in range(episodes):
            state, _ = env.reset()
            done = False
            total_reward = 0

            while not done:
                if np.random.rand() < epsilon:
                    action = env.action_space.sample()
                else:
                    action = np.argmax(Q[state])

                next_state, reward, terminated, truncated, _ = env.step(action)
                done = terminated or truncated

                Q[state, action] += alpha * (reward + gamma * np.max(Q[next_state]) - Q[state, action])
                state = next_state
                total_reward += reward

            rewards.append(total_reward)

        avg_reward = np.mean(rewards)
        all_rewards.append(avg_reward)

        if avg_reward > best_reward:
            best_reward = avg_reward
            best_alpha = alpha

        env.close()

    return best_alpha, all_rewards

alphas = np.linspace(0.01, 1.0, 20)
best_alpha_frozen, rewards_frozen = q_learning("FrozenLake-v1", alphas)
print(f"Best alpha for FrozenLake: {best_alpha_frozen:.2f}")

def q_learning_blackjack(alpha_values, episodes=10000, gamma=1.0, epsilon=0.1):
    best_alpha = None
    best_reward = -float('inf')
    all_rewards = []

    for alpha in alpha_values:
        env = gym.make("Blackjack-v1", sab=True)
        Q = {}
        rewards = []

        for ep in range(episodes):
            state, _ = env.reset()
            done = False
            total_reward = 0

            while not done:
                if state not in Q:
                    Q[state] = np.zeros(env.action_space.n)

                if np.random.rand() < epsilon:
                    action = env.action_space.sample()
                else:
                    action = np.argmax(Q[state])

                next_state, reward, terminated, truncated, _ = env.step(action)
                done = terminated or truncated

                if next_state not in Q:
                    Q[next_state] = np.zeros(env.action_space.n)

                Q[state][action] += alpha * (reward + gamma * np.max(Q[next_state]) - Q[state][action])
                state = next_state
                total_reward += reward

            rewards.append(total_reward)

        avg_reward = np.mean(rewards)
        all_rewards.append(avg_reward)

        if avg_reward > best_reward:
            best_reward = avg_reward
            best_alpha = alpha

        env.close()

    return best_alpha, all_rewards

best_alpha_blackjack, rewards_blackjack = q_learning_blackjack(alphas)
print(f"Best alpha for Blackjack: {best_alpha_blackjack:.2f}")


Best alpha for FrozenLake: 0.74
Best alpha for Blackjack: 0.06




---



## Задание 3: PPO (5 баллов)

В этом задании вам предстоить реализовать алгоритм PPO. С помощью этого алгоритма тюнили ChatGPT, учили ботов играть в Dota 2, Minecraft, Rocket League и много других игр и не только. В целом считается, что PPO один самых стабильных и "работающих из коробки" RL алгоритмов, поэтому его очень любят и ценят практики. Несмотря на концептуальную простоту и множество заслуг, реализация тем не менее требует знания некоторых трюков, которые могут сильно влиять на итоговый результат.

**Список ссылок на которые стоит смотреть во время выполнения задания:**

1. Настоятельно рекомендуется прочесть эту страничку: [Part 3: Intro to Policy Optimization](https://spinningup.openai.com/en/latest/spinningup/rl_intro3.html)   
2. Краткое введение в PPO (прочесть обязательно): [Proximal Policy Optimization](https://spinningup.openai.com/en/latest/algorithms/ppo.html)

3. Набор советов по тому, как обычно имплементируется PPO (без страшной теории, но читать лучше последним): [The 37 Implementation Details of Proximal Policy Optimization](https://ppo-details.cleanrl.dev//2021/11/05/ppo-implementation-details/)

In [8]:
!pip install gymnasium imageio tqdm torch

Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch)
  Downloading nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-curand-cu12==10.3.5.147 (from torch)
  Downloading nvidia_curand_cu12-10.3.5

In [9]:
import os
import random
import torch
import torch.nn as nn
import torch.nn.functional as F

import imageio
import numpy as np
import gymnasium as gym

from gymnasium.wrappers import ClipAction

from torch.optim import Adam
from torch.optim.lr_scheduler import LinearLR
from torch.distributions import Normal

from tqdm.auto import trange, tqdm
from typing import Tuple


DEVICE = "cuda" if torch.cuda.is_available() else "cpu"


def set_seed(seed):
    os.environ["PYTHONHASHSEED"] = str(seed)
    np.random.seed(seed)
    random.seed(seed)
    torch.manual_seed(seed)

In [13]:
!pip install "gymnasium[mujoco]"

Collecting mujoco>=2.1.5 (from gymnasium[mujoco])
  Downloading mujoco-3.3.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.4/44.4 kB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m
Collecting glfw (from mujoco>=2.1.5->gymnasium[mujoco])
  Downloading glfw-2.9.0-py2.py27.py3.py30.py31.py32.py33.py34.py35.py36.py37.py38.p39.p310.p311.p312.p313-none-manylinux_2_28_x86_64.whl.metadata (5.4 kB)
Downloading mujoco-3.3.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (6.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.6/6.6 MB[0m [31m83.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading glfw-2.9.0-py2.py27.py3.py30.py31.py32.py33.py34.py35.py36.py37.py38.p39.p310.p311.p312.p313-none-manylinux_2_28_x86_64.whl (243 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m243.5/243.5 kB[0m [31m22.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packag



---



### Среда

Решать будем среду [HalfCheetah-v4](https://gymnasium.farama.org/environments/mujoco/half_cheetah/). Выглядит примерно так:

![halfcheetah](https://gymnasium.farama.org/_images/half_cheetah.gif)

Наша задача заставить его бежать вперед настолько быстро, насколько это возможно. Среда предполагает continuous действия, размерности 6. Состояния представляют собой вектора размерности 17. Это будет выходными и входными размерностами вашего актора соответственно. Подробнее почитать, за что отвечают каждые размерности можно в документации, но на самом деле это не очень важно (агент сам выучит все что ему надо)

In [10]:
ENV_NAME = "HalfCheetah-v4"

In [14]:
gym.make(ENV_NAME).observation_space

Box(-inf, inf, (17,), float64)

In [15]:
gym.make(ENV_NAME).action_space

Box(-1.0, 1.0, (6,), float32)

Начнем с того, что определимся с тем, как нам подготовить среду для обучения. Агент будет выдавать нормальное распределение для действий, поэтому нам нужно будет клипать возможные выбросы по значениям [`ClipAction`]. Можно делать TanhNormal, но часто хватает простого клиппинга без усложнений.

In [16]:
def make_env(env_name: str) -> gym.Env:
    """
    Создает и оборачивает среду, применяя нормализацию действий.

    Args:
        env_name (str): Название среды Gym для создания.

    Returns:
        gym.Env: Инициализированная среда с примененным обертыванием ClipAction.
    """

    env = gym.make(env_name)
    env = ClipAction(env)
    return env

In [17]:
make_env(ENV_NAME)

<ClipAction<TimeLimit<OrderEnforcing<PassiveEnvChecker<HalfCheetahEnv<HalfCheetah-v4>>>>>>



---



### Актор

В данном случае актор - это сущность которая взаимодействуе со средой. Он должен предсказывать распределение действий $\pi(a | s)$, а также будущую среднюю награду из этого состояния - $V(s)$.

Таким образом, мы ожидаем, что актор должен уметь:

1. Предсказывать распределение действий. Не забудьте что $\sigma$ должна быть неотрицательной. Этого можно достичь если обучать $\log \sigma$ и в нужный момент делать `torch.exp(log_sigma)`

2. Предсказывать награду

Нам еще понадобится метод для сэмплирования действия. Во время обучения он не используется, но полезен для оценки агента и будущего инференса.

In [43]:
class Actor(nn.Module):
    def __init__(self, obs_dim: int, action_dim: int, hidden_dim: int = 32):
        super().__init__()
        """
        Args:
            obs_dim (int): Размерность пространства наблюдений.
            action_dim (int): Размерность пространства действий.
            hidden_dim (int, optional): Количество нейронов в скрытых слоях.
        """
        self.actor_net = nn.Sequential(
            nn.Linear(obs_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, action_dim)
        )

        self.log_std = nn.Parameter(torch.zeros(action_dim))
        self.critic_net = nn.Sequential(
            nn.Linear(obs_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, 1)
        )


    def get_action_dist(self, obs: torch.Tensor) -> Normal:
        """
        Возвращает распределение действий для заданного состояния.

        Args:
            obs (torch.Tensor): Тензор наблюдений.

        Returns:
            Normal: Распределение действий.
        """
        mean = self.actor_net(obs)
        std = torch.exp(self.log_std)
        return Normal(mean, std)


    def get_value(self, obs: torch.Tensor) -> torch.Tensor:
        """
        Возвращает оценку ценности заданного состояния.

        Args:
            obs (torch.Tensor): Тензор наблюдений.

        Returns:
            torch.Tensor: Оценка значения состояния.
        """
        return self.critic_net(obs).squeeze(-1)


    def forward(self, obs: torch.Tensor) -> Tuple[Normal, torch.Tensor]:
        """
        Прямой проход.

        Args:
            obs (torch.Tensor): Тензор наблюдений.

        Returns:
            Tuple[Normal, torch.Tensor]:
                - Распределение действий (Normal).
                - Оценка значения состояния (torch.Tensor).
        """
        dist = self.get_action_dist(obs)
        value = self.get_value(obs)
        return dist, value


    def get_action(self, obs: torch.Tensor,
                   greedy: bool = False) -> torch.Tensor:
        """
        Возвращает действие на основе текущей политики.

        Args:
            obs (torch.Tensor): Тензор наблюдений, размерности.
            greedy (bool, optional): Флаг, указывающий на использование жадной стратегии.

        Returns:
            torch.Tensor: Выбранное действие.
        """
        dist = self.get_action_dist(obs)
        if greedy:
            return dist.mean
        else:
            return dist.sample()

In [44]:
# sanity checks
test_actor_ = Actor(obs_dim=17, action_dim=6, hidden_dim=32)
test_obs_ = torch.randn(5, 17)

assert test_actor_.get_action_dist(test_obs_).loc.shape == (5, 6)
assert test_actor_.get_action_dist(test_obs_).scale.shape == (5, 6)
assert test_actor_.get_action_dist(test_obs_).sample().shape == (5, 6)

assert test_actor_.get_value(test_obs_).shape == (5,)
assert len(test_actor_(test_obs_)) == 2

assert torch.isclose(
    test_actor_.get_action(test_obs_, greedy=True),
    test_actor_.get_action(test_obs_, greedy=True)
).sum()

assert not torch.isclose(
    test_actor_.get_action(test_obs_, greedy=False),
    test_actor_.get_action(test_obs_, greedy=False)
).sum()



---



### Награда


Для PPO нам нужно уметь оценивать returns траектории $R(\tau)$, что на самом деле то же самое, что $Q(s, a)$ - будущая награда в состоянии $s_{t}$ при совершении действия $a_{t}$. Существует несколько способов это посчитать, самый простой и наивный, воспользоваться следующим свойством: $$Q(s, a) = r_t + \gamma V(s_{t + 1})$$



Важно также помнить, что в случае когда среда завершилась (`done`), будущая награда равна 0, поэтому нужно это учитывать во всех подсчетах. Это легко сделать домножив будущую награду на (`1 - done`).



Т.к. дальше мы будем работать с векторизованными средами, функции должны работать сразу с батчами:

  

*   **rewards**: `[num_env_steps, batch_size]`   
*   **values**: `[num_env_steps + 1, batch_size]`
*   **dones**: `[num_env_steps, batch_size]`


      


In [20]:
def one_step_returns(rewards: torch.Tensor,
                     values: torch.Tensor,
                     dones: torch.Tensor,
                     gamma: float = 0.99) -> torch.Tensor:
    """
    Вычисляет one-step returns.

    Args:
        rewards (torch.Tensor): Тензор наград.
        values (torch.Tensor): Тензор оценок значений состояний.
        dones (torch.Tensor): Бинарные флаги окончания эпизода.
        gamma (float, optional): Дисконтный множитель.

    Returns:
        torch.Tensor: One-step returns.
    """

    assert len(rewards) + 1 == len(values), "values should contain 1 more estimate for the final state"

    # считаем r + V(s_next) для всех примеров сразу
    returns = rewards + (1 - dones) * gamma * values[1:]

    return returns

Может возникнуть вопрос: *откуда мы возьмем $V(s)$?*

Его будем предсказывать с помощью агента во время сбора данных и сохранять. Нюанс в том, что в начале обучения $V(s)$ будет далека от реальной value функции агента, но со временем сойдется. Тем не менее, тот variance, что возникнет из-за неточных предсказаний, может привести к тому, что агент разойдется с самого начала. Поэтому лучше считать награду по всей траектории явно, а вот последнее состояние (если траектория частичная) оценивать с помощью $V(s_{last})$. Как раз поэтому $V(\cdot)$ содержит в себе на один больше шагов. Это пригодится для оценки последнего состояния (мы должны знать для него следующую оценку).


Вам нужно посчитать: $$\sum_{i=t}^{T-1} \gamma^{i - t} r_i + \gamma^{T - t}V(s_T)$$

То есть с каждого шага $t$ посчитать будущую дисконтированную награду до конца эпизода (или до конца собранного промежутка, а в конце оценить с помощью $V$).

Это можно сделать наивно за $O(n^2)$, но подумайте как сделать это **за один проход**. Не забывайте учитывать `done` т.к. он и определяет конец эпизода.


*Подсказка: попробуйте считать с конца, рекуррентно*.

In [21]:
def multi_step_returns(rewards: torch.Tensor,
                       values: torch.Tensor,
                       dones: torch.Tensor,
                       gamma: float = 0.99) -> torch.Tensor:
    """
    Вычисляет multi-step returns.

    Args:
        rewards (torch.Tensor): Тензор наград [T, B].
        values (torch.Tensor): Тензор оценок состояний [T+1, B].
        dones (torch.Tensor): Тензор флагов завершения эпизодов [T, B].
        gamma (float): Дисконтирующий множитель.

    Returns:
        torch.Tensor: Тензор returns [T, B].
    """
    T = rewards.shape[0]
    returns = torch.zeros_like(rewards)

    next_return = values[-1]

    for t in reversed(range(T)):
        next_return = rewards[t] + gamma * next_return * (1 - dones[t])
        returns[t] = next_return

    return returns


In [22]:
# simple determenistic checks
test_rewards_ = torch.tensor([0.1, 0.2, -0.01, 0.5, 4.0, -1.0])
test_values_ = torch.tensor([0.1, 0.3, 0.15, 0.3, 0.4, 0.6, 0.5])
test_dones_ = torch.tensor([0, 0, 0, 1, 0, 0])

test_answer_ = torch.tensor([ 0.7733,  0.6802,  0.4850,  0.5000,  3.5000, -0.5050])

assert torch.all(torch.isclose(
    multi_step_returns(test_rewards_, test_values_, test_dones_).round(decimals=4),
    test_answer_
))



---



### Сбор данных

Как мы обсуждали в части про `StableBaselines3`, собирать траектории последовательно не оптимально, гораздо быстрее делать это одновременно, собирая все состояния в один батч для агента и получать все действия на этом шаге за раз.

Сделать это достаточно просто (нам хватит наивной реализации, которая последовательно делает step в N средах):

In [23]:
from gymnasium.vector import SyncVectorEnv

vec_env = SyncVectorEnv(
    [lambda: make_env(ENV_NAME) for _ in range(10)]
)

In [24]:
vec_env.observation_space

Box(-inf, inf, (10, 17), float64)

In [25]:
vec_env.action_space

Box(-inf, inf, (10, 6), float32)

In [26]:
vec_env.reset()[0].shape

(10, 17)

Как можно заметить, теперь появилась дополнительная размерность размера 10 (количество сред). Схематически обучение в таком случае выглядит так:

```python
envs = VecEnv(num_envs=N)
actor = Actor()
data = []

obs = envs.reset()
for update in range(1, total_timesteps // (N*M)):
  # ROLLOUT PHASE
  for step in range(0, M):
      action, action_logprob, value = actor.get_action_and_logprob_and_value(obs)
      next_obs, reward, done, info = envs.step(action) # step in N environments
      data.append([obs, action, reward, done, value, action_logprob, some_other_stuff]) # store data
      
      obs = next_obs

  # LEARNING PHASE
  for epoch in range(num_epochs):
      batch = sample(data)
      ppo_loss(actor, batch) # len(data) = N*M
      # step optimizer here
```

Плюсы такой имплементации в более быстром сборе данных, а также в их разнообразии, т.к. мы одновременно получаем опыт в средах с разной инициализацией.



---



### PPO Loss

Вначале напишем PPO лосс, после чего сам train loop. Лосс PPO состоит из трех частей: лосса актора, лосса критика и обычно добавлют энтропийный лосс, чтобы поощрять exploration (чтобы энтропия распределения действий не падала)

Лосс критика представляет собой обычный MSE лосс между предсказаниями текущего критика и таргетом - настоящими наградами, их мы уже научились считать выше.

In [27]:
def critic_loss(target_values: torch.Tensor,
                pred_values: torch.Tensor) -> torch.Tensor:
    """
    Вычисляет loss для критика.

    Args:
        target_values (torch.Tensor): Целевые значения, вычисленные с помощью multi-step returns.
        pred_values (torch.Tensor): Предсказания критика для соответствующих состояний.

    Returns:
        torch.Tensor: Значение loss для критика.
    """
    return torch.nn.functional.mse_loss(pred_values, target_values)

Лосс актора в точности следует формуле, которую вы можете найти (и лучше прочитать интерпретацию) [тут](https://spinningup.openai.com/en/latest/spinningup/rl_intro3.html)

In [28]:
def actor_loss(new_log_probs: torch.Tensor,
               old_log_probs: torch.Tensor,
               advantages: torch.Tensor,
               eps: float = 0.2) -> torch.Tensor:
    """
    Вычисляет loss для актора.

    Args:
        new_log_probs (torch.Tensor): Логарифм вероятностей действий согласно текущей политики.
        old_log_probs (torch.Tensor): Логарифм вероятностей действий согласно политике, которой собирались данные.
        advantages (torch.Tensor): Оценки A(s, a) для пар состояние-действие.
        eps (float, optional): Параметр epsilon из PPO (см. Open AI Spinning Up про PPO).

    Returns:
        torch.Tensor: Значение loss для актора.
    """
    ratio = torch.exp(new_log_probs - old_log_probs)
    unclipped = ratio * advantages
    clipped = torch.clamp(ratio, 1 - eps, 1 + eps) * advantages
    loss = -torch.mean(torch.min(unclipped, clipped))

    return loss

In [29]:
def entropy_loss(action_dist: Normal) -> torch.Tensor:
    """
    Вычисляет loss на основе энтропии распределения действий.

    Args:
        action_dist (Normal): Распределение действий, параметризованное актором.

    Returns:
        torch.Tensor: Энтропия распределения действий.
    """
    entropy = action_dist.entropy()
    return -entropy.mean()



---



### Оценка

Немного кода для удобства оценки и визуализации траекторий

In [30]:
@torch.no_grad()
def rollout(env: gym.Env, actor: Actor, seed: int, device: str = "cpu") -> float:
    """
    Выполняет один rollout в среде с использованием текущей политики актора.

    Args:
        env (gym.Env): Среда Gym, в которой происходит rollout.
        actor (Actor): Политика, используемая для выбора действий.
        seed (int): Сид для инициализации среды.
        device (str, optional): Устройство, на котором будут выполняться вычисления.

    Returns:
        float: Суммарная награда, полученная за весь эпизод.
    """

    obs, done = env.reset(seed=seed)[0], False
    total_reward = 0.0

    while not done:
        obs = torch.tensor(obs, dtype=torch.float, device=device).reshape(1, -1)
        action = actor.get_action(obs, greedy=True)

        obs, reward, terminated, truncated, info = env.step(action.flatten().cpu().numpy())
        done = terminated or truncated
        total_reward += reward

    return total_reward


Хорошим тоном считается фиксировать сиды среды для оценки, чтобы можно было сравнивать разные гипепараметры

In [31]:
@torch.no_grad()
def evaluate(actor: Actor,
             num_episodes: int,
             seed: int, device: str = "cpu") -> np.ndarray:
    """
    Оценивает производительность актора, выполняя несколько эпизодов в среде.

    Args:
        actor (Actor): Оцениваемая политика.
        num_episodes (int): Количество эпизодов для оценки.
        seed (int): Базовый сид для инициализации среды (для воспроизводимости).
        device (str, optional): Устройство, на котором будут выполняться вычисления.

    Returns:
        np.ndarray: Массив формы со средними наградами за каждый эпизод.
    """
    env = gym.make(ENV_NAME)

    returns = np.zeros(num_episodes)
    for e in range(num_episodes):
        returns[e] = rollout(env, actor, seed=seed + e, device=device)

    return returns



---



### Trainer

Напишем игрушечный трейнер. Вначале внимательно прочитайте весь код, чтобы понять глобально, что происходит. Ниже также можете посмотреть пример использования.

In [50]:
def compute_gae(rewards, values, dones, gamma=0.99, gae_lambda=0.95):
    num_steps, num_envs = rewards.shape
    advantages = torch.zeros_like(rewards)
    last_gae = 0

    for t in reversed(range(num_steps)):
        mask = 1.0 - dones[t]
        delta = rewards[t] + gamma * values[t + 1] * mask - values[t]
        advantages[t] = last_gae = delta + gamma * gae_lambda * mask * last_gae

    returns = advantages + values[:-1]
    return returns, advantages


class PPOTrainer:
    def __init__(
            self,
            total_timesteps: int = 1_000_000,
            num_vec_envs: int = 8,
            num_env_steps: int = 256,
            num_update_epochs: int = 5,
            batch_size: int = 64,
            gamma: float = 0.99,
            eps: float = 0.2,
    ):
        self.train_env = SyncVectorEnv(
            [lambda: make_env(ENV_NAME) for _ in range(num_vec_envs)]
        )
        self.total_updates = round(total_timesteps / (num_env_steps * num_vec_envs))

        # set up Actor and optimizer
        self.actor = Actor(
            obs_dim=self.train_env.single_observation_space.shape[-1],
            action_dim=self.train_env.single_action_space.shape[-1],
            hidden_dim=64
        ).to(DEVICE)
        self.optim = Adam(self.actor.parameters(), lr=3e-4, eps=1e-5)
        self.lr_scheduler = LinearLR(
            self.optim, start_factor=1.0, end_factor=0.0, total_iters=self.total_updates
        )
        self.eps = eps
        self.gamma = gamma
        self.num_env_steps = num_env_steps
        self.num_vec_envs = num_vec_envs
        self.num_update_epochs = num_update_epochs
        self.batch_size = batch_size


    def update_epochs(self, observations, actions, log_probs, returns, advantages):
        all_idxs = np.arange(self.num_vec_envs * self.num_env_steps)

        for epoch in range(self.num_update_epochs):
            np.random.shuffle(all_idxs)
            for start in range(0, len(all_idxs), self.batch_size):
                batch_idxs = all_idxs[start:start + self.batch_size]

                batch_obs = observations[batch_idxs]
                batch_actions = actions[batch_idxs]
                batch_old_log_probs = log_probs[batch_idxs]
                batch_returns = returns[batch_idxs]
                batch_advantages = advantages[batch_idxs]

                action_dist = self.actor.get_action_distribution(batch_obs)
                new_log_probs = action_dist.log_prob(batch_actions).sum(-1)
                entropy_l = entropy_loss(action_dist)
                value_preds = self.actor.get_value(batch_obs).squeeze()

                actor_l = actor_loss(new_log_probs, batch_old_log_probs, batch_advantages, eps=self.eps)
                critic_l = critic_loss(batch_returns, value_preds)

                loss = actor_l + 0.5 * critic_l + 0.001 * entropy_l

                self.optim.zero_grad()
                loss.backward()
                nn.utils.clip_grad_norm_(self.actor.parameters(), 0.5)
                self.optim.step()

        self.lr_scheduler.step()


    def learn(self):
        obs_shape = self.train_env.single_observation_space.shape[-1]
        action_shape = self.train_env.single_action_space.shape[-1]

        observations = torch.zeros((self.num_env_steps, self.num_vec_envs, obs_shape), device=DEVICE)
        actions = torch.zeros((self.num_env_steps, self.num_vec_envs, action_shape), device=DEVICE)
        log_probs = torch.zeros(self.num_env_steps, self.num_vec_envs, device=DEVICE)
        rewards = torch.zeros(self.num_env_steps, self.num_vec_envs, device=DEVICE)

        values = torch.zeros(self.num_env_steps + 1, self.num_vec_envs, device=DEVICE)
        dones = torch.zeros(self.num_env_steps, self.num_vec_envs, device=DEVICE)

        obs = torch.tensor(self.train_env.reset(seed=10)[0], dtype=torch.float, device=DEVICE)
        for update in trange(self.total_updates, desc="Updates"):
            for step in range(self.num_env_steps):
                with torch.no_grad():
                    action_dist = self.actor.get_action_distribution(obs)
                    action = action_dist.sample()
                    log_prob = action_dist.log_prob(action).sum(-1)
                    value = self.actor.get_value(obs)

                next_obs, reward, terminated, truncated, infos = self.train_env.step(action.cpu().numpy())

                observations[step] = obs.to(DEVICE)
                actions[step] = action.to(DEVICE)
                log_probs[step] = log_prob.to(DEVICE)
                rewards[step] = torch.tensor(reward, device=DEVICE)
                values[step] = value.squeeze().to(DEVICE)
                dones[step] = torch.tensor(terminated, device=DEVICE)

                obs = torch.tensor(next_obs, dtype=torch.float, device=DEVICE)
            else:
                values[-1] = self.actor.get_value(obs).squeeze()

            returns, advantages = compute_gae(
                rewards, values, dones, gamma=self.gamma, gae_lambda=0.95
            )

            advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

            self.update_epochs(
                observations.flatten(0, 1),
                actions.flatten(0, 1),
                log_probs.flatten(),
                returns.flatten(),
                advantages.flatten()
            )

            if update % 50 == 0:
                eval_returns = evaluate(self.actor, 10, seed=42, device=DEVICE)
                tqdm.write(f"Mean return: {eval_returns.mean()}")

        return self.actor

In [55]:
class PPOTrainer:
    def __init__(
            self,
            total_timesteps: int = 1_000_000,
            num_vec_envs: int = 8,
            num_env_steps: int = 256,
            num_update_epochs: int = 5,
            batch_size: int = 64,
            gamma: float = 0.99,
            eps: float = 0.2,
    ):
        self.train_env = SyncVectorEnv(
            [lambda: make_env(ENV_NAME) for _ in range(num_vec_envs)]
        )
        self.total_updates = round(total_timesteps / (num_env_steps * num_vec_envs))

        self.actor = Actor(
            obs_dim=self.train_env.single_observation_space.shape[-1],
            action_dim=self.train_env.single_action_space.shape[-1],
            hidden_dim=64
        ).to(DEVICE)

        self.optim = Adam(self.actor.parameters(), lr=3e-4, eps=1e-5)
        self.lr_scheduler = LinearLR(
            self.optim, start_factor=1.0, end_factor=0.0, total_iters=self.total_updates
        )
        self.eps = eps
        self.gamma = gamma
        self.num_env_steps = num_env_steps
        self.num_vec_envs = num_vec_envs
        self.num_update_epochs = num_update_epochs
        self.batch_size = batch_size


    def update_epochs(self, observations, actions, log_probs, returns, advantages):
        all_idxs = np.arange(self.num_vec_envs * self.num_env_steps)

        for epoch in range(self.num_update_epochs):
            np.random.shuffle(all_idxs)
            for start in range(0, len(all_idxs), self.batch_size):
                batch_idxs = all_idxs[start:start + self.batch_size]

                batch_obs = observations[batch_idxs]
                batch_actions = actions[batch_idxs]
                batch_old_log_probs = log_probs[batch_idxs]
                batch_returns = returns[batch_idxs]
                batch_advantages = advantages[batch_idxs]

                action_dist, _ = self.actor(batch_obs)
                new_log_probs = action_dist.log_prob(batch_actions).sum(-1)
                entropy_l = entropy_loss(action_dist)
                value_preds = self.actor.get_value(batch_obs).squeeze()

                actor_l = actor_loss(new_log_probs, batch_old_log_probs, batch_advantages, eps=self.eps)
                critic_l = critic_loss(batch_returns, value_preds)

                loss = actor_l + 0.5 * critic_l + 0.001 * entropy_l

                self.optim.zero_grad()
                loss.backward()
                nn.utils.clip_grad_norm_(self.actor.parameters(), 0.5)
                self.optim.step()

        self.lr_scheduler.step()


    def learn(self):
        obs_shape = self.train_env.single_observation_space.shape[-1]
        action_shape = self.train_env.single_action_space.shape[-1]

        observations = torch.zeros((self.num_env_steps, self.num_vec_envs, obs_shape), device=DEVICE)
        actions = torch.zeros((self.num_env_steps, self.num_vec_envs, action_shape), device=DEVICE)
        log_probs = torch.zeros(self.num_env_steps, self.num_vec_envs, device=DEVICE)
        rewards = torch.zeros(self.num_env_steps, self.num_vec_envs, device=DEVICE)

        values = torch.zeros(self.num_env_steps + 1, self.num_vec_envs, device=DEVICE)
        dones = torch.zeros(self.num_env_steps, self.num_vec_envs, device=DEVICE)

        obs = torch.tensor(self.train_env.reset(seed=10)[0], dtype=torch.float, device=DEVICE)

        for update in trange(self.total_updates, desc="Updates"):
            for step in range(self.num_env_steps):
                with torch.no_grad():
                    action_dist, _ = self.actor(obs)
                    action = action_dist.sample()
                    log_prob = action_dist.log_prob(action).sum(-1)
                    value = self.actor.get_value(obs)

                next_obs, reward, terminated, truncated, infos = self.train_env.step(action.cpu().numpy())

                observations[step] = obs
                actions[step] = action
                log_probs[step] = log_prob
                rewards[step] = torch.tensor(reward, device=DEVICE)
                values[step] = value.squeeze()
                dones[step] = torch.tensor(terminated, device=DEVICE)

                obs = torch.tensor(next_obs, dtype=torch.float, device=DEVICE)
            else:
                values[-1] = self.actor.get_value(obs).squeeze()

            returns, advantages = compute_gae(
                rewards, values, dones, gamma=self.gamma, gae_lambda=0.95
            )

            advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

            self.update_epochs(
                observations.flatten(0, 1).clone().detach(),
                actions.flatten(0, 1).clone().detach(),
                log_probs.flatten().clone().detach(),
                returns.flatten().clone().detach(),
                advantages.flatten().clone().detach()

            )

            if update % 50 == 0:
                eval_returns = evaluate(self.actor, 10, seed=42, device=DEVICE)
                tqdm.write(f"Mean return: {eval_returns.mean()}")

        return self.actor




---



### Обучение и визуализация

Попробуем обучить агента на 5М транзиций. Я постаралась зафиксировать сиды, так чтобы при каждом запуске с идентичными параметрами получался тот же результат. Хорошим результатом считается 10к+ награды, но т.к. полученная имплементация во многих смыслах все еще игрушечная, нам будет достаточно 3к

При следующих параметрах правильная имплементация набирает 4к+ награды. Вы можете поэксперементировать и попробовать свои гиперпараметры (включая те, что я захардкодила в трейнере). Учтите, что RL алгоритмы гораздо менее стабильны, чем в среднем в DL, и при неправильных параметрах даже правильная имплементация может не обучаться ;)))

In [56]:
set_seed(42)
trainer = PPOTrainer(
    total_timesteps=5_000_000,
    batch_size=64,
    num_update_epochs=3,
    num_env_steps=1000
)
trained_actor = trainer.learn()

Updates:   0%|          | 0/625 [00:00<?, ?it/s]

Mean return: 52.335170248842346
Mean return: 650.2495230830858
Mean return: 1896.0188906281414
Mean return: 2251.9263317587183
Mean return: 2719.7345617423557
Mean return: 3123.309076321365
Mean return: 3322.208690788895
Mean return: 3344.668231542447
Mean return: 3447.354252179595
Mean return: 3555.764373885058
Mean return: 3656.5754626780863
Mean return: 3753.588845542085
Mean return: 3771.6704986873024


После обучения иногда интересно посмотреть визуально, что агент выучил. Для этого в gym можно указать `render_mode` при инициализации среды. Там есть несколько опций, нам пригодится та, что возвращает в конце список картинок в numpy. Дальше мы можем их зарендерить и отразить с помощью `imageio`.

In [None]:
eval_env = gym.make(ENV_NAME, render_mode="rgb_array_list")

rollout(eval_env, trained_actor, seed=32)
imageio.mimsave("rollout.mp4", eval_env.render(), fps=16, format="mp4")

В идеале он должен быстро бежать

*(не обязательно на ногах, иногда может и на спине...)*

In [None]:
from IPython.display import Video

Video("rollout.mp4")

In [None]:
!rm rollout.mp4



---



# Домашнее задание № 14

Выполните все задания в этом ноутбуке

+ Мягкий дедлайн: `27.05.25 23:59`
+ Жесткий дедлайн: `03.06.25 23:59` (половина баллов)


После жесткого дедлайна задание не принимается.