## RL in TensorFlow
Some experiments to learn how to use TensorFlow for reinforcement learning (RL).

We start from the TensorFlow actor-critic tutorial: https://www.tensorflow.org/tutorials/reinforcement_learning/actor_critic

In [8]:
import gymnasium as gym
import collections
import numpy as np
import statistics
import tensorflow as tf
import tqdm

from matplotlib import pyplot as plt
from tensorflow.keras import layers
from typing import Any, List, Sequence, Tuple

In [16]:
# Environment that every later cell steps through.
env = gym.make("CartPole-v1")

# Seed TensorFlow's and NumPy's global random number generators.
seed = 42
tf.random.set_seed(seed)
np.random.seed(seed)

# float32 machine epsilon as a plain Python float; get_expected_return adds it
# to the standard deviation so the standardisation never divides by zero.
eps = float(np.finfo(np.float32).eps)

In [17]:
class ActorCritic(tf.keras.Model):
    """Actor-critic network: one shared hidden layer feeding two output heads.

    The actor head emits one unnormalised score (logit) per action and the
    critic head emits a single scalar value estimate.
    """

    def __init__(self, num_actions: int, num_hidden_units: int):
        """Create the shared layer and both heads.

        Args:
            num_actions: Width of the actor head (one output per action).
            num_hidden_units: Width of the shared ReLU hidden layer.
        """
        super().__init__()

        self.common = layers.Dense(num_hidden_units, activation="relu")
        self.actor = layers.Dense(num_actions)
        self.critic = layers.Dense(1)

    def call(self, inputs: tf.Tensor) -> Tuple[tf.Tensor, tf.Tensor]:
        """Return ``(actor_output, critic_output)`` for ``inputs``."""
        hidden = self.common(inputs)
        actor_out = self.actor(hidden)
        critic_out = self.critic(hidden)
        return actor_out, critic_out


In [19]:
# Display the environment's action space (the recorded output below is Discrete(2)).
env.action_space

Discrete(2)

In [20]:
# Number of discrete actions offered by the environment
# (a continuous action space would use env.action_space.shape[0] instead).
num_actions = env.action_space.n
num_hidden_units = 128

model = ActorCritic(
    num_actions=num_actions,
    num_hidden_units=num_hidden_units,
)

