In [1]:
%cd /workspace
import sys
import os

# Set PYTHONPATH environment variable for the kernel
robofin_path = os.path.join(os.getcwd(), 'robofin')
current_pythonpath = os.environ.get('PYTHONPATH', '')
if robofin_path not in current_pythonpath:
    os.environ['PYTHONPATH'] = f"{robofin_path}:{current_pythonpath}" if current_pythonpath else robofin_path

# Also add to sys.path for immediate effect
if robofin_path not in sys.path:
    sys.path.insert(0, robofin_path)

/workspace


  self.shell.db['dhist'] = compress_dhist(dhist)[-100:]


In [2]:
def get_type(obj):
    """
    Function for displaying nested types.
    
    e.g. get_type(dict_str_float) -> "dict[str, float]"
    """
    if isinstance(obj, dict):
        if not obj:
            return "dict[?, ?]"
        key_types = {get_type(k) for k in obj.keys()}
        value_types = {get_type(v) for v in obj.values()}
        return f"dict[{', '.join(key_types)}, {', '.join(value_types)}]"
    elif isinstance(obj, list):
        if not obj:
            return "list[?]"
        elem_types = {get_type(elem) for elem in obj}
        return f"list[{', '.join(elem_types)}]"
    elif isinstance(obj, tuple):
        if not obj:
            return "tuple[?]"
        elem_types = [get_type(elem) for elem in obj]
        return f"tuple[{', '.join(elem_types)}]"
    elif isinstance(obj, set):
        if not obj:
            return "set[?]"
        elem_types = {get_type(elem) for elem in obj}
        return f"set[{', '.join(elem_types)}]"
    else:
        return type(obj).__name__

In [3]:
%reload_ext autoreload
%autoreload 2

In [4]:
import numpy as np
import torch
from robofin.robots import Robot

# Load the Robot class with the standard URDF file (that uses relative filepaths)
robot = Robot("assets/panda/panda.urdf")
# robot = Robot("assets/gp7/gp7.urdf")

## Dataloader/Replay buffer testing

In [5]:
from avoid_everything.data_loader import DataModule

training_model_parameters = {
  "robot_dof": 7,  # robot's degrees of freedom
  "point_match_loss_weight": 1.0,  # point match loss (BC loss) weight for actor
  "actor_loss_weight": 1.0,        # RL actor loss weight
  "collision_loss_weight": 1.0,    # collision loss weight (only used for validation right now)
  "collision_loss_margin": 0.03,   # margin for collision loss [m]
  "min_lr": 1.0e-5,
  "max_lr": 5.0e-5,
  "warmup_steps": 5000,   # number of steps to warm up the learning rate linearly from min_lr to max_lr
  "weight_decay": 1e-4,   # weight decay for AdamW optimizers
  "gamma": 0.99,          # discount factor for Q-learning
  "tau": 0.005,           # target network's soft update rate
  "grad_clip_norm": 1.0,  # gradient clipping norm
  "pc_bounds": [[-1.5, -1.5, -0.1], [1.5, 1.5, 1.5]],
  "rollout_length": 69   # number of steps to rollout for the actor
}
data_module_parameters = {
    "data_dir": "/workspace/datasets/ae_aristotle1_5mm_cubbies",
    "train_trajectory_key": "global_solutions",
    "val_trajectory_key": "global_solutions",
    "num_obstacle_points": 4096,
    "random_scale": 0.015,
    "include_reward": True,
    "num_target_points": 128,
}
shared_parameters = {
    "urdf_path": "assets/panda/panda.urdf",
    "num_robot_points": 2048,
    "goal_reward": 100.0,      # reward for reaching the goal
    "collision_reward": -10.0, # reward for colliding with an obstacle
    "step_reward": -1.0        # reward for each step that doesn't terminate the episode
}
cfg = {
    "train_batch_size": 12,
    "val_batch_size": 12,
    "num_workers": 4,
    "expert_fraction": 0.25
}


  from .autonotebook import tqdm as notebook_tqdm


In [6]:
dm = DataModule(
    train_batch_size=cfg["train_batch_size"],
    val_batch_size=cfg["val_batch_size"],
    num_workers=cfg["num_workers"],
    **data_module_parameters,
    **shared_parameters,
)
dm.setup("fit")

