## Custom PTAN

In [9]:
%load_ext lab_black

In [124]:
import numpy as np
from ptan import ptan
import torch
import torch.nn as nn
import torch.nn.functional as F

In [13]:
class ActionSelector:
    """
    Abstract base class for objects that convert scores to actions.

    Subclasses override ``__call__`` to map a batch of scores to one
    action per row.
    """

    def __call__(self, scores):
        """
        Convert ``scores`` to actions.

        :param scores: batch of scores, one row per sample
        :raises NotImplementedError: always; subclasses must override
        """
        # A bare ``raise`` outside an ``except`` block only produces an
        # opaque "No active exception to reraise" RuntimeError.
        # NotImplementedError is a RuntimeError subclass, so existing
        # handlers keep working.
        raise NotImplementedError

In [14]:
class ArgmaxActionSelector(ActionSelector):
    """
    Greedy selector: for every row of scores, pick the index of the
    largest value.
    """

    def __call__(self, scores):
        """
        :param scores: np.ndarray of scores, reduced along axis 1
        :return: array with the argmax index of each row
        """
        # Only NumPy arrays are accepted.
        assert isinstance(scores, np.ndarray)
        best_per_row = scores.argmax(axis=1)
        return best_per_row

In [51]:
class EpsilonGreedsyActionSelector(ActionSelector):
    """
    Epsilon-greedy selection layered on top of another selector.

    Every row first receives the action chosen by ``selector``. Then,
    independently per row, the action is replaced with probability
    ``epsilon`` by one drawn uniformly from all actions.

    :param epsilon: probability of replacing a row's action with a random one
    :param selector: underlying selector; ``ArgmaxActionSelector`` when None
    """

    def __init__(self, epsilon=0.05, selector=None):
        self.epsilon = epsilon
        self.selector = selector if selector is not None else ArgmaxActionSelector()

    def __call__(self, scores):
        """
        :param scores: 2-D np.ndarray, unpacked as (batch_size, n_actions)
        :return: actions returned by the underlying selector, with the
            randomly chosen rows overwritten in place
        """
        assert isinstance(scores, np.ndarray)
        batch_size, n_actions = scores.shape
        actions = self.selector(scores)
        # Rows whose uniform draw falls below epsilon get a random action.
        mask = np.random.random(size=batch_size) < self.epsilon
        # Draw exactly as many random actions as there are masked rows;
        # mask.sum() counts inside NumPy instead of iterating in Python.
        rand_actions = np.random.choice(n_actions, mask.sum())
        actions[mask] = rand_actions
        return actions


# Correctly spelled alias; the original (misspelled) class name is kept so
# existing references continue to work.
EpsilonGreedyActionSelector = EpsilonGreedsyActionSelector

In [55]:
class ProbabilityActionSelector(ActionSelector):
    """
    Samples one action index per row, using the row as the sampling
    probabilities.
    """

    def __call__(self, probs):
        assert isinstance(probs, np.ndarray)
        # One independent draw per row, in row order.
        return np.array(
            [np.random.choice(len(row), p=row) for row in probs]
        )

In [97]:
class ProbabilityActionSelector(ActionSelector):
    """
    Turns per-row action probabilities into actions by sampling.
    """

    def __call__(self, probs):
        """
        :param probs: np.ndarray whose rows are probability vectors
        :return: np.ndarray with one sampled action index per row
        """
        assert isinstance(probs, np.ndarray)

        def sample(row):
            # Index drawn from range(len(row)), weighted by the row itself.
            return np.random.choice(len(row), p=row)

        return np.array([sample(row) for row in probs])

## ArgmaxActionSelector

In [33]:
# Three rows of scores over three actions; the row-wise maxima sit at
# indices 2, 0 and 2.
q_vals = np.array([[1, 2, 3], [1, -1, 0], [0, -1, 4]])

In [34]:
obj = ArgmaxActionSelector()

In [35]:
selector = obj(q_vals)

In [36]:
selector

array([2, 0, 2])

## EpsilonGreedyActionSelector

In [53]:
obj = EpsilonGreedsyActionSelector(epsilon=0.0)

In [54]:
obj(q_vals)

