#I will try to build an simple environment, and alongside it, a DRL system

In [0]:
from __future__ import absolute_import, division, print_function, unicode_literals

# TensorFlow and tf.keras
try:
  # %tensorflow_version only exists in Colab.
  %tensorflow_version 2.x
except Exception:
  pass
import tensorflow as tf
from tensorflow import keras
import numpy as np

In [0]:
import matplotlib.pyplot as plt
import tensorflow.keras.layers as kl
import tensorflow.keras.losses as kls
import tensorflow.keras.optimizers as ko
import logging

#First we need a simple environment.
It should take an action, give observations and a reward.
I will use my ballon_env


In [0]:
#This will be a simple environment where you only have to take one action to survive.
#If you get to 100 points you win
class InfiniteEnv():
  def __init__(self):
    self.action_space = 3 #You can either do A, B, or C.
    self.observation_space = np.zeros((3)) #This will literally be random numbers.
    self.current_step = -1
    self.Death = False
    self.totalReward = 0

  def _get_reward(self):
    """Reward is given for staying alive."""
    if self.Death == False:
        return 1
    else:
        return 0.0

  def _get_state(self):
    """Get the observation."""
    ob = np.random.rand((3))
    return ob        

  def _take_action(self, action):
    if(action == 1 or action == 2):
      self.Death = True
    elif(self.totalReward == 100):
      self.Death = True
    else:
      pass

  def step(self,action):
    if self.Death == True:
      raise RuntimeError("Episode is done")
    self.curr_step += 1
    self._take_action(action)
    reward = self._get_reward()
    self.totalReward += reward
    obs = self._get_state()
    return obs, reward, self.Death, {}

  def reset(self):
    self.curr_step = -1
    self.Death = False
    return self._get_state()


In [98]:
print(np.zeros((3)).shape)
print(np.random.rand((3)).shape)

(3,)
(3,)


In [0]:
class ProbabilityDistribution(tf.keras.Model):
  def call(self, logits, **kwargs):
    # Sample a random categorical action from the given logits.
    return tf.squeeze(tf.random.categorical(logits,1), axis = -1)

In [0]:
class Model(tf.keras.Model): #what the model is: https://www.tensorflow.org/api_docs/python/tf/keras/Model
  def __init__(self, num_actions):
    super().__init__("mlp_policy")
    self.hidden1 = kl.Dense(128, activation = 'relu')
    self.hidden2 = kl.Dense(64, activation= 'relu')
    self.value = kl.Dense(1, name = "value") 
    self.logits = kl.Dense(num_actions, name = "policy_logits")
    self.dist = ProbabilityDistribution()
  
  def call(self,inputs, **kwargs):
    x = tf.convert_to_tensor(inputs) #Inputs son las observations
    hidden_logs = self.hidden1(x)
    hidden_vals = self.hidden2(x)
    return self.logits(hidden_logs), self.value(hidden_vals)
  
  def action_value(self,obs):
    logits, value = self.predict_on_batch(obs) 
    action = self.dist.predict_on_batch(logits)
    return (action, value)

In [0]:
env = InfiniteEnv()
env.action_space
model = Model(num_actions=env.action_space)

Si le pasamos solo la (obs) esta tiene una shape 3, lo que hace que todos las dense nos regresen la forma 3, outputShape. Con obs[None] la forma cambia  a 1,3 lo que hace que la dense regrese formas 1, outputshape.
print(tf.convert_to_tensor(obs)) #La shape de este es 3,
tf.convert_to_tensor(obs[None]) #La shape de este es 1,3

In [102]:
obs = env.reset()
action, value = model.action_value(obs[None])
print(action, value)

tf.Tensor([2], shape=(1,), dtype=int64) tf.Tensor([[0.30355552]], shape=(1, 1), dtype=float32)


Para este punto ya tenemos una red que regresa una acción y un ambiente que regresa lo necesario

#Now we need to train it
We create an agent class to do this.

