<a href="https://colab.research.google.com/github/arjunprakash027/HandcraftedML/blob/main/Reinforcement_Learning/Basics/Reinforce_DRL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
%%capture
# System packages installed with apt: OpenGL bindings, ffmpeg and the Xvfb virtual framebuffer.
# The %%capture magic above hides the (long) installer output.
!apt install python-opengl
!apt install ffmpeg
!apt install xvfb
# pyvirtualdisplay is imported in the next cell to start a virtual display; pyglet is pinned to 1.5.1.
!pip install pyvirtualdisplay
!pip install pyglet==1.5.1

In [None]:
from pyvirtualdisplay import Display

# Create a non-visible 1400x900 virtual display and start it; the handle is kept in `virtual_display`.
virtual_display = Display(visible=0, size=(1400, 900))
virtual_display.start()

<pyvirtualdisplay.display.Display at 0x7e428b05f4d0>

In [None]:
# Environment packages installed straight from GitHub (unpinned: pip resolves whatever commit is current).
!pip install git+https://github.com/ntasfi/PyGame-Learning-Environment.git
!pip install git+https://github.com/simoninithomas/gym-games
# Hugging Face Hub client, the ffmpeg plugin for imageio, and a pinned PyYAML.
!pip install huggingface_hub
!pip install imageio-ffmpeg
!pip install pyyaml==6.0

