In [None]:
!pip install git+https://github.com/Control-RL/Function-Approximation --upgrade

In [6]:
from torch.distributions import Categorical
import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

In [7]:
import torch
import torch.optim as optim


# Model Definition

In [None]:
gamma = 0.99  # discount factor applied to future rewards when train() computes returns


class Pi(nn.Module):
    """Categorical policy network that also stores the current episode's data.

    The network is a small MLP (``in_dim -> 64 -> out_dim`` with a ReLU in
    between) whose outputs are used as logits of a categorical distribution
    over actions.

    Attributes:
        model: the ``nn.Sequential`` MLP producing the action logits.
        log_probs: log-probabilities of the actions sampled by ``act`` since
            the last ``onpolicy_reset``.
        rewards: rewards appended by the caller since the last
            ``onpolicy_reset``.
    """

    def __init__(self, in_dim, out_dim):
        super().__init__()
        self.model = nn.Sequential(
            nn.Linear(in_dim, 64),
            nn.ReLU(),
            nn.Linear(64, out_dim),
        )
        self.onpolicy_reset()
        self.train()  # put the module in training mode

    def onpolicy_reset(self):
        """Discard the stored log-probabilities and rewards."""
        self.log_probs, self.rewards = [], []

    def forward(self, x):
        """Return the action logits produced by the MLP for input ``x``."""
        return self.model(x)

    def act(self, state):
        """Sample an action for ``state`` and remember its log-probability.

        Args:
            state: numpy array observation; it is cast to float32 before being
                fed to the network.

        Returns:
            The sampled action as a Python number (``Tensor.item()``).
        """
        obs = torch.from_numpy(state.astype(np.float32))
        logits = self.forward(obs)
        dist = torch.distributions.Categorical(logits=logits)
        sampled = dist.sample()
        # Keep log pi(a|s) so the policy-gradient loss can be built later.
        self.log_probs.append(dist.log_prob(sampled))
        return sampled.item()

# Inner gradient loop of the basic policy-gradient algorithm (REINFORCE)

In [None]:


def train(pi, optimizer, discount=None):
    """Run one REINFORCE update from the episode stored on ``pi``.

    The discounted return ``G_t = r_t + discount * G_{t+1}`` is computed for
    every step of the episode, and the loss ``-sum_t log pi(a_t|s_t) * G_t``
    is minimised with a single optimizer step (i.e. gradient ascent on the
    expected return).

    Args:
        pi: object exposing ``rewards`` (sequence of per-step rewards) and
            ``log_probs`` (list of scalar tensors, one per step, attached to
            the autograd graph of the policy parameters).
        optimizer: optimizer over the policy parameters; ``zero_grad`` and
            ``step`` are called on it.
        discount: discount factor for future rewards. When ``None`` (the
            default) the module-level ``gamma`` is used.

    Returns:
        The scalar loss of this update as a detached tensor.

    Raises:
        ValueError: if no step was recorded, or if the number of stored
            log-probabilities differs from the number of rewards.
    """
    if discount is None:
        discount = gamma  # module-level default discount factor

    T = len(pi.rewards)
    if T == 0:
        raise ValueError("train() called with an empty episode (no rewards stored)")
    if len(pi.log_probs) != T:
        # Without this check a single stored log-prob would silently broadcast
        # against all returns and produce a wrong gradient.
        raise ValueError(
            f"episode data is inconsistent: {len(pi.log_probs)} log-probs "
            f"for {T} rewards"
        )

    # Accumulate the returns backwards so each one reuses the next step's value.
    rets = np.empty(T, dtype=np.float32)
    future_ret = 0.0
    for t in reversed(range(T)):
        future_ret = pi.rewards[t] + discount * future_ret
        rets[t] = future_ret

    rets = torch.tensor(rets)
    log_probs = torch.stack(pi.log_probs)

    # REINFORCE objective; negated because the optimizer minimises.
    loss = torch.sum(-log_probs * rets)

    optimizer.zero_grad()
    loss.backward()  # backpropagate through the stored log-probs
    optimizer.step()  # update the policy weights

    # Detach so callers that only log the value do not hold autograd nodes.
    return loss.detach()

# Training loop

In [None]:
env = gym.make('CartPole-v1')

in_dim = env.observation_space.shape[0]  # 4
out_dim = env.action_space.n  # 2

pi = Pi(in_dim, out_dim)  # Policy π_θ for REINFORCE
optimizer = optim.Adam(pi.parameters(), lr=0.01)

# Training settings
n_episodes = 300
# Training-time cap on the episode length. CartPole-v1 itself only truncates
# after 500 steps, so this inner-loop limit is what ends a successful episode.
max_steps = 200
# An episode counts as "solved" relative to the 200-step training cap above.
solved_threshold = 195.0

for epi in range(n_episodes):
    state, _ = env.reset()
    for t in range(max_steps):
        action = pi.act(state)
        # Gymnasium reports the end of an episode through two separate flags:
        # `terminated` (failure state reached) and `truncated` (time limit).
        state, reward, terminated, truncated, _ = env.step(action)
        pi.rewards.append(reward)
        if terminated or truncated:
            break

    # Train the policy once per episode
    loss = train(pi, optimizer)
    total_reward = sum(pi.rewards)
    solved = total_reward > solved_threshold

    # On-policy: clear memory after training
    pi.onpolicy_reset()
    print(f'Episode {epi}, loss: {loss}, '
          f'total_reward: {total_reward}, solved: {solved}')

