<a href="https://colab.research.google.com/github/HassanChowdhry/DeepReinforcementLearning/blob/main/REINFORCE_InvertedPendulum_Mujoco.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Training a REINFORCE agent on MuJoCo (InvertedPendulum)

In [1]:
%pip install -q gymnasium[mujoco] pyvirtualdisplay
from __future__ import annotations
import os
os.environ["MUJOCO_GL"] = "egl"
import random

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torch.nn as nn
from torch.distributions.normal import Normal
from typing import Tuple

import gymnasium as gym
from tqdm import tqdm

plt.rcParams["figure.figsize"] = (10, 5)



## Policy Network

In [2]:
# ----------------------------------------------------------
# Policy network: π_θ(a|s) ~ N(μ(s), σ²)
# ----------------------------------------------------------
class Policy(nn.Module):
  """Gaussian policy network with a state-dependent mean and a shared std.

  Two ReLU hidden layers feed a linear head that outputs the action mean.
  The standard deviation does not depend on the observation: it is a single
  learnable vector stored in log-space and exponentiated in ``forward``.
  """

  def __init__(self,
               obs_dims: int,
               action_dims: int,
               hidden_dims: Tuple[int, int] = (32, 32),
            ):
    """Builds the layers of the policy network.

    Args:
        obs_dims: Size of the observation vector
        action_dims: Size of the action vector
        hidden_dims: Widths of the two hidden layers
    """

    super().__init__()

    # network
    self.input_layer = nn.Linear(obs_dims, hidden_dims[0])
    self.hidden_layer = nn.Linear(hidden_dims[0], hidden_dims[1])
    self.mean = nn.Linear(hidden_dims[1], action_dims)
    # NOTE: despite its name this parameter holds log(σ), initialised to -0.5;
    # forward() applies exp(). The attribute name is kept so that previously
    # saved state_dicts (key "std") continue to load.
    self.std = nn.Parameter(torch.full((action_dims,), -0.5))
    self.activation = nn.ReLU()

  def forward(self, x: torch.Tensor):
    """Conditioned on the observation, returns the mean and standard deviation
      of a normal distribution from which an action is sampled from.

    Args:
        x: Observation from the environment. A ``torch.Tensor`` is used as
          given; anything else (list, NumPy array, ...) is converted to a
          float32 tensor on the module's device and gets a leading batch
          dimension of size 1.

    Returns:
        action_means: predicted mean of the normal distribution
        action_stddevs: standard deviation of the normal distribution
          (one value per action dimension, independent of ``x``)
    """
    if not isinstance(x, torch.Tensor):
      # Create the tensor where the parameters live; a CPU tensor would fail
      # in the first linear layer once the module has been moved to a GPU.
      x = torch.tensor(x, dtype=torch.float32, device=self.std.device)
      x = x.unsqueeze(0)

    x = self.activation(self.input_layer(x))
    x = self.activation(self.hidden_layer(x))
    mean = self.mean(x)
    std = torch.exp(self.std)  # log-std -> std, always strictly positive

    return mean, std

## Building an agent

In [3]:
from logging import log
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class REINFORCE:
  """Monte-Carlo policy-gradient (REINFORCE) agent with a Gaussian policy."""

  def __init__(self, observation_space, action_space, gamma=0.99, lr=3e-4, epsilon=1e-6,
               action_low=-1.0, action_high=1.0):
    """Initializes an agent that learns a policy via the REINFORCE algorithm
    to solve the task at hand (Inverted Pendulum).

    Args:
        observation_space: Dimension of the observation space
        action_space: Dimension of the action space
        gamma: Discount factor used when computing returns
        lr: Learning rate of the Adam optimizer
        epsilon: Small constant added to the policy's std for numerical stability
        action_low: Lower bound sampled actions are clipped to
        action_high: Upper bound sampled actions are clipped to. The defaults
          keep the previously hard-coded [-1, 1] range; pass the environment's
          real bounds if they differ.
    """
    self.lr = lr
    self.gamma = gamma
    self.epsilon = epsilon
    self.action_low = action_low
    self.action_high = action_high

    # Observations and returns are created on DEVICE, so the network has to
    # live there too; otherwise the forward pass fails on CUDA machines.
    self.policy = Policy(observation_space, action_space).to(DEVICE)
    self.optimizer = torch.optim.Adam(self.policy.parameters(), lr=self.lr)

  def sample_action(self, obs_np):
    """Returns an action, conditioned on the policy and observation.

    Args:
        obs_np: Observation from the environment

    Returns:
        action: Action to be performed (NumPy array, clipped to the
          configured bounds)
        log_prob: Log-probability of the sampled (pre-clipping) action,
          summed over action dimensions; keeps the autograd graph so it can
          be used in ``update``
    """

    obs = torch.tensor(obs_np, dtype=torch.float32, device=DEVICE)
    mean, std = self.policy(obs)

    dist = Normal(mean, std + self.epsilon)
    action = dist.sample()

    # The log-probability is taken before clipping, i.e. it belongs to the
    # raw sample rather than to the action actually sent to the environment.
    log_prob = dist.log_prob(action).sum()

    action = np.clip(action.cpu().numpy(), self.action_low, self.action_high)

    return action, log_prob

  def update(self, log_prob, rewards):
    """Updates the policy network's weights from one finished episode.
      Loss = -sum_t(G_t * log π(a_t | s_t)), with G_t the normalised
      discounted return from step t.

    Args:
        log_prob: List of per-step log-probabilities returned by
          ``sample_action``
        rewards: List of per-step rewards, same length as ``log_prob``
    """
    if len(rewards) == 0:
      # Nothing was collected; torch.stack([]) would raise.
      return

    # Discounted returns, accumulated back-to-front. Appending and reversing
    # once avoids the quadratic cost of inserting at the front of the list.
    returns = []
    g = 0.0
    for reward in reversed(rewards):
      g = reward + self.gamma * g
      returns.append(g)
    returns.reverse()

    returns = torch.tensor(returns, dtype=torch.float32, device=DEVICE)
    if returns.numel() > 1:
      returns = (returns - returns.mean()) / (returns.std() + 1e-8)
    else:
      # The sample std of a single value is NaN, which would turn every
      # weight into NaN after the optimizer step. Only centre the return,
      # which leaves a zero learning signal for one-step episodes.
      returns = returns - returns.mean()
    log_prob = torch.stack(log_prob)
    loss = -torch.sum(log_prob * returns)

    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()

