In [513]:
import gymnasium as gym

In [514]:
# Create the LunarLander-v2 environment with on-screen ("human") rendering.
env = gym.make("LunarLander-v2", render_mode="human")
# reset() starts an episode; its return value unpacks into the first observation and an info value.
observation, info = env.reset()

In [515]:
# Drive the environment with randomly sampled actions for 1000 steps.
# try/finally guarantees env.close() runs (releasing the render window) even if
# a step raises or the cell is interrupted part-way through.
try:
    for _ in range(1000):
        action = env.action_space.sample()
        observation, reward, terminated, truncated, info = env.step(action)

        # Start a fresh episode as soon as the current one ends for either reason.
        if terminated or truncated:
            observation, info = env.reset()
finally:
    env.close()

In [516]:
# Display the reward left over from the most recent env.step() call.
reward

-2.8145547768651227

### Hands on | CH 18 Reinforcement Learning

In [1]:
import gymnasium as gym

In [2]:
## IDs of every environment currently registered with gymnasium
gym.envs.registry.keys()

dict_keys(['CartPole-v0', 'CartPole-v1', 'MountainCar-v0', 'MountainCarContinuous-v0', 'Pendulum-v1', 'Acrobot-v1', 'CartPoleJax-v0', 'CartPoleJax-v1', 'PendulumJax-v0', 'LunarLander-v2', 'LunarLanderContinuous-v2', 'BipedalWalker-v3', 'BipedalWalkerHardcore-v3', 'CarRacing-v2', 'Blackjack-v1', 'FrozenLake-v1', 'FrozenLake8x8-v1', 'CliffWalking-v0', 'Taxi-v3', 'Reacher-v2', 'Reacher-v4', 'Pusher-v2', 'Pusher-v4', 'InvertedPendulum-v2', 'InvertedPendulum-v4', 'InvertedDoublePendulum-v2', 'InvertedDoublePendulum-v4', 'HalfCheetah-v2', 'HalfCheetah-v3', 'HalfCheetah-v4', 'Hopper-v2', 'Hopper-v3', 'Hopper-v4', 'Swimmer-v2', 'Swimmer-v3', 'Swimmer-v4', 'Walker2d-v2', 'Walker2d-v3', 'Walker2d-v4', 'Ant-v2', 'Ant-v3', 'Ant-v4', 'Humanoid-v2', 'Humanoid-v3', 'Humanoid-v4', 'HumanoidStandup-v2', 'HumanoidStandup-v4', 'GymV22Environment-v0', 'GymV26Environment-v0'])

In [517]:
env = gym.make("CartPole-v1", render_mode="human")
# reset() initializes the environment and returns an (observation, info) pair.
# Unpack it so `obs` holds the observation array rather than the whole tuple.
obs, info = env.reset()
obs, info

(array([-0.02758559, -0.00663392, -0.02858965,  0.01628222], dtype=float32),
 {})

In [132]:
# env.render()
env.action_space  # the set of possible actions in this environment

Discrete(2)

- `Discrete(2)` means there are exactly two possible actions, encoded as the integers 0 and 1

In [518]:
action = 1
# step() returns (obs, reward, terminated, truncated, info). Keep both
# end-of-episode flags instead of throwing `truncated` away, and avoid binding
# `_`, which IPython uses for the previous cell's output.
obs, reward, terminated, truncated, info = env.step(action)
done = terminated or truncated

In [519]:
# Display the observation returned by the env.step() call.
obs

array([-0.02771827,  0.18888612, -0.028264  , -0.28528222], dtype=float32)

In [520]:
def basic_policy(obs):
    """Hard-coded policy driven only by the third observation component.

    Returns 0 when ``obs[2]`` is negative and 1 otherwise.
    """
    pole_angle = obs[2]
    if pole_angle < 0:
        return 0
    return 1

# Evaluate the hard-coded policy: 500 episodes of at most 200 steps each,
# recording the total reward collected in every episode.
totals = []
for episode in range(500):
    episode_rewards = 0
    obs, info = env.reset()
    for step in range(200):
        action = basic_policy(obs)
        obs, reward, terminated, truncated, info = env.step(action)
        episode_rewards += reward
        # Stop on either end-of-episode flag; the original only looked at the
        # first one and discarded `truncated`.
        done = terminated or truncated
        if done:
            break
    totals.append(episode_rewards)

In [522]:
# env.close()

In [25]:
import numpy as np
# Summary of the per-episode reward totals: (mean, std, max, min).
np.mean(totals), np.std(totals), np.max(totals), np.min(totals)

(41.69, 9.29440154071256, 68.0, 24.0)

