<a href="https://colab.research.google.com/github/Tbarkin121/ML_Examples/blob/main/CartPole_Actor_Critic_NoGradTape.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Reference tutorial (A2C, Advantage Actor-Critic, in TensorFlow 2): https://adventuresinmachinelearning.com/a2c-advantage-actor-critic-tensorflow-2/

In [123]:
"""
Classic cart-pole system implemented by Rich Sutton et al.
Copied from http://incompleteideas.net/sutton/book/code/pole.c
permalink: https://perma.cc/C9ZM-652R
"""

import math
import gym
from gym import spaces, logger
from gym.utils import seeding
import numpy as np


class CartPoleEnv(gym.Env):
    """
    Description:
        A pole is attached by an un-actuated joint to a cart, which moves along
        a frictionless track. The pendulum starts upright, and the goal is to
        prevent it from falling over by increasing and reducing the cart's
        velocity. This variant additionally accepts a target cart position
        (see ``set_target``); the weighted squared distance to that target is
        subtracted from the per-step reward.
    Source:
        This environment corresponds to the version of the cart-pole problem
        described by Barto, Sutton, and Anderson
    Observation:
        Type: Box(4)
        Num     Observation               Min                     Max
        0       Cart Position             -4.8                    4.8
        1       Cart Velocity             -float32 max            float32 max
        2       Pole Angle                -1.047 rad (-60 deg)    1.047 rad (60 deg)
        3       Pole Angular Velocity     -float32 max            float32 max
    Actions:
        Type: Discrete(2)
        Num   Action
        0     Push cart to the left
        1     Push cart to the right
        Note: The amount the velocity that is reduced or increased is not
        fixed; it depends on the angle the pole is pointing. This is because
        the center of gravity of the pole increases the amount of energy needed
        to move the cart underneath it
    Reward:
        While the episode is running:
            1 - target_weight * (target_location - cart_position) ** 2
        On the step that ends the episode the reward is 1.0 (no distance
        penalty). Any step taken after termination returns 0.0.
    Starting State:
        All observations are assigned a uniform random value in [-0.05..0.05]
    Episode Termination:
        Pole Angle is more than 30 degrees from vertical.
        Cart Position is more than 2.4 from the center.
        This class does not enforce an episode-length limit.
        NOTE(review): the upstream Gym environment was considered solved at an
        average return >= 195.0 over 100 consecutive trials; that criterion
        was defined for the unmodified threshold and reward -- confirm before
        relying on it here.
    """

    metadata = {
        'render.modes': ['human', 'rgb_array'],
        'video.frames_per_second': 50
    }

    def __init__(self):
        """Set the physical constants, the spaces and the bookkeeping state."""
        self.gravity = 9.8
        self.masscart = 1.0
        self.masspole = 0.1
        self.total_mass = (self.masspole + self.masscart)
        self.length = 0.5  # actually half the pole's length
        self.polemass_length = (self.masspole * self.length)
        self.force_mag = 10.0
        self.tau = 0.02  # seconds between state updates
        self.kinematics_integrator = 'euler'

        # Angle at which to fail the episode (30 degrees, expressed in radians)
        self.theta_threshold_radians = 30 * 2 * math.pi / 360
        self.x_threshold = 2.4

        # Angle limit set to 2 * theta_threshold_radians so failing observation
        # is still within bounds.
        high = np.array([self.x_threshold * 2,
                         np.finfo(np.float32).max,
                         self.theta_threshold_radians * 2,
                         np.finfo(np.float32).max],
                        dtype=np.float32)

        self.action_space = spaces.Discrete(2)
        self.observation_space = spaces.Box(-high, high, dtype=np.float32)

        self.seed()
        self.viewer = None
        self.state = None

        # None while the episode is running; counts step() calls made after
        # the episode has ended.
        self.steps_beyond_done = None

        # Target tracking is off by default: a weight of 0 removes the penalty.
        self.target_location = 0.0
        self.target_weight = 0.0

    def seed(self, seed=None):
        """Create the environment's random generator and return ``[seed]``."""
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def step(self, action):
        """Apply ``action`` for one time step of length ``tau``.

        Args:
            action: member of ``action_space``; 1 pushes with ``+force_mag``,
                anything else with ``-force_mag``.

        Returns:
            ``(observation, reward, done, info)`` where ``observation`` is the
            new state as an array and ``info`` is always an empty dict.
        """
        err_msg = "%r (%s) invalid" % (action, type(action))
        assert self.action_space.contains(action), err_msg

        x, x_dot, theta, theta_dot = self.state
        force = self.force_mag if action == 1 else -self.force_mag
        costheta = math.cos(theta)
        sintheta = math.sin(theta)

        # For the interested reader:
        # https://coneural.org/florian/papers/05_cart_pole.pdf
        temp = (force + self.polemass_length * theta_dot ** 2 * sintheta) / self.total_mass
        thetaacc = (self.gravity * sintheta - costheta * temp) / (self.length * (4.0 / 3.0 - self.masspole * costheta ** 2 / self.total_mass))
        xacc = temp - self.polemass_length * thetaacc * costheta / self.total_mass

        if self.kinematics_integrator == 'euler':
            # Explicit Euler: positions advance with the old velocities.
            x = x + self.tau * x_dot
            x_dot = x_dot + self.tau * xacc
            theta = theta + self.tau * theta_dot
            theta_dot = theta_dot + self.tau * thetaacc
        else:  # semi-implicit euler
            x_dot = x_dot + self.tau * xacc
            x = x + self.tau * x_dot
            theta_dot = theta_dot + self.tau * thetaacc
            theta = theta + self.tau * theta_dot

        self.state = (x, x_dot, theta, theta_dot)

        done = bool(
            x < -self.x_threshold
            or x > self.x_threshold
            or theta < -self.theta_threshold_radians
            or theta > self.theta_threshold_radians
        )
        # Weighted squared distance between the target and the updated cart
        # position (self.state was already overwritten above).
        dist_err = self.target_weight*(self.target_location - self.state[0])**2
        if not done:
            reward = 1.0 - dist_err

        elif self.steps_beyond_done is None:
            # Pole just fell!
            # The terminating step is rewarded 1.0 without the distance penalty.
            self.steps_beyond_done = 0
            reward = 1.0
        else:
            if self.steps_beyond_done == 0:
                logger.warn(
                    "You are calling 'step()' even though this "
                    "environment has already returned done = True. You "
                    "should always call 'reset()' once you receive 'done = "
                    "True' -- any further steps are undefined behavior."
                )
            self.steps_beyond_done += 1
            reward = 0.0

        return np.array(self.state), reward, done, {}

    def reset(self):
        """Start a new episode from a small random state and return it."""
        self.state = self.np_random.uniform(low=-0.05, high=0.05, size=(4,))
        self.steps_beyond_done = None
        return np.array(self.state)

    def render(self, mode='human'):
        """Draw the cart and pole for the current state.

        The viewer and its geometry are created on the first call. Returns
        ``None`` when no state exists yet; otherwise returns whatever
        ``viewer.render`` returns, requesting an RGB array when ``mode`` is
        ``'rgb_array'``.
        """
        screen_width = 600
        screen_height = 400

        # Map world coordinates (2 * x_threshold wide) onto screen pixels.
        world_width = self.x_threshold * 2
        scale = screen_width/world_width
        carty = 100  # TOP OF CART
        polewidth = 10.0
        polelen = scale * (2 * self.length)
        cartwidth = 50.0
        cartheight = 30.0

        if self.viewer is None:
            # Imported lazily so the module is only needed when rendering.
            from gym.envs.classic_control import rendering
            self.viewer = rendering.Viewer(screen_width, screen_height)
            l, r, t, b = -cartwidth / 2, cartwidth / 2, cartheight / 2, -cartheight / 2
            axleoffset = cartheight / 4.0
            cart = rendering.FilledPolygon([(l, b), (l, t), (r, t), (r, b)])
            self.carttrans = rendering.Transform()
            cart.add_attr(self.carttrans)
            self.viewer.add_geom(cart)
            l, r, t, b = -polewidth / 2, polewidth / 2, polelen - polewidth / 2, -polewidth / 2
            pole = rendering.FilledPolygon([(l, b), (l, t), (r, t), (r, b)])
            pole.set_color(.8, .6, .4)
            # The pole gets its own transform plus the cart's transform.
            self.poletrans = rendering.Transform(translation=(0, axleoffset))
            pole.add_attr(self.poletrans)
            pole.add_attr(self.carttrans)
            self.viewer.add_geom(pole)
            self.axle = rendering.make_circle(polewidth/2)
            self.axle.add_attr(self.poletrans)
            self.axle.add_attr(self.carttrans)
            self.axle.set_color(.5, .5, .8)
            self.viewer.add_geom(self.axle)
            self.track = rendering.Line((0, carty), (screen_width, carty))
            self.track.set_color(0, 0, 0)
            self.viewer.add_geom(self.track)

            self._pole_geom = pole

        if self.state is None:
            return None

        # Edit the pole polygon vertex
        pole = self._pole_geom
        l, r, t, b = -polewidth / 2, polewidth / 2, polelen - polewidth / 2, -polewidth / 2
        pole.v = [(l, b), (l, t), (r, t), (r, b)]

        x = self.state
        cartx = x[0] * scale + screen_width / 2.0  # MIDDLE OF CART
        self.carttrans.set_translation(cartx, carty)
        self.poletrans.set_rotation(-x[2])

        return self.viewer.render(return_rgb_array=mode == 'rgb_array')

    def close(self):
        """Close the viewer, if one was created, and forget it."""
        if self.viewer:
            self.viewer.close()
            self.viewer = None

    def set_target(self, target, weight):
        """Set the target cart position and the weight of its reward penalty."""
        self.target_location = target
        self.target_weight = weight


