In [1]:
import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp
import gym

In [2]:
env = gym.make("CartPole-v0")

In [3]:
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n 

In [4]:
class ActorCritic(tf.keras.Model):
    def __init__(self, action_dim):
        super().__init__()
        self.fc1 = tf.keras.layers.Dense(512, activation='relu')
        self.fc2 = tf.keras.layers.Dense(128, activation='relu')
        self.critic = tf.keras.layers.Dense(1, activation='linear')
        self.actor = tf.keras.layers.Dense(action_dim, activation='linear')
    
    def call(self, input_data):
        x = self.fc1(input_data)
        x1 = self.fc2(x)

        actor = self.actor(x1)
        critic = self.critic(x1)
        return actor, critic

In [6]:
observation = env.reset()
observation = observation.reshape([1, -1])
observation

array([[ 0.02895096,  0.01745123,  0.02670889, -0.03680816]],
      dtype=float32)

In [7]:
actor_critic = ActorCritic(action_dim=action_dim)

Metal device set to: Apple M1

systemMemory: 8.00 GB
maxCacheSize: 2.67 GB



2022-02-23 12:12:32.022636: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:305] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
2022-02-23 12:12:32.023599: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:271] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)


In [9]:
gamma = 0.99
learning_rate = 1e-3
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)



### function: `get_action` 

* Arg: 
    * state (`np.ndarray`) : observation or state from the environment at current time step


* Return:
    * action (`int`) : action following the current policy at the time step

In [10]:
# Gets action from actor network

def get_action(state):
    """
    Gets action following the policy at current time step
    Arg: 
        state (np.ndarray) : observation or state from environment
    return: 
        action (int) : action following the policy 
    """
    action = np.random.randint(action_dim)
    state = np.array(state).reshape([1, -1])

    action_probs, _ = actor_critic(state)
    action_probs = tf.nn.softmax(action_probs)
    action_probs = action_probs.numpy()

    dist = tfp.distributions.Categorical(probs=action_probs, dtype=tf.float32)
    action = int(dist.sample())

    return action 

### function: `actor_loss`

* Args:
    * prob (`tf.Tensor`) : raw output from actor network (must be processed by softmax / sigmoid)
    * action (`int`) : action from current state  
    * td (`tf.Tensor`) : temporal difference target 

* Return:
    * loss (`tf.Tensor`) : loss output which will be used for computing gradients

In [13]:
def actor_loss(prob, action, td):
    prob = tf.nn.softmax(prob)
    dist = tfp.distributions.Categorical(prob, dtype=tf.float32)
    return (-1)*(dist.log_prob(action))*td

In [14]:
action_probs, value = actor_critic(observation)
action_probs, value

(<tf.Tensor: shape=(1, 2), dtype=float32, numpy=array([[ 0.00074995, -0.00206248]], dtype=float32)>,
 <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.0023877]], dtype=float32)>)

In [15]:
episodes = 2

In [26]:
state = env.reset()

### Learning in one time Step

In [35]:
# learning in one step

done = False 
action = get_action(state)
next_state, reward, done, _ = env.step(action)
state, action, reward, next_state 

state = state.reshape([1, -1])
next_state = next_state.reshape([1, -1])

print(f"Prediction before updating : {actor_critic(state)[1]}")

with tf.GradientTape() as tape: 
    action_probs, value = actor_critic(state, training=True)
    _, value_next = actor_critic(next_state, training=True)
    td = reward + gamma * value_next * (1-int(done)) - value
    loss_a = actor_loss(prob=action_probs, action=action, td=td)
    loss_c = td ** 2 
    total_loss = loss_a + loss_c
grads = tape.gradient(total_loss, actor_critic.trainable_variables)
optimizer.apply_gradients(zip(grads, actor_critic.trainable_variables))
state = next_state

print(f"Prediction after updating : {actor_critic(state)[1]}")

Prediction before updating : [[0.00813797]]
Prediction after updating : [[-0.19515835]]


In [37]:

def learn(
    state : np.ndarray,
    action,
    reward,
    next_state: np.ndarray,
    done):
    """Learning function for each step 
    Args : 
        state (np.ndarray) : current state (shape = (1, state_dim))
        action (int) : current action 
        reward (float) : reward returned by the environment after taking action        
        next_state (np.ndarray) : next state (shape = (1, state_dim))
        done (boolean) : whether the episode ends 
    Return : 
        loss (tf.Tensor) : returns loss after taking a gradient update step
        updated Model (tf.Model) : updated model after taking a gradient update step
    """
    state = state.reshape([1, -1])
    next_state = next_state.reshape([1, -1]) 

    with tf.GradientTape() as tape:
        action_probs, value = actor_critic(state)
        _, next_val = actor_critic(next_state)
        td = reward + gamma * next_val * (1-int(done)) - value 
        loss_a = actor_loss(action_probs, action, td)
        loss_c = td ** 2
        total_loss = loss_a + loss_c 
    
    grads = tape.gradient(total_loss, actor_critic.trainable_variables)
    optimizer.apply_gradients(zip(grads, actor_critic.trainable_variables))
    return total_loss

        



In [40]:
loss = []

In [72]:
done = False 
action = get_action(state)
next_state, reward, done, _ = env.step(action)
state, action, reward, next_state 

loss.append(learn(state, action ,reward, next_state, done))
state = next_state 


  logger.warn(


In [73]:
print(f"state {state} -> next state: {next_state}")
print(f"loss = {loss[-1]}")

state [ 0.17739686  0.26038256 -0.24739863 -0.7663703 ] -> next state: [ 0.17739686  0.26038256 -0.24739863 -0.7663703 ]
loss = [[0.9072479]]
