# Imports

In [1]:
import numpy as np
import gymnasium as gym
import tensorflow as tf
import tqdm.notebook as tqdm
import matplotlib.pyplot as plt


from gymnasium.utils.save_video import save_video

  np.bool8: (False, True),
  np.bool8: (False, True),


In [2]:
# Switch the GPUs to on-demand memory allocation instead of letting
# TensorFlow reserve all device memory when it initialises.
gpus = tf.config.list_physical_devices('GPU')
print(gpus)
if gpus:
    try:
        # The memory-growth setting must be the same on every visible GPU,
        # so configure all of them rather than only the first one.
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        print('GPU enable')
    except (RuntimeError, ValueError) as e:
        # RuntimeError: the devices were already initialised;
        # ValueError: the device or setting was rejected.
        print(e)

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]
GPU enable


# Environment

Create the [environment](https://gymnasium.farama.org/environments/box2d/bipedal_walker/).

In [4]:
# Two independent instances of the same task: `env` collects training
# transitions, `eval_env` is reserved for evaluation runs.
env, eval_env = (gym.make('BipedalWalker-v3', hardcore=False) for _ in range(2))

# Replay Buffer

Create a replay buffer that stores the history of played transitions.

In [10]:
class ReplayBuffer:
    """Fixed-capacity ring buffer of (observation, action, reward, next observation, done) transitions.

    Once `max_size` entries have been written, each new entry overwrites the oldest one."""

    def __init__(self, max_size: int, observation_space: gym.spaces.Space, action_space: gym.spaces.Space, seed: int | None = None):
        """Stores the replay history with a maximum of `max_size` entries, removing old entries as needed.

        Parameters:
            max_size: maximal number of entries to keep
            observation_space: specification of the observation space
            action_space: specification of the action space
            seed: seed to initialize the internal random number generator for reproducibility

        Raises:
            ValueError: if `max_size` is not positive"""
        if max_size <= 0:
            raise ValueError(f'max_size must be positive, got {max_size}')

        self.max_size = max_size
        self.done = np.zeros(max_size)
        self.step = 0  # index of the slot the next `add` writes to
        self.rng = np.random.default_rng(seed=seed)
        self.len = 0  # number of valid entries, never exceeds `max_size`

        # Store actions in the dtype of the action space: an integer array would
        # silently truncate continuous actions (e.g. 0.7 -> 0). Fall back to
        # `int` for spaces that do not declare a dtype.
        action_dtype = getattr(action_space, 'dtype', None)
        if action_dtype is None:
            action_dtype = int

        self.current_state = np.zeros((max_size, *observation_space.shape))
        self.action = np.zeros((max_size, *action_space.shape), dtype=action_dtype)
        self.reward = np.zeros(max_size)
        self.next_state = np.zeros((max_size, *observation_space.shape))

    def add(self, current_observation: np.ndarray, action: np.ndarray, reward: float, next_observation: np.ndarray, done: bool) -> None:
        """Add a new entry to the buffer.

        Parameters:
            current_observation: environment state observed at the current step
            action: action taken by the model
            reward: reward received after taking the action
            next_observation: environment state observed after taking the action
            done: whether the episode has ended or not"""
        self.current_state[self.step] = current_observation
        self.action[self.step] = action
        self.reward[self.step] = reward
        self.next_state[self.step] = next_observation
        self.done[self.step] = done
        # Advance the write position, wrapping around once the buffer is full.
        self.step = (self.step + 1) % self.max_size
        self.len = min(self.len + 1, self.max_size)

    def sample(self, n_samples: int, replace: bool = True) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Randomly samples `n_samples` from the buffer.

        Parameters:
            n_samples: number of samples to select
            replace: sample with or without replacement

        Returns:
            current observations, actions, rewards, next observations, done"""
        # Only the first `self.len` slots hold valid entries.
        indices = self.rng.choice(self.len, size=n_samples, replace=replace)
        return (
            self.current_state[indices],
            self.action[indices],
            self.reward[indices],
            self.next_state[indices],
            self.done[indices]
        )

    def clear(self) -> None:
        """Clears the buffer (stored values stay in memory but are no longer counted or sampled)"""
        self.step = self.len = 0

    def __getitem__(self, index: int) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Gets a sample at `index`

        Parameters:
            index: index of the sample to get

        Returns:
            current observation, action, reward, next observation, done"""
        return (
            self.current_state[index],
            self.action[index],
            self.reward[index],
            self.next_state[index],
            self.done[index]
        )

    def __len__(self) -> int:
        """Returns the number of entries in the buffer"""
        return self.len

# Model

Implement your model

In [11]:
def get_name(prefix: str | None = None, suffix: str | None = None, separator: str = '/') -> str | None:
    return prefix and prefix + separator + suffix or suffix or None

In [12]:
def get_model(
    input_features: tuple | int, 
    features: int,
    out_features: tuple | int,
    blocks: int, 
    activation: str | tf.keras.layers.Activation | None = 'silu',
    dropout: float = 0.,
    multiply_freq: int = 1,
    name: str | None = None,
    out_activation: str | tf.keras.layers.Activation | None = None
) -> tf.keras.Model:
    """Builds a fully connected network.

    Parameters:
        input_features: number of input features, or the input shape (without the batch axis)
        features: width of the first dense block
        out_features: number of outputs, or the output shape (without the batch axis)
        blocks: number of dense blocks
        activation: activation of the hidden dense layers
        dropout: dropout rate applied after every dense block; disabled when 0
        multiply_freq: the width doubles after every `multiply_freq` blocks; never when <= 0
        name: model name, also used as the prefix of the layer names
        out_activation: activation of the prediction layer (linear when None)

    Returns:
        the constructed model"""
    # Accept both a plain int and a shape tuple, as the annotations promise.
    input_shape = tuple(int(d) for d in np.atleast_1d(input_features))
    out_shape = tuple(int(d) for d in np.atleast_1d(out_features))

    inputs = x = tf.keras.layers.Input(input_shape, name=get_name(name, 'input'))
    if len(input_shape) > 1:
        # Dense layers act on the last axis only; flatten multi-dimensional inputs first.
        x = tf.keras.layers.Flatten(name=get_name(name, 'flatten'))(x)

    width = features  # keep the argument untouched while the width grows
    for i in range(blocks):
        x = tf.keras.layers.Dense(width, activation=activation, name=get_name(name, f'dense_{i}'))(x)
        if dropout > 0:
            x = tf.keras.layers.Dropout(dropout, name=get_name(name, f'dropout_{i}'))(x)

        if multiply_freq > 0 and (i + 1) % multiply_freq == 0:
            width *= 2

    x = tf.keras.layers.Dense(int(np.prod(out_shape)), activation=out_activation, name=get_name(name, 'prediction'))(x)
    if len(out_shape) > 1:
        x = tf.keras.layers.Reshape(out_shape, name=get_name(name, 'reshape'))(x)
    return tf.keras.Model(inputs=inputs, outputs=x, name=name)

# Play the game

Implement the interaction with the environment and store the resulting transitions in the replay buffer.

In [13]:
def play_game(model: tf.keras.Model, buffer: ReplayBuffer | None, env: gym.Env, max_steps: int, observation: np.ndarray | None = None) -> np.ndarray:
    """Play game and record

    Parameters:
        model: the model to get actions with; called as `model(batch, training=False)`
        buffer: replay buffer to store the entries to; nothing is recorded when None
        env: environment to play
        max_steps: maximal number of steps to perform
        observation: the observation to resume from; the environment is reset when None

    Returns:
        the last observation"""
    if observation is None:
        observation, _ = env.reset()

    for _ in range(max_steps):
        # The model works on batches: add a batch axis, then strip it from the result.
        action = model(observation[None], training=False).numpy()[0]

        # Gymnasium's step returns (observation, reward, terminated, truncated, info).
        new_observation, reward, terminated, truncated, _ = env.step(action)

        if buffer is not None:
            # Store `terminated` only, as before: a time-limit truncation is not a terminal state.
            buffer.add(observation, action, reward, new_observation, terminated)

        if terminated or truncated:
            # Episode is over: start a new one and keep playing.
            observation, _ = env.reset()
        else:
            observation = new_observation

    return observation

# Loss

In [None]:
def ddpg_loss(
    current_observation: tf.Tensor, 
    action: tf.Tensor, 
    reward: tf.Tensor, 
    next_observation: tf.Tensor,
    done: tf.Tensor,
    q_model: tf.keras.Model,
    policy_model: tf.keras.Model,
    target_q_model: tf.keras.Model,
    target_policy_model: tf.keras.Model,
    gamma: float
) -> tuple[tf.Tensor, tf.Tensor]:
    """Computes Deep Deterministic Policy Gradient.

    The q-function models are expected to take the observation and the action
    concatenated along the last axis and to return one value per sample,
    shape (batch, 1). The policy models map observations to actions.

    Parameters:
        current_observation: observations at the current time step
        action: actions taken at the current time step
        reward: rewards at the current time step
        next_observation: observations at the next time step
        done: whether the episode has ended or not
        q_model: q-function model
        policy_model: action prediction model
        target_q_model: target q-function model
        target_policy_model: target action prediction model
        gamma: discount

    Returns:
        Computed losses for q-function and policy models"""
    # Inputs may be float64 NumPy arrays; cast everything to float32 (the Keras
    # default) so they can be concatenated with the model outputs.
    current_observation = tf.cast(current_observation, tf.float32)
    action = tf.cast(action, tf.float32)
    next_observation = tf.cast(next_observation, tf.float32)
    reward = tf.reshape(tf.cast(reward, tf.float32), (-1, ))
    done = tf.reshape(tf.cast(done, tf.float32), (-1, ))

    # Target critic's value of the action the target actor would take in the next state.
    a_next = target_policy_model(next_observation)
    q_next = tf.reshape(target_q_model(tf.concat([next_observation, a_next], axis=-1)), (-1, ))

    # Bellman target; no bootstrapping past the end of an episode. It is a
    # constant for the optimisation, so gradients are stopped here.
    q_ref = tf.stop_gradient(reward + gamma * (1. - done) * q_next)

    # Critic's value of the actions that were actually taken.
    q = tf.reshape(q_model(tf.concat([current_observation, action], axis=-1)), (-1, ))

    q_loss = tf.math.reduce_mean(tf.square(q_ref - q))

    # The actor is trained to maximise the critic's value of its own actions,
    # i.e. to minimise the negated value.
    a_current = policy_model(current_observation)
    policy_loss = -tf.math.reduce_mean(q_model(tf.concat([current_observation, a_current], axis=-1)))

    return q_loss, policy_loss

# Training

Create the models, replay buffers and optimizer. Implement the training loop, show the training progress and evaluate the model periodically.

In [None]:
# Critic Q(s, a): a DDPG critic scores an (observation, action) pair with a
# single value, so its input is the two concatenated and its output is one unit.
obs_dim = env.observation_space.shape[0]
act_dim = env.action_space.shape[0]
model = get_model(obs_dim + act_dim, 16, 1, 12, name='nogi', dropout=0.1, multiply_freq=2, activation='swish')
model.summary()

In [None]:
# Frozen copy of `model` used to compute stable learning targets. The input and
# output sizes are read from `model` so that the weight copy below always fits.
target_model = get_model(model.input_shape[-1], 16, model.output_shape[-1], 12, name='target_nogi', multiply_freq=2, activation='swish')
target_model.trainable = False
target_model.set_weights(model.get_weights())

In [None]:
# Actor: predicts the action to take from the environment state.
policy_model = get_model(
    input_features=24,
    features=16,
    out_features=4,
    blocks=8,
    multiply_freq=2,
    name='policy_model',
)
policy_model.summary()

In [None]:
# Frozen twin of the actor, initialised with the actor's current weights.
target_policy_model = get_model(
    input_features=24,
    features=16,
    out_features=4,
    blocks=8,
    multiply_freq=2,
    name='target_policy_model',
)
target_policy_model.trainable = False
target_policy_model.set_weights(policy_model.get_weights())

In [None]:
# Replay memory the training batches are sampled from.
train_buffer = ReplayBuffer(
    max_size=10000,
    observation_space=env.observation_space,
    action_space=env.action_space,
)

In [None]:
# The evaluation score is the sum of the rewards held in this buffer, so it must
# be able to hold a whole evaluation run (`eval_steps` = 1000 steps); with a
# smaller capacity the older rewards are overwritten and lost.
eval_buffer = ReplayBuffer(1000, observation_space=eval_env.observation_space, action_space=eval_env.action_space)

In [None]:
# The `decay` argument is rejected by current Keras optimizers. An inverse-time
# schedule with `decay_steps=1` gives the same learning rate,
# lr = 1e-4 / (1 + 2e-5 * step), and works across versions.
optimizer = tf.keras.optimizers.Adam(
    tf.keras.optimizers.schedules.InverseTimeDecay(1e-4, decay_steps=1, decay_rate=2e-5),
    clipnorm=5,
)

In [None]:
# --- Training schedule ---
epochs = 20_000              # number of training iterations
steps_per_epoch = 32         # environment steps played per iteration
batch_size = 256             # transitions sampled from the replay buffer per update
initial_samples = 1_000      # presumably a warm-up sample count - TODO confirm

# --- Exploration (presumably a decaying schedule - TODO confirm) ---
end_epsilon = 0.1
decay_epochs = epochs // 2

# --- Target networks ---
update_frequency = 512       # target weights are refreshed every this many iterations
polyak = 0.95                # share of the old target weights kept on a refresh

# --- Evaluation ---
eval_frequency = 512         # an evaluation is run every this many iterations
n_evals = 5                  # evaluation runs averaged into one score
eval_steps = 1_000           # environment steps per evaluation run
eval_threshold = 200         # training stops once the mean score reaches this

In [None]:
q_losses = []
policy_losses = []
total_q_loss = 0
total_policy_loss = 0
eval_score = 0

# Each network needs its own optimizer state; the actor's optimizer is created
# with the same configuration as the critic's.
policy_optimizer = type(optimizer).from_config(optimizer.get_config())


def _soft_update(target: tf.keras.Model, online: tf.keras.Model, tau: float) -> None:
    """Polyak-averages the weights: target <- tau * target + (1 - tau) * online."""
    target.set_weights([
        tau * t + (1. - tau) * w
        for t, w in zip(target.get_weights(), online.get_weights())
    ])


def _behaviour_policy(noise_scale: float):
    """Wraps the actor so that it explores: Gaussian noise is added to its actions.

    `play_game` only needs a callable that accepts `(batch, training=...)` and
    returns a tensor, so a plain function can stand in for a Keras model."""
    low, high = env.action_space.low, env.action_space.high

    def act(observation, training=False):
        action = policy_model(observation, training=training)
        action += tf.random.normal(tf.shape(action), stddev=noise_scale)
        # Keep the noisy action inside the bounds of the action space so the
        # stored action is a valid one.
        return tf.clip_by_value(action, low, high)

    return act


s, _ = env.reset()
pbar = tqdm.trange(epochs)
for i in pbar:
    # Exploration noise decays linearly from 1 to `end_epsilon` over `decay_epochs` iterations.
    noise_scale = max(end_epsilon, 1. - (1. - end_epsilon) * i / decay_epochs)

    # The actor (not the critic) chooses the actions.
    s = play_game(_behaviour_policy(noise_scale), train_buffer, env, steps_per_epoch, observation=s)

    vals = train_buffer.sample(batch_size)

    # The losses have to be computed inside the tape, otherwise there is nothing
    # to differentiate. The tape is persistent because two gradients are taken.
    with tf.GradientTape(persistent=True) as tape:
        # Keyword arguments guard against mixing up the four networks.
        q_loss, policy_loss = ddpg_loss(
            *vals,
            q_model=model,
            policy_model=policy_model,
            target_q_model=target_model,
            target_policy_model=target_policy_model,
            gamma=0.99,
        )

    q_gradient = tape.gradient(q_loss, model.trainable_weights)
    p_gradient = tape.gradient(policy_loss, policy_model.trainable_weights)
    del tape

    optimizer.apply_gradients(zip(q_gradient, model.trainable_weights))
    policy_optimizer.apply_gradients(zip(p_gradient, policy_model.trainable_weights))

    q_losses.append(q_loss.numpy())
    policy_losses.append(policy_loss.numpy())

    total_q_loss += q_losses[-1]
    total_policy_loss += policy_losses[-1]

    if (i + 1) % update_frequency == 0:
        _soft_update(target_model, model, polyak)
        _soft_update(target_policy_model, policy_model, polyak)

    if (i + 1) % eval_frequency == 0:
        eval_score = 0

        # `_` keeps the outer loop index `i` intact for the running averages below.
        for _ in range(n_evals):
            eval_buffer.clear()
            # Evaluate the plain actor, without exploration noise.
            play_game(policy_model, eval_buffer, eval_env, eval_steps)
            eval_score += eval_buffer.reward[:len(eval_buffer)].sum()

        eval_score /= n_evals
        if eval_score >= eval_threshold:
            break

    pbar.set_description(f'Qloss: {q_losses[-1]:.5f}; AllQloss: {total_q_loss / (i + 1):.5f}; Ploss: {policy_losses[-1]:.5f}; AllPloss: {total_policy_loss / (i + 1):.5f}; E: {eval_score:.5f}')

In [None]:
model.save_weights(f'./models/nogi_model_{eval_score}')
# The policy network is the one that produces actions, so save it as well.
policy_model.save_weights(f'./models/nogi_policy_model_{eval_score}')

# Testing

Test the model on the environment and get a cool video

In [None]:
def save_gameplay(model: tf.keras.Model, render_mode: str = 'human', n_frames: int = 1000, buffer_capacity: int = 1000, env_id: str = 'BipedalWalker-v3'):
    """Plays the environment with `model` and records the gameplay.

    Parameters:
        model: the model to get actions with
        render_mode: Gymnasium render mode; use 'rgb_array_list' to save a video to './videos'
        n_frames: number of environment steps to play
        buffer_capacity: capacity of the replay buffer that records the gameplay
        env_id: id of the environment to play; it must match the environment the model was trained on

    Returns:
        the replay buffer with the recorded transitions"""
    env = gym.make(env_id, render_mode=render_mode)
    try:
        buffer = ReplayBuffer(buffer_capacity, env.observation_space, env.action_space)
        play_game(model, buffer, env, n_frames)
        if render_mode == 'rgb_array_list':
            # In this mode `render()` returns the frames collected since the last reset.
            save_video(env.render(), './videos', fps=env.metadata.get('render_fps', 24))
    finally:
        # Release the rendering window/resources even if playing fails.
        env.close()
    return buffer

In [None]:
# Load the weights written by the training section above (same path pattern).
model.load_weights(f'./models/nogi_model_{eval_score}')

In [None]:
# Play with the policy network: it is the model that predicts actions from observations.
buffer = save_gameplay(policy_model)

In [None]:
# Total reward over the recorded gameplay.
np.sum(buffer.reward)