Imports

In [426]:
import numpy as np
import gymnasium as gym
import tensorflow as tf
from tensorflow.keras.models import Sequential, save_model
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import binary_crossentropy


In [2]:
# Create the CartPole-v1 environment that the following cells inspect and evaluate.
env = gym.make("CartPole-v1")

In [3]:
# Inspect the action space; the output below shows Discrete(2), i.e. two possible actions (0 and 1).
env.action_space

Discrete(2)

In [4]:
# Start a new episode; the result is a tuple (initial observation, info dict).
env.reset() # [pos, vel, angle, angular vel]

(array([-0.00880817, -0.02784138, -0.03945461,  0.02059954], dtype=float32),
 {})

In [5]:
# Apply action 0 once; the result is (observation, reward, terminated, truncated, info).
env.step(0)

(array([-0.009365  , -0.22237594, -0.03904263,  0.30057767], dtype=float32),
 1.0,
 False,
 False,
 {})

Baseline: a simple hard-coded policy

In [6]:
def basic_policy(obs):
    """Hard-coded baseline: choose the action from the sign of the pole angle.

    Args:
        obs: observation indexed as [pos, vel, angle, angular vel].

    Returns:
        1 when the angle (obs[2]) is strictly positive, otherwise 0.
    """
    pole_angle = obs[2]
    if pole_angle > 0:
        return 1
    return 0

In [7]:
# Evaluate the hard-coded policy: play 1000 episodes and keep each episode's total reward.
episode_reward = []
for episode in range(1000):
    # reset() returns (observation, info); only the observation is needed here
    obs = env.reset()[0]
    curr_reward = 0

    # At most 500 steps per episode; stop early once the episode ends
    for step in range(500):
        action = basic_policy(obs)
        obs, reward, term, trunc, info = env.step(action)
        curr_reward = curr_reward + reward
        episode_done = term or trunc
        if episode_done:
            break

    episode_reward.append(curr_reward)

In [8]:
# Summary of the baseline's per-episode totals, displayed as (max, min, mean, std).
tuple(stat(episode_reward) for stat in (np.max, np.min, np.mean, np.std))

(68.0, 24.0, 42.086, 8.909354858798698)

In [9]:
# Baseline evaluation is finished: release the environment's resources.
# NOTE(review): the name `env` is referenced again by the REINFORCE training loop further down.
env.close()

REINFORCE Algorithm with Neural Network model

In [379]:
# For each training iteration:
#     For each episode:
#         For each step:
#             play one step and get its reward and the gradients of the loss
#             append the reward and the gradients to the episode's lists
#         store the episode's reward list and gradient list
#     discount the rewards of every episode, then normalize them across all episodes
#     for each trainable variable, average the reward-weighted gradients over all steps of all episodes
#     update the model variables by applying the averaged gradients

In [406]:
def play_one_step(env, model, obs, loss_fn):
    """Sample one action from the policy, compute the loss gradients, and step the environment.

    The model output is treated as the probability of going left (action 0). The sampled
    action is then used as if it were the correct label, so the returned gradients point
    towards making that action more likely; the caller weights them by the action's return.

    Args:
        env: environment whose `step(action)` returns
            `(obs, reward, terminated, truncated, info)`.
        model: callable policy network exposing `trainable_variables`.
        obs: current observation with a leading batch axis (callers pass `obs[None, :]`).
        loss_fn: loss called as `loss_fn(y_target, pred_left_prob)`.

    Returns:
        `(next_obs, reward, term, trunc, grad)`, where `next_obs` has a leading batch axis
        added and `grad` is the gradient of the loss with respect to each entry of
        `model.trainable_variables`.
    """
    with tf.GradientTape() as tape:
        # Probability of going left (action 0)
        pred_left_prob = model(obs)
        # Explore and exploit: True (action 1, right) with probability 1 - pred_left_prob,
        # False (action 0, left) with probability pred_left_prob
        action = tf.random.uniform(shape=(1, 1)) > pred_left_prob
        # Target label for the loss: 1 when the sampled action is left, 0 when it is right
        y_target = [[1.]] - tf.cast(action, tf.float32)
        loss = tf.reduce_mean(loss_fn(y_target, pred_left_prob))

    # One gradient per trainable variable
    grad = tape.gradient(loss, model.trainable_variables)
    # Pull the single boolean out explicitly: int() on a (1, 1) tensor goes through NumPy's
    # deprecated conversion of a multi-dimensional array to a scalar
    obs, reward, term, trunc, info = env.step(int(action[0, 0].numpy()))
    return obs[None, :], reward, term, trunc, grad

