# REINFORCE Algorithm

Today we will implement a version of the https://link.springer.com/article/10.1007/BF00992696.

You will work with the CartPole task. You are given a free moving pole, and your task is to generate a re-inforcement learning based controller that pushes the pole just enough to keep it upright.

You will design a neural network that takes state representation as input and outputs a distribution over actions (push left/push right).

In [1]:
!pip install --user "gymnasium"
!pip install --user "pygame"



In [2]:
%matplotlib inline
import pygame
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Categorical
import numpy as np
import math
import time
from matplotlib import pyplot as plt
from IPython.display import clear_output
import matplotlib.pyplot as plt
from matplotlib import animation

pygame 2.6.1 (SDL 2.28.4, Python 3.12.12)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [3]:
###############################################################################
# 1. Hyperparameters
###############################################################################

In [4]:
# Description of the task: https://gymnasium.farama.org/environments/classic_control/cart_pole/

ENV_NAME = "CartPole-v1"
GAMMA = 0.99               # discount factor
LR = 1e-3                  # policy learning rate
HIDDEN_SIZE = 128          # hidden layer width for policy network
MAX_EPISODES = 2000        # stop after this many episodes
RENDER = False             # set True to watch training (slow)
LOG_INTERVAL = 50          # print stats every LOG_INTERVAL episodes
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [5]:
###############################################################################
# 2. Policy Network
#    Input: state (4-dim)
#    Output: action probabilities over {0,1}
###############################################################################
# This is our policy network. It "looks" at the state represenation and outputs a probability distribution over actions
class PolicyNet(nn.Module):
    def __init__(self, state_dim=4, hidden_dim=HIDDEN_SIZE, action_dim=2):
        super().__init__()
        self.net = nn.Sequential(
            #TODO: Add nn.Linear layers and nn.SiLU to design your layers, make sure you use the
            #       the hidden dim variable. You can choose how many layers you want here as long as the last layer has "action_dim" outputs
            nn.Linear(state_dim, hidden_dim),
            nn.SiLU(),
            nn.Linear(hidden_dim, action_dim),
            nn.Softmax(dim=-1),  # output is categorical over actions
        )

    def forward(self, x):
        return self.net(x)

In [6]:
###############################################################################
# 3. Episode runner
#    We generate ONE full episode using the current policy πθ,
#    and record: log πθ(a_t|s_t), rewards.
###############################################################################

def run_episode(env, policy: PolicyNet):
    """
    Roll out a single episode using the current policy.

    Returns:
        log_probs: list[Tensor]         # log πθ(a_t | s_t)
        rewards:   list[float]          # r_t
        ep_reward: float                # sum of rewards in this episode
        T:         int                  # length of the episode
    """
    obs, info = env.reset()
    done = False
    truncated = False

    log_probs = []
    rewards = []
    ep_reward = 0.0
    t = 0

    while not (done or truncated):
        state_t = torch.tensor(obs, dtype=torch.float32, device=DEVICE).unsqueeze(0)
        action_probs = policy(state_t)               # shape [1,2]
        m = Categorical(action_probs)                # categorical distribution
        action = m.sample()                          # sample action
        log_prob = m.log_prob(action)                # log πθ(a_t|s_t)

        obs, reward, done, truncated, info = env.step(action.item())

        log_probs.append(log_prob)
        rewards.append(reward)
        ep_reward += reward
        t += 1

    return log_probs, rewards, ep_reward, t



In [7]:
###############################################################################
# 4. Compute discounted returns G_t for each timestep t in an episode
#
#    G_t = r_t + γ r_{t+1} + γ^2 r_{t+2} + ...
#
#    We DO NOT normalize across batch here (simple REINFORCE). You can
#    optionally normalize returns in practice to reduce variance.
###############################################################################

def compute_returns(rewards, gamma=GAMMA):
    """
    rewards: list[float] of length T
    returns: Tensor of shape [T] with G_t
    """
    G = 0.0
    returns = []
    # work backwards
    for r in reversed(rewards):
        G = r + gamma * G #TODO: G_t = r_t + γ r_{t+1} + γ^2 r_{t+2} + ...
        returns.append(G)
    returns.reverse()
    return torch.tensor(returns, dtype=torch.float32, device=DEVICE)

In [8]:
###############################################################################
# 5. Training loop (REINFORCE)
#
#    Loss = - sum_t [ log πθ(a_t|s_t) * G_t ]
#
#    Because we want to ASCEND expected return, we DESCEND this negated loss.
###############################################################################