## Evaluation

In [None]:
from IPython.display import clear_output
from IPython.display import HTML
from base64 import b64encode
import glob

# --- Configs ---
ENV_ID = "InvertedPendulum-v5"
MODEL_PATH = "reinforce_policy.pth"
EVAL_EPISODES = 5
VIDEO_FOLDER = "videos"
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# --- Prepare output folder ---
os.makedirs(VIDEO_FOLDER, exist_ok=True)

# --- Setup environment ---
# NOTE(review): the environment renders to a live window ("human") and is not
# wrapped in a video recorder, so nothing is written to VIDEO_FOLDER. Confirm
# whether recording (render_mode="rgb_array" plus a recording wrapper) was
# intended, especially for headless runs.
env = gym.make(ENV_ID, render_mode="human", reset_noise_scale=0.6)

returns = []
try:
    obs_dim = env.observation_space.shape[0]
    act_dim = env.action_space.shape[0]

    # --- Load model ---
    policy = Policy(obs_dim, act_dim).to(DEVICE)
    policy.load_state_dict(torch.load(MODEL_PATH, map_location=DEVICE))
    policy.eval()

    # --- Run Evaluation ---
    for ep in range(EVAL_EPISODES):
        obs, info = env.reset()
        done = False
        ep_return = 0

        while not done:
            obs_tensor = torch.tensor(obs, dtype=torch.float32, device=DEVICE)
            with torch.no_grad():
                mu, std = policy(obs_tensor)
                action = mu.cpu().numpy()  # use mean (deterministic)

            obs, reward, terminated, truncated, _ = env.step(action)
            ep_return += reward
            done = terminated or truncated
        returns.append(ep_return)
        clear_output(wait=True)
finally:
    # Release the simulator/viewer even if loading the model or a rollout fails.
    env.close()

print(f"\n✅ Average Return over {EVAL_EPISODES} episodes: {np.mean(returns):.2f}")

In [None]:
# make env
base_env = gym.make("InvertedPendulum-v5")
env = gym.wrappers.RecordEpisodeStatistics(base_env)  # Records episode-reward

# init -> episodes, obs, actions
episodes = int(1e4)
observation_space = env.observation_space.shape[0]
action_space = env.action_space.shape[0]

agent = REINFORCE(observation_space, action_space)

try:
  for episode in tqdm(range(1, episodes+1), desc="Training"):
    obs, info = env.reset()
    done = False
    ep_log_probs, ep_rewards = [], []

    # Roll out one full episode, keeping what the policy-gradient update needs.
    while not done:
      action, log_prob = agent.sample_action(obs)
      obs, reward, terminated, truncated, info = env.step(action)

      ep_rewards.append(reward)
      ep_log_probs.append(log_prob)
      done = terminated or truncated

    # One gradient step per finished episode (Monte-Carlo update).
    agent.update(ep_log_probs, ep_rewards)

    # Periodic progress report and checkpoint.
    if episode % 1000 == 0:
      avg_reward = int(np.mean(env.return_queue))
      ep_return = sum(ep_rewards)
      print()
      print("Episode:", episode, "Average Reward:", avg_reward, "Return: ", ep_return)
      torch.save(agent.policy.state_dict(), f"reinforce_policy_ep{episode}.pth")
      print("-"*50)

  # The evaluation cell loads "reinforce_policy.pth"; the periodic checkpoints
  # use a different name, so store the final weights under that name as well.
  torch.save(agent.policy.state_dict(), "reinforce_policy.pth")
finally:
  # Release the simulator even if training is interrupted or raises.
  env.close()