In [None]:
!apt-get install -y xvfb python-opengl ffmpeg > /dev/null 2>&1
!pip install -U colabgymrender

In [None]:
import os
import tensorflow.python.keras as keras
from tensorflow.python.keras.layers import Dense

class ActorCriticNetwork(keras.Model):
    """Shared-trunk network with a state-value head and a policy head.

    Two ReLU dense layers feed both a single linear unit (the value estimate)
    and a softmax layer with one unit per action (the policy).
    """

    def __init__(self, actions, Layers = [1024, 512], name = "actor-critic", chkpt_dir = "tmp/actor_critic"):
        """Create the layers and record checkpoint bookkeeping.

        Args:
            actions: Number of units in the softmax policy head.
            Layers: Sequence whose first two entries are the widths of the
                first and second hidden layers.
            name: Stored as ``model_name`` and used to build ``file``.
            chkpt_dir: Stored as ``checkpoint_dir``; parent of ``file``.
        """
        super().__init__()

        # Plain bookkeeping attributes.
        self.Layer1_dim, self.Layer2_dim = Layers[0], Layers[1]
        self.actions = actions
        self.model_name = name
        self.checkpoint_dir = chkpt_dir
        self.file = os.path.join(chkpt_dir, name + "ac")

        # Layers are created in trunk -> value -> policy order.
        self.Layer1 = Dense(self.Layer1_dim, activation='relu')
        self.Layer2 = Dense(self.Layer2_dim, activation='relu')
        self.value = Dense(1, activation=None)
        self.policy = Dense(actions, activation="softmax")

    def call(self, state):
        """Run the forward pass.

        Returns:
            A ``(v, pi)`` pair: the value head output and the softmax policy
            head output, both computed from the same hidden features.
        """
        hidden = self.Layer2(self.Layer1(state))
        v = self.value(hidden)
        pi = self.policy(hidden)
        return v, pi

In [None]:
import numpy as np
from numpy import float32
import tensorflow as tf
import tensorflow.python.keras as keras
import keras
# from keras.optimizers import Adam
import tensorflow_probability as tfp
# from networks import ActorCriticNetwork

class Agent:
    """One-step actor-critic agent for a discrete action space.

    Wraps an ``ActorCriticNetwork`` and updates it after every transition,
    using the TD error both as the critic's regression error and as the
    weight on the log-probability of the action that was taken.
    """

    def __init__(self, alpha = 0.0003, gamma = 0.99, num_actions = 2):
        """Build the network and its optimizer.

        Args:
            alpha: Learning rate for the Adam optimizer.
            gamma: Discount factor used in the TD target.
            num_actions: Number of discrete actions (size of the policy head).
        """
        self.gamma = gamma
        self.num_actions = num_actions
        # Action tensor most recently sampled by choose_action(); learn()
        # needs it to evaluate the log-probability of the action taken.
        self.actions = None
        self.action_space = list(range(self.num_actions))
        self.actor_critic = ActorCriticNetwork(num_actions)
        self.actor_critic.compile(optimizer= 'adam')
        # compile('adam') creates Adam with its default step size; set the
        # requested one explicitly so that `alpha` is actually honoured.
        self.actor_critic.optimizer.learning_rate = alpha

    def choose_action(self, observation):
        """Sample an action from the current policy for one observation.

        The sampled action tensor is kept on ``self.actions`` so the next
        learn() call can score it.

        Args:
            observation: A single environment observation (it is wrapped in a
                batch of one before being fed to the network).

        Returns:
            The sampled action, i.e. element 0 of the batch-of-one sample.
        """
        state = tf.convert_to_tensor(np.asarray([observation], dtype=float32))
        _, probs = self.actor_critic(state)
        # The policy head already applies softmax, so the output must be
        # passed as `probs`; Categorical's first positional argument is
        # `logits`, which would apply a second softmax on top.
        action_probability = tfp.distributions.Categorical(probs=probs)
        self.actions = action_probability.sample()

        return self.actions.numpy()[0]

    def save_model(self):
        """Write the network weights to the network's ``checkpoint_dir`` path."""
        print("....saving model....")
        self.actor_critic.save_weights(self.actor_critic.checkpoint_dir)

    def load_model(self):
        """Load network weights from the network's ``checkpoint_dir`` path."""
        print("...loading model...")
        self.actor_critic.load_weights(self.actor_critic.checkpoint_dir)

    def learn(self, state, reward, state_next, done):
        """Apply one gradient step from a single transition.

        Must be called after choose_action(), since the action stored there
        is the one whose log-probability is reinforced.

        Args:
            state: Observation the action was chosen from.
            reward: Reward received for the transition.
            state_next: Observation that followed the action.
            done: Whether the episode ended; if so the next-state value is
                dropped from the TD target.

        Raises:
            RuntimeError: If no action has been sampled yet.
        """
        if self.actions is None:
            raise RuntimeError("learn() called before choose_action(); there is no action to evaluate")

        state = tf.convert_to_tensor([state], dtype=float32)
        state_next = tf.convert_to_tensor([state_next], dtype=float32)
        reward = tf.convert_to_tensor([reward], dtype=float32)

        with tf.GradientTape() as tape:
            state_value, probs = self.actor_critic(state)
            state_value_next, _ = self.actor_critic(state_next)
            state_value = tf.squeeze(state_value)
            state_value_next = tf.squeeze(state_value_next)

            # Same distribution construction as in choose_action (probs, not logits).
            action_probs = tfp.distributions.Categorical(probs=probs)
            log_prob = action_probs.log_prob(self.actions)

            # TD error; (1 - int(done)) zeroes the bootstrap term on terminal steps.
            # NOTE(review): no tf.stop_gradient is applied, so gradients also
            # flow through state_value_next and through delta in the actor
            # term -- confirm this is intended.
            delta = reward + self.gamma*state_value_next*(1 - int(done)) - state_value
            actor_loss = -log_prob * delta
            critic_loss = delta**2

            total_loss = actor_loss + critic_loss

        gradient = tape.gradient(total_loss, self.actor_critic.trainable_variables)
        self.actor_critic.optimizer.apply_gradients(zip(gradient, self.actor_critic.trainable_variables))



