# RL basics

Термины и понятия:

- агент/среда
- наблюдение $o$ / состояние $s$
- действие $a$, стратегия $\pi: \pi(s) \rightarrow a$ функция перехода $T: T(s, a) \rightarrow s'$
- вознаграждение $r$, ф-я вознаграждений $R: R(s, a) \rightarrow r$
- цикл взаимодействия, траектория $\tau: (s_0, a_0, r_0, s_1, a_1, r_1, ..., s_T, a_T, r_T)$, эпизод
- отдача $G$, подсчет отдачи, средняя[/ожидаемая] отдача $\mathbb{E}[G]$

In [1]:
try:
    import google.colab
    COLAB = True
except ModuleNotFoundError:
    COLAB = False
    pass

if COLAB:
    !pip -q install "gymnasium[classic-control, atari, accept-rom-license]"
    !pip -q install piglet
    !pip -q install imageio_ffmpeg
    !pip -q install moviepy==1.0.3

In [2]:
import glob
import io
import base64
import gymnasium as gym
import numpy as np
from IPython import display as ipythondisplay
from IPython.display import HTML
import matplotlib.pyplot as plt
%matplotlib inline

## Agent, environment

<img src=https://gymnasium.farama.org/_images/lunar_lander.gif caption="lunar lander" width="150" height="50"><img src=https://gymnasium.farama.org/_images/mountain_car.gif caption="mountain car" width="150" height="50">
<img src=https://gymnasium.farama.org/_images/cliff_walking.gif caption="cliff walking" width="300" height="50">
<img src=https://ale.farama.org/_images/montezuma_revenge.gif caption="montezuma revenge" width="150" height="100">
<img src=https://github.com/danijar/crafter/raw/main/media/video.gif caption="crafter" width="150" height="100">
<img src=https://camo.githubusercontent.com/6df2ca438d8fe8aa7a132b859315147818c54af608f8609320c3c20e938acf48/68747470733a2f2f6d656469612e67697068792e636f6d2f6d656469612f344e78376759694d394e44724d724d616f372f67697068792e676966 caption="malmo minecraft" width="150" height="100">
<img src=https://images.ctfassets.net/kftzwdyauwt9/e0c0947f-1a44-4528-4a41450a9f0a/2d0e85871d58d02dbe01b2469d693d4a/table-03.gif caption="roboschool" width="150" height="100">
<img src=https://raw.githubusercontent.com/Tviskaron/mipt/master/2019/RL/02/mdp.png caption="Марковский процесс принятия решений" width="150" height="100">
<img src=https://minigrid.farama.org/_images/DoorKeyEnv.gif caption="minigrid" width="120" height="120">

## Observation, state

TODO:
- добавить примеры наблюдений/состояний (числа, векторы, картинки)
- интуитивное объяснение различия, положить пока, что наблюдение = состояние
- пространство состояний


В каждый момент времени среда имеет некоторое внутреннее состояние. Здесь слово "состояние" я употребил скорее в интуитивном понимании, чтобы обозначить, что среда изменчива (иначе какой смысл с ней взаимодействовать, если ничего не меняется). В обучении с подкреплением под термином состояние $s$ (или $s_t$, где $t$ — текущее время) подразумевают либо абстрактно информацию о "состоянии" среды, либо ее явное представление в виде данных, достаточные для полного описания "состояния". *NB: Здесь можно провести аналогию с компьютерными играми — файл сохранения игры как раз содержит информацию о "состоянии" мира игры, чтобы в будущем можно было продолжить с текущей точки, так что данные этого файла в целом можно с некоторой натяжкой считать состоянием (с натяжкой, потому что редко когда в сложных играх файлы сохранения содержат прямо вот всю информацию, так что после перезагрузки вы получите не совсем точную копию). При этом обычно подразумевается, что состояние не содержит в себе ничего лишнего, то есть это **минимальный** набор информации.*

Наблюдением $o$ называют то, что агент "видит" о текущем состоянии среды. Это не обязательно зрение, а вообще вся доступная ему информация (условно, со всех его органов чувств).

В общем случае наблюдение: кортеж/словарь многомерных векторов чисел.

