In [14]:
import gym
import numpy as np
import copy

import torch
import torch.nn as nn
import torch.optim as optim

In [8]:
device = torch.device("cuda" if torch.cuda.is_available else "cpu")
print(device)

cuda


In [3]:
env_name = "CartPole-v0"
env = gym.make(env_name)
env.reset()

array([ 0.02424717, -0.04318125, -0.03326246,  0.02928835])

In [5]:
GAMMA = 0.99
LEARNING_RATE = 0.001
ENTROPY_BETA = 0.01
REWARD_STEPS = 4

In [6]:
class A2C(nn.Module):
    def __init__(self, input_shape, n_actions):
        super(A2C, self).__init__()

        self.policy = nn.Sequential(
            nn.Linear(input_shape[0], 512),
            nn.ReLU(),
            nn.Linear(512, n_actions)
        )

        self.value = nn.Sequential(
            nn.Linear(input_shape[0], 512), 
            nn.ReLU(),
            nn.Linear(512, 1)
        )

    def forward(self, x):
        return self.policy(x), self.value(x)

In [55]:
def step(env, step_size, state, net, gamma, gamma_counter, device="cpu"):
    states = []
    actions = []
    rewards = []
    not_done_idx = []
    next_states = []
    total_score = 0
    
    for idx in range(step_size):
        state_v = torch.tensor(state, dtype=torch.float).to(device)
        policy, _ = net(state_v)
        policy = nn.Softmax(dim=-1)(policy)
        action = np.random.choice(len(policy), p=policy.detach().numpy())
        states.append(np.array(state, copy=False))
        actions.append(action) 
        
        next_state, reward, done, _ = env.step(action)
        total_score += reward
        rewards.append(sum(rewards) + reward * (gamma ** gamma_counter))
        gamma_counter += 1
        if done:
            break
        else:
            not_done_idx.append(idx)
            next_states.append(np.array(next_state, copy=False))
            state = next_state       
        
    rewards_np = np.array(rewards, dtype=np.float)
    state_tensor = torch.tensor(np.array(states, copy=False), dtype=torch.float).to(device)
    actions_tensor = torch.tensor(np.array(actions), dtype=torch.long).to(device)

    # * If not_done_idx is not empty
    if not_done_idx:
        next_states_tensor = torch.tensor(np.array(next_states, copy=False), dtype=torch.float).to(device)
        _, next_states_value = net(next_states_tensor)
        # * vectorize : np.array([[1],[2],[3]]) ->np.array([1,2,3])
        next_states_value_np = next_states_value.data.cpu().numpy()[:, 0]
        next_states_value_np *= GAMMA ** step_size
        rewards_np[not_done_idx] += next_states_value_np

    ref_vals_tensor = torch.tensor(rewards_np, dtype=torch.float).to(device)

    return state_tensor, actions_tensor, ref_vals_tensor, total_score

In [56]:
initial_state = env.reset()
observation_space_shape = env.observation_space.shape
action_n = env.action_space.n
net = A2C(observation_space_shape, action_n)
optimizer = optim.Adam(net.parameters(), lr=LEARNING_RATE)
print(net)

A2C(
  (policy): Sequential(
    (0): Linear(in_features=4, out_features=512, bias=True)
    (1): ReLU()
    (2): Linear(in_features=512, out_features=2, bias=True)
  )
  (value): Sequential(
    (0): Linear(in_features=4, out_features=512, bias=True)
    (1): ReLU()
    (2): Linear(in_features=512, out_features=1, bias=True)
  )
)


In [63]:
gamma_counter = 0
total_reward = []
print_idx = 0
while True:
    print_idx += 1
    states_v, actions_t, ref_vals_tensor, total_score = step(env, step_size=5, state=initial_state, net=net, gamma=0.99, gamma_counter=gamma_counter)
    total_reward.append(total_score)
    if len(ref_vals_tensor) == 1:
        state = env.reset()
        gamma_counter = 0
    
    optimizer.zero_grad()
    logits_tensor, value_tensor = net(states_v)
    loss_value_tensor = nn.MSELoss()(value_tensor.squeeze(-1), ref_vals_tensor)

    adv_tensor = ref_vals_tensor - value_tensor.detach()
    log_prob_tensor = nn.LogSoftmax(dim=1)(logits_tensor)
    log_prob_actions_tensor = adv_tensor * log_prob_tensor[:, actions_t]
    loss_policy_tensor = -log_prob_actions_tensor.mean()

    prob_tensor = nn.Softmax(dim=1)(logits_tensor)
    entropy_loss_tensor = ENTROPY_BETA * (prob_tensor * log_prob_tensor).sum(dim=1).mean()

    loss_tensor = entropy_loss_tensor + loss_value_tensor
    loss_tensor.backward()
    optimizer.step()

    loss_tensor += loss_policy_tensor

    if print_idx % 100 == 0:
        print(f"Total_reward : {np.mean(total_reward[10:])}, loss : {loss_tensor.item():.3f}")

Total_reward : 4.344444444444444, loss : 73.058
Total_reward : 4.178947368421053, loss : 86.090
Total_reward : 4.0793103448275865, loss : 165.532
Total_reward : 4.0487179487179485, loss : 173.693
Total_reward : 4.010204081632653, loss : 106.093
Total_reward : 3.9966101694915253, loss : 359.200
Total_reward : 4.01304347826087, loss : 70.939
Total_reward : 4.00253164556962, loss : 48.661
Total_reward : 3.993258426966292, loss : 289.602
Total_reward : 3.995959595959596, loss : 46.000
Total_reward : 3.992660550458716, loss : 121.433
Total_reward : 3.969747899159664, loss : 47.741
Total_reward : 3.954263565891473, loss : 467.948
Total_reward : 3.948201438848921, loss : 322.613
Total_reward : 3.9436241610738256, loss : 475.851
Total_reward : 3.952830188679245, loss : 707.422
Total_reward : 3.9485207100591717, loss : 23.301
Total_reward : 3.941899441340782, loss : 111.505
Total_reward : 3.93015873015873, loss : 171.326
Total_reward : 3.928643216080402, loss : 186.543
Total_reward : 3.94066985

KeyboardInterrupt: 