In [1]:
import numpy as np
import matplotlib.pyplot as plt
import gymnasium as gym
from tqdm.notebook import tqdm

import torch
import torch.nn as nn
from torch.optim import Adam
from torch.utils.data import Dataset, DataLoader
from torch.distributions.categorical import Categorical


In [2]:
torch.manual_seed(42)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cpu')

In [30]:
# environment parameters
env_name="CartPole-v1"
env = gym.make(env_name)
obs_dim = env.observation_space.shape[0]
acts_dim = env.action_space.n
print(f"{obs_dim} obs | {acts_dim} acts")

# mlp parameters
hidden_sizes = [32]
sizes = [obs_dim]+hidden_sizes+[acts_dim]

# training parameters
epochs=100
batch_size=5000
lr = 1e-2

4 obs | 2 acts


In [4]:
class MLP(nn.Module):
    def __init__(self, sizes, activation=nn.Tanh, output_activation=nn.Identity):
        super().__init__()
        layers = []
        for j in range(len(sizes)-1):
            act = activation if j < len(sizes)-2 else output_activation
            layers += [nn.Linear(sizes[j], sizes[j+1]), act()]
        self.layers = nn.Sequential(*layers)
        

    def forward(self, x):
        return self.layers(x)


In [5]:

def get_policy(obs):
    logits = model(obs)
    return Categorical(logits=logits)

def get_action(obs): # only one observation as input
    # you can remove .item() to pass batch of obs as input
    return get_policy(obs).sample().item()

# make loss function whose gradient, for the right data, is policy gradient
def compute_loss(obs, act, weights):
    """
    Even though we describe this as a loss function, it is not a loss function in the typical sense from supervised learning.
    1. The data distribution depends on the parameters
    2. It doesn’t measure performance
    """
    logp = get_policy(obs).log_prob(act)
    return -(logp * weights).mean()


# Train

In [31]:
env = gym.make(env_name)

model = MLP(sizes)

optimizer = Adam(model.parameters(), lr=lr)

# training loop
for i in range(epochs):
    # make some empty lists for logging.
    batch_obs = []          # for observations
    batch_acts = []         # for actions
    batch_weights = []      # for R(tau) weighting in policy gradient
    batch_rets = []         # for measuring episode returns
    batch_lens = []         # for measuring episode lengths

    # reset episode-specific variables
    obs, _ = env.reset()    
    done = False            
    ep_rews = []            # list for rewards accrued throughout ep

    # collect experience by acting in the environment with current policy
    while True:
        batch_obs.append(obs.copy())
        
        act = get_action(torch.as_tensor(obs, dtype=torch.float32))
        obs, rew, done, _, _ = env.step(act)
        
        batch_acts.append(act)
        ep_rews.append(rew)
        
        if done:
            # if episode is over, record info about episode
            ep_ret, ep_len = sum(ep_rews), len(ep_rews)
            batch_rets.append(ep_ret)
            batch_lens.append(ep_len)
            
            # the weight for each logprob(a|s) is R(tau)
            batch_weights += [ep_ret] * ep_len

            obs, _ = env.reset()
            done = False
            ep_rews = []
            
            if len(batch_obs) > batch_size:
                break


    # take a single policy gradient update step using the experience gained
    optimizer.zero_grad()
    batch_loss = compute_loss(obs=torch.as_tensor(batch_obs, dtype=torch.float32),
                              act=torch.as_tensor(batch_acts, dtype=torch.int32),
                              weights=torch.as_tensor(batch_weights, dtype=torch.float32)
                              )
    batch_loss.backward()
    optimizer.step()
    
    print('epoch: %3d \t loss: %.3f \t return: %.3f \t ep_len: %.3f'%
            (i, batch_loss, np.mean(batch_rets), np.mean(batch_lens)))


epoch:   0 	 loss: 22.133 	 return: 22.986 	 ep_len: 22.986
epoch:   1 	 loss: 21.624 	 return: 25.020 	 ep_len: 25.020
epoch:   2 	 loss: 26.439 	 return: 27.657 	 ep_len: 27.657
epoch:   3 	 loss: 26.680 	 return: 30.349 	 ep_len: 30.349
epoch:   4 	 loss: 28.714 	 return: 32.103 	 ep_len: 32.103
epoch:   5 	 loss: 31.471 	 return: 36.158 	 ep_len: 36.158
epoch:   6 	 loss: 34.872 	 return: 39.698 	 ep_len: 39.698
epoch:   7 	 loss: 37.969 	 return: 44.434 	 ep_len: 44.434
epoch:   8 	 loss: 40.971 	 return: 48.000 	 ep_len: 48.000
epoch:   9 	 loss: 43.462 	 return: 53.000 	 ep_len: 53.000
epoch:  10 	 loss: 41.648 	 return: 54.290 	 ep_len: 54.290
epoch:  11 	 loss: 42.443 	 return: 56.191 	 ep_len: 56.191
epoch:  12 	 loss: 56.620 	 return: 66.026 	 ep_len: 66.026
epoch:  13 	 loss: 55.302 	 return: 68.685 	 ep_len: 68.685
epoch:  14 	 loss: 46.511 	 return: 64.615 	 ep_len: 64.615
epoch:  15 	 loss: 53.508 	 return: 70.732 	 ep_len: 70.732
epoch:  16 	 loss: 57.788 	 return: 77.0

In [33]:
batch_rets, len(batch_obs)

([1177.0, 634.0, 609.0, 554.0, 1198.0, 912.0], 5084)

In [None]:
PATH = f"{env_name}_{ep_len}"
torch.save(model, PATH)

# Test

In [35]:
# model_name = "CartPole-v1_1276"
# model = torch.load(f"models\\{model_name}")

env = gym.make(env_name, render_mode = "human")

num_episodes = 10

for e in range(num_episodes):
    state, _ = env.reset()
    done = False; score = 0
    
    while not done:
        action = get_action(torch.as_tensor(obs, dtype=torch.float32))
        state, reward, done, _, _ = env.step(action)
        score += reward
        env.render()
        if score % 10 == 0:
            print(f"Episode {e}, score {score}")

env.close()

Episode 0, score 10.0
Episode 1, score 10.0
Episode 2, score 10.0
Episode 3, score 10.0
Episode 4, score 10.0
Episode 5, score 10.0
Episode 6, score 10.0
Episode 7, score 10.0
Episode 8, score 10.0
Episode 9, score 10.0