In [390]:
def play_one_episode(steps, env, model, loss_fn):
    """Play a single episode of at most `steps` steps and collect per-step results.

    Args:
        steps: maximum number of steps to play.
        env: environment to reset and step.
        model: policy network forwarded to `play_one_step`.
        loss_fn: loss function forwarded to `play_one_step`.

    Returns:
        `(rewards, grads)`: the reward and the gradients returned by `play_one_step`
        for every step played, in order.
    """
    # reset() returns (observation, info); add a leading batch axis to the observation
    first_obs = env.reset()[0]
    obs = first_obs[None, :]

    rewards, grads = [], []
    for _step in range(steps):
        obs, reward, term, trunc, grad = play_one_step(env, model, obs, loss_fn)
        rewards.append(reward)
        grads.append(grad)

        # Stop as soon as the environment reports termination or truncation
        episode_over = term or trunc
        if episode_over:
            break

    return rewards, grads


In [409]:
def discounting(rewards, discount_factor):
    """Compute the discounted return of every step in one episode.

    Each entry becomes its own reward plus `discount_factor` times the discounted
    return of the following step, accumulated from the last step backwards.

    Args:
        rewards: sequence of per-step rewards (may be empty).
        discount_factor: multiplier applied to the next step's discounted return.

    Returns:
        A new float NumPy array of the same length as `rewards`; the input is not modified.
    """
    # Force a float dtype: with integer rewards the in-place accumulation below
    # would otherwise be truncated back to integers on every assignment
    discounted_rewards = np.array(rewards, dtype=float)
    for i in range(len(rewards) - 2, -1, -1):
        discounted_rewards[i] += discounted_rewards[i + 1] * discount_factor

    return discounted_rewards

def normalize_and_discount(episodes_rewards, discount_factor):
    """Discount the rewards of every episode, then normalize them across all episodes.

    Args:
        episodes_rewards: list with one sequence of per-step rewards per episode.
        discount_factor: discount factor forwarded to `discounting`.

    Returns:
        A list with one array per episode holding the discounted returns, shifted by the
        mean and scaled by the standard deviation computed over all steps of all episodes.
        An empty input gives an empty list. When the standard deviation is zero the
        centered values are returned unscaled instead of NaN/inf.
    """
    discounted_episodes_rewards = [discounting(rewards, discount_factor) for rewards in episodes_rewards]
    # np.concatenate raises on an empty list, so handle "no episodes" up front
    if not discounted_episodes_rewards:
        return []

    flat_rewards = np.concatenate(discounted_episodes_rewards)
    # Episodes without any step: nothing to normalize (avoids mean/std of an empty array)
    if flat_rewards.size == 0:
        return discounted_episodes_rewards

    mean_reward = flat_rewards.mean()
    std_reward = flat_rewards.std()
    # All returns identical: dividing by zero would turn every weight into NaN
    if std_reward == 0:
        std_reward = 1.0

    return [(discounted_rewards - mean_reward)/std_reward for discounted_rewards in discounted_episodes_rewards]

In [423]:
# Training schedule: iterations (one model update each), episodes per iteration,
# and the maximum number of steps per episode
n_iter, n_ep, n_steps = 100, 10, 200

# Discount applied to future rewards, and the optimizer's learning rate
discount_factor, lr = 0.95, 0.01

# Loss used to turn a sampled action into gradients, and the optimizer that applies them
loss_fn = binary_crossentropy
optimizer = Adam(lr)

In [424]:
# The environment created at the top of the notebook was closed after the baseline
# evaluation, so create a fresh one for training instead of stepping a closed env.
env = gym.make("CartPole-v1")

# Policy network: one small hidden layer, then a sigmoid unit whose output is used
# by play_one_step as the probability of going left.
model = Sequential([
    Dense(4, activation='elu'),
    Dense(1, activation='sigmoid')
])

# Build the optimizer next to the model so that re-running this cell never pairs a
# new model with an optimizer that is already bound to the previous model's variables.
optimizer = Adam(lr)

# `iteration` rather than `iter`, so the builtin iter() is not shadowed in the notebook namespace
for iteration in range(n_iter):
    episodes_rewards = []
    episodes_grads = []
    for ep in range(n_ep):
        rewards, grads = play_one_episode(n_steps, env, model, loss_fn)
        if(ep%9 == 0):
            print(f'Iter : {iteration}, Episode :{ep}, Reward : {np.sum(rewards)}')
        # storing rewards and grads arrays for each episode in a matrix
        episodes_rewards.append(rewards)
        episodes_grads.append(grads)

    # Normalize and apply discounting on rewards
    discounted_episodes_rewards = normalize_and_discount(episodes_rewards, discount_factor)

    # For each trainable variable, average the reward-weighted gradients
    # over every step of every episode played in this iteration
    var_mean_grads = []

    for var_i in range(len(model.trainable_variables)):
        weighted_grads = []

        # `ep_rewards` (not `ep`) so the episode index used above is not reused for an array
        for ep_i, ep_rewards in enumerate(discounted_episodes_rewards):
            for step_i, reward in enumerate(ep_rewards):
                weighted_grads.append(reward*episodes_grads[ep_i][step_i][var_i])

        var_mean_grads.append(tf.reduce_mean(weighted_grads, axis=0))

    # updating model variables
    optimizer.apply_gradients(zip(var_mean_grads, model.trainable_variables))

Iter : 0, Episode :0, Reward : 69.0
Iter : 0, Episode :9, Reward : 11.0
Iter : 1, Episode :0, Reward : 29.0
Iter : 1, Episode :9, Reward : 29.0
Iter : 2, Episode :0, Reward : 17.0
Iter : 2, Episode :9, Reward : 58.0
Iter : 3, Episode :0, Reward : 29.0
Iter : 3, Episode :9, Reward : 17.0
Iter : 4, Episode :0, Reward : 54.0
Iter : 4, Episode :9, Reward : 38.0
Iter : 5, Episode :0, Reward : 118.0
Iter : 5, Episode :9, Reward : 48.0
Iter : 6, Episode :0, Reward : 29.0
Iter : 6, Episode :9, Reward : 25.0
Iter : 7, Episode :0, Reward : 48.0
Iter : 7, Episode :9, Reward : 89.0
Iter : 8, Episode :0, Reward : 41.0
Iter : 8, Episode :9, Reward : 40.0
Iter : 9, Episode :0, Reward : 14.0
Iter : 9, Episode :9, Reward : 25.0
Iter : 10, Episode :0, Reward : 18.0
Iter : 10, Episode :9, Reward : 40.0
Iter : 11, Episode :0, Reward : 11.0
Iter : 11, Episode :9, Reward : 47.0
Iter : 12, Episode :0, Reward : 18.0
Iter : 12, Episode :9, Reward : 46.0
Iter : 13, Episode :0, Reward : 18.0
Iter : 13, Episode :

In [427]:
# Persist the trained policy network to an .h5 file in the current working directory.
save_model(model, 'pg_with_neural_network.h5')



In [428]:
# Training is finished: release the environment's resources.
env.close()