## Deep Reinforcement Learning
- Instead of hard-coding a policy, use a neural network to learn one.
    - Input: observation given the current state
    - Output: probability of the action to be taken

In [105]:
import torch
from torch import nn
from torch.autograd import grad

In [277]:
class PolicyNN(nn.Module):
    """Small feed-forward policy network with sigmoid-squashed outputs.

    Args:
        input_shape: Number of input features per sample (after flattening).
        hidden_units: Width of the single hidden layer.
        output_shape: Number of output units.
    """
    def __init__(self,
                input_shape:int,
                hidden_units:int,
                output_shape: int):
        super().__init__()
        # Layout is kept as (Flatten, Linear, Linear) so the parameter names in
        # state_dict() ("layer_stack.1.*", "layer_stack.2.*") do not change.
        self.layer_stack = nn.Sequential(
            nn.Flatten(),
            nn.Linear(in_features=input_shape,
                     out_features=hidden_units),
            nn.Linear(in_features=hidden_units,
                     out_features=output_shape)
        )

    def forward(self, x):
        """Return sigmoid(output(relu(hidden(flatten(x)))))."""
        flatten, hidden, output = self.layer_stack
        # Two Linear layers applied back to back are equivalent to one affine
        # map, so the hidden layer added no capacity. The ReLU between them
        # makes it a real hidden layer.
        hidden_out = torch.relu(hidden(flatten(x)))
        return torch.sigmoid(output(hidden_out))

In [391]:
# compute gradients and save them
def play_one_step(env, obs, model, loss_fn):
    """
    Play one step and compute the gradients
    Generate new obs, reward etc.

    Args:
        env: Environment whose ``step(action)`` returns
            ``(obs, reward, terminated, truncated, info)``.
        obs: Current observation (array-like of features for one state).
        model: Network mapping a ``(1, n_features)`` float tensor to the
            probability of taking action 0.
        loss_fn: Loss comparing that probability with the sampled target.

    Returns:
        ``(obs, reward, done, grads)`` where ``done`` is true when the episode
        was terminated or truncated, and ``grads`` is a 1-D tensor holding the
        gradients of this step's loss for every parameter, concatenated in
        ``model.parameters()`` order.
    """
    # Force float32 so observations of another dtype still match the model weights.
    obs = torch.as_tensor(np.asarray(obs, dtype=np.float32)[np.newaxis])
    left_proba = model(obs)
    # Sample the action: 0 with probability left_proba, otherwise 1.
    action = (torch.rand(1, 1) > left_proba).to(torch.float32)
    # Target is 1 when action 0 was chosen, so minimizing the loss makes the
    # chosen action more likely.
    y_target = 1.0 - action
    loss = loss_fn(left_proba, y_target)

    # backward() accumulates into .grad; clear it first so the saved gradients
    # belong to this step only instead of being a running sum over all steps.
    model.zero_grad()
    loss.backward() #compute gradients
    # torch.cat copies, so the result does not alias the parameters' .grad buffers.
    grads = torch.cat([param.grad.reshape(-1) for param in model.parameters()])
    # .item() extracts the scalar without the deprecated int(ndarray) conversion.
    obs, reward, terminated, truncated, info = env.step(int(action.item()))
    return obs, reward, bool(terminated or truncated), grads

In [392]:
def play_multiple_episodes(env, n_episodes, n_max_steps, model, loss_fn):
    """
    Run ``n_episodes`` episodes of at most ``n_max_steps`` steps each.

    Returns two parallel lists with one entry per episode: the per-step
    rewards and the per-step gradients produced by ``play_one_step``.
    """
    all_rewards, all_grads = [], []
    for _ in range(n_episodes):
        episode_rewards, episode_grads = [], []
        reset_result = env.reset()
        obs = reset_result[0]
        steps_taken = 0
        while steps_taken < n_max_steps:
            obs, reward, done, step_grads = play_one_step(env, obs, model, loss_fn)
            episode_rewards.append(reward)
            episode_grads.append(step_grads)
            steps_taken += 1
            if done:
                break
        all_rewards.append(episode_rewards)
        all_grads.append(episode_grads)
    return all_rewards, all_grads

In [393]:
def discount_rewards(rewards, discount_factor):
    """Return the discounted sum of future rewards for every step.

    Args:
        rewards: Sequence of per-step rewards for one episode.
        discount_factor: Multiplier applied to each later step's return.

    Returns:
        A float64 NumPy array of the same length as ``rewards`` where entry
        ``t`` equals ``rewards[t] + discount_factor * result[t + 1]``.
    """
    # Force a float array: with integer rewards the in-place update below would
    # otherwise silently truncate the fractional part of every discounted value.
    discounted = np.array(rewards, dtype=np.float64)
    # Walk backwards so each step can reuse the already-discounted next step.
    for step in range(len(discounted)-2, -1, -1):
        discounted[step] += discounted[step+1] * discount_factor
    return discounted

