In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

import env.game as game

In [24]:
env = game.Game()
ob = env.reset()

In [25]:
state_dim = 16
n_actions = 4

In [34]:
network = nn.Sequential()

network.add_module('layer1', nn.Linear(state_dim, 128))
network.add_module('layer2', nn.ReLU())
network.add_module('layer3', nn.Linear(128, 128))
network.add_module('layer4', nn.ReLU())
network.add_module('layer3', nn.Linear(128, 128))
network.add_module('layer4', nn.ReLU())
network.add_module('layer3', nn.Linear(128, 128))
network.add_module('layer4', nn.ReLU())
network.add_module('layer5', nn.Linear(128, n_actions))

network.train()

Sequential(
  (layer1): Linear(in_features=16, out_features=128, bias=True)
  (layer2): ReLU()
  (layer3): Linear(in_features=128, out_features=128, bias=True)
  (layer4): ReLU()
  (layer5): Linear(in_features=128, out_features=4, bias=True)
)

In [35]:
def get_action(state, epsilon=0):
    """
    sample actions with epsilon-greedy policy
    recap: with p = epsilon pick random action, else pick action with highest Q(s,a)
    """
    state = torch.tensor(state[None], dtype=torch.float32, device='cpu')
    q_values = network(state).detach().cpu().numpy()

    if np.random.rand() < epsilon:
        return np.random.randint(low=0, high=n_actions)
    else:
        return int(np.argmax(q_values))

In [36]:
def compute_td_loss(states, actions, rewards, next_states, is_done, gamma=0.99, check_shapes=False):
    """ Compute td loss using torch operations only. Use the formula above. """
    states = torch.tensor(
        states, dtype=torch.float32, device='cpu')                                  # shape: [batch_size, state_size]
    actions = torch.tensor(actions, dtype=torch.long, device='cpu')                 # shape: [batch_size]
    rewards = torch.tensor(rewards, dtype=torch.float32, device='cpu')              # shape: [batch_size]
    # shape: [batch_size, state_size]
    next_states = torch.tensor(next_states, dtype=torch.float32, device='cpu')
    is_done = torch.tensor(is_done, dtype=torch.uint8, device='cpu')                # shape: [batch_size]

    # get q-values for all actions in current states
    predicted_qvalues = network(states)                               # shape: [batch_size, n_actions]

    # select q-values for chosen actions
    predicted_qvalues_for_actions = predicted_qvalues[                # shape: [batch_size]
      range(states.shape[0]), actions
    ]

    # compute q-values for all actions in next states
    predicted_next_qvalues = network(next_states)

    # compute V*(next_states) using predicted next q-values
    next_state_values = torch.max(predicted_next_qvalues, dim=1)[0]
    assert next_state_values.dtype == torch.float32

    # compute "target q-values" for loss - it's what's inside square parentheses in the above formula.
    target_qvalues_for_actions = rewards + gamma * next_state_values

    # at the last state we shall use simplified formula: Q(s,a) = r(s,a) since s' doesn't exist
    target_qvalues_for_actions = torch.where(
        is_done, rewards, target_qvalues_for_actions)

    # mean squared error loss to minimize
    loss = torch.mean((predicted_qvalues_for_actions -
                       target_qvalues_for_actions.detach()) ** 2)

    if check_shapes:
        assert predicted_next_qvalues.data.dim(
        ) == 2, "make sure you predicted q-values for all actions in next state"
        assert next_state_values.data.dim(
        ) == 1, "make sure you computed V(s') as maximum over just the actions axis and not all axes"
        assert target_qvalues_for_actions.data.dim(
        ) == 1, "there's something wrong with target q-values, they must be a vector"

    return loss

In [37]:
opt = torch.optim.Adam(network.parameters(), lr=1e-4)

In [38]:
def generate_session(env, t_max=100000, epsilon=0, train=False):
    """play env with approximate q-learning agent and train it at the same time"""
    total_reward = 0
    s = env.reset()
    s = np.array(s).ravel()

    for t in range(t_max):
        a = get_action(s, epsilon=epsilon)
        next_s, r, terminated = env.action(a)
        print(r)
        next_s = np.array(next_s).ravel()

        if train:
            opt.zero_grad()
            compute_td_loss([s], [a], [r], [next_s], [terminated]).backward()
            opt.step()

        total_reward += r
        s = next_s
        if terminated:
            break

    return total_reward

In [39]:
epsilon = 0.3

In [40]:
for i in range(1000):
    session_rewards = [generate_session(env, epsilon=epsilon, train=True) for _ in range(100)]
    print("epoch #{}\tmean reward = {:.3f}\tepsilon = {:.3f}".format(i, np.mean(session_rewards), epsilon))

    epsilon *= 0.99
    assert epsilon >= 1e-4, "Make sure epsilon is always nonzero during training"

0
0
0
0
0
0
4
0
0
0
0
0
0
0
0
8
0
4
0
0
0
12
0
0
0
0
0
0
8
8
0
0
28
12
0
4
0
0
0
0
0
0
0
0
0
0
0
0
0
0
4
12
0
0
0
0
16
4
8
4
4
8
16
4
0
4
0
0
0
0
0
0
0
12
20
52
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
4
8
0
4
0
8
8
0
0
0
4
0
24
40
4
4
8
16
0
4
4
16
0
4
8
16
4
0
4
4
72
4
8
0
0
0
0
0
0
0
0
0
0
0
0
4
4
4
12
0
0
0
0
4
0
0
0
0
0
0
0
0
0
0
0
0
0
8
16
0
0
0
0
0
0
0
0
4
0
0
4
4
0
4
8
4
0
0
0
4
12
0
4
8
0
0
0
0
0
0
0
0
0
0
0
24
4
4
0
8
8
0
16
4
0
8
4
0
0
0
0
0
0
0
0
4
12
0
0
0
0
0
4
8
4
24
48
0
0
0
0
0
0
0
0
0
0
0
0
0
0
4
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
8
12
8
0
0
4
72
0
0
4
0
8
0
4
0
0
0
0
0
0
0
0
0
0
0
12
0
0
0
0
0
0
0
0
0
0
16
12
8
0
24
0
12
44
64
4
8
16
4
0
44
0
72
152
36
8
4
0
0
0
0
0
0
0
16
4
8
4
16
16
0
0
0
0
0
0
8
8
0
20
40
8
0
64
0
0
0
0
0
0
0
0
0
8
16
0
0
4
8
4
0
8
32
4
12
0
0
4
0
0
0
28
20
0
0
0
0
0
0
0
0
0
4
0
0
0
0
0
0
0
0
0
0
0
8
0
0
0
4
0
8
0
4
0
0
0
4
16
8
0
0
4
8
0
0
24
0
0
0
4
0
20
8
0
0
0
0
0
0
4
0
8
0
4
0
0
0
0
0
0
0
0
0
0
12
0
0
0
0
0
0
0
0
0
0
0
0
4
0
0
28
4
0
0
0
0
20
40


KeyboardInterrupt: 

In [33]:
import sys
print(sys.executable)

/home/ilya/anaconda3/bin/python
