In [None]:
# Colab-only setup: mount Google Drive and make the notebooks folder the
# working directory, so relative paths used later (e.g. './video') resolve
# inside Drive.
from google.colab import drive
import os
drive.mount('/content/drive')
os.chdir("/content/drive/My Drive/Colab Notebooks")

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
import gym
import scipy.signal

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Machine epsilon of float32 as a plain Python float; a tiny constant for
# guarding divisions against a zero denominator.
eps = np.finfo(np.float32).eps.item()
# Capacity (in transitions) of the rollout buffer created further down.
max_steps = 4000

# Environment and the sizes derived from it that the network needs.
env = gym.make('CartPole-v1')
observation_dimensions = env.observation_space.shape[0]
num_actions = env.action_space.n

# Single seed reused for the environment, NumPy and TensorFlow.
seed = 543

  "Initializing wrapper in old step API which returns one bool instead of two. It is recommended to set `new_step_api=True` to use new step API. This will be the default behaviour in future."
  "Initializing environment in old step API which returns one bool instead of two. It is recommended to set `new_step_api=True` to use new step API. This will be the default behaviour in future."


In [None]:
def discounted_cumulative_sums(x, discount):
    '''
    Discounted sum of all future entries for every position along axis 0,
    i.e. out[t] = x[t] + discount * x[t+1] + discount**2 * x[t+2] + ...

    :param array(n) x: reward
    :param float discount
    :return array(n) discounted cumulative sums, same length as x
    '''
    # The recursion y[t] = x[t] + discount * y[t-1] is a one-pole IIR filter.
    # Running it over the time-reversed input accumulates from the end of the
    # sequence; flipping the result restores the original time order.
    numerator = [1]
    denominator = [1, float(-discount)]
    time_reversed = x[::-1]
    filtered = scipy.signal.lfilter(numerator, denominator, time_reversed, axis=0)
    return filtered[::-1]

In [None]:
class Buffer:
    '''
    Fixed-capacity storage for the transitions collected during one epoch.

    Transitions are appended with ``store``. When an episode ends,
    ``finish_trajectory`` turns the rewards of that episode into normalised
    discounted returns (stored as "advantages"). ``get`` exposes everything
    belonging to finished trajectories and ``clear`` rewinds the buffer.
    '''

    # Added to the standard deviation when normalising so that a trajectory
    # with identical returns (e.g. a single step) does not divide by zero.
    # Kept on the class so the buffer does not rely on a notebook-level global.
    _EPS = np.finfo(np.float32).eps.item()

    def __init__(self, observation_dimensions, buffer_size, gamma=0.99):
        '''
        :param int observation_dimensions: length of one observation vector
        :param int buffer_size: maximum number of transitions held at once
        :param float gamma: discount factor used for the returns
        '''
        self.observation_buffer = np.zeros(
            (buffer_size, observation_dimensions), dtype=np.float32
        )
        self.action_buffer = np.zeros(buffer_size, dtype=np.int32)
        self.reward_buffer = np.zeros(buffer_size, dtype=np.float32)
        self.advantage_buffer = np.zeros(buffer_size, dtype=np.float32)
        self.gamma = gamma
        # pointer: next free slot; trajectory_start_index: first slot of the
        # trajectory that is currently being recorded.
        self.pointer, self.trajectory_start_index = 0, 0

    def store(self, observation, action, reward):
        '''
        Append one transition.

        :raises IndexError: if the buffer is already full
        '''
        capacity = len(self.reward_buffer)
        if self.pointer >= capacity:
            # Same exception type numpy would raise, but with an actionable message.
            raise IndexError(
                f"Buffer is full ({capacity} transitions); "
                "increase buffer_size or call clear()"
            )
        self.observation_buffer[self.pointer] = observation
        self.action_buffer[self.pointer] = action
        self.reward_buffer[self.pointer] = reward
        self.pointer += 1

    def finish_trajectory(self):
        '''
        Close the trajectory in progress: compute its discounted returns,
        normalise them to zero mean / unit standard deviation within the
        trajectory, and mark the next slot as the start of a new trajectory.
        Does nothing when no transition was stored since the last call.
        '''
        if self.pointer == self.trajectory_start_index:
            # Empty trajectory: nothing to filter, and mean/std of an empty
            # slice would only produce NaNs and runtime warnings.
            return

        path_slice = slice(self.trajectory_start_index, self.pointer)

        # Write the raw returns first so the statistics below are computed on
        # the float32 values actually held by the buffer.
        self.advantage_buffer[path_slice] = discounted_cumulative_sums(
            self.reward_buffer[path_slice], self.gamma
        )
        returns = self.advantage_buffer[path_slice]
        advantage_mean, advantage_std = returns.mean(), returns.std()
        self.advantage_buffer[path_slice] = (returns - advantage_mean) / (advantage_std + self._EPS)
        self.trajectory_start_index = self.pointer

    def get(self):
        '''
        :return tuple (observations, actions, advantages) restricted to the
            finished trajectories; transitions of a trajectory still in
            progress are excluded. The arrays are views into the buffer.
        '''
        finished = slice(None, self.trajectory_start_index)
        return (
            self.observation_buffer[finished],
            self.action_buffer[finished],
            self.advantage_buffer[finished],
        )

    def clear(self):
        '''Rewind the buffer; old contents are overwritten by later stores.'''
        self.pointer, self.trajectory_start_index = 0, 0

