# Обучение с подкреплением

***Первый этап, обучение на правильность ходов***

**Выполнил Кузин Мирослав**

# Траектория:
$$\tau = (s_0,a_0,r_0),\dots,(s_T,a_T,r_T) ,$$
где $s_i$ состояние среды на шаге $i$, $a_i$ воздействие агента на среду на шаге $i$,
$r_i$ вознаграждение на шаге $i$.

Вознаграждение начиная с шага $t$:
$$R_t(\tau)=\sum\limits_{j=t}^{T}\gamma^{j-t}r_{j}, $$
$$R(\tau) = R_0(\tau) $$
где $\gamma$ дисконтный множитель (коэффициент значимости вознаграждения при переходе на следущий шаг).

Обучение с подкреплением - набор методов, позволяющих агенту (интеллектуальной системе) вырабатывать оптимальную стратегию при его взаимодействии со средой (внешним миром). 

Среда предоставляет информацию, описывающую состояние системы. Агент взаимодействует со средой, наблюдая состояние и используя данную информацию при выборе действия. Среда принимает действие и переходит в следующее состояние, а затем возвращает агенту следующее состояние и вознаграждение. Когда цикл «состояние → действие → вознаграждение» завершен, предполагается, что сделан один шаг. Цикл повторяется, пока среда не завершится, например, когда задача решена.

Стратегия $\pi$ — это функция, отображающая состояния вероятности действий, которые используются для выбора действия $a \sim \pi(s) $. 

Стратегия $\pi$ содержит настраеваемые параметры $\omega$, чтобы это подчеркнуть будем писать $\pi_{\omega}(s)$.

Целевая функция — это ожидаемая отдача по всем полным траекториям, порожденным агентом.
$$J(\omega)=J(\pi_{\omega})=M_{\tau\sim \omega}R(\tau),$$
где $M_{\tau\sim \omega}$ математическое ожидание по всех траекториям $\tau$ соответствующим значениям параметров $\omega$.

Задача:
$$J(\omega)\to \max $$

$$\nabla_{\omega}J=\nabla_{\omega} M_{\tau\sim \omega}R(\tau)$$

$$\nabla_{\omega}M_{\tau\sim \omega}R(\tau)=\nabla_{\omega}\int R(\tau)p(\tau\mid\omega)d\tau= $$

$$=\int \nabla_{\omega}(R(\tau)p(\tau\mid\omega))d\tau =\int (\nabla_{\omega}(R(\tau))p(\tau\mid\omega)+R(\tau)\nabla_{\omega}(p(\tau\mid\omega)))d\tau =$$

$$ =\int R(\tau)\nabla_{\omega}(p(\tau\mid\omega))d\tau =\int R(\tau)\nabla_{\omega}(p(\tau\mid\omega)) \dfrac{p(\tau\mid\omega)}{p(\tau\mid\omega)}d\tau =$$

$$ =\int R(\tau)\dfrac{\nabla_{\omega}(p(\tau\mid\omega)) }{p(\tau\mid\omega)}p(\tau\mid\omega)d\tau =\int R(\tau)\nabla_{\omega}(\ln p(\tau\mid\omega))p(\tau\mid\omega)d\tau =$$

$$ = M_{\tau\sim \omega}(R(\tau)\nabla_{\omega}(\ln p(\tau\mid\omega)))$$

$$p(\tau\mid\omega)=\prod\limits_t p(s_{s+1}\mid s_t,a_t)\pi_{\omega}(a_t\mid s_t) $$

$$\ln p(\tau\mid\omega)=\ln\prod\limits_t p(s_{s+1}\mid s_t,a_t)\pi_{\omega}(a_t\mid s_t) = \sum\limits_t (\ln p(s_{s+1}\mid s_t,a_t)+\ln\pi_{\omega}(a_t\mid s_t))$$

$$\nabla_{\omega}\ln p(\tau\mid\omega)=\nabla_{\omega}\sum\limits_t (\ln p(s_{s+1}\mid s_t,a_t)+\ln\pi_{\omega}(a_t\mid s_t))=\nabla_{\omega}\sum\limits_t \ln\pi_{\omega}(a_t\mid s_t)$$



$$Loss(\omega) = - R(\tau)\sum\limits_t \ln\pi_{\omega}(a_t\mid s_t) $$

In [2]:
from mancala import Kalah
import matplotlib.pyplot as plt
%matplotlib inline
from IPython import display
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Categorical
import numpy as np
import pandas as pd
import time
import random

**Болваны**

