In [7]:
# from typing import Callable, Dict, List, Optional, Tuple, Type, Union

from typing import Tuple, Callable

import gymnasium as gym
from gymnasium import spaces
# import tensorflow as tf
from torch import nn
import torch as th
import os

from stable_baselines3 import PPO
# from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.policies import ActorCriticPolicy

# from stable_baselines3.common.monitor import Monitor

In [8]:
# https://stable-baselines3.readthedocs.io/en/master/guide/custom_policy.html

class ResNet(nn.Module):
    def __init__(self, module):
        super().__init__()
        self.module = module

    def forward(self, inputs):
        return self.module(inputs) + inputs
    
class CustomNetwork(nn.Module):
    """
    Custom network for policy and value function.
    It receives as input the features extracted by the features extractor.

    :param feature_dim: dimension of the features extracted with the features_extractor (e.g. features from a CNN)
    :param last_layer_dim_pi: (int) number of units for the last layer of the policy network
    :param last_layer_dim_vf: (int) number of units for the last layer of the value network
    """

    def __init__(
        self,
        feature_dim: int,
        f_layer_dim_pi: int = 36,
        f_layer_dim_vf: int = 36,
        m_layer_dim_pi: int = 36,
        m_layer_dim_vf: int = 36,
        last_layer_dim_pi: int = 64,
        last_layer_dim_vf: int = 64,
    ):
        super().__init__()

        # IMPORTANT:
        # Save output dimensions, used to create the distributions
        self.latent_dim_pi = last_layer_dim_pi
        self.latent_dim_vf = last_layer_dim_vf

        # Policy network
        self.policy_net = nn.Sequential(
            nn.Linear(feature_dim, f_layer_dim_pi), nn.LeakyReLU(),
            ResNet(nn.Sequential(                
                nn.Linear(f_layer_dim_pi, m_layer_dim_pi), nn.ReLU()  ,
                nn.Dropout(0.2),
                nn.BatchNorm1d(m_layer_dim_pi),
                nn.Linear(m_layer_dim_pi, m_layer_dim_pi), nn.ReLU()
            )),
                nn.Linear(m_layer_dim_pi, last_layer_dim_pi), nn.ReLU()
        )
        # Value network
        self.value_net = nn.Sequential(
            nn.Linear(feature_dim, f_layer_dim_vf), nn.LeakyReLU(),
            ResNet(nn.Sequential(                
                nn.Linear(f_layer_dim_vf, m_layer_dim_vf), nn.ReLU()  ,
                nn.Dropout(0.2),
                nn.BatchNorm1d(m_layer_dim_vf),
                nn.Linear(m_layer_dim_vf, m_layer_dim_vf), nn.ReLU()
            )),
                nn.Linear(m_layer_dim_vf, last_layer_dim_vf), nn.ReLU()
        )

    def forward(self, features: th.Tensor) -> Tuple[th.Tensor, th.Tensor]:
        """
        :return: (th.Tensor, th.Tensor) latent_policy, latent_value of the specified network.
            If all layers are shared, then ``latent_policy == latent_value``
        """
        return self.forward_actor(features), self.forward_critic(features)

    def forward_actor(self, features: th.Tensor) -> th.Tensor:
        return self.policy_net(features)

    def forward_critic(self, features: th.Tensor) -> th.Tensor:
        return self.value_net(features)


class CustomActorCriticPolicy(ActorCriticPolicy):
    def __init__(
        self,
        observation_space: spaces.Space,
        action_space: spaces.Space,
        lr_schedule: Callable[[float], float],
        *args,
        **kwargs,
    ):
        # Disable orthogonal initialization
        kwargs["ortho_init"] = False
        super().__init__(
            observation_space,
            action_space,
            lr_schedule,
            # Pass remaining arguments to base class
            *args,
            **kwargs,
        )


    def _build_mlp_extractor(self) -> None:
        self.mlp_extractor = CustomNetwork(self.features_dim)

In [25]:
class TensorboardCallback(BaseCallback):
    """
    Custom callback for plotting additional values in tensorboard.
    """

    def __init__(self, verbose=0):
        super(TensorboardCallback, self).__init__(verbose)

    def _on_step(self) -> bool:                
        self.logger.record('reward', self.training_env.get_attr('total_reward')[0])

        return True

In [9]:
# Create log dir
# log_dir = "tmp/"
# os.makedirs(log_dir, exist_ok=True)

env = gym.make('CartPole-v1')
# env = Monitor(env, log_dir)
model = PPO(CustomActorCriticPolicy, env, verbose=1)



model.learn(5000)

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 21.9     |
|    ep_rew_mean     | 21.9     |
| time/              |          |
|    fps             | 1271     |
|    iterations      | 1        |
|    time_elapsed    | 1        |
|    total_timesteps | 2048     |
---------------------------------
----------------------------------------
| rollout/                |            |
|    ep_len_mean          | 28.4       |
|    ep_rew_mean          | 28.4       |
| time/                   |            |
|    fps                  | 831        |
|    iterations           | 2          |
|    time_elapsed         | 4          |
|    total_timesteps      | 4096       |
| train/                  |            |
|    approx_kl            | 0.01519261 |
|    clip_fraction        | 0.15       |
|    clip_range           | 0.2        |
|    entropy_loss         | -0.68

<stable_baselines3.ppo.ppo.PPO at 0x2225b9924d0>

In [47]:
vec_env = model.get_env()
obs = vec_env.reset()

R = 0
for i in range(1000):
    action, _state = model.predict(obs, deterministic=True)
    obs, reward, done, info = vec_env.step(action)
    R += reward
print(R)
    # vec_env.render("human")

[1000.]