In [124]:
# Render matplotlib figures inline in the notebook output.
%matplotlib inline
# NOTE(review): stable_baselines3 is not imported anywhere in this notebook --
# confirm whether this install is still needed.
!pip install stable-baselines3[extra]



In [125]:
%%bash
# Install additional packages for visualization
# All output (stdout and stderr) is discarded, so failures are silent.
sudo apt-get install -y xvfb python-opengl > /dev/null 2>&1
pip install pyvirtualdisplay > /dev/null 2>&1
pip install git+https://github.com/tensorflow/docs > /dev/null 2>&1

In [126]:
import time
import datetime, os

import collections
import gym
import numpy as np
import statistics
import tensorflow as tf
import tqdm

from matplotlib import pyplot as plt
from tensorflow.keras import layers
from typing import Any, List, Sequence, Tuple

from collections import namedtuple, deque
import random

# Set seed for experiment reproducibility
# The same value seeds TensorFlow, NumPy and Python's `random` module (the
# latter is what the replay buffer samples with).
# seed = 66
seed = 8675302
tf.random.set_seed(seed)
np.random.seed(seed)
random.seed(seed)
# Small epsilon value for stabilizing division operations
# NOTE: `eps` is not referenced by any other cell in this notebook.
eps = np.finfo(np.float32).eps.item()

In [127]:
# Create the environment
# The custom CartPoleEnv defined above is used instead of the registered Gym
# environment so that a target cart position can be rewarded.
# env = gym.make("CartPole-v1")
env = CartPoleEnv()
# Initial target; the training cell calls set_target again before training.
env.set_target(target=1.0, weight=0.2)
env.seed(seed)

# Box(4,) means that it is a vector with 4 components
print("Observation space:", env.observation_space)
print("Shape:", env.observation_space.shape[0])
# Discrete(2) means that there are two discrete actions
print("Action space:", env.action_space)

# Wrap OpenAI Gym's `env.step` call as an operation in a TensorFlow function.
# This would allow it to be included in a callable TensorFlow graph.

def env_step(action: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
  """Step the module-level ``env`` once and cast the results to fixed dtypes.

  Args:
    action: action forwarded unchanged to ``env.step``.

  Returns:
    ``(state, reward, done)`` as float32, float32 and int32 arrays; the info
    dict returned by the environment is discarded.
  """
  observation, step_reward, finished, _info = env.step(action)
  state_f32 = observation.astype(np.float32)
  reward_f32 = np.array(step_reward, np.float32)
  done_i32 = np.array(finished, np.int32)
  return (state_f32, reward_f32, done_i32)


def tf_env_step(action: tf.Tensor) -> List[tf.Tensor]:
  """Call ``env_step`` through ``tf.numpy_function``.

  Returns the state, reward and done flag as float32, float32 and int32
  tensors respectively.
  """
  output_dtypes = [tf.float32, tf.float32, tf.int32]
  return tf.numpy_function(env_step, [action], output_dtypes)
  

# Problem dimensions, read once from the environment's spaces.
num_actions = env.action_space.n
num_obs = env.observation_space.shape[0]

print(f'num_obs = {num_obs}')
print(f'num_actions = {num_actions}')


Observation space: Box(-3.4028234663852886e+38, 3.4028234663852886e+38, (4,), float32)
Shape: 4
Action space: Discrete(2)
num_obs = 4
num_actions = 2


In [128]:
# Record type for one environment transition held in the replay buffer.
Transition = namedtuple(
    'Transition',
    ['state', 'action', 'next_state', 'reward', 'done'],
)

class ExperienceReplay(object):
    """Bounded store of transitions with uniform random sampling."""

    def __init__(self, capacity):
        """Create an empty buffer holding at most ``capacity`` transitions."""
        # A deque with maxlen drops its oldest entry when a new one is
        # appended to a full buffer.
        self.memory = deque(maxlen=capacity)

    def push(self, *args):
        """Save a transition"""
        record = Transition(*args)
        self.memory.append(record)

    def sample(self, batch_size):
        """Return ``batch_size`` stored transitions drawn without replacement."""
        return random.sample(self.memory, k=batch_size)

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.memory)

In [129]:
class ActorNet(tf.keras.Model):
  """Actor network.

  Two dense layers, each followed by a LeakyReLU, feed a dense output layer
  with one unit per action. The output layer uses a tanh activation, so every
  output lies in [-1, 1]; the Agent treats these outputs as logits and
  applies the softmax itself.
  """
  def __init__(self, num_actions: int, num_hidden_units: int):
    """Initialize.

    Args:
      num_actions: number of units in the output layer.
      num_hidden_units: width of each of the two hidden layers.
    """
    super().__init__()
    self.d1 = layers.Dense(num_hidden_units)
    self.lr1 = layers.LeakyReLU()
    self.d2 = layers.Dense(num_hidden_units)
    self.lr2 = layers.LeakyReLU()
    # Other output activations (softmax, sigmoid, linear) were tried by the
    # author; tanh is the one in use.
    self.a = layers.Dense(num_actions, activation='tanh')

  def call(self, inputs: tf.Tensor) -> tf.Tensor:
    """Return the output-layer activations for ``inputs``.

    The return annotation used to claim a tuple of two tensors; a single
    tensor is returned.
    """
    x = self.d1(inputs)
    x = self.lr1(x)
    x = self.d2(x)
    x = self.lr2(x)
    return self.a(x)


class CriticNet(tf.keras.Model):
  """Critic network.

  Two dense layers, each followed by a LeakyReLU, feed a single linear output
  unit (the value estimate for the input state).
  """
  def __init__(self, num_actions:int, num_hidden_units: int):
    """Initialize.

    Args:
      num_actions: accepted for signature parity with ActorNet; not used,
        because the critic always has exactly one output unit.
      num_hidden_units: width of each of the two hidden layers.
    """
    super().__init__()
    self.d1 = layers.Dense(num_hidden_units)
    self.lr1 = layers.LeakyReLU()
    self.d2 = layers.Dense(num_hidden_units)
    self.lr2 = layers.LeakyReLU()
    self.c = layers.Dense(1)

  def call(self, inputs: tf.Tensor) -> tf.Tensor:
    """Return the value estimate for ``inputs``.

    The return annotation used to claim a tuple of two tensors; a single
    tensor is returned.
    """
    x = self.d1(inputs)
    x = self.lr1(x)
    x = self.d2(x)
    x = self.lr2(x)
    return self.c(x)

In [130]:
from tensorflow.keras.losses import Loss
class ActorLoss(Loss):
  """Keras loss: squared error averaged over the last axis."""
  def call(self, y_true, y_pred):
    """Return the mean of ``(y_pred - y_true) ** 2`` along the last axis."""
    residual = y_pred - y_true
    return tf.reduce_mean(residual ** 2, axis=-1)

# Sanity check of ActorLoss on a two-element example:
# ((3 - 1)**2 + (3 - 2)**2) / 2 = 2.5
testActorLoss = ActorLoss()
y_true = tf.Variable([1,2], dtype='float32')
y_pred = tf.Variable([3,3], dtype='float32')
print(y_true.shape)
loss = testActorLoss(y_true, y_pred)
print('loss = {}'.format(loss))



(2,)
loss = 2.5


