In [10]:
import os
import yaml
import torch
from skrl.agents.torch.ppo import PPO
from skrl.models.torch import GaussianMixin, Model
from skrl.resources.preprocessors.torch import RunningStandardScaler
from skrl.trainers.torch import SequentialTrainer
from skrl.envs.torch import wrap_env
from torch import nn

In [11]:
observation_space: int = 12
action_space: int = 2
action_scale: float = 1.0
network_path = "/home/federico/isaaclab/IsaacLab/logs/skrl/jetbot_direct_ppo/2025-04-05_17-46-30_ppo_torch/checkpoints/best_agent.pt"
cfg_path = "/home/federico/isaaclab/IsaacLab/source/isaaclab_tasks/isaaclab_tasks/direct/jetbot/agents/skrl_ppo_lagrangian_cfg.yaml"
with open(cfg_path, 'r') as f:
    cfg_dict = yaml.safe_load(f)
    models_cfg = cfg_dict['models']
    agent_cfg = cfg_dict['agent']
    memory_cfg = cfg_dict['memory']

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

policy_network = models_cfg['policy']['network'][0]
clip_actions = models_cfg['policy']['clip_actions']
clip_log_std = models_cfg['policy']['clip_log_std']
min_log_std = models_cfg['policy']['min_log_std']
max_log_std = models_cfg['policy']['max_log_std']
initial_log_std = models_cfg['policy']['initial_log_std']

In [17]:
# Define the model
class Policy(GaussianMixin, Model):
    def __init__(self, observation_space, action_space, device, clip_actions=False,
                 clip_log_std=True, min_log_std=-20, max_log_std=2):
        Model.__init__(self, observation_space, action_space, device)
        GaussianMixin.__init__(self, clip_actions, clip_log_std, min_log_std, max_log_std)

        self.net = nn.Sequential()
        policy_layers = []
        input_dim = observation_space
        for layer_size in policy_network['layers']:
            policy_layers.append(nn.Linear(input_dim, layer_size))
            if policy_network['activations'] == 'relu' or policy_network['activations'] == 'Relu' or policy_network['activations'] == 'ReLU' or policy_network['activations'] == 'RELU':
                policy_layers.append(nn.ReLU())
            else:
                print("ERROR: Unsupported activation function. Please define it.")
            # policy_layers.append(getattr(nn, policy_network_config['activations'])())
            input_dim = layer_size
        policy_layers.append(nn.Linear(input_dim, action_space)) # Output layer for mean

        self.net = nn.Sequential(*policy_layers)
        self.log_std_parameters = nn.Parameter(torch.zeros(action_space))

        


    def compute(self, inputs, role):
        return self.net(inputs["states"]), self.log_std_parameters, {}

# Instantiate the model
policy = Policy(observation_space, action_space, device, clip_actions=clip_actions)


In [None]:
# extract the weights from the checkpoint
checkpoint = torch.load(network_path, map_location=device)

if 'policy' in checkpoint:
    checkpoint_policy_state_dict = checkpoint['policy']
elif 'model' in checkpoint and 'policy' in checkpoint['model']:
     # Sometimes it's nested like in RL-games checkpoints
    checkpoint_policy_state_dict = checkpoint['model']['policy']
else:
    raise KeyError("Could not find 'policy' state_dict in the checkpoint.")

print("Keys available in the checkpoint's policy state_dict:", checkpoint_policy_state_dict.keys())
print("Keys expected by the target model's state_dict:", policy.state_dict().keys())

new_state_dict = {}
key_mapping = {
    "log_std_parameter": "log_std_parameters",
    "net_container.0.weight": "net.0.weight",
    "net_container.0.bias": "net.0.bias",
    "net_container.2.weight": "net.2.weight",
    "net_container.2.bias": "net.2.bias",
    # Assuming 'policy_layer' in the checkpoint corresponds to the *final* linear layer
    # in your Sequential 'net'. Based on the sequential structure [Linear, ReLU, Linear, ReLU, Linear]
    # the indices are 0, 1, 2, 3, 4. The last linear layer is at index 4.
    "policy_layer.weight": "net.4.weight",
    "policy_layer.bias": "net.4.bias"
}

for k, v in checkpoint_policy_state_dict.items():
    # Check if this checkpoint key is in our mapping
    if k in key_mapping:
        target_key = key_mapping[k]
        # Check if the target key actually exists in the current model
        if target_key in policy.state_dict():
             new_state_dict[target_key] = v
             # print(f"Mapped '{k}' to '{target_key}'") # Optional: uncomment for debugging
        else:
             print(f"Warning: Mapped target key '{target_key}' for checkpoint key '{k}' not found in the target model. Skipping.")
    # Optionally, handle keys you explicitly want to ignore (like value_layer)
    elif k.startswith("value_layer."):
         print(f"Ignoring value layer key: {k}")
         pass
    # Optional: Warn about any keys in the checkpoint['policy'] that weren't mapped or ignored
    else:
        print(f"Warning: Checkpoint key '{k}' is not included in the explicit mapping or ignore list. Skipping.")


# load the weights into the model
policy.load_state_dict(new_state_dict, strict=False)
policy.to(device)
policy.eval()

Keys available in the checkpoint's policy state_dict: odict_keys(['log_std_parameter', 'net_container.0.weight', 'net_container.0.bias', 'net_container.2.weight', 'net_container.2.bias', 'policy_layer.weight', 'policy_layer.bias', 'value_layer.weight', 'value_layer.bias'])
Keys expected by the target model's state_dict: odict_keys(['log_std_parameters', 'net.0.weight', 'net.0.bias', 'net.2.weight', 'net.2.bias', 'net.4.weight', 'net.4.bias'])
Ignoring value layer key: value_layer.weight
Ignoring value layer key: value_layer.bias


  checkpoint = torch.load(network_path, map_location=device)


