In [1]:
import random
import h5py
import numpy as np
from scipy.spatial import ConvexHull
from scipy.spatial.transform import Rotation as R
import cv2

###############################################################################
# Real-Robot Forward Kinematics Functions
###############################################################################
def dh_transform(a, alpha, d, theta):
    return np.array([
        [np.cos(theta), -np.sin(theta)*np.cos(alpha),  np.sin(theta)*np.sin(alpha),  a*np.cos(theta)],
        [np.sin(theta),  np.cos(theta)*np.cos(alpha), -np.cos(theta)*np.sin(alpha),  a*np.sin(theta)],
        [0,              np.sin(alpha),                np.cos(alpha),                d],
        [0,              0,                            0,                            1]
    ], dtype=float)

def franka_forward_kinematics(joint_angles):
    q1, q2, q3, q4, q5, q6, q7 = joint_angles
    T1 = dh_transform(a=0,       alpha=0,         d=0.333,   theta=q1)
    T2 = dh_transform(a=0,       alpha=-np.pi/2,  d=0,       theta=q2)
    T3 = dh_transform(a=0,       alpha=np.pi/2,   d=0.316,   theta=q3)
    T4 = dh_transform(a=0.0825,  alpha=np.pi/2,   d=0,       theta=q4)
    T5 = dh_transform(a=-0.0825, alpha=-np.pi/2,  d=0.384,   theta=q5)
    T6 = dh_transform(a=0,       alpha=np.pi/2,   d=0,       theta=q6)
    T7 = dh_transform(a=0.088,   alpha=np.pi/2,   d=0,       theta=q7)
    T_flange = dh_transform(a=0, alpha=0, d=0.107, theta=0)
    Rz_neg_45 = dh_transform(a=0, alpha=0, d=0, theta=-np.pi/4)
    T_panda_hand = T1 @ T2 @ T3 @ T4 @ T5 @ T6 @ T7 @ T_flange @ Rz_neg_45
    return T_panda_hand

def joint_angles_to_end_effector_pose(joint_angles):
    """
    Convert 7 joint angles into a 7D end-effector pose:
    [x, y, z, qx, qy, qz, qw].
    """
    T = franka_forward_kinematics(joint_angles)
    x, y, z = T[0, 3], T[1, 3], T[2, 3]
    rotation_matrix = T[:3, :3]
    quat = R.from_matrix(rotation_matrix).as_quat()  # [qx, qy, qz, qw]
    return np.array([x, y, z, quat[0], quat[1], quat[2], quat[3]], dtype=float)

###############################################################################
# Simulation Process Functions (Corrected Sim Code)
###############################################################################
# 1) Utility Functions (Quaternion normalization, Spherical/Cartesian Conversions)
def normalize_quaternion(q):
    """Normalize a quaternion, returning [0,0,0,1] if near zero."""
    norm = np.linalg.norm(q)
    if norm < 1e-6:
        return np.array([0, 0, 0, 1])
    return q / norm

def cart2sph(x, y, z):
    """Convert Cartesian to spherical coords (r, az, el)."""
    r = np.sqrt(x**2 + y**2 + z**2)
    az = np.arctan2(y, x)
    el = np.arctan2(z, np.sqrt(x**2 + y**2))
    return r, az, el

def sph2cart(r, az, el):
    """Convert spherical to Cartesian coords."""
    x = r * np.cos(el) * np.cos(az)
    y = r * np.cos(el) * np.sin(az)
    z = r * np.sin(el)
    return x, y, z

