In [1]:
import tensorflow as tf
import numpy as np
import time
import gymnasium as gym
import tqdm

SEED = 42
tf.random.set_seed(SEED)

In [2]:
env = gym.make("CartPole-v1", render_mode="human")
model = tf.keras.Sequential()
model.add(tf.keras.layers.Dense(5,activation='relu'))
model.add(tf.keras.layers.Dense(1,activation='sigmoid')) #left probability

#### CREDIT ASSIGNMENT

`We're trying to give weightage to the action which gave more rewards. Kind of weightage rewards. We do this by mutliplying the rewards by dicounted rate and adding it to the previous sum od rewards`

`Discounted Rate (Gamma) : [0.9,0.99] Mostly`

In Pole example, one good reward step followed by bad rewards step can make the pole fall. So we need to assign more importance to the good step

#### Policy Gradients
` Optimize learnable parameters of policy by following the gradients towards higher reward.`

#### Steps

`Step 1: let the nn play its game multiple timesand at every step just calculate the gradiets (wrt reward) but don't apply it immidiately.`

`Step 2: Once you have completed several episodes then compute the actions using discounted method. `

`Step 3: Result of previous step 2 can be poisitive and negative. `

In [3]:
def pg_policy(observation, model):
    left_prob = model.predict(observation[np.newaxis]) # prob 0-1
    action = int(np.random.rand()>left_prob) # exploration vs exploitation concept
    # to force the algo to go for the other option to explore
    return action

def play_one_step(env, observations, model, loss_fn):
    with tf.GradientTape() as tape:
        left_prob = model(observations[np.newaxis]) # --> Predicted
        action = (tf.random.uniform([1,1])>left_prob) # True or False [0 if left, 1 if right]
        y_target = tf.constant([[1.]]) - tf.cast(action, tf.float32) # y_target is the prob of left that should be 1. So, 1 - action (float32)
        loss = tf.reduce_mean(loss_fn(y_target, left_prob)) # --> (y_true,y_pred)
    
    grads = tape.gradient(loss, model.trainable_variables) #dc/dw
    new_observations, reward, done, info, _ = env.step(int(action))
    return new_observations, reward, done, grads

def play_multiple_episdoes(env, N_episodes, N_steps, model, loss_fn):
    total_rewards = list()
    total_grads = list()
    
    for episode in range(N_episodes):
        current_rewards = list()
        current_grads = list()
        observation, info = env.reset()  # observation : [CartPosition, CartVelocity, PoleAngle, PoleAngularVelocity]
        for step in range(N_steps):
            new_observations, reward, done, grads = play_one_step(env, observation, model, loss_fn)
            env.render()
            current_rewards.append(reward)
            current_grads.append(grads)
            if done:
                break
        total_rewards.append(current_rewards)
        total_grads.append(current_grads)
    
    return total_rewards, total_grads

def discount_rewards(rewards, discount_factor):
    discounted = np.array(rewards)
    N = len(rewards)
    for step in range(N -2, -1, -1):
        discounted[step] = discounted[step] + discounted[step + 1] * discount_factor
    return discounted

def discount_and_normalize_rewards(total_rewards, discount_factor):
    total_discounted_rewards = list()
    for rewards in total_rewards:
        total_discounted_rewards.append(discount_rewards(rewards,discount_factor))
    
    flat_rewards = np.concatenate(total_discounted_rewards)
    reward_mean, reward_std = flat_rewards.mean(), flat_rewards.std()
    
    normalized_discounted_rewards = list()
    for discounted_reward in total_discounted_rewards:
        nrs = (discounted_reward - reward_mean)/reward_std
        normalized_discounted_rewards.append(nrs)
        
    return normalized_discounted_rewards

In [4]:
N_episodes = 10
N_steps = 150
N_max_steps = 200
discount_factor = 0.95
learning_rate = 0.01

In [5]:
obs = env.reset(seed=SEED)
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
loss_fn = tf.keras.losses.binary_crossentropy

for iteration in range(N_steps):
    total_rewards, total_grads = play_multiple_episdoes(env, N_episodes, N_max_steps, model, loss_fn)
    sum_total_rewards = sum(map(sum,total_rewards))
    print(f"Iteration: {iteration+1}/{N_steps}",
         f"Mean Rewards: {sum_total_rewards/N_episodes}")
    
    total_final_rewards = discount_and_normalize_rewards(total_rewards, discount_factor)
    total_mean_grads = list()
    
    # Weights for 5 hidden nodes, bias for 5 nodes, w for output nodes, bias for output node
    
    N = len(model.trainable_variables)
    for var_index in range(N):
        temp_reduce_mean = list()
        for episode_index, final_rewards in enumerate(total_final_rewards):
            for step, final_reward in enumerate(final_rewards):
                result = final_reward * total_grads[episode_index][step][var_index]
                temp_reduce_mean.append(result)
        mean_grads = tf.reduce_mean(temp_reduce_mean, axis=0)
        total_mean_grads.append(mean_grads)
    optimizer.apply_gradients(zip(total_mean_grads, model.trainable_variables))

Iteration: 1/150 Mean Rewards: 22.85
Iteration: 2/150 Mean Rewards: 24.8
Iteration: 3/150 Mean Rewards: 19.15
Iteration: 4/150 Mean Rewards: 22.95
Iteration: 5/150 Mean Rewards: 19.9
Iteration: 6/150 Mean Rewards: 19.6
Iteration: 7/150 Mean Rewards: 27.1
Iteration: 8/150 Mean Rewards: 22.05
Iteration: 9/150 Mean Rewards: 21.85
Iteration: 10/150 Mean Rewards: 19.8
Iteration: 11/150 Mean Rewards: 21.4
Iteration: 12/150 Mean Rewards: 21.4
Iteration: 13/150 Mean Rewards: 21.6
Iteration: 14/150 Mean Rewards: 21.75
Iteration: 15/150 Mean Rewards: 18.75
Iteration: 16/150 Mean Rewards: 21.6
Iteration: 17/150 Mean Rewards: 26.4
Iteration: 18/150 Mean Rewards: 22.2
Iteration: 19/150 Mean Rewards: 22.45
Iteration: 20/150 Mean Rewards: 30.15
Iteration: 21/150 Mean Rewards: 22.5
Iteration: 22/150 Mean Rewards: 21.5
Iteration: 23/150 Mean Rewards: 26.45
Iteration: 24/150 Mean Rewards: 20.7
Iteration: 25/150 Mean Rewards: 23.5
Iteration: 26/150 Mean Rewards: 21.35
Iteration: 27/150 Mean Rewards: 21.2

In [None]:
import re
import time

unique_name = re.sub(r"[\s+:]","_",time.asctime())
model_name = f"model_{unique_name}.h5"
model.save(model_name)
print("Model saved as {}".format(model_name))

In [None]:
def show_one_episode(policy, model, N_steps=500, seed=42):
    env = gym.make("CartPole-v1")
    obs, info = env.reset()
    for step in range(N_steps):
        env.render()
        action = policy(obs, model)
        Observations, reward, done, info, _ = env.step(action)
        if done:
            break

    env.close()
    return step, Observations

show_one_episode(pg_policy, model)