array([2, 0, 2])

### Probability Selector

In [62]:
# Sample ten batches of actions from a fixed 3x3 probability table:
# row 0 favours action 1, row 1 always yields action 2, row 2 never yields
# action 2.
obj = ProbabilityActionSelector()
probs = np.array([[0.1, 0.8, 0.1], [0.0, 0.0, 1.0], [0.5, 0.5, 0.0]])
for _ in range(10):
    acts = obj(probs)
    print(acts)

[1 2 1]
[1 2 0]
[1 2 1]
[1 2 0]
[1 2 0]
[1 2 0]
[1 2 1]
[1 2 0]
[1 2 0]
[1 2 1]


## Set Agent

In [75]:
class BaseAgent:
    """
    Base class for agents: callables that take a batch of states plus
    per-state agent states.
    """

    def initial_state(self):
        """
        Return the initial agent state; this base implementation has none.
        """
        return None

    def __call__(self, states, agent_states):
        """
        Validate the arguments; the computation itself is left to subclasses.

        :param states: list of states
        :param agent_states: list of agent states, same length as ``states``
        :raises NotImplementedError: always, after the checks pass
        """
        # Both arguments must be lists of matching length.
        assert isinstance(states, list)
        assert isinstance(agent_states, list)
        assert len(states) == len(agent_states)

        raise NotImplementedError


def default_states_preprocessor(states):
    """
    Convert a list of states into a tensor suitable for a model.

    A single state gets a new leading axis; several states are stacked
    along a new leading axis.

    :param states: list of numpy arrays (or array-likes) with states
    :return: torch.Tensor built from the batched states
    """
    if len(states) == 1:
        np_states = np.expand_dims(states[0], 0)
    else:
        # np.asarray copies only when it has to. The previous
        # np.array(..., copy=False) raises ValueError on NumPy >= 2.0
        # whenever a copy cannot be avoided, and stacking a Python list of
        # arrays always needs one.
        np_states = np.asarray([np.asarray(s) for s in states])
    return torch.tensor(np_states)


def float32_preprocessor(states):
    """
    Convert states to a float32 numpy array and wrap it in a tensor.

    :param states: array-like of states
    :return: torch.Tensor built from the float32 array
    """
    return torch.tensor(np.array(states, dtype=np.float32))

In [81]:
class DQNAgent(BaseAgent):
    """
    Agent that feeds states through a model and hands the resulting
    values to an action selector.

    :param dqn_model: callable applied to the (preprocessed) states
    :param action_selector: callable turning the model output, converted
        to a numpy array, into actions
    :param device: device that tensor inputs are moved to
    :param preprocessor: callable applied to the states first, or None
        to pass them through unchanged
    """

    def __init__(
        self,
        dqn_model,
        action_selector,
        device="cpu",
        preprocessor=default_states_preprocessor,
    ):
        self.device = device
        self.preprocessor = preprocessor
        self.dqn_model = dqn_model
        self.action_selector = action_selector

    @torch.no_grad()
    def __call__(self, states, agent_states=None):
        """
        :param states: batch of states
        :param agent_states: per-state agent states; a list of None of
            matching length is created when omitted
        :return: tuple (actions, agent_states)
        """
        # Size the default from the raw states, before any preprocessing.
        if agent_states is None:
            agent_states = [None] * len(states)

        batch = states
        if self.preprocessor is not None:
            batch = self.preprocessor(batch)
            # Only tensors can be moved to a device.
            if torch.is_tensor(batch):
                batch = batch.to(self.device)

        q_values = self.dqn_model(batch).data.cpu().numpy()
        return self.action_selector(q_values), agent_states

### DQN Agent

In [82]:
class DQNNet(nn.Module):
    """
    Stand-in network: ignores the input values and returns a
    (batch, actions) identity-like matrix.
    """

    def __init__(self, actions: int):
        super().__init__()
        self.actions = actions

    def forward(self, x):
        # Only the size of the first dimension of the input is used.
        batch_size = x.shape[0]
        return torch.eye(batch_size, self.actions)

In [83]:
# torch eye
torch.eye(3)