In [131]:
class Agent():
    """One-step advantage actor-critic agent backed by a replay buffer.

    The agent collects transitions from the module-level ``env`` (through
    ``tf_env_step``), stores them in an ``ExperienceReplay`` buffer, and
    updates separate actor and critic networks from random mini-batches.
    It relies on the module-level names ``env``, ``tf_env_step``,
    ``num_obs``, ``num_actions`` and ``Transition``.
    """

    def __init__(self, gamma = 0.99, mem_size = 100000, batch_size = 1024, mini_batch_size = 128, n_mini_batches=1):
        """Build the networks, optimizers, replay buffer and metrics.

        Args:
            gamma: discount factor used in the bootstrapped target.
            mem_size: capacity of the replay buffer.
            batch_size: number of fresh transitions collected by each
                ``train`` call.
            mini_batch_size: number of transitions sampled per update.
            n_mini_batches: stored for callers; ``train`` takes the number
                of updates as its own argument.
        """
        self.gamma = gamma
        self.mem_size = mem_size
        self.batch_size = batch_size
        self.mini_batch_size = mini_batch_size
        self.n_mini_batches = n_mini_batches
        self.memory = ExperienceReplay(self.mem_size)

        # These two optimizers are the ones applied in train().
        self.a_opt = tf.keras.optimizers.Adam(learning_rate=1e-3)
        self.c_opt = tf.keras.optimizers.Adam(learning_rate=1e-3)

        self.actor = ActorNet(num_actions, 32)
        self.critic = CriticNet(num_actions, 32)

        # Kept from the original setup; train() applies gradients through
        # a_opt/c_opt rather than through these compiled optimizers.
        self.actor.compile(
                    optimizer='adam')

        self.critic.compile(
                    optimizer='adam')

        self.huber_loss = tf.keras.losses.Huber(reduction=tf.keras.losses.Reduction.SUM)

        # Running discount factor; decayed on every update but currently not
        # part of any loss term.
        self.I = 1
        # Define our metrics
        self.actor_loss_metric = tf.keras.metrics.Mean('actor_loss', dtype=tf.float32)
        self.critic_loss_metric = tf.keras.metrics.Mean('critic_loss', dtype=tf.float32)
        self.logits1_metric = tf.keras.metrics.Mean('logits1', dtype=tf.float32)
        self.logits2_metric = tf.keras.metrics.Mean('logits2', dtype=tf.float32)
        self.probs1_metric = tf.keras.metrics.Mean('probs1', dtype=tf.float32)
        self.probs2_metric = tf.keras.metrics.Mean('probs2', dtype=tf.float32)

    @staticmethod
    def _reset_state():
        """Reset the module-level env; return its observation as a batch of one."""
        state = tf.constant(env.reset(), dtype=tf.float32)
        return tf.expand_dims(state, 0)

    def act(self, state, deterministic=False):
        """Choose an action for a batch-of-one ``state``.

        Args:
            state: batched observation passed to the actor.
            deterministic: when true, return the arg-max action of the first
                batch row; otherwise sample from the softmax of the actor
                outputs.

        Returns:
            A scalar tensor holding the action index.
        """
        logits = self.actor(state)
        if deterministic:
            return tf.argmax(logits, 1)[0]
        probs = tf.nn.softmax(logits)
        log_probs = tf.math.log(probs)
        return tf.random.categorical(log_probs, 1)[0, 0]

    def reset_replay_buffer(self):
        """Discard all stored transitions by creating a fresh buffer."""
        self.memory = ExperienceReplay(self.mem_size)

    def fill_replay_buff(self, n_samples):
        """Collect ``n_samples`` transitions with the sampling policy.

        The environment is reset at the start and again whenever an episode
        ends. The transition that ends an episode is stored with a reward of
        0 instead of the reward returned by the environment.
        """
        state = self._reset_state()
        for _ in range(n_samples):
            action = self.act(state)

            # Apply action to the environment to get next state and reward
            next_state, reward, done = tf_env_step(action)
            next_state = tf.expand_dims(next_state, 0)

            if tf.cast(done, tf.bool):
                self.memory.push(state, action, next_state, 0, done)
                state = self._reset_state()
            else:
                self.memory.push(state, action, next_state, reward, done)
                state = next_state

    # @tf.function
    def train(self, n_mini_batches = 1):
        """Collect ``batch_size`` new transitions, then run the updates.

        Each of the ``n_mini_batches`` updates samples ``mini_batch_size``
        transitions from the buffer and applies one gradient step to the
        actor and one to the critic. The loss and logits/probability metrics
        are updated after every step.

        Raises:
            ValueError: if the buffer holds fewer than ``mini_batch_size``
                transitions after collection.
        """
        self.fill_replay_buff(self.batch_size)
        if len(self.memory) < self.mini_batch_size:
            raise ValueError(
                'Replay buffer holds {} transitions, fewer than '
                'mini_batch_size={}'.format(len(self.memory), self.mini_batch_size))

        for _ in range(n_mini_batches):
            mini_batch = self.memory.sample(self.mini_batch_size)
            # Turn a list of transitions into one transition of tuples.
            mini_batch = Transition(*zip(*mini_batch))
            state_t1 = tf.reshape(mini_batch.state, shape=(self.mini_batch_size, num_obs))
            action_t1 = tf.reshape(mini_batch.action, shape=(self.mini_batch_size, 1))
            state_t2 = tf.reshape(mini_batch.next_state, shape=(self.mini_batch_size, num_obs))
            reward_t2 = tf.reshape(mini_batch.reward, shape=(self.mini_batch_size, 1))
            done = tf.reshape(mini_batch.done, shape=(self.mini_batch_size, 1))

            # One non-persistent tape per network: each is queried once below.
            with tf.GradientTape() as tape1, tf.GradientTape() as tape2:
                logits = self.actor(state_t1, training=True)
                values_t1 = self.critic(state_t1, training=True)
                values_t2 = self.critic(state_t2, training=True)

                probs = tf.nn.softmax(logits)
                # Probability of the action that was actually taken.
                probs_actions = tf.gather(probs, action_t1, axis=1, batch_dims=1)
                log_probs = tf.math.log(probs_actions)

                # Bootstrapped one-step target; the (1 - done) factor removes
                # the bootstrap term for episode-ending transitions.
                # NOTE(review): the target is not wrapped in tf.stop_gradient,
                # so the critic gradient also flows through critic(state_t2).
                returns = (tf.cast(reward_t2, 'float32')) + self.gamma*values_t2*(1-tf.cast(done, 'float32'))
                advantage = returns - values_t1

                # Computed from the taken action's probability only, not from
                # the full action distribution.
                entropy_loss = -tf.math.reduce_mean(probs_actions*log_probs)
                actor_loss = tf.math.reduce_mean(-log_probs * advantage) - 0.001*entropy_loss
                critic_loss = 0.5*self.huber_loss(values_t1, returns)

            self.I *= self.gamma

            grads1 = tape1.gradient(actor_loss, self.actor.trainable_variables)
            grads2 = tape2.gradient(critic_loss, self.critic.trainable_variables)

            self.a_opt.apply_gradients(zip(grads1, self.actor.trainable_variables))
            self.c_opt.apply_gradients(zip(grads2, self.critic.trainable_variables))

            # Column j holds action j across the mini-batch. Indexing rows
            # instead would average both actions of a single sample, which
            # for the probabilities is always 0.5.
            self.logits1_metric(logits[:, 0])
            self.logits2_metric(logits[:, 1])
            self.probs1_metric(probs[:, 0])
            self.probs2_metric(probs[:, 1])

            self.actor_loss_metric(actor_loss)
            self.critic_loss_metric(critic_loss)

    def summary(self):
        """Placeholder; currently does nothing."""
        pass

