# Actor-Critic

Теорема о градиенте стратегии связывает градиент целевой функции  и градиент самой стратегии:

$$\nabla_\theta J(\theta) = \mathbb{E}_\pi [Q^\pi(s, a) \nabla_\theta \ln \pi_\theta(a \vert s)]$$

Встает вопрос, как оценить $Q^\pi(s, a)$? В чистом policy-based алгоритме REINFORCE используется отдача $G_t$, полученная методом Монте-Карло в качестве несмещенной оценки $Q^\pi(s, a)$. В Actor-Critic же предлагается отдельно обучать нейронную сеть Q-функции — критика.

Актор-критиком часто называют обобщенный фреймворк (подход), нежели какой-то конкретный алгоритм. Как подход актор-критик не указывает, каким конкретно [policy gradient] методом обучается актор и каким [value based] методом обучается критик. Таким образом актор-критик задает целое [семейство](https://proceedings.neurips.cc/paper_files/paper/1999/file/6449f44a102fde848669bdd9eb6b76fa-Paper.pdf) различных алгоритмов. Рекомендую в качестве шпаргалки использовать упомянутый в тетрадке с REINFORCE [пост из блога Lilian Weng](https://lilianweng.github.io/posts/2018-04-08-policy-gradient/), посвященный наиболее популярным алгоритмам семейства актор-критиков

В данной тетрадке познакомимся с наиболее простым вариантом актор-критика, который так и называют Actor-Critic:

In [47]:
# Cтавим нужные зависимости, если это колаб
try:
    import google.colab
    COLAB = True
except ModuleNotFoundError:
    COLAB = False
    pass

if COLAB:
    !pip -q install "gymnasium[classic-control, atari, accept-rom-license]"
    !pip -q install piglet
    !pip -q install imageio_ffmpeg
    !pip -q install moviepy==1.0.3

In [48]:
from collections import deque

import gymnasium as gym
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
from torch.distributions import Categorical

%matplotlib inline

In [49]:
env = gym.make("CartPole-v1")
env.reset()

print(f'{env.observation_space=}')
print(f'{env.action_space=}')

n_actions = env.action_space.n
state_dim = env.observation_space.shape
print(f'Action_space: {n_actions} | State_space: {env.observation_space.shape}')

env.observation_space=Box([-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38], [4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38], (4,), float32)
env.action_space=Discrete(2)
Action_space: 2 | State_space: (4,)


(1 балл)

In [50]:
def to_tensor(x, dtype=np.float32):
    if isinstance(x, torch.Tensor):
        return x
    x = np.asarray(x, dtype=dtype)
    x = torch.from_numpy(x)
    return x

def symlog(x):
    """Compute symlog values for a vector `x`. It's an inverse operation for symexp."""
    return torch.sign(x) * torch.log(torch.abs(x) + 1)

def symexp(x):
    """Compute symexp values for a vector `x`. It's an inverse operation for symlog."""
    return torch.sign(x) * (torch.exp(torch.abs(x)) - 1.0)


class SymExpModule(nn.Module):
    def forward(self, x):
        return symexp(x)

def select_action_eps_greedy(Q, state, epsilon):
    """Выбирает действие epsilon-жадно."""
    if not isinstance(state, torch.Tensor):
        state = torch.tensor(state, dtype=torch.float32)
    Q_s = Q(state).detach().numpy()

    # action =
    ####### Здесь ваш код ########
    if np.random.random() < epsilon:
        action = np.random.randint(0, len(Q_s))
    else:
        action = np.argmax(Q_s)
    ##############################

    action = int(action)
    return action

def sample_batch(replay_buffer, n_samples):
    # sample randomly `n_samples` samples from replay buffer
    # and split an array of samples into arrays: states, actions, rewards, next_actions, terminateds
    ####### Здесь ваш код ########
    rng = np.random.default_rng()
    indices = rng.choice(len(replay_buffer), size=n_samples, replace=True)
    samples = [replay_buffer[i] for i in indices]
    states, actions, rewards, next_states, terminateds = zip(*samples)
    ##############################

    return np.array(states), np.array(actions), np.array(rewards), np.array(next_states), np.array(terminateds)

## Shared-body Actor-Critic

Актор и критик могут обучаться в разных режимах — актор только on-policy (шаг обучения на текущей собранной подтраектории), а критик on-policy или off-policy (шаг обучения на текущей подтраектории или на батче из replay buffer). Это с одной стороны привносит гибкость в обучение, с другой — усложняет его.

Если актор и критик оба обучаются on-policy, то имеет смысл объединить их сетки в одну и делать общий шаг обратного распространения ошибки. Однако, если они обучаются в разных режимах (и с разной частотой обновления), то велика вероятность, что их шаги обучения могут начать конфликтовать в случае общего тела — для такого варианта намного предпочтительнее разделить их на разные подсети (либо аккуратно настраивать гиперпарметры, чтобы стабилизировать обучение). В целом, рекомендуется использовать общий энкодер наблюдений, а далее как можно скорее разделять головы.

Сделаем реализацию актор-критика с общим телом и с on-policy вариантом обучения.

In [51]:
class ActorBatch:
    def __init__(self):
        self.logprobs = []
        self.q_values = []
        self.states = []
        self.actions = []

    def append(self, log_prob, q_value, state=None, action=None):
        self.logprobs.append(log_prob)
        self.q_values.append(q_value)
        if state is not None:
            self.states.append(state)
        if action is not None:
            self.actions.append(action)

    def clear(self):
        self.logprobs.clear()
        self.q_values.clear()
        self.states.clear()
        self.actions.clear()

(3 балла)

In [52]:
class ActorCriticModel(nn.Module):
    def __init__(self, input_dim, hidden_dims, output_dim):
        super().__init__()

        # Инициализируйте сеть агента с двумя головами: softmax-актора и линейного критика
        # self.net, self.actor_head, self.critic_head =
        ####### Здесь ваш код ########
        # Создаем общее тело (encoder) сети
        layers = []
        prev_dim = input_dim
        for hidden_dim in hidden_dims:
            layers.append(nn.Linear(prev_dim, hidden_dim))
            layers.append(nn.ReLU())
            prev_dim = hidden_dim
        self.net = nn.Sequential(*layers)
        
        # Actor head: линейный слой для logits действий
        self.actor_head = nn.Linear(prev_dim, output_dim)
        
        # Critic head: линейный слой для Q-значений (одно значение на действие)
        self.critic_head = nn.Linear(prev_dim, output_dim)
        ##############################

    def forward(self, state):
        # Вычислите выбранное действие, логарифм вероятности его выбора и соответствующее значение Q-функции
        ####### Здесь ваш код ########
        # Пропускаем состояние через общее тело
        features = self.net(state)
        
        # Получаем logits для актора и создаем распределение
        actor_logits = self.actor_head(features)
        dist = Categorical(logits=actor_logits)
        
        # Сэмплируем действие и получаем log_prob
        action = dist.sample()
        log_prob = dist.log_prob(action)
        
        # Получаем Q-значения для всех действий из критика
        q_values = self.critic_head(features)
        
        # Берем Q-значение для выбранного действия
        # Обрабатываем случай, когда state - один пример (1D) или батч (2D)
        if q_values.dim() == 1:
            Q_s_a = q_values[action]
        else:
            Q_s_a = q_values.gather(1, action.unsqueeze(1)).squeeze(1)
        ##############################

        return action, log_prob, Q_s_a

    def evaluate(self, state):
        # Вычислите значения Q-функции для данного состояния
        ####### Здесь ваш код ########
        # Пропускаем состояние через общее тело
        features = self.net(state)
        
        # Получаем Q-значения для всех действий из критика
        q_values = self.critic_head(features)
        ##############################
        return q_values

(6 баллов)

In [None]:
class ActorCriticAgent:
    def __init__(self, state_dim, action_dim, hidden_dims, lr, gamma, critic_rb_size):
        self.lr = lr
        self.gamma = gamma

        # Инициализируйте модель актор-критика и SGD оптимизатор (например, `torch.optim.Adam)`)
        ####### Здесь ваш код ########
        self.actor_critic = ActorCriticModel(state_dim, hidden_dims, action_dim)
        self.opt = torch.optim.Adam(self.actor_critic.parameters(), lr=lr)
        ##############################

        self.actor_batch = ActorBatch()
        self.critic_rb = deque(maxlen=critic_rb_size)

    def act(self, state):
        # Произведите выбор действия и сохраните необходимые данные в батч для последующего обучения
        # Не забудьте сделать q_value.detach()
        # self.actor_batch.append(..)
        ####### Здесь ваш код ########
        if not isinstance(state, torch.Tensor):
            state = torch.tensor(state, dtype=torch.float32)
        
        # Добавляем batch dimension, если его нет
        if state.dim() == 1:
            state = state.unsqueeze(0)
        
        action, log_prob, q_value = self.actor_critic(state)
        
                # Сохраняем данные в батч до squeeze (для правильной формы тензоров)    
        # Сохраняем также состояние и действие для возможного пересчета Q-значений
        # Сохраняем log_prob без detach, чтобы можно было использовать градиенты при необходимости
        self.actor_batch.append(
            log_prob,
            q_value.detach().item(),
            state.detach().clone(),
            action.detach().clone()
        )
        
        # Убираем batch dimension для возврата
        action = action.squeeze(0) if action.dim() > 0 else action
        log_prob = log_prob.squeeze(0) if log_prob.dim() > 0 else log_prob
        q_value = q_value.squeeze(0) if q_value.dim() > 0 else q_value
        
        return action.item()
        ##############################

    def append_to_replay_buffer(self, s, a, r, next_s, terminated):
        # Добавьте новый экземпляр данных в память прецедентов.
        ####### Здесь ваш код ########
        self.critic_rb.append((s, a, r, next_s, terminated))
        ##############################

    def evaluate(self, state):
        return self.actor_critic.evaluate(state)

    def update(self, rollout_size, critic_batch_size, critic_updates_per_actor):
        if len(self.actor_batch.q_values) < rollout_size:
            return

        self.opt.zero_grad()
        loss = self.update_critic(critic_batch_size, critic_updates_per_actor)  
        loss = loss + self.update_actor()  # Используем + вместо += для избежания in-place операции
        loss.backward()
        
        # Gradient clipping для стабильности обучения
        torch.nn.utils.clip_grad_norm_(self.actor_critic.parameters(), max_norm=1.0)

        self.opt.step()
        self.actor_batch.clear()
        # Не очищаем critic_rb, так как это replay buffer для off-policy обучения
        # self.critic_rb.clear()

    def update_actor(self):
        # Пересчитываем Q-значения и logprobs после обновления критика для более точной оценки
        if len(self.actor_batch.states) > 0 and len(self.actor_batch.actions) > 0:
            # Убеждаемся, что состояния имеют правильную форму перед stack
            states_list = []
            for s in self.actor_batch.states:
                # Убираем batch dimension, если он есть
                if s.dim() > 1:
                    states_list.append(s.squeeze(0))
                else:
                    states_list.append(s)
            states_tensor = torch.stack(states_list)
            
            actions_tensor = torch.stack(self.actor_batch.actions)
            
            # Пересчитываем logprobs с градиентами
            features = self.actor_critic.net(states_tensor)
            actor_logits = self.actor_critic.actor_head(features)
            dist = Categorical(logits=actor_logits)
            # actions_tensor после stack может иметь форму [batch_size] или [batch_size, 1]
            # Преобразуем в одномерный тензор и убеждаемся, что это long
            # Преобразуем actions_tensor в одномерный тензор
            if actions_tensor.dim() > 1:
                actions_flat = actions_tensor.squeeze(-1).long()
            else:
                actions_flat = actions_tensor.long()
            logprobs = dist.log_prob(actions_flat)
            
            # Получаем Q-значения для всех действий (с градиентами для обновления актора)
            q_values_all = self.actor_critic.evaluate(states_tensor)
            # Берем Q-значения для выбранных действий
            # Убеждаемся, что actions_flat имеет правильную форму [batch_size]
            if actions_flat.dim() == 0:
                actions_flat = actions_flat.unsqueeze(0)
            elif actions_flat.dim() > 1:
                actions_flat = actions_flat.squeeze()
            
            # q_values_all должна иметь форму [batch_size, num_actions]
            # actions_flat должна иметь форму [batch_size]
            # Для gather нужен индекс формы [batch_size, 1]
            # Используем индексацию напрямую, если gather не работает
            if q_values_all.dim() == 2 and actions_flat.dim() == 1:
                # Стандартный случай: q_values_all [batch_size, num_actions], actions_flat [batch_size]
                Q_s_a = q_values_all.gather(1, actions_flat.unsqueeze(1)).squeeze(1)
            else:
                # Альтернативный способ: используем индексацию
                batch_indices = torch.arange(q_values_all.shape[0], device=q_values_all.device)
                Q_s_a = q_values_all[batch_indices, actions_flat]
            
            # Нормализуем Q-значения для стабильности (вычитаем среднее)
            Q_s_a = Q_s_a - Q_s_a.mean().detach()
        else:
            # Fallback: пересчитываем logprobs, если состояния не сохранены
            # Используем сохраненные logprobs (они должны иметь градиенты)
            logprobs = torch.stack(self.actor_batch.logprobs)
            Q_s_a = to_tensor(self.actor_batch.q_values)
            # Нормализуем Q-значения для стабильности
            Q_s_a = Q_s_a - Q_s_a.mean().detach()

        # Реализуйте шаг обновления актора — вычислите ошибку `loss` и произведите шаг обновления градиентным спуском.
        ####### Здесь ваш код ########
        # Policy gradient loss: -E[Q(s,a) * log π(a|s)]
        # Минимизируем отрицательный policy gradient (максимизируем ожидаемую награду)
        # Q_s_a уже нормализован выше
        loss = -(Q_s_a * logprobs).mean()
        ##############################
        
        return loss

    def update_critic(self, batch_size, n_updates=1):
        # Реализуйте n_updates шагов обучения критика.
        ####### Здесь ваш код ########
        total_loss = None
        n_actual_updates = 0
        
        for _ in range(n_updates):
            if len(self.critic_rb) < batch_size:
                break
            
            # Сэмплируем батч из replay buffer
            states, actions, rewards, next_states, terminateds = sample_batch(
                self.critic_rb, batch_size
            )
            
            # Вычисляем TD loss
            loss = self.compute_td_loss(
                states, actions, rewards, next_states, terminateds
            )
            
            if total_loss is None:
                total_loss = loss
            else:
                total_loss = total_loss + loss
            n_actual_updates += 1
        
        # Возвращаем средний loss (или 0, если не было обновлений)
        if n_actual_updates > 0:
            total_loss = total_loss / n_actual_updates
        else:
            total_loss = torch.tensor(0.0, requires_grad=True)
        
        return total_loss
        ##############################

    def compute_td_loss(
        self, states, actions, rewards, next_states, terminateds, regularizer=0.1
    ):
        # переводим входные данные в тензоры
        s = to_tensor(states)                     # shape: [batch_size, state_size]
        a = to_tensor(actions, int).long()        # shape: [batch_size]
        r = to_tensor(rewards)                    # shape: [batch_size]
        s_next = to_tensor(next_states)           # shape: [batch_size, state_size]
        term = to_tensor(terminateds, bool)       # shape: [batch_size]


        # получаем Q[s, a] для выбранных действий в текущих состояниях (для каждого примера из батча)
        # Q_s_a = ...
        ####### Здесь ваш код ########
        q_values = self.actor_critic.evaluate(s)
        Q_s_a = q_values.gather(1, a.unsqueeze(1)).squeeze(1)
        ##############################

        # получаем Q[s_next, *] — значения полезности всех действий в следующих состояниях
        # Q_sn = ...,
        # а затем вычисляем V*[s_next] — оптимальные значения полезности следующих состояний
        # V_sn = ...
        ####### Здесь ваш код ########
        with torch.no_grad():
            Q_sn = self.actor_critic.evaluate(s_next)
            # Для Actor-Critic используем значение из текущей политики, а не max
            # Получаем вероятности действий из актора
            features_next = self.actor_critic.net(s_next)
            actor_logits_next = self.actor_critic.actor_head(features_next)
            probs_next = torch.softmax(actor_logits_next, dim=1)
            # V(s_next) = E_a~π[Q(s_next, a)] = sum_a π(a|s_next) * Q(s_next, a)
            V_sn = (probs_next * Q_sn).sum(dim=1)
        ##############################

        # вычисляем TD target и далее TD error
        # target = ...
        # td_error = ...
        ####### Здесь ваш код ########
        # TD target: r + gamma * V*(s_next) * (1 - terminated)
        target = r + self.gamma * V_sn * (~term).float()
        # TD error: Q(s,a) - target
        td_error = Q_s_a - target
        ##############################

        # MSE loss для минимизации
        loss = torch.mean(td_error ** 2)
        # добавляем регуляризацию на значения Q
        loss += regularizer * Q_s_a.mean()
        return loss

In [56]:
def run_actor_critic(
        env_name="CartPole-v1",
        hidden_dims=(128, 128), lr=5e-4,
        total_max_steps=200_000,
        train_schedule=16, replay_buffer_size=50000, batch_size=64, critic_updates_per_actor=4,
        eval_schedule=1000, smooth_ret_window=10, success_ret=200.
):
    env = gym.make(env_name)
    episode_return_history = deque(maxlen=smooth_ret_window)

    agent = ActorCriticAgent(
        state_dim=env.observation_space.shape[0], action_dim=env.action_space.n, hidden_dims=hidden_dims,
        lr=lr, gamma=.995, critic_rb_size=replay_buffer_size
    )

    s, _ = env.reset()
    done, episode_return = False, 0.
    eval = False

    for global_step in range(1, total_max_steps+1):
        a = agent.act(s)
        s_next, r, terminated, truncated, _ = env.step(a)
        episode_return += r
        done = terminated or truncated

        # train step
        agent.append_to_replay_buffer(s, a, r, s_next, terminated)
        agent.update(train_schedule, batch_size, critic_updates_per_actor)

        # evaluate
        if global_step % eval_schedule == 0:
            eval = True

        s = s_next
        if done:
            if eval:
                episode_return_history.append(episode_return)
                avg_return = np.mean(episode_return_history)
                print(f'{global_step=} | {avg_return=:.3f}')
                if avg_return >= success_ret:
                    print('Решено!')
                    break

            s, _ = env.reset()
            done, episode_return = False, 0.
            eval = False

run_actor_critic(eval_schedule=2000, total_max_steps=100_000)

global_step=2007 | avg_return=10.000
global_step=4005 | avg_return=11.500
global_step=6010 | avg_return=21.333
global_step=8006 | avg_return=19.250
global_step=10019 | avg_return=20.600
global_step=12011 | avg_return=20.333
global_step=14035 | avg_return=23.000
global_step=16024 | avg_return=24.000
global_step=18012 | avg_return=29.778
global_step=20052 | avg_return=36.400
global_step=22111 | avg_return=48.200
global_step=24096 | avg_return=67.300
global_step=26047 | avg_return=76.400
global_step=28029 | avg_return=90.200
global_step=30110 | avg_return=101.100
global_step=32153 | avg_return=118.300
global_step=34102 | avg_return=126.000
global_step=36086 | avg_return=135.900
global_step=38065 | avg_return=138.900
global_step=40123 | avg_return=144.300
global_step=42047 | avg_return=148.000
global_step=44089 | avg_return=143.800
global_step=46187 | avg_return=154.000
global_step=48001 | avg_return=152.400
global_step=50113 | avg_return=152.400
global_step=52081 | avg_return=146.800
glob