def train():
    env = gym.make(ENV_NAME)
    policy = PolicyNet().to(DEVICE)
    optimizer = optim.Adam(policy.parameters(), lr=LR)

    reward_history = []
    running_reward = 0.0

    start_time = time.time()

    for episode in range(1, MAX_EPISODES + 1):

        if RENDER:
            env.render()

        # --- Generate one episode ---
        log_probs, rewards, ep_reward, T = run_episode(env, policy)

        reward_history.append(ep_reward)
        # exponential moving average of reward for logging
        if running_reward == 0.0:
            running_reward = ep_reward
        else:
            running_reward = 0.99 * running_reward + 0.01 * ep_reward

        # --- Compute returns for each step in this episode ---
        returns = compute_returns(rewards, gamma=GAMMA)  # [T]

        # TODO: normalize returns to reduce variance (x - mean(x))/ (std(x) + 1e-5)
        returns = (returns - returns.mean()) / (returns.std() + 1e-5)

        # --- Policy gradient loss ---
        # We want to do gradient ASCENT on sum_t G_t * logπ;
        # equivalently minimize -(G_t * logπ).
        log_probs_tensor = torch.stack(log_probs)        # [T]
        loss = -torch.sum(log_probs_tensor * returns) #TODO Implement negative loss

        # --- Update policy parameters ---
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # --- Logging ---
        if episode % LOG_INTERVAL == 0:
            elapsed = time.time() - start_time
            avg_last_100 = np.mean(reward_history[-100:]) if len(reward_history) >= 100 else np.mean(reward_history)
            print(
                f"Episode {episode:4d} | "
                f"len={T:3d} | "
                f"reward={ep_reward:6.1f} | "
                f"running_reward~{running_reward:6.1f} | "
                f"avg100={avg_last_100:6.1f} | "
                f"loss={loss.item():8.6f} | "
                f"time={elapsed:6.1f}s"
            )

        # --- Solve condition ---
        # CartPole-v1 is considered "solved" around 475+ average reward
        # over the last 100 episodes, but we'll just keep going.
        # (The exact threshold sometimes uses 475 or 500 depending on source.)

    env.close()
    print("Training finished.")
    return policy, reward_history

In [9]:
###############################################################################
# 6. Evaluation helper (no gradients, greedy action = argmax prob)
###############################################################################

def evaluate(policy, episodes=10, render=False):
    env = gym.make(ENV_NAME, render_mode="human" if render else None)
    policy.eval()

    scores = []
    with torch.no_grad():
        for ep in range(episodes):
            obs, info = env.reset()
            done = False
            truncated = False
            ep_reward = 0.0
            while not (done or truncated):
                state_t = torch.tensor(obs, dtype=torch.float32, device=DEVICE).unsqueeze(0)
                probs = policy(state_t)            # [1,2]
                action = torch.argmax(probs, dim=-1).item()
                obs, reward, done, truncated, info = env.step(action)
                ep_reward += reward
                if render:
                    env.render()
            scores.append(ep_reward)
            print(f"[EVAL] Episode {ep+1}: reward={ep_reward}")
    env.close()
    avg_score = float(np.mean(scores))
    print(f"[EVAL] Average reward over {episodes} eval episodes: {avg_score}")
    return avg_score

In [10]:
policy, reward_history = train()



Episode   50 | len= 28 | reward=  28.0 | running_reward~  25.3 | avg100=  21.3 | loss=-0.000095 | time=   0.8s
Episode  100 | len= 29 | reward=  29.0 | running_reward~  24.6 | avg100=  22.3 | loss=-0.000159 | time=   1.7s
Episode  150 | len= 27 | reward=  27.0 | running_reward~  25.2 | avg100=  25.0 | loss=-0.000079 | time=   2.5s
Episode  200 | len= 30 | reward=  30.0 | running_reward~  27.2 | avg100=  28.4 | loss=-0.000050 | time=   3.5s
Episode  250 | len= 68 | reward=  68.0 | running_reward~  30.3 | avg100=  32.4 | loss=-0.000278 | time=   4.7s
Episode  300 | len=  9 | reward=   9.0 | running_reward~  30.1 | avg100=  32.2 | loss=-0.000004 | time=   5.8s
Episode  350 | len= 11 | reward=  11.0 | running_reward~  32.0 | avg100=  32.6 | loss=0.000006 | time=   6.9s
Episode  400 | len= 21 | reward=  21.0 | running_reward~  35.4 | avg100=  38.0 | loss=-0.000039 | time=   8.1s
Episode  450 | len= 67 | reward=  67.0 | running_reward~  34.5 | avg100=  36.6 | loss=-0.000202 | time=   9.5s
Ep

In [11]:
# quick sanity check eval after training
# (render=False by default so it won't pop a window unless you want it)
evaluate(policy, episodes=5, render=False)

[EVAL] Episode 1: reward=180.0
[EVAL] Episode 2: reward=275.0
[EVAL] Episode 3: reward=261.0
[EVAL] Episode 4: reward=65.0
[EVAL] Episode 5: reward=59.0
[EVAL] Average reward over 5 eval episodes: 168.0


168.0

In [12]:
from IPython.display import HTML
import matplotlib.pyplot as plt
from matplotlib import animation
import gymnasium as gym
import torch

def visualize_policy(policy, episodes=1, max_steps=500):
    env = gym.make(ENV_NAME, render_mode="rgb_array")
    policy.eval()

    for ep in range(episodes):
        obs, _ = env.reset()
        frames = []
        done, truncated = False, False
        total_reward = 0

        for _ in range(max_steps):
            frame = env.render()
            frames.append(frame)

            state_t = torch.tensor(obs, dtype=torch.float32, device=DEVICE).unsqueeze(0)
            probs = policy(state_t)
            action = torch.argmax(probs, dim=-1).item()
            obs, reward, done, truncated, _ = env.step(action)
            total_reward += reward
            if done or truncated:
                break

        env.close()
        print(f"Episode {ep+1} reward: {total_reward}")

        # --- Create animation ---
        fig = plt.figure(figsize=(6, 4))
        plt.axis("off")
        im = plt.imshow(frames[0])

        def animate(i):
            im.set_array(frames[i])
            return [im]

        ani = animation.FuncAnimation(fig, animate, frames=len(frames), interval=30, blit=True)
        plt.close(fig)  # avoid duplicate static figure

        # --- Return the HTML animation for inline display ---
        return HTML(ani.to_jshtml())

visualize_policy(policy, episodes=3)


Episode 1 reward: 83.0
