# Hybrid Proximal Policy Optimization

## Guide d'utilisation

**/!\ ATTENTION /!\**

Ce Google colab ne fonctionne pas dès la première exécution.

Veuillez suivre les étapes suivantes pour le faire fonctionner correctement:
1. Cliquer sur `Runtime` puis `Run all`.
2. Une erreur va être générée.
3. Cliquer sur `Runtime` puis `Restart session`
4. A présent vous pouvez utiliser le jupyter notebook.

## Imports et installations


In [1]:
import gym

In [2]:
if gym.__version__ != "0.10.5":
  !pip install gym==0.10.5
  raise Exception("Le Google Colab doit être rééxécuté, comme indiqué à l'étape 3 du guide d'utilisation.")

In [3]:
from google.colab import drive

# Drive connection
drive.mount('/content/drive', force_remount=True)

# Path when the files are in a shared drive
path = '"/content/drive/Shareddrives/ING3 IA & applications/"'

# Move in directories
%cd $path

Mounted at /content/drive
/content/drive/Shareddrives/ING3 IA & applications


In [4]:
!pip install -e code/gym-goal

Obtaining file:///content/drive/Shareddrives/ING3%20IA%20%26%20applications/code/gym-goal
  Preparing metadata (setup.py) ... [?25l[?25hdone
Installing collected packages: gym-goal
  Attempting uninstall: gym-goal
    Found existing installation: gym-goal 0.0.1
    Uninstalling gym-goal-0.0.1:
      Successfully uninstalled gym-goal-0.0.1
  Running setup.py develop for gym-goal
Successfully installed gym-goal-0.0.1


In [5]:
import gym_goal
from matplotlib import pyplot as plt
import numpy as np

from datetime import datetime as dt

import torch
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"Exécution sur {device}")

Exécution sur cpu


## Environnement

Environnement dans lequel l'agent évolue : [Robot Soccer Goal](https://github.com/cycraig/gym-goal)


In [6]:
env = gym.make('Goal-v0')

pygame 2.5.2 (SDL 2.28.2, Python 3.10.12)
Hello from the pygame community. https://www.pygame.org/contribute.html


  result = entry_point.load(False)


## Modèle

In [7]:
# Simple Neural Network used by default in PPO implementation
class LinearNN(torch.nn.Module):
  """
  Simple linear neural network for regression
  """
  def __init__(self, input_dim, output_dim):
    """
    Create a LinearNN object.

    #### Parameters:
    :input_dim: positive integer, dimension of the input data
    :output_dim: positive integer, dimension of the output data

    #### Return:
    None
    """
    super(LinearNN, self).__init__()
    self.model = torch.nn.Sequential(
          torch.nn.Linear(input_dim,64),
          torch.nn.ReLU(),
          torch.nn.Linear(64,64),
          torch.nn.ReLU(),
          torch.nn.Linear(64,output_dim)
        )

  def forward(self, x):
    """
    Runs a forward pass throw the model.

    #### Parameters:
    :x: input data

    #### Return:
    Data computed by the model
    """
    if isinstance(x, np.ndarray):
      x = torch.tensor(x, dtype=torch.float)
    return self.model(x)


class LinearClassifier(torch.nn.Module):
  """
  Simple linear neural network for classification
  """
  def __init__(self, input_dim, output_dim):
    """
    Create a LinearNN object.

    #### Parameters:
    :input_dim: positive integer, dimension of the input data
    :output_dim: positive integer, dimension of the output data

    #### Return:
    None
    """
    super(LinearClassifier, self).__init__()
    self.model = torch.nn.Sequential(
          torch.nn.Linear(input_dim,64),
          torch.nn.ReLU(),
          torch.nn.Linear(64,64),
          torch.nn.ReLU(),
          torch.nn.Linear(64,output_dim),
          torch.nn.Softmax(dim=0)
        )

  def forward(self, x):
    """
    Runs a forward pass throw the model.

    #### Parameters:
    :x: input data

    #### Return:
    Data computed by the model
    """
    if isinstance(x, np.ndarray):
      x = torch.tensor(x, dtype=torch.float)
    return self.model(x)



def flatten(args):
    try:
        iter(args)
        final = []
        for arg in args:
            final += flatten(arg)
        return tuple(final)
    except TypeError:
        return (args, )




# H-PPO implementation
class HPPO:
  """
  Class to create Hybrid Proximal Policy Optimization models for Deep Reinforcement Learning

  It is only adapted to run on Robot Soccer Goal environnement : https://github.com/cycraig/gym-goal
  """

  # TODO: rewrite documentation

  # Private methods, name starting with '__'

  def __init__(self,
              env,
              actor_classifier=None,
              actor_regression=None,
              critic=None,
              path_to_actor_classifier="",
              path_to_actor_regression="",
              path_to_critic="",
              actor_classifier_optim=torch.optim.Adam,
              actor_regression_optim=torch.optim.Adam,
              critic_optim=torch.optim.Adam,
              actor_classifier_hyperparams={"lr":0.005},
              actor_regression_hyperparams={"lr":0.005},
              critic_hyperparams={"lr":0.005},
              gamma=0.95
  ):
    """
    H-PPO generator for a given environment.

    #### Parameters:
    :env: gym environment where the model will evolve
    :actor_classifier: Neural Network class implementing torch.nn.Module . By default create a simple neural network if there is no path for saved model.
    :actor_regression: Neural Network class implementing torch.nn.Module . By default create a simple neural network if there is no path for saved model.
    :critic: Neural Network class implementing torch.nn.Module . By default create a simple neural network if there is no path for saved model.
    :path_to_actor_classifier: string, path to a saved actor classifier model. By default use the given `actor_classifier` model.
    :path_to_actor_regression: string, path to a saved actor regression model. By default use the given `actor_regression` model.
    :path_to_critic: string, path to a saved critic model. By default use the given `critic` model.
    :actor_optim_classifier: torch.optim.Optimizer object, optimizer for the actor classifier model
    :actor_optim_regression: torch.optim.Optimizer object, optimizer for the actor regression model
    :critic_optim: torch.optim.Optimizer object, optimizer for the critic model
    :actor_classifier_hyperparams: dictionnary of the actor's hyperparameters
    :actor_regression_hyperparams: dictionnary of the actor's hyperparameters
    :critic_hyperparams: dictionnary of the critic's hyperparameters

    #### Return:
    None
    """
    # Extract the environment
    self.env = env
    observation_space = len(flatten(env.reset()))
    regression_space = len(flatten(env.action_space.sample()[1]))

    # Build the actor and critic models
    if actor_classifier:
      self.actor_classifier = actor_classifier(
          observation_space,
          3
      )
    else:
      try:
        self.actor_classifier = torch.load(path_to_actor_classifier)
      except:
        self.actor_classifier = LinearClassifier(
            observation_space,
            3
        )
    if actor_regression:
      self.actor_regression = actor_regression(
          observation_space,
          regression_space
      )
    else:
      try:
        self.actor_regression = torch.load(path_to_actor_regression)
      except:
        self.actor_regression = LinearNN(
            observation_space,
            regression_space
        )
    if critic:
      self.critic = critic(
          observation_space,
          1
      )
    else:
      try:
        self.critic = torch.load(path_to_critic)
      except:
        self.critic = LinearNN(
            observation_space,
            1
        )

    # Create optimizers for the neural networks
    self.actor_regression_optim = actor_regression_optim(self.actor_regression.parameters(), **actor_regression_hyperparams)
    self.actor_classifier_optim = actor_classifier_optim(self.actor_classifier.parameters(), **actor_classifier_hyperparams)
    self.critic_optim = critic_optim(self.critic.parameters(), **critic_hyperparams)

    # Initialize the covariance matrix used to query the actor for actions
    self.cov_mat_classifier = torch.diag(
      torch.full(
          size=(3,),
          fill_value=0.5
      )
    )
    self.cov_mat_regression = torch.diag(
      torch.full(
          size=(regression_space,),
          fill_value=0.5
      )
    )

    self.gamma = gamma


  def __random_action(self, obs):
    """
    Compute a random action based on a prediction made by the actor model.

    #### Parameters:
    :obs: observation of the current environment state

    #### Return:
    A tupple of:
    - the continuous action to take, as a numpy array
    - the log probability of the selected continuous action in the distribution
    - the discontinuous action to take, as a numpy array
    - the log probability of the selected discontinuous action in the distribution
    """
    obs=torch.tensor(flatten(obs), dtype=torch.float32)

    # Regression
    # Create a distribution of actions based on the actor prediction and the PPO covariance matrix
    dist = torch.distributions.MultivariateNormal(self.actor_regression(obs), self.cov_mat_regression)
    # Pick a random action from the distribution
    action_regression = dist.sample()
    # Calculate the log probability for that action
    log_prob_regression = dist.log_prob(action_regression)

    # Classification
    # Create a distribution of actions based on the actor prediction and the PPO covariance matrix
    dist = torch.distributions.MultivariateNormal(
        self.actor_classifier(obs),
        self.cov_mat_classifier)
    # Pick a random action from the distribution
    action_classifier = dist.sample()
    # Calculate the log probability for that action
    log_prob_classifier = dist.log_prob(action_classifier)
    return action_regression.detach().numpy(), log_prob_regression.detach(), action_classifier.detach().numpy(), log_prob_classifier.detach()


  def batch_exploration(self, batch_size=5, max_actions=1000, verbose=0):
    """
    Compute several simulation then group them in a batch.

    #### Parameters:
    :batch_size: positive integer, number of simulation to run in a batch
    :max_actions: positive integer, maximum of actions per simulation
    :verbose: integer, display option: if superior than 0 then print the simulations' number

    #### Return:
    A tupple of tensors:
    - batch_obs, the environment's states
    - batch_acts_classifier, the discontinuous actions made by the H-PPO
    - batch_log_probs_classifier, the probability of each discontinuous action
    - batch_acts_regression, the continuous actions made by the H-PPO
    - batch_log_probs_regression, the probability of each continuous action
    - batch_rtgs, the rewards-to-go
    """
    batch_obs = []
    batch_acts_regression = []
    batch_acts_classifier = []
    batch_log_probs_regression = []
    batch_log_probs_classifier = []
    batch_rews = []
    batch_rtgs = []

    for batch in range(batch_size):
      if verbose > 0:
        print("Simulation ", batch+1,"/",batch_size)

      rews_simu = []
      obs = self.env.reset()
      end_simu = False
      nb_actions = 0

      # Run a simulation
      while (nb_actions < max_actions) and not(end_simu):
        batch_obs.append(flatten(obs))
        action_regression, log_prob_regression, action_classifier, log_prob_classifier = self.__random_action(obs)
        action = (np.argmax(action_classifier), (action_regression[0:2], action_regression[2], action_regression[3]))
        obs, rew, end_simu, _ = self.env.step(action)
        rews_simu.append(rew)
        batch_acts_regression.append(action_regression)
        batch_log_probs_regression.append(log_prob_regression)
        batch_acts_classifier.append(action_classifier)
        batch_log_probs_classifier.append(log_prob_classifier)
        nb_actions += 1

      # # Compute rewards to go
      # discounted_reward = 0
      # index_rtgs = len(batch_rews)
      # for rew in reversed(rews_simu):
      #   discounted_reward = rew + discounted_reward * self.gamma
      #   batch_rtgs.insert(index_rtgs, discounted_reward)

      batch_rews.append(rews_simu)

    # Compute rewards to go
    batch_rtgs = []
    for ep_rews in reversed(batch_rews):
      discounted_reward = 0
      for rew in reversed(ep_rews):
        discounted_reward = rew + discounted_reward * self.gamma
        batch_rtgs.insert(0, discounted_reward)

    # Reshape data as tensors
    batch_obs = torch.tensor(batch_obs, dtype=torch.float)
    batch_acts_classifier = torch.tensor(batch_acts_classifier, dtype=torch.float)
    batch_log_probs_classifier = torch.tensor(batch_log_probs_classifier, dtype=torch.float)
    batch_acts_regression = torch.tensor(batch_acts_regression, dtype=torch.float)
    batch_log_probs_regression = torch.tensor(batch_log_probs_regression, dtype=torch.float)
    batch_rtgs = torch.tensor(batch_rtgs, dtype=torch.float)
    return batch_obs, batch_acts_classifier, batch_log_probs_classifier, batch_acts_regression, batch_log_probs_regression, batch_rtgs, batch_rews


  # Public methods

  def fit(self, explorations=3, batch_size=3, max_actions=1600, epochs=5, clip=0.2, saving_path="", verbose=0):
    """
    Train the model.

    #### Parameters:
    :exploration: positive integer, number of times that the model will explore the environment
    :batch_size: positive integer, size of the batch used to train the actor and critic networks
    :max_actions: positive integer, maximum of actions per exploration
    :epochs: positive integer corresponding to the number of epochs to train the actor and critic networks
    :clip: float, clip value for advantage during loss computing
    :saving_path: string, path to the saved file of the actor and critic models and a report of the training
    :verbose: positive integer, display option

    #### Return:
    Dictionnary reporting the training results
    """
    report_data = {
        "ppo" : {
            "environment" : str(self.env.spec),
            "gamma" : float(self.gamma)
        },
        "actor_regression_model" : {
            "file" : "",
            "optimizer" : str(self.actor_regression_optim),
            "hyperparameters" : str(dict(self.actor_regression_optim.state_dict()))
        },
        "actor_classifier_model" : {
            "file" : "",
            "optimizer" : str(self.actor_classifier_optim),
            "hyperparameters" : str(dict(self.actor_classifier_optim.state_dict()))
        },
        "critic_model" : {
            "file" : "",
            "optimizer" : str(self.critic_optim),
            "hyperparameters" : str(dict(self.critic_optim.state_dict()))
        },
        "fit_hyperparameters": {
          "explorations" : explorations,
          "batch_size" : batch_size,
          "max_actions": max_actions,
          "epochs" : epochs,
          "clip" : float(clip)
        },
        "results" : {
          "computing_time_seconds" : int(dt.timestamp(dt.now())),
          "timesteps" : 0,
          "batch_lens" : [],
          "avg_batch_rewards" : [],
          "avg_actor_classifier_losses" : [],
          "avg_actor_regression_losses" : [],
          "avg_critic_losses" : []
        }
    }

    for k in range(explorations):
      if verbose > 0:
        print("-----------------------------------------------------")
        print("- Exploration ",k+1,"/",explorations,":")

      batch_obs, batch_acts_classifier, batch_log_probs_classifier, batch_acts_regression, batch_log_probs_regression, batch_rtgs, batch_rews = self.batch_exploration(
          batch_size=batch_size,
          max_actions=max_actions,
          verbose=verbose-1
      )

      report_data["results"]["timesteps"] += len(batch_obs)
      report_data["results"]["batch_lens"].append(int(len(batch_obs)))
      report_data["results"]["avg_batch_rewards"].append(float(np.mean(flatten(batch_rews))))

      V = self.critic(batch_obs).squeeze()
      A_k = batch_rtgs - V.detach()

      # Advantages normalization
      A_k = (A_k - A_k.mean()) / (A_k.std() + 1e-10)

      actor_classifier_losses = []
      actor_regression_losses = []
      critic_losses = []
      for t in range(epochs):
        if verbose > 2:
          print("Epoch ",t+1,"/",epochs)

        V = self.critic(batch_obs).squeeze()
        log_pi_t_classifier = torch.distributions.MultivariateNormal(self.actor_classifier(batch_obs), self.cov_mat_classifier).log_prob(batch_acts_classifier)
        log_pi_t_regression = torch.distributions.MultivariateNormal(self.actor_regression(batch_obs), self.cov_mat_regression).log_prob(batch_acts_regression)

        # Calculate losses
        pi_ratio_classifier = torch.exp(log_pi_t_classifier - batch_log_probs_classifier)
        pi_ratio_regression = torch.exp(log_pi_t_regression - batch_log_probs_regression)
        surrogate_losses_frac_pi_classifier =  pi_ratio_classifier * A_k
        surrogate_losses_frac_pi_regression =  pi_ratio_regression * A_k
        surrogate_losses_g_classifier = torch.clamp(pi_ratio_classifier, 1 - clip, 1 + clip) * A_k
        surrogate_losses_g_regression = torch.clamp(pi_ratio_regression, 1 - clip, 1 + clip) * A_k
        actor_loss_classifier = (-torch.min(surrogate_losses_frac_pi_classifier, surrogate_losses_g_classifier)).mean()
        actor_loss_regression = (-torch.min(surrogate_losses_frac_pi_regression, surrogate_losses_g_regression)).mean()
        critic_loss = torch.nn.MSELoss()(V, batch_rtgs)

        # Updating neural networks
        self.actor_classifier_optim.zero_grad()
        self.actor_regression_optim.zero_grad()
        actor_loss_classifier.backward(retain_graph=True)
        actor_loss_regression.backward(retain_graph=True)
        self.actor_classifier_optim.step()
        self.actor_regression_optim.step()
        self.critic_optim.zero_grad()
        critic_loss.backward()
        self.critic_optim.step()

        actor_classifier_losses.append(actor_loss_classifier.detach())
        actor_regression_losses.append(actor_loss_regression.detach())
        critic_losses.append(critic_loss.detach())

      report_data["results"]["avg_actor_regression_losses"].append(float(np.mean(actor_regression_losses)))
      report_data["results"]["avg_actor_classifier_losses"].append(float(np.mean(actor_classifier_losses)))
      report_data["results"]["avg_critic_losses"].append(float(np.mean(critic_losses)))

    if verbose > 0:
      print("-----------------------------------------------------\n")

    report_data["results"]["computing_time_seconds"] = int(dt.timestamp(dt.now())) - report_data["results"]["computing_time_seconds"]

    if saving_path:
      saving_timestamp = int(dt.timestamp(dt.now()))
      self.save(
          saving_path,
          actor_regression_name="actor_regression_model_"+str(saving_timestamp),
          actor_classifier_name="actor_classifier_model_"+str(saving_timestamp),
          critic_name="critic_model_"+str(saving_timestamp)
      )
      report_data["actor_regression_model"]["file"] = "actor_regression_model_"+str(saving_timestamp)+".pth"
      report_data["actor_classifier_model"]["file"] = "actor_classifier_model_"+str(saving_timestamp)+".pth"
      report_data["critic_model"]["file"] = "critic_model_"+str(saving_timestamp)+".pth"
      self.report(report_data, saving_path, report_name="report_"+str(saving_timestamp))

    return report_data["results"]


  def predict(self, obs):
    """
    Predict an action based on the given observation.

    #### Parameters:
    :obs: observation of the gym environment

    #### Returns:
    gym action
    """
    obs=torch.tensor(flatten(obs), dtype=torch.float32)
    action_regression = self.actor_regression(obs)
    action_classifier = self.actor_classifier(obs)
    action = (
        action_classifier.detach().numpy().argmax(),
        (
            action_regression[0:2].detach().numpy(),
            action_regression[2].detach().numpy(),
            action_regression[3].detach().numpy()
        )
    )
    return action


  def evaluate(self, max_actions=1000, win_limit=1, render=None, games_limit=100):
    """
    Evaluate the PPO model's performance.

    #### Parameters:
    :max_actions: positive integer, number of actions allowed in the environment
    :win_limit: positive integer, limit of times that the model is allowed to win the ultimate reward of the environment
    :render: render_mode for gym environment, choose a render_mode
    :games_limit: positive integer, maximum of playable games during the evaluation

    #### Return:
    Tupple containing:
    - the sum of the rewards
    - the number of done actions during the evaluation
    - the number of times that the model won the ultimate reward of the environment
    - the render of the environment
    """
    nb_actions = 0
    won_games = 0
    total_rewards = 0
    games = 0
    obs = self.env.reset()
    while (nb_actions < max_actions) and (won_games < win_limit) and (games < games_limit):
      action = self.predict(obs)
      obs, rew, end, _ = self.env.step(action)
      total_rewards += rew
      if rew == 50:
        won_games += 1
      games += int(end)
      nb_actions += 1

    return total_rewards, nb_actions, won_games, self.env.render()


  def report(
      self,
      data,
      path,
      report_name="report_"+str(int(dt.timestamp(dt.now()))),
      verbose=0
  ):
    """
    Save a json report file.

    #### Parameters:
    :data: dictionnary of the data to save
    :path: string, path to the folder where the file will be saved
    :report_name: string, name of the report
    :verbose: integer, display option

    #### Return:
    None
    """
    import json

    report_path = path+"/"+report_name
    with open(report_path+".json", "w") as outfile:
      json.dump(data, outfile)


  def save(
      self,
      path,
      actor_regression_name="actor_regression_model_"+str(int(dt.timestamp(dt.now()))),
      actor_classifier_name="actor_classifier_model_"+str(int(dt.timestamp(dt.now()))),
      critic_name="critic_model_"+str(int(dt.timestamp(dt.now())))
  ):
    """
    Save the actor and critic models in files.

    #### Parameters:
    :path: string, path where the files will be created
    :actor_regression_name: string, file's name where the regressor actor model will be saved
    :actor_classifier_name: string, file's name where the classifier actor model will be saved
    :critic_name: string, file's name where the critic model will be saved

    #### Return:
    None
    """
    torch.save(self.actor_regression, path+"/"+actor_regression_name+".pth")
    torch.save(self.actor_classifier, path+"/"+actor_classifier_name+".pth")
    torch.save(self.critic, path+"/"+critic_name+".pth")




### Génération

Génération d'un nouveau modèle ou import d'une sauvegarde

In [8]:
# @markdown Vous pouvez importer un modèle pré-entraîné du drive ou en générer un nouveau.

# @markdown Les modèles pré-entraînés se situent dans le répertoire `code/models` du drive partagé. Ils portent des noms ayant le format : `NAME_model_TIMESTAMP.pth`.
# @markdown

# @markdown - Remplir le champs ci-dessous avec le `TIMESTAMP` des modèles que vous souhaitez importer.
model_timestamp = 1706287566 # @param {type:"integer"}
# @markdown - Cocher cette case si vous préférez générer un nouveau modèle non entraîné.
new_model = False # @param {type:"boolean"}


In [9]:
if new_model:
  # Create a new model
  model = HPPO(env)
else:
  # Import an existing one
  model = HPPO(
      env,
      path_to_actor_classifier="code/models/actor_classifier_model_"+str(model_timestamp)+".pth",
      path_to_actor_regression="code/models/actor_regression_model_"+str(model_timestamp)+".pth",
      path_to_critic="code/models/critic_model_"+str(model_timestamp)+".pth"
  )

### Entraînement

Entraînement et sauvegarde du modèle

In [10]:
# @markdown - Nombre de entraînements à effectuer :
explorations = 2 # @param {type:"integer"}
# @markdown - Nombre de parties à jouer pour un seul entraînement :
batch_size = 2 # @param {type:"integer"}
# @markdown - Nombre d'époques d'apprentissage des réseaux de neurones par entraînement :
epochs = 5 # @param {type:"integer"}
# @markdown - Cocher cette case si vous souhaitez que le modèle entraîné soit sauvegardé sur le drive.
save = False # @param {type:"boolean"}

In [11]:
if save:
  report = model.fit(
      explorations=explorations,
      max_actions=100,
      epochs=epochs,
      batch_size=batch_size,
      saving_path="code/models",
      verbose=4
  )
else:
  report = model.fit(
      explorations=explorations,
      max_actions=100,
      epochs=epochs,
      batch_size=batch_size,
      saving_path="code/models",
      verbose=4
  )

-----------------------------------------------------
- Exploration  1 / 2 :
Simulation  1 / 2
Simulation  2 / 2
Epoch  1 / 5
Epoch  2 / 5
Epoch  3 / 5
Epoch  4 / 5
Epoch  5 / 5
-----------------------------------------------------
- Exploration  2 / 2 :
Simulation  1 / 2
Simulation  2 / 2
Epoch  1 / 5
Epoch  2 / 5
Epoch  3 / 5
Epoch  4 / 5
Epoch  5 / 5
-----------------------------------------------------



  batch_acts_classifier = torch.tensor(batch_acts_classifier, dtype=torch.float)


### Evaluation

Evaluation du modèle

In [12]:
rews, acts, wins, rend = model.evaluate(max_actions=1000, win_limit=100, games_limit=1000)
print("Nombre d'actions effectuées pendant toutes les parties de test :", acts)
print("Total de toutes les récompenses :",rews)
print("Nombre de parties gagnées :", wins)
print("Récompense moyenne par action :", rews/acts)

Nombre d'actions effectuées pendant toutes les parties de test : 107
Total de toutes les récompenses : 4967.349821940147
Nombre de parties gagnées : 100
Récompense moyenne par action : 46.42383011159016


## Illustration

Aucune partie ne peut être illustrée sur Google Colab. Vous pouvez trouver une vidéo d'une partie de notre H-PPO entraîné dans le dossier `videos` du drive partagé.