In [394]:
def discount_and_normalize_rewards(all_rewards, discount_factor):
    """Discount each episode's rewards, then standardize across all episodes.

    Args:
        all_rewards: List of per-episode reward sequences.
        discount_factor: Discount passed to ``discount_rewards``.

    Returns:
        A list with one array per episode, holding the discounted rewards
        shifted by the global mean and divided by the global standard deviation.
        An empty input yields an empty list.
    """
    all_discounted_rewards = [discount_rewards(rewards, discount_factor)
                              for rewards in all_rewards]
    # np.concatenate raises on an empty list; there is nothing to normalize.
    if not all_discounted_rewards:
        return []
    flat_rewards = np.concatenate(all_discounted_rewards)
    reward_mean = flat_rewards.mean()
    rewards_std = flat_rewards.std()
    # If every discounted reward is identical the std is 0 and the division
    # would produce NaNs; dividing by 1 yields all-zero values instead.
    if rewards_std == 0:
        rewards_std = 1.0
    return [(discounted_rewards - reward_mean)/rewards_std
           for discounted_rewards in all_discounted_rewards]

In [395]:
# Sanity check on a toy input. Only the value of the last expression in the
# cell is displayed, so the result of the first call is not shown.
discount_rewards([10,0,-50], discount_factor=0.8)
discount_and_normalize_rewards([[10,0,-50], [10, 20]],
                              discount_factor=0.8)

[array([-0.28435071, -0.86597718, -1.18910299]),
 array([1.26665318, 1.0727777 ])]

In [484]:
# Policy network: 4 input features, 10 hidden units, 1 output.
model = PolicyNN(input_shape=4, hidden_units=10, output_shape=1)

# Binary cross-entropy loss, minimized with plain SGD.
loss_fn = nn.BCELoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

# Training hyperparameters.
n_iterations, n_episodes_per_update = 150, 10
n_max_steps, discount_factor = 200, 0.95

In [485]:
# NOTE(review): "human" rendering draws every step and will likely slow
# training considerably - consider render_mode=None here; confirm intent.
env = gym.make("CartPole-v1", render_mode="human")
# reset() returns an (observation, info) pair; unpack it so `obs` is the
# observation itself rather than the tuple.
obs, info = env.reset()
action = 1
# Keep both end-of-episode flags instead of discarding `truncated` into `_`.
obs, reward, terminated, truncated, info = env.step(action)
done = terminated or truncated

In [486]:
# _,_,_,w = play_one_step(env, obs, model, loss_fn)
# w

In [487]:
# env.reset()[0]

In [498]:
### training loop
# Policy-gradient training: play a batch of episodes, weight every step's
# gradient by its normalized discounted return, average, and take one
# optimizer step per iteration.
# Parameter order must match the order used to flatten gradients in play_one_step.
policy_params = list(model.parameters())
for iteration in range(n_iterations):
    all_rewards, all_grads = play_multiple_episodes(
        env, n_episodes_per_update, n_max_steps, model, loss_fn)
    all_final_rewards = discount_and_normalize_rewards(all_rewards,
                                                      discount_factor)

    # One scalar weight per played step (episodes concatenated in order) and
    # the matching flat gradient vector for that step, stacked row-wise.
    # The original multiplied by the whole per-episode array instead of the
    # step's own reward and never averaged the result.
    step_weights = torch.as_tensor(np.concatenate(all_final_rewards),
                                   dtype=torch.float32)
    step_grads = torch.stack([grads
                              for episode_grads in all_grads
                              for grads in episode_grads])
    mean_flat_grads = (step_weights.unsqueeze(1) * step_grads).mean(dim=0)

    # Split the flat mean gradient back into one tensor per parameter.
    all_mean_grads = []
    offset = 0
    for param in policy_params:
        n_values = param.numel()
        all_mean_grads.append(
            mean_flat_grads[offset:offset + n_values].reshape(param.shape))
        offset += n_values

    # Install the averaged gradients and apply them; without this step the
    # model weights were never updated.
    for param, mean_grad in zip(policy_params, all_mean_grads):
        param.grad = mean_grad.clone()
    optimizer.step()
    optimizer.zero_grad()
        

In [500]:
# [final_rewards*np.array(all_grads[episode_index][step][var_index])
#             for episode_index, final_rewards in enumerate(all_final_rewards)
#                 for step, final_reward in enumerate(final_rewards)]
# all_mean_grads

In [508]:
# optimizer.step()

In [509]:
# model.state_dict()