In [10]:
# Smoke test: a throwaway agent whose replay buffer holds only 3 transitions.
agent = Agent(mem_size=3, batch_size=100, mini_batch_size=10, n_mini_batches=10)
agent.reset_replay_buffer()
agent.fill_replay_buff(3)

# Build a batch-of-one observation tensor from a fresh reset and show it.
state = tf.constant(env.reset(), dtype=tf.float32)
state = tf.expand_dims(state, 0)
print(state)
# output = agent.actor(state)
# output2 = agent.critic(state)
# print(output)
# print(output2)

tf.Tensor([[-0.0495821  -0.02786041 -0.03143258 -0.03141892]], shape=(1, 4), dtype=float32)


In [11]:
# agent = Agent(mem_size=3, batch_size=100, mini_batch_size=10, n_mini_batches=10)
# agent.reset_replay_buffer()
# agent.fill_replay_buff(3)
# # agent.summary()

# for i in range(len(agent.memory)):
#   print('mem = {}'.format(i))
#   print(agent.memory.memory[i][0])

# agent.fill_replay_buff(1)

# for i in range(len(agent.memory)):
#   print('mem = {}'.format(i))
#   print(agent.memory.memory[i][0])



In [132]:
# # Trying to log graph... didn't really work
# import datetime
# stamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
# logdir = 'logs/func/%s' % stamp
# writer = tf.summary.create_file_writer(logdir)

# tf.summary.trace_on(graph=True, profiler=True)
# The agent used for training. The buffer is pre-filled with 1000 transitions
# because every train() call adds only batch_size=501 new ones before
# sampling mini_batch_size=1000 from the buffer.
agent = Agent(mem_size=10000, gamma=0.99, batch_size=501, mini_batch_size=1000, n_mini_batches=10)
agent.fill_replay_buff(1000)
# agent.actor(state)
# with writer.as_default():
#   tf.summary.trace_export(
#       name="my_func_trace",
#       step=0,
#       profiler_outdir=logdir)

In [13]:
import datetime

reward_metric = tf.keras.metrics.Mean('reward', dtype=tf.float32)

# All scalars of one run go to a single timestamped TensorBoard directory.
current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
grand_log_dir = 'logs/gradient_tape/' + current_time + '/grand'
grand_summary_writer = tf.summary.create_file_writer(grand_log_dir)


min_episodes_criterion = 100
max_episodes = 500
max_steps_per_episode = 1000