In [21]:
def _env_step_numpy(action: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Advance the environment one step using plain NumPy values.

    Args:
        action: Action to apply, as delivered by ``tf.numpy_function``.

    Returns:
        ``(state, reward, done)`` where ``state`` is float32 and ``reward`` /
        ``done`` are int32 scalars.
    """
    state, reward, terminated, truncated, _info = env.step(action)
    # Gymnasium reports the end of an episode through two flags; either one
    # means the episode is over and the environment must be reset.
    done = terminated or truncated
    return (state.astype(np.float32),
            np.array(reward, np.int32),
            np.array(done, np.int32))


def env_step(action: tf.Tensor) -> List[tf.Tensor]:
    """Returns state, reward and done flag given an action.

    ``tf.numpy_function`` takes the wrapped callable and its inputs as
    required arguments, so it cannot be applied as ``@tf.numpy_function(Tout=...)``.
    This wrapper calls it explicitly, which lets the environment step be used
    from inside a ``tf.function`` graph.

    Args:
        action: Scalar tensor holding the action to apply.

    Returns:
        ``[state, reward, done]`` tensors with dtypes float32, int32, int32.
    """
    return tf.numpy_function(
        _env_step_numpy, [action], [tf.float32, tf.int32, tf.int32]
    )

def run_episode(
    initial_state: tf.Tensor,
    model: tf.keras.Model,
    max_steps: int
) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
    """Runs a single episode to collect training data.

    Args:
        initial_state: Unbatched starting observation.
        model: Network returning ``(action_logits, value)`` for a batched state.
        max_steps: Upper bound on the number of environment steps.

    Returns:
        ``(action_probs, values, rewards)``: one entry per step taken, holding
        the probability of the sampled action, the critic value and the reward.
    """

    # Dynamically sized per-step buffers.
    action_probs = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
    values = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
    rewards = tf.TensorArray(dtype=tf.int32, size=0, dynamic_size=True)

    initial_state_shape = initial_state.shape
    state = initial_state

    for t in tf.range(max_steps):
        # Convert state into a batched tensor (batch size = 1).
        state = tf.expand_dims(state, 0)

        # Run the model to get action logits and the critic value.
        action_logits_t, value = model(state)

        # Sample the next action from the categorical action distribution.
        # NOTE: this would need to change for a continuous action space.
        action = tf.random.categorical(action_logits_t, 1)[0, 0]
        action_probs_t = tf.nn.softmax(action_logits_t)

        # Store the critic value.
        values = values.write(t, tf.squeeze(value))

        # Store the probability of the chosen action (the log is taken later
        # in compute_loss); indexing works because the action is discrete.
        action_probs = action_probs.write(t, action_probs_t[0, action])

        # Apply the action to the environment.
        state, reward, done = env_step(action)
        # The numpy bridge loses static shape information; restore it so the
        # loop variable keeps a consistent shape across iterations.
        state.set_shape(initial_state_shape)

        # Store the reward.
        rewards = rewards.write(t, reward)

        # Stop as soon as the environment reports the episode is over.
        if tf.cast(done, tf.bool):
            break

    # Return dense tensors (not TensorArrays) and only after the loop ends;
    # returning inside the loop collected just a single step.
    action_probs = action_probs.stack()
    values = values.stack()
    rewards = rewards.stack()

    return action_probs, values, rewards
        


In [22]:
def get_expected_return(
    rewards: tf.Tensor,
    gamma: float,
    standardize: bool = True
) -> tf.Tensor:
    """Compute the discounted return for every timestep.

    Args:
        rewards: 1-D tensor of per-step rewards in chronological order.
        gamma: Discount factor applied to future rewards.
        standardize: If true, shift and scale the returns to zero mean and
            (approximately) unit standard deviation.

    Returns:
        float32 tensor with one return per entry of ``rewards``.
    """

    n = tf.shape(rewards)[0]
    returns = tf.TensorArray(dtype=tf.float32, size=n)

    # Walk the rewards from last to first, accumulating the discounted sum.
    rewards = tf.cast(rewards[::-1], dtype=tf.float32)
    # Float scalar zero; `tf.constant(0,0)` passed 0 as the dtype argument.
    discounted_sum = tf.constant(0.0)
    discounted_sum_shape = discounted_sum.shape
    for i in tf.range(n):
        reward = rewards[i]
        # discounted_sum is 0 for the final reward (first loop iteration).
        discounted_sum = reward + gamma * discounted_sum
        discounted_sum.set_shape(discounted_sum_shape)
        returns = returns.write(i, discounted_sum)
    # Reverse again so the returns are back in chronological order.
    returns = returns.stack()[::-1]

    if standardize:
        # reduce_std is only exposed under tf.math; eps guards against a
        # zero standard deviation.
        returns = ((returns - tf.math.reduce_mean(returns)) /
                   (tf.math.reduce_std(returns) + eps))

    return returns

In [23]:
# Huber loss summed over all timesteps. The reduction is given as the string
# "sum", the value the tf.keras.losses.Reduction.SUM enum stands for, so this
# line does not depend on that enum being exposed by the installed Keras.
huber_loss = tf.keras.losses.Huber(reduction="sum")


def compute_loss(
    action_probs: tf.Tensor,
    values: tf.Tensor,
    returns: tf.Tensor
) -> tf.Tensor:
    """Computes the combined actor-critic loss.

    Args:
        action_probs: Probabilities of the actions that were taken.
        values: Critic value estimates for the visited states.
        returns: Returns computed for the same timesteps.

    Returns:
        Scalar tensor: actor loss plus critic (Huber) loss.
    """

    advantage = returns - values

    # TF2 exposes the natural logarithm as tf.math.log (there is no tf.log).
    action_log_probs = tf.math.log(action_probs)
    actor_loss = -tf.reduce_sum(action_log_probs * advantage)

    critic_loss = huber_loss(values, returns)

    return actor_loss + critic_loss

In [24]:
optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)


# Compiled as a tf.function so the whole step runs as a graph.
@tf.function
def train_step(
    initial_state: tf.Tensor,
    model: tf.keras.Model,
    optimizer: tf.keras.optimizers.Optimizer,
    gamma: float,
    max_steps_per_episode: int) -> tf.Tensor:
    """Run one episode and apply one gradient update to ``model``.

    Args:
        initial_state: Starting observation for the episode.
        model: Actor-critic network to train.
        optimizer: Optimizer used to apply the gradients.
        gamma: Discount factor passed to ``get_expected_return``.
        max_steps_per_episode: Step limit passed to ``run_episode``.

    Returns:
        Sum of the rewards collected during the episode.
    """

    with tf.GradientTape() as tape:
        # Collect one episode of data under the tape so gradients can flow.
        probs, critic_values, rewards = run_episode(
            initial_state, model, max_steps_per_episode
        )

        # Expected returns for every step of the episode.
        expected = get_expected_return(rewards, gamma)

        # Add a trailing axis to each tensor before computing the loss.
        probs_col = tf.expand_dims(probs, 1)
        values_col = tf.expand_dims(critic_values, 1)
        returns_col = tf.expand_dims(expected, 1)

        loss = compute_loss(probs_col, values_col, returns_col)

    # Differentiate the loss and update the model parameters.
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    return tf.reduce_sum(rewards)


In [None]:
# Train until the running average reward exceeds the threshold or the episode
# budget is exhausted.

min_episodes_criterion = 100
max_episodes=10000
max_steps_per_episode=500

# Stop once the mean reward over the last `min_episodes_criterion` episodes
# exceeds this value.
reward_threshold=475
running_reward=0

# Discount factor for future rewards.
gamma = 0.99

# Sliding window holding the most recent episode rewards.
episodes_reward: collections.deque = collections.deque(maxlen=min_episodes_criterion)

t = tqdm.trange(max_episodes)
for i in t:
    initial_state, info = env.reset()
    initial_state = tf.constant(initial_state, dtype=tf.float32)
    episode_reward = int(
        train_step(initial_state, model, optimizer, gamma, max_steps_per_episode)
        )

    episodes_reward.append(episode_reward)
    running_reward = statistics.mean(episodes_reward)

    t.set_postfix(
        episode_reward=episode_reward, running_reward=running_reward
    )

    # Show the average episode reward every 10 episodes. The original printed
    # an undefined name (`avg_reward`); the running mean is the value meant.
    if i % 10 == 0:
        print(f'Episode {i}: average reward: {running_reward:.2f}')

    if running_reward > reward_threshold and i >= min_episodes_criterion:
        break

# NOTE: this line also runs when max_episodes is exhausted without the
# threshold being reached.
print(f'\nSolved at episode {i}: average reward: {running_reward:.2f}')