## Обучение с подкреплением
![Img](https://raw.githubusercontent.com/DPritykin/Control-Theory-Course/main/Term%202%20Nonlinear%20Dynamics%20and%20Control/Images/rl.png)

<font size="3">Термин <b>подкрепление (reinforcement)</b>  пришёл из поведенческой психологии и обозначает награду или наказание за некоторый получившийся результат, зависящий не только от самих принятых решений, но и внешних, не обязательно подконтрольных, факторов. Под обучением здесь понимается поиск способов достичь желаемого результата методом проб и ошибок (trial and error), то есть попыток решить задачу и использование накопленного опыта для усовершенствования своей стратегии в будущем.</font>

<font size="3">В лекции будут введены основные определения и описана формальная постановка задачи. Под желаемым результатом мы далее будем понимать максимизацию некоторой скалярной величины, называемой <b> вознаграждением (reward)</b> . Интеллектуальную сущность (систему/робота/алгоритм), принимающую решения, будем называть <b> агентом (agent)</b> . Агент взаимодействует с миром (world) или средой (environment), которая задаётся зависящим от времени <b> состоянием (state)</b> . Агенту в каждый момент времени в общем случае доступно только некоторое <b> наблюдение (observation)</b>  текущего состояния мира. Сам агент задаёт процедуру выбора <b> действия (action) </b> по доступным наблюдениям; эту процедуру далее будем называть <b> стратегией (policy)</b> . Процесс взаимодействия агента и среды определяется <b> динамикой среды (world dynamics)</b> , включающей правила смены состояний среды во времени и генерации вознаграждения.</font>

<font size="3">Буквы $\mathbf{s}$, $\mathbf{a}$, $\mathbf{r}$ зарезервируем для состояний, действий и наград соответственно; буквой t будем обозначать время в процессе взаимодействия</font>

Во многом изложение будет следовать конспекту
[Sergey Ivanov Reinforcement Learning Textbook. (Конспект по обучению с подкреплением)](https://arxiv.org/abs/2201.09746)

### Марковская динамическая система

Пусть заданы:

1. Пространство возможных состояний: $s \in \mathcal{S}$;
    
2. Пространство управляющий воздействий (или действий): $a \in \mathcal{A}$;
    
3. Оператор эволюции системы 
$$
    \begin{equation*}
    T: \mathcal{S} \times \mathcal{A} \times \mathcal{S'} \rightarrow [0,1]
    \end{equation*}
$$
    (определяет вероятность перехода в состояние $s' \in \mathcal{S}$ из состояния $s \in \mathcal{S}$ при выборе действия $a \in \mathcal{A}$: $s'\gets p(s'|s,a)$);
    
4. Вознаграждение
$$
    \begin{equation*}
    r(s, a, s'): \mathcal{S} \times \mathcal{A} \times \mathcal{S'} \rightarrow \mathbb{R}
    \end{equation*}
$$

5. $\gamma \in [0,1)$ - степенной показатель для кумулятивной награды $R$:
$$
    \begin{equation}
         R = \sum_{t=0}^{N} \gamma^t r_t
    \end{equation}
$$

6. Говорят, что система обладает марковским свойством, если

$$p(s_{t+1}=s, r_{t+1} = r|s_t, a_t, r_t, s_{t-1}, a_{t-1}, r_{t-1}, \dots, s_0, a_0) \equiv p(s_{t+1}=s, r_{t+1} = r|s_t, a_t), \quad \forall t, \forall s \in \mathcal{S}, \forall a \in \mathcal{A},$$
то есть эволюция системы не зависит только от текущего состояния и действия, и не зависит от предыдущих состояний и действий.

Эта терминология применяется для математического описания алгоритмов взаимодействия агента со средой, имеющих целью обучение агента стратегии (то есть алгоритму выбора действий $a$, исходя из текущего состояния $s$):
$$
\pi(s, a) = p(a | s).
$$

Оптимальной стратегией будем считать стратегию $\pi_{*}$, максимизирующую ожидаемое вознаграждение
$$
\pi_{*}(s, a) = \text{arg} \max_{\pi} E[R].
$$

Агент взаимодействует со средой, выбирая $\textit{действия}$ и получая по обратной связи состояния и $\textit{вознаграждения}$. Исходя из вознаграждений он "понимает", правильное ли действие он выбрал и таким образом обучается.


### V-функция и Q-функция

Введём следующие вспомогательные скалярные функции:

1. V-функцию (value-function, аналог функции Беллмана в динамическом программировании):
$$
    \begin{equation}
            V_{\pi}(s) = E \left[\sum_{t=0}^{\infty} \gamma^t r_t | \pi(s),\, s_0 = s \right]
    \end{equation}
$$
(ожидаемое вознаграждение, которое агент получит при старте из состояния $s$, где $E$ - математическое ожидание.
        
2. Q-функцию (q-function):
$$
    \begin{equation}
    Q_{\pi}(s,a) = \sum_{s'} T(s,a,s')\left(r(s,a,s') + \gamma V_{\pi}(s')\right)
    \end{equation}
$$    
(ожидаемое вознаграждение, которое агент получит, если находясь в состоянии $s$, выберет действие $a$)

### Принцип оптимальности Беллмана для value-функций

Принцип оптимальности Беллмана (см. лекции по задачам оптимального управления): экстремаль остаётся экстремалью для любой своей точки выбранной в качестве начальных условий.

Если агент стремится выбирать наилучшее действие $a$ для любого наперёд заданного состояния $s$, то
$$V_{*}(s) = \max_a Q_{*}(s,a) \quad \text{и} \quad Q_{*}(s,a) = \max_{\pi}Q(s,a).$$

Для функции $V(s)$ принцип оптимальности Беллмана выглядит так:

$$
\begin{equation}\label{eq:bellmanV}
V(s) = \max_a{ \sum_{s'} T(s,a,s')(r(s,a,s') + \gamma V(s'))}
\end{equation}
$$
(если мы находимся в состоянии s, мы выбираем лучшее действие)

Принцип оптимальности Беллмана позволяет получать значения $V(s)$ при переходе от $s'$ к $s$ (от следующего состояния к предыдущему), если известна $V(s')$.

Для Q-функции:
$$
\begin{equation}\label{eq:bellmanQ}
Q(s,a) = \sum_{s'} T(s,a,s')(r(s,a,s') + \gamma \max_{a'} Q(s',a'))
\end{equation}
$$

### Простейшая среда GridWorld

![Img](https://raw.githubusercontent.com/DPritykin/Control-Theory-Course/main/Term%202%20Nonlinear%20Dynamics%20and%20Control/Images/gridworld.png)

<font size="3">  Агент начинает игру в произвольном свободном положении(на примере слева снизу поля) и заканчивает в ячейке с  +1 ("сектор приз") либо -1 ("сектор штраф"). 
    На каждом шагу он может двигаться вертикально или горизонтально, но не может проходить через препятствия или уходить за пределы игрового поля.</font>

#### Детерминированная версия:
</font> <font size="3">Подразумевается, что действие на каждом шаге детерминировано, то есть если агент выбрал действие "переместиться в клетку справа", то именно такое действие и реализуется (нет случайных факторов, которые могут повлиять на выполнение команды).</font>

#### Недетерминированная версия
</font> <font size="3">Реализация выбранного действия на каждом шаге не детерминирована, то есть агент, принимая решение перейти на соседнюю клеточку (например, наверх), оказывается на ней с вероятностью 0.8, оставшаяся вероятность равномерно распределяется между перпендикулярными направлениями движения (то есть, если агент принял решение пойти наверх, то перпендикулярные направления - вправо-влево).</font>

## Поле
<font size="3">Зададим переменные, определяющие поле для игры</font>

In [1]:
import numpy as np

class WorldOptions(object):
    pass

options = WorldOptions()
options.board_rows = 3
options.board_cols = 4
options.win_state = (2, 3)
options.lose_state = (1, 3)
options.start_state = (0, 0)
options.obstacles = [(1, 1), (1, 2)]
options.deterministic = False

<font size="3">Класс <b>GridWorld </b> определяет параметры текущего состояния, а также возвращает следущее состояние в зависимости от выбранного действия</font>

In [2]:
class GridWorld:
    def __init__(self, world_options):
        self.board = np.zeros([world_options.board_rows, world_options.board_cols])
        self.obstacles = world_options.obstacles
        for obs in self.obstacles:
            self.board[obs[0], obs[1]] = -1
        self.action_dict = {"up": (1, 0),
                            "down": (-1, 0),
                            "left": (0, -1),
                            "right": (0, 1)
                            }
        self.start_state = world_options.start_state
        self.state =  self.start_state
        self.is_terminal_state = False
        self.win_state = world_options.win_state
        self.lose_state = world_options.lose_state
        self.deterministic = world_options.deterministic

    # Определяет вознаграждение согласно текущему состоянию
    def give_reward(self):
        if self.state == self.win_state:
            return 1
        elif self.state == self.lose_state:
            return -1
        else:
            return 0

    # Проверяет, лежит ли состояние state в границах игрового поля
    def is_within_bounds(self, state):
        return state[0] >= 0 and state[0] < self.board.shape[0] and \
               state[1] >= 0 and state[1] < self.board.shape[1]

    # Возвращает валидное следущее состояние в зависимости от выбранного действия action
    def next_position(self, action, move=True):
        if not self.deterministic:
            if action == "up":
                action_dst = np.random.choice(["up", "left", "right"], p=[0.8, 0.1, 0.1])
            if action == "down":
                action_dst = np.random.choice(["down", "left", "right"], p=[0.8, 0.1, 0.1])
            if action == "left":
                action_dst =  np.random.choice(["left", "up", "down"], p=[0.8, 0.1, 0.1])
            if action == "right":
                action_dst =  np.random.choice(["right", "up", "down"], p=[0.8, 0.1, 0.1])           
        else:
            action_dst = action
            
        next_state = tuple(self.state[i] + self.action_dict[action_dst][i] for i in range(2))
            
        if not self.is_within_bounds(next_state) or next_state in self.obstacles:
            next_state = self.state
        
        if move:
            self.state = next_state
            
            # Определяет, является ли текущее состояние state терминальным
            if (self.state == self.win_state) or (self.state == self.lose_state):
                self.is_terminal_state = True            
            
        return next_state
    
    # возвращает игру в исходное состояние
    def reset(self):
        self.state = self.start_state
        self.is_terminal_state = False

    # Выводит игровое поле
    def show_board(self):
        for row_id in reversed(range(np.size(self.board, 0))):
            print('-----------------')
            out = '| '
            for col_id in range(np.size(self.board, 1)):
                if self.board[row_id, col_id] == -1:
                    token = 'X'
                if self.board[row_id, col_id] == 0:
                    token = '0'
                if (row_id, col_id) == self.win_state:
                    token = 'W'
                if (row_id, col_id) == self.lose_state:
                    token = 'L'
                if (row_id, col_id) == self.start_state:
                    token = '*'                    
                out += token + ' | '
            print(out)
        print('-----------------')

In [3]:
from abc import ABC, abstractmethod

class Agent(ABC):

    def __init__(self, env_options):
        self.actions = ["up", "down", "left", "right"]  # множество допустимых действий

        self.learning_rate = 0.2                        # показатель степени для вычисления кумулятивного вознаграждения # learning rate - \alpha 
        self.exploration_rate = 0.3                     # exploration rate - вероятность "попробовать пойти не туда" 
        self.reward_discount = 0.9                      # показатель степени для вычисления кумулятивного вознаграждения (gamma)
        
        self.env = GridWorld(env_options)               # интерфейс взаимодействия со средой                
        self.env.show_board()
        
        self.traj_states = []                           # последовательный набор кортежей состояние-действие, траектория
        self.init_state_values()

    # абстрактные метод для реализации в классах-наследниках    
    @abstractmethod
    def init_state_values(self):     # инициализация нулями V-function или Q-function
        pass
    
    @abstractmethod
    def update_state_values(self):   # пересчёт значений V-function или Q-function в соответствии с алгоритмом обучения
        pass
    
    @abstractmethod
    def extract_value(self, action): # чтение текущего значения V-function или Q-function
        pass        
    
    # Выбирает лучшее действие для текущего состояния
    def choose_action(self):
        max_next_reward = 0
        action = "up"

        # Добавляем элемент случайности, чтобы агент не ходил всегда по одному пути
        if np.random.uniform(0, 1) <= self.exploration_rate:
            action = np.random.choice(self.actions)
        else:
            for a in self.actions:
                next_reward = self.extract_value(a)

                if next_reward >= max_next_reward:
                    action = a
                    max_next_reward = next_reward
        return action

    # Возвращает следующее состояние, соответствующее действию action
    def take_action(self, action):
        position = self.env.next_position(action, move=True)
        return 

    # стирает информацию о пройденной агентом траектории и ставит его на стартовую позицию
    def reset(self):
        self.env.reset()
        self.traj_states = []        

    # В течение нескольких раундов производим обновление значений, соответствующих каждому состоянию
    def play(self, roundsCount=10):
        round_id = 0

        while round_id < roundsCount:
            # Конец раунда
            if self.env.is_terminal_state: 
                self.append_state_to_traj(self.env.state, None)
                
                # обновление state_values в конце раунда (из конца траектории в начао)
                self.update_state_values()

                self.reset()
                round_id += 1
            else:
                action = self.choose_action()
                
                prev_state = self.env.state
                # Переходим в следущее состояние, выполняя действие
                self.take_action(action)
                
                # Добавляем состояние к текущей траектории
                self.append_state_to_traj(prev_state, action)

    # Выводит текущие значения для всех состояний
    def show_values(self):
        for row_id in reversed(range(np.size(self.state_values, 0))):
            print('----------------------------------')
            out = '| '
            for col_id in range(np.size(self.state_values, 1)):
                out += str(self.state_values[(row_id, col_id)]).ljust(6) + ' | '
            print(out)
        print('----------------------------------')

## Алгоритм обучения агента (Value-iteration)
<font size="3">В результате обучения агент должен уметь для каждого состояния определять лучшее действие. Для этого мы в первую очередь соотнесем каждому состоянию  значение (ожидаемое вознаграждение), используя формулу:</font>
### $V(s_t) \leftarrow (1-\alpha) \cdot V(s_t) + \alpha \cdot V(s_{t+1}) = V(s_t) + \alpha \cdot [V(s_{t+1}) - V(s_t)], \tag{1}$
<font size="3">где $s_t$ - текущее состояние, $s_{t+1}$ - состояние в следующий момент времени, $\alpha$ - скорость обучения.</font>

<font size="3">Имея такое соответствие, агент cможет на каждом шагу выбирать действие в соответствии с его представлениями о максимальном ожидаемом вознаграждении.</font>

<font size="3">В начале агент инициализирует все значения нулем, затем он случайным образом перемещается по полю, пока не доберётся до терминального положения. После этого с помощью формулы (1), проходя по траектории от терминального к начальному состоянию, агент обновляет значения V-функции для всех пройденных состояний.</font>

In [4]:
class Agent_V(Agent):
    def init_state_values(self):
        self.state_values = np.zeros(np.shape(self.env.board))
        
    def append_state_to_traj(self, state, action):
        self.traj_states.append(state)
        
    def update_state_values(self):
        reward = self.env.give_reward()
        self.state_values[self.env.state] = reward        
        
        for state in reversed(self.traj_states):
            reward = self.state_values[state] + self.learning_rate * (reward - self.state_values[state]) # формула (1)
            self.state_values[state] = round(reward, 3)
    
    def extract_value(self, action):
        next_state = self.env.next_position(action, move=False)
        return self.state_values[next_state]

## Алгортим обучения агента (Q-learning)

<font size="3">1. Рассмотрим кортеж $(s, a, r, s')$:</font>

<font size="3">2. Если $Q(s, a)$ вычислена на $k$-й итерации, информация $(s, a, r, s')$ позволяет получить оценку Q-функции для текущей траектории:</font>

### $\hat{Q}(s, a) = r(s,a,s') + \gamma \cdot \max_{a'}{Q_k(s',a')} \tag{2}$

<font size="3">3. Будем вычислять новую оценку для Q-функции как взвешенную сумму имеющейся оценки $Q_k(s, a)$ и новой информации $\hat{Q}(s,a)$:</font>

### $Q_{k+1}(s, a) = (1-\alpha) \cdot Q_{k}(s, a) + \alpha \cdot \hat{Q}(s,a) =  Q_{k}(s, a) + \alpha \cdot (\hat{Q}(s,a) - Q_{k}(s, a)) \tag{3}$

In [5]:
class Agent_Q(Agent):
    def init_state_values(self):
        self.state_values = {}
        for row_id in reversed(range(np.size(self.env.board, 0))):
            for col_id in range(np.size(self.env.board, 1)):
                self.state_values[(row_id, col_id)] = {}
                for action in self.actions:
                    self.state_values[(row_id, col_id)][action] = 0  # Q value is a dict of dict
                    
    def append_state_to_traj(self, state, action):
        self.traj_states.append([(state), action])
                    
    def update_state_values(self):
        reward = self.env.give_reward()
        
        for state in reversed(self.traj_states):
            if state[1] is None:
                for action in self.actions:
                    self.state_values[self.env.state][action] = reward                
            else:
                current_q_value = self.state_values[state[0]][state[1]]
                #q_value_hat = self.reward_discount * reward
                updated_q_value = current_q_value + self.learning_rate * (q_value_hat - current_q_value) # формула (3)
                self.state_values[state[0]][state[1]] = round(updated_q_value, 3)
            
            q_value_hat = max(self.state_values[state[0]].values()) # формула 2
    
    def extract_value(self, action):
        return self.state_values[self.env.state][action]

### Value-iteration в детерминированной среде

In [6]:
options.deterministic = True

ag = Agent_V(options)
ag.play(100)
ag.show_values()

-----------------
| 0 | 0 | 0 | W | 
-----------------
| 0 | X | X | L | 
-----------------
| * | 0 | 0 | 0 | 
-----------------
----------------------------------
| 0.994  | 0.996  | 0.998  | 1.0    | 
----------------------------------
| 0.992  | 0.0    | 0.0    | -1.0   | 
----------------------------------
| 0.953  | 0.769  | 0.294  | -0.467 | 
----------------------------------


### Value-iteration в среде со случайными возмущениями

In [7]:
options.deterministic = False

ag = Agent_V(options)
ag.play(200)
ag.show_values()

-----------------
| 0 | 0 | 0 | W | 
-----------------
| 0 | X | X | L | 
-----------------
| * | 0 | 0 | 0 | 
-----------------
----------------------------------
| 0.988  | 0.996  | 0.998  | 1.0    | 
----------------------------------
| 0.986  | 0.0    | 0.0    | -1.0   | 
----------------------------------
| 0.966  | 0.93   | 0.859  | -0.187 | 
----------------------------------


### Q-learning в среде со случайными возмущениями

In [8]:
options.deterministic = False

ag = Agent_Q(options)
ag.play(100)

print("resulting Q-values ... \n")
ag.state_values

-----------------
| 0 | 0 | 0 | W | 
-----------------
| 0 | X | X | L | 
-----------------
| * | 0 | 0 | 0 | 
-----------------
resulting Q-values ... 



{(2, 0): {'up': 0.9, 'down': 0.627, 'left': 0.481, 'right': 0.994},
 (2, 1): {'up': 0.872, 'down': 0.711, 'left': 0.757, 'right': 0.996},
 (2, 2): {'up': 0.756, 'down': 0.617, 'left': 0.348, 'right': 0.998},
 (2, 3): {'up': 1, 'down': 1, 'left': 1, 'right': 1},
 (1, 0): {'up': 0.992, 'down': 0.962, 'left': 0.919, 'right': 0.922},
 (1, 1): {'up': 0, 'down': 0, 'left': 0, 'right': 0},
 (1, 2): {'up': 0, 'down': 0, 'left': 0, 'right': 0},
 (1, 3): {'up': -1, 'down': -1, 'left': -1, 'right': -1},
 (0, 0): {'up': 0.984, 'down': 0.874, 'left': 0.762, 'right': 0.768},
 (0, 1): {'up': 0.337, 'down': 0.368, 'left': 0.939, 'right': 0.0},
 (0, 2): {'up': 0.0, 'down': 0.0, 'left': 0.0, 'right': 0.0},
 (0, 3): {'up': -0.723, 'down': 0.0, 'left': -0.216, 'right': -0.006}}