# Семинар 4. Глубокое Q обучение


In [None]:
# @title Установка зависимостей

try:
    import google.colab
    COLAB = True
except ModuleNotFoundError:
    COLAB = False
    pass

if COLAB:
    !pip -q install "gymnasium[classic-control, atari, accept-rom-license]"
    !pip -q install piglet
    !pip -q install imageio_ffmpeg
    !pip -q install moviepy==1.0.3

In [None]:
# @title Импортирование зависимостей

import time
import glob
import io
import base64
from IPython import display as ipythondisplay
from IPython.display import HTML
import gymnasium as gym
from gymnasium.wrappers import RecordVideo
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import clear_output
%matplotlib inline
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)

import torch
import torch.nn as nn
from collections import deque

def show_video(folder="./video"):
    mp4list = glob.glob(folder + '/*.mp4')
    if len(mp4list) > 0:
        mp4 = sorted(mp4list, key=lambda x: x[-15:], reverse=True)[0]
        video = io.open(mp4, 'r+b').read()
        encoded = base64.b64encode(video)
        ipythondisplay.display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
    else:
        print("Could not find video")


def show_progress(rewards_batch, log):
    """
    Удобная функция, которая отображает прогресс обучения.
    """
    mean_reward = np.mean(rewards_batch)
    log.append(mean_reward)

    clear_output(True)
    plt.figure(figsize=[8, 4])
    plt.subplot(1, 2, 1)
    plt.plot(log, label='Mean rewards')
    plt.legend(loc=4)
    plt.show()

def video_episode(env_name, policy=None):
    # создаем среду с ограничением на число шагов в среде
    env = gym.make(env_name, render_mode="rgb_array", max_episode_steps=250)
    # добавляем визуализацию
    env = RecordVideo(env, f"./video", name_prefix=str(int(time.time())))

    # проводим инициализацию и запоминаем начальное состояние
    s, _ = env.reset()
    done = False

    while not done:
        # выполняем действие, получаем s, r, term, trunc, info
        if policy:
            s, r, terminated, truncated, _ = env.step(policy(s))
        else:
            s, r, terminated, truncated, _ = env.step(env.action_space.sample())
        done = terminated or truncated

    env.close()
    show_video()

