<a href="https://colab.research.google.com/github/hallpaz/drl/blob/main/notebooks/actor_critic.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

The goal of this project is to implement a simulation environment with the Gym framework to solve a specific problem. In this problem there is a column named "Random" with 50 positions, each holding a random value between 1 and 100 (uniform distribution). There is also a field named "Target", which receives a random value between 1 and 100 (uniform distribution).

The agent's mission is to select up to 5 positions from the "Random" column, one at a time, so that the sum of the selected values reaches the value of the "Target" field, which ends the episode.
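
As a rough illustration of the problem statement, the sketch below draws one instance and checks whether any subset of at most 5 values sums exactly to the target. The helper names `sample_problem` and `reachable` are hypothetical and not part of the notebook; the check is a small dynamic program over (number of values used, partial sum), assumed here only as a way to see which instances are solvable at all.

```python
import numpy as np


def sample_problem(rng, n_options=50, low=1, high=100):
    """Draw the "Random" column and the "Target" value, uniform on [low, high]."""
    options = rng.integers(low, high + 1, size=n_options)
    target = int(rng.integers(low, high + 1))
    return options, target


def reachable(options, target, limit=5):
    """Return True if some subset of at most `limit` values sums to `target`."""
    # sums[k] holds every total <= target that uses exactly k values.
    sums = [set() for _ in range(limit + 1)]
    sums[0].add(0)
    for value in options:
        value = int(value)
        # Iterate k downwards so each value is used at most once.
        for k in range(limit - 1, -1, -1):
            for partial in sums[k]:
                if partial + value <= target:
                    sums[k + 1].add(partial + value)
    return any(target in sums[k] for k in range(1, limit + 1))


rng = np.random.default_rng(0)
options, target = sample_problem(rng)
print(target, reachable(options, target))
```

With 50 values and a target of at most 100, most instances are likely solvable, so an agent's failures would mostly come from its policy rather than from impossible episodes.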

In [None]:
!pip install gymnasium



It is important to note that the agent can only "look" at one position of the "Random" column at a time.

An episode ends when the target sum has been reached or when every value in the "Random" column has been seen at least once.
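
The two end conditions stated above can be written as a small predicate. This is only a sketch: `episode_finished` and its parameters are hypothetical names, and the `MatchingEnv.step` method in this notebook additionally ends an episode when the sum overshoots the target or when `options_limit` values have been selected.

```python
def episode_finished(selected, target, n_seen, n_options=50):
    """Return True when one of the two stated end conditions holds."""
    # Condition 1: the selected values add up exactly to the target.
    target_reached = sum(selected) == target
    # Condition 2: every position of the column has been looked at.
    column_exhausted = n_seen >= n_options
    return target_reached or column_exhausted


print(episode_finished([4, 6], target=10, n_seen=2))
print(episode_finished([6], target=10, n_seen=12))
```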

Implementation:
1 - Creating the simulation environment with the Gym framework.
2 - Initial definition of the hyperparameters.
3 - Neural network architecture with TensorFlow.
4 - Training the agent.
5 - Testing and demonstrating the trained agent.

In [None]:
import gymnasium as gym
import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp

In [None]:
COMBINE = 1
DONT_COMBINE = 0

In [None]:
class MatchingEnv(gym.Env):
  def __init__(self, options_size=50, target_size=1, options_limit=5):
    self.options_size = options_size
    self.target_size = target_size
    self.options_limit = options_limit
    self.action_space = [0, 1]

    self.reset()

  def reset(self):
    self.options = np.random.randint(1, 101, self.options_size)
    self.target = np.random.randint(1, 101, self.target_size)
    self.current_option_index = 0
    self.current_target_index = 0
    self.selected = []

    return (self.options[self.current_option_index],
            self.target[self.current_target_index],
            self.options_size - 1,
            len(self.selected))

  def step(self, action):
    done = False
    reward = 0
    if action == COMBINE:
      current_value = self.options[self.current_option_index]
      self.selected.append(current_value)
      # move on to the next value
      self.current_option_index += 1
      # compute how much is still missing to reach the target
      # self.target[self.current_target_index] -= current_value
      remaining_value = self.target[self.current_target_index] - sum(self.selected)
      # how many steps are left
      remaining_steps = self.options_size - self.current_option_index - 1

      if remaining_value > 0:
        reward = current_value
      elif remaining_value == 0:
        reward = 100 * 10**(self.options_limit - len(self.selected))
        done = True
      else:
        reward = -sum(self.selected)
        done = True

    else:
      self.current_option_index += 1
      # compute how much is still missing to reach the target
      remaining_value = self.target[self.current_target_index] - sum(self.selected)
      # how many steps are left
      remaining_steps = self.options_size - self.current_option_index - 1

    if remaining_steps < 0 and not self.selected:
      reward = -100

    if remaining_steps < 0 or len(self.selected) >= self.options_limit or reward < 0:
      done = True

    next_state = (self.options[self.current_option_index % self.options_size],
                remaining_value,
                remaining_steps,
                len(self.selected))
    print(next_state)
    return next_state, reward, done, {}

In [None]:
env = MatchingEnv()
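
To make the reward scheme easier to follow, the sketch below isolates the three branches that `MatchingEnv.step` applies to a `COMBINE` action. The function `combine_reward` is a hypothetical helper, not part of the notebook; it covers only the reward for selecting a value and leaves out the -100 penalty for reaching the end of the column with nothing selected.

```python
def combine_reward(selected, target, options_limit=5):
    """Reward for a COMBINE action; `selected` already includes the new value."""
    remaining = target - sum(selected)
    if remaining > 0:
        # Still below the target: the reward is the value just picked.
        return selected[-1]
    if remaining == 0:
        # Exact match: larger bonus when fewer values were needed.
        return 100 * 10 ** (options_limit - len(selected))
    # Overshoot: penalty equal to the whole selected sum.
    return -sum(selected)


# Replaying the selections 71, 13, 7 against a target of 89.
picks = [71, 13, 7]
total = sum(combine_reward(picks[: i + 1], 89) for i in range(len(picks)))
print(total)  # 71 + 13 - 91 = -7
```

Because the overshoot penalty cancels the rewards collected on the way, an episode that ends above the target always totals the negative of the last value picked.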

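Each time the agent commits to a selection, the positions of the "Random" column are drawn again, except for those previously chosen by the agent. Note that the `MatchingEnv` class above draws the column only once, in `reset`, and does not implement this re-draw.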
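
One possible way to express the re-draw rule is sketched below. The function `redraw_unselected` and the boolean `selected_mask` are assumptions made for illustration; they are not part of the notebook's `MatchingEnv`.

```python
import numpy as np


def redraw_unselected(options, selected_mask, rng, low=1, high=100):
    """Return a copy of `options` with every unselected position re-drawn."""
    options = np.asarray(options).copy()
    selected_mask = np.asarray(selected_mask, dtype=bool)
    # Count the positions the agent has not chosen yet.
    n_free = int((~selected_mask).sum())
    # Re-draw only those positions, uniformly on [low, high].
    options[~selected_mask] = rng.integers(low, high + 1, size=n_free)
    return options


rng = np.random.default_rng(0)
column = np.arange(1, 11)
mask = np.zeros(10, dtype=bool)
mask[:3] = True  # pretend the first three positions were selected
print(redraw_unselected(column, mask, rng))
```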

In [None]:
class critic(tf.keras.Model):
  def __init__(self):
    super().__init__()
    self.d1 = tf.keras.layers.Dense(1024,activation='relu')
    self.d2 = tf.keras.layers.Dense(512,activation='relu')
    self.v = tf.keras.layers.Dense(1, activation = None)

  def call(self, input_data):
    x = self.d1(input_data)
    x = self.d2(x)
    v = self.v(x)
    return v


class actor(tf.keras.Model):
  def __init__(self):
    super().__init__()
    self.d1 = tf.keras.layers.Dense(1024,activation='relu')
    self.d2 = tf.keras.layers.Dense(512,activation='relu')
    self.a = tf.keras.layers.Dense(2, activation='softmax')

  def call(self, input_data):
    x = self.d1(input_data)
    x = self.d2(x)
    a = self.a(x)
    return a


In [None]:
class Agent():
    def __init__(self, gamma = 0.99):
        self.gamma = gamma
        self.a_opt = tf.keras.optimizers.Adam(learning_rate=5e-6)
        self.c_opt = tf.keras.optimizers.Adam(learning_rate=5e-6)
        self.actor = actor()
        self.critic = critic()
        self.log_prob = None

    def act(self,state):
        prob = self.actor(np.array([state]))
        prob = prob.numpy()
        dist = tfp.distributions.Categorical(probs=prob, dtype=tf.float32)
        action = dist.sample()
        return int(action.numpy()[0])

    def actor_loss(self, prob, action, td):
        dist = tfp.distributions.Categorical(probs=prob, dtype=tf.float32)
        log_prob = dist.log_prob(action)
        loss = -log_prob*td
        return loss


    def learn(self, state, action, reward, next_state, done):
        state = np.array([state])
        next_state = np.array([next_state])

        with tf.GradientTape() as tape1, tf.GradientTape() as tape2:
            p = self.actor(state, training=True)

            v =  self.critic(state,training=True)
            vn = self.critic(next_state, training=True)
            td = reward + self.gamma*vn*(1-int(done)) - v
            a_loss = self.actor_loss(p, action, td)
            c_loss = td**2
        grads1 = tape1.gradient(a_loss, self.actor.trainable_variables)
        grads2 = tape2.gradient(c_loss, self.critic.trainable_variables)
        self.a_opt.apply_gradients(zip(grads1, self.actor.trainable_variables))
        self.c_opt.apply_gradients(zip(grads2, self.critic.trainable_variables))
        return a_loss, c_loss
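
The arithmetic inside `learn` can be checked by hand with plain NumPy. The sketch below mirrors the same formulas (TD error, actor loss `-log_prob * td`, critic loss `td**2`) on made-up numbers; the function names and the input values are illustrative assumptions, not outputs of the networks above.

```python
import numpy as np


def td_error(reward, value, next_value, done, gamma=0.99):
    """One-step TD error; the bootstrap term is dropped on terminal steps."""
    return reward + gamma * next_value * (1 - int(done)) - value


def losses(prob_action, td):
    """Actor and critic losses for one transition, as in `Agent.learn`."""
    actor_loss = -np.log(prob_action) * td
    critic_loss = td ** 2
    return actor_loss, critic_loss


# Non-terminal step: reward 6, V(s) = 2.0, V(s') = 3.0.
td = td_error(6, 2.0, 3.0, done=False)
print(td, losses(0.5, td))

# Terminal step: the next-state value no longer contributes.
print(td_error(-91, 5.0, 3.0, done=True))
```

With rewards that range from about -100 up to 100 * 10**4, the squared TD error can become very large, which may be one reason for the small learning rate of 5e-6 chosen above.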

In [None]:
agent = Agent()
steps = 10000

In [None]:
for s in range(steps):
  done = False
  state = env.reset()
  total_reward = 0
  all_aloss = []
  all_closs = []

  while not done:
    action = agent.act(state)
    next_state, reward, done, _ = env.step(action)
    aloss, closs = agent.learn(state, action, reward, next_state, done)
    all_aloss.append(aloss)
    all_closs.append(closs)
    state = next_state
    total_reward += reward

    if done:
      print(f"Selected: {env.selected}, for target: {env.target[0]}")
      print("total reward after {} steps is {}".format(s, total_reward))

Streaming output truncated to the last 5000 lines.
(70, 4, 36, 1)
(88, 4, 35, 1)
(17, 4, 34, 1)
(65, 4, 33, 1)
(57, 4, 32, 1)
(19, 4, 31, 1)
(31, 4, 30, 1)
(30, 4, 29, 1)
(49, 4, 28, 1)
(46, 4, 27, 1)
(12, 4, 26, 1)
(57, 4, 25, 1)
(36, 4, 24, 1)
(94, 4, 23, 1)
(73, 4, 22, 1)
(81, 4, 21, 1)
(17, 4, 20, 1)
(51, 4, 19, 1)
(46, 4, 18, 1)
(60, 4, 17, 1)
(76, 4, 16, 1)
(76, 4, 15, 1)
(73, 4, 14, 1)
(85, 4, 13, 1)
(26, 4, 12, 1)
(53, 4, 11, 1)
(18, 4, 10, 1)
(94, 4, 9, 1)
(24, 4, 8, 1)
(60, 4, 7, 1)
(93, 4, 6, 1)
(68, 4, 5, 1)
(31, 4, 4, 1)
(86, 4, 3, 1)
(81, 4, 2, 1)
(44, 4, 1, 1)
(40, 4, 0, 1)
(69, 4, -1, 1)
Selected: [6], for target: 10
total reward after 5847 steps is 6
(76, 18, 48, 1)
(83, 18, 47, 1)
(63, 18, 46, 1)
(54, 18, 45, 1)
(13, 18, 44, 1)
(7, 5, 43, 2)
(49, -2, 42, 3)
Selected: [71, 13, 7], for target: 89
total reward after 5848 steps is -7
(40, 46, 48, 1)
(49, 6, 47, 2)
(41, 6, 46, 2)
(8, 6, 45, 2)
(54, -2, 44, 3)
Selected: [31, 40, 8], for target: 77
total reward