# Тележка с шестом - Cart Pole

https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html

## Описание

Есть тележка *cart*, которая может двигаться линейно вправо или влево. К ней прикреплен шест *pole*, который может вращаться. 

Другое название перевернутый маятний - *Inverted pendulum*

*The CartPole problem is the Hello World of Reinforcement Learning*

Задача управления - удерживать шест в вертикальном положении, применяя смещение тележки вправо и влево.

Пространство действий *action space* состоит из двух действий:
1) Сдвинуть тележку влево - обозначается 0
2) Сдвинуть тележку вправо - обозначается 1

Пространство состояний *states*:
1) Положение тележки - обозначается $x$
2) Скорость тележки - обозначается $\dot x$
3) Угол вращения шеста - обозначается $\Theta$. Минимальное и максимальное значения равны -0,418 радиан (-24 градуса) и 0,418 (24 градуса).
4) Угловая скорость шеста - обозначается $\dot \Theta$

![image-2.png](attachment:image-2.png)

Эпизод завершается при условиях:
1) Угол наклона шеста становится больше $\pm 12$ градусов или $\pm 0,2095$ радиан
2) Положение тележки больше $\pm 2.4$
3) Если количество шагов в эпизоде превышает 500 для версии v1 Cart Pole (200 для версии v0).

Награда *Reward* в размере +1 начисляется за каждый пройденный шаг в рамках эпизода.
Таким образом, более высокая сумма вознаграждений получается за более длительные эпизоды

## Начинаем писать код

Импортируем библиотеки

In [14]:
import gymnasium as gym
import numpy as np
import time
import math
import random
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

Создадим окружение *environment*

render_mode="human" означает, что мы хотим создать анимацию в отдельном окне.

!!! render_mode='human' Создает визуализацию !!!

*Вы также можете создать окружение без указания параметра render_mode. Это позволит создать окружение без создания анимации. Это полезно для отработки алгоритма обучения с подкреплением, поскольку генерация анимации во время обучения замедлит процесс обучения.*

In [15]:
env=gym.make('CartPole-v1',render_mode='human')

Настройка matplotlib. Отображение графика внутри kernel

In [16]:
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display

plt.ion()

<contextlib.ExitStack at 0x27ad5b03d00>

Выбор процессора для работы

In [17]:
device = torch.device(
    "cuda" if torch.cuda.is_available() else
    "mps" if torch.backends.mps.is_available() else
    "cpu"
)

print(device)

cpu


## Deep Q Learning (DQN)

Будем обучать агента Deep Q Learning (DQN)

### Replay Memory

Мы будем использовать память воспроизведения опыта **replay memory** для обучения нашего DQN. В ней хранятся переходы *transitions *, которые наблюдает агент, что позволяет нам повторно использовать эти данные позже.

**Transition** - именованный кортеж *named tuple*, представляющий один переход в нашей среде. По сути, он сопоставляет пары (state, action) с их результатом (next_state, reward). По факту это опыт *expirience*.

**Replay Memory** - циклический буфер ограниченного размера, в котором хранятся недавно наблюдавшиеся переходы *transitions*. В нем также реализован метод **.sample()** для выбора случайного батча *batch* для обучения.

``def sample`` - Выборка равна ``batch_size``, отправленному в эту функцию

In [None]:
Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))

class ReplayMemory(object):

    def __init__(self, capacity):
        self.memory = deque([], maxlen=capacity)

    def push(self, *args):
        """Save a transition"""
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

Transition(state=2, action=3, next_state=1, reward=4)


### Определим модель

Наша модель будет представлять собой нейронную сеть с *feed forward*, которая учитывает разницу между текущим и предыдущим обновлениями экрана. Она имеет два выхода: $Q(s, left)$ и $Q(s, right)$, где $s$ - вход нейронки. 

Инициализируем слои нейронки: вход ``self.layer1``, внутренний ``self.layer2``, выход ``self.layer3``

Возвращает ``tensor([[left0exp,right0exp]...])``

In [None]:
class DQN(nn.Module):

    def __init__(self, n_observations, n_actions):
        super(DQN, self).__init__()
        self.layer1 = nn.Linear(n_observations, 128)
        self.layer2 = nn.Linear(128, 128)
        self.layer3 = nn.Linear(128, n_actions)

    # Called with either one element to determine next action, or a batch
    # during optimization. Returns tensor([[left0exp,right0exp]...]).
    def forward(self, x):
        x = F.relu(self.layer1(x))
        x = F.relu(self.layer2(x))
        return self.layer3(x)

### Обучение

`BATCH_SIZE` - это количество переходов, выбранных из буфера воспроизведения

``GAMMA`` - коэффициент дисконтирования

``EPS_START`` - начальное значение epsilon

``EPS_END`` - конечное значение epsilon

``EPS_DECAY`` контролирует скорость экспоненциального затухания epsilon, чем выше, тем медленнее затухание

``TAU`` - скорость обновления целевой сети (*rate of the target network*)

``LR`` - скорость обучения оптимизатора `AdamW`

Мы можем получить основную информацию о нашей среде, используя эти строки кода:

In [None]:
BATCH_SIZE = 128
GAMMA = 0.99
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 1000
TAU = 0.005
LR = 1e-4

Получим количество действий из пространства действий в *gym action space*

Получим количество *state observations*

In [None]:
n_actions = env.action_space.n

state, info = env.reset()
n_observations = len(state)

In [None]:
policy_net = DQN(n_observations, n_actions).to(device)
target_net = DQN(n_observations, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())


``select_action`` - выберет действие в соответствии с политикой жадности epsilon. 

``plot_durations`` - вспомогательный инструмент для отображения графика продолжительности эпизодов, а также среднего значения за последние 100 эпизодов 

In [None]:
optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)
memory = ReplayMemory(10000)


steps_done = 0


def select_action(state):
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \
        math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    print(f'sample: {sample} / eps_threshold: {eps_threshold}')
    if sample > eps_threshold:
        with torch.no_grad():
            # t.max(1) will return the largest column value of each row.
            # second column on max result is index of where max element was
            # found, so we pick action with the larger expected reward.
            return policy_net(state).max(1).indices.view(1, 1)
    else:
        return torch.tensor([[env.action_space.sample()]], device=device, dtype=torch.long)


episode_durations = []


def plot_durations(show_result=False):
    plt.figure(1)
    durations_t = torch.tensor(episode_durations, dtype=torch.float)
    if show_result:
        plt.title('Result')
    else:
        plt.clf()
        plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(durations_t.numpy())
    # Take 100 episode averages and plot them too
    if len(durations_t) >= 100:
        means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)  # pause a bit so that plots are updated
    if is_ipython:
        if not show_result:
            display.display(plt.gcf())
            display.clear_output(wait=True)
        else:
            display.display(plt.gcf())

### Цикл обучения

Функция ``optimize_model`` выполняет один шаг оптимизации. Сначала он производит выборку batch, объединяет все тензоры в один, вычисляет

$$
Q(s_t,a_t)
$$ 

$$
V(s_{t+1}) = max_a Q(s_{t+1},a)
$$
и объединяет их в нашу потерю.

По определению, мы устанавливаем 
$$
V(s) = 0
$$
если $s$ это конечное состояние.

Мы также *target network* сеть для вычисления $V(s_{t_1}) = 0$ для дополнительной стабильности.

*Target network* обновляется на каждом шаге с помощью программного обновления, управляемого гиперпараметром ``TAU``, который был определен ранее

In [None]:
def optimize_model():
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    # Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for
    # detailed explanation). This converts batch-array of Transitions
    # to Transition of batch-arrays.
    batch = Transition(*zip(*transitions))

    # Compute a mask of non-final states and concatenate the batch elements
    # (a final state would've been the one after which simulation ended)
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                          batch.next_state)), device=device, dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state
                                                if s is not None])
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
    # columns of actions taken. These are the actions which would've been taken
    # for each batch state according to policy_net
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # Compute V(s_{t+1}) for all next states.
    # Expected values of actions for non_final_next_states are computed based
    # on the "older" target_net; selecting their best reward with max(1).values
    # This is merged based on the mask, such that we'll have either the expected
    # state value or 0 in case the state was final.
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    with torch.no_grad():
        next_state_values[non_final_mask] = target_net(non_final_next_states).max(1).values
    # Compute the expected Q values
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # Compute Huber loss
    criterion = nn.SmoothL1Loss()
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    # In-place gradient clipping
    torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
    optimizer.step()

Ниже основной цикл обучения. 

В начале мы сбрасываем *reset* окружение и получаем тензор исходного состояния. 

Затем мы пробуем действие, выполняем его, наблюдаем за следующим состоянием и вознаграждением (всегда 1) и оптимизируем нашу модель один раз. 

Когда эпизод заканчивается (наша модель выходит из строя), мы перезапускаем цикл.

In [None]:
if torch.cuda.is_available() or torch.backends.mps.is_available():
    num_episodes = 600
else:
    num_episodes = 50

for i_episode in range(num_episodes):
    # Initialize the environment and get its state
    state, info = env.reset()
    state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
    for t in count():
        action = select_action(state)
        observation, reward, terminated, truncated, _ = env.step(action.item())
        reward = torch.tensor([reward], device=device)
        done = terminated or truncated

        if terminated:
            next_state = None
        else:
            next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)

        # Store the transition in memory
        memory.push(state, action, next_state, reward)

        # Move to the next state
        state = next_state

        # Perform one step of the optimization (on the policy network)
        optimize_model()

        # Soft update of the target network's weights
        # θ′ ← τ θ + (1 −τ )θ′
        target_net_state_dict = target_net.state_dict()
        policy_net_state_dict = policy_net.state_dict()
        for key in policy_net_state_dict:
            target_net_state_dict[key] = policy_net_state_dict[key]*TAU + target_net_state_dict[key]*(1-TAU)
        target_net.load_state_dict(target_net_state_dict)

        if done:
            episode_durations.append(t + 1)
            plot_durations()
            break

Вывод графика

In [None]:
print('Complete')
plot_durations(show_result=True)
plt.ioff()
plt.show()