# Training stops early once the mean evaluation reward over the last
# `min_episodes_criterion` iterations exceeds `reward_threshold`. Each
# evaluation episode below is capped at 500 steps.
reward_threshold = 495
running_reward = 0

# Metrics the agent fills during train(); each is logged under its name and
# then cleared so every iteration reports its own mean.
tracked_metrics = (
    ('actor_loss', agent.actor_loss_metric),
    ('critic_loss', agent.critic_loss_metric),
    ('logits1', agent.logits1_metric),
    ('logits2', agent.logits2_metric),
    ('probs1', agent.probs1_metric),
    ('probs2', agent.probs2_metric),
)

# Keep last episodes reward
episodes_reward: collections.deque = collections.deque(maxlen=min_episodes_criterion)
env.set_target(-1.9, 0.2)
solved = False
with tqdm.trange(max_episodes) as t:
  with grand_summary_writer.as_default():
    for i in t:
      agent.train(agent.n_mini_batches)
      for metric_name, metric in tracked_metrics:
        tf.summary.scalar(metric_name, metric.result(), step=i)
      for _, metric in tracked_metrics:
        metric.reset_states()

      # Evaluate the current policy with one deterministic (arg-max) episode.
      episode_reward = 0
      state = tf.constant(env.reset(), dtype=tf.float32)
      state = tf.expand_dims(state, 0)
      # Action probabilities in the initial state, shown in the progress bar.
      logits = agent.actor(state)
      probs = tf.nn.softmax(logits)
      p = probs.numpy()
      for _ in range(500):
        action = agent.act(state, deterministic=True)

        # Apply action to the environment to get next state and reward
        state, reward, done = tf_env_step(action)
        state = tf.expand_dims(state, 0)
        episode_reward += reward
        if tf.cast(done, tf.bool):
          # Kept from the original so the environment's random stream
          # advances exactly as before.
          env.reset()
          break
      reward_metric(episode_reward)
      tf.summary.scalar('episode reward', reward_metric.result(), step=i)
      reward_metric.reset_states()

      episodes_reward.append(episode_reward.numpy())
      running_reward = statistics.mean(episodes_reward)

      t.set_description(f'Episode {i}')
      t.set_postfix(episode_reward=episode_reward.numpy(), running_reward=running_reward, pa0 = p[0][0], pa1 = p[0][1])

      if running_reward > reward_threshold and i >= min_episodes_criterion:
        solved = True
        break

# Only claim success when the stopping criterion was actually met.
if solved:
  print(f'\nSolved at episode {i}: average reward: {running_reward:.2f}!')
else:
  print(f'\nStopped at episode {i} without reaching the reward threshold: average reward: {running_reward:.2f}')

Episode 65:  13%|█▎        | 66/500 [01:32<10:07,  1.40s/it, episode_reward=102, pa0=0.298, pa1=0.702, running_reward=45.7]


KeyboardInterrupt: ignored

In [None]:
# Sample an action (non-deterministic path) and show the raw actor output for
# the most recent `state`.
action = agent.act(state)
actor_out = agent.actor(state)
print(actor_out)

In [None]:
# Raw actor output (before softmax) for the most recent `state`.
print(agent.actor(state))

In [None]:
# Render an episode and save as a GIF file
from IPython import display as ipythondisplay
from PIL import Image
from pyvirtualdisplay import Display
from datetime import datetime

# Start an invisible virtual display before any rendering is done.
# NOTE(review): presumably required because the notebook host has no physical
# display -- confirm.
display = Display(visible=0, size=(400, 300))
display.start()


def render_episode(env: gym.Env, model: tf.keras.Model, max_steps: int):
  """Run one greedy episode and return its rendered frames.

  Args:
    env: environment to reset and render. NOTE: stepping goes through
      ``tf_env_step``, which acts on the module-level ``env``, so pass that
      same environment here.
    model: policy network; the arg-max of its output for the current state is
      the action taken.
    max_steps: maximum number of environment steps to run.

  Returns:
    A list of PIL images: the initial frame followed by every second frame
    until the episode ends or ``max_steps`` is reached.
  """
  # Reset before the first render so the first frame shows the start of this
  # episode rather than whatever state the environment was left in.
  state = tf.constant(env.reset(), dtype=tf.float32)
  state = tf.expand_dims(state, 0)

  screen = env.render(mode='rgb_array')
  images = [Image.fromarray(screen)]

  # Render screen every n steps
  n = 2
  for i in range(1, max_steps + 1):
    # Greedy action of the first (only) batch row.
    action = tf.argmax(model(state), 1)[0]
    state, reward, done = tf_env_step(action)
    state = tf.expand_dims(state, 0)
    print('reward = {}'.format(reward))
    if i % n == 0:
      screen = env.render(mode='rgb_array')
      images.append(Image.fromarray(screen))

    if tf.cast(done, tf.bool):
      break

  return images