In [0]:
class A2CAgent:
  def __init__(self, model, lr=7e-3, gamma=0.99, value_c=0.5, entropy_c=1e-4):
    # Coefficients are used for the loss terms.
    self.value_c = value_c
    self.entropy_c = entropy_c
    self.gamma = gamma

    self.model = model
    self.model.compile(
      optimizer=ko.RMSprop(lr=lr),
      # Define separate losses for policy logits and value estimate.
      loss=[self._logits_loss, self._value_loss])

  def test(self, env, render=True):
    obs, done, ep_reward = env.reset(), False, 0 #Get the initial values from the environment
    while not done:
      action, _ = self.model.action_value(obs[None]) #returns the action the model recomends and a value? (What is the value for?)
      obs, reward, done, _ = env.step(action)
      ep_reward += reward #get the reward of the action
    return ep_reward

  def _value_loss(self, returns, value):
    # Value loss is typically MSE between value estimates and returns.
    return self.value_c * kls.mean_squared_error(returns, value)

  def _logits_loss(self, actions_and_advantages, logits):
    # A trick to input actions and advantages through the same API.
    # split that value into 2 ¿through the -1 axis?
    actions, advantages = tf.split(actions_and_advantages, 2, axis=-1)

    # Sparse categorical CE loss obj that supports sample_weight arg on `call()`.
    # `from_logits` argument ensures transformation into normalized probabilities.
    weighted_sparse_ce = kls.SparseCategoricalCrossentropy(from_logits=True)

    # Policy loss is defined by policy gradients, weighted by advantages.
    # Note: we only calculate the loss on the actions we've actually taken.
    actions = tf.cast(actions, tf.int32) #transforms into shape tf.int32
    policy_loss = weighted_sparse_ce(actions, logits, sample_weight=advantages)

    # Entropy loss can be calculated as cross-entropy over itself.
    probs = tf.nn.softmax(logits)
    entropy_loss = kls.categorical_crossentropy(probs, probs)

    # We want to minimize policy and maximize entropy losses.
    # Here signs are flipped because the optimizer minimizes.
    return policy_loss - self.entropy_c * entropy_loss

  def _returns_advantages(self, rewards, dones, values, next_value):
    # `next_value` is the bootstrap value estimate of the future state (critic).
    #append next_value to np_zeroes_like(rewards) through the -1 axis, and that is returns
    returns = np.append(np.zeros_like(rewards), next_value)

    # Returns are calculated as discounted sum of future rewards.
    for t in reversed(range(rewards.shape[0])): 
      #bellmans equation
      returns[t] = rewards[t] + self.gamma * returns[t + 1] * (1 - dones[t])
    returns = returns[:-1] #Take just a single value

    # Advantages are equal to returns - baseline (value estimates in our case).
    advantages = returns - values
    return returns, advantages

  def train(self, env, batch_sz=64, updates=250):
    # Storage helpers for a single batch of data.
    actions = np.empty((batch_sz,), dtype=np.int32)
    rewards, dones, values = np.empty((3, batch_sz)) #Create 3 empty arrays of batch_size:
    observations = np.empty((batch_sz,) + env.observation_space.shape) #create an array of (batchsize, observation_space.shape)

    # Training loop: collect samples, send to optimizer, repeat updates times.
    ep_rewards = [0.0] #current episode rewards
    next_obs = env.reset() #get observation from the environment
    for update in range(updates): #updates is how many times we play the game (our batch count)
      for step in range(batch_sz): #run 64 frames to get the batch
        observations[step] = next_obs.copy() #set the next_obs in an array of observation
        actions[step], values[step] = self.model.action_value(next_obs[None, :]) #pass the obs into the action of our model (the two nnets)
        next_obs, rewards[step], dones[step], _ = env.step(actions[step]) #get the next state of the nevironment with the action taken

        ep_rewards[-1] += rewards[step]
        if dones[step]:  #if we win or loose.
          ep_rewards.append(0.0) #start new rewards
          next_obs = env.reset() #start new environment
          logging.info("Episode: %03d, Reward: %03d" % (
            len(ep_rewards) - 1, ep_rewards[-2]))

      _, next_value = self.model.action_value(next_obs[None, :]) #after a single batch is finished, run another one before starting the new loop

      returns, advs = self._returns_advantages(rewards, dones, values, next_value)
      # A trick to input actions and advantages through same API.
      acts_and_advs = np.concatenate([actions[:, None], advs[:, None]], axis=-1)

      # Performs a full training step on the collected batch.
      # Note: no need to mess around with gradients, Keras API handles it.
      #observations is the training data, [...the other array...] is the target data.
      losses = self.model.train_on_batch(observations, [acts_and_advs, returns])

      logging.debug("[%d/%d] Losses: %s" % (update + 1, updates, losses))

    return ep_rewards



In [104]:
agent = A2CAgent(model)
rewards_history = agent.train(env)
print("Finished training, testing...")
print("%d out of 200" % agent.test(env)) # 200 out of 200

train_enter
Finished training, testing...
0 out of 200


Nota: No se como poner que "ganaste" pero debería de incrementar la reward con el training. Para comprobar haré una prueba simple

In [108]:
#En este modelo, siempre usar 0 es lo correcto. Y después de entrenar efectivamente solo da 1, Victoria!
obs = env.reset()
action, _ = model.action_value(obs[None])
print(action)

tf.Tensor([0], shape=(1,), dtype=int64)
