In [1]:
import gym
import numpy as np
# import pandas as pd
import tensorflow as tf
import keras
from keras import Sequential
import gymnasium

In [2]:
# Create the BipedalWalker-v3 environment that the training code below steps through.
env = gym.make("BipedalWalker-v3")

In [None]:
# Policy network: three ReLU hidden layers (each followed by batch normalisation)
# and a 4-unit tanh head, one output in [-1, 1] per action dimension.
# The container and the layers are both taken from tf.keras so they come from the
# same Keras implementation; mixing standalone `keras` with `tf.keras` layers can
# fail when the two packages are different versions.
# No input shape is declared, so the weights are created on the first call.
model = tf.keras.Sequential([
    tf.keras.layers.Dense(32, activation='relu'),
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.Dense(500, activation='relu'),
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.Dense(500, activation='relu'),
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.Dense(4, activation='tanh')  # Output actions
])

# State consists of hull angle speed, angular velocity, horizontal speed, vertical speed, position of joints and 
# joints angular speed, legs contact with ground, and 10 lidar rangefinder measurements. There are no coordinates in the state vector.
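# Index map of the 24-element observation (consistent with the bounds in normalize_obs):
# obs[0]  : hull angle
# obs[1]  : hull angular velocity
# obs[2]  : horizontal speed
# obs[3]  : vertical speed
# obs[4], obs[6], obs[9], obs[11]  : joint angles (hip 1, knee 1, hip 2, knee 2)
# obs[5], obs[7], obs[10], obs[12] : joint angular speeds (hip 1, knee 1, hip 2, knee 2)
# obs[8], obs[13] : leg contact with ground (leg 1, leg 2)
# obs[14 to 23] : 10 lidar rangefinder measurements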


# Per-feature lower/upper bounds of the 24-element observation vector.
# Built once at definition time instead of on every call.
_OBS_LOW = np.array([-3.14, -5., -5., -5., -3.14, -5., -3.14, -5., 0., -3.14, -5., -3.14, -5., 0.,
                     -1., -1., -1., -1., -1., -1., -1., -1., -1., -1.])
_OBS_HIGH = np.array([3.14, 5., 5., 5., 3.14, 5., 3.14, 5., 5., 3.14, 5., 3.14, 5., 5.,
                      1., 1., 1., 1., 1., 1., 1., 1., 1., 1.])


def normalize_obs(obs):
    """Linearly rescale a 24-element observation so each feature's bounds map to [-1, 1].

    The lower bound of a feature maps to -1 and the upper bound to +1. Values
    outside the bounds map outside [-1, 1]; no clipping is applied.

    Args:
        obs: array-like of 24 values (or anything broadcastable against the
            24-element bound arrays).

    Returns:
        numpy.ndarray with the rescaled observation.
    """
    return (obs - _OBS_LOW) / (_OBS_HIGH - _OBS_LOW) * 2 - 1

# (abs(obs[4] - obs[5]) * gamma ** 3)
def play_one_step(env, obs, loss_fun , gamma):
    """Sample one action from the policy, step the environment, and return the gradients.

    The observation is normalised, extended with one hand-crafted scalar feature,
    and fed to the global `model`. Each of the 4 tanh outputs `o` is read as a
    Bernoulli parameter `p = (o + 1) / 2`: the action component is -1 with
    probability `p` and +1 otherwise. The loss is `loss_fun` applied to the 0/1
    label "component was -1" and `p`, so with binary cross-entropy it is the
    negative log-likelihood of the sampled action.

    Args:
        env: environment exposing `step(action)` and `render()`.
        obs: raw 24-element observation from the previous step/reset.
        loss_fun: callable `(y_true, y_pred) -> loss`, with labels in {0, 1} and
            predictions in [0, 1] (e.g. `tf.keras.losses.binary_crossentropy`).
        gamma: weighting base used only inside the hand-crafted feature below.

    Returns:
        Tuple `(obs, reward, done, info, grads)` where `grads` is the list of
        gradients of the loss with respect to `model.trainable_variables`.
    """
    with tf.GradientTape() as tape:
        obs = normalize_obs(obs)

        # Hand-crafted scalar appended to the observation as an extra input feature.
        # NOTE(review): per the Gym BipedalWalker-v3 observation layout, index 12 is
        # the second knee's speed and index 13 the second leg's contact flag, so
        # `contact_reward` mixes a speed with a contact flag - confirm this is intended.
        forward_velocity = obs[2] * gamma ** -3
        stability_penalty = -abs(obs[0]) - abs(obs[1])  # Penalize hull instability
        leg_swing_reward = -abs(obs[4] - obs[6]) * gamma + -abs(obs[5] - obs[7]) * gamma ** 2
        contact_reward = (-obs[12] + obs[13]) * gamma ** -2
        heuristic_function = forward_velocity + stability_penalty + leg_swing_reward + contact_reward
        modified_obs = np.append(obs, heuristic_function)

        # Batch of one in, (1, 4) out; drop the batch axis to get one value per action component.
        raw_output = tf.squeeze(model(modified_obs[np.newaxis]), axis=0)
        # Map tanh's [-1, 1] onto [0, 1]: probability that a component is sampled as -1.
        negative_prob = (raw_output + 1.0) / 2.0

        # Sampling is not differentiated: it works on a NumPy copy of the outputs.
        output_values = raw_output.numpy()
        draws = np.random.uniform(low=-1, high=1, size=output_values.shape)
        action = np.where(draws > output_values, 1.0, -1.0).astype(np.float32)

        # Label is 1 where the sampled component is -1, matching `negative_prob`.
        y_target = tf.constant((1.0 - action) / 2.0, dtype=tf.float32)

        # Calculating loss
        loss = tf.reduce_mean(loss_fun(y_target, negative_prob))

    grads = tape.gradient(loss, model.trainable_variables)

    step_result = env.step(action)
    if len(step_result) == 5:
        # Newer Gym/Gymnasium API: (obs, reward, terminated, truncated, info).
        obs, reward, terminated, truncated, info = step_result
        done = terminated or truncated
    else:
        obs, reward, done, info = step_result
    env.render()

    return obs, reward, done, info, grads

def play_multiple_episodes(env , max_episodes , max_steps , loss_fn, gamma=0.9):
    """Play several episodes and collect per-step rewards and gradients.

    Args:
        env: environment to play in.
        max_episodes: number of episodes to play.
        max_steps: maximum number of steps per episode.
        loss_fn: loss callable forwarded to `play_one_step`.
        gamma: weighting base forwarded to `play_one_step` (defaults to the
            previously hard-coded 0.9).

    Returns:
        Tuple `(all_rewards, all_grads)`: two lists with one entry per episode,
        each entry being the list of rewards / gradient lists for that
        episode's steps.
    """
    all_rewards = []
    all_grads = []
    for episode in range(max_episodes):
        current_rewards = []
        current_grads = []
        # Seed with the episode index, so episode k uses the same seed in every call.
        if hasattr(env, "seed"):
            env.seed(episode)
            obs = env.reset()
        else:
            # Newer Gym/Gymnasium removed `env.seed`; the seed goes through `reset`.
            obs = env.reset(seed=episode)
        if isinstance(obs, tuple):
            # Newer API returns (obs, info) from reset.
            obs, _ = obs
        for step in range(max_steps):
            obs, reward, done, info, grads = play_one_step(env, obs , loss_fn , gamma)
            current_rewards.append(reward)
            current_grads.append(grads)
            if done:
                break
        all_rewards.append(current_rewards)
        all_grads.append(current_grads)
    return all_rewards, all_grads

def discount_rewards(rewards, discount_factor):
    """Return the discounted return at every step of one episode.

    Entry `i` of the result is `rewards[i] + discount_factor * result[i + 1]`,
    with the value after the last step taken as 0.

    Args:
        rewards: sequence of per-step rewards.
        discount_factor: multiplier applied to the return of the following step.

    Returns:
        float32 numpy array with the same shape as `rewards`.
    """
    returns = np.zeros_like(rewards, dtype=np.float32)
    running_return = 0.0
    position = len(rewards)
    # Walk backwards so each step can reuse the return already computed for the next one.
    while position > 0:
        position -= 1
        running_return = rewards[position] + running_return * discount_factor
        returns[position] = running_return
    return returns
def discount_and_normalize(all_rewards, discount_factor):
    """Discount every episode's rewards, then standardise them across all episodes.

    The mean and standard deviation are computed over the discounted rewards of
    all episodes pooled together.

    Args:
        all_rewards: list of per-episode reward sequences.
        discount_factor: discount passed to `discount_rewards`.

    Returns:
        List with one float32 array per episode holding the standardised
        discounted rewards. An empty input yields an empty list.
    """
    if len(all_rewards) == 0:
        # np.concatenate raises on an empty list.
        return []
    all_discounted_rewards = [discount_rewards(episode_rewards, discount_factor)
                              for episode_rewards in all_rewards]
    flat_rewards = np.concatenate(all_discounted_rewards)
    if flat_rewards.size == 0:
        # Every episode is empty: nothing to standardise.
        return all_discounted_rewards
    rewards_mean = flat_rewards.mean()
    rewards_std = flat_rewards.std()
    if not rewards_std > 1e-8:
        # All discounted rewards are (numerically) equal; dividing by ~0 would
        # produce NaN/inf, so only centre the values.
        rewards_std = 1.0
    return [(discounted_reward - rewards_mean) / rewards_std
            for discounted_reward in all_discounted_rewards]

# Training hyper-parameters.
n_iterations = 1600          # number of policy updates
n_episodes_per_update = 10   # episodes played before each update
n_max_steps = 500            # step cap per episode
discount_factor = 0.95       # reward discount used when computing returns

optimizer = tf.keras.optimizers.Nadam(learning_rate=0.01)
loss_fn = tf.keras.losses.binary_crossentropy

for iteration in range(n_iterations):
    print(iteration)
    all_rewards, all_grads = play_multiple_episodes(
        env, n_episodes_per_update, n_max_steps, loss_fn
    )
    all_final_rewards = discount_and_normalize(all_rewards, discount_factor)

    # For each trainable variable, average its per-step gradients over every step
    # of every episode, each weighted by that step's normalised discounted reward.
    all_mean_grad = []
    for var_index, _ in enumerate(model.trainable_variables):
        weighted_grads = []
        for episode_index, episode_rewards in enumerate(all_final_rewards):
            for step, step_reward in enumerate(episode_rewards):
                weighted_grads.append(step_reward * all_grads[episode_index][step][var_index])
        mean_grads = tf.reduce_mean(weighted_grads, axis=0)
        all_mean_grad.append(mean_grads)
    optimizer.apply_gradients(zip(all_mean_grad, model.trainable_variables))

0
1
2
3


In [20]:
# Separate Gymnasium environment (Humanoid-v5), created with the 'human' render mode.
ENV = gymnasium.make("Humanoid-v5", render_mode="human")


In [21]:
# Reset the Humanoid environment created above, then ask it to render.
ENV.reset()
ENV.render()
