In [1]:
from stable_baselines3 import PPO
from Env_1 import robotEnv

from stable_baselines3.common.vec_env import SubprocVecEnv,DummyVecEnv
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.env_checker import check_env

In [2]:
# Path to the YAML configuration file; it is handed to make_env() below, which
# forwards it to robotEnv as `config_file`.
config_path = "/usr/src/data/configuration.yaml"

In [3]:
def make_env(path):
    """Return a zero-argument factory that builds one monitored environment.

    Vectorized-env constructors take a list of such factories; nothing is
    instantiated until the returned callable is invoked.

    Args:
        path: Passed unchanged to ``robotEnv`` as ``config_file``.

    Returns:
        A callable taking no arguments that creates a ``robotEnv`` and
        returns it wrapped in ``Monitor``.
    """
    def _init():
        # Build the raw environment first, then add the Monitor wrapper.
        robot = robotEnv(config_file=path)
        return Monitor(robot)

    return _init

In [4]:
# Number of worker processes; one environment is created per process.
num_cpu = 2

# Vectorized environment made of `num_cpu` sub-process workers.
# NOTE(review): every worker is built from the same config path and no
# per-worker index / ROS id is passed to make_env — confirm robotEnv does not
# need a distinct id per process.
env = SubprocVecEnv([make_env(config_path) for _ in range(num_cpu)])



In [10]:
from gymnasium import spaces
import torch as th
import torch.nn as nn
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor

class CustomCNN(BaseFeaturesExtractor):
    """Convolutional feature extractor: four strided conv stages and a linear head.

    The conv stack uses 3x3 kernels with padding 1, channel widths
    16 -> 32 -> 64 -> 64 and strides 2, 2, 2, 3. Each convolution is followed
    by LeakyReLU and the result is flattened. A Linear + ReLU head then maps
    the flattened activations to ``features_dim`` values.

    Args:
        observation_space: Space whose ``shape[0]`` is taken as the number of
            input channels (assumes channel-first image observations —
            TODO confirm against the environment).
        features_dim: Length of the feature vector returned by ``forward``.
    """

    def __init__(self, observation_space: spaces.Box, features_dim: int = 256):
        super().__init__(observation_space, features_dim)

        in_channels = observation_space.shape[0]
        kernel = 3
        # (out_channels, stride) of each conv stage, in execution order.
        stage_specs = ((16, 2), (32, 2), (64, 2), (64, 3))

        layers = []
        for out_channels, stride in stage_specs:
            layers.append(
                nn.Conv2d(in_channels, out_channels, kernel_size=kernel, stride=stride, padding=1)
            )
            layers.append(nn.LeakyReLU())
            in_channels = out_channels
        layers.append(nn.Flatten())
        self.cnn = nn.Sequential(*layers)

        # Push one all-zero observation through the conv stack to discover the
        # flattened width instead of deriving it by hand.
        with th.no_grad():
            probe = th.zeros(1, *observation_space.shape)
            flat_dim = self.cnn(probe).shape[1]

        # Projection from the flattened conv output to the requested size.
        self.linear = nn.Sequential(nn.Linear(flat_dim, features_dim), nn.ReLU())

        self._features_dim = features_dim

    def forward(self, observations: th.Tensor) -> th.Tensor:
        """Run the conv stack, then the linear head, on a batch of observations."""
        return self.linear(self.cnn(observations))


# Hidden-layer widths for the policy (pi) and value (vf) networks.
pi = [256, 256, 128]
vf = [256, 256, 128]

# Length of the feature vector produced by CustomCNN.
features_dim = 256

# Supplied to the policy under its `optimizer_kwargs` key.
optimizer_kwargs = {"weight_decay": 1e-5}

# Keyword arguments given to the PPO policy constructor.
policy_kwargs = {
    "normalize_images": False,
    "features_extractor_class": CustomCNN,
    "features_extractor_kwargs": {"features_dim": features_dim},
    "net_arch": {"pi": pi, "vf": vf},
    "optimizer_kwargs": optimizer_kwargs,
}


In [11]:
# n_steps=2

# policy_kwargs=dict(normalize_images=False)

# model = PPO(
#     "CnnPolicy",
#     env,
#     n_steps=n_steps,
#     batch_size=n_steps*num_cpu, #Required for VectorEnv
#     n_epochs=8,    
#     learning_rate=0.001, 
#     clip_range=0.3,
#     tensorboard_log= ".",
#     policy_kwargs=policy_kwargs,
#     verbose=2
# )


# Steps collected from each environment before every policy update.
n_steps = 3

# gamma, vf_coef and max_grad_norm are not passed, so PPO uses its own
# defaults for them.
model = PPO(
    "CnnPolicy",
    env,
    # Optimisation settings.
    learning_rate=0.003,
    n_epochs=20,
    clip_range=0.3,
    ent_coef=0.01,
    # Rollout sizing: one minibatch spans the full rollout gathered from all
    # `num_cpu` environments.
    n_steps=n_steps,
    batch_size=n_steps * num_cpu,
    # Network definition (custom extractor and head sizes).
    policy_kwargs=policy_kwargs,
    # Reproducibility and logging.
    seed=123,
    tensorboard_log=".",
    verbose=0,
)

In [None]:
# Train for 10000 more environment steps. With reset_num_timesteps=False the
# model's existing timestep counter is kept, so re-running this cell continues
# the count rather than restarting it from zero.
model.learn(
    total_timesteps=10000,
    reset_num_timesteps=False,
    progress_bar=True,
)