## Метод Actor-Critic

Теорема о градиенте стратегии связывает градиент целевой функции  и градиент самой стратегии:

$$\nabla_\theta J(\theta) = \mathbb{E}_\pi [Q^\pi(s, a) \nabla_\theta \ln \pi_\theta(a \vert s)]$$

Встает вопрос, как оценить $Q^\pi(s, a)$? Ранее в REINFORCE мы использовали отдачу $R_t$ (полученную методом Монте-Карло) в качестве несмещенной оценки $Q^\pi(s, a)$. В Actor-Critic же предлагается отдельно обучать нейронную сеть Q-функции - критика.

Актор-критиком часто называют обобщенный фреймворк (подход), нежели какой-то конкретный алгоритм. Как подход актор-критик не указывает, каким конкретно [policy gradient] методом обучается актор и каким [value based] методом обучается критик. Таким образом актор-критик задает целое семейство различных алгоритмов.

In [1]:
try:
    import colab
    COLAB = True
except ModuleNotFoundError:
    COLAB = False
    pass

if COLAB:
    !pip install "gymnasium[classic-control, atari, accept-rom-license]" --quiet
    !pip install piglet --quiet
    !pip install imageio_ffmpeg --quiet
    !pip install moviepy==1.0.3 --quiet

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m883.7/883.7 KB[0m [31m13.2 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.7/13.7 MB[0m [31m19.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.7/1.7 MB[0m [31m15.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for AutoROM.accept-rom-license (pyproject.toml) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m67.5/67.5 KB[0m [31m8.8 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m26.9/26.9 MB[0m [31m62.6 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m388.3/388.3 KB[0m [31m21.4 MB/s[0m eta [36m0:00:00[0m
[?25h  

In [10]:
import torch
import torch.nn as nn
from torch.distributions import Categorical
import gymnasium as gym
import numpy as np

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device

device(type='cuda', index=0)

### Основной цикл

In [11]:
def print_mean_reward(step, episode_rewards):
    if not episode_rewards:
        return

    t = min(50, len(episode_rewards))    
    mean_reward = sum(episode_rewards[-t:]) / t
    print(f"step: {str(step).zfill(6)}, mean reward: {mean_reward:.2f}")
    return mean_reward


def to_tensor(x, dtype=np.float32):
    if isinstance(x, torch.Tensor):
        return x
    x = np.asarray(x, dtype=dtype)
    x = torch.from_numpy(x).to(device)
    return x


def run(
        env: gym.Env, hidden_size: int, n_hidden_layers:int, lr: float, gamma: float, max_episodes: int, 
        rollout_size: int, replay_buffer_size: int, critic_batch_size: int, critic_updates_per_actor: int
):
    # Инициализируйте агента `agent`, когда сделаете саму реализацию агента ниже по заданию.
    ####### Здесь ваш код ########
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.n

    agent = ActorCriticAgent(state_dim, action_dim, hidden_size, n_hidden_layers, 
                             lr, gamma, replay_buffer_size)
    ##############################

    step = 0
    episode_rewards = []

    for i_episode in range(1, max_episodes + 1):
        cumulative_reward = 0        
        terminated = False
        state, _ = env.reset()
        
        while not terminated:
            step += 1

            action = agent.act(state)
            next_state, reward, terminated, truncated, _ = env.step(action)
            
            agent.append_to_replay_buffer(state, action, reward, next_state, terminated)
            state = next_state
            cumulative_reward += reward
            terminated |= truncated

        episode_rewards.append(cumulative_reward)
        
        # выполняем обновление
        if agent.update(rollout_size, critic_batch_size, critic_updates_per_actor):
            mean_reward = print_mean_reward(step, episode_rewards) 
            if mean_reward >= 200:
                print('Принято!')
                return
            episode_rewards = []

In [12]:
from collections import deque, namedtuple
from operator import attrgetter

class ActorBatch:
    def __init__(self):
        self.logprobs = []
        self.qvalues = []
        
    def append(self, log_prob, q_value):
        self.logprobs.append(log_prob)
        self.qvalues.append(q_value)
    
    def clear(self):
        self.logprobs.clear()
        self.qvalues.clear()


Transition = namedtuple('Transition', ['loss', 'state', 'action', 'reward', 'next_state', 'done'])

class PrioritizedReplayBuffer:
    def __init__(self, size):
        self.buffer = deque(maxlen=size)

    def softmax(self, xs, temp=1.):
        if not isinstance(xs, np.ndarray):
            xs = np.array(xs, dtype=np.float32)

        # Обрати внимание, насколько большая температура по умолчанию!
        exp_xs = np.exp((xs - xs.max()) / temp)
        return exp_xs / exp_xs.sum()
    
    def append(self, loss, state, action, reward, next_state, done):
        sample = Transition(loss, state, action, reward, next_state, done)
        self.buffer.append(sample)

    def sample_batch(self, n_samples):
        # Sample randomly `n_samples` samples from replay buffer weighting by priority (sample's loss)
        # and split an array of samples into arrays: states, actions, rewards, next_actions, dones
        # Also, keep samples' indices (into `indices`) to return them too!

        losses = [sample.loss for sample in self.buffer]
        probs = self.softmax(losses)
        indices = np.random.choice(len(self.buffer), n_samples, p=probs)
        states, actions, rewards, next_states, dones = [], [], [], [], []
        for i in indices:
            _, s, a, r, n_s, done = self.buffer[i]
            states.append(s)
            actions.append(a)
            rewards.append(r)
            next_states.append(n_s)
            dones.append(done)

        batch = np.array(states), np.array(actions), np.array(rewards), np.array(next_states), np.array(dones)
        return batch, indices

    def update_batch(self, indices, batch, new_losses):
        """Updates batches with corresponding indices replacing their loss value."""
        states, actions, rewards, next_states, is_done = batch

        for i in range(len(indices)):
            new_sample = Transition(new_losses[i], states[i], actions[i], rewards[i], next_states[i], is_done[i])
            self.buffer[indices[i]] = new_sample

    def sort(self):
        """Sorts replay buffer to move samples with lesser loss to the beginning 
        ==> they will be replaced with the new samples earlier."""
        new_rb = deque(maxlen=self.buffer.maxlen)
        new_rb.extend(sorted(self.buffer, key=attrgetter('loss')))
        self.buffer = new_rb

Попробуйте сначала реализовать без памяти прецедентов, а затем дополните вашу реализацию. Текущей реализацией приоритизированной памяти достаточно, чтобы пользоваться ей по аналогии с AgentBatch, стоит лишь добавить метод выборки всех данных по аналогии с `sample_batch`.

In [13]:
class MLPModel(nn.Module):
    def __init__(self, state_dim, hidden_dim, n_layers):
        super().__init__()
        act = nn.Tanh()
        modules = [nn.Linear(state_dim, hidden_dim), act]
        for _ in range(n_layers):
            modules.extend([nn.Linear(hidden_dim, hidden_dim), act])
        self.net = nn.Sequential(*modules)

    def forward(self, state):
        state = to_tensor(state)
        return self.net(state)

    
class ActorCriticModel(nn.Module):
    def __init__(self, state_dim, hidden_dim, n_hidden_layers, action_dim):
        super().__init__()

        # Инициализируйте сеть агента с двумя головами: softmax-актора и линейного критика
        # self.net, self.actor_head, self.critic_head =
        ####### Здесь ваш код ########
        self.net = MLPModel(state_dim, hidden_dim, n_hidden_layers)
        self.actor_head = nn.Sequential( nn.Linear(hidden_dim, action_dim), nn.Softmax(-1))
        self.critic_head = nn.Linear(hidden_dim, action_dim)

        ##############################
    def forward(self, state):
        # Вычислите выбранное действие, логарифм вероятности его выбора и соответствующее значение Q-функции
        ####### Здесь ваш код ########
        if len(state.shape) == 1:
            state=state[None, :]
        xstate = self.net(state)
        qvalues = self.critic_head(xstate)
        
        action_distribution = Categorical(probs = self.actor_head(xstate))
        action = action_distribution.sample()
        log_prob = action_distribution.log_prob(action)
        ##############################
        return action.item(), log_prob, qvalues[np.arange(state.shape[0]), action]
    
    def value_forward(self, state, action = None):
        # Вычислите значения Q-функции для данного состояния
        ####### Здесь ваш код ########
        if len(state.shape) == 1:
            state=state[None, :]
        xstate = self.net(state)
        qvalues = self.critic_head(xstate)

        if action is None:
            action_distribution = Categorical(probs = self.actor_head(xstate).detach())
            action = action_distribution.sample()
        ##############################
        return qvalues[np.arange(state.shape[0]), action]


class ActorCriticAgent:
    def __init__(self, state_dim, action_dim, hidden_size, n_hidden_layers, lr, gamma, replay_buffer_size):
        self.lr = lr
        self.gamma = gamma

        # Инициализируйте модель актор-критика и SGD оптимизатор (например, `torch.optim.Adam)`)
        ####### Здесь ваш код ########
        self.a2c = ActorCriticModel(state_dim, hidden_size, n_hidden_layers, action_dim).to(device)                          
        ##############################
        
        self.actor_batch = ActorBatch()
        self.replay_buffer = PrioritizedReplayBuffer(replay_buffer_size)
        
    def act(self, state):
        # Произведите выбор действия и сохраните необходимые данные в батч для последующего обучения
        # Не забудьте сделать q_value.detach()
        # self.actor_batch.append(..)
        ####### Здесь ваш код ########
        action, log_prob, qvalue = self.a2c(state)
        self.actor_batch.append(log_prob, qvalue.detach())
        ##############################
        return action

    
    def evaluate(self, state):
        return self.a2c.value_forward(state)
    
    def update(self, rollout_size, critic_batch_size, critic_updates_per_actor):
        if len(self.actor_batch.qvalues) < rollout_size:
            return False
        
        actor_loss = self.update_actor()
        critic_loss = self.update_critic(critic_batch_size, critic_updates_per_actor)

        loss = actor_loss + actor_loss

        # print(f"losses: Actor={actor_loss} .... Critic={critic_loss}")
        self.actor_batch.clear()
        return True

    def update_actor(self):
        qvalues = torch.stack(self.actor_batch.qvalues).to(device)
        logprobs = torch.stack(self.actor_batch.logprobs).to(device)

        # Реализуйте шаг обновления актора. Опционально: сделайте нормализацию отдач
        ####### Здесь ваш код ########
        self.optimizer.zero_grad()
        qvalues = qvalues.squeeze(-1)
        qvalues = (qvalues - qvalues.mean())/(torch.std(qvalues) +1e-6)
        policy_loss = -(qvalues * logprobs).mean() + 1e-3 * (logprobs * torch.exp(logprobs)).sum()
        policy_loss.backward()
        self.optimizer.step()

        return policy_loss.item()


        ##############################
    
    def update_critic(self, batch_size, critic_updates_per_actor):
        # Реализуйте critic_updates_per_actor шагов обучения критика.
        ####### Здесь ваш код ########
        critic_loss = 0
        for _ in range(critic_updates_per_actor):

            self.optimizer.zero_grad()
            batch, indices = self.replay_buffer.sample_batch(batch_size)
            td_loss, td_losses = self.compute_td_loss(*batch)
            td_loss.backward()
            self.optimizer.step()

            critic_loss += td_loss.item()

            with torch.no_grad():
                self.replay_buffer.update_batch(indices, batch, td_losses.detach().cpu().tolist())
    
    ##############################

        # re-sort replay buffer to prioritize replacing with new samples those samples
        # that have the least loss
        if len(self.replay_buffer.buffer) >= .75 * (self.replay_buffer.buffer.maxlen):
            self.replay_buffer.sort()
        

        return critic_loss/critic_updates_per_actor

    # @torch.no_grad()
    def append_to_replay_buffer(self, s, a, r, next_s, done):
        # Добавьте новый экземпляр данных в память прецедентов.
        ####### Здесь ваш код ########
        _, losses = self.compute_td_loss([s], [a], [r], [next_s], [done])
        self.replay_buffer.append(losses.cpu().tolist()[0], s, a, r, next_s, done)
        ##############################
        
    def compute_td_loss(
        self, states, actions, rewards, next_states, is_done, regularizer=.1
    ):
        """ Считатет td ошибку, используя лишь операции фреймворка torch"""

        # переводим входные данные в тензоры
        states = to_tensor(states)                      # shape: [batch_size, state_size]
        actions = to_tensor(actions, int).long()        # shape: [batch_size]
        rewards = to_tensor(rewards)                    # shape: [batch_size]
        next_states = to_tensor(next_states)            # shape: [batch_size, state_size]
        is_done = to_tensor(is_done, bool)              # shape: [batch_size]

        # Реализуйте шаг обновления критика
        ####### Здесь ваш код ########

        state_values = self.evaluate(states).squeeze(-1)
        next_state_values = self.evaluate(next_states).squeeze(-1).detach()
        
        td_losses = (rewards + self.gamma * next_state_values * (~is_done) - state_values)**2
        td_loss = torch.mean(td_losses) + regularizer * next_state_values.mean()
        ##############################
        return td_loss, td_losses

In [14]:
from gymnasium.wrappers.time_limit import TimeLimit
env_name = "CartPole-v1"

run(
    env = TimeLimit(gym.make(env_name), 1000),
    max_episodes = 50000,  # количество эпизодов обучения
    hidden_size = 64,  # кол-во переменных в скрытых слоях
    rollout_size = 500,  # через столько шагов стратегия будет обновляться
    lr = 0.01, # learning rate
    n_hidden_layers = 1,
    gamma = 0.995,  # дисконтирующий множитель,
    replay_buffer_size = 5000,
    critic_batch_size = 64,
    critic_updates_per_actor = 32,
)

step: 000515, mean reward: 23.41
step: 001019, mean reward: 10.50
step: 001524, mean reward: 10.52
step: 002024, mean reward: 9.66
step: 002527, mean reward: 15.24
step: 003028, mean reward: 10.02
step: 003620, mean reward: 197.33
step: 004136, mean reward: 64.50
step: 004648, mean reward: 15.52
step: 005152, mean reward: 19.38
step: 005659, mean reward: 12.68
step: 006169, mean reward: 46.36
step: 006672, mean reward: 20.12
step: 007179, mean reward: 11.79
step: 007685, mean reward: 9.36
step: 008188, mean reward: 22.86
step: 008696, mean reward: 9.78
step: 009199, mean reward: 11.43
step: 009705, mean reward: 10.33
step: 010304, mean reward: 119.80
step: 010806, mean reward: 13.21
step: 011308, mean reward: 10.24
step: 011819, mean reward: 26.89
step: 012327, mean reward: 9.96
step: 012827, mean reward: 500.00
Принято!


In [None]:
# https://github.com/vwxyzjn/cleanrl/blob/master/cleanrl/ppg_procgen.py