In [5]:
import gym
import random
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras import layers
from collections import deque

In [6]:
# Build the gym environment that the rest of the notebook trains on.
problem = 'Pendulum-v0'
env = gym.make(problem)

# NOTE: these are the raw shape tuples of the spaces, not scalar counts.
num_states, num_actions = env.observation_space.shape, env.action_space.shape

# Bounds are read from the first component of the action space limits.
lower_bound, upper_bound = env.action_space.low[0], env.action_space.high[0]

print(num_states, num_actions, upper_bound, lower_bound)

(3,) (1,) 2.0 -2.0


In [7]:
class OUActionNoise:
    """Stateful noise generator using a discretised Ornstein-Uhlenbeck update.

    Each call moves the stored value towards `mean` at rate `theta` and adds
    a Gaussian perturbation scaled by `std_deviation * sqrt(dt)`, so that
    consecutive samples are correlated with each other.

    Args:
        mean: Array the process reverts to; its `.shape` fixes the shape of
            the Gaussian draw.
        std_deviation: Scale applied to the Gaussian term.
        theta: Mean-reversion rate.
        dt: Step size of the discretisation.
        x_initial: Optional starting value; zeros shaped like `mean` are
            used when it is None.
    """

    def __init__(self, mean, std_deviation, theta=0.15, dt=1e-2, x_initial=None):
        self.mean = mean
        self.std_dev = std_deviation
        self.theta = theta
        self.dt = dt
        self.x_initial = x_initial
        # Establishes self.x_prev so the first call has a previous value.
        self.reset()

    def __call__(self):
        """Advance the process by one step and return the new value."""
        previous = self.x_prev
        # Deterministic pull back towards the mean.
        drift = self.theta * (self.mean - previous) * self.dt
        # Random kick drawn from the global NumPy random state.
        diffusion = (
            self.std_dev * np.sqrt(self.dt) * np.random.normal(size=self.mean.shape)
        )
        self.x_prev = previous + drift + diffusion
        return self.x_prev

    def reset(self):
        """Restore the stored value to `x_initial`, or to zeros if unset."""
        start = self.x_initial
        self.x_prev = np.zeros_like(self.mean) if start is None else start

In [None]:
class Buffer():
    """Bounded replay memory that also performs the actor/critic updates.

    Transitions live in a `deque` capped at `max_mem` entries, so the oldest
    ones are discarded automatically once the cap is reached.

    `update` relies on module-level names that are not defined in this
    class: `actor_model`, `critic_model`, `target_actor`, `target_critic`,
    `actor_optimizer`, `critic_optimizer` and `gamma`.

    Args:
        max_mem: Maximum number of transitions kept in memory.
        batch_size: Number of transitions sampled per learning step.
    """

    def __init__(self, max_mem, batch_size):
        self.max_mem = max_mem
        self.batch_size = batch_size
        self.memory = deque(maxlen=self.max_mem)

    def store(self, state, action, reward, n_state, done):
        """Append one transition to the replay memory.

        `state` and `n_state` are stored with an extra leading axis;
        `action` and `reward` are stored untouched. `done` is accepted for
        interface compatibility but is not recorded, so the bootstrapped
        target in `update` never masks terminal transitions.
        """
        pack = [
            np.expand_dims(state, axis=0),
            action,
            reward,
            np.expand_dims(n_state, axis=0),
        ]
        self.memory.append(pack)

    @tf.function
    def update(self, states, actions, rewards, n_states):
        """Run one critic step followed by one actor step on a batch.

        Args:
            states: Batch of stored states.
            actions: Batch of the actions that were taken in `states`.
            rewards: Batch of rewards, one per transition.
            n_states: Batch of successor states.
        """
        with tf.GradientTape() as tape:
            target_actions = target_actor(n_states, training=True)
            target_q = target_critic([n_states, target_actions])
            # `learn` builds rewards as a flat (batch,) tensor. Adding that to
            # a critic output with trailing unit axes would broadcast into a
            # (batch, batch)-style matrix instead of an element-wise sum.
            # NOTE(review): assumes the critic emits exactly one value per
            # sample - confirm against the critic model definition.
            rewards = tf.reshape(rewards, tf.shape(target_q))
            y = rewards + gamma * target_q
            critic_value = critic_model([states, actions], training=True)
            critic_loss = tf.math.reduce_mean(tf.math.square(y - critic_value))

        # The tape method is `gradient`; `gradients` does not exist.
        critic_grad = tape.gradient(critic_loss, critic_model.trainable_variables)
        critic_optimizer.apply_gradients(
            zip(critic_grad, critic_model.trainable_variables)
        )

        with tf.GradientTape() as tape:
            acts = actor_model(states, training=True)
            # The critic must score the actor's own output; scoring the
            # replayed `actions` leaves the loss independent of the actor
            # weights, so every actor gradient would be None.
            critic_value = critic_model([states, acts], training=True)
            # Negated so that minimising the loss maximises the critic value.
            actor_loss = -tf.math.reduce_mean(critic_value)

        actor_grad = tape.gradient(actor_loss, actor_model.trainable_variables)
        actor_optimizer.apply_gradients(
            zip(actor_grad, actor_model.trainable_variables)
        )

    def learn(self):
        """Sample a batch from memory and apply one `update` step.

        Does nothing until at least `2 * batch_size` transitions are stored.
        """
        if len(self.memory) < 2 * self.batch_size:
            return

        # Use the instance's batch size rather than a same-named global.
        batch = random.sample(self.memory, self.batch_size)

        states = tf.convert_to_tensor([item[0] for item in batch])
        actions = tf.convert_to_tensor([item[1] for item in batch])
        rewards = tf.cast([item[2] for item in batch], dtype=tf.float32)
        n_states = tf.convert_to_tensor([item[3] for item in batch])

        self.update(states, actions, rewards, n_states)


# Previously this was nested inside `Buffer.learn`, where it was re-created on
# every call and could never be reached by any caller.
@tf.function
def update_target(target_weights, weights, tau):
    """Blend `weights` into `target_weights` in place.

    Each target variable becomes `tau * source + (1 - tau) * target`.

    Args:
        target_weights: Iterable of variables to overwrite via `assign`.
        weights: Iterable of source values, paired positionally.
        tau: Interpolation factor applied to the source values.
    """
    for (a, b) in zip(target_weights, weights):
        a.assign(b * tau + a * (1 - tau))