Collecting git+https://github.com/ntasfi/PyGame-Learning-Environment.git
  Cloning https://github.com/ntasfi/PyGame-Learning-Environment.git to /tmp/pip-req-build-7mn8cbr7
  Running command git clone --filter=blob:none --quiet https://github.com/ntasfi/PyGame-Learning-Environment.git /tmp/pip-req-build-7mn8cbr7
  Resolved https://github.com/ntasfi/PyGame-Learning-Environment.git to commit 3dbe79dc0c35559bb441b9359948aabf9bb3d331
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting git+https://github.com/simoninithomas/gym-games
  Cloning https://github.com/simoninithomas/gym-games to /tmp/pip-req-build-a3uo3s68
  Running command git clone --filter=blob:none --quiet https://github.com/simoninithomas/gym-games /tmp/pip-req-build-a3uo3s68
  Resolved https://github.com/simoninithomas/gym-games to commit f31695e4ba028400628dc054ee8a436f28193f0b
  Preparing metadata (setup.py) ... [?25l[?25hdone


## Imports

In [None]:
import numpy as np

from collections import deque

import matplotlib.pyplot as plt
%matplotlib inline

# PyTorch
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

# Gym
import gymnasium as gym
import gym_pygame

# Hugging Face Hub
from huggingface_hub import notebook_login # To log to our Hugging Face account to be able to upload models to the Hub.
import imageio

from tqdm import trange

In [None]:
# Use the first CUDA GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Bare last expression so the notebook displays the selected device.
device

device(type='cpu')

## HF HUB

In [None]:
from huggingface_hub import HfApi, snapshot_download
from huggingface_hub.repocard import metadata_eval_result, metadata_save

from pathlib import Path
import datetime
import json
import imageio

import tempfile

import os

def record_video(env, policy, out_path, max_steps=1000, fps=30):
    """
    Roll out one episode with ``policy`` and save the rendered frames as a video.

    :param env: environment exposing ``reset()``, ``step(action)`` and ``render()``
                (create it with render_mode='rgb_array' so ``render()`` yields frames)
    :param policy: agent with an ``F_pass(state)`` method returning ``(action, log_prob)``
    :param out_path: destination of the video file (e.g. 'replay.mp4')
    :param max_steps: upper bound on the number of environment steps to record
    :param fps: frames per second of the written video
    """
    observation, _ = env.reset()
    # The first frame shows the freshly reset environment.
    frames = [env.render()]

    steps_taken = 0
    while steps_taken < max_steps:
        action, _ = policy.F_pass(observation)
        observation, _reward, terminated, truncated, _info = env.step(action)
        frames.append(env.render())
        steps_taken += 1
        # Stop as soon as the episode ends, whichever way it ended.
        if terminated or truncated:
            break

    imageio.mimsave(out_path, list(map(np.array, frames)), fps=fps)

def push_to_hub(repo_id,
                model,
                hyperparameters,
                eval_env,
                video_fps=30,
                video_env=None
                ):
  """
  Evaluate, generate a video and upload a model to the Hugging Face Hub.
  This method does the complete pipeline:
  - It evaluates the model
  - It generates the model card
  - It generates a replay video of the agent
  - It pushes everything to the Hub

  :param repo_id: id of the model repository on the Hugging Face Hub ("<user>/<name>")
  :param model: the pytorch model we want to save
  :param hyperparameters: training hyperparameters; must contain "env_id", "max_t"
                          and "n_evaluation_episodes"
  :param eval_env: evaluation environment
  :param video_fps: how many frames per second to record our video replay
  :param video_env: optional environment used for the replay video. When omitted,
                    ``eval_env`` is used if its render mode is "rgb_array"; otherwise a
                    temporary environment is created from ``hyperparameters["env_id"]``.
  """

  _, repo_name = repo_id.split("/")
  api = HfApi()

  # Everything that names the environment is derived from the hyperparameters, never
  # from notebook-level globals, which may have been reassigned by a later section.
  env_name = hyperparameters["env_id"]

  # Step 1: Create the repo
  repo_url = api.create_repo(
        repo_id=repo_id,
        exist_ok=True,
  )

  with tempfile.TemporaryDirectory() as tmpdirname:
    local_directory = Path(tmpdirname)

    # Step 2: Save the model
    torch.save(model, local_directory / "model.pt")

    # Step 3: Save the hyperparameters to JSON
    # NumPy scalars (e.g. the np.int64 action-space size) are not JSON serializable,
    # so convert any of them to the equivalent built-in Python value.
    serializable_hyperparameters = {
        k: v.item() if isinstance(v, np.generic) else v
        for k, v in hyperparameters.items()
    }
    with open(local_directory / "hyperparameters.json", "w") as outfile:
      json.dump(serializable_hyperparameters, outfile)

    # Step 4: Evaluate the model and build JSON
    mean_reward, std_reward = evaluate_agent(eval_env,
                                            hyperparameters["max_t"],
                                            hyperparameters["n_evaluation_episodes"],
                                            model)
    # Get datetime
    eval_datetime = datetime.datetime.now()
    eval_form_datetime = eval_datetime.isoformat()

    evaluate_data = {
          "env_id": env_name,
          "mean_reward": float(mean_reward),
          "n_evaluation_episodes": hyperparameters["n_evaluation_episodes"],
          "eval_datetime": eval_form_datetime,
    }

    # Write a JSON file
    with open(local_directory / "results.json", "w") as outfile:
        json.dump(evaluate_data, outfile)

    # Step 5: Create the model card
    metadata = {}
    metadata["tags"] = [
          env_name,
          "reinforce",
          "reinforcement-learning",
          "custom-implementation",
          "deep-rl-class"
      ]

    # Add metrics (named eval_metadata so the builtin `eval` is not shadowed)
    eval_metadata = metadata_eval_result(
        model_pretty_name=repo_name,
        task_pretty_name="reinforcement-learning",
        task_id="reinforcement-learning",
        metrics_pretty_name="mean_reward",
        metrics_id="mean_reward",
        metrics_value=f"{mean_reward:.2f} +/- {std_reward:.2f}",
        dataset_pretty_name=env_name,
        dataset_id=env_name,
      )

    # Merges both dictionaries
    metadata = {**metadata, **eval_metadata}

    model_card = f"""
  # **Reinforce** Agent playing **{env_name}**
  This is a trained model of a **Reinforce** agent playing **{env_name}** .
  To learn to use this model and train yours check Unit 4 of the Deep Reinforcement Learning Course: https://huggingface.co/deep-rl-course/unit4/introduction
  """

    # The temporary directory is freshly created, so there is never an existing
    # README to preserve: always write the generated model card.
    readme_path = local_directory / "README.md"
    with readme_path.open("w", encoding="utf-8") as f:
      f.write(model_card)

    # Save our metrics to Readme metadata
    metadata_save(readme_path, metadata)

    # Step 6: Record a video
    video_path =  local_directory / "replay.mp4"
    created_video_env = False
    if video_env is None:
      if getattr(eval_env, "render_mode", None) == "rgb_array":
        video_env = eval_env
      else:
        # eval_env cannot produce frames; build a throw-away env that can.
        video_env = gym.make(env_name, render_mode="rgb_array")
        created_video_env = True
    try:
      # fps must be a keyword: the 4th positional parameter of record_video is max_steps.
      record_video(video_env, model, video_path, fps=video_fps)
    finally:
      if created_video_env:
        video_env.close()

    # Step 7. Push everything to the Hub
    api.upload_folder(
          repo_id=repo_id,
          folder_path=local_directory,
          path_in_repo=".",
    )

    print(f"Your model is pushed to the Hub. You can view your model here: {repo_url}")

## The base NN (policy network) used by the training algo

In [None]:
class Policy(nn.Module):
  """
  Small fully-connected policy network: Linear -> ReLU -> Linear -> softmax.

  :param state_size: number of input features (size of one observation)
  :param action_size: number of discrete actions (size of the output distribution)
  :param hidden_size: width of the single hidden layer
  """

  def __init__(
      self,
      state_size: int,
      action_size: int,
      hidden_size: int,
  ):

    super(Policy, self).__init__()
    self.fc1 = nn.Linear(state_size, hidden_size)
    self.fc2 = nn.Linear(hidden_size, action_size)

  def forward(
      self,
      state: torch.Tensor,
  ) -> torch.Tensor:
    """
    Map a batch of states to action probabilities.

    :param state: float tensor whose last dimension has ``state_size`` entries
    :return: tensor of the same leading shape with ``action_size`` probabilities
             along the last dimension
    """
    x = self.fc1(state)
    x = F.relu(x)
    x = self.fc2(x)
    # Normalise over the action dimension, which is always the last one after fc2.
    x = F.softmax(x, dim=-1)
    return x

  def F_pass(
      self,
      state
  ):
    """
    Sample one action for a single (unbatched) observation.

    :param state: one observation as a NumPy array or any array-like of numbers
    :return: ``(action, log_prob)`` where ``action`` is a Python int and ``log_prob``
             is a 1-element tensor (on the CPU) that still carries gradient information
    """
    # Use the device the network's own parameters live on instead of a notebook-level
    # global, so the policy works wherever it has been moved.
    param_device = next(self.parameters()).device
    state = torch.as_tensor(state, dtype=torch.float32, device=param_device).unsqueeze(0)
    probs = self.forward(state).cpu()
    m = Categorical(probs)
    action = m.sample()
    log_prob = m.log_prob(action)

    return action.item(), log_prob


## Reinforce Algorithm

In [None]:
def collect_experience(
    policy,
    env,
    max_t
):
  """
  Roll out one episode and record what the policy needs for a REINFORCE update.

  :param policy: agent with an ``F_pass(state)`` method returning ``(action, log_prob)``
  :param env: environment following the Gymnasium API (``reset()`` returns
              ``(state, info)``; ``step()`` returns a 5-tuple)
  :param max_t: maximum number of steps to take in the episode
  :return: ``(saved_log_probs, rewards)``, two lists with one entry per step taken
  """
  saved_log_probs = []
  rewards = []
  state, _ = env.reset()

  for _step in range(max_t):

    action, log_prob = policy.F_pass(state)
    saved_log_probs.append(log_prob)

    state, reward, terminated, truncated, _info = env.step(action)

    rewards.append(reward)

    # An episode is over when it terminates OR is truncated (e.g. by a time limit);
    # stepping past either signal would collect rewards from an already finished episode.
    if terminated or truncated:
      break

  return saved_log_probs, rewards

def calculate_returns(
    rewards,
    gamma
):
  """
  Compute the discounted return that follows every step of an episode.

  ``returns[t] = rewards[t] + gamma * returns[t + 1]`` with the return after the
  last step taken to be 0.

  :param rewards: sequence of per-step rewards, in the order they were received
  :param gamma: discount factor
  :return: list with one discounted return per reward, in the same order
  """

  returns = []
  discounted_return = 0

  # Walk the episode backwards so each return is built from the one after it.
  # Appending and reversing once keeps this linear; inserting at the front of the
  # list on every step would make it quadratic in the episode length.
  for reward in reversed(rewards):

    discounted_return = reward + gamma * discounted_return
    returns.append(discounted_return)

  returns.reverse()
  return returns

def calculate_policy_loss(
    saved_log_probs,
    returns
):
  """
  Build the scalar policy-gradient loss for one episode.

  :param saved_log_probs: per-step log-probability tensors of the actions taken
                          (each one-dimensional so they can be concatenated)
  :param returns: per-step returns, paired with ``saved_log_probs`` by position
  :return: scalar tensor equal to the sum over steps of ``-log_prob * return``
  """

  # Negated because optimizers minimise, while the objective is to maximise
  # log-probability weighted by return.
  per_step_losses = [
      -step_log_prob * step_return
      for step_log_prob, step_return in zip(saved_log_probs, returns)
  ]

  return torch.cat(per_step_losses).sum()

def reinforce(
    policy,
    optimizer,
    n_training_episodes,
    max_t,
    gamma,
    print_every,
    train_env=None
):
  """
  Train ``policy`` with the REINFORCE algorithm, one gradient step per episode.

  :param policy: agent with an ``F_pass(state)`` method returning ``(action, log_prob)``
  :param optimizer: optimizer built over the policy's parameters
  :param n_training_episodes: number of episodes to train for
  :param max_t: maximum number of steps per episode
  :param gamma: discount factor used for the returns
  :param print_every: kept for backward compatibility; progress is reported through
                      the tqdm progress bar instead, so this value is not used
  :param train_env: environment to train on; when omitted the notebook-level ``env``
                    is used, which is what existing calls rely on
  :return: list with the total (undiscounted) reward of every training episode
  """

  # Fall back to the notebook-level environment so existing calls keep working.
  if train_env is None:
    train_env = env

  scores_deque = deque(maxlen=100)
  scores = []
  eps = np.finfo(np.float32).eps.item()

  progress_bar = trange(1, n_training_episodes + 1, desc="Training", leave=True)

  for _episode in progress_bar:

    saved_log_probs, rewards = collect_experience(policy, train_env, max_t)

    episode_score = sum(rewards)
    scores_deque.append(episode_score)
    scores.append(episode_score)

    # Force a float tensor: integer rewards would otherwise give an integer tensor,
    # on which mean()/std() are not defined.
    returns = torch.tensor(calculate_returns(rewards, gamma), dtype=torch.float32)

    # Standardise the returns to reduce gradient variance. The (sample) standard
    # deviation of a single value is NaN, which would poison the loss and the
    # weights, so one-step episodes keep their raw return.
    if returns.numel() > 1:
      returns = (returns - returns.mean()) / (returns.std() + eps)

    policy_loss = calculate_policy_loss(saved_log_probs, returns)

    optimizer.zero_grad()
    policy_loss.backward()
    optimizer.step()

    progress_bar.set_postfix({
                "Avg100": np.mean(scores_deque),
                "Policy Loss": policy_loss.item()
            })

  return scores

In [None]:
def evaluate_agent(env, max_steps, n_eval_episodes, policy):
  """
  Evaluate the agent for ``n_eval_episodes`` episodes and return the mean and
  standard deviation of the per-episode total reward.

  :param env: The evaluation environment (Gymnasium API: ``step()`` returns a 5-tuple)
  :param max_steps: Maximum number of steps per evaluation episode
  :param n_eval_episodes: Number of episodes to evaluate the agent
  :param policy: The Reinforce agent, exposing ``F_pass(state)``
  :return: ``(mean_reward, std_reward)``
  """
  episode_rewards = []

  # Evaluation never back-propagates, so skip building autograd graphs.
  with torch.no_grad():
    for _episode in range(n_eval_episodes):
      state, _ = env.reset()
      total_rewards_ep = 0

      for _step in range(max_steps):
        action, _ = policy.F_pass(state)
        state, reward, terminated, truncated, _info = env.step(action)
        total_rewards_ep += reward

        # The episode is over on termination OR truncation (e.g. a time limit).
        if terminated or truncated:
          break

      episode_rewards.append(total_rewards_ep)

  mean_reward = np.mean(episode_rewards)
  std_reward = np.std(episode_rewards)

  return mean_reward, std_reward

## Cartpole

In [None]:
env_id = "CartPole-v1"

# `env` is created with render_mode="rgb_array" (record_video needs rendered frames);
# `eval_env` is created without a render mode.
env = gym.make(env_id, render_mode="rgb_array")
eval_env = gym.make(env_id)

# Sizes used to build the policy network. Note that `action_space.n` is a NumPy
# integer (see the output of this cell), not a built-in int.
state_size = env.observation_space.shape[0]
action_size = env.action_space.n

print("Sample observation", env.observation_space.sample())
print("Action Space Sample", env.action_space.sample())

# Bare last expression so the notebook displays both sizes.
state_size, action_size


Sample observation [-0.5911613   0.872172    0.29185674 -0.52675134]
Action Space Sample 0


(4, np.int64(2))

In [None]:
# Every tunable value of the CartPole run lives in this one dictionary.
cartpole_hyperparameters = {
    "h_size": 16,
    "n_training_episodes": 1000,
    "n_evaluation_episodes": 10,
    "max_t": 1000,
    "gamma": 1.0,
    "lr": 1e-2,
    "env_id": env_id,
    "state_space": state_size,
    "action_space": action_size,
}

# Build the policy network from the hyperparameters and move it to the selected device.
cartpole_policy = Policy(
    state_size=cartpole_hyperparameters["state_space"],
    action_size=cartpole_hyperparameters["action_space"],
    hidden_size=cartpole_hyperparameters["h_size"],
).to(device)

# Adam over the policy's parameters with the configured learning rate.
cartpole_optimizer = optim.Adam(
    cartpole_policy.parameters(),
    lr=cartpole_hyperparameters["lr"],
)

In [None]:
# Train the CartPole policy; `scores` receives the list returned by reinforce.
scores = reinforce(
    policy=cartpole_policy,
    optimizer=cartpole_optimizer,
    n_training_episodes=cartpole_hyperparameters["n_training_episodes"],
    max_t=cartpole_hyperparameters["max_t"],
    gamma=cartpole_hyperparameters["gamma"],
    print_every=100,
)

Training: 100%|██████████| 1000/1000 [10:28<00:00,  1.59it/s, Avg100=991, Policy Loss=-16.2]


In [None]:
# Evaluate the trained policy; the (mean, std) tuple is displayed as the cell output.
evaluate_agent(
    env=eval_env,
    max_steps=cartpole_hyperparameters["max_t"],
    n_eval_episodes=cartpole_hyperparameters["n_evaluation_episodes"],
    policy=cartpole_policy,
)

(np.float64(22.5), np.float64(11.253888216967503))

In [None]:
# Directory that receives the replay video; create it first so the video writer
# does not fail on a missing folder.
local_directory = Path("/Vids")
local_directory.mkdir(parents=True, exist_ok=True)
video_path =  local_directory / "replay.mp4"
# fps must be passed by keyword: the fourth positional parameter of record_video
# is max_steps, so a positional 30 would cap the replay at 30 steps.
record_video(env, cartpole_policy, video_path, fps=30)




## PixelCopter

In [None]:
# Installs gym-games and pygame with pip (gym-games is also installed from GitHub
# in the install cell near the top of the notebook).
!pip install gym-games pygame



In [None]:
# This cell rebinds the notebook-level names env_id, env, eval_env, state_size and
# action_size that the CartPole section also uses.
env_id = "Pixelcopter-PLE-v0"

# NOTE(review): the saved output of this cell shows gym.make raising NameNotFound for
# this id. Presumably gym_pygame's environments are not registered with gymnasium's
# registry — confirm before relying on this section.
env = gym.make(env_id, render_mode="rgb_array")
eval_env = gym.make(env_id)

state_size = env.observation_space.shape[0]
action_size = env.action_space.n

print("Sample observation", env.observation_space.sample())
print("Action Space Sample", env.action_space.sample())

# Bare last expression so the notebook displays both sizes.
state_size, action_size

NameNotFound: Environment `Pixelcopter-PLE` doesn't exist.

In [None]:
# Hyperparameters for the Pixelcopter run. The observation/action sizes come from
# `state_size` / `action_size`, the names the environment-setup cell defines
# (`s_size` / `a_size` do not exist in this notebook and raised NameError).
pixelcopter_hyperparameters = {
    "h_size": 64,
    "n_training_episodes": 50000,
    "n_evaluation_episodes": 10,
    "max_t": 10000,
    "gamma": 0.99,
    "lr": 1e-4,
    "env_id": env_id,
    "state_space": state_size,
    "action_space": action_size,
}

## Upload to HF

In [None]:
# Log in to the Hugging Face account so that models can be uploaded to the Hub.
notebook_login()

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

In [None]:
repo_id = "Arjunrao/reinforce-cartpole"

# Build the evaluation environment from the CartPole hyperparameters instead of
# reusing the notebook-level `eval_env`: the PixelCopter section rebinds that name
# before this cell runs, which would evaluate the CartPole policy on the wrong env.
cartpole_eval_env = gym.make(cartpole_hyperparameters["env_id"])

push_to_hub(repo_id,
          cartpole_policy, # The model we want to save
          cartpole_hyperparameters, # Hyperparameters
          cartpole_eval_env, # Evaluation environment
          video_fps=30
    )



model.pt:   0%|          | 0.00/3.39k [00:00<?, ?B/s]

Your model is pushed to the Hub. You can view your model here: https://huggingface.co/Arjunrao/reinforce-cartpole


In [None]:
import gymnasium as gym
# Debugging aid: print every environment id currently registered with gymnasium.
print(gym.envs.registry.keys())

dict_keys(['CartPole-v0', 'CartPole-v1', 'MountainCar-v0', 'MountainCarContinuous-v0', 'Pendulum-v1', 'Acrobot-v1', 'phys2d/CartPole-v0', 'phys2d/CartPole-v1', 'phys2d/Pendulum-v0', 'LunarLander-v3', 'LunarLanderContinuous-v3', 'BipedalWalker-v3', 'BipedalWalkerHardcore-v3', 'CarRacing-v3', 'Blackjack-v1', 'FrozenLake-v1', 'FrozenLake8x8-v1', 'CliffWalking-v1', 'CliffWalkingSlippery-v1', 'Taxi-v3', 'tabular/Blackjack-v0', 'tabular/CliffWalking-v0', 'Reacher-v2', 'Reacher-v4', 'Reacher-v5', 'Pusher-v2', 'Pusher-v4', 'Pusher-v5', 'InvertedPendulum-v2', 'InvertedPendulum-v4', 'InvertedPendulum-v5', 'InvertedDoublePendulum-v2', 'InvertedDoublePendulum-v4', 'InvertedDoublePendulum-v5', 'HalfCheetah-v2', 'HalfCheetah-v3', 'HalfCheetah-v4', 'HalfCheetah-v5', 'Hopper-v2', 'Hopper-v3', 'Hopper-v4', 'Hopper-v5', 'Swimmer-v2', 'Swimmer-v3', 'Swimmer-v4', 'Swimmer-v5', 'Walker2d-v2', 'Walker2d-v3', 'Walker2d-v4', 'Walker2d-v5', 'Ant-v2', 'Ant-v3', 'Ant-v4', 'Ant-v5', 'Humanoid-v2', 'Humanoid-v3', 