In [2]:
import tensorflow as tf
import numpy as np
import gym
import time

In [3]:
env = gym.make("CartPole-v1")

  "Initializing wrapper in old step API which returns one bool instead of two. It is recommended to set `new_step_api=True` to use new step API. This will be the default behaviour in future."
  "Initializing environment in old step API which returns one bool instead of two. It is recommended to set `new_step_api=True` to use new step API. This will be the default behaviour in future."


In [4]:
SEED = 43
tf.random.set_seed(SEED)

In [77]:
observations = env.reset()
arr = np.array([0.00123])
observation = np.append(observations,arr)
# observations = observations.append(arr)
observations

array([-0.00848986,  0.01820944, -0.03901765, -0.02499007], dtype=float32)

In [86]:
LAYERS = [
    tf.keras.layers.Dense(5, activation = 'relu'),
    # tf.keras.layers.Dense(2, activation = 'relu'),
    tf.keras.layers.Dense(1, activation='sigmoid')  # here to get the probability of left, and right = 1 - left
]
model = tf.keras.Sequential(LAYERS)

In [89]:
## calling without training for checking
left_probability = model(observations[np.newaxis])
left_probability

<tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.5021006]], dtype=float32)>

In [88]:
model.summary()

Model: "sequential_12"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_26 (Dense)            (1, 5)                    25        
                                                                 
 dense_27 (Dense)            (1, 1)                    6         
                                                                 
Total params: 31
Trainable params: 31
Non-trainable params: 0
_________________________________________________________________


Policy Gradients
Optimize learnable parameters of policy by following the gradients towards higher reward (maximizing reward)

### steps
#### let the NN play the game multiple times and at every step just calculate the gradients (wrt reward) but dont apply it immidiately.
#### Once you have completed several episodes then compute the actions using discounted method.
#### result of previous step 2 can +ve or -ve

In [12]:
## creating a policy gradient
def pg_policy(observation, model): ## observation from the game, model which we defined
    left_probability = model.predict(observation[np.newaxis])  #probability value between 0 and 1
    action = int(np.random.rand() > left_probability) 
    return action

## just for exploitation vs exploration part, where we will take a random number bw 0 and 1 and compare with 
# left_probability. If the left probability is more than 0.5 which means we should move left and not right, 
# and hence in this action will become 1 and we know 1 means moving left and 0 means moving right. 

In [19]:
tf.random.uniform([1,1])

<tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.37664938]], dtype=float32)>

In [99]:
def play_one_step(env,observation, model, loss_fn):
    with tf.GradientTape() as tape:
        left_probability = model(observation[np.newaxis])
        action = (tf.random.uniform([1,1]) > left_probability) # gives true or false
        y_target = tf.constant([[1.]]) - tf.cast(action, tf.float32)
        loss = tf.reduce_mean(loss_fn(y_target,left_probability))
    grads = tape.gradient(loss,model.trainable_variables) # dc/dw
    new_observation, reward, done, info = env.step(int(action))
    return new_observation,reward, done, grads

In [100]:
def play_multiple_episodes(env, n_episodes, n_max_steps, model, loss_fn):
    all_rewards = list()
    all_grads = list()
    for episode in range(n_episodes):
        current_rewards = list()
        current_grads = list()
        observation = env.reset()
        for step in range(n_max_steps):
            observation, rewards, done, grads = play_one_step(env,observation,model, loss_fn)
            current_rewards.append(reward)
            current_grads.append(grads)
            if done:
                break
            all_rewards.append(current_rewards)
            all_grads.append(current_grads)
    return all_rewards, all_grads



In [101]:
def discount_rewards(rewards, discount_factor):
    discounted = np.array(rewards)
    N = len(rewards)
    for step in range(N - 2, -1, -1):
        # a_n + a_n+1*gamma
        discounted[step] = discounted[step] + discounted[step + 1] * discount_factor
    return discounted

In [102]:
def discount_and_normalize_rewards(all_rewards, discount_factor):
    all_discounted_rewards = list()
    for reward in all_rewards:
        # discounted rewards
        drs = discount_rewards(reward, discount_factor)
        all_discounted_rewards.append(drs)

    flat_rewards = np.concatenate(all_discounted_rewards)
    reward_mean = flat_rewards.mean()
    reward_std = flat_rewards.std()

    normalize_rewards = list()
    for discounted_rewards in all_discounted_rewards:
        nrs = (discounted_rewards - reward_mean) / reward_std
        normalize_rewards.append(nrs)
    return normalize_rewards

In [103]:
## parameters
n_iterations = 150
n_episodes_per_update = 10
n_max_steps = 200
discount_factor = 0.95
learning_rate = 0.01

In [104]:
obs = env.reset(seed=SEED)
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
loss_fn = tf.keras.losses.binary_crossentropy

In [105]:
for epochs in range(n_iterations):
    all_rewards , all_grads = play_multiple_episodes(
        env, n_episodes_per_update, n_max_steps, model, loss_fn
    )
    total_rewards = sum(map(sum, all_rewards))
    print(f"epoch:{epoch + 1}/{n_iterations}, mean rewards: {total_rewards/n_episodes_per_update}")
    all_final_rewards = discount_and_normalize_rewards(all_rewards, discount_factor)
    all_mean_grads = list()
    N= len(model.trainable_variables)
    for var_index in range(N):
        temp_reduce_mean = list()
        for episode_index, final_rewards  in enumerate(final_rewards):
            result = final_reward * all_grads[episode_index][step][var_index]
            temp_reduce_mean.append(result)
        mean_grads = tf.reduce_mean(temp_reduce_mean, axis = 0)
        all_mean_grads.append(mean_grads)
    optimizer.apply_gradients(zip(all_mean_grads,model.trainable_variables))


ValueError: Attempt to convert a value (<keras.losses.BinaryCrossentropy object at 0x0000013F726C3EC8>) with an unsupported type (<class 'keras.losses.BinaryCrossentropy'>) to a Tensor.

In [25]:
action = (tf.random.uniform([1,1]) > 0.564) # True and False
y_target = tf.constant([[1.]]) - tf.cast(action, tf.float32)
y_target, action

(<tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>,
 <tf.Tensor: shape=(1, 1), dtype=bool, numpy=array([[ True]])>)

In [26]:
tf.constant([[1.]])

<tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[1.]], dtype=float32)>

In [30]:
x = tf.cast(False, tf.float32)
x

<tf.Tensor: shape=(), dtype=float32, numpy=0.0>

In [31]:
model.trainable_variables

ValueError: Weights for model sequential_1 have not yet been created. Weights are created when the Model is first called on inputs or `build()` is called with an `input_shape`.