[32mLoaded StateRewardDataset for training[0m


In [7]:
from avoid_everything.col.replay import ReplayBuffer

replay_buffer = ReplayBuffer(
    capacity=1000,
    urdf_path=shared_parameters["urdf_path"],
    num_robot_points=shared_parameters["num_robot_points"],
    num_target_points=data_module_parameters["num_target_points"],
    dataset=dm.data_train,
)

In [8]:
from lightning.fabric import Fabric
from avoid_everything.col.col import CoLMotionPolicyTrainer

torch.set_float32_matmul_precision("high")

trainer = CoLMotionPolicyTrainer(
    replay_buffer=replay_buffer,
    **shared_parameters,
    **training_model_parameters,
)
fabric = Fabric(accelerator="gpu", devices=1)
fabric.launch()

expert_loader = fabric.setup_dataloaders(dm.train_dataloader(), move_to_device=False)

opt_cfg = trainer.configure_optimizers()
actor_optim  = opt_cfg["actor_optim"]
critic_optim = opt_cfg["critic_optim"]
actor_sch    = opt_cfg["actor_scheduler"]
critic_sch   = opt_cfg["critic_scheduler"]

# fabric setup: wrap trainable modules w/ their optimizers
trainer.actor,  actor_optim  = fabric.setup(trainer.actor,  actor_optim)
trainer.critic, critic_optim = fabric.setup(trainer.critic, critic_optim)

# target networks have no optimizers
trainer.target_actor  = fabric.setup(trainer.target_actor)
trainer.target_critic = fabric.setup(trainer.target_critic)

# now that actor/critic are on the right device, initialize trainer
trainer.setup()



In [9]:
from avoid_everything.col.mixed_batch_provider import MixedBatchProvider
mixed_provider = MixedBatchProvider(
    expert_loader=expert_loader, agent_replay=replay_buffer)

In [10]:
mixed, data_loader_iterations = mixed_provider.sample(
    cfg["train_batch_size"],
    expert_fraction=cfg["expert_fraction"],
    pretraining=True,
)
batch = trainer.move_batch_to_device(mixed, fabric.device)
batch["configuration"].shape
print("expert data loader iterations:", data_loader_iterations)

expert data loader iterations: 1


In [11]:
metrics = trainer.agent_rollout(batch)
print(f"Mean episode reward: {metrics['avg_episode_reward']}")
print(f"Transitions collected: {metrics['transitions_collected']}")

Mean episode reward: -13.666666984558105
Transitions collected: 56


In [12]:
len(replay_buffer)

56

In [17]:
# sample 25/75 | expert/agent
mixed, data_loader_iterations = mixed_provider.sample(
    cfg["train_batch_size"],
    expert_fraction=cfg["expert_fraction"],
    pretraining=False,
)
batch = trainer.move_batch_to_device(mixed, fabric.device)
batch["configuration"].shape
print("expert data loader iterations:", data_loader_iterations)

expert data loader iterations: 1


In [None]:
import numpy as np
import torch

def convert_to_numpy_f32(arr: np.ndarray | torch.Tensor) -> np.ndarray:
    """
    Convert a NumPy array or Torch tensor to a NumPy float32 array.
    
    Parameters
    ----------
    arr : np.ndarray or torch.Tensor
        Input array to convert.
    
    Returns
    -------
    np.ndarray
        Converted array with dtype float32.
    """
    if isinstance(arr, torch.Tensor):
        np_arr: np.ndarray = arr.cpu().numpy()
    elif isinstance(arr, np.ndarray):
        np_arr: np.ndarray = arr
    else:
        raise TypeError("convert_to_numpy_f32: Input must be a NumPy array or Torch tensor")
    return np_arr.astype(np.float32) 

In [None]:
cuboid_dims = convert_to_numpy_f32(sample["cuboid_dims"])
cuboid_centers = convert_to_numpy_f32(sample["cuboid_centers"])
cuboid_quaternions = convert_to_numpy_f32(sample["cuboid_quats"])
for dims, center, quat in zip(cuboid_dims, cuboid_centers, cuboid_quaternions):
    print(f"Lengths: Dimensions: {len(dims)}, Center: {len(center)}, Quaternion: {len(quat)}")

cylinder_radii = convert_to_numpy_f32(sample["cylinder_radii"])
cylinder_heights = convert_to_numpy_f32(sample["cylinder_heights"])
cylinder_centers = convert_to_numpy_f32(sample["cylinder_centers"])
cylinder_quaternions = convert_to_numpy_f32(sample["cylinder_quats"])
for radius, height, center, quat in zip(cylinder_radii, cylinder_heights, cylinder_centers, cylinder_quaternions):
    print(f"Lengths: Radius: {len(radius)}, Height: {len(height)}, Center: {len(center)}, Quaternion: {len(quat)}")

In [None]:
import viz_client

urdf_path = "/workspace/assets/panda/panda_spheres.urdf"
if not os.path.exists(urdf_path):
    print(f"❌ URDF not found at {urdf_path}")
    print("Please update the urdf_path variable in test_connect()")
    raise FileNotFoundError(f"URDF not found at {urdf_path}")

viz_client.connect(urdf_path)
viz_client.publish_joints(joints=robot.neutral_config_dict)

In [None]:
viz_client.publish_obstacles(cuboid_centers=cuboid_centers,
                            cuboid_dims=cuboid_dims,
                            cuboid_quaternions=cuboid_quaternions,
                            cylinder_centers=cylinder_centers,
                            cylinder_radii=cylinder_radii,
                            cylinder_heights=cylinder_heights,
                            cylinder_quaternions=cylinder_quaternions,
                            color=[0.8, 0.5, 0.6])

In [None]:
viz_client.clear_obstacles()

In [None]:
viz_client.publish_joints(joints=robot.neutral_config_dict)

In [None]:
viz_client.shutdown()

## Sampler testing

In [None]:
from robofin.old.kinematics.numba import franka_arm_link_fk, franka_arm_visual_fk, franka_eef_visual_fk
from robofin.old.kinematics.torch import franka_arm_link_fk as torch_franka_arm_link_fk
from robofin.old.kinematics.torch import franka_arm_visual_fk as torch_franka_arm_visual_fk

fk = robot.fk(robot.neutral_config)

prismatic_joint = robot.auxiliary_joint_defaults["panda_finger_joint1"]
base_pose = np.eye(4)
franka_fk = franka_arm_link_fk(robot.neutral_config, prismatic_joint, base_pose)
franka_fk_dict = {
    "panda_link0": franka_fk[0],
    "panda_link1": franka_fk[1],
    "panda_link2": franka_fk[2],
    "panda_link3": franka_fk[3],
    "panda_link4": franka_fk[4],
    "panda_link5": franka_fk[5],
    "panda_link6": franka_fk[6],
    "panda_link7": franka_fk[7],
    "panda_link8": franka_fk[8],
    "panda_hand": franka_fk[9],
    "panda_grasptarget": franka_fk[10],
    "right_gripper": franka_fk[11],
    "panda_leftfinger": franka_fk[12],
    "panda_rightfinger": franka_fk[13],
}

torch_cfg = torch.Tensor(robot.neutral_config).unsqueeze(0)
torch_fk = robot.fk_torch(torch_cfg)

torch_base_pose = torch.Tensor(base_pose)
torch_franka_fk = torch_franka_arm_link_fk(torch_cfg, prismatic_joint, torch_base_pose)
torch_franka_fk_dict = {
    "panda_link0": torch_franka_fk[:,0],
    "panda_link1": torch_franka_fk[:,1],
    "panda_link2": torch_franka_fk[:,2],
    "panda_link3": torch_franka_fk[:,3],
    "panda_link4": torch_franka_fk[:,4],
    "panda_link5": torch_franka_fk[:,5],
    "panda_link6": torch_franka_fk[:,6],
    "panda_link7": torch_franka_fk[:,7],
    "panda_link8": torch_franka_fk[:,8],
    "panda_hand": torch_franka_fk[:,9],
    "panda_grasptarget": torch_franka_fk[:,10],
    "right_gripper": torch_franka_fk[:,11],
    "panda_leftfinger": torch_franka_fk[:,12],
    "panda_rightfinger": torch_franka_fk[:,13],
}

print(torch_fk["panda_hand"].shape)
print(torch_franka_fk_dict["panda_hand"].shape)


In [None]:
from robofin.samplers import NumpyRobotSampler, TorchRobotSampler
from robofin.old.samplers import NumpyFrankaSampler, TorchFrankaSampler

np_sampler = NumpyRobotSampler(robot)
torch_sampler = TorchRobotSampler(robot)

np_franka_sampler = NumpyFrankaSampler()
torch_franka_sampler = TorchFrankaSampler()

In [None]:
franka_ee_pts = np_franka_sampler.sample_end_effector(fk[robot.tcp_link_name].squeeze(), prismatic_joint)
ee_pts = np_sampler.sample_end_effector(fk[robot.tcp_link_name].squeeze())

print("---" * 10)
print(f"Franka EE points shape: {franka_ee_pts.shape}")
print(f"EE points shape: {ee_pts.shape}")

torch_franka_ee_pts = torch_franka_sampler.sample_end_effector(torch_franka_fk_dict[robot.tcp_link_name], prismatic_joint)
torch_ee_pts = torch_sampler.sample_end_effector(torch_fk[robot.tcp_link_name])

print("---" * 10)
print(f"Torch Franka EE points shape: {torch_franka_ee_pts.shape}")
print(f"Torch EE points shape: {torch_ee_pts.shape}")

In [None]:
def has_point(
    point_cloud: np.ndarray, point: np.ndarray, tolerance: float = 1e-6
) -> tuple[bool, np.ndarray]:
    """
    Returns:
    exists, idx (bool, np.ndarray): Whether `point` exists in the point cloud
        and the indices in the point cloud array where the point exists.
    """
    mask = np.max(np.abs(point_cloud - point), axis=1) <= tolerance
    exists = mask.any()
    idx = np.flatnonzero(mask)
    return exists, idx


def compare_point_clouds(
    pc1: np.ndarray, pc2: np.ndarray, abs_tol: float = 1e-7
) -> bool:
    """
    Return True if input point clouds are identical (within given tolerance)
    """
    # return np.allclose(pc1, pc2, atol=abs_tol)
    if np.allclose(pc1, pc2, atol=abs_tol):
        return True

    print("compare_point_clouds() returning False")
    print("Largest error:", np.abs(pc1 - pc2).max())

    return False

In [None]:
from termcolor import cprint
from robofin.samplers import get_points_on_robot_eef
from robofin.old.kinematics.numba import get_points_on_franka_eef, eef_pose_to_link8, franka_eef_link_fk, franka_arm_visual_fk

np.set_printoptions(precision=3, suppress=True)

# frame = robot.tcp_link_name # right_gripper
frame = "panda_hand"
eef_pose = fk[frame].squeeze()
# eef_pose = np.eye(4)
eef_fk = robot.eef_fk(eef_pose, frame)

lnk8_pose = eef_pose_to_link8(eef_pose, frame)
franka_eef_fk_arr = franka_eef_link_fk(prismatic_joint, lnk8_pose)
franka_eef_fk = {
    "panda_link8": franka_eef_fk_arr[0],
    "panda_hand": franka_eef_fk_arr[1],
    "panda_grasptarget": franka_eef_fk_arr[2],
    "right_gripper": franka_eef_fk_arr[3],
    "panda_leftfinger": franka_eef_fk_arr[4],
    "panda_rightfinger": franka_eef_fk_arr[5],
}
# visual_eef_fk = robot.eef_visual_fk(pose, frame, auxiliary_joint_values)

if np.allclose(eef_fk["panda_link8"], lnk8_pose):
    cprint("eef_fk['panda_link8'] == lnk8_pose", "green")

print("-" * 30)
faulty_links = []
for link_name in franka_eef_fk:
    if np.allclose(eef_fk[link_name], franka_eef_fk[link_name]):
        cprint(f"eef_fk['{link_name}'] EQUALS franka_eef_fk['{link_name}']", "cyan")
    else:
        cprint(f"eef_fk['{link_name}'] DOES NOT EQUAL franka_eef_fk['{link_name}']", "red")
        faulty_links.append(link_name)
    print("-" * 30)


eef_pts = get_points_on_robot_eef(robot, eef_pose, num_points=0, link_points=np_sampler.points, frame=frame)
franka_eef_pts = get_points_on_franka_eef(eef_pose, prismatic_joint=0.04, sample=0, 
                                          panda_hand_points=np_franka_sampler.points["eef_panda_hand"],
                                          panda_leftfinger_points=np_franka_sampler.points["eef_panda_leftfinger"],
                                          panda_rightfinger_points=np_franka_sampler.points["eef_panda_rightfinger"],
                                          frame=frame)


In [None]:
import viz_client

urdf_path = "/workspace/assets/panda/panda_spheres.urdf"
if not os.path.exists(urdf_path):
    print(f"❌ URDF not found at {urdf_path}")
    print("Please update the urdf_path variable in test_connect()")
    raise FileNotFoundError(f"URDF not found at {urdf_path}")

viz_client.connect(urdf_path)
viz_client.publish_joints(joints=robot.neutral_config_dict)

In [None]:
viz_client.publish_ghost_end_effector(pose=fk["panda_hand"], frame="panda_hand", color=[0.8, 0.2, 0.8])

In [None]:
viz_client.clear_ghost_end_effector()

In [None]:
config = robot.neutral_config.copy()
config[0] = robot.neutral_config[0] + 1.0 
viz_client.publish_ghost_robot(config, color=[0.5,0.6,0.8])

In [None]:
viz_client.clear_ghost_robot()

In [None]:
from robofin.old.robot_constants import FrankaConstants, RealFrankaConstants
print(FrankaConstants.JOINT_LIMITS[5])
print(RealFrankaConstants.JOINT_LIMITS[5])
print(robot.main_joint_limits[5])
