# Imports

In [1]:
import numpy as np
import gymnasium as gym
import tensorflow as tf
import tqdm.notebook as tqdm
import matplotlib.pyplot as plt

from gymnasium.utils.save_video import save_video

In [2]:
# Ask TensorFlow to allocate GPU memory on demand instead of reserving it all up front.
gpus = tf.config.list_physical_devices('GPU')
print(gpus)
if gpus:
    try:
        # The memory-growth setting has to be applied to every visible GPU
        # (TensorFlow rejects mixed settings), not only to the first one.
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        print('GPU enable')
    except (RuntimeError, ValueError) as e:
        # RuntimeError: the devices were already initialized, so the setting
        # can no longer change; ValueError: an invalid device was specified.
        print(e)

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]
GPU enable


# Environment

Create the environment. You can use any Atari environment from [here](https://gymnasium.farama.org/environments/atari/), but prefer environments with a discrete action space and a small number of actions.

In [3]:
# Training environment; `render_mode='rgb_array'` is requested so frames can be rendered as arrays.
env = gym.make('ALE/Asterix-v5', render_mode='rgb_array')
# Separate environment instance used only for evaluation runs (no render mode requested).
eval_env = gym.make('ALE/Asterix-v5')
# Reset once; as the last expression of the cell this echoes the (observation, info) pair.
env.reset()

(array([[[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0],
         ...,
         [0, 0, 0],
         [0, 0, 0],
         [0, 0, 0]],
 
        [[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0],
         ...,
         [0, 0, 0],
         [0, 0, 0],
         [0, 0, 0]],
 
        [[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0],
         ...,
         [0, 0, 0],
         [0, 0, 0],
         [0, 0, 0]],
 
        ...,
 
        [[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0],
         ...,
         [0, 0, 0],
         [0, 0, 0],
         [0, 0, 0]],
 
        [[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0],
         ...,
         [0, 0, 0],
         [0, 0, 0],
         [0, 0, 0]],
 
        [[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0],
         ...,
         [0, 0, 0],
         [0, 0, 0],
         [0, 0, 0]]], dtype=uint8),
 {'lives': 3, 'episode_frame_number': 0, 'frame_number': 0})


# Replay Buffer

Create a replay buffer to hold game history

In [4]:
class ReplayBuffer:
    """Fixed-capacity ring buffer of (observation, action, reward, next observation, done) entries."""

    def __init__(self, max_size: int, observation_space: gym.spaces.Space, action_space: gym.spaces.Space, seed: int | None = None):
        """Stores the replay history with a maximum of `max_size` entries, removing old entries as needed.

        Parameters:
            max_size: maximal number of entries to keep
            observation_space: specification of the observation space
            action_space: specification of the action space
            seed: seed to initialize the internal random number generator for reproducibility

        Raises:
            ValueError: if `max_size` is not positive"""

        if max_size <= 0:
            raise ValueError(f'max_size must be positive, got {max_size}')

        self.max_size = max_size
        self.done = np.zeros(max_size)
        self.step = 0  # slot the next `add` call writes to
        self.rng = np.random.default_rng(seed=seed)
        self.len = 0  # number of valid entries, never more than `max_size`

        # Keep observations in the dtype of the space itself (e.g. uint8 image
        # frames) instead of NumPy's float64 default, which would need eight
        # times the memory for byte-valued frames.
        observation_dtype = getattr(observation_space, 'dtype', None)
        if observation_dtype is None:
            observation_dtype = np.float64

        self.current_state = np.zeros((max_size, *observation_space.shape), dtype=observation_dtype)
        self.next_state = np.zeros((max_size, *observation_space.shape), dtype=observation_dtype)
        self.action = np.zeros((max_size, *action_space.shape), dtype=int)
        self.reward = np.zeros(max_size)

    def add(self, current_observation: np.ndarray, action: int, reward: float, next_observation: np.ndarray, done: bool) -> None:
        """Add a new entry to the buffer, overwriting the oldest one once the buffer is full.

        Parameters:
            current_observation: environment state observed at the current step
            action: action taken by the model
            reward: reward received after taking the action
            next_observation: environment state observed after taking the action
            done: whether the episode has ended or not"""

        slot = self.step
        self.current_state[slot] = current_observation
        self.action[slot] = action
        self.reward[slot] = reward
        self.next_state[slot] = next_observation
        self.done[slot] = done

        # Advance the write position circularly; the length saturates at capacity.
        self.step = (slot + 1) % self.max_size
        self.len = min(self.len + 1, self.max_size)

    def sample(self, n_samples: int, replace: bool = True) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Randomly samples `n_samples` from the valid entries of the buffer.

        Parameters:
            n_samples: number of samples to select
            replace: sample with or without replacement

        Returns:
            current observations, actions, rewards, next observations, done

        Raises:
            ValueError: if the buffer is empty, or if `replace` is False and
                `n_samples` exceeds the number of stored entries"""

        if self.len == 0 and n_samples > 0:
            raise ValueError('cannot sample from an empty replay buffer')

        indices = self.rng.choice(self.len, size=n_samples, replace=replace)
        return self.current_state[indices], self.action[indices], self.reward[indices], self.next_state[indices], self.done[indices]

    def clear(self) -> None:
        """Clears the buffer.

        Only the counters are reset; the old data stays in the arrays and is
        overwritten by subsequent `add` calls."""

        self.step = self.len = 0

    def __getitem__(self, index: int) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Gets a sample at `index`

        Parameters:
            index: index of the sample to get; negative values count from the
                last valid entry

        Returns:
            current observation, action, reward, next observation, done

        Raises:
            IndexError: if an integer `index` does not refer to a stored entry"""

        if isinstance(index, (int, np.integer)):
            # Slots at or beyond `len` hold zeros or stale data from before a
            # `clear`, so integer access is limited to the valid entries.
            if not -self.len <= index < self.len:
                raise IndexError(f'index {index} is out of range for a buffer with {self.len} entries')
            if index < 0:
                index += self.len

        return self.current_state[index], self.action[index], self.reward[index], self.next_state[index], self.done[index]

    def __len__(self) -> int:
        """Returns the number of entries in the buffer"""

        return self.len

# Model

Implement your model. Most, if not all, Atari environments have image observations.

In [5]:
def get_model(
    input_features: tuple | int, 
    features: int,
    out_features: tuple | int,
    blocks: int, 
    dropout: float = 0.2,
    multiply_freq: int = 1,
    name: str | None = None
) -> tf.keras.Model:
    """Builds a small convolutional network ending in a linear prediction head.

    The network is a stack of `blocks` stages, each made of
    Conv2D (3x3, same padding) -> PReLU -> MaxPool2D (2x2) -> Dropout,
    followed by Flatten and a Dense layer with `out_features` units.

    Parameters:
        input_features: shape of a single input sample (without the batch axis)
        features: number of convolution filters in the first stage
        out_features: number of units of the final Dense layer
        blocks: number of convolutional stages
        dropout: dropout rate used in every stage
        multiply_freq: the filter count is doubled after every `multiply_freq` stages;
            a value of 0 or less keeps the filter count constant
        name: optional name of the resulting model

    Returns:
        the assembled (uncompiled) Keras model"""

    layers = tf.keras.layers

    inputs = layers.Input(input_features, name='Input')
    hidden = inputs
    width = features

    for block_idx in range(blocks):
        stage = (
            layers.Conv2D(width, 3, padding='same', name=f'Conv2D_{block_idx}'),
            layers.PReLU(name=f'PReLU_{block_idx}'),
            layers.MaxPool2D((2, 2), name=f'MaxPool2D_{block_idx}'),
            layers.Dropout(dropout, name=f'Dropout_{block_idx}'),
        )
        for layer in stage:
            hidden = layer(hidden)

        # Widen the next stage once a full group of `multiply_freq` stages is done.
        widen = multiply_freq > 0 and (block_idx + 1) % multiply_freq == 0
        if widen:
            width = width * 2

    flat = layers.Flatten(name='Flatten')(hidden)
    outputs = layers.Dense(out_features, name='Predict')(flat)
    return tf.keras.Model(inputs=inputs, outputs=outputs, name=name)

# Sampler

Implement the sampler

In [6]:
class Sampler:
    """Epsilon-greedy action selector."""

    def __init__(self, epsilon: float, seed: int | None = None):
        """Selects a random action with probability `epsilon`, otherwise the highest-scoring action.

        Parameters:
            epsilon: the probability to select a random action
            seed: seed to initialize the internal random number generator for reproducibility"""

        self.epsilon = epsilon
        self.rng = np.random.default_rng(seed=seed)

    def __call__(self, probabilities: np.ndarray) -> int:
        """Select an action given the `probabilities`.

        Parameters:
            probabilities: score for each action; only their arg-max is used

        Returns:
            index of the selected action"""

        # One uniform draw decides between exploring and exploiting.
        explore = self.rng.random() < self.epsilon
        if not explore:
            return np.argmax(probabilities)

        n_actions = probabilities.shape[-1]
        return self.rng.integers(n_actions)

# Play the game

Implement the interaction with the environment and store the resulting transitions in the replay buffer.

In [7]:
def play_game(model: tf.keras.Model, buffer: ReplayBuffer | None, env: gym.Env, max_steps: int, sampler: Sampler, observation: np.ndarray | None = None) -> np.ndarray:
    """Play the game for `max_steps` steps and optionally record every transition.

    Episodes that end within the step budget are restarted automatically, so
    one call may span several episodes.

    Parameters:
        model: the model to get actions with
        buffer: replay buffer to store the entries to; pass None to play without recording
        env: environment to play
        max_steps: maximal number of steps to perform
        sampler: sampler to use to sample actions
        observation: the observation to resume from; the environment is reset when it is None

    Returns:
        the last observation (the freshly reset one if the final step ended an episode)"""

    if observation is None:
        observation, _ = env.reset()

    for _ in range(max_steps):
        # The model expects a batch axis; take the single row of action values back out.
        action_values = model(observation[None], training=False).numpy()[0]
        action = sampler(action_values)

        # Gymnasium's step API returns (observation, reward, terminated, truncated, info).
        next_observation, reward, terminated, truncated, _ = env.step(action)

        if buffer is not None:
            # Only genuine termination is stored as `done`; a truncated episode
            # is reset below without being marked as finished in the buffer.
            buffer.add(observation, action, reward, next_observation, terminated)

        if terminated or truncated:
            observation, _ = env.reset()
        else:
            observation = next_observation

    return observation

# Loss

Implement the double Q-learning loss.

In [8]:
def qq_loss(
    current_observation: tf.Tensor, 
    action: tf.Tensor, 
    reward: tf.Tensor, 
    next_observation: tf.Tensor, 
    done: tf.Tensor,
    model: tf.keras.Model,
    target_model: tf.keras.Model,
    gamma: float
) -> tf.Tensor:
    """Computes double q learning loss.

    The online `model` chooses the best next action, the `target_model`
    evaluates it, and the loss is the mean squared difference between the
    resulting bootstrap target and the online estimate of the taken action.

    Parameters:
        current_observation: observations at the current time step
        action: actions taken at the current time step
        reward: rewards at the current time step
        next_observation: observations at the next time step
        done: whether the episode has ended or not
        model: training model
        target_model: target model
        gamma: discount

    Returns:
        Computed loss"""

    q_current = model(current_observation)
    q_next = target_model(next_observation)

    # Bring the bookkeeping inputs to the network's dtype so the arithmetic
    # below also works when they arrive as float64 or boolean tensors.
    reward = tf.cast(reward, q_current.dtype)
    done = tf.cast(done, q_current.dtype)
    action = tf.cast(action, tf.int64)

    # Action selection by the online model, evaluation by the target model.
    a_next = tf.argmax(model(next_observation), axis=-1)
    q_next_selected = tf.reshape(tf.gather(q_next, tf.expand_dims(a_next, axis=-1), batch_dims=1), (-1, ))

    # The bootstrap target is a constant for the optimizer; `(1 - done)` drops
    # the future value for transitions that ended the episode.
    q_ref = tf.stop_gradient(reward + gamma * (1. - done) * q_next_selected)

    # Online model's estimate for the actions that were actually taken.
    q = tf.reshape(tf.gather(q_current, tf.expand_dims(action, axis=-1), batch_dims=1), (-1, ))

    return tf.math.reduce_mean(tf.square(q_ref - q))

# Training

Create the models, replay buffers, samplers, optimizer, epsilon decay schedule, etc. Implement the training loop, show the training progress, and evaluate the model periodically.

In [9]:
# Online Q-network: one output per action of the environment's discrete action
# space, so the cell keeps working when a different Atari game is selected.
model = get_model(env.observation_space.shape, 8, int(env.action_space.n), 3, name='Ladders', multiply_freq=2)

In [10]:
# Rebuilds the online Q-network with fresh weights: one output per action of the
# environment's discrete action space instead of a hard-coded count.
model = get_model(env.observation_space.shape, 8, int(env.action_space.n), 3, name='Ladders', multiply_freq=2)

In [None]:
# Print the layer-by-layer overview of the online model.
model.summary()

In [None]:
# Print the layer-by-layer overview of the online model (repeat of the previous cell).
model.summary()

In [None]:
# Target network with the same architecture as the online model; the output
# size follows the environment's discrete action space instead of a hard-coded count.
target_model = get_model(env.observation_space.shape, 8, int(env.action_space.n), 3, name='target_model', multiply_freq=2)

In [None]:
# The target network is never trained directly; it only receives copies of the online weights.
target_model.trainable = False
# Start with both networks holding identical weights.
target_model.set_weights(model.get_weights())

In [None]:
# Replay memory for training transitions (capacity 10000), shaped after the training env's spaces.
train_buffer = ReplayBuffer(10000, observation_space=env.observation_space, action_space=env.action_space)
# Starts fully random (epsilon = 1); the training loop overwrites epsilon every epoch.
train_sampler = Sampler(1)

In [None]:
# The capacity has to cover a whole evaluation run (eval_steps = 1000 steps):
# the training loop scores a run by summing the rewards stored here, and a
# smaller ring buffer would keep only the most recent ones.
eval_buffer = ReplayBuffer(1000, observation_space=eval_env.observation_space, action_space=eval_env.action_space)
# Epsilon 0: evaluation always takes the greedy action.
eval_sampler = Sampler(0)

In [None]:
# Adam with learning rate 1e-4, gradient-norm clipping (clipnorm=5) and weight decay 2e-5.
optimizer = tf.keras.optimizers.Adam(0.0001, clipnorm=5, weight_decay=2e-5)

In [None]:
epochs = 10000  # iterations of the training loop (one gradient step each)
batch_size = 64  # transitions sampled from the replay buffer per gradient step
decay_epochs = epochs // 2  # epsilon is annealed over the first half of training
end_epsilon = 0.1  # exploration rate once the decay has finished
update_frequency = 512  # epochs between copies of the online weights into the target model
eval_frequency = 512  # epochs between evaluation rounds
steps_per_epoch = 32  # environment steps collected before each gradient step
eval_steps = 1000  # environment steps played per evaluation run
initial_samples = 1000  # NOTE(review): not referenced anywhere in the visible code
n_evals = 5  # evaluation runs averaged into one score
eval_threshold = 1000  # training stops early once the averaged evaluation score reaches this
# Epsilon schedule: starts at 1 and decays to `end_epsilon` over `decay_epochs` steps
# (PolynomialDecay with its default power).
epsilon_decay = tf.keras.optimizers.schedules.PolynomialDecay(1., decay_epochs, end_learning_rate=end_epsilon)

In [None]:
losses = []      # loss of every gradient step, in order
total_loss = 0   # running sum of the losses, used for the displayed average
eval_score = 0   # averaged score of the most recent evaluation round
max_score = 0    # best evaluation score that triggered a checkpoint so far

# Checkpoints are written into this directory during training; create it up
# front so the first save cannot fail on a missing path.
tf.io.gfile.makedirs('models/Atari')

s, _ = env.reset()
pbar = tqdm.trange(epochs)
for i in pbar:
    # Anneal the exploration rate according to the schedule.
    train_sampler.epsilon = epsilon_decay(i).numpy()

    # Collect fresh experience, resuming from where the previous epoch stopped.
    s = play_game(model, train_buffer, env, steps_per_epoch, train_sampler, observation=s)

    # One gradient step on a random mini-batch; only the online model's weights are watched.
    vals = train_buffer.sample(batch_size)
    with tf.GradientTape(watch_accessed_variables=False) as g:
        g.watch(model.trainable_weights)
        loss = qq_loss(*vals, model, target_model, 0.99)

    gradient = g.gradient(loss, model.trainable_weights)
    optimizer.apply_gradients(zip(gradient, model.trainable_weights))

    losses.append(loss.numpy())
    total_loss += losses[-1]

    # Periodically refresh the target network from the online network.
    if (i + 1) % update_frequency == 0:
        target_model.set_weights(model.get_weights())

    if (i + 1) % eval_frequency == 0:
        eval_score = 0

        # The loop variable is deliberately not `i`: reusing the epoch index
        # here would corrupt the average loss shown in the progress bar.
        for _ in range(n_evals):
            eval_buffer.clear()
            play_game(model, eval_buffer, eval_env, eval_steps, eval_sampler)
            eval_score += eval_buffer.reward[:len(eval_buffer)].sum()

        eval_score /= n_evals
        # Checkpoint only when the score beats the previous best by at least 100 points.
        if (int(eval_score) >= (int(max_score) + 100)):
            max_score = eval_score
            model.save_weights(f'models/Atari/model_weights_{int(eval_score)}.h5')

        if eval_score >= eval_threshold:
            break

    # L: last loss, AL: average loss so far, E: latest evaluation score.
    pbar.set_description(f'L: {losses[-1]:.5f}; AL: {total_loss / (i + 1):.5f}; E: {eval_score:.5f}')

In [None]:
# Echo the averaged score of the most recent evaluation round.
eval_score

In [None]:
# Store the final weights; the directory is created first so the save cannot
# fail when no intermediate checkpoint was ever written.
tf.io.gfile.makedirs('./models/Atari')
model.save_weights('./models/Atari/model_weights_end.h5')

# Testing

Test the model on the environment and get a cool video

In [11]:
def save_gameplay(model: tf.keras.Model, render_mode: str = 'human', n_frames: int = 1000, buffer_capacity: int = 1000, env_id: str = 'ALE/Asterix-v5') -> ReplayBuffer:
    """Plays the game greedily with `model` and optionally stores a video of it.

    Parameters:
        model: the model to get actions with
        render_mode: render mode of the created environment; with
            'rgb_array_list' the collected frames are written to './videos'
        n_frames: number of environment steps to play
        buffer_capacity: capacity of the replay buffer that records the run
        env_id: id of the environment to create

    Returns:
        the replay buffer holding the recorded transitions"""

    env = gym.make(env_id, render_mode=render_mode)
    try:
        buffer = ReplayBuffer(buffer_capacity, env.observation_space, env.action_space)
        # Epsilon 0: always take the greedy action.
        play_game(model, buffer, env, n_frames, Sampler(0))

        if render_mode == 'rgb_array_list':
            # In this render mode `env.render()` hands back the list of frames
            # the environment collected (per the Gymnasium docs, since the last
            # reset). `fps` alone fixes the frame timing, so no per-frame
            # durations are passed.
            save_video(env.render(), './videos', fps=24)
    finally:
        # Release the emulator and, in 'human' mode, the render window.
        env.close()

    return buffer

In [12]:
# Restore the weights saved at the end of training.
model.load_weights(f'./models/Atari/model_weights_end.h5')

In [13]:
# Play with the default 'human' render mode and keep the recorded transitions.
buffer = save_gameplay(model)

KeyboardInterrupt: 

: 

In [None]:
# Total reward stored in the recorded run's buffer.
buffer.reward.sum()

In [None]:
# Play again in 'rgb_array_list' mode, which makes save_gameplay write a video to './videos'.
buffer = save_gameplay(model, render_mode='rgb_array_list')