In [3]:
import rtde_control
from rtde_control import RTDEControlInterface as RTDEControl
import rtde_receive
import time

# Robot IP address
ROBOT_IP = "147.175.108.138"

try:
    # Initialize RTDE connections
    rtde_c = rtde_control.RTDEControlInterface(ROBOT_IP, RTDEControl.FLAG_USE_EXT_UR_CAP)
    rtde_r = rtde_receive.RTDEReceiveInterface(ROBOT_IP)
    
    print("Connected to UR3e successfully!")
    
    # Example: Get current joint positions
    joint_positions = rtde_r.getActualQ()
    print(f"Current joint positions: {joint_positions}")
    
    # Example: Move robot (be careful!)
    rtde_c.moveJ([-0.7866123358355921, -1.562515858826675, -0.002566894283518195, -1.5586068074083705, 0.00971465464681387, 0.014539957046508789], 0.5, 0.5)
    #rtde_c.moveJ([0.0, -1.712, 1.712, 0.0, 0.0, 0.0], 0.2, 0.2)
except Exception as e:
    print(f"Connection failed: {e}")
finally:
    if 'rtde_c' in locals():
        rtde_c.disconnect()
    if 'rtde_r' in locals():
        rtde_r.disconnect()

Connected to UR3e successfully!
Current joint positions: [0.001344919204711914, -1.7922941646971644, 0.5159648100482386, -1.433665008550026, -0.03783637682069951, -0.00012380281557256012]


In [None]:
import gymnasium as gym
import rtde_control
import rtde_receive
import numpy as np
import torch

from rtde_control import RTDEControlInterface as RTDEControl