# 2) Image Generation and FoV Helpers
def generate_image_frame(img_width, img_height, point_coord, dot_dia=10):
    image = np.zeros((img_height, img_width, 3), dtype=np.uint8)
    cv2.circle(image, point_coord, dot_dia // 2, (255, 255, 255), -1)
    return image

def generate_camera_image(camera_pose, camera_fov, point_obj,
                          img_width=640, img_height=480, dot_dia=10):
    cam_pos = np.array(camera_pose[:3])
    cam_quat = camera_pose[3:]
    cam_rot = R.from_quat(cam_quat).as_matrix()
    # For an orthonormal matrix, use the transpose instead of the inverse.
    point_in_cam = cam_rot.T @ (np.array(point_obj) - cam_pos)
    h_fov, v_fov = np.radians(camera_fov)
    x_cam, y_cam, z_cam = point_in_cam
    h_angle = np.arctan2(y_cam, x_cam)
    v_angle = np.arctan2(z_cam, x_cam)
    img_x = int((h_angle + h_fov / 2) / h_fov * img_width)
    img_y = int((v_angle + v_fov / 2) / v_fov * img_height)
    return generate_image_frame(img_width, img_height, (img_x, img_y), dot_dia)

def is_inFOV(camera_pose, camera_fov, point_obj):
    cam_pos = np.array(camera_pose[:3])
    if np.linalg.norm(np.array(point_obj) - cam_pos) < 1e-4:
        return True
    cam_quat = camera_pose[3:]
    cam_rot = R.from_quat(cam_quat).as_matrix()
    point_in_cam = cam_rot.T @ (np.array(point_obj) - cam_pos)
    h_fov, v_fov = np.radians(camera_fov)
    x_cam, y_cam, z_cam = point_in_cam
    if x_cam <= 0:
        return False
    h_angle = np.arctan2(y_cam, x_cam)
    v_angle = np.arctan2(z_cam, x_cam)
    return (abs(h_angle) <= h_fov / 2) and (abs(v_angle) <= v_fov / 2)

# 3) Minimal 6D Representation Conversion (not used for hull but kept for completeness)
def pose7d_to_6d(pose7d):
    """Convert a 7D pose (position + quaternion) to position + rotation vector."""
    pos = pose7d[:3]
    quat = normalize_quaternion(pose7d[3:7])
    rotvec = R.from_quat(quat).as_rotvec()
    return np.hstack([pos, rotvec])

# 4) Cube Creation (Candidate Viewpoints)
def make_cube(cx, cy, cz, rotation, side=0.005):
    half = side / 2.0
    corners = np.array([
        [-half, -half, -half],
        [-half, -half,  half],
        [-half,  half, -half],
        [-half,  half,  half],
        [ half, -half, -half],
        [ half, -half,  half],
        [ half,  half, -half],
        [ half,  half,  half]
    ])
    rotated_corners = rotation.apply(corners)
    rotated_corners += np.array([cx, cy, cz])
    return rotated_corners

# 5) Generate Candidates for Each Demo Using Cube Method and FoV Filtering
def generate_candidates_for_demo(positions, goal_pose, num_samples=5):
    n_points = len(positions)
    deg_to_rad = np.pi / 180.0
    max_deg = 5.0
    max_side = 0.1
    max_roll = 30 * deg_to_rad
    max_pitch = 30 * deg_to_rad
    max_yaw = 30 * deg_to_rad
    lambda_decay = 1.1
    camera_fov = (80, 80)
    img_width, img_height, dot_dia = 640, 480, 10

    goal_pos = goal_pose[:3]
    goal_quat = np.array([0, 0, 0, 1])  # neutral orientation for sampling

    dataset = []
    total_candidates = 0
    in_fov_candidates = 0

    for i in range(n_points):
        x, y, z = positions[i]
        dx = x - goal_pos[0]
        dy = y - goal_pos[1]
        dz = z - goal_pos[2]
        r_val, az, el = cart2sph(dx, dy, dz)
        for sample in range(num_samples):
            frac = np.exp(-lambda_decay * (i / (n_points - 1))) if n_points > 1 else 1.0
            delta_angle = frac * (max_deg * deg_to_rad)
            az_var = az + random.choice([-1, 1]) * delta_angle
            el_var = el + random.choice([-1, 1]) * delta_angle
            dx_var, dy_var, dz_var = sph2cart(r_val, az_var, el_var)
            # Offset from the goal position
            x_new = x + dx_var
            y_new = y + dy_var
            z_new = z + dz_var

            roll_off = frac * max_roll
            pitch_off = frac * max_pitch
            yaw_off = frac * max_yaw
            droll = random.choice([-1, 1]) * roll_off
            dpitch = random.choice([-1, 1]) * pitch_off
            dyaw = random.choice([-1, 1]) * yaw_off

            # Use 'xyz' Euler order as in the simulation process
            rot_obj = R.from_euler('zyx', [droll, dpitch, dyaw])
            quat = rot_obj.as_quat()
            quat = normalize_quaternion(quat)

            candidate_center = [x_new, y_new, z_new, quat[0], quat[1], quat[2], quat[3]]
            total_candidates += 1
            in_fov = is_inFOV(candidate_center, camera_fov, goal_pos)
            if in_fov:
                in_fov_candidates += 1
            img = generate_camera_image(candidate_center, camera_fov, goal_pos,
                                        img_width, img_height, dot_dia)
            dataset.append({
                "trajectory_idx": i,
                "pose_7d": candidate_center,
                "in_fov": in_fov,
                "image": img
            })

            # Also add the corners of a small cube around the candidate center
            candidates_cube = make_cube(x_new, y_new, z_new, rot_obj, side=frac * max_side)
            for corner in candidates_cube:
                total_candidates += 1
                candidate_pose = [corner[0], corner[1], corner[2],
                                  quat[0], quat[1], quat[2], quat[3]]
                in_fov2 = is_inFOV(candidate_pose, camera_fov, goal_pos)
                if in_fov2:
                    in_fov_candidates += 1
                img2 = generate_camera_image(candidate_pose, camera_fov, goal_pos,
                                             img_width, img_height, dot_dia)
                dataset.append({
                    "trajectory_idx": i,
                    "pose_7d": candidate_pose,
                    "in_fov": in_fov2,
                    "image": img2
                })

    print(f"Generated {total_candidates} candidate viewpoints; {in_fov_candidates} passed FoV check.")
    filtered = [vp for vp in dataset if vp["in_fov"]]
    return filtered

# 6) Compute Global Safe Set and Convex Hull in 3D for Positions + Orientation Info
def compute_convex_hull_all(all_candidates, all_main_states):
    """
    Build a 3D convex hull for the positions and determine orientation bounds.
    """
    candidate_positions = []
    candidate_forwards = []
    candidate_rotvecs = []
    for vp in all_candidates:
        p7 = vp["pose_7d"]
        candidate_positions.append(p7[:3])
        quat = normalize_quaternion(p7[3:7])
        candidate_forwards.append(R.from_quat(quat).apply([1, 0, 0]))
        candidate_rotvecs.append(R.from_quat(quat).as_rotvec())

    candidate_positions = np.array(candidate_positions)
    candidate_forwards = np.array(candidate_forwards)
    candidate_rotvecs = np.array(candidate_rotvecs)

    main_positions_list = []
    main_forwards_list = []
    main_rotvecs_list = []
    for states in all_main_states:
        states[:, 3:7] = np.array([normalize_quaternion(q) for q in states[:, 3:7]])
        main_positions_list.append(states[:, :3])
        fwd = []
        rotv = []
        for s in states:
            quat = normalize_quaternion(s[3:7])
            fwd.append(R.from_quat(quat).apply([1, 0, 0]))
            rotv.append(R.from_quat(quat).as_rotvec())
        main_forwards_list.append(np.array(fwd))
        main_rotvecs_list.append(np.array(rotv))

    main_positions = np.vstack(main_positions_list)
    main_forwards = np.vstack(main_forwards_list)
    main_rotvecs = np.vstack(main_rotvecs_list)

    all_positions = np.vstack((main_positions, candidate_positions))
    all_forwards = np.vstack((main_forwards, candidate_forwards))
    all_rotvecs = np.vstack((main_rotvecs, candidate_rotvecs))

    # Build a 3D convex hull using positions only.
    hull = ConvexHull(all_positions, qhull_options="QJ")

    # Determine the orientation bounding box (in rotation-vector space)
    ori_min = np.min(all_rotvecs, axis=0)
    ori_max = np.max(all_rotvecs, axis=0)

    return all_positions, hull, ori_min, ori_max, all_forwards

# 7) Save Safe Set + Convex Hull and Sample Random 6D Poses for Testing
def save_and_test_safe_set(all_positions_3d, hull, ori_min, ori_max, all_forwards, num_samples=20):
    np.savez("safe_set_6d_multiple_object_61_20.npz",
             safe_set_positions=all_positions_3d,
             hull_equations=hull.equations,
             hull_vertices=hull.vertices,
             orientation_min=ori_min,
             orientation_max=ori_max,
             safe_set_orientations=all_forwards)
    print("Safe set (3D positions + orientation bounds) saved to 'safe_set_6d_simulation_seed_10_6.npz'.")

    pos_mins = np.min(all_positions_3d, axis=0)
    pos_maxs = np.max(all_positions_3d, axis=0)
    samples = []
    for _ in range(num_samples):
        # Sample a position within the bounding box.
        sample_pos = np.random.uniform(pos_mins, pos_maxs)
        # Sample an orientation in rotation-vector space.
        sample_rotvec = np.random.uniform(ori_min, ori_max)
        quat = R.from_rotvec(sample_rotvec).as_quat()
        sample_pose7d = np.concatenate([sample_pos, quat])
        samples.append(sample_pose7d)
    samples = np.array(samples)

    tol = 1e-1
    for i, s in enumerate(samples):
        inside_pos = np.all(np.dot(hull.equations[:, :-1], s[:3]) + hull.equations[:, -1] <= tol)
        print(f"Sample {i+1} position is {'inside' if inside_pos else 'outside'} the safe set (position hull).")
    return samples

###############################################################################
# 8) Main Execution (Real-Robot Data Converted to Simulation Process)
###############################################################################
def main_execution():
    # Use the real-robot data file containing joint_states.
    file_path = "/home/franka_deoxys/data_franka/imgsd_demo/demo.hdf5"
    all_candidates = []
    all_main_states = []  # List to store 7D EEF poses for each demo

    with h5py.File(file_path, "r") as f:
        demo_keys = list(f["data"].keys())
        print("Available Demos:", demo_keys)
        for demo_name in demo_keys[:5]:
            demo_data = f["data"][demo_name]
            obs_group = demo_data["obs"]

            # For real-robot demos, we assume joint_states exist.
            if "joint_states" not in obs_group:
                print(f"Demo {demo_name} has no joint_states. Skipping.")
                continue

            joint_states = obs_group["joint_states"][:]
            eef_poses_7d = []
            for js in joint_states:
                eef_pose = joint_angles_to_end_effector_pose(js)
                eef_poses_7d.append(eef_pose)
            eef_poses_7d = np.array(eef_poses_7d)
            all_main_states.append(eef_poses_7d)

            # Determine goal pose using gripper data if available.
            if "gripper_states" in obs_group:
                gripper_qpos = obs_group["gripper_states"][:]
                # print(gripper_qpos)
                closed_indices = np.where(np.all(gripper_qpos < 0.02, axis=1))[0]
                goal_idx = closed_indices[0] if closed_indices.size > 0 else -1
            else:
                goal_idx = -1  # Use the last pose if no gripper info.
            print(f"Goal Index {goal_idx}")
            goal_pose = eef_poses_7d[goal_idx]

            positions = eef_poses_7d[:, :3]
            candidates = generate_candidates_for_demo(positions, goal_pose, num_samples=5)
            if not candidates:
                print(f"No candidate points for {demo_name}.")
                continue
            print(f"Demo {demo_name}: {len(candidates)} candidate points passed FoV check.")
            all_candidates.extend(candidates)

    if not all_candidates:
        print("No candidates from any demo. Exiting.")
        return

    print("Safe set generation started for simulation environment...")
    all_positions, hull, ori_min, ori_max, all_forwards = compute_convex_hull_all(all_candidates, all_main_states)
    print("Safe set generation ended.")

    save_and_test_safe_set(all_positions, hull, ori_min, ori_max, all_forwards, num_samples=20)

if __name__ == "__main__":
    main_execution()


Available Demos: ['demo_0', 'demo_1', 'demo_2', 'demo_3', 'demo_4', 'demo_5', 'demo_6', 'demo_7', 'demo_8']
Goal Index 139
Generated 6795 candidate viewpoints; 21 passed FoV check.
Demo demo_0: 21 candidate points passed FoV check.
Goal Index 197
Generated 9450 candidate viewpoints; 158 passed FoV check.
Demo demo_1: 158 candidate points passed FoV check.
Goal Index 198
Generated 9360 candidate viewpoints; 102 passed FoV check.
Demo demo_2: 102 candidate points passed FoV check.
Goal Index 176
Generated 8595 candidate viewpoints; 537 passed FoV check.
Demo demo_3: 537 candidate points passed FoV check.
Goal Index 258
Generated 11970 candidate viewpoints; 22 passed FoV check.
Demo demo_4: 22 candidate points passed FoV check.
Safe set generation started for simulation environment...
Safe set generation ended.
Safe set (3D positions + orientation bounds) saved to 'safe_set_6d_simulation_seed_10_6.npz'.
Sample 1 position is inside the safe set (position hull).
Sample 2 position is inside 

In [2]:
import numpy as np
import plotly.graph_objects as go
from scipy.spatial import ConvexHull

def plot_interactive_safe_set_with_cones(npz_filepath="safe_set_6d_simulation_seed_10_6.npz"):
    """
    Load safe-set data (positions + forward vectors) from an NPZ file,
    plot the convex hull of positions, and draw orientation cones.
    
    Args:
        npz_filepath (str): Path to the NPZ file containing:
            - "safe_set_positions": (N,3) array of 3D positions
            - "safe_set_orientations": (N,3) array of forward vectors
    """
    # Load data from the NPZ file
    data = np.load(npz_filepath)
    safe_set_positions = data["safe_set_positions"]  # shape (N, 3)
    safe_set_orientations = data["safe_set_orientations"]  # shape (N, 3)

    # Build the convex hull for the positions
    hull = ConvexHull(safe_set_positions, qhull_options="QJ")

    # 1) Scatter trace of the raw positions
    scatter_trace = go.Scatter3d(
        x=safe_set_positions[:, 0],
        y=safe_set_positions[:, 1],
        z=safe_set_positions[:, 2],
        mode="markers",
        marker=dict(size=3, color="blue"),
        name="Safe Set Positions"
    )

    # 2) Mesh trace for the convex hull
    mesh_trace = go.Mesh3d(
        x=safe_set_positions[:, 0],
        y=safe_set_positions[:, 1],
        z=safe_set_positions[:, 2],
        i=hull.simplices[:, 0],
        j=hull.simplices[:, 1],
        k=hull.simplices[:, 2],
        opacity=0.3,
        color="red",
        name="Convex Hull"
    )

    # 3) Orientation cones for each position
    # Normalize orientation vectors so that we can scale them uniformly
    norms = np.linalg.norm(safe_set_orientations, axis=1, keepdims=True)
    norms[norms == 0] = 1.0  # avoid division by zero
    unit_orientations = safe_set_orientations / norms

    # Adjust the scale of cones as needed
    scale = 0.02  
    u = unit_orientations[:, 0] * scale
    v = unit_orientations[:, 1] * scale
    w = unit_orientations[:, 2] * scale

    cone_trace = go.Cone(
        x=safe_set_positions[:, 0],
        y=safe_set_positions[:, 1],
        z=safe_set_positions[:, 2],
        u=u,
        v=v,
        w=w,
        sizemode="absolute",
        sizeref=scale,
        anchor="tail",
        colorscale="Blues",
        showscale=False,
        name="Orientation Cones"
    )

    # Combine all traces into a single figure
    fig = go.Figure(data=[mesh_trace, scatter_trace, cone_trace])
    fig.update_layout(
        title="Interactive Safe Set with Orientation Cones",
        scene=dict(
            xaxis_title="X",
            yaxis_title="Y",
            zaxis_title="Z"
        )
    )
    fig.show()

if __name__ == "__main__":
    plot_interactive_safe_set_with_cones("safe_set_6d_multiple_object_61_20.npz")
