# DQN (5 баллов)

Метод обучения DQN — это нейросетевая адаптация алгоритма Q-learning. Также для него разработан набор дополнений, которые становятся актуальными при переходе к обучению глубоких нейронных сетей и решению более сложных задач (то есть задач с бОльшим пространством состояний).

Реализуем алгоритм DQN для решения среды [CartPole](https://gymnasium.farama.org/environments/classic_control/cart_pole/), цель которой балансировать палочкой в вертикальном положении, управляя только тележкой, к которой она прикреплена. Будем использовать библиотеку PyTorch для обучения нейронной сети, аппроксимирующей Q-функцию (но вы можете воспользоваться и любой другой библиотекой для обучения глубоких сетей, таких как TensorFlow или Jax).

![cartpole](https://gymnasium.farama.org/_images/cart_pole.gif)

![cartpole](https://www.researchgate.net/publication/362568623/figure/fig5/AS:1187029731807278@1660021350587/Screen-capture-of-the-OpenAI-Gym-CartPole-problem-with-annotations-showing-the-cart.png)

In [27]:
import abc
import base64
import io
import math
import os
import random
import time
from dataclasses import dataclass

import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
import pygame
from gymnasium import spaces
from gymnasium.envs.registration import WrapperSpec
%matplotlib inline

In [28]:
np.random.seed(42)

### Пространство действий

Действие представляет собой `ndarray` формы `(1,)`, принимающее значения из множества `{0, 1}`, указывающие направление постоянной силы, приложенной к тележке:

- **0**: толкнуть тележку влево  
- **1**: толкнуть тележку вправо  

**Примечание:** величина изменения скорости тележки не фиксирована и зависит от угла наклона шеста. Положение центра тяжести шеста влияет на количество энергии, требующейся для перемещения тележки под ним.

---

### Пространство наблюдений

Наблюдение — это `ndarray` формы `(4,)`, элементы которого соответствуют следующим величинам:

| №  | Наблюдение              | Минимум            | Максимум           |
|----|-------------------------|--------------------|--------------------|
| 0  | Положение тележки       | -4.8               | 4.8                |
| 1  | Скорость тележки        | -∞                 | +∞                 |
| 2  | Угол отклонения шеста   | ≈ -0.418 рад (−24°) | ≈ 0.418 рад (+24°) |
| 3  | Угловая скорость шеста  | -∞                 | +∞                 |

**Примечание:** приведённые выше диапазоны — это возможные значения в пространстве наблюдений, но **эпизод** закончится при выходе за более жёсткие границы:

- Положение тележки (индекс 0) может лежать в `(-4.8, 4.8)`, но эпизод прерывается, если тележка уходит за `(-2.4, 2.4)`.  
- Угол шеста может наблюдаться в `(-0.418, 0.418)` рад (±24°), но эпизод прерывается, если угол выходит за `(-0.2095, 0.2095)` рад (±12°).

---

### Награды

За каждый шаг (включая последний) начисляется **+1** балл. Максимальная длина эпизода — **500** шагов для версии **v1** и **200** шагов для версии **v0**.

---

### Начальное состояние

В начале эпизода все четыре компоненты наблюдения равномерно случайны в диапазоне **(-0.05, 0.05)**.

---

### Завершение эпизода

Эпизод заканчивается при выполнении любого из условий:

1. **Терминация:** угол отклонения шеста превышает ±12°.  
2. **Терминация:** положение тележки выходит за ±2.4 (центр тележки достигает края экрана).  
3. **Ограничение по длине:** число шагов превышает 500 (200 для **v0**).

In [29]:
env = gym.make("CartPole-v1", max_episode_steps=1000)
env.reset()

# Выведем информацию о пространствах состояний и действий
print(f'{env.observation_space=}')
print(f'{env.action_space=}')

n_actions = env.action_space.n
state_dim = env.observation_space.shape
print(f'Action_space: {n_actions} | State_space: {state_dim}')

env.observation_space=Box([-4.8               -inf -0.41887903        -inf], [4.8               inf 0.41887903        inf], (4,), float32)
env.action_space=Discrete(2)
Action_space: 2 | State_space: (4,)


Т.к. описание состояния в задаче с маятником представляет собой не "сырые" признаки, а уже предобработанные (координаты, углы), нам не нужна для начала сложная архитектура, начнем с такой:
<img src="https://raw.githubusercontent.com/Tviskaron/mipt/master/2020/RL/figures/DQN.svg">

Будем использовать только полносвязные слои (``torch.nn.Linear``) и простые активационные функции (``torch.nn.ReLU``). Сигмоиды и другие похожие функции активации могут плохо работать с ненормализованными входными данными.

Будем приближать Q-функцию агента, минимизируя среднеквадратичную TD-ошибку:
$$
\delta = Q_{\theta}(s, a) - [r(s, a) + \gamma \cdot max_{a'} Q_{-}(s', a')]
$$
$$
L = \frac{1}{N} \sum_i \delta_i^2,
$$
где
* $s, a, r, s'$ состояние, действие, вознаграждение и следующее состояние
* $\gamma$ дисконтирующий множитель.

Основная тонкость состоит в использовании $Q_{-}(s',a')$. Это та же самая функция, что и $Q_{\theta}$, которая является выходом нейронной сети, но при обучении сети, мы не пропускаем через эти слои градиенты. В научных статьях можно обнаружить следующее обозначение для остановки градиента: $SG(\cdot)$. В PyTorch есть метод `.detach()` класса `Tensor`, который возвращает тензор с выключенными градиентами, а также контекстный менеджер `with torch.no_grad()`, который задает контекст с вычислениями, для которых не вычисляется градиент.

In [30]:
import torch
import torch.nn as nn

def create_network(input_dim, hidden_dims, output_dim):
    layers: list[nn.Module] = []
    prev_dim = input_dim

    for h in hidden_dims:
        layers.append(nn.Linear(prev_dim, h))
        layers.append(nn.ReLU())
        prev_dim = h

    layers.append(nn.Linear(prev_dim, output_dim))

    network = nn.Sequential(*layers)
    return network

Добавим $\epsilon$-жадный выбор действий:

In [31]:
def select_action_eps_greedy(Q, state, epsilon):
    """Выбирает действие epsilon-жадно."""
    if not isinstance(state, torch.Tensor):
        state = torch.tensor(state, dtype=torch.float32)
    Q_s = Q(state).detach().numpy()

    if np.random.rand() < epsilon:
        action = np.random.randint(Q_s.shape[0])
    else:
        action = np.argmax(Q_s)

    action = int(action)
    return action


Q = create_network(
    input_dim=np.prod(state_dim), hidden_dims=[64, 64], output_dim=n_actions
)
select_action_eps_greedy(Q, env.reset()[0].flatten(), epsilon=0.1)

0

In [32]:
def to_tensor(x, dtype=np.float32):
    if isinstance(x, torch.Tensor):
        return x
    x = np.asarray(x, dtype=dtype)
    x = torch.from_numpy(x)
    return x

def compute_td_target(
        Q, rewards, next_states, terminateds, gamma=0.99, check_shapes=True,
):
    """ Считает TD-target."""

    r = to_tensor(rewards)
    s_next = to_tensor(next_states)
    term = to_tensor(terminateds, bool)

    with torch.no_grad():
        Q_sn = Q(s_next)
        V_sn = torch.max(Q_sn, dim=1)[0]

    target = r + gamma * V_sn * (~term)


    assert V_sn.dtype == torch.float32

    if check_shapes:
        assert Q_sn.data.dim() == 2, \
            "убедитесь, что вы предсказали q-значения для всех действий в следующем состоянии"
        assert V_sn.data.dim() == 1, \
            "убедитесь, что вы вычислили V (s ') как максимум только по оси действий, а не по всем осям"
        assert target.data.dim() == 1, \
            "что-то не так с целевыми q-значениями, они должны быть вектором"

    return target


def compute_td_loss(
        Q, states, actions, td_target, regularizer=.1, out_non_reduced_losses=False
):
    """ Считает TD ошибку."""

    # переводим входные данные в тензоры
    s = to_tensor(states)  # shape: [batch_size, state_size]
    a = to_tensor(actions, int).long()  # shape: [batch_size]
    target = to_tensor(td_target)

    Q_s = Q(s)
    Q_s_a = Q_s.gather(dim=1, index=a.unsqueeze(1)).squeeze(1)
    td_error = Q_s_a - target


    td_losses = td_error ** 2
    loss = torch.mean(td_losses)
    # добавляем L1 регуляризацию на значения Q
    loss += regularizer * torch.abs(Q_s_a).mean()

    if out_non_reduced_losses:
        return loss, td_losses.detach()

    return loss

In [33]:
def eval_dqn(env_name, Q):
    """Оценка качества работы алгоритма на одном эпизоде"""
    env = gym.make(env_name)
    s, _ = env.reset()
    done, ep_return = False, 0.

    while not done:
        # set epsilon = 0 to make an agent act greedy
        a = select_action_eps_greedy(Q, s, epsilon=0.)
        s_next, r, terminated, truncated, _ = env.step(a)
        done = terminated or truncated
        ep_return += r
        s = s_next

        if done:
            break

    return ep_return

In [34]:
from collections import deque

def linear(st, end, duration, t):
    """
    Линейная интерполяция значений в пределах диапазона [st, end],
    используя прогресс по времени t относительно всего отведенного
    времени duration.
    """

    if t >= duration:
        return end
    return st + (end - st) * (t / duration)

def run_dqn(
        env_name="CartPole-v1",
        hidden_dims=(128, 128), lr=1e-3, gamma=0.99,
        eps_st=.4, eps_end=.02, eps_dur=.25, total_max_steps=100_000,
        train_schedule=1, eval_schedule=1000, smooth_ret_window=10, success_ret=200.
):
    env = gym.make(env_name)
    eval_return_history = deque(maxlen=smooth_ret_window)

    Q = create_network(
        input_dim=env.observation_space.shape[0], hidden_dims=hidden_dims, output_dim=env.action_space.n
    )
    opt = torch.optim.Adam(Q.parameters(), lr=lr)

    s, _ = env.reset()
    done = False

    for global_step in range(1, total_max_steps + 1):
        epsilon = linear(eps_st, eps_end, eps_dur * total_max_steps, global_step)

        a = select_action_eps_greedy(Q, s, epsilon=epsilon)
        s_next, r, terminated, truncated, _ = env.step(a)
        done = terminated or truncated

        if global_step % train_schedule == 0:
            opt.zero_grad()
            td_target = compute_td_target(Q, [r], [s_next], [terminated], gamma=gamma)
            loss = compute_td_loss(Q, [s], [a], td_target)
            loss.backward()
            opt.step()

        if global_step % eval_schedule == 0:
            eval_return = eval_dqn(env_name, Q)
            eval_return_history.append(eval_return)
            avg_return = np.mean(eval_return_history)
            print(f'{global_step=} | {avg_return=:.3f} | {epsilon=:.3f}')
            if avg_return >= success_ret:
                print('Решено!')
                break

        s = s_next
        if done:
            s, _ = env.reset()
            done = False

run_dqn(eval_schedule=250)

global_step=250 | avg_return=10.000 | epsilon=0.396
global_step=500 | avg_return=9.500 | epsilon=0.392
global_step=750 | avg_return=15.667 | epsilon=0.389
global_step=1000 | avg_return=15.500 | epsilon=0.385
global_step=1250 | avg_return=15.800 | epsilon=0.381
global_step=1500 | avg_return=16.000 | epsilon=0.377
global_step=1750 | avg_return=15.429 | epsilon=0.373
global_step=2000 | avg_return=19.750 | epsilon=0.370
global_step=2250 | avg_return=19.333 | epsilon=0.366
global_step=2500 | avg_return=20.600 | epsilon=0.362
global_step=2750 | avg_return=22.400 | epsilon=0.358
global_step=3000 | avg_return=26.900 | epsilon=0.354
global_step=3250 | avg_return=30.300 | epsilon=0.351
global_step=3500 | avg_return=30.800 | epsilon=0.347
global_step=3750 | avg_return=32.400 | epsilon=0.343
global_step=4000 | avg_return=42.200 | epsilon=0.339
global_step=4250 | avg_return=46.100 | epsilon=0.335
global_step=4500 | avg_return=44.300 | epsilon=0.332
global_step=4750 | avg_return=45.900 | epsilon=0.3

Комментарии к получаемым результатам:
- `avg_return` - это средняя отдача за эпизод на истории из последних десяти эпизодов. В случае корректной реализации, этот показатель будет низким первые 1000 шагов и только затем будет возрастать и сойдется на 5000-15000 шагах в зависимости от архитектуры сети.
- Если сеть не достигает нужных результатов к концу цикла, попробуйте увеличить число нейронов в скрытом слое или поменяйте начальный $\epsilon$.
- Переменная `epsilon` обеспечивает стремление агента исследовать среду. В данной реализации используется линейное затухание для частоты исследования.

### DQN with Experience Replay

Теперь попробуем добавить поддержку памяти прецедентов (Replay Buffer), которая будет из себя представлять очередь из наборов: $\{(s, a, r, s', 1_\text{terminated})\}$.

Тогда во время обучения каждый новый переход будет добавляться в память, а обучение будет целиком производиться на переходах, просэмплированных из памяти прецедентов.

In [35]:
def sample_batch(replay_buffer, n_samples):
    rng = np.random.default_rng()
    buffer_size = len(replay_buffer)
    replace = buffer_size < n_samples
    indices = rng.choice(buffer_size, size=n_samples, replace=replace)
    batch = [replay_buffer[i] for i in indices]
    states, actions, rewards, next_states, terminateds = zip(*batch)


    return np.array(states), np.array(actions), np.array(rewards), np.array(next_states), np.array(terminateds)

In [36]:
def run_dqn_rb(
        env_name="CartPole-v1",
        hidden_dims=(256, 256), lr=1e-3, gamma=0.99,
        eps_st=.4, eps_end=.02, eps_dur=.25, total_max_steps=100_000,
        train_schedule=4, replay_buffer_size=400, batch_size=32,
        eval_schedule=1000, smooth_ret_window=5, success_ret=200.
):
    env = gym.make(env_name)
    replay_buffer = deque(maxlen=replay_buffer_size)
    eval_return_history = deque(maxlen=smooth_ret_window)

    Q = create_network(
        input_dim=env.observation_space.shape[0], hidden_dims=hidden_dims, output_dim=env.action_space.n
    )
    opt = torch.optim.Adam(Q.parameters(), lr=lr)

    s, _ = env.reset()
    done = False

    for global_step in range(1, total_max_steps + 1):
        epsilon = linear(eps_st, eps_end, eps_dur * total_max_steps, global_step)
        a = select_action_eps_greedy(Q, s, epsilon=epsilon)
        s_next, r, terminated, truncated, _ = env.step(a)

        replay_buffer.append((s, a, r, s_next, terminated))
        done = terminated or truncated

        if global_step % train_schedule == 0:
            train_batch = sample_batch(replay_buffer, batch_size)
            states, actions, rewards, next_states, terminateds = train_batch

            opt.zero_grad()
            td_target = compute_td_target(Q, rewards, next_states, terminateds, gamma=gamma)
            loss = compute_td_loss(Q, states, actions, td_target)
            loss.backward()
            opt.step()

        if global_step % eval_schedule == 0:
            eval_return = eval_dqn(env_name, Q)
            eval_return_history.append(eval_return)
            avg_return = np.mean(eval_return_history)
            print(f'{global_step=} | {avg_return=:.3f} | {epsilon=:.3f}')
            if avg_return >= success_ret:
                print('Решено!')
                break

        s = s_next
        if done:
            s, _ = env.reset()
            done = False

run_dqn_rb(eval_schedule=250)

global_step=250 | avg_return=10.000 | epsilon=0.396
global_step=500 | avg_return=9.500 | epsilon=0.392
global_step=750 | avg_return=16.667 | epsilon=0.389
global_step=1000 | avg_return=22.250 | epsilon=0.385
global_step=1250 | avg_return=24.000 | epsilon=0.381
global_step=1500 | avg_return=28.000 | epsilon=0.377
global_step=1750 | avg_return=35.400 | epsilon=0.373
global_step=2000 | avg_return=45.400 | epsilon=0.370
global_step=2250 | avg_return=61.000 | epsilon=0.366
global_step=2500 | avg_return=77.400 | epsilon=0.362
global_step=2750 | avg_return=84.000 | epsilon=0.358
global_step=3000 | avg_return=106.200 | epsilon=0.354
global_step=3250 | avg_return=110.800 | epsilon=0.351
global_step=3500 | avg_return=152.200 | epsilon=0.347
global_step=3750 | avg_return=148.000 | epsilon=0.343
global_step=4000 | avg_return=190.400 | epsilon=0.339
global_step=4250 | avg_return=194.200 | epsilon=0.335
global_step=4500 | avg_return=207.800 | epsilon=0.332
Решено!


## DQN with Prioritized Experience Replay

Добавим каждому примеру, хранящемуся в памяти, значение приоритета. Приоритет будет влиять на частоту случайного выбора примеров в пакет на обучение. Удачный выбор приоритета позволит повысить эффективность обучения. Популярным вариантом является абсолютное значение TD-ошибки. Таким образом акцент при обучении Q-функции отводится примерам, на которых аппроксиматор ошибается сильнее.

Однако, нужно помнить, что это значение быстро устаревает, если его не обновлять. Но и обновлять для всей памяти каждый раз накладно. Из-за этого потребуется искать баланс между точностью оценки приоритета и скоростью работы.

В данном задании мы будем делать следующее:

- Использовать TD-ошибку в качестве приоритета.
- Так как для пакета данных, используемых при обучении, в любом случае будет вычислена TD-ошибка, воспользуемся полученными значениями для обновления значений приоритета в памяти для каждого примера из данного пакета.
- Будем периодически сортировать память для того, чтобы новые добавляемые переходы заменяли собой те переходы, у которых наименьший приоритет (т.е. наименьшие значения ошибки). Сортировка - дорогостоящая операция, поэтому выбрана редкая периодичность.

NB: Обратите внимание, что софтмакс очень чувствителен к масштабу величин и часто требует подбора температуры. Чтобы частично нивелировать эту проблему, предлагается использовать не `softmax(priorities)` напрямую, а воспользоваться функцией $\text{symlog} = \text{sign}(x) \cdot \log (|x| + 1)$, то есть `softmax(symlog(priorities))`, и не подбирать температуру. Идея взята из статьи DreamerV2 —-- в этой статье можно ознакомиться с идеей применения функций *symlog* и *simexp*, так как это полезная альтернатива нормализации некоторых величин в RL (вознаграждений, отдач, полезностей, логитов).

In [37]:
def symlog(x):
    """
    Compute symlog values for a vector `x`.
    It's an inverse operation for symexp.
    """
    return np.sign(x) * np.log(np.abs(x) + 1)

def softmax(xs, temp=1.):
    exp_xs = np.exp((xs - xs.max()) / temp)
    return exp_xs / exp_xs.sum()

def sample_prioritized_batch(replay_buffer, n_samples):
    if len(replay_buffer) < n_samples:
        n_samples = len(replay_buffer)
    
    buffer_size = len(replay_buffer)
    replace = buffer_size < n_samples
    priorities = np.array([sample[0] for sample in replay_buffer], dtype=np.float32)
    scores = symlog(priorities)
    probs = softmax(scores)
    indices = np.random.choice(buffer_size, size=n_samples, replace=replace, p=probs)

    batch_samples = [replay_buffer[i] for i in indices]
    states, actions, rewards, next_states, terminateds = zip(
        *[(s, a, r, sn, t) for (_, s, a, r, sn, t) in batch_samples]
    )

    batch = (
        np.array(states), np.array(actions), np.array(rewards),
        np.array(next_states), np.array(terminateds)
    )
    return batch, indices

def update_batch(replay_buffer, indices, batch, new_priority):
    """Updates batches with corresponding indices
    replacing their priority values."""
    states, actions, rewards, next_states, terminateds = batch

    for i in range(len(indices)):
        new_batch = (
            new_priority[i], states[i], actions[i], rewards[i],
            next_states[i], terminateds[i]
        )
        replay_buffer[indices[i]] = new_batch

def sort_replay_buffer(replay_buffer):
    """Sorts replay buffer to move samples with
    lesser priority to the beginning ==> they will be
    replaced with the new samples sooner."""
    new_rb = deque(maxlen=replay_buffer.maxlen)
    new_rb.extend(sorted(replay_buffer, key=lambda sample: sample[0]))
    return new_rb

In [39]:
def run_dqn_prioritized_rb(
        env_name="CartPole-v1",
        hidden_dims=(256, 256), lr=1e-3, gamma=0.99,
        eps_st=.4, eps_end=.02, eps_dur=.25, total_max_steps=100_000,
        train_schedule=4, replay_buffer_size=400, batch_size=32,
        eval_schedule=1000, smooth_ret_window=5, success_ret=200.
):
    env = gym.make(env_name)
    replay_buffer = deque(maxlen=replay_buffer_size)
    eval_return_history = deque(maxlen=smooth_ret_window)

    Q = create_network(
        input_dim=env.observation_space.shape[0], hidden_dims=hidden_dims,
        output_dim=env.action_space.n
    )
    opt = torch.optim.Adam(Q.parameters(), lr=lr)

    s, _ = env.reset()
    done = False

    for global_step in range(1, total_max_steps + 1):
        epsilon = linear(
            eps_st, eps_end, eps_dur * total_max_steps, global_step
        )
        a = select_action_eps_greedy(Q, s, epsilon=epsilon)
        s_next, r, terminated, truncated, _ = env.step(a)

        with torch.no_grad():
            s_t = to_tensor(s).unsqueeze(0)
            s_next_t = to_tensor(s_next).unsqueeze(0)

            Q_val = Q(s_t)                     # [1, n_actions]
            Q_next = Q(s_next_t)               # [1, n_actions]
            V_next, _ = Q_next.max(dim=1)      # [1]

            mask = 0.0 if terminated else 1.0
            td_target_single = torch.tensor(r, dtype=torch.float32) + gamma * V_next[0] * mask
            Q_sa = Q_val[0, a]

            # абсолютная TD-ошибка
            loss = (Q_sa - td_target_single).abs().item()

        replay_buffer.append((loss, s, a, r, s_next, terminated))
        done = terminated or truncated

        if global_step % train_schedule == 0:
            train_batch, indices = sample_prioritized_batch(
                replay_buffer, batch_size
            )
            (
                states, actions, rewards,
                next_states, terminateds
            ) = train_batch

            opt.zero_grad()
            td_target = compute_td_target(Q, rewards, next_states, terminateds, gamma=gamma)
            loss, td_losses = compute_td_loss(Q, states, actions, td_target, out_non_reduced_losses=True)
            loss.backward()
            opt.step()

            update_batch(
                replay_buffer, indices, train_batch, td_losses.numpy()
            )

        # with much slower scheduler periodically re-sort replay buffer
        # such that it will overwrite the least important samples
        if global_step % (10 * train_schedule) == 0:
            replay_buffer = sort_replay_buffer(replay_buffer)

        if global_step % eval_schedule == 0:
            eval_return = eval_dqn(env_name, Q)
            eval_return_history.append(eval_return)
            avg_return = np.mean(eval_return_history)
            print(f'{global_step=} | {avg_return=:.3f} | {epsilon=:.3f}')
            if avg_return >= success_ret:
                print('Решено!')
                break

        s = s_next
        if done:
            s, _ = env.reset()
            done = False

run_dqn_prioritized_rb(eval_schedule=250)

global_step=250 | avg_return=9.000 | epsilon=0.396
global_step=500 | avg_return=9.000 | epsilon=0.392
global_step=750 | avg_return=14.333 | epsilon=0.389
global_step=1000 | avg_return=49.000 | epsilon=0.385
global_step=1250 | avg_return=70.400 | epsilon=0.381
global_step=1500 | avg_return=89.800 | epsilon=0.377
global_step=1750 | avg_return=103.400 | epsilon=0.373
global_step=2000 | avg_return=100.000 | epsilon=0.370
global_step=2250 | avg_return=90.800 | epsilon=0.366
global_step=2500 | avg_return=84.800 | epsilon=0.362
global_step=2750 | avg_return=98.400 | epsilon=0.358
global_step=3000 | avg_return=116.200 | epsilon=0.354
global_step=3250 | avg_return=151.800 | epsilon=0.351
global_step=3500 | avg_return=172.800 | epsilon=0.347
global_step=3750 | avg_return=199.800 | epsilon=0.343
global_step=4000 | avg_return=221.200 | epsilon=0.339
Решено!
