In [1]:
import random
import h5py
import numpy as np
import plotly.graph_objects as go
from scipy.spatial import ConvexHull
from scipy.spatial.transform import Rotation as R
import cv2

###############################################################################
# 1) Cartesian <-> Spherical Conversions
###############################################################################
def cart2sph(x, y, z):
    r = np.sqrt(x**2 + y**2 + z**2)
    az = np.arctan2(y, x)      # in radians
    el = np.arctan2(z, np.sqrt(x**2 + y**2))
    return r, az, el

def sph2cart(r, az, el):
    x = r * np.cos(el) * np.cos(az)
    y = r * np.cos(el) * np.sin(az)
    z = r * np.sin(el)
    return x, y, z

###############################################################################
# 2) Cube Creation (Generate Cube Corners as Candidate Viewpoints)
###############################################################################
def make_cube(cx, cy, cz, rotation, side=0.005):
    half = side / 2.0
    corners = np.array([
        [-half, -half, -half],
        [-half, -half,  half],
        [-half,  half, -half],
        [-half,  half,  half],
        [ half, -half, -half],
        [ half, -half,  half],
        [ half,  half, -half],
        [ half,  half,  half]
    ])
    rotated_corners = rotation.apply(corners)
    rotated_corners += np.array([cx, cy, cz])
    return rotated_corners