tensor([[1., 0., 0.],
        [0., 1., 0.],
        [0., 0., 1.]])

In [84]:
net = DQNNet(actions=3)

In [85]:
net(torch.zeros(2, 10))

tensor([[1., 0., 0.],
        [0., 1., 0.]])

In [86]:
# With epsilon=1.0 the selector's random draw (always < 1.0) marks every row,
# so every action is replaced by a random one.
selector = EpsilonGreedsyActionSelector(epsilon=1.0)
agent = DQNAgent(dqn_model=net, action_selector=selector)

In [87]:
agent(torch.zeros(10, 5))

(array([0, 2, 1, 0, 0, 0, 1, 2, 1, 1]),
 [None, None, None, None, None, None, None, None, None, None])

In [93]:
# Assign to `epsilon`, the attribute the selector actually reads; a
# misspelled name would only create a new, unused attribute and leave the
# exploration rate unchanged.
selector.epsilon = 0.5

In [94]:
agent(torch.zeros(10, 5))[0]

array([1, 2, 2, 1, 0, 1, 2, 0, 1, 1])

In [95]:
selector.epsilon = 0.1

In [96]:
agent(torch.zeros(10, 5))[0]

array([0, 1, 2, 0, 0, 0, 0, 0, 0, 0])

### Policy Agent

In [98]:
class PolicyAgent(BaseAgent):
    """
    Agent that treats the model output as action probabilities (or as
    logits when ``apply_softmax`` is set) and lets a selector pick actions.

    :param model: callable applied to the (preprocessed) states
    :param action_selector: callable turning the probabilities, converted
        to a numpy array, into actions; a new ProbabilityActionSelector
        is created when None
    :param device: device that tensor inputs are moved to
    :param apply_softmax: apply softmax over dim 1 to the model output
    :param preprocessor: callable applied to the states first, or None
        to pass them through unchanged
    """

    def __init__(
        self,
        model,
        action_selector=None,
        device="cpu",
        apply_softmax=False,
        preprocessor=default_states_preprocessor,
    ):
        # Build the default selector per agent. An instance created in the
        # signature is evaluated once at definition time and shared by
        # every agent, and it stays bound to whichever
        # ProbabilityActionSelector class existed at that moment.
        if action_selector is None:
            action_selector = ProbabilityActionSelector()
        self.model = model
        self.action_selector = action_selector
        self.device = device
        self.apply_softmax = apply_softmax
        self.preprocessor = preprocessor

    @torch.no_grad()
    def __call__(self, states, agent_states=None):
        """
        :param states: batch of states
        :param agent_states: per-state agent states; a list of None of
            matching length is created when omitted
        :return: tuple (np.ndarray of actions, agent_states)
        """
        # Size the default from the raw states, before any preprocessing.
        if agent_states is None:
            agent_states = [None] * len(states)
        if self.preprocessor is not None:
            states = self.preprocessor(states)
            # Only tensors can be moved to a device.
            if torch.is_tensor(states):
                states = states.to(self.device)
        probs_v = self.model(states)
        if self.apply_softmax:
            probs_v = F.softmax(probs_v, dim=1)
        probs = probs_v.data.cpu().numpy()
        actions = self.action_selector(probs)
        return np.array(actions), agent_states

In [99]:
class PolicyNet(nn.Module):
    """
    Stand-in network: ignores the input values and returns a
    (batch, actions) float32 tensor with ones in the first two columns
    and zeros elsewhere.
    """

    def __init__(self, actions: int):
        super().__init__()
        self.actions = actions

    def forward(self, x):
        # Only the size of the first dimension of the input is used.
        out = torch.zeros((x.shape[0], self.actions), dtype=torch.float32)
        for column in (0, 1):
            out[:, column] = 1
        return out

In [100]:
net = PolicyNet(actions=5)

In [101]:
net(torch.zeros(5, 10))

tensor([[1., 1., 0., 0., 0.],
        [1., 1., 0., 0., 0.],
        [1., 1., 0., 0., 0.],
        [1., 1., 0., 0., 0.],
        [1., 1., 0., 0., 0.]])

In [102]:
selector = ProbabilityActionSelector()