In [None]:
import gym
import numpy as np
# from actor_critic import Agent
import matplotlib.pyplot as plt

import gym
from colabgymrender.recorder import Recorder

# --- Setup -------------------------------------------------------------------
env = gym.make('CartPole-v1', render_mode="human")
agent = Agent(alpha=1e-3, num_actions=2)
num_games = 2000

filename = "cartpole.png"
figure_file = 'plots/' + filename
score_history = []
# When True the saved weights are loaded and the agent is only evaluated
# (no learning, no checkpoint writes).
load_checkpoint = False
best_score = 0

if load_checkpoint:
    agent.load_model()

# --- Episodes ----------------------------------------------------------------
# Assumes the gym API where reset() returns the observation and step()
# returns a 4-tuple -- TODO confirm against the installed gym version.
for i in range(num_games):
    print("game num ", i)
    observation = env.reset()
    done = False
    score = 0
    while not done:
        action = agent.choose_action(observation)
        observation_next, reward, done, info = env.step(action)
        score += reward
        if not load_checkpoint:
            agent.learn(observation, reward, observation_next, done)
        # Advance the state in every mode; otherwise evaluation runs would
        # keep acting on the very first observation of the episode.
        observation = observation_next
    score_history.append(score)
    # Checkpoint whenever a new best episode score is reached.
    if score > best_score:
        best_score = score
        if not load_checkpoint:
            agent.save_model()

# --- Learning curve ----------------------------------------------------------
x = [i+1 for i in range(num_games)]
plt.plot(x, score_history)
plt.xlabel("game")
plt.ylabel("score")
plt.title("CartPole-v1 actor-critic: score per game");

In [None]:
# Wrap the environment in the recorder and play a single episode.
# Note: re-running this cell wraps the already-wrapped env again.
env = Recorder(env, "cartpole")
observation = env.reset()
done = False
score = 0
while not done:
    action = agent.choose_action(observation)
    observation_next, reward, done, info = env.step(action)
    score += reward
    if not load_checkpoint:
        agent.learn(observation, reward, observation_next, done)
    # Advance the state in every mode; otherwise, with load_checkpoint set,
    # the agent would keep acting on the initial observation.
    observation = observation_next