# Shared rollout buffer used by train(); it is cleared at the start of every
# epoch, so max_steps bounds the number of transitions collected per epoch.
buffer = Buffer(observation_dimensions, max_steps)

In [None]:
def mlp(x, sizes, activation='relu', output_activation=None):
    '''
    Stack a multi-layer perceptron on top of a Keras tensor.

    Every entry of ``sizes`` except the last becomes a hidden Dense layer
    (He-uniform kernel and bias initialisers) followed by Dropout(0.6); the
    last entry is the width of the output Dense layer.

    :param x: input Keras tensor
    :param sizes: layer widths, hidden layers first, output width last
    :param activation: activation of the hidden layers
    :param output_activation: activation of the output layer (None = linear)
    :return output Keras tensor
    '''
    # NOTE(review): the model is called without `training=True` elsewhere in
    # this notebook, so the Dropout layers are presumably inactive - confirm.
    hidden_widths, output_width = sizes[:-1], sizes[-1]
    for width in hidden_widths:
        hidden = layers.Dense(
            units=width,
            activation=activation,
            kernel_initializer=keras.initializers.HeUniform(),
            bias_initializer=keras.initializers.HeUniform(),
        )
        x = hidden(x)
        x = layers.Dropout(0.6)(x)
    output_layer = layers.Dense(units=output_width, activation=output_activation)
    return output_layer(x)

In [None]:
def logprobabilities(logits, actions):
    '''
    Log-probability the policy assigns to each action that was taken.

    :param array(n, num_actions) logits: model output
    :param array(n) actions
    :return array(n) logprobability
    '''
    log_policy = tf.nn.log_softmax(logits)
    # A one-hot mask keeps only the column of the chosen action in each row.
    chosen_mask = tf.one_hot(actions, num_actions)
    return tf.reduce_sum(chosen_mask * log_policy, axis=1)

In [None]:
@tf.function
def sample_action(observation):
    '''
    Draw one action at random from the policy's categorical distribution.

    :param array(observation_dimensions) observation: a single, unbatched observation
    :return scalar integer tensor holding the sampled action (callers convert it with int())
    '''
    # NOTE(review): `model` is a notebook-level global looked up inside a
    # tf.function; if the model is rebuilt without re-running this cell the
    # traced graph may keep using the old model - confirm.
    batched_observation = tf.expand_dims(observation, axis=0)
    logits = model(batched_observation)
    # One draw for the single row of the batch -> shape (1, 1).
    draws = tf.random.categorical(logits, 1)
    flat_draws = tf.squeeze(draws, axis=1)
    return flat_draws[0]

In [None]:
def train_model(observation_buffer, action_buffer, advantage_buffer, n_trajectory):
    '''
    Apply one policy-gradient update to the global ``model`` with the global
    ``optimizer``.

    :param array(n, observation_dimensions) observation_buffer
    :param array(n) action_buffer
    :param array(n) advantage_buffer
    :param int n_trajectory: number of trajectories the n transitions came from;
        the summed objective is divided by it
    :return scalar tensor, the maximised objective (negative of the loss):
        sum(logprobability * advantage) / n_trajectory
    '''
    with tf.GradientTape() as tape:
        logits = model(observation_buffer)
        weighted_logprob = logprobabilities(logits, action_buffer) * advantage_buffer
        # Minimising the negated objective performs gradient ascent on it.
        loss = -tf.reduce_sum(weighted_logprob) / n_trajectory

    variables = model.trainable_variables
    gradients = tape.gradient(loss, variables)
    optimizer.apply_gradients(zip(gradients, variables))

    return -loss

