In [None]:
from torch.distributions import Categorical
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

In [None]:
gamma = 0.99

class Pi(nn.Module):
    def __init__(self, in_dim, out_dim):
        super(Pi, self).__init__()
        layers = [
        nn.Linear(in_dim, 32),
        nn.ReLU(),
        nn.Linear(32, out_dim),
        ]
        self.model = nn.Sequential(*layers)
        self.onpolicy_reset()
        self.train() # установить режим обучения

    def onpolicy_reset(self):
        self.log_probs = []
        self.rewards = []

    def forward(self, x):
        pdparam = self.model(x)
        return pdparam

    def act(self, state):
        x = torch.from_numpy(state.astype(np.float32)) # преобразование в тензор
        pdparam = self.forward(x) # прямой проход
        pd = Categorical(logits=pdparam) # вероятностное распределен34 
        action = pd.sample() # pi(a|s) выбор действия по распределению pd
        log_prob = pd.log_prob(action) # логарифм вероятности pi(a|s)
        self.log_probs.append(log_prob) # сохраняем для обучения
        return action.item()

def train(pi, optimizer):
    # Внутренний цикл градиентного восхождения в алгоритме REINFORCE
    T = len(pi.rewards)
    rets = np.empty(T, dtype=np.float32) # отдачи
    future_ret = 0.0
    # эффективное вычисление отдачи
    for t in reversed(range(T)):
        future_ret = pi.rewards[t] + gamma * future_ret
        rets[t] = future_ret
    rets = torch.tensor(rets)
    log_probs = torch.stack(pi.log_probs)
    loss = - log_probs * rets # член градиента; знак минуса для максимизации
    loss = torch.sum(loss)
    optimizer.zero_grad()
    loss.backward() # обратное распространение, вычисление градиентов
    optimizer.step() # градиентное восхождение, обновление весов
    return loss

def main():
    env = gym.make('CartPole-v0')
    in_dim = env.observation_space.shape[0] # 4
    out_dim = env.action_space.n # 2
    pi = Pi(in_dim, out_dim) # стратегия pi_theta для REINFORCE
    optimizer = optim.Adam(pi.parameters(), lr=0.01)
    for epi in range(300):
        state = env.reset()
        for t in range(200): # 200 — максимальное количество шагов в cartpole
            action = pi.act(state)
            state, reward, done, _ = env.step(action)
            pi.rewards.append(reward)
            env.render()
            if done:
                break
        loss = train(pi, optimizer) # обучение в эпизоде
        total_reward = sum(pi.rewards)
        solved = total_reward > 195.0
        pi.onpolicy_reset() # обучение по актуальному опыту: очистить память после обучения
        print(f'Episode {epi}, loss: {loss}, \
        total_reward: {total_reward}, solved: {solved}')

In [None]:
env = gym.make('CartPole-v0')
in_dim = env.observation_space.shape[0] # 4
out_dim = env.action_space.n # 2
pi = Pi(in_dim, out_dim) # стратегия pi_theta для REINFORCE
optimizer = optim.Adam(pi.parameters(), lr=0.01)

In [None]:
for epi in range(1250):
    state = env.reset()
    for t in range(200): # 200 — максимальное количество шагов в cartpole
        action = pi.act(state)
        state, reward, done, _ = env.step(action)
        pi.rewards.append(reward)
#         env.render()
        if done:
            break
    loss = train(pi, optimizer) # обучение в эпизоде
    total_reward = sum(pi.rewards)
    solved = total_reward > 195.0
    pi.onpolicy_reset() # обучение по актуальному опыту: очистить память после обучения
    print(f'Episode {epi}, loss: {loss}, \
    total_reward: {total_reward}, solved: {solved}')

In [None]:
env = gym.make('CartPole-v0')

for epi in range(10):
    state = env.reset()
    for t in range(200): # 200 — максимальное количество шагов в cartpole
        action = pi.act(state)
        state, reward, done, _ = env.step(action)
        pi.rewards.append(reward)
        env.render()
        if done:
            break

In [None]:
# conda install -c conda-forge gym
# !pip install pyglet