###############################################################################
# 3) FoV Checker and Image Generation Helpers
###############################################################################
def generate_image_frame(img_width, img_height, point_coord, dot_dia=10):
    image = np.zeros((img_height, img_width, 3), dtype=np.uint8)
    cv2.circle(image, point_coord, dot_dia//2, (255,255,255), -1)
    return image

def generate_camera_image(camera_pose, camera_fov, point_obj,
                          img_width=640, img_height=480, dot_dia=10):
    cam_pos = np.array(camera_pose[:3])
    cam_quat = camera_pose[3:]
    cam_rot = R.from_quat(cam_quat).as_matrix()
    point_in_cam = np.linalg.inv(cam_rot) @ (np.array(point_obj) - cam_pos)
    h_fov, v_fov = np.radians(camera_fov)
    x_cam, y_cam, z_cam = point_in_cam
    h_angle = np.arctan2(y_cam, x_cam)
    v_angle = np.arctan2(z_cam, x_cam)
    img_x = int((h_angle + h_fov/2) / h_fov * img_width)
    img_y = int((v_angle + v_fov/2) / v_fov * img_height)
    return generate_image_frame(img_width, img_height, (img_x, img_y), dot_dia)

def is_inFOV(camera_pose, camera_fov, point_obj):
    cam_pos = np.array(camera_pose[:3])
    if np.linalg.norm(np.array(point_obj) - cam_pos) < 1e-4:
        return True
    cam_quat = camera_pose[3:]
    cam_rot = R.from_quat(cam_quat).as_matrix()
    point_in_cam = np.linalg.inv(cam_rot) @ (np.array(point_obj) - cam_pos)
    h_fov, v_fov = np.radians(camera_fov)
    x_cam, y_cam, z_cam = point_in_cam
    if x_cam <= 0:
        return False
    h_angle = np.arctan2(y_cam, x_cam)
    v_angle = np.arctan2(z_cam, x_cam)
    return (abs(h_angle) <= h_fov/2) and (abs(v_angle) <= v_fov/2)

###############################################################################
# 4) Normalize Quaternion
###############################################################################
def normalize_quaternion(q):
    norm = np.linalg.norm(q)
    if norm < 1e-6:
        return np.array([0, 0, 0, 1])
    return q / norm

###############################################################################
# 5) Convert a 7D Pose to a Minimal 6D Representation
###############################################################################
def pose7d_to_6d(pose7d):
    """
    Convert a 7D pose [x, y, z, qx, qy, qz, qw] into a 6D pose consisting of:
      - Position: x, y, z
      - Orientation: a 3D rotation vector (obtained from the quaternion)
    """
    pos = pose7d[:3]
    quat = normalize_quaternion(pose7d[3:7])
    rotvec = R.from_quat(quat).as_rotvec()  # rotation vector representation (3D)
    return np.hstack([pos, rotvec])

###############################################################################
# 6) Generate Candidates for Each Demo Using Cube Method and FoV Filtering
###############################################################################
def generate_candidates_for_demo(positions, num_samples=5):
    """
    For a given main trajectory (positions: (N,3)), generate candidate 7D poses 
    (position + quaternion) using the cube method and random variations.
    Returns a list of candidate dictionaries.
    """
    n_points = len(positions)
    deg_to_rad = np.pi/180.0
    max_deg = 5.0            # in degrees
    max_side = 0.1
    max_roll = 50 * deg_to_rad
    max_pitch = 10 * deg_to_rad
    max_yaw = 10 * deg_to_rad
    lambda_decay = 1.5
    camera_fov = (80,80)
    img_width, img_height, dot_dia = 640, 480, 10

    goal_pos = positions[-1]
    goal_quat = np.array([0,0,0,1])
    
    dataset = []
    total_candidates = 0
    in_fov_candidates = 0

    for i in range(n_points):
        x, y, z = positions[i]
        dx = x - goal_pos[0]
        dy = y - goal_pos[1]
        dz = z - goal_pos[2]
        r_val, az, el = cart2sph(dx, dy, dz)
        for sample in range(num_samples):
            frac = np.exp(-lambda_decay*(i/(n_points-1))) if n_points > 1 else 1.0
            delta_angle = frac * (max_deg * deg_to_rad)
            az_var = az + random.choice([-1,1]) * delta_angle
            el_var = el + random.choice([-1,1]) * delta_angle
            dx_var, dy_var, dz_var = sph2cart(r_val, az_var, el_var)
            x_new = goal_pos[0] + dx_var
            y_new = goal_pos[1] + dy_var
            z_new = goal_pos[2] + dz_var

            roll_off = frac * max_roll
            pitch_off = frac * max_pitch
            yaw_off = frac * max_yaw
            droll = random.choice([-1,1]) * roll_off
            dpitch = random.choice([-1,1]) * pitch_off
            dyaw = random.choice([-1,1]) * yaw_off

            rot_obj = R.from_euler('zyx', [droll, dpitch, dyaw])
            quat = rot_obj.as_quat()
            quat = normalize_quaternion(quat)
            candidates = make_cube(x_new, y_new, z_new, rot_obj, side=frac*max_side)
            for corner in candidates:
                total_candidates += 1
                candidate_pose = [corner[0], corner[1], corner[2],
                                  quat[0], quat[1], quat[2], quat[3]]
                cam_pose = candidate_pose
                in_fov = is_inFOV(cam_pose, camera_fov, goal_pos)
                if in_fov:
                    in_fov_candidates += 1
                img = generate_camera_image(cam_pose, camera_fov, goal_pos, img_width, img_height, dot_dia)
                dataset.append({
                    "trajectory_idx": i,
                    "pose_7d": candidate_pose,
                    "in_fov": in_fov,
                    "image": img
                })
    print(f"Generated {total_candidates} candidate viewpoints; {in_fov_candidates} passed FoV check.")
    filtered = [vp for vp in dataset if vp["in_fov"]]
    return filtered

###############################################################################
# 7) Compute Global Safe Set and Convex Hull in 6D (Using Minimal Representation)
###############################################################################
def compute_convex_hull_all(all_candidates, all_main_states):
    """
    Merge candidate 7D poses from all demos with the main trajectory 7D poses (from joint_states),
    convert them to minimal 6D representation (position + rotation vector), and compute a convex hull.
    """
    # Convert candidate poses from 7D to 6D.
    candidate_poses_6d = np.array([pose7d_to_6d(vp["pose_7d"]) for vp in all_candidates])
    main_poses_list = []
    for states in all_main_states:
        # states is assumed to be of shape (T,7). Normalize quaternions.
        states[:, 3:7] = np.array([normalize_quaternion(q) for q in states[:, 3:7]])
        # Convert each 7D state to 6D.
        poses_6d = np.array([pose7d_to_6d(p) for p in states])
        # Optionally subsample
        main_poses_list.append(poses_6d[::2])
    main_poses_6d = np.vstack(main_poses_list)
    all_poses_6d = np.vstack((main_poses_6d, candidate_poses_6d))
    # Add a tiny jitter to avoid degeneracy.
    all_poses_6d += np.random.uniform(-1e-8, 1e-8, all_poses_6d.shape)
    hull = ConvexHull(all_poses_6d, qhull_options="QJ")
    return all_poses_6d, hull

###############################################################################
# 8) Save Safe Set and Convex Hull to File and Sample Random 6D Poses for Testing
###############################################################################
def save_and_test_safe_set(all_poses_6d, hull, num_samples=20):
    # Save the 6D safe set and hull equations.
    np.savez("safe_set_6d.npz", safe_set=all_poses_6d, hull_equations=hull.equations, hull_vertices=hull.vertices)
    print("Safe set and convex hull saved to 'safe_set_6d.npz'.")
    mins = np.min(all_poses_6d, axis=0)
    maxs = np.max(all_poses_6d, axis=0)
    samples = []
    for _ in range(num_samples):
        sample = np.random.uniform(mins, maxs)
        samples.append(sample)
    samples = np.array(samples)
    tol = 1e-8
    sample_results = []
    for s in samples:
        inside = np.all(np.dot(hull.equations[:, :-1], s) + hull.equations[:, -1] <= tol)
        sample_results.append(inside)
    sample_results = np.array(sample_results)
    for i, inside in enumerate(sample_results):
        print(f"Sample {i+1} is {'inside' if inside else 'outside'} the safe set.")
    return samples, sample_results

###############################################################################
# 9) Define Function to Check a Given 7D Pose Against the Safe Set (Converted to 6D)
###############################################################################
def pose_safe_distance(hull, pose7d, tol=1e-8):
    """
    Convert a 7D pose to 6D minimal representation and compute the signed distance
    from the pose to the safe set boundary.
    """
    s = pose7d_to_6d(pose7d)
    A = hull.equations[:, :-1]
    b = hull.equations[:, -1]
    norms = np.linalg.norm(A, axis=1)
    distances = (A.dot(s) + b) / norms
    safe_distance = np.max(distances)
    inside = safe_distance <= tol
    return inside, safe_distance

###############################################################################
# 10) Main Execution: Process All Demos and Generate Global Safe Set in 6D
###############################################################################
def main_execution():
    file_path = "duck_optimal_30.hdf5"
    all_candidates = []
    all_main_states = []  # each element: full 7D joint_states for a demo
    
    with h5py.File(file_path, "r") as f:
        demo_keys = list(f["data"].keys())
        print("Available Demos:", demo_keys)
        for demo_name in demo_keys:
            demo_data = f["data"][demo_name]
            obs_group = demo_data["obs"]
            if "joint_states" not in obs_group:
                print(f"Demo {demo_name} has no joint_states. Skipping.")
                continue
            states = obs_group["joint_states"][:]  # shape (T,7)
            all_main_states.append(states)
            print(f"Processing Demo {demo_name} with {states.shape[0]} 7D trajectory points.")
            positions = states[:, :3]
            candidates = generate_candidates_for_demo(positions, num_samples=5)
            if not candidates:
                print(f"No candidate points for Demo {demo_name}.")
                continue
            print(f"Demo {demo_name}: {len(candidates)} candidate points passed FoV check.")
            all_candidates.extend(candidates)
    
    if not all_candidates:
        print("No candidates from any demo.")
        return
    
    print("Safe set generation started...")
    all_poses_6d, hull = compute_convex_hull_all(all_candidates, all_main_states)
    print("Safe set generation ended...")
    save_and_test_safe_set(all_poses_6d, hull, num_samples=20)
    # Example: Test a specific pose.
    # test_pose = all_main_states[0][0]  # a 7D pose from the first demo
    # inside, safe_dist = pose_safe_distance(hull, test_pose)
    # print("Test pose:", test_pose)
    # print("Test pose is {} the safe set with safe distance {:.6f}".format("inside" if inside else "outside", safe_dist))
    
if __name__ == "__main__":
    main_execution()


Available Demos: ['demo_0', 'demo_1', 'demo_10', 'demo_11', 'demo_12', 'demo_13', 'demo_14', 'demo_15', 'demo_16', 'demo_17', 'demo_18', 'demo_19', 'demo_2', 'demo_20', 'demo_21', 'demo_22', 'demo_23', 'demo_24', 'demo_25', 'demo_26', 'demo_27', 'demo_28', 'demo_29', 'demo_3', 'demo_4', 'demo_5', 'demo_6', 'demo_7', 'demo_8', 'demo_9']
Processing Demo demo_0 with 200 7D trajectory points.
Generated 8000 candidate viewpoints; 195 passed FoV check.
Demo demo_0: 195 candidate points passed FoV check.
Processing Demo demo_1 with 174 7D trajectory points.
Generated 6960 candidate viewpoints; 125 passed FoV check.
Demo demo_1: 125 candidate points passed FoV check.
Processing Demo demo_10 with 194 7D trajectory points.
Generated 7760 candidate viewpoints; 166 passed FoV check.
Demo demo_10: 166 candidate points passed FoV check.
Processing Demo demo_11 with 172 7D trajectory points.
Generated 6880 candidate viewpoints; 249 passed FoV check.
Demo demo_11: 249 candidate points passed FoV check