In [103]:
agent = PolicyAgent(model=net, action_selector=selector, apply_softmax=True)

In [104]:
agent(torch.zeros(5, 3))

(array([0, 0, 3, 0, 1]), [None, None, None, None, None])

## Toy Environment

In [108]:
import gym
from typing import List, Any, Optional

In [131]:
class ToyEnv(gym.Env):
    """
    Toy environment driven by a step counter.

    The observation is the step counter modulo the size of the
    observation space (5), the reward equals the action, and the episode
    is reported done once the counter reaches 10.
    """

    def __init__(self):
        super().__init__()
        self.observation_space = gym.spaces.Discrete(n=5)
        self.action_space = gym.spaces.Discrete(n=3)
        self.step_index = 0

    def reset(self):
        """Reset the step counter and return it as the first observation."""
        self.step_index = 0
        return self.step_index

    def step(self, action):
        """
        Advance the counter by one step.

        :param action: action taken; its float value is the reward
        :return: tuple (observation, reward, done, info)
        """
        n_observations = self.observation_space.n
        # Already finished: do not advance, give zero reward.
        if self.step_index == 10:
            return self.step_index % n_observations, 0.0, True, {}

        self.step_index += 1
        observation = self.step_index % n_observations
        reward = float(action)
        finished = self.step_index == 10
        return observation, reward, finished, {}

In [132]:
class DullAgent(ptan.agent.BaseAgent):
    """
    Agent that answers every observation with the same fixed action.
    """

    def __init__(self, action: int):
        self.action = action

    def __call__(self, observations: List[Any], state: Optional[List] = None):
        """
        :param observations: batch of observations; only their count matters
        :param state: agent state, handed back unchanged
        :return: tuple (list with one copy of the action per observation, state)
        """
        repeated = [self.action for _ in observations]
        return repeated, state

## The Experience Source Class

In [133]:
env = ToyEnv()

In [134]:
agent = DullAgent(action=1)

In [135]:
# Experience source over the toy environment and the fixed-action agent.
# With steps_count=2 each yielded item is a tuple of two consecutive
# Experience entries (see the printed output of the loops below).
exp_source = ptan.experience.ExperienceSource(env=env, agent=agent, steps_count=2)

In [136]:
# Print the first three items yielded by the experience source (idx 0..2).
for idx, exp in enumerate(exp_source):
    if idx > 2:
        # The item at idx 3 has already been pulled from the source here.
        break
    print(exp)

(Experience(state=0, action=1, reward=1.0, done=False), Experience(state=1, action=1, reward=1.0, done=False))
(Experience(state=1, action=1, reward=1.0, done=False), Experience(state=2, action=1, reward=1.0, done=False))
(Experience(state=2, action=1, reward=1.0, done=False), Experience(state=3, action=1, reward=1.0, done=False))


In [137]:
# Print the first sixteen items (idx 0..15). Iterating `exp_source` again
# starts from state 0, as the output below shows.
for idx, exp in enumerate(exp_source):
    if idx > 15:
        # The item at idx 16 has already been pulled from the source here.
        break
    print(exp)

(Experience(state=0, action=1, reward=1.0, done=False), Experience(state=1, action=1, reward=1.0, done=False))
(Experience(state=1, action=1, reward=1.0, done=False), Experience(state=2, action=1, reward=1.0, done=False))
(Experience(state=2, action=1, reward=1.0, done=False), Experience(state=3, action=1, reward=1.0, done=False))
(Experience(state=3, action=1, reward=1.0, done=False), Experience(state=4, action=1, reward=1.0, done=False))
(Experience(state=4, action=1, reward=1.0, done=False), Experience(state=0, action=1, reward=1.0, done=False))
(Experience(state=0, action=1, reward=1.0, done=False), Experience(state=1, action=1, reward=1.0, done=False))
(Experience(state=1, action=1, reward=1.0, done=False), Experience(state=2, action=1, reward=1.0, done=False))
(Experience(state=2, action=1, reward=1.0, done=False), Experience(state=3, action=1, reward=1.0, done=False))
(Experience(state=3, action=1, reward=1.0, done=False), Experience(state=4, action=1, reward=1.0, done=True))
(E