In [None]:
print(gym.make("CartPole-v0").reset()[0].shape)
print(gym.make("MountainCar-v0").reset()[0].shape)

## Action, policy, transition function

Рассмотрим следующие MDP:

- A: <img src=https://i.ibb.co/mrCMVZLQ/mdp-a.png caption="A" width="400" height="100">
- B: <img src=https://i.ibb.co/GQ2tVtjC/mdp-b.png caption="B" width="400" height="100">

Links to all:
[A](https://i.ibb.co/mrCMVZLQ/mdp-a.png)
[B](https://i.ibb.co/GQ2tVtjC/mdp-b.png)
[C](https://i.ibb.co/Jj9LYHjP/mdp-c.png)
[D](https://i.ibb.co/Y47Mr83b/mdp-d.png)
[E](https://i.ibb.co/Kjt1Xhmf/mdp-e.png)

Давайте явно запишем пространства состояний $S$ и действий $A$, а также функцию перехода $T$ среды.

In [None]:
states = set(range(3))
actions = set(range(1))

print(f'{states=} | {actions=}')

T = {
    (0, 0): 1,
    (1, 0): 2,
    (2, 0): 2
}
print(f'Transition function {T=}')

A_mdp = states, actions, T

Попробуйте записать функцию перехода в матричном виде:

In [None]:
# Матричная форма функции перехода T
# Матрица перехода P[s][s'] = вероятность перехода из состояния s в состояние s' при действии a
# В данном случае у нас только одно действие (0), поэтому матрица 3x3

# Создаем матрицу перехода для MDP A
# Состояния: 0, 1, 2; Действие: 0
T_matrix = np.zeros((3, 3))
T_matrix[0, 1] = 1.0  # из состояния 0 в состояние 1
T_matrix[1, 2] = 1.0  # из состояния 1 в состояние 2
T_matrix[2, 2] = 1.0  # из состояния 2 в состояние 2 (терминальное)

print("Матрица переходов T:")
print(T_matrix)


Как получить вероятность нахождения агента в состоянии (1) через N шагов? Что происходит с вероятностями нахождения в состояниях при $N \rightarrow \infty$

In [None]:
# Вероятность нахождения агента в состоянии (1) через N шагов
# Начальное состояние: 0
initial_state = np.array([1.0, 0.0, 0.0])  # вероятность быть в состоянии 0 = 1.0

# Рассмотрим несколько значений N
N_values = [1, 2, 3, 5, 10, 20, 50, 100]

probabilities_in_state_1 = []

for N in N_values:
    # Вычисляем распределение вероятностей через N шагов
    state_dist = initial_state.copy()
    for _ in range(N):
        state_dist = state_dist @ T_matrix
    
    # Вероятность быть в состоянии 1
    prob_state_1 = state_dist[1]
    probabilities_in_state_1.append(prob_state_1)
    print(f"Через {N} шагов: P(состояние 1) = {prob_state_1:.6f}")

# Что происходит при N -> infinity?
# Вычисляем стационарное распределение
eigenvalues, eigenvectors = np.linalg.eig(T_matrix.T)
# Находим собственный вектор с собственным значением 1 (стационарное распределение)
stationary_idx = np.where(np.abs(eigenvalues - 1.0) < 1e-6)[0]
if len(stationary_idx) > 0:
    stationary = eigenvectors[:, stationary_idx[0]].real
    stationary = stationary / stationary.sum()  # нормализуем
    print(f"\nСтационарное распределение (N -> infinity):")
    print(f"P(состояние 0) = {stationary[0]:.6f}")
    print(f"P(состояние 1) = {stationary[1]:.6f}")
    print(f"P(состояние 2) = {stationary[2]:.6f}")

# Визуализация
plt.figure(figsize=(10, 5))
plt.plot(N_values, probabilities_in_state_1, 'o-')
plt.xlabel('N (количество шагов)')
plt.ylabel('Вероятность нахождения в состоянии 1')
plt.title('Вероятность нахождения агента в состоянии 1 через N шагов')
plt.grid(True)
plt.show()


Задайте еще несколько MDP:

- C: <img src=https://i.ibb.co/Jj9LYHjP/mdp-c.png caption="C" width="400" height="100">

In [None]:
# MDP C: задаем пространство состояний, действий и функцию перехода
# Исходя из диаграммы C (нужно посмотреть на изображение, но предположу структуру)
# Обычно в таких задачах C может иметь другую структуру переходов

states_C = set(range(3))  # предположим 3 состояния
actions_C = set(range(2))  # предположим 2 действия

# Задаем функцию перехода для MDP C
# (это пример, может потребоваться корректировка в зависимости от реальной диаграммы)
T_C = {
    (0, 0): 1,  # из состояния 0 при действии 0 -> состояние 1
    (0, 1): 2,  # из состояния 0 при действии 1 -> состояние 2
    (1, 0): 2,  # из состояния 1 при действии 0 -> состояние 2
    (1, 1): 0,  # из состояния 1 при действии 1 -> состояние 0
    (2, 0): 2,  # из состояния 2 при действии 0 -> состояние 2 (терминальное)
    (2, 1): 2   # из состояния 2 при действии 1 -> состояние 2 (терминальное)
}

print(f"MDP C:")
print(f"Состояния: {states_C}")
print(f"Действия: {actions_C}")
print(f"Функция перехода: {T_C}")

C_mdp = states_C, actions_C, T_C


Давайте попробуем задать двух агентов: случайного и оптимального (для каждой среды свой).

In [None]:
class Agent:
    def __init__(self, actions):
        self.rng = np.random.default_rng()
        self.actions = np.array(list(actions))

    def act(self, state):
        return self.rng.integers(len(self.actions))

В качестве дополнения, запишите стратегию агента

In [None]:
# Стратегия агента - функция, которая для каждого состояния возвращает действие
# Для случайного агента стратегия - равномерное распределение по действиям

def policy_random(state, actions):
    """
    Случайная стратегия: равномерное распределение по всем действиям
    """
    return np.random.choice(list(actions))

def policy_deterministic(state, actions, policy_dict):
    """
    Детерминированная стратегия: для каждого состояния задано конкретное действие
    policy_dict: словарь {состояние: действие}
    """
    return policy_dict.get(state, list(actions)[0])

# Пример стратегии для MDP A (только одно действие, поэтому тривиально)
policy_A = {
    0: 0,
    1: 0,
    2: 0
}

print(f"Стратегия для MDP A: {policy_A}")

# Пример стратегии для MDP C
policy_C = {
    0: 0,  # в состоянии 0 выбираем действие 0
    1: 1,  # в состоянии 1 выбираем действие 1
    2: 0   # в состоянии 2 выбираем действие 0
}

print(f"Стратегия для MDP C: {policy_C}")

# В вероятностном виде стратегия может быть представлена как:
# π(a|s) - вероятность выбора действия a в состоянии s

def policy_probabilistic(state, actions, policy_probs):
    """
    Вероятностная стратегия
    policy_probs: словарь {состояние: массив вероятностей для каждого действия}
    """
    probs = policy_probs.get(state, np.ones(len(actions)) / len(actions))
    return np.random.choice(list(actions), p=probs)


## Reward, reward function

Теперь добавим произвольную функцию вознаграждения. Например, для A:

In [None]:
R = {
    (0, 0): -0.1,
    (1, 0): 1.0,
    (2, 0): 0.0
}
print(R)

A_mdp = *A_mdp, R
print(A_mdp)

## Interaction loop, trajectory, termination, truncation, episode

Общий цикл взаимодействия в рамках эпизода:
1. Инициализировать среду: $s \leftarrow \text{env.init()}$
2. Цикл:
    - выбрать действие: $a \leftarrow \pi(s)$
    - получить ответ от среды: $s, r, d \leftarrow \text{env.next(a)}$
    - если $d == \text{True}$, выйти из цикла

In [None]:
def run_episode(mdp):
    states, actions, T, R = mdp
    agent = Agent(actions)

    s = 0
    tau = []
    for _ in range(5):
        a = agent.act(s)
        s_next = T[(s, a)]
        r = R[(s, a)]

        tau.append((s, a, r))
        s = s_next

    return tau

run_episode(A_mdp)

Termination — означает окончание эпизода, когда достигнуто терминальное состояние. Является частью задания среды.

Truncation — означает окончание эпизода, когда достигнут лимит по числу шагов (=времени). Обычно является внешне заданным параметром для удобства обучения.

Пока не будем вводить truncation, но поддержим termination: расширьте определение среды информацией о терминальных состояниях для всех описанных ранее сред. Сгенерируйте по несколько случайных траекторий для каждой среды.

### Return, expected return

Наиболее важная метрика оценки качества работы агента: отдача.

Отдача: $G(s_t) = \sum_{i=t}^T r_i$

Обычно также вводят параметр $\gamma \in [0, 1]$, дисконтирующий будущие вознаграждения. А еще, тк отдача может меняться от запуска к запуску благодаря вероятностным процессам, нас интересует отдача в среднем — ожидаемая отдача:

$$\hat{G}(s_t) = \mathbb{E} [ \sum_{i=t}^T \gamma^{i-t} r_i ]$$

Именно ее и оптимизируют в RL.

Давайте научимся считать отдачу для состояний по траектории и считать среднюю отдачу.

In [None]:
# Функция для подсчета отдачи (return) по траектории
def calculate_return(trajectory, gamma=1.0, start_idx=0):
    """
    Вычисляет отдачу G(s_t) = sum_{i=t}^T gamma^{i-t} * r_i
    
    Parameters:
    trajectory: список кортежей (s, a, r) или (s, a, r, s_next)
    gamma: коэффициент дисконтирования
    start_idx: индекс начала подсчета отдачи
    
    Returns:
    return_value: отдача с позиции start_idx
    """
    rewards = [r for s, a, r in trajectory[start_idx:]]
    G = 0.0
    for i, r in enumerate(rewards):
        G += (gamma ** i) * r
    return G

# Функция для подсчета отдачи для каждого состояния в траектории
def calculate_returns_for_states(trajectory, gamma=1.0):
    """
    Вычисляет отдачу для каждого состояния в траектории
    Returns: список отдач G(s_t) для каждого t
    """
    returns = []
    for t in range(len(trajectory)):
        G_t = calculate_return(trajectory, gamma=gamma, start_idx=t)
        returns.append(G_t)
    return returns

# Функция для подсчета средней отдачи (expected return)
def calculate_expected_return(mdp, policy_fn, num_episodes=1000, gamma=1.0):
    """
    Вычисляет среднюю отдачу за num_episodes эпизодов
    
    Parameters:
    mdp: кортеж (states, actions, T, R) или (states, actions, T)
    policy_fn: функция стратегии policy_fn(state, actions) -> action
    num_episodes: количество эпизодов для усреднения
    gamma: коэффициент дисконтирования
    
    Returns:
    expected_return: средняя отдача
    """
    states, actions, T = mdp[:3]
    R = mdp[3] if len(mdp) > 3 else {}
    
    total_returns = []
    
    for episode in range(num_episodes):
        s = 0  # начальное состояние
        trajectory = []
        max_steps = 100
        step = 0
        
        while step < max_steps:
            a = policy_fn(s, actions)
            s_next = T[(s, a)]
            r = R.get((s, a), 0.0)  # если R не задана, используем 0
            
            trajectory.append((s, a, r))
            
            # Проверка на терминальное состояние (если есть)
            if s_next == s and step > 0:  # зацикливание может означать терминальное состояние
                break
                
            s = s_next
            step += 1
        
        if len(trajectory) > 0:
            G = calculate_return(trajectory, gamma=gamma)
            total_returns.append(G)
    
    return np.mean(total_returns) if total_returns else 0.0

# Пример использования
print("Пример траектории из MDP A:")
trajectory_example = run_episode(A_mdp)
print(f"Траектория: {trajectory_example}")

# Отдача для каждого состояния
returns = calculate_returns_for_states(trajectory_example, gamma=1.0)
print(f"\nОтдача для каждого состояния:")
for t, (s, a, r) in enumerate(trajectory_example):
    print(f"G(s_{t}) = {returns[t]:.2f}")

# Средняя отдача
expected_G = calculate_expected_return(
    A_mdp, 
    lambda s, acts: policy_random(s, acts), 
    num_episodes=1000,
    gamma=1.0
)
print(f"\nСредняя отдача (1000 эпизодов): {expected_G:.4f}")