# Save GIF image
# Render one episode with the trained actor and write the frames to a GIF.
images = render_episode(env, agent.actor, max_steps_per_episode)
# NOTE: the file name says "cartpole-v1", but the frames come from the custom
# CartPoleEnv instance.
image_file = 'cartpole-v1.gif'
# loop=0: loop forever, duration=1: play each frame for 1ms
images[0].save(
    image_file, save_all=True, append_images=images[1:], loop=0, duration=1)


# Show the saved GIF inline in the notebook.
import tensorflow_docs.vis.embed as embed
embed.embed_file(image_file)

In [None]:
# Colab only: mount Google Drive under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Archive the whole logs directory into /content/file.zip; the download lines
# below are left disabled.
!zip -r /content/file.zip /content/logs/
# from google.colab import files
# files.download("/content/file.zip")

In [None]:
# rm -r logs/

In [None]:
# NOTE(review): the %tensorboard magic normally requires the extension to be
# loaded first; both load lines below are commented out -- confirm it was
# loaded elsewhere in the session.
# %load_ext tensorboard
# %reload_ext tensorboard
# NOTE: this points at ./logs/func (used by the profiling cell); the training
# scalars are written under logs/gradient_tape/.
%tensorboard --logdir ./logs/func

In [None]:
class ActorNet(tf.keras.Model):
  """Actor network.

  Two dense layers, each followed by a LeakyReLU, feed a dense output layer
  with one unit per action. The output layer uses a tanh activation, so every
  output lies in [-1, 1].
  """
  def __init__(self, num_actions: int, num_hidden_units: int):
    """Initialize.

    Args:
      num_actions: number of units in the output layer.
      num_hidden_units: width of each of the two hidden layers.
    """
    super().__init__()
    self.d1 = layers.Dense(num_hidden_units)
    self.lr1 = layers.LeakyReLU()
    self.d2 = layers.Dense(num_hidden_units)
    self.lr2 = layers.LeakyReLU()
    # Other output activations (softmax, sigmoid, linear) were tried by the
    # author; tanh is the one in use.
    self.a = layers.Dense(num_actions, activation='tanh')

  def call(self, inputs: tf.Tensor) -> tf.Tensor:
    """Return the output-layer activations for ``inputs``.

    The return annotation used to claim a tuple of two tensors; a single
    tensor is returned.
    """
    x = self.d1(inputs)
    x = self.lr1(x)
    x = self.d2(x)
    x = self.lr2(x)
    return self.a(x)


# inputs = tf.keras.Input(shape=(3,))
# x = tf.keras.layers.Dense(4, activation=tf.nn.relu)(inputs)
# outputs = tf.keras.layers.Dense(5, activation=tf.nn.softmax)(x)
# model = tf.keras.Model(inputs=inputs, outputs=outputs)

# Tiny network (1 output unit, 2 hidden units) with a 3-feature input, built
# and called once so it has weights before being traced in the next cell.
model = ActorNet(1,2)
model.build(input_shape=[1, 3])
x = tf.random.uniform((1, 3))
test = model(x)

# model.summary()

In [None]:
# The function to be traced.
@tf.function
def eval_model(x):
  """Run the module-level ``model`` on ``x`` as a ``tf.function``."""
  outputs = model(x)
  return outputs

# Set up logging.
# An earlier cell runs `from datetime import datetime`, which rebinds the name
# to the class; re-import the module so `datetime.datetime` resolves.
import datetime

stamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
logdir = 'logs/func/%s' % stamp
writer = tf.summary.create_file_writer(logdir)

# Sample data for your function.
x = tf.random.uniform((1, 3))

# Profile a single call of the traced function. The profiler is started once,
# the call is wrapped in a Trace context manager, and the profiler is always
# stopped afterwards so a failed call does not leave it running.
tf.profiler.experimental.start(logdir)
try:
  with tf.profiler.experimental.Trace("Train", step_num=1, _r=1):
    z = eval_model(x)
finally:
  # stop() takes a `save` flag, not a directory; results are written to the
  # log directory given to start().
  tf.profiler.experimental.stop()

# Call only one tf.function when tracing.
# z = eval_model(x)
# with writer.as_default():
#   tf.summary.trace_export(
#       name="my_func_trace",
#       step=0,
#       profiler_outdir=logdir)