Policy(
  (net): Sequential(
    (0): Linear(in_features=12, out_features=192, bias=True)
    (1): ReLU()
    (2): Linear(in_features=192, out_features=128, bias=True)
    (3): ReLU()
    (4): Linear(in_features=128, out_features=2, bias=True)
  )
)

In [32]:
observation = torch.rand(observation_space, device=device).unsqueeze(0)  # Add batch dimension

with torch.no_grad():
    # Get distribution parameters (mean, log_std) from the policy
    mean, log_std_parameters, _ = policy.compute({"states": observation}, role="policy")

print("Action:", mean.squeeze(0).cpu().numpy())


Action: [13.799861 12.745668]


In [30]:
# Instantiate the agent
agent = PPO(models=models_cfg,  # models dict
            memory=memory_cfg,  # memory instance, or None if not required
            cfg=agent_cfg,  # configuration dict (preprocessors, learning rate schedulers, etc.)
            observation_space=observation_space,
            action_space=action_space,
            device=device)

agent.load(network_path)  # load the model from the specified path

AttributeError: 'bool' object has no attribute 'to'

In [18]:
observation = torch.randn(observation_space).to(device)  # example observation
action = agent.act(observation, 0, 1)  # get the action from the agent
print("Action:", action)

NameError: name 'agent' is not defined

In [14]:
class NetworkController:
    def __init__(self):
        self.observation_space: int = 12
        self.action_space: int = 2
        self.action_scale: float = 1.0
        network_path = "/home/federico/isaaclab/IsaacLab/logs/skrl/jetbot_direct_ppo/2025-04-05_17-46-30_ppo_torch/final_model.zip"
        cfg_path = "/home/federico/isaaclab/IsaacLab/source/isaaclab_tasks/isaaclab_tasks/direct/jetbot/agents/skrl_ppo_lagrangian_cfg.yaml"
        with open(cfg_path, 'r') as f:
            cfg_dict = yaml.safe_load(f)
            models_cfg = cfg_dict['models']
            agent_cfg = cfg_dict['agent']
            memory_cfg = cfg_dict['memory']

        # Manually instantiate the policy network
        policy_network_config = models_cfg['policy']['network'][0] # Access the first dictionary in the list
        policy_layers = []
        input_dim = self.observation_space
        for layer_size in policy_network_config['layers']:
            policy_layers.append(nn.Linear(input_dim, layer_size))
            if policy_network_config['activations'] == 'relu' or policy_network_config['activations'] == 'Relu' or policy_network_config['activations'] == 'ReLU' or policy_network_config['activations'] == 'RELU':
                policy_layers.append(nn.ReLU())
            else:
                print("ERROR: Unsupported activation function. Please define it.")
            # policy_layers.append(getattr(nn, policy_network_config['activations'])())
            input_dim = layer_size
        policy_layers.append(nn.Linear(input_dim, self.action_space)) # Output layer for mean

        policy_class = models_cfg['policy']['class']
        clip_actions = models_cfg['policy']['clip_actions']
        clip_log_std = models_cfg['policy']['clip_log_std']
        min_log_std = models_cfg['policy']['min_log_std']
        max_log_std = models_cfg['policy']['max_log_std']
        initial_log_std = models_cfg['policy']['initial_log_std']

        class PolicyNetwork(GaussianMixin, Model):
            def __init__(self, observation_space, action_space, clip_actions=True, clip_log_std=False,
                         min_log_std=-20.0, max_log_std=2.0, initial_log_std=0.0, device='cpu'):
                GaussianMixin.__init__(self, clip_actions, clip_log_std, min_log_std, max_log_std)
                Model.__init__(self, observation_space, action_space, device)
                self.net = nn.Sequential(*policy_layers)
                self.mean_layer = nn.Linear(policy_network_config['layers'][-1] if policy_network_config['layers'] else observation_space, action_space)
                self.log_std_parameter = nn.Parameter(torch.full((action_space,), initial_log_std))
                self.device = device
                self.to(self.device)

            def forward(self, states, taken_actions=None, inference=False):
                output = self.net(states)
                mean = self.mean_layer(output)
                log_std = self.log_std_parameter.expand_as(mean)
                distribution = torch.distributions.Normal(mean, log_std.exp())
                return mean, log_std

        device = 'cuda' if torch.cuda.is_available() else 'cpu'
        policy = PolicyNetwork(self.observation_space, self.action_space, clip_actions, clip_log_std,
                               min_log_std, max_log_std, initial_log_std, device)

        # Instantiate the PPO agent with the manually created policy
        self.agent = PPO(models={"policy": policy},
                    memory=memory_cfg,
                    cfg=agent_cfg,
                    observation_space=self.observation_space,
                    action_space=self.action_space,
                    device='cuda' if torch.cuda.is_available() else 'cpu',
        )

        # Load the pre-trained weights
        self.agent.load(network_path)
        self.agent.set_mode('eval')

    def get_action(self, observation):
        """
        Get the action from the agent given an observation.
        :param observation: The observation from the environment.
        :return: The action to take.
        """
        # Convert the observation to a tensor
        observation_tensor = torch.tensor(observation, dtype=torch.float32).unsqueeze(0).to(self.agent.device)
        # Get the action from the agent
        with torch.no_grad():
            # The act method now expects the observation tensor directly
            action, _, _ = self.agent.act(observation_tensor, timestep=0, timesteps=1)
        # Scale the action
        action = action.squeeze(0).cpu().numpy() * self.action_scale
        return action

In [None]:
controller = NetworkController()

observation = torch.zeros(12)
action = controller.get_action(observation)
print("Action:", action)

TypeError: 'str' object is not callable