In [34]:
from gym.spaces import Box, Discrete
import numpy as np

# Box: observation in a continuous range
observation_space = Box(low=np.array([-0.4, -np.inf]),    #there are two variables with range [-0.4, 0.4] and (-infinity, infinity)
                        high=np.array([0.4, np.inf]),     #and the observed data type is float 
                        dtype=np.float64)

# DiscreteL "Finite" number of observations or actions
action_space = Discrete(3)                               # there are only three actions the agent can take


In [35]:
incomplete_number_of_inputs = observation_space.shape     # (2,)
number_of_inputs = observation_space.shape[0]             # 2
number_of_actor_outputs = action_space.n                  # 3

In [39]:
#The imports needed for run the PPO_Agent
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
from torch import from_numpy, no_grad, save, load, tensor, clamp
from torch import float as torch_float
from torch import long as torch_long
from torch import min as torch_min
from torch.utils.data.sampler import BatchSampler, SubsetRandomSampler
import numpy as np
from torch import manual_seed
from collections import namedtuple

In [40]:
#from PPO_agent import PPOAgent class, Transition tuple
class PPOAgent:
    """
    PPOAgent implements the PPO RL algorithm (https://arxiv.org/abs/1707.06347).
    It works with a set of discrete actions.
    It uses the Actor and Critic neural network classes defined below.
    """
    def __init__(self, number_of_inputs, number_of_actor_outputs, clip_param=0.2, max_grad_norm=0.5, ppo_update_iters=5,
                 batch_size=8, gamma=0.99, use_cuda=False, actor_lr=0.001, critic_lr=0.003, seed=None):
        super().__init__()
        if seed is not None:
            manual_seed(seed)

        self.clip_param = clip_param
        self.max_grad_norm = max_grad_norm
        self.ppo_update_iters = ppo_update_iters
        self.batch_size = batch_size
        self.gamma = gamma
        self.use_cuda = use_cuda

        self.actor_net = Actor(number_of_inputs, number_of_actor_outputs)
        self.critic_net = Critic(number_of_inputs)

        if self.use_cuda:
            self.actor_net.cuda()
            self.critic_net.cuda()

        self.actor_optimizer = optim.Adam(self.actor_net.parameters(), actor_lr)
        self.critic_net_optimizer = optim.Adam(self.critic_net.parameters(), critic_lr)

        self.buffer = []

    def work(self, agent_input, type_="simple"):
        agent_input = from_numpy(np.array(agent_input)).float().unsqueeze(0)  # Add batch dimension with unsqueeze
        if self.use_cuda:
            agent_input = agent_input.cuda()
        with no_grad():
            action_prob = self.actor_net(agent_input)

        if type_ == "simple":
            output = [action_prob[0][i].data.tolist() for i in range(len(action_prob[0]))]
            return output
        elif type_ == "selectAction":
            c = Categorical(action_prob)
            action = c.sample()
            return action.item(), action_prob[:, action.item()].item()
        elif type_ == "selectActionMax":
            return np.argmax(action_prob).item(), 1.0
        else:
            raise Exception("Wrong type in agent.work(), returning input")

    def get_value(self, state):
        state = from_numpy(state)
        with no_grad():
            value = self.critic_net(state)
        return value.item()

    def save(self, path):
        save(self.actor_net.state_dict(), path + '_actor.pkl')
        save(self.critic_net.state_dict(), path + '_critic.pkl')

    def load(self, path):
        actor_state_dict = load(path + '_actor.pkl')
        critic_state_dict = load(path + '_critic.pkl')
        self.actor_net.load_state_dict(actor_state_dict)
        self.critic_net.load_state_dict(critic_state_dict)

    def store_transition(self, transition):
        self.buffer.append(transition)

    def train_step(self, batch_size=None):
        if batch_size is None:
            if len(self.buffer) < self.batch_size:
                return
            batch_size = self.batch_size

        state = tensor([t.state for t in self.buffer], dtype=torch_float)
        action = tensor([t.action for t in self.buffer], dtype=torch_long).view(-1, 1)
        reward = [t.reward for t in self.buffer]
        old_action_log_prob = tensor([t.a_log_prob for t in self.buffer], dtype=torch_float).view(-1, 1)

        R = 0
        Gt = []
        for r in reward[::-1]:
            R = r + self.gamma * R
            Gt.insert(0, R)
        Gt = tensor(Gt, dtype=torch_float)

        if self.use_cuda:
            state, action, old_action_log_prob = state.cuda(), action.cuda(), old_action_log_prob.cuda()
            Gt = Gt.cuda()

        # Repeat the update procedure for ppo_update_iters
        for _ in range(self.ppo_update_iters):
            for index in BatchSampler(SubsetRandomSampler(range(len(self.buffer))), batch_size, False):
                Gt_index = Gt[index].view(-1, 1)
                V = self.critic_net(state[index])
                delta = Gt_index - V
                advantage = delta.detach()

                action_prob = self.actor_net(state[index]).gather(1, action[index])  # new policy

                ratio = (action_prob / old_action_log_prob[index])  # Ratio between current and old policy probabilities
                surr1 = ratio * advantage
                surr2 = clamp(ratio, 1 - self.clip_param, 1 + self.clip_param) * advantage

                action_loss = -torch_min(surr1, surr2).mean()  # MAX->MIN descent
                self.actor_optimizer.zero_grad()  # Delete old gradients
                action_loss.backward()  # Perform backward step to compute new gradients
                nn.utils.clip_grad_norm_(self.actor_net.parameters(), self.max_grad_norm)  # Clip gradients
                self.actor_optimizer.step()  # Perform training step based on gradients

                value_loss = F.mse_loss(Gt_index, V)
                self.critic_net_optimizer.zero_grad()
                value_loss.backward()
                nn.utils.clip_grad_norm_(self.critic_net.parameters(), self.max_grad_norm)
                self.critic_net_optimizer.step()

        # After each training step, the buffer is cleared
        del self.buffer[:]
        