Episode 0, loss: 636.5335693359375, total_reward: 46.0, solved: False
Episode 1, loss: 285.105224609375, total_reward: 30.0, solved: False
Episode 2, loss: 109.40056610107422, total_reward: 17.0, solved: False
Episode 3, loss: 144.4670867919922, total_reward: 21.0, solved: False
Episode 4, loss: 96.35108947753906, total_reward: 17.0, solved: False
Episode 5, loss: 293.4607849121094, total_reward: 31.0, solved: False
Episode 6, loss: 90.36298370361328, total_reward: 16.0, solved: False
Episode 7, loss: 139.30355834960938, total_reward: 20.0, solved: False
Episode 8, loss: 275.1560974121094, total_reward: 30.0, solved: False
Episode 9, loss: 288.7371520996094, total_reward: 31.0, solved: False
Episode 10, loss: 594.1530151367188, total_reward: 45.0, solved: False
Episode 11, loss: 44.8603630065918, total_reward: 11.0, solved: False
Episode 12, loss: 386.4444580078125, total_reward: 36.0, solved: False
Episode 13, loss: 98.17557525634766, total_reward: 17.0, solved: False
Episode 14, loss

# Record the agent

In [13]:
from gymnasium.wrappers import RecordVideo
from typing import Optional
import os


In [14]:
def evaluate(
    pi: Pi,
    env: gym.Env,
    n_eval_episodes: int = 10,
    video_name: Optional[str] = None,
) -> None:
    """Roll out the policy for several episodes and print the mean/std return.

    Actions are sampled from the policy through ``pi.act``. Evaluation does not
    leave anything behind in the policy's on-policy memory: the log-probs that
    ``pi.act`` appends during the rollouts are computed without gradients and
    removed again before returning, so a later ``train(pi, ...)`` call is not
    affected.

    Args:
        pi: the policy to evaluate.
        env: the environment to run in. When ``video_name`` is given and
            ``env.render_mode == "rgb_array"`` it is wrapped in ``RecordVideo``
            and the wrapper is closed at the end.
        n_eval_episodes: number of complete episodes to run.
        video_name: name passed to the video recorder; ``None`` disables
            recording.
    """
    episode_returns, episode_reward = [], 0.0
    total_episodes = 0

    # Setup video recorder
    if video_name is not None and env.render_mode == "rgb_array":
        os.makedirs("../logs/videos/", exist_ok=True)

        # New gym recorder always wants to cut video into episodes,
        # set video length big enough but not to inf (will cut into episodes)
        env = RecordVideo(env, "../logs/videos", step_trigger=lambda _: False, video_length=100_000)
        env.start_recording(video_name)

    # pi.act() appends one log-prob per call; remember how many were stored
    # before evaluation so that only the ones added here are dropped afterwards.
    n_stored_log_probs = len(pi.log_probs)

    obs, _ = env.reset()
    try:
        while total_episodes < n_eval_episodes:

            ### YOUR CODE HERE
            # No gradients are needed for evaluation; this avoids building an
            # autograd graph for every step.
            with torch.no_grad():
                action = pi.act(obs)

            # Send the action to the env
            obs, reward, terminated, truncated, _ = env.step(action)

            ### END OF YOUR CODE

            episode_reward += float(reward)

            if terminated or truncated:
                episode_returns.append(episode_reward)
                episode_reward = 0.0
                total_episodes += 1
                obs, _ = env.reset()
    finally:
        # Restore the policy's on-policy memory to its pre-evaluation content.
        del pi.log_probs[n_stored_log_probs:]

    if isinstance(env, RecordVideo):
        print(f"Saving video to ../logs/videos/{video_name}")
        env.close()

    print(f"Total reward = {np.mean(episode_returns):.2f} +/- {np.std(episode_returns):.2f}")

In [15]:
# Evaluation settings: which environment to run, how many episodes, and the
# name under which the recording is stored.
env_id = "CartPole-v1"
n_eval_episodes = 3
video_name = "PG_" + env_id + ".mp4"

# evaluate() only records a video when the env renders to "rgb_array".
eval_env = gym.make(env_id, render_mode="rgb_array")

In [16]:
# Roll out the trained policy on the evaluation env and record the episodes.
evaluate(
    pi=pi,
    env=eval_env,
    n_eval_episodes=n_eval_episodes,
    video_name=video_name,
)

  logger.warn(


Saving video to ../logs/videos/PG_CartPole-v1.mp4
Total reward = 500.00 +/- 0.00


In [17]:
from dqn_tutorial.notebook_utils import show_videos

# Print a caption, then display the recording made during evaluation.
# The "300" in the caption is hard-coded and mirrors the number of training
# episodes used in the training loop.
# NOTE(review): show_videos is an external helper; presumably it embeds the
# files in the given directory whose names start with `prefix` — confirm.
print(f"PG agent on {env_id} after 300 iterations:")
show_videos("../logs/videos/", prefix=video_name)

PG agent on CartPole-v1 after 300 iterations:
