# 'FrozenLake-v0' gym environment

In [1]:
# %pip install gymnasium

In [2]:
import gymnasium as gym
import numpy as np
import time

In [3]:
gym.__version__

'1.1.1'

In [4]:
env = gym.make('FrozenLake-v1', render_mode="ansi")
env.reset()
print(env.render())


[41mS[0mFFF
FHFH
FFFH
HFFG



## Policy Iteration

![Policy Iteration](https://miro.medium.com/max/2400/1*07AboseYfdjqknCq0kG8Ow.png)


---

### Policy Iteration (с использованием итеративной оценки) для приближённого вычисления $\pi \approx \pi_*$

#### 1. Инициализация

Задать $V(s) \in \mathbb{R}$ и $\pi(s) \in \mathcal{A}(s)$ произвольно для всех $s \in \mathcal{S}$

---

#### 2. Оценка стратегии (Policy Evaluation)

Цикл:

* $\Delta \leftarrow 0$
* Цикл по каждому $s \in \mathcal{S}$:

  * $v \leftarrow V(s)$
  * $V(s) \leftarrow \sum_{s', r} p(s', r \mid s, \pi(s)) \left[ r + \gamma V(s') \right]$
  * $\Delta \leftarrow \max(\Delta, |v - V(s)|)$

Пока $\Delta \geq \theta$
(где $\theta$ — небольшое положительное число, задающее точность оценки)

---

#### 3. Улучшение стратегии (Policy Improvement)

* $\text{policy-stable} \leftarrow \text{true}$
* Для каждого $s \in \mathcal{S}$:

  * $\text{old-action} \leftarrow \pi(s)$
  * $\pi(s) \leftarrow \arg\max_a \sum_{s',r} p(s', r \mid s,a)\left[ r + \gamma V(s') \right]$
  * Если $\text{old-action} \neq \pi(s)$, то $\text{policy-stable} \leftarrow \text{false}$

Если стратегия стабильна — остановиться и вернуть $V \approx v_*$, $\pi \approx \pi_*$;
иначе перейти к шагу 2.

---

Если хочешь — могу переписать это как Python-функцию, понятную для Jupyter.


### Описание параметров для функции policy iteration

**policy**: Двумерный массив размера n(S) × n(A), каждая ячейка представляет собой вероятность выбора действия a в состоянии s.

**environment**: Инициализированный объект среды OpenAI Gym

**discount_factor**: Коэффициент дисконтирования для MDP

**theta**: Порог изменения функции ценности. Когда обновление значения становится меньше этого числа, итерации прекращаются.

**max_iterations**: Максимальное количество итераций, чтобы избежать бесконечного выполнения программы

Функция вернёт вектор размера nS, представляющий функцию ценности для каждого состояния.

---

Теперь начнём с шага оценки стратегии (policy evaluation). Цель — сходиться к истинной функции ценности для заданной стратегии π. Мы определим функцию, которая будет возвращать нужную функцию ценности.


In [5]:
def policy_evaluation(policy, environment, discount_factor=1.0, theta=1e-9, max_iterations=1e9):
    # Количество итераций оценки
    evaluation_iterations = 1
    # Инициализируем функцию ценности для каждого состояния нулём
    V = np.zeros(environment.observation_space.n)
    # Повторяем, пока изменение значений меньше порога
    for i in range(int(max_iterations)):
        # Инициализируем изменение функции ценности нулём
        delta = 0
        # Проходим по каждому состоянию
        for state in range(environment.observation_space.n):
           # Начальное новое значение для текущего состояния
           v = 0
           # Перебираем все возможные действия из этого состояния
           for action, action_probability in enumerate(policy[state]):
             # Проверяем, каким будет следующее состояние
             for state_probability, next_state, reward, terminated in environment.P[state][action]:
                  # Вычисляем ожидаемое значение
                  v += action_probability * state_probability * (reward + discount_factor * V[next_state])

           # Вычисляем абсолютное изменение функции ценности
           delta = max(delta, np.abs(V[state] - v))
           # Обновляем значение функции ценности
           V[state] = v
        evaluation_iterations += 1

        # Завершаем, если изменение функции ценности незначительно
        if delta < theta:
            print(f'Стратегия оценена за {evaluation_iterations} итераций.')
            return V


Теперь переходим к части улучшения стратегии в алгоритме итерации стратегии (policy iteration).  
Нам понадобится вспомогательная функция, которая будет делать «один шаг вперёд» (one-step lookahead), чтобы вычислить функцию ценности состояния.  
Эта функция будет возвращать массив длины nA, содержащий ожидаемое значение для каждого действия.


In [6]:
def one_step_lookahead(environment, state, V, discount_factor):
    action_values = np.zeros(environment.action_space.n)
    for action in range(environment.action_space.n):
        for probability, next_state, reward, terminated in environment.P[state][action]:
            action_values[action] += probability * (reward + discount_factor * V[next_state])
    return action_values

Теперь общий процесс итерации стратегии (policy iteration) будет описан следующим образом.  
Функция вернёт кортеж (policy, V), где:

- **policy** — оптимальная стратегия в виде матрицы вероятностей (для каждого состояния и действия)
- **V** — функция ценности для каждого состояния


In [7]:
def policy_iteration(environment, discount_factor=1.0, max_iterations=1e9):
    # Начинаем со случайной стратегии:
    # количество состояний × количество действий / равномерно по действиям
    policy = np.ones([environment.observation_space.n, environment.action_space.n]) / environment.action_space.n
    # Счётчик оценённых стратегий
    evaluated_policies = 1
    # Повторять до сходимости или достижения предела итераций
    for i in range(int(max_iterations)):
        stable_policy = True
        # Оцениваем текущую стратегию
        V = policy_evaluation(policy, environment, discount_factor=discount_factor)
        # Проходим по каждому состоянию и пытаемся улучшить выбранные действия (улучшение стратегии)
        for state in range(environment.observation_space.n):
            # Текущее лучшее действие в этом состоянии по текущей стратегии
            current_action = np.argmax(policy[state])
            # Смотрим на шаг вперёд и проверяем, оптимально ли текущее действие
            # Проверим все возможные действия в этом состоянии
            action_value = one_step_lookahead(environment, state, V, discount_factor)
            # Выбираем более выгодное действие
            best_action = np.argmax(action_value)
            # Если действие изменилось
            if current_action != best_action:
                stable_policy = False
                # Обновляем стратегию жадно (greedy)
                policy[state] = np.eye(environment.action_space.n)[best_action]
        evaluated_policies += 1
        # Если стратегия не изменилась — алгоритм сошёлся, возвращаем результат
        if stable_policy:
            print(f'Оценено {evaluated_policies} стратегий.')
            return policy, V

## Value Iteration

![Policy Iteration](https://miro.medium.com/max/2400/1*2Gi8h7WzP4-vfyMzZnLuqw.png)

## Value Iteration — для приближения оптимальной стратегии $ \pi^* $

Алгоритм находит оптимальную стратегию, итеративно обновляя ценности состояний $ V(s) $.

### 🔧 Параметры:
- $ \theta > 0 $: маленький порог, определяющий точность сходимости
- $ \gamma $: коэффициент дисконтирования

### 🔁 Шаги алгоритма:

1. **Инициализация**:  
Задаём начальные значения $ V(s) $ для всех состояний $ s \in \mathcal{S}^+ $, кроме терминальных:  
$$
V(\text{terminal}) = 0
$$

2. **Основной цикл**:  
Повторяем до тех пор, пока значения не стабилизируются:

```
Δ ← 0  
Для каждого состояния s ∈ S:  
 v ← V(s)  
 V(s) ← maxₐ ∑_{s', r} p(s', r | s, a) · [r + γ·V(s')]  
 Δ ← max(Δ, |v − V(s)|)  
Повторять, пока Δ ≥ θ  
```

- Здесь $ p(s', r \mid s, a) $ — вероятность попасть в $ s' $ с наградой $ r $, выбрав действие $ a $ из состояния $ s $
- Мы ищем лучшее действие $ a $, которое максимизирует ожидаемую сумму наград

3. **Формирование стратегии** (после сходимости):  
Когда $ V(s) $ перестаёт меняться существенно, определяем стратегию:

$$
\pi(s) = \arg\max_a \sum_{s', r} p(s', r | s, a) \cdot [r + \gamma V(s')]
$$

- В каждом состоянии $ s $ выбираем такое действие $ a $, которое ведёт к наибольшей ожидаемой ценности.

### 🧠 Суть:
- Алгоритм напрямую улучшает значения $ V(s) $
- Стратегия $ \pi(s) $ строится в самом конце на основе финальных $ V(s) $


Параметры определяются аналогично для value iteration.  
Алгоритм value iteration можно реализовать похожим образом в виде кода:


In [8]:
def value_iteration(environment, discount_factor=1.0, theta=1e-9, max_iterations=1e9):
    # Инициализируем функцию ценности состояний нулями для всех состояний среды
    V = np.zeros(environment.observation_space.n)
    for i in range(int(max_iterations)):
        # Условие досрочной остановки
        delta = 0
        # Обновляем каждое состояние
        for state in range(environment.observation_space.n):
            # Один шаг вперёд для расчёта ценностей действий из состояния
            action_value = one_step_lookahead(environment, state, V, discount_factor)
            # Выбираем лучшее действие на основе наибольшей ценности Q(s,a)
            best_action_value = np.max(action_value)
            # Вычисляем изменение ценности
            delta = max(delta, np.abs(V[state] - best_action_value))
            # Обновляем функцию ценности для текущего состояния
            V[state] = best_action_value
        # Проверка, можно ли остановиться
        if delta < theta:
            print(f'Итерация ценности сошлась на итерации #{i}.')
            break

    # Строим детерминированную стратегию на основе оптимальной функции ценности
    policy = np.zeros([environment.observation_space.n, environment.action_space.n])
    for state in range(environment.observation_space.n):
        # Один шаг вперёд, чтобы найти лучшее действие из текущего состояния
        action_value = one_step_lookahead(environment, state, V, discount_factor)
        # Выбираем лучшее действие по наибольшей ценности
        best_action = np.argmax(action_value)
        # Обновляем стратегию: в этом состоянии выбираем только лучшее действие
        policy[state, best_action] = 1.0
    return policy, V


Наконец, давайте сравним оба метода, чтобы понять, какой из них работает лучше на практике. Для этого мы попытаемся обучить оптимальную стратегию для среды FrozenLake, используя оба описанных выше подхода. Затем мы проверим, какая из техник показала лучший результат, сравнив среднюю награду после 10 000 эпизодов.


In [9]:
def play_episodes(environment, n_episodes, policy):
    wins = 0
    total_reward = 0
    for episode in range(n_episodes):
        terminated = False
        state = environment.reset()[0]
        while not terminated:
            # Выбираем лучшее действие в текущем состоянии
            action = np.argmax(policy[state])
            # Выполняем действие и наблюдаем реакцию среды
            next_state, reward, terminated, truncated, info = environment.step(action)
            # Суммируем общее вознаграждение
            total_reward += reward
            # Переходим в новое состояние
            state = next_state
            # Считаем количество успешных эпизодов
            if terminated and reward == 1.0:
                wins += 1
    average_reward = total_reward / n_episodes
    return wins, total_reward, average_reward


In [10]:
# Количество эпизодов для прогона
n_episodes = 10000
# Список функций для поиска оптимальной стратегии
solvers = [('Policy Iteration', policy_iteration),
           ('Value Iteration', value_iteration)]

for iteration_name, iteration_func in solvers:
    # Загружаем среду Frozen Lake
    environment = gym.make('FrozenLake-v1')
    # Ищем оптимальную стратегию с помощью выбранного метода
    start = time.time()
    policy, V = iteration_func(environment.unwrapped)
    done = time.time()
    elapsed = done - start
    # Применяем найденную стратегию в реальной среде
    wins, total_reward, average_reward = play_episodes(environment, n_episodes, policy)
    print(f'{iteration_name} :: количество побед за {n_episodes} эпизодов = {wins}')
    print(f'{iteration_name} :: средняя награда за {n_episodes} эпизодов = {average_reward}')
    print(f'время = {elapsed:.2f} сек \n\n')


Стратегия оценена за 66 итераций.
Стратегия оценена за 170 итераций.
Стратегия оценена за 428 итераций.
Оценено 4 стратегий.
Policy Iteration :: количество побед за 10000 эпизодов = 8290
Policy Iteration :: средняя награда за 10000 эпизодов = 0.829
время = 0.06 сек 


Итерация ценности сошлась на итерации #523.
Value Iteration :: количество побед за 10000 эпизодов = 8298
Value Iteration :: средняя награда за 10000 эпизодов = 0.8298
время = 0.07 сек 


