Продолжаем искать решения МППР. В прошлой серии стало понятно, что для этого необходимо вычислить оптимальную стратегию. Для этого применяются 2 основных алгоритма: итерации по ценности и итерации по стратегиям.

Сперва разберем итерацию по ценности. На старте у нас нет никакой стратегии и неизвестна функция ценности состояний. Что ж, инциализируем функцию ценности $V^*$ случайными значениями и модицифируем уравнение Беллмана так, чтобы вместо распределения, которое раньше задавалось стратегией $\pi$, происходил жадный выбор действия, дающего наибольшую награду.

$V^*(s) = max_a \left[ R(s,a) + \gamma \sum_{s'} T(s,a,s') V^*(s') \right]$

в уравнении $V^*(s)$ -- оптимальная ценность состояния $s$ (она же ценность при следовании оптимальной стратегии), $T(s, a, s')$ -- вероятность перехода из состояния $s$ в $s'$ при выборе действия $a$ и $R(s,a)$ -- вознаграждение, полученное в состоянии $s$ при выборе действия $a$.

С помощью $V^*$ вычисляем оптимальную стратегию, снова выбирая действие так, чтобы оно максимизировало выигрыш

$\pi^*(s) = argmax_a \sum_{s'}T(s,a,s')\left[R(s,a,s') + \gamma V^*(s')\right]$

Посмотрим как работает описанный алгоритм в среде FrozenLake

Среда FrozenLake представляет собой сетку по которой агент перемещается из клетки S в клетку G по замерзшим участкам озера (клетки F) и не попасть в полынью H. Игровой цикл на оптимальной стратегии можно посмотреть, добавив отрисовку поля.

In [None]:
import torch
import gym

env = gym.make('FrozenLake-v0')

gamma = 0.99

threshold = 0.0001

In [None]:
def value_iteration(env, gamma, threshold):
    """
    Solve a given environment with value iteration algorithm
    @param env: OpenAI Gym environment
    @param gamma: discount factor
    @param threshold: the evaluation will stop once values for all states are less than the threshold
    @return: values of the optimal policy for the given environment
    """
    n_state = env.observation_space.n
    n_action = env.action_space.n
    V = torch.zeros(n_state)
    while True:
        V_temp = torch.empty(n_state)
        for state in range(n_state):
            v_actions = torch.zeros(n_action)
            for action in range(n_action):
                for trans_prob, new_state, reward, _ in env.env.P[state][action]:
                    v_actions[action] += trans_prob * (reward + gamma * V[new_state])
            V_temp[state] = torch.max(v_actions)
        max_delta = torch.max(torch.abs(V - V_temp))
        V = V_temp.clone()
        if max_delta <= threshold:
            break
    return V

In [None]:
def extract_optimal_policy(env, V_optimal, gamma):
    """
    Obtain the optimal policy based on the optimal values
    @param env: OpenAI Gym environment
    @param V_optimal: optimal values
    @param gamma: discount factor
    @return: optimal policy
    """
    n_state = env.observation_space.n
    n_action = env.action_space.n
    optimal_policy = torch.zeros(n_state)
    for state in range(n_state):
        v_actions = torch.zeros(n_action)
        for action in range(n_action):
            for trans_prob, new_state, reward, _ in env.env.P[state][action]:
                v_actions[action] += trans_prob * (reward + gamma * V_optimal[new_state])
        optimal_policy[state] = torch.argmax(v_actions)
    return optimal_policy

In [None]:
V_optimal = value_iteration(env, gamma, threshold)
print('Optimal values:\n{}'.format(V_optimal))

Optimal values:
tensor([0.5404, 0.4966, 0.4681, 0.4541, 0.5569, 0.0000, 0.3572, 0.0000, 0.5905,
        0.6421, 0.6144, 0.0000, 0.0000, 0.7410, 0.8625, 0.0000])


In [None]:
optimal_policy = extract_optimal_policy(env, V_optimal, gamma)
print('Optimal policy:\n{}'.format(optimal_policy))

Optimal policy:
tensor([0., 3., 3., 3., 0., 0., 0., 0., 3., 1., 0., 0., 0., 2., 1., 0.])


In [None]:
def run_episode(env, policy):
    state = env.reset()
    total_reward = 0
    is_done = False
    while not is_done:
        action = policy[state].item()        
        state, reward, is_done, info = env.step(action)
        total_reward += reward
        if is_done:
            break
    return total_reward

In [None]:
n_episode = 1000
total_rewards = []
for episode in range(n_episode):
    total_reward = run_episode(env, optimal_policy)
    total_rewards.append(total_reward)

print('Average total reward under the optimal policy: {}'.format(sum(total_rewards) / n_episode))

Average total reward under the optimal policy: 0.746


In [None]:
def run_episode_render(env, policy):
    state = env.reset()
    env.render()
    total_reward = 0
    is_done = False
    while not is_done:
        action = int(policy[state].item())
        state, reward, is_done, info = env.step(action)
        total_reward += reward
        env.render()
        if is_done:
            break
    return total_reward

In [None]:
run_episode_render(env, optimal_policy)