class UR3eReach(gym.Env):
    def __init__(self, robot_ip="147.175.108.138"):
        self.rtde_c = rtde_control.RTDEControlInterface(robot_ip, RTDEControl.FLAG_USE_EXT_UR_CAP)
        self.rtde_r = rtde_receive.RTDEReceiveInterface(robot_ip)

        # Observations:  Box(-inf, inf, (25,), float32)
        # Actions:  Box(-1.0, 1.0, (6,), float32)

        self.observation_space = gym.spaces.Box(low=-float("inf"), high=float("inf"), shape=(25,), dtype=float)
        self.action_space = gym.spaces.Box(low=-1.0, high=1.0, shape=(6,), dtype=float)

        self.default_joint_position = [0.0, -1.712, 1.712, 0.0, 0.0, 0.0]

        self.last_actions = torch.zeros((6, 1))

    def _get_observations(self):
        try:
            joint_positions = self.rtde_r.getActualQ() # 6
            joint_velocities = self.rtde_r.getActualQd() # 6

            tool_center = self.rtde_r.getActualTCPPose()

            goal_position = self._generate_goal_position() # 3
            goal_rotation = self._generate_goal_rotation() # 4 - should be quaternion

            #end_effector_pose = self.rtde_r.get_tool_pose()
            print("Joint Positions:", joint_positions)
            #print("End Effector Pose:", end_effector_pose)

            return {
                "joint_positions": joint_positions,
                "joint_velocities": joint_velocities,
                "tool_center": tool_center,
                "goal_position": goal_position,
                "goal_rotation": goal_rotation,
                "last_actions": self.last_actions
            }

        except Exception as e:
            print("Error getting observations:", e)
            return self.default_joint_position + [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
        
    def _generate_goal_position(self):
        # self.commands.ee_pose.ranges.pos_x = (0.35, 0.4)
        # self.commands.ee_pose.ranges.pos_y = (-0.1, 0.1)
        # self.commands.ee_pose.ranges.pos_z = (0.15, 0.45)

        range_x = (0.35, 0.4)
        range_y = (-0.1, 0.1)
        range_z = (0.15, 0.45)

        goal = [
            np.random.uniform(*range_x),
            np.random.uniform(*range_y),
            np.random.uniform(*range_z),
        ]

        return list(goal)
    
    def _generate_goal_rotation(self):
        # self.commands.ee_pose.ranges.pitch = (math.pi / 2, math.pi / 2)

        goal = [
            0.0,
            np.random.uniform(np.pi / 2, np.pi / 2),
            np.random.uniform(-np.pi, np.pi)
        ]

        return list(goal)



In [6]:
torch.zeros((6, 1))

tensor([[0.],
        [0.],
        [0.],
        [0.],
        [0.],
        [0.]])

In [2]:
env = UR3eReach()

In [4]:
obs = env._get_observations()
obs, len(obs)

Joint Positions: [-0.7929261366473597, -1.5604003363153716, 0.00605947176088506, -1.5565631550601502, 0.00436015147715807, 0.016711734235286713]


([-0.7929261366473597,
  -1.5604003363153716,
  0.00605947176088506,
  -1.5565631550601502,
  0.00436015147715807,
  0.016711734235286713,
  0.0,
  0.0,
  0.0,
  -0.0,
  -0.0,
  0.0,
  -0.27393328901869574,
  -0.25616682315174233,
  0.6937997953184033,
  -0.7835107966469409,
  -1.7507451077993939,
  1.7847540814308716,
  0.38270703473242024,
  -0.021508604535532133,
  0.29455592611941017,
  0.0,
  1.5707963267948966,
  2.2644955995101927],
 24)

In [10]:
env._generate_goal_position()

[0.39202029101843633, 0.038021564112655476, 0.22879126602887667]

In [3]:
import gym
import time
import threading
import numpy as np
from packaging import version

import frankx


class ReachingFranka(gym.Env):
    def __init__(self, robot_ip="172.16.0.2", device="cuda:0", control_space="joint", motion_type="waypoint", camera_tracking=False):
        # gym API
        self._drepecated_api = version.parse(gym.__version__) < version.parse(" 0.25.0")

        self.device = device
        self.control_space = control_space  # joint or cartesian
        self.motion_type = motion_type  # waypoint or impedance

        if self.control_space == "cartesian" and self.motion_type == "impedance":
            # The operation of this mode (Cartesian-impedance) was adjusted later without being able to test it on the real robot.
            # Dangerous movements may occur for the operator and the robot.
            # Comment the following line of code if you want to proceed with this mode.
            raise ValueError("See comment in the code to proceed with this mode")
            pass

        # camera tracking (disabled by default)
        self.camera_tracking = camera_tracking
        if self.camera_tracking:
            threading.Thread(target=self._update_target_from_camera).start()

        # spaces
        self.observation_space = gym.spaces.Box(low=-1000, high=1000, shape=(18,), dtype=np.float32)
        if self.control_space == "joint":
            self.action_space = gym.spaces.Box(low=-1, high=1, shape=(7,), dtype=np.float32)
        elif self.control_space == "cartesian":
            self.action_space = gym.spaces.Box(low=-1, high=1, shape=(3,), dtype=np.float32)
        else:
            raise ValueError("Invalid control space:", self.control_space)

        # init real franka
        print("Connecting to robot at {}...".format(robot_ip))
        self.robot = frankx.Robot(robot_ip)
        self.robot.set_default_behavior()
        self.robot.recover_from_errors()

        # the robot's response can be better managed by independently setting the following properties, for example:
        # - self.robot.velocity_rel = 0.2
        # - self.robot.acceleration_rel = 0.1
        # - self.robot.jerk_rel = 0.01
        self.robot.set_dynamic_rel(0.25)

        self.gripper = self.robot.get_gripper()
        print("Robot connected")

        self.motion = None
        self.motion_thread = None

        self.dt = 1 / 120.0
        self.action_scale = 2.5
        self.dof_vel_scale = 0.1
        self.max_episode_length = 100
        self.robot_dof_speed_scales = 1
        self.target_pos = np.array([0.65, 0.2, 0.2])
        self.robot_default_dof_pos = np.radians([0, -45, 0, -135, 0, 90, 45])
        self.robot_dof_lower_limits = np.array([-2.8973, -1.7628, -2.8973, -3.0718, -2.8973, -0.0175, -2.8973])
        self.robot_dof_upper_limits = np.array([ 2.8973,  1.7628,  2.8973, -0.0698,  2.8973,  3.7525,  2.8973])

        self.progress_buf = 1
        self.obs_buf = np.zeros((18,), dtype=np.float32)

    def _update_target_from_camera(self):
        pixel_to_meter = 1.11 / 375  # m/px: adjust for custom cases

        import cv2
        cap = cv2.VideoCapture(0)
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # convert to HSV and remove noise
            hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
            hsv = cv2.medianBlur(hsv, 15)

            # color matching in HSV
            mask = cv2.inRange(hsv, np.array([80, 100, 100]), np.array([100, 255, 255]))
            M = cv2.moments(mask)
            if M["m00"]:
                x = M["m10"] / M["m00"]
                y = M["m01"] / M["m00"]

                # real-world position (fixed z to 0.2 meters)
                pos = np.array([pixel_to_meter * (y - 185), pixel_to_meter * (x - 320), 0.2])
                if self is not None:
                    self.target_pos = pos

                # draw target
                frame = cv2.circle(frame, (int(x), int(y)), 30, (0,0,255), 2)
                frame = cv2.putText(frame, str(np.round(pos, 4).tolist()), (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,255), 1, cv2.LINE_AA)

            # show images
            cv2.imshow("frame", frame)
            cv2.imshow("mask", mask)
            k = cv2.waitKey(1) & 0xFF
            if k == ord('q'):
                cap.release()

    def _get_observation_reward_done(self):
        # get robot state
        try:
            robot_state = self.robot.get_state(read_once=True)
        except frankx.InvalidOperationException:
            robot_state = self.robot.get_state(read_once=False)

        # observation
        robot_dof_pos = np.array(robot_state.q)
        robot_dof_vel = np.array(robot_state.dq)
        end_effector_pos = np.array(robot_state.O_T_EE[-4:-1])

        dof_pos_scaled = 2.0 * (robot_dof_pos - self.robot_dof_lower_limits) / (self.robot_dof_upper_limits - self.robot_dof_lower_limits) - 1.0
        dof_vel_scaled = robot_dof_vel * self.dof_vel_scale

        self.obs_buf[0] = self.progress_buf / float(self.max_episode_length)
        self.obs_buf[1:8] = dof_pos_scaled
        self.obs_buf[8:15] = dof_vel_scaled
        self.obs_buf[15:18] = self.target_pos

        # reward
        distance = np.linalg.norm(end_effector_pos - self.target_pos)
        reward = -distance

        # done
        done = self.progress_buf >= self.max_episode_length - 1
        done = done or distance <= 0.075

        print("Distance:", distance)
        if done:
            print("Target or Maximum episode length reached")
            time.sleep(1)

        return self.obs_buf, reward, done

    def reset(self):
        print("Resetting...")

        # end current motion
        if self.motion is not None:
            self.motion.finish()
            self.motion_thread.join()
        self.motion = None
        self.motion_thread = None

        # open/close gripper
        # self.gripper.open()
        # self.gripper.clamp()

        # go to 1) safe position, 2) random position
        self.robot.move(frankx.JointMotion(self.robot_default_dof_pos.tolist()))
        dof_pos = self.robot_default_dof_pos + 0.25 * (np.random.rand(7) - 0.5)
        self.robot.move(frankx.JointMotion(dof_pos.tolist()))

        # get target position from prompt
        if not self.camera_tracking:
            while True:
                try:
                    print("Enter target position (X, Y, Z) in meters")
                    raw = input("or press [Enter] key for a random target position: ")
                    if raw:
                        self.target_pos = np.array([float(p) for p in raw.replace(' ', '').split(',')])
                    else:
                        noise = (2 * np.random.rand(3) - 1) * np.array([0.25, 0.25, 0.10])
                        self.target_pos = np.array([0.5, 0.0, 0.2]) + noise
                    print("Target position:", self.target_pos)
                    break
                except ValueError:
                    print("Invalid input. Try something like: 0.65, 0.0, 0.2")

        # initial pose
        affine = frankx.Affine(frankx.Kinematics.forward(dof_pos.tolist()))
        affine = affine * frankx.Affine(x=0, y=0, z=-0.10335, a=np.pi/2)

        # motion type
        if self.motion_type == "waypoint":
            self.motion = frankx.WaypointMotion([frankx.Waypoint(affine)], return_when_finished=False)
        elif self.motion_type == "impedance":
            self.motion = frankx.ImpedanceMotion(500, 50)
        else:
            raise ValueError("Invalid motion type:", self.motion_type)

        self.motion_thread = self.robot.move_async(self.motion)
        if self.motion_type == "impedance":
            self.motion.target = affine

        input("Press [Enter] to continue")

        self.progress_buf = 0
        observation, reward, done = self._get_observation_reward_done()

        if self._drepecated_api:
            return observation
        else:
            return observation, {}

    def step(self, action):
        self.progress_buf += 1

        # control space
        # joint
        if self.control_space == "joint":
            # get robot state
            try:
                robot_state = self.robot.get_state(read_once=True)
            except frankx.InvalidOperationException:
                robot_state = self.robot.get_state(read_once=False)
            # forward kinematics
            dof_pos = np.array(robot_state.q) + (self.robot_dof_speed_scales * self.dt * action * self.action_scale)
            affine = frankx.Affine(self.robot.forward_kinematics(dof_pos.flatten().tolist()))
            affine = affine * frankx.Affine(x=0, y=0, z=-0.10335, a=np.pi/2)
        # cartesian
        elif self.control_space == "cartesian":
            action /= 100.0
            if self.motion_type == "waypoint":
                affine = frankx.Affine(x=action[0], y=action[1], z=action[2])
            elif self.motion_type == "impedance":
                # get robot pose
                try:
                    robot_pose = self.robot.current_pose(read_once=True)
                except frankx.InvalidOperationException:
                    robot_pose = self.robot.current_pose(read_once=False)
                affine = robot_pose * frankx.Affine(x=action[0], y=action[1], z=action[2])

        # motion type
        # waypoint motion
        if self.motion_type == "waypoint":
            if self.control_space == "joint":
                self.motion.set_next_waypoint(frankx.Waypoint(affine))
            elif self.control_space == "cartesian":
                self.motion.set_next_waypoint(frankx.Waypoint(affine, frankx.Waypoint.Relative))
        # impedance motion
        elif self.motion_type == "impedance":
            self.motion.target = affine
        else:
            raise ValueError("Invalid motion type:", self.motion_type)

        # the use of time.sleep is for simplicity. This does not guarantee control at a specific frequency
        time.sleep(0.1)  # lower frequency, at 30Hz there are discontinuities

        observation, reward, done = self._get_observation_reward_done()

        if self._drepecated_api:
            return observation, reward, done, {}
        else:
            return observation, reward, done, done, {}

    def render(self, *args, **kwargs):
        pass

    def close(self):
        pass

ModuleNotFoundError: No module named 'frankx'

In [4]:
import torch
import numpy as np
import matplotlib.pyplot as plt
from typing import Dict, List

def test_model_loading(model_path: str):
    """Test loading the model and inspect its structure"""
    print("="*60)
    print("STEP 1: Loading and Inspecting Model")
    print("="*60)
    
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")
    
    # Load checkpoint
    print(f"\nLoading model from: {model_path}")
    checkpoint = torch.load(model_path, map_location=device)
    
    # Inspect checkpoint structure
    print("\n--- Checkpoint Structure ---")
    print(f"Checkpoint type: {type(checkpoint)}")
    
    # If checkpoint is OrderedDict, it's likely a state_dict
    if isinstance(checkpoint, dict):
        print(f"Checkpoint keys: {list(checkpoint.keys())[:10]}")  # Show first 10 keys
        
        # Check if it's a state dict by looking at the keys
        sample_keys = list(checkpoint.keys())[:5]
        if any('.' in str(key) for key in sample_keys):
            print("\n✓ This appears to be a state_dict (model weights only)")
            print("Sample keys:", sample_keys)
            
            # We need to instantiate the model architecture
            # Try to infer the architecture from the state dict
            print("\n--- Inferring Model Architecture ---")
            
            # Analyze the state dict to understand the network
            layer_info = {}
            for key, value in checkpoint.items():
                if 'weight' in key:
                    layer_name = key.replace('.weight', '')
                    layer_info[layer_name] = value.shape
                    
            print("Detected layers:")
            for layer, shape in list(layer_info.items())[:10]:
                print(f"  {layer}: {shape}")
            
            # Determine input and output dimensions
            first_layer = next(iter(layer_info.items()))
            last_layer = list(layer_info.items())[-1]
            
            # Common SKRL policy network structure
            if len(layer_info) > 0:
                # Get input dimension (first layer's input size)
                input_dim = first_layer[1][1] if len(first_layer[1]) > 1 else first_layer[1][0]
                # Get output dimension (last layer's output size)
                output_dim = last_layer[1][0]
                
                print(f"\nInferred dimensions:")
                print(f"  Input dim: {input_dim} (should be 25 for your obs space)")
                print(f"  Output dim: {output_dim} (should be 6 for joint actions)")
                
                # Create a simple MLP that matches the state dict
                model = create_mlp_from_state_dict(checkpoint, input_dim, output_dim, device)
                
                if model is not None:
                    print("✓ Successfully created model from state dict")
                else:
                    print("❌ Failed to create model from state dict")
                    return None, device
            else:
                print("❌ Could not determine model architecture from state dict")
                return None, device
                
        elif "agent" in checkpoint:
            print("\nFound 'agent' in checkpoint")
            print("Agent keys:", checkpoint["agent"].keys())
            if "models" in checkpoint["agent"]:
                print("Models available:", checkpoint["agent"]["models"].keys())
                if "policy" in checkpoint["agent"]["models"]:
                    model_data = checkpoint["agent"]["models"]["policy"]
                    print(f"Policy type: {type(model_data)}")
                    
                    # Check if it's a state dict
                    if isinstance(model_data, dict) and any('.' in str(k) for k in model_data.keys()):
                        print("Policy is a state dict, creating model...")
                        # Analyze and create model
                        input_dim = 25  # Your observation space
                        output_dim = 6  # Your action space
                        model = create_mlp_from_state_dict(model_data, input_dim, output_dim, device)
                    else:
                        model = model_data
            else:
                print("❌ Unexpected agent structure")
                return None, device
        else:
            print("❌ Unexpected checkpoint structure")
            return None, device
    else:
        print(f"❌ Unexpected checkpoint type: {type(checkpoint)}")
        return None, device
    
    # Set to eval mode if we have a model
    if model is not None:
        model.eval()
        model.to(device)
        
        # Count parameters
        total_params = sum(p.numel() for p in model.parameters())
        print(f"\nModel loaded successfully!")
        print(f"Total parameters: {total_params:,}")
    
    return model, device


def create_mlp_from_state_dict(state_dict, input_dim, output_dim, device):
    """Create an MLP model that matches the state dict structure"""
    import torch.nn as nn
    
    # Analyze state dict to find layer sizes
    layers = {}
    for key, value in state_dict.items():
        if 'weight' in key:
            # Extract layer number/name
            parts = key.split('.')
            if len(parts) >= 2:
                layer_idx = parts[0] if parts[0].isdigit() else parts[1] if len(parts) > 1 and parts[1].isdigit() else parts[0]
                if 'weight' in key:
                    layers[layer_idx] = value.shape
    
    # Sort layers by name/index
    sorted_layers = sorted(layers.items(), key=lambda x: x[0])
    
    print(f"\nCreating MLP with structure:")
    print(f"  Input: {input_dim}")
    
    # Build the network
    class PolicyNetwork(nn.Module):
        def __init__(self):
            super().__init__()
            self.layers = nn.ModuleList()
            
            prev_size = input_dim
            for i, (name, shape) in enumerate(sorted_layers):
                out_size, in_size = shape
                if i == 0 and in_size != input_dim:
                    print(f"  ⚠️ Warning: First layer expects {in_size} inputs, but obs space is {input_dim}")
                
                print(f"  Layer {name}: {in_size} -> {out_size}")
                self.layers.append(nn.Linear(in_size, out_size))
                prev_size = out_size
            
            # Add activation functions (assuming ReLU for hidden layers)
            self.activation = nn.ReLU()
            self.output_activation = nn.Tanh()  # Common for RL policies
            
        def forward(self, x):
            for i, layer in enumerate(self.layers):
                x = layer(x)
                if i < len(self.layers) - 1:
                    x = self.activation(x)
                else:
                    # Last layer - might use tanh for bounded actions
                    x = self.output_activation(x)
            return x
    
    try:
        model = PolicyNetwork()
        
        # Load the state dict
        model.load_state_dict(state_dict)
        model.to(device)
        
        # Test forward pass
        test_input = torch.randn(1, input_dim).to(device)
        with torch.no_grad():
            test_output = model(test_input)
            print(f"  Output: {output_dim} (shape: {test_output.shape})")
            
        return model
        
    except Exception as e:
        print(f"\n❌ Error creating model: {e}")
        
        # Try a simpler approach - just create a standard MLP
        print("\nTrying standard MLP architecture...")
        class StandardMLP(nn.Module):
            def __init__(self):
                super().__init__()
                self.fc1 = nn.Linear(input_dim, 256)
                self.fc2 = nn.Linear(256, 256)
                self.fc3 = nn.Linear(256, output_dim)
                self.activation = nn.ReLU()
                self.output_activation = nn.Tanh()
                
            def forward(self, x):
                x = self.activation(self.fc1(x))
                x = self.activation(self.fc2(x))
                x = self.output_activation(self.fc3(x))
                return x
        
        try:
            model = StandardMLP()
            # Try to load whatever weights match
            model_dict = model.state_dict()
            pretrained_dict = {k: v for k, v in state_dict.items() if k in model_dict and v.shape == model_dict[k].shape}
            model_dict.update(pretrained_dict)
            model.load_state_dict(model_dict, strict=False)
            model.to(device)
            print("✓ Created standard MLP and loaded matching weights")
            return model
        except Exception as e2:
            print(f"❌ Failed with standard MLP: {e2}")
            return None


def create_mock_observations(batch_size: int = 5):
    """Create mock observations matching your IsaacLab config"""
    print("\n" + "="*60)
    print("STEP 2: Creating Mock Observations")
    print("="*60)
    
    # Based on your ObservationsCfg:
    # - joint_pos (6): relative joint positions
    # - joint_vel (6): relative joint velocities  
    # - pose_command (7): target position (3) + quaternion (4)
    # - actions (6): last action
    # Total: 25 dimensions
    
    observations = []
    descriptions = []
    
    # Default joint positions (from your IsaacLab config)
    default_joints = np.array([0.0, -1.712, 1.712, 0.0, 0.0, 0.0])
    
    # Test case 1: At default position, no velocity, target straight ahead
    obs1 = np.zeros(25)
    obs1[0:6] = 0.0  # At default position (relative = 0)
    obs1[6:12] = 0.0  # No velocity
    obs1[12:15] = [0.3, 0.0, 0.3]  # Target position
    obs1[15:19] = [1.0, 0.0, 0.0, 0.0]  # Target orientation (quaternion w,x,y,z)
    obs1[19:25] = 0.0  # No previous action
    observations.append(obs1)
    descriptions.append("At home position, target ahead")
    
    # Test case 2: Slightly off default, target to the right
    obs2 = np.zeros(25)
    obs2[0:6] = [0.1, -0.1, 0.1, 0.0, 0.0, 0.0]  # Slightly off default
    obs2[6:12] = [0.05, -0.05, 0.0, 0.0, 0.0, 0.0]  # Small velocities
    obs2[12:15] = [0.2, 0.2, 0.3]  # Target to the right
    obs2[15:19] = [1.0, 0.0, 0.0, 0.0]
    obs2[19:25] = [0.01, -0.01, 0.01, 0.0, 0.0, 0.0]  # Small previous action
    observations.append(obs2)
    descriptions.append("Slightly off home, target right")
    
    # Test case 3: Random position within reasonable bounds
    obs3 = np.zeros(25)
    obs3[0:6] = np.random.uniform(-0.5, 0.5, 6)  # Random relative positions
    obs3[6:12] = np.random.uniform(-0.1, 0.1, 6)  # Random velocities
    obs3[12:15] = [0.25, -0.1, 0.35]  # Target to the left
    obs3[15:19] = [1.0, 0.0, 0.0, 0.0]
    obs3[19:25] = np.random.uniform(-0.1, 0.1, 6)
    observations.append(obs3)
    descriptions.append("Random position, target left")
    
    # Test case 4: Near target position
    obs4 = np.zeros(25)
    obs4[0:6] = [0.2, -0.3, 0.3, 0.1, 0.0, 0.0]  # Position near a reaching pose
    obs4[6:12] = [0.02, -0.02, 0.01, 0.0, 0.0, 0.0]  # Small velocities
    obs4[12:15] = [0.3, 0.0, 0.3]  # Target position (close to current)
    obs4[15:19] = [1.0, 0.0, 0.0, 0.0]
    obs4[19:25] = [0.02, -0.02, 0.01, 0.0, 0.0, 0.0]
    observations.append(obs4)
    descriptions.append("Near target position")
    
    # Test case 5: Moving fast towards target
    obs5 = np.zeros(25)
    obs5[0:6] = [0.15, -0.2, 0.2, 0.05, 0.0, 0.0]
    obs5[6:12] = [0.2, -0.15, 0.1, 0.05, 0.0, 0.0]  # Higher velocities
    obs5[12:15] = [0.35, 0.1, 0.4]  # Target further away
    obs5[15:19] = [1.0, 0.0, 0.0, 0.0]
    obs5[19:25] = [0.1, -0.08, 0.05, 0.02, 0.0, 0.0]  # Larger previous action
    observations.append(obs5)
    descriptions.append("Moving fast towards distant target")
    
    return np.array(observations), descriptions


def test_model_inference(model, device, observations, descriptions):
    """Test model inference with mock observations"""
    print("\n" + "="*60)
    print("STEP 3: Testing Model Inference")
    print("="*60)
    
    model.eval()
    actions_list = []
    
    with torch.no_grad():
        for i, (obs, desc) in enumerate(zip(observations, descriptions)):
            print(f"\n--- Test Case {i+1}: {desc} ---")
            
            # Print observation summary
            print(f"Observation shape: {obs.shape}")
            print(f"  Joint pos (rel): {obs[0:6].round(3)}")
            print(f"  Joint vel (rel): {obs[6:12].round(3)}")
            print(f"  Target pos: {obs[12:15].round(3)}")
            print(f"  Target quat: {obs[15:19].round(3)}")
            print(f"  Last action: {obs[19:25].round(3)}")
            
            # Convert to tensor and add batch dimension
            obs_tensor = torch.FloatTensor(obs).unsqueeze(0).to(device)
            
            # Get action from model
            try:
                action_tensor = model(obs_tensor)
                action = action_tensor.cpu().numpy().squeeze()
                actions_list.append(action)
                
                print(f"\nAction output: {action.round(3)}")
                print(f"  Shape: {action.shape}")
                print(f"  Mean: {action.mean():.3f}")
                print(f"  Std: {action.std():.3f}")
                print(f"  Min: {action.min():.3f}")
                print(f"  Max: {action.max():.3f}")
                print(f"  Range: [{action.min():.3f}, {action.max():.3f}]")
                
                # Check if actions are reasonable
                if np.abs(action).max() > 10:
                    print("  ⚠️ WARNING: Large action values detected!")
                elif np.abs(action).max() < 0.001:
                    print("  ⚠️ WARNING: Very small action values detected!")
                else:
                    print("  ✓ Action values seem reasonable")
                    
            except Exception as e:
                print(f"  ❌ Error during inference: {e}")
                import traceback
                traceback.print_exc()
    
    return np.array(actions_list) if actions_list else None


def analyze_actions(actions, observations, descriptions):
    """Analyze the pattern of actions"""
    print("\n" + "="*60)
    print("STEP 4: Analyzing Action Patterns")
    print("="*60)
    
    if actions is None or len(actions) == 0:
        print("No actions to analyze")
        return
    
    # Overall statistics
    print("\n--- Overall Action Statistics ---")
    print(f"Mean across all actions: {actions.mean():.3f}")
    print(f"Std across all actions: {actions.std():.3f}")
    print(f"Min value: {actions.min():.3f}")
    print(f"Max value: {actions.max():.3f}")
    print(f"Absolute max: {np.abs(actions).max():.3f}")
    
    # Per-joint statistics
    print("\n--- Per-Joint Statistics ---")
    for j in range(actions.shape[1]):
        print(f"Joint {j+1}: mean={actions[:, j].mean():.3f}, "
              f"std={actions[:, j].std():.3f}, "
              f"range=[{actions[:, j].min():.3f}, {actions[:, j].max():.3f}]")
    
    # Check for patterns
    print("\n--- Pattern Analysis ---")
    
    # Check if actions correlate with distance to target
    for i, (action, obs, desc) in enumerate(zip(actions, observations, descriptions)):
        target_pos = obs[12:15]
        # Simple distance metric (this is approximate)
        distance_to_target = np.linalg.norm(target_pos - np.array([0.2, 0.0, 0.2]))
        action_magnitude = np.linalg.norm(action)
        print(f"Case {i+1}: action_magnitude={action_magnitude:.3f}, "
              f"approx_distance={distance_to_target:.3f}")
    
    # Visualize if matplotlib is available
    try:
        fig, axes = plt.subplots(2, 3, figsize=(15, 8))
        fig.suptitle('Action Analysis', fontsize=16)
        
        # Plot action values for each test case
        for i in range(min(6, actions.shape[1])):
            ax = axes[i // 3, i % 3]
            ax.bar(range(len(actions)), actions[:, i])
            ax.set_title(f'Joint {i+1} Actions')
            ax.set_xlabel('Test Case')
            ax.set_ylabel('Action Value')
            ax.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.savefig('action_analysis.png')
        print("\n✓ Saved action analysis plot to 'action_analysis.png'")
        plt.show()
    except ImportError:
        print("\n(Matplotlib not available for visualization)")


def suggest_deployment_parameters(actions):
    """Suggest deployment parameters based on action analysis"""
    print("\n" + "="*60)
    print("STEP 5: Deployment Recommendations")
    print("="*60)
    
    if actions is None or len(actions) == 0:
        print("No actions available for recommendations")
        return
    
    max_action = np.abs(actions).max()
    mean_action = np.abs(actions).mean()
    
    print("\nBased on the model outputs:")
    
    # Suggest action scaling
    if max_action > 5:
        scale = 0.005
        print(f"⚠️ Actions are large (max={max_action:.2f})")
        print(f"   Recommended position_scale: {scale}")
        print(f"   This will limit max position change to ~{max_action * scale:.3f} rad")
    elif max_action > 1:
        scale = 0.01
        print(f"⚠️ Actions are moderate (max={max_action:.2f})")
        print(f"   Recommended position_scale: {scale}")
        print(f"   This will limit max position change to ~{max_action * scale:.3f} rad")
    elif max_action < 0.01:
        scale = 1.0
        print(f"⚠️ Actions are very small (max={max_action:.4f})")
        print(f"   Recommended position_scale: {scale}")
        print(f"   Consider checking if model is outputting correctly")
    else:
        scale = 0.1
        print(f"✓ Actions are in reasonable range (max={max_action:.2f})")
        print(f"   Recommended position_scale: {scale}")
        print(f"   This will limit max position change to ~{max_action * scale:.3f} rad")
    
    # Suggest safety limits
    print(f"\nRecommended safety limits:")
    print(f"  max_joint_velocity: 0.2 rad/s (start conservative)")
    print(f"  max_joint_acceleration: 0.3 rad/s²")
    print(f"  control_frequency: 10 Hz")
    
    # Suggest testing approach
    print(f"\nTesting approach:")
    print(f"  1. Run in debug mode first to verify observations")
    print(f"  2. Start with position_scale = {scale * 0.1:.4f} (10% of recommended)")
    print(f"  3. Gradually increase to {scale:.4f}")
    print(f"  4. Monitor actual vs commanded positions")
    
    # Check for potential issues
    print(f"\nPotential issues to watch for:")
    if np.std(actions) < 0.001:
        print(f"  ⚠️ Very low action variance - model might be outputting constants")
    if np.any(np.isnan(actions)) or np.any(np.isinf(actions)):
        print(f"  ❌ NaN or Inf values detected in actions!")
    
    # Action type inference
    print(f"\nInferred action type based on range:")
    if max_action < 0.1:
        print(f"  Likely: Direct joint position targets (radians)")
    elif max_action < 2:
        print(f"  Likely: Normalized actions [-1, 1] for position/velocity control")
    else:
        print(f"  Likely: Unnormalized outputs (may need scaling/clipping)")


# Main execution
if __name__ == "__main__":
    # Path to your model
    MODEL_PATH = "/home/urkui-3/Documents/Ales/IsaacLab/runs/torch/Isaac-Reach-UR3e-v0/UR3e/checkpoints/best_agent.pt"
    
    print("UR3e Model Testing Suite")
    print("========================\n")
    
    # Step 1: Load model
    model, device = test_model_loading(MODEL_PATH)
    
    if model is None:
        print("\n❌ Failed to load model. Please check the checkpoint structure.")
        exit(1)
    
    # Step 2: Create mock observations
    observations, descriptions = create_mock_observations()
    
    # Step 3: Test inference
    actions = test_model_inference(model, device, observations, descriptions)
    
    # Step 4: Analyze actions
    if actions is not None:
        analyze_actions(actions, observations, descriptions)
        
        # Step 5: Get deployment recommendations
        suggest_deployment_parameters(actions)
    
    print("\n" + "="*60)
    print("Testing Complete!")
    print("="*60)
    print("\nNext steps:")
    print("1. Review the action outputs above")
    print("2. Adjust the deployment script based on recommendations")
    print("3. Test with debug mode on the real robot")
    print("4. Gradually enable real control with safety limits")

UR3e Model Testing Suite

STEP 1: Loading and Inspecting Model
Using device: cpu

Loading model from: /home/urkui-3/Documents/Ales/IsaacLab/runs/torch/Isaac-Reach-UR3e-v0/UR3e/checkpoints/best_agent.pt

--- Checkpoint Structure ---
Checkpoint type: <class 'dict'>
Checkpoint keys: ['policy', 'value', 'optimizer', 'state_preprocessor', 'value_preprocessor']
❌ Unexpected checkpoint structure

❌ Failed to load model. Please check the checkpoint structure.

STEP 2: Creating Mock Observations

STEP 3: Testing Model Inference


  return torch._C._cuda_getDeviceCount() > 0
  checkpoint = torch.load(model_path, map_location=device)


AttributeError: 'NoneType' object has no attribute 'eval'