In [None]:
# Бот простой, рандомно выбирает не нулевые элементы
def do_simple_bot_step(state: torch.Tensor) -> int:
    nonzero_state_indexs = torch.nonzero(state[:6]).flatten()
    rez = nonzero_state_indexs[torch.randint(0, len(nonzero_state_indexs), (1,))[0]]
    return rez + 1

do_simple_bot_step(torch.Tensor([1, 0, 1, 0, 0, 0, 0]))

tensor(1)

**Модель**

In [3]:
model = torch.nn.Sequential(
    torch.nn.Linear(14, 42),
    torch.nn.ReLU(),
    torch.nn.Linear(42, 6),
    torch.nn.Softmax(dim = -1)
)

**Настройка обучения**

In [4]:
def loss_func(probs: torch.Tensor, action: int, m: Categorical, R: int, gamma: int):
    alpha = 0.9
    # print(Categorical(probs).log_prob(action))
    return -alpha*R*m.log_prob(action)

loss = loss_func
optimizer = torch.optim.Adam(model.parameters(), lr=1.0e-3, amsgrad=True)

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cpu


In [60]:
m = Categorical(torch.Tensor([0.9, 0.1]))
action = m.sample()
print(loss_func(torch.Tensor([0.9, 0.1]), action, m, -25, 2))

tensor(-2.3706)


**Обучение**

In [175]:
# Проверка работы модели
game = Kalah()
model.to(device)
model.to(torch.double)

print(game.get_general_state())
model(game.get_general_state().to(torch.double))

tensor([6, 6, 6, 6, 6, 6, 0, 0, 6, 6, 6, 6, 6, 6])


tensor([0.0476, 0.0784, 0.6468, 0.1430, 0.0341, 0.0500], dtype=torch.float64,
       grad_fn=<SoftmaxBackward0>)

*Награды и штрафы*

In [176]:
# Первый этап обучения
winner_reward_stage_1 = 1e-3
loser_reward_stage_1 = -1e-3
draw_reward_stage_1 = -0.5e-3
bad_step_reward_stage_1 = -25 # общий
good_step_reward_stage_1 = 2
good_step_captured_reward_stage_1 = 4
# bad_step_reward_stage_1 = -25 # общий
# good_step_reward_stage_1 = 1
# good_step_captured_reward_stage_1 = 2

In [203]:
game = Kalah()
episodes_count = 1000
neuronet_walker_queue = 1
count_win_neuronet = 0
count_choisen_zero = 0
gamma = 2
loss_story = []
check_optim = False

for episode in range(0, episodes_count):
    game.set_new_game()
    tr = []
    if episode % 2:
        old_player_making_step = game.get_player_making_step()
        neuronet_walker_queue = 2
        while not game.get_game_over() and old_player_making_step == game.get_player_making_step():
            bot_action = do_simple_bot_step(game.get_state())
            rezult_step = game.take_step(bot_action)
            # print("bot took step", game.get_player_making_step(), "Был выбор", bot_action, rezult_step)
    else:
        neuronet_walker_queue = 1

    while not game.get_game_over():

        # Выбор хода
        probs = model(game.get_state().to(torch.double))
        m = Categorical(probs)
        action = m.sample()

        # Выбор нагады и выполнение хода
        reward = 0
        old_state = game.get_state().to(torch.double)
        old_player_making_step = game.get_player_making_step()

        rezult_step = game.take_step(action + 1)
        # print("neuronet took step", game.get_player_making_step(), "Был выбор", action + 1, rezult_step)
        if rezult_step == "Куча пустая, выберите другую!":
            count_choisen_zero += 1
            reward = bad_step_reward_stage_1
        elif rezult_step == "Хороший ход!":
            reward = good_step_reward_stage_1
        elif rezult_step == "Хороший ход! Захват!":
            reward = good_step_captured_reward_stage_1

        tr.append((old_state, action, reward, m))

        while not game.get_game_over() and old_player_making_step != game.get_player_making_step():
            bot_action = do_simple_bot_step(game.get_state())
            rezult_step = game.take_step(bot_action)
            # print("bot took step", game.get_player_making_step(), "Был выбор", bot_action, rezult_step)

    if game.get_winner() != None:
        if game.get_winner() != 0:
            if neuronet_walker_queue == game.get_winner():
                count_win_neuronet += 1
                reward += winner_reward_stage_1
            else:
                reward += loser_reward_stage_1
        else:
            reward += draw_reward_stage_1

    loss = 0.
    count_played_step = len(tr) # Кол-во сыгранных ходов
    for id_current_step in range(count_played_step):
        R = 0.
        for id_next_step in range(id_current_step, count_played_step):
            R += (gamma**(id_current_step-id_next_step))*tr[id_next_step][2]
        # R += rwd_in_end_game
        loss += loss_func(model(tr[id_current_step][0]), tr[id_current_step][1], tr[id_current_step][3], R, gamma)

    # loss_story.append(loss)

    if not check_optim and episode > 1:
        check_optim = True
        for g in optimizer.param_groups:
            g['lr'] = 1.0e-5
        print("updated loss")

    if not episode % 100:
        print(loss)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