In [None]:
def train(epochs=1000, n_trajectory_per_epoch=3, max_perfect_count=10000):
    '''
    Train the global policy ``model`` on the global ``env``.

    Every epoch collects ``n_trajectory_per_epoch`` complete episodes into the
    global ``buffer`` and then performs a single ``train_model`` update on them.
    Progress is printed every 10th epoch.

    :param int epochs: maximum number of epochs
    :param int n_trajectory_per_epoch: episodes collected before each update
    :param int max_perfect_count: stop once more than this many epochs (not
        necessarily consecutive) had a mean reward above ``env.spec.reward_threshold``
    :return array(epochs) mean episode reward of every epoch; entries of epochs
        that were never run because of an early stop remain 0
    '''
    env.seed(seed=seed)
    mean_reward = np.zeros(epochs)
    perfect_epochs = 0

    for epoch in range(epochs):
        log_this_epoch = epoch % 10 == 0

        observation = env.reset()
        buffer.clear()
        reward_total, n_finished = 0, 0

        # Roll the policy out until enough complete episodes are in the buffer.
        while n_finished < n_trajectory_per_epoch:
            action = int(sample_action(observation))
            next_observation, reward, done, _ = env.step(action)
            reward_total += reward
            buffer.store(observation, action, reward)

            if not done:
                observation = next_observation
                continue

            # Episode ended: seal it in the buffer and start a fresh one.
            n_finished += 1
            buffer.finish_trajectory()
            observation = env.reset()

        observations, actions, advantages = buffer.get()

        mean_reward[epoch] = reward_total / n_finished
        if log_this_epoch:
            print(
                f"Epoch: {epoch}. Mean Reward: {mean_reward[epoch]}"
            )

        # The early-stop check runs before the update, so the epoch that
        # triggers the stop is not trained on.
        if mean_reward[epoch] > env.spec.reward_threshold:
            perfect_epochs += 1
            if perfect_epochs > max_perfect_count:
                break

        # train_model returns the negated loss; it is printed under the label "Loss".
        loss = train_model(observations, actions, advantages, n_finished)
        if log_this_epoch:
            print(f"Loss: {loss.numpy()}")

    return mean_reward

In [None]:
# Number of training epochs; also used by the plotting cell for the x-axis.
epoch = 1000

# Start from a clean Keras state and seed NumPy / TensorFlow for reproducibility.
keras.backend.clear_session()
np.random.seed(seed=seed)
tf.random.set_seed(seed=seed)

# Policy network: observation -> one hidden layer of 128 units -> action logits.
observation_input = keras.Input(shape=(observation_dimensions,), dtype=tf.float32)
logits = mlp(observation_input, [128, num_actions])
model = keras.Model(inputs=observation_input, outputs=logits)

optimizer = keras.optimizers.Adam()

# Pass the configured epoch count explicitly so training and the plot below
# cannot drift apart when `epoch` is changed.
mean_reward = train(epochs=epoch, n_trajectory_per_epoch=3)

In [None]:
import matplotlib.pyplot as plt

# Learning curve: mean episode reward of every training epoch.
plt.figure(figsize=(15, 10))

# mean_reward must hold at least `epoch` entries for x and y to line up.
plt.plot(np.arange(0, epoch), mean_reward[:epoch], label='One step per epoch')

plt.xlabel('Epoch', fontsize=20) 
plt.ylabel('Mean Reward', fontsize=20)
plt.title("Training Result", fontsize=20)

# Fixed y-range; ticks mark zero, the best epoch achieved and the
# environment's own reward threshold.
plt.ylim(0, 510)
plt.yticks([0, np.max(mean_reward), env.spec.reward_threshold])
plt.grid(axis='y')
plt.legend()

plt.show()

In [None]:
def evaluate(env):
    '''
    Play one full episode with the current policy and report its return.

    Actions are drawn with ``sample_action``, i.e. sampled from the policy
    rather than chosen greedily.

    :param env: environment exposing reset() and a step() that returns
        (observation, reward, done, info)
    :return sum of the rewards received during the episode
    '''
    state = env.reset()
    episode_return = 0
    done = False
    while not done:
        chosen_action = int(sample_action(state))
        state, reward, done, _ = env.step(chosen_action)
        episode_return += reward
    return episode_return

In [None]:
# Return of a single episode played by the trained policy (shown as cell output).
evaluate(env)

In [None]:
# Play one more episode through a recording wrapper that writes to ./video
# (relative to the working directory set in the first cell) and print its return.
from gym.wrappers import RecordVideo
record_env = RecordVideo(env, './video')
print(evaluate(record_env))
# NOTE(review): closing the wrapper presumably closes the wrapped `env` as
# well, so `env` may not be reusable afterwards - confirm.
record_env.close()