# Imports

In [1]:
import numpy as np
import gymnasium as gym
import tensorflow as tf
import tqdm.notebook as tqdm
import matplotlib.pyplot as plt
from gymnasium.utils.save_video import save_video

2024-05-07 16:04:46.554727: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
              tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        print(e)

2024-05-07 16:04:48.396515: I external/local_xla/xla/stream_executor/cuda/cuda_executor.cc:998] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
2024-05-07 16:04:48.427161: I external/local_xla/xla/stream_executor/cuda/cuda_executor.cc:998] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
2024-05-07 16:04:48.427345: I external/local_xla/xla/stream_executor/cuda/cuda_executor.cc:998] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-

# Environment

Create the [environment](https://gymnasium.farama.org/environments/box2d/bipedal_walker/).

In [3]:
env = gym.make('BipedalWalker-v3', hardcore=False)
eval_env = gym.make('BipedalWalker-v3', hardcore=False)

In [4]:
env.observation_space._shape

(24,)

In [5]:
env.action_space._shape

(4,)

# Replay Buffer

Create a replay buffer to hold game history

In [6]:
class ReplayBuffer:

    def __init__(self, max_size: int, observation_space: gym.spaces.Space, action_space: gym.spaces.Space, seed: int | None = None):
        """Stores the replay history with a maximum of `max_size` entries, removing old entries as needed.

        Parameters:
            max_size: maximal number of entries to keep
            observation_space: specification of the observation space
            action_space: specification of the action space
            seed: seed to initialize the internal random number generator for reproducibility"""
        self.max_size = max_size
        self.done = np.zeros(max_size)
        self.step = 0
        self.rng = np.random.default_rng(seed=seed)
        self.len = 0

        self.current_state = np.zeros((max_size, *observation_space.shape))
        self.action = np.zeros((max_size, *action_space.shape), dtype=int)
        self.reward = np.zeros(max_size)
        self.next_state = np.zeros((max_size, *observation_space.shape))
        
    def add(self, current_observation: np.ndarray, action: np.ndarray, reward: float, next_observation: np.ndarray, done: bool) -> None:
        """Add a new entry to the buffer.

        Parameters:
            current_observation: environment state observed at the current step
            action: action taken by the model
            reward: reward received after taking the action
            next_observation: environment state obversed after taking the action
            done: whether the episode has ended or not"""
        self.current_state[self.step] = current_observation
        self.action[self.step] = action
        self.reward[self.step] = reward
        self.next_state[self.step] = next_observation
        self.done[self.step] = done
        self.step = (self.step + 1) % self.max_size
        self.len = min(self.len + 1, self.max_size)
        
    def sample(self, n_samples: int, replace: bool = True) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Randomly samples `n_samples` from the buffer.

        Parameters:
            n_samples: number of samples to select
            replace: sample with or without replacement

        Returns:
            current observations, actions, rewards, next observations, done"""
        indicies = self.rng.choice(self.len, size=n_samples, replace=replace)
        return (
            self.current_state[indicies], 
            self.action[indicies], 
            self.reward[indicies], 
            self.next_state[indicies], 
            self.done[indicies]
        )

    def clear(self) -> None:
        """Clears the buffer"""
        self.step = self.len = 0

    def __getitem__(self, index: int) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Gets a sample at `index`

        Parameters:
            index: index of the sample to get

        Returns:
            current observation, action, reward, next observation, done"""
        return (
            self.current_state[index], 
            self.action[index], 
            self.reward[index], 
            self.next_state[index], 
            self.done[index]
        )
        
    def __len__(self) -> int:
        """Returns the number of entries in the buffer"""
        return self.len

# Model

Implement your model

In [7]:
def get_name(prefix: str | None = None, suffix: str | None = None, separator: str = '_') -> str | None:
    if prefix is None:
        return suffix
    return prefix and prefix + separator + suffix or suffix or None

In [8]:
def generat_blocs(x, blocks, activation, features, dropout, multiply_freq, name):
    for i in range(blocks):
        x = tf.keras.layers.Dense(features, activation=activation, name=get_name(name, f'dense_{i}'))(x)
        if dropout > 0:
            x = tf.keras.layers.Dropout(dropout, name=get_name(name, f'dropout_{i}'))(x)

        if multiply_freq > 0 and (i + 1) % multiply_freq == 0:
            features *= 2
    return x

In [9]:
def get_critic_model(
    input_features: tuple | int, 
    actions: tuple | int,
    features: int,
    out_features: tuple | int,
    blocks: int, 
    activation: str | tf.keras.layers.Activation | None = 'silu',
    dropout: float = 0.,
    multiply_freq: int = 1,
    name: str | None = None
) -> tf.keras.Model:
    input1 = tf.keras.layers.Input((input_features, ), name=get_name(name, 'observations'))
    input2 = tf.keras.layers.Input((actions, ), name=get_name(name, 'actions'))
    x = tf.keras.layers.concatenate([input1, input2])
    inputs = [input1, input2]
    x = generat_blocs(x, blocks, activation, features, dropout, multiply_freq, name)
    x = tf.keras.layers.Dense(out_features, name=get_name(name, 'prediction'))(x)
    return tf.keras.Model(inputs=inputs, outputs=x, name=name)

In [10]:
def get_policy_model(
    input_features: tuple | int, 
    features: int,
    out_features: tuple | int,
    blocks: int, 
    activation: str | tf.keras.layers.Activation | None = 'silu',
    dropout: float = 0.,
    multiply_freq: int = 1,
    name: str | None = None
) -> tf.keras.Model:
    inputs = x = tf.keras.layers.Input((input_features, ), name=get_name(name, 'input'))
    x = generat_blocs(x, blocks, activation, features, dropout, multiply_freq, name)
    x = tf.keras.layers.Dense(out_features, activation='tanh', name=get_name(name, 'prediction'))(x)
    return tf.keras.Model(inputs=inputs, outputs=x, name=name)

# Play the game

Implement interacting with the environment and storing entries to the replay buffer

In [11]:
def play_game(model: tf.keras.Model, buffer: ReplayBuffer | None, env: gym.Env, max_steps: int, observation: np.ndarray | None = None) -> np.ndarray:
    """Play game and record

    Parameters:
        model: the model to get actions with
        buffer: replay buffer to store the entries to
        env: environment to play
        max_steps: maximal number of steps to perform
        observation: the observation to resume from

    Returns:
        the last observation"""
    if observation is None:
        observation, _ = env.reset()
    buffer = buffer if buffer is not None else ReplayBuffer(1)
    for i in range(max_steps):
        a = model(observation[None], training=False).numpy()[0] # Observe state `s` and select action `a`
        new_observation, score, done, terminated, _ = env.step(a) # Execute `a` in the environment
        buffer.add(observation, a, score, new_observation, done) # Store `(s, a, r, s', d)` in buffe
        if done or terminated: # If `s'` is terminal, reset environment state
            observation, _ = env.reset()
            continue
        observation = new_observation
    return observation

# Loss

Implement double q learning loss

In [39]:
def ddpg_loss(
    current_observation: tf.Tensor, 
    action: tf.Tensor, 
    reward: tf.Tensor, 
    next_observation: tf.Tensor, 
    done: tf.Tensor,
    q_model: tf.keras.Model,
    policy_model: tf.keras.Model,
    target_q_model: tf.keras.Model,
    target_policy_model: tf.keras.Model,
    gamma: float
) -> tuple[tf.Tensor, tf.Tensor]:
    """Computes Deep Deterministic Policy Gradient.

    Parameters:
        current_observation: observations at the current time step
        action: actions taken at the current time step
        reward: rewards at the current time step
        next_observation: observations at the next time step
        done: whether the episode has ended or not
        q_model: q-function model
        policy_model: action prediction model
        target_q_model: target q-function model
        target_policy_model: target action prediction model
        gamma: discount

    Returns:
        Computed losses for q-function and policy models"""
    
    # q(state, action) приближаем к q(state, policy_model(state))

    
    q_current = q_model((current_observation, action))
    current_action = target_policy_model(next_observation)
    q_next = target_q_model([next_observation, current_action.numpy()])
    
    q_ref = reward + gamma * (1. - done) * q_next
    q_loss = tf.math.reduce_mean(tf.square(q_current - q_ref))
    # 
    predict_current_action = policy_model(current_observation)
    policy_loss = -tf.math.reduce_mean(q_model((current_observation, policy_model(current_observation).numpy())))
    return q_loss, policy_loss

# Training

Create models, replay buffers, optimizer, epsilon decay etc. Implement training loop, show training progress and perform model evaluation once in a while

In [41]:
# 24 - пространство стейта, 16 - размерность слоя, 1 - размерность выхода, 12 - еколичество блоков
critic_model = get_critic_model(24, 4, 16, 1, 10, name='action_model', dropout=0.1, multiply_freq=2, activation='swish')
critic_model.summary()
target_critic_model = get_critic_model(24, 4, 16, 1, 10, name='action_model', dropout=0.1, multiply_freq=2, activation='swish')
target_critic_model.trainable = False
target_critic_model.set_weights(critic_model.trainable_weights)

In [42]:
policy_model = get_policy_model(24, 16, 4, 10, name='action_model', dropout=0.1, multiply_freq=2, activation='swish')
policy_model.summary()
target_policy_model = get_policy_model(24, 16, 4, 10, name='action_model', multiply_freq=2, activation='swish')
target_policy_model.trainable = False
target_policy_model.set_weights(policy_model.get_weights())

In [43]:
train_buffer = ReplayBuffer(1000, observation_space=env.observation_space, action_space=env.action_space)
eval_buffer = ReplayBuffer(100, observation_space=eval_env.observation_space, action_space=eval_env.action_space)

In [44]:
optimizer = tf.keras.optimizers.Adam(1e-4, clipnorm=5, decay=2e-5)
policy_optimizer = tf.keras.optimizers.Adam(1e-4, clipnorm=5, decay=2e-5)



In [45]:
epochs = 10000
save_loss_frequency = 100
batch_size = 16
update_frequency = 128
eval_frequency = 512
steps_per_epoch = 16
eval_steps = 100
initial_samples = 1000
n_evals = 5
eval_threshold = 400
polyak = 0.95

In [46]:
def mulpiply_weights(model: tf.keras.Model, target_model: tf.keras.Model, number: float | int) -> list[np.ndarray]:
    return [number * target_weights + (1. - number) * model_weights for target_weights, model_weights in zip(target_model.get_weights(), model.get_weights())]

In [47]:
obs, _ = env.reset()
policy_model(obs[None]).numpy()[0]

array([ 5.4892048e-04, -1.4880134e-05,  1.4698583e-04,  1.5841752e-04],
      dtype=float32)

In [51]:
play_game(policy_model, train_buffer, env, initial_samples)

array([ 4.4940582e-01,  6.0636908e-02,  8.0988199e-02, -2.9491073e-02,
        1.7840590e-01, -5.3677410e-01, -6.2572265e-01,  3.7123761e-04,
        1.0000000e+00,  8.4264085e-02, -5.3664267e-01, -6.2612450e-01,
       -6.4955157e-04,  1.0000000e+00,  3.4989437e-01,  3.5386795e-01,
        3.6625251e-01,  3.8857853e-01,  4.2394194e-01,  4.7820106e-01,
        5.6288409e-01,  7.0320487e-01,  9.7184026e-01,  1.0000000e+00],
      dtype=float32)

In [50]:
q_losses = []
p_losses = []
total_q_loss = 0
total_p_loss = 0
eval_score = 0
all_q_loss_saver = []
all_p_loss_saver = []

s, _ = env.reset()
pbar = tqdm.trange(epochs)
for i in pbar:
    
    s = play_game(policy_model, train_buffer, env, steps_per_epoch, observation=s) # Select action, play and store in buffer
    
    vals = train_buffer.sample(batch_size) # Randomly sample a batch of transitions

    critic_model_weights = [v.value for v in critic_model.trainable_weights]
    policy_model_weights = [v.value for v in policy_model.trainable_weights]

    with tf.GradientTape(watch_accessed_variables=False) as q_g, tf.GradientTape(watch_accessed_variables=False) as policy_g:
        q_g.watch(critic_model_weights)
        policy_g.watch(policy_model_weights)
        q_loss, policy_loss = ddpg_loss(*vals, q_model=critic_model, policy_model=policy_model, target_q_model=target_critic_model, target_policy_model=target_policy_model, gamma=0.99) # MSBE and mean score from Policy

    q_gradient = q_g.gradient(q_loss, critic_model.trainable_weights)
    print(policy_loss)
    p_gradient = policy_g.gradient(policy_loss, policy_model.trainable_weights)
    optimizer.apply_gradients(zip(q_gradient, critic_model.trainable_weights))
    policy_optimizer.apply_gradients(zip(p_gradient, policy_model.trainable_weights))
    
        
    q_losses.append(q_loss.numpy())
    p_losses.append(policy_loss.numpy())
    
    total_q_loss += q_losses[-1]
    total_p_loss += p_losses[-1]

    if (i + 1) % update_frequency == 0:
        target_model.set_weights(mulpiply_weights(model, target_model, polyak))
        target_policy_model.set_weights(mulpiply_weights(policy_model, target_policy_model, polyak))

    if (i + 1) % eval_frequency == 0:
        eval_score = 0

        for i in range(n_evals):
            eval_buffer.clear()
            play_game(policy_model, eval_buffer, eval_env, eval_steps)
            eval_score += eval_buffer.reward[:len(eval_buffer)].sum()

        eval_score /= n_evals
        if eval_score >= eval_threshold:
            break
    if (i + 1) % save_loss_frequency == 0:
        all_q_loss_saver.append(total_q_loss / (i + 1))
        all_p_loss_saver.append(total_p_loss / (i + 1))

    pbar.set_description(f'Qloss: {q_losses[-1]:.5f}; AllQloss: {total_q_loss / (i + 1):.5f}; Ploss: {p_losses[-1]:.5f}; AllPloss: {total_p_loss / (i + 1):.5f}; E: {eval_score:.5f}')


  0%|          | 0/10000 [00:00<?, ?it/s]

tf.Tensor(-0.0017272164, shape=(), dtype=float32)


ValueError: No gradients provided for any variable.

# Testing

Test the model on the environment and get a cool video