tensor(22.7437, dtype=torch.float64, grad_fn=<AddBackward0>)
updated loss
tensor(25.9859, dtype=torch.float64, grad_fn=<AddBackward0>)
tensor(-72.7508, dtype=torch.float64, grad_fn=<AddBackward0>)
tensor(63.2375, dtype=torch.float64, grad_fn=<AddBackward0>)
tensor(48.8271, dtype=torch.float64, grad_fn=<AddBackward0>)
tensor(33.3220, dtype=torch.float64, grad_fn=<AddBackward0>)
tensor(86.2056, dtype=torch.float64, grad_fn=<AddBackward0>)
tensor(49.0961, dtype=torch.float64, grad_fn=<AddBackward0>)
tensor(27.1993, dtype=torch.float64, grad_fn=<AddBackward0>)
tensor(57.7266, dtype=torch.float64, grad_fn=<AddBackward0>)


In [205]:
print(check_optim)
print(count_choisen_zero)

True
159


**Тестирование модели**

In [209]:
game = Kalah()
episodes_count = 1000
neuronet_walker_queue = 1
count_win_neuronet = 0
count_choisen_zero = 0
bad_choise = []
rezult_step = "Хороший ход!"

for episode in range(episodes_count):
    game.set_new_game()
    if episode % 2:
        old_player_making_step = game.get_player_making_step()
        neuronet_walker_queue = 2
        while not game.get_game_over() and old_player_making_step == game.get_player_making_step():
            bot_action = do_simple_bot_step(game.get_state())
            rezult_step = game.take_step(bot_action)
            # print("bot took step", game.get_player_making_step(), "Был выбор", bot_action, rezult_step)
    else:
        neuronet_walker_queue = 1

    while not game.get_game_over():

        # Выбор хода
        probs = model(game.get_state().to(torch.double))
        action = probs.argmax()

        while action in bad_choise:
            probs[probs.argmax()] = 0
            action = probs.argmax()

        # Выбор награды и выполнение хода
        reward = 0
        old_state = game.get_state().to(torch.double)
        old_player_making_step = game.get_player_making_step()

        rezult_step = game.take_step(action + 1)
        if rezult_step == "Куча пустая, выберите другую!":
            count_choisen_zero += 1
            # reward = bad_step_reward_stage_1
            print("Neuronet took step", game.get_player_making_step(), "Был выбор", action + 1, rezult_step)
            # print("state:")
            # game.print_state()
            print("Номер эпизода происшествия:", episode)
            bad_choise.append(action)
        else:
            bad_choise = []
        # elif rezult_step == "Хороший ход!":
        #     reward = good_step_reward_stage_1
        # elif rezult_step == "Хороший ход! Захват!":
        #     reward = good_step_captured_reward_stage_1

        while not game.get_game_over() and old_player_making_step != game.get_player_making_step():
            bot_action = do_simple_bot_step(game.get_state())
            rezult_step = game.take_step(bot_action)
            if rezult_step == "Куча пустая, выберите другую!":
                print("Bot took step", game.get_player_making_step(), "Был выбор", bot_action, rezult_step)
            # print("bot took step", game.get_player_making_step(), "Был выбор", bot_action, rezult_step)


    if game.get_winner() != None:
        if game.get_winner() != 0:
            if neuronet_walker_queue == game.get_winner():
                count_win_neuronet += 1
                reward += winner_reward_stage_1
            else:
                reward += loser_reward_stage_1
        else:
            reward += draw_reward_stage_1

    # print("Очередь хода нейронки", neuronet_walker_queue)
    # print("Результат:", game.get_player_winner(), 'k', game.get_winner())
    # print("Номер попытки:", episode)
    # print("Награда/Штраф:", reward)

    # print(episode)

print(count_choisen_zero)
print(count_win_neuronet/episodes_count)

0
0.703


In [208]:
print(count_choisen_zero)
print(count_win_neuronet/episodes_count)

0
0.7


In [None]:
probs = torch.Tensor([0.9, 0.1])
m = Categorical(probs)
action = m.sample()
print(action)
# t = Categorical(probs)
# print(action, t.sample())
print(m.log_prob(action))


tensor(1)
tensor(1) tensor(0)
tensor(-2.3026)


In [210]:
# is_save = True
is_save = False
if is_save:
    torch.save(model, "./model_non_zero_top_tryed3.pt")