# 1. Основы PyTorch
Будем использовать библиотеку [pytorch](https://pytorch.org/) для обучения нейронных сетей, хотя можно использовать и любую другую библиотеку (например, [tensorflow](https://www.tensorflow.org/), [flax](https://github.com/google/flax), [equinox](https://docs.kidger.site/equinox/)).

Рассмотрим простую задачу линейной регрессии. Для обучения с подкреплением она является основой для обучения функции полезности.

Линейная регрессия: восстановить параметры функции на основе экспериментальных данных.

Общий вид линейной функции:
$$y = ax + b + \epsilon,$$
где $a, b$ - параметры, а $\epsilon$ - некоторый шум.

Экспериментальные данные (датасет) - набор пар: аргумент-значение, для нашего примера это $(x, y)$. Решая задачу линейной регрессии, мы будем искать параметры, которые позволяют задать отображение $f_{a,b}(x) = ax + b$.




In [None]:
a = 1
b = 2

# генерация данных
np.random.seed(42)
x = np.random.rand(100, 1)
y = a + b * x + 0.1 * np.random.randn(100, 1)

# Перемещиваем индексы
idx = np.arange(100)
np.random.shuffle(idx)

# Разделяем на тренировочную и валидационную выборки (чисто для демонстрационных целей)
train_idx = idx[:80]
val_idx = idx[80:]

x_train, y_train = x[train_idx], y[train_idx]
x_val, y_val = x[val_idx], y[val_idx]

In [None]:
plt.scatter(x, y)
plt.show()

In [None]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Наши данные это numpy массивы, необходимо их перенести на вычислительное
# устройство и преобразовать в тензоры торча
x_train_tensor = torch.from_numpy(x_train).float().to(device)
y_train_tensor = torch.from_numpy(y_train).float().to(device)

# Посмотрим разницу между объектами
print(type(x_train), type(x_train_tensor), x_train_tensor.type())

In [None]:
device

In [None]:
def make_train_step(model, loss_fn, optimizer):
    # Строим функцию, осуществяющую обучение
    def train_step(x, y):
        # Выставляем TRAIN режим
        model.train()
        # Step 1: Делаем предсказания
        yhat = model(x)
        # Step 2: Вычисляем функцию потерь
        loss = loss_fn(y, yhat)
        # Step 3: Вычисляем градиенты
        loss.backward()
        # Step 4: Обновляем параметры и стираем градиенты
        optimizer.step()
        optimizer.zero_grad()
        # Returns the loss
        return loss.item()

    # Возвращаем функцию обучения
    return train_step

# Начинаем обучение с нуля
lr = 1e-1
n_epochs = 1000
model = nn.Sequential(nn.Linear(1, 1)).to(device)
loss_fn = nn.MSELoss(reduction='mean')
optimizer = torch.optim.SGD(model.parameters(), lr=lr)

# Задаем функцию для обучения
train_step = make_train_step(model, loss_fn, optimizer)
losses = []

for epoch in range(n_epochs):
    # Один шаг обучения
    loss = train_step(x_train_tensor, y_train_tensor)
    losses.append(loss)

# Проверим параметры модели
print(model.state_dict())

In [None]:
plt.plot(losses)
plt.yscale('log')
plt.show()

In [None]:
x_true = np.linspace(0, 1, 100)
y_true = a + b * x_true

plt.plot(x_true, y_true, label='True')
plt.scatter(x_train, y_train)
y_pred = model(torch.from_numpy(x_true[:, None]).float().to(device)).cpu().detach().numpy()
plt.plot(x_true, y_pred, label='Predicted')
plt.legend()
plt.show()

# 2. Deep Q Network <a name = 'dqn'></a>

Зачем в обучении с подкреплением использовать аппроксиматоры?

Рассмотрим среду [CartPole-v1](https://gymnasium.farama.org/environments/classic_control/cart_pole/). На данном этапе мы ограничены, и можем использовать только дискретное пространство действий.

<img src="https://www.researchgate.net/publication/362568623/figure/fig5/AS:1187029731807278@1660021350587/Screen-capture-of-the-OpenAI-Gym-CartPole-problem-with-annotations-showing-the-cart.png" width=500/>

## **Задание**
Откройте среды [Classic control](https://gymnasium.farama.org/environments/classic_control/), какие из них подходит под условие использования дискретных действий?

In [None]:
env = gym.make("CartPole-v1")
env.reset()
n_actions = env.action_space.n
state_dim = env.observation_space.shape
print(f'Action_space: {n_actions} \nState_space: {state_dim}')
env.close()

Т.к. описание состояния в задаче с маятником представляет собой не "сырые" признаки, а уже предобработанные (координаты, углы), нам не нужна для начала сложная архитектура, начнем с такой:
<img src="https://raw.githubusercontent.com/Tviskaron/mipt/master/2020/RL/figures/DQN.svg" width=500>

## **Вопрос**

В чем преимущество использования глубоких нейронных сетей?

Для начала попробуйте использовать только полносвязные слои (``torch.nn.Linear``) и простые активационные функции (``torch.nn.ReLU``).

## **Задание**

Допишите код, инициализирующий полносвязную нейронную сеть с 3-мя скрытыми слоями.

In [None]:
def create_network(input_dim, hidden_dims: list[int], output_dim):
    # network = nn.Sequential(
    #    torch.nn.Linear(input_dim, ...),
    #    torch.nn.ReLU(),
    #    ...
    # )
    # add 3 layers!
    ####### Здесь ваш код ########

    ##############################
    return network

## Задание
Реализуйте эпсилон-жадную стратегию для Q-функции, определенной через нейронную сеть.

In [None]:
def select_action_eps_greedy(network, state, epsilon):
    """Выбирает действие epsilon-жадно."""
    if not isinstance(state, torch.Tensor):
        state = torch.tensor(state, dtype=torch.float32)
    Q_s = network(state).detach().numpy()

    # action =
    ####### Здесь ваш код ########

    ##############################

    action = int(action)
    return action

In [None]:
env = gym.make("CartPole-v1", render_mode="rgb_array")
network = create_network(env.observation_space.shape[0], [128, 128], env.action_space.n)
video_episode("CartPole-v1", lambda x: select_action_eps_greedy(network, x, 0))

Будем приближать Q-функцию агента, минимизируя среднеквадратичную TD-ошибку:
$$
\delta = Q_{target}(s, a) - Q_{\theta}(s, a) = [r(s, a) + \gamma \cdot max_{a'} Q_{-}(s', a')] - Q_{\theta}(s, a) \\
L = \frac{1}{N} \sum_i \delta_i^2,
$$
где
* $s, a, r, s'$ состояние, действие, вознаграждение и следующее состояние,
* $\gamma$ дисконтирующий множитель.

Основная тонкость состоит в использовании $Q_{-}(s',a')$. Это та же самая функция, что и $Q_{\theta}$, которая является выходом нейронной сети, но при обучении сети, мы не пропускаем через эти слои градиенты. В статьях можно обнаружить следующее обозначение для остановки градиента: $SG(\cdot)$.

In [None]:
def compute_td_loss(
        network, states, actions, rewards, next_states, is_done, gamma=0.99,
        check_shapes=False, regularizer=.1
):
    """ Считатет td ошибку, используя лишь операции фреймворка torch. Используйте формулу выше. """

    # переводим входные данные в тензоры
    states = torch.tensor(np.array(states), dtype=torch.float32)    # shape: [batch_size, state_size]
    actions = torch.tensor(actions, dtype=torch.long)     # shape: [batch_size]
    rewards = torch.tensor(rewards, dtype=torch.float32)  # shape: [batch_size]


    next_states = torch.tensor(np.array(next_states), dtype=torch.float32) # shape: [batch_size, state_size]
    is_done = torch.tensor(is_done, dtype=torch.bool)    # shape: [batch_size]

    # получаем значения Q(s, ...) для всех действий из текущих состояний
    predicted_qvalues = network(states)

    # получаем Q(s, a) для выбранных действий
    predicted_qvalues_for_actions = predicted_qvalues[range(states.shape[0]), actions]

    # применяем сеть для получения Q(s', ...) для следующих состояний (next_states)
    # predicted_next_qvalues =
    ####### Здесь ваш код ########

    ##############################

    # ВАЖНО!!! Необходимо остановить градиенты - открепить значения
    # от графа вычислений. Можно использовать .detach()
    # вычисляем V*(next_states), что соответствует max_{a'} Q(s',a')
    # next_state_values =
    ####### Здесь ваш код ########

    ##############################

    assert next_state_values.dtype == torch.float32

    # вычисляем target q-values для функции потерь
    #  target_qvalues_for_actions =
    ####### Здесь ваш код ########

    ##############################

    # для последнего действия в эпизоде используем
    # упрощенную формулу Q(s,a) = r(s,a),
    # т.к. s' для него не существует
    target_qvalues_for_actions = torch.where(is_done, rewards, target_qvalues_for_actions)

    # MSE loss для минимизации
    losses = (predicted_qvalues_for_actions - target_qvalues_for_actions.detach()) ** 2
    loss = torch.mean(losses)
    # добавляем регуляризацию на значения Q
    loss += regularizer * torch.abs(predicted_qvalues_for_actions).mean()

    if check_shapes:
        assert predicted_next_qvalues.data.dim(
        ) == 2, "убедитесь, что вы предсказали q-значения для всех действий в следующем состоянии"
        assert next_state_values.data.dim(
        ) == 1, "убедитесь, что вы вычислили V (s ') как максимум только по оси действий, а не по всем осям"
        assert target_qvalues_for_actions.data.dim(
        ) == 1, "что-то не так с целевыми q-значениями, они должны быть вектором"

    return loss, losses

## **Задание**

Что нужно изменить, чтобы получить SARSA?

Объединяем все в цикл взаимодействия агента со средой и обучения.

In [None]:
def generate_session(env, network, opt, t_max=300, epsilon=0, train=False):
    """генерация сессии и обучение"""
    total_reward = 0
    s, _ = env.reset()
    epsilon = epsilon if train else 0.

    for t in range(t_max):
        a = select_action_eps_greedy(network, s, epsilon=epsilon)
        next_s, r, terminated, truncated, _ = env.step(a)

        if train:
            opt.zero_grad()
            loss, _ = compute_td_loss(network, [s], [a], [r], [next_s], [terminated])
            loss.backward()
            opt.step()

        total_reward += r
        s = next_s
        if terminated or truncated:
            break

    return total_reward

In [None]:
lr = .0001
eps, eps_decay = .5, .998
train_ep_len, eval_schedule = 10000, 50
eval_rewards = deque(maxlen=5)

env.reset()
network = create_network(env.observation_space.shape[0], [128, 128], env.action_space.n)
opt = torch.optim.Adam(network.parameters(), lr=lr)

for ep in range(train_ep_len):
    _ = generate_session(env, network, opt, epsilon=eps, train=True)

    if (ep + 1) % eval_schedule == 0:
        ep_rew = generate_session(env, network, opt, epsilon=eps, train=False)
        eval_rewards.append(ep_rew)
        running_avg_rew = np.mean(eval_rewards)
        print(f"Epoch: #{ep}\tmean reward = {running_avg_rew:.3f}\tepsilon = {eps:.3f}")

        if eval_rewards and running_avg_rew >= 200.:
            print("Принято!")
            break

    eps *= eps_decay

`mean_reward` - это средняя отдача за эпизод на истории из последних 5 эпизодов. В случае корректной реализации, этот показатель будет низким первые 1000 шагов и только затем будет возрастать и сойдется на 5000-15000 шагах в зависимости от архитектуры сети.

## **Вопрос**
Почему используется затухание для $\epsilon$?

In [None]:
video_episode("CartPole-v1", lambda x: select_action_eps_greedy(network, x, 0))

# 3. DQN with Experience Replay

Теперь добавим поддержку памяти прецедентов (Replay Buffer), которая будет из себя представлять очередь из наборов: $\{(s, a, r, s', done)\}$.

Во время обучения каждый новый переход будет добавляться в память, а обучение будет целиком производиться на переходах, просэмплированных из памяти прецедентов.

## Задание

Реализуйте алгоритм семплирования из памяти прецедентов.

In [None]:
def sample_batch(replay_buffer, n_samples):
    # sample randomly `n_samples` samples from replay buffer
    # and split an array of samples into arrays: states, actions, rewards, next_actions, dones
    ####### Здесь ваш код ########

    ##############################

    return np.array(states), np.array(actions), np.array(rewards), np.array(next_states), np.array(terminals)

Добавим в алгоритм взаимодействия агента со средой пополнение памяти прецедентов и обучение на основе выборки данных из памяти.

In [None]:
def generate_session_rb(
        env, network, opt, replay_buffer, glob_step,
        train_schedule, batch_size,
        t_max=300, epsilon=0, train=False
):
    """генерация сессии и обучение"""
    total_reward = 0
    s, _ = env.reset()
    epsilon = epsilon if train else 0.

    for t in range(t_max):
        a = select_action_eps_greedy(network, s, epsilon=epsilon)
        next_s, r, terminated, truncated, _ = env.step(a)

        if train:
            # put new sample into replay_buffer
            replay_buffer.append((s, a, r, next_s, terminated))

            if replay_buffer and glob_step % train_schedule == 0:
                # sample new batch
                train_batch = sample_batch(replay_buffer, batch_size)
                states, actions, rewards, next_states, is_terminal = train_batch

                opt.zero_grad()
                loss, _ = compute_td_loss(network, states, actions, rewards, next_states, is_terminal)
                loss.backward()
                opt.step()

        glob_step += 1
        total_reward += r
        s = next_s
        if terminated or truncated:
            break

    return total_reward, glob_step

Осталось протестировать новый DQN с памятью предентов.

In [None]:
lr = .0001
eps, eps_decay = .5, .998
train_ep_len, eval_schedule = 10000, 50
train_schedule, batch_size = 4, 32
replay_buffer = deque(maxlen=4000)
eval_rewards = deque(maxlen=5)
glob_step = 0

env.reset()
network = create_network(env.observation_space.shape[0], [128, 128], env.action_space.n)
opt = torch.optim.Adam(network.parameters(), lr=lr)

for ep in range(train_ep_len):
    _, glob_step = generate_session_rb(
        env, network, opt, replay_buffer, glob_step, train_schedule, batch_size, epsilon=eps, train=True
    )

    if (ep + 1) % eval_schedule == 0:
        ep_rew, _ = generate_session_rb(
            env, network, opt, replay_buffer, 0, train_schedule, batch_size, epsilon=eps, train=False
        )
        eval_rewards.append(ep_rew)
        running_avg_rew = np.mean(eval_rewards)
        print("Epoch: #{}\tmean reward = {:.3f}\tepsilon = {:.3f}".format(ep, running_avg_rew, eps))

        if eval_rewards and running_avg_rew >= 200.:
            print("Принято!")
            break

    eps *= eps_decay

In [None]:
video_episode("CartPole-v1", lambda x: select_action_eps_greedy(network, x, 0))

# 4. DQN with Prioritized Experience Replay

Добавим каждому примеру, хранящемуся в памяти, значение приоритета. Приоритет будет влиять на частоту случайного выбора примеров в пакет на обучение. Удачный выбор приоритета позволит повысить эффективность обучения. Популярным вариантом является абсолютное значение TD-ошибки: акцент при обучении Q-функции отводится примерам, на которых аппроксиматор ошибается сильнее.

Необходимо помнить, что это значение быстро устаревает, если его не обновлять. Но и обновлять для всей памяти каждый раз накладно. Из-за этого потребуется искать баланс между точностью оценки приоритета и скоростью работы.

В данном задании мы будем делать следующее:

- Использовать TD-ошибку в качестве приоритета.
- Так как для батча данных, используемых при обучении, в любом случае будет вычислена TD-ошибка, воспользуемся полученными значениями для обновления значений приоритета в памяти для каждого примера из данного батча.
- Будем периодически сортировать память для того, чтобы новые добавляемые переходы заменяли собой те переходы, у которых наименьший приоритет (т.е. наименьшие значения ошибки).

**Обратите внимание**, что софтмакс очень чувствителен к масштабу величин и часто требует подбора температуры. Чтобы частично нивелировать эту проблему, предлагается использовать не softmax(priorities) напрямую, а воспользоваться функцией
$$\mathrm{symlog}(x) = \mathrm{sign}(x)\mathrm{log}(|x| + 1),$$
то есть softmax(symlog(priorities)), и не подбирать температуру. Идея взята из статьи [DreamerV3](https://arxiv.org/pdf/2301.04104).

In [None]:
def symlog(x):
    """
    Compute symlog values for a vector `x`.
    It's an inverse operation for symexp.
    """
    return np.sign(x) * np.log(np.abs(x) + 1)

def softmax(xs, temp=1.):
    exp_xs = np.exp((xs - xs.max()) / temp)
    return exp_xs / exp_xs.sum()

def sample_prioritized_batch(replay_buffer, n_samples):
    # Sample randomly `n_samples` examples from replay buffer
    # weighting by priority (example's TD error) and split an array
    # of sample tuples into arrays:
    #    states, actions, rewards, next_states, terminateds
    # Also, keep samples' indices (into `indices`) to return them too!
    # Note that each sample in replay buffer is a tuple:
    #   (priority, state, action, reward, next_state, terminated)
    # Use
    ####### Здесь ваш код ########

    ##############################

    batch = (
        np.array(states), np.array(actions), np.array(rewards),
        np.array(next_states), np.array(terminateds)
    )
    return batch, indices

def update_batch(replay_buffer, indices, batch, new_priority):
    """Updates batches with corresponding indices
    replacing their priority values."""
    states, actions, rewards, next_states, terminateds = batch

    for i in range(len(indices)):
        new_batch = (
            new_priority[i], states[i], actions[i], rewards[i],
            next_states[i], terminateds[i]
        )
        replay_buffer[indices[i]] = new_batch

def sort_replay_buffer(replay_buffer):
    """Sorts replay buffer to move samples with
    lesser priority to the beginning ==> they will be
    replaced with the new samples sooner."""
    new_rb = deque(maxlen=replay_buffer.maxlen)
    new_rb.extend(sorted(replay_buffer, key=lambda sample: sample[0]))
    return new_rb