from collections import namedtuple       #Named_tuple, with addional elements similar to object
Transition = namedtuple('Transition', ['state', 'action', 'a_log_prob', 'reward', 'next_state']) 

In [41]:
# The Actor and Critic nodes are used with in the PPOAgent Class
class Actor(nn.Module):
    def __init__(self, number_of_inputs, number_of_outputs):
        super(Actor, self).__init__()
        self.fc1 = nn.Linear(number_of_inputs, 10)
        self.fc2 = nn.Linear(10, 10)
        self.action_head = nn.Linear(10, number_of_outputs)
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        action_prob = F.softmax(self.action_head(x), dim=1)
        return action_prob


class Critic(nn.Module):
    def __init__(self, number_of_inputs):
        super(Critic, self).__init__()
        self.fc1 = nn.Linear(number_of_inputs, 10)
        self.fc2 = nn.Linear(10, 10)
        self.state_value = nn.Linear(10, 1)
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        value = self.state_value(x)
        return value

In [6]:
import numpy as np
import torch

my_list = [1, 2, 3, 4, 5]
numpy_array = np.array(my_list)  # Convert the list to a NumPy array
print(numpy_array)

new_array = torch.from_numpy(numpy_array)  # Create a new NumPy array from the existing NumPy array

print(new_array)

[1 2 3 4 5]
tensor([1, 2, 3, 4, 5], dtype=torch.int32)


In [70]:
from torch import from_numpy
import numpy as np
numpy_array = np.array([1,2,3])             #[1,2,3], "but its numpy"
tensor = from_numpy(numpy_array)            #tensor([1, 2, 3], dtype=torch.int32)
tensor = tensor.float().unsqueeze(0)        #tensor([[1., 2., 3.]])       # doubles with two brackets
agent_input = tensor
#agent_input = from_numpy(np.array(agent_input)).float().unsqueeze(0)\
print(agent_input)

from torch import no_grad
Actor(agent_input)                      #self.actor_net = Actor(number_of_inputs, number_of_actor_outputs)
#if self.use_cuda:
#    agent_input = agent_input.cuda()
#with no_grad():
#     action_prob = self.actor_net(agent_input)

tensor([[1., 2., 3.]])


TypeError: __init__() missing 1 required positional argument: 'number_of_outputs'

IndentationError: expected an indented block (2166547415.py, line 7)

In